"""Implement custom trading-day and weekly business date calendars
Copyright 2022, Terence Lim
MIT License
"""
from typing import Iterable, List, Dict, Mapping, Any, Callable, Tuple
import numpy as np
import pandas as pd
from pandas import DataFrame, Series, Timestamp, DatetimeIndex
import pandas_datareader as pdr
from pandas.tseries.holiday import USFederalHolidayCalendar
from pandas.api.types import is_list_like
from pandas.tseries.offsets import MonthEnd
from sqlalchemy import Column, Integer
from datetime import datetime
from finds.database.sql import SQL
_VERBOSE = 0
# List of anticipated NYSE holidays
_hols = [20230102, 20230116, 20230220, 20230407, 20230529,
20230619, 20230704, 20230904, 20231123, 20231225,
20240101, 20240115, 20240219, 20240329, 20240527,
20240619, 20240704, 20240902, 20241128, 20241225,
20250101, 20250129, 20250217, 20250418, 20250526,
20250619, 20250704, 20250901, 20251127, 20251225]
_MAXDATE = (max(_hols) // 10000) * 10000 + 1231 # last year with known hols
def _map(func, dates, *args, **kwargs) -> List:
"""Helper function to apply a func to each date in list
Args:
func: function to apply
dates: int dates to apply func on
*args: list of optional arguments
**kwargs: list of keyword arguments
Notes:
Func is applied to unique dates, then values copies in order
Examples:
>>> _map(busday.offset, [20190526, 20190528], -2)
>>> _map(busday.begmo, [20190526, 20190528], -2)
>>> _map(busday.endmo, [20190526, 20190528], -2)
"""
values = {d: func(d, *args, **kwargs) for d in np.unique(dates)}
return [values[d] for d in dates]
[docs]class BusDay:
"""Implement custom business-day and weekly dates calendar
Args:
sql: SQL connection instance to store dates
hols: List of expected future holidays
start: Start date of calendar
end: End date of calendar
endweek: Ending day of week for weekly calendar: 0-6 or 'Mon'-'Sun'
new: Load trading days from French library (True), else local database
Attributes:
weeks : DataFrame of weeks in rows, index is weeknum and
columns for beg, end dates and ismonthend indicator
Notes:
- Non-trading holidays inferred from Ken French and NYSE websites
- Earliest French daily factors is 19260701, except STRev is 19260126
"""
def __init__(self,
sql: SQL,
start: int = 19251231,
end: int = _MAXDATE,
hols: List[int] = _hols,
endweek: int | str = 0,
new: bool = False,
verbose: int = _VERBOSE):
"""Create or retrieve custom trading dates calendar"""
self.sql = sql
self.table = sql.Table('busdates',
Column('date', Integer, primary_key=True))
self.sql.create_all()
if new: # load 'F-F_Research_Data_Factors_daily' using pandas reader
f = pdr.data.DataReader(name='F-F_ST_Reversal_Factor_daily',
data_source='famafrench',
start=1900, end=2050)[0]\
.index.sort_values().unique()
df = DataFrame({'date': BusDay.to_date(f.astype(str), '%Y-%m-%d')})
sql.load_dataframe('busdates', df)
else: # else load from local database
df = sql.read_dataframe('SELECT * FROM busdates')
# 1. Initially, actual dates = actual FamaFrench busdays
dates = pd.DatetimeIndex(sorted(list(df['date'].unique().astype(str))))
last = BusDay.to_datetime(df.iloc[-1]['date'])
if verbose:
print('Last FamaFrench Date', last)
# 2. Extend with pandas 5-day calendar from last through to maxdate
dates = dates.append(pd.date_range(last,
end=pd.to_datetime(str(end)),
freq='B')[1:])
# 3. But remove list of anticipated NYSE holidays
hols = pd.to_datetime(hols, format='%Y%m%d')
dates = sorted(set(dates).difference(set(hols)))
# 4. Now list of all potential busdays from pandas 6-day calendar
freq = pd.offsets.CDay(calendar=np.busdaycalendar('1111110'),
normalize=True)
alldates = set(pd.date_range(dates[0], dates[-1], freq=freq))
# 5. Set actual holidays = all potential dates less actual dates
hols = np.array(list(alldates.difference(dates)),
dtype='datetime64[D]')
hols = sorted(set(hols).union([np.datetime64('1926-01-01')]))
# 6. Finalize custom cal and offsets as 6-day week less actual holidays
self._busdaycal = np.busdaycalendar(weekmask='1111110', holidays=hols)
self._customcal = pd.offsets.CDay(calendar=self._busdaycal)
self._begmocal = pd.offsets.CBMonthBegin(calendar=self._busdaycal)
self._endmocal = pd.offsets.CBMonthEnd(calendar=self._busdaycal)
"""Derive weekly trading calendar, ending on given day of week"""
dates = pd.date_range(self.datetime(19251231),
self.datetime(end),
freq=self._customcal)
# parse specified day-of-week
if isinstance(endweek, int):
endweek = ['Sun','Mon','Tue','Wed','Thu','Fri','Sat'][endweek % 7]
if not endweek.startswith('W-'):
endweek = 'W-' + endweek.upper()[:3]
# generate weekly calender end dates
weekly_end = pd.date_range(self.datetime(start),
self.datetime(end),
freq=endweek)
# require start(exclusve) and end(inclusive), and weekly end in dates
dates = dates[(dates > weekly_end[0]) & (dates <= weekly_end[-1])]
weeks = pd.Series(np.searchsorted(weekly_end, dates),
index=dates.astype(str).str.replace('-', ''))
# determine beg and end business date of each week, and save
g = weeks.index.astype(int).groupby(weeks)
self.weeks = DataFrame.from_dict(
{k: {'beg': min(v), 'end': max(v)}
for k,v in enumerate(g.values())}, orient='index')
m = (self.weeks['end'] // 100) % 100
self.weeks['ismonthend'] = m != m.shift(-1) # last week fully in month
self.weeks.index.name = 'numwk'
self.freq = endweek
[docs] def datetime(self,
date: Any | Iterable[Any],
month: int | None = None,
day: int | None = None) -> datetime | List[datetime]:
"""Convert int date or pandas timestamp to pydatetime type
Args:
date: Input date or year or pandas Timestamp or datetime64
month: If None, then month is inferred from input date
day: If None, then day is inferred from input date
Returns:
datetime object or list-like with same shape as input date
Notes:
- If input day is 00, then returns custom business month begin date
- If input day is 99, then returns custom business month end date
"""
def _datetime(date, month, day):
if hasattr(date, 'to_pydatetime'): # is pandas timestamp
return date.to_pydatetime() # or datetimeindex
try:
return datetime(self.year(date),
self.month(date) if month is None else month,
self.day(date) if day is None else day)
except: # catch if date is out of range of datetime.datetime
try: # if DD + 1 is <= 99, so return custom month begin
return (datetime(date, # // 100,
month=month,
day=(day or self.day(date))+1)
+ (0 * self._begmocal)).to_pydatetime()
except: # else must be DD == 99, so return custom month end
return (datetime(date, # // 100,
month=month,
day=1)\
+ (0 * self._endmocal)).to_pydatetime()
if is_list_like(date):
return [self.datetime(d) for d in date]
else:
return _datetime(date, month, day)
[docs] def offset(self,
dates: int | List[int],
offsets: int = 0,
end: int | None = None,
roll: str = 'preceding') -> int | List[int]:
"""Return valid business date with optional offset or roll treatment
Args:
dates: Input dates in YYYYMMDD int format
offsets: Number of business days to offset
end: End index of offset window, None to return a single date
roll: How to treat dates that are not a valid day, in
{'raise', 'forward', 'following', 'backward', 'preceding'}
"""
if end: # return all dates in window [left, right] around {date}
if is_list_like(dates):
return _map(self.offset, dates, offsets, end, roll=roll)
return [self.offset(dates, offsets=d, roll=roll)
for d in np.arange(offsets, end + 1)]
if is_list_like(dates):
return _map(self.offset, dates, offsets, end, roll)
date = np.busday_offset(dates=np.array([self.datetime(dates)],
dtype='datetime64[D]'),
offsets=offsets,
roll=roll,
busdaycal=self._busdaycal)
return int(pd.to_datetime(date).strftime('%Y%m%d')[0])
[docs] def date_range(self, start: int, end: int,
freq: str | int = 'daily') -> List[int]:
"""Return business dates at desired freq between beg and end dates
Args:
start: Inclusive start of date range
end: Inclusive end of date range
freq: If int, then annual ending on month-end business date, else in
{('d')aily, ('b')egmo, ('e')ndmo, ('w')eekly, ('q')uarterly
"""
try:
month = int(freq) # annually as of calendar month end
dates = pd.DatetimeIndex(
[self.datetime(self.year(d), month, 99)
for d in range(self.year(start), self.year(end) + 1)])
except:
match freq[:1].lower():
case 'a': # same as 12
dates = pd.DatetimeIndex(
[self.datetime(self.year(d), month, 99)
for d in range(self.year(start), self.year(end) + 1)])
case 'w': # custom weekly
return self.weeks['end']\
.iloc[self._numwk(start) : self._numwk(end) + 1]\
.to_list()
case "d": # custom business daily
dates = pd.date_range(start=self.datetime(date=start),
end=self.datetime(end),
freq=self._customcal)
case "b": # custom business month begin
dates = pd.date_range(start=self.datetime(date=start, day=1),
end=str(self.endmo(end)),
freq=self._begmocal)
case 'e': # custom business month end
dates = pd.date_range(start=self.datetime(date=start, day=1),
end=str(self.endmo(end)),
freq=self._endmocal)
case 'm': # custom business month end
dates = pd.date_range(start=self.datetime(date=start, day=1),
end=str(self.endmo(end)),
freq=self._endmocal)
case "q": # custom business quarter end
start = BusDay.to_quarterend(start)
dates = pd.date_range(start=self.datetime(date=start, day=1),
end=str(self.endmo(end)),
freq=self._endmocal)[::3]
case 'd':
dates = pd.DatetimeIndex([str(start), str(end)])
case _:
raise Exception("invalid freq")
return list(dates.strftime('%Y%m%d').astype(int))
[docs] def date_tuples(self, dates: List[int]) -> List[Tuple[int, int]]:
"""Return (beg, end) holding period between rebalance dates"""
dates = sorted(dates)
return [(self.offset(beg, offsets=1), self.offset(end, offsets=0))
for beg, end in zip(dates[:-1], dates[1:])]
[docs] @staticmethod
def to_datetime(arg: Any,
format: str = '%Y%m%d',
**kwargs) -> Timestamp | DatetimeIndex:
"""Wraps pd.to_datetime to convert string to pandas TimeStamp format"""
return pd.to_datetime(arg, format=format, **kwargs)
[docs] @staticmethod
def to_date(dates: Any,
format: str = '%Y-%m-%d') -> int | List[int]:
"""Construct int date from strings using input and output formats
Args:
date: Input date strings or Timestamp or pydatetime to convert
format: Optional format of input date string
Returns:
int dates with year, month, date components according to outformat
Formats specified as in strptime() and strftime():
- %b %B %h = input month name
- %F = %Y-%m-%d
- %T = %H:%M:%S
- %n = whitespace
- %w %W %U %V = week number
- %u = day of week (1-7)
Examples:
>>> to_date('12-31-1999', format='%m-%d-%Y')
>>> to_date(['1999-01-01', '1999-12-31'])
>>> int(datetime.strptime(str(19991231), '%Y%m%d').strftime('%Y%m%d'))
"""
if is_list_like(dates):
return [BusDay.to_date(s, format) for s in dates]
if not hasattr(dates, 'strftime'): # not already datetime or Timestamp
dates = datetime.strptime(str(dates), format)
return int(dates.strftime('%Y%m%d'))
return int(dates.strftime('%Y%m%d'))
[docs] @staticmethod
def to_monthend(dates: int | Iterable[int]) -> int | List[int]:
"""Return calendar monthend date given an int date or list
Args:
dates: input YYYYMMDD (or first 4-, 6-digits) int date, or list
Returns:
Output dates converted to monthend of calendar
"""
if is_list_like(dates):
return [BusDay.to_monthend(d) for d in dates]
if dates <= 9999:
dt = datetime(year=dates, month=12, day=1) + MonthEnd(0)
elif dates <= 999999:
dt = datetime(year=dates//100, month=dates%100, day=1) + MonthEnd(0)
else:
dt = BusDay.to_datetime(dates) + MonthEnd(0)
return int(dt.strftime('%Y%m%d'))
[docs] @staticmethod
def to_quarterend(dates: int | Iterable[int]) -> int | List[int]:
"""Return calendar monthend date given an int date or list
Args:
dates: input YYYYMMDD (or first 4-, 6-digits) int date, or list
Returns:
Output dates converted to quarterend of calendar
"""
if is_list_like(dates):
return [BusDay.to_quarterend(d) for d in dates]
if dates <= 9999:
dt = Busday.to_quarterend(dates*10000 + 1201)
elif dates <= 999999:
dt = BusDay.to_quarterend(dates*100 + 1)
else:
month = ((BusDay.month(dates) + 2) // 3) * 3 # round up to quarter
dt = datetime(year=dates // 10000, month=month, day=1) + MonthEnd(0)
return int(dt.strftime('%Y%m%d'))
[docs] @staticmethod
def year(date: int | Iterable[int]) -> int | List[int]:
"""Helper to extract int years from input date or list"""
if is_list_like(date):
return _map(BusDay.year, date)
while date > 9999: # input date may be missing month and day
date //= 100
return date
[docs] @staticmethod
def month(date: int | Iterable[int]) -> int | List[int]:
"""Helper to extract int months from input date or list"""
if is_list_like(date):
return _map(BusDay.month, date)
while date > 999999: # input date may be missing day
date //= 100
return date % 100
[docs] @staticmethod
def day(date: int | Iterable[int]) -> int | List[int]:
"""Helper to extract int day of month from input date or list"""
if is_list_like(date):
return _map(BusDay.day, date)
return date % 100 # date may be missing year or month
[docs] @staticmethod
def today() -> int:
"""Return today's int date"""
return BusDay.to_date(datetime.now())
[docs] def endqr(self, date: int | List[int], quarters: int = 0) -> int | List[int]:
"""Return (list of) business month end date, optional months offset"""
return (_map(self.endmo, date, quarters) if is_list_like(date) else
int((self.datetime(date, ((BusDay.month(date) + 2)//3)*3, day=1)
+ (0 * self._endmocal)
+ (quarters * 3 * self._endmocal)).strftime('%Y%m%d')))
[docs] def endmo(self, date: int | List[int], months: int = 0) -> int | List[int]:
"""Return (list of) business month end date, optional months offset"""
return (_map(self.endmo, date, months) if is_list_like(date) else
int((self.datetime(date, day=1)
+ (0 * self._endmocal)
+ (months * self._endmocal)).strftime('%Y%m%d')))
[docs] def begmo(self, date: int | List[int], months: int = 0) -> int | List[int]:
"""Return (list of) business month begin date, optional months offset"""
return (_map(self.begmo, date, months) if is_list_like(date) else
int((self.datetime(date, day=1)
+ (0 * self._begmocal)
+ (months * self._begmocal)).strftime('%Y%m%d')))
[docs] def endyr(self, date: int | List[int], years: int = 0) -> int | List[int]:
"""Return (list of) business year end date, optional years offset"""
return (_map(self.endyr, date, years) if is_list_like(date) else
int((self.datetime(date, 12, 1)
+ (0 * self._endmocal)
+ (12 * years * self._endmocal)).strftime('%Y%m%d')))
[docs] def begyr(self, date: int | List[int], years: int = 0) -> int | List[int]:
"""Return (list of) business year begin date, optional years offset"""
return (_map(self.begyr, date, years) if is_list_like(date) else
int((self.datetime(date, 1, 1)
+ (0 * self._begmocal)
+ (12 * years * self._begmocal)).strftime('%Y%m%d')))
def _numwk(self, dates):
"""Return index number of weeks matching to input dates"""
return np.searchsorted(self.weeks['end'], dates)
[docs] def begwk(self, date = int | List[int], weeks: int = 0) -> int | List[int]:
"""Return beginning business week dates, with optional offset"""
dates = self.weeks['beg'].iloc[self._numwk(date) + weeks]
return list(dates) if is_list_like(dates) else dates
[docs] def endwk(self, date = int | List[int], weeks: int = 0) -> int | List[int]:
"""Return ending business week dates, with optional offset"""
dates = self.weeks['end'].iloc[self._numwk(date) + weeks]
return list(dates) if is_list_like(dates) else dates
[docs] def ismonthend(self, date: int | List[int]) -> int | List[int]:
"""If dates is in last complete week in a month"""
dates = self.weeks['ismonthend'].iloc[self._numwk(date)]
return list(dates) if is_list_like(dates) else dates
[docs] def december_fiscal(self, dates: int | List[int]) -> int | List[int]:
"""Return (list of) Fama-French December fiscal year-end date/s"""
if is_list_like(dates):
return _map(self.december_fiscal, dates)
return self.endyr(dates, years=(self.month(dates) >= 6) - 2)
[docs] def june_universe(self, dates: int | List[int]) -> int | List[int]:
"""Return (list of) Fama-French June universe selection date/s"""
if is_list_like(dates):
return _map(self.june_universe, dates)
june = self.endmo(self.endyr(dates), months=-6)
return self.endmo(june, months =-12 * (dates < june))
if __name__ == "__main__":
from finds.database import SQL
from secret import credentials
VERBOSE = 1
sql = SQL(**credentials['sql'], verbose=VERBOSE)
# Create business date calendar from French research data library
bd = BusDay(sql, new=True, verbose=VERBOSE)
def test_to_monthend():
print(BusDay.to_monthend(1999))
print(BusDay.to_monthend(199901))
print(BusDay.to_monthend(19990101))
print(BusDay.to_monthend([19991231, 199901]))
def test_to_date():
print(BusDay.to_date('12-31-1999', format='%m-%d-%Y'))
print(BusDay.to_date(['1999-01-01', '1999-12-31']))
def test_daily():
sql = SQL(**credentials['sql'], verbose=VERBOSE)
bd = BusDay(sql)
print(bd.offset(19990101, 0))
print(bd.offset(19991231, -2, 3))
print(bd.offset([19991231, 19990101, 19991231], -2, 3))
print(BusDay.to_date(datetime(1999, 12, 31)))
print(bd.december_fiscal(20100331))
print(bd.december_fiscal(20100831))
print(bd.june_universe(20100331))
print(bd.june_universe(20100831))
print(bd.date_tuples([20220131, 20220228, 20220331, 20220430]))
def test_weekly():
print(bd.weeks)
print(bd.begwk([20220609, 20220601]))
print(bd.endwk([20220609, 20220601]))
print(bd.date_range(20210603, 20220610, freq='weekly'))
print(bd.date_tuples([20220520, 20220527, 20220603, 20220610]))
sql = SQL(**credentials['sql'])
bd = BusDay(sql)
test_to_date()
test_to_monthend()
test_daily()
test_weekly()
print(bd.date_range(19251231, 19260131))