Source code for finds.readers.taq

"""Class and methods to process TAQ trade and quotes tick data

- NYSE Daily TAQ: Master, NBBO, Trades
- market microstructure: bid-ask spreads, trade conditions, tick test

Copyright 2022, Terence Lim

MIT License
"""
import numpy as np
import pandas as pd
from pandas import DataFrame, Series, Timestamp
import indexed_gzip as igzip
import gzip, io, pickle, time, re, os
import matplotlib.pyplot as plt
from typing import List, Any
from finds.utils.plots import plot_time
_VERBOSE = 1

def taq_from_csv(chunk: str, columns: List[str] = []) -> DataFrame:
    """Convert csv from TAQ to dataframe with correct dtypes

    Args:
        chunk: A chunk of csv text
        columns: List of column names, else parsed from first line of chunk

    Returns:
        DataFrame with correct dtypes and column names

    Notes:
        - column names (provided or parsed from first line) indicate the
          corresponding known list of dtypes for nbbo, trade or mast
    """
    _dtypes = {    # define dtypes for each TAQ file
        'nbbo': {np.uint64: ['Time', 'Sequence_Number',
                             'Participant_Timestamp',
                             'FINRA_ADF_Timestamp'],
                 np.float32: ['Bid_Price', 'Bid_Size', 'Offer_Price',
                              'Offer_Size', 'Best_Bid_Price',
                              'Best_Bid_Size', 'Best_Offer_Price',
                              'Best_Offer_Size']},
        'trade': {np.uint64: ['Time', 'Sequence_Number',
                              'Participant_Timestamp',
                              'Trade_Reporting_Facility_TRF_Timestamp'],
                  np.uint8: ['Trade_Correction_Indicator',
                             'Trade_Through_Exempt_Indicator'],
                  np.float32: ['Trade_Volume', 'Trade_Price']},
        'mast': {np.uint8: ['TradedOnNYSEMKT', 'TradedOnNASDAQBX',
                            'TradedOnNSX', 'TradedOnFINRA', 'TradedOnISE',
                            'TradedOnEdgeA', 'TradedOnEdgeX', 'TradedOnCHX',
                            'TradedOnNYSE', 'TradedOnArca', 'TradedOnNasdaq',
                            'TradedOnCBOE', 'TradedOnPSX', 'TradedOnBATSY',
                            'TradedOnBATS', 'TradedOnIEX'],
                 np.uint16: ['Unit_Of_Trade', 'Round_Lot',
                             'Specialist_Clearing_Number',
                             'Specialist_Post_Number'],
                 np.uint32: ['Shares_Outstanding', 'Effective_Date'],
                 np.uint64: ['Unit_Of_Trade', 'Round_Lot',
                             'Specialist_Clearing_Number',
                             'Specialist_Post_Number',
                             'TradedOnNYSEMKT', 'TradedOnNASDAQBX',
                             'TradedOnNSX', 'TradedOnFINRA', 'TradedOnISE',
                             'TradedOnEdgeA', 'TradedOnEdgeX', 'TradedOnCHX',
                             'TradedOnNYSE', 'TradedOnArca', 'TradedOnNasdaq',
                             'TradedOnCBOE', 'TradedOnPSX', 'TradedOnBATSY',
                             'TradedOnBATS', 'TradedOnIEX',
                             'Effective_Date']}}
    df = pd.read_csv(io.StringIO(chunk), sep='|', na_filter=False,
                     dtype=str, header=None if columns else 0,
                     names=columns or None)

    # guess file type from column names, and use its dtypes dict
    if 'Best_Bid_Price' in df.columns:    # NBBO file
        dtypes = _dtypes['nbbo']
    elif 'Trade_Price' in df.columns:     # TRADE file
        dtypes = _dtypes['trade']
    elif 'Round_Lot' in df.columns:       # MASTER file
        dtypes = _dtypes['mast']
    else:
        dtypes = {}
    for t in dtypes:    # coerce each column to its required dtype
        df[dtypes[t]] = df[dtypes[t]].apply(pd.to_numeric, errors='coerce')
        df[dtypes[t]] = df[dtypes[t]].fillna(0).astype(t)
    if 'Time' in df.columns:      # for nbbo and trade: set timestamp as index
        df.index = pd.to_datetime(df['Time'].astype(str), format='%H%M%S%f')
    elif 'Symbol' in df.columns:  # for master: set symbol field as index
        df.index = df['Symbol'].values
        if 'END' in df.index:     # drop terminator dummy symbol
            df.drop('END', inplace=True)
    return df
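# Usage sketch for taq_from_csv on a tiny pipe-delimited trade chunk (the
# values are made up for illustration; the header must contain every column
# that the 'trade' dtype map above expects):
#
#   hdr = ("Time|Sequence_Number|Participant_Timestamp"
#          "|Trade_Reporting_Facility_TRF_Timestamp"
#          "|Trade_Correction_Indicator|Trade_Through_Exempt_Indicator"
#          "|Trade_Volume|Trade_Price\n")
#   row = "093001123456789|1|93001123456000|0|0|0|200|100.25\n"
#   df = taq_from_csv(hdr + row)   # dtypes coerced, Time parsed as index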
class TAQ(object):
    """Base class to manipulate a daily TAQ .csv.gz file

    Args:
        taq_file: raw .csv.gz input data file name
        index_file: name of new (.gzidx) file to write indexed-gzip index
        symbols_file: name of new (.csv.gz) file to write symbols index

    Notes:
        - NYSE historical samples:
          ftp://ftp.nyxdata.com/Historical%20Data%20Samples/
        - Uses indexed_gzip package for random access into gzip file
        - Implements 3 methods to access raw daily TAQ csv.gz files, e.g.:
          - trade(n) - next n csv lines
          - iter(trade) - iterable, by chunk with same stock symbol
          - trade['AAPL'] - getitem, by symbol
    """

    def __init__(self, taq_file: str, index_file: str = '',
                 symbols_file: str = ''):
        """Initialize interface to daily TAQ file"""
        self.taq_file = taq_file
        self.date = re.findall(r"[12][90]\d\d\d\d\d\d", taq_file)
        self.index_file = index_file
        self.symbols_file = symbols_file
        self.igz_file = None    # indexed gzip stream for getitem read
    def close(self):
        """Close getitem file handle"""
        if self.igz_file is not None:
            self.igz_file.close()
            self.igz_file = None
    def open(self, taq_file: str = ''):
        """Open with context manager protocol for sequential line reads"""

        class File(object):
            """Context manager protocol to open for sequential read"""

            def __init__(self, filename: str, *args, **kwargs):
                self.gz_file = gzip.open(filename, "rt", encoding='utf-8',
                                         errors='ignore')
                self.header = self.gz_file.readline()

            def __call__(self, n: int) -> DataFrame:
                """Read next n lines (-1 for entire file)"""
                if n < 0:
                    chunk = self.header + self.gz_file.read()
                    self.close()
                else:
                    chunk = [self.header]
                    for _ in range(n):
                        line = self.gz_file.readline()
                        if len(line) <= 0:
                            self.close()
                            break
                        chunk.append(line)
                    chunk = "".join(chunk)
                return taq_from_csv(chunk)

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc_value, exc_traceback):
                return self.close()

            def close(self):
                return self.gz_file.close()

        return File(taq_file or self.taq_file)
    def read(self) -> DataFrame:
        """Read entire file as a DataFrame"""
        with self.open() as f:
            return f(-1)
    def __iter__(self):
        """Iterator to access next symbol's chunk of rows"""
        f = gzip.open(self.taq_file, "rt", encoding='utf-8', errors='ignore')
        header = f.readline().rstrip('\n').replace(' ', '_').split('|')

        def line_iterator(f):
            try:
                for line in f:
                    yield line
            except Exception as e:
                print("*** Got exception (line_iterator):", e)
            yield None

        lines = line_iterator(f)
        line = next(lines)    # first data line (header already consumed)
        curr = line.rstrip('\n').split('|')[header.index('Symbol')]
        while line and not line.startswith('END'):   # check end-of-file
            prev = curr
            chunk = []
            while line and curr == prev:    # read until new symbol
                chunk += [line]
                line = next(lines)
                if line:
                    curr = line.rstrip('\n')\
                               .split('|')[header.index('Symbol')]
            df = taq_from_csv("".join(chunk), header)
            df.index.name = prev
            yield df
        f.close()
        yield None
    def __call__(self, symbol: str) -> Series | None:
        """Return symbol location and size in daily taq gzip file"""
        if self.igz_file is None:
            self.pos = pd.read_csv(self.symbols_file, index_col=0)
            if self.index_file:
                self.igz_file = igzip.IndexedGzipFile(
                    self.taq_file, index_file=self.index_file)
            else:
                self.igz_file = igzip.IndexedGzipFile(self.taq_file)
            line = self.igz_file.readline().decode('latin-1')
            self.columns = line.rstrip('\n').replace(' ', '_').split('|')
        if symbol in self.pos.index and self.pos.loc[symbol, 'size'] > 0:
            return self.pos.loc[symbol]
        else:
            return None
    def __getitem__(self, symbol: str) -> DataFrame | None:
        """Get chunk of all rows for the input symbol as a data frame"""
        pos = self(symbol)
        if pos is None:
            return None
        self.igz_file.seek(pos['start'])
        lines = self.igz_file.read(pos['size']).decode('latin-1')
        df = taq_from_csv(lines, columns=self.columns)
        df.index.name = symbol
        return df
    def index_symbols(self, index_file: str = '', symbols_file: str = ''):
        """Generate indexed_gzip and symbols index files"""

        def _create_index(filename, index_file):
            """generate and save an indexed-gzip index file (~12 secs)"""
            tic = time.time()
            try:
                with igzip.IndexedGzipFile(filename) as f:
                    f.build_full_index()
                    f.export_index(index_file)    # save to {index_file}
                print('Full index created in %d secs' % (time.time() - tic))
            except Exception as e:
                raise Exception('TAQ _create_index failed') from e

        def _create_symbols(filename, symbols_file):
            """generate symbol lookup locations (~100 secs)"""
            tic = time.time()
            try:
                with igzip.IndexedGzipFile(filename) as f:
                    f.seek(0)
                    line = f.readline().decode('latin-1')
                    header = line.rstrip('\n').split('|')  # first line
                    symbols = DataFrame(columns=['start', 'size'], dtype=int)
                    prev = None
                    eof = lambda line: (not line) or line.startswith('END')
                    while not eof(line):   # iterate until end-of-file
                        tell = f.tell()    # current location
                        line = f.readline().decode('latin-1')  # next line
                        if not eof(line):
                            curr = line.rstrip('\n')\
                                       .split('|')[header.index('Symbol')]
                        else:
                            curr = None
                        if curr != prev:   # new symbol: update location
                            if prev:
                                symbols.loc[prev, 'size'] = \
                                    tell - symbols.loc[prev, 'start']
                            if curr:
                                symbols.loc[curr] = tell, 0
                            prev = curr
                print('%d symbols: %d secs'
                      % (len(symbols), time.time() - tic))
                symbols.to_csv(symbols_file)
            except Exception as e:
                raise Exception('TAQ _create_symbols failed') from e

        if index_file:
            self.index_file = index_file
        if symbols_file:
            self.symbols_file = symbols_file
        _create_index(self.taq_file, self.index_file)
        _create_symbols(self.taq_file, self.symbols_file)
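# Usage sketch for the TAQ class (file names here are hypothetical, and the
# gzidx/symbols indexes must be built once before getitem access works):
#
#   trades = TAQ('EQY_US_ALL_TRADE_20191007.gz',
#                index_file='EQY_US_ALL_TRADE_20191007.gzidx',
#                symbols_file='EQY_US_ALL_TRADE_20191007.csv.gz')
#   trades.index_symbols()       # one-time: write the two index files
#   head = trades.open()(5)      # 1. sequential read: next 5 csv lines
#   chunk = next(iter(trades))   # 2. iterate chunks, one symbol at a time
#   df = trades['AAPL']          # 3. random access by symbol
#   trades.close()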
#
# Tick data transformation methods
#
open_t = pd.to_datetime('1900-01-01T09:30')    # default open 9:30am
close_t = pd.to_datetime('1900-01-01T16:00')   # default close 4:00pm
def clean_trade(df: DataFrame | None,
                open_t: Timestamp = open_t,
                close_t: Timestamp = close_t,
                cond: str = "MOZBTLGWJK145789") -> DataFrame | None:
    """Remove bad trades

    Args:
        df: Dataframe containing one day's trades of a stock
        open_t: Exclude records on or before this opening time
        close_t: Exclude records after this closing time
        cond: Sale condition characters to exclude

    Notes:
        - Requires correction code = 0, price and volume > 0
        - Sale Conditions excluded by default include:
          - M = Market Center Close Price
          - O = Market Center Opening Trade
          - Z = Sold (Out of Sequence)
          - L = Sold Last (Late Reporting)
          - B = Bunched Trade
          - G = Bunched Sold Trade
          - W = Average Price Trade
          - 4 = Derivatively Priced
          - 5 = Re-opening Prints
          - 7 = Qualified Contingent Trade
          - 8 = Placeholder for 611 Exempt
          - 9 = Corrected Consolidated Close Price per the Listing Market
          - K = Rule 127 (NYSE only) or Rule 155 Trade (NYSE MKT only)
          - T = Extended Hours Trade
    """
    def any_in(keys, values):
        """Returns True if any key is in values"""
        return np.any([v in values for v in keys])

    if df is None:
        return None
    f = (df['Trade_Correction_Indicator'].eq(0)
         & df['Trade_Price'].gt(0)
         & df['Trade_Volume'].gt(0)
         & ~df['Sale_Condition'].apply(any_in, values=cond))
    if open_t:
        f &= (df.index > open_t)
    if close_t:
        f &= (df.index <= close_t)
    return df.loc[f, ['Trade_Price', 'Trade_Volume', 'Sale_Condition']]
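# Sketch: filter one symbol's raw trades down to valid regular-session
# records (assumes `trades` is a TAQ instance with its indexes built):
#
#   t = trades['AAPL']
#   ct = clean_trade(t, open_t=open_t, close_t=close_t)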
def clean_nbbo(df: DataFrame | None,
               keep: List[str] = ['Best_Bid_Price',
                                  'Best_Bid_Size',
                                  'Best_Offer_Price',
                                  'Best_Offer_Size']) -> DataFrame | None:
    """Remove bad quotes

    Args:
        df: Dataframe containing one day's nbbo quotes of a stock
        keep: List of columns to keep

    Notes:
        - requires prices and sizes > 0, and offer price > bid price
        - spread <= $5
        - cancel correction != 'B'
        - quote condition in ['A','B','H','O','R','W']
        - keep largest sequence number if same time stamp
        - drop duplicated records
    """
    if df is None:
        return None
    f = (df['Quote_Cancel_Correction'].ne('B')
         & df['Quote_Condition'].isin(['A', 'B', 'H', 'O', 'R', 'W'])
         & df['Bid_Price'].gt(0)
         & df['Bid_Size'].gt(0)
         & df['Offer_Price'].gt(0)
         & df['Offer_Size'].gt(0)
         & df['Best_Bid_Price'].gt(0)
         & df['Best_Bid_Size'].gt(0)
         & df['Best_Offer_Price'].gt(0)
         & df['Best_Offer_Size'].gt(0)
         & (df['Best_Offer_Price'] > df['Best_Bid_Price'])
         & (df['Offer_Price'] - df['Bid_Price']).le(5))    # spread <= $5
    df = df.loc[f, keep + ['Time', 'Sequence_Number']]  # drop bad records
    df = df.sort_values(['Time', 'Sequence_Number'])  # later of same time
    df = df.drop_duplicates(subset='Time', keep='last')[keep]
    df = df.loc[(df.shift() != df).any(axis=1)]   # keep changed records only
    return df
def align_trades(ct: DataFrame,
                 cq: DataFrame,
                 open_t: Timestamp = open_t,
                 inplace: bool = False) -> DataFrame | None:
    """Align each trade with prevailing and forward quotes

    Args:
        ct: Input dataframe of trades
        cq: Input dataframe of nbbo quotes
        open_t: Drop quotes prior to this open time
        inplace: Whether to modify trades dataframe or return a new copy

    Returns:
        DataFrame of trades with additional columns if not inplace, else None

    Notes:
        - prevailing quote is sampled at -1ns, forward quote at +5m;
          quotes before open_t are dropped
        - See Holden and Jacobsen (2014), "Liquidity Measurement"
        - Prevailing_Mid: midquote prevailing just before each trade
        - Forward_Mid: midquote prevailing 5 minutes after each trade
        - Tick_Test: whether trade price is above, below or equal to the
          previous trade price
    """
    if not inplace:
        ct = ct.copy()
    f = cq.index >= (open_t or cq.index.min())
    midprice = (cq.loc[f, 'Best_Offer_Price']
                + cq.loc[f, 'Best_Bid_Price']) / 2
    ct['Prevailing_Mid'] = midprice.reindex(
        ct.index - np.timedelta64(1, 'ns'), method='pad').values
    ct['Forward_Mid'] = midprice.reindex(
        ct.index + np.timedelta64(5, 'm'), method='pad').values
    ct['Tick_Test'] = np.sign(ct['Trade_Price']
                              - ct['Trade_Price'].shift(1)).fillna(0)
    if not inplace:
        return ct
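# Sketch: clean the matching NBBO quotes, then attach prevailing and forward
# midquotes and the tick test to the cleaned trades from the sketch above
# (assumes `quotes` is the NBBO TAQ instance for the same day):
#
#   cq = clean_nbbo(quotes['AAPL'])
#   align_trades(ct, cq, inplace=True)   # adds Prevailing_Mid, Forward_Mid
#                                        # and Tick_Test columns to ct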
def bin_quotes(cq: DataFrame,
               value: int = 15,
               unit: str = 'm',
               open_t: Timestamp = open_t,
               close_t: Timestamp = close_t) -> DataFrame:
    """Resample quotes into time interval bins

    Args:
        cq: Input dataframe of nbbo quotes
        value: number of time units per bin width
        unit: time unit in {'h', 'm', 's', 'ms', 'us', 'ns'}
        open_t: exclusive left bound of first bin
        close_t: inclusive right bound of last bin

    Returns:
        DataFrame of resampled derived quote liquidity metrics

    Notes:
        - quoted: time-weighted quoted half-spread
        - depth: time-weighted average of average bid and offer sizes
        - offersize: time-weighted average offer size
        - bidsize: time-weighted average bid size
        - mid: last midquote
        - firstmid: first midquote
        - maxmid: max midquote
        - minmid: min midquote
        - retq: last midquote-to-last midquote return
        - bins are resampled with closed='left' and label='right', summed
          with min_count=1, then restricted to (open_t, close_t]
    """
    units = {'h': 'H', 'm': 'min', 's': 's', 'ms': 'ms', 'us': 'us',
             'ns': 'N'}
    if unit not in units:
        raise Exception(str(unit) + ' must be in ' + str(list(units.keys())))
    rule = dict(rule=str(value) + units[unit], closed='left', label='right')

    # supplement cq.index with required 'intervals' to be new quote times
    timedelta = np.timedelta64(value, unit.lower())
    intervals = np.arange(open_t, close_t + timedelta, timedelta)
    intervals = pd.to_datetime(sorted(set(intervals).union(cq.index)))

    # compute forward duration (in seconds) of each (supplemented) cq row
    q = cq.reindex(intervals, method='pad')   # forward-fill to intervals
    q['weight'] = q.index.to_series()\
                         .diff()\
                         .fillna(np.timedelta64(0))\
                         .astype(int)\
                         .shift(-1, fill_value=0) / 1e9

    # sum up 'weight' denominator of each interval, for weighted resampling
    result = DataFrame(q['weight'].resample(**rule).sum())

    # weighted mean helper, with sum requiring at least one non-null value
    weighted = lambda s: (s * q['weight']).resample(**rule)\
                                          .agg(Series.sum, min_count=1)\
                                          .div(result['weight'])

    # compute weighted resampled metrics
    result['quoted'] = weighted(q['Best_Offer_Price']
                                - q['Best_Bid_Price']) / 2
    result['depth'] = weighted(q['Best_Offer_Size'] + q['Best_Bid_Size']) / 2
    result['offersize'] = weighted(q['Best_Offer_Size'])
    result['bidsize'] = weighted(q['Best_Bid_Size'])
    mid = ((q['Best_Offer_Price'] + q['Best_Bid_Price']) / 2).resample(**rule)
    result['mid'] = mid.last().ffill()
    result['firstmid'] = mid.first()
    result['maxmid'] = mid.max()
    result['minmid'] = mid.min()
    result['retq'] = (result['mid'] / result['mid'].shift(1)) - 1
    return result[(result.index > open_t) & (result.index <= close_t)]
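# Sketch: 15-minute time-weighted quote bins for the cleaned quotes above:
#
#   bq = bin_quotes(cq, value=15, unit='m')
#   bq[['quoted', 'depth', 'mid', 'retq']].head()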
def bin_trades(ct: DataFrame,
               value: int = 5,
               unit: str = 'm',
               open_t: Timestamp = open_t,
               close_t: Timestamp = close_t) -> DataFrame:
    """Resample trades into time interval bins

    Args:
        ct: Input dataframe of trades
        value: number of time units per bin width
        unit: time unit in {'h', 'm', 's', 'ms', 'us', 'ns'}
        open_t: exclusive left bound of first bin
        close_t: inclusive right bound of last bin

    Returns:
        DataFrame of resampled derived trade liquidity metrics

    Notes:
        - volume: total trade volume in bin
        - counts: number of trades in bin
        - last: last trade price in bin (ffill if none)
        - first: first trade price in bin
        - maxtrade: max trade price in bin
        - mintrade: min trade price in bin
        - ret: last-to-last trade price return
        - vwap: volume-weighted average trade price
        - effective: volume-weighted effective half-spread (absolute
          difference of trade price and prevailing midquote)
        - realized: volume-weighted realized half-spread (signed difference
          of trade price and 5-minute forward midquote)
        - impact: volume-weighted price impact (signed difference of
          5-minute forward midquote and prevailing midquote)
        - bins are resampled with closed='left' and label='right', then
          restricted to (open_t, close_t]
    """
    units = {'h': 'H', 'm': 'min', 's': 's', 'ms': 'ms', 'us': 'us',
             'ns': 'N'}
    if unit not in units:
        raise Exception(str(unit) + ' must be in ' + str(list(units.keys())))
    rule = dict(rule=str(value) + units[unit], closed='left', label='right')

    # extend ct.index timestamps to open_t and close_t if necessary,
    # via dict (pd.concat warns on empty or all-NaN entries)
    timedelta = np.timedelta64(value, unit.lower())
    t = ct.loc[~ct.index.duplicated(keep='first')]
    t = t.to_dict(orient='index')
    if open_t < ct.index[0]:
        t[open_t] = dict.fromkeys(ct.columns, None)
    if close_t > ct.index[-1]:
        t[close_t] = dict.fromkeys(ct.columns, None)
    t = DataFrame.from_dict(t, orient='index').sort_index()
    t.index.name = ct.index.name

    result = DataFrame(t['Trade_Volume'].resample(**rule).sum())\
             .rename(columns={'Trade_Volume': 'volume'})
    result['counts'] = t['Trade_Price'].resample(**rule).count()
    price = t['Trade_Price'].resample(**rule)
    result['last'] = price.last().ffill()
    result['first'] = price.first()
    result['maxtrade'] = price.max()
    result['mintrade'] = price.min()
    result['ret'] = result['last'].div(result['last'].shift(1)) - 1
    result['vwap'] = (t['Trade_Price'] * t['Trade_Volume'])\
                     .resample(**rule)\
                     .sum()\
                     .div(result['volume']
                          .where(result['volume'] > 0, np.nan))
    result['effective'] = (t['Trade_Price'] - t['Prevailing_Mid']).abs()\
                          .mul(t['Trade_Volume'])\
                          .resample(**rule)\
                          .sum()\
                          .div(result['volume']
                               .where(result['volume'] > 0, np.nan))
    if 'Tick_Test' in t.columns:
        # Lee-Ready test: sign by midquote, then tick test if no change
        leeready = np.sign(t['Trade_Price']
                           .sub(t['Prevailing_Mid'])
                           .fillna(0))
        leeready = leeready.where(leeready.ne(0), t['Tick_Test'])

        # compute trade weights (for averaging) from volume and sign
        weighted = lambda spread: (spread * leeready * t['Trade_Volume'])\
                                  .resample(**rule)\
                                  .sum()\
                                  .div(result['volume']
                                       .where(result['volume'] > 0, np.nan))
        result['realized'] = weighted(t['Trade_Price'] - t['Forward_Mid'])
        result['impact'] = weighted(t['Forward_Mid'] - t['Prevailing_Mid'])
    return result[(result.index > open_t) & (result.index <= close_t)]
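# Sketch: 5-minute trade bins joined to the matching quote bins (ct must
# already carry the Prevailing_Mid, Forward_Mid and Tick_Test columns that
# align_trades adds):
#
#   bt = bin_trades(ct, value=5, unit='m')
#   bins = bin_quotes(cq, value=5, unit='m').join(bt, how='left')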
def plot_taq(left1: DataFrame,
             right1: DataFrame | None = None,
             left2: DataFrame | None = None,
             right2: DataFrame | None = None,
             num: int | None = None,
             title: str = '',
             open_t: Timestamp | None = None,
             close_t: Timestamp | None = None):
    """Convenience method for 2x1 primary/secondary-y subplots of tick data"""
    if left2 is None:
        bx = None
        fig, ax = plt.subplots(num=num, figsize=(10, 6), clear=True)
    else:
        fig, (ax, bx) = plt.subplots(2, 1, num=num, figsize=(10, 12),
                                     clear=True, sharex=True)
        plot_time(left2, right2, ax=bx, xmin=open_t, xmax=close_t)
    plot_time(left1, right1, ax=ax, title=title, xmin=open_t, xmax=close_t)
    plt.tight_layout(pad=3)
    return ax, bx
def itertaq(trades: TAQ,
            quotes: TAQ,
            master: DataFrame,
            open_t: Timestamp = open_t,
            close_t: Timestamp | None = None,
            cusips: List[str] = [],
            symbols: List[str] = [],
            verbose: int = _VERBOSE,
            has_shares: bool = True):
    """Iterate over and filter daily taq trades and quotes by symbol

    Args:
        trades: Instance of TAQ trades object
        quotes: Instance of TAQ nbbo quotes object
        master: Reference table from Master file, indexed by symbol
        open_t: Earliest Timestamp of valid trades and quotes
        close_t: Latest Timestamp to keep trades and quotes, inclusive
        cusips: List of cusips to select
        symbols: List of symbols (space-separated security class) to select
        verbose: Whether to echo messages for debugging
        has_shares: If True, require 'Shares_Outstanding' > 0 in master table
    """
    def _print(*args, **kwargs):
        if verbose:
            print(*args, **kwargs)

    cusips = list(cusips)      # require list type
    symbols = list(symbols)    # require list type
    trade = iter(trades)
    while True:
        t = next(trade)
        if t is None:
            _print('trade is none')
            break
        symbol = t.index.name
        if symbol not in master.index:
            _print(f"{symbol} not in master")
            continue
        name = master.loc[symbol, 'Security_Description']
        if symbols and symbol not in symbols:
            _print(f"{symbol} {name} not in symbols")
            continue
        if has_shares and not master.loc[symbol, 'Shares_Outstanding'] > 0:
            _print(f"{symbol} {name} has no shares")
            continue
        cusip = master.loc[symbol, 'CUSIP']
        if cusips and cusip[:8] not in cusips and cusip not in cusips:
            _print(f"{symbol} {name} {cusip} not in cusips")
            continue
        if not len(t):
            _print('trades is empty')
            continue
        q = quotes[symbol]
        if q is None or not len(q):
            _print('quotes is empty')
            continue
        ct = clean_trade(t, open_t=open_t, close_t=close_t)
        cq = clean_nbbo(q)
        if not len(ct) or not len(cq):
            _print('ct or cq empty')
            continue
        _print(symbol, cusip, len(t), len(q), len(ct), len(cq))
        align_trades(ct, cq, open_t=open_t, inplace=True)
        yield ct, cq, master.loc[symbol]
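# Sketch: loop over every symbol in one day's files (see opentaq below) and
# accumulate binned liquidity measures (`taqdir` is an assumed data path):
#
#   master, trades, quotes = opentaq(20191007, taqdir)
#   for ct, cq, m in itertaq(trades, quotes, master,
#                            open_t=open_t, close_t=close_t):
#       bt = bin_trades(ct)
#       bq = bin_quotes(cq)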
def opentaq(date: int, taqdir: str):
    """Helper to initialize the master dataframe, trade and quote objects"""
    return (TAQ(os.path.join(taqdir,
                             f'EQY_US_ALL_REF_MASTER_{date}.gz')).read(),
            TAQ(os.path.join(taqdir, f'EQY_US_ALL_TRADE_{date}.gz'),
                os.path.join(taqdir, f'EQY_US_ALL_TRADE_{date}.gzidx'),
                os.path.join(taqdir, f'EQY_US_ALL_TRADE_{date}.csv.gz')),
            TAQ(os.path.join(taqdir, f'EQY_US_ALL_NBBO_{date}.gz'),
                os.path.join(taqdir, f'EQY_US_ALL_NBBO_{date}.gzidx'),
                os.path.join(taqdir, f'EQY_US_ALL_NBBO_{date}.csv.gz')))
if __name__ == "__main__": # test access methods import os import numpy as np import matplotlib.pyplot as plt from secret import paths # daily taq files from ftp://ftp.nyxdata.com/Historical%20Data%20Samples/ dates = [20191007, 20191008, 20180305, 20180306, 20171101] taqdir = paths['taq'] # generate input filenames for TAQ, igzip and symbols index files master_file = os.path.join(taqdir, 'EQY_US_ALL_REF_MASTER_{}.gz').format trade_file = os.path.join(taqdir, 'EQY_US_ALL_TRADE_{}.gz').format nbbo_file = os.path.join(taqdir, 'EQY_US_ALL_NBBO_{}.gz').format nbbo_index = os.path.join(taqdir, 'EQY_US_ALL_NBBO_{}.gzidx').format trade_index = os.path.join(taqdir, 'EQY_US_ALL_TRADE_{}.gzidx').format nbbo_symbols = os.path.join(taqdir, 'EQY_US_ALL_NBBO_{}.csv.gz').format trade_symbols = os.path.join(taqdir, 'EQY_US_ALL_TRADE_{}.csv.gz').format date = dates[0] master, trades, quotes = opentaq(date, taqdir) ''' # # 1. Read lines # print(master) trade = trades.open() print(trade(5)) quote = quotes.open() print(quote(5)) # # 2. Read in sequential chunks, symbol by symbol # trade = iter(trades) for _ in range(5): df = next(trade) print(f'next Trade symbol {df.index.name}: {len(df)} records') quote = iter(quotes) for _ in range(5): df = next(quote) print(f'next Quote symbol {df.index.name}: {len(df)} records') ''' # # 3. getitem by symbol (uses index_gzipped package) # symbol = 'VTI' symbol = 'AAPL' #'ZM' # symbol = 'GS' symbol = "VOO" t = trades[symbol] q = quotes[symbol] ct = clean_trade(t, close_t=close_t + np.timedelta64('5','m')) cq = clean_nbbo(q) align_trades(ct, cq, inplace=True) plot_taq(ct[['Trade_Price', 'Prevailing_Mid']].groupby(level=0).last(), ct['Trade_Volume'].groupby(level=0).last(), (cq['Best_Offer_Price'] - cq['Best_Bid_Price'])\ .rename('Quoted Spread').groupby(level=0).last(), ((cq['Best_Bid_Size'] + cq['Best_Offer_Size']) / 2)\ .rename('Depth').groupby(level=0).last(), open_t=open_t, close_t=close_t + np.timedelta64('5','m'), num=1, title=f"Tick Prices, Volume, Quotes, Spreads, and Depths ({dates[0]})" ) plt.show() raise Exception value, unit = 5, 'm' timedelta = np.timedelta64(value, unit) bt = bin_trades(ct, value, unit, close_t=close_t + timedelta) bq = bin_quotes(cq, value, unit, close_t=close_t + timedelta) bq = bq.join(bt, how = 'left') plot_taq(bq[['last', 'vwap', 'mid']], bq['quoted'], bt['volume'], bt['counts'], num=2, open_t=open_t, close_t=close_t + np.timedelta64('5','m'), title=f"{value}{unit}-bin Prices, Quotes and Trades") print(f"Correlation of MidQuote and LastTrade {value}{unit}-bin returns") bq[['ret', 'retq']].corr() m1 = np.timedelta64('5','s') d = (t.index >= close_t) & (t.index <= close_t+m1) as_print(t.loc[d])