Source code for finds.utils.pyR

"""Wrapper class over rpy2 package to interface with R environment

Deconstruct and expose an rpy2 or numpy/pandas object interchangeably.

- rpy2

Copyright 2022, Terence Lim

MIT License
"""
import numpy as np
from pandas import DataFrame, Series
from pandas.api import types
from copy import deepcopy
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import rpy2.robjects as ro
from rpy2.robjects import FloatVector, ListVector, IntVector, StrVector, NULL
from typing import List, Tuple, Any

[docs]def StrListVector(strList: ListVector | StrVector | str | List) -> StrVector | ListVector: """Convert nested list input to StrVector or ListVector""" try: assert(len(strList) > 0) # NULL, None, '', non-str scalar etc except: return NULL if isinstance(strList, ListVector): # already a ListVector return ListVector(strList) elif isinstance(strList, StrVector): # already a StrVector return StrVector(strList) elif isinstance(strList, str): # str scalar, so apply StrVector return StrVector([strList]) elif any([types.is_list_like(s) for s in strList]): # not the deepest list return ListVector([(None, StrListVector(s)) for s in strList]) else: return StrVector(list(strList)) # is deepest list(-like) of str types
def _flatten(s): """Generator returns each terminal item (by DFS) from nested list""" try: for t in s: if types.is_list_like(t): yield from _flatten(t) else: yield t except: yield s def _combine(*args): """Flatten each arg, and concat all into a flat list""" return [item for sublist in args for item in list(_flatten(sublist))]
[docs]class PyR: """Store and expose as rpy2 or numpy/pandas objects Args: item: input R object, or numpy array, or pandas DataFrame or Series names: named labels of object items Attributes: iloc: dict or numpy array - internally, objects are stored as either numpy array, or dict of objects (when input was R ListVector or DataFrame, or Python dict). - TODO: should use other safer property getters to view object in target types: e,g, .frame (pandas), .ro (RObject), or .values (python dict or ndarray) dim (tuple of int): dimensions of data objects names, rownames, colnames (StrVectors): named labels of object items Notes: - input item can be of either Python or R object type - input labels in each dimension can be explicitly provided, and should have same dim as object, as error checking is minimal - In R, matrices are column-major ordered (aka Fortran-like index order, with the first index changing fastest) although the R constructor matrix() accepts a boolean argument byrow that, when true, will build the matrix as if row-major ordered (aka C-like, which is also Python numpy default order, where the last axis index changes fastest) - A suggested convention is to append '_' to R function names and '_r' to R objects, and capitalize initial letter of PyR instances. - r['plot'] may need to explicitliy set xlab='', ylab='' - TODO: if hasattri('slots'), esp 'ts' class, e.g. nile.slots.items() Examples: >>> from rpy2.robjects import r >>> from rpy2.robjects.packages import importr >>> amen_r = importr('amen') # use R library >>> c_ = r['c'] # link R routines >>> Nodevars = PyR(r['IR90s'].rx2['nodevars']) # retrieve R data >>> Gdp = Nodevars[:, 'gdp'] # getitem subset with slice >>> topgdp = Gdp.values > sorted(Gdp.py, reverse=True)[30] # python calcs >>> Dyadvars = PyR(r['IR90s'].rx2['dyadvars']) >>> Y = Dyadvars[topgdp, topgdp, 'exports'] # getitem with boolean index >>> Y.iloc = np.log(Y.iloc + 1) # update with python calculations """ def __init__(self, item: Any, names: StrVector | ListVector | str | List[str] | None = None, verbose: int = 0): """Make instance from an input python or R (rpy2) object""" self.verbose = verbose self.dim = () # extract names, colnnames, rownames, index, columns as StrVector attr if hasattr(item, 'names'): # R attributes self.names = item.names if names is not None: self.names = StrListVector(names) if hasattr(item, 'colnames'): self.colnames = item.colnames if hasattr(item, 'rownames'): self.rownames = item.rownames if isinstance(item, (Series, DataFrame)): # Pandas attributes self.rownames = StrVector(item.index) if isinstance(item, DataFrame): self.colnames = StrVector(item.columns) if isinstance(item, Series) and item.name is not None: self.colnames = StrVector([item.name]) if isinstance(item, (ListVector, ro.vectors.DataFrame, dict)): try: # convert to dict if dict-like (i.e. ListVector, R DataFrame) names = [self.names[i] if isinstance(self.names, StrVector) else self.names[0][i] for i in range(len(item))] except: names = [k for k,v in item.items()] self.iloc = {n: PyR(v) for n, (k,v) in zip(names, item.items())} if verbose: print(f"PyR: dict (len={len(self.iloc)}){type(item)}") else: # not dict-like, so convert to numpy array and apply shape dims self.iloc = np.array(item) if hasattr(item, 'dim'): self.dim = tuple(item.dim) if len(self.dim) > 1: self.iloc = self.iloc.reshape(tuple(item.dim), order='F') self.dim = self.iloc.shape # try to infer rownames if not attribute if (not hasattr(self, 'rownames') and len(self.iloc.shape) > 1 and self.names and isinstance(self.names, ListVector)): self.rownames = self.names[0] # try to infer colnames if not attribute if (not hasattr(self, 'colnames') and len(self.iloc.shape) > 1 and self.names and isinstance(self.names, ListVector)): self.colnames = self.names[1] # try to infer colnames if verbose: print(f"PyR: ndarray {self.iloc.shape} {type(item)}") # TODO: WARNING self.names.shape (if hasattr and not null) via try # len(names)==len(dict) else len(names)=len(self.dim) or sum(self.dim) def __repr__(self): """str representation, preferabaly pretty print as pandas DataFrame""" if not isinstance(self.iloc, dict): if len(self.iloc.shape) <= 2: return str(self.frame) return str(self.iloc)
[docs] @staticmethod def savefig(filename, display=True, ax=None, figsize=(12,12)): """Save R graphics to file, or return R command, optionally imshow""" s = "dev.copy(png, '{}'); dev.off()".format(filename) if display is not None: ro.r(s) if display: if not ax: fig, ax = plt.subplots(clear=True, figsize=figsize) img = mpimg.imread(filename) ax.imshow(img, interpolation='nearest') ax.axis('off') return s
[docs] def assign(self, obj): """Directly update internal data object (must be same numpy shape)""" if isinstance(obj, dict): assert(isinstance(self.iloc, dict) and len(self.iloc)==len(obj)) self.iloc = obj else: obj = np.array(obj) assert(obj.shape == self.iloc.shape) self.iloc = obj
@property def ro(self): """Expose a view as RObject, so that can pass to R environment""" # Convert to R vector of correct data type if isinstance(self.iloc, dict): out = ListVector([(None, PyR(v).ro) for v in self.iloc]) if types.is_float_dtype(self.iloc): out = FloatVector(self.iloc.reshape(-1, order='F')) elif types.is_integer_dtype(self.iloc): out = IntVector(self.iloc.reshape(-1, order='F')) else: out = StrVector(self.iloc.reshape(-1, order='F')) if len(self.dim) > 1: # reshape to R Array if has non-trivial dim out = ro.r.array(out, dim=IntVector(self.dim)) # Collect R object name attributes if hasattr(self, 'rownames'): out.rownames = StrVector(self.rownames) if hasattr(self, 'colnames'): out.colnames = StrVector(self.colnames) if hasattr(self, 'names'): out.names = ListVector(self.names) if isinstance( self.names, ListVector) else StrVector(self.names) return out @property def frame(self): """Expose a view as pandas DataFrame""" out = DataFrame(self.values) if hasattr(self, 'names') and isinstance(self.names, StrVector): if len(self.names) == len(out.columns): out.columns = list(self.names) if len(self.names) == len(out.index): out.index = list(self.names) if hasattr(self, 'rownames') and isinstance(self.rownames, StrVector): out.index = list(self.rownames) if hasattr(self, 'colnames') and isinstance(self.colnames, StrVector): out.columns = list(self.colnames) return out @property def values(self): """Expose view as python dict (when ListVector) or ndarray (when not)""" return ({k:v.values for k,v in self.iloc.items()} if isinstance(self.iloc, dict) else self.iloc)
[docs] def __getitem__(self, args): """Returns copy of subset of data object from given slice or index""" try: if isinstance(self.iloc, dict): # return item of dict if isinstance(args, int): try: args = list(self.names).index(args) except: args = list(self.iloc.keys()).index(args) return self.iloc[args] # replace any str labels in args with its index in self.names if isinstance(args, tuple) and self.names is not None: args = tuple(self.index(a, i) for i,a in enumerate(args)) # extract corresponding subset of names if self.names: names_ = deepcopy(self.names) names = ListVector(names_) for i in range(len(self.names)): if isinstance(names_[i], StrVector): s = np.array(names_[i])[args[i]] names[i] = StrVector([s] if isinstance(s, str) else s) else: names = NULL # finally extract by looping over each dim; enables R-like indexing out = deepcopy(self.iloc) for i, arg in enumerate(args): a = [slice(None)]*len(args) a[i] = arg dims = len(out.shape) out = out[tuple(a)] if self.verbose: print(i, out.shape, dims, tuple(a)) if len(out.shape) < dims: # if this dimension is flattened out names = names[:i] + names[(i+1):] return PyR(out, names=names) except: raise Exception(f"getitem: {args}")
@property def nrow(self): """Length of first dimension, as R IntVector type""" return IntVector([self.iloc.shape[0]]) @property def ncol(self): """Length of second dimension, as R IntVector type""" return IntVector([self.iloc.shape[1]])
[docs] def index(self, s: str | List[str], axis: int = -1): """Helper method to lookup index/es of (list of) str label in names""" if isinstance(s, str): return list(self.names[axis]).index(s) elif types.is_list_like(s): return [self.index(t, axis=axis) for t in s] else: return s
#if __name__ == "__main__": if False: # replicate Ch 1 Gaussian AME of Hoff (2018) "Amen" tutorial import numpy as np import numpy.ma as ma from numpy.ma import masked_invalid as valid import rpy2.robjects as ro from rpy2.robjects.packages import importr stats_ro = importr('stats') base_ro = importr('base') amen_ro = importr('amen') utils_ro = importr('utils') matrix_ro = ro.r['matrix'] t_ro = ro.r['t'] anova_ro = ro.r['anova'] lm_ro = ro.r['lm'] ame_ro = ro.r['ame'] # default nscan=10000, odens=25 => 400 samples summary_ro = ro.r['summary'] plot_ro = ro.r['plot'] circplot_ro = ro.r['circplot'] IR90s_ro = ro.r['IR90s'] # Load GDP and exports data Nodevars = PyR(IR90s_ro.rx2['nodevars']) Gdp = Nodevars[:, 'gdp'] Dyadvars = PyR(IR90s_ro.rx2['dyadvars']) topgdp = Gdp.values > sorted(Gdp.values, reverse=True)[30] Y = Dyadvars[topgdp, topgdp, 'exports'] Y.assign(np.log(Y.values + 1)) Y[:5,:4] # Simple ANOVA to show random effects rowcountry_ro = matrix_ro(Y.rownames, Y.nrow, Y.ncol) colcountry_ro = t_ro(rowcountry_ro) formula_ro = ro.Formula("c(Y) ~ c(Rowcountry) + c(Colcountry)") formula_ro.environment['Rowcountry'] = rowcountry_ro formula_ro.environment['Colcountry'] = colcountry_ro formula_ro.environment['Y'] = Y.ro fit_anova_ro = anova_ro(lm_ro(formula_ro)) print(fit_anova_ro) # display exporter and imported effects muhat = np.nanmean(Y.ro) Ahat = PyR(Y.frame.mean(axis=1) - muhat, names=['Ahat']) print(Ahat.frame['Ahat'].sort_values(ascending=False)[:6]) Bhat = PyR(Y.frame.mean(axis=0) - muhat, names=['Bhat']) print(Bhat.frame['Bhat'].sort_values(ascending=False)[:6]) # But ignores corr of random effects, fundamental characteristic of dyads print(np.cov(Ahat.values, Bhat.values)) print(np.corrcoef(Ahat.values, Bhat.values)[0,1]) outer = Y.values - (muhat + np.add.outer(Ahat.values, Bhat.values)) print(ma.cov(valid(outer.flatten()), valid(outer.T.flatten())).data) print(ma.corrcoef(valid(outer.flatten()), valid(outer.T.flatten()))[0,1]) # Social Relations Model fit_SRM_ro = ame_ro(Y.ro, plot=False, print=False, family='nrm') Fit_SRM = PyR(fit_SRM_ro) _ = summary_ro(fit_SRM_ro) plot_ro(fit_SRM_ro) # Compare empirical and model estimates print(muhat, np.nanmean(Fit_SRM['BETA'].values)) # overall mean print(_combine(np.cov(Ahat.values, Bhat.values))[:3]) # mean covariances vcmean = Fit_SRM['VC'][:, :4].frame.mean() # posterior variance parms print(vcmean[:3]) # Residual Dyadic Correlation print(vcmean['cab'] / (np.sqrt(vcmean['va']) * np.sqrt(vcmean['vb']))) print(ma.corrcoef(valid(_combine(outer)), valid(_combine(outer.T)))[0,1]) print(np.mean(Fit_SRM['VC'][:, 3].values)) # SRRM Xn = PyR(IR90s_ro.rx2('nodevars'))[topgdp, :] Xn.iloc[:, :2] = np.log(Xn.values[:, :2]) Xd = PyR(IR90s_ro.rx2('dyadvars'))[topgdp, topgdp, np.array([0,2,3,4])] Xd.iloc[:, :, 2] = np.log(Xd.values[:, :, 2]) fit_srrm_ro = ame_ro(Y.ro, Xd=Xd.ro, Xr=Xn.ro, Xc=Xn.ro, plot=False, print=False) Fit_srrm = PyR(fit_srrm_ro) _ = summary_ro(fit_srrm_ro) plot_ro(fit_srrm_ro) gof = Fit_srrm['GOF'].frame.iloc[:1,:] # actual in first row of gof gof.loc['mean', :] = np.nanmean(Fit_srrm['GOF'].values[1:,:], axis=0) gof.loc['std', :] = np.nanstd(Fit_srrm['GOF'].values[1:,:], axis=0) print(gof) # OLS fit_rm_ro = ame_ro(Y.ro, Xd=Xd.ro, Xr=Xn.ro, Xc=Xn.ro, print=False, plot=False, rvar=False, cvar=False, dcor=False) _ = summary_ro(fit_rm_ro) plot_ro(fit_rm_ro) # SRRM with latent factor multiplicative effects fit_ame2_ro = ame_ro(Y.ro, Xd=Xd.ro, Xr=Xn.ro, Xc=Xn.ro, R=2, plot=False, print=False) Fit_ame2 = PyR(fit_ame2_ro) _ = summary_ro(fit_ame2_ro) plot_ro(fit_ame2_ro) # plots circplot_ro(Y.ro, U=fit_ame2_ro.rx2['U'], V=fit_ame2_ro.rx2['V'], row_names=Y.rownames, col_names=Y.colnames, plotnames=True, pscale=FloatVector([1.5])) if False: from rpy2.robjects.packages import importr from rpy2.robjects import Formula, Environment import rpy2.robjects as ro from rpy2.robjects import FloatVector, ListVector, IntVector, StrVector, \ NULL stats = importr('stats') base = importr('base') # Create matrix in R v = ro.FloatVector([1.1, 2.2, 3.3, 4.4, 5.5, 6.6]) m = ro.r.matrix(v, nrow = 2) m = ro.r['matrix'](v, nrow = 2) ctl = FloatVector([4.17,5.58,5.18,6.11,4.50,4.61,5.17,4.53,5.33,5.14]) trt = FloatVector([4.81,4.17,4.41,3.59,5.87,3.83,6.03,4.89,4.32,4.69]) group = base.gl(2, 10, 20, labels = ["Ctl","Trt"]) weight = ctl + trt ro.globalenv["weight"] = weight ro.globalenv["group"] = group lm_D9 = stats.lm("weight ~ group") print(stats.anova(lm_D9)) lm_D90 = stats.lm("weight ~ group - 1") print(base.summary(lm_D90)) res = ro.StrVector(['abc', 'def']) v = ro.FloatVector([1.1, 2.2, 3.3, 4.4, 5.5, 6.6]) m = ro.r['matrix'](v, nrow = 2) letters = ro.r['letters'] rcode = 'paste(%s, collapse="-")' % (letters.r_repr()) res = ro.r(rcode)