Source code for finds.recipes.econs

"""Economics tools

- Bai and Ng (2002) and McCracken and Ng (2015, 2020) factor-model EM algorithm

Copyright 2022, Terence Lim

MIT License
"""
import re
from datetime import datetime
import numpy as np
from numpy.ma import masked_invalid as valid
import pandas as pd
from pandas import DataFrame, Series
from pandas.api import types
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import adfuller, acf, pacf
from scipy.stats import multivariate_normal
from numpy.linalg import inv
from collections import namedtuple
from typing import Iterable, Mapping, List, Any, NamedTuple, Dict, Tuple
_VERBOSE = 1

def mrsq(X: DataFrame, kmax: int) -> DataFrame:
    """Return marginal R2 of each variable from incrementally adding factors

    Args:
        X: T observations/samples in rows, N variables/features in columns
        kmax: Maximum number of factors. If 0, set to rank from SVD

    Returns:
        DataFrame of marginal R2, with one column per component

    Notes:
        From matlab code, Bai and Ng (2002) and McCracken at
        https://research.stlouisfed.org/econ/mccracken/fred-databases/
    """
    # pca.components_[i, :] is vT[i, :]
    # pca.explained_variance_ is s**2/(T-1)
    # y = pca.transform(x) = s * u: T x n "projection"
    # beta = np.diag(pca.singular_values_) @ pca.components_   # "loadings"
    # x.T @ x = beta.T @ beta is covariance matrix
    Z = (X - X.mean()) / X.std(ddof=0)
    u, s, vT = np.linalg.svd(Z, full_matrices=False)
    mrsq_ = pd.concat([np.mean((u[:, k-1:k] @ u[:, k-1:k].T @ Z)**2, axis=0)
                       for k in np.arange(1, (kmax or len(s)) + 1)], axis=1)
    return mrsq_.div(np.mean((u @ u.T @ Z)**2, axis=0), axis=0)
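# Usage sketch (illustrative, not part of the original module): `panel` is a
# hypothetical T x N DataFrame of series with no missing values.
#   >>> rng = np.random.default_rng(0)
#   >>> panel = DataFrame(rng.standard_normal((120, 10)))
#   >>> mrsq(panel, kmax=3)    # marginal R2 of each series on factors 1..3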
def select_baing(X: DataFrame, kmax: int = 0, p: int = 2) -> int:
    """Determine number of factors based on Bai & Ng (2002) info criterion

    Args:
        X: T observations/samples in rows, N variables/features in columns
        kmax: Maximum number of factors. If 0, set to rank from SVD
        p: int in [1, 2, 3] to use PCp1 or PCp2 or PCp3 penalty

    Returns:
        Number of factors at the first local minimum of the ICp{p} criterion;
        if the criterion is monotonically decreasing, the maximum number of
        factors considered is returned

    Notes:
        - Simplifies the calculation of residual variance from adding
          components: it is just the sum of the remaining eigenvalues, so
          no projections need to be computed
        - The IC curve may have multiple local minimums: the first local
          minimum is selected -- may want a prior bound on the number of
          factors
    """
    assert p in [1, 2, 3], "p must be 1, 2 or 3"
    Z = ((X - X.mean()) / X.std(ddof=0)).to_numpy()
    T, N = Z.shape
    NT = N * T
    NT1 = N + T
    GCT = min(N, T)
    CT = [np.log(NT/NT1) * (NT1/NT),          # PCp1 penalty
          (NT1/NT) * np.log(GCT),             # PCp2 penalty
          np.log(GCT) / GCT]                  # PCp3 penalty
    CT = [i * CT[p-1] for i in range(GCT)]    # penalty grows with k factors
    u, s, vT = np.linalg.svd(Z, full_matrices=False)
    eigval = s**2
    residual_variance = np.roll(np.sum(eigval) - eigval.cumsum(), 1)
    residual_variance[0] = sum(eigval)        # variance left after k factors
    sigma = residual_variance / sum(eigval)
    ic = (np.log(sigma + 1e-12) + CT)[:(kmax or GCT)]
    local = np.where(ic[:-1] < ic[1:])[0]     # find first local minimum
    return local[0] if len(local) else len(ic)
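# Usage sketch (illustrative): auto-select the number of factors with the
# PCp2 penalty; `panel` is the hypothetical DataFrame from above.
#   >>> r = select_baing(panel, kmax=8, p=2)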
def approximate_factors(X: DataFrame, kmax: int = 0, p: int = 2,
                        max_iter: int = 50, tol: float = 1e-12,
                        verbose: int = _VERBOSE) -> DataFrame:
    """Fill in missing values with factor model EM algorithm of Bai and Ng (2002)

    Args:
        X: T observations/samples in rows, N variables/features in columns
        kmax: Maximum number of factors. If 0, set to rank from SVD minus 1
        p: If 0, number of factors is fixed as kmax. Else picks one of three
           information criterion methods in Bai & Ng (2002) to auto-select
        max_iter: Maximum number of EM iterations
        tol: Convergence tolerance on relative squared change of imputed data
        verbose: Whether to print progress of each iteration

    Returns:
        DataFrame with missing values imputed with factor EM algorithm
    """
    Z = X.copy()                      # X is passed by reference, do not alter
    Y = np.isnan(Z.to_numpy())        # boolean mask of missing entries
    assert not np.any(np.all(Y, axis=1)), "no row can be all missing"
    assert not np.any(np.all(Y, axis=0)), "no column can be all missing"

    # identify cols with missing values, and initially fill with column mean
    missing_cols = Z.isnull().sum().to_numpy().nonzero()[0]
    for col in missing_cols:
        Z.iloc[Y[:, col], col] = Z.iloc[:, col].mean()

    for n_iter in range(max_iter):
        old_Z = Z.copy()
        mean = Z.mean()
        std = Z.std()
        Z = (Z - mean) / std          # standardize the data

        # "M" step: estimate factors
        u, s, vT = np.linalg.svd(Z)

        # auto-select number of factors if p > 0, else fix number of factors
        if p:
            r = select_baing(Z, p=p, kmax=kmax or len(s) - 1)
        else:
            r = kmax or len(s) - 1

        # "E" step: update missing entries from the rank-r reconstruction
        E = u[:, :r] @ np.diag(s[:r]) @ vT[:r, :]
        for col in missing_cols:
            Z.iloc[Y[:, col], col] = E[Y[:, col], col]

        Z = (Z * std) + mean          # undo standardization
        delta = (np.linalg.norm(Z - old_Z) / np.linalg.norm(Z))**2
        if verbose:
            print(f"{n_iter:4d} {delta:8.3g} {r}")
        if delta < tol:               # diff**2 / prev**2
            break
    return Z
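# Usage sketch (illustrative): knock out a few entries of the hypothetical
# `panel`, then impute them with the factor-model EM algorithm.
#   >>> incomplete = panel.copy()
#   >>> incomplete.iloc[5, 2] = np.nan
#   >>> incomplete.iloc[10, 7] = np.nan
#   >>> filled = approximate_factors(incomplete, kmax=8, p=2, verbose=0)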
def fillna_em(X: np.ndarray, add_intercept: bool = True,
              tol: float = 1e-12, maxiter: int = 200,
              verbose: int = 1) -> np.ndarray:
    """Fill missing data with EM, assuming a multivariate Normal distribution

    Args:
        X: Input array, with np.nan indicating the missing entries
        add_intercept: Whether to prepend a column of ones
        tol: Convergence tolerance on change in negative log-likelihood
        maxiter: Maximum number of EM iterations
        verbose: Whether to print the log-likelihood of each iteration

    Returns:
        Array with missing entries imputed
    """
    if add_intercept:
        X = np.hstack((np.ones((X.shape[0], 1)), X))
    missing = np.isnan(X)                           # identify missing entries
    assert not np.any(np.all(missing, axis=1)), "no row can be all missing"
    assert not np.any(np.all(missing, axis=0)), "no column can be all missing"
    cols = np.flatnonzero(np.any(missing, axis=0))  # columns with missing
    for niter in range(maxiter + 1):
        if not niter:    # initially, just replace with column means
            for col in cols:
                X[missing[:, col], col] = np.nanmean(X[:, col])
        else:
            XX = X.T @ X
            inv_XX = inv(XX)
            for col in cols:   # E, M steps for each column with missing values
                # "M" step: regression coefficients of the column on all
                # others, from the inverse cross-product matrix; equivalent to
                # M = inv(XX[mask][:, mask]) @ X[:, mask].T @ X[:, col]
                mask = np.ones(X.shape[1], dtype=bool)
                mask[col] = 0
                M = -inv_XX[mask, col] / inv_XX[col, col]

                # "E" step: update expected missing values with fitted values
                X[missing[:, col], col] = X[missing[:, col], :][:, mask] @ M
        x = X[:, add_intercept:]     # drop the intercept column, if any

        # record the current negative log-likelihood
        nll = -sum(multivariate_normal.logpdf(x,
                                              mean=np.mean(x, axis=0),
                                              cov=np.cov(x.T, bias=True),
                                              allow_singular=True))
        if verbose:
            print(f"{niter} {nll:.6f}")
        if niter and prev_nll - nll < tol:   # stop when nll stops improving
            break
        prev_nll = nll
    return x
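# Usage sketch (illustrative): the EM-Normal filler operates on a plain
# ndarray; `arr` is hypothetical.
#   >>> arr = panel.to_numpy().copy()
#   >>> arr[3, 1] = np.nan
#   >>> filled = fillna_em(arr, add_intercept=True, verbose=0)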
######################
#
# Econometrics
#
######################
def integration_order(df: Series, verbose: bool = True, max_order: int = 5,
                      pvalue: float = 0.05, lags: str | int = 'AIC') -> int:
    """Returns order of integration by iteratively testing for unit root

    Args:
        df: Input Series
        verbose: Whether to display results
        max_order: Maximum order of integration to test
        pvalue: Required p-value to reject Dickey-Fuller unit root
        lags: Method to automatically determine lag length, or maxlag;
              in {"AIC", "BIC", "t-stat"}, int (maxlag),
              or 0 (default maxlag of 12*(nobs/100)^{1/4})

    Returns:
        Integration order, or -1 if max_order exceeded
    """
    if verbose:
        print("Augmented Dickey-Fuller unit root test:")
    for i in range(max_order):
        if not lags:                 # test with default maxlag
            dftest = adfuller(df, maxlag=None, autolag=None)
        elif isinstance(lags, str):  # test using IC-determined lag length
            dftest = adfuller(df, autolag=lags)
        else:                        # test with given maximum number of lags
            dftest = adfuller(df, autolag=None, maxlag=lags)
        if verbose:
            results = Series(dftest[0:4],
                             index=['Test Statistic', 'p-value',
                                    'Lags Used', 'Obs Used'],
                             name=f"I({i})")
            for k, v in dftest[4].items():
                results[f"Critical Value ({k})"] = v
            print(results.to_frame().T.to_string())
        if dftest[1] < pvalue:       # reject null hypothesis of a unit root
            return i
        df = df.diff().dropna()
    return -1
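# Usage sketch (illustrative): a random walk should be I(1), i.e. its first
# difference passes the ADF stationarity test.
#   >>> walk = Series(np.random.default_rng(0).standard_normal(500).cumsum())
#   >>> integration_order(walk, verbose=False)    # expect 1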
def least_squares(data: DataFrame, y: List[str] = ['y'], x: List[str] = ['x'],
                  add_constant: bool = True,
                  stdres: bool = False) -> Series | DataFrame:
    """Compute least squares coefficients; supports groupby().apply

    Args:
        data: DataFrame with x and y series in columns
        x: List of x columns
        y: List of y columns
        add_constant: Whether to add intercept as first column
        stdres: Whether to output residual stdev

    Returns:
        Series (if only single y input) of regression coefficients, or
        DataFrame (multiple y) with coeffs, and optionally stdres, in columns
    """
    X = data[x].to_numpy()
    Y = data[y].to_numpy()
    if add_constant:
        X = np.hstack([np.ones((X.shape[0], 1)), X])
        x = ['_intercept'] + x
    b = np.dot(np.linalg.inv(np.dot(X.T, X)), np.dot(X.T, Y)).T
    if stdres:
        b = np.hstack([b, np.std(Y - (X @ b.T), axis=0).reshape(-1, 1)])
        x = x + ['_stdres']
    return (DataFrame(b, columns=x, index=y) if len(b) > 1
            else Series(b[0], x))    # return as Series for groupby.apply
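# Usage sketch (illustrative): per-group regressions via groupby().apply;
# column names are hypothetical.
#   >>> df = DataFrame({'g': [0]*50 + [1]*50,
#   ...                 'x': np.arange(100.),
#   ...                 'y': 2 * np.arange(100.) + 1})
#   >>> df.groupby('g').apply(least_squares, y=['y'], x=['x'])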
def fstats(x: Series | np.ndarray, tail: float = 0.15) -> np.ndarray:
    """Helper to compute F-stats at all candidate break points

    Args:
        x: Input Series
        tail: Tail fractions to skip computations

    Returns:
        Array of f-stats at each candidate break-point
    """
    n = len(x)
    rse = np.array(np.var(x, ddof=0))
    sse = np.ones(n) * rse
    for i in range(int(n * tail), int((1 - tail) * n) + 1):
        sse[i] = (np.var(x[:i], ddof=0)*i + np.var(x[i:], ddof=0)*(n-i)) / n
    return ((n - 2)/2) * (rse - sse)/rse
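# Usage sketch (illustrative): the F-stats peak near a level shift at the
# midpoint of a hypothetical series.
#   >>> z = np.concatenate([np.zeros(100), np.ones(100)])
#   >>> stats = fstats(z + np.random.default_rng(0).normal(0, .1, 200))
#   >>> int(np.argmax(stats))    # near 100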
#
# TODO: remove lm()
#
def _lm(x: np.ndarray | DataFrame | Series,
        y: np.ndarray | DataFrame | Series,
        add_constant: bool = True, flatten: bool = True) -> NamedTuple:
    """Calculate linear multiple regression model results as namedtuple

    Args:
        x: RHS independent variables
        y: LHS dependent variables
        add_constant: Whether to hstack 'Intercept' column before x variables
        flatten: Whether to flatten fitted and residuals series

    Returns:
        LinearModel named tuple, with keys and values
        - coefficients: estimated linear regression coefficients
        - fitted: fitted y values
        - residuals: actual minus fitted y values
        - rsq: R-squared
        - rvalue: square root of R-squared
        - stderr: std dev of residuals
    """
    def f(a):
        """helper to optionally flatten 1D array"""
        if not flatten or not isinstance(a, np.ndarray):
            return a
        if len(a.shape) == 1 or a.shape[1] == 1:
            return float(a) if a.shape[0] == 1 else a.flatten()
        return a.flatten() if a.shape[0] == 1 else a

    X = np.array(x)
    Y = np.array(y)
    if len(X.shape) == 1 or X.shape[0] == 1:
        X = X.reshape((-1, 1))
    if len(Y.shape) == 1 or Y.shape[0] == 1:
        Y = Y.reshape((-1, 1))
    if add_constant:
        X = np.hstack([np.ones((X.shape[0], 1)), X])
    b = np.dot(np.linalg.inv(np.dot(X.T, X)), np.dot(X.T, Y))
    out = {'coefficients': f(b)}
    out['fitted'] = f(X @ b)
    out['residuals'] = f(Y) - out['fitted']
    out['rsq'] = f(np.var(out['fitted'], axis=0)) / f(np.var(Y, axis=0))
    out['rvalue'] = f(np.sqrt(out['rsq']))
    out['stderr'] = f(np.std(out['residuals'], axis=0))
    return namedtuple('LinearModel', out.keys())(**out)