Source code for finds.structured.structured

"""Base class for structured data sets

- CRSP (daily, monthly, names, delistings, distributions, shares outstanding)
- S&P/CapitalIQ Compustat (Annual, Quarterly, Key Development, customers)
- IBES Summary

Notes:

- Optionally cache SQL query results to Redis store

Copyright 2022-2024, Terence Lim

MIT License
"""
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from pandas.api.types import is_list_like, is_integer_dtype
from sqlalchemy import Table, Column, Index
from sqlalchemy import Integer, String, Float, SmallInteger, Boolean, BigInteger
from datetime import datetime
from typing import Dict, List, Tuple, Any
from finds.database.sql import SQL, as_dtypes
from finds.database.redisdb import RedisDB
from .busday import BusDay

_VERBOSE = 0

class Structured(object):
    """Base class for interface to structured datasets, stored in SQL

    Args:
        sql: Connection instance to mysql database
        bd: Custom business day calendar instance
        tables: SQLAlchemy Table objects, keyed by dataset name, defined
            for this dataset group
        identifier: Name of field of unique identifier key
        name: Display name for this dataset group
        rdb: Connector to Redis cache store, if desired

    Attributes:
        identifier: Field name of the unique identifier key for this
            dataset group
        name: Display name for this dataset group
    """

    def __init__(self, sql: SQL, bd: BusDay, tables: Dict[str, Table],
                 identifier: str, name: str, rdb: RedisDB | None = None,
                 verbose: int = _VERBOSE):
        """Initialize a connection to structured datasets"""
        self.bd = bd
        self.sql = sql
        self.tables_ = tables
        self.rdb = rdb
        self.identifier = identifier
        self.name = name
        self._verbose = verbose

    def _print(self, *args, verbose: int = _VERBOSE, level: int = 0,
               **kwargs):
        """Helper to print verbose diagnostic messages"""
        if max(verbose, self._verbose) > 0:
            print(*args, **kwargs)

    def __str__(self) -> str:
        """String to identify this dataset group"""
        return self.name
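    # Usage sketch (hypothetical: assumes a live SQL connection `sql`, a
    # BusDay calendar `bd`, and a dict `tables` of SQLAlchemy Tables keyed
    # by dataset name, e.g. {'monthly': ..., 'names': ..., 'links': ...}):
    #
    #   crsp = Structured(sql=sql, bd=bd, tables=tables,
    #                     identifier='permno', name='CRSP')
    #   print(crsp)    # displays 'CRSP'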
    def __getitem__(self, dataset: str) -> Table:
        """Return the Table object corresponding to a dataset name"""
        assert dataset in self.tables_
        return self.tables_[dataset]
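    # For example (hypothetical dataset name), indexing retrieves the
    # SQLAlchemy Table object, whose .key attribute is the physical name:
    #
    #   table = crsp['monthly']
    #   print(table.key, list(table.columns.keys()))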
    def drop_all(self):
        """Drop all associated tables from SQL database"""
        if self.tables_:
            for table in self.tables_.values():
                table.drop()
    def load_dataframe(self, df: DataFrame, table: Table,
                       to_replace: Any = None, value: Any = None,
                       overwrite: bool = True) -> int:
        """Load dataframe to SQLAlchemy table object using associated schema

        Args:
            df: DataFrame to load from
            table: Destination Table object
            to_replace: Original value or list of values to replace
            value: Value to replace with
            overwrite: Whether to overwrite or append

        Returns:
            Number of rows loaded

        Notes:
            - DataFrame should contain the same column names as the Table
            - DataFrame column types are converted to Table column types
            - Rows with duplicate primary keys are dropped, keeping the first
        """
        df = as_dtypes(df=df, columns={k.lower(): v.type
                                       for k, v in table.columns.items()})
        if to_replace is not None and value is not None:
            df = df.replace(to_replace, value)
        primary = table.primary_key.columns.keys()
        if primary:
            df = df.sort_values(by=primary)    # sort places NaN's last
            df.drop_duplicates(primary, keep='first', inplace=True)
        df = df.dropna()
        if overwrite:
            table.drop(checkfirst=True)
        #table.create(checkfirst=True)
        self.sql.create_all()
        self.sql.load_dataframe(table=table.key, df=df, index_label=None)
        self._print("(structured store)", table.key, len(df))
        return len(df)
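    # Sketch of a call (hypothetical: `df` is a DataFrame whose columns
    # match the 'names' table schema; -99 as a missing-value code is only
    # for illustration):
    #
    #   n = crsp.load_dataframe(df, table=crsp['names'],
    #                           to_replace=-99, value=0, overwrite=True)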
    def read_dataframe(self, table: str, where: str = '') -> DataFrame:
        """Read rows from a SQL table and return as a data frame

        Args:
            table: Name of table to read from
            where: Where clause (excluding the WHERE keyword) for sql select

        Returns:
            DataFrame of query results
        """
        where = bool(where) * 'WHERE ' + where
        return self.sql.read_dataframe(f"SELECT * FROM {table} {where}")
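    # For example (hypothetical table and field names):
    #
    #   df = crsp.read_dataframe('monthly', where="date >= 20200101")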
    def load_csv(self, dataset: str, csvfile: str,
                 drop: Dict[str, List[Any]] = {},
                 keep: Dict[str, List[Any]] = {},
                 replace: Dict[str, Tuple[Any, Any]] = {},
                 sep: str = ',', encoding: str = 'latin-1', header: Any = 0,
                 low_memory: bool = False, na_filter: bool = False,
                 **kwargs) -> DataFrame:
        """Insert ignore into SQL table from csv file, and return as DataFrame

        Args:
            dataset: Dataset name
            csvfile: Name of csv file to read
            drop: {column: values} drop rows whose column has any of values
            keep: {column: values} keep rows whose column has any of values
            replace: {column: (old, new)} values to replace in column
            sep, encoding, header, low_memory, na_filter: args for pd.read_csv

        Returns:
            DataFrame containing loaded data

        Notes:
            - Creates new table, if not exists, using associated schema
            - New records with duplicate primary key are dropped
              (insert ignore is used)
        """
        # Load csv to DataFrame
        table = self[dataset]    # Table object for dataset
        assert table is not None
        df = pd.read_csv(csvfile, sep=sep, encoding=encoding, header=header,
                         low_memory=low_memory, na_filter=na_filter)
        # 'utf-8' codec can't decode byte 0xf6 => encoding='latin-1'
        df.columns = df.columns.map(str.lower).map(str.rstrip)
        self._print('(read_csv)', len(df), csvfile)

        # drop rows where col has any of the given values
        for col, vals in drop.items():
            rows = df.index[df[col].isin(vals)]
            self._print('Dropping', len(rows), 'rows with', col, 'in', vals)
            df.drop(index=rows, inplace=True)

        # clean up column dtypes and rows
        df = as_dtypes(
            df=df,
            columns={k.lower(): v.type for k, v in table.columns.items()},
            drop_duplicates=[p.key.lower() for p in table.primary_key],
            replace=replace)
        self._print("(load_csv)", len(df), table)

        # drop again in case dtypes got changed
        for col, vals in drop.items():
            rows = df.index[df[col].isin(vals)]
            df.drop(index=rows, inplace=True)

        # keep rows where col has any of the given values
        rows = df.index
        for col, vals in keep.items():
            rows = rows[df.loc[rows, col].isin(vals).values]
        self._print('Keeping', len(rows), 'of', len(df), 'rows with', keep)
        df = df.loc[rows]

        # Create sql table and load from DataFrame
        #table.create(self.sql.engine, checkfirst=True)
        self.sql.create_all()
        self.sql.load_dataframe(table=table.key, df=df, index_label=None)
        return df
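    # Sketch of a call (hypothetical csv file name; the share-code filter
    # values are only for illustration):
    #
    #   df = crsp.load_csv('names', csvfile='names.csv.gz',
    #                      keep={'shrcd': [10, 11]},
    #                      replace={'comnam': ('', 'UNKNOWN')})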
    def build_lookup(self, source: str, target: str, date_field: str,
                     dataset: str, fillna: Any) -> "Lookup":
        """Helper to build lookup of target from source identifiers

        Args:
            source: Name of source identifier key
            target: Name of target identifier key to return
            date_field: Name of date field in database table
            dataset: Internal name of table containing identifier mappings
            fillna: Value to return if not found

        Returns:
            A Lookup instance for mapping source to target identifiers
        """
        table = self[dataset]    # Table object of mappings dataset
        assert table is not None
        assert source in table.c
        assert target in table.c
        return Lookup(sql=self.sql, source=source, target=target,
                      date_field=date_field, table=table.key, fillna=fillna)
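    # For example (hypothetical field names in a 'names' dataset), to map
    # ticker symbols to permnos as of a given date:
    #
    #   to_permno = crsp.build_lookup(source='tsymbol', target='permno',
    #                                 date_field='date', dataset='names',
    #                                 fillna=0)
    #   to_permno('IBM', date=20201231)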
    def get_permnos(self, keys: List[str], date: int, link_perm: str,
                    link_date: str, permno: str) -> List[int]:
        """Return matching permnos as of a prevailing date from 'links' table

        Args:
            keys: Input list of identifiers to look up
            date: Prevailing date of link
            link_perm: Name of permno field in 'links' table
            link_date: Name of link date field in 'links' table
            permno: Name of field to output permnos as

        Returns:
            List of linked permnos as of the prevailing date; missing
            links are set to 0
        """
        assert self['links'] is not None
        s = ("SELECT {links}.{key}, "
             " {links}.{link_perm} AS {permno}, "
             " {links}.{link_date} FROM "
             " (SELECT {key}, MAX({link_date}) AS dt FROM {links} "
             "  WHERE {link_date} <= {date} AND {link_perm} > 0 "
             "  GROUP BY {key}, {link_date}) AS a "
             " INNER JOIN {links} "
             "  ON {links}.{key} = a.{key} "
             "  AND {links}.{link_date} = a.dt").format(
                 key=self.identifier,
                 date=date,
                 links=self['links'].key,    # physical name of links table
                 link_perm=link_perm,
                 link_date=link_date,
                 permno=permno)
        permnos = self.sql.read_dataframe(s).set_index(self.identifier)
        permnos = permnos[~permnos.index.duplicated(keep='last')]
        keys = DataFrame(keys)
        keys.columns = [self.identifier]
        result = keys.join(permnos, on=self.identifier, how='left')[permno]
        return result.fillna(0).astype(int).to_list()
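    # Sketch of a call (hypothetical: assumes a 'links' table with fields
    # named 'lpermno' and 'linkdt'):
    #
    #   permnos = comp.get_permnos(keys=['001690', '012141'], date=20201231,
    #                              link_perm='lpermno', link_date='linkdt',
    #                              permno='permno')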
    def get_linked(self, dataset: str, fields: List[str], date_field: str,
                   link_perm: str, link_date: str, where: str = '',
                   limit: int | str | None = None) -> DataFrame:
        """Query a dataset, joining 'links' table to return data with permnos

        Args:
            dataset: Name of internal Table to query data from
            fields: Data fields to retrieve
            date_field: Name of date field in data table
            link_perm: Name of permno field in 'links' table
            link_date: Name of link date field in 'links' table
            where: Where clause (optional)
            limit: Maximum number of rows to return (optional)

        Returns:
            DataFrame containing result of query
        """
        if where:
            where = ' AND ' + where
        limit = " LIMIT " + str(limit) if limit else ''
        fields = ", ".join([f"{self[dataset].key}.{f.lower()}"
                            for f in set([self.identifier, date_field]
                                         + fields)])
        q = ("SELECT {links}.{link_perm} AS {permno}, "
             " {links}.{link_date}, "
             " {fields} "
             "FROM {table} "
             "LEFT JOIN {links} "
             " ON {table}.{key} = {links}.{key} "
             " AND {links}.{link_date} = "
             "  (SELECT MAX(c.{link_date}) AS {link_date} "
             "   FROM {links} AS c "
             "   WHERE c.{key} = {table}.{key} "
             "   AND (c.{link_date} <= {table}.{date_field} "
             "        OR c.{link_date} = 0)) "
             "WHERE {links}.{link_perm} IS NOT NULL "
             " AND {links}.{link_perm} > 0 "
             " AND {table}.{key} IS NOT NULL {where} {limit}").format(
                 links=self['links'].key,
                 permno='permno',
                 link_perm=link_perm,
                 link_date=link_date,
                 fields=fields,
                 table=self[dataset].key,
                 key=self.identifier,
                 date_field=date_field,
                 where=where,
                 limit=limit)
        self._print("(get_linked)", q)
        return self.sql.read_dataframe(q)
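    # Sketch of a call (hypothetical Compustat-style field names):
    #
    #   df = comp.get_linked(dataset='annual', fields=['at', 'sale'],
    #                        date_field='datadate', link_perm='lpermno',
    #                        link_date='linkdt', limit=100)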
class Lookup:
    """Loads dated identifier mappings to memory, to look up by date

    Args:
        sql: SQL connection instance
        source: Name of source identifier key
        target: Name of target identifier key to return
        date_field: Name of date field in database table
        table: Physical SQL table name containing identifier mappings
        fillna: Value to return if not found
    """

    def __init__(self, sql: SQL, source: str, target: str, date_field: str,
                 table: str, fillna: Any):
        lookups = sql.read_dataframe(f"SELECT {source} AS source, "
                                     f" {target} AS target, "
                                     f" {date_field} AS date "
                                     f"FROM {table} "
                                     f"WHERE {source} IS NOT NULL "
                                     f" AND {target} IS NOT NULL")
        lookups = lookups.sort_values(['source', 'target', 'date'])
        try:                 # drop missing numeric source keys
            lookups = lookups.loc[lookups['source'] > 0]
        except TypeError:    # else drop empty string source keys
            lookups = lookups.loc[lookups['source'].str.len() > 0]
        self.lookups = lookups.groupby('source')
        self.keys = set(self.lookups.indices.keys())
        self.source = source
        self.target = target
        self.fillna = fillna
    def __call__(self, label: List | str, date: int = 99999999) -> Any:
        """Return target identifiers matched to source as of date"""
        if is_list_like(label):
            return [self(b, date) for b in label]
        if label in self.keys:
            a = self.lookups.get_group(label)
            b = a[a['date'] <= date].sort_values('date')
            return (b.iloc[-1] if len(b)    # latest before prevailing date
                    else a.iloc[0]).at['target']    # else first ever
        return self.fillna
    def __getitem__(self, labels):
        """Return target identifiers matched to source labels"""
        return self(labels)
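    # Usage sketch (hypothetical: continues the build_lookup example above):
    #
    #   to_permno = Lookup(sql=sql, source='tsymbol', target='permno',
    #                      date_field='date', table='names', fillna=0)
    #   to_permno(['IBM', 'AAPL'], date=20201231)   # list of labels
    #   to_permno['MSFT']                           # __getitem__ shorthand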