# tests/statsmodels/model_fixtures.py
from typing import Any, NamedTuple

import numpy as np
import pandas as pd
import statsmodels.api as sm
from scipy.linalg import toeplitz
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.arima_process import arma_generate_sample

from mlflow.models import ModelSignature
from mlflow.types.schema import Schema, TensorSpec


class ModelWithResults(NamedTuple):
    model: Any
    alg: Any
    inference_dataframe: Any


# Fixtures for a number of models available in statsmodels:
# https://www.statsmodels.org/dev/api.html


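# A minimal usage sketch (an assumption about how the consuming tests use these
# fixtures; for the plain regression fixtures the inference frame feeds predict
# directly, while the date-indexed fixtures carry start/end strings instead):
#
#   fixture = ols_model()
#   predictions = fixture.model.predict(fixture.inference_dataframe)

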
def ols_model(**kwargs):
    # Ordinary Least Squares (OLS)
    np.random.seed(9876789)
    nsamples = 100
    x = np.linspace(0, 10, nsamples)
    X = np.column_stack((x, x**2))
    beta = np.array([1, 0.1, 10])
    e = np.random.normal(size=nsamples)
    X = sm.add_constant(X)
    y = np.dot(X, beta) + e

    ols = sm.OLS(y, X)
    model = ols.fit(**kwargs)

    return ModelWithResults(model=model, alg=ols, inference_dataframe=X)


def ols_model_signature():
    return ModelSignature(
        inputs=Schema([TensorSpec(np.dtype("float64"), (-1, 3))]),
        outputs=Schema([TensorSpec(np.dtype("float64"), (-1,))]),
    )


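# A hedged sketch of pairing the fixture with its signature for MLflow logging
# (assumes the mlflow.statsmodels flavor; the "model" artifact path and exact
# log_model arguments are illustrative and vary across MLflow versions):
#
#   import mlflow
#
#   with mlflow.start_run():
#       fixture = ols_model()
#       mlflow.statsmodels.log_model(
#           fixture.model, "model", signature=ols_model_signature()
#       )

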
def failing_logit_model():
    X = pd.DataFrame(
        {
            "x0": np.array([2.0, 3.0, 1.0, 2.0, 20.0, 30.0, 10.0, 20.0]),
            "x1": np.array([2.0, 3.0, 1.0, 2.0, 20.0, 30.0, 10.0, 20.0]),
        },
        columns=["x0", "x1"],
    )
    y = np.array([0, 0, 0, 0, 1, 1, 1, 1])
    # x0 and x1 are identical and y is perfectly separated by them,
    # so fitting this logit model is expected to fail
    log_reg = sm.Logit(y, X)
    model = log_reg.fit()

    return ModelWithResults(model=model, alg=log_reg, inference_dataframe=X)


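# A sketch of the expected consumption (an assumption: older statsmodels
# versions raise PerfectSeparationError on perfect separation, while newer
# ones may only warn):
#
#   import pytest
#   from statsmodels.tools.sm_exceptions import PerfectSeparationError
#
#   with pytest.raises(PerfectSeparationError):
#       failing_logit_model()

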
def get_dataset(name):
    dataset_module = getattr(sm.datasets, name)
    data = dataset_module.load()
    data.exog = np.asarray(data.exog)
    data.endog = np.asarray(data.endog)
    return data


def gls_model():
    # Generalized Least Squares (GLS)
    data = get_dataset("longley")
    data.exog = sm.add_constant(data.exog)
    ols_resid = sm.OLS(data.endog, data.exog).fit().resid
    res_fit = sm.OLS(ols_resid[1:], ols_resid[:-1]).fit()
    rho = res_fit.params
    # The Longley dataset has 16 observations; build an AR(1) error
    # covariance from the estimated autocorrelation rho
    order = toeplitz(np.arange(16))
    sigma = rho**order
    gls = sm.GLS(data.endog, data.exog, sigma=sigma)
    model = gls.fit()

    return ModelWithResults(model=model, alg=gls, inference_dataframe=data.exog)


def glsar_model():
    # Generalized Least Squares with AR covariance structure;
    # rho=2 requests an AR(2) error process
    X = range(1, 8)
    X = sm.add_constant(X)
    Y = [1, 3, 4, 5, 8, 10, 9]
    glsar = sm.GLSAR(Y, X, rho=2)
    model = glsar.fit()

    return ModelWithResults(model=model, alg=glsar, inference_dataframe=X)

def wls_model():
    # Weighted Least Squares
    Y = [1, 3, 4, 5, 2, 3, 4]
    X = range(1, 8)
    X = sm.add_constant(X)
    wls = sm.WLS(Y, X, weights=list(range(1, 8)))
    model = wls.fit()

    return ModelWithResults(model=model, alg=wls, inference_dataframe=X)


def recursivels_model():
    # Recursive Least Squares
    dta = sm.datasets.copper.load_pandas().data
    dta.index = pd.date_range("1951-01-01", "1975-01-01", freq="AS")
    endog = dta.WORLDCONSUMPTION

    # To the regressors in the dataset, we add a column of ones for an intercept
    exog = sm.add_constant(dta[["COPPERPRICE", "INCOMEINDEX", "ALUMPRICE", "INVENTORYINDEX"]])
    rls = sm.RecursiveLS(endog, exog)
    model = rls.fit()

    inference_dataframe = pd.DataFrame([["1951-01-01", "1975-01-01"]], columns=["start", "end"])
    return ModelWithResults(model=model, alg=rls, inference_dataframe=inference_dataframe)


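# For the date-indexed fixtures the inference frame carries a start/end pair;
# a minimal prediction sketch (variable names are illustrative):
#
#   fixture = recursivels_model()
#   row = fixture.inference_dataframe.iloc[0]
#   preds = fixture.model.predict(start=row["start"], end=row["end"])

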
def rolling_ols_model():
    # Rolling Ordinary Least Squares (Rolling OLS)
    from statsmodels.regression.rolling import RollingOLS

    data = get_dataset("longley")
    exog = sm.add_constant(data.exog, prepend=False)
    rolling_ols = RollingOLS(data.endog, exog)
    model = rolling_ols.fit(reset=50)

    return ModelWithResults(model=model, alg=rolling_ols, inference_dataframe=exog)


def rolling_wls_model():
    # Rolling Weighted Least Squares (Rolling WLS)
    from statsmodels.regression.rolling import RollingWLS

    data = get_dataset("longley")
    exog = sm.add_constant(data.exog, prepend=False)
    rolling_wls = RollingWLS(data.endog, exog)
    model = rolling_wls.fit(reset=50)

    return ModelWithResults(model=model, alg=rolling_wls, inference_dataframe=exog)


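# One way to inspect the rolling fixtures (a sketch; on rolling results,
# .params holds the per-window coefficient estimates):
#
#   fixture = rolling_ols_model()
#   fixture.model.params  # DataFrame of rolling coefficient estimates

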
def gee_model():
    # Example taken from
    # https://www.statsmodels.org/devel/examples/notebooks/generated/gee_nested_simulation.html
    np.random.seed(9876789)
    p = 5
    groups_var = 1
    level1_var = 2
    level2_var = 3
    resid_var = 4
    n_groups = 100
    group_size = 20
    level1_size = 10
    level2_size = 5
    n = n_groups * group_size * level1_size * level2_size
    xmat = np.random.normal(size=(n, p))

    # Construct labels showing which group each observation belongs to at each level.
    groups_ix = np.kron(np.arange(n // group_size), np.ones(group_size)).astype(int)
    level1_ix = np.kron(np.arange(n // level1_size), np.ones(level1_size)).astype(int)
    level2_ix = np.kron(np.arange(n // level2_size), np.ones(level2_size)).astype(int)

    # Simulate the random effects.
    groups_re = np.sqrt(groups_var) * np.random.normal(size=n // group_size)
    level1_re = np.sqrt(level1_var) * np.random.normal(size=n // level1_size)
    level2_re = np.sqrt(level2_var) * np.random.normal(size=n // level2_size)

    # Simulate the response variable.
    y = groups_re[groups_ix] + level1_re[level1_ix] + level2_re[level2_ix]
    y += np.sqrt(resid_var) * np.random.normal(size=n)

    # Put everything into a dataframe.
    df = pd.DataFrame(xmat, columns=[f"x{j}" for j in range(p)])
    df["y"] = y + xmat[:, 0] - xmat[:, 3]
    df["groups_ix"] = groups_ix
    df["level1_ix"] = level1_ix
    df["level2_ix"] = level2_ix

    # Fit the model.
    cs = sm.cov_struct.Nested()
    dep_fml = "0 + level1_ix + level2_ix"
    gee = sm.GEE.from_formula(
        "y ~ x0 + x1 + x2 + x3 + x4", cov_struct=cs, dep_data=dep_fml, groups="groups_ix", data=df
    )
    model = gee.fit()

    return ModelWithResults(model=model, alg=gee, inference_dataframe=df)


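# GEE results expose in-sample fitted values; a minimal inspection sketch:
#
#   fixture = gee_model()
#   fixture.model.fittedvalues  # mean response, one value per row of df

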
def glm_model():
    # Generalized Linear Model (GLM)
    data = get_dataset("scotland")
    data.exog = sm.add_constant(data.exog)
    glm = sm.GLM(data.endog, data.exog, family=sm.families.Gamma())
    model = glm.fit()

    return ModelWithResults(model=model, alg=glm, inference_dataframe=data.exog)


def glmgam_model():
    # Generalized Additive Model (GAM)
    from statsmodels.gam.tests.test_penalized import df_autos

    x_spline = df_autos[["weight", "hp"]]
    bs = sm.gam.BSplines(x_spline, df=[12, 10], degree=[3, 3])
    alpha = np.array([21833888.8, 6460.38479])
    gam_bs = sm.GLMGam.from_formula(
        "city_mpg ~ fuel + drive", data=df_autos, smoother=bs, alpha=alpha
    )
    model = gam_bs.fit()

    return ModelWithResults(model=model, alg=gam_bs, inference_dataframe=df_autos)


def arma_model():
    # Autoregressive Moving Average (ARMA)
    np.random.seed(12345)
    arparams = np.array([1, -0.75, 0.25])
    maparams = np.array([1, 0.65, 0.35])
    nobs = 250
    y = arma_generate_sample(arparams, maparams, nobs)
    dates = pd.date_range("1980-01-01", freq="M", periods=nobs)
    y = pd.Series(y, index=dates)

    arima = ARIMA(y, order=(2, 0, 2), trend="n")
    model = arima.fit()
    inference_dataframe = pd.DataFrame([["1999-06-30", "2001-05-31"]], columns=["start", "end"])

    return ModelWithResults(model=model, alg=arima, inference_dataframe=inference_dataframe)
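

# ARIMA results also support out-of-sample forecasting beyond the fitted
# sample; a minimal sketch (the 12-step horizon is illustrative):
#
#   fixture = arma_model()
#   fixture.model.forecast(steps=12)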