Source code for finds.structured.crsp

"""CRSP daily and monthly stock files

Copyright 2022-2024, Terence Lim

MIT License
"""
from typing import Any, Dict, List, Tuple
import numpy as np
import pandas as pd
import time
from pandas import DataFrame, Series
from pandas.api.types import is_list_like, is_integer_dtype
from sqlalchemy import Table, Column, Index
from sqlalchemy import Integer, String, Float, SmallInteger, Boolean, BigInteger
from finds.database.sql import SQL
from finds.database.redisdb import RedisDB
from finds.structured.busday import BusDay
from finds.structured.stocks import Stocks, StocksBuffer
from finds.recipes.filters import fractile_split
_VERBOSE = 1

class CRSP(Stocks):
    """Implements an interface to CRSP structured stocks dataset

    Args:
        sql: Connection to mysql database
        bd: Business dates object
        rdb: Optional connection to Redis for caching selected query results
        monthly: Use monthly (True) or daily file; default (None) autoselects

    Notes:
        - Earliest CRSP prc is 19251231
    """

    def __init__(self,
                 sql: SQL,
                 bd: BusDay,
                 rdb: RedisDB | None = None,
                 monthly: bool | None = None,
                 verbose: int = _VERBOSE):
        """Initialize connection to CRSP datasets"""
        tables = {
            'daily': sql.Table(
                'daily',
                Column('permno', Integer, primary_key=True),
                Column('date', Integer, primary_key=True),
                Column('bidlo', Float),
                Column('askhi', Float),
                Column('prc', Float),
                Column('vol', Float),
                Column('ret', Float),
                Column('retx', Float),                  # need retx!
                Column('bid', Float),
                Column('ask', Float),
                Column('shrout', Integer, default=0),   # Column('shrout', Float),
                Column('openprc', Float),
            ),
            'shares': sql.Table(
                'shares',
                Column('permno', Integer, primary_key=True),
                Column('shrout', Integer, default=0),
                Column('shrsdt', Integer, primary_key=True),
                Column('shrenddt', Integer, primary_key=True),
            ),
            'delist': sql.Table(
                'delist',
                Column('permno', Integer, primary_key=True),
                Column('dlstdt', Integer, primary_key=True),
                Column('dlstcd', SmallInteger, primary_key=True),
                Column('nwperm', Integer, default=0),   # '0' - '99841' (int64)
                Column('nwcomp', Integer, default=0),   # '0' - '90044' (int64)
                Column('nextdt', Integer, default=0),   # String(8) '19870612' @0
                Column('dlamt', Float),                 # '0' - '2349.5' (float64)
                Column('dlretx', Float),                # Float '-0.003648' @3
                Column('dlprc', Float),                 # '-1315' - '2349.5' (float64)
                Column('dlpdt', Integer, default=0),    # String(8) '19870612' @0
                Column('dlret', Float),                 # Float '-0.003648' @3
            ),
            'dist': sql.Table(
                'dist',
                Column('permno', Integer, primary_key=True),
                Column('distcd', SmallInteger, primary_key=True),
                Column('divamt', Float),
                Column('facpr', Float),
                Column('facshr', Float),
                Column('dclrdt', Integer, default=0),
                Column('exdt', Integer, primary_key=True),
                Column('rcrddt', Integer, default=0),
                Column('paydt', Integer, default=0),
                Column('acperm', Integer, default=0),
                Column('accomp', Integer, default=0),
            ),
            'names': sql.Table(
                'names',
                Column('date', Integer, primary_key=True),
                Column('comnam', String(32)),
                Column('ncusip', String(8)),
                Column('shrcls', String(1)),
                Column('ticker', String(5)),
                Column('permno', Integer, primary_key=True),
                Column('nameendt', Integer, default=0),
                Column('shrcd', SmallInteger, default=0),
                Column('exchcd', SmallInteger, default=0),
                Column('siccd', SmallInteger, default=0),
                Column('tsymbol', String(7)),
                Column('naics', Integer, default=0),
                Column('primexch', String(1)),
                Column('trdstat', String(1)),
                Column('secstat', String(4)),
                Column('permco', Integer, default=0),
                sql.Index('ncusip', 'date')
            ),
            'monthly': sql.Table(
                'monthly',
                Column('permno', Integer, primary_key=True),
                Column('date', Integer, primary_key=True),
                Column('prc', Float),
                Column('ret', Float),
                Column('retx', Float),
                Column('dlstcd', SmallInteger),
                Column('dlret', Float),
            ),
        }
        super().__init__(sql, bd, tables, identifier='permno', name='CRSP',
                         rdb=rdb, verbose=verbose)
        self._monthly = monthly
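
    # Usage sketch (hypothetical setup: assumes `credentials` supplies valid
    # connection parameters, as in the __main__ block at the end of this file):
    #
    #   sql = SQL(**credentials['sql'])
    #   bd = BusDay(sql)
    #   rdb = RedisDB(**credentials['redis'])
    #   crsp = CRSP(sql, bd, rdb=rdb)          # autoselect daily/monthly
    #   crsp_m = CRSP(sql, bd, monthly=True)   # force monthly file, no cache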
    def build_lookup(self,
                     source: str,
                     target: str,
                     date_field='date',
                     dataset: str = 'names',
                     fillna: Any = 0) -> Any:
        """Build lookup function to return target identifier from source"""
        return super().build_lookup(source=source, target=target,
                                    date_field=date_field, dataset=dataset,
                                    fillna=fillna)
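
    # Example sketch: map tickers to permnos as of a date. This assumes the
    # object returned by the inherited Stocks.build_lookup is callable with
    # a list of source keys and an as-of date:
    #
    #   to_permno = crsp.build_lookup(source='ticker', target='permno')
    #   permnos = to_permno(['IBM', 'AAPL'], date=20221230)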
    def get_cap(self,
                date: int,
                cache_mode: str = "rw",
                use_shares: bool = False,
                use_permco: bool = False) -> Series:
        """Compute a cross-section of market capitalization values

        Args:
            date: YYYYMMDD int date of market cap
            cache_mode: 'r' to try read from cache first, 'w' to write to cache
            use_shares: If True, use shrout from 'shares' table, else 'daily'
            use_permco: If True, sum caps by permco, else by permno

        Returns:
            Series of market cap indexed by permno
        """
        rkey = (self.rdb.prefix +
                f"cap{'co' if use_permco else ''}_{str(self)}_{date}"
                if self.rdb else '')   # guard: rdb connection is optional
        if self.rdb and 'r' in cache_mode and self.rdb.redis.exists(rkey):
            self._print('(get_cap load)', rkey)
            return self.rdb.load(rkey)['cap']
        if use_shares:   # use 'shares' table for shrout
            permnos = list(self.get_section(dataset='daily',
                                            fields=[self.identifier],
                                            date_field='date',
                                            date=date).index)
            self._print('LENGTH PERMNOS =', len(permnos))

            prc = self.get_section(dataset='daily',
                                   fields=['prc'],
                                   date_field='date',
                                   date=date).reindex(permnos)
            self._print('NULL PRC =', prc['prc'].isna().sum())

            shr = self.get_section(dataset='shares',
                                   fields=['shrout'],
                                   date_field='shrsdt',
                                   date=date,
                                   start=0).reindex(permnos)
            self._print('NULL SHR =', shr['shrout'].isna().sum())

            df = DataFrame(shr['shrout'] * prc['prc'].abs(), columns=['cap'])
        else:   # else 'daily' table already contains 'shrout'
            cap = self.get_section(dataset='daily',
                                   fields=['prc', 'shrout'],
                                   date_field='date',
                                   date=date)
            df = DataFrame(cap['shrout'] * cap['prc'].abs(), columns=['cap'])
        if use_permco:   # sum caps of permnos within each permco
            df = df.join(self.get_section(dataset='names',
                                          fields=['permco'],
                                          date_field='date',
                                          date=date,
                                          start=0).reindex(df.index))
            sumcap = df.groupby(['permco'])[['cap']].sum()
            df = df[['permco']].join(sumcap, on='permco')[['cap']]
        self._print('NULL CAP =', sum(df['cap'].isna()))
        df = df[df > 0].dropna()
        if self.rdb and 'w' in cache_mode:
            self._print('(get_cap dump)', rkey)
            self.rdb.dump(rkey, df)
        return df['cap']
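
    # Example sketch (hypothetical date; assumes daily data are loaded and
    # a `crsp` instance as sketched above):
    #
    #   cap = crsp.get_cap(date=20221230)       # permno-level market cap
    #   capco = crsp.get_cap(date=20221230,     # summed within permco
    #                        use_shares=True, use_permco=True)
    #   print(cap.nlargest(10))                 # ten largest caps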
    def get_universe(self, date: int, cache_mode: str = "rw") -> DataFrame:
        """Return standard CRSP universe of US-domiciled common stocks

        Args:
            date: Rebalance date (YYYYMMDD)
            cache_mode: 'r' to try read from cache first, 'w' to write to cache

        Returns:
            DataFrame of screened universe, indexed by permno, with columns:
            market cap "decile" (1..10), "nyse" bool, "siccd", "prc", "cap"

        Notes:
            - Market cap must be available on date, with non-missing prc
            - shrcd in [10, 11], exchcd in [1, 2, 3]
        """
        assert date == self.bd.offset(date), f"get_universe: {date} not valid date"
        rkey = (self.rdb.prefix + "_".join(["universe", str(self), str(date)])
                if self.rdb else '')   # guard: rdb connection is optional
        if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(rkey):
            self._print('(get_universe load)', rkey)
            df = self.rdb.load(rkey)
        else:
            df = self.get_section(dataset='daily',
                                  fields=['prc', 'shrout'],
                                  date_field='date',
                                  date=date)\
                     .fillna(0)
            df['cap'] = df['prc'].abs().mul(df['shrout'])
            df = df.join(self.get_cap(date=date,
                                      cache_mode=cache_mode,
                                      use_shares=True,
                                      use_permco=True),
                         rsuffix='co',
                         how='inner')\
                   .fillna(0)
            df = df.join(self.get_section(dataset='names',
                                          fields=['shrcd', 'exchcd',
                                                  'siccd', 'naics'],
                                          date_field='date',
                                          date=date,
                                          start=0),
                         how='inner')
            self._print('LENGTH PERMNOS', str(len(df)))
            self._print('PRC NULL:', df['prc'].isna().sum(),
                        'NEG:', df['prc'].le(0).sum())
            self._print('CAP NON-POSITIVE:', len(df) - df['cap'].gt(0).sum())

            df = df[df['capco'].gt(0)
                    & df['cap'].gt(0)
                    & df['shrcd'].isin([10, 11])
                    & df['exchcd'].isin([1, 2, 3])]
            df['nyse'] = df['exchcd'].eq(1)                    # nyse indicator
            df['decile'] = fractile_split(values=df['capco'],  # size deciles
                                          pct=np.arange(10, 100, 10),
                                          keys=df.loc[df['nyse'], 'capco'],
                                          ascending=False)
            df = df[['cap', 'capco', 'decile', 'nyse', 'siccd', 'prc', 'naics']]
            if 'w' in cache_mode and self.rdb:
                self._print('(get_universe dump)', rkey)
                self.rdb.dump(rkey, df)
        return df
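
    # Example sketch: screen the universe on a valid business (rebalance)
    # date, then subset by the NYSE-breakpoint size deciles, e.g.:
    #
    #   univ = crsp.get_universe(date=20221230)
    #   small = univ[univ['decile'].isin([6, 7, 8])]   # mid/small-cap names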
    def get_divamt(self, beg: int, end: int) -> DataFrame:
        """Accumulate total dollar dividends between beg and end dates

        Args:
            beg: Inclusive start date (YYYYMMDD)
            end: Inclusive end date (YYYYMMDD)

        Returns:
            DataFrame with accumulated divamts = per share divamt * shrout
        """
        q = ("SELECT {dist}.{identifier} AS {identifier}, "
             " SUM({table}.shrout * {dist}.divamt) AS divamt "
             "FROM {dist} INNER JOIN {table} "
             " ON {table}.{identifier} = {dist}.{identifier} AND "
             "    {table}.date = {dist}.exdt "
             "WHERE {dist}.divamt > 0 AND {dist}.exdt >= {beg} "
             " AND {dist}.exdt <= {end} GROUP BY {identifier}").format(
                 dist=self['dist'].key,
                 identifier=self.identifier,
                 table=self['daily'].key,
                 beg=beg,
                 end=end)
        return self.sql.read_dataframe(q).set_index(self.identifier)
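
    # Example sketch: total dollar dividends paid during calendar 2022 per
    # permno (per-share divamt times shrout on the ex-date):
    #
    #   divs = crsp.get_divamt(beg=20220101, end=20221231)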
    # delisting codes for which a missing delisting return defaults to -0.3
    dlstcodes_ = set([500, 520, 580, 584]).union(range(551, 575))
    @classmethod
    def is_dlstcode(cls, dlstcd: Series | int) -> Series | bool:
        """Whether missing delisting returns for this code should be -0.3"""
        if isinstance(dlstcd, int):
            return dlstcd in cls.dlstcodes_
        else:
            return dlstcd.isin(cls.dlstcodes_)
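
    # Example sketch: scalar and vectorized forms,
    #
    #   CRSP.is_dlstcode(560)                       # True (in 551..574)
    #   CRSP.is_dlstcode(Series([100, 520, 580]))   # [False, True, True]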
    def get_dlret(self, beg: int, end: int, dataset: str = 'delist') -> Series:
        """Compounded delisting returns from beg to end dates for all permnos

        Args:
            beg: Inclusive start date (YYYYMMDD)
            end: Inclusive end date (YYYYMMDD)
            dataset: either 'delist' or 'monthly' containing delisting returns

        Returns:
            Series of delisting returns

        Notes:
            Sets to -0.3 if missing and code in [500, 520, 551...574, 580, 584]
        """
        q = ("SELECT (1+dlret) AS dlret, {identifier}, dlstcd FROM {table} "
             " WHERE {dlstdt} >= {beg} AND {dlstdt} <= {end}"
             " AND dlstcd > 0").format(
                 table=self[dataset].key,
                 identifier=self.identifier,
                 dlstdt='dlstdt' if dataset == 'delist' else 'date',
                 beg=beg,
                 end=end)
        self._print('(get_dlret)', q)
        df = self.sql.read_dataframe(q).sort_values(self.identifier)
        if len(df):
            # dlret stored as (1+dlret): 0.7 encodes a -0.3 delisting return
            df.loc[(df['dlret'].isna()
                    & CRSP.is_dlstcode(df['dlstcd'])).values, 'dlret'] = 0.7
        df = df[[self.identifier, 'dlret']].groupby(self.identifier)\
                                           .prod(min_count=1)\
                                           .dropna() - 1
        return df['dlret']
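
    # Example sketch: compound 2022 delisting returns from the 'delist'
    # table (missing dlret imputed as -0.3 for the codes above):
    #
    #   dlret = crsp.get_dlret(beg=20220101, end=20221231)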
    def get_ret(self,
                beg: int,
                end: int,
                dataset: str = 'daily',
                field: str = 'ret',
                **kwargs) -> Series:
        """Get compounded returns, with option to include delisting returns

        Args:
            beg: starting returns date
            end: ending returns date
            dataset: name of returns dataset (ignored if initialized as 'monthly')
            field: Name of returns field in dataset, in {'ret', 'retx'}
        """
        # select, or autoselect, the monthly dataset
        if self._monthly or dataset == 'monthly':
            use_monthly = True
        elif self._monthly is None:   # autoselect when dates span whole months
            use_monthly = ('monthly' in self.tables_
                           and beg <= self.bd.begmo(beg)
                           and end >= self.bd.endmo(end))
        else:
            use_monthly = False

        if use_monthly:   # set beg and end to envelope calendar month dates
            dataset = 'monthly'
            beg = (beg // 100) * 100
            end = (end // 100) * 100 + 99

        df = super().get_ret(beg, end, dataset=dataset, field=field, **kwargs)

        # if monthly dataset, then adjust for delisting returns
        if use_monthly:
            dlst = self.get_dlret(beg, end, dataset='monthly')
            df = DataFrame(df).join(dlst, how='outer')
            df = (1 + df[field].fillna(0)) * (1 + df['dlret'].fillna(0)) - 1
        return df.rename(field)
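
    # Example sketch: a range spanning whole calendar months autoselects the
    # monthly dataset (with delisting-return adjustment), while a mid-month
    # range compounds from the daily file:
    #
    #   rets_m = crsp.get_ret(20220101, 20220331)   # monthly + dlret
    #   rets_d = crsp.get_ret(20220104, 20220318)   # daily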
    def cache_ret(self,
                  dates: List[Tuple[int, int]],
                  replace: bool,
                  dataset: str = 'daily',
                  field: str = 'ret',
                  date_field: str = 'date'):
        """Pre-generate compounded returns from daily for redis store"""
        assert dataset == 'daily', "dataset must be daily"
        super().cache_ret(dates=dates, replace=replace, field=field,
                          date_field=date_field, dataset=dataset)
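
    # Example sketch: pre-compute weekly compounded returns into the redis
    # store from (beg, end) date tuples, as in the __main__ block below:
    #
    #   dates = bd.date_tuples(bd.date_range(20220101, 20221231, freq='weekly'))
    #   crsp.cache_ret(dates, replace=False)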
class CRSPBuffer(StocksBuffer):
    """Cache returns into memory, and provide Stocks-like interface"""

    def __init__(self,
                 stocks: Stocks,
                 beg: int,
                 end: int,
                 fields: List[str],
                 dataset: str):
        """Create object and load returns into its cache

        Args:
            stocks: Stocks structured data object to access stock returns data
            beg: Earliest date of daily stock returns to pre-load
            end: Latest date of daily stock returns to pre-load
            fields: Column names of returns fields, e.g. ['ret', 'retx', 'prc']
            dataset: Name of dataset to extract from, e.g. 'daily', 'monthly'
        """
        if dataset == 'monthly' and 'dlret' not in fields:
            fields = fields + ['dlret']   # copy, not +=, to not mutate caller's list
        super().__init__(stocks=stocks, beg=beg, end=end, dataset=dataset,
                         identifier=stocks.identifier, fields=fields)
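
    # Usage sketch: pre-load a year of monthly returns once, then serve
    # repeated get_ret calls from memory instead of SQL (mirrors the test
    # in the __main__ block below):
    #
    #   stocks = CRSPBuffer(crsp, beg=20210101, end=20211231,
    #                       fields=['ret'], dataset='monthly')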
    def get_ret(self, beg: int, end: int, field: str = 'ret') -> Series:
        """Return compounded stock returns between beg and end dates

        Args:
            beg: Begin date to compound returns
            end: End date (inclusive) to compound returns
            field: Name of returns field in dataset, in {'ret', 'retx'}
        """
        assert field in ['ret', 'retx']
        df = super().get_ret(beg=beg, end=end, field=field)
        if self._dataset == 'monthly':   # CRSP partial-month delisting return
            dlst = super().get_ret(beg=beg, end=end, field='dlret')
            df = DataFrame(df).join(dlst, how='outer')
            df = (1 + df[field].fillna(0)) * (1 + df['dlret'].fillna(0)) - 1
        return df.fillna(0).rename(field)
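
    # Example sketch: compounded January 2021 returns from the buffer should
    # match the unbuffered CRSP.get_ret over the same window:
    #
    #   df = stocks.get_ret(20210101, 20210131)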
    def get_universe(self, date: int, cache_mode: str = "rw") -> DataFrame:
        """Simply pass through original method to retrieve universe"""
        return self.stocks.get_universe(date=date, cache_mode=cache_mode)

if __name__ == "__main__":
    from pathlib import Path
    from finds.structured import CRSP, CRSPBuffer
    from secret import credentials, paths, CRSP_DATE

    VERBOSE = 1

    sql = SQL(**credentials['sql'], verbose=VERBOSE)
    user = SQL(**credentials['user'], verbose=VERBOSE)
    rdb = RedisDB(**credentials['redis'])
    bd = BusDay(sql, endweek=3)   # weekly frequency, 3=Wed-close-to-Wed-close
    crsp = CRSP(sql, bd, rdb=rdb, verbose=VERBOSE)
    downloads = paths['data'] / 'CRSP'

    '''
    # load CRSP: TODO handle missing return codes (< -1, see below)
    df = crsp.load_csv('names', downloads / 'names.txt.gz', sep='\t')
    print(len(df), '~', 103383)
    df = crsp.load_csv('shares', downloads / 'shares.txt.gz', sep='\t')
    print(len(df), '~', 2346131)
    df = crsp.load_csv('dist', downloads / 'dist.txt.gz', sep='\t')
    print(len(df), '~', 935880)
    df = crsp.load_csv('delist', downloads / 'delist.txt.gz', sep='\t')
    print(len(df), '~', 33584)
    df = crsp.load_csv('monthly', downloads / 'monthly.txt.gz', sep='\t')
    print(len(df), '~', 4606907)
    '''

    for s in sorted(downloads.glob('daily*.txt.gz'), reverse=True):
        tic = time.time()
        df = crsp.load_csv('daily',
                           csvfile=s,
                           sep='\t',
                           drop={'permno': ['PERMNO', '.'],
                                 'date': ['.'],
                                 'shrout': ['.']})
        print(s, round(time.time() - tic, 0), 'secs:', len(df))

    # Pre-generate weekly returns and save in Redis cache
    begweek = 19251231
    endweek = CRSP_DATE
    rebaldates = bd.date_range(begweek, endweek, freq='weekly')
    r = bd.date_tuples(rebaldates)
    batchsize = 40
    batches = [r[i:(i + batchsize)] for i in range(0, len(r), batchsize)]
    batches.reverse()
    for batch in batches:
        crsp.cache_ret(batch, field='ret', replace=True)
        crsp.cache_ret(batch, field='retx', replace=True)

    # test CRSPBuffer
    stocks = CRSPBuffer(crsp, beg=20210101, end=20211231,
                        fields=['ret'], dataset='monthly')
    beg, end = 20210101, 20210131
    df = stocks.get_ret(beg, end)
    print(df)
    m = crsp.get_ret(beg, end, cache_mode="")
    print(m)

    '''
    # changed so now may get from monthly column or compound delist table
    def get_dlstret(self, beg: int, end: int, cache_mode: str = "rw") -> Series:
        """Compounded delisting returns from beg to end dates for all permnos

        Args:
            beg: Inclusive start date (YYYYMMDD)
            end: Inclusive end date (YYYYMMDD)
            cache_mode: 'r' to try read from cache first, 'w' to write to cache

        Returns:
            Series of compounded returns
        """
        rkey = self.rdb.prefix + "_".join(["dlst", str(self), str(beg), str(end)])
        if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(rkey):
            self._print("(get_dlstret load)", rkey, str(self))
            return self.rdb.load(rkey)['ret']

        q = ("SELECT (1+dlret) AS ret, {identifier} FROM {table} "
             "  WHERE dlstdt >= {beg} AND dlstdt <= {end}").format(
                 table=self['delist'].key,
                 identifier=self.identifier,
                 beg=beg,
                 end=end)
        self._print('(get_dlstret)', q)
        df = self.sql.read_dataframe(q).sort_values(self.identifier).fillna(0)
        if len(df):
            df = (df.groupby(self.identifier).prod(min_count=1) - 1).dropna()
        if 'w' in cache_mode and self.rdb:
            self._print("(get_dlstret dump)", rkey, str(self))
            self.rdb.dump(rkey, df)
        return df['ret']
    '''