Source code for finds.backtesting.eventstudy

"""Evaluate event studies

- Event studies: cumulative abnormal returns

Copyright 2022, Terence Lim

MIT License
"""
import sys
from os.path import dirname, abspath
import numpy as np
import scipy
from matplotlib import dates as mdates
import matplotlib.pyplot as plt
import pandas as pd
from pandas import DataFrame, Series
from pandas.api import types
import statsmodels
import statsmodels.api as sm
import statsmodels.formula.api as smf
from patsy.builtins import Q
from sqlalchemy import Integer, String, Float, Boolean, Column
from typing import Dict, Any, Tuple, List
from finds.structured.structured import Structured
from finds.structured.stocks import Stocks
from finds.structured.benchmarks import Benchmarks
from finds.database.sql import SQL
from finds.recipes.filters import fft_align
from finds.utils.plots import plot_date, plot_bands

_VERBOSE = 1

class EventStudy(Structured):
    """Class to support statistical tests of event studies

    Args:
        sql: connection to user database to store results
        bench: Benchmark dataset of index returns for market adjustment
        stocks: Stocks structured dataset containing stock returns
        max_date: last date to run event study
        table: physical name of table in user database to append summary results
    """

    def __init__(self, sql: SQL, bench: Benchmarks, stocks: Stocks,
                 max_date: int, table: str = 'events'):
        """Initialize class instance for event study calculations"""
        table = sql.Table(table,
                          Column('label', String(32), primary_key=True),
                          Column('model', String(32), primary_key=True),
                          Column('beg', Integer),
                          Column('end', Integer),
                          Column('rows', Integer),
                          Column('days', Integer),
                          Column('rho', Float),
                          Column('tau', Float),
                          Column('effective', Float),
                          Column('window', Float),
                          Column('window_t', Float),
                          Column('post', Float),
                          Column('post_t', Float),
                          Column('created', Integer))
        self.bd = bench.bd
        self.sql = sql
        self.table_ = table
        self.identifier = 'label'
        self.name = 'eventstudy'
        self.bench = bench
        self.stocks = stocks
        self.max_date = max_date
        self.summary_ = {}   # to hold summary statistics
        self.plot_ = {}      # to hold plottable series
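    # A minimal instantiation sketch (hypothetical variable names and dates, not
    # part of this module): assuming `sql`, `bench`, and `stocks` are
    # already-constructed SQL, Benchmarks, and Stocks instances, an event study
    # writing its summary to a 'my_events' table might be created as:
    #
    #   evt = EventStudy(sql=sql, bench=bench, stocks=stocks,
    #                    max_date=20221231, table='my_events')
    #
    # The constructor only defines the summary table schema; rows are written
    # later by write_summary().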
    def __call__(self, label: str, df: DataFrame, left: int, right: int,
                 post: int, date_field: str) -> DataFrame:
        """Construct event window market-adjusted returns where valid/available

        Args:
            label: Unique label (used for graph labels and table name)
            df: Input DataFrame of stock identifiers and event dates
            left: Start (inclusive) of announcement window around event date
            right: End (inclusive) of announcement window around event date
            post: End (inclusive) of post-event drift period
            date_field: Name of date column in df

        Notes:
            - populates 'car' attribute with cumulative arithmetic abnormal returns
            - populates 'bhar' attribute with cumulative buy-and-hold abnormal returns
        """
        ret = self.stocks.get_window(dataset='daily',
                                     field='ret',
                                     permnos=df[self.stocks.identifier],
                                     dates=df[date_field],
                                     date_field='date',
                                     left=left,
                                     right=post)\
                         .rename(columns={self.stocks.identifier: 'permno'})
        cols = list(range(post - left + 1))

        # require at least the event window and one post-event return available
        rets = ret[ret[cols[:(right - left + 2)]].notna().all(axis=1)]
        rets.index = np.arange(len(rets))

        # get market returns for market-model adjustment
        mkt = self.bench.get_window(dataset='daily',
                                    field='ret',
                                    permnos=['Mkt-RF'] * len(rets),
                                    date_field='date',
                                    dates=rets['date'],
                                    left=left,
                                    right=post)\
                        .rename(columns={self.bench.identifier: 'permno'})
        rf = self.bench.get_window(dataset='daily',
                                   field='ret',
                                   permnos=['RF'] * len(rets),
                                   date_field='date',
                                   dates=rets['date'],
                                   left=left,
                                   right=post)\
                       .rename(columns={self.bench.identifier: 'permno'})
        mkt = (mkt[cols] + rf[cols]).reset_index(drop=True)

        # cumulative arithmetic (car) and buy-and-hold (bhar) abnormal returns
        ar = (rets[cols] - mkt[cols]).cumsum(axis=1).fillna(0)
        br = ((1 + rets[cols]).cumprod(axis=1)
              - (1 + mkt[cols]).cumprod(axis=1)).fillna(0)
        self.car = rets[['permno', 'date']].join(ar)
        self.bhar = rets[['permno', 'date']].join(br)
        self.left = left
        self.right = right
        self.post = post
        self.rows = rets[['permno', 'date']]
        self.label = str(label)
        return self.rows
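    # Usage sketch (hypothetical inputs): given a DataFrame `df` with one row per
    # event, holding the stock identifier column (e.g. 'permno') and an
    # announcement date column 'announcedate' in YYYYMMDD form, a (-1, +1)
    # announcement window with drift measured through day +21 could be built as:
    #
    #   rows = evt(label='buyback', df=df, left=-1, right=1, post=21,
    #              date_field='announcedate')
    #
    # afterwards evt.car and evt.bhar hold one row per retained event, with
    # columns 0 through (post - left) of cumulative abnormal returns measured
    # from day `left`.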
    def _event_key(self, label: str) -> str:
        """Construct name of table to read/write event returns, given label"""
        return self.table_.key + '_' + str(label)
    def write(self) -> int:
        """Store daily cumulative returns in new user table"""
        cols = list(range(self.post - self.left + 1))
        df = [getattr(self, ar).melt(id_vars=['permno', 'date'],
                                     value_vars=cols,
                                     var_name='num',
                                     value_name=ar)
              for ar in ['car', 'bhar']]
        df = pd.concat([df[0], df[1].iloc[:, -1]], axis=1)
        df['num'] -= (self.right - self.left)
        table = self.sql.Table(self._event_key(self.label),
                               Column('permno', Integer),
                               Column('date', Integer),
                               Column('num', Integer),
                               Column('car', Float),
                               Column('bhar', Float))
        return self.load_dataframe(df, table)
    def read(self, left: int, right: int, post: int) -> int:
        """Fetch daily cumulative abnormal returns series from user table

        Args:
            left: offset of left announcement window
            right: offset of right announcement window
            post: offset of right post-announcement window

        Returns:
            Number of rows of car and bhar daily cumulative returns retrieved
        """
        df = self.read_dataframe(self._event_key(self.label))
        df['num'] += (right - left)
        self.car = df.pivot(index=['permno', 'date'],
                            columns=['num'],
                            values='car').reset_index()
        self.bhar = df.pivot(index=['permno', 'date'],
                             columns=['num'],
                             values='bhar').reset_index()
        self.left = left
        self.right = right
        self.post = post
        return len(df)
    _models = ['sbhar', 'scar']   # 'bhar', 'car', 'adj-sbhar', 'adj-scar', 'conv-sbhar', 'conv-scar'
    def fit(self, model: str = 'scar', rows: List[int] = [],
            rho: float | None = None) -> Dict[str, Dict]:
        """Compute car or bhar, and summary statistics, for selected rows

        Args:
            model: Name of predefined model to compute summary statistics
            rows: Subset of rows to evaluate; empty list selects all rows
            rho: Average cross correlation of event returns. If None, then
                compute from max convolution of post-announcement returns

        Returns:
            Dict of summary statistics of full and subsamples

            - 'window', 'window_t' are CAR at end of event window
            - 'post', 'post_t' are CAR from event end till post-drift end
            - 'rows' is number of input rows
            - 'days' is number of unique dates (same announce dates are grouped)
            - 'effective' is the number of days after correlation effects
            - daily CAR means and stderrs are stored in the 'plot_' attribute

        TODO names of models:

        - "car" or "bhar"
        - unstandardized, or 's'tandardized, or 'adj-s'tandardized (simple rho),
          or 'conv-s'tandardized (with average rho from convolution max)

        Notes:
            - Kolari and Pynnonen (2010) eqn[3] cross-sectionally adjusted
              Patell or BMP with avg correlation: multiply variance by 1 + (rho*(n-1))
            - Kolari, Pape, Pynnonen (2018) eqn[15] adjusted by average overlap
              (tau) and average covariance ratio (correlation rho)
        """
        #assert model in self._models
        window = self.right - self.left + 1
        cols = ['date'] + list(range(self.post - self.left + 1))
        is_car = model.endswith('car')
        rets = (self.car if is_car else self.bhar)[cols]
        cumret = (rets.iloc[rows] if len(rows) else rets).copy()
        n = int(len(cumret))
        b = int(min(cumret['date']))
        e = int(max(cumret['date']))
        L = self.post - self.left     # total period including announcement and post
        D = self.post - self.right    # length of post-announcement period

        # if announce date is not a trading day, set to (after close of) previous
        cumret['date'] = self.bd.offset(cumret['date'])

        # portfolio method for same announcement dates
        cumret = cumret.groupby('date').mean()

        # Average Cumulative AR
        means = cumret.mean()

        # 1. compute the average overlap (truncated at 0) of all pairs
        date_idx = self.bd.date_range(min(cumret.index), max(cumret.index))
        date_idx = Series(index=date_idx, data=np.arange(len(date_idx)))
        date_idx = np.sort(date_idx[cumret.index].values)
        overlap = []
        for k, v in enumerate(date_idx[:-1]):
            x = D - (date_idx[k+1:] - v)   # difference in dates less than D
            x[x < 0] = 0                   # truncate "negative" overlaps
            overlap.extend(x.tolist())
        tau = np.mean(overlap) / D   # average of overlap days, divided by max length

        # 2. compute ratio of average covariance to variance as average max corr
        if rho is None:
            rets = np.log(1 + cumret.where(cumret > -0.99, -0.99))\
                   .diff(axis=1)\
                   .iloc[:, window:]\
                   .fillna(0)
            corr, disp, cols = fft_align(rets.values.T)
            rho = np.nanmean(corr)

        # 3. apply simplification of eqn(15) of Kolari et al (2018)
        effective = len(cumret) / (1 + (rho * tau * (len(cumret) - 1)))
        stderr = cumret.std() / np.sqrt(effective)
        posterr = (cumret.iloc[:, window:]
                   .sub(cumret.iloc[:, window-1], axis=0))\
                   .std() / np.sqrt(effective)
        #cumret.iloc[:, window:].std() / np.sqrt(effective)
        tstat = means[window - 1] / stderr[window - 1]
        post = cumret.iloc[:, -1] - cumret.iloc[:, window - 1]
        post_sem = post.std() / np.sqrt(effective)
        summary = {model: {'window': means[window - 1],
                           'window_t': means[window - 1] / stderr[window - 1],
                           'post': post.mean(),
                           'post_t': post.mean() / post_sem,
                           'beg': b,
                           'end': e,
                           'rho': rho,
                           'tau': tau,
                           'effective': int(effective),
                           'days': len(cumret),
                           'rows': n}}
        self.plot_[model] = {'means': means.values,
                             'stderr': stderr.values,
                             'posterr': posterr.values,
                             'car': is_car}
        self.summary_.update(summary)
        return summary
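    # Worked illustration of step 3 above (all numbers made up): the effective
    # sample size shrinks the number of event days N by the average pairwise
    # correlation rho and average overlap fraction tau,
    #
    #   effective = N / (1 + rho * tau * (N - 1))
    #
    # e.g. N = 100 event days, rho = 0.02, tau = 0.5 gives
    #   100 / (1 + 0.02 * 0.5 * 99) = 100 / 1.99 ~= 50.3 effective days,
    # so standard errors are inflated by sqrt(100 / 50.3) ~= 1.41 relative to
    # treating the event days as independent.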
    def write_summary(self, overwrite=True) -> DataFrame:
        """Save event study summary to database

        Args:
            overwrite: Whether to overwrite, else append rows

        Returns:
            DataFrame of rows written
        """
        self.table_.create(self.sql.engine, checkfirst=True)
        if overwrite:
            delete = self.table_.delete()\
                                .where(self.table_.c['label'] == self.label)
            self.sql.run(delete)
        summ = DataFrame.from_dict(self.summary_, orient='index')\
                        .reset_index()\
                        .rename(columns={'index': 'model'})
        summ['label'] = self.label
        summ['created'] = self.bd.today()
        self.sql.load_dataframe(self.table_.key, summ)
        return summ
    def read_summary(self, label: str = '', model: str = ''):
        """Load event study summary from database

        Args:
            label: Name of event to retrieve; if blank, retrieve all events
            model: Name of model to retrieve; if blank, retrieve all models

        Returns:
            DataFrame of summary rows retrieved
        """
        s = [f"{k}='{v}'" for k, v in [['label', label], ['model', model]] if v]
        where = bool(s) * ('where ' + ' and '.join(s))
        q = f"SELECT * from {self.table_.key} {where}"
        return self.sql.read_dataframe(q)
    def plot(self, model: str, drift: bool = False, ax: Any = None,
             loc: str = 'best', title: str = '', c: str = 'C0',
             vline: List[float] = [], fontsize: int = 10,
             hline: List[float] = [], width: float = 1.96):
        """Plot cumulative abnormal returns, drift and confidence bands

        Args:
            model: Name of model whose computed results to plot
            drift: Whether to start confidence bands at post-event drift start
            ax: Axes to plot on
            loc: Legend location
            title: Main title
            c: Color
            vline: List of x-axis points to plot vertical line
            hline: List of y-axis points to plot horizontal line
            width: Number of std errs for confidence bands
            fontsize: Base font size
        """
        ax = ax or plt.gca()
        window = self.right - self.left + 1
        if not vline:
            vline = [self.right]
        if not hline:
            hline = [self.plot_[model]['means'][window-1] if drift else 0]
        p = self.plot_[model]      # plottable series for model
        s = self.summary_[model]   # summary stats for model
        plot_bands([0] + list(p['means']),
                   ([0] + ([0] * (window if drift else 0))
                    + list(p['posterr' if drift else 'stderr'])),
                   x=list(range(self.left - 1, self.post + 1)),
                   loc=loc,
                   hline=hline,
                   vline=vline,
                   title=title,
                   c=c,
                   width=width,
                   fontsize=fontsize,
                   legend=["CAR" if p['car'] else "BHAR", f"{width} stderrs"],
                   xlabel=(f"{int(s['beg'])}-{int(s['end'])}"
                           + f" (n={int(s['rows'])},"
                           + f" days={int(s['days'])},"
                           + f" eff={int(s['effective'])})"),
                   ylabel="CAR" if p['car'] else "BHAR",
                   ax=ax)
        plt.tight_layout(pad=3)
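# End-to-end usage sketch (hypothetical session, not part of this module): after
# constructing the EventStudy and building event windows with __call__ as above,
# a typical run fits both standardized models, plots them, and saves the summary:
#
#   evt.write()                         # persist per-event daily car/bhar
#   for model in EventStudy._models:    # ['sbhar', 'scar']
#       summary = evt.fit(model=model)
#       evt.plot(model=model, drift=True, title=evt.label)
#   evt.write_summary(overwrite=True)   # replace this label's summary rows
#
# Stored summaries can later be retrieved with evt.read_summary(label=evt.label).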
if __name__ == "__main__":
    from conf import credentials, VERBOSE