Source code for tools.geetools

import pandas as pd
import concurrent.futures
import os
import sys
import requests
from retry import retry
from tqdm import tqdm
import ee

[docs] def authenticate_and_initialize_ee(cloud_project): """ Robustly authenticates and initializes Earth Engine for local/notebook environments. It attempts to initialize Earth Engine: 1. Using existing credentials if available and valid for the project. 2. If a permission error occurs, it forces a new interactive browser-based authentication suitable for notebooks. Args: cloud_project (str): The Google Cloud Project ID to use for Earth Engine. Raises: RuntimeError: If Earth Engine initialization ultimately fails after retries. """ print(f"--- Attempting Earth Engine Setup for project: {cloud_project} ---") # --- First Attempt: Try to initialize with any existing, valid credentials --- try: print("1. Trying to initialize with existing credentials...") ee.Initialize(project=cloud_project) # Verify with a simple API call to ensure permissions are correct _ = ee.Image("CGIAR/SRTM90_V4").getInfo() print(f"✅ Earth Engine successfully initialized with existing credentials for project: {cloud_project}") return # Success, exit function except ee.EEException as e: msg = str(e) if "Caller does not have required permission" in msg: print(f"Initial attempt failed due to permission error: {e}") print("This likely means existing credentials are for the wrong account/project, or lack permissions.") print("Proceeding to force a new interactive authentication.") else: # Handle other EE exceptions (e.g., project not found, invalid API key etc.) print(f"Initial attempt failed with an unexpected Earth Engine error: {e}") print("Proceeding to force a new interactive authentication, as this might resolve it.") except Exception as e: print(f"Initial attempt failed with an unexpected general error: {e}") print("Proceeding to force a new interactive authentication, as this might resolve it.") # --- Second Attempt: Force a new interactive browser-based authentication --- print("\n--- Forcing New Earth Engine Authentication ---") print("A browser window should open (or instructions to copy/paste a URL).") print("Please select the Google Account that has access to your Earth Engine project.") print("-----------------------------------") try: # Clear in-memory credentials just in case ee.Reset() # Explicitly use 'notebook' auth_mode for Jupyter environments or 'paste' as a fallback try: print("Attempting authentication with auth_mode='notebook'...") ee.Authenticate(force=True, auth_mode='notebook') except Exception as notebook_auth_error: print(f"Auth_mode='notebook' failed or didn't prompt: {notebook_auth_error}") print("Falling back to auth_mode='paste' (you may need to copy/paste a URL).") ee.Authenticate(force=True, auth_mode='paste') # This will provide a URL if it can't open a browser print("\nAuthentication flow completed. Attempting re-initialization...") ee.Initialize(project=cloud_project) # Verify with a simple API call again _ = ee.Image("CGIAR/SRTM90_V4").getInfo() print(f"✅ Earth Engine successfully initialized with new credentials for project: {cloud_project}") return # Success, exit function except ee.EEException as e: msg = str(e) print(f"\n❌ Earth Engine setup FAILED even after forced authentication for project: {cloud_project}") print(f"Final Error: {e}") if "Caller does not have required permission" in msg: raise RuntimeError( f"Earth Engine permission error for project '{cloud_project}' after forced authentication. " "Please ensure: \n" " 1. Earth Engine is ENABLED for this project (check Google Cloud Console).\n" " 2. The Google Account you selected during authentication has the 'Earth Engine User' IAM role for this project.\n" " 3. You selected the CORRECT Google Account during the browser login." ) from e else: raise RuntimeError( f"Earth Engine initialization failed with an unexpected error after forced authentication for project '{cloud_project}'." ) from e except Exception as e: raise RuntimeError( f"An unexpected error occurred during Earth Engine setup after forced authentication: {e}" ) from e
[docs] class CMIPDownloader: """Class to download spatially averaged CMIP6 data for a given period, variable, and spatial subset.""" def __init__(self, var, starty, endy, shape, processes=10, dir='./'): self.var = var self.starty = starty self.endy = endy self.shape = shape self.processes = processes self.directory = dir # create the download directory if it doesn't exist if not os.path.exists(self.directory): os.makedirs(self.directory)
[docs] def download(self): """Runs a subset routine for CMIP6 data on GEE servers to create ee.FeatureCollections for all years in the requested period. Downloads individual years in parallel processes to increase the download time.""" print('Initiating download request for NEX-GDDP-CMIP6 data from ' + str(self.starty) + ' to ' + str(self.endy) + '.') def getRequests(starty, endy): """Generates a list of years to be downloaded. [Client side]""" return [i for i in range(starty, endy+1)] @retry(tries=10, delay=1, backoff=2) def getResult(index, year): """Handle the HTTP requests to download one year of CMIP6 data. [Server side]""" start = str(year) + '-01-01' end = str(year + 1) + '-01-01' startDate = ee.Date(start) endDate = ee.Date(end) n = endDate.difference(startDate, 'day').subtract(1) def getImageCollection(var): """Create and image collection of CMIP6 data for the requested variable, period, and region. [Server side]""" collection = ee.ImageCollection('NASA/GDDP-CMIP6') \ .select(var) \ .filterDate(startDate, endDate) \ .filterBounds(self.shape) \ .filter(ee.Filter.neq('model', 'NorESM2-LM')) # Exclude model (missing year 2096) return collection def renameBandName(b): """Edit variable names for better readability. [Server side]""" split = ee.String(b).split('_') return ee.String(split.splice(split.length().subtract(2), 1).join("_")) def buildFeature(i): """Create an area weighted average of the defined region for every day in the given year. [Server side]""" t1 = startDate.advance(i, 'day') t2 = t1.advance(1, 'day') # feature = ee.Feature(point) dailyColl = collection.filterDate(t1, t2) dailyImg = dailyColl.toBands() # renaming and handling names bands = dailyImg.bandNames() renamed = bands.map(renameBandName) # Daily extraction and adding time information dict = dailyImg.rename(renamed).reduceRegion( reducer=ee.Reducer.mean(), geometry=self.shape, ).combine( ee.Dictionary({'system:time_start': t1.millis(), 'isodate': t1.format('YYYY-MM-dd')}) ) return ee.Feature(None, dict) # Create features for all days in the respective year. [Server side] collection = getImageCollection(self.var) year_feature = ee.FeatureCollection(ee.List.sequence(0, n).map(buildFeature)) # Create a download URL for a CSV containing the feature collection. [Server side] url = year_feature.getDownloadURL() # Handle downloading the actual csv for one year. [Client side] r = requests.get(url, stream=True) if r.status_code != 200: r.raise_for_status() filename = os.path.join(self.directory, 'cmip6_' + self.var + '_' + str(year) + '.csv') with open(filename, 'w') as f: f.write(r.text) return index # Create a list of years to be downloaded. [Client side] items = getRequests(self.starty, self.endy) # Launch download requests in parallel processes and display a status bar. [Client side] with tqdm(total=len(items), desc="Downloading CMIP6 data for variable '" + self.var + "'") as pbar: results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=self.processes) as executor: for i, year in enumerate(items): results.append(executor.submit(getResult, i, year)) for future in concurrent.futures.as_completed(results): index = future.result() pbar.update(1) print("All downloads complete.")
[docs] class CMIPProcessor: """Class to read and pre-process CSV files downloaded by the CMIPDownloader class.""" def __init__(self, var, file_dir='.'): self.file_dir = file_dir self.var = var self.df_hist = self.append_df(self.var, self.file_dir, hist=True) self.df_ssp = self.append_df(self.var, self.file_dir, hist=False) self.ssp2_common, self.ssp5_common, self.hist_common,\ self.common_models, self.dropped_models = self.process_dataframes() self.ssp2, self.ssp5 = self.get_results()
[docs] def read_cmip(self, filename): """Reads CMIP6 CSV files and drops redundant columns.""" df = pd.read_csv(filename, index_col='isodate', parse_dates=['isodate']) df = df.drop(['system:index', '.geo', 'system:time_start'], axis=1) return df
[docs] def append_df(self, var, file_dir='.', hist=True): """Reads CMIP6 CSV files of individual years and concatenates them into dataframes for the full downloaded period. Historical and scenario datasets are treated separately. Converts precipitation unit to mm.""" df_list = [] if hist: starty = 1979 endy = 2014 else: starty = 2015 endy = 2100 for i in range(starty, endy + 1): filename = file_dir + 'cmip6_' + var + '_' + str(i) + '.csv' df_list.append(self.read_cmip(filename)) if hist: hist_df = pd.concat(df_list) if var == 'pr': hist_df = hist_df * 86400 # from kg/(m^2*s) to mm/day return hist_df else: ssp_df = pd.concat(df_list) if var == 'pr': ssp_df = ssp_df * 86400 # from kg/(m^2*s) to mm/day return ssp_df
[docs] def process_dataframes(self): """Separates the two scenarios and drops models not available for both scenarios and the historical period.""" ssp2 = self.df_ssp.loc[:, self.df_ssp.columns.str.startswith('ssp245')] ssp5 = self.df_ssp.loc[:, self.df_ssp.columns.str.startswith('ssp585')] hist = self.df_hist.loc[:, self.df_hist.columns.str.startswith('historical')] ssp2.columns = ssp2.columns.str.lstrip('ssp245_').str.rstrip('_' + self.var) ssp5.columns = ssp5.columns.str.lstrip('ssp585_').str.rstrip('_' + self.var) hist.columns = hist.columns.str.lstrip('historical_').str.rstrip('_' + self.var) # Get all the models the three datasets have in common common_models = set(ssp2.columns).intersection(ssp5.columns).intersection(hist.columns) # Get the model names that contain NaN values nan_models_list = [df.columns[df.isna().any()].tolist() for df in [ssp2, ssp5, hist]] # flatten the list nan_models = [col for sublist in nan_models_list for col in sublist] # remove duplicates nan_models = list(set(nan_models)) # Remove models with NaN values from the list of common models common_models = [x for x in common_models if x not in nan_models] ssp2_common = ssp2.loc[:, common_models] ssp5_common = ssp5.loc[:, common_models] hist_common = hist.loc[:, common_models] dropped_models = list(set([mod for mod in ssp2.columns if mod not in common_models] + [mod for mod in ssp5.columns if mod not in common_models] + [mod for mod in hist.columns if mod not in common_models])) return ssp2_common, ssp5_common, hist_common, common_models, dropped_models
[docs] def get_results(self): """Concatenates historical and scenario data to combined dataframes of the full downloaded period. Arranges the models in alphabetical order.""" ssp2_full = pd.concat([self.hist_common, self.ssp2_common]) ssp2_full.index.names = ['TIMESTAMP'] ssp5_full = pd.concat([self.hist_common, self.ssp5_common]) ssp5_full.index.names = ['TIMESTAMP'] ssp2_full = ssp2_full.reindex(sorted(ssp2_full.columns), axis=1) ssp5_full = ssp5_full.reindex(sorted(ssp5_full.columns), axis=1) return ssp2_full, ssp5_full