Source code for ipca.ipca

from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.model_selection import GroupKFold
from sklearn.linear_model import ElasticNet
from sklearn.metrics import r2_score
from joblib import Parallel, delayed
from numba import jit
import pandas as pd
import numpy as np
import scipy as sp
import progressbar
import warnings
import time

[docs]class InstrumentedPCA(BaseEstimator): """ This class implements the IPCA algorithm by Kelly, Pruitt, Su (2017). Parameters ---------- n_factors : int, default=1 The total number of factors to estimate. Note, the number of estimated factors is automatically reduced by the number of pre-specified factors. For example, if n_factors = 2 and one pre-specified factor is passed, then InstrumentedPCA will estimate one factor estimated in addition to the pre-specified factor. intercept : boolean, default=False Determines whether the model is estimated with or without an intercept max_iter : int, default=10000 Maximum number of alternating least squares updates before the estimation is stopped iter_tol : float, default=10e-6 Tolerance threshold for stopping the alternating least squares procedure alpha : scalar Regularizing constant for Gamma estimation. If this is set to zero then the estimation defaults to non-regularized. l1_ratio : scalar Ratio of l1 and l2 penalties for elastic net Gamma fit. n_jobs : scalar number of jobs for F step estimation in ALS, if set to one no parallelization is done backend : str label for Joblib backend used for F step in ALS """ def __init__(self, n_factors=1, intercept=False, max_iter=10000, iter_tol=10e-6, alpha=0., l1_ratio=1., n_jobs=1, backend="loky"): # paranoid parameter checking to make it easier for users to know when # they have gone awry and to make it safe to assume some variables can # only have certain settings if not isinstance(n_factors, int) or n_factors < 1: raise ValueError('n_factors must be an int greater / equal 1.') if not isinstance(intercept, bool): raise NotImplementedError('intercept must be boolean') if not isinstance(iter_tol, float) or iter_tol >= 1: raise ValueError('Iteration tolerance must be smaller than 1.') if l1_ratio > 1. or l1_ratio < 0.: raise ValueError("l1_ratio must be between 0 and 1") if alpha < 0.: raise ValueError("alpha must be greater than or equal to 0") # Save parameters to the object params = locals() for k, v in params.items(): if k != 'self': setattr(self, k, v)
[docs] def fit(self, X, y, indices=None, PSF=None, Gamma=None, Factors=None, data_type="portfolio", label_ind=False, **kwargs): """ Fits the regressor to the data using an alternating least squares scheme. Parameters ---------- X : numpy array or pandas DataFrame matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. If given as a DataFrame, we assume that it contains a MutliIndex mapping to each entity-time pair y : numpy array or pandas Series dependent variable where indices correspond to those in X If given as a Series, we assume that it contains a MutliIndex mapping to each entity-time pair indices : numpy array, optional array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. PSF : numpy array, optional Set of pre-specified factors as matrix of dimension (M, T) Gamma : numpy array, optional If provided, starting values for Gamma (see Notes) Factors : numpy array If provided, starting values for Factors (see Notes) data_type : str label for data-type used for ALS estimation, one of the following: 1. panel ALS uses the untransformed X and y for the estimation. This is currently marginally slower than the portfolio estimation but is necessary when performing regularized estimation (alpha > 0). 2. portfolio ALS uses a matrix of characteristic weighted portfolios (Q) as well as a matrix of weights (W) and count of non-missing observations for each time period (val_obs) for the estimation. See _build_portfolio for details on how these variables are formed from the initial X and y. Currently, the bootstrap procedure is only implemented in terms of the portfolio data_type. Returns ------- self Notes ----- Updates InstrumentedPCA instances to include param estimates: Gamma : numpy array Array with dimensions (L, n_factors) containing the mapping between characteristics and factors loadings. If there are M many pre-specified factors in the model then the matrix returned is of dimension (L, (n_factors+M)). If an intercept is included in the model, its loadings are returned in the last column of Gamma. Factors : numpy array Array with dimensions (n_factors, T) containing the estimated factors. If pre-specified factors were passed the returned array is of dimension ((n_factors - M), T), corresponding to the n_factors - M many factors estimated on top of the pre-specified ones. """ # handle input X, y, indices, metad = _prep_input(X, y, indices) N, L, T = metad["N"], metad["L"], metad["T"] # set data_type to panel if doing regularized estimation if self.alpha > 0.: data_type = "panel" # Handle pre-specified factors if PSF is not None: if np.size(PSF, axis=1) != np.size(metad["dates"]): raise ValueError("""Number of PSF observations must match number of unique dates""") self.has_PSF = True else: self.has_PSF = False if self.has_PSF: if np.size(PSF, axis=0) == self.n_factors: print("""Note: The number of factors (n_factors) to be estimated matches the number of pre-specified factors. No additional factors will be estimated. To estimate additional factors increase n_factors.""") # Treating intercept as if was a prespecified factor if self.intercept: self.n_factors_eff = self.n_factors + 1 if PSF is not None: PSF = np.concatenate((PSF, np.ones((1, T))), axis=0) elif PSF is None: PSF = np.ones((1, T)) else: self.n_factors_eff = self.n_factors # Check that enough features provided if np.size(X, axis=1) < self.n_factors_eff: raise ValueError("""The number of factors requested exceeds number of features""") # Determine fit case - if intercept or PSF or both use PSFcase fitting # Note PSFcase in contrast to has_PSF is only indicating # that the IPCA fitting is carried out as if PSF were passed even if # only an intercept was passed. self.PSFcase = True if self.has_PSF or self.intercept else False # store data self.X, self.y, self.indices, self.PSF = X, y, indices, PSF Q, W, val_obs = _build_portfolio(X, y, indices, metad) self.Q, self.W, self.val_obs = Q, W, val_obs self.metad = metad # Run IPCA Gamma, Factors = self._fit_ipca(X=X, y=y, indices=indices, Q=Q, W=W, val_obs=val_obs, PSF=PSF, Gamma=Gamma, Factors=Factors, data_type=data_type, **kwargs) # Store estimates if self.PSFcase: date_ln = len(metad["dates"]) if self.intercept and self.has_PSF: PSF = np.concatenate((PSF, np.ones((1, date_ln))), axis=0) elif self.intercept: PSF = np.ones((1, date_ln)) if Factors is not None: Factors = np.concatenate((Factors, PSF), axis=0) else: Factors = PSF self.Gamma, self.Factors = Gamma, Factors return self
[docs] def get_factors(self, label_ind=False): """returns a tuple containing Gamma and Factors Parameters ---------- label_ind : bool if provided we return the factors as pandas DataFrames with index info applied Returns ------- tuple containing Gamma and Factors """ Gamma, Factors = self.Gamma.copy(), self.Factors.copy() if label_ind: Gamma = pd.DataFrame(Gamma, index=self.metad["chars"]) Factors = pd.DataFrame(Factors, columns=self.metad["dates"]) return Gamma, Factors
[docs] def fit_path(self, X, y, indices=None, PSF=None, alpha_l=None, n_splits=10, split_method=GroupKFold, n_jobs=1, backend="loky", **kwargs): """Fit a path of elastic net fits for various regularizing constants Parameters ---------- X : numpy array or pandas DataFrame matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. If given as a DataFrame, we assume that it contains a mutliindex mapping to each entity-time pair y : numpy array or pandas Series dependent variable where indices correspond to those in X If given as a Series, we assume that it contains a mutliindex mapping to each entity-time pair indices : numpy array, optional array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. PSF : numpy array, optional Set of pre-specified factors as matrix of dimension (M, T) alpha_l : iterable, optional list of regularizing constants to use for path n_splits : scalar number of CV partitions split_method : sklearn cross-validation generator factory method to generate CV partitions n_jobs : scalar number of jobs for parallel CV estimation backend : str label for joblib backend Returns ------- cvmse : numpy matrix array of dim (P x (C + 1)) where P is the number of reg constants and C is the number of CV partitions """ # handle input X, y, indices, metad = _prep_input(X, y, indices) # handle data type, since we are doing regularized estimation # only the panel fit makes sense here if "data_type" in kwargs: data_type = kwargs.pop("data_type") else: data_type = "panel" if data_type == "portfolio": raise ValueError("Unsupported data_type for fit_path: \ 'portfolio'. Regularized estimation is only \ implemented for 'panel' data_type currently") # init alphas if alpha_l is None: alpha_l = np.linspace(0.0, 1., 100) # run cross-validation if n_jobs > 1: cvmse = Parallel(n_jobs=n_jobs, backend=backend)( delayed(_fit_cv)( self, X, y, indices, PSF, n_splits, split_method, alpha, data_type=data_type, **kwargs) for alpha in alpha_l) else: cvmse = [_fit_cv(self, X, y, indices, PSF, n_splits, split_method, alpha, data_type=data_type, **kwargs) for alpha in alpha_l] cvmse = np.stack(cvmse) cvmse = np.hstack((alpha_l[:,None], cvmse)) return cvmse
[docs] def predict(self, X=None, indices=None, W=None, mean_factor=False, data_type="panel", label_ind=False): """wrapper around different data type predict methods Parameters ---------- X : numpy array or pandas DataFrame, optional matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. If given as a DataFrame, we assume that it contains a mutliindex mapping to each entity-time pair If None we use the values associated with the current model indices : numpy array, optional array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. If None we use the values associated with the current model W : numpy array, optional portfolio weight matrix of dimension (L, L, T) If None, we use the values associated with the current model mean_factor: boolean If true, the estimated factors are averaged in the time-series before prediction. data_type : str label for data-type used for prediction, one of the following: 1. panel Uses the untransformed X and y for the estimation. 2. portfolio Uses a matrix of characteristic weighted portfolios (Q) as well as a matrix of weights (W) and count of non-missing observations for each time period (val_obs) for the estimation. See _build_portfolio for details on how these variables are formed from the initial X and y. label_ind : bool whether to apply the indices to fitted values and return pandas Series Returns ------- numpy array or pandas DataFrame/Series The exact value returned depends on two things: 1. The data_type a. If panel data_type is specified, this will be a series of values for the panel ys b. If portfolio data_type is specified, this will a matrix of predicted char formed portfolio Qs 2. label_ind If label_ind is True, we return pandas variants of the predicted values. If not, we return the underlying numpy arrays. """ if data_type == "panel": if X is None: X, indices, metad = self.X, self.indices, self.metad else: X, y, indices, metad = _prep_input(X, None, indices) N, L, T = metad["N"], metad["L"], metad["T"] pred = self.predict_panel(X, indices, T, mean_factor) if label_ind: pred = pd.DataFrame(pred, columns=["yhat"]) ind = pd.DataFrame(indices, columns=["ids", "dates"]) pred = pd.concat([ind, pred]).set_index(["ids", "dates"]) elif data_type == "portfolio": if W is not None: L = W.shape[0] T = W.shape[2] elif X is not None: X, y, indices, metad = _prep_input(X, None, indices) Q, W, val_obs = _build_portfolio(X, None, indices, metad) elif hasattr(self, "W"): W = self.W L = W.shape[0] T = W.shape[2] else: X, indices, metad = self.X, self.indices, self.metad N, L, T = metad["N"], metad["L"], metad["T"] Q, W, val_obs = _build_portfolio(X, None, indices, metad) pred = self.predict_portfolio(W, L, T, mean_factor) if label_ind: pred = pd.DataFrame(pred, index=metad["chars"], columns=metad["dates"]) else: raise ValueError("Unsupported data_type: %s" % data_type) return pred
[docs] def predict_panel(self, X, indices, T, mean_factor=False): """ Predicts fitted values for a previously fitted regressor + panel data Parameters ---------- X : numpy array matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. indices : numpy array array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. T : scalar number of time periods in X mean_factor: boolean If true, the estimated factors are averaged in the time-series before prediction. Returns ------- ypred : numpy array The length of the returned array matches the length of data. A nan will be returned if there is missing chars information. """ mean_Factors = np.mean(self.Factors, axis=1).reshape((-1, 1)) if mean_factor: ypred = np.squeeze(\ .dot(mean_Factors)) elif T != self.Factors.shape[1]: raise ValueError("If mean_factor isn't used date shape must align\ with Factors shape") else: ypred = np.full((X.shape[0]), np.nan) for t in range(T): ix = (indices[:, 1] == t) ypred[ix] = np.squeeze(X[ix, :].dot(self.Gamma)\ .dot(self.Factors[:, t])) return ypred
[docs] def predict_portfolio(self, W, L, T, mean_factor=False): """ Predicts fitted values for a previously fitted regressor + portfolios Parameters ---------- W : numpy array portfolio weight matrix of dimension (L, L, T) L : scalar number of characteristics T : scalar number of time periods mean_factor: boolean If true, the estimated factors are averaged in the time-series before prediction. Returns ------- Qpred : numpy array Same dimensions as a char formed portfolios (Q) """ # Compute goodness of fit measures, portfolio level Num_tot, Denom_tot, Num_pred, Denom_pred = 0, 0, 0, 0 if not mean_factor and T != self.Factors.shape[1]: raise ValueError("If mean_factor isn't used date shape must align\ with Factors shape") if mean_factor: mean_Factors = np.mean(self.Factors, axis=1).reshape((-1, 1)) Qpred = np.full((L, T), np.nan) for t in range(T): if mean_factor: qpred = W[:, :, t].dot(self.Gamma).dot(mean_Factors) qpred = np.squeeze(qpred) Qpred[:,t] = qpred else: qpred = W[:, :, t].dot(self.Gamma)\ .dot(self.Factors[:, t]) Qpred[:,t] = qpred return Qpred
[docs] def score(self, X, y=None, indices=None, mean_factor=False, data_type="panel"): """generate R^2 Parameters ---------- X : numpy array or pandas DataFrame matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. If given as a DataFrame, we assume that it contains a mutliindex mapping to each entity-time pair If None we use the values associated with the current model y : numpy array or pandas Series, optional dependent variable where indices correspond to those in X If given as a Series, we assume that it contains a mutliindex mapping to each entity-time pair indices : numpy array, optional array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. If None we use the values associated with the current model mean_factor: boolean If true, the estimated factors are averaged in the time-series before prediction. data_type : str label for data-type used for prediction, one of the following: 1. panel Uses the untransformed X and y for the estimation. 2. portfolio Uses a matrix of characteristic weighted portfolios (Q) as well as a matrix of weights (W) and count of non-missing observations for each time period (val_obs) for the estimation. See _build_portfolio for details on how these variables are formed from the initial X and y. label_ind : bool whether to apply the indices to fitted values and return pandas Series Returns ------- r2 : scalar summary of model performance """ if data_type == "panel": X, y, indices, metad = _prep_input(X, y, indices) if y is None: y = self.y yhat = self.predict(X=X, indices=indices, mean_factor=mean_factor, data_type="panel") return r2_score(y, yhat) elif data_type == "portfolio": X, y, indices, metad = _prep_input(X, y, indices) if y is None: y = self.y Q, W, val_obs = _build_portfolio(X, y, indices, metad) Qhat = self.predict(W=W, mean_factor=mean_factor, data_type="portfolio") return r2_score(Q, Qhat) else: return ValueError("Unsupported data_type: %s" % data_type)
[docs] def BS_Walpha(self, ndraws=1000, n_jobs=1, backend='loky'): """ Bootstrap inference on the hypothesis Gamma_alpha = 0 Parameters ---------- ndraws : integer, default=1000 Number of bootstrap draws and re-estimations to be performed backend : optional Value is either 'loky' or 'multiprocessing' n_jobs : integer Number of workers to be used. If -1, all available workers are used. Returns ------- pval : float P-value from the hypothesis test H0: Gamma_alpha=0 """ if self.alpha > 0.: raise ValueError("Bootstrap currently not supported for\ regularized estimation.") if not self.intercept: raise ValueError('Need to fit model with intercept first.') # fail if model isn't estimated if not hasattr(self, "Q"): raise ValueError("Bootstrap can only be run on fitted model.") N, L, T = self.metad["N"], self.metad["L"], self.metad["T"] # Compute Walpha Walpha = self.Gamma[:, -1][:, -1]) # Compute residuals d = np.full((L, T), np.nan) for t_i, t in enumerate(self.metad["dates"]): d[:, t_i] = self.Q[:, t_i]-self.W[:, :, t_i].dot(self.Gamma)\ .dot(self.Factors[:, t_i]) print("Starting Bootstrap...") Walpha_b = Parallel(n_jobs=n_jobs, backend=backend, verbose=10)( delayed(_BS_Walpha_sub)(self, n, d) for n in range(ndraws)) print("Done!") # print(Walpha_b, Walpha) pval = np.sum(Walpha_b > Walpha)/ndraws return pval
[docs] def BS_Wbeta(self, l, ndraws=1000, n_jobs=1, backend='loky'): """ Test of instrument significance. Bootstrap inference on the hypothesis l-th column of Gamma_beta = 0. Parameters ---------- l : integer Position of the characteristics for which the bootstrap is to be carried out. For example, if there are 10 characteristics, l is in the range 0 to 9 (left-/right-inclusive). ndraws : integer, default=1000 Number of bootstrap draws and re-estimations to be performed n_jobs : integer Number of workers to be used for multiprocessing. If -1, all available Workers are used. backend : optional Returns ------- pval : float P-value from the hypothesis test H0: Gamma_alpha=0 """ if self.alpha > 0.: raise ValueError("Bootstrap currently not supported for\ regularized estimation.") if self.PSFcase: raise ValueError('Need to fit model without intercept first.') # fail if model isn't estimated if not hasattr(self, "Q"): raise ValueError("Bootstrap can only be run on fitted model.") N, L, T = self.metad["N"], self.metad["L"], self.metad["T"] # Compute Wbeta_l if l-th characteristics is set to zero Wbeta_l = np.sum(np.square(self.Gamma[l, :])) # Compute residuals d = np.full((L, T), np.nan) for t_i, t in enumerate(self.metad["dates"]): d[:, t_i] = self.Q[:, t_i]-self.W[:, :, t_i].dot(self.Gamma)\ .dot(self.Factors[:, t_i]) print("Starting Bootstrap...") Wbeta_l_b = Parallel(n_jobs=n_jobs, backend=backend, verbose=10)( delayed(_BS_Wbeta_sub)(self, n, d, l) for n in range(ndraws)) print("Done!") pval = np.sum(Wbeta_l_b > Wbeta_l)/ndraws return pval
[docs] def BS_Wdelta(self, ndraws=1000, n_jobs=1, backend='loky'): """ Test of PSF significance. Bootstrap inference on the hypothesis Gamma_delta = 0. Assumes that only one PSF is used and no intercept is in use. Parameters ---------- ndraws : integer, default=1000 Number of bootstrap draws and re-estimations to be performed n_jobs : integer Number of workers to be used for multiprocessing. If -1, all available Workers are used. backend : optional Returns ------- pval : float P-value from the hypothesis test H0: Gamma_delta=0 """ if self.alpha > 0.: raise ValueError("Bootstrap currently not supported for\ regularized estimation.") if self.intercept: raise ValueError('Need to fit model without intercept first.') if not self.PSFcase: raise ValueError('Need to fit model with one PSF first.') K_PSF, _ = np.shape(self.PSF) if K_PSF > 1: raise ValueError('Not implemented for more than one PSF yet.') # fail if model isn't estimated if not hasattr(self, "Q"): raise ValueError("Bootstrap can only be run on fitted model.") N, L, T = self.metad["N"], self.metad["L"], self.metad["T"] # Compute Wdelta if Gamma_delta is set to zero Wdelta = self.Gamma[:, -1][:, -1]) # Compute residuals d = np.full((L, T), np.nan) for t_i, t in enumerate(self.metad["dates"]): d[:, t_i] = self.Q[:, t_i]-self.W[:, :, t_i].dot(self.Gamma)\ .dot(self.Factors[:, t_i]) print("Starting Bootstrap...") Wdelta_b = Parallel(n_jobs=n_jobs, backend=backend, verbose=10)( delayed(_BS_Wdelta_sub)(self, n, d) for n in range(ndraws)) print("Done!") #print(Wdelta_b, Wdelta) pval = np.sum(Wdelta_b > Wdelta)/ndraws return pval
[docs] def predictOOS(self, X=None, y=None, indices=None, mean_factor=False): """ Predicts time t+1 observation using an out-of-sample design. Parameters ---------- X : numpy array X of stacked data. Each row corresponds to an observation (i,t) where i denotes the entity index and t denotes the time index. All data must correspond to time t, i.e. all observations occur on the same date. If an observation contains missing data NaN will be returned. Note that the number of characteristics (L) passed, has to match the number of characteristics used when fitting the regressor. The columns of the panel are organized in the following order: - Column 1: entity id (i) - Column 2: time index (t) - Column 3 to column 3+L: characteristics. y : numpy array dependent variable where indices correspond to those in X indices : numpy array array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. mean_factor: boolean If true, the estimated factors are averaged in the time-series before prediction. Returns ------- ypred : numpy array The length of the returned array matches the the length of data. A nan will be returned if there is missing characteristics information. """ X, y, indices, metad = _prep_input(X, y, indices) N, L, T = metad["N"], metad["L"], metad["T"] ypred = np.full((N), np.nan) # Compute realized factor returns Numer = Denom = Factor_OOS = np.linalg.solve(Denom, Numer.reshape((-1, 1))) if mean_factor: ypred = np.squeeze(\ .dot(np.mean(self.Factors, axis=1).reshape((-1, 1)))) else: ypred = np.diag( return ypred
def _fit_ipca(self, X=None, y=None, indices=None, PSF=None, Q=None, W=None, val_obs=None, Gamma=None, Factors=None, quiet=False, data_type="portfolio", **kwargs): """ Fits the regressor to the data using alternating least squares Parameters ---------- X : numpy array, optional matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. y : numpy array, optional dependent variable where indices correspond to those in X indices : numpy array, optional array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. PSF : array-like of shape (M, T), optional pre-specified factors Q : array-like of shape (L,T), optional characteristics weighted portfolios W : array_like of shape (L, L,T), optional portfolio weights val_obs: array-like, optional matrix of dimension (T), containting the number of non missing observations at each point in time Gamma : numpy array, optional If provided, starting values for Gamma Factors : numpy array, optional If provided, starting values for Factors data_type : str label for method used when fitting ALS, should be one of: 1. panel 2. portfolio quiet : optional, bool If true no text output will be produced Returns ------- Gamma : array-like with dimensions (L, n_factors). If there are n_prespec many pre-specified factors in the model then the matrix returned is of dimension (L, (n_factors+M)). If an intercept is included in the model, its loadings are returned in the last column of Gamma. Factors : array_like with dimensions (n_factors, T). If pre-specified factors were passed the returned matrix is of dimension ((n_factors - M), T), corresponding to the n_factors - M many factors estimated on top of the pre- specified ones. """ if data_type == "panel": ALS_inputs = (X, y, indices) ALS_fit = self._ALS_fit_panel elif data_type == "portfolio": ALS_inputs = (Q, W, val_obs) ALS_fit = self._ALS_fit_portfolio else: raise ValueError("Unsupported ALS method: %s" % data_type) # Initialize the Alternating Least Squares Procedure if Gamma is None or Factors is None: Gamma_Old, s, v = np.linalg.svd(Q) Gamma_Old = Gamma_Old[:, :self.n_factors_eff] s = s[:self.n_factors_eff] v = v[:self.n_factors_eff, :] Factor_Old = np.diag(s).dot(v) if Gamma is not None: Gamma_Old = Gamma if Factors is not None: Factors_Old = Factors # Estimation Step tol_current = 1 iter = 0 while((iter <= self.max_iter) and (tol_current > self.iter_tol)): Gamma_New, Factor_New = ALS_fit(Gamma_Old, *ALS_inputs, PSF=PSF, **kwargs) if self.PSFcase: tol_current = np.max(np.abs(Gamma_New - Gamma_Old)) else: tol_current_G = np.max(np.abs(Gamma_New - Gamma_Old)) tol_current_F = np.max(np.abs(Factor_New - Factor_Old)) tol_current = max(tol_current_G, tol_current_F) # Update factors and loadings Factor_Old, Gamma_Old = Factor_New, Gamma_New iter += 1 if not quiet: print('Step', iter, '- Aggregate Update:', tol_current) if not quiet: print('-- Convergence Reached --') return Gamma_New, Factor_New def _ALS_fit_portfolio(self, Gamma_Old, Q, W, val_obs, PSF=None, **kwargs): """Alternating least squares procedure to fit params Runs using portfolio data as input The alternating least squares procedure switches back and forth between evaluating the first order conditions for Gamma_Beta, and the factors until convergence is reached. This function carries out one complete update procedure and will need to be called repeatedly using the updated Gamma's and factors as inputs. """ T = self.metad["T"] if PSF is None: L, K = np.shape(Gamma_Old) Ktilde = K else: L, Ktilde = np.shape(Gamma_Old) K_PSF, _ = np.shape(PSF) K = Ktilde - K_PSF # ALS Step 1 if K > 0: # case with no observed factors if PSF is None: if self.n_jobs > 1: F_New = Parallel(n_jobs=self.n_jobs, backend=self.backend)( delayed(_Ft_fit_portfolio)( Gamma_Old, W[:,:,t], Q[:,t]) for t in range(T)) F_New = np.stack(F_New, axis=1) else: F_New = np.full((K, T), np.nan) for t in range(T): F_New[:,t] = _Ft_fit_portfolio(Gamma_Old, W[:,:,t], Q[:,t]) # observed factors+latent factors case else: if self.n_jobs > 1: F_New = Parallel(n_jobs=n_jobs, backend=backend)( delayed(_Ft_fit_PSF_portfolio)( Gamma_Old, W[:,:,t], Q[:,t], PSF[:,t], K, Ktilde) for t in range(T)) F_New = np.stack(F_New, axis=1) else: F_New = np.full((K, T), np.nan) for t in range(T): F_New[:,t] = _Ft_fit_PSF_portfolio(Gamma_Old, W[:,:,t], Q[:,t], PSF[:,t], K, Ktilde) else: F_New = None # ALS Step 2 Gamma_New = _Gamma_fit_portfolio(F_New, Q, W, val_obs, PSF, L, K, Ktilde, T) # condition checks if K > 0: # Enforce Orthogonality in Gamma_alpha and Gamma_beta R1 = _numba_chol(Gamma_New[:, :K][:, :K])).T R2, _, _ = _numba_svd( Gamma_New[:, :K] = _numba_lstsq(Gamma_New[:, :K].T, R1.T)[0].dot(R2) F_New = _numba_solve(R2, # Enforce sign convention for Gamma_Beta and F_New sg = np.sign(np.mean(F_New, axis=1)).reshape((-1, 1)) sg[sg == 0] = 1 Gamma_New[:, :K] = np.multiply(Gamma_New[:, :K], sg.T) F_New = np.multiply(F_New, sg) if PSF is not None: Gamma_New[:, K:] = (np.identity(Gamma_New.shape[0]) - Gamma_New[:, :K].dot(Gamma_New[:, :K].T)).dot(Gamma_New[:, K:]) F_New += Gamma_New[:, :K][:, K:]).dot(PSF) sg = np.sign(np.mean(F_New, axis=1)).reshape((-1, 1)) sg[sg == 0] = 1 Gamma_New[:, :K] = np.multiply(Gamma_New[:, :K], sg.T) F_New = np.multiply(F_New, sg) return Gamma_New, F_New def _ALS_fit_panel(self, Gamma_Old, X, y, indices, PSF=None, **kwargs): """Alternating least squares procedure to fit params Runs using panel data as input The alternating least squares procedure switches back and forth between evaluating the first order conditions for Gamma_Beta, and the factors until convergence is reached. This function carries out one complete update procedure and will need to be called repeatedly using the updated Gamma's and factors as inputs. """ T, dates = self.metad["T"], self.metad["dates"] if PSF is None: L, K = np.shape(Gamma_Old) Ktilde = K else: L, Ktilde = np.shape(Gamma_Old) K_PSF, _ = np.shape(PSF) K = Ktilde - K_PSF # prep T-ind for iteration Tind = [np.where(indices[:,1] == t)[0] for t in range(T)] # ALS Step 1 if K > 0: # case with no observed factors if PSF is None: if self.n_jobs > 1: F_New = Parallel(n_jobs=self.n_jobs, backend=self.backend)( delayed(_Ft_fit_panel)( Gamma_Old, X[tind,:], y[tind]) for t, tind in enumerate(Tind)) F_New = np.stack(F_New, axis=1) else: F_New = np.full((K, T), np.nan) for t, tind in enumerate(Tind): F_New[:,t] = _Ft_fit_panel(Gamma_Old, X[tind,:], y[tind]) # observed factors+latent factors case else: if self.n_jobs > 1: F_New = Parallel(n_jobs=n_jobs, backend=backend)( delayed(_Ft_fit_PSF_panel)( Gamma_Old, X[tind,:], y[tind], PSF[:,t], K, Ktilde) for t, tind in enumerate(Tind)) F_New = np.stack(F_New, axis=1) else: F_New = np.full((K, T), np.nan) for t, tind in enumerate(Tind): F_New[:,t] = _Ft_fit_PSF_panel(Gamma_Old, X[tind,:], y[tind], PSF[:,t], K, Ktilde) else: F_New = None # ALS Step 2 Gamma_New = _Gamma_fit_panel(F_New, X, y, indices, PSF, L, Ktilde, self.alpha, self.l1_ratio, **kwargs) # condition checks if K > 0: # Enforce Orthogonality in Gamma_alpha and Gamma_beta R1 = _numba_chol(Gamma_New[:, :K][:, :K])).T R2, _, _ = _numba_svd( Gamma_New[:, :K] = _numba_lstsq(Gamma_New[:, :K].T, R1.T)[0].dot(R2) F_New = _numba_solve(R2, # Enforce sign convention for Gamma_Beta and F_New sg = np.sign(np.mean(F_New, axis=1)).reshape((-1, 1)) sg[sg == 0] = 1 Gamma_New[:, :K] = np.multiply(Gamma_New[:, :K], sg.T) F_New = np.multiply(F_New, sg) if PSF is not None: Gamma_New[:, K:] = (np.identity(Gamma_New.shape[0]) - Gamma_New[:, :K].dot(Gamma_New[:, :K].T)).dot(Gamma_New[:, K:]) F_New += Gamma_New[:, :K][:, K:]).dot(PSF) sg = np.sign(np.mean(F_New, axis=1)).reshape((-1, 1)) sg[sg == 0] = 1 Gamma_New[:, :K] = np.multiply(Gamma_New[:, :K], sg.T) F_New = np.multiply(F_New, sg) return Gamma_New, F_New
def _prep_input(X, y=None, indices=None): """handle mapping from different inputs type to consistent internal data Parameters ---------- X : numpy array or pandas DataFrame matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. If given as a DataFrame, we assume that it contains a mutliindex mapping to each entity-time pair y : numpy array or pandas Series, optional dependent variable where indices correspond to those in X If given as a Series, we assume that it contains a mutliindex mapping to each entity-time pair indices : numpy array or pandas MultiIndex, optional array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. Returns ------- X : numpy array matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. y : numpy array dependent variable where indices correspond to those in X indices : numpy array array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. metad : dict contains metadata on inputs: dates : array-like unique dates in panel ids : array-like unique ids in panel chars : array-like labels for X chars/columns T : scalar number of time periods N : scalar number of ids L : scalar total number of characteristics """ # Check panel input if X is None: raise ValueError('Must pass panel input data.') else: # remove panel rows containing missing obs non_nan_ind = ~np.any(np.isnan(X), axis=1) X = X[non_nan_ind] if y is not None: y = y[non_nan_ind] # if data-frames passed, break out indices from data if isinstance(X, pd.DataFrame) and not isinstance(y, pd.Series): indices = X.index chars = X.columns X = X.values elif not isinstance(X, pd.DataFrame) and isinstance(y, pd.Series): indices = y.index y = y.values chars = np.arange(X.shape[1]) elif isinstance(X, pd.DataFrame) and isinstance(y, pd.Series): Xind = X.index chars = X.columns yind = y.index X = X.values y = y.values if not np.array_equal(Xind, yind): raise ValueError("If indices are provided with both X and y\ they be the same") indices = Xind else: chars = np.arange(X.shape[1]) if indices is None: raise ValueError("entity-time indices must be provided either\ separately or as a MultiIndex with X/y") # extract numpy array and labels from multiindex if isinstance(indices, pd.MultiIndex): indices = indices.to_frame().values ids = np.unique(indices[:, 0]) dates = np.unique(indices[:, 1]) indices[:,0] = np.unique(indices[:,0], return_inverse=True)[1] indices[:,1] = np.unique(indices[:,1], return_inverse=True)[1] # init data dimensions T = np.size(dates, axis=0) N = np.size(ids, axis=0) L = np.size(chars, axis=0) # prep metadata metad = {} metad["dates"] = dates metad["ids"] = ids metad["chars"] = chars metad["T"] = T metad["N"] = N metad["L"] = L return X, y, indices, metad def _build_portfolio(X, y, indices, metad): """ Converts a stacked panel of data where each row corresponds to an observation (i, t) into a tensor of dimensions (N, L, T) where N is the number of unique entities, L is the number of characteristics and T is the number of unique dates Parameters ---------- X : numpy array matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. y : numpy array, optional dependent variable where indices correspond to those in X If None, we just build the portfolio info for ind vars indices : numpy array array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. metad : dict dictionary containing metadata Returns ------- Q: array-like, optional matrix of dimensions (L, T), containing the characteristics weighted portfolios W: array-like, optional matrix of dimension (L, L, T) val_obs: array-like matrix of dimension (T), containting the number of non missing observations at each point in time """ N, L, T = metad["N"], metad["L"], metad["T"] print('The panel dimensions are:') print('n_samples:', N, ', L:', L, ', T:', T) bar = progressbar.ProgressBar(maxval=T, widgets=[progressbar.Bar('=', '[', ']'), ' ', progressbar.Percentage()]) bar.start() # init portfolio outputs based on input type if y is not None: Q = np.full((L, T), np.nan) else: Q = None W = np.full((L, L, T), np.nan) val_obs = np.full((T), np.nan) if y is None: for t in range(T): ixt = (indices[:, 1] == t) val_obs[t] = np.sum(ixt) W[:, :, t] = X[ixt, :][ixt, :])/val_obs[t] bar.update(t) else: for t in range(T): ixt = (indices[:, 1] == t) val_obs[t] = np.sum(ixt) Q[:, t] = X[ixt, :][ixt])/val_obs[t] W[:, :, t] = X[ixt, :][ixt, :])/val_obs[t] bar.update(t) bar.finish() # return portfolio data return Q, W, val_obs def _Ft_fit_portfolio(Gamma_Old, W_t, Q_t): """helper func to parallelize F ALS fit""" m1 = m2 = return np.squeeze(_numba_solve(m1, m2.reshape((-1, 1)))) def _Ft_fit_PSF_portfolio(Gamma_Old, W_t, Q_t, PSF_t, K, Ktilde): """helper func to parallelize F ALS fit with observed factors""" m1 = Gamma_Old[:,:K][:,:K]) m2 = Gamma_Old[:,:K] m2 -= Gamma_Old[:,:K][:,K:Ktilde]).dot(PSF_t) return np.squeeze(_numba_solve(m1, m2.reshape((-1, 1)))) def _Ft_fit_panel(Gamma_Old, X_t, y_t): """fits F_t using panel data""" exog_t = Ft = _numba_lstsq(exog_t, y_t)[0] return Ft def _Ft_fit_PSF_panel(Gamma_Old, X_t, y_t, PSF_t, K, Ktilde): """fits F_t using panel data with PSF""" exog_t = y_t_resid = y_t - exog_t[:,K:Ktilde].dot(PSF_t) Ft = _numba_lstsq(exog_t[:,:K], y_t_resid)[0] return Ft def _Gamma_fit_portfolio(F_New, Q, W, val_obs, PSF, L, K, Ktilde, T): """helper function for fitting gamma without panel""" Numer = _numba_full((L*Ktilde, 1), 0.0) Denom = _numba_full((L*Ktilde, L*Ktilde), 0.0) # no observed factors if PSF is None: for t in range(T): Numer += _numba_kron(Q[:, t].reshape((-1, 1)), F_New[:, t].reshape((-1, 1)))\ * val_obs[t] Denom += _numba_kron(W[:, :, t], F_New[:, t].reshape((-1, 1)) .dot(F_New[:, t].reshape((1, -1)))) \ * val_obs[t] # observed+latent factors elif K > 0: for t in range(T): Numer += _numba_kron(Q[:, t].reshape((-1, 1)), np.vstack( (F_New[:, t].reshape((-1, 1)), PSF[:, t].reshape((-1, 1)))))\ * val_obs[t] Denom_temp = np.vstack((F_New[:, t].reshape((-1, 1)), PSF[:, t].reshape((-1, 1)))) Denom += _numba_kron(W[:, :, t], * val_obs[t]) # only observed factors else: for t in range(T): Numer += _numba_kron(Q[:, t].reshape((-1, 1)), PSF[:, t].reshape((-1, 1)))\ * val_obs[t] Denom += _numba_kron(W[:, :, t], PSF[:, t].reshape((-1, 1)) .dot(PSF[:, t].reshape((-1, 1)).T))\ * val_obs[t] Gamma_New = _numba_solve(Denom, Numer).reshape((L, Ktilde)) return Gamma_New def _Gamma_fit_panel(F_New, X, y, indices, PSF, L, Ktilde, alpha, l1_ratio, **kwargs): """helper function for estimating vectorized Gamma with panel""" # join observed factors with latent factors and map to panel if PSF is None: F = F_New else: if F_New is None: F = PSF else: F = np.vstack((F_New, PSF)) F = F[:,indices[:,1]] # interact factors and characteristics ZkF = np.hstack([F[k,:,None] * X for k in range(Ktilde)]) # elastic net fit if alpha: mod = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, **kwargs), y) gamma = mod.coef_ # OLS fit else: gamma = _numba_lstsq(ZkF, y)[0] gamma = gamma.reshape((Ktilde, L)).T return gamma def _fit_cv(model, X, y, indices, PSF, n_splits, split_method, alpha, **kwargs): """inner function for fit_path doing CV Parameters ---------- model : IPCA model instance contains the params X : numpy array matrix of characteristics where each row corresponds to a entity-time pair in indices. The number of characteristics (columns here) used as instruments is L. y : numpy array dependent variable where indices correspond to those in X indices : numpy array array containing the panel indices. Should consist of two columns: - Column 1: entity id (i) - Column 2: time index (t) The panel may be unbalanced. The number of unique entities is n_samples, the number of unique dates is T, and the number of characteristics used as instruments is L. PSF : numpy array, optional Set of pre-specified factors as matrix of dimension (M, T) n_splits : scalar number of CV partitions split_method : sklearn cross-validation generator factory method to generate CV partitions alpha : scalar regularizing constant for current step in elastic-net path Returns ------- mse : array array of MSEs for each CV partition Notes ----- Groups are defined by 'ids' """ # build iterator mse_l = [] split = split_method(n_splits=n_splits) full_tind = np.unique(indices[:,1]) for train, test in split.split(indices, groups=indices[:,0]): # build partitioned model train_X = X[train,:] train_y = y[train] train_indices = indices[train,:] test_X = X[test,:] test_y = y[test] test_indices = indices[test,:] if PSF is None: train_PSF = None else: train_tind = np.unique(train_indices[:,1]) train_tind = np.where(np.isin(full_tind, train_tind))[0] train_PSF = PSF[:,train_tind] # init new training model params = model.get_params() params["alpha"] = alpha train_IPCA = InstrumentedPCA(**params) train_IPCA =, train_y, train_indices, train_PSF, **kwargs) # get MSE test_pred = train_IPCA.predict(test_X, test_indices, mean_factor=True) mse = np.sum(np.square(test_y - test_pred)) mse /= test_pred.shape[0] mse_l.append(mse) return np.array(mse_l) def _BS_Walpha_sub(model, n, d): L, T = model.metad["L"], model.metad["T"] Q_b = np.full((L, T), np.nan) np.random.seed(n) # Re-estimate unrestricted model Gamma = None while Gamma is None: try: for t in range(T): d_temp = np.random.standard_t(5) d_temp *= d[:,np.random.randint(0,high=T)] Q_b[:, t] = model.W[:, :, t].dot(model.Gamma[:, :-1])\ .dot(model.Factors[:-1, t]) + d_temp Gamma, Factors = model._fit_ipca(Q=Q_b, W=model.W, val_obs=model.val_obs, PSF=model.PSF, quiet=True, data_type="portfolio") except np.linalg.LinAlgError: warnings.warn("Encountered singularity in bootstrap iteration.\ Observation discarded.") pass # Compute and store Walpha_b Walpha_b = Gamma[:, -1][:, -1]) return Walpha_b def _BS_Wbeta_sub(model, n, d, l): L, T = model.metad["L"], model.metad["T"] Q_b = np.full((L, T), np.nan) np.random.seed(n) #Modify Gamma_beta such that its l-th row is zero Gamma_beta_l = np.copy(model.Gamma) Gamma_beta_l[l, :] = 0 Gamma = None while Gamma is None: try: for t in range(T): d_temp = np.random.standard_t(5) d_temp *= d[:,np.random.randint(0,high=T)] Q_b[:, t] = model.W[:, :, t].dot(Gamma_beta_l)\ .dot(model.Factors[:, t]) + d_temp Gamma, Factors = model._fit_ipca(Q=Q_b, W=model.W, val_obs=model.val_obs, PSF=model.PSF, quiet=True, data_type="portfolio") except np.linalg.LinAlgError: warnings.warn("Encountered singularity in bootstrap iteration.\ Observation discarded.") pass # Compute and store Walpha_b Wbeta_l_b = Gamma[l, :].dot(Gamma[l, :].T) Wbeta_l_b = np.trace(Wbeta_l_b) return Wbeta_l_b def _BS_Wdelta_sub(model, n, d): L, T = model.metad["L"], model.metad["T"] Q_b = np.full((L, T), np.nan) np.random.seed(n) #Modify Gamma_delta such that its last row for the PSF is zero Gamma_delta = np.copy(model.Gamma) Gamma_delta[:, -1] = 0 Gamma = None while Gamma is None: try: for t in range(T): d_temp = np.random.standard_t(5) d_temp *= d[:,np.random.randint(0,high=T)] Q_b[:, t] = model.W[:, :, t].dot(Gamma_delta)\ .dot(model.Factors[:, t]) + d_temp Gamma, Factors = model._fit_ipca(Q=Q_b, W=model.W, val_obs=model.val_obs, PSF=model.PSF, quiet=True, data_type="portfolio") except np.linalg.LinAlgError: warnings.warn("Encountered singularity in bootstrap iteration.\ Observation discarded.") pass # Compute and store Wdelta_b Wdelta_b = Gamma[:, -1][:, -1]) return Wdelta_b @jit(nopython=True) def _numba_solve(m1, m2): return np.linalg.solve(np.ascontiguousarray(m1), np.ascontiguousarray(m2)) @jit(nopython=True) def _numba_lstsq(m1, m2): return np.linalg.lstsq(np.ascontiguousarray(m1), np.ascontiguousarray(m2)) @jit(nopython=True) def _numba_kron(m1, m2): return np.kron(np.ascontiguousarray(m1), np.ascontiguousarray(m2)) @jit(nopython=True) def _numba_chol(m1): return np.linalg.cholesky(np.ascontiguousarray(m1)) @jit(nopython=True) def _numba_svd(m1): return np.linalg.svd(np.ascontiguousarray(m1)) @jit(nopython=True) def _numba_full(m1, m2): return np.full(m1, m2)