"""Financial functions
Copyright 2022, Terence Lim
MIT License
"""
import re
from datetime import datetime
from typing import Iterable, Mapping, List, Any, NamedTuple, Dict, Tuple
import numpy as np
from numpy.ma import masked_invalid as valid
from scipy.stats import norm, chi2
import pandas as pd
from pandas import DataFrame, Series
from pandas.api import types
import matplotlib.pyplot as plt
######################
#
# Mean Variance Optimization
#
######################
[docs]def gmv_portfolio(sigma: np.ndarray, mu: np.ndarray | None=None):
"""Returns position weights of global minimum variance portfolio
Args:
sigma : covariance matrix
mu : vector of expected excess (of the risk-free) returns
Returns:
Dict of weights, volatility and mean (if mu provided) of the GMV
"""
ones = np.ones((sigma.shape[0], 1))
w = la.inv(sigma).dot(ones) / ones.T.dot(la.inv(sigma)).dot(ones)
return {'weights': w, 'volatility': np.sqrt(w.T.dot(sigma).dot(w)),
'mean': None if mu is None else w.T.dot(mu)}
[docs]def efficient_portfolio(mu: np.ndarray, sigma: np.ndarray, target: float):
"""Returns weights of minimum variance portfolio that exceeds target return
Args:
sigma : covariance matrix
mu : vector of expected excess (of the risk-free) returns
target : required minimum expected return of portfolio
Returns:
Dict of weights, volatility and mean of efficient portfolio that achieves target
"""
mu = mu.flatten()
n = len(mu)
ones = np.ones((n, 1))
M = np.hstack([mu.reshape(-1, 1), ones])
B = M.T.dot(la.inv(sigma)).dot(M)
w = la.inv(sigma).dot(M).dot(la.inv(B)).dot(np.array([[target], [1]]))
return {'weights': w, 'volatility': np.sqrt(float(w.T.dot(sigma).dot(w))),
'mean': float(w.T.dot(mu))}
[docs]def tangency_portfolio(mu: np.ndarray, sigma: np.ndarray):
"""Returns weights of tangency portfolio with largest slope (sharpe ratio)
Args:
sigma : covariance matrix
mu : vector of expected excess (of the risk-free) returns
Returns:
Dict of weights, volatility and mean of the tangency portfolio
"""
mu = mu.flatten()
n = len(mu)
ones = np.ones((n, 1))
w = la.inv(sigma).dot(mu)/ones.T.dot(la.inv(sigma).dot(mu))
return {'weights': w, 'mean': float(w.T.dot(mu)),
'volatility': np.sqrt(float(w.T.dot(sigma).dot(w)))}
######################
#
# Bond Math
#
######################
[docs]def bootstrap_spot(coupon: float, spots: List[float], m: int,
price: float=1) -> float:
"""Compute spot rate to maturity of par bond from yield and sequence of spots
Args:
coupon : Annual coupon rate
spots : Simple annual spot rates each period (excl last period before maturity
m : Number of compounding periods per year
price: Present value of bond
Returns:
Simple spot interest rate till maturity
"""
if not spots: # trivial one-period bond
return coupon / price
n = len(spots) + 1 # number of coupons through maturity
# discount factors from given spot rates
discount = [(1 + spot/m)**(-(1+t)) for t, spot in enumerate(spots)]
# nominal amount of last payment
last_payment = 1 + coupon/m
# infer present value of last coupon and principal
last_pv = price - np.sum(discount) * coupon/m
# compute discount factor and annualize the effective rate
spot = ((last_payment/last_pv)**(1/n) - 1) * m
return spot
[docs]def bond_price(coupon: float, n: int, m: int, yields: float | List[float],
par: float = 1, **kwargs) -> float:
"""Compute present value of bond given spot rates or yield-to-maturity
Args:
coupon : Annual coupon rate
n : Number of remaining coupons
m : Number of compounding periods per year
yields : Simple annual yield-to-maturity or spot rates each period
par : face or par value of bond
Returns:
Present value of bond
"""
if not pd.api.types.is_list_like(yields):
yields = [yields] * n # same spot rate every period
assert len(yields) == n, "Number of spot rates must equal number of couponds"
pv = [(1 + yields[t-1]/m)**(-t) * (coupon/m + par*(t == n))
for t in range(1, n+1)] # discount every period's payment, plus last par
return np.sum(pv)
[docs]def forwards_from_spots(spots: List[float], m: int, skip: int=0) -> List[float]:
"""Compute forward rates given spot rates
Args:
spots : Sequence of simple annual spot rates
m : Number of compounding periods per year
skip: Number of initial periods skipped
Returns:
List of forward rates, excluding first period of spot rates input
"""
result = []
assert len(spots) >= 2, "Require at least two spot rates as input"
for t in range(1, len(spots)):
n = skip + t
numerator = (1 + spots[n]/m)**n # discounted value of period n
denominator = (1 + spots[n-1]/m)**(n-1) # discounter value of period n-1
result.append(((numerator / denominator) - 1) * m)
return result
[docs]def macaulay_duration(coupon: float, n: int, m: int, price: float,
yields: float | List[float], par: float = 1, **kwargs) -> float:
"""Compute macaulay duration of a bond given spot rates or yield-to-maturity
Args:
coupon : Annual coupon rate
n : Number of remaining coupons
m : Number of compounding periods per year
price : current market price of bond
yields : Simple annual yield-to-maturity or spot rates each period
par : face or par value of bond
Returns:
Macaulay duration
"""
if not pd.api.types.is_list_like(yields):
yields = [yields] * n # same spot rate every period
assert len(yields) == n, "Number of spot rates must equal number of couponds"
pv = [(1 + yields[t-1]/m)**(-t) * (t/m) * (coupon/m + par*(t == n))
for t in range(1, n+1)] # discount every period's time-weighted payment
return np.sum(pv) / price
[docs]def modified_duration(coupon: float, n: int, m: int, price: float,
yields: float | List[float], par: float = 1,
**kwargs) -> float:
"""Compute modified duration of a bond given spot rates or yield-to-maturity
Args:
coupon : Annual coupon rate
n : Number of remaining coupons
m : Number of compounding periods per year
price : current market price of bond
yields : Simple annual yield-to-maturity or spot rates each period
par : face or par value of bond
Returns:
Modified duration
"""
assert not pd.api.types.is_list_like(yields), "Not Implemented"
ytm = yields
return (macaulay_duration(coupon=coupon, n=n, m=m, price=price,
yields=yields, par=par) / (1 + ytm/2))
[docs]def modified_convexity(coupon: float, n: int, m: int, price: float,
yields: float | List[float], par: float = 1,
**kwargs) -> float:
"""Compute mocified convexity of a bond given spot rates or yield-to-maturity
Args:
coupon : Annual coupon rate
n : Number of remaining coupons
m : Number of compounding periods per year
price : current market price of bond
yields : Simple annual yield-to-maturity or spot rates each period
par : face or par value of bond
Returns:
Modified convexity
"""
assert not pd.api.types.is_list_like(yields), "Not Implemented"
ytm = yields
if not pd.api.types.is_list_like(yields):
yields = [yields] * n # same spot rate every period
assert len(yields) == n, "Number of spot rates must equal number of coupons"
pv = [(1 + yields[t-1]/m)**(-t) * ((t/m)**2 + t/(2*m)) * (coupon/m + par*(t == n))
for t in range(1, n+1)] # discount every period's time-weighted payment
return np.sum(pv) / (price * (1 + ytm/m)**2)
######################
#
# High Frequency
#
######################
[docs]def hl_vol(high: DataFrame,
low: DataFrame,
last: DataFrame = None) -> DataFrame:
"""Compute Parkinson volatility from high and low prices
Args:
high : DataFrame of high prices (observations x stocks)
low : DataFrame of low prices (observations x stocks)
last : DataFrame of last prices, for forward filling if high low missing
Returns:
Estimated volatility
"""
if last is not None:
high = high.where(high.notna(), last.shift())
low = low.where(high.notna(), last.shift())
return np.sqrt((np.log(high / low)**2).mean(axis=0, skipna=True)
/ (4 * np.log(2)))
[docs]def ohlc_vol(first: DataFrame, high: DataFrame, low: DataFrame,
last: DataFrame, ffill: bool = False,
zero_mean: bool = True) -> DataFrame:
"""Compute Garman-Klass or Rogers-Satchell (non zero mean) OHLC vol
Args:
first : DataFrame of open prices (observations x stocks)
high : DataFrame of high prices (observations x stocks)
low : DataFrame of low prices (observations x stocks)
last : DataFrame of close prices (observations x stocks)
Returns:
Estimated volatility
"""
if ffill:
last = last.ffill()
high = high.where(high.notna(), last.shift())
low = low.where(low.notna(), last.shift())
first = low.where(first.notna(), last.shift())
if zero_mean: # Garman-Klass (assuming zero mean drift)
v = ((np.log(high / low)**2) / 2
- (2*np.log(2) - 1) * (np.log(last / first)**2))\
.mean(axis=0, skipna=True)
else: # Rogers-Satchell (non zero mean drift)
v = ((np.log(high / close) * np.log(high / close))
+ (np.log(high / close) * np.log(high / close)))\
.mean(axis=0, skipna=True)
return np.sqrt(v)
######################
#
# Risk Management
#
######################
[docs]def maximum_drawdown(x: Series, is_price_level: bool = False) -> Series:
"""Compute max drawdown (max loss from previous high) period and returns
Args:
x: Returns or price level series
is_price_level: Whether input are price index levels, or returns
Returns:
Series with start and end levels, keyed by dates, of maximum drawdown
Notes:
MDD = (Trough - Peak) / Peak
"""
if is_price_level:
cumsum = np.log(x)
else:
cumsum = np.log(1 + x).cumsum()
cummax = cumsum.cummax()
end = (cummax - cumsum).idxmax()
beg = cumsum[cumsum.index <= end].idxmax()
dd = cumsum.loc[[beg, end]]
return np.exp(dd)
[docs]def parametric_risk(sigma: float | Series, alpha: float) -> Dict:
"""Calculate parametric gaussian VaR and ES
Args:
sigma : predicted volatility, or a Series
alpha : Var level (e.g. 0.95)
"""
var = -sigma * stats.norm.ppf(1 - alpha)
es = sigma * stats.norm.pdf(stats.norm.ppf(1 - alpha)) / (1 - alpha)
return dict(value_at_risk=var, expected_shortfall=es)
[docs]def historical_risk(X: Series, alpha: float):
"""Calculate historical VaR, ES, and sample moments
Args:
X : Series of observed returns
alpha : Var level (e.g. 0.95)
"""
X = X.dropna()
N = len(X)
var = -np.percentile(X, 100 * (1 - alpha))
es = -np.mean(X[X < var])
vol = np.std(X, ddof=0)
skew = stats.skew(X)
kurt = stats.kurtosis(X)
jb = stats.jarque_bera(X)[0]
jbp = stats.jarque_bera(X)[1]
return dict(N=N, value_at_risk=var, expected_shortfall=es, volatility=vol,
skewness=skew, excess_kurtosis=kurt-3, jb_statistic=jb, jb_pvalue=jbp)
[docs]def bootstrap_risk(X: Series, alpha: float, n: int) -> DataFrame:
"""Returned bootstrapped risk statistics
Args:
X : Series of observed returns
alpha : VaR level (e.g. 0.95)
n : Number of bootstrap samples
Returns:
DataFrame of n bootstrapped samples of risk statistics
"""
X = X.dropna()
N = len(X)
bootstraps = []
for _ in range(n):
Z = Series(np.random.choice(X, N), index=X.index)
bootstraps.append(historical_risk(Z, alpha=alpha))
bootstraps = DataFrame.from_records(bootstraps)
return bootstraps
[docs]def kupiec_LR(alpha: float, s: int, n: int):
"""Compute Kupiec likelihood ratio given s violations in n trials
Args:
s : number of violations
n : number of observations
alpha : VaR level (e.g. 0.95)
"""
p = 1 - alpha # prob of violation
num = np.log(1 - p)*(n - s) + np.log(p)*s
den = np.log(1 - (s/n))*(n - s) + np.log(s/n)*s
lr = -2 * (num - den)
return {'lr': lr, 'violations': s, 'N': n,
# '5%_critical': stats.chi2.ppf(0.95, df=1),
'pvalue': 1 - stats.chi2.cdf(lr, df=1)}
[docs]def kupiec(X: Series, VaR: Series, alpha: float) -> Dict:
"""Kupiec proportion of failures likelihood ratio test of VaR
Args:
X : Series of observed returns
VaR : Series of predicted VaR
alpha : VaR level (e.g. 0.95)
Returns:
Dict of likelihood statistic and pvalue
"""
Z = pd.concat([X, VaR], axis=1).dropna()
n = len(Z)
s = np.sum(Z.iloc[:, 0] < -Z.iloc[:, 1]) # number of violations < -VaR
return kupiec_LR(alpha=alpha, s=s, n=n)
# convert alpha to halflife
[docs]def halflife(alpha):
"""Returns halflife from alpha = -ln(2)/ln(lambda), where lambda=1-alpha"""
if types.is_list_like(alpha):
return [halflife(a) for a in alpha]
if 0 < alpha < 1:
return -np.log(2)/np.log(1-alpha)
else:
return np.inf if (alpha > 0) else 0
from cvxopt import matrix, solvers
[docs]def quadprog(sigma):
"""Quadratic solver for portfolio optimization"""
G = matrix(np.diag([-1.]*sigma.shape[1]))
A = matrix(np.ones((1, sigma.shape[1])))
b = matrix(np.ones((1, 1)))
h = matrix(np.zeros((sigma.shape[1], 1)))
sol = solvers.qp(P=matrix(sigma), q=h, G=G, h=h, A=A, b=b,
options=dict(show_progress=False))
x = np.array(sol['x']).ravel()
return x
if __name__=="__main__":
# Verify with Jorion Chapter 5 Solution
ytm = list(np.arange(0.0525, 0.1025, 0.0025))
spot = np.array([])
for y in ytm:
spot = np.append(spot, Interest.bootstrap_rates(y, nominal=spot, m=2))
jorion_sol5 = [.0797,.0827,.0859,.0892,.0925,.0961,.0997,.1036,.1077,.112]
assert np.allclose(jorion_sol5, spot[-len(jorion_sol5):], atol=0.0001)
print(spot)