Files
entrix_case_challange/forecasting_model/data_processing.py
2025-05-02 14:36:19 +02:00

751 lines
40 KiB
Python

import logging
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from typing import Tuple, Generator, List, Optional, Union, Dict, Literal, Type
# Use relative import for utils within the package
from .utils.config_model import DataConfig, FeatureConfig, TrainingConfig, EvaluationConfig, CrossValidationConfig
# Optional: Import wavelet library if needed later
# import pywt
logger = logging.getLogger(__name__)
# --- Data Loading ---
def load_raw_data(config: DataConfig) -> pd.DataFrame:
    """
    Load raw time series data from a CSV file and return a cleaned target series.

    The raw datetime column is expected to hold strings of the form
    '<start> - <end>'; only the start part is parsed. The raw target column is
    coerced to numeric. Depending on the config, missing target values are
    filled (ffill then bfill) and the index frequency is validated and set.

    Args:
        config: DataConfig object. The attributes read here are `data_path`,
            `raw_datetime_col`, `raw_target_col`, `raw_datetime_format`,
            `datetime_col`, `target_col`, `fill_initial_target_nans` and
            `expected_frequency`.

    Returns:
        DataFrame sorted by a datetime index named `config.datetime_col`,
        without duplicate timestamps, holding the single column
        `config.target_col`.

    Raises:
        FileNotFoundError: If the data path does not exist.
        ValueError: If the raw columns are missing, datetime parsing fails,
            no valid timestamp remains, NaNs cannot be filled, or the
            frequency cannot be inferred while one is expected.
        Exception: Any other pandas read/processing error is logged and re-raised.
    """
    logger.info(f"Loading raw data from: {config.data_path}")
    try:
        # --- Initial Load ---
        df = pd.read_csv(config.data_path, header=0)
        logger.debug(f"Loaded raw data shape: {df.shape}")

        # --- Validate Raw Columns ---
        if config.raw_datetime_col not in df.columns:
            raise ValueError(f"Raw datetime column '{config.raw_datetime_col}' not found in {config.data_path}")
        if config.raw_target_col not in df.columns:
            raise ValueError(f"Raw target column '{config.raw_target_col}' not found in {config.data_path}")

        # --- Time Parsing (Specific Format Handling) ---
        logger.info(f"Parsing raw datetime column: '{config.raw_datetime_col}'")
        try:
            # Keep only the start time part of '<start> - <end>'
            start_times = df[config.raw_datetime_col].astype(str).str.split(' - ', expand=True)[0]
            datetime_format = config.raw_datetime_format or '%d.%m.%Y %H:%M'
            # Unparseable values become NaT and are dropped below
            parsed_timestamps = pd.to_datetime(start_times, format=datetime_format, errors='coerce')
        except Exception as e:
            logger.error(f"Failed to split or parse raw datetime column '{config.raw_datetime_col}' using expected format: {e}", exc_info=True)
            # Chain the cause so the original traceback is not lost
            raise ValueError("Datetime parsing failed. Check raw_datetime_col format and data.") from e

        # Drop rows whose timestamp could not be parsed
        num_parsing_errors = parsed_timestamps.isnull().sum()
        if num_parsing_errors > 0:
            original_len = len(df)
            df = df.loc[parsed_timestamps.notnull()].copy()
            parsed_timestamps = parsed_timestamps.dropna()
            logger.warning(f"Dropped {num_parsing_errors} rows ({num_parsing_errors/original_len:.1%}) due to timestamp parsing errors "
                           f"(expected format: '{datetime_format}' on start time).")
        if df.empty:
            raise ValueError("No valid timestamps found after parsing. Check data format.")

        # Assign parsed timestamp and set as index with standardized name
        df[config.datetime_col] = parsed_timestamps
        df = df.set_index(config.datetime_col)
        logger.debug(f"Set '{config.datetime_col}' as index.")

        # --- Target Column Processing ---
        logger.info(f"Processing target column: '{config.raw_target_col}' -> '{config.target_col}'")
        df[config.target_col] = pd.to_numeric(df[config.raw_target_col], errors='coerce')
        num_coercion_errors = df[config.target_col].isnull().sum()
        if num_coercion_errors > 0:
            # Rows are kept; whether the NaNs get filled is decided by the config flag below
            logger.warning(f"Found {num_coercion_errors} non-numeric values in raw target column '{config.raw_target_col}'. Coerced to NaN.")

        # --- Column Selection ---
        # Copy so the column assignments below write to an independent frame
        # instead of a selection of the loaded one.
        df = df[[config.target_col]].copy()
        logger.debug(f"Selected target column '{config.target_col}'. Shape: {df.shape}")

        # --- Initial Target NaN Filling (Optional) ---
        if config.fill_initial_target_nans:
            missing_prices = df[config.target_col].isnull().sum()
            if missing_prices > 0:
                logger.info(f"Found {missing_prices} missing values in target column '{config.target_col}'. Applying ffill then bfill.")
                # bfill covers NaNs at the very start that ffill cannot reach
                df[config.target_col] = df[config.target_col].ffill().bfill()
                final_missing = df[config.target_col].isnull().sum()
                if final_missing > 0:
                    logger.error(f"{final_missing} missing values REMAIN in target column after ffill/bfill. Cannot proceed.")
                    raise ValueError("Target column contains unfillable NaN values.")
            else:
                logger.debug("No missing values found in target column.")
        else:
            logger.info("Skipping initial NaN filling for target column as per config.")
            if df[config.target_col].isnull().any():
                logger.warning(f"NaNs exist in target column '{config.target_col}' and initial filling is disabled.")

        # --- Frequency Check & Setting ---
        logger.info("Checking time index frequency...")
        df = df.sort_index()  # Frequency inference needs a sorted index

        # Duplicate timestamps would break frequency inference
        duplicates = df.index.duplicated().sum()
        if duplicates > 0:
            logger.warning(f"Found {duplicates} duplicate timestamps. Keeping the first occurrence.")
            df = df[~df.index.duplicated(keep='first')]

        if config.expected_frequency:
            inferred_freq = pd.infer_freq(df.index)
            logger.debug(f"Inferred frequency: {inferred_freq}")
            if _frequencies_match(inferred_freq, config.expected_frequency):
                logger.info(f"Inferred frequency matches expected ('{config.expected_frequency}'). Setting index frequency.")
                # The inferred alias is always valid for the running pandas
                # version, which is not guaranteed for the configured spelling.
                df = df.asfreq(inferred_freq)
                missing_after_asfreq = df[config.target_col].isnull().sum()
                if missing_after_asfreq > 0:
                    if config.fill_initial_target_nans:
                        logger.warning(f"{missing_after_asfreq} NaNs appeared after setting frequency to '{config.expected_frequency}'. Applying ffill/bfill.")
                        df[config.target_col] = df[config.target_col].ffill().bfill()
                        if df[config.target_col].isnull().any():
                            logger.error("NaNs still present after attempting to fill gaps from asfreq. Check data continuity.")
                            raise ValueError("Unfillable NaNs after setting frequency.")
                    else:
                        logger.warning(f"{missing_after_asfreq} NaNs present after setting frequency to '{config.expected_frequency}'. "
                                       "Initial NaN filling was disabled, leaving them in place.")
            elif inferred_freq:
                logger.warning(f"Inferred frequency ('{inferred_freq}') does NOT match expected ('{config.expected_frequency}'). Index frequency will not be explicitly set. This might affect time-based features or models assuming regular intervals.")
            else:
                logger.error(f"Could not infer frequency, but expected frequency was set to '{config.expected_frequency}'. Check data for gaps or irregularities. Index frequency will not be explicitly set.")
                raise ValueError("Could not infer frequency. Ensure data has regular intervals matching expected_frequency.")
        else:
            logger.info("No expected frequency specified in config. Skipping frequency check and setting.")

        logger.info(f"Data loading and initial preparation complete. Final shape: {df.shape}")
        return df
    except FileNotFoundError:
        logger.error(f"Data file not found at: {config.data_path}")
        raise
    except ValueError as e:  # Raised above or by pandas
        logger.error(f"Data processing error: {e}", exc_info=True)
        raise
    except Exception as e:  # Any other unexpected error
        logger.error(f"Failed to load or process data from {config.data_path}: {e}", exc_info=True)
        raise


def _frequencies_match(inferred_freq: Optional[str], expected_freq: str) -> bool:
    """
    Tell whether two pandas frequency aliases describe the same offset.

    A plain string comparison is too strict: the same offset can be spelled
    in several ways (for example '1h' and 'h', or '15T' and '15min' depending
    on the pandas version), so the aliases are parsed and compared as offsets.

    Args:
        inferred_freq: Alias returned by `pd.infer_freq`, or None.
        expected_freq: Alias taken from the configuration.

    Returns:
        True if both aliases resolve to equal offsets; False otherwise,
        including when an alias cannot be parsed by the running pandas.
    """
    if inferred_freq is None:
        return False
    if inferred_freq == expected_freq:
        return True
    to_offset = pd.tseries.frequencies.to_offset
    try:
        return to_offset(inferred_freq) == to_offset(expected_freq)
    except (ValueError, TypeError):
        return False
# --- Feature Engineering ---
def engineer_features(df: pd.DataFrame, target_col: str, feature_config: FeatureConfig) -> pd.DataFrame:
    """
    Create time-series features from the target column and datetime index.

    Features are derived only from past values of the target (lags, rolling
    statistics over previous observations) and from the index itself
    (calendar and sinusoidal features), so no row sees its own target value
    through a generated feature.

    Args:
        df: DataFrame containing the target column and a datetime index.
            Should contain enough history for lookbacks (lags, rolling windows).
        target_col: The name of the column to engineer features from.
        feature_config: Configuration object. The attributes read here are
            `lags`, `rolling_window_sizes`, `use_time_features`, `sinus_curve`,
            `cosin_curve`, `wavelet_transform`, `fill_nan` and `clipping`.

    Returns:
        DataFrame with the original target column followed by the engineered
        feature columns, on the same index as `df`.

    Raises:
        ValueError: If target_col is not in df.
    """
    if target_col not in df.columns:
        raise ValueError(f"Target column '{target_col}' not found in DataFrame for feature engineering.")
    logger.info("Starting feature engineering...")
    features_df = df[[target_col]].copy()  # Start with the target

    # 1. Lags
    if feature_config.lags:
        logger.debug(f"Creating lag features for lags: {feature_config.lags}")
        for lag in feature_config.lags:
            if lag <= 0:
                logger.warning(f"Ignoring non-positive lag value: {lag}")
                continue
            features_df[f'{target_col}_lag_{lag}'] = df[target_col].shift(lag)

    # 2. Rolling Window Statistics
    if feature_config.rolling_window_sizes:
        logger.debug(f"Creating rolling window features for sizes: {feature_config.rolling_window_sizes}")
        # Shifting by one step already removes the current observation from
        # every window. Combining it with closed='left' would drop the most
        # recent past value as well, so only the shift is used.
        lagged_target = df[target_col].shift(1)
        for window in feature_config.rolling_window_sizes:
            if window <= 0:
                logger.warning(f"Ignoring non-positive rolling window size: {window}")
                continue
            rolling_obj = lagged_target.rolling(window=window, min_periods=window // 2)
            features_df[f'{target_col}_rolling_mean_{window}'] = rolling_obj.mean()
            features_df[f'{target_col}_rolling_std_{window}'] = rolling_obj.std()

    # 3. Time/Calendar Features
    if feature_config.use_time_features:
        logger.debug("Creating time/calendar features.")
        idx = features_df.index
        features_df['hour'] = idx.hour
        features_df['dayofweek'] = idx.dayofweek
        features_df['dayofmonth'] = idx.day
        features_df['dayofyear'] = idx.dayofyear
        # Nullable Int64 keeps the column integer-typed even if a week value is missing
        features_df['weekofyear'] = idx.isocalendar().week.astype(pd.Int64Dtype())
        features_df['month'] = idx.month
        features_df['year'] = idx.year
        features_df['quarter'] = idx.quarter
        features_df['is_weekend'] = (idx.dayofweek >= 5).astype(int)

    # 4. Sinusoidal Time Features (Optional, based on config)
    if feature_config.sinus_curve or feature_config.cosin_curve:
        seconds_in_day = 24 * 60 * 60
        idx = features_df.index
        seconds_past_midnight = idx.hour * 3600 + idx.minute * 60 + idx.second
        # Position within the day mapped onto one full turn of the unit circle
        day_angle = 2 * np.pi * seconds_past_midnight / seconds_in_day
        if feature_config.sinus_curve:
            logger.debug("Creating sinusoidal daily time feature.")
            features_df['sin_day'] = np.sin(day_angle)
        if feature_config.cosin_curve:
            logger.debug("Creating cosinusoidal daily time feature.")
            features_df['cos_day'] = np.cos(day_angle)

    # 5. Wavelet Transform (Optional)
    if feature_config.wavelet_transform and feature_config.wavelet_transform.apply:
        logger.warning("Wavelet feature engineering is specified but not implemented yet.")

    # 6. Handling NaNs generated during feature engineering (generated columns only)
    feature_cols_generated = [col for col in features_df.columns if col != target_col]
    if feature_cols_generated:
        nan_handler = feature_config.fill_nan
        if nan_handler is None:
            logger.warning("`fill_nan` is None. NaNs generated by feature engineering may remain.")
        else:
            fill_value: Optional[Union[float, pd.Series]] = None
            fill_method: Optional[str] = None
            if isinstance(nan_handler, str):
                if nan_handler in ['ffill', 'bfill']:
                    fill_method = nan_handler
                    logger.debug(f"Filling NaNs in generated features using method: '{fill_method}'")
                elif nan_handler == 'mean':
                    logger.warning("NaN filling with 'mean' in generated features is applied globally here;"
                                   " consider per-fold mean filling if lookahead is a concern.")
                    # The mean is taken over the whole slice passed in, which
                    # may include validation/test rows.
                    fill_value = features_df[feature_cols_generated].mean()
                    logger.debug("Filling NaNs in generated features using column means.")
                else:
                    logger.warning(f"Unsupported string fill_nan method '{nan_handler}' for generated features. Using 'ffill'.")
                    fill_method = 'ffill'
            elif isinstance(nan_handler, (int, float)):
                fill_value = float(nan_handler)
                logger.debug(f"Filling NaNs in generated features with value: {fill_value}")
            else:
                logger.warning(f"Invalid fill_nan type: {type(nan_handler)}. NaNs in features may remain.")

            generated = features_df[feature_cols_generated]
            if fill_method == 'ffill':
                # Forward fill cannot reach the leading NaNs produced by lags
                # and rolling windows, so finish with a backward fill.
                features_df[feature_cols_generated] = generated.ffill().bfill()
            elif fill_method == 'bfill':
                features_df[feature_cols_generated] = generated.bfill()
            elif fill_value is not None:
                # A Series fills column-wise (means); a scalar fills everywhere
                features_df[feature_cols_generated] = generated.fillna(value=fill_value)

        remaining_nans = features_df[feature_cols_generated].isnull().sum().sum()
        if remaining_nans > 0:
            logger.warning(f"{remaining_nans} NaN values remain in generated features.")

    # 7. Clipping (Optional) - applied after feature generation, target excluded
    if feature_config.clipping and feature_config.clipping.apply:
        clip_config = feature_config.clipping
        logger.debug(f"Clipping features (excluding target '{target_col}') between {clip_config.clip_min} and {clip_config.clip_max}")
        feature_cols_to_clip = [col for col in features_df.columns if col != target_col]
        if not feature_cols_to_clip:
            logger.warning("Clipping enabled, but no feature columns found to clip (only target exists?).")
        else:
            features_df[feature_cols_to_clip] = features_df[feature_cols_to_clip].clip(
                lower=clip_config.clip_min, upper=clip_config.clip_max
            )

    logger.info(f"Feature engineering completed. DataFrame shape: {features_df.shape}")
    logger.debug(f"Feature columns: {features_df.columns.tolist()}")
    return features_df
# --- Cross Validation ---
class TimeSeriesCrossValidationSplitter:
    """
    Generates indices for time series cross-validation using a rolling (sliding) window.

    The training window has a fixed size. Validation and test set sizes are
    fractions of that fixed training size (at least one sample each). For each
    successive fold the whole train/validation/test window slides forward by
    the test set size, starting at index 0.
    """

    def __init__(self, config: CrossValidationConfig, n_samples: int):
        """
        Args:
            config: CrossValidationConfig providing `n_splits`,
                `val_size_fraction`, `test_size_fraction` and
                `initial_train_size`.
            n_samples: Total number of samples in the dataset.

        Raises:
            ValueError: If a fraction is not strictly between 0 and 1 or
                n_splits is not positive.
        """
        self.n_splits = config.n_splits
        self.val_frac = config.val_size_fraction
        self.test_frac = config.test_size_fraction
        self.initial_train_size = config.initial_train_size  # FIXED train size of the rolling window
        self.n_samples = n_samples
        if not (0 < self.val_frac < 1):
            raise ValueError(f"val_size_fraction must be between 0 and 1, got {self.val_frac}")
        if not (0 < self.test_frac < 1):
            raise ValueError(f"test_size_fraction must be between 0 and 1, got {self.test_frac}")
        if self.n_splits <= 0:
            raise ValueError(f"n_splits must be positive, got {self.n_splits}")
        logger.info(f"Initializing TimeSeriesCrossValidationSplitter (Rolling Window): n_splits={self.n_splits}, "
                    f"val_frac={self.val_frac}, test_frac={self.test_frac}, initial_train_size (fixed)={self.initial_train_size}")

    def _calculate_initial_train_size(self) -> int:
        """
        Determine the fixed training window size.

        A positive integer is used as-is, a float strictly between 0 and 1 is
        taken as a fraction of `n_samples`, and None triggers a heuristic
        estimate.

        Returns:
            The training window size in samples.

        Raises:
            ValueError: If the configured value is invalid or no usable size
                can be derived.
        """
        size = self.initial_train_size
        # bool is a subclass of int but is never a meaningful window size;
        # NumPy integers are accepted alongside built-in ints.
        is_integer = isinstance(size, (int, np.integer)) and not isinstance(size, bool)

        if is_integer and size > 0:
            size = int(size)
            if size >= self.n_samples:
                raise ValueError(f"initial_train_size ({size}) must be less than total samples ({self.n_samples})")
            logger.info(f"Using specified fixed training window size: {size}")
            return size

        if isinstance(size, float) and 0 < size < 1:
            calculated_size = int(self.n_samples * size)
            if calculated_size <= 0:
                raise ValueError("initial_train_size fraction results in non-positive size.")
            logger.info(f"Using fixed training window size calculated from fraction: {calculated_size}")
            return calculated_size

        if size is not None:
            raise ValueError(f"Invalid initial_train_size: {self.initial_train_size}")

        # --- Heuristic estimate when nothing was configured ---
        min_samples_per_split_step = 2  # Heuristic minimum for val+test in one step
        # Crude first guess: train takes what the val/test fractions leave over
        estimated_train_frac = max(0.1, 1.0 - self.val_frac - self.test_frac)
        estimated_train_n = int(self.n_samples * estimated_train_frac)
        val_test_size_per_step = max(min_samples_per_split_step, int(estimated_train_n * (self.val_frac + self.test_frac)))
        # Tentative train size is the total minus one val/test block
        fixed_train_n_est = self.n_samples - val_test_size_per_step
        if fixed_train_n_est <= 0:
            raise ValueError("Could not estimate a valid initial_train_size (<= 0). Please specify it or check CV fractions.")

        est_val_size = max(1, int(fixed_train_n_est * self.val_frac))
        est_test_size = max(1, int(fixed_train_n_est * self.test_frac))
        if fixed_train_n_est + est_val_size + est_test_size > self.n_samples:
            # First guess does not fit: fall back to half of the data for training
            fixed_train_n_est = int(self.n_samples * 0.5)
            est_val_size = max(1, int(fixed_train_n_est * self.val_frac))
            est_test_size = max(1, int(fixed_train_n_est * self.test_frac))
            if fixed_train_n_est <= 0 or (fixed_train_n_est + est_val_size + est_test_size > self.n_samples):
                raise ValueError("Could not estimate a valid initial_train_size. Data too small relative to val/test fractions? Please specify initial_train_size.")
        logger.warning(f"initial_train_size not set, estimated fixed train size for rolling window: {fixed_train_n_est}. "
                       "This is a heuristic; viability depends on n_splits and step size. Validation happens in split().")
        return fixed_train_n_est

    def split(self) -> Generator[Tuple[np.ndarray, np.ndarray, np.ndarray], None, None]:
        """
        Generate train/validation/test indices for each fold using a rolling window.

        The number of folds is the smaller of the requested `n_splits` and the
        number of window positions that fit into the data.

        Yields:
            Tuple of (train_indices, val_indices, test_indices) for each fold;
            the three arrays are contiguous, consecutive and non-overlapping.

        Raises:
            ValueError: If a single train+val+test window does not fit into
                the data, or the training size is invalid.
        """
        indices = np.arange(self.n_samples)
        fixed_train_n = self._calculate_initial_train_size()
        # Val/test sizes derive from the fixed training size, minimum one sample
        val_size = max(1, int(fixed_train_n * self.val_frac))
        test_size = max(1, int(fixed_train_n * self.test_frac))
        fold_window_size = fixed_train_n + val_size + test_size
        if fold_window_size > self.n_samples:
            raise ValueError(f"Configuration Error: The total window size (Train {fixed_train_n} + Val {val_size} + Test {test_size} = {fold_window_size}) "
                             f"exceeds total samples ({self.n_samples}). Decrease initial_train_size, fractions, or increase data.")

        # Slide by the test size so consecutive test periods are contiguous
        # and do not overlap; test_size >= 1 guarantees a positive step.
        step_size = test_size

        # Window starts 0, step, 2*step, ... up to the last start that still
        # fits. The check above guarantees at least one position.
        last_possible_train_start_idx = self.n_samples - fold_window_size
        num_possible_steps = last_possible_train_start_idx // step_size + 1
        actual_n_splits = min(self.n_splits, num_possible_steps)
        if actual_n_splits < self.n_splits:
            logger.warning(f"Data size ({self.n_samples} samples) only allows for {actual_n_splits} splits "
                           f"with fixed train size {fixed_train_n}, val size {val_size}, test size {test_size} (total window {fold_window_size}) and step size {step_size} "
                           f"(requested {self.n_splits}).")

        # --- Generate the splits ---
        for i in range(actual_n_splits):
            logger.debug(f"Generating indices for fold {i+1}/{actual_n_splits} (Rolling Window)")
            train_start_idx = i * step_size
            train_end_idx = train_start_idx + fixed_train_n
            val_end_idx = train_end_idx + val_size
            test_end_idx = val_end_idx + test_size  # = train_start_idx + fold_window_size

            train_indices = indices[train_start_idx:train_end_idx]
            val_indices = indices[train_end_idx:val_end_idx]
            test_indices = indices[val_end_idx:test_end_idx]

            logger.info(f"Fold {i+1}: Train indices {train_indices[0]}-{train_indices[-1]} (size {len(train_indices)}), "
                        f"Val indices {val_indices[0]}-{val_indices[-1]} (size {len(val_indices)}), "
                        f"Test indices {test_indices[0]}-{test_indices[-1]} (size {len(test_indices)})")
            yield train_indices, val_indices, test_indices
# --- Dataset Class ---
class TimeSeriesDataset(Dataset):
    """
    PyTorch Dataset for time series forecasting.

    Wraps a 2D array (rows are time steps, columns are features including the
    target) and serves sliding-window samples: `sequence_length` consecutive
    rows of all columns as input, and the `forecast_horizon` rows that follow,
    restricted to the target column, as the prediction target.
    """

    def __init__(self, data_array: np.ndarray, sequence_length: int, forecast_horizon: int, target_col_index: int = 0):
        """
        Args:
            data_array: Array of shape (n_samples, n_features) whose values
                are convertible to float32. The target variable must be one
                of the columns.
            sequence_length: Length of the input sequence (lookback window).
            forecast_horizon: Number of steps ahead to predict.
            target_col_index: Index of the target column in data_array. Defaults to 0.

        Raises:
            ValueError: If a length is not positive, the array is not 2D, the
                array is too short for a single sample, or target_col_index
                is out of bounds.
        """
        if sequence_length <= 0:
            raise ValueError("sequence_length must be positive.")
        if forecast_horizon <= 0:
            raise ValueError("forecast_horizon must be positive.")
        data_array = np.asarray(data_array)
        if data_array.ndim != 2:
            raise ValueError(f"data_array must be 2D, but got shape {data_array.shape}")
        min_len_required = sequence_length + forecast_horizon
        if min_len_required > data_array.shape[0]:
            raise ValueError(f"sequence_length ({sequence_length}) + forecast_horizon ({forecast_horizon}) = {min_len_required} "
                             f"exceeds total samples provided ({data_array.shape[0]})")
        if not (0 <= target_col_index < data_array.shape[1]):
            raise ValueError(f"target_col_index ({target_col_index}) out of bounds for data with {data_array.shape[1]} columns.")

        # Cast explicitly: torch cannot convert object-dtype arrays (which
        # DataFrame.values yields for mixed or nullable columns). np.array
        # always copies, so the tensor never aliases the caller's buffer.
        self.data = torch.as_tensor(np.array(data_array, dtype=np.float32))
        self.sequence_length = sequence_length
        self.forecast_horizon = forecast_horizon
        self.target_col_index = target_col_index
        self.n_samples = data_array.shape[0]
        self.n_features = data_array.shape[1]
        logger.debug(f"TimeSeriesDataset created: data shape={self.data.shape}, "
                     f"seq_len={self.sequence_length}, forecast_horizon={self.forecast_horizon}, "
                     f"target_idx={self.target_col_index}")

    def __len__(self) -> int:
        """Return the number of (input, target) windows that fit into the data."""
        return self.n_samples - self.sequence_length - self.forecast_horizon + 1

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return the sample starting at row `idx`.

        Returns:
            Tuple (input_sequence, target_sequence) with shapes
            (sequence_length, n_features) and (forecast_horizon,).

        Raises:
            IndexError: If idx is negative or not smaller than len(self).
        """
        if not (0 <= idx < len(self)):
            raise IndexError(f"Index {idx} out of bounds for dataset with length {len(self)}")
        input_end = idx + self.sequence_length
        target_end = input_end + self.forecast_horizon
        input_sequence = self.data[idx:input_end, :]
        # The target window starts right after the input window
        target_sequence = self.data[input_end:target_end, self.target_col_index]
        return input_sequence, target_sequence
# --- Data Preparation ---
def prepare_fold_data_and_loaders(
    full_df: pd.DataFrame,  # Should contain only the target initially
    train_idx: np.ndarray,
    val_idx: np.ndarray,
    test_idx: np.ndarray,
    target_col: str,
    feature_config: FeatureConfig,
    train_config: TrainingConfig,
    eval_config: EvaluationConfig
) -> Tuple[DataLoader, DataLoader, DataLoader, Union[StandardScaler, MinMaxScaler, None], int]:
    """
    Prepare data loaders for a single cross-validation fold.

    Scaling is fitted *only* on the training rows of the fold and then applied
    to all three splits, which prevents lookahead bias.

    Steps:
        1. Determine the data range needed for the fold (incl. history for features).
        2. Engineer features on this slice using `engineer_features`.
        3. Split the engineered data into train, validation, test sets.
        4. Fit a scaler ONLY on the training data of the fold.
        5. Transform train, validation and test sets with the fitted scaler.
        6. Create `TimeSeriesDataset` instances for each set.
        7. Create `DataLoader` instances for each set.

    Args:
        full_df: The complete DataFrame (datetime index, target column).
        train_idx: Array of integer positions for the training set.
        val_idx: Array of integer positions for the validation set.
        test_idx: Array of integer positions for the test set.
        target_col: Name of the target column.
        feature_config: Configuration for feature engineering, scaling,
            sequence length and forecast horizon.
        train_config: Configuration for training (batch size, optional `num_workers`).
        eval_config: Configuration for evaluation (`eval_batch_size`).

    Returns:
        Tuple containing:
            - train_loader: DataLoader for the training set (shuffled).
            - val_loader: DataLoader for the validation set.
            - test_loader: DataLoader for the test set.
            - target_scaler: Scaler fitted on the training target only (for
              inverse transform), or None when scaling is disabled.
            - input_size: Number of feature columns in the input sequences.

    Raises:
        ValueError: If indices are empty or cannot be mapped, a split ends up
            empty, NaNs remain in the training data, the scaling method is
            unsupported, or a split is too short for one sample.
    """
    logger.info(f"Preparing data loaders for fold: train_size={len(train_idx)}, val_size={len(val_idx)}, test_size={len(test_idx)}")
    if len(train_idx) == 0 or len(val_idx) == 0 or len(test_idx) == 0:
        raise ValueError("Received empty indices for train, validation, or test set.")

    # 1. Determine data slice needed including history for feature lookback
    max_lookback = 0
    if feature_config.lags:
        max_lookback = max(max_lookback, max(feature_config.lags))
    if feature_config.rolling_window_sizes:
        # Rolling statistics in `engineer_features` are computed on a lagged
        # series, so a window reaches further back than its own size;
        # window + 1 rows is a safe upper bound.
        max_lookback = max(max_lookback, max(feature_config.rolling_window_sizes) + 1)
    max_history_needed = max(max_lookback, feature_config.sequence_length)
    slice_start_idx = max(0, train_idx[0] - max_history_needed)
    slice_end_idx = test_idx[-1] + 1
    if slice_start_idx >= slice_end_idx:
        raise ValueError(f"Calculated slice start ({slice_start_idx}) >= slice end ({slice_end_idx}). Check indices.")
    fold_data_slice = full_df.iloc[slice_start_idx:slice_end_idx]
    logger.debug(f"Required data slice for fold: indices {slice_start_idx} to {slice_end_idx-1} "
                 f"(size {len(fold_data_slice)}) for history and fold data.")
    if fold_data_slice.empty:
        raise ValueError(f"Data slice for fold is empty (indices {slice_start_idx} to {slice_end_idx-1}).")

    # 2. Feature Engineering on the slice
    try:
        engineered_df = engineer_features(fold_data_slice.copy(), target_col, feature_config)
        if engineered_df.empty:
            raise ValueError("Feature engineering resulted in an empty DataFrame.")
    except Exception as e:
        logger.error(f"Feature engineering failed for fold: {e}")
        raise

    # 3. Map absolute positions to iloc positions in the engineered slice
    try:
        # Go through the timestamps so the mapping stays valid even if
        # feature engineering dropped rows.
        train_indices_dt = full_df.index[train_idx]
        val_indices_dt = full_df.index[val_idx]
        test_indices_dt = full_df.index[test_idx]
        adj_train_idx_loc = engineered_df.index.get_indexer(train_indices_dt.intersection(engineered_df.index))
        adj_val_idx_loc = engineered_df.index.get_indexer(val_indices_dt.intersection(engineered_df.index))
        adj_test_idx_loc = engineered_df.index.get_indexer(test_indices_dt.intersection(engineered_df.index))
        # -1 marks "not found"; should not occur after the intersection
        adj_train_idx_loc = adj_train_idx_loc[adj_train_idx_loc != -1]
        adj_val_idx_loc = adj_val_idx_loc[adj_val_idx_loc != -1]
        adj_test_idx_loc = adj_test_idx_loc[adj_test_idx_loc != -1]
        if len(adj_train_idx_loc) == 0 or len(adj_val_idx_loc) == 0 or len(adj_test_idx_loc) == 0:
            logger.error(f"Index mapping resulted in empty splits: Train({len(adj_train_idx_loc)}), Val({len(adj_val_idx_loc)}), Test({len(adj_test_idx_loc)})")
            logger.debug(f"Original counts: Train={len(train_idx)}, Val={len(val_idx)}, Test={len(test_idx)}")
            logger.debug(f"Engineered DF index span: {engineered_df.index.min()} to {engineered_df.index.max()}")
            raise ValueError("Mapping original indices to engineered DataFrame resulted in empty splits. Check CV indices and NaN handling.")
    except Exception as e:
        logger.error(f"Error mapping indices to engineered DataFrame: {e}")
        # Chain the cause so the underlying failure stays visible
        raise ValueError("Failed to map CV indices to the feature-engineered data slice.") from e

    # 4. Split engineered data using iloc positions
    train_df = engineered_df.iloc[adj_train_idx_loc]
    val_df = engineered_df.iloc[adj_val_idx_loc]
    test_df = engineered_df.iloc[adj_test_idx_loc]
    logger.debug(f"Fold split shapes after feature engineering: Train={train_df.shape}, Val={val_df.shape}, Test={test_df.shape}")
    if train_df.empty or val_df.empty or test_df.empty:
        raise ValueError("One or more data splits (train, val, test) are empty after feature engineering and splitting.")

    # --- Final Check for NaNs before scaling/Dataset creation ---
    if train_df.isnull().any().any():
        nan_cols = train_df.columns[train_df.isnull().any()].tolist()
        logger.error(f"NaNs found in FINAL training data before scaling. Columns: {nan_cols}")
        logger.debug(f"NaN counts per column in train_df:\n{train_df.isnull().sum()[train_df.isnull().any()]}")
        raise ValueError("NaNs present in training data before scaling. Check feature engineering NaN handling.")
    if val_df.isnull().any().any() or test_df.isnull().any().any():
        logger.warning("NaNs found in final validation or test data splits. This might cause issues during evaluation or testing.")

    # 5. Scaling (Fit on Train, Transform All) - CRITICAL PER-FOLD STEP
    feature_cols = train_df.columns.tolist()
    try:
        target_col_index_in_features = feature_cols.index(target_col)
    except ValueError:
        raise ValueError(f"Target column '{target_col}' not found in the final feature columns: {feature_cols}")

    scaler: Optional[Union[StandardScaler, MinMaxScaler]] = None
    target_scaler: Optional[Union[StandardScaler, MinMaxScaler]] = None
    ScalerClass: Optional[Type[Union[StandardScaler, MinMaxScaler]]] = None
    if feature_config.scaling_method == 'standard':
        ScalerClass = StandardScaler
    elif feature_config.scaling_method == 'minmax':
        ScalerClass = MinMaxScaler
    elif feature_config.scaling_method is None:
        logger.info("No scaling applied for this fold.")
    else:
        raise ValueError(f"Unsupported scaling method: {feature_config.scaling_method}")

    # Cast to float64 explicitly: `.values` on a frame mixing float columns
    # with nullable integer ones (e.g. 'weekofyear') yields an object array,
    # which cannot be turned into a tensor when scaling is disabled.
    train_data = train_df[feature_cols].astype(np.float64).to_numpy()
    val_data = val_df[feature_cols].astype(np.float64).to_numpy()
    test_data = test_df[feature_cols].astype(np.float64).to_numpy()

    if ScalerClass is not None:
        scaler = ScalerClass()
        target_scaler = ScalerClass()
        logger.info(f"Applying {feature_config.scaling_method} scaling. Fitting on training data for the fold.")
        scaler.fit(train_data)
        # Separate single-column scaler so predictions can be inverse-transformed
        target_scaler.fit(train_data[:, target_col_index_in_features].reshape(-1, 1))
        train_data_scaled = scaler.transform(train_data)
        val_data_scaled = scaler.transform(val_data)
        test_data_scaled = scaler.transform(test_data)
        logger.debug("Scaling complete for the fold.")
    else:
        train_data_scaled = train_data
        val_data_scaled = val_data
        test_data_scaled = test_data
    input_size = train_data_scaled.shape[1]

    # 6. Dataset Instantiation
    logger.debug("Creating TimeSeriesDataset instances for the fold.")
    try:
        train_dataset = TimeSeriesDataset(
            train_data_scaled, feature_config.sequence_length, feature_config.forecast_horizon, target_col_index=target_col_index_in_features
        )
        val_dataset = TimeSeriesDataset(
            val_data_scaled, feature_config.sequence_length, feature_config.forecast_horizon, target_col_index=target_col_index_in_features
        )
        test_dataset = TimeSeriesDataset(
            test_data_scaled, feature_config.sequence_length, feature_config.forecast_horizon, target_col_index=target_col_index_in_features
        )
    except ValueError as e:
        logger.error(f"Error creating TimeSeriesDataset: {e}")
        logger.error(f"Shapes fed to Dataset: Train={train_data_scaled.shape}, Val={val_data_scaled.shape}, Test={test_data_scaled.shape}")
        logger.error(f"SeqLen={feature_config.sequence_length}, Horizon={feature_config.forecast_horizon}")
        raise

    # 7. DataLoader Creation
    logger.debug("Creating DataLoaders for the fold.")
    num_workers = getattr(train_config, 'num_workers', 0)
    pin_memory = torch.cuda.is_available()  # Pin memory if CUDA is available
    train_loader = DataLoader(
        train_dataset, batch_size=train_config.batch_size, shuffle=True,
        num_workers=num_workers, pin_memory=pin_memory, drop_last=False
    )
    val_loader = DataLoader(
        val_dataset, batch_size=eval_config.eval_batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory, drop_last=False
    )
    test_loader = DataLoader(
        test_dataset, batch_size=eval_config.eval_batch_size, shuffle=False,
        num_workers=num_workers, pin_memory=pin_memory, drop_last=False
    )
    logger.info("Data loaders prepared successfully for the fold.")
    return train_loader, val_loader, test_loader, target_scaler, input_size