"""Stocks subclass for defining stocks datasets
Copyright 2022, Terence Lim
MIT License
"""
from typing import Dict, List, Tuple, Any
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from pandas.api.types import is_list_like, is_integer_dtype
from sqlalchemy import Table
from finds.database.sql import SQL
from finds.database.redisdb import RedisDB
from .structured import Structured
from .busday import BusDay
_VERBOSE = 1
[docs]class Stocks(Structured):
"""Provide interface to structured stock price datasets"""
def __init__(self,
sql: SQL,
bd: BusDay,
tables: Dict[str, Table],
identifier: str, name: str,
rdb: RedisDB | None = None,
verbose: int = _VERBOSE):
"""Initialize a connection to Stocks structured datasets"""
super().__init__(sql, bd, tables, identifier=identifier, name=name,
rdb=rdb, verbose=verbose)
[docs] def get_series(self,
permnos: int | str | List[str | int],
field: str,
dataset: str = 'daily',
date_field: str = 'date',
beg: int = 19000000,
end: int = 29001231) -> DataFrame | Series:
"""Return time series of a field for multiple permnos as DataFrame
Args:
permnos: Identifiers to filter
dataset: Name of dataset to retrieve from, default `daily`
field: Name of column to extract, e.g. 'ret'
beg: Inclusive start date (YYYYMMDD)
end: Inclusive end date (YYYYMMDD)
Returns:
DataFrame indexed by date with permnos in columns
"""
assert self[dataset] is not None
if isinstance(permnos, (int, str)):
permnos = [permnos]
q = ("SELECT {date_field}, {permno}, {field} "
" FROM {table}"
" WHERE {date_field} >= {beg} AND {date_field} <= {end} "
" AND {permno} IN ('{permnos}')").format(
permno=self.identifier,
field=field,
date_field=date_field,
table=self[dataset].key,
beg=int(beg),
end=int(end),
permnos="', '".join([str(p) for p in permnos]))
return self.sql.read_dataframe(q)\
.pivot(index='date',
columns=self.identifier,
values=field)[permnos].sort_index()
"""
q = ("SELECT {date_field}, {field}"
" FROM {table}"
" WHERE {date_field} >= {beg} AND {date_field} <= {end} "
" AND {permno} = '{permnos}'").format(
permno=self.identifier,
field=field,
date_field=date_field,
table=self[dataset].key,
beg=int(beg),
end=int(end),
permnos=permnos)
self._print('(get_series single)', q)
return self.sql.read_dataframe(q).set_index(date_field)[field]\
.sort_index().rename(permnos)
else:
q = ("SELECT {date_field}, {permno}, {field} "
" FROM {table}"
" WHERE {date_field}>={beg} AND {date_field}<={end}").format(
permno=self.identifier,
field=field,
date_field=date_field,
table=self[dataset].key,
beg=int(beg),
end=int(end))
#self._print('(get_series many)', q)
return self.sql.read_dataframe(q)\
.pivot(index='date',
columns=self.identifier,
values=field)[permnos].sort_index()
"""
[docs] def get_ret(self,
beg: int,
end: int,
dataset: str = 'daily',
field: str = 'ret',
date_field: str = 'date',
cache_mode: str = 'rw') -> Series:
"""Compounded returns between beg and end dates of all stocks
Args:
beg: Inclusive start date (YYYYMMDD)
end: Inclusive end date (YYYYMMDD)
dataset: Name of dataset to retrieve, default is `daily`
field: Name of returns field
date_field: Name of date field
cache_mode: 'r' to try read from cache first, 'w' to write to cache
Series:
DataFrame with prod(min_count=1) of returns in column `ret`,
with rows indexed by permno
"""
rkey = self.rdb.prefix + "_".join([field, str(self), str(beg), str(end)])
if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(rkey):
self._print('(get_ret load)', rkey)
return self.rdb.load(rkey)[field] # use cache
q = ("SELECT {field}, {identifier} FROM {table} "
" WHERE date >= {beg} AND date <= {end}").format(
table=self[dataset].key,
field=field,
identifier=self.identifier,
beg=beg,
end=end)
self._print('(get_ret)', q)
df = self.sql.read_dataframe(q).sort_values(self.identifier)
# compute compounded returns
df[field] += 1
df = (df.groupby(self.identifier).prod(min_count=1)-1).dropna()
if 'w' in cache_mode and self.rdb and beg != end: # if write cache
self._print('(get_ret dump)', rkey)
self.rdb.dump(rkey, df)
return df[field]
[docs] def get_compounded(self,
periods: List[Tuple[int, int]],
permnos: List[int],
field: str = 'ret',
cache_mode: str = "rw") -> DataFrame:
"""Compound returns within list of periods, for given permnos
Args:
periods: Tuples of inclusive begin and end dates of returns period
permnos: List of permnos
cache_mode: 'r' to try read from cache first, 'w' to write to cache
Returns:
DataFrame of compounded returns in rows, for permnos in cols
"""
# accumulate horizontally, then finally transpose
r = DataFrame(index=permnos)
for beg, end in periods:
r[end] = self.get_ret(beg, end, field=field, cache_mode=cache_mode)\
.reindex(permnos)
return r.transpose()
[docs] def cache_ret(self,
dates: List[Tuple[int, int]],
replace: bool,
dataset: str,
field: str = 'ret',
date_field: str ='date'):
"""Pre-generate compounded returns for redis store"""
assert self.rdb is not None
q = ("SELECT {field}, {identifier}, {date_field} FROM {table} "
" WHERE {date_field} >= {beg} "
" AND {date_field} <= {end}").format(
table=self[dataset].key,
field=field,
identifier=self.identifier,
date_field=date_field,
beg=dates[0][0],
end=dates[-1][-1])
self._print('(cache_ret)', q)
rets = self.sql.read_dataframe(q).sort_values(self.identifier)
rets[field] += 1
for beg, end in dates:
rkey = self.rdb.prefix + "_".join([field, str(self), str(beg), str(end)])
if not replace and self.rdb.redis.exists(rkey):
self._print('(cache_ret exists)', rkey)
continue
df = rets[rets['date'].ge(beg) & rets['date'].le(end)]\
.drop(columns='date')
df = (df.groupby(self.identifier).prod(min_count=1) - 1).dropna()
self._print('(cache_ret dump)', rkey, beg, end, len(df))
self.rdb.dump(rkey, df)
[docs] def get_window(self,
field: str,
permnos: List[Any],
dates: List[int],
left: int,
right: int,
dataset: str = 'daily',
date_field: str = 'date',
avg: bool = False) -> DataFrame:
"""Retrieve field values for permnos in window centered around dates
Args:
field: Name of field to retrieve
permnos: List of identifiers to retrieve
date_field: Name of date field in database
dates : List of corresponding dates of center of event window
left : Relative (inclusive) offset of start of event window
right : Relative (inclusive) offset of end of event window
dataset: Name of dataset, default 'daily'
Returns:
DataFrame columns [0:(right-left)] of field values in event window
"""
dates = list(dates)
permnos = list(permnos)
if avg:
# Generate and save dates to sql temp
df = DataFrame({'a': self.bd.offset(dates, left),
'b': self.bd.offset(dates, right),
self.identifier: permnos},
index=np.arange(len(dates)))
self.sql.load_dataframe(table=self.sql._t,
df=df,
index_label='n',
replace=True)
if is_integer_dtype(df[self.identifier].dtype):
q = f"CREATE INDEX a on {self.sql._t} ({self.identifier},a,b)"
self.sql.run(q)
q = f"CREATE INDEX b on {self.sql._t} ({self.identifier},b,a)"
self.sql.run(q)
# join on (permno, date) and retrieve from target table
q = ("SELECT {temp}.n, "
" AVG({field}) as {field} FROM {temp} LEFT JOIN {table}"
" ON {temp}.{identifier} = {table}.{identifier} "
" WHERE {table}.{date_field} >= {temp}.a "
" AND {table}.{date_field} <= {temp}.b"
" GROUP BY {temp}.n").format(
temp=self.sql._t,
identifier=self.identifier,
field=field,
date_field=date_field,
table=self[dataset].key)
df = self.sql.read_dataframe(q).drop_duplicates(subset=['n'])\
.set_index('n')
result = DataFrame({'permno': permnos, 'date': dates},
index=np.arange(len(dates)))\
.join(df, how='left')
else:
# Generate and save dates to sql temp
cols = ["day" + str(i) for i in range(1 + right - left)]
df = DataFrame(data=self.bd.offset(dates, left, right),
columns=cols)
df[self.identifier] = permnos
self.sql.load_dataframe(self.sql._t, df, replace=True)
# Loop over each date, and join as columns of result
result = DataFrame({'permno': permnos, 'date': dates})
for col in cols:
# create index on date to speed up join with target table
if is_integer_dtype(df[self.identifier].dtype):
q = "CREATE INDEX {col} on {temp} ({ident}, {col})".format(
temp=self.sql._t, ident=self.identifier, col=col)
self.sql.run(q)
# join on (permno, date) and retrieve from target table
q = ("SELECT {temp}.{identifier}, {field}"
" FROM {temp} LEFT JOIN {table}"
" ON {table}.{identifier} = {temp}.{identifier} "
" AND {table}.{date_field} = {temp}.{col}").format(
temp=self.sql._t,
identifier=self.identifier,
field=field,
date_field=date_field,
table=self[dataset].key,
col=col)
df = self.sql.read_dataframe(q)
# left join, so assume same order
result[col] = df[field].values
self.sql.run('drop table if exists ' + self.sql._t)
result.columns = [int(c[3:]) if c.startswith('day') else c
for c in result.columns]
return result.reset_index(drop=True)
[docs] def get_many(self,
permnos: List[str | int],
fields: List[str],
dates: List[int],
dataset: str = 'daily',
date_field: str = 'date',
exact: bool = True) -> DataFrame:
"""Retrieve multiple fields for lists of permnos and dates
Args:
permnos: List of identifiers to retrieve
dates: List of corresponding dates of center of event window
field: Names of fields to retrieve
dataset: Name of dataset, default 'daily'
date_field: Name of date field in database, default 'date'
exact: Whether require exact date match, or allow most recent
Returns:
DataFrame with permno, date, and retrieved fields across columns
"""
field = "`, `".join(list(fields))
self.sql.load_dataframe(table=self.sql._t,
df=DataFrame({self.identifier: list(permnos),
'date': list(dates)},
index=np.arange(len(permnos))),
index_label='_seq',
replace=True)
if exact:
q = ("SELECT {temp}._seq, {temp}.{identifier}, "
" {temp}.date AS date, `{field}` "
" FROM {temp} LEFT JOIN {table}"
" ON {table}.{identifier} = {temp}.{identifier} "
" AND {table}.{date_field} = {temp}.date").format(
temp=self.sql._t,
identifier=self.identifier,
date_field=date_field,
field=field,
table=self[dataset].key)
df = self.sql.read_dataframe(q).set_index('_seq').sort_index()
df.index.name = None
else:
q = ("SELECT {temp}._seq, {temp}.{identifier}, "
" {temp}.date AS date, `{field}` "
" FROM {temp} LEFT JOIN {table}"
" ON {table}.{identifier} = {temp}.{identifier} "
" AND {table}.{date_field} <= {temp}.date").format(
temp=self.sql._t,
identifier=self.identifier,
field=field,
date_field=date_field,
table=self[dataset].key)
df = self.sql.read_dataframe(q)\
.sort_values(['_seq', 'date'], na_position='first')\
.drop_duplicates(subset=['_seq'], keep='last')\
.set_index('_seq').sort_index()
self.sql.run('drop table if exists ' + self.sql._t)
return df
[docs] def get_section(self,
fields: List[str],
date: int,
dataset: str = 'daily',
date_field: str = 'date',
start: int = -1) -> DataFrame:
"""Return a cross-section of values of fields as of a single date
Args:
fields: list of columns to return
date: Desired date in YYYYMMDD format
date_field: Name of date column in the table, default 'date'
dataset: Dataset to extract from, default 'daily'
start: Non-inclusive date of starting range; if -1 then exact date
Returns:
Most recent row within date range, indexed by permno
Note:
- If start is not -1, then the latest prevailing record for each
between (non-inclusive) start and (inclusive) date is returned
Examples:
>>> t = crsp.get_section('shares', ['shrenddt','shrout'], 'shrsdt', dt)
>>> u = crsp.get_section('names', ['comnam'], 'date', dt-10000)
"""
assert is_list_like(fields)
if self.identifier not in fields:
fields += [self.identifier]
if start < 0:
q = ("SELECT {fields} FROM {table} "
" WHERE {date_field} = {date}").format(
fields=", ".join(fields),
table=self[dataset].key,
date_field=date_field,
date=date)
else:
q = ("SELECT {fields} FROM {table} JOIN"
" (SELECT {permno}, MAX({date_field}) AS {date_field} "
" FROM {table} "
" WHERE {date_field} <= {date} AND {date_field} > {start}"
" GROUP BY {permno}) as a "
" USING ({permno}, {date_field})").format(
fields=", ".join(fields),
table=self[dataset].key,
permno=self.identifier,
date_field=date_field,
date=date,
start=start)
self._print('(get_section)', q)
return self.sql.read_dataframe(q).set_index(self.identifier)
[docs] def get_range(self,
fields: List[str] | Dict[str, str],
beg: int,
end: int,
dataset: str = 'daily',
date_field: str = 'date',
cache_mode: str = "rw") -> DataFrame:
"""Return field values within a date range
Args:
fields: Names of columns to return (and optionally rename as)
beg: Inclusive start date in YYYYMMDD format
end: Inclusive end date in YYYYMMDD format
dataset: Name of dataset to extract from, default 'daily'
date_field: Name of date column in the table, default 'date'
cache_mode: 'r' to try read from cache first, 'w' to write to cache
Returns:
DataFrame multi-indexed by permno, date
"""
assert(fields)
if isinstance(fields, dict):
rename = fields
fields = list(fields.keys())
else:
rename = {k:k for k in fields}
if self.identifier not in fields:
fields += [self.identifier]
rkey = self.rdb.prefix + f"{str(self)}_{'_'.join(fields)}_{beg}_{end}"
if self.rdb and 'r' in cache_mode and self.rdb.redis.exists(rkey):
self._print('(get_range load)', rkey)
return self.rdb.load(rkey)
q = ("SELECT {fields}, {date_field} FROM {table} WHERE "
" {date_field} >= {beg} AND {date_field} <= {end}").format(
fields=", ".join(fields),
table=self[dataset].key,
date_field=date_field,
beg=beg,
end=end)
self._print('(get_range)', q)
r = self.sql.read_dataframe(q).set_index([self.identifier, date_field])
r = r.rename(columns=rename) if rename else r.iloc[:,0]
if 'w' in cache_mode and self.rdb:
self._print('(get_range dump)', rkey)
self.rdb.dump(rkey, r)
return r
[docs]class StocksBuffer(Stocks):
"""Cache daily returns into memory, and provide Stocks-like interface"""
def __init__(self,
stocks: Stocks,
beg: int,
end: int,
dataset: str,
fields: List[str],
identifier: str,
date_field: str = 'date'):
"""Create object and load returns into its cache
Args:
stocks: Stocks structured data object to access stock returns data
beg: Earliest date of daily stock returns to pre-load
end: Latest date of daily stock returns to pre-load
fields: Column names of returns fields, e.g. ['ret', 'retx', 'prc']
dataset: Name of dataset to extract from, e.g. 'daily', 'monthly'
date_field: Name of date field, default 'date'
identifier: Field name of stocks identifier
"""
self.fields = fields
self.identifier = identifier
self.date_field = date_field
self.bd = stocks.bd
self.stocks = stocks
self._dataset = dataset
q = (f"SELECT {identifier}, {date_field}, {', '.join(fields)} "
f" FROM {stocks[dataset].key}"
f" WHERE {date_field}>={beg} AND {date_field}<={end}")
self.rets = stocks.sql.read_dataframe(q)\
.sort_values([self.identifier, date_field])
if not len(self.rets) and _VERBOSE:
print('empty results from stocks.StocksBuffer:', q)
[docs] def get_section(self,
fields: List[str],
date: int,
dataset: str = '',
date_field: str = '',
start: int = -1) -> DataFrame:
"""Return a cross-section of values of fields as of a single date
Args:
dataset: Dataset to extract from
fields: list of columns to return
date_field: Name of date column in the table
date: Desired date in YYYYMMDD format
start: Non-inclusive date of starting range (ignored)
Returns:
Most recent row within date range, indexed by permno
"""
date_field = date_field or self.date_field
dataset = dataset or self._dataset
assert dataset == self._dataset, f"buffered dataset was {self._dataset}"
df = self.rets[self.rets[date_field].eq(date)]\
.drop(columns=[date_field])\
.set_index(self.identifier)[fields]
return df.dropna()
[docs] def get_ret(self, beg: int, end: int, field: str = 'ret') -> Series:
"""Return compounded stock returns between beg and end dates
Args:
beg: Begin date to compound returns
end: End date (inclusive) to compound returns
field: Name of returns field in dataset, in {'ret', 'retx')
"""
df = self.rets.loc[self.rets[self.date_field].between(beg, end),
[self.identifier, field]].dropna()
df.loc[:, field] += 1
df = (df.groupby(self.identifier).prod(min_count=1) - 1).fillna(0)
return df[field]
[docs]class StocksFrame(Stocks):
"""Mimic Stocks object given an input DataFrame of returns
Args:
df: DataFrame of returns with date in index and permno in columns
rsuffix: replicate output columns and append rsuffix to column name
identifier: name of identifier column
Notes:
- Limited interface to manipulate DataFrame of asset returns as
Stocks-like. Use when sql and BusDay not available.
"""
[docs] class bd:
"""Class to mimic basic behavior of BusDay object"""
[docs] @staticmethod
def begmo(date: int | List[int]) -> int | List[int]:
"""Returns same date"""
return date
[docs] @staticmethod
def endmo(date: int | List[int]) -> int | List[int]:
"""Returns same date"""
return date
[docs] @staticmethod
def date_tuples(dates: List[int]) -> List[Tuple[int, int]]:
"""Returns adjacent dates as the holding date tuples"""
return list(zip(dates[1:], dates[1:]))
def __init__(self,
df: DataFrame,
rsuffix: str = '',
identifier: str = 'permno'):
self.data = DataFrame(df)
if rsuffix is not None:
self.data = self.data.join(self.data, how='left', rsuffix=rsuffix)
self.identifier = identifier
[docs] def get_series(self, permnos: str | int | List[str | int],
*arg, **kwarg) -> Series:
"""Return the series for target permnos"""
return self.data[permnos]
[docs] def get_ret(self, beg: int, end: int, field: str = 'ret', **kwargs) -> Series:
"""Compounded returns between beg and end (inclusive) dates"""
df = DataFrame((self.data.loc[(self.data.index >= beg) &
(self.data.index <= end)] + 1).prod() - 1)
df.columns = [field]
df.index.name = self.identifier
return df[field]