Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions mlforecast/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,7 @@ def cross_validation(
fitted: bool = False,
as_numpy: bool = False,
weight_col: Optional[str] = None,
fold_transform: Optional[Callable[..., Tuple[DFType, DFType]]] = None,
validate_data: bool = True,
) -> DFType:
"""Perform time series cross validation.
Expand Down Expand Up @@ -984,6 +985,10 @@ def cross_validation(
fitted (bool): Store the in-sample predictions. Defaults to False.
as_numpy (bool): Cast features to numpy array. Defaults to False.
weight_col (str, optional): Column that contains the sample weights. Defaults to None.
fold_transform (callable, optional): Function applied to each train/validation split before fitting and predicting.
Must return the transformed `(train_df, valid_df)` pair. The function receives keyword arguments
`id_col`, `time_col` and `target_col`. This requires `refit=True` (or `refit=1`) so models are
retrained on each transformed fold. Defaults to None.
validate_data (bool): Run data quality validations on the full dataset before cross-validation. Warns about missing dates and raises on duplicate rows. Defaults to True.

Returns:
Expand All @@ -992,6 +997,8 @@ def cross_validation(
# Run data validations once on full dataset if requested
if validate_data:
self._validate_data(df, id_col, time_col)
if fold_transform is not None and refit is not True and refit != 1:
raise ValueError("`fold_transform` requires `refit=True` or `refit=1`.")
Comment thread
janrth marked this conversation as resolved.

results = []
cv_models = []
Expand All @@ -1007,6 +1014,14 @@ def cross_validation(
input_size=input_size,
)
for i_window, (cutoffs, train, valid) in enumerate(splits):
if fold_transform is not None:
train, valid = fold_transform(
train,
valid,
id_col=id_col,
time_col=time_col,
target_col=target_col,
)
should_fit = i_window == 0 or (refit > 0 and i_window % refit == 0)
if should_fit:
self.fit(
Expand Down
226 changes: 226 additions & 0 deletions nbs/docs/how-to-guides/cross_validation.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,232 @@
"Notice that in each cutoff period, we generated a forecast for the next 24 hours using only the data `y` before said period. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Custom fold transforms for categorical encoders\n",
"\n",
"Sometimes you want to learn extra features inside each cross validation fold before fitting the forecasting model. A common example is target encoding for a categorical exogenous feature.\n",
"\n",
"This is what `fold_transform` is for. The function is called on each `(train_df, valid_df)` split, so the encoder is fit on the training fold only and then applied to both the training and validation data. This avoids leakage from the validation fold into the training fold.\n",
"\n",
"Below we use a small subset of the M5 dataset and build two fold-learned features from the same categorical column, `dept_id`: a smoothed target encoder and a count encoder. We keep the encoder functions separate and compose them into a single `fold_transform`, which makes it easier to mix and match custom preprocessing steps. Since the downstream model is a linear regressor, we drop the original string category column before fitting.\n",
"\n",
"With this functionality we no longer need to rely on sklearn pipelines for fold-wise encoding. We can write custom functions for any kind of encoding and be sure they are applied to each fold without data leakage.\n",
"\n",
"`fold_transform` expects a function with the signature `fn(train_df, valid_df, *, id_col, time_col, target_col, ...)`. Any additional arguments, such as `cat_col` or smoothing parameters, must be optional keyword arguments with defaults defined in the function signature."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import polars as pl\n",
"from datasetsforecast.m5 import M5\n",
"from sklearn.linear_model import LinearRegression\n",
"\n",
"from mlforecast import MLForecast\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"data_dir = 'nbs/dev/data'\n",
"Y_df, _, S_df = M5.load(data_dir)\n",
"\n",
"# Build a small uneven sample across departments so both the target encoder\n",
"# and the count encoder have something meaningful to learn.\n",
"dept_limits = {\n",
" 'FOODS_1': 6,\n",
" 'FOODS_2': 4,\n",
" 'FOODS_3': 3,\n",
" 'HOBBIES_1': 5,\n",
" 'HOBBIES_2': 2,\n",
" 'HOUSEHOLD_1': 4,\n",
" 'HOUSEHOLD_2': 3,\n",
"}\n",
"sample_meta = S_df[['unique_id', 'dept_id']].copy()\n",
"sample_meta['_n'] = sample_meta.groupby('dept_id', observed=True).cumcount()\n",
"sample_meta = sample_meta[\n",
" sample_meta['_n'] < sample_meta['dept_id'].map(dept_limits).fillna(0)\n",
"].drop(columns=['_n'])\n",
"series_pd = Y_df.merge(sample_meta, on='unique_id', how='inner').copy()\n",
"series_pd['dept_id'] = series_pd['dept_id'].astype('category')\n",
"series_pd.head()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def smoothed_target_encode_pandas(\n",
" train_df, valid_df, *, id_col, time_col, target_col, cat_col='dept_id', alpha=20.0\n",
"):\n",
" enc_col = f'{cat_col}_smoothed_te'\n",
" stats = (\n",
" train_df.groupby(cat_col, observed=True)[target_col]\n",
" .agg(['mean', 'count'])\n",
" .reset_index()\n",
" )\n",
" global_mean = train_df[target_col].mean()\n",
" stats[enc_col] = (\n",
" stats['count'] * stats['mean'] + alpha * global_mean\n",
" ) / (stats['count'] + alpha)\n",
" stats = stats[[cat_col, enc_col]]\n",
" train_df = train_df.merge(stats, on=cat_col, how='left')\n",
" valid_df = valid_df.merge(stats, on=cat_col, how='left')\n",
" train_df[enc_col] = train_df[enc_col].fillna(global_mean)\n",
" valid_df[enc_col] = valid_df[enc_col].fillna(global_mean)\n",
" return train_df, valid_df\n",
"\n",
"\n",
"def count_encode_pandas(\n",
" train_df, valid_df, *, id_col, time_col, target_col, cat_col='dept_id'\n",
"):\n",
" enc_col = f'{cat_col}_count_enc'\n",
" stats = train_df.groupby(cat_col, observed=True).size().rename(enc_col).reset_index()\n",
" train_df = train_df.merge(stats, on=cat_col, how='left')\n",
" valid_df = valid_df.merge(stats, on=cat_col, how='left')\n",
" train_df[enc_col] = train_df[enc_col].fillna(0)\n",
" valid_df[enc_col] = valid_df[enc_col].fillna(0)\n",
" return train_df, valid_df\n",
"\n",
"\n",
"def drop_category_column_pandas(\n",
" train_df, valid_df, *, id_col, time_col, target_col, cat_col='dept_id'\n",
"):\n",
" return train_df.drop(columns=[cat_col]), valid_df.drop(columns=[cat_col])\n",
"\n",
"\n",
"def compose_fold_transforms(*transforms):\n",
" def _transform(train_df, valid_df, **kwargs):\n",
" for transform in transforms:\n",
" train_df, valid_df = transform(train_df, valid_df, **kwargs)\n",
" return train_df, valid_df\n",
"\n",
" return _transform\n",
"\n",
"\n",
"m5_fold_transform_pandas = compose_fold_transforms(\n",
" smoothed_target_encode_pandas,\n",
" count_encode_pandas,\n",
" drop_category_column_pandas,\n",
")\n",
"\n",
"fcst_pd = MLForecast(\n",
" models=LinearRegression(),\n",
" freq='D',\n",
" lags=[1, 2, 7],\n",
" date_features=['dayofweek'],\n",
")\n",
"\n",
"cv_pd_custom = fcst_pd.cross_validation(\n",
" series_pd,\n",
" n_windows=2,\n",
" h=7,\n",
" static_features=['dept_id_smoothed_te', 'dept_id_count_enc'],\n",
" fold_transform=m5_fold_transform_pandas,\n",
" as_numpy=True,\n",
")\n",
"cv_pd_custom.head()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same pattern works with polars. We reuse the same composition helper but swap in native polars encoder functions.\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def smoothed_target_encode_polars(\n",
" train_df, valid_df, *, id_col, time_col, target_col, cat_col='dept_id', alpha=20.0\n",
"):\n",
" enc_col = f'{cat_col}_smoothed_te'\n",
" global_mean = train_df[target_col].mean()\n",
" stats = train_df.group_by(cat_col).agg(\n",
" pl.col(target_col).mean().alias('_mean'),\n",
" pl.len().alias('_count'),\n",
" ).with_columns(\n",
" ((pl.col('_count') * pl.col('_mean') + alpha * global_mean) / (pl.col('_count') + alpha)).alias(enc_col)\n",
" ).select(cat_col, enc_col)\n",
" train_df = train_df.join(stats, on=cat_col, how='left').with_columns(\n",
" pl.col(enc_col).fill_null(global_mean)\n",
" )\n",
" valid_df = valid_df.join(stats, on=cat_col, how='left').with_columns(\n",
" pl.col(enc_col).fill_null(global_mean)\n",
" )\n",
" return train_df, valid_df\n",
"\n",
"\n",
"def count_encode_polars(\n",
" train_df, valid_df, *, id_col, time_col, target_col, cat_col='dept_id'\n",
"):\n",
" enc_col = f'{cat_col}_count_enc'\n",
" stats = train_df.group_by(cat_col).agg(pl.len().alias(enc_col))\n",
" train_df = train_df.join(stats, on=cat_col, how='left').with_columns(\n",
" pl.col(enc_col).fill_null(0)\n",
" )\n",
" valid_df = valid_df.join(stats, on=cat_col, how='left').with_columns(\n",
" pl.col(enc_col).fill_null(0)\n",
" )\n",
" return train_df, valid_df\n",
"\n",
"\n",
"def drop_category_column_polars(\n",
" train_df, valid_df, *, id_col, time_col, target_col, cat_col='dept_id'\n",
"):\n",
" return train_df.drop(cat_col), valid_df.drop(cat_col)\n",
"\n",
"\n",
"m5_fold_transform_polars = compose_fold_transforms(\n",
" smoothed_target_encode_polars,\n",
" count_encode_polars,\n",
" drop_category_column_polars,\n",
")\n",
"\n",
"series_pl = pl.from_pandas(series_pd.astype({'dept_id': 'str'}))\n",
"\n",
"fcst_pl = MLForecast(\n",
" models=LinearRegression(),\n",
" freq='1d',\n",
" lags=[1, 2, 7],\n",
" date_features=['weekday'],\n",
")\n",
"\n",
"cv_pl_custom = fcst_pl.cross_validation(\n",
" series_pl,\n",
" n_windows=2,\n",
" h=7,\n",
" static_features=['dept_id_smoothed_te', 'dept_id_count_enc'],\n",
" fold_transform=m5_fold_transform_polars,\n",
" as_numpy=False\n",
")\n",
"cv_pl_custom.head()\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This kind of fold-level transform is useful when your categorical preprocessing is more expressive than the built-in global or grouped lag features. For example, you may want to combine smoothed encoders, count encoders, leave-one-out encoders, or custom business logic implemented directly in pandas or polars.\n",
"\n",
"> Note: `fold_transform` prevents leakage from the validation fold into the training fold. If you need a strictly causal encoder inside each training fold, that logic still needs to be implemented inside the encoder itself.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down
103 changes: 103 additions & 0 deletions tests/test_forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import pytest
import utilsforecast.processing as ufp
import xgboost as xgb
from sklearn.compose import ColumnTransformer
from sklearn import set_config
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import TargetEncoder
Comment thread
janrth marked this conversation as resolved.
from utilsforecast.feature_engineering import fourier, time_features
from utilsforecast.processing import match_if_categorical

Expand Down Expand Up @@ -671,6 +674,106 @@ def test_cross_validation(non_std_series):
assert_cross_validation(non_std_series)
assert_cross_validation(non_std_series, add_exogenous=True)


def test_cross_validation_supports_sklearn_target_encoder_pipeline():
    """Cross validation runs end to end with a sklearn ``TargetEncoder`` pipeline.

    The id column is cast to a categorical, passed as the only static
    feature, and target-encoded by a ``ColumnTransformer`` placed in front of
    a ``LinearRegression``. The result must be non-empty and contain no null
    forecasts.
    """
    data = generate_series(n_series=10, min_length=30, max_length=30)
    data["unique_id"] = data["unique_id"].astype("category")

    # Only the id column is encoded; every other feature is passed through.
    encode_ids = ColumnTransformer(
        [
            (
                "target_encoder",
                TargetEncoder(target_type="continuous"),
                ["unique_id"],
            )
        ],
        remainder="passthrough",
    )
    forecaster = MLForecast(
        models=make_pipeline(encode_ids, LinearRegression()),
        freq="D",
        lags=[1, 2, 7],
        date_features=["dayofweek"],
    )

    cv_kwargs = dict(
        n_windows=2,
        h=3,
        static_features=["unique_id"],
        as_numpy=False,
    )
    preds = forecaster.cross_validation(data, **cv_kwargs)

    assert not preds.empty
    assert preds["LinearRegression"].notnull().all()


def _fold_mean_encode(train, valid, *, id_col, time_col, target_col):
encoded_col = f"{id_col}_mean_enc"
if isinstance(train, pd.DataFrame):
stats = (
train.groupby(id_col, observed=True)[target_col]
.mean()
.rename(encoded_col)
.reset_index()
)
global_mean = train[target_col].mean()
train = train.merge(stats, on=id_col, how="left")
valid = valid.merge(stats, on=id_col, how="left")
train[encoded_col] = train[encoded_col].fillna(global_mean)
valid[encoded_col] = valid[encoded_col].fillna(global_mean)
else:
stats = train.group_by(id_col).agg(pl.col(target_col).mean().alias(encoded_col))
global_mean = train[target_col].mean()
train = train.join(stats, on=id_col, how="left").with_columns(
pl.col(encoded_col).fill_null(global_mean)
)
valid = valid.join(stats, on=id_col, how="left").with_columns(
pl.col(encoded_col).fill_null(global_mean)
)
return train, valid


@pytest.mark.parametrize("engine", ["pandas", "polars"])
def test_cross_validation_fold_transform(engine):
    """Cross validation accepts a ``fold_transform`` for pandas and polars input.

    Runs two windows with ``_fold_mean_encode`` as the per-fold transform and
    checks that the result is non-empty and has no null forecasts.
    """
    use_polars = engine == "polars"

    data = generate_series(n_series=10, min_length=30, max_length=30)
    if use_polars:
        data = pl.from_pandas(data)

    # The frequency alias and date-feature name differ between the engines.
    forecaster = MLForecast(
        models=LinearRegression(),
        freq="1d" if use_polars else "D",
        lags=[1, 2, 7],
        date_features=["weekday"] if use_polars else ["dayofweek"],
    )
    preds = forecaster.cross_validation(
        data,
        n_windows=2,
        h=3,
        static_features=["unique_id"],
        fold_transform=_fold_mean_encode,
    )

    model_col = "LinearRegression"
    if use_polars:
        assert not preds.is_empty()
        assert preds[model_col].is_not_null().all()
    else:
        assert not preds.empty
        assert preds[model_col].notnull().all()


def test_cross_validation_fold_transform_requires_refit():
    """``fold_transform`` combined with ``refit=False`` must raise ``ValueError``."""
    data = generate_series(n_series=5, min_length=20, max_length=20)
    forecaster = MLForecast(models=LinearRegression(), freq="D", lags=[1, 2, 3])

    expected_msg = "`fold_transform` requires `refit=True` or `refit=1`"
    cv_kwargs = dict(
        n_windows=2,
        h=3,
        refit=False,
        fold_transform=_fold_mean_encode,
    )
    with pytest.raises(ValueError, match=expected_msg):
        forecaster.cross_validation(data, **cv_kwargs)

# test short series in cv
def test_short_series_in_cv():
series = generate_daily_series(
Expand Down
Loading