# entrix_case_challange/optim_run_daa.py
import pandas as pd
import numpy as np
import logging
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from datetime import timedelta
from pydantic import BaseModel
from tqdm import tqdm
# Import Forecasting Providers
from forecasting_model.utils.helper import load_config
from optimizer.forecasting.base import ForecastProvider
from optimizer.forecasting.single_model import SingleModelProvider
from optimizer.forecasting.ensemble import EnsembleProvider
from optimizer.optimization.battery import solve_battery_optimization_hourly
from optimizer.utils.optimizer_config_model import OptimizationRunConfig
from forecasting_model.utils.forecast_config_model import MainConfig, FeatureConfig #, DataConfig
# Import the loading functions
from optimizer.utils.model_io import load_single_model_artifact, load_ensemble_artifact
# Feature Engineering Import
from forecasting_model import engineer_features, load_raw_data
from typing import Dict, Any, Optional, Type, List, Tuple
# Silence overly verbose libraries if needed
mpl_logger = logging.getLogger('matplotlib')
mpl_logger.setLevel(logging.WARNING)
pil_logger = logging.getLogger('PIL')
pil_logger.setLevel(logging.WARNING)
pl_logger = logging.getLogger('pytorch_lightning')
pl_logger.setLevel(logging.WARNING)
# --- Basic Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)-7s - %(message)s',
    datefmt='%H:%M:%S'
)
# Get the root logger
logger = logging.getLogger()
def load_optimization_config(config_path: str) -> OptimizationRunConfig | None:
    """Loads the main optimization run configuration from a YAML file."""
    return load_config(Path(config_path), OptimizationRunConfig)
def load_main_forecasting_config(config_path: str) -> MainConfig | None:
    """Loads the main forecasting configuration from a YAML file."""
    return load_config(Path(config_path), MainConfig)
def check_and_adjust_decisions_given_state(decision_p: float, current_state_b: float, max_capacity_b: float) -> tuple[float, float]:
    """Clips a power decision so the resulting battery state stays within [0, max_capacity_b]."""
    potential_state = current_state_b + decision_p
    new_state = max(0.0, min(potential_state, max_capacity_b))
    valid_decision = new_state - current_state_b
    return valid_decision, new_state
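
# Worked example for the clipping above (hypothetical numbers): with
# current_state_b=9.0 and max_capacity_b=10.0, a planned charge of +2.0 is
# clipped to +1.0 (new state 10.0), while a planned discharge of -12.0 is
# clipped to -9.0 (new state 0.0).
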
def simulate_daily_schedule(
    planned_p_schedule: np.ndarray,
    actual_prices_day: np.ndarray,
    initial_b_state: float,
    max_capacity: float,
    max_rate: float,  # Not used for clipping here; the optimizer's output already respects the rate limit.
) -> Tuple[float, float, List[float], List[float]]:
    """
    Simulates the execution of a planned 24h schedule against actual prices.

    Args:
        planned_p_schedule: Array of 24 planned power decisions (charge positive).
        actual_prices_day: Array of 24 actual prices for the day.
        initial_b_state: Battery state at the beginning of the day (00:00).
        max_capacity: Maximum battery capacity.
        max_rate: Maximum charge/discharge rate (implicitly handled by the optimizer's output).

    Returns:
        Tuple containing:
            - total_daily_profit: The profit realized over the 24 hours.
            - final_b_state_day_end: Battery state at the end of the day (after the 23:00 action).
            - executed_p_hourly: List of the actual power decisions made each hour.
            - actual_b_hourly: List of the battery state *before* each hourly action.
    """
    if len(planned_p_schedule) != 24 or len(actual_prices_day) != 24:
        raise ValueError(
            f"Inputs must have length 24. Got schedule: {len(planned_p_schedule)}, "
            f"prices: {len(actual_prices_day)}"
        )

    current_b_state = initial_b_state
    total_daily_profit = 0.0
    executed_p_hourly = []
    actual_b_hourly = []  # Battery state at the beginning of each hour

    for h in range(24):
        planned_p_decision = planned_p_schedule[h]
        actual_price_hour = actual_prices_day[h]
        actual_b_hourly.append(current_b_state)  # Store state *before* the action

        # Clip the planned decision to what the battery can physically execute.
        executed_p_decision, next_b_state = check_and_adjust_decisions_given_state(
            planned_p_decision, current_b_state, max_capacity
        )

        # Settle on the *planned* volume: day-ahead auction commitments are binding,
        # so the full bid is paid for (buy, +P) or paid out (sell, -P) at the actual
        # price even when the battery cannot physically execute all of it. Only the
        # executed (clipped) volume moves the battery state.
        hourly_profit = -1 * planned_p_decision * actual_price_hour
        total_daily_profit += hourly_profit

        # Store the executed action and update the state for the next hour.
        executed_p_hourly.append(executed_p_decision)
        current_b_state = next_b_state

    # current_b_state now holds the battery state after the last hour's (23:00) action.
    return total_daily_profit, current_b_state, executed_p_hourly, actual_b_hourly
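
# Usage sketch (hypothetical numbers): a do-nothing schedule against flat prices
# yields zero profit and leaves the battery state untouched:
#     profit, b_end, p_exec, b_hourly = simulate_daily_schedule(
#         np.zeros(24), np.full(24, 50.0),
#         initial_b_state=0.0, max_capacity=10.0, max_rate=5.0)
#     # -> profit == 0.0, b_end == 0.0
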
# --- Main Execution Logic ---
if __name__ == "__main__":
    logger.info("Starting DAA battery optimization evaluation using provider-specific engineered features.")

    # --- Load Main Optimization Config ---
    optimization_config_path = "optim_config.yaml"
    optimization_config = load_optimization_config(optimization_config_path)
    if optimization_config is None:
        logger.critical("Failed to load main optimization config. Exiting.")
        exit(1)
    if not optimization_config.models:
        logger.critical("No models or ensembles specified in optimization config. Exiting.")
        exit(1)

    # Fixed DAA horizon: one planned decision per delivery hour of the target day.
    OPTIMIZATION_HORIZON_HOURS = 24
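
    # Context note: in European day-ahead auctions (e.g. EPEX SPOT), bids for all
    # 24 delivery hours of day D+1 are typically submitted on day D before gate
    # closure around noon, which is why the horizon is fixed at 24 hours here.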
    # --- Load Original Historical Data ---
    try:
        first_model_config_path_str = optimization_config.models[0].model_config_path
        main_forecasting_config_for_data = load_main_forecasting_config(first_model_config_path_str)
        if main_forecasting_config_for_data is None:
            raise ValueError(f"Failed to load forecasting config ({first_model_config_path_str}) to get data path.")
        historical_data_config = main_forecasting_config_for_data.data
        target_col = historical_data_config.target_col  # Assume a consistent target column across providers for now
        logger.info(f"Loading original historical data from: {historical_data_config.data_path}")
        full_historical_df = load_raw_data(historical_data_config)  # load_raw_data handles initial cleaning

        if full_historical_df.empty or target_col not in full_historical_df.columns:
            raise ValueError(f"Loaded original historical data is empty or missing target column '{target_col}'.")
        if not isinstance(full_historical_df.index, pd.DatetimeIndex):
            raise TypeError("Loaded historical data must have a DatetimeIndex.")

        if full_historical_df.index.freq is None and historical_data_config.expected_frequency:
            logger.warning(f"Data index frequency not set, attempting to set to '{historical_data_config.expected_frequency}'.")
            try:
                # asfreq inserts NaN rows at any missing timestamps; fill them from neighbours.
                full_historical_df = full_historical_df.asfreq(historical_data_config.expected_frequency)
                if full_historical_df[target_col].isnull().any():
                    logger.warning(f"NaNs found after setting frequency. Applying ffill().bfill() to '{target_col}'.")
                    full_historical_df[target_col] = full_historical_df[target_col].ffill().bfill()
                    if full_historical_df[target_col].isnull().any():
                        raise ValueError(f"NaNs remain after filling '{target_col}' post-asfreq.")
            except Exception as e:
                logger.error(f"Failed to set frequency to {historical_data_config.expected_frequency}: {e}", exc_info=True)
                logger.warning("Proceeding without guaranteed index frequency.")

        logger.info(
            f"Original historical data loaded. Shape: {full_historical_df.shape}, "
            f"Range: {full_historical_df.index.min()} to {full_historical_df.index.max()}, "
            f"Freq: {full_historical_df.index.freq}"
        )
    except Exception as e:
        logger.critical(f"Failed during initial data loading: {e}", exc_info=True)
        exit(1)
    # --- Load Models, Engineer Features per Provider, Instantiate Providers ---
    # Stores {'provider_name': {'provider': instance, 'df': engineered_df,
    #                           'first_usable_input_timestamp': timestamp}}
    providers_data: Dict[str, Dict[str, Any]] = {}

    for model_eval_config in optimization_config.models:
        provider_name = model_eval_config.name
        artifact_type = model_eval_config.type
        artifact_path = Path(model_eval_config.model_path)
        model_config_path = Path(model_eval_config.model_config_path)
        logger.info(f"--- Processing Provider: {provider_name} ({artifact_type}) ---")

        provider_instance: Optional[ForecastProvider] = None
        provider_feature_config: Optional[FeatureConfig] = None
        provider_target_scaler: Optional[Any] = None
        provider_data_scaler: Optional[Any] = None
        try:
            # --- Load Model/Ensemble Artifact ---
            if artifact_type == 'model':
                logger.info(f"Loading single model artifact: {provider_name}")
                target_scaler_path = Path(model_eval_config.target_scaler_path) if model_eval_config.target_scaler_path else None
                data_scaler_path = Path(model_eval_config.data_scaler_path) if model_eval_config.data_scaler_path else None
                input_size_path = Path(model_eval_config.input_size_path) if model_eval_config.input_size_path else None

                loaded_artifact_info = load_single_model_artifact(
                    model_path=artifact_path,
                    config_path=model_config_path,
                    input_size_path=input_size_path,
                    target_scaler_path=target_scaler_path,
                    data_scaler_path=data_scaler_path
                )
                if not loaded_artifact_info:
                    raise ValueError("load_single_model_artifact returned None.")

                current_main_config = loaded_artifact_info['main_forecasting_config']
                if current_main_config.data.target_col != target_col:
                    raise ValueError(
                        f"Target column mismatch: provider uses '{current_main_config.data.target_col}', "
                        f"expected '{target_col}'."
                    )

                provider_feature_config = loaded_artifact_info['feature_config']
                provider_target_scaler = loaded_artifact_info['target_scaler']
                provider_data_scaler = loaded_artifact_info['data_scaler']
                if provider_data_scaler is None:
                    logger.warning(f"Data scaler not found for '{provider_name}'.")

                provider_instance = SingleModelProvider(
                    model_instance=loaded_artifact_info['model_instance'],
                    feature_config=provider_feature_config,
                    target_scaler=provider_target_scaler,
                    data_scaler=provider_data_scaler
                )
                # Horizon 1 is required so hourly values can be interpolated between forecast points.
                if 1 not in provider_instance.get_forecast_horizons():
                    raise ValueError(f"Model must forecast horizon 1. Horizons: {provider_instance.get_forecast_horizons()}")

            elif artifact_type == 'ensemble':
                logger.info(f"Loading ensemble artifact: {provider_name}")
                hpo_base_output_dir_for_ensemble = artifact_path.parent
                loaded_artifact_info = load_ensemble_artifact(
                    ensemble_definition_path=artifact_path,
                    hpo_base_output_dir=hpo_base_output_dir_for_ensemble
                )
                if not loaded_artifact_info or not loaded_artifact_info.get('fold_artifacts'):
                    raise ValueError("load_ensemble_artifact failed.")
                if not loaded_artifact_info['fold_artifacts'][0].get('feature_config'):
                    raise ValueError("Missing feature_config in ensemble fold.")
                provider_feature_config = loaded_artifact_info['fold_artifacts'][0]['feature_config']

                provider_instance = EnsembleProvider(
                    fold_artifacts=loaded_artifact_info['fold_artifacts'],
                    ensemble_method=loaded_artifact_info['ensemble_method'],
                )
                if 1 not in provider_instance.get_forecast_horizons():
                    raise ValueError("Ensemble has no folds that forecast horizon 1.")
            else:
                raise ValueError(f"Unknown artifact type '{artifact_type}'.")
            # --- Feature Engineering for this Provider ---
            logger.info(f"Engineering features specifically for '{provider_name}'.")
            if provider_feature_config is None:
                raise RuntimeError("Could not determine feature config.")
            engineered_df_provider = engineer_features(
                full_historical_df.copy(),
                target_col=target_col,
                feature_config=provider_feature_config
            )
            first_valid_index_dt = engineered_df_provider.first_valid_index()
            if pd.isna(first_valid_index_dt):
                raise ValueError("Engineered DF contains only NaNs.")
            logger.info(f"Feature engineering for '{provider_name}' complete. First valid index: {first_valid_index_dt}")

            # --- Determine First Timestamp Usable for *Starting* a Forecast ---
            # This is the earliest timestamp at which the model has a full lookback
            # window of valid features; the model uses data *up to and including*
            # this timestamp to predict the following steps.
            provider_seq_len = provider_instance.get_required_lookback()
            try:
                first_valid_loc = engineered_df_provider.index.get_loc(first_valid_index_dt)
                first_possible_input_end_loc = first_valid_loc + provider_seq_len - 1
                if first_possible_input_end_loc >= len(engineered_df_provider.index):
                    raise ValueError(
                        f"Not enough data ({len(engineered_df_provider.index)} rows starting "
                        f"{first_valid_index_dt}) for lookback {provider_seq_len}."
                    )
                # The last timestamp included in the *input* of the *first possible* forecast.
                first_usable_input_timestamp = engineered_df_provider.index[first_possible_input_end_loc]
                logger.info(
                    f"First usable timestamp as input end (t=0) for '{provider_name}': "
                    f"{first_usable_input_timestamp} (needs seq_len={provider_seq_len})"
                )
            except (KeyError, IndexError) as e:
                raise ValueError(f"Error determining first usable input time for {provider_name}: {e}")

            providers_data[provider_name] = {
                'provider': provider_instance,
                'df': engineered_df_provider,
                'first_usable_input_timestamp': first_usable_input_timestamp
            }
            logger.info(f"Successfully processed provider '{provider_name}'.")
        except Exception as e:
            logger.error(f"Failed to process provider '{provider_name}': {e}. Skipping.", exc_info=True)
            providers_data.pop(provider_name, None)
            continue
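
    # Worked example of the lookback arithmetic above (hypothetical numbers): with
    # hourly data, first_valid_loc=0 and seq_len=168 (one week), the first usable
    # input end is index 167, i.e. a forecast can only be issued once 168 valid
    # feature rows are available.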
    # --- End Loading/Preparing Providers ---
    if not providers_data:
        logger.critical("No forecast providers successfully created. Exiting.")
        exit(1)
    successfully_loaded_provider_names = list(providers_data.keys())
    logger.info(f"Successfully prepared {len(successfully_loaded_provider_names)} providers: {successfully_loaded_provider_names}")
    # --- Determine Simulation Date Range ---
    # Find the latest "first usable input timestamp" across all providers.
    global_first_usable_input_time = max(p_data['first_usable_input_timestamp'] for p_data in providers_data.values())

    # The first *target day* (Day D+1) must start after the earliest usable forecast
    # input time: an input ending at Day D 23:00 yields a target day starting at
    # Day D+1 00:00. ceil('D') rounds a mid-day input time up to the next midnight.
    first_target_day_start = (global_first_usable_input_time + timedelta(hours=1)).ceil('D')

    # The last *target day* must have 24h of actual prices within the historical data.
    last_target_day_end = full_historical_df.index.max().normalize() + timedelta(hours=23)  # End of the last day present in the data
    last_target_day_start = last_target_day_end.normalize()

    # Create the range of target-day start times (the corresponding decision day is always D = target day - 1).
    simulation_target_days = pd.date_range(
        start=first_target_day_start,
        end=last_target_day_start,
        freq='D'
    )
    if simulation_target_days.empty:
        logger.critical(
            f"Not enough data for DAA simulation. First possible target day: {first_target_day_start}, "
            f"last target day: {last_target_day_start}. Check data length and lookbacks."
        )
        exit(1)
    logger.info(
        f"Evaluating DAA strategy for target days from {simulation_target_days.min().strftime('%Y-%m-%d')} "
        f"to {simulation_target_days.max().strftime('%Y-%m-%d')} ({len(simulation_target_days)} days)."
    )
    # --- DAA Evaluation Loop ---
    daily_results_list = []

    # Battery states carried over between days, tracked separately for the baseline
    # and each provider. Each entry holds the state *at the start* of the next target day.
    current_b_states = {
        'baseline': optimization_config.initial_b,
        **{name: optimization_config.initial_b for name in successfully_loaded_provider_names}
    }
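
    # Example (hypothetical provider names): with initial_b=0.0 and two providers,
    # this starts as {'baseline': 0.0, 'model_a': 0.0, 'ensemble_a': 0.0}.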
    for target_day_start in tqdm(simulation_target_days, desc="Simulating DAA Days"):
        decision_day = target_day_start - timedelta(days=1)
        target_day_end = target_day_start + timedelta(hours=OPTIMIZATION_HORIZON_HOURS - 1)
        logger.debug(f"Processing Target Day: {target_day_start.strftime('%Y-%m-%d')}")

        # Timestamp whose data ends the forecast input window (t=0 for the forecast
        # predicting target_day_start onwards), e.g. Day D 23:00.
        forecast_input_end_time = target_day_start - timedelta(hours=1)

        # Check that the forecast input time is valid for all providers.
        if forecast_input_end_time < global_first_usable_input_time:
            logger.warning(f"Skipping target day {target_day_start.strftime('%Y-%m-%d')}: Forecast input time {forecast_input_end_time} is before required {global_first_usable_input_time}.")
            continue

        # Get the actual prices for the target day.
        try:
            actual_prices_target_day = full_historical_df.loc[target_day_start:target_day_end, target_col].values
            if len(actual_prices_target_day) != OPTIMIZATION_HORIZON_HOURS:
                logger.warning(f"Skipping target day {target_day_start.strftime('%Y-%m-%d')}: Actual prices slice length {len(actual_prices_target_day)} != {OPTIMIZATION_HORIZON_HOURS}. Check data continuity.")
                continue
        except Exception as e:
            logger.warning(f"Skipping target day {target_day_start.strftime('%Y-%m-%d')}: Could not retrieve actual prices. Error: {e}")
            continue

        # Get the last actual price at the forecast input time (needed as an anchor for potential interpolation).
        try:
            last_actual_price_anchor = full_historical_df.loc[forecast_input_end_time, target_col]
            if pd.isna(last_actual_price_anchor):
                logger.warning(f"Last actual price at {forecast_input_end_time} is NaN. Cannot use as anchor. Skipping day.")
                continue
        except KeyError:
            logger.error(f"Cannot get last actual price at timestamp {forecast_input_end_time}. Skipping day.")
            continue
        except Exception as e:
            logger.error(f"Error getting last actual price at {forecast_input_end_time}: {e}. Skipping day.")
            continue
        # --- Store results for this day ---
        day_results = {
            'decision_day': decision_day,
            'target_day_start': target_day_start,
            'actual_prices': actual_prices_target_day.tolist()
        }
        # --- Baseline Optimization (Perfect Foresight for Target Day) ---
        logger.debug(f"Running baseline optimization for target day {target_day_start.strftime('%Y-%m-%d')}")
        initial_b_state_baseline = current_b_states['baseline']
        try:
            # 1. Optimize using the actual prices (perfect foresight).
            baseline_status, _, baseline_P_perfect, baseline_B_perfect = solve_battery_optimization_hourly(
                actual_prices_target_day, initial_b_state_baseline,
                optimization_config.max_capacity, optimization_config.max_rate
            )
            if baseline_status.lower() != "optimal":
                logger.warning(f"Baseline optimization non-optimal ({baseline_status}) for target day {target_day_start.strftime('%Y-%m-%d')}")

            # 2. Simulate the resulting schedule.
            if baseline_P_perfect is not None:
                (daily_profit_baseline,
                 final_b_state_baseline,
                 executed_p_baseline,
                 actual_b_baseline) = simulate_daily_schedule(
                    baseline_P_perfect, actual_prices_target_day, initial_b_state_baseline,
                    optimization_config.max_capacity, optimization_config.max_rate)

                day_results['baseline'] = {
                    "status": baseline_status,
                    "daily_profit": daily_profit_baseline,
                    "planned_P_schedule": baseline_P_perfect.tolist(),
                    "executed_P_schedule": executed_p_baseline,
                    "actual_B_schedule_start": actual_b_baseline,  # State at the start of each hour
                    "final_B_state": final_b_state_baseline
                }
                current_b_states['baseline'] = final_b_state_baseline  # Update state for the next day
                logger.debug(f"Baseline daily profit: {daily_profit_baseline:.2f}, Final B: {final_b_state_baseline:.2f}")
            else:
                raise ValueError("Baseline optimization failed to produce a schedule.")
        except Exception as e:
            logger.error(f"Baseline simulation failed for target day {target_day_start.strftime('%Y-%m-%d')}: {e}", exc_info=True)
            day_results['baseline'] = {"status": "Error", "daily_profit": np.nan, "final_B_state": initial_b_state_baseline}
            current_b_states['baseline'] = initial_b_state_baseline  # Keep the previous state on error
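
        # Note: because the perfect-foresight optimizer enforces the same capacity
        # limits as the simulator, the baseline's executed schedule should match its
        # planned schedule, and the clipping step is expected to be a no-op here.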
        # --- Forecast Provider Optimizations ---
        for provider_name in successfully_loaded_provider_names:
            provider_data = providers_data[provider_name]
            provider_instance = provider_data['provider']
            engineered_df_provider = provider_data['df']
            initial_b_state_provider = current_b_states[provider_name]
            logger.debug(f"Running DAA forecast and optimization for provider '{provider_name}'")

            # 1. Generate the 24h forecast for the target day.
            try:
                forecast_prices_input = provider_instance.get_forecast(
                    engineered_df=engineered_df_provider,
                    forecast_start_time=forecast_input_end_time,  # t=0 for the forecast (end of Day D)
                    optimization_horizon_hours=OPTIMIZATION_HORIZON_HOURS,
                    last_actual_price=last_actual_price_anchor  # Price at t=0
                )
                if forecast_prices_input is None:
                    raise ValueError("Provider returned None forecast.")
                if not isinstance(forecast_prices_input, np.ndarray) or forecast_prices_input.shape != (OPTIMIZATION_HORIZON_HOURS,):
                    raise ValueError(f"Forecast shape mismatch: Expected {(OPTIMIZATION_HORIZON_HOURS,)}, Got {forecast_prices_input.shape}")
            except Exception as e:
                logger.error(f"Forecast generation failed for '{provider_name}', target day {target_day_start.strftime('%Y-%m-%d')}: {e}", exc_info=True)
                day_results[provider_name] = {"status": "Forecast Error", "daily_profit": np.nan, "final_B_state": initial_b_state_provider}
                current_b_states[provider_name] = initial_b_state_provider  # Keep the previous state
                continue

            # 2. Run the optimization on the forecast prices.
            try:
                model_status, _, model_P_planned, model_B_planned = solve_battery_optimization_hourly(
                    forecast_prices_input, initial_b_state_provider,
                    optimization_config.max_capacity, optimization_config.max_rate
                )
                if model_status.lower() != "optimal":
                    logger.warning(f"Provider '{provider_name}' optimization non-optimal ({model_status}) for target day {target_day_start.strftime('%Y-%m-%d')}")

                # 3. Simulate the planned schedule against the actual prices.
                if model_P_planned is not None:
                    (daily_profit_provider,
                     final_b_state_provider,
                     executed_p_provider,
                     actual_b_provider) = simulate_daily_schedule(
                        model_P_planned, actual_prices_target_day, initial_b_state_provider,
                        optimization_config.max_capacity, optimization_config.max_rate)

                    day_results[provider_name] = {
                        "status": model_status,
                        "daily_profit": daily_profit_provider,
                        "planned_P_schedule": model_P_planned.tolist(),
                        "executed_P_schedule": executed_p_provider,
                        "actual_B_schedule_start": actual_b_provider,
                        "final_B_state": final_b_state_provider,
                        "forecast_prices": forecast_prices_input.tolist()  # Store the forecast for later analysis
                    }
                    current_b_states[provider_name] = final_b_state_provider  # Update state
                    logger.debug(f"Provider '{provider_name}' daily profit: {daily_profit_provider:.2f}, Final B: {final_b_state_provider:.2f}")
                else:
                    raise ValueError("Provider optimization failed to produce a schedule.")
            except Exception as e:
                logger.error(f"Optimization or simulation failed for '{provider_name}', target day {target_day_start.strftime('%Y-%m-%d')}: {e}", exc_info=True)
                day_results[provider_name] = {"status": "Optimization/Simulation Error", "daily_profit": np.nan, "final_B_state": initial_b_state_provider}
                current_b_states[provider_name] = initial_b_state_provider  # Keep the previous state
        # Append the results for this day.
        daily_results_list.append(day_results)
        logger.debug(f"Finished processing target day: {target_day_start.strftime('%Y-%m-%d')}")

    logger.info(f"Finished simulating {len(simulation_target_days)} DAA target days.")
    # --- Post-processing and Plotting ---
    logger.info("Starting DAA results analysis and plotting.")
    if not daily_results_list:
        logger.warning("No daily results were collected. Skipping analysis.")
        exit(0)

    # Flatten the per-day results into one row per (day, strategy) pair.
    flat_results = []
    for day_res in daily_results_list:
        base_info = {'target_day_start': day_res['target_day_start']}
        # Add baseline results
        flat_results.append({**base_info, 'type': 'baseline', **day_res.get('baseline', {})})
        # Add provider results
        for provider_name in successfully_loaded_provider_names:
            provider_res = day_res.get(provider_name, {"status": "Missing Result", "daily_profit": np.nan})
            flat_results.append({**base_info, 'type': provider_name, **provider_res})

    results_df = pd.DataFrame(flat_results)
    results_df['target_day_start'] = pd.to_datetime(results_df['target_day_start'])
    results_df.set_index('target_day_start', inplace=True)  # Index by the day being evaluated

    # Calculate cumulative profit per strategy.
    profit_pivot = results_df.pivot_table(index=results_df.index, columns='type', values='daily_profit')
    cumulative_profit_df = profit_pivot.cumsum()
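
    # Illustrative layout of results_df (values hypothetical):
    #   target_day_start  type      status   daily_profit  final_B_state  ...
    #   2021-03-11        baseline  optimal        123.45            0.0  ...
    #   2021-03-11        model_a   optimal         87.60            2.5  ...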
    # --- Plotting ---
    logger.info("Generating DAA plots.")
    output_dir = Path(optimization_config.output_dir) if optimization_config.output_dir else Path(".")
    output_dir.mkdir(parents=True, exist_ok=True)
    logger.info(f"Saving plots to: {output_dir.resolve()}")

    # Plot 1: Daily Profit Over Time
    if not profit_pivot.empty and not profit_pivot.isnull().all().all():
        fig1, ax = plt.subplots(figsize=(15, 7))
        plot_data = profit_pivot.dropna(axis=1, how='all')
        if not plot_data.empty:
            sns.lineplot(data=plot_data, ax=ax, dashes=False)  # Solid lines for clarity
            ax.set_xlabel('Target Day')
            ax.set_ylabel('Daily Profit (€)')
            ax.set_title('DAA Strategy: Daily Profit Comparison')
            ax.legend(title='Strategy')
            ax.axhline(0, color='grey', linestyle='--', linewidth=0.8)  # Zero-profit line
            plt.grid()
            plt.tight_layout()
            plot_path = output_dir / "daily_profit_daa.png"
            plt.savefig(plot_path)
            logger.info(f"Daily Profit plot saved to {plot_path}")
            plt.close(fig1)
        else:
            logger.warning("Daily profit data is all NaN after filtering. Skipping Daily Profit plot.")
    else:
        logger.warning("No valid data available to plot Daily Profit.")
    # Plot 2: Cumulative Profit Over Time
    if not cumulative_profit_df.empty and not cumulative_profit_df.isnull().all().all():
        fig2, ax = plt.subplots(figsize=(15, 7))
        plot_data = cumulative_profit_df.dropna(axis=1, how='all')
        if not plot_data.empty:
            sns.lineplot(data=plot_data, ax=ax, dashes=False)
            ax.set_xlabel('Target Day')
            ax.set_ylabel('Cumulative Profit (€)')
            ax.set_title('DAA Strategy: Cumulative Profit Comparison')
            ax.legend(title='Strategy')
            plt.grid()
            plt.tight_layout()
            plot_path = output_dir / "cumulative_profit_daa.png"
            plt.savefig(plot_path)
            logger.info(f"Cumulative Profit plot saved to {plot_path}")
            plt.close(fig2)
        else:
            logger.warning("Cumulative profit data is all NaN after filtering. Skipping Cumulative Profit plot.")
    else:
        logger.warning("No valid data available to plot Cumulative Profit.")
    # Optional Plot 3: Example Day Schedule (Planned vs Executed vs Actual Price)
    # Select a representative day (here: the first simulated day).
    example_day_index = results_df.index.unique()[0]
    example_day_data = results_df.loc[[example_day_index]]  # All strategy rows for that day
    actual_prices_example = daily_results_list[0].get('actual_prices')

    if actual_prices_example and not example_day_data.empty:
        fig3, ax1 = plt.subplots(figsize=(15, 7))
        ax2 = ax1.twinx()  # Secondary axis for prices
        hours = np.arange(OPTIMIZATION_HORIZON_HOURS)

        for provider_type in ['baseline'] + successfully_loaded_provider_names:
            provider_row = example_day_data[example_day_data['type'] == provider_type]
            if not provider_row.empty:
                planned_p = provider_row['planned_P_schedule'].iloc[0]
                executed_p = provider_row['executed_P_schedule'].iloc[0]
                if isinstance(planned_p, list) and isinstance(executed_p, list) and len(planned_p) == 24 and len(executed_p) == 24:
                    ax1.plot(hours, planned_p, label=f'{provider_type} Planned P', linestyle=':', marker='.')  # Dotted for planned
                    ax1.plot(hours, executed_p, label=f'{provider_type} Executed P', marker='x')  # Solid for executed

        # Plot actual prices on the secondary axis.
        ax2.plot(hours, actual_prices_example, label='Actual Price', color='grey', linestyle='-', marker='o', markersize=4)
        ax2.set_ylabel('Price (€/MWh)', color='grey')
        ax2.tick_params(axis='y', labelcolor='grey')

        ax1.set_xlabel('Hour of Day')
        ax1.set_ylabel('Power (MW)')
        ax1.set_title(f'DAA Strategy: Example Day Schedule ({example_day_index.strftime("%Y-%m-%d")})')
        ax1.axhline(0, color='black', linestyle='-', linewidth=0.5)  # Zero power line
        ax1.grid(True)

        # Combine legends from both axes into one.
        lines, labels = ax1.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax1.legend(lines + lines2, labels + labels2, loc='best')
        plt.xticks(hours)  # Show all hours
        plt.tight_layout()
        plot_path = output_dir / "example_schedule_daa.png"
        plt.savefig(plot_path)
        logger.info(f"Example Day Schedule plot saved to {plot_path}")
        plt.close(fig3)
    else:
        logger.warning("Could not generate example day schedule plot (data missing).")
    # --- Save Results DataFrame (Optional) ---
    try:
        results_save_path = output_dir / "optimization_results_daa.csv"
        results_df_to_save = results_df.reset_index()
        # Convert list columns to strings for CSV compatibility.
        list_cols = ['planned_P_schedule', 'executed_P_schedule', 'actual_B_schedule_start', 'actual_prices', 'forecast_prices']
        for col in list_cols:
            if col in results_df_to_save.columns:
                results_df_to_save[col] = results_df_to_save[col].apply(lambda x: str(x) if isinstance(x, list) else x)
        results_df_to_save.to_csv(results_save_path, index=False)
        logger.info(f"Saved detailed DAA results DataFrame to {results_save_path}")
    except Exception as e:
        logger.error(f"Failed to save DAA results DataFrame: {e}", exc_info=True)

    logger.info("DAA Evaluation and plotting completed.")