Source code for tools.indicators

from tools.helpers import hydrologicalize
import spotpy.hydrology.signatures as sig
from climate_indices.indices import spei, spi, Distribution
from climate_indices import compute, utils
import pandas as pd
import numpy as np
import inspect
from tqdm import tqdm


[docs] def prec_minmax(df): """ Compute the months of extreme precipitation for each year. Parameters ---------- df : pandas.DataFrame A DataFrame of daily precipitation data with a datetime index and a 'total_precipitation' column. Returns ------- pandas.DataFrame A DataFrame with the months of extreme precipitation as a number for every calendar year. """ # Use water years df = hydrologicalize(df) # group the data by year and month and sum the precipitation values grouped = df.groupby([df.water_year, df.index.month]).sum() # get the month with extreme precipitation for each year max_month = grouped.groupby(level=0)['total_precipitation'].idxmax() min_month = grouped.groupby(level=0)['total_precipitation'].idxmin() max_month = [p[1] for p in max_month] min_month = [p[1] for p in min_month] # create a new dataframe result = pd.DataFrame({'max_prec_month': max_month, 'min_prec_month': min_month}, index=pd.to_datetime(df.water_year.unique(), format='%Y')) return result
# Day of the Year with maximum flow
[docs] def peak_doy(df, smoothing_window_peakdoy=7): """ Compute the day of the calendar year with the peak value for each hydrological year. Parameters ---------- df : pandas.DataFrame A DataFrame of daily data with a datetime index. smoothing_window_peakdoy : int, optional The window size of the rolling mean used for smoothing the data. Default is 7. Returns ------- pandas.DataFrame A DataFrame with the day of the year with the peak value for each hydrological year. """ # Use water years df = hydrologicalize(df) # find peak day for each hydrological year peak_dates = [] for year in df.water_year.unique(): # slice data for hydrological year hy_data = df.loc[df.water_year == year, 'total_runoff'] # smooth data using rolling mean with window of 7 days smoothed_data = hy_data.rolling(smoothing_window_peakdoy, center=True).mean() # find day of peak value peak_day = smoothed_data.idxmax().strftime('%j') # append peak day to list peak_dates.append(peak_day) # create output dataframe with DatetimeIndex output_df = pd.DataFrame({'Hydrological Year': df.water_year.unique(), 'peak_day': pd.to_numeric(peak_dates)}) output_df.index = pd.to_datetime(output_df['Hydrological Year'], format='%Y') output_df = output_df.drop('Hydrological Year', axis=1) return output_df
# Melting season
[docs] def melting_season(df, smoothing_window_meltseas=14, min_weeks=10): """ Compute the start, end, and length of the melting season for each calendar year based on the daily the rolling mean of the temperature. Parameters ---------- df : pandas.DataFrame A DataFrame of daily mean temperature data with a datetime index. smoothing_window_meltseas : int, optional The size of the rolling window in days used to smooth the temperature data. Default is 14. min_weeks : int, optional The minimum number of weeks that the melting season must last for it to be considered valid. Default is 10. Returns ------- pandas.DataFrame A DataFrame with the start, end, and length of the melting season for each calendar year, with a DatetimeIndex. """ # Compute rolling mean of temperature data rolling_mean = df['avg_temp_catchment'].rolling(window=smoothing_window_meltseas).mean() # Find start of melting season for each year (first day above 0°C) start_mask = rolling_mean > 0 start_mask = start_mask.groupby(df.index.year).apply(lambda x: x.index[np.nanargmax(x)]) start_dates = start_mask - pd.Timedelta(days=smoothing_window_meltseas - 1) # rolling() selects last day, we want the first # Add minimum length of melting season to start dates earliest_end_dates = start_dates + pd.Timedelta(weeks=min_weeks) # group rolling_mean by year and apply boolean indexing to replace values before start_dates with 999 in every year rolling_mean = rolling_mean.groupby(rolling_mean.index.year).\ apply(lambda x: np.where(x.index < earliest_end_dates.loc[x.index.year], 999, x)) # Transform the resulting array back to a time series with DatetimeIndex rolling_mean = pd.Series(rolling_mean.values.flatten()) rolling_mean = rolling_mean.explode() rolling_mean = pd.DataFrame({'rolling_mean': rolling_mean}).set_index(df.index) # Filter values below 0 (including 999!) end_mask = rolling_mean < 0 # Find end of melting season end_mask = end_mask.groupby(df.index.year).apply(lambda x: x.index[np.nanargmax(x)]) end_dates = end_mask - pd.Timedelta(days=smoothing_window_meltseas - 1) # Compute length of melting season for each year lengths = (end_dates - start_dates).dt.days # Assemble output dataframe output_df = pd.DataFrame({'melt_season_start': [d.timetuple().tm_yday for d in start_dates], 'melt_season_end': [d.timetuple().tm_yday for d in end_dates]}, index=pd.to_datetime(df.index.year.unique(), format='%Y')) output_df['melt_season_length'] = output_df.melt_season_end - output_df.melt_season_start return output_df
# Aridity
[docs] def aridity(df, hist_starty=1986, hist_endy=2015): """ Calculates aridity indexes from precipitation, and potential and actual evaporation respectively. Aridity is defined as the mean annual ratio of potential/actual evapotranspiration and precipitation. The indexes are defined as the relative change of a 30-year period compared to a given historical period. Uses hydrological years (Oct–Sep). Inspired by climateinformation.org (https://doi.org/10.5194/egusphere-egu23-16216). Parameters ---------- df : pandas.DataFrame Input DataFrame containing columns 'evap_off_glaciers', 'actual_evaporation', and 'prec_off_glaciers'. hist_starty : int, optional Start year of the historical period in YYYY format. Default is 1986. hist_endy : int, optional End year of the historical period in YYYY format. Default is 2015. Returns ------- pandas.DataFrame DataFrame containing the relative change in aridity over time. Columns: - 'actual_aridity': Relative change in actual aridity. - 'potential_aridity': Relative change in potential aridity. """ # Use water years df = hydrologicalize(df) # Potential evapotranspiration (PET) pet = df['evap_off_glaciers'] # Actual evapotranspiration (AET) aet = df['actual_evaporation'] # Precipitation prec = df['prec_off_glaciers'] # Calculate the potential aridity as ratio of AET/PET to precipitation aridity_pot = pet.groupby(df['water_year']).sum() / prec.groupby(df['water_year']).sum() aridity_act = aet.groupby(df['water_year']).sum() / prec.groupby(df['water_year']).sum() # Filter historical period hist_pot = aridity_pot[(aridity_pot.index >= hist_starty) & (aridity_pot.index <= hist_endy)].mean() hist_act = aridity_act[(aridity_act.index >= hist_starty) & (aridity_act.index <= hist_endy)].mean() # Calculate rolling mean with a 30y period aridity_pot_rolling = aridity_pot.rolling(window=30, min_periods=1).mean() aridity_act_rolling = aridity_act.rolling(window=30, min_periods=1).mean() # Calculate the relative change in the aridity indexes pot_rel = 100 * (aridity_pot_rolling - hist_pot) / hist_pot act_rel = 100 * (aridity_act_rolling - hist_act) / hist_act # Concat in one dataframe aridity = pd.DataFrame({'actual_aridity': act_rel, 'potential_aridity': pot_rel}) aridity.set_index(pd.to_datetime(df.water_year.unique(), format='%Y'), inplace=True) aridity = aridity.dropna() return aridity
# Dry spells
[docs] def dry_spells(df, dry_spell_length=5): """ Compute the total length of dry spells in days per year. A dry spell is defined as a period for which the rolling mean of evaporation in a given window exceeds precipitation. Uses hydrological years (Oct - Sep). Parameters ---------- df : pandas.DataFrame Input DataFrame containing columns 'evap_off_glaciers' and 'prec_off_glaciers' with daily evaporation and precipitation data, respectively. dry_spell_length : int, optional Length of the rolling window in days. Default is 30. Returns ------- pandas.DataFrame DataFrame containing the number of days for which the rolling mean of evaporation exceeds precipitation for each year in the input DataFrame. """ # Use hydrological years df = hydrologicalize(df) # Find number of days when the rolling mean of evaporation exceeds precipitation periods = [] for year in df.water_year.unique(): year_data = df.loc[df.water_year == year] evap_roll = year_data['evap_off_glaciers'].rolling(window=dry_spell_length).mean() prec_roll = year_data['prec_off_glaciers'].rolling(window=dry_spell_length).mean() dry = evap_roll[evap_roll - prec_roll > 0] periods.append(len(dry)) # Assemble the output dataframe output_df = pd.DataFrame( {'dry_spell_days': periods}, index=pd.to_datetime(df.water_year.unique(), format='%Y')) return output_df
# Hydrological signatures
[docs] def get_qhf(data, global_median, measurements_per_day=1): """ Variation of spotpy.hydrology.signatures.get_qhf() that allows definition of a global median to investigate long-term trends. Calculates the frequency of high flow events defined as :math:`Q > 9 \\cdot Q_{50}` cf. [CLBGS2000]_, [WESMCM2015]_. The frequency is given as :math: :math:`yr^{-1}` :param data: the timeseries :param measurements_per_day: the measurements_per_day of the timeseries :return: Q_{HF}, Q_{HD} """ def highflow(value, median): return value > 9 * median fq, md = sig.flow_event(data, highflow, global_median) return fq * measurements_per_day * 365, md / measurements_per_day
[docs] def get_qlf(data, global_mean, measurements_per_day=1): """ Variation of spotpy.hydrology.signatures.get_qlf() that allows comparison of individual years with a global mean to investigate long-term trends. Calculates the frequency of low flow events defined as :math:`Q < 0.2 \\cdot \\overline{Q_{mean}}` cf. [CLBGS2000]_, [WESMCM2015]_. The frequency is given in :math:`yr^{-1}` and for the whole timeseries :param data: the timeseries :param measurements_per_day: the measurements_per_day of the timeseries :return: Q_{LF}, Q_{LD} """ def lowflow(value, mean): return value < 0.2 * mean fq, md = sig.flow_event(data, lowflow, global_mean) return fq * measurements_per_day * 365, md / measurements_per_day
[docs] def hydrological_signatures(df): """ Calculate hydrological signatures for a given input dataframe. Parameters ---------- df : pandas.DataFrame Input DataFrame containing a column 'total_runoff' and a DatetimeIndex. Returns ------- pandas.DataFrame A DataFrame containing the calculated hydrological signatures for each year in the input dataframe. The columns of the output dataframe are as follows: - 'q5': the 5th percentile of total runoff for each year - 'q50': the 50th percentile of total runoff for each year - 'q95': the 95th percentile of total runoff for each year - 'qlf_freq': the frequency of low flow events (Q < 2 × Qmean_global) for each year, in yr⁻¹ - 'qlf_dur': the mean duration of low flow events (Q < 2 × Qmean_global) for each year, in days - 'qhf_freq': the frequency of high flow events (Q > 9 × Q50_global) for each year, in yr⁻¹ - 'qhf_dur': the mean duration of high flow events (Q > 9 × Q50_global) for each year, in days """ # Create lists of quantile functions to apply and column names functions = [sig.get_q5, sig.get_q50, sig.get_q95] cols = ['q5', 'q50', 'q95'] # Create an empty dataframe to store the results results_df = pd.DataFrame() # Use water_year df = hydrologicalize(df) # Loop through each year in the input dataframe for year in df.water_year.unique(): # Select the data for the current year year_data = df[df.water_year == year].total_runoff # Apply each quantile function to the year data and store the results in a dictionary year_results = {} for i, func in enumerate(functions): year_results[cols[i]] = func(year_data) # Calculate frequency and duration of global low flows qlf_freq, qlf_dur = get_qlf(year_data, np.mean(df.total_runoff)) year_results['qlf_freq'] = qlf_freq year_results['qlf_dur'] = qlf_dur # Calculate frequency and duration of global high flows qhf_freq, qhf_dur = get_qhf(year_data, np.median(df.total_runoff)) year_results['qhf_freq'] = qhf_freq year_results['qhf_dur'] = qhf_dur # Convert the dictionary to a dataframe and append it to the results dataframe year_results_df = pd.DataFrame(year_results, index=[year]) results_df = pd.concat([results_df, year_results_df]) results_df.set_index(pd.to_datetime(df.water_year.unique(), format='%Y'), inplace=True) return results_df
# Drought indicators
[docs] def drought_indicators(df, freq='ME', dist='gamma'): """ Calculate the climatic water balance, SPI (Standardized Precipitation Index), and SPEI (Standardized Precipitation Evapotranspiration Index) for 1, 3, 6, 12, and 24 months. Parameters ---------- df : pandas.DataFrame Input DataFrame containing columns 'prec_off_glaciers' and 'evap_off_glaciers'. freq : str, optional Resampling frequency for precipitation and evaporation data. Default is 'M' for monthly. dist : str, optional Distribution for SPI and SPEI calculation. Either Pearson-Type III ('pearson') or Gamma distribution ('gamma'). Default is 'gamma'. Returns ------- pandas.DataFrame DataFrame containing the calculated indicators: 'clim_water_balance', 'spi', and 'spei'. Index is based on the resampled frequency of the input DataFrame. Raises ------ ValueError If 'freq' is not 'D' or 'ME'. If 'dist' is not 'pearson' or 'gamma'. Notes ----- SPI (Standardized Precipitation Index) and SPEI (Standardized Precipitation Evapotranspiration Index) are drought indicators that are used to quantify drought severity and duration. 'clim_water_balance' is the difference between total precipitation and total evapotranspiration. If 'freq' is 'D', the input data is transformed from Gregorian to a 366-day format for SPI and SPEI calculation, and then transformed back to Gregorian format for output. The default distribution for SPI and SPEI calculation is Gamma. The calibration period for SPI and SPEI calculation is from 1981 to 2020. """ # Check if frequency is valid if freq != 'D' and freq != 'ME': raise ValueError("Invalid value for 'freq'. Choose either 'D' or 'ME'.") # Resample precipitation and evaporation data based on frequency prec = df.prec_off_glaciers.resample(freq).sum().values evap = df.evap_off_glaciers.resample(freq).sum().values # Calculate water balance water_balance = prec - evap # If frequency is daily, transform data to 366-day format if freq == 'D': prec = utils.transform_to_366day(prec, year_start=df.index.year[0], total_years=len(df.index.year.unique())) evap = utils.transform_to_366day(evap, year_start=df.index.year[0], total_years=len(df.index.year.unique())) # Set distribution based on input if dist == 'pearson': distribution = Distribution.pearson elif dist == 'gamma': distribution = Distribution.gamma else: raise ValueError("Invalid value for 'dist'. Choose either 'pearson' or 'gamma'.") # Set periodicity based on frequency if freq == 'D': periodicity = compute.Periodicity.daily elif freq == 'ME': periodicity = compute.Periodicity.monthly # Set common parameters common_params = {'distribution': distribution, 'periodicity': periodicity, 'data_start_year': 1981, 'calibration_year_initial': 1981, 'calibration_year_final': 2020} # Set parameters for SPEI calculation spei_params = {'precips_mm': prec, 'pet_mm': evap, **common_params} # Set parameters for SPI calculation spi_params = {'values': prec, **common_params} # Calculate SPI and SPEI for various periods drought_df = pd.DataFrame() for s in [1, 3, 6, 12, 24]: spi_arr = spi(**spi_params, scale=s) spei_arr = spei(**spei_params, scale=s) # If frequency is daily, transform data back to Gregorian format if freq == 'D': spi_arr = utils.transform_to_gregorian(spi_arr, df.index.year[0]) spei_arr = utils.transform_to_gregorian(spei_arr, df.index.year[0]) drought_df['spi' + str(s)] = spi_arr drought_df['spei' + str(s)] = spei_arr drought_df.set_index(df.resample(freq).mean().index, inplace=True) # DataFrame resample Dataframe out_df = pd.DataFrame({'clim_water_balance': water_balance}, index=df.resample(freq).sum().index) out_df = pd.concat([out_df.resample('YS').sum(), drought_df.resample('YS').mean()], axis=1).rename(lambda x: x.replace(day=1)) return out_df
# Wrapper function
[docs] def cc_indicators(df, **kwargs): """ Apply a list of climate change indicator functions to the output DataFrame of MATILDA and concatenate the output columns into a single DataFrame. Parameters ---------- df : pandas.DataFrame Input DataFrame. **kwargs : optional Optional arguments to be passed to the functions in the list. Possible arguments are 'smoothing_window_peakdoy', 'smoothing_window_meltseas', 'min_weeks', and 'dry_spell_length'. Returns ------- pandas.DataFrame DataFrame containing the output columns of all functions applied to the input DataFrame. Notes ----- The list of functions to apply is hard-coded into the function and cannot be modified from outside. The optional arguments are passed to the respective functions only if they are relevant. If no optional arguments are passed, the function is applied to the input DataFrame with default arguments. """ # List of all functions to apply functions = [prec_minmax, peak_doy, melting_season, aridity, dry_spells, hydrological_signatures, drought_indicators] # Empty result dataframe indicator_df = pd.DataFrame() # Loop through all functions for func in functions: func_kwargs = {} # Apply only those optional kwargs relevant for the respective function for kwarg in kwargs: if kwarg in inspect.getfullargspec(func)[0]: func_kwargs.update({kwarg: kwargs.get(kwarg)}) result = func(df, **func_kwargs) # Concat all output columns in one dataframe indicator_df = pd.concat([indicator_df, result], axis=1) return indicator_df
indicator_vars = { 'max_prec_month': ('Month with Maximum Precipitation', '-'), 'min_prec_month': ('Month with Minimum Precipitation', '-'), 'peak_day': ('Timing of Peak Runoff', 'DoY'), 'melt_season_start': ('Beginning of Melting Season', 'DoY'), 'melt_season_end': ('End of Melting Season', 'DoY'), 'melt_season_length': ('Length of Melting Season', 'd'), 'actual_aridity': ('Relative Change of Actual Aridity', '%'), 'potential_aridity': ('Relative Change of Potential Aridity', '%'), 'dry_spell_days': ('Total Length of Dry Spells', 'd/a'), 'qlf_freq': ('Frequency of Low-flow events', 'yr^-1'), 'qlf_dur': ('Mean Duration of Low-flow events', 'd'), 'qhf_freq': ('Frequency of High-flow events', 'yr^-1'), 'qhf_dur': ('Mean Duration of High-flow events', 'd'), 'clim_water_balance': ('Climatic Water Balance', 'mm w.e.'), 'spi1': ('Standardized Precipitation Index (1 month)', '-'), 'spei1': ('Standardized Precipitation Evaporation Index (1 month)', '-'), 'spi3': ('Standardized Precipitation Index (3 months)', '-'), 'spei3': ('Standardized Precipitation Evaporation Index (3 months)', '-'), 'spi6': ('Standardized Precipitation Index (6 months)', '-'), 'spei6': ('Standardized Precipitation Evaporation Index (6 months)', '-'), 'spi12': ('Standardized Precipitation Index (12 months)', '-'), 'spei12': ('Standardized Precipitation Evaporation Index (12 months)', '-'), 'spi24': ('Standardized Precipitation Index (24 months)', '-'), 'spei24': ('Standardized Precipitation Evaporation Index (24 months)', '-') }
[docs] def calculate_indicators(dic, **kwargs): """ Calculate climate change indicators for all scenarios and models. Parameters ---------- dic : dict Dictionary containing MATILDA outputs for all scenarios and models. **kwargs : optional Optional keyword arguments to be passed to the cc_indicators() function. Returns ------- dict Dictionary with the same structure as the input but containing climate change indicators in annual resolution. """ # Create an empty dictionary to store the outputs out_dict = {} # Loop over the scenarios with progress bar for scenario in dic.keys(): model_dict = {} # Create an empty dictionary to store the model outputs # Loop over the models with progress bar for model in tqdm(dic[scenario].keys(), desc=scenario): # Get the dataframe for the current scenario and model df = dic[scenario][model]['model_output'] # Run the indicator function indicators = cc_indicators(df, **kwargs) # Store indicator time series in the model dictionary model_dict[model] = indicators # Store the model dictionary in the scenario dictionary out_dict[scenario] = model_dict return out_dict
[docs] def custom_df_indicators(dic, scenario, var): """ Takes a dictionary of climate change indicators and returns a combined dataframe of a specific variable for a given scenario. Parameters ---------- dic : dict Dictionary containing the outputs of calculate_indicators() for different scenarios and models. scenario : str Name of the selected scenario. var : str Name of the variable to extract from the DataFrame. Returns ------- pandas.DataFrame A DataFrame containing the selected variable from different models within the specified scenario. Raises ------ ValueError If the provided variable is not one of the function outputs. """ out_cols = ['max_prec_month', 'min_prec_month', 'peak_day', 'melt_season_start', 'melt_season_end', 'melt_season_length', 'actual_aridity', 'potential_aridity', 'dry_spell_days', 'qlf_freq', 'qlf_dur', 'qhf_freq', 'qhf_dur', 'clim_water_balance', 'spi1', 'spei1', 'spi3', 'spei3', 'spi6', 'spei6', 'spi12', 'spei12', 'spi24', 'spei24'] if var not in out_cols: raise ValueError("var needs to be one of the following strings: " + str([i for i in out_cols])) # Create an empty list to store the dataframes dfs = [] # Loop over the models in the selected scenario for model in dic[scenario].keys(): # Get the dataframe for the current model df = dic[scenario][model] # Append the dataframe to the list of dataframes dfs.append(df[var]) # Concatenate the dataframes into a single dataframe combined_df = pd.concat(dfs, axis=1) # Set the column names of the combined dataframe to the model names combined_df.columns = dic[scenario].keys() return combined_df