#!/usr/bin/env python """ Functions to call ECMWF Reanalysis with CDS-api - ERA5-land daily request - request a list of daily variables dedicated to the calculus of ET0 and the generation of MODSPA daily forcing files heavily modified from rivallandv's original file @author: auclairj """ import os, shutil # for path exploration and file management from typing import List, Tuple # to declare variables import numpy as np # for math on arrays import xarray as xr # to manage nc files from datetime import datetime # to manage dates from p_tqdm import p_map # for multiprocessing with progress bars from dateutil.rrule import rrule, MONTHLY from fnmatch import fnmatch # for file name matching import rasterio as rio # to manage geotiff images import netCDF4 as nc # to write netcdf4 files from tqdm import tqdm # to follow progress from pandas import date_range from rasterio.warp import reproject, Resampling # to reproject import re # for string comparison import warnings # to suppress pandas warning # CDS API external library # source: https://pypi.org/project/cdsapi/ import cdsapi # to download cds data import requests # to request data # FAO ET0 calculator external library # Notes # source: https://github.com/Evapotranspiration/ETo # documentation: https://eto.readthedocs.io/en/latest/ import eto # to calculate ET0 def era5_enclosing_shp_aera(area: List[float], pas: float) -> tuple: """ Find the four coordinates including the boxbound scene to agree with gridsize resolution system projection: WGS84 lat/lon degree ARGS: area: [lat north, lon west, lat south, lon east] in degree WGS84, float pas: gridsize RETURN: -coordinates list corresponding to N,W,S,E corners of the grid in decimal degree Note: gdal coordinates reference upper left corner of pixel ERA5 coordinates refere to center of grid To resolve this difference an offset of pas/2 is apply """ lat_max, lon_min, lat_min, lon_max = area # North era5_lat_max = round((lat_max//pas+1)*pas, 2) # West era5_lon_min = round((lon_min//pas)*pas, 2) # South era5_lat_min = round((lat_min//pas)*pas, 2) # Est era5_lon_max = round((lon_max//pas+1)*pas, 2) era5_area = era5_lat_max, era5_lon_min, era5_lat_min, era5_lon_max return era5_area # [N,W,S,E] def call_era5land_daily(args: tuple) -> None: """ query of one month of daily ERA5-land data of a selected variable according to a selected statistic Documentation: https://datastore.copernicus-climate.eu/documents/app-c3s-daily-era5-statistics/C3S_Application-Documentation_ERA5-daily-statistics-v2.pdf Arguments ---------- year : TYPE str year at YYYY format. month : TYPE str month at MM format. variable : TYPE str user-selectable variable cf. Appendix A Table 3 for list of input variables availables. statistic : TYPE str daily statistic choosed, 3 possibility daily_mean or daily_minimum or daily_maximum. area : TYPE list of 4 int area = [lat_max, lon_min, lat_min, lon_max] output_path : TYPE str path for output file. Returns ------- Netcdf File. """ year, month, variable, statistic, area, output_path = args # set name of output file for each month (statistic, variable, year, month) output_filename = \ output_path+os.sep +\ "ERA5-land_"+year+"_"+month+"_"+variable+"_"+statistic+".nc" if os.path.isfile(output_filename): print(output_filename, ' already exist') else: try: c = cdsapi.Client(timeout=300) result = c.service("tool.toolbox.orchestrator.workflow", params={ "realm": "c3s", "project": "app-c3s-daily-era5-statistics", "version": "master", "kwargs": { "dataset": "reanalysis-era5-land", "product_type": "reanalysis", "variable": variable, "statistic": statistic, "year": year, "month": month, "time_zone": "UTC+00:0", "frequency": "1-hourly", "grid": "0.1/0.1", "area": {"lat": [area[2], area[0]], "lon": [area[1], area[3]]} }, "workflow_name": "application" }) location = result[0]['location'] res = requests.get(location, stream=True) print("Writing data to " + output_filename) with open(output_filename, 'wb') as fh: for r in res.iter_content(chunk_size=1024): fh.write(r) fh.close() except: print('!! request', variable, ' failed !! -> year', year, 'month', month) return None def call_era5land_daily_for_MODSPA(start_date: str, end_date: str, area: List[float], output_path: str, processes: int = 9) -> None: """ request ERA5-land daily variables needed for ET0 calculus and MODSPA forcing https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview ARGS: start_date: YYYY-MM-DD string end_date: YYYY-MM-DD string area: [nord, ouest, sud, est] in degree WGS84, float filename_out: fichier de sortie, format xxx.nc, string processes: nombre de processeurs logiques à utiliser INFO: called land surface variables : '2m_temperature', '2m_dewpoint_temperature', 'surface_solar_radiation_downward', 'surface_net_solar_radiation', 'surface_pressure', 'mean_sea_level_pressure', 'potential_evaporation', 'evaporation', 'total_evaporation', 'total_precipitation', 'snowfall', '10m_u_component_of_wind', '10m_v_component_of_wind', Arguments ---------- start_date : TYPE DESCRIPTION. end_date : TYPE DESCRIPTION. area : TYPE DESCRIPTION. output_path : TYPE DESCRIPTION. Returns ------- None. """ # list of first day of each month date into period strt_dt = datetime.strptime(start_date, '%Y-%m-%d').replace(day=1) end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(day=1) periods = [dt for dt in rrule( freq=MONTHLY, dtstart=strt_dt, until=end_dt, bymonthday=1)] dico = { '2m_temperature': ['daily_minimum', 'daily_maximum'], '10m_u_component_of_wind': ['daily_mean'], '10m_v_component_of_wind': ['daily_mean'], 'total_precipitation': ['daily_mean'], 'surface_solar_radiation_downwards': ['daily_mean'], '2m_dewpoint_temperature': ['daily_minimum', 'daily_maximum'] } args = [] # loop on variable to upload for variable in dico.keys(): # loop on statistic associated to variable to upload for statistic in dico[variable]: # loop on year and month for dt in periods: year = str(dt.year) month = '0'+str(dt.month) month = month[-2:] # Requete ERA5-land args.append((year, month, variable, statistic, area, output_path)) # Start pool p_map(call_era5land_daily, args, **{"num_cpus": processes}) return None def filename_to_datetime(filename: str) -> datetime.date: """ filename_to_datetime returns a `datetime.date` object for the date of the given file name. ## Arguments 1. filename: `str ` name or path of the product ## Returns 1. date: `datetime.date` datetime.date object, date of the product """ # Search for a date pattern (yyyy_mm_dd) in the product name or path match = re.search('\d{4}_\d{2}', filename) format = '%Y_%m' datetime_object = datetime.strptime(match[0], format) return datetime_object.date() def concat_monthly_nc_file(list_era5land_monthly_files: List[str], list_variables: List[str], output_path: str) -> List[str]: """ Concatenate monthly netcdf datasets into a single file for each given variable. ## Arguments 1. list_era5land_monthly_files: `List[str]` list of daily files per month 2. list_variables: `List[str]` names of the required variables as written in the filename 3. output_path: `List[str]` path to which save the aggregated files ## Returns 1. list_era5land_files: `List[str]` the list of paths to the aggregated files """ if not os.path.exists(output_path): os.mkdir(output_path) list_era5land_monthly_files.sort() list_era5land_files = [] # concatenate all dates into a single file for each variable for variable in list_variables: curr_var_list = [] dates = [] for file in list_era5land_monthly_files: # find specific variable if fnmatch(file, '*' + variable + '*'): curr_var_list.append(file) dates.append(filename_to_datetime(file)) curr_datasets = [] for file in curr_var_list: # open all months for the given variable curr_datasets.append(xr.open_dataset(file)) # Create file name try: concatenated_file = output_path + os.sep + 'era5-land_' + dates[0].strftime('%m-%Y') + '_' + dates[-1].strftime('%m-%Y') + '_' + variable + '.nc' except: print(variable) # Concatenate monthly datasets concatenated_dataset = xr.concat(curr_datasets, dim = 'time') # Save datasets concatenated_dataset.to_netcdf(path = concatenated_file, mode = 'w',) # Add filename to output list list_era5land_files.append(concatenated_file) return list_era5land_files def uz_to_u2(u_z: List[float], h: float) -> List[float]: """ The wind speed measured at heights other than 2 m can be adjusted according to the follow equation Arguments ---------- u_z : TYPE float array measured wind speed z m above the ground surface, ms- 1. h : TYPE float height of the measurement above the ground surface, m. Returns ------- u2 : TYPE float array average daily wind speed in meters per second (ms- 1 ) measured at 2 m above the ground. """ u2 = u_z*4.87/(np.log(67.8*h - 5.42)) return u2 def ea_calc(T: float) -> float: """ comments Actual vapour pressure (ea) derived from dewpoint temperature ' Arguments ---------- T : Temperature in degree celsius. Returns ------- e_a :the actual Vapour pressure in Kpa """ e_a = 0.6108*np.exp(17.27*T/(T+237.15)) return e_a def load_variable(file_name: str) -> xr.Dataset: """ Loads an ERA5 meteorological variable into a xarray dataset according to the modspa architecture. ## Arguments 1. file_name: `str` netcdf file to load ## Returns 1. variable: `xr.Dataset` output xarray dataset """ # Rename temperature variables according to the statistic (max or min) if fnmatch(file_name, '*era5-land*2m_temperature_daily_maximum*'): # maximum temperature variable = xr.open_dataset(file_name).rename({'t2m': 't2m_max'}).drop_vars('realization') # netcdfs from ERA5 carry an unecessary 'realization' coordinate, so it is dropped elif fnmatch(file_name, '*era5-land*2m_temperature_daily_minimum*'): # minimum temperature variable = xr.open_dataset(file_name).rename({'t2m': 't2m_min'}).drop_vars('realization') elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_maximum*'): # maximum dewpoint temperature variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_max'}).drop_vars('realization') elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_minimum*'): # minimum temperature variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_min'}).drop_vars('realization') # Other variables can be loaded without modification else: variable = xr.open_dataset(file_name).drop_vars('realization') return variable def combine_weather2netcdf(rain_file: str, ET0_tile: str, ndvi_path: str, save_path: str) -> None: # Open tif files rain_tif = rio.open(rain_file) ET0_tif = rio.open(ET0_tile) # Open ndvi netcdf to get structure ndvi = xr.open_dataset(ndvi_path) # Get geotiff dimensions (band, x, y) dims = (rain_tif.count, rain_tif.height, rain_tif.width) # Create empty dataset nc_ds = nc.Dataset(save_path, mode = 'w') # Create dimensions nc_ds.createDimension('time', dims[0]) # unlimited axis (can be appended to). nc_ds.createDimension('x', dims[1]) # latitude axis nc_ds.createDimension('y', dims[2]) # longitude axis # Create global attributes nc_ds.title = 'Weather datacube interpolated/calculated from ERA5 Land data' nc_ds.subtitle = 'total daily precipitation and total daily potential evapotranspiration' # Define two variables with the same names as dimensions, # a conventional way to define "coordinate variables". x = nc_ds.createVariable('x', np.float64, ('x',)) x.units = 'meters' x.long_name = 'UTM meters east' y = nc_ds.createVariable('y', np.float64, ('y',)) y.units = 'meters' y.long_name = 'UTM meters north' time = nc_ds.createVariable('time', np.float64, ('time',)) time.units = 'numpy datetime' time.long_name = 'time' spatial_ref = nc_ds.createVariable('spatial_ref', np.str0, ()) spatial_ref.long_name = 'Spatial reference' spatial_ref.GeoTransform = ndvi.spatial_ref.attrs['GeoTransform'] spatial_ref.spatial_ref = ndvi.spatial_ref.attrs['spatial_ref'] spatial_ref.crs_wkt = ndvi.spatial_ref.attrs['crs_wkt'] spatial_ref.reference_ellipsoid_name = ndvi.spatial_ref.attrs['reference_ellipsoid_name'] spatial_ref.horizontal_datum_name = ndvi.spatial_ref.attrs['horizontal_datum_name'] spatial_ref.false_easting = ndvi.spatial_ref.attrs['false_easting'] spatial_ref.false_northing = ndvi.spatial_ref.attrs['false_northing'] # spatial_ref.latitude_of_projection_origin = ndvi.spatial_ref.attrs['latitude_of_projection_origin'] # spatial_ref.crs_wkt= ndvi.spatial_ref.attrs['crs_wkt'] # spatial_ref.crs_wkt= ndvi.spatial_ref.attrs['crs_wkt'] # Define variables to hold the data Rain = nc_ds.createVariable('Rain', np.int16, ('time','x','y')) # note: unlimited dimension is leftmost Rain.units = 'mm' Rain.standard_name = 'total_precipitation' # this is a CF standard name ET0 = nc_ds.createVariable('ET0', np.int16, ('time','x','y')) # note: unlimited dimension is leftmost ET0.units = 'mm' ET0.standard_name = 'potential_evapotranspiration' # this is a CF standard name # Write data to variables # Coordinates x = ndvi.x.values y = ndvi.y.values time = list(range(1, dims[0])) # Data variables for i in tqdm(range(dims[0])): Rain[i,:,:] = rain_tif.read(i+1) ET0[i,:,:] = ET0_tif.read(i+1) nc_ds.close() return None def calculate_ET0_pixel(pixel_dataset: xr.Dataset, lat: float, lon: float, h: float = 10) -> np.ndarray: """ Calculate ET0 over the year for a single pixel of the ERA5 weather dataset. ## Arguments 1. pixel_dataset: `xr.Dataset` extracted dataset that contains all information for a single pixel 2. lat: `float` latitudinal coordinate of that pixel 3. lon: `float` longitudinal coordinate of that pixel 4. h: `float` `default = 10` height of ERA5 wind measurement in meters ## Returns 1. ET0_values: `np.ndarray` numpy array containing the ET0 values for each day """ # Conversion of xarray dataset to dataframe for ET0 calculation ET0 = pixel_dataset.d2m_max.to_dataframe().rename(columns = {'d2m_max' : 'Dew_Point_T_max'}) - 273.15 # conversion of temperatures from K to °C ET0['Dew_Point_T_min'] = pixel_dataset.d2m_min.to_dataframe()['d2m_min'].values - 273.15 # conversion of temperatures from K to °C ET0['T_min'] = pixel_dataset.t2m_min.to_dataframe()['t2m_min'].values - 273.15 # conversion of temperatures from K to °C ET0['T_max'] = pixel_dataset.t2m_max.to_dataframe()['t2m_max'].values - 273.15 # conversion of temperatures from K to °C ET0['Rain'] = pixel_dataset.tp.to_dataframe()['tp'].values*1000 # conversion of total precipitation from meters to milimeters # Conversion of easward and northward wind values to scalar wind ET0['U_z'] = np.sqrt(pixel_dataset.u10.to_dataframe()['u10'].values**2 + pixel_dataset.v10.to_dataframe()['v10'].values**2) ET0['RH_max'] = 100 * ea_calc(ET0['Dew_Point_T_min']) / ea_calc(ET0['T_min']) # calculation of relative humidity from dew point temperature and temperature ET0['RH_min'] = 100 * ea_calc(ET0['Dew_Point_T_max']) / ea_calc(ET0['T_max']) # calculation of relative humidity from dew point temperature and temperature ET0['R_s'] = pixel_dataset.ssrd.to_dataframe()['ssrd'].values/1e6 # to convert downward total radiation from J/m² to MJ/m² ET0.drop(columns = ['Dew_Point_T_max', 'Dew_Point_T_min'], inplace = True) # drop unecessary columns # Start ET0 calculation eto_calc = eto.ETo() warnings.filterwarnings('ignore') # remove pandas warning # ET0 calculation for given pixel (lat, lon) values eto_calc.param_est(ET0, freq = 'D', # daily frequence # Elevation of the met station above mean sea level (m) (only needed if P is not in df). z_msl = 0., lat = lat, lon = lon, TZ_lon = None, z_u = h) # h: height of raw wind speed measurement # Retrieve ET0 values ET0_values = eto_calc.eto_fao(max_ETo=15, min_ETo=0, interp=True, maxgap=10).values # ETo_FAO_mm return ET0_values def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_file: str, raw_S2_image_ref: str, h: float = 10, max_ram: int = 12288) -> Tuple[str, str]: """ Calculate ET0 values from the ERA5 netcdf weather variables. Output netcdf contains the ET0 values for each day in the selected time period and reprojected on the same grid as the NDVI values. ## Arguments 1. list_era5land_files: `List[str]` list of netcdf files containing the necessary variables 2. output_file: `str` output file name without extension 3. raw_S2_image_ref: `str` raw Sentinel 2 image at right resolution for reprojection 4. h: `float` `default = 10` height of ERA5 wind measurements in meters 5. max_ram: `int` `default = 12288` max ram (in MiB) to give to OTB ## Returns 1. output_file_rain: `str` path to file containing precipitation data 2. output_file_ET0: `str` path to file containing ET0 data """ # Load all monthly files into a single xarray dataset that contains all dates (daily frequency) raw_weather_ds = None for file in list_era5land_files: if not raw_weather_ds: raw_weather_ds = load_variable(file) else: temp = load_variable(file) raw_weather_ds = xr.merge([temp, raw_weather_ds]) del temp # Create ET0 variable (that will be saved) and set attributes raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = 'float32'))) # Loop on lattitude and longitude coordinates to calculate ET0 per "pixel" for lat in raw_weather_ds.coords['lat'].values: for lon in raw_weather_ds.coords['lon'].values: # Select whole time period for given (lat, lon) values select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon']) # Calculate ET0 values for given pixel ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h) # Write ET0 values in xarray Dataset raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values # Get necessary data for final dataset and rewrite netcdf attributes final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min', 'd2m_max', 'd2m_min']) # remove unwanted variables final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm # Change datatype to reduce memory usage final_weather_ds['tp'] = (final_weather_ds['tp'] * 100).astype('u2') final_weather_ds['ET0'] = (final_weather_ds['ET0'] * 1000).astype('u2') # Write projection final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326') # Set variable attributes final_weather_ds['ET0'].attrs['units'] = 'mm' final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration' final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method' final_weather_ds['ET0'].attrs['scale factor'] = '1000' final_weather_ds['tp'].attrs['units'] = 'mm' final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation' final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters' final_weather_ds['tp'].attrs['scale factor'] = '100' # Save dataset to geotiff, still in wgs84 (lat, lon) coordinates output_file_rain = output_file + '_rain.tif' output_file_ET0 = output_file + '_ET0.tif' final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16') final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16') # Reprojected image paths output_file_rain_reproj = output_file + '_rain_reproj.tif' output_file_ET0_reproj = output_file + '_ET0_reproj.tif' # Run otbcli_SuperImpose OTB_command = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_rain + ' -out ' + output_file_rain_reproj + ' uint16 -interpolator linear -ram ' + str(max_ram) os.system(OTB_command) OTB_command = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_ET0 + ' -out ' + output_file_ET0_reproj + ' uint16 -interpolator linear -ram ' + str(max_ram) os.system(OTB_command) # remove old files and rename outputs os.remove(output_file_rain) shutil.move(output_file_rain_reproj, output_file_rain) os.remove(output_file_ET0) shutil.move(output_file_ET0_reproj, output_file_ET0) return output_file_rain, output_file_ET0