Skip to content
Snippets Groups Projects
lib_era5_land_pixel.py 18.2 KiB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494
#!/usr/bin/env python
"""
Functions to call ECMWF Reanalysis with CDS-api

- ERA5-land hourly request
- ERA5-land daily request
- request a list of hourly variables dedicated to the calculus of ET0
 and the generation of MODSPA daily forcing files

@author: rivalland
"""

import os  # for path exploration
from typing import List  # to declare variables
import numpy as np  # for math on arrays
import xarray as xr  # to manage nc files
from datetime import datetime  # to manage dates
from p_tqdm import p_map  # for multiprocessing with progress bars
from dateutil.rrule import rrule, MONTHLY
from fnmatch import fnmatch  # for file name matching
import re  # for string comparison
import warnings  # to suppress pandas warning

# CDS API external library
# source: https://pypi.org/project/cdsapi/
import cdsapi  # to download cds data
import requests  # to request data

# FAO ET0 calculator external library
# Notes
# source: https://github.com/Evapotranspiration/ETo
# documentation: https://eto.readthedocs.io/en/latest/
import eto  # to calculate ET0


def era5_enclosing_shp_aera(area: List[float], pas: float) -> tuple:
    """
   Find the four coordinates including the boxbound scene
   to agree with gridsize resolution
   system projection: WGS84 lat/lon degree
   ARGS:
       area: [lat north, lon west, lat south, lon east] in degree WGS84, float
       pas: gridsize
    RETURN:
    -coordinates list corresponding to N,W,S,E corners of the grid in decimal degree
   Note: gdal coordinates reference upper left corner of pixel
         ERA5 coordinates refere to center of grid
         To resolve this difference an offset of pas/2 is apply
    """
    lat_max, lon_min, lat_min, lon_max = area

    # North
    era5_lat_max = round((lat_max//pas+1)*pas, 2)
    # West
    era5_lon_min = round((lon_min//pas)*pas, 2)
    # South
    era5_lat_min = round((lat_min//pas)*pas, 2)
    # Est
    era5_lon_max = round((lon_max//pas+1)*pas, 2)

    era5_area = era5_lat_max, era5_lon_min, era5_lat_min, era5_lon_max

    return era5_area  # [N,W,S,E]


def call_era5land_daily(args: tuple) -> None:
    """
    query of one month of daily ERA5-land data of a selected variable
    according to a selected statistic

    Documentation:
    https://datastore.copernicus-climate.eu/documents/app-c3s-daily-era5-statistics/C3S_Application-Documentation_ERA5-daily-statistics-v2.pdf


    Arguments
    ----------
    year : TYPE str
        year at YYYY format.
    month : TYPE str
        month at MM format.
    variable : TYPE str
        user-selectable variable
        cf. Appendix A Table 3 for list of input variables availables.
    statistic : TYPE str
        daily statistic choosed, 3 possibility
        daily_mean or daily_minimum or daily_maximum.
    area : TYPE list of 4 int
        area = [lat_max, lon_min, lat_min, lon_max]
    output_path : TYPE str
        path for output file.

    Returns
    -------
    Netcdf File.

    """
    year, month, variable, statistic, area, output_path = args
    
    # set name of output file for each month (statistic, variable, year, month)
    output_filename = \
        output_path+os.sep +\
        "ERA5-land_"+year+"_"+month+"_"+variable+"_"+statistic+".nc"

    if os.path.isfile(output_filename):
        print(output_filename, ' already exist')
    else:
        try:

            c = cdsapi.Client(timeout=300)

            result = c.service("tool.toolbox.orchestrator.workflow",
                               params={
                                   "realm": "c3s",
                                   "project": "app-c3s-daily-era5-statistics",
                                   "version": "master",
                                   "kwargs": {
                                       "dataset": "reanalysis-era5-land",
                                       "product_type": "reanalysis",
                                       "variable": variable,
                                       "statistic": statistic,
                                       "year": year,
                                       "month": month,
                                       "time_zone": "UTC+00:0",
                                       "frequency": "1-hourly",
                                       "grid": "0.1/0.1",
                                       "area": {"lat": [area[2], area[0]],
                                                "lon": [area[1], area[3]]}
                                   },
                                   "workflow_name": "application"
                               })

            location = result[0]['location']
            res = requests.get(location, stream=True)
            print("Writing data to " + output_filename)
            with open(output_filename, 'wb') as fh:
                for r in res.iter_content(chunk_size=1024):
                    fh.write(r)
            fh.close()
        except:
            print('!! request', variable, '  failed !! -> year', year, 'month', month)
    return None


def call_era5land_daily_for_MODSPA(start_date: str, end_date: str, area: List[float], output_path: str, processes: int = 9) -> None:
    """
    request ERA5-land daily variables needed for ET0 calculus and MODSPA forcing
    https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview

    ARGS:
        start_date: YYYY-MM-DD string
        end_date: YYYY-MM-DD string
        area: [nord, ouest, sud, est] in degree WGS84, float
        filename_out: fichier de sortie, format xxx.nc, string
        processes: nombre de processeurs logiques à utiliser

    INFO:
         called land surface variables :
             '2m_temperature',
             '2m_dewpoint_temperature',
             'surface_solar_radiation_downward',
             'surface_net_solar_radiation',
             'surface_pressure',
             'mean_sea_level_pressure',
             'potential_evaporation',
             'evaporation',
             'total_evaporation',
             'total_precipitation',
             'snowfall',
             '10m_u_component_of_wind',
             '10m_v_component_of_wind',

    Arguments
    ----------
    start_date : TYPE
        DESCRIPTION.
    end_date : TYPE
        DESCRIPTION.
    area : TYPE
        DESCRIPTION.
    output_path : TYPE
        DESCRIPTION.

    Returns
    -------
    None.
    """

    # list of first day of each month date into period
    strt_dt = datetime.strptime(start_date, '%Y-%m-%d').replace(day=1)
    end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(day=1)

    periods = [dt for dt in rrule(
        freq=MONTHLY, dtstart=strt_dt, until=end_dt, bymonthday=1)]

    dico = {
        '2m_temperature': ['daily_minimum', 'daily_maximum'],
        '10m_u_component_of_wind': ['daily_mean'],
        '10m_v_component_of_wind': ['daily_mean'],
        'total_precipitation': ['daily_mean'],
        'surface_solar_radiation_downwards': ['daily_mean'],
        '2m_dewpoint_temperature': ['daily_minimum', 'daily_maximum']
    }

    args = []
    
    # loop on variable to upload
    for variable in dico.keys():
        # loop on statistic associated to variable to upload
        for statistic in dico[variable]:
            # loop on year and month
            for dt in periods:
                year = str(dt.year)
                month = '0'+str(dt.month)
                month = month[-2:]
                # Requete ERA5-land
                args.append((year, month, variable, statistic, area, output_path))
                
    # Start pool
    p_map(call_era5land_daily, args, **{"num_cpus": processes})
    
    return None


def filename_to_datetime(filename: str) -> datetime.date:
    """
    filename_to_datetime returns a `datetime.date` object for the date of the given file name.

    ## Arguments
    1. filename: `str `
        name or path of the product

    ## Returns
    1. date: `datetime.date`
        datetime.date object, date of the product
    """

    # Search for a date pattern (yyyy_mm_dd) in the product name or path
    match = re.search('\d{4}_\d{2}', filename)
    format = '%Y_%m'
    datetime_object = datetime.strptime(match[0], format)
    return datetime_object.date()


def concat_monthly_nc_file(list_era5land_monthly_files: List[str], list_variables: List[str], output_path: str) -> List[str]:
    """
    Concatenate monthly netcdf datasets into a single file for each given variable.

    ## Arguments
    1. list_era5land_monthly_files: `List[str]`
        list of daily files per month
    2. list_variables: `List[str]`
        names of the required variables as written in the filename
    3. output_path: `List[str]`
        path to which save the aggregated files

    ## Returns
    1. list_era5land_files: `List[str]`
        the list of paths to the aggregated files
    """
    
    if not os.path.exists(output_path): os.mkdir(output_path)
    
    list_era5land_monthly_files.sort()
    
    list_era5land_files = []
    
    # concatenate all dates into a single file for each variable
    for variable in list_variables:
        curr_var_list = []
        dates = []
        for file in list_era5land_monthly_files:
            # find specific variable
            if fnmatch(file, '*' + variable + '*'):
                curr_var_list.append(file)
                dates.append(filename_to_datetime(file))
        
        curr_datasets = []
        for file in curr_var_list:
            # open all months for the given variable
            curr_datasets.append(xr.open_dataset(file))

        # Create file name
        try:
            concatenated_file = output_path + os.sep + 'era5-land_' + dates[0].strftime('%m-%Y') + '_' + dates[-1].strftime('%m-%Y') + '_' + variable + '.nc'
        except:
            print(variable)
        
        # Concatenate monthly datasets
        concatenated_dataset = xr.concat(curr_datasets, dim = 'time')
        
        # Save datasets
        concatenated_dataset.to_netcdf(path = concatenated_file, mode = 'w',)
        
        # Add filename to output list
        list_era5land_files.append(concatenated_file)
    
    return list_era5land_files


def uz_to_u2(u_z: List[float], h: float) -> List[float]:
    """
    The wind speed measured at heights other than 2 m can be adjusted according
    to the follow equation

    Arguments
    ----------
    u_z : TYPE float array
        measured wind speed z m above the ground surface, ms- 1.
    h : TYPE float
        height of the measurement above the ground surface, m.

    Returns
    -------
    u2 : TYPE float array
        average daily wind speed in meters per second (ms- 1 ) measured at 2 m above the ground.
    """

    u2 = u_z*4.87/(np.log(67.8*h - 5.42))
    return u2


def ea_calc(T: float) -> float:
    """
    comments
    Actual vapour pressure (ea) derived from dewpoint temperature '
    
    Arguments
    ----------
    T : Temperature in degree celsius.

    Returns
    -------
    e_a :the actual Vapour pressure in Kpa
    """

    e_a = 0.6108*np.exp(17.27*T/(T+237.15))
    return e_a


def load_variable(file_name: str) -> xr.Dataset:
    """
    Loads an ERA5 meteorological variable into a xarray
    dataset according to the modspa architecture.

    ## Arguments
    1. file_name: `str`
        netcdf file to load

    ## Returns
    1. variable: `xr.Dataset`
        output xarray dataset
    """
    
    # Rename temperature variables according to the statistic (max or min)
    if fnmatch(file_name, '*era5-land*2m_temperature_daily_maximum*'):  # maximum temperature
        variable = xr.open_dataset(file_name).rename({'t2m': 't2m_max'}).drop_vars('realization')  # netcdfs from ERA5 carry an unecessary 'realization' coordinate, so it is dropped 
        
    elif fnmatch(file_name, '*era5-land*2m_temperature_daily_minimum*'):  # minimum temperature
        variable = xr.open_dataset(file_name).rename({'t2m': 't2m_min'}).drop_vars('realization')
        
    elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_maximum*'):  # maximum dewpoint temperature
        variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_max'}).drop_vars('realization')
        
    elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_minimum*'):  # minimum temperature
        variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_min'}).drop_vars('realization')
        
    # Other variables can be loaded without modification
    else:
        variable = xr.open_dataset(file_name).drop_vars('realization')
    
    return variable


def calculate_ET0_pixel(pixel_dataset: xr.Dataset, lat: float, lon: float, h: float = 10) -> np.ndarray:
    """
    Calculate ET0 over the year for a single pixel of the ERA5 weather dataset.

    ## Arguments
    1. pixel_dataset: `xr.Dataset`
        extracted dataset that contains all information for a single pixel
    2. lat: `float`
        latitudinal coordinate of that pixel
    3. lon: `float`
        longitudinal coordinate of that pixel
    4. h: `float` `default = 10`
        height of ERA5 wind measurement in meters

    ## Returns
    1. ET0_values: `np.ndarray`
        numpy array containing the ET0 values for each day
    """
    
    # Conversion of xarray dataset to dataframe for ET0 calculation
    ET0 = pixel_dataset.d2m_max.to_dataframe().rename(columns = {'d2m_max' : 'Dew_Point_T_max'}) - 273.15  # conversion of temperatures from K to °C

    ET0['Dew_Point_T_min'] = pixel_dataset.d2m_min.to_dataframe()['d2m_min'].values - 273.15  # conversion of temperatures from K to °C
    ET0['T_min'] = pixel_dataset.t2m_min.to_dataframe()['t2m_min'].values - 273.15  # conversion of temperatures from K to °C
    ET0['T_max'] = pixel_dataset.t2m_max.to_dataframe()['t2m_max'].values - 273.15  # conversion of temperatures from K to °C
    
    ET0['Rain'] = pixel_dataset.tp.to_dataframe()['tp'].values*1000  # conversion of total precipitation from meters to milimeters
    
    # Conversion of easward and northward wind values to scalar wind
    ET0['U_z'] =  np.sqrt(pixel_dataset.u10.to_dataframe()['u10'].values**2 + pixel_dataset.v10.to_dataframe()['v10'].values**2)
    
    ET0['RH_max'] =  100 * ea_calc(ET0['Dew_Point_T_min']) / ea_calc(ET0['T_min'])  # calculation of relative humidity from dew point temperature and temperature
    ET0['RH_min'] =  100 * ea_calc(ET0['Dew_Point_T_max']) / ea_calc(ET0['T_max'])  # calculation of relative humidity from dew point temperature and temperature
    
    ET0['R_s'] = pixel_dataset.ssrd.to_dataframe()['ssrd'].values/1e6  # to convert downward total radiation from J/m² to MJ/m²

    ET0.drop(columns = ['Dew_Point_T_max', 'Dew_Point_T_min'], inplace = True)  # drop unecessary columns
    
    # Start ET0 calculation
    eto_calc = eto.ETo()
    warnings.filterwarnings('ignore')  # remove pandas warning

    # ET0 calculation for given pixel (lat, lon) values
    eto_calc.param_est(ET0,
                        freq = 'D',  # daily frequence
                        # Elevation of the met station above mean sea level (m) (only needed if P is not in df).
                        z_msl = 0.,
                        lat = lat,
                        lon = lon,
                        TZ_lon = None,
                        z_u = h)  # h: height of raw wind speed measurement

    # Retrieve ET0 values
    ET0_values = eto_calc.eto_fao(max_ETo=15, min_ETo=0, interp=True, maxgap=10).values  # ETo_FAO_mm
    
    return ET0_values


def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_nc_file: str, ndvi_path: str, h: float = 10) -> None:
    """
    Calculate ET0 values from the ERA5 netcdf weather variables.
    Output netcdf contains the ET0 values for each day in the selected
    time period and for each ERA5 pixel covering the required area.

    ## Arguments
    1. list_era5land_files: `List[str]`
        list of netcdf files containing the necessary variables
    2. output_nc_file: `str`
        output netcdf file to save
    3. h: `float` `default = 10`
        height of ERA5 wind measurements in meters

    ## Returns
    `None`
    """
    
    # Load all monthly files into a single xarray dataset that contains all dates (daily frequency)
    raw_weather_ds = None
    for file in list_era5land_files:
        if not raw_weather_ds:
            raw_weather_ds = load_variable(file)
        else:
            temp = load_variable(file)
            raw_weather_ds = xr.merge([temp, raw_weather_ds])
    del temp
    
    # Create ET0 variable (that will be saved) and set attributes 
    raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = 'float32')))

    # Loop on lattitude and longitude coordinates to calculate ET0 per "pixel"
    for lat in raw_weather_ds.coords['lat'].values:
        for lon in raw_weather_ds.coords['lon'].values:
            # Select whole time period for given (lat, lon) values
            select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon'])

            # Calculate ET0 values for given pixel
            ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h)
            
            # Write ET0 values in xarray Dataset
            raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values
    
    # Get necessary data for final dataset and rewrite netcdf attributes
    final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min', 'd2m_max', 'd2m_min'])  # remove unwanted variables
    final_weather_ds['tp'] = final_weather_ds['tp'] * 1000  # conversion from m to mm
    
    # Change datatype to reduce memory usage
    final_weather_ds = (final_weather_ds * 1000).astype('u2')  
    
    # Set variable attributes 
    final_weather_ds['ET0'].attrs['units'] = 'mm'
    final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
    final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method'
    final_weather_ds['ET0'].attrs['scale factor'] = '1000'
    final_weather_ds['tp'].attrs['units'] = 'mm'
    final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
    final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters'
    final_weather_ds['tp'].attrs['scale factor'] = '1000'

    # Save dataset to netcdf, still in wgs84 (lat, lon) coordinates
    final_weather_ds.to_netcdf(path = output_nc_file, encoding = {"ET0": {"dtype": "u2"}, "tp": {"dtype": "u2"}})
    return None