Cross-Validation in Finance: Challenges and Solutions

The Shortcomings of Ordinary Cross-Validation in Finance

In traditional settings, cross-validation is an effective tool for evaluating a machine learning model's performance. However, the complexities of financial data pose unique challenges:

  1. Data Dependency: Financial observations are often not independent and identically distributed (IID), which violates a key assumption behind standard cross-validation.

  2. Repeated Testing: Using the test set multiple times during model development can lead to selection bias.

  3. Data Leakage: This occurs when the training and testing sets share information, which inflates the measured performance and overstates the model's true predictive accuracy.
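
The repeated-testing problem in item 2 can be illustrated with a small simulation. The sketch below is a hypothetical setup, not taken from any library: it scores many strategies that have no skill at all on the same test set and then picks the best one. The names `n_strategies`, `n_days`, and `mean_return` are assumptions made for illustration.

```python
import random
import statistics

random.seed(0)

n_strategies = 200   # hypothetical number of model variants tried on one test set
n_days = 250         # hypothetical length of the shared test set


def mean_return(n):
    """Average daily return of a strategy whose true edge is zero."""
    return statistics.fmean(random.gauss(0.0, 0.01) for _ in range(n))


# Every strategy is pure noise, so its expected score is zero.
scores = [mean_return(n_days) for _ in range(n_strategies)]

best_score = max(scores)
average_score = statistics.fmean(scores)

print(f"average score across strategies: {average_score:.5f}")
print(f"best score after {n_strategies} trials: {best_score:.5f}")
```

The best score looks profitable even though no strategy has any edge, which is the selection bias that reusing a test set introduces.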

K-Fold Cross-Validation: A Closer Look

In k-fold cross-validation, the data is partitioned into k subsets (folds). In each round, one fold is held out for validation and the remaining k - 1 folds are used for training. This is repeated k times, so that each fold serves as the validation set exactly once, and the performance metrics are averaged.
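
As a minimal sketch of this procedure, the code below splits a toy dataset into contiguous, unshuffled folds and averages a per-fold error. The helper names `kfold_indices` and `cross_validate` are hypothetical, and the "model" is simply the mean of the training values.

```python
from statistics import fmean


def kfold_indices(n_samples, k):
    """Yield (train, test) index lists for k contiguous, unshuffled folds."""
    # Spread any remainder over the first folds so sizes differ by at most one.
    fold_sizes = [n_samples // k + (1 if i < n_samples % k else 0) for i in range(k)]
    start = 0
    for size in fold_sizes:
        test = list(range(start, start + size))
        train = [i for i in range(n_samples) if i < start or i >= start + size]
        yield train, test
        start += size


def cross_validate(values, k):
    """Average the mean absolute error of a 'predict the training mean' model."""
    fold_errors = []
    for train, test in kfold_indices(len(values), k):
        prediction = fmean(values[i] for i in train)
        fold_errors.append(fmean(abs(values[i] - prediction) for i in test))
    return fmean(fold_errors)


values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
folds = list(kfold_indices(len(values), k=3))
cv_error = cross_validate(values, k=3)
print(folds[0])   # first fold: train on the last four points, test on the first two
print(cv_error)
```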

Overcoming Challenges: Purging and Embargo

Purging

To mitigate the issue of data leakage, one solution is "purging." Purging involves eliminating observations from the training set that have labels overlapping in time with those in the testing set.
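
A minimal sketch of purging, assuming each observation carries a closed time span from when it is made to when its label is fully known. The function `purge_training_set` and the toy data are hypothetical and only illustrate the overlap test.

```python
def purge_training_set(label_spans, train_idx, test_idx):
    """Drop training observations whose label span overlaps any test label span.

    label_spans[i] is a (start, end) pair: the time observation i is made and
    the time its label is fully known. Spans are treated as closed intervals.
    """
    test_spans = [label_spans[i] for i in test_idx]
    purged = []
    for i in train_idx:
        start, end = label_spans[i]
        # Two closed intervals overlap when each starts before the other ends.
        overlaps = any(start <= t_end and t_start <= end for t_start, t_end in test_spans)
        if not overlaps:
            purged.append(i)
    return purged


# Ten daily observations, each labelled over the following two days.
label_spans = [(t, t + 2) for t in range(10)]
test_idx = [4, 5]
train_idx = [i for i in range(10) if i not in test_idx]

purged_train = purge_training_set(label_spans, train_idx, test_idx)
print(purged_train)
```

Observations 2, 3, 6, and 7 are dropped here because their label windows share at least one point in time with the test labels.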

Embargo

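An additional step, known as an "embargo," further reduces data leakage. Because financial features are often serially correlated, training observations that come right after the test set can still carry information from it. The embargo excludes from the training set a short window of observations that immediately follows the testing set.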
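
The embargo can be sketched as follows, assuming a single contiguous test block and an embargo size given as a fraction of the sample count (as with the `embargo_fraction` argument in the RiskLabAI excerpt in this article). The function `apply_embargo` is a hypothetical helper, not part of the library.

```python
def apply_embargo(train_idx, test_idx, n_samples, embargo_fraction):
    """Drop training observations that fall in the embargo window after the test block.

    Assumes test_idx forms one contiguous block of positions.
    """
    embargo_length = int(n_samples * embargo_fraction)
    last_test = max(test_idx)
    window_end = min(n_samples, last_test + 1 + embargo_length)
    embargoed = set(range(last_test + 1, window_end))
    return [i for i in train_idx if i not in embargoed]


n_samples = 100
test_idx = list(range(40, 60))
train_idx = [i for i in range(n_samples) if i not in test_idx]

embargoed_train = apply_embargo(train_idx, test_idx, n_samples, embargo_fraction=0.05)
print(len(train_idx), len(embargoed_train))
```

With 100 samples and a 5% embargo, the five observations right after the test block (positions 60 to 64) are removed, while training data before the test block is left untouched.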

purged_kfold.py
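from typing import Set

import pandas as pd


# CrossValidator is the base class defined elsewhere in the RiskLabAI library.
class PurgedKFold(CrossValidator):
    @staticmethod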
    def filtered_training_indices_with_embargo(
        data_info_range: pd.Series,
        test_time_range: pd.Series,
        embargo_fraction: float = 0,
        continous_test_times: bool = False,
    ) -> pd.Series:
        indices_to_drop: Set[int] = set()
        embargo_length = int(len(data_info_range) * embargo_fraction)
        sorted_test_time_range = test_time_range.sort_index().copy()

        if not continous_test_times:
            sorted_test_time_range = pd.DataFrame({
                'start' : sorted_test_time_range.index,
                'end' : sorted_test_time_range.values
            })
            # Flag rows whose range starts strictly after the previous range ends (a gap, i.e. a new block)
            gaps = sorted_test_time_range['start'] > sorted_test_time_range['end'].shift(1)
            # Cumulative sum to identify contiguous blocks
            blocks = gaps.cumsum()
            # Aggregate to find the min start and max end for each block
            effective_test_time_range = sorted_test_time_range.groupby(blocks).agg({'start': 'min', 'end': 'max'})
            effective_test_time_range = pd.Series(effective_test_time_range['end'].values, index=effective_test_time_range['start'])

        else:
            effective_test_time_range = pd.Series(sorted_test_time_range.iloc[-1], index=[sorted_test_time_range.index[0]])

        if embargo_length == 0:
            embargoed_data_info_range = pd.Series(effective_test_time_range.values, index=effective_test_time_range.values)

        else:
            effective_sample = data_info_range.loc[effective_test_time_range.index.min():].copy().drop_duplicates()
            embargoed_data_info_range = pd.Series(effective_sample.values, index=effective_sample.values)
            embargoed_data_info_range = embargoed_data_info_range.shift(-embargo_length).fillna(embargoed_data_info_range.values[-1])   

        effective_ranges = pd.Series(embargoed_data_info_range.loc[effective_test_time_range].values, index=effective_test_time_range.index)

        for start_ix, end_ix_embargoed in effective_ranges.items():

            indices_to_drop.update(
                data_info_range[
                    ((start_ix <= data_info_range.index) & (data_info_range.index <= end_ix_embargoed)) |
                    ((start_ix <= data_info_range) & (data_info_range <= end_ix_embargoed)) |
                    ((data_info_range.index <= start_ix) & (end_ix_embargoed <= data_info_range))
                ].index
            )

        return data_info_range.drop(indices_to_drop)

Purged K-Fold Class in RiskLabAI

When building a machine learning model, it's essential to avoid data leakage between the training and test sets. The Purged K-Fold method in RiskLabAI is designed for this purpose. Its constructor takes the number of splits (n_splits), the observation times (times, either a single series or a dictionary of series when several datasets are used), and the size of the embargo (embargo, a float).

purged_kfold.py
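from typing import Dict, Union

import pandas as pd


# CrossValidator is the base class defined elsewhere in the RiskLabAI library.
class PurgedKFold(CrossValidator):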
    def __init__(
            self,
            n_splits: int,
            times: Union[pd.Series, Dict[str, pd.Series]],
            embargo: float = 0,
    ) -> None:
        self.n_splits = n_splits
        self.times = times
        self.embargo = embargo
        self.is_multiple_datasets = isinstance(times, dict)
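
To show how the three constructor arguments interact, here is a simplified, dependency-free sketch of a purged k-fold splitter. It is not the RiskLabAI implementation: `purged_kfold_splits` is a hypothetical function, `times` is assumed to be a list of (start, end) pairs sorted by start time, and `embargo` is assumed to be a fraction of the sample count.

```python
def purged_kfold_splits(times, n_splits, embargo=0.0):
    """Yield (train, test) index lists with purging and an embargo applied.

    times[i] is a (start, end) pair for observation i, sorted by start.
    """
    n = len(times)
    embargo_length = int(n * embargo)
    bounds = [j * n // n_splits for j in range(n_splits + 1)]
    for lo, hi in zip(bounds[:-1], bounds[1:]):
        test = list(range(lo, hi))
        test_start = times[lo][0]
        test_end = max(end for _, end in times[lo:hi])
        # Embargo: stretch the excluded window to the start time of the
        # observation lying embargo_length positions after the test block.
        last_embargoed = min(n - 1, hi - 1 + embargo_length)
        window_end = max(test_end, times[last_embargoed][0])
        # Purge: keep only training spans that do not touch the excluded window.
        train = [
            i for i in range(n)
            if not (lo <= i < hi)
            and not (times[i][0] <= window_end and test_start <= times[i][1])
        ]
        yield train, test


times = [(t, t + 1) for t in range(12)]
splits = list(purged_kfold_splits(times, n_splits=3, embargo=0.2))
for train, test in splits:
    print(test, train)
```

In the middle fold, observations 3 and 8 are purged because their label spans touch the test window, and observation 9 is removed by the embargo.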

These functionalities are available in both Python and Julia in the RiskLabAI library.

References

  1. López de Prado, M. (2018). Advances in Financial Machine Learning. John Wiley & Sons.
  2. López de Prado, M. (2020). Machine Learning for Asset Managers. Cambridge University Press.