Files
entrix_case_challange/optim_run.py
2025-05-12 20:05:28 +02:00

632 lines
36 KiB
Python

import pandas as pd
import numpy as np
import logging
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import timedelta
from pydantic import BaseModel
from tqdm import tqdm
# Import Forecasting Providers
from forecasting_model.utils.helper import load_config
from optimizer.forecasting.base import ForecastProvider
from optimizer.forecasting.single_model import SingleModelProvider
from optimizer.forecasting.ensemble import EnsembleProvider
from optimizer.optimization.battery import solve_battery_optimization_hourly
from optimizer.utils.optimizer_config_model import OptimizationRunConfig
from forecasting_model.utils.forecast_config_model import MainConfig, FeatureConfig #, DataConfig
# Import the loading functions
from optimizer.utils.model_io import load_single_model_artifact, load_ensemble_artifact
# Feature Engineering Import
from forecasting_model import engineer_features, load_raw_data
from typing import Dict, Any, Optional, Type
# Silence overly verbose libraries if needed
mpl_logger = logging.getLogger('matplotlib')
mpl_logger.setLevel(logging.WARNING)
pil_logger = logging.getLogger('PIL')
pil_logger.setLevel(logging.WARNING)
pl_logger = logging.getLogger('pytorch_lightning')
pl_logger.setLevel(logging.WARNING)
# --- Basic Logging Setup ---
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)-7s - %(message)s',
datefmt='%H:%M:%S')
# Get the root logger
logger = logging.getLogger()
def load_optimization_config(config_path: str) -> Type[BaseModel] | None:
"""Loads the main optimization configuration from a YAML file."""
return load_config(Path(config_path), OptimizationRunConfig)
def load_main_forecasting_config(config_path: str) -> Type[BaseModel] | None:
"""Loads the main forecasting configuration from a YAML file."""
return load_config(Path(config_path), MainConfig)
def check_and_adjust_decisions_given_state(decision_p: float, current_state_b: float, max_capacity_b: float) -> tuple[float, float]:
"""Clips decision based on battery capacity (0 to max_capacity)."""
potential_state = current_state_b + decision_p
new_state = max(0.0, min(potential_state, max_capacity_b))
valid_decision = new_state - current_state_b
return valid_decision, new_state
# --- Main Execution Logic ---
if __name__ == "__main__":
logger.info("Starting battery optimization evaluation using provider-specific engineered features.")
# --- Load Main Optimization Config ---
optimization_config_path = "optim_config.yaml"
optimization_config = load_optimization_config(optimization_config_path)
if optimization_config is None:
logger.critical("Failed to load main optimization config. Exiting.")
exit(1)
if not optimization_config.models:
logger.critical("No models or ensembles specified in optimization config. Exiting.")
exit(1)
# --- Load Original Historical Data (Get path from the first specified model config) ---
try:
first_model_config_path_str = optimization_config.models[0].model_config_path
main_forecasting_config_for_data = load_main_forecasting_config(first_model_config_path_str)
if main_forecasting_config_for_data is None:
raise ValueError(f"Failed to load forecasting config ({first_model_config_path_str}) to get data path.")
historical_data_config = main_forecasting_config_for_data.data
target_col = historical_data_config.target_col # Assume consistent target for now
logger.info(f"Loading original historical data from: {historical_data_config.data_path}")
full_historical_df = load_raw_data(historical_data_config) # load_raw_data handles initial cleaning
if full_historical_df.empty or target_col not in full_historical_df.columns:
raise ValueError(f"Loaded original historical data is empty or missing target column '{target_col}'.")
# Ensure DatetimeIndex and Frequency (load_raw_data should ideally handle this)
if not isinstance(full_historical_df.index, pd.DatetimeIndex):
raise TypeError("Loaded historical data must have a DatetimeIndex.")
# Attempt to set frequency if not already set and expected_frequency is defined
if full_historical_df.index.freq is None and historical_data_config.expected_frequency:
logger.warning(f"Data index frequency not set, attempting to set to '{historical_data_config.expected_frequency}'.")
try:
full_historical_df = full_historical_df.asfreq(historical_data_config.expected_frequency)
if full_historical_df[target_col].isnull().any():
logger.warning(f"NaNs found after setting frequency. Applying ffill().bfill() to '{target_col}'.")
full_historical_df[target_col] = full_historical_df[target_col].ffill().bfill()
if full_historical_df[target_col].isnull().any():
raise ValueError(f"NaNs remain after filling '{target_col}' post-asfreq.")
except Exception as e:
logger.error(f"Failed to set frequency to {historical_data_config.expected_frequency}: {e}", exc_info=True)
# Decide whether to proceed or exit based on importance of frequency
# For now, proceed but log warning. Providers might fail later if they assume frequency.
logger.warning("Proceeding without guaranteed index frequency.")
logger.info(f"Original historical data loaded and prepared. Shape: {full_historical_df.shape}, Time range: {full_historical_df.index.min()} to {full_historical_df.index.max()}, Freq: {full_historical_df.index.freq}")
except Exception as e:
logger.critical(f"Failed during initial data loading or preparation: {e}", exc_info=True)
exit(1)
# --- Load Models, Engineer Features per Provider, Instantiate Providers ---
providers_data: Dict[str, Dict[str, Any]] = {} # Stores {'provider_name': {'provider': instance, 'df': engineered_df, 'first_valid_forecast_time': timestamp}}
for model_eval_config in optimization_config.models:
provider_name = model_eval_config.name
artifact_type = model_eval_config.type
artifact_path = Path(model_eval_config.model_path)
model_config_path = Path(model_eval_config.model_config_path)
logger.info(f"--- Processing Provider: {provider_name} ({artifact_type}) ---")
provider_instance: Optional[ForecastProvider] = None
provider_feature_config: Optional[FeatureConfig] = None # Will hold the config needed for engineering
provider_target_scaler: Optional[Any] = None
provider_data_scaler: Optional[Any] = None # Added to store the data scaler
try:
if artifact_type == 'model':
logger.info(f"Loading single model artifact: {provider_name}")
target_scaler_path = Path(model_eval_config.target_scaler_path) if model_eval_config.target_scaler_path else None
data_scaler_path = Path(model_eval_config.data_scaler_path) if model_eval_config.data_scaler_path else None
input_size_path = Path(model_eval_config.input_size_path) if model_eval_config.input_size_path else None
loaded_artifact_info = load_single_model_artifact(
model_path=artifact_path,
config_path=model_config_path,
input_size_path=input_size_path,
target_scaler_path=target_scaler_path,
data_scaler_path=data_scaler_path
)
if not loaded_artifact_info:
raise ValueError("load_single_model_artifact returned None.")
# Check target column consistency
current_main_config = loaded_artifact_info['main_forecasting_config']
if current_main_config.data.target_col != target_col:
raise ValueError(f"Target column mismatch. Expected '{target_col}', found '{current_main_config.data.target_col}'.")
provider_feature_config = loaded_artifact_info['feature_config'] # Get the config directly
provider_target_scaler = loaded_artifact_info['target_scaler'] # Get the fitted target scaler
provider_data_scaler = loaded_artifact_info['data_scaler'] # Get the fitted data scaler
if provider_data_scaler is None:
logger.warning(f"Data scaler not found or loaded for model '{provider_name}'. Prediction input scaling will be skipped in the provider.")
# Or raise ValueError if data scaler is mandatory
# Instantiate Provider
provider_instance = SingleModelProvider(
model_instance=loaded_artifact_info['model_instance'],
feature_config=provider_feature_config, # Pass config for seq_len, horizons
target_scaler=provider_target_scaler, # Pass the loaded target scaler
data_scaler=provider_data_scaler
)
# Basic validation
if 1 not in provider_instance.get_forecast_horizons():
raise ValueError(f"Model must forecast horizon 1 for interpolation anchor. Horizons: {provider_instance.get_forecast_horizons()}")
elif artifact_type == 'ensemble':
logger.info(f"Loading ensemble artifact: {provider_name}")
# Assume ensemble definition json is in the HPO output base dir
hpo_base_output_dir_for_ensemble = artifact_path.parent
loaded_artifact_info = load_ensemble_artifact(
ensemble_definition_path=artifact_path,
hpo_base_output_dir=hpo_base_output_dir_for_ensemble
)
if not loaded_artifact_info or not loaded_artifact_info.get('fold_artifacts'):
raise ValueError("load_ensemble_artifact returned None or no fold artifacts.")
# Get the feature config from the first fold (assuming all are identical)
if not loaded_artifact_info['fold_artifacts'][0].get('feature_config'):
raise ValueError(f"First fold artifact for ensemble '{provider_name}' is missing 'feature_config'.")
provider_feature_config = loaded_artifact_info['fold_artifacts'][0]['feature_config']
logger.debug(f"Using feature config from first fold for ensemble '{provider_name}' engineering.")
# Instantiate Provider (passes individual fold artifacts internally)
provider_instance = EnsembleProvider(
fold_artifacts=loaded_artifact_info['fold_artifacts'],
ensemble_method=loaded_artifact_info['ensemble_method'],
)
# Basic validation (check if any fold forecasts horizon 1)
can_forecast_h1 = 1 in provider_instance.get_forecast_horizons()
if not can_forecast_h1:
raise ValueError("Ensemble has no folds that forecast horizon 1. Cannot use for interpolation anchor.")
else:
raise ValueError(f"Unknown artifact type '{artifact_type}'.")
# --- Feature Engineering for this Provider ---
logger.info(f"Engineering features specifically for '{provider_name}' using its required configuration.")
if provider_feature_config is None:
raise RuntimeError(f"Could not determine feature config for {provider_name}")
engineered_df_provider = engineer_features(
full_historical_df.copy(), # Use copy of original data
target_col=target_col,
feature_config=provider_feature_config # Use the relevant config directly
)
# Handle NaNs introduced by feature engineering - crucial step !!!
initial_nan_count = engineered_df_provider.isnull().sum().sum()
first_valid_index_dt = engineered_df_provider.first_valid_index()
if pd.isna(first_valid_index_dt):
raise ValueError(f"Engineered DataFrame for '{provider_name}' contains only NaNs.")
logger.info(f"Feature engineering for '{provider_name}' complete. Shape: {engineered_df_provider.shape}. First valid index: {first_valid_index_dt}")
if initial_nan_count > 0:
logger.debug(f"Found {initial_nan_count} NaN values initially (expected due to lookback).")
# --- Determine First Valid Forecast Time ---
provider_seq_len = provider_instance.get_required_lookback()
try:
first_valid_loc = engineered_df_provider.index.get_loc(first_valid_index_dt)
first_possible_end_loc = first_valid_loc + provider_seq_len - 1
if first_possible_end_loc >= len(engineered_df_provider.index):
raise ValueError(f"Not enough data points ({len(engineered_df_provider.index)} starting {first_valid_index_dt}) to form sequence length {provider_seq_len}.")
first_valid_forecast_time = engineered_df_provider.index[first_possible_end_loc]
logger.info(f"First possible forecast time (t=0) for '{provider_name}': {first_valid_forecast_time} (needs seq_len={provider_seq_len})")
except KeyError:
raise ValueError(f"Consistency error: first_valid_index {first_valid_index_dt} not found in engineered DF index.")
except IndexError:
raise ValueError(f"Indexing error while determining first valid forecast time for {provider_name}.")
# Store provider instance, its dedicated DF, and its first valid time
providers_data[provider_name] = {
'provider': provider_instance,
'df': engineered_df_provider,
'first_valid_forecast_time': first_valid_forecast_time
}
logger.info(f"Successfully processed and prepared provider '{provider_name}'.")
except Exception as e:
logger.error(f"Failed to process or prepare provider '{provider_name}': {e}. Skipping this provider.", exc_info=True)
if provider_name in providers_data:
del providers_data[provider_name]
continue # Skip to the next provider in the config
# --- End Loading/Preparing Providers ---
if not providers_data:
logger.critical("No forecast providers were successfully created. Cannot proceed. Exiting.")
exit(1)
successfully_loaded_provider_names = list(providers_data.keys())
logger.info(f"Successfully prepared {len(successfully_loaded_provider_names)} providers: {successfully_loaded_provider_names}")
# --- Determine Overall Evaluation Window based on Timestamps ---
optimization_horizon_hours = optimization_config.optimization_horizon_hours
# step_size_hours = 1 # Assuming hourly steps for the sliding window, is fixed for our case.
# Find the latest "first valid forecast time" across all providers
# This determines the earliest time we can start the evaluation for *all* providers
global_first_valid_forecast_time = max(p_data['first_valid_forecast_time'] for p_data in providers_data.values())
# The first evaluation window STARTS at the hour *after* this time
first_window_start_time = global_first_valid_forecast_time + timedelta(hours=1) # Shift by one hour
# The last evaluation window START time is such that the window fits within the original historical data
last_possible_window_end_time = full_historical_df.index.max()
last_window_start_time = last_possible_window_end_time - timedelta(hours=optimization_horizon_hours - 1)
# Create the range of window start times
evaluation_start_times = pd.date_range(
start=first_window_start_time,
end=last_window_start_time,
freq='1h' # Use frequency string 'h' for hourly
)
if evaluation_start_times.empty:
logger.critical(f"Not enough data for evaluation window. Global first possible window start: {first_window_start_time}, Last possible window start: {last_window_start_time}. Check data length, lookbacks, sequence lengths, and optimization horizon.")
exit(1)
logger.info(f"Evaluating optimization over windows starting from {evaluation_start_times.min()} to {evaluation_start_times.max()} ({len(evaluation_start_times)} windows).")
# --- Evaluation Loop ---
window_results_list = []
# Wrap the iterable with idx, tqdm and add a description
# For debugging: # TODO implement a debug flag!
# for idx, window_start_time in enumerate(tqdm(evaluation_start_times[-200:], desc="Evaluating Optimization Windows"), start=0):
for idx, window_start_time in enumerate(tqdm(evaluation_start_times, desc="Evaluating Optimization Windows"), start=0):
window_end_time = window_start_time + timedelta(hours=optimization_horizon_hours - 1)
# Changed log level to debug to avoid flooding console when using tqdm
logger.debug(f"Processing window: {window_start_time.strftime('%Y-%m-%d %H:%M')} to {window_end_time.strftime('%Y-%m-%d %H:%M')}")
# Get actual prices for the optimization window from the *original* DF using timestamps
try:
actual_prices_in_window = full_historical_df.loc[window_start_time:window_end_time, target_col].tolist()
if len(actual_prices_in_window) != optimization_horizon_hours:
logger.warning(f"Skipping window starting {window_start_time}: Actual prices slice length {len(actual_prices_in_window)} != {optimization_horizon_hours}. Check data continuity.")
continue
except Exception as e: # Catch potential KeyError or other slicing issues
logger.warning(f"Skipping window starting {window_start_time}: Could not retrieve actual prices from original DF. Error: {e}")
continue
# --- Determine Forecast Input Point (Timestamp) ---
# The forecast needs to predict prices for the window starting at window_start_time.
# The input sequence for the forecast model ends at the timestamp *just before* this window starts.
# TODO!!!! This might be a major disadvantage! Check this consideration!!!
forecast_start_time = window_start_time - timedelta(hours=1) # t=0 for the forecast
# Get the last actual price (at t=0) needed for interpolation anchor
try:
last_actual_price = full_historical_df.loc[forecast_start_time, target_col]
if pd.isna(last_actual_price):
logger.warning(f"Last actual price at {forecast_start_time} is NaN. Cannot use as interpolation anchor. Skipping window.")
continue
except KeyError:
logger.error(f"Cannot get last actual price at timestamp {forecast_start_time} from original DataFrame. Skipping window.")
continue
except Exception as e:
logger.error(f"Error getting last actual price at {forecast_start_time}: {e}. Skipping window.")
continue
# --- Collect Window Results ---
window_results = {
'start_time': window_start_time,
'end_time': window_end_time,
'actual_prices': actual_prices_in_window
}
# --- Baseline Optimization (uses actual prices) ---
baseline_prices_input = np.array(actual_prices_in_window)
logger.debug(f"Running baseline optimization for window starting {window_start_time}")
# Set the "initial" battery state to the last battery state from the previous window
if idx:
# The battery state of the current window is the state of B_t_1 of the last window
# (since we only took, the initial optimal suggested power buy/sell
# Same goes for profit/cost
prev_B_state = window_results_list[-1]['baseline']['B_state']
prev_acc_profit = window_results_list[-1]['baseline']['acc_profit']
else:
prev_B_state = optimization_config.initial_b
prev_acc_profit = 0
try:
baseline_status, baseline_profit, baseline_P, baseline_B = solve_battery_optimization_hourly(
baseline_prices_input, prev_B_state, optimization_config.max_capacity, optimization_config.max_rate
)
# Check decision and only really buy if battery state allows for it!
decision = baseline_P[0]
allowed_p_decision, resulting_b_state = check_and_adjust_decisions_given_state(
decision, prev_B_state, optimization_config.max_capacity
)
# We must invert the sign because buy (=charge) means spent (=-money)!! (and the other way round)
acc_profit = prev_acc_profit + (-1 * allowed_p_decision * baseline_prices_input[0])
window_results['baseline'] = { "status": baseline_status,
"P_schedule": baseline_P.tolist() if baseline_P is not None else None,
"B_schedule": baseline_B.tolist() if baseline_B is not None else None,
"P_decision": allowed_p_decision,
"B_state": resulting_b_state,
"acc_profit": acc_profit}
logger.debug(f"Baseline profit: {baseline_profit if baseline_profit is not None else 'N/A'}")
except Exception as e:
logger.error(f"Baseline optimization failed for window {window_start_time}: {e}", exc_info=True)
window_results['baseline'] = {"status": "Error", "P_schedule": None, "P_decision": None,
"B_schedule": None, "B_state": None, "acc_profit": None}
# --- Forecast Provider Optimizations ---
for provider_name in successfully_loaded_provider_names:
provider_data = providers_data[provider_name]
provider_instance = provider_data['provider']
engineered_df_provider = provider_data['df'] # Get the dedicated *unscaled* DF
logger.debug(f"Running forecast and optimization for provider '{provider_name}'")
# Generate forecast using the new method signature with dedicated DF and timestamp
try:
forecast_prices_input = provider_instance.get_forecast(
engineered_df=engineered_df_provider, # Pass the provider's specific UNscaled DF
forecast_start_time=forecast_start_time, # Pass the t=0 timestamp
optimization_horizon_hours=optimization_horizon_hours,
last_actual_price=last_actual_price
)
except Exception as e:
logger.error(f"Error calling get_forecast for provider '{provider_name}' window {window_start_time}: {e}", exc_info=True)
forecast_prices_input = None
if forecast_prices_input is None:
logger.warning(f"Forecast generation failed for provider '{provider_name}' window {window_start_time}. Skipping optimization.")
window_results[provider_name] = {"status": "Forecast Generation Failed", "P_schedule": None,
"B_schedule": None, "P_decision": None, "B_state": None, "acc_profit": None}
continue
if not isinstance(forecast_prices_input, np.ndarray) or forecast_prices_input.shape != (optimization_horizon_hours,):
logger.error(f"Forecast from provider '{provider_name}' has incorrect shape {forecast_prices_input.shape} (expected {(optimization_horizon_hours,)}). Skipping optimization.")
window_results[provider_name] = {"status": "Invalid Forecast Format", "P_schedule": None,
"B_schedule": None, "P_decision": None, "B_state": None, "acc_profit": None}
continue
# --- Run Optimization with Forecast Prices ---
if idx:
# See above
prev_B_state_model = window_results_list[-1][provider_name]['B_state']
prev_acc_profit_model = window_results_list[-1][provider_name]['acc_profit']
else:
prev_B_state_model = optimization_config.initial_b
prev_acc_profit_model = 0
try:
model_status, optim_model_profit, model_P, model_B = solve_battery_optimization_hourly(
forecast_prices_input, prev_B_state_model, optimization_config.max_capacity, optimization_config.max_rate
)
# Important, get the price at real_t_0 (=window_start_time), do not calc the profit using the forecast!
real_price_at_t0 = engineered_df_provider.iloc[engineered_df_provider.index.get_loc(window_start_time)][target_col]
if real_price_at_t0 != baseline_prices_input[0]:
logger.warning(f"Baseline {baseline_prices_input[0]} and {provider_name} {real_price_at_t0} should buy for the same price but didn't.")
# Check decision and only really buy if battery state allows for it!
model_decision = model_P[0]
allowed_p_decision_model, resulting_b_state_model = check_and_adjust_decisions_given_state(
model_decision, prev_B_state_model, optimization_config.max_capacity
)
acc_profit_model = prev_acc_profit_model + (-1 * allowed_p_decision_model * real_price_at_t0)
window_results[provider_name] = { "status": model_status,
"P_schedule": model_P.tolist() if model_P is not None else None,
"B_schedule": model_B.tolist() if model_B is not None else None,
"P_decision": allowed_p_decision_model,
"B_state": resulting_b_state_model,
"acc_profit": acc_profit_model}
logger.debug(f"Provider '{provider_name}' profit: {optim_model_profit if optim_model_profit is not None else 'N/A'}")
except Exception as e:
logger.error(f"Optimization failed for provider '{provider_name}' window {window_start_time}: {e}", exc_info=True)
window_results[provider_name] = {"status": "Optimization Error", "P_schedule": None,
"B_schedule": None, "P_decision": None, "B_state": None, "acc_profit": None}
# Append results for this window
window_results_list.append(window_results)
logger.debug(f"Finished processing window starting at: {window_start_time.strftime('%Y-%m-%d %H:%M')}")
logger.info(f"Finished processing all {len(evaluation_start_times)} evaluation windows.")
# --- Post-processing and Plotting ---
logger.info("Starting results analysis and plotting.")
if not window_results_list:
logger.warning("No window results were collected. Skipping analysis and plotting.")
exit(0)
# Convert results list to a DataFrame
flat_results = []
for window_res in window_results_list:
base_info = { 'start_time': window_res['start_time'], 'end_time': window_res['end_time'], }
# Add baseline results
# Ensure schedules are lists, even if None (for consistent schema)
baseline_res = window_res.get('baseline', {})
baseline_res['P_schedule'] = baseline_res.get('P_schedule') if baseline_res.get('P_schedule') is not None else []
baseline_res['B_schedule'] = baseline_res.get('B_schedule') if baseline_res.get('B_schedule') is not None else []
flat_results.append({**base_info, 'type': 'baseline', **baseline_res})
# Add provider results
for provider_name in successfully_loaded_provider_names:
provider_res = window_res.get(provider_name, {"status": "Missing Result", "acc_profit": np.nan, "P_schedule": [], "B_schedule": []})
provider_res['P_schedule'] = provider_res.get('P_schedule') if provider_res.get('P_schedule') is not None else []
provider_res['B_schedule'] = provider_res.get('B_schedule') if provider_res.get('B_schedule') is not None else []
flat_results.append({**base_info, 'type': provider_name, **provider_res})
results_df = pd.DataFrame(flat_results)
results_df['start_time'] = pd.to_datetime(results_df['start_time'])
results_df.set_index('start_time', inplace=True) # Set time index
# Sort by index and type to ensure correct diff calculation
results_df.sort_index(inplace=True)
# Calculate Hourly Profit (difference in cumulative profit per type)
# Important: Group by 'type' before diffing to avoid comparing different types
results_df['hourly_profit'] = results_df.groupby('type')['acc_profit'].diff()
# Fill the first NaN value for each group with the first acc_profit value
results_df['hourly_profit'] = results_df['hourly_profit'].fillna(results_df['acc_profit'])
# --- Plotting ---
logger.info("Generating plots.")
output_dir = Path(optimization_config.output_dir) if optimization_config.output_dir else Path(".")
output_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Saving plots to: {output_dir.resolve()}")
# Plot 1: Hourly Profit Over Time
hourly_profit_pivot = results_df.pivot_table(index=results_df.index, columns='type', values='hourly_profit')
if not hourly_profit_pivot.empty and not hourly_profit_pivot.isnull().all().all():
fig1, ax = plt.subplots(figsize=(15, 7))
plot_data = hourly_profit_pivot.dropna(axis=1, how='all')
if not plot_data.empty:
sns.lineplot(data=plot_data, ax=ax, dashes=False) # Ensure solid lines for clarity
ax.set_xlabel('Time (Window Start)')
ax.set_ylabel('Hourly Profit (€)')
ax.set_title('Rolling Horizon: Hourly Profit Comparison')
ax.legend(title='Strategy')
ax.axhline(0, color='grey', linestyle='--', linewidth=0.8) # Add zero line
plt.grid()
plt.tight_layout()
plot_path = output_dir / "hourly_profit_rolling.png"
plt.savefig(plot_path)
logger.info(f"Hourly Profit plot saved to {plot_path}")
plt.close(fig1)
else:
logger.warning("Hourly profit data is all NaN after filtering. Skipping Hourly Profit plot.")
else:
logger.warning("No valid data available to plot Hourly Profit.")
# Plot 2: Cumulative Profit Over Time
cumulative_profit_pivot = results_df.pivot_table(index=results_df.index, columns='type', values='acc_profit')
if not cumulative_profit_pivot.empty and not cumulative_profit_pivot.isnull().all().all():
fig2, ax = plt.subplots(figsize=(15, 7))
plot_data = cumulative_profit_pivot.dropna(axis=1, how='all')
if not plot_data.empty:
sns.lineplot(data=plot_data, ax=ax, dashes=False)
ax.set_xlabel('Time (Window Start)')
ax.set_ylabel('Cumulative Profit (€)')
ax.set_title('Rolling Horizon: Cumulative Profit Comparison')
ax.legend(title='Strategy')
plt.tight_layout()
plot_path = output_dir / "cumulative_profit_rolling.png"
plt.savefig(plot_path)
logger.info(f"Cumulative Profit plot saved to {plot_path}")
plt.close(fig2)
else:
logger.warning("Cumulative profit data is all NaN after filtering. Skipping Cumulative Profit plot.")
else:
logger.warning("No valid data available to plot Cumulative Profit.")
# Plot 3: Example Window Schedule (Calculated Schedule vs Actual Price)
# Select the first valid window's results
if not results_df.empty:
example_window_start_time = results_df.index.unique()[0]
example_window_data = results_df.loc[[example_window_start_time]]
# Find the corresponding entry in the original list to get actual prices easily
example_window_details = next((item for item in window_results_list if item['start_time'] == example_window_start_time), None)
actual_prices_example_window = example_window_details.get('actual_prices') if example_window_details else None
if actual_prices_example_window and not example_window_data.empty and len(actual_prices_example_window) == optimization_config.optimization_horizon_hours:
fig3, ax1 = plt.subplots(figsize=(15, 7))
ax2 = ax1.twinx() # for prices
hours = np.arange(optimization_config.optimization_horizon_hours)
plot_styles = {'baseline': 'r--', **{name: next(ax1._get_lines.prop_cycler)['color'] + '-' for name in successfully_loaded_provider_names}}
for provider_type in ['baseline'] + successfully_loaded_provider_names:
provider_row = example_window_data[example_window_data['type'] == provider_type]
if not provider_row.empty:
# Get the *calculated* full schedule for this window
calculated_p = provider_row['P_schedule'].iloc[0]
# Get the single decision that was *executed*
executed_p_first_hour = provider_row['P_decision'].iloc[0]
if isinstance(calculated_p, list) and len(calculated_p) == optimization_config.optimization_horizon_hours:
color, style = plot_styles[provider_type][:-1], plot_styles[provider_type][-1]
# Plot the full calculated schedule
ax1.plot(hours, calculated_p, label=f'{provider_type} Calculated P Schedule', color=color, linestyle=':', marker='.') # Dotted for calculated
# Highlight the executed first step
if executed_p_first_hour is not None:
ax1.scatter([0], [executed_p_first_hour], label=f'{provider_type} Executed P (Hour 0)', color=color, marker='X', s=100, zorder=5) # Large 'X' for executed
# Plot actual prices for the window
ax2.plot(hours, actual_prices_example_window, label='Actual Price in Window', color='grey', linestyle='-', marker='o', markersize=4)
ax2.set_ylabel('Price (€/MWh)', color='grey')
ax2.tick_params(axis='y', labelcolor='grey')
ax1.set_xlabel('Hour within Window')
ax1.set_ylabel('Power (MW)')
ax1.set_title(f'Rolling Horizon: Example Window Schedule ({example_window_start_time.strftime("%Y-%m-%d %H:%M")})')
ax1.axhline(0, color='black', linestyle='-', linewidth=0.5) # Zero power line
# Combine legends
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
# Custom legend handling to avoid duplicate labels if many providers
unique_labels = {}
for line, label in zip(lines + lines2, labels + labels2):
if label not in unique_labels:
unique_labels[label] = line
ax1.legend(unique_labels.values(), unique_labels.keys(), loc='best')
plt.xticks(hours) # Show all hours
plt.grid(True, axis='x', linestyle=':', linewidth=0.5)
plt.tight_layout()
plot_path = output_dir / "example_window_schedule_rolling.png"
plt.savefig(plot_path)
logger.info(f"Example Window Schedule plot saved to {plot_path}")
plt.close(fig3)
else:
logger.warning("Could not generate example window schedule plot (data missing or incorrect length).")
else:
logger.warning("Results DataFrame is empty, cannot generate example window plot.")
# --- Save Results DataFrame (Optional) ---
try:
results_save_path = output_dir / "optimization_results_rolling.csv"
# Reset index to include start_time as a column
results_df_to_save = results_df.reset_index()
# Convert list columns (schedules) to strings for CSV compatibility AFTER plotting
for col in ['P_schedule', 'B_schedule']:
if col in results_df_to_save.columns:
results_df_to_save[col] = results_df_to_save[col].apply(lambda x: str(x) if isinstance(x, list) else x)
results_df_to_save.to_csv(results_save_path, index=False)
logger.info(f"Saved detailed rolling horizon results DataFrame to {results_save_path}")
except Exception as e:
logger.error(f"Failed to save rolling horizon results DataFrame: {e}", exc_info=True)
logger.info("Evaluation and plotting completed.")