timebasedcv

Here are the top-level classes available in timebasedcv.

timebasedcv.core.TimeBasedSplit

Bases: _CoreTimeBasedSplit

TimeBasedSplit generates splits based on time periods, independently from the number of samples in each split.

It inherits from _CoreTimeBasedSplit and it only implements the .split() method and logic.

Differences with scikit-learn

TimeBasedSplit is not compatible with scikit-learn CV Splitters.

In fact, we have made the (opinionated) choice to:

  • Return the sliced arrays from .split(...), while scikit-learn CV Splitters return the train and test indices of each split.
  • Require the time series to be passed to the .split(...) method, while scikit-learn CV Splitters only take X, y, groups in .split(...).
  • Use that time series to generate the boolean masks with which the original arrays are sliced into train and test for each split.

If you are looking for a class compatible with scikit-learn, check out our TimeBasedCVSplitter in the timebasedcv.sklearn module.

A few examples of how splits are generated given the parameters. Let:

  • = : train period unit
  • * : forecast period unit
  • / : gap period unit
  • > : stride period unit (absorbed in = if window="expanding")

Recall also that if stride is not provided, it is set to forecast_horizon:

train_size, forecast_horizon, gap, stride, window = (4, 3, 0, None, "rolling")
| ======= *****               |
| >>>>> ======= *****         |
|       >>>>> ======= *****   |
|             >>>>> ======= * |

train_size, forecast_horizon, gap, stride, window = (4, 3, 2, 2, "rolling")

| ======= /// *****           |
| >>> ======= /// *****       |
|     >>> ======= /// *****   |
|         >>> ======= /// *** |

train_size, forecast_horizon, gap, stride, window = (4, 3, 2, 2, "expanding")
| ======= /// *****           |
| =========== /// *****       |
| =============== /// *****   |
| =================== /// *** |
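
The diagrams above can be approximated with a short standard-library sketch. It is not the library's implementation: the `sketch_splits` helper, its daily time unit, and its stopping rule (stop once the forecast window would start at or after the end of the period) are assumptions made for illustration, and only forward generation is covered.

```python
from datetime import datetime, timedelta
from typing import Iterator, Optional, Tuple

Bounds = Tuple[datetime, datetime, datetime, datetime]
DAY = timedelta(days=1)  # assumed time unit for this sketch


def sketch_splits(
    start: datetime,
    end: datetime,
    *,
    train_size: int,
    forecast_horizon: int,
    gap: int = 0,
    stride: Optional[int] = None,
    window: str = "rolling",
) -> Iterator[Bounds]:
    """Yield (train_start, train_end, forecast_start, forecast_end) bounds.

    Hypothetical helper: end bounds are exclusive and splits move forward in time.
    """
    # A missing (or zero) stride falls back to the forecast horizon.
    step = (stride or forecast_horizon) * DAY

    train_start = start
    train_end = start + train_size * DAY
    forecast_start = train_end + gap * DAY

    # Assumed stopping rule: the forecast window must start before `end`.
    while forecast_start < end:
        # An expanding window always starts at the beginning of the period.
        window_start = train_start if window == "rolling" else start
        yield window_start, train_end, forecast_start, forecast_start + forecast_horizon * DAY
        train_start += step
        train_end += step
        forecast_start += step


period = (datetime(2023, 1, 1), datetime(2023, 1, 15))
rolling = list(sketch_splits(*period, train_size=4, forecast_horizon=3))
expanding = list(
    sketch_splits(*period, train_size=4, forecast_horizon=3, gap=2, stride=2, window="expanding")
)
```

In this sketch the last forecast window may extend past the end of the period; when such bounds are turned into masks, the forecast slice is simply shorter, which is what the truncated last rows of the diagrams depict.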

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| frequency | FrequencyUnit | The frequency (or time unit) of the time series. Must be one of "days", "seconds", "microseconds", "milliseconds", "minutes", "hours", "weeks". These are the only valid keyword arguments of timedelta from Python's datetime standard library. | required |
| train_size | int | Defines the minimum number of time units required to be in the train set. | required |
| forecast_horizon | int | Specifies the number of time units to forecast. | required |
| gap | int | Sets the number of time units to skip between the end of the train set and the start of the forecast set. | 0 |
| stride | Union[int, None] | How many time units to move forward after each split. If None (or set to 0), the stride is equal to forecast_horizon. | None |
| window | WindowType | The type of window to use, either "rolling" or "expanding". | 'rolling' |
| mode | ModeType | Determines in which order the splits are generated, either "forward" (start to end) or "backward" (end to start). | 'forward' |
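
Because the accepted frequency values are exactly the keyword arguments of `datetime.timedelta`, an integer parameter can be turned into a duration by keyword unpacking. The following is a sketch of that idea only; the helper name `to_delta` is hypothetical and not part of timebasedcv.

```python
from datetime import timedelta


def to_delta(frequency: str, n_units: int) -> timedelta:
    """Hypothetical helper: convert `n_units` of `frequency` into a timedelta."""
    # timedelta rejects unknown keywords with a TypeError, so an invalid
    # frequency such as "months" fails here.
    return timedelta(**{frequency: n_units})


train_delta = to_delta("days", 10)
forecast_delta = to_delta("weeks", 2)
```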

Raises:

Type Description
ValueError
  • If frequency is not one of "days", "seconds", "microseconds", "milliseconds", "minutes", "hours", "weeks".
  • If window is not one of "rolling" or "expanding".
  • If mode is not one of "forward" or "backward".
  • If train_size, forecast_horizon, gap or stride are not strictly positive.
TypeError

If train_size, forecast_horizon, gap or stride are not of type int.

Examples:

# Let's first generate some data
import pandas as pd
import numpy as np

RNG = np.random.default_rng(seed=42)

dates = pd.Series(pd.date_range("2023-01-01", "2023-01-31", freq="D"))
size = len(dates)

df = (
    pd.concat(
        [
            pd.DataFrame(
                {
                    "time": pd.date_range(start, end, periods=_size, inclusive="left"),
                    "a": RNG.normal(size=_size - 1),
                    "b": RNG.normal(size=_size - 1),
                }
            )
            for start, end, _size in zip(dates[:-1], dates[1:], RNG.integers(2, 24, size - 1))
        ]
    )
    .reset_index(drop=True)
    .assign(y=lambda t: t[["a", "b"]].sum(axis=1) + RNG.normal(size=t.shape[0]) / 25)
)

df.set_index("time").resample("D").agg(count=("y", np.size)).head(5)
            count
time
2023-01-01      2
2023-01-02     18
2023-01-03     15
2023-01-04     10
2023-01-05     10

Now let's split the data with a TimeBasedSplit instance:

from timebasedcv import TimeBasedSplit


tbs = TimeBasedSplit(
    frequency="days",
    train_size=10,
    forecast_horizon=5,
    gap=1,
    stride=3
)
X, y, time_series = df.loc[:, ["a", "b"]], df["y"], df["time"]

for X_train, X_forecast, y_train, y_forecast in tbs.split(X, y, time_series=time_series):
    print(f"Train: {X_train.shape}, Forecast: {X_forecast.shape}")
Train: (100, 2), Forecast: (51, 2)
Train: (114, 2), Forecast: (50, 2)
...
Train: (124, 2), Forecast: (40, 2)
Train: (137, 2), Forecast: (22, 2)
Source code in timebasedcv/core.py
class TimeBasedSplit(_CoreTimeBasedSplit):
    """`TimeBasedSplit` generates splits based on time periods, independently from the number of samples in each split.

    It inherits from [`_CoreTimeBasedSplit`][timebasedcv.core._CoreTimeBasedSplit] and it only implements the `.split()`
    method and logic.

    !!! warning "Differences with scikit-learn"

        `TimeBasedSplit` is **not** compatible with
        [scikit-learn CV Splitters](https://scikit-learn.org/stable/common_pitfalls.html#id3){:target="_blank"}.

        In fact, we have made the (opinionated) choice to:

        - Return the sliced arrays from `.split(...)`, while scikit-learn CV Splitters return train and test indices of
            the split.
        - Require to pass the time series as input to `.split(...)` method, while scikit-learn CV Splitters require to
            provide only `X, y, groups` to `.split(...)`.
        - Such time series is used to generate the boolean masks with which we slice the original arrays into train and
            test for each split.

        If you are looking for a class compatible with scikit-learn, check out our
        [`TimeBasedCVSplitter`][timebasedcv.sklearn.TimeBasedCVSplitter] in the `timebasedcv.sklearn` module.

    A few examples of how splits are generated given the parameters. Let:

    - `=` : train period unit
    - `*` : forecast period unit
    - `/` : gap period unit
    - `>` : stride period unit (absorbed in `=` if `window="expanding"`)

    Recall also that if `stride` is not provided, it is set to `forecast_horizon`:
    ```
    train_size, forecast_horizon, gap, stride, window = (4, 3, 0, None, "rolling")
    | ======= *****               |
    | >>>>> ======= *****         |
    |       >>>>> ======= *****   |
    |             >>>>> ======= * |

    train_size, forecast_horizon, gap, stride, window = (4, 3, 2, 2, "rolling")

    | ======= /// *****           |
    | >>> ======= /// *****       |
    |     >>> ======= /// *****   |
    |         >>> ======= /// *** |

    train_size, forecast_horizon, gap, stride, window = (4, 3, 2, 2, "expanding")
    | ======= /// *****           |
    | =========== /// *****       |
    | =============== /// *****   |
    | =================== /// *** |
    ```

    Arguments:
        frequency: The frequency (or time unit) of the time series. Must be one of "days", "seconds", "microseconds",
            "milliseconds", "minutes", "hours", "weeks". These are the only valid keyword arguments of
            `timedelta` from Python's `datetime` standard library.
        train_size: Defines the minimum number of time units required to be in the train set.
        forecast_horizon: Specifies the number of time units to forecast.
        gap: Sets the number of time units to skip between the end of the train set and the start of the forecast set.
        stride: How many time units to move forward after each split. If `None` (or set to 0), the stride is equal to
            `forecast_horizon`.
        window: The type of window to use, either "rolling" or "expanding".
        mode: Determines in which order the splits are generated, either "forward" (start to end) or "backward"
            (end to start).

    Raises:
        ValueError:
            - If `frequency` is not one of "days", "seconds", "microseconds", "milliseconds", "minutes", "hours",
            "weeks".
            - If `window` is not one of "rolling" or "expanding".
            - If `mode` is not one of "forward" or "backward"
            - If `train_size`, `forecast_horizon`, `gap` or `stride` are not strictly positive.
        TypeError: If `train_size`, `forecast_horizon`, `gap` or `stride` are not of type `int`.


    Examples:
        ```python
        # Let's first generate some data
        import pandas as pd
        import numpy as np

        RNG = np.random.default_rng(seed=42)

        dates = pd.Series(pd.date_range("2023-01-01", "2023-01-31", freq="D"))
        size = len(dates)

        df = (
            pd.concat(
                [
                    pd.DataFrame(
                        {
                            "time": pd.date_range(start, end, periods=_size, inclusive="left"),
                            "a": RNG.normal(size=_size - 1),
                            "b": RNG.normal(size=_size - 1),
                        }
                    )
                    for start, end, _size in zip(dates[:-1], dates[1:], RNG.integers(2, 24, size - 1))
                ]
            )
            .reset_index(drop=True)
            .assign(y=lambda t: t[["a", "b"]].sum(axis=1) + RNG.normal(size=t.shape[0]) / 25)
        )

        df.set_index("time").resample("D").agg(count=("y", np.size)).head(5)
        ```

        ```terminal
                    count
        time
        2023-01-01      2
        2023-01-02     18
        2023-01-03     15
        2023-01-04     10
        2023-01-05     10
        ```

        Now let's split the data with a `TimeBasedSplit` instance:

        ```py
        from timebasedcv import TimeBasedSplit


        tbs = TimeBasedSplit(
            frequency="days",
            train_size=10,
            forecast_horizon=5,
            gap=1,
            stride=3
        )
        X, y, time_series = df.loc[:, ["a", "b"]], df["y"], df["time"]

        for X_train, X_forecast, y_train, y_forecast in tbs.split(X, y, time_series=time_series):
            print(f"Train: {X_train.shape}, Forecast: {X_forecast.shape}")
        ```

        ```terminal
        Train: (100, 2), Forecast: (51, 2)
        Train: (114, 2), Forecast: (50, 2)
        ...
        Train: (124, 2), Forecast: (40, 2)
        Train: (137, 2), Forecast: (22, 2)
        ```
    """

    @overload
    def split(
        self: Self,
        *arrays: TensorLikeT,
        time_series: SeriesLike[DateTimeLike],
        start_dt: NullableDatetime = None,
        end_dt: NullableDatetime = None,
        return_splitstate: Literal[False],
    ) -> Generator[Tuple[TensorLikeT, ...], None, None]: ...  # pragma: no cover

    @overload
    def split(
        self: Self,
        *arrays: TensorLikeT,
        time_series: SeriesLike[DateTimeLike],
        start_dt: NullableDatetime = None,
        end_dt: NullableDatetime = None,
        return_splitstate: Literal[True],
    ) -> Generator[Tuple[Tuple[TensorLikeT, ...], SplitState], None, None]: ...  # pragma: no cover

    @overload
    def split(
        self: Self,
        *arrays: TensorLikeT,
        time_series: SeriesLike[DateTimeLike],
        start_dt: NullableDatetime = None,
        end_dt: NullableDatetime = None,
        return_splitstate: bool = False,
    ) -> Generator[
        Union[Tuple[TensorLikeT, ...], Tuple[Tuple[TensorLikeT, ...], SplitState]],
        None,
        None,
    ]: ...  # pragma: no cover

    def split(
        self: Self,
        *arrays: TensorLikeT,
        time_series: SeriesLike[DateTimeLike],
        start_dt: NullableDatetime = None,
        end_dt: NullableDatetime = None,
        return_splitstate: bool = False,
    ) -> Generator[Union[Tuple[TensorLikeT, ...], Tuple[Tuple[TensorLikeT, ...], SplitState]], None, None]:
        """Returns a generator of split arrays based on the `time_series`.

        The `time_series` argument is compared against the bounds of each split state to create boolean masks for
        training - from train_start (included) to train_end (excluded) - and forecast - from forecast_start (included)
        to forecast_end (excluded). These masks are then used to index the arrays passed as arguments.

        The `start_dt` and `end_dt` arguments can be used to specify the start and end of the time period. If provided,
        they are used in place of the `time_series.min()` and `time_series.max()` respectively.

        This is useful because the series does not necessarily start on the first date and/or end on the last
        date of the time period of interest.

        The `return_splitstate` argument can be used to return the `SplitState` instance for each split. This can be
        useful if a particular logic has to be applied only on specific cases (e.g. if first day of the week, then
        retrain a model).

        By returning the split state, the user has the freedom and flexibility to apply any logic.

        Arguments:
            *arrays: The arrays to split. Must have the same length as `time_series`.
            time_series: The time series used to create boolean mask for splits. It is not required to be sorted, but it
                must support:

                - comparison operators (with other date-like objects).
                - bitwise operators (with other boolean arrays).
                - `.min()` and `.max()` methods.
                - `.shape` attribute.
            start_dt: The start of the time period. If provided, it is used in place of the `time_series.min()`.
            end_dt: The end of the time period. If provided, it is used in place of the `time_series.max()`.
            return_splitstate: Whether to return the `SplitState` instance for each split.

                - If True, the generator yields tuples of the form `(train_forecast_arrays, split_state)`, where
                `train_forecast_arrays` is a tuple of arrays containing the training and forecast data, and
                `split_state` is a `SplitState` instance representing the current split.
                - If False, the generator yields tuples of the form `train_forecast_arrays`.

        Returns:
            A generator of tuples of arrays containing the training and forecast data.
                Each tuple corresponds to a split generated by the `TimeBasedSplit` instance. If `return_splitstate` is
                True, each tuple is of the form `(train_forecast_arrays, split_state)`, otherwise it is of the form
                `train_forecast_arrays`.

        Raises:
            ValueError:
                - If no arrays are provided as input.
                - If the arrays provided have different lengths.
                - If the length of the time series does not match the length of the arrays.
        """
        n_arrays = len(arrays)
        if n_arrays == 0:
            msg = "At least one array required as input"
            raise ValueError(msg)

        arrays_: Tuple[Union[nw.DataFrame, nw.Series, np.ndarray], ...] = tuple(
            nw.from_native(array, eager_only=True, allow_series=True, strict=False) for array in arrays
        )
        time_series_: Union[nw.Series, np.ndarray] = nw.from_native(time_series, series_only=True, strict=False)

        ts_shape = time_series_.shape
        if len(ts_shape) != 1:
            msg = f"Time series must be 1-dimensional. Got {len(ts_shape)} dimensions."
            raise ValueError(msg)

        a0 = arrays[0]
        arr_len = a0.shape[0]

        if n_arrays > 1 and not all(a.shape[0] == arr_len for a in arrays_[1:]):
            msg = f"All arrays must have the same length. Got {[a.shape[0] for a in arrays_]}"
            raise ValueError(msg)

        if arr_len != ts_shape[0]:
            msg = f"Time series and arrays must have the same length. Got {arr_len} and {ts_shape[0]}"
            raise ValueError(msg)

        time_start, time_end = start_dt or time_series_.min(), end_dt or time_series_.max()

        if time_start >= time_end:
            msg = "`time_start` must be before `time_end`."
            raise ValueError(msg)

        _index_methods = tuple(BACKEND_TO_INDEXING_METHOD.get(str(type(a)), default_indexing_method) for a in arrays_)
        for split in self._splits_from_period(time_start, time_end):
            train_mask = (time_series_ >= split.train_start) & (time_series_ < split.train_end)
            forecast_mask = (time_series_ >= split.forecast_start) & (time_series_ < split.forecast_end)

            train_forecast_arrays = tuple(
                chain.from_iterable(
                    (
                        nw.to_native(_idx_method(_arr, train_mask), strict=False),
                        nw.to_native(_idx_method(_arr, forecast_mask), strict=False),
                    )
                    for _arr, _idx_method in zip(arrays_, _index_methods)
                ),
            )

            if return_splitstate:
                yield train_forecast_arrays, split
            else:
                yield train_forecast_arrays

split(*arrays, time_series, start_dt=None, end_dt=None, return_splitstate=False)

Returns a generator of split arrays based on the time_series.

The time_series argument is compared against the bounds of each split state to create boolean masks for training - from train_start (included) to train_end (excluded) - and forecast - from forecast_start (included) to forecast_end (excluded). These masks are then used to index the arrays passed as arguments.
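
To make the half-open intervals concrete, here is a sketch that uses plain Python lists in place of array-like objects. The data and bounds are invented for illustration; the actual method builds its masks with vectorised comparisons and the `&` operator on the time series.

```python
from datetime import datetime, timedelta

# Unsorted timestamps: the masks depend on values, not on row order.
times = [datetime(2023, 1, d) for d in (3, 1, 5, 2, 4, 6)]
values = [30, 10, 50, 20, 40, 60]

train_start, train_end = datetime(2023, 1, 1), datetime(2023, 1, 4)
forecast_start, forecast_end = train_end, train_end + timedelta(days=2)

# The start bound is included, the end bound is excluded.
train_mask = [train_start <= t < train_end for t in times]
forecast_mask = [forecast_start <= t < forecast_end for t in times]

train_values = [v for v, keep in zip(values, train_mask) if keep]
forecast_values = [v for v, keep in zip(values, forecast_mask) if keep]
```

Because the end bounds are exclusive, a row falling exactly on `train_end` belongs to the forecast slice (when the gap is zero) and never to both.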

The start_dt and end_dt arguments can be used to specify the start and end of the time period. If provided, they are used in place of the time_series.min() and time_series.max() respectively.

This is useful because the series does not necessarily start on the first date and/or end on the last date of the time period of interest.
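
A small sketch of the effect, reusing the fallback pattern from the source (`start_dt or time_series.min()`). The dates and the `resolve_period` helper are made up for illustration.

```python
from datetime import datetime

# Observations only begin on January 3rd, although the period of interest
# starts on the 1st and runs until the 31st.
times = [datetime(2023, 1, d) for d in (3, 4, 7, 9)]


def resolve_period(start_dt=None, end_dt=None):
    """Hypothetical helper: fall back to the observed range when a bound is missing."""
    return start_dt or min(times), end_dt or max(times)


inferred = resolve_period()
explicit = resolve_period(start_dt=datetime(2023, 1, 1), end_dt=datetime(2023, 1, 31))
```

With the inferred period the first train window would be anchored at January 3rd; passing the bounds explicitly anchors the splits to the calendar period instead of to whatever data happens to be present.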

The return_splitstate argument can be used to return the SplitState instance for each split. This can be useful if a particular logic has to be applied only on specific cases (e.g. if first day of the week, then retrain a model).

By returning the split state, the user has the freedom and flexibility to apply any logic.
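
As an illustration of the "first day of the week" case, the sketch below uses a hypothetical `FakeSplitState` stand-in that exposes the four bounds the source reads from each split (`train_start`, `train_end`, `forecast_start`, `forecast_end`). The real `SplitState` class may offer more than this, and the daily splits are invented.

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class FakeSplitState(NamedTuple):
    """Hypothetical stand-in for timebasedcv's SplitState."""

    train_start: datetime
    train_end: datetime
    forecast_start: datetime
    forecast_end: datetime


day = timedelta(days=1)
first = datetime(2023, 1, 1)  # a Sunday

# Fourteen daily splits: 7 days of training, 1 day of forecast, no gap.
states = [
    FakeSplitState(first + i * day, first + (i + 7) * day, first + (i + 7) * day, first + (i + 8) * day)
    for i in range(14)
]

# Retrain only when the forecast window starts on a Monday (weekday() == 0).
retrain_on = [s.forecast_start for s in states if s.forecast_start.weekday() == 0]
```

With the real API, the same check would be applied to the second element of each item yielded when `return_splitstate=True`.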

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| *arrays | TensorLikeT | The arrays to split. Must have the same length as time_series. | () |
| time_series | SeriesLike[DateTimeLike] | The time series used to create the boolean masks for the splits. It is not required to be sorted, but it must support: comparison operators (with other date-like objects); bitwise operators (with other boolean arrays); the .min() and .max() methods; the .shape attribute. | required |
| start_dt | NullableDatetime | The start of the time period. If provided, it is used in place of time_series.min(). | None |
| end_dt | NullableDatetime | The end of the time period. If provided, it is used in place of time_series.max(). | None |
| return_splitstate | bool | Whether to return the SplitState instance for each split. If True, the generator yields tuples of the form (train_forecast_arrays, split_state), where train_forecast_arrays is a tuple of arrays containing the training and forecast data, and split_state is a SplitState instance representing the current split. If False, the generator yields tuples of the form train_forecast_arrays. | False |

Returns:

Type Description
Generator[Union[Tuple[TensorLikeT, ...], Tuple[Tuple[TensorLikeT, ...], SplitState]], None, None]

A generator of tuples of arrays containing the training and forecast data. Each tuple corresponds to a split generated by the TimeBasedSplit instance. If return_splitstate is True, each tuple is of the form (train_forecast_arrays, split_state), otherwise it is of the form train_forecast_arrays.
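
The order of the yielded arrays follows from the `chain.from_iterable` call in the source: each input contributes its train slice immediately followed by its forecast slice. A sketch with plain lists and a hypothetical `take` helper standing in for the backend-specific indexing:

```python
from itertools import chain

X = [[1, 10], [2, 20], [3, 30], [4, 40]]
y = [1, 2, 3, 4]
train_mask = [True, True, False, False]
forecast_mask = [False, False, True, False]


def take(rows, mask):
    """Hypothetical helper: keep the rows where the mask is True."""
    return [row for row, keep in zip(rows, mask) if keep]


# Flatten the (train, forecast) pairs: one pair per input array, in input order.
result = tuple(
    chain.from_iterable((take(arr, train_mask), take(arr, forecast_mask)) for arr in (X, y))
)
X_train, X_forecast, y_train, y_forecast = result
```

This is why the example in the class documentation unpacks `X_train, X_forecast, y_train, y_forecast` rather than grouping all train arrays first.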

Raises:

Type Description
ValueError
  • If no arrays are provided as input.
  • If the arrays provided have different lengths.
  • If the length of the time series does not match the length of the arrays.

timebasedcv.core.ExpandingTimeSplit

Bases: TimeBasedSplit

Alias for TimeBasedSplit(..., window="expanding").

Source code in timebasedcv/core.py
class ExpandingTimeSplit(TimeBasedSplit):  # pragma: no cover
    """Alias for `TimeBasedSplit(..., window="expanding")`."""

    def __init__(  # noqa: PLR0913
        self: Self,
        *,
        frequency: FrequencyUnit,
        train_size: int,
        forecast_horizon: int,
        gap: int = 0,
        stride: Union[int, None] = None,
        mode: ModeType,
    ) -> None:
        super().__init__(
            frequency=frequency,
            train_size=train_size,
            forecast_horizon=forecast_horizon,
            gap=gap,
            stride=stride,
            window="expanding",
            mode=mode,
        )

timebasedcv.core.RollingTimeSplit

Bases: TimeBasedSplit

Alias for TimeBasedSplit(..., window="rolling").

Source code in timebasedcv/core.py
class RollingTimeSplit(TimeBasedSplit):  # pragma: no cover
    """Alias for `TimeBasedSplit(..., window="rolling")`."""

    def __init__(  # noqa: PLR0913
        self: Self,
        *,
        frequency: FrequencyUnit,
        train_size: int,
        forecast_horizon: int,
        gap: int = 0,
        stride: Union[int, None] = None,
        mode: ModeType,
    ) -> None:
        super().__init__(
            frequency=frequency,
            train_size=train_size,
            forecast_horizon=forecast_horizon,
            gap=gap,
            stride=stride,
            window="rolling",
            mode=mode,
        )