This commit is contained in:
2025-05-02 10:45:06 +02:00
commit 7c9d809a82
29 changed files with 2931 additions and 0 deletions

View File

View File

@ -0,0 +1,168 @@
import logging
from pathlib import Path
import pandas as pd
from typing import Tuple, Optional, Dict, Any
from data_analysis.utils.config_model import settings
logger = logging.getLogger(__name__)
# Define constants for column names related to raw loading
TIME_COL_RAW = "MTU (CET/CEST)"
PRICE_COL_RAW = "Day-ahead Price [EUR/MWh]"
PRICE_COL = "Price" # Standardized column name after processing
def load_and_prepare_data(file_path: Path) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
"""
Loads the energy price CSV data, parses the time column, sets a
DatetimeIndex, renames columns, checks frequency, and handles missing values.
Args:
file_path: Path to the input CSV file.
Returns:
A tuple containing:
- pd.DataFrame: Processed DataFrame with DatetimeIndex and 'Price' column.
May include other columns if they exist in the source.
- str | None: Error message if loading fails, otherwise None.
"""
logger.info(f"Attempting to load data from: {file_path.resolve()}")
err = None
df = None
try:
# Load data, assuming header is on the first row
df = pd.read_csv(file_path, header=0)
# Basic check for expected columns
if TIME_COL_RAW not in df.columns or PRICE_COL_RAW not in df.columns:
err = f"Missing expected columns '{TIME_COL_RAW}' or '{PRICE_COL_RAW}' in {file_path}"
logger.error(err)
return None, err
# --- Time Parsing ---
df['StartTime'] = df[TIME_COL_RAW].str.split(' - ', expand=True)[0]
df['Timestamp'] = pd.to_datetime(df['StartTime'], format='%d.%m.%Y %H:%M', errors='coerce')
original_len = len(df)
df = df.dropna(subset=['Timestamp'])
if len(df) < original_len:
logger.warning(f"Dropped {original_len - len(df)} rows due to timestamp parsing errors.")
# --- Set Index and Select Columns ---
df = df.set_index('Timestamp')
# Convert price column to numeric, coercing errors
df[PRICE_COL] = pd.to_numeric(df[PRICE_COL_RAW], errors='coerce')
# Keep the price column and any other potential exogenous columns
# For now, just keep PRICE_COL, drop raw ones. Adapt if exog needed.
cols_to_keep = [PRICE_COL] + [col for col in df.columns if col not in [TIME_COL_RAW, PRICE_COL_RAW, 'StartTime', PRICE_COL]]
df = df[cols_to_keep].copy()
# --- Handle Missing Prices ---
missing_prices = df[PRICE_COL].isnull().sum()
if missing_prices > 0:
logger.warning(f"Found {missing_prices} missing '{PRICE_COL}' values. Forward-filling (ffill).")
df[PRICE_COL] = df[PRICE_COL].ffill()
if df[PRICE_COL].isnull().any():
logger.warning("Missing values remain after ffill. Backward-filling (bfill).")
df[PRICE_COL] = df[PRICE_COL].bfill()
# --- Check Time Index Frequency ---
df = df.sort_index()
inferred_freq = pd.infer_freq(df.index)
if inferred_freq == settings.expected_data_frequency:
logger.info(f"Inferred index frequency matches the expected '{settings.expected_data_frequency}': ({inferred_freq}). Setting frequency as {inferred_freq}.")
df = df.asfreq('h')
missing_after_asfreq = df[PRICE_COL].isnull().sum()
if missing_after_asfreq > 0:
logger.warning(f"{missing_after_asfreq} NaNs appeared after setting frequency to Hourly. Forward-filling again.")
df[PRICE_COL] = df[PRICE_COL].ffill().bfill()
elif inferred_freq:
logger.warning(f"Inferred frequency is '{inferred_freq}', not the expected '{settings.expected_data_frequency}'. Proceeding without setting frequency.")
else:
logger.warning("Could not infer frequency. Check data for gaps or irregularities. Proceeding without setting frequency.")
duplicates = df.index.duplicated().sum()
if duplicates > 0:
logger.warning(f"Found {duplicates} duplicate timestamps. Keeping the first occurrence.")
df = df[~df.index.duplicated(keep='first')]
logger.info(f"Data loaded and prepared. Final shape: {df.shape}")
except FileNotFoundError:
err = f"Data file not found: {file_path}"
logger.error(err)
except Exception as e:
err = f"An unexpected error occurred during data loading/preparation: {e}"
logger.error(err, exc_info=True)
df = None
return df, err
def get_data_summary(df: pd.DataFrame) -> Tuple[Optional[Dict[str, Any]], Optional[str]]:
"""
Generates summary information about the DataFrame.
Args:
df: The input DataFrame.
Returns:
A tuple containing:
- dict | None: Dictionary with summary data ('head', 'tail', 'dtypes', 'missing').
- str | None: Error message, otherwise None.
"""
logger.info("Generating data summary...")
summary = None
err = None
if df is None or df.empty:
return None, "Input DataFrame is empty or None."
try:
summary = {
'head': df.head(),
'tail': df.tail(),
'dtypes': df.dtypes,
'missing': df.isnull().sum()
}
logger.info("Data summary generated.")
except Exception as e:
err = f"Error generating data summary: {e}"
logger.error(err, exc_info=True)
return summary, err
def get_descriptive_stats(df: pd.DataFrame, price_col: str = PRICE_COL) -> Tuple[Optional[pd.Series | pd.DataFrame], Optional[str]]:
"""
Calculates descriptive statistics for specified column(s).
Args:
df: The input DataFrame.
price_col: The name of the column (or list of columns) for stats.
Defaults to the standard 'Price' column.
Returns:
A tuple containing:
- pd.Series | pd.DataFrame | None: Series/DataFrame with descriptive statistics.
- str | None: Error message, otherwise None.
"""
logger.info(f"Calculating descriptive statistics for column(s): '{price_col}'...")
stats = None
err = None
if df is None or df.empty:
return None, "Input DataFrame is empty or None."
try:
# Check if the target column(s) exist
target_cols = [price_col] if isinstance(price_col, str) else price_col
missing_cols = [col for col in target_cols if col not in df.columns]
if missing_cols:
err = f"Column(s) not found in DataFrame: {', '.join(missing_cols)}."
logger.error(err)
return None, err
stats = df[price_col].describe() # .describe() works on Series and DataFrame
logger.info("Descriptive statistics calculated.")
except Exception as e:
err = f"Error calculating descriptive statistics: {e}"
logger.error(err, exc_info=True)
return stats, err

View File

@ -0,0 +1,398 @@
import logging
from pathlib import Path
import pandas as pd
import numpy as np # Import numpy for CI calculation
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Optional, List
# Import analysis tools for plotting results
from statsmodels.tsa.seasonal import DecomposeResult
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf, seasonal_plot
from statsmodels.tsa.stattools import ccf # Import ccf
logger = logging.getLogger(__name__)
# --- Plotting Configuration ---
# Increase default figure size for better readability
plt.rcParams['figure.figsize'] = (15, 7)
# Use a clean style
plt.style.use('seaborn-v0_8-whitegrid')
def _save_plot(fig: plt.Figure, output_path: Path) -> Optional[str]:
"""Helper to save plots and handle errors."""
err = None
try:
fig.tight_layout() # Adjust layout before saving
fig.savefig(output_path, dpi=150, bbox_inches='tight')
logger.info(f"Plot saved to: {output_path}")
plt.close(fig) # Close the figure to free memory
except Exception as e:
err = f"Failed to save plot to {output_path}: {e}"
logger.error(err, exc_info=True)
plt.close(fig) # Still try to close figure on error
return err
def plot_full_time_series(df: pd.DataFrame, price_col: str, output_path: Path) -> Optional[str]:
"""Plots the entire time series."""
logger.info(f"Generating full time series plot to {output_path}")
fig, ax = plt.subplots()
err = None
try:
sns.lineplot(data=df, x=df.index, y=price_col, ax=ax, linewidth=1)
ax.set_title('Full Time Series: Price Over Time')
ax.set_xlabel('Time')
ax.set_ylabel(price_col)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting full time series: {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err
def plot_zoomed_time_series(df: pd.DataFrame, price_col: str, start_date: str, end_date: str, output_path: Path) -> Optional[str]:
"""Plots a specified time range of the series."""
logger.info(f"Generating zoomed time series plot ({start_date} to {end_date}) to {output_path}")
fig, ax = plt.subplots()
err = None
try:
# Ensure start_date and end_date are compatible with index type
df_zoomed = df.loc[start_date:end_date]
if df_zoomed.empty:
err = f"No data found in the specified zoom range: {start_date} to {end_date}"
logger.warning(err) # Use warning for empty range, not necessarily error
plt.close(fig)
return err
sns.lineplot(data=df_zoomed, x=df_zoomed.index, y=price_col, ax=ax, linewidth=1)
ax.set_title(f'Time Series: {start_date} to {end_date}')
ax.set_xlabel('Time')
ax.set_ylabel(price_col)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting zoomed time series: {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err
def plot_boxplot_by_period(df: pd.DataFrame, price_col: str, period: str, output_path: Path) -> Optional[str]:
"""
Generates box plots of the price grouped by a specific time period.
Periods: 'hour', 'dayofweek', 'month', 'year'.
"""
logger.info(f"Generating box plot by {period} to {output_path}")
fig, ax = plt.subplots()
err = None
try:
# Create temporary column for the period
if period == 'hour':
group_col = df.index.hour
title = 'Price Distribution by Hour of Day'
x_label = 'Hour'
elif period == 'dayofweek':
group_col = df.index.dayofweek # Monday=0, Sunday=6
title = 'Price Distribution by Day of Week'
x_label = 'Day of Week (0=Mon, 6=Sun)'
elif period == 'month':
group_col = df.index.month
title = 'Price Distribution by Month'
x_label = 'Month'
elif period == 'year':
group_col = df.index.year
title = 'Price Distribution by Year'
x_label = 'Year'
else:
err = f"Unsupported period '{period}' for boxplot."
logger.error(err)
plt.close(fig)
return err
# Ensure group_col is numeric or categorical for plotting
sns.boxplot(x=group_col, y=df[price_col], ax=ax, palette="viridis", hue=group_col)
ax.set_title(title)
ax.set_xlabel(x_label)
ax.set_ylabel(price_col)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting boxplot by {period}: {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err
# New function signature for seasonal subseries plot
def plot_seasonal_subseries(df: pd.DataFrame, price_col: str, period: int, period_name: str, output_path: Path) -> Optional[str]:
"""
Generates a seasonal subseries plot for a given period (e.g., 24 for daily).
"""
logger.info(f"Generating seasonal subseries plot for {period_name} (period={period}) to {output_path}")
err = None
try:
# Ensure the index is datetime and frequency is set or can be inferred
if not isinstance(df.index, pd.DatetimeIndex):
err = "DataFrame index must be a DatetimeIndex for seasonal subseries plot."
logger.error(err)
return err
# Create the appropriate grouping based on the period
if period == 24: # Daily
grouped = df[price_col].groupby(df.index.hour)
xticklabels = [f"{i:02d}:00" for i in range(24)]
elif period == 168: # Weekly
grouped = df[price_col].groupby(df.index.dayofweek)
xticklabels = ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
else:
# For other periods, create a custom grouping
grouped = df[price_col].groupby(df.index % period)
xticklabels = [str(i) for i in range(period)]
# Create the plot using seasonal_plot
fig = seasonal_plot(grouped, xticklabels=xticklabels, ylabel=price_col)
fig.suptitle(f'Seasonal Subseries Plot ({period_name})', y=1.02)
fig.set_size_inches(15, 10)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting seasonal subseries ({period_name}): {e}"
logger.error(err, exc_info=True)
plt.close('all')
return err
def plot_histogram(df: pd.DataFrame, price_col: str, output_path: Path, bins: int = 50) -> Optional[str]:
"""Plots a histogram of the price values."""
logger.info(f"Generating histogram of '{price_col}' to {output_path}")
fig, ax = plt.subplots()
err = None
try:
sns.histplot(data=df, x=price_col, bins=bins, kde=True, ax=ax)
ax.set_title(f'Distribution of {price_col}')
ax.set_xlabel(price_col)
ax.set_ylabel('Frequency')
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting histogram: {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err
def plot_decomposition(decomposition_result: DecomposeResult, period_name: str, output_path: Path) -> Optional[str]:
"""
Plots the observed, trend, seasonal, and residual components from a
time series decomposition result.
"""
logger.info(f"Generating {period_name} decomposition plot to {output_path}")
err = None
try:
# The plot method of DecomposeResult returns a Figure
fig = decomposition_result.plot()
fig.set_size_inches(15, 10) # Adjust size for better visibility
fig.suptitle(f'Time Series Decomposition ({period_name} Seasonality)', y=1.02)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting decomposition ({period_name}): {e}"
logger.error(err, exc_info=True)
# No access to the fig object if decomposition_result.plot() fails early
# Close all figures as a fallback
plt.close('all')
return err
def plot_residuals(residuals: pd.Series, title_suffix: str, output_path: Path) -> Optional[str]:
"""Plots the residuals over time."""
logger.info(f"Generating residuals plot ({title_suffix}) to {output_path}")
fig, ax = plt.subplots()
err = None
try:
residuals.plot(ax=ax, title=f'Residuals ({title_suffix})')
ax.set_xlabel('Time')
ax.set_ylabel('Residual Value')
# Add a horizontal line at zero
ax.axhline(0, color='r', linestyle='--', alpha=0.7)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting residuals ({title_suffix}): {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err
def plot_acf_pacf(series: pd.Series, series_name: str, lags: int | None, output_path_base: Path) -> Optional[str]:
"""
Plots the Autocorrelation Function (ACF) and Partial Autocorrelation
Function (PACF) for a given series, saving them as separate files.
"""
logger.info(f"Generating ACF/PACF plots for {series_name} to {output_path_base.parent}")
err_acf = None
err_pacf = None
# Plot ACF
try:
fig_acf = plt.figure()
ax_acf = fig_acf.add_subplot(111)
plot_acf(series, lags=lags, ax=ax_acf, title=f'ACF - {series_name}')
acf_path = output_path_base.with_name(f"{output_path_base.stem}_acf.png")
err_acf = _save_plot(fig_acf, acf_path)
except Exception as e:
err_acf = f"Error plotting ACF for {series_name}: {e}"
logger.error(err_acf, exc_info=True)
plt.close(fig_acf)
# Plot PACF
try:
fig_pacf = plt.figure()
ax_pacf = fig_pacf.add_subplot(111)
# Use method='ywm' for Yule-Walker method, often preferred
plot_pacf(series, lags=lags, ax=ax_pacf, title=f'PACF - {series_name}', method='ywm')
pacf_path = output_path_base.with_name(f"{output_path_base.stem}_pacf.png")
err_pacf = _save_plot(fig_pacf, pacf_path)
except Exception as e:
err_pacf = f"Error plotting PACF for {series_name}: {e}"
logger.error(err_pacf, exc_info=True)
plt.close(fig_pacf)
# Return the first error encountered, or None if both succeeded
return err_acf or err_pacf
# Update cross-correlation plot function
def plot_cross_correlation(
target_series: pd.Series,
exog_series: pd.Series,
target_name: str,
exog_name: str,
max_lags: int,
output_path: Path
) -> Optional[str]:
"""
Generates and saves a cross-correlation plot between a target series and an exogenous series.
Plots correlation of target_series(t) with exog_series(t-lag).
Args:
target_series: The main time series to analyze
exog_series: The exogenous time series to correlate with
target_name: Name of the target series for labeling
exog_name: Name of the exogenous series for labeling
max_lags: Maximum number of lags to compute correlation for
output_path: Where to save the plot
Returns:
Optional[str]: Error message if something went wrong, None if successful
"""
logger.info(f"Generating cross-correlation plot ({target_name} vs {exog_name}) for lags up to {max_lags} to {output_path}")
err = None
try:
# Ensure series are aligned and have no NaNs affecting calculation
combined = pd.concat([target_series.rename(target_name), exog_series.rename(exog_name)], axis=1).dropna()
# Check if we have enough data points
if combined.empty or len(combined) <= max_lags:
err = f"Not enough overlapping non-NaN data points between {target_name} and {exog_name} for CCF calculation (need > {max_lags})."
# Will warn above!
# logger.warning(err)
return err
# Check if the exogenous variable actually varies
if exog_series.nunique() <= 1:
err = f"Cannot compute cross-correlation: {exog_name} has no variation (all values are the same)."
# Will warn above!
# logger.warning(err)
return err
# Calculate CCF: ccf(x, y) computes corr(x[t], y[t-lag])
# We want corr(target[t], exog[t-lag]), so order is ccf(target, exog)
cross_corr_values = ccf(combined[target_name], combined[exog_name], adjusted=False, nlags=max_lags)
lags_range = range(max_lags + 1) # CCF includes lag 0
# Plotting
fig, ax = plt.subplots()
markerline, stemlines, baseline = ax.stem(
lags_range, cross_corr_values, markerfmt='o', basefmt="gray"
)
plt.setp(markerline, markersize=5)
plt.setp(stemlines, linewidth=1)
# Add approximate 95% confidence intervals (Bartlett's formula approximation)
conf_level = 1.96 / np.sqrt(len(combined))
ax.axhspan(-conf_level, conf_level, alpha=0.2, color='blue', zorder=0)
ax.set_title(f'Cross-Correlation: {target_name}(t) vs {exog_name}(t-lag)')
ax.set_xlabel('Lag (k)')
ax.set_ylabel(f'Corr({target_name}(t), {exog_name}(t-k))')
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting cross-correlation ({target_name} vs {exog_name}): {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err
def plot_weekly_autocorrelation(
series: pd.Series,
series_name: str,
output_path: Path,
max_weeks: int = 4
) -> Optional[str]:
"""
Generates and saves an autocorrelation plot between a series and its weekly lags.
This helps identify weekly seasonality patterns.
Args:
series: The time series to analyze
series_name: Name of the series for labeling
output_path: Where to save the plot
max_weeks: Maximum number of weeks to look back (default: 4)
Returns:
Optional[str]: Error message if something went wrong, None if successful
"""
logger.info(f"Generating weekly autocorrelation plot for {series_name} up to {max_weeks} weeks to {output_path}")
err = None
try:
# Ensure series has no NaNs
series = series.dropna()
if series.empty:
err = f"Series {series_name} is empty after dropping NaNs."
logger.warning(err)
return err
# Calculate weekly lags (168 hours = 1 week)
hours_per_week = 24 * 7
max_lags = max_weeks * hours_per_week
# Calculate autocorrelation
autocorr_values = ccf(series, series, adjusted=False, nlags=max_lags)
lags_range = list(range(0, min(max_lags + 1, autocorr_values.size - 1), hours_per_week)) # Only plot weekly intervals
# Plotting
fig, ax = plt.subplots()
markerline, stemlines, baseline = ax.stem(
[lag/hours_per_week for lag in lags_range], # Convert to weeks for x-axis
autocorr_values[lags_range],
markerfmt='o',
basefmt="gray"
)
plt.setp(markerline, markersize=5)
plt.setp(stemlines, linewidth=1)
# Add approximate 95% confidence intervals
conf_level = 1.96 / np.sqrt(len(series))
ax.axhspan(-conf_level, conf_level, alpha=0.2, color='blue', zorder=0)
ax.set_title(f'Weekly Autocorrelation: {series_name}')
ax.set_xlabel('Lag (weeks)')
ax.set_ylabel(f'Corr({series_name}(t), {series_name}(t-lag))')
ax.grid(True, which='both', linestyle='--', linewidth=0.5)
# Add vertical lines at each week
for week in range(max_weeks + 1):
ax.axvline(x=week, color='gray', linestyle=':', alpha=0.3)
err = _save_plot(fig, output_path)
except Exception as e:
err = f"Error plotting weekly autocorrelation for {series_name}: {e}"
logger.error(err, exc_info=True)
plt.close(fig)
return err

556
data_analysis/io/report.py Normal file
View File

@ -0,0 +1,556 @@
import datetime
import logging
import re
import subprocess
from pathlib import Path
from typing import Optional, Dict, Any
import shutil
import pandas as pd
from data_analysis.utils.config_model import settings # Assuming settings are configured
from data_analysis.utils.report_model import ReportData
logger = logging.getLogger(__name__)
# --- Helper function to format DataFrames/Series as LaTeX tables ---
CHARS = {
'&': r'\&',
'%': r'\%',
'$': r'\$',
'#': r'\#',
'_': r'\_',
'{': r'\{',
'}': r'\}',
'~': r'\textasciitilde{}',
'^': r'\^{}',
'\\': r'\textbackslash{}',
'<': r'\textless{}',
'>': r'\textgreater{}',
}
def _escape_latex(text: str) -> str:
"""Escapes special LaTeX characters in a string."""
# Convert input to string first to handle potential non-string types
t = str(text)
# Use a compiled regex for efficiency if called many times
# The pattern needs to be carefully ordered to handle overlapping keys (e.g., '\' vs '\\') correctly,
# although the current CHARS doesn't have overlaps. Sorting by length desc is safest.
pattern = re.compile('|'.join(re.escape(str(key)) for key in sorted(CHARS.keys(), key=lambda item: - len(item))))
t = pattern.sub(lambda match: CHARS[match.group()], t)
return t
def dataframe_to_latex(df: Optional[pd.DataFrame], title: Optional[str] = None, caption: Optional[str] = None, label: Optional[str] = None, escape: bool = True) -> Optional[str]:
"""Converts a pandas DataFrame to a LaTeX tabular environment using booktabs."""
if df is None or df.empty:
return None
# Prepare DataFrame for LaTeX conversion
df_copy = df.copy()
# Include index if it's named or not a simple RangeIndex
include_index = df_copy.index.name is not None or not isinstance(df_copy.index, pd.RangeIndex)
# Escape column names and data if required
if escape:
# Ensure column names are strings before escaping
df_copy.columns = [_escape_latex(str(col)) for col in df_copy.columns]
if include_index and df_copy.index.name:
# Ensure index name is a string before escaping
df_copy.index.name = _escape_latex(str(df_copy.index.name))
# Escape data - map works element-wise, ensure elements are str first if necessary
# Using applymap instead of map for broader compatibility
df_copy = df_copy.map(lambda x: _escape_latex(str(x)))
# Determine column format (e.g., 'llr' for left, left, right)
# Default to left-aligned ('l') for all columns
num_cols = len(df_copy.columns) + (1 if include_index else 0)
col_format = "l" * num_cols
try:
# Ensure title and caption are escaped if they exist and escape=True was requested
# However, dataframe_to_latex itself handles caption/label escaping internally if its `escape` is True.
# We are setting escape=False because we do it manually above.
# If a title is provided separately, it should be escaped before adding.
escaped_title = _escape_latex(str(title)) if title and escape else title
escaped_caption = _escape_latex(str(caption)) if caption and escape else caption
latex_str = df_copy.to_latex(
index=include_index,
escape=False, # We already escaped manually if escape=True
column_format=col_format,
header=True,
# Pass potentially pre-escaped caption/title to to_latex's caption
caption=escaped_caption if escaped_caption else escaped_title,
label=f"tab:{label}" if label else None,
position='!htbp', # Placement suggestion
)
# Add the pre-escaped title above the table if provided and different from caption
if escaped_title and escaped_title != escaped_caption:
# Ensure title is treated as LaTeX command if needed, or just text
# Using \textbf might require braces if title contains commands
latex_str = fr"\textbf{{{escaped_title}}}\par\par\medskip{latex_str}" # Already escaped title
return latex_str
except Exception as e:
logger.error(f"Failed to convert DataFrame to LaTeX: {e}", exc_info=True)
# Escape the error message itself for safe inclusion in LaTeX
return fr"\textit{{Error generating LaTeX table: {_escape_latex(str(e))}}}"
def series_to_latex(series: Optional[pd.Series], title: Optional[str] = None, caption: Optional[str] = None, label: Optional[str] = None, escape: bool = True) -> str:
"""Converts a pandas Series to a LaTeX table (two columns: Index, Value)."""
if series is None or series.empty:
# Ensure the default string is safe for LaTeX
return r"\textit{N/A}\par"
# Convert series to DataFrame
df = series.reset_index()
# Use clear default column names if none exist, ensure they are strings
index_name = str(series.index.name) if series.index.name else 'Index'
value_name = str(series.name) if series.name else 'Value'
df.columns = [index_name, value_name]
# Delegate to dataframe_to_latex, passing the escape parameter
return dataframe_to_latex(df, title=title, caption=caption, label=label, escape=escape)
# --- Report Generation Function (LaTeX) ---
def compile_latex_report(report_tex_path: Path, output_dir: Path) -> bool:
"""
Attempts to compile the LaTeX report using the local LaTeX installation.
Args:
report_tex_path: Path to the .tex file
output_dir: Directory where the PDF should be saved
Returns:
bool: True if compilation was successful, False otherwise
"""
logger.info(f"Attempting to compile LaTeX report: {report_tex_path}")
# Create necessary directories
reports_dir = output_dir / "reports"
tmp_dir = output_dir / "_tmp"
reports_dir.mkdir(parents=True, exist_ok=True)
tmp_dir.mkdir(parents=True, exist_ok=True)
try:
# Run pdflatex twice to ensure proper references and table of contents
for i in range(2):
logger.info(f"Running pdflatex (attempt {i+1}/2)...")
result = subprocess.run(
["pdflatex", "-interaction=nonstopmode", "-output-directory", str(tmp_dir), str(report_tex_path)],
capture_output=False if settings.debug else True,
text=True
)
if result.returncode != 0:
logger.error(f"LaTeX compilation failed (attempt {i+1})")
return False
# Move the PDF to the reports directory
pdf_path = tmp_dir / f"{report_tex_path.stem}.pdf"
if pdf_path.exists():
target_pdf = reports_dir / "report.pdf"
shutil.move(str(pdf_path), str(target_pdf))
logger.info(f"Successfully compiled and moved report to: {target_pdf}")
# Clean up the _tmp directory
shutil.rmtree(tmp_dir)
logger.info("Cleaned up temporary LaTeX files")
return True
else:
logger.error(f"Expected PDF file not found: {pdf_path}")
return False
except FileNotFoundError:
logger.error("pdflatex command not found. Please ensure LaTeX is installed and in your PATH.")
return False
except Exception as e:
logger.error(f"Unexpected error during LaTeX compilation: {e}")
return False
def get_plot_path(key: str, plot_paths: Optional[Dict[str, str]]) -> str:
"""Get the correct path for a plot file."""
if plot_paths is None:
# Return placeholder if the entire dictionary is missing
return "reports/plots/placeholder.png"
# Lookup the specific filename using the key
filename = plot_paths.get(key)
# Construct path or return placeholder if key wasn't found
return f"reports/plots/{filename}" if filename else "reports/plots/placeholder.png"
def _format_latex_command(macro_name: str, value: str) -> str:
"""Formats a LaTeX \newcommand definition. Assumes value is correctly escaped/formatted."""
# Creates \newcommand{\macroName}{value}
# Using simple string concatenation to avoid f-string/raw-string issues.
return "\\newcommand{\\" + macro_name + "}{" + value + "}"
def _format_stationarity_results(results: Optional[Dict[str, Any]], test_name: str) -> str:
"""Formats stationarity test results dictionary into a LaTeX string."""
default_na = r"\textit{N/A}"
if not results:
return default_na
test_data = results.get(test_name.lower())
if not test_data:
return default_na
# Ensure keys and values are escaped correctly *before* creating the Series
formatted_data = {}
for key, value in test_data.items():
escaped_key = _escape_latex(str(key)) # Escape the key
if isinstance(value, dict): # Handle Critical Values
# Escape keys and format values within the string
cv_str = ", ".join([f"{_escape_latex(k)}: {v:.3f}" for k, v in value.items()])
formatted_data[escaped_key] = cv_str
elif isinstance(value, (int, float)):
# Apply specific formatting for p-value and test statistic
if 'p-value' in key.lower():
formatted_data[escaped_key] = f"{value:.4f}"
elif 'statistic' in key.lower():
formatted_data[escaped_key] = f"{value:.3f}"
else:
# Convert non-float numbers to string
formatted_data[escaped_key] = str(value)
else:
# Escape other string values
formatted_data[escaped_key] = _escape_latex(str(value))
if not formatted_data:
return default_na
series = pd.Series(formatted_data)
series.name = "Value" # This name doesn't get escaped by default in series_to_latex
series.index.name = "Metric" # This name doesn't get escaped by default in series_to_latex
# Use series_to_latex for table structure, disable its internal escaping
# as we have already escaped the content. Title also needs pre-escaping.
escaped_title = _escape_latex(f"{test_name.upper()} Test Results")
return series_to_latex(series, title=escaped_title, label=f"{test_name.lower()}_results", escape=False)
def generate_latex_report(
output_dir: Path,
df: Optional[pd.DataFrame],
report_data: ReportData,
series_name_stat: Optional[str],
acf_pacf_plot_paths: Optional[Dict[str, str]] = None,
decomposition_plot_paths: Optional[Dict[str, str]] = None,
other_plot_paths: Optional[Dict[str, str]] = None,
decomposition_model: str = 'additive',
acf_pacf_lags: Optional[int] = 48,
template_path: Path = Path("data_analysis/utils/_latex_report_template.tex")
):
"""Generates the LaTeX report (.tex file) by filling the template using macros."""
logger.info(f"Generating LaTeX EDA report using template: {template_path.resolve()}")
reports_dir = output_dir / "reports"
source_plots_dir = reports_dir / "plots" # Define source plot dir
tmp_dir = output_dir / "_tmp"
tmp_plots_dir = tmp_dir / "plots" # Define target plot dir within tmp
reports_dir.mkdir(parents=True, exist_ok=True)
tmp_dir.mkdir(parents=True, exist_ok=True)
# Ensure the target plot dir exists and is empty before copying
if tmp_plots_dir.exists():
shutil.rmtree(tmp_plots_dir)
tmp_plots_dir.mkdir()
shutil.copytree( output_dir / "plots", tmp_plots_dir, dirs_exist_ok=True)
report_tex_path = tmp_dir / "eda_report.tex"
if not template_path.exists():
logger.error(f"Report template not found at {template_path.resolve()}. Cannot generate report.")
raise FileNotFoundError(f"Report template not found: {template_path.resolve()}")
try:
with open(template_path, 'r', encoding='utf-8') as f:
template = f.read()
except Exception as e:
logger.error(f"Failed to read report template {template_path.resolve()}: {e}", exc_info=True)
raise IOError(f"Failed to read report template {template_path.resolve()}: {e}") from e
# --- Prepare LaTeX Definitions ---
latex_definitions = []
default_na = r"\textit{N/A}"
default_text = r"\textit{Not provided - requires manual interpretation or more data.}\medskip"
# Refined helper to add definitions
def add_def(macro_name: str, value: Optional[Any], formatter=None, default=default_na, escape_if_plain: bool = True):
"""
Adds a LaTeX definition. Handles None values, applies formatter if provided,
and escapes the result if it's considered plain text.
Args:
macro_name: The name of the LaTeX macro (without backslash).
value: The value for the macro.
formatter: A function to format the value (e.g., dataframe_to_latex).
If None, str() is used. If the formatter returns LaTeX code,
set escape_if_plain=False.
default: The default string to use if value is None. Assumed safe for LaTeX.
escape_if_plain: If True and the final value is not known to be LaTeX
(i.e., not from specific formatters or defaults), apply _escape_latex.
"""
final_str = default
is_known_latex = False
if value is not None:
if formatter:
final_str = formatter(value)
# Assume formatters producing tables/complex output return valid LaTeX
if formatter in [dataframe_to_latex, series_to_latex, _format_stationarity_results]:
is_known_latex = True
else:
final_str = str(value) # Default to string conversion
else:
# Value is None, using default. Check if default is known LaTeX.
if default in [default_na, default_text]:
is_known_latex = True
# Convert to string one last time in case formatter returned non-string
final_str = str(final_str)
# Escape the result *unless* it's known LaTeX or escaping is turned off
if escape_if_plain and not is_known_latex:
final_str = _escape_latex(final_str)
latex_definitions.append(_format_latex_command(macro_name, final_str))
# Helper for paths - Now points to plots/filename within the _tmp directory
# Uses example-image-a as the default placeholder
def add_path_def(macro_name: str, path_dict: Optional[Dict[str, str]], key: str, default_filename='example-image-a'): # Changed default
filename = default_filename
is_placeholder = True # Flag to track if we're using the placeholder
source_filename = None
if path_dict and key in path_dict and path_dict[key]:
actual_filename_from_dict = Path(path_dict[key]).name
if actual_filename_from_dict: # Check if it's not an empty string
filename = actual_filename_from_dict
source_filename = path_dict[key] # Keep original potentially relative path for source lookup
is_placeholder = False
# else: filename remains default_filename ('example-image-a')
# Construct path for \includegraphics
# If it's a real plot, use the "plots/" prefix for the copied location.
# If it's the placeholder, use the name directly (LaTeX finds it).
if not is_placeholder:
formatted_path = f"plots/{filename}".replace('\\', '/')
else:
# Ensure placeholder name itself doesn't get 'plots/' prefix
formatted_path = Path(filename).name # Use Path().name just in case
# Pass the path string to add_def, explicitly disable escaping
add_def(macro_name, formatted_path, escape_if_plain=False)
# Copy the actual plot file only if it's NOT the placeholder
if not is_placeholder and source_filename:
# Resolve source relative to the main reports/plots dir
source_file_path = source_plots_dir / Path(source_filename).name
target_file_path = tmp_plots_dir / filename # Target uses just the filename
if source_file_path.is_file():
try:
shutil.copy2(source_file_path, target_file_path)
except Exception as copy_e:
logger.warning(f"Could not copy plot file {source_file_path} to {target_file_path}: {copy_e}")
# else: # Optionally log if source plot missing
# logger.warning(f"Source plot file not found: {source_file_path}")
# Return the boolean flag indicating if it was a real plot or placeholder
return not is_placeholder
# --- Generate Definitions using the new add_def ---
# Basic Info
add_def("reportDateGenerated", datetime.date.today(), formatter=lambda d: d.strftime("%Y-%m-%d"))
add_def("dataSourceDescription", f"Hourly prices from {settings.data_file.name}")
add_def("priceVariableName", settings.data_file.stem)
# Info from DataFrame
if df is not None and not df.empty:
add_def("dateRangeStart", df.index.min().date())
add_def("dateRangeEnd", df.index.max().date())
add_def("numDataPoints", len(df))
freq_info = "Irregular/Not Inferred"
if isinstance(df.index, pd.DatetimeIndex):
try:
inferred = pd.infer_freq(df.index)
freq_info = inferred if inferred else freq_info
except Exception: # Handle potential errors in infer_freq
logger.warning("Could not infer frequency.", exc_info=True)
add_def("timeIndexFrequency", f"Hourly (Inferred: {freq_info})")
add_def("timeIndexConfirmation", f"DatetimeIndex, Hourly (Inferred: {freq_info})")
# Escape column names individually before joining
all_cols_str = ", ".join([_escape_latex(str(c)) for c in df.columns])
add_def("otherColumnsList", all_cols_str if all_cols_str else "None", escape_if_plain=False) # Already escaped
else:
add_def("dateRangeStart", None, default=default_na)
add_def("dateRangeEnd", None, default=default_na)
add_def("numDataPoints", None, default=default_na)
add_def("timeIndexFrequency", None, default=default_na)
add_def("timeIndexConfirmation", None, default=default_na)
add_def("otherColumnsList", "None") # Simple string, escape
# Section 1 Tables
summary_data = report_data.summary_data or {}
add_def("tableHeadData", summary_data.get('head'),
formatter=lambda df_val: dataframe_to_latex(df_val, title="First 5 Rows", label="head", escape=True),
escape_if_plain=False, default=default_na)
add_def("tableTailData", summary_data.get('tail'),
formatter=lambda df_val: dataframe_to_latex(df_val, title="Last 5 Rows", label="tail", escape=True),
escape_if_plain=False, default=default_na)
add_def("tableDtypesInfo", summary_data.get('dtypes'),
formatter=lambda s: series_to_latex(s, title="Data Types", label="dtypes", escape=True),
escape_if_plain=False, default=default_na)
# Section 2 Tables
desc_stats = report_data.descriptive_stats or {}
escaped_desc_title = _escape_latex(f"Descriptive Statistics ({settings.data_file.stem})")
add_def("tableDescriptiveStats", desc_stats.get('desc_price'),
formatter=lambda s: series_to_latex(s, title=escaped_desc_title, label="desc_price", escape=True),
escape_if_plain=False, default=default_na)
missing_counts = summary_data.get('missing')
add_def("tableMissingCounts", missing_counts,
formatter=lambda s: series_to_latex(s, title="Missing Value Counts (Post-Imputation)", label="missing_counts", escape=True),
escape_if_plain=False, default=default_na)
missing_pct = None
if missing_counts is not None and df is not None and len(df) > 0:
missing_pct = (missing_counts / len(df)) * 100
missing_pct = missing_pct.round(3)
add_def("tableMissingPercentages", missing_pct,
formatter=lambda s: series_to_latex(s, title="Missing Value Percentage (Post-Imputation)", label="missing_pct", escape=True),
escape_if_plain=False, default=default_na)
add_def("missingValuesObservations", report_data.imputation_message, default="Missing value check information not available.")
# Section 3 Plots
add_path_def("plotFullTimeseries", other_plot_paths, 'full_timeseries')
# Capture the return value of add_path_def to see if a real plot was added
show_zoomed = add_path_def("plotZoomedTimeseries", other_plot_paths, 'zoomed_timeseries')
add_def("ifShowZoomedTimeseries", "true" if show_zoomed else "false", escape_if_plain=False) # Add boolean macro
add_path_def("plotHistogram", other_plot_paths, 'histogram_price')
add_path_def("plotBoxplotHour", other_plot_paths, 'boxplot_hour')
add_path_def("plotBoxplotDayofweek", other_plot_paths, 'boxplot_dayofweek')
add_path_def("plotBoxplotMonth", other_plot_paths, 'boxplot_month')
add_path_def("plotBoxplotYear", other_plot_paths, 'boxplot_year')
add_path_def("plotSeasonalSubseriesDaily", other_plot_paths, 'seasonal_subseries_daily')
add_path_def("plotSeasonalSubseriesWeekly", other_plot_paths, 'seasonal_subseries_weekly')
add_def("seasonalInteractionsObservations", None, default=default_text, escape_if_plain=False)
# Section 4 Decomposition
add_def("decompositionMethodDetails", f"Statsmodels seasonal_decompose (model='{decomposition_model}')")
add_path_def("plotDecompositionDaily", decomposition_plot_paths, 'daily')
add_path_def("plotDecompositionWeekly", decomposition_plot_paths, 'weekly')
# Capture the return value for yearly decomp
show_yearly = add_path_def("plotDecompositionYearly", decomposition_plot_paths, 'yearly')
add_def("ifShowYearlyDecomp", "true" if show_yearly else "false", escape_if_plain=False) # Add boolean macro
add_def("decompositionObservations", None, default=default_text, escape_if_plain=False)
# Section 5 Stationarity
stationarity_tests = report_data.stationarity_tests or {}
add_def("stationaritySeriesTested", series_name_stat)
add_path_def("plotResiduals", other_plot_paths, 'residuals')
add_def("tableAdfResults", stationarity_tests,
formatter=lambda tests: _format_stationarity_results(tests, "ADF"),
escape_if_plain=False, default=default_na)
add_def("tableKpssResults", stationarity_tests,
formatter=lambda tests: _format_stationarity_results(tests, "KPSS"),
escape_if_plain=False, default=default_na)
findings_summary = r"\textit{Analysis requires both ADF and KPSS results.}"
try:
adf_res = stationarity_tests.get('adf')
kpss_res = stationarity_tests.get('kpss')
adf_p = adf_res.get('p-value') if adf_res else None
kpss_p = kpss_res.get('p-value') if kpss_res else None
if adf_p is not None and kpss_p is not None:
if adf_p < 0.05 and kpss_p >= 0.05:
findings_summary = "Tests suggest the series is stationary (ADF rejects H0, KPSS fails to reject H0)."
elif adf_p >= 0.05 and kpss_p < 0.05:
findings_summary = "Tests suggest the series is non-stationary (trend-stationary) and requires differencing (ADF fails to reject H0, KPSS rejects H0)."
elif adf_p < 0.05 and kpss_p < 0.05:
findings_summary = "Test results conflict: ADF suggests stationarity, KPSS suggests non-stationarity. May indicate difference-stationarity."
else:
findings_summary = "Tests suggest the series is non-stationary (unit root present) and requires differencing (Both fail to reject H0)."
elif adf_p is not None:
findings_summary = f"ADF test p-value: {adf_p:.4f}. Stationarity conclusion requires KPSS test."
elif kpss_p is not None:
findings_summary = f"KPSS test p-value: {kpss_p:.4f}. Stationarity conclusion requires ADF test."
except Exception as e:
logger.warning(f"Could not generate stationarity summary: {e}")
findings_summary = r"\textit{Error generating summary.}"
add_def("stationarityFindingsSummary", findings_summary)
# Section 6 Autocorrelation
add_def("autocorrSeriesAnalyzed", series_name_stat)
add_def("autocorrLagsShown", acf_pacf_lags)
add_path_def("plotAcf", acf_pacf_plot_paths, 'acf')
add_path_def("plotPacf", acf_pacf_plot_paths, 'pacf')
add_def("autocorrObservations", None, default=default_text, escape_if_plain=False)
# Section 7 Summary & Implications
add_def("summaryTrendCycles", None, default=default_text, escape_if_plain=False)
add_def("summarySeasonality", None, default=default_text, escape_if_plain=False)
add_def("summaryStationarity", None, default=default_text, escape_if_plain=False)
add_def("summaryAutocorrelations", None, default=default_text, escape_if_plain=False)
add_def("summaryOutliersVolatility", None, default=default_text, escape_if_plain=False)
add_def("implicationsModelChoice", None, default=default_text, escape_if_plain=False)
add_def("implicationsFeatureEngineering", None, default=default_text, escape_if_plain=False)
add_def("implicationsPreprocessing", None, default=default_text, escape_if_plain=False)
add_def("implicationsEvaluation", None, default=default_text, escape_if_plain=False)
add_def("implicationsProbabilistic", None, default=default_text, escape_if_plain=False)
# Section 8 Conclusion
add_def("conclusionStatement", None, default=default_text, escape_if_plain=False)
# --- Apply Definitions to Template ---
definitions_block = "\n".join(latex_definitions)
if "{{LATEX_DEFINITIONS}}" not in template:
logger.error("Placeholder '{{LATEX_DEFINITIONS}}' not found in the LaTeX template preamble.")
raise ValueError("Template missing '{{LATEX_DEFINITIONS}}' placeholder in preamble.")
report_content = template.replace("{{LATEX_DEFINITIONS}}", definitions_block)
# --- Write Report ---
try:
with open(report_tex_path, 'w', encoding='utf-8') as f:
f.write(report_content)
logger.info(f"Successfully generated LaTeX report source: {report_tex_path}")
# --- Copy Plots ---
# This is now handled within add_path_def to copy files individually
# logger.info(f"Copying plots from {source_plots_dir} to {tmp_plots_dir}")
# try:
# shutil.copytree(source_plots_dir, tmp_plots_dir, dirs_exist_ok=True) # dirs_exist_ok=True allows overwriting
# except FileNotFoundError:
# logger.error(f"Source plots directory not found: {source_plots_dir}")
# raise # Re-raise error if plots dir is essential
# except Exception as e:
# logger.error(f"Failed to copy plots directory: {e}", exc_info=True)
# raise # Re-raise error
# Attempt to compile the report
if compile_latex_report(report_tex_path, output_dir):
logger.info("LaTeX report successfully compiled to PDF")
else:
logger.warning("LaTeX compilation failed. Check logs above. The .tex file is available for manual compilation.")
# Consider raising an error if PDF generation is critical
# raise RuntimeError("LaTeX compilation failed.")
except Exception as e:
logger.error(f"Failed to write LaTeX report to {report_tex_path}: {e}", exc_info=True)
raise IOError(f"Failed to write LaTeX report to {report_tex_path}: {e}") from e