Files
entrix_case_challange/data_analysis/analysis/data.py
2025-05-02 10:45:06 +02:00

127 lines
5.1 KiB
Python

import logging
import pandas as pd
from typing import Tuple, Optional, Dict, Any
import warnings
from statsmodels.tools.sm_exceptions import InterpolationWarning
# Import analysis tools - ensure statsmodels is installed
from statsmodels.tsa.seasonal import seasonal_decompose, DecomposeResult
from statsmodels.tsa.stattools import adfuller, kpss
logger = logging.getLogger(__name__)
# PRICE_COL constant moved to io.data_handling
def perform_decomposition(series: pd.Series, model: str = 'additive', period: int = 24) -> Tuple[Optional[DecomposeResult], Optional[str]]:
    """
    Performs time series decomposition using statsmodels.

    Args:
        series: The time series data (e.g., df['Price']).
        model: Type of decomposition ('additive' or 'multiplicative').
        period: The period of the seasonality (e.g., 24 for hourly data
            with a daily cycle).

    Returns:
        A tuple containing:
        - DecomposeResult | None: The decomposition result object.
        - str | None: Error message, otherwise None.
    """
    logger.info(f"Performing {model} decomposition with period {period}...")
    result = None
    err = None
    # Guard clauses: validate input before touching statsmodels.
    if series is None or series.empty:
        err = "Input series for decomposition is empty or None."
        logger.error(err)
        return None, err
    # seasonal_decompose requires at least two complete seasonal cycles.
    if len(series) < 2 * period:
        err = f"Series is too short for decomposition with period {period} (length {len(series)})."
        logger.error(err)
        return None, err
    # Fail fast on missing values (consistent with perform_stationarity_tests):
    # seasonal_decompose otherwise raises an opaque ValueError on NaNs.
    if series.isnull().any():
        err = "Input series for decomposition contains NaNs. Please handle missing values first."
        logger.error(err)
        return None, err
    try:
        if isinstance(series.index, pd.DatetimeIndex) and series.index.freq is not None:
            # A frequency-aware DatetimeIndex lets statsmodels extrapolate the
            # trend, avoiding NaN edges of half a period on trend/resid.
            result = seasonal_decompose(series, model=model, period=period, extrapolate_trend='freq')
        else:
            logger.warning("Series index is not a DatetimeIndex with frequency. Decomposition might be less reliable.")
            result = seasonal_decompose(series, model=model, period=period)
        logger.info("Decomposition successful.")
    except ValueError as ve:
        # Catch specific ValueError often related to NaNs or period issues
        err = f"ValueError during decomposition (check for NaNs or period > series length/2): {ve}"
        logger.error(err, exc_info=True)
    except Exception as e:
        err = f"Error during decomposition: {e}"
        logger.error(err, exc_info=True)
    return result, err
def perform_stationarity_tests(series: pd.Series) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
    """
    Performs ADF and KPSS stationarity tests on a series.

    Args:
        series: The time series to test (often residuals or differenced series).

    Returns:
        A tuple containing:
        - dict | None: Dictionary containing test results ('adf', 'kpss').
        - str | None: Error message, otherwise None.
    """
    logger.info("Performing stationarity tests (ADF, KPSS)...")
    err = None
    results: Optional[Dict[str, Any]] = {}
    # Guard clauses: reject unusable input up front.
    if series is None or series.empty:
        err = "Input series for stationarity tests is empty or None."
        logger.error(err)
        return None, err
    if series.isnull().any():
        err = "Input series contains NaNs. Please handle missing values before testing stationarity."
        logger.error(err)
        return None, err
    try:
        # --- ADF test ---
        adf_test = adfuller(series, autolag='AIC')
        adf_keys = ('Test Statistic',
                    'p-value',
                    '#Lags Used',
                    '#Observations Used',
                    'Critical Values',
                    'IC Best')  # trailing element added by newer statsmodels
        results['adf'] = {
            name: value
            for name, value in zip(adf_keys, adf_test)
            if name != 'IC Best'
        }
        # Attach 'IC Best' only when the result tuple actually carries it.
        if len(adf_test) > 5:
            results['adf']['IC Best'] = adf_test[5]
        logger.debug(f"ADF Test Results: {results['adf']}")

        # --- KPSS test (level stationarity, regression='c') ---
        # Suppress the known p-value interpolation warning from statsmodels.
        with warnings.catch_warnings():
            warnings.filterwarnings("ignore", category=InterpolationWarning)
            kpss_test = kpss(series, regression='c', nlags="auto")
        kpss_keys = ('Test Statistic', 'p-value', '#Lags Used', 'Critical Values')
        results['kpss'] = dict(zip(kpss_keys, kpss_test))
        # The p-value may come back as a bounds string rather than a float.
        if isinstance(results['kpss']['p-value'], str):
            logger.warning(f"KPSS p-value reported as bounds: {results['kpss']['p-value']}")
        logger.debug(f"KPSS Test Results: {results['kpss']}")
        logger.info("Stationarity tests completed.")
    except Exception as e:
        err = f"Error performing stationarity tests: {e}"
        logger.error(err, exc_info=True)
        results = None
    return results, err