Source code for tools.geetools

from configparser import ConfigParser
from pathlib import Path
import pandas as pd
import concurrent.futures
import os
import sys
import requests
from retry import retry
from tqdm import tqdm
import ee
import json
import geopandas as gpd
import matplotlib.pyplot as plt
import io
import time
import threading
import getpass

from configparser import ConfigParser
from pathlib import Path


[docs] def delineate_catchment_mghydro( lat, lon, watershed_output_path, rivers_output_path=None, precision="high", plot=False, timeout=120, fallback_to_local=False, fallback_dem_asset_id="MERIT/DEM/v1_0_3", fallback_dem_filename="dem_gee.tif", fallback_buffer_m=40000, fallback_bbox=None, dem_config_path=None, ): """ Delineate a catchment and fetch upstream rivers from the MG Hydro / Global Watersheds API. This helper uses the public API of the Global Watersheds web app: https://mghydro.com/watersheds/ API/help documentation: https://mghydro.com/watersheds/help.html Python demo notebook by the author: https://mghydro.com/demo-use-the-global-watersheds-api-and-python-to-automatically-delineate-watersheds/ Related GitHub materials: https://gist.github.com/mheberger/c05f10de225fbee8f572c5dfbb38d0b5 https://github.com/mheberger/delineator Author / maintainer: Matthew Heberger (mheberger) https://github.com/mheberger https://mghydro.com/author/mheberger/ Notes ----- The Global Watersheds app and API are based on global hydrographic datasets including MERIT-Hydro / MERIT-Basins and provide fast watershed delineation and upstream river extraction. The service is very useful for rapid catchment screening, but results should always be checked visually before further use in analysis workflows. Parameters ---------- lat : float Latitude of the outlet / pour point. lon : float Longitude of the outlet / pour point. watershed_output_path : str or Path Output path for the watershed GeoJSON. rivers_output_path : str or Path, optional Output path for the upstream rivers GeoJSON. If None, a filename based on watershed_output_path is created. precision : str, optional Delineation precision, either 'low' or 'high'. Default is 'high'. For very large basins, the service may automatically fall back to lower precision. plot : bool, optional If True, create a quick visual check plot with catchment boundary, river network, and outlet point. timeout : int, optional Timeout in seconds for each API request. fallback_to_local : bool, optional If True, fall back to local pysheds delineation when the MG Hydro request fails. Default is False. dem_asset_id : str, optional Earth Engine asset ID of the DEM used for local fallback delineation. dem_filename : str or Path, optional DEM filename used for local fallback delineation. fallback_buffer_m : int or float, optional Default buffer radius in meters for local fallback delineation. fallback_bbox : list or tuple, optional Optional custom bounding box [xmin, ymin, xmax, ymax] for local fallback delineation. If None, the user is prompted for a bbox. dem_config_path : str or Path, optional Optional path to webservices.ini for local DEM download. Returns ------- tuple (watershed_gdf, rivers_gdf) Raises ------ ValueError If precision is invalid. RuntimeError If the API request fails or returns invalid data. """ from pathlib import Path import json import requests import geopandas as gpd import matplotlib.pyplot as plt precision = str(precision).strip().lower() if precision not in {"low", "high"}: raise ValueError("precision must be 'low' or 'high'") watershed_output_path = Path(watershed_output_path) watershed_output_path.parent.mkdir(parents=True, exist_ok=True) if rivers_output_path is None: rivers_output_path = watershed_output_path.with_name( watershed_output_path.stem + "_rivers.geojson" ) else: rivers_output_path = Path(rivers_output_path) rivers_output_path.parent.mkdir(parents=True, exist_ok=True) base_url = "https://mghydro.com/app" params = { "lat": float(lat), "lng": float(lon), "precision": precision, } endpoints = { "watershed": f"{base_url}/watershed_api", "rivers": f"{base_url}/upstream_rivers_api", } def _fetch_geojson(url, label): try: response = requests.get(url, params=params, timeout=timeout) except requests.RequestException as e: raise RuntimeError(f"Failed to reach MG Hydro {label} API: {e}") from e if response.status_code == 400: raise RuntimeError( f"MG Hydro {label} request was rejected (400 Bad Request). " f"Check coordinates and parameters. Response: {response.text}" ) elif response.status_code == 404: raise RuntimeError( f"MG Hydro {label} could not create a result (404 Not Found). " f"This can happen for unsuitable outlet points, e.g. over the ocean. " f"Response: {response.text}" ) elif response.status_code == 500: raise RuntimeError( f"MG Hydro {label} returned 500 Internal Server Error. " f"Response: {response.text}" ) elif response.status_code != 200: raise RuntimeError( f"MG Hydro {label} request failed with status {response.status_code}. " f"Response: {response.text}" ) content_type = response.headers.get("Content-Type", "") if "json" not in content_type and "geojson" not in content_type and response.text[:1] not in "{[": raise RuntimeError( f"MG Hydro {label} returned unexpected content type '{content_type}'." ) try: data = response.json() except json.JSONDecodeError as e: raise RuntimeError(f"MG Hydro {label} returned invalid JSON") from e if not isinstance(data, dict) or data.get("type") != "FeatureCollection": raise RuntimeError( f"MG Hydro {label} did not return a GeoJSON FeatureCollection." ) return data try: watershed_data = _fetch_geojson(endpoints["watershed"], "watershed") rivers_data = _fetch_geojson(endpoints["rivers"], "rivers") watershed_output_path.write_text(json.dumps(watershed_data), encoding="utf-8") rivers_output_path.write_text(json.dumps(rivers_data), encoding="utf-8") watershed_gdf = gpd.read_file(watershed_output_path) rivers_gdf = gpd.read_file(rivers_output_path) if watershed_gdf.empty: raise RuntimeError("MG Hydro watershed result is empty.") if rivers_gdf.empty: print("Warning: MG Hydro returned an empty upstream rivers layer.") except Exception as e: if not fallback_to_local: raise if fallback_dem_asset_id is None or fallback_dem_filename is None: raise RuntimeError( "MG Hydro delineation failed and local fallback is not fully configured. " "Please provide fallback_dem_asset_id and dem_filename." ) from e print("MG Hydro delineation failed.") print(f"Reason: {e}") print("Falling back to local delineation with pysheds...") bbox_to_use = fallback_bbox if bbox_to_use is None: print( "Please provide a custom DEM bounding box in geographic coordinates:\n" "bbox = [xmin, ymin, xmax, ymax]" ) bbox_str = input("Enter bbox as xmin,ymin,xmax,ymax: ").strip() try: bbox_to_use = [float(x) for x in bbox_str.split(",")] except Exception as parse_err: raise RuntimeError( "Could not parse bbox input. Expected format: xmin,ymin,xmax,ymax" ) from parse_err if len(bbox_to_use) != 4: raise RuntimeError( "Invalid bbox input. Expected exactly 4 values: xmin,ymin,xmax,ymax" ) xmin, ymin, xmax, ymax = bbox_to_use if not (xmin < xmax and ymin < ymax): raise RuntimeError( "Invalid bbox input. Expected xmin < xmax and ymin < ymax." ) watershed_gdf, info = delineate_catchment_local( lat=lat, lon=lon, dem_filename=fallback_dem_filename, catchment_filename=watershed_output_path, asset_id=fallback_dem_asset_id, buffer_m=fallback_buffer_m, bbox=bbox_to_use, config_path=dem_config_path, ) rivers_gdf = gpd.GeoDataFrame(geometry=[], crs=watershed_gdf.crs) if rivers_output_path is not None: Path(rivers_output_path).parent.mkdir(parents=True, exist_ok=True) rivers_gdf.to_file(rivers_output_path, driver="GeoJSON") if plot: fig, ax = plt.subplots(figsize=(8, 8)) if not rivers_gdf.empty: rivers_gdf.plot(ax=ax, linewidth=0.8, label="Rivers") watershed_gdf.boundary.plot(ax=ax, linewidth=1.5, label="Catchment boundary") ax.scatter(lon, lat, marker="o", s=40, label="Outlet point") ax.set_title(f"Catchment delineation (MG Hydro, precision={precision})") ax.set_xlabel("Longitude") ax.set_ylabel("Latitude") ax.set_aspect("equal") ax.legend() plt.show() return watershed_gdf, rivers_gdf
def _prompt_for_api_key(prompt_text="API key not found in config or environment. Please paste your MATILDA-Online API key: "): """Prompt interactively for an API key without echoing it to the screen.""" try: return getpass.getpass(prompt_text).strip() except Exception: return input(prompt_text).strip()
[docs] def resolve_api_key(config=None, env_var="MATILDA_API_KEY", prompt_if_missing=True): """ Resolve the MATILDA webservice API key. Priority -------- 1. MATILDA_API_KEY environment variable 2. PUBLIC_API_KEY from webservices.ini 3. Interactive prompt (optional) """ env_key = os.environ.get(env_var, "").strip() if env_key: return env_key if config is not None: cfg_key = str(config.get("PUBLIC_API_KEY", "")).strip() if cfg_key: os.environ[env_var] = cfg_key return cfg_key if not prompt_if_missing: raise ValueError( "No MATILDA webservice API key found. Provide MATILDA_API_KEY as an " "environment variable, add PUBLIC_API_KEY to webservices.ini, or enter it " "interactively." ) api_key = _prompt_for_api_key() if not api_key: raise ValueError("No API key provided.") os.environ[env_var] = api_key return api_key
[docs] def load_webservice_config(config_path=None, section="GOOGLE", prompt_for_api_key=True): """ Load settings from webservices.ini. Parameters ---------- config_path : str or Path, optional Path to webservices.ini. If None, defaults to repo_root/webservices.ini assuming this file lives in repo_root. section : str, optional prompt_for_api_key : bool, optional If True and section is [GOOGLE], prompt interactively for an API key when neither MATILDA_API_KEY nor PUBLIC_API_KEY is available. Returns ------- dict Dictionary with keys and values from the requested section. Raises ------ FileNotFoundError If the config file does not exist. KeyError If the requested section is missing. ValueError If required keys for the section are missing. """ if config_path is None: config_path = Path(__file__).resolve().parent.parent / "webservices.ini" else: config_path = Path(config_path).resolve() if not config_path.exists(): raise FileNotFoundError(f"Webservice config file not found: {config_path}") parser = ConfigParser() parser.optionxform = str parser.read(config_path) if section not in parser: raise KeyError(f"Section [{section}] not found in {config_path}") config = dict(parser[section]) required_keys_by_section = { "GOOGLE": ["PUBLIC_CLOUD_PROJECT", "BASE_URL"], "HU": ["MEDIA_API_URL", "MEDIA_PRIVATE_KEY", "MEDIA_USER"], } required_keys = required_keys_by_section.get(section, []) missing = [key for key in required_keys if key not in config or not config[key].strip()] if missing: raise ValueError( f"Missing required keys in [{section}] of {config_path}: {missing}" ) if section == "GOOGLE": config["TIMEOUT"] = int(config.get("TIMEOUT", 120)) config["PUBLIC_API_KEY"] = resolve_api_key( config=config, prompt_if_missing=prompt_for_api_key, ) return config
[docs] def authenticate_and_initialize_ee(cloud_project): """ Robustly authenticates and initializes Earth Engine for local/notebook environments. It attempts to initialize Earth Engine: 1. Using existing credentials if available and valid for the project. 2. If a permission error occurs, it forces a new interactive browser-based authentication suitable for notebooks. Args: cloud_project (str): The Google Cloud Project ID to use for Earth Engine. Raises: RuntimeError: If Earth Engine initialization ultimately fails after retries. """ print(f"--- Attempting Earth Engine Setup for project: {cloud_project} ---") # --- First Attempt: Try to initialize with any existing, valid credentials --- try: print("1. Trying to initialize with existing credentials...") ee.Initialize(project=cloud_project) # Verify with a simple API call to ensure permissions are correct _ = ee.Image("CGIAR/SRTM90_V4").getInfo() print(f"✅ Earth Engine successfully initialized with existing credentials for project: {cloud_project}") return # Success, exit function except ee.EEException as e: msg = str(e) if "Caller does not have required permission" in msg: print(f"Initial attempt failed due to permission error: {e}") print("This likely means existing credentials are for the wrong account/project, or lack permissions.") print("Proceeding to force a new interactive authentication.") else: # Handle other EE exceptions (e.g., project not found, invalid API key etc.) print(f"Initial attempt failed with an unexpected Earth Engine error: {e}") print("Proceeding to force a new interactive authentication, as this might resolve it.") except Exception as e: print(f"Initial attempt failed with an unexpected general error: {e}") print("Proceeding to force a new interactive authentication, as this might resolve it.") # --- Second Attempt: Force a new interactive browser-based authentication --- print("\n--- Forcing New Earth Engine Authentication ---") print("A browser window should open (or instructions to copy/paste a URL).") print("Please select the Google Account that has access to your Earth Engine project.") print("-----------------------------------") try: # Clear in-memory credentials just in case ee.Reset() # Explicitly use 'notebook' auth_mode for Jupyter environments or 'paste' as a fallback try: print("Attempting authentication with auth_mode='notebook'...") ee.Authenticate(force=True, auth_mode='notebook') except Exception as notebook_auth_error: print(f"Auth_mode='notebook' failed or didn't prompt: {notebook_auth_error}") print("Falling back to auth_mode='paste' (you may need to copy/paste a URL).") ee.Authenticate(force=True, auth_mode='paste') # This will provide a URL if it can't open a browser print("\nAuthentication flow completed. Attempting re-initialization...") ee.Initialize(project=cloud_project) # Verify with a simple API call again _ = ee.Image("CGIAR/SRTM90_V4").getInfo() print(f"✅ Earth Engine successfully initialized with new credentials for project: {cloud_project}") return # Success, exit function except ee.EEException as e: msg = str(e) print(f"\n❌ Earth Engine setup FAILED even after forced authentication for project: {cloud_project}") print(f"Final Error: {e}") if "Caller does not have required permission" in msg: raise RuntimeError( f"Earth Engine permission error for project '{cloud_project}' after forced authentication. " "Please ensure: \n" " 1. Earth Engine is ENABLED for this project (check Google Cloud Console).\n" " 2. The Google Account you selected during authentication has the 'Earth Engine User' IAM role for this project.\n" " 3. You selected the CORRECT Google Account during the browser login." ) from e else: raise RuntimeError( f"Earth Engine initialization failed with an unexpected error after forced authentication for project '{cloud_project}'." ) from e except Exception as e: raise RuntimeError( f"An unexpected error occurred during Earth Engine setup after forced authentication: {e}" ) from e
[docs] def download_dem_webservice( output_path, asset_id, lat=None, lon=None, buffer_m=None, geometry=None, config_path=None, ): """ Download a DEM GeoTIFF from the MATILDA webservice and save it locally. Parameters ---------- output_path : str or Path Local output path for the downloaded GeoTIFF. asset_id : str Earth Engine asset ID of the DEM. lat : float, optional Latitude of the pour point. Must be used together with lon. lon : float, optional Longitude of the pour point. Must be used together with lat. buffer_m : int or float, optional Buffer radius in meters used to create the request box when using lat/lon input. geometry : dict or geopandas.GeoDataFrame or shapely geometry, optional Polygon geometry to send directly to the webservice. This is the preferred option when a catchment outline is already available. config_path : str or Path, optional Optional path to webservices.ini. Returns ------- Path Path to the saved GeoTIFF file. Raises ------ ValueError If neither a valid point+buffer nor a valid geometry is provided. RuntimeError If the webservice request fails or returns unexpected content. """ try: from IPython.display import clear_output in_notebook = True except Exception: in_notebook = False cfg = load_webservice_config(config_path=config_path, section="GOOGLE") service_url = f"{cfg['BASE_URL'].rstrip('/')}/download-dem" payload = { "asset_id": asset_id, } if geometry is not None: # GeoDataFrame -> first geometry if hasattr(geometry, "geometry"): if len(geometry) == 0: raise ValueError("Provided GeoDataFrame is empty.") geometry = geometry.geometry.iloc[0] # shapely geometry -> GeoJSON-like dict if hasattr(geometry, "__geo_interface__"): geometry = geometry.__geo_interface__ if not isinstance(geometry, dict): raise ValueError( "geometry must be a GeoJSON-like dict, a GeoDataFrame, or a shapely geometry." ) geom_type = geometry.get("type") coords = geometry.get("coordinates") if geom_type not in {"Polygon", "MultiPolygon"} or coords is None: raise ValueError( "geometry must be a Polygon or MultiPolygon with coordinates." ) payload["geometry"] = { "type": geom_type, "coordinates": coords, } else: if lat is None or lon is None or buffer_m is None: raise ValueError( "Provide either geometry, or lat/lon together with buffer_m." ) payload["lat"] = float(lat) payload["lon"] = float(lon) payload["buffer_m"] = float(buffer_m) headers = { "Content-Type": "application/json", "X-API-Key": cfg["PUBLIC_API_KEY"], } output_path = Path(output_path) output_path.parent.mkdir(parents=True, exist_ok=True) start_time = time.time() try: response = requests.post( service_url, json=payload, headers=headers, timeout=cfg["TIMEOUT"], stream=True, ) except requests.RequestException as e: raise RuntimeError(f"Failed to reach DEM webservice: {e}") from e if not response.ok: try: detail = response.json() except Exception: detail = response.text raise RuntimeError( f"DEM webservice request failed with status {response.status_code}: {detail}" ) content_type = response.headers.get("Content-Type", "") if "image/tiff" not in content_type: try: detail = response.json() except Exception: detail = response.text raise RuntimeError( f"DEM webservice returned unexpected content type '{content_type}': {detail}" ) total_size = int(response.headers.get("Content-Length", 0)) downloaded = 0 chunk_size = 1024 * 1024 # 1 MB with open(output_path, "wb") as f: for chunk in response.iter_content(chunk_size=chunk_size): if not chunk: continue f.write(chunk) downloaded += len(chunk) elapsed = time.time() - start_time if in_notebook: clear_output(wait=True) print("⏳ Downloading DEM from the MATILDA web service...") if total_size > 0: progress_pct = downloaded / total_size * 100 print( f"Downloaded: {downloaded / 1024**2:,.1f} / {total_size / 1024**2:,.1f} MB " f"({progress_pct:.1f}%)" ) else: print(f"Downloaded: {downloaded / 1024**2:,.1f} MB") print(f"Elapsed time: {elapsed:,.1f} s") elapsed_total = time.time() - start_time if in_notebook: clear_output(wait=True) print("✅ DEM successfully downloaded from the MATILDA web service.") print(f"Saved to: {output_path}") print(f"File size: {downloaded / 1024**2:,.1f} MB") print(f"Elapsed time: {elapsed_total:,.1f} s") return
[docs] def download_dem_for_bbox( bbox, output_path, asset_id, config_path=None, ): """ Download a DEM for a custom geographic bounding box. Parameters ---------- bbox : list or tuple Bounding box as [xmin, ymin, xmax, ymax] in lon/lat coordinates. output_path : str or Path Local output path for the downloaded GeoTIFF. asset_id : str Earth Engine asset ID of the DEM. config_path : str or Path, optional Optional path to webservices.ini. Returns ------- Path Path to the saved GeoTIFF file. Raises ------ ValueError If bbox is invalid. """ if not isinstance(bbox, (list, tuple)) or len(bbox) != 4: raise ValueError("bbox must be a list or tuple: [xmin, ymin, xmax, ymax]") xmin, ymin, xmax, ymax = map(float, bbox) if not (xmin < xmax and ymin < ymax): raise ValueError("bbox must satisfy xmin < xmax and ymin < ymax") geometry = { "type": "Polygon", "coordinates": [[ [xmin, ymin], [xmax, ymin], [xmax, ymax], [xmin, ymax], [xmin, ymin], ]] } return download_dem_webservice( output_path=output_path, asset_id=asset_id, geometry=geometry, config_path=config_path, )
[docs] def delineate_catchment_local( lat, lon, dem_filename, catchment_filename, asset_id, buffer_m=40000, bbox=None, snap_acc_threshold=1000, config_path=None, ): """ Delineate a catchment locally with pysheds after downloading a DEM. The function uses the MATILDA DEM webservice to download a DEM either for a default square around the outlet point or for a custom bounding box. It then performs local delineation with pysheds. If the resulting catchment touches the DEM boundary, the DEM extent is likely too small and the function raises an error asking the user to provide a custom lat/lon bounding box. Parameters ---------- lat : float Latitude of the outlet / pour point. lon : float Longitude of the outlet / pour point. dem_filename : str or Path Output path for the downloaded DEM GeoTIFF. catchment_filename : str or Path Output path for the delineated catchment GeoJSON. asset_id : str Earth Engine asset ID of the DEM to download. buffer_m : int or float, optional Buffer radius in meters for the default DEM download box. Ignored when bbox is provided. bbox : list or tuple, optional Custom bounding box in geographic coordinates: [xmin, ymin, xmax, ymax]. snap_acc_threshold : int or float, optional Accumulation threshold used to snap the outlet point to the channel network. Default is 1000. config_path : str or Path, optional Optional path to webservices.ini. Returns ------- tuple (catchment_gdf, info) catchment_gdf : geopandas.GeoDataFrame Delineated catchment polygon. info : dict Dictionary with useful diagnostics: - dem_filename - catchment_filename - snapped_lon - snapped_lat - bbox_used - edge_touched Raises ------ RuntimeError If delineation fails or if the DEM extent appears too small. ValueError If bbox is invalid. """ from pathlib import Path import numpy as np import geopandas as gpd from shapely.geometry import shape from rasterio.features import shapes from pysheds.grid import Grid dem_filename = Path(dem_filename) catchment_filename = Path(catchment_filename) dem_filename.parent.mkdir(parents=True, exist_ok=True) catchment_filename.parent.mkdir(parents=True, exist_ok=True) if bbox is not None: if not isinstance(bbox, (list, tuple)) or len(bbox) != 4: raise ValueError("bbox must be a list or tuple: [xmin, ymin, xmax, ymax]") xmin, ymin, xmax, ymax = map(float, bbox) if not (xmin < xmax and ymin < ymax): raise ValueError("bbox must satisfy xmin < xmax and ymin < ymax") bbox_used = [xmin, ymin, xmax, ymax] if dem_filename.exists(): print(f"Using existing DEM: {dem_filename}") else: print("Downloading DEM for custom bounding box...") download_dem_for_bbox( bbox=bbox_used, output_path=dem_filename, asset_id=asset_id, config_path=config_path, ) else: bbox_used = None if dem_filename.exists(): print(f"Using existing DEM: {dem_filename}") else: print(f"Downloading DEM for default {buffer_m} m box around outlet point...") download_dem_webservice( output_path=dem_filename, asset_id=asset_id, lat=lat, lon=lon, buffer_m=buffer_m, config_path=config_path, ) print("Loading DEM into pysheds...") grid = Grid.from_raster(str(dem_filename)) dem = grid.read_raster(str(dem_filename)) print("Filling depressions...") flooded_dem = grid.fill_depressions(dem) print("Resolving flats...") inflated_dem = grid.resolve_flats(flooded_dem) dirmap = (64, 128, 1, 2, 4, 8, 16, 32) print("Computing flow directions...") fdir = grid.flowdir(inflated_dem, dirmap=dirmap) print("Computing flow accumulation...") acc = grid.accumulation(fdir, dirmap=dirmap) print("Snapping outlet point to channel network...") x_snap, y_snap = grid.snap_to_mask(acc > snap_acc_threshold, (lon, lat)) print("Delineating catchment...") catch = grid.catchment( x=x_snap, y=y_snap, fdir=fdir, dirmap=dirmap, xytype="coordinate", ) catch_view = np.asarray(grid.view(catch), dtype=np.uint8) edge_touched = bool( catch_view[0, :].any() or catch_view[-1, :].any() or catch_view[:, 0].any() or catch_view[:, -1].any() ) if edge_touched: raise RuntimeError( "Local delineation touches the DEM boundary. " "The DEM extent is likely too small. " "Please provide a larger custom lat/lon bounding box " "as bbox=[xmin, ymin, xmax, ymax]." ) print("Converting delineated catchment raster to polygon...") polygons = [] for geom, value in shapes( catch_view, mask=catch_view.astype(bool), transform=grid.affine, ): if value == 1: polygons.append(shape(geom)) if not polygons: raise RuntimeError("No catchment polygon could be created from the delineation result.") catchment_gdf = gpd.GeoDataFrame( geometry=polygons, crs=grid.crs, ) catchment_gdf = catchment_gdf.dissolve().explode(index_parts=False).reset_index(drop=True) if len(catchment_gdf) > 1: areas = catchment_gdf.to_crs(catchment_gdf.estimate_utm_crs()).area catchment_gdf = catchment_gdf.loc[[areas.idxmax()]].reset_index(drop=True) catchment_gdf.to_file(catchment_filename, driver="GeoJSON") print(f"Local catchment delineation saved to: {catchment_filename}") info = { "dem_filename": str(dem_filename), "catchment_filename": str(catchment_filename), "snapped_lon": float(x_snap), "snapped_lat": float(y_snap), "bbox_used": bbox_used, "edge_touched": edge_touched, } return catchment_gdf, info
[docs] def get_geopotential_webservice( catchment, config_path=None, asset_id=None, band=None, show_progress=True, ): """ Request catchment-mean geopotential and reference elevation from the MATILDA webservice. Parameters ---------- catchment : dict or geopandas.GeoDataFrame or shapely geometry Catchment geometry as a GeoJSON FeatureCollection, GeoDataFrame, or shapely geometry. If a GeoDataFrame or shapely geometry is provided, it will be converted to a FeatureCollection payload. config_path : str or Path, optional Optional path to webservices.ini. asset_id : str, optional Optional override of the geopotential Earth Engine asset. band : str, optional Optional override of the band name. show_progress : bool, optional If True, show a simple live status message while the request runs. Returns ------- dict Dictionary with keys: - "geopotential_mean" - "elevation_m" Raises ------ RuntimeError If the webservice request fails or returns malformed output. """ try: from IPython.display import clear_output in_notebook = True except Exception: in_notebook = False cfg = load_webservice_config(config_path=config_path, section="GOOGLE") service_url = f"{cfg['BASE_URL'].rstrip('/')}/geopotential" if hasattr(catchment, "to_json"): catchment = json.loads(catchment.to_json()) elif hasattr(catchment, "__geo_interface__") and not isinstance(catchment, dict): catchment = { "type": "FeatureCollection", "features": [ { "type": "Feature", "properties": {}, "geometry": catchment.__geo_interface__, } ], } if not isinstance(catchment, dict): raise ValueError( "catchment must be a GeoJSON dict, a GeoDataFrame, or a shapely geometry." ) payload = {"catchment": catchment} if asset_id is not None: payload["asset_id"] = asset_id if band is not None: payload["band"] = band headers = { "Content-Type": "application/json", "X-API-Key": cfg["PUBLIC_API_KEY"], } stop_flag = False start_time = time.time() def progress_worker(): symbols = ["⏳", "⌛"] i = 0 while not stop_flag: elapsed = time.time() - start_time msg1 = f"{symbols[i % 2]} Requesting reference elevation from the MATILDA web service..." msg2 = f"Elapsed time: {elapsed:,.1f} s" if in_notebook: clear_output(wait=True) print(msg1) print(msg2) else: print(msg1, msg2) time.sleep(1) i += 1 thread = None if show_progress: thread = threading.Thread(target=progress_worker, daemon=True) thread.start() try: response = requests.post( service_url, json=payload, headers=headers, timeout=cfg["TIMEOUT"], ) except requests.RequestException as e: raise RuntimeError(f"Failed to reach geopotential webservice: {e}") from e finally: stop_flag = True if thread is not None: thread.join() elapsed_total = time.time() - start_time if not response.ok: try: detail = response.json() except Exception: detail = response.text raise RuntimeError( f"Geopotential webservice request failed with status {response.status_code}: {detail}" ) try: data = response.json() except Exception as e: raise RuntimeError(f"Geopotential webservice did not return valid JSON: {e}") from e required = {"geopotential_mean", "elevation_m"} missing = required.difference(data.keys()) if missing: raise RuntimeError( f"Geopotential webservice response is missing keys: {sorted(missing)}" ) if in_notebook and show_progress: clear_output(wait=True) print("✅ Reference elevation successfully retrieved.") print(f"Geopotential mean:\t{data['geopotential_mean']:.2f} m2 s-2") print(f"Reference elevation:\t{data['elevation_m']:.2f} m a.s.l.") print(f"Elapsed time:\t\t{elapsed_total:,.1f} s") return data
[docs] def get_climate_data_webservice( catchment, date_range, config_path=None, show_progress=True, max_concurrent=6, ): """ Request spatially aggregated ERA5-Land forcing data from the MATILDA webservice and return them as a pandas DataFrame. The webservice streams one JSON record per year (NDJSON). """ import json import time import pandas as pd import requests try: from IPython.display import clear_output in_notebook = True except Exception: in_notebook = False cfg = load_webservice_config(config_path=config_path, section="GOOGLE") service_url = f"{cfg['BASE_URL'].rstrip('/')}/climate-data" if hasattr(catchment, "to_json"): catchment = json.loads(catchment.to_json()) elif hasattr(catchment, "__geo_interface__") and not isinstance(catchment, dict): catchment = { "type": "FeatureCollection", "features": [ { "type": "Feature", "properties": {}, "geometry": catchment.__geo_interface__, } ], } if not isinstance(catchment, dict): raise ValueError( "catchment must be a GeoJSON dict, a GeoDataFrame, or a shapely geometry." ) if not isinstance(date_range, (list, tuple)) or len(date_range) != 2: raise ValueError("date_range must be a list or tuple: [start_date, end_date]") start_year = int(str(date_range[0])[:4]) end_year = int(str(date_range[1])[:4]) total_years = end_year - start_year + 1 payload = { "catchment": catchment, "date_range": [str(date_range[0]), str(date_range[1])], "max_concurrent": int(max_concurrent), } headers = { "Content-Type": "application/json", "X-API-Key": cfg["PUBLIC_API_KEY"], } start_time = time.time() year_results = [] finished = 0 try: response = requests.post( service_url, json=payload, headers=headers, timeout=cfg["TIMEOUT"], stream=True, ) except requests.RequestException as e: raise RuntimeError(f"Failed to reach climate-data webservice: {e}") from e if not response.ok: try: detail = response.json() except Exception: detail = response.text raise RuntimeError( f"Climate-data webservice request failed with status {response.status_code}: {detail}" ) for line in response.iter_lines(decode_unicode=True): if not line: continue try: item = json.loads(line) except Exception as e: raise RuntimeError(f"Could not parse streamed climate-data response: {e}") from e if "error" in item: raise RuntimeError( f"Climate-data webservice failed for year {item.get('year')}: {item['error']}" ) year_results.append(item) finished += 1 if show_progress: elapsed = time.time() - start_time if in_notebook: clear_output(wait=True) print("⏳ Requesting ERA5-Land forcing data from the MATILDA web service...") print(f"Years completed: {finished}/{total_years}") print(f"Latest year received: {item.get('year')}") print(f"Elapsed time: {elapsed:,.1f} s") if not year_results: raise RuntimeError("Climate-data webservice returned no data.") year_results = sorted(year_results, key=lambda x: x["year"]) timestamps = [] temp = [] prec = [] for item in year_results: timestamps.extend(item["timestamps"]) temp.extend(item["temp"]) prec.extend(item["prec"]) df = pd.DataFrame({ "ts": timestamps, "temp": temp, "prec": prec, }) df = df.drop_duplicates(subset="ts").sort_values("ts").reset_index(drop=True) df["dt"] = pd.to_datetime(df["ts"], unit="ms") df["temp_c"] = df["temp"] - 273.15 df["prec"] = df["prec"] * 1000 elapsed_total = time.time() - start_time if in_notebook and show_progress: clear_output(wait=True) print("✅ ERA5-Land forcing data successfully retrieved.") print(f"Date range: {df['dt'].min().date()} to {df['dt'].max().date()}") print(f"Number of daily records: {len(df)}") print(f"Elapsed time: {elapsed_total:,.1f} s") return df[["dt", "temp", "temp_c", "prec"]]
[docs] class CMIPDownloader: """Class to download spatially averaged CMIP6 data for a given period, variable, and spatial subset.""" def __init__(self, var, starty, endy, shape, processes=10, dir='./'): self.var = var self.starty = starty self.endy = endy self.shape = shape self.processes = processes self.directory = dir # create the download directory if it doesn't exist if not os.path.exists(self.directory): os.makedirs(self.directory)
[docs] def download(self): """Runs a subset routine for CMIP6 data on GEE servers to create ee.FeatureCollections for all years in the requested period. Downloads individual years in parallel processes to increase the download time.""" print('Initiating download request for NEX-GDDP-CMIP6 data from ' + str(self.starty) + ' to ' + str(self.endy) + '.') def getRequests(starty, endy): """Generates a list of years to be downloaded. [Client side]""" return [i for i in range(starty, endy+1)] @retry(tries=10, delay=1, backoff=2) def getResult(index, year): """Handle the HTTP requests to download one year of CMIP6 data. [Server side]""" start = str(year) + '-01-01' end = str(year + 1) + '-01-01' startDate = ee.Date(start) endDate = ee.Date(end) n = endDate.difference(startDate, 'day').subtract(1) def getImageCollection(var): """Create and image collection of CMIP6 data for the requested variable, period, and region. [Server side]""" collection = ee.ImageCollection('NASA/GDDP-CMIP6') \ .select(var) \ .filterDate(startDate, endDate) \ .filterBounds(self.shape) \ .filter(ee.Filter.neq('model', 'NorESM2-LM')) # Exclude model (missing year 2096) return collection def renameBandName(b): """Edit variable names for better readability. [Server side]""" split = ee.String(b).split('_') return ee.String(split.splice(split.length().subtract(2), 1).join("_")) def buildFeature(i): """Create an area weighted average of the defined region for every day in the given year. [Server side]""" t1 = startDate.advance(i, 'day') t2 = t1.advance(1, 'day') # feature = ee.Feature(point) dailyColl = collection.filterDate(t1, t2) dailyImg = dailyColl.toBands() # renaming and handling names bands = dailyImg.bandNames() renamed = bands.map(renameBandName) # Daily extraction and adding time information dict = dailyImg.rename(renamed).reduceRegion( reducer=ee.Reducer.mean(), geometry=self.shape, ).combine( ee.Dictionary({'system:time_start': t1.millis(), 'isodate': t1.format('YYYY-MM-dd')}) ) return ee.Feature(None, dict) # Create features for all days in the respective year. [Server side] collection = getImageCollection(self.var) year_feature = ee.FeatureCollection(ee.List.sequence(0, n).map(buildFeature)) # Create a download URL for a CSV containing the feature collection. [Server side] url = year_feature.getDownloadURL() # Handle downloading the actual csv for one year. [Client side] r = requests.get(url, stream=True) if r.status_code != 200: r.raise_for_status() filename = os.path.join(self.directory, 'cmip6_' + self.var + '_' + str(year) + '.csv') with open(filename, 'w') as f: f.write(r.text) return index # Create a list of years to be downloaded. [Client side] items = getRequests(self.starty, self.endy) # Launch download requests in parallel processes and display a status bar. [Client side] with tqdm(total=len(items), desc="Downloading CMIP6 data for variable '" + self.var + "'") as pbar: results = [] with concurrent.futures.ThreadPoolExecutor(max_workers=self.processes) as executor: for i, year in enumerate(items): results.append(executor.submit(getResult, i, year)) for future in concurrent.futures.as_completed(results): index = future.result() pbar.update(1) print("All downloads complete.")
[docs] class CMIPDownloaderWebservice: """ Download spatially averaged CMIP6 yearly CSV files from the MATILDA webservice. This class mirrors the output of the local CMIPDownloader, but uses the /cmip6-data endpoint. It is designed as a low-disruption bridge so that CMIPProcessor can remain unchanged. Parameters ---------- starty : int First year to request. endy : int Last year to request. catchment : dict or geopandas.GeoDataFrame or shapely geometry Catchment geometry to send to the webservice. variables : list of str, optional Variables to request. Default is ["tas", "pr"]. dir : str, optional Output directory for yearly CSV files. max_concurrent : int, optional Concurrency passed to the webservice endpoint. block_size_years : int or None, optional If given, split each variable request into year blocks of this size. If None, request each variable over the full time period. config_path : str or Path, optional Optional path to webservices.ini. show_progress : bool, optional Whether to print progress information. """ def __init__( self, starty, endy, catchment, variables=None, dir="./", max_concurrent=6, block_size_years=None, config_path=None, show_progress=True, ): self.starty = int(starty) self.endy = int(endy) self.catchment = catchment self.variables = variables or ["tas", "pr"] self.directory = dir self.max_concurrent = int(max_concurrent) self.block_size_years = block_size_years self.config_path = config_path self.show_progress = show_progress if not os.path.exists(self.directory): os.makedirs(self.directory) def _normalize_catchment(self): catchment = self.catchment if hasattr(catchment, "to_json"): catchment = json.loads(catchment.to_json()) elif hasattr(catchment, "__geo_interface__") and not isinstance(catchment, dict): catchment = { "type": "FeatureCollection", "features": [ { "type": "Feature", "properties": {}, "geometry": catchment.__geo_interface__, } ], } if not isinstance(catchment, dict): raise ValueError( "catchment must be a GeoJSON dict, a GeoDataFrame, or a shapely geometry." ) return catchment def _iter_blocks(self): if self.block_size_years is None: yield self.starty, self.endy return y = self.starty while y <= self.endy: block_end = min(y + self.block_size_years - 1, self.endy) yield y, block_end y = block_end + 1 def _request_block(self, variable, block_start, block_end, pbar=None): cfg = load_webservice_config(config_path=self.config_path, section="GOOGLE") service_url = f"{cfg['BASE_URL'].rstrip('/')}/cmip6-data" payload = { "catchment": self._normalize_catchment(), "date_range": [f"{block_start}-01-01", f"{block_end}-12-31"], "variables": [variable], "max_concurrent": self.max_concurrent, } headers = { "Content-Type": "application/json", "X-API-Key": cfg["PUBLIC_API_KEY"], } try: response = requests.post( service_url, json=payload, headers=headers, timeout=cfg["TIMEOUT"], stream=True, ) except requests.RequestException as e: raise RuntimeError(f"Failed to reach cmip6-data webservice: {e}") from e if not response.ok: try: detail = response.json() except Exception: detail = response.text raise RuntimeError( f"cmip6-data webservice request failed with status {response.status_code}: {detail}" ) received_years = [] for raw_line in response.iter_lines(decode_unicode=False): if not raw_line: continue if isinstance(raw_line, bytes): line = raw_line.decode("utf-8", errors="replace") else: line = raw_line if not line.strip(): continue try: item = json.loads(line) except Exception as e: raise RuntimeError(f"Could not parse streamed cmip6-data response: {e}") from e if "error" in item: raise RuntimeError( f"cmip6-data webservice failed for var={item.get('var')} year={item.get('year')}: {item['error']}" ) year = int(item["year"]) var = item["var"] filename = item.get("filename", f"cmip6_{var}_{year}.csv") csv_text = item["csv"] out_path = os.path.join(self.directory, filename) with open(out_path, "w", encoding="utf-8") as f: f.write(csv_text) received_years.append(year) if pbar is not None: pbar.update(1) pbar.set_postfix_str(f"variable={var}, year={year}") return sorted(received_years)
[docs] def download(self): """ Download all requested variables and save yearly CSV files locally. Returns ------- dict Summary of downloaded years by variable. """ summary = {} t0 = time.time() _ = load_webservice_config(config_path=self.config_path, section="GOOGLE") total_expected = (self.endy - self.starty + 1) * len(self.variables) pbar = tqdm( total=total_expected, desc="Downloading CMIP6 yearly files", unit="file", disable=not self.show_progress, ) try: for variable in self.variables: if self.show_progress: pbar.set_postfix_str(f"variable={variable}") variable_years = [] for block_start, block_end in self._iter_blocks(): years = self._request_block(variable, block_start, block_end, pbar=pbar) variable_years.extend(years) variable_years = sorted(set(variable_years)) summary[variable] = variable_years expected = list(range(self.starty, self.endy + 1)) missing = sorted(set(expected) - set(variable_years)) if self.show_progress and missing: print(f"Missing years for {variable}: {missing[:10]}{' ...' if len(missing) > 10 else ''}") finally: pbar.close() elapsed = time.time() - t0 if self.show_progress: print(f"CMIP6 webservice download complete in {elapsed:,.1f} s.") return summary
[docs] class CMIPDownloaderManifest: """ Fast CMIP6 downloader using the /cmip6-manifest endpoint. The MATILDA webservice generates signed Earth Engine download URLs. This helper downloads those files directly in parallel and saves them with the same filenames expected by CMIPProcessor. """ def __init__( self, starty, endy, catchment, variables=None, dir="./", manifest_concurrent=6, download_workers=10, block_size_years=None, config_path=None, show_progress=True, ): self.starty = int(starty) self.endy = int(endy) self.catchment = catchment self.variables = variables or ["tas", "pr"] self.directory = dir self.manifest_concurrent = int(manifest_concurrent) self.download_workers = int(download_workers) self.block_size_years = block_size_years self.config_path = config_path self.show_progress = show_progress if not os.path.exists(self.directory): os.makedirs(self.directory) def _normalize_catchment(self): catchment = self.catchment if hasattr(catchment, "to_json"): catchment = json.loads(catchment.to_json()) elif hasattr(catchment, "__geo_interface__") and not isinstance(catchment, dict): catchment = { "type": "FeatureCollection", "features": [ { "type": "Feature", "properties": {}, "geometry": catchment.__geo_interface__, } ], } if not isinstance(catchment, dict): raise ValueError( "catchment must be a GeoJSON dict, a GeoDataFrame, or a shapely geometry." ) return catchment def _iter_blocks(self): if self.block_size_years is None: yield self.starty, self.endy return y = self.starty while y <= self.endy: block_end = min(y + self.block_size_years - 1, self.endy) yield y, block_end y = block_end + 1 def _fetch_manifest_block(self, variable, block_start, block_end): cfg = load_webservice_config(config_path=self.config_path, section="GOOGLE") service_url = f"{cfg['BASE_URL'].rstrip('/')}/cmip6-manifest" payload = { "catchment": self._normalize_catchment(), "date_range": [f"{block_start}-01-01", f"{block_end}-12-31"], "variables": [variable], "max_concurrent": self.manifest_concurrent, } headers = { "Content-Type": "application/json", "X-API-Key": cfg["PUBLIC_API_KEY"], } try: response = requests.post( service_url, json=payload, headers=headers, timeout=cfg["TIMEOUT"], stream=True, ) except requests.RequestException as e: raise RuntimeError(f"Failed to reach cmip6-manifest webservice: {e}") from e if not response.ok: try: detail = response.json() except Exception: detail = response.text raise RuntimeError( f"cmip6-manifest request failed with status {response.status_code}: {detail}" ) entries = [] for raw_line in response.iter_lines(decode_unicode=False): if not raw_line: continue line = raw_line.decode("utf-8", errors="replace") if isinstance(raw_line, bytes) else raw_line if not line.strip(): continue try: item = json.loads(line) except Exception as e: raise RuntimeError(f"Could not parse streamed cmip6-manifest response: {e}") from e if "error" in item: raise RuntimeError( f"cmip6-manifest failed for var={item.get('var')} year={item.get('year')}: {item['error']}" ) entries.append(item) return entries def _download_one(self, item): filename = item["filename"] url = item["download_url"] out_path = os.path.join(self.directory, filename) response = requests.get(url, timeout=300) response.raise_for_status() with open(out_path, "w", encoding="utf-8") as f: f.write(response.text) return { "filename": filename, "year": int(item["year"]), "var": item["var"], "path": out_path, } def download(self): summary = {} t0 = time.time() manifest_entries = [] for variable in self.variables: if self.show_progress: print(f"Fetching manifest for '{variable}'...") variable_entries = [] for block_start, block_end in self._iter_blocks(): variable_entries.extend( self._fetch_manifest_block(variable, block_start, block_end) ) manifest_entries.extend(variable_entries) if self.show_progress: print(f"Received {len(manifest_entries)} manifest entries.") total_expected = (self.endy - self.starty + 1) * len(self.variables) pbar = tqdm( total=total_expected, desc="Downloading CMIP6 yearly files", unit="file", disable=not self.show_progress, ) try: with concurrent.futures.ThreadPoolExecutor(max_workers=self.download_workers) as executor: futures = [executor.submit(self._download_one, item) for item in manifest_entries] downloaded = [] for future in concurrent.futures.as_completed(futures): result = future.result() downloaded.append(result) pbar.update(1) pbar.set_postfix_str(f"variable={result['var']}, year={result['year']}") finally: pbar.close() for variable in self.variables: years = sorted( {item["year"] for item in downloaded if item["var"] == variable} ) summary[variable] = years expected = list(range(self.starty, self.endy + 1)) missing = sorted(set(expected) - set(years)) if self.show_progress and missing: print(f"Missing years for {variable}: {missing[:10]}{' ...' if len(missing) > 10 else ''}") elapsed = time.time() - t0 if self.show_progress: print(f"CMIP6 manifest download complete in {elapsed:,.1f} s.") return summary
[docs] class CMIPProcessor: """Class to read and pre-process CSV files downloaded by the CMIPDownloader class.""" def __init__(self, var, file_dir='.'): self.file_dir = file_dir self.var = var self.df_hist = self.append_df(self.var, self.file_dir, hist=True) self.df_ssp = self.append_df(self.var, self.file_dir, hist=False) self.ssp2_common, self.ssp5_common, self.hist_common,\ self.common_models, self.dropped_models = self.process_dataframes() self.ssp2, self.ssp5 = self.get_results()
[docs] def read_cmip(self, filename): """Reads CMIP6 CSV files and drops redundant columns.""" df = pd.read_csv(filename, index_col='isodate', parse_dates=['isodate']) df = df.drop(['system:index', '.geo', 'system:time_start'], axis=1) return df
[docs] def append_df(self, var, file_dir='.', hist=True): """Reads CMIP6 CSV files of individual years and concatenates them into dataframes for the full downloaded period. Historical and scenario datasets are treated separately. Converts precipitation unit to mm.""" df_list = [] if hist: starty = 1979 endy = 2014 else: starty = 2015 endy = 2100 for i in range(starty, endy + 1): filename = file_dir + 'cmip6_' + var + '_' + str(i) + '.csv' df_list.append(self.read_cmip(filename)) if hist: hist_df = pd.concat(df_list) if var == 'pr': hist_df = hist_df * 86400 # from kg/(m^2*s) to mm/day return hist_df else: ssp_df = pd.concat(df_list) if var == 'pr': ssp_df = ssp_df * 86400 # from kg/(m^2*s) to mm/day return ssp_df
[docs] def process_dataframes(self): """Separates the two scenarios and drops models not available for both scenarios and the historical period.""" ssp2 = self.df_ssp.loc[:, self.df_ssp.columns.str.startswith('ssp245')] ssp5 = self.df_ssp.loc[:, self.df_ssp.columns.str.startswith('ssp585')] hist = self.df_hist.loc[:, self.df_hist.columns.str.startswith('historical')] ssp2.columns = ssp2.columns.str.lstrip('ssp245_').str.rstrip('_' + self.var) ssp5.columns = ssp5.columns.str.lstrip('ssp585_').str.rstrip('_' + self.var) hist.columns = hist.columns.str.lstrip('historical_').str.rstrip('_' + self.var) # Get all the models the three datasets have in common common_models = set(ssp2.columns).intersection(ssp5.columns).intersection(hist.columns) # Get the model names that contain NaN values nan_models_list = [df.columns[df.isna().any()].tolist() for df in [ssp2, ssp5, hist]] # flatten the list nan_models = [col for sublist in nan_models_list for col in sublist] # remove duplicates nan_models = list(set(nan_models)) # Remove models with NaN values from the list of common models common_models = [x for x in common_models if x not in nan_models] ssp2_common = ssp2.loc[:, common_models] ssp5_common = ssp5.loc[:, common_models] hist_common = hist.loc[:, common_models] dropped_models = list(set([mod for mod in ssp2.columns if mod not in common_models] + [mod for mod in ssp5.columns if mod not in common_models] + [mod for mod in hist.columns if mod not in common_models])) return ssp2_common, ssp5_common, hist_common, common_models, dropped_models
[docs] def get_results(self): """Concatenates historical and scenario data to combined dataframes of the full downloaded period. Arranges the models in alphabetical order.""" ssp2_full = pd.concat([self.hist_common, self.ssp2_common]) ssp2_full.index.names = ['TIMESTAMP'] ssp5_full = pd.concat([self.hist_common, self.ssp5_common]) ssp5_full.index.names = ['TIMESTAMP'] ssp2_full = ssp2_full.reindex(sorted(ssp2_full.columns), axis=1) ssp5_full = ssp5_full.reindex(sorted(ssp5_full.columns), axis=1) return ssp2_full, ssp5_full