Source code for tools.plots

import seaborn as sns
import scienceplots
import pandas as pd
import warnings
from matplotlib.legend import Legend
import probscale
from tools.helpers import read_era5l
import configparser
from scipy import stats
import numpy as np
from tools.helpers import parquet_to_dict, read_yaml, pickle_to_dict, get_si, matilda_vars, confidence_interval
from tools.indicators import indicator_vars, custom_df_indicators
import warnings
import datetime as dt
from datetime import timedelta
import matplotlib.dates as mdates
from matplotlib.patches import Rectangle
from dash import Dash, dcc, html, Input, Output
from jupyter_server import serverapp
import plotly.graph_objects as go
import plotly.io as pio

try:
    import seaborn as sns
    import matplotlib.pyplot as plt
    import matplotlib as mpl
    import matplotlib.font_manager as fm
    import os

    sns.set_style("white")

    this_dir = os.path.abspath(os.path.dirname(__file__))
    font_path = os.path.join(this_dir, "cmu.0", "cmunrm.ttf")

    font_name = "Arial"  # Fallback
    if os.path.exists(font_path):
        fm.fontManager.addfont(font_path)
        font_prop = fm.FontProperties(fname=font_path)
        font_name = font_prop.get_name()

    mpl.rcParams['text.usetex'] = False
    mpl.rcParams['mathtext.fontset'] = 'cm'
    mpl.rcParams['font.family'] = font_name
    mpl.rcParams['font.size'] = 14
    mpl.rcParams['font.weight'] = 'bold'
    mpl.rcParams['axes.labelsize'] = 14
    plt.rcParams['legend.fontsize'] = 10
    mpl.rcParams['axes.titlesize'] = 16
    mpl.rcParams['axes.titleweight'] = 'bold'
    mpl.rcParams['figure.titlesize'] = 22
    mpl.rcParams['figure.titleweight'] = 'heavy'
    mpl.rcParams['axes.grid'] = True
    mpl.rcParams['grid.linestyle'] = '--'
    mpl.rcParams['grid.color'] = 'gray'
    mpl.rcParams['grid.linewidth'] = 0.5

except Exception as e:
    import warnings
    warnings.warn(f"Plot style setup skipped due to error: {e}")

## Definitions

[docs] def df2long(df, intv_sum='ME', intv_mean='YE', precip=False): """Resamples dataframes and converts them into long format to be passed to seaborn.lineplot().""" if precip: df = df.resample(intv_sum).sum().resample(intv_mean, label='left').mean() df = df.reset_index() df = df.melt('TIMESTAMP', var_name='model', value_name='prec') else: df = df.resample(intv_mean, label='left').mean() df = df.reset_index() df = df.melt('TIMESTAMP', var_name='model', value_name='temp') return df
[docs] def cmip_plot(ax, df, target, title=None, precip=False, smooth_window=10, agg_level='monthly', target_label='Target', show_target_label=False): """Plots climate model and target data using moving window smoothing.""" try: smooth_window = float(smooth_window) except ValueError: raise TypeError(f"smooth_window must be a number, got {type(smooth_window).__name__}") if precip: # Define aggregation frequency based on agg_level if agg_level == 'monthly': resample_freq = 'ME' smooth_window_units = int(smooth_window * 12) # Convert years to months elif agg_level == 'annual': resample_freq = 'YE' smooth_window_units = int(smooth_window) # Smooth window directly in years else: raise ValueError(f"Invalid agg_level: {agg_level}. Use 'monthly' or 'annual'.") # Convert daily data to specified aggregation level (monthly or annual) df_agg = df.resample(resample_freq).sum() target_agg = target['prec'].resample(resample_freq).sum() # Apply rolling mean on aggregated data ax.plot(df_agg.rolling(window=smooth_window_units, center=True).mean().iloc[:, :], linewidth=1.2) era_plot, = ax.plot(target_agg.rolling(window=smooth_window_units, center=True).mean(), linewidth=1.2, c='black', label=target_label) else: # Convert smoothing window from years to days for temperature smooth_window_days = int(smooth_window * 365.25) # Account for leap years # Apply rolling mean on daily data ax.plot(df.rolling(window=smooth_window_days, center=True).mean().iloc[:, :], linewidth=1.2) era_plot, = ax.plot(target['temp'].rolling(window=smooth_window_days, center=True).mean(), linewidth=1.2, c='black', label=target_label) ax.margins(x=0.001) if show_target_label: ax.legend(handles=[era_plot], loc='upper left') if title: ax.set_title(title) # Set y-labels for left-side plots using the corrected column check if ax.get_subplotspec().colspan.start == 0: if precip: ylabel = '[mm]' ax.set_ylabel(ylabel) else: ax.set_ylabel('[K]') ax.grid(True)
[docs] def cmip_plot_combined(data, target, precip=False, agg_level='monthly', smooth_window=10, target_label='Target', show=False, fig_path=None): """Combines multiple subplots of climate data in different scenarios before and after bias adjustment. Uses moving window smoothing in years instead of days.""" figure, axis = plt.subplots(2, 2, figsize=(12, 12), sharex="col", sharey="all") t_kwargs = {'target': target, 'smooth_window': smooth_window, 'target_label': target_label} p_kwargs = {'target': target, 'smooth_window': smooth_window, 'target_label': target_label, 'precip': True, 'agg_level': agg_level} if precip: cmip_plot(axis[0, 0], data['SSP2_raw'], show_target_label=True, title='SSP2 raw', **p_kwargs) cmip_plot(axis[0, 1], data['SSP2_adjusted'], title='SSP2 adjusted', **p_kwargs) cmip_plot(axis[1, 0], data['SSP5_raw'], title='SSP5 raw', **p_kwargs) cmip_plot(axis[1, 1], data['SSP5_adjusted'], title='SSP5 adjusted', **p_kwargs) figure.suptitle(f'{smooth_window} Year Rolling Mean of {agg_level.capitalize()} Precipitation') else: cmip_plot(axis[0, 0], data['SSP2_raw'], show_target_label=True, title='SSP2 raw', **t_kwargs) cmip_plot(axis[0, 1], data['SSP2_adjusted'], title='SSP2 adjusted', **t_kwargs) cmip_plot(axis[1, 0], data['SSP5_raw'], title='SSP5 raw', **t_kwargs) cmip_plot(axis[1, 1], data['SSP5_adjusted'], title='SSP5 adjusted', **t_kwargs) figure.suptitle(f'{smooth_window} Year Rolling Mean of Air Temperature') figure.legend(data['SSP5_adjusted'].columns, loc='lower right', ncol=6, mode="expand") figure.tight_layout() figure.subplots_adjust(bottom=0.15, top=0.92) if fig_path is not None: plt.savefig(fig_path) if show: plt.show()
[docs] def vplots(before, after, target, target_label='Target', precip=False, show=False, fig_path=None): """Creates violin plots of the kernel density estimation for all models before and after bias adjustment.""" period = slice('1979-01-01', '2022-12-31') if precip: var = 'prec' var_label = 'Annual Precipitation' unit = ' [mm]' else: var = 'temp' var_label = 'Mean Annual Air Temperature' unit = ' [K]' for i in before.keys(): before[i] = before[i].loc[period].copy() before[i][target_label] = target[var][period] for i in after.keys(): after[i] = after[i].loc[period].copy() after[i][target_label] = target[var][period] fig = plt.figure(figsize=(20, 20)) outer = fig.add_gridspec(1, 2) inner = outer[0].subgridspec(2, 1) axis = inner.subplots(sharex='col') all_data = pd.concat([df2long(before[i], precip=precip, intv_sum='YE') for i in before.keys()] + [df2long(after[i], precip=precip, intv_sum='YE') for i in after.keys()]) xmin, xmax = all_data[var].min(), all_data[var].max() xlim = (xmin * 0.95, xmax * 1.05) if precip else (xmin - 0.5, xmax + 0.5) # ---------------- "Before" Plots ---------------- for (i, k) in zip(before.keys(), range(0, 4, 1)): df = df2long(before[i], precip=precip, intv_sum='YE') axis[k].grid() sns.violinplot(ax=axis[k], x=var, y='model', hue='model', data=df, density_norm="count", bw_adjust=.2, palette='husl', legend=False) # Set y-axis labels manually axis[k].set_yticks(range(len(df['model'].unique()))) axis[k].set_yticklabels(df['model'].unique()) axis[k].set(xlim=xlim) axis[k].set_ylabel(i, fontsize=20, fontweight='bold') # Separate Reanalysis data tick_number = len(df['model'].unique()) before_last = tick_number - 1.5 axis[k].axhline(before_last, color='black', linestyle='--', linewidth=1) if k == 0: axis[k].set_title('Before Scaled Distribution Mapping') plt.xlabel(var_label + unit) # ---------------- "After" Plots ---------------- inner = outer[1].subgridspec(2, 1) axis = inner.subplots(sharex='col') for (i, k) in zip(after.keys(), range(0, 4, 1)): df = df2long(after[i], precip=precip, intv_sum='YE') axis[k].grid() sns.violinplot(ax=axis[k], x=var, y='model', hue='model', data=df, density_norm="count", bw_adjust=.2, palette='husl', legend=False) axis[k].set(xlim=xlim) axis[k].set_ylabel(i) axis[k].get_yaxis().set_visible(False) # Separate Reanalysis data tick_number = len(df['model'].unique()) before_last = tick_number - 1.5 axis[k].axhline(before_last, color='black', linestyle='--', linewidth=1) if k == 0: axis[k].set_title('After Scaled Distribution Mapping') plt.xlabel(var_label + unit) # ---------------- Global Title ---------------- starty = period.start.split('-')[0] endy = period.stop.split('-')[0] fig.suptitle('Kernel Density Estimation of ' + var_label + ' (' + starty + '-' + endy + ')') fig.tight_layout() fig.subplots_adjust(top=0.93) fig.subplots_adjust(hspace=0.05) if fig_path is not None: plt.savefig(fig_path) if show: plt.show()
[docs] def cmip_plot_ensemble(cmip, target, precip=False, intv_sum='ME', intv_mean='YE', figsize=(8, 6), show=True, fig_path=None): """ Plots the multi-model mean of climate scenarios including the 90% confidence interval. Parameters ---------- cmip : dict A dictionary with keys representing the different CMIP6 models and scenarios as pandas DataFrames containing data of temperature and/or precipitation. target : pandas.DataFrame DataFrame containing the historical reanalysis data. precip : bool, optional If True, plot the mean precipitation. If False, plot the mean temperature. Default is False. intv_sum : str, optional Interval for precipitation sums. Default is monthly ('ME'). intv_mean : str, optional Interval for the mean of temperature data or precipitation sums. Default is annual ('YE'). figsize : tuple, optional Figure size for the plot. Default is (8, 6). show : bool, optional If True, show the resulting plot. If False, do not show it. Default is True. fig_path : str or None, optional Path to save the plot. If None, the plot is not saved. """ warnings.filterwarnings(action='ignore') figure, axis = plt.subplots(figsize=figsize, constrained_layout=True) # Define color palette colors = ['#E25822', 'goldenrod', 'darkblue', 'dodgerblue'] # create a new dictionary with the same keys but new values from the list col_dict = {key: value for key, value in zip(cmip.keys(), colors)} #figure.tight_layout(rect=[0, 0.02, 1, 0.95]) # Make some room at the bottom if precip: for i in cmip.keys(): df = df2long(cmip[i], intv_sum=intv_sum, intv_mean=intv_mean, precip=True) sns.lineplot(data=df, x='TIMESTAMP', y='prec', color=col_dict[i]) axis.set(xlabel=None, ylabel='[mm]') if intv_sum=='ME': figure.suptitle('Mean Monthly Precipitation') elif intv_sum=='YE': figure.suptitle('Mean Annual Precipitation') target_plot = axis.plot(target.resample(intv_sum).sum().resample(intv_mean).mean(), linewidth=1.5, c='black', label='ERA5', linestyle='dashed') else: for i in cmip.keys(): df = df2long(cmip[i], intv_mean=intv_mean) sns.lineplot(data=df, x='TIMESTAMP', y='temp', color=col_dict[i]) axis.set(xlabel=None, ylabel='[K]') if intv_mean=='10YE': figure.suptitle('Mean 10y Air Temperature') elif intv_mean == 'YE': figure.suptitle('Mean Annual Air Temperature') elif intv_mean == 'ME': figure.suptitle('Mean Monthly Air Temperature') target_plot = axis.plot(target.resample(intv_mean).mean(), linewidth=1.5, c='black', label='ERA5', linestyle='dashed') axis.legend(['SSP2 raw', '_ci1', 'SSP2 adjusted', '_ci2', 'SSP5 raw', '_ci3', 'SSP5 adjusted', '_ci4'], loc="upper center", bbox_to_anchor=(0.39, -0.14), ncol=4, frameon=False) # First legend --> Workaround as seaborn lists CIs in legend leg = Legend(axis, target_plot, ['ERA5L'], loc='upper center', bbox_to_anchor=(0.89, -0.14), ncol=1, frameon=False) # Second legend (ERA5) axis.add_artist(leg) axis.grid(True, linestyle='--', alpha=0.7) # Dashed lines with transparency axis.margins(x=0.001) if fig_path is not None: plt.savefig(fig_path) if show: plt.show() warnings.filterwarnings(action='always')
[docs] def prob_plot(original, target, corrected, ax, title=None, ylabel="Temperature [K]", **kwargs): """ Combines probability plots of climate model data before and after bias adjustment and the target data. Parameters ---------- original : pandas.DataFrame The original climate model data. target : pandas.DataFrame The target data. corrected : pandas.DataFrame The climate model data after bias adjustment. ax : matplotlib.axes.Axes The axes on which to plot the probability plot. title : str, optional The title of the plot. Default is None. ylabel : str, optional The label for the y-axis. Default is "Temperature [K]". **kwargs : dict, optional Additional keyword arguments passed to the probscale.probplot() function. Returns ------- fig : matplotlib Figure The generated figure. """ scatter_kws = dict(label="", marker=None, linestyle="-") common_opts = dict(plottype="qq", problabel="", datalabel="", **kwargs) scatter_kws["label"] = "original" fig = probscale.probplot(original, ax=ax, scatter_kws=scatter_kws, **common_opts) scatter_kws["label"] = "target" fig = probscale.probplot(target, ax=ax, scatter_kws=scatter_kws, **common_opts) scatter_kws["label"] = "adjusted" fig = probscale.probplot(corrected, ax=ax, scatter_kws=scatter_kws, **common_opts) ax.set_title(title) ax.set_xlabel("Standard Normal Quantiles") ax.set_ylabel(ylabel) ax.grid(True) score = round(target.corr(corrected), 2) ax.text(0.05, 0.8, f"R² = {score}", transform=ax.transAxes, fontsize=15) return fig
[docs] def pp_matrix(original, target, corrected, scenario=None, nrow=7, ncol=5, precip=False, show=False, fig_path=None): """ Arranges the prob_plots of all CMIP6 models in a matrix and adds the R² score. Parameters ---------- original : pandas.DataFrame The original climate model data. target : pandas.DataFrame The target data. corrected : pandas.DataFrame The climate model data after bias adjustment. scenario : str, optional The climate scenario to be added to the plot title. nrow : int, optional The number of rows in the plot matrix. Default is 7. ncol : int, optional The number of columns in the plot matrix. Default is 5. precip : bool, optional Indicates whether the data is precipitation data. Default is False. show : bool, optional Indicates whether to display the plot. Default is False. Returns ------- None """ period = slice('1979-01-01', '2022-12-31') if precip: var = 'Precipitation' var_label = 'Monthly ' + var unit = ' [mm]' original = original.resample('ME').sum() target = target.resample('ME').sum() corrected = corrected.resample('ME').sum() else: var = 'Temperature' var_label = 'Daily Mean ' + var unit = ' [K]' fig = plt.figure(figsize=(16, 16)) for i, col in enumerate(original.columns): ax = plt.subplot(nrow, ncol, i + 1) prob_plot(original[col][period], target[period], corrected[col][period], ax=ax, ylabel=var + unit) ax.set_title(col, fontweight='bold') handles, labels = ax.get_legend_handles_labels() fig.legend(handles, ['original (CMIP6 raw)', 'target (ERA5-Land)', 'adjusted (CMIP6 after SDM)'], loc='lower right', bbox_to_anchor=(0.96, 0.024), fontsize=20) plt.tight_layout() fig.subplots_adjust(hspace=0.7, wspace=0.4) starty = period.start.split('-')[0] endy = period.stop.split('-')[0] if scenario is None: fig.suptitle('Probability Plots of CMIP6 and ERA5-Land ' + var_label + ' (' + starty + '-' + endy + ')') else: fig.suptitle('Probability Plots of CMIP6 (' + scenario + ') and ERA5-Land ' + var_label + ' (' + starty + '-' + endy + ')') plt.subplots_adjust(top=0.93) if fig_path is not None: plt.savefig(fig_path) if show: plt.show()
class MatildaSummary: def __init__(self, dir_input, dir_output, settings): self.dir_input = dir_input self.dir_output = dir_output self.settings = settings self.arrow_props = dict(facecolor='grey', edgecolor='grey', arrowstyle='-', linewidth=0.5) self.matilda_scenarios = None self.obs = None self.df_era5 = None self.tas = None self.pr = None self.runoff = None self.evaporation = None self.precipitation = None self.glacier_area = None self.snow_melt = None self.ice_melt = None self.off_melt = None self.melt_ssp2 = None self.melt_ssp5 = None self.melt_diff = None self.data_loaded = False def load_data(self): """Load all necessary data for plotting""" # Load observation data self.obs = pd.read_csv(self.dir_input + 'obs_runoff_example.csv') self.obs.index = pd.to_datetime(self.obs['Date']) self.obs['Date'] = self.obs.index self.obs["Qobs"] = self.obs["Qobs"] * 86400 / (self.settings['area_cat'] * 1000000) * 1000 # Load ERA5 data self.df_era5 = pd.read_csv(self.dir_output + 'ERA5L.csv', **{ 'usecols': ['temp', 'prec', 'dt'], 'index_col': 'dt', 'parse_dates': ['dt']}).resample('D').agg({'temp': 'mean', 'prec': 'sum'}) # Adjust start date self.df_era5.index = pd.to_datetime(self.df_era5.index) self.df_era5 = self.df_era5[self.df_era5.index >= '2000-01-01'] self.obs = self.obs[self.obs.index >= '2000-01-01'] # Load climate model data self.tas = pickle_to_dict(f"{self.dir_output}cmip6/adjusted/tas.pickle") self.pr = pickle_to_dict(f"{self.dir_output}cmip6/adjusted/pr.pickle") self.adjust_startdate(self.tas) self.adjust_startdate(self.pr) # Load MATILDA scenarios self.matilda_scenarios = pickle_to_dict(f"{self.dir_output}cmip6/adjusted/matilda_scenarios.pickle") # Prepare data for plotting self.prepare_data_for_plot() # Set data flag self.data_loaded = True def adjust_startdate(self, data_dict, start_date='2000-01-01'): """Adjust start date for dictionaries of dataframes""" for key in data_dict: df = data_dict[key] df.index = pd.to_datetime(df.index) data_dict[key] = df[df.index >= start_date] def get_matilda_result_all_models(self, scenario, result_name, dict_name='model_output'): """Extract specific result from all models for a given scenario""" df = pd.DataFrame() for key, value in self.matilda_scenarios[scenario].items(): s = value[dict_name][result_name] s.name = key df = pd.concat([df, s], axis=1) df.index = pd.to_datetime(df.index) df.index.name = 'TIMESTAMP' print(f'{result_name} extracted for {scenario}') return df def prepare_data_for_plot(self): """Prepare all necessary data for plotting""" self.runoff = {'SSP2': self.get_matilda_result_all_models('SSP2', 'total_runoff'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'total_runoff')} self.evaporation = {'SSP2': self.get_matilda_result_all_models('SSP2', 'actual_evaporation'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'actual_evaporation')} self.precipitation = {'SSP2': self.get_matilda_result_all_models('SSP2', 'total_precipitation'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'total_precipitation')} self.glacier_area = {'SSP2': self.get_matilda_result_all_models('SSP2', 'glacier_area', dict_name='glacier_rescaling'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'glacier_area', dict_name='glacier_rescaling')} self.snow_melt = {'SSP2': self.get_matilda_result_all_models('SSP2', 'snow_melt_on_glaciers'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'snow_melt_on_glaciers')} self.ice_melt = {'SSP2': self.get_matilda_result_all_models('SSP2', 'ice_melt_on_glaciers'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'ice_melt_on_glaciers')} self.off_melt = {'SSP2': self.get_matilda_result_all_models('SSP2', 'melt_off_glaciers'), 'SSP5': self.get_matilda_result_all_models('SSP5', 'melt_off_glaciers')} # Melt off glacier is always snow melt self.snow_melt['SSP2'] = self.snow_melt['SSP2'] + self.off_melt['SSP2'] self.snow_melt['SSP5'] = self.snow_melt['SSP5'] + self.off_melt['SSP5'] # Glacier area starts one year earlier self.adjust_startdate(self.glacier_area) # Convert Temp from K to °C for scenario in ['SSP2', 'SSP5']: for col in self.tas[scenario]: self.tas[scenario][col] = self.tas[scenario][col] - 273.15 self.df_era5['temp'] = self.df_era5['temp'] - 273.15 # Process melt data self.process_melt_data() def process_melt_data(self): """Process and aggregate melt data for plotting""" self.melt_ssp2 = pd.DataFrame() self.melt_ssp2['ssp2_avg_snow'] = self.snow_melt['SSP2'].mean(axis=1) self.melt_ssp2['ssp2_avg_ice'] = self.ice_melt['SSP2'].mean(axis=1) self.melt_ssp2 = self.melt_ssp2.resample('YE').sum() self.melt_ssp2.index = self.melt_ssp2.index.map(lambda x: x.replace(month=1, day=1)) self.melt_ssp5 = pd.DataFrame() self.melt_ssp5['ssp5_avg_snow'] = self.snow_melt['SSP5'].mean(axis=1) self.melt_ssp5['ssp5_avg_ice'] = self.ice_melt['SSP5'].mean(axis=1) self.melt_ssp5['ssp5_avg_off'] = self.off_melt['SSP5'].mean(axis=1) self.melt_ssp5 = self.melt_ssp5.resample('YE').sum() self.melt_ssp5.index = self.melt_ssp5.index.map(lambda x: x.replace(month=1, day=1)) self.melt_diff = pd.DataFrame() self.melt_diff['diff_snow'] = self.melt_ssp5['ssp5_avg_snow'] - self.melt_ssp2['ssp2_avg_snow'] self.melt_diff['diff_ice'] = self.melt_ssp5['ssp5_avg_ice'] - self.melt_ssp2['ssp2_avg_ice'] def custom_formatter_abs_val(self, value, _): """Format axis labels for absolute values""" return f'{abs(value):.0f}' def annotate_final_val(self, ax, y_axes, y_text, text, unit): """Annotate the final value on a plot""" ax.annotate(f'{text:.2f} {unit}', xy=(1, y_axes), xytext=(55, y_text), va='center', ha='right', fontsize=8, xycoords=('axes fraction', 'data'), textcoords=('offset points', 'data'), arrowprops=self.arrow_props) def annote_final_val_lines(self, ax, unit, min_dist=0): """Annotate final values for all lines on an axis""" # First collect all y-values yvals = [] for line in ax.lines: ls = line.get_ls() if ls == ':' or ls == '--': ydata = line.get_ydata() yvals.append(ydata[-1]) # Sort them and ensure minimum distance if supplied (lower values have to move) yvals.sort(reverse=True) for i, y in enumerate(yvals): if i > 0: diff = yvals[i-1] - yvals[i] if diff < min_dist: yvals[i] = yvals[i] + diff - min_dist self.annotate_final_val(ax, y, yvals[i], y, unit) def df2long(self, df, val_name, intv_sum=None, intv_mean='YE', rolling=None, cutoff=None): """Resamples dataframes and converts them into long format for seaborn plots""" if intv_sum is not None: df = df.resample(intv_sum, label='right').sum() df = df.resample(intv_mean, label='right').mean() if rolling is not None: df = df.rolling(rolling).mean() if cutoff is not None: df = df.loc[cutoff:] # Adjust the index to assign the first day of the period if intv_mean == 'YE': df.index = df.index.map(lambda x: x.replace(month=1, day=1)) elif intv_mean == 'M': df.index = df.index.map(lambda x: x.replace(day=1)) # Reset index to turn the timestamp index into a column df = df.reset_index() # Melt the dataframe into long format df = df.melt('TIMESTAMP', var_name='model', value_name=val_name) return df def add_cmip_ensemble(self, param_scenarios, val_name, ylabel, ax, ylim=None, target=None, target_color='black', linestyle='solid', rolling=None, cutoff=None, intv_sum='YE', ylabel_pad=0): """Add CMIP ensemble to a plot""" colors = ['orange', 'dodgerblue'] col_dict = {key: value for key, value in zip(param_scenarios.keys(), colors)} linestyles = ['dotted', 'dashed'] ls_dict = {key: value for key, value in zip(param_scenarios.keys(), linestyles)} for i in param_scenarios.keys(): df_pred = self.df2long(param_scenarios[i], val_name, intv_sum=intv_sum, intv_mean='YE', rolling=rolling, cutoff=cutoff) sns.lineplot(data=df_pred, x='TIMESTAMP', y=val_name, color=target_color, ax=ax, linestyle=ls_dict[i]) ax.set_ylabel(ylabel, labelpad=ylabel_pad, fontsize = 12) if ylim is not None: ax.set_ylim(ylim) if target is not None: ax.plot(target, linewidth=1.5, c=target_color) ax.yaxis.set_ticks_position('left') ax.tick_params(axis='y', right=False, labelright=False) def ensemble_max(self, param_scenarios, val_name, rolling=None, cutoff=None, intv_sum='YE'): """Calculate ensemble maximum with confidence interval""" all_dfs = [] # Step 1: Concatenate all scenarios into one long dataframe for i in param_scenarios.keys(): df_pred = self.df2long(param_scenarios[i], val_name, intv_sum=intv_sum, intv_mean='YE', rolling=rolling, cutoff=cutoff) df_pred["scenario"] = i all_dfs.append(df_pred) df_all = pd.concat(all_dfs) # Step 2: Compute ensemble mean and 95% CI at each timestamp grouped = df_all.groupby("TIMESTAMP")[val_name] df_ci = grouped.agg(['mean', 'count', 'std']).reset_index() df_ci['ci95'] = stats.t.ppf(0.975, df_ci['count'] - 1) * (df_ci['std'] / np.sqrt(df_ci['count'])) # Step 3: Add mean + upper confidence bound df_ci['upper'] = df_ci['mean'] + df_ci['ci95'] # Step 4: Return the maximum of the upper bound return round(df_ci['upper'].max()) def plot_summary(self, rolling=None, save_path=None, show=False): """Create the main summary plot with all components""" print("Creating MATILDA summary plot...") if not self.data_loaded: print("Loading data. Call .load_data() explicitly if you want to modify the data before plotting.") self.load_data() # Set up the figure with gridspec gridspec = dict(hspace=0.0, height_ratios=[1, 2, 4, 1]) figure, axs = plt.subplots(nrows=4, ncols=1, figsize=(8, 8), sharex=True, gridspec_kw=gridspec) # ----- Glacierized Area (top panel) ----- print("Plotting glacierized area...") ax0l = axs[0] self.add_cmip_ensemble(param_scenarios=self.glacier_area, val_name='glac_area', ylabel='Glacierized\nArea (km²)', ax=ax0l, target_color='darkviolet', ylabel_pad=10) # Annotate final glacier values with percentage for line in ax0l.lines: ls = line.get_ls() if ls == ':' or ls == '--': ydata = line.get_ydata() last_val = ydata[-1] perc_val = last_val / max(ydata) * 100 label = "{:.0f} km² ({:.0f}%)".format(last_val, perc_val) ax0l.annotate(label, xy=(1, last_val), xytext=(55, min(max(ydata) * 0.9, last_val * 4)), va='center', ha='right', fontsize=8, xycoords=('axes fraction', 'data'), textcoords=('offset points', 'data'), arrowprops=self.arrow_props) # 50% line half_y = ax0l.get_ylim()[1] / 2 ax0l.axhline(y=half_y, color='lightgrey', linestyle=':', linewidth=1) ax0l.text(dt.datetime(2001, 1, 1), half_y, f"50%", ha='left', va='bottom', size=8, color='grey') # ----- Snow & Ice Melt (second panel) ----- print("Plotting snow & ice melt...") ax1l = axs[1] # Create stacked plots for both scenarios col = ["#eaeaea", "#d1e3ff"] # Colors for snow and ice melt ax1l.stackplot(self.melt_ssp5.index, self.melt_ssp5['ssp5_avg_snow'], self.melt_ssp5['ssp5_avg_ice'], colors=col,edgecolor='none') ax1l.stackplot(self.melt_ssp2.index, self.melt_ssp2['ssp2_avg_snow']*-1, self.melt_ssp2['ssp2_avg_ice']*-1, colors=col, edgecolor='none') ax1l.axhline(y=0, color='white', linestyle='-') # Zero line # Plot differences ax1l.plot(self.melt_diff.index, self.melt_diff['diff_snow'], color='#b6b6b6') ax1l.plot(self.melt_diff.index, self.melt_diff['diff_ice'], color='#a1bceb') # Auto-scale y-axis to match data range ymax_ax1l = max(max(self.melt_ssp5['ssp5_avg_snow'] + self.melt_ssp5['ssp5_avg_ice']), max(self.melt_ssp2['ssp2_avg_snow'] + self.melt_ssp2['ssp2_avg_ice'])) ymax_ax1l_upper = round(ymax_ax1l*1.55, 1) # Add space for legend ymax_ax1l_lower = round(-ymax_ax1l*1.1, 1) ax1l.set_ylim(ymax_ax1l_lower, ymax_ax1l_upper) ax1l.set_ylabel('Melt (mm/a)', labelpad=8, fontsize = 12) # Annotate final valuess y = self.melt_ssp5['ssp5_avg_snow'].iloc[-1] self.annotate_final_val(ax1l, y, ymax_ax1l*0.3, abs(y), 'mm') y = self.melt_ssp5['ssp5_avg_ice'].iloc[-1]+self.melt_ssp5['ssp5_avg_snow'].iloc[-1] self.annotate_final_val(ax1l, y, ymax_ax1l*0.8, abs(y), 'mm') y = self.melt_ssp2['ssp2_avg_snow'].iloc[-1]*-1 self.annotate_final_val(ax1l, y, ymax_ax1l*-0.3, abs(y), 'mm') y = (self.melt_ssp2['ssp2_avg_ice'].iloc[-1]+self.melt_ssp2['ssp2_avg_snow'].iloc[-1])*-1 self.annotate_final_val(ax1l, y, ymax_ax1l*-0.8, abs(y), 'mm') ax1l.yaxis.set_major_formatter(lambda x, pos: f'{abs(x):.0f}') # ----- Runoff & Precipitation (third panel) ----- print("Plotting runoff & precipitation...") ax2l = axs[2] # Auto-scale y-axis to match data range ymax_ax2l = max(self.ensemble_max(param_scenarios=self.runoff, val_name='runoff', rolling=rolling, cutoff='2000-12-31'), self.ensemble_max(param_scenarios=self.evaporation, val_name='eva', rolling=rolling, cutoff='2000-12-31'), self.ensemble_max(param_scenarios=self.precipitation, val_name='prec', rolling=rolling, cutoff='2000-12-31')) ymax_ax2l = ymax_ax2l * 1.3 # Plot runoff with observations if rolling is not None: obs_rs = self.obs['Qobs'].resample('YE').agg(pd.Series.sum, skipna=False).rolling(rolling, min_periods=2).mean() else: obs_rs = self.obs['Qobs'].resample('YE').agg(pd.Series.sum, skipna=False).mean() self.add_cmip_ensemble(param_scenarios=self.runoff, val_name='runoff', ylabel=' (mm/a)', ylim=(0, ymax_ax2l), target=obs_rs, target_color='blue', ax=ax2l, rolling=rolling, cutoff='2000-12-31') # Plot evaporation self.add_cmip_ensemble(param_scenarios=self.evaporation, val_name='eva', ylabel=' (mm/a)', target=None, target_color='green', ax=ax2l, rolling=rolling, cutoff='2000-12-31') # Plot precipitation self.add_cmip_ensemble(param_scenarios=self.precipitation, val_name='prec', ylabel=' (mm/a)', target=None, target_color='darkgrey', ax=ax2l, rolling=rolling, cutoff='2000-12-31', ylabel_pad=5) self.annote_final_val_lines(ax2l, 'mm', min_dist=100) # Add observation data rectangles first = True for index, row in self.obs.dropna().iterrows(): #start = mdates.date2num(row['Date']) label = "Runoff observation period" if first else None start = row['Date'] ax2l.axvspan(start, start + timedelta(days=1), color=(1.0, 0.9311, 0.8219), alpha=1, zorder=0) first = False ax2l.axvline(self.df_era5.index[-1], color='salmon') # Line to separate historical and projection periods # ----- Temperature (bottom panel) ----- print("Plotting temperature...") ax3l = axs[3] # Process ERA5 temperature for plotting if rolling is not None: era5_temp_rs = self.df_era5['temp'].resample('YE').agg(pd.Series.mean, skipna=False).rolling(rolling, min_periods=2).mean() else: era5_temp_rs = self.df_era5['temp'].resample('YE').agg(pd.Series.mean, skipna=False).mean() self.add_cmip_ensemble(param_scenarios=self.tas, val_name='temp', ylabel='Temp. (°C)', target=era5_temp_rs, target_color='red', ax=ax3l, rolling=rolling, cutoff='2000-12-31', intv_sum=None, ylabel_pad=10) self.annote_final_val_lines(ax3l, '°C') # Add 0°C line ax3l.axhline(y=0, color='lightgrey', linestyle=':', linewidth=1) ax3l.text(dt.datetime(2001, 1, 1), 0, f"0 °C", ha='left', va='bottom', size=8, color='grey') ax3l.axvline(self.df_era5.index[-1], color='salmon') # Present day line # ----- Create legends ----- print("Creating legends...") # Snow & Ice melt legend ax1l.legend(['Snow Melt', 'Ice Melt', '_Snow', '_Ice', '_White', 'SSP5-SSP2', 'SSP5-SSP2'], ncol=2, fontsize="8", loc="upper left", frameon=False) # Runoff, evaporation, precipitation legend ax2l_legend = ax2l.legend(['_SSP2', 'Runoff', '_SSP5', '_CI5', '_Runoff', '_SSP2', 'Evaporation', '_SSP5', '_CI5', '_SSP2', 'Precipitation', '_SSP5', '_CI5', 'Runoff observation period'], ncol=4, fontsize="8", loc="upper left", frameon=True) ax2l_legend.get_frame().set_facecolor('white') ax2l_legend.get_frame().set_edgecolor('white') # Scenario legend scenario_legend = ax3l.legend(['SSP2 Scenario', '_ci1', 'SSP5 Scenario', '_ci2'], loc="lower right", bbox_to_anchor=(1, -0.8), ncol=2, frameon=True,fontsize = 8) # ----- Add text annotations ----- style = dict(size=8, color='black') ax1l.text(dt.datetime(2001, 1, 1), 5, f"SSP5", ha='left', va='bottom', **style) ax1l.text(dt.datetime(2001, 1, 1), -6, f"SSP2", ha='left', va='top', **style) ax1l.text(dt.datetime(2099, 1, 1), 120, f"Diff +", ha='right', va='bottom', **style) ax1l.text(dt.datetime(2099, 1, 1), -120, f"Diff -", ha='right', va='top', **style) if rolling is not None: ax2l.text(dt.datetime(2100, 1, 1), 0, f"{rolling} year rolling mean", ha='right', va='bottom', style='italic', **style) ax3l.text(dt.datetime(2100, 1, 1), era5_temp_rs.dropna().min(), f"{rolling} year rolling mean", ha='right', va='bottom', style='italic', **style) # ----- Final formatting ----- print("Final formatting...") ax3l.xaxis.set_major_locator(mdates.YearLocator(base=10)) for ax in axs: ax.margins(x=0) ax.set_xlim([dt.datetime(2000, 1, 1), None]) ax.grid(visible=False) ax.tick_params(axis='y', right=False, labelright=False) ax.tick_params(axis='x', which='both', bottom=True, top=False, labelbottom=True) for ax in [ax1l, ax2l]: ax.grid(axis='y', color='lightgrey', linestyle='--', dashes=(5, 5)) ax3l.set_xlabel("Year", fontsize = 12) plt.suptitle(f"MATILDA Summary", fontweight='bold', fontsize=16) figure.tight_layout() # Save figure if path is provided if save_path: figure.savefig(save_path, dpi=300) print(f"Figure saved to {save_path}") print(">> DONE! <<") return figure
[docs] def custom_df_matilda(dic, scenario, var, resample_freq=None): """ Takes a dictionary of model outputs and returns a combined dataframe of a specific variable for a given scenario. Parameters ---------- dic : dict A nested dictionary of model outputs. The outer keys are scenario names and the inner keys are model names. The corresponding values are dictionaries containing two keys: - 'model_output' (DataFrame): containing model outputs for a given scenario and model - 'glacier_rescaling' (DataFrame): containing glacier properties for a given scenario and model scenario : str The name of the scenario to select from the dictionary. var : str The name of the variable to extract from the model output DataFrame. resample_freq : str, optional The frequency of the resulting time series data. Defaults to None (i.e. no resampling). If provided, should be in pandas resample frequency string format. Returns ------- pandas.DataFrame A DataFrame containing the combined data of the specified variable for the selected scenario and models. The DataFrame is indexed by the time steps of the original models. The columns are the names of the models in the selected scenario. Raises ------ ValueError If the provided `var` string is not one of the following: ['avg_temp_catchment', 'avg_temp_glaciers', 'evap_off_glaciers', 'prec_off_glaciers', 'prec_on_glaciers', 'rain_off_glaciers', 'snow_off_glaciers', 'rain_on_glaciers', 'snow_on_glaciers', 'snowpack_off_glaciers', 'soil_moisture', 'upper_groundwater', 'lower_groundwater', 'melt_off_glaciers', 'melt_on_glaciers', 'ice_melt_on_glaciers', 'snow_melt_on_glaciers', 'refreezing_ice', 'refreezing_snow', 'total_refreezing', 'SMB', 'actual_evaporation', 'total_precipitation', 'total_melt', 'runoff_without_glaciers', 'runoff_from_glaciers', 'runoff_ratio', 'total_runoff', 'glacier_area', 'glacier_elev', 'smb_water_year', 'smb_scaled', 'smb_scaled_capped', 'smb_scaled_capped_cum', 'surplus', 'glacier_melt_perc', 'glacier_mass_mmwe', 'glacier_vol_m3', 'glacier_vol_perc'] """ out1_cols = ['avg_temp_catchment', 'avg_temp_glaciers', 'evap_off_glaciers', 'prec_off_glaciers', 'prec_on_glaciers', 'rain_off_glaciers', 'snow_off_glaciers', 'rain_on_glaciers', 'snow_on_glaciers', 'snowpack_off_glaciers', 'soil_moisture', 'upper_groundwater', 'lower_groundwater', 'melt_off_glaciers', 'melt_on_glaciers', 'ice_melt_on_glaciers', 'snow_melt_on_glaciers', 'refreezing_ice', 'refreezing_snow', 'total_refreezing', 'SMB', 'actual_evaporation', 'total_precipitation', 'total_melt', 'runoff_without_glaciers', 'runoff_from_glaciers', 'runoff_ratio', 'total_runoff'] out2_cols = ['glacier_area', 'glacier_elev', 'smb_water_year', 'smb_scaled', 'smb_scaled_capped', 'smb_scaled_capped_cum', 'surplus', 'glacier_melt_perc', 'glacier_mass_mmwe', 'glacier_vol_m3', 'glacier_vol_perc'] if var in out1_cols: output_df = 'model_output' elif var in out2_cols: output_df = 'glacier_rescaling' else: raise ValueError("var needs to be one of the following strings: " + str([i for i in [out1_cols, out2_cols]])) # Create an empty list to store the dataframes dfs = [] # Loop over the models in the selected scenario for model in dic[scenario].keys(): # Get the dataframe for the current model df = dic[scenario][model][output_df] # Append the dataframe to the list of dataframes dfs.append(df[var]) # Concatenate the dataframes into a single dataframe combined_df = pd.concat(dfs, axis=1) # Set the column names of the combined dataframe to the model names combined_df.columns = dic[scenario].keys() # Resample time series if resample_freq is not None: if output_df == 'glacier_rescaling': if resample_freq == '10YE': if var in ['glacier_area', 'glacier_elev']: combined_df = combined_df.resample(resample_freq).mean() else: combined_df = combined_df.resample(resample_freq).sum() else: if var in ['avg_temp_catchment', 'avg_temp_glaciers']: combined_df = combined_df.resample(resample_freq).mean() else: combined_df = combined_df.resample(resample_freq).sum() return combined_df
[docs] def plot_ci_matilda(var, dic, resample_freq='YE', show=False): """ Plot the multi-model mean and confidence intervals of a given variable for two different scenarios. Parameters ---------- var : str The variable to plot. dic : dict, optional A dictionary containing the scenarios as keys and the DataFrames as values. Default is matilda_scenarios. resample_freq : str, optional The resampling frequency to apply to the data. Default is 'YE'. show : bool, optional Whether to show the resulting plot or not. Default is False. Returns ------- go.Figure A plotly figure object containing the mean and confidence intervals for the given variable in the two selected scenarios. """ if var is None: var = 'total_runoff' # Default if nothing selected # SSP2 df1 = custom_df_matilda(dic, scenario='SSP2', var=var, resample_freq=resample_freq) df1_ci = confidence_interval(df1) # SSP5 df2 = custom_df_matilda(dic, scenario='SSP5', var=var, resample_freq=resample_freq) df2_ci = confidence_interval(df2) fig = go.Figure([ # SSP2 go.Scatter( name='SSP2', x=df1_ci.index, y=round(df1_ci['mean'], 2), mode='lines', line=dict(color='darkorange'), ), go.Scatter( name='95% CI Upper', x=df1_ci.index, y=round(df1_ci['ci_upper'], 2), mode='lines', marker=dict(color='#444'), line=dict(width=0), showlegend=False ), go.Scatter( name='95% CI Lower', x=df1_ci.index, y=round(df1_ci['ci_lower'], 2), marker=dict(color='#444'), line=dict(width=0), mode='lines', fillcolor='rgba(255, 165, 0, 0.3)', fill='tonexty', showlegend=False ), # SSP5 go.Scatter( name='SSP5', x=df2_ci.index, y=round(df2_ci['mean'], 2), mode='lines', line=dict(color='darkblue'), ), go.Scatter( name='95% CI Upper', x=df2_ci.index, y=round(df2_ci['ci_upper'], 2), mode='lines', marker=dict(color='#444'), line=dict(width=0), showlegend=False ), go.Scatter( name='95% CI Lower', x=df2_ci.index, y=round(df2_ci['ci_lower'], 2), marker=dict(color='#444'), line=dict(width=0), mode='lines', fillcolor='rgba(0, 0, 255, 0.3)', fill='tonexty', showlegend=False ) ]) fig.update_layout( xaxis_title='Year', yaxis_title=matilda_vars[var][0] + ' [' + matilda_vars[var][1] + ']', title={'text': '<b>' + matilda_vars[var][0] + '</b>', 'font': {'size': 28, 'color': 'darkblue', 'family': 'Arial'}}, legend={'font': {'size': 18, 'family': 'Arial'}}, hovermode='x', plot_bgcolor='rgba(255, 255, 255, 1)', # Set the background color to white margin=dict(l=10, r=10, t=90, b=10), # Adjust the margins to remove space around the plot xaxis=dict(gridcolor='lightgrey'), # set the grid color of x-axis to lightgrey yaxis=dict(gridcolor='lightgrey'), # set the grid color of y-axis to lightgrey ) fig.update_yaxes(rangemode='tozero') # show figure if show: fig.show() else: return fig
def matilda_dash(app,dic,fig_count=4, default_vars=['total_runoff', 'total_precipitation', 'runoff_from_glaciers', 'glacier_area'], display_mode='inLine'): # If number of default variables does not match the number of figures use variables in default order if fig_count != len(default_vars): default_vars = list(matilda_vars.keys()) # Create separate callback functions for each dropdown menu and graph combination for i in range(fig_count): @app.callback( Output(f'line-plot-{i}', 'figure'), Input(f'arg-dropdown-{i}', 'value'), Input(f'freq-dropdown-{i}', 'value') ) def update_figure(selected_arg, selected_freq, i=i): fig = plot_ci_matilda(selected_arg,dic=dic, resample_freq=selected_freq+'E') # adding 'E' to resample freq due to deprecated warning (e.g. 'YE' instead of 'Y') return fig # Define the dropdown menus and figures dropdowns_and_figures = [] for i in range(fig_count): arg_dropdown = dcc.Dropdown( id=f'arg-dropdown-{i}', options=[{'label': matilda_vars[var][0], 'value': var} for var in matilda_vars.keys()], value=default_vars[i], clearable=False, style={'width': '400px', 'fontFamily': 'Arial', 'fontSize': 15} ) freq_dropdown = dcc.Dropdown( id=f'freq-dropdown-{i}', options=[{'label': freq, 'value': freq} for freq in ['M', 'Y', '10Y']], value='Y', clearable=False, style={'width': '100px'} ) dropdowns_and_figures.append( html.Div([ html.Div([ html.Label("Variable:"), arg_dropdown, ], style={'display': 'inline-block', 'margin-right': '30px'}), html.Div([ html.Label("Frequency:"), freq_dropdown, ], style={'display': 'inline-block'}), dcc.Graph(id=f'line-plot-{i}'), ]) ) # Combine the dropdown menus and figures into a single layout app.layout = html.Div(dropdowns_and_figures)
[docs] def plot_ci_indicators(var, dic, plot_type='line', show=False): """ Plot the multi-model mean and confidence intervals of a given variable for two different scenarios. Parameters ---------- var : str The variable to plot. dic : dict, optional A dictionary containing the scenarios as keys and the DataFrames as values. Default is matilda_scenarios. plot_type : str, optional Whether the plot should be a line or a bar plot. Default is 'line'. show : bool, optional Whether to show the resulting plot or not. Default is False. Returns ------- go.Figure A plotly figure object containing the mean and confidence intervals for the given variable in the two selected scenarios. """ if var is None: var = 'total_runoff' # Default if nothing selected # SSP2 df1 = custom_df_indicators(dic, scenario='SSP2', var=var) df1_ci = confidence_interval(df1) # SSP5 df2 = custom_df_indicators(dic, scenario='SSP5', var=var) df2_ci = confidence_interval(df2) if plot_type == 'line': fig = go.Figure([ # SSP2 go.Scatter( name='SSP2', x=df1_ci.index, y=round(df1_ci['mean'], 2), mode='lines', line=dict(color='darkorange'), ), go.Scatter( name='95% CI Upper', x=df1_ci.index, y=round(df1_ci['ci_upper'], 2), mode='lines', marker=dict(color='#444'), line=dict(width=0), showlegend=False ), go.Scatter( name='95% CI Lower', x=df1_ci.index, y=round(df1_ci['ci_lower'], 2), marker=dict(color='#444'), line=dict(width=0), mode='lines', fillcolor='rgba(255, 165, 0, 0.3)', fill='tonexty', showlegend=False ), # SSP5 go.Scatter( name='SSP5', x=df2_ci.index, y=round(df2_ci['mean'], 2), mode='lines', line=dict(color='darkblue'), ), go.Scatter( name='95% CI Upper', x=df2_ci.index, y=round(df2_ci['ci_upper'], 2), mode='lines', marker=dict(color='#444'), line=dict(width=0), showlegend=False ), go.Scatter( name='95% CI Lower', x=df2_ci.index, y=round(df2_ci['ci_lower'], 2), marker=dict(color='#444'), line=dict(width=0), mode='lines', fillcolor='rgba(0, 0, 255, 0.3)', fill='tonexty', showlegend=False ) ]) elif plot_type == 'bar': fig = go.Figure([ # SSP2 go.Bar( name='SSP2', x=df1_ci.index, y=round(df1_ci['mean'], 2), marker=dict(color='darkorange'), error_y=dict( type='data', symmetric=False, array=round(df1_ci['mean'] - df1_ci['ci_lower'], 2), arrayminus=round(df1_ci['ci_upper'] - df1_ci['mean'], 2), color='grey' ) ), # SSP5 go.Bar( name='SSP5', x=df2_ci.index, y=round(df2_ci['mean'], 2), marker=dict(color='darkblue'), error_y=dict( type='data', symmetric=False, array=round(df2_ci['mean'] - df2_ci['ci_lower'], 2), arrayminus=round(df2_ci['ci_upper'] - df2_ci['mean'], 2), color='grey' ) ) ]) else: raise ValueError("Invalid property specified for 'plot_type'. Choose either 'line' or 'bar'") fig.update_layout( xaxis_title='Year', yaxis_title=indicator_vars[var][0] + ' [' + indicator_vars[var][1] + ']', title={'text': '<b>' + indicator_vars[var][0] + '</b>', 'font': {'size': 28, 'color': 'darkblue', 'family': 'Palatino'}}, legend={'font': {'size': 18, 'family': 'Palatino'}}, hovermode='x', plot_bgcolor='rgba(255, 255, 255, 1)', # Set the background color to white margin=dict(l=10, r=10, t=90, b=10), # Adjust the margins to remove space around the plot xaxis=dict(gridcolor='lightgrey'), # set the grid color of x-axis to lightgrey yaxis=dict(gridcolor='lightgrey'), # set the grid color of y-axis to lightgrey ) fig.update_yaxes(rangemode='tozero') # show figure if show: fig.show() else: return fig
def matilda_indicators_dash(app, indicator_data, fig_count=4, default_vars=['peak_day', 'melt_season_length', 'potential_aridity', 'spei12'], default_types=['line', 'line', 'line', 'bar']): for i in range(fig_count): @app.callback( Output(f'line-plot-{i}', 'figure'), Input(f'arg-dropdown-{i}', 'value'), Input(f'type-dropdown-{i}', 'value') ) def update_figure(selected_arg, selected_type, i=i): fig = plot_ci_indicators(selected_arg, indicator_data, selected_type) return fig dropdowns_and_figures = [] for i in range(fig_count): arg_dropdown = dcc.Dropdown( id=f'arg-dropdown-{i}', options=[{'label': indicator_vars[var][0], 'value': var} for var in indicator_vars.keys()], value=default_vars[i], clearable=False, style={'width': '400px', 'fontFamily': 'Palatino', 'fontSize': 15} ) type_dropdown = dcc.Dropdown( id=f'type-dropdown-{i}', options=[{'label': lab, 'value': val} for lab, val in [('Line', 'line'), ('Bar', 'bar')]], value=default_types[i], clearable=False, style={'width': '150px'} ) dropdowns_and_figures.append( html.Div([ html.Div([ html.Label("Variable:"), arg_dropdown, ], style={'display': 'inline-block', 'margin-right': '30px'}), html.Div([ html.Label("Plot Type:"), type_dropdown, ], style={'display': 'inline-block'}), dcc.Graph(id=f'line-plot-{i}'), ]) ) app.layout = html.Div(dropdowns_and_figures)
[docs] def ensemble_mean(matilda_scenarios, result_name, dict_name="model_output"): """ Compute the ensemble mean for a specific variable across all models in the scenarios. Parameters: matilda_scenarios (dict): Original scenario dictionary with nested model outputs. result_name (str): Name of the target variable to extract (e.g., 'total_runoff'). dict_name (str): Name of the dictionary in the model output to look for results (default: 'model_output'). Returns: dict: A dictionary with emission scenarios as keys and the ensemble mean time series (as pandas Series) as values. """ def get_matilda_result_all_models(scenario, result_name, dict_name="model_output"): df = pd.DataFrame() for key, value in matilda_scenarios[scenario].items(): s = value[dict_name][result_name] s.name = key df = pd.concat([df, s], axis=1) df.index = pd.to_datetime(df.index) df.index.name = "TIMESTAMP" print(f"{result_name} extracted for {scenario}") return df # Extract the result data for each scenario and compute the ensemble mean mean_results = {} for scenario in matilda_scenarios.keys(): # Get the DataFrame of all models for the given scenario and variable df = get_matilda_result_all_models(scenario, result_name, dict_name) # Compute the ensemble mean across models (columns) mean_results[scenario] = df.mean(axis=1) return mean_results
[docs] def plot_annual_cycles(matilda_scenarios, scenarios=("SSP2", "SSP5"), save_path=None): """ Creates a 2-column by 4-row plot of annual cycles for the given variables, comparing two scenarios side by side. Parameters: runoff (dict): Ensemble mean dictionary for runoff. snow_melt (dict): Ensemble mean dictionary for snowmelt. ice_melt (dict): Ensemble mean dictionary for ice melt. aet (dict): Ensemble mean dictionary for actual evaporation. scenarios (tuple): Tuple of two scenarios to compare (default: ("SSP2", "SSP5")) Returns: None: Displays the subplot figure. """ runoff = ensemble_mean(matilda_scenarios, "total_runoff") snow_melt = ensemble_mean(matilda_scenarios, "snow_melt_on_glaciers") ice_melt = ensemble_mean(matilda_scenarios, "ice_melt_on_glaciers") off_melt = ensemble_mean(matilda_scenarios, "melt_off_glaciers") aet = ensemble_mean(matilda_scenarios, "actual_evaporation") # Melt off glacier is always snow melt: snow_melt["SSP2"] = snow_melt["SSP2"] + off_melt["SSP2"] snow_melt["SSP5"] = snow_melt["SSP5"] + off_melt["SSP5"] variables = { "Total Runoff": ("YlGnBu", runoff), "Snowmelt": ("Blues", snow_melt), "Ice Melt": ("Blues", ice_melt), "AET": ("Oranges", aet), } short_month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] fig, axes = plt.subplots(4, 2, figsize=(8, 12), sharex=True) for row_idx, (var_name, (cmap, var_dict)) in enumerate(variables.items()): for col_idx, scenario in enumerate(scenarios): ax = axes[row_idx, col_idx] data = var_dict[scenario] # Prepare the data df = data.reset_index() df.columns = ["date", var_name] df["date"] = pd.to_datetime(df["date"]) df["year"] = df["date"].dt.year df["month"] = df["date"].dt.month # Group and pivot monthly_values = df.groupby(["year", "month"])[var_name].sum().reset_index() grid = monthly_values.pivot(index="month", columns="year", values=var_name) # Plot c = ax.pcolormesh(grid.index, grid.columns, grid.T, shading="nearest", cmap=cmap) # Colorbar cbar = fig.colorbar(c, ax=ax, ticks=[ round(grid.min().min() / 10) * 10, round(grid.max().max() / 10) * 10 ]) c.set_clim(round(grid.min().min() / 10) * 10, round(grid.max().max() / 10) * 10) cbar.ax.tick_params(labelsize=12) cbar.set_label("mm", labelpad=-13, rotation=90, fontsize=12) # Titles and labels if col_idx == 0: ax.set_ylabel(var_name, fontsize=16) if row_idx == 0: ax.set_title(scenario, fontsize=18) # Customizing ticks ax.set_xticks([1, 7, 12]) ax.set_xticklabels(["Jan", "Jul", "Dec"], fontsize=13) ax.set_yticks([grid.columns[0], grid.columns[len(grid.columns) // 2], grid.columns[-1]]) ax.set_yticklabels(["2000", "2050", "2100"], fontsize=13) plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300) print(f"Figure saved to {save_path}") plt.show()
[docs] def plot_sensitivity_bars(*dfs, labels=None, show=False, bar_width=0.3, bar_gap=0.6): """ Plots a horizontal bar chart showing the total sensitivity index for each parameter in a MATILDA model. Parameters ---------- *dfs : pandas.DataFrame Multiple dataframes containing the sensitivity indices for each parameter. labels : list of str, optional Labels to use for the different steps in the sensitivity analysis. If not provided, the default labels will be 'No Limits', 'Step 1', 'Step 2', etc. bar_width : float, optional Width of each bar in the chart. bar_gap : float, optional Space between bars. """ traces = [] colors = ['darkblue', 'orange', 'purple', 'cyan'] # add more colors if needed for i, df in enumerate(dfs): df = get_si(df) if i > 0: if labels is None: label = 'Step ' + str(i) else: label = labels[i] else: label = 'No Limits' trace = go.Bar(y=df.index, x=df['ST'], name=label, orientation='h', marker=dict(color=colors[i]), width=bar_width) traces.append(trace) layout = go.Layout(title=dict(text='<b>' +'Total Sensitivity Index for MATILDA Parameters' + '<b>', font=dict(size=24)), xaxis_title='Total Sensitivity Index', yaxis_title='Parameter', yaxis=dict(automargin=True), bargap=bar_gap, height=700) fig = go.Figure(data=traces, layout=layout) if show: fig.show()