entrix_case_challange/forecasting_model/utils/dataset_splitter.py

from typing import Generator, Tuple, List

import numpy as np
import torch
from torch.utils.data import Dataset

from forecasting_model.utils.helper import logger
from forecasting_model.utils import CrossValidationConfig

class TimeSeriesCrossValidationSplitter:
    """
    Generates indices for time series cross-validation using a rolling (sliding) window.

    The training window has a fixed size. For each split, the entire window
    (train, validation, and test sets) slides forward by a specified step size
    (typically the size of the test set). Validation and test set sizes are
    calculated as fractions of the fixed training window size.
    """
    def __init__(self, config: CrossValidationConfig, n_samples: int):
        """
        Args:
            config: CrossValidationConfig with split parameters.
            n_samples: Total number of samples in the dataset.
        """
        self.n_splits = config.n_splits
        self.val_frac = config.val_size_fraction
        self.test_frac = config.test_size_fraction
        self.initial_train_size = config.initial_train_size  # Used as the FIXED train size for the rolling window
        self.n_samples = n_samples

        if not (0 < self.val_frac < 1):
            raise ValueError(f"val_size_fraction must be between 0 and 1, got {self.val_frac}")
        if not (0 < self.test_frac < 1):
            raise ValueError(f"test_size_fraction must be between 0 and 1, got {self.test_frac}")
        if self.n_splits <= 0:
            raise ValueError(f"n_splits must be positive, got {self.n_splits}")

        logger.info(f"Initializing TimeSeriesCrossValidationSplitter (Rolling Window): n_splits={self.n_splits}, "
                    f"val_frac={self.val_frac}, test_frac={self.test_frac}, initial_train_size (fixed)={self.initial_train_size}")
    def _calculate_initial_train_size(self) -> int:
        """Determines the fixed training window size based on config or estimation."""
        # Case 1: an explicit positive integer size is provided
        if isinstance(self.initial_train_size, int) and self.initial_train_size > 0:
            if self.initial_train_size >= self.n_samples:
                raise ValueError(f"initial_train_size ({self.initial_train_size}) must be less than total samples ({self.n_samples})")
            logger.info(f"Using specified fixed training window size: {self.initial_train_size}")
            return self.initial_train_size

        # Case 2: a fraction of the total samples is provided
        elif isinstance(self.initial_train_size, float) and 0 < self.initial_train_size < 1:
            calculated_size = int(self.n_samples * self.initial_train_size)
            if calculated_size <= 0:
                raise ValueError("initial_train_size fraction results in a non-positive size.")
            logger.info(f"Using fixed training window size calculated from fraction: {calculated_size}")
            return calculated_size

        # Case 3: estimate when initial_train_size is None
        elif self.initial_train_size is None:
            logger.info("Estimating fixed train size based on n_splits, val_frac, test_frac.")
            # Estimate from the total space needed for all splits. With
            # step_size = test_size (the default used in split()):
            #   n_samples >= fixed_train_n + val_size + test_size + (n_splits - 1) * step_size
            #   n_samples >= fixed_train_n + fixed_train_n * val_frac + n_splits * fixed_train_n * test_frac
            #   n_samples >= fixed_train_n * (1 + val_frac + n_splits * test_frac)
            #   fixed_train_n <= n_samples / (1 + val_frac + n_splits * test_frac)
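            # Worked example with illustrative numbers (not from any config):
            # n_samples=1000, val_frac=0.1, test_frac=0.2, n_splits=3
            #   => denominator = 1 + 0.1 + 3 * 0.2 = 1.7
            #   => fixed_train_n = int(1000 / 1.7) = 588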
            denominator = 1.0 + self.val_frac + self.n_splits * self.test_frac
            if denominator <= 1.0:  # Defensive: the __init__ checks guarantee denominator > 1
                raise ValueError(f"Cannot estimate initial_train_size. Combination of val_frac ({self.val_frac}), "
                                 f"test_frac ({self.test_frac}), and n_splits ({self.n_splits}) is invalid (denominator {denominator:.2f} <= 1.0).")
            estimated_size = int(self.n_samples / denominator)

            # Sanity check: ensure the estimate is at least minimally usable
            min_required_for_features = 1  # Placeholder - ideally derived from FeatureConfig, which is not available here
            if estimated_size < min_required_for_features:
                raise ValueError(f"Estimated fixed train size ({estimated_size}) is too small. "
                                 f"Check CV config (n_splits={self.n_splits}, val_frac={self.val_frac}, test_frac={self.test_frac}) "
                                 f"relative to total samples ({self.n_samples}). Consider specifying initial_train_size manually.")
            logger.info(f"Estimated fixed training window size: {estimated_size}")
            return estimated_size
        else:
            raise ValueError(f"Invalid initial_train_size type or value: {self.initial_train_size}")
    def split(self) -> Generator[Tuple[np.ndarray, np.ndarray, np.ndarray], None, None]:
        """
        Generate train/validation/test indices for each fold using a rolling window.

        Pre-calculates the number of splits that actually fit, based on the data
        size and the window parameters.

        Yields:
            Tuple of (train_indices, val_indices, test_indices) for each fold.

        Raises:
            ValueError: If parameters lead to invalid split sizes or overlaps,
                or if the data is too small for the configuration.
        """
        indices = np.arange(self.n_samples)
        fixed_train_n = self._calculate_initial_train_size()  # The fixed training window size

        # Calculate val/test sizes based on the *fixed* training size, with a minimum of 1
        val_size = max(1, int(fixed_train_n * self.val_frac))
        test_size = max(1, int(fixed_train_n * self.test_frac))

        # Total size of one complete train+val+test window
        fold_window_size = fixed_train_n + val_size + test_size

        # Check that at least the first window fits
        if fold_window_size > self.n_samples:
            raise ValueError(f"Configuration Error: The total window size (Train {fixed_train_n} + Val {val_size} + Test {test_size} = {fold_window_size}) "
                             f"exceeds total samples ({self.n_samples}). Decrease initial_train_size, decrease the fractions, or provide more data.")

        # Determine the step size (how far the window slides each fold).
        # Default: slide by the test set size, giving contiguous, non-overlapping test periods.
        step_size = test_size
        if step_size <= 0:
            raise ValueError(f"Step size (derived from test_size {test_size}) must be positive.")

        # --- Calculate the number of splits actually possible ---
        # Last possible start index for the train set
        last_possible_train_start_idx = self.n_samples - fold_window_size
        # Number of steps that fit in this range (integer division).
        # E.g., if the last possible start is 5 and the step is 2, windows can
        # start at 0, 2, and 4 => (5 // 2) + 1 = 3.
        num_possible_steps = max(0, last_possible_train_start_idx // step_size) + 1  # +1 because the first window starts at index 0

        # Use the minimum of requested and possible splits. Check the zero case
        # first; as an elif after the "fewer than requested" warning it could
        # never fire, since 0 < n_splits would always take the first branch.
        actual_n_splits = min(self.n_splits, num_possible_steps)
        if actual_n_splits == 0:
            # Defensive: the fold_window_size check above should make this unreachable
            logger.error("Data too small for even one split with the rolling window configuration.")
            return  # Generator yields nothing
        if actual_n_splits < self.n_splits:
            logger.warning(f"Data size ({self.n_samples} samples) only allows for {actual_n_splits} splits "
                           f"with fixed train size {fixed_train_n}, val size {val_size}, test size {test_size} "
                           f"(total window {fold_window_size}) and step size {step_size} "
                           f"(requested {self.n_splits}).")
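
        # Example fold layout with illustrative numbers (not from any config):
        # n_samples=100, fixed_train_n=60, val_frac=0.1 -> val_size=6,
        # test_frac=0.2 -> test_size=12, so fold_window_size=78, step_size=12,
        # num_possible_steps = (100 - 78) // 12 + 1 = 2:
        #   fold 1: train [0..59],  val [60..65], test [66..77]
        #   fold 2: train [12..71], val [72..77], test [78..89]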
        # --- Generate the splits ---
        for i in range(actual_n_splits):
            logger.debug(f"Generating indices for fold {i + 1}/{actual_n_splits} (Rolling Window)")

            # Window boundaries for this fold
            train_start_idx = i * step_size
            train_end_idx = train_start_idx + fixed_train_n
            val_start_idx = train_end_idx
            val_end_idx = val_start_idx + val_size
            test_start_idx = val_end_idx
            test_end_idx = test_start_idx + test_size  # = train_start_idx + fold_window_size

            # Slice out the indices for this fold. Slicing contiguous ranges
            # guarantees the three sets are sequential and non-overlapping.
            train_indices = indices[train_start_idx:train_end_idx]
            val_indices = indices[val_start_idx:val_end_idx]
            test_indices = indices[test_start_idx:test_end_idx]

            logger.info(f"Fold {i + 1}: Train indices {train_indices[0]}-{train_indices[-1]} (size {len(train_indices)}), "
                        f"Val indices {val_indices[0]}-{val_indices[-1]} (size {len(val_indices)}), "
                        f"Test indices {test_indices[0]}-{test_indices[-1]} (size {len(test_indices)})")

            yield train_indices, val_indices, test_indices


class TimeSeriesDataset(Dataset):
    """
    PyTorch Dataset for time series forecasting.

    Takes a NumPy array (features + target), a sequence length, and a list of
    specific forecast horizons. Returns (input_sequence, target_vector) tuples,
    where target_vector contains the target values at the specified future steps.
    """
    def __init__(self, data_array: np.ndarray, sequence_length: int, forecast_horizon: List[int], target_col_index: int = 0):
        """
        Args:
            data_array: NumPy array of shape (n_samples, n_features).
                Assumes the target variable is one of the columns.
            sequence_length: Length of the input sequence (lookback window).
            forecast_horizon: List of specific steps ahead to predict (e.g., [1, 6, 12]).
            target_col_index: Index of the target column in data_array. Defaults to 0.
        """
        if sequence_length <= 0:
            raise ValueError("sequence_length must be positive.")
        if not forecast_horizon or not isinstance(forecast_horizon, list) or any(h <= 0 for h in forecast_horizon):
            raise ValueError("forecast_horizon must be a non-empty list of positive integers.")
        if data_array.ndim != 2:
            raise ValueError(f"data_array must be 2D, but got shape {data_array.shape}")

        self.max_horizon = max(forecast_horizon)  # The furthest step ahead that must exist in the data
        min_len_required = sequence_length + self.max_horizon
        if min_len_required > data_array.shape[0]:
            raise ValueError(f"sequence_length ({sequence_length}) + max_horizon ({self.max_horizon}) = {min_len_required} "
                             f"exceeds total samples provided ({data_array.shape[0]})")
        if not (0 <= target_col_index < data_array.shape[1]):
            raise ValueError(f"target_col_index ({target_col_index}) out of bounds for data with {data_array.shape[1]} columns.")

        self.data = torch.tensor(data_array, dtype=torch.float32)
        self.sequence_length = sequence_length
        self.forecast_horizon_list = sorted(forecast_horizon)
        self.target_col_index = target_col_index
        self.n_samples = data_array.shape[0]
        self.n_features = data_array.shape[1]

        logger.debug(f"TimeSeriesDataset created: data shape={self.data.shape}, "
                     f"seq_len={self.sequence_length}, forecast_horizons={self.forecast_horizon_list}, "
                     f"max_horizon={self.max_horizon}, target_idx={self.target_col_index}")
    def __len__(self) -> int:
        """Returns the total number of sequences that can be generated."""
        return self.n_samples - self.sequence_length - self.max_horizon + 1
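
    # E.g., with illustrative numbers: n_samples=1000, sequence_length=24,
    # max_horizon=12 => 1000 - 24 - 12 + 1 = 965 valid starting positions.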
    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Returns a single (input_sequence, target_vector) pair.

        The target vector contains the target values at the specified forecast
        horizons.
        """
        if not (0 <= idx < len(self)):
            raise IndexError(f"Index {idx} out of bounds for dataset with length {len(self)}")

        input_start = idx
        input_end = idx + self.sequence_length
        input_sequence = self.data[input_start:input_end, :]  # Shape: (seq_len, n_features)

        # Calculate the row index for each horizon relative to the end of the
        # input sequence: horizon h corresponds to index input_end + h - 1.
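        # E.g., idx=0, sequence_length=24, horizons [1, 6, 12] (illustrative):
        # the input covers rows 0..23 and the targets are rows 24, 29, and 35.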
        target_indices = [input_end + h - 1 for h in self.forecast_horizon_list]
        target_vector = self.data[target_indices, self.target_col_index]  # Shape: (len(forecast_horizon_list),)
        return input_sequence, target_vector
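
# End-to-end sketch combining both classes (hypothetical values throughout;
# the array, sequence length, and horizons are placeholders). Each fold's
# contiguous index array slices the raw data before building a dataset:
#
#     raw = np.random.rand(5000, 4).astype(np.float32)
#     cv_config = CrossValidationConfig(n_splits=3, val_size_fraction=0.1,
#                                       test_size_fraction=0.2,
#                                       initial_train_size=None)
#     splitter = TimeSeriesCrossValidationSplitter(cv_config, n_samples=len(raw))
#     for train_idx, val_idx, test_idx in splitter.split():
#         train_ds = TimeSeriesDataset(raw[train_idx], sequence_length=24,
#                                      forecast_horizon=[1, 6, 12])
#         # ... build the val/test datasets and DataLoaders the same way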