632 lines
36 KiB
Python
632 lines
36 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
|
|
import logging
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
from pathlib import Path
|
|
from datetime import timedelta
|
|
|
|
from pydantic import BaseModel
|
|
from tqdm import tqdm
|
|
|
|
# Import Forecasting Providers
|
|
from forecasting_model.utils.helper import load_config
|
|
from optimizer.forecasting.base import ForecastProvider
|
|
from optimizer.forecasting.single_model import SingleModelProvider
|
|
from optimizer.forecasting.ensemble import EnsembleProvider
|
|
|
|
from optimizer.optimization.battery import solve_battery_optimization_hourly
|
|
from optimizer.utils.optimizer_config_model import OptimizationRunConfig
|
|
from forecasting_model.utils.forecast_config_model import MainConfig, FeatureConfig #, DataConfig
|
|
|
|
# Import the loading functions
|
|
from optimizer.utils.model_io import load_single_model_artifact, load_ensemble_artifact
|
|
|
|
# Feature Engineering Import
|
|
from forecasting_model import engineer_features, load_raw_data
|
|
|
|
from typing import Dict, Any, Optional, Type
|
|
|
|
# Silence overly verbose libraries if needed
|
|
mpl_logger = logging.getLogger('matplotlib')
|
|
mpl_logger.setLevel(logging.WARNING)
|
|
pil_logger = logging.getLogger('PIL')
|
|
pil_logger.setLevel(logging.WARNING)
|
|
pl_logger = logging.getLogger('pytorch_lightning')
|
|
pl_logger.setLevel(logging.WARNING)
|
|
|
|
# --- Basic Logging Setup ---
|
|
logging.basicConfig(level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)-7s - %(message)s',
|
|
datefmt='%H:%M:%S')
|
|
# Get the root logger
|
|
logger = logging.getLogger()
|
|
|
|
def load_optimization_config(config_path: str) -> Type[BaseModel] | None:
|
|
"""Loads the main optimization configuration from a YAML file."""
|
|
return load_config(Path(config_path), OptimizationRunConfig)
|
|
|
|
def load_main_forecasting_config(config_path: str) -> Type[BaseModel] | None:
|
|
"""Loads the main forecasting configuration from a YAML file."""
|
|
return load_config(Path(config_path), MainConfig)
|
|
|
|
def check_and_adjust_decisions_given_state(decision_p: float, current_state_b: float, max_capacity_b: float) -> tuple[float, float]:
|
|
"""Clips decision based on battery capacity (0 to max_capacity)."""
|
|
potential_state = current_state_b + decision_p
|
|
new_state = max(0.0, min(potential_state, max_capacity_b))
|
|
valid_decision = new_state - current_state_b
|
|
return valid_decision, new_state
|
|
|
|
# --- Main Execution Logic ---
|
|
if __name__ == "__main__":
|
|
logger.info("Starting battery optimization evaluation using provider-specific engineered features.")
|
|
|
|
# --- Load Main Optimization Config ---
|
|
optimization_config_path = "optim_config.yaml"
|
|
optimization_config = load_optimization_config(optimization_config_path)
|
|
|
|
if optimization_config is None:
|
|
logger.critical("Failed to load main optimization config. Exiting.")
|
|
exit(1)
|
|
|
|
if not optimization_config.models:
|
|
logger.critical("No models or ensembles specified in optimization config. Exiting.")
|
|
exit(1)
|
|
|
|
# --- Load Original Historical Data (Get path from the first specified model config) ---
|
|
try:
|
|
first_model_config_path_str = optimization_config.models[0].model_config_path
|
|
main_forecasting_config_for_data = load_main_forecasting_config(first_model_config_path_str)
|
|
if main_forecasting_config_for_data is None:
|
|
raise ValueError(f"Failed to load forecasting config ({first_model_config_path_str}) to get data path.")
|
|
|
|
historical_data_config = main_forecasting_config_for_data.data
|
|
target_col = historical_data_config.target_col # Assume consistent target for now
|
|
|
|
logger.info(f"Loading original historical data from: {historical_data_config.data_path}")
|
|
full_historical_df = load_raw_data(historical_data_config) # load_raw_data handles initial cleaning
|
|
|
|
if full_historical_df.empty or target_col not in full_historical_df.columns:
|
|
raise ValueError(f"Loaded original historical data is empty or missing target column '{target_col}'.")
|
|
|
|
# Ensure DatetimeIndex and Frequency (load_raw_data should ideally handle this)
|
|
if not isinstance(full_historical_df.index, pd.DatetimeIndex):
|
|
raise TypeError("Loaded historical data must have a DatetimeIndex.")
|
|
# Attempt to set frequency if not already set and expected_frequency is defined
|
|
if full_historical_df.index.freq is None and historical_data_config.expected_frequency:
|
|
logger.warning(f"Data index frequency not set, attempting to set to '{historical_data_config.expected_frequency}'.")
|
|
try:
|
|
full_historical_df = full_historical_df.asfreq(historical_data_config.expected_frequency)
|
|
if full_historical_df[target_col].isnull().any():
|
|
logger.warning(f"NaNs found after setting frequency. Applying ffill().bfill() to '{target_col}'.")
|
|
full_historical_df[target_col] = full_historical_df[target_col].ffill().bfill()
|
|
if full_historical_df[target_col].isnull().any():
|
|
raise ValueError(f"NaNs remain after filling '{target_col}' post-asfreq.")
|
|
except Exception as e:
|
|
logger.error(f"Failed to set frequency to {historical_data_config.expected_frequency}: {e}", exc_info=True)
|
|
# Decide whether to proceed or exit based on importance of frequency
|
|
# For now, proceed but log warning. Providers might fail later if they assume frequency.
|
|
logger.warning("Proceeding without guaranteed index frequency.")
|
|
|
|
logger.info(f"Original historical data loaded and prepared. Shape: {full_historical_df.shape}, Time range: {full_historical_df.index.min()} to {full_historical_df.index.max()}, Freq: {full_historical_df.index.freq}")
|
|
|
|
except Exception as e:
|
|
logger.critical(f"Failed during initial data loading or preparation: {e}", exc_info=True)
|
|
exit(1)
|
|
|
|
|
|
# --- Load Models, Engineer Features per Provider, Instantiate Providers ---
|
|
providers_data: Dict[str, Dict[str, Any]] = {} # Stores {'provider_name': {'provider': instance, 'df': engineered_df, 'first_valid_forecast_time': timestamp}}
|
|
|
|
for model_eval_config in optimization_config.models:
|
|
provider_name = model_eval_config.name
|
|
artifact_type = model_eval_config.type
|
|
artifact_path = Path(model_eval_config.model_path)
|
|
model_config_path = Path(model_eval_config.model_config_path)
|
|
logger.info(f"--- Processing Provider: {provider_name} ({artifact_type}) ---")
|
|
|
|
provider_instance: Optional[ForecastProvider] = None
|
|
provider_feature_config: Optional[FeatureConfig] = None # Will hold the config needed for engineering
|
|
provider_target_scaler: Optional[Any] = None
|
|
provider_data_scaler: Optional[Any] = None # Added to store the data scaler
|
|
|
|
try:
|
|
if artifact_type == 'model':
|
|
logger.info(f"Loading single model artifact: {provider_name}")
|
|
target_scaler_path = Path(model_eval_config.target_scaler_path) if model_eval_config.target_scaler_path else None
|
|
data_scaler_path = Path(model_eval_config.data_scaler_path) if model_eval_config.data_scaler_path else None
|
|
input_size_path = Path(model_eval_config.input_size_path) if model_eval_config.input_size_path else None
|
|
|
|
loaded_artifact_info = load_single_model_artifact(
|
|
model_path=artifact_path,
|
|
config_path=model_config_path,
|
|
input_size_path=input_size_path,
|
|
target_scaler_path=target_scaler_path,
|
|
data_scaler_path=data_scaler_path
|
|
)
|
|
|
|
if not loaded_artifact_info:
|
|
raise ValueError("load_single_model_artifact returned None.")
|
|
|
|
# Check target column consistency
|
|
current_main_config = loaded_artifact_info['main_forecasting_config']
|
|
if current_main_config.data.target_col != target_col:
|
|
raise ValueError(f"Target column mismatch. Expected '{target_col}', found '{current_main_config.data.target_col}'.")
|
|
|
|
provider_feature_config = loaded_artifact_info['feature_config'] # Get the config directly
|
|
provider_target_scaler = loaded_artifact_info['target_scaler'] # Get the fitted target scaler
|
|
provider_data_scaler = loaded_artifact_info['data_scaler'] # Get the fitted data scaler
|
|
|
|
if provider_data_scaler is None:
|
|
logger.warning(f"Data scaler not found or loaded for model '{provider_name}'. Prediction input scaling will be skipped in the provider.")
|
|
# Or raise ValueError if data scaler is mandatory
|
|
|
|
# Instantiate Provider
|
|
provider_instance = SingleModelProvider(
|
|
model_instance=loaded_artifact_info['model_instance'],
|
|
feature_config=provider_feature_config, # Pass config for seq_len, horizons
|
|
target_scaler=provider_target_scaler, # Pass the loaded target scaler
|
|
data_scaler=provider_data_scaler
|
|
)
|
|
# Basic validation
|
|
if 1 not in provider_instance.get_forecast_horizons():
|
|
raise ValueError(f"Model must forecast horizon 1 for interpolation anchor. Horizons: {provider_instance.get_forecast_horizons()}")
|
|
|
|
elif artifact_type == 'ensemble':
|
|
logger.info(f"Loading ensemble artifact: {provider_name}")
|
|
# Assume ensemble definition json is in the HPO output base dir
|
|
hpo_base_output_dir_for_ensemble = artifact_path.parent
|
|
|
|
loaded_artifact_info = load_ensemble_artifact(
|
|
ensemble_definition_path=artifact_path,
|
|
hpo_base_output_dir=hpo_base_output_dir_for_ensemble
|
|
)
|
|
|
|
if not loaded_artifact_info or not loaded_artifact_info.get('fold_artifacts'):
|
|
raise ValueError("load_ensemble_artifact returned None or no fold artifacts.")
|
|
|
|
# Get the feature config from the first fold (assuming all are identical)
|
|
if not loaded_artifact_info['fold_artifacts'][0].get('feature_config'):
|
|
raise ValueError(f"First fold artifact for ensemble '{provider_name}' is missing 'feature_config'.")
|
|
provider_feature_config = loaded_artifact_info['fold_artifacts'][0]['feature_config']
|
|
logger.debug(f"Using feature config from first fold for ensemble '{provider_name}' engineering.")
|
|
|
|
# Instantiate Provider (passes individual fold artifacts internally)
|
|
provider_instance = EnsembleProvider(
|
|
fold_artifacts=loaded_artifact_info['fold_artifacts'],
|
|
ensemble_method=loaded_artifact_info['ensemble_method'],
|
|
)
|
|
|
|
# Basic validation (check if any fold forecasts horizon 1)
|
|
can_forecast_h1 = 1 in provider_instance.get_forecast_horizons()
|
|
if not can_forecast_h1:
|
|
raise ValueError("Ensemble has no folds that forecast horizon 1. Cannot use for interpolation anchor.")
|
|
|
|
else:
|
|
raise ValueError(f"Unknown artifact type '{artifact_type}'.")
|
|
|
|
# --- Feature Engineering for this Provider ---
|
|
logger.info(f"Engineering features specifically for '{provider_name}' using its required configuration.")
|
|
if provider_feature_config is None:
|
|
raise RuntimeError(f"Could not determine feature config for {provider_name}")
|
|
|
|
engineered_df_provider = engineer_features(
|
|
full_historical_df.copy(), # Use copy of original data
|
|
target_col=target_col,
|
|
feature_config=provider_feature_config # Use the relevant config directly
|
|
)
|
|
|
|
# Handle NaNs introduced by feature engineering - crucial step !!!
|
|
initial_nan_count = engineered_df_provider.isnull().sum().sum()
|
|
first_valid_index_dt = engineered_df_provider.first_valid_index()
|
|
|
|
if pd.isna(first_valid_index_dt):
|
|
raise ValueError(f"Engineered DataFrame for '{provider_name}' contains only NaNs.")
|
|
|
|
logger.info(f"Feature engineering for '{provider_name}' complete. Shape: {engineered_df_provider.shape}. First valid index: {first_valid_index_dt}")
|
|
if initial_nan_count > 0:
|
|
logger.debug(f"Found {initial_nan_count} NaN values initially (expected due to lookback).")
|
|
|
|
# --- Determine First Valid Forecast Time ---
|
|
provider_seq_len = provider_instance.get_required_lookback()
|
|
try:
|
|
first_valid_loc = engineered_df_provider.index.get_loc(first_valid_index_dt)
|
|
first_possible_end_loc = first_valid_loc + provider_seq_len - 1
|
|
if first_possible_end_loc >= len(engineered_df_provider.index):
|
|
raise ValueError(f"Not enough data points ({len(engineered_df_provider.index)} starting {first_valid_index_dt}) to form sequence length {provider_seq_len}.")
|
|
first_valid_forecast_time = engineered_df_provider.index[first_possible_end_loc]
|
|
logger.info(f"First possible forecast time (t=0) for '{provider_name}': {first_valid_forecast_time} (needs seq_len={provider_seq_len})")
|
|
except KeyError:
|
|
raise ValueError(f"Consistency error: first_valid_index {first_valid_index_dt} not found in engineered DF index.")
|
|
except IndexError:
|
|
raise ValueError(f"Indexing error while determining first valid forecast time for {provider_name}.")
|
|
|
|
|
|
# Store provider instance, its dedicated DF, and its first valid time
|
|
providers_data[provider_name] = {
|
|
'provider': provider_instance,
|
|
'df': engineered_df_provider,
|
|
'first_valid_forecast_time': first_valid_forecast_time
|
|
}
|
|
logger.info(f"Successfully processed and prepared provider '{provider_name}'.")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to process or prepare provider '{provider_name}': {e}. Skipping this provider.", exc_info=True)
|
|
if provider_name in providers_data:
|
|
del providers_data[provider_name]
|
|
continue # Skip to the next provider in the config
|
|
|
|
# --- End Loading/Preparing Providers ---
|
|
|
|
if not providers_data:
|
|
logger.critical("No forecast providers were successfully created. Cannot proceed. Exiting.")
|
|
exit(1)
|
|
|
|
successfully_loaded_provider_names = list(providers_data.keys())
|
|
logger.info(f"Successfully prepared {len(successfully_loaded_provider_names)} providers: {successfully_loaded_provider_names}")
|
|
|
|
|
|
# --- Determine Overall Evaluation Window based on Timestamps ---
|
|
optimization_horizon_hours = optimization_config.optimization_horizon_hours
|
|
# step_size_hours = 1 # Assuming hourly steps for the sliding window, is fixed for our case.
|
|
|
|
# Find the latest "first valid forecast time" across all providers
|
|
# This determines the earliest time we can start the evaluation for *all* providers
|
|
global_first_valid_forecast_time = max(p_data['first_valid_forecast_time'] for p_data in providers_data.values())
|
|
|
|
# The first evaluation window STARTS at the hour *after* this time
|
|
first_window_start_time = global_first_valid_forecast_time + timedelta(hours=1) # Shift by one hour
|
|
|
|
# The last evaluation window START time is such that the window fits within the original historical data
|
|
last_possible_window_end_time = full_historical_df.index.max()
|
|
last_window_start_time = last_possible_window_end_time - timedelta(hours=optimization_horizon_hours - 1)
|
|
|
|
# Create the range of window start times
|
|
evaluation_start_times = pd.date_range(
|
|
start=first_window_start_time,
|
|
end=last_window_start_time,
|
|
freq='1h' # Use frequency string 'h' for hourly
|
|
)
|
|
|
|
if evaluation_start_times.empty:
|
|
logger.critical(f"Not enough data for evaluation window. Global first possible window start: {first_window_start_time}, Last possible window start: {last_window_start_time}. Check data length, lookbacks, sequence lengths, and optimization horizon.")
|
|
exit(1)
|
|
|
|
logger.info(f"Evaluating optimization over windows starting from {evaluation_start_times.min()} to {evaluation_start_times.max()} ({len(evaluation_start_times)} windows).")
|
|
|
|
# --- Evaluation Loop ---
|
|
window_results_list = []
|
|
# Wrap the iterable with idx, tqdm and add a description
|
|
# For debugging: # TODO implement a debug flag!
|
|
# for idx, window_start_time in enumerate(tqdm(evaluation_start_times[-200:], desc="Evaluating Optimization Windows"), start=0):
|
|
for idx, window_start_time in enumerate(tqdm(evaluation_start_times, desc="Evaluating Optimization Windows"), start=0):
|
|
window_end_time = window_start_time + timedelta(hours=optimization_horizon_hours - 1)
|
|
# Changed log level to debug to avoid flooding console when using tqdm
|
|
logger.debug(f"Processing window: {window_start_time.strftime('%Y-%m-%d %H:%M')} to {window_end_time.strftime('%Y-%m-%d %H:%M')}")
|
|
|
|
# Get actual prices for the optimization window from the *original* DF using timestamps
|
|
try:
|
|
actual_prices_in_window = full_historical_df.loc[window_start_time:window_end_time, target_col].tolist()
|
|
if len(actual_prices_in_window) != optimization_horizon_hours:
|
|
logger.warning(f"Skipping window starting {window_start_time}: Actual prices slice length {len(actual_prices_in_window)} != {optimization_horizon_hours}. Check data continuity.")
|
|
continue
|
|
except Exception as e: # Catch potential KeyError or other slicing issues
|
|
logger.warning(f"Skipping window starting {window_start_time}: Could not retrieve actual prices from original DF. Error: {e}")
|
|
continue
|
|
|
|
# --- Determine Forecast Input Point (Timestamp) ---
|
|
# The forecast needs to predict prices for the window starting at window_start_time.
|
|
# The input sequence for the forecast model ends at the timestamp *just before* this window starts.
|
|
# TODO!!!! This might be a major disadvantage! Check this consideration!!!
|
|
forecast_start_time = window_start_time - timedelta(hours=1) # t=0 for the forecast
|
|
|
|
# Get the last actual price (at t=0) needed for interpolation anchor
|
|
try:
|
|
last_actual_price = full_historical_df.loc[forecast_start_time, target_col]
|
|
if pd.isna(last_actual_price):
|
|
logger.warning(f"Last actual price at {forecast_start_time} is NaN. Cannot use as interpolation anchor. Skipping window.")
|
|
continue
|
|
except KeyError:
|
|
logger.error(f"Cannot get last actual price at timestamp {forecast_start_time} from original DataFrame. Skipping window.")
|
|
continue
|
|
except Exception as e:
|
|
logger.error(f"Error getting last actual price at {forecast_start_time}: {e}. Skipping window.")
|
|
continue
|
|
|
|
# --- Collect Window Results ---
|
|
window_results = {
|
|
'start_time': window_start_time,
|
|
'end_time': window_end_time,
|
|
'actual_prices': actual_prices_in_window
|
|
}
|
|
|
|
# --- Baseline Optimization (uses actual prices) ---
|
|
baseline_prices_input = np.array(actual_prices_in_window)
|
|
logger.debug(f"Running baseline optimization for window starting {window_start_time}")
|
|
# Set the "initial" battery state to the last battery state from the previous window
|
|
if idx:
|
|
# The battery state of the current window is the state of B_t_1 of the last window
|
|
# (since we only took, the initial optimal suggested power buy/sell
|
|
# Same goes for profit/cost
|
|
prev_B_state = window_results_list[-1]['baseline']['B_state']
|
|
prev_acc_profit = window_results_list[-1]['baseline']['acc_profit']
|
|
else:
|
|
prev_B_state = optimization_config.initial_b
|
|
prev_acc_profit = 0
|
|
|
|
try:
|
|
baseline_status, baseline_profit, baseline_P, baseline_B = solve_battery_optimization_hourly(
|
|
baseline_prices_input, prev_B_state, optimization_config.max_capacity, optimization_config.max_rate
|
|
)
|
|
# Check decision and only really buy if battery state allows for it!
|
|
decision = baseline_P[0]
|
|
allowed_p_decision, resulting_b_state = check_and_adjust_decisions_given_state(
|
|
decision, prev_B_state, optimization_config.max_capacity
|
|
)
|
|
# We must invert the sign because buy (=charge) means spent (=-money)!! (and the other way round)
|
|
acc_profit = prev_acc_profit + (-1 * allowed_p_decision * baseline_prices_input[0])
|
|
|
|
window_results['baseline'] = { "status": baseline_status,
|
|
"P_schedule": baseline_P.tolist() if baseline_P is not None else None,
|
|
"B_schedule": baseline_B.tolist() if baseline_B is not None else None,
|
|
"P_decision": allowed_p_decision,
|
|
"B_state": resulting_b_state,
|
|
"acc_profit": acc_profit}
|
|
|
|
logger.debug(f"Baseline profit: {baseline_profit if baseline_profit is not None else 'N/A'}")
|
|
except Exception as e:
|
|
logger.error(f"Baseline optimization failed for window {window_start_time}: {e}", exc_info=True)
|
|
window_results['baseline'] = {"status": "Error", "P_schedule": None, "P_decision": None,
|
|
"B_schedule": None, "B_state": None, "acc_profit": None}
|
|
|
|
|
|
# --- Forecast Provider Optimizations ---
|
|
for provider_name in successfully_loaded_provider_names:
|
|
provider_data = providers_data[provider_name]
|
|
provider_instance = provider_data['provider']
|
|
engineered_df_provider = provider_data['df'] # Get the dedicated *unscaled* DF
|
|
|
|
logger.debug(f"Running forecast and optimization for provider '{provider_name}'")
|
|
|
|
# Generate forecast using the new method signature with dedicated DF and timestamp
|
|
try:
|
|
forecast_prices_input = provider_instance.get_forecast(
|
|
engineered_df=engineered_df_provider, # Pass the provider's specific UNscaled DF
|
|
forecast_start_time=forecast_start_time, # Pass the t=0 timestamp
|
|
optimization_horizon_hours=optimization_horizon_hours,
|
|
last_actual_price=last_actual_price
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error calling get_forecast for provider '{provider_name}' window {window_start_time}: {e}", exc_info=True)
|
|
forecast_prices_input = None
|
|
|
|
|
|
if forecast_prices_input is None:
|
|
logger.warning(f"Forecast generation failed for provider '{provider_name}' window {window_start_time}. Skipping optimization.")
|
|
window_results[provider_name] = {"status": "Forecast Generation Failed", "P_schedule": None,
|
|
"B_schedule": None, "P_decision": None, "B_state": None, "acc_profit": None}
|
|
continue
|
|
|
|
if not isinstance(forecast_prices_input, np.ndarray) or forecast_prices_input.shape != (optimization_horizon_hours,):
|
|
logger.error(f"Forecast from provider '{provider_name}' has incorrect shape {forecast_prices_input.shape} (expected {(optimization_horizon_hours,)}). Skipping optimization.")
|
|
window_results[provider_name] = {"status": "Invalid Forecast Format", "P_schedule": None,
|
|
"B_schedule": None, "P_decision": None, "B_state": None, "acc_profit": None}
|
|
continue
|
|
|
|
# --- Run Optimization with Forecast Prices ---
|
|
if idx:
|
|
# See above
|
|
prev_B_state_model = window_results_list[-1][provider_name]['B_state']
|
|
prev_acc_profit_model = window_results_list[-1][provider_name]['acc_profit']
|
|
else:
|
|
prev_B_state_model = optimization_config.initial_b
|
|
prev_acc_profit_model = 0
|
|
try:
|
|
model_status, optim_model_profit, model_P, model_B = solve_battery_optimization_hourly(
|
|
forecast_prices_input, prev_B_state_model, optimization_config.max_capacity, optimization_config.max_rate
|
|
)
|
|
# Important, get the price at real_t_0 (=window_start_time), do not calc the profit using the forecast!
|
|
real_price_at_t0 = engineered_df_provider.iloc[engineered_df_provider.index.get_loc(window_start_time)][target_col]
|
|
if real_price_at_t0 != baseline_prices_input[0]:
|
|
logger.warning(f"Baseline {baseline_prices_input[0]} and {provider_name} {real_price_at_t0} should buy for the same price but didn't.")
|
|
|
|
# Check decision and only really buy if battery state allows for it!
|
|
model_decision = model_P[0]
|
|
allowed_p_decision_model, resulting_b_state_model = check_and_adjust_decisions_given_state(
|
|
model_decision, prev_B_state_model, optimization_config.max_capacity
|
|
)
|
|
acc_profit_model = prev_acc_profit_model + (-1 * allowed_p_decision_model * real_price_at_t0)
|
|
|
|
window_results[provider_name] = { "status": model_status,
|
|
"P_schedule": model_P.tolist() if model_P is not None else None,
|
|
"B_schedule": model_B.tolist() if model_B is not None else None,
|
|
"P_decision": allowed_p_decision_model,
|
|
"B_state": resulting_b_state_model,
|
|
"acc_profit": acc_profit_model}
|
|
logger.debug(f"Provider '{provider_name}' profit: {optim_model_profit if optim_model_profit is not None else 'N/A'}")
|
|
except Exception as e:
|
|
logger.error(f"Optimization failed for provider '{provider_name}' window {window_start_time}: {e}", exc_info=True)
|
|
window_results[provider_name] = {"status": "Optimization Error", "P_schedule": None,
|
|
"B_schedule": None, "P_decision": None, "B_state": None, "acc_profit": None}
|
|
|
|
# Append results for this window
|
|
window_results_list.append(window_results)
|
|
logger.debug(f"Finished processing window starting at: {window_start_time.strftime('%Y-%m-%d %H:%M')}")
|
|
|
|
logger.info(f"Finished processing all {len(evaluation_start_times)} evaluation windows.")
|
|
|
|
|
|
# --- Post-processing and Plotting ---
|
|
logger.info("Starting results analysis and plotting.")
|
|
|
|
if not window_results_list:
|
|
logger.warning("No window results were collected. Skipping analysis and plotting.")
|
|
exit(0)
|
|
|
|
# Convert results list to a DataFrame
|
|
flat_results = []
|
|
for window_res in window_results_list:
|
|
base_info = { 'start_time': window_res['start_time'], 'end_time': window_res['end_time'], }
|
|
# Add baseline results
|
|
# Ensure schedules are lists, even if None (for consistent schema)
|
|
baseline_res = window_res.get('baseline', {})
|
|
baseline_res['P_schedule'] = baseline_res.get('P_schedule') if baseline_res.get('P_schedule') is not None else []
|
|
baseline_res['B_schedule'] = baseline_res.get('B_schedule') if baseline_res.get('B_schedule') is not None else []
|
|
flat_results.append({**base_info, 'type': 'baseline', **baseline_res})
|
|
# Add provider results
|
|
for provider_name in successfully_loaded_provider_names:
|
|
provider_res = window_res.get(provider_name, {"status": "Missing Result", "acc_profit": np.nan, "P_schedule": [], "B_schedule": []})
|
|
provider_res['P_schedule'] = provider_res.get('P_schedule') if provider_res.get('P_schedule') is not None else []
|
|
provider_res['B_schedule'] = provider_res.get('B_schedule') if provider_res.get('B_schedule') is not None else []
|
|
flat_results.append({**base_info, 'type': provider_name, **provider_res})
|
|
|
|
results_df = pd.DataFrame(flat_results)
|
|
results_df['start_time'] = pd.to_datetime(results_df['start_time'])
|
|
results_df.set_index('start_time', inplace=True) # Set time index
|
|
|
|
# Sort by index and type to ensure correct diff calculation
|
|
results_df.sort_index(inplace=True)
|
|
|
|
# Calculate Hourly Profit (difference in cumulative profit per type)
|
|
# Important: Group by 'type' before diffing to avoid comparing different types
|
|
results_df['hourly_profit'] = results_df.groupby('type')['acc_profit'].diff()
|
|
# Fill the first NaN value for each group with the first acc_profit value
|
|
results_df['hourly_profit'] = results_df['hourly_profit'].fillna(results_df['acc_profit'])
|
|
|
|
|
|
# --- Plotting ---
|
|
logger.info("Generating plots.")
|
|
output_dir = Path(optimization_config.output_dir) if optimization_config.output_dir else Path(".")
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
logger.info(f"Saving plots to: {output_dir.resolve()}")
|
|
|
|
|
|
# Plot 1: Hourly Profit Over Time
|
|
hourly_profit_pivot = results_df.pivot_table(index=results_df.index, columns='type', values='hourly_profit')
|
|
if not hourly_profit_pivot.empty and not hourly_profit_pivot.isnull().all().all():
|
|
fig1, ax = plt.subplots(figsize=(15, 7))
|
|
plot_data = hourly_profit_pivot.dropna(axis=1, how='all')
|
|
if not plot_data.empty:
|
|
sns.lineplot(data=plot_data, ax=ax, dashes=False) # Ensure solid lines for clarity
|
|
ax.set_xlabel('Time (Window Start)')
|
|
ax.set_ylabel('Hourly Profit (€)')
|
|
ax.set_title('Rolling Horizon: Hourly Profit Comparison')
|
|
ax.legend(title='Strategy')
|
|
ax.axhline(0, color='grey', linestyle='--', linewidth=0.8) # Add zero line
|
|
plt.grid()
|
|
plt.tight_layout()
|
|
plot_path = output_dir / "hourly_profit_rolling.png"
|
|
plt.savefig(plot_path)
|
|
logger.info(f"Hourly Profit plot saved to {plot_path}")
|
|
plt.close(fig1)
|
|
else:
|
|
logger.warning("Hourly profit data is all NaN after filtering. Skipping Hourly Profit plot.")
|
|
else:
|
|
logger.warning("No valid data available to plot Hourly Profit.")
|
|
|
|
|
|
# Plot 2: Cumulative Profit Over Time
|
|
cumulative_profit_pivot = results_df.pivot_table(index=results_df.index, columns='type', values='acc_profit')
|
|
if not cumulative_profit_pivot.empty and not cumulative_profit_pivot.isnull().all().all():
|
|
fig2, ax = plt.subplots(figsize=(15, 7))
|
|
plot_data = cumulative_profit_pivot.dropna(axis=1, how='all')
|
|
if not plot_data.empty:
|
|
sns.lineplot(data=plot_data, ax=ax, dashes=False)
|
|
ax.set_xlabel('Time (Window Start)')
|
|
ax.set_ylabel('Cumulative Profit (€)')
|
|
ax.set_title('Rolling Horizon: Cumulative Profit Comparison')
|
|
ax.legend(title='Strategy')
|
|
plt.tight_layout()
|
|
plot_path = output_dir / "cumulative_profit_rolling.png"
|
|
plt.savefig(plot_path)
|
|
logger.info(f"Cumulative Profit plot saved to {plot_path}")
|
|
plt.close(fig2)
|
|
else:
|
|
logger.warning("Cumulative profit data is all NaN after filtering. Skipping Cumulative Profit plot.")
|
|
else:
|
|
logger.warning("No valid data available to plot Cumulative Profit.")
|
|
|
|
|
|
# Plot 3: Example Window Schedule (Calculated Schedule vs Actual Price)
|
|
# Select the first valid window's results
|
|
if not results_df.empty:
|
|
example_window_start_time = results_df.index.unique()[0]
|
|
example_window_data = results_df.loc[[example_window_start_time]]
|
|
# Find the corresponding entry in the original list to get actual prices easily
|
|
example_window_details = next((item for item in window_results_list if item['start_time'] == example_window_start_time), None)
|
|
actual_prices_example_window = example_window_details.get('actual_prices') if example_window_details else None
|
|
|
|
if actual_prices_example_window and not example_window_data.empty and len(actual_prices_example_window) == optimization_config.optimization_horizon_hours:
|
|
fig3, ax1 = plt.subplots(figsize=(15, 7))
|
|
ax2 = ax1.twinx() # for prices
|
|
|
|
hours = np.arange(optimization_config.optimization_horizon_hours)
|
|
plot_styles = {'baseline': 'r--', **{name: next(ax1._get_lines.prop_cycler)['color'] + '-' for name in successfully_loaded_provider_names}}
|
|
|
|
for provider_type in ['baseline'] + successfully_loaded_provider_names:
|
|
provider_row = example_window_data[example_window_data['type'] == provider_type]
|
|
if not provider_row.empty:
|
|
# Get the *calculated* full schedule for this window
|
|
calculated_p = provider_row['P_schedule'].iloc[0]
|
|
# Get the single decision that was *executed*
|
|
executed_p_first_hour = provider_row['P_decision'].iloc[0]
|
|
|
|
if isinstance(calculated_p, list) and len(calculated_p) == optimization_config.optimization_horizon_hours:
|
|
color, style = plot_styles[provider_type][:-1], plot_styles[provider_type][-1]
|
|
# Plot the full calculated schedule
|
|
ax1.plot(hours, calculated_p, label=f'{provider_type} Calculated P Schedule', color=color, linestyle=':', marker='.') # Dotted for calculated
|
|
# Highlight the executed first step
|
|
if executed_p_first_hour is not None:
|
|
ax1.scatter([0], [executed_p_first_hour], label=f'{provider_type} Executed P (Hour 0)', color=color, marker='X', s=100, zorder=5) # Large 'X' for executed
|
|
|
|
# Plot actual prices for the window
|
|
ax2.plot(hours, actual_prices_example_window, label='Actual Price in Window', color='grey', linestyle='-', marker='o', markersize=4)
|
|
ax2.set_ylabel('Price (€/MWh)', color='grey')
|
|
ax2.tick_params(axis='y', labelcolor='grey')
|
|
|
|
ax1.set_xlabel('Hour within Window')
|
|
ax1.set_ylabel('Power (MW)')
|
|
ax1.set_title(f'Rolling Horizon: Example Window Schedule ({example_window_start_time.strftime("%Y-%m-%d %H:%M")})')
|
|
ax1.axhline(0, color='black', linestyle='-', linewidth=0.5) # Zero power line
|
|
# Combine legends
|
|
lines, labels = ax1.get_legend_handles_labels()
|
|
lines2, labels2 = ax2.get_legend_handles_labels()
|
|
# Custom legend handling to avoid duplicate labels if many providers
|
|
unique_labels = {}
|
|
for line, label in zip(lines + lines2, labels + labels2):
|
|
if label not in unique_labels:
|
|
unique_labels[label] = line
|
|
ax1.legend(unique_labels.values(), unique_labels.keys(), loc='best')
|
|
|
|
plt.xticks(hours) # Show all hours
|
|
plt.grid(True, axis='x', linestyle=':', linewidth=0.5)
|
|
plt.tight_layout()
|
|
plot_path = output_dir / "example_window_schedule_rolling.png"
|
|
plt.savefig(plot_path)
|
|
logger.info(f"Example Window Schedule plot saved to {plot_path}")
|
|
plt.close(fig3)
|
|
else:
|
|
logger.warning("Could not generate example window schedule plot (data missing or incorrect length).")
|
|
else:
|
|
logger.warning("Results DataFrame is empty, cannot generate example window plot.")
|
|
|
|
|
|
# --- Save Results DataFrame (Optional) ---
|
|
try:
|
|
results_save_path = output_dir / "optimization_results_rolling.csv"
|
|
# Reset index to include start_time as a column
|
|
results_df_to_save = results_df.reset_index()
|
|
# Convert list columns (schedules) to strings for CSV compatibility AFTER plotting
|
|
for col in ['P_schedule', 'B_schedule']:
|
|
if col in results_df_to_save.columns:
|
|
results_df_to_save[col] = results_df_to_save[col].apply(lambda x: str(x) if isinstance(x, list) else x)
|
|
|
|
results_df_to_save.to_csv(results_save_path, index=False)
|
|
logger.info(f"Saved detailed rolling horizon results DataFrame to {results_save_path}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to save rolling horizon results DataFrame: {e}", exc_info=True)
|
|
|
|
|
|
logger.info("Evaluation and plotting completed.")
|