Skip to content
Snippets Groups Projects
lib_era5_land_pixel.py 42.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • """
    Functions to call ECMWF Reanalysis with CDS-api
    
    - ERA5-land daily request
    
    - request a list of daily variables dedicated to the calculus of ET0
    
     and the generation of MODSPA daily forcing files
    
     heavily modified from @rivallandv's original file
    
    import os  # for path exploration and file management
    
    from typing import List, Tuple  # to declare variables
    
    import numpy as np  # for math on arrays
    import xarray as xr  # to manage nc files
    
    from datetime import datetime  # to manage dates
    from p_tqdm import p_map  # for multiprocessing with progress bars
    from dateutil.rrule import rrule, MONTHLY
    from fnmatch import fnmatch  # for file name matching
    
    import pandas as pd  # to manage dataframes
    
    import rasterio  as rio  # to manage geotiff images
    
    import geopandas as gpd  # to manage shapefile crs projections
    from rasterio.mask import mask  # to mask images
    from shapely.geometry import box  # to extract parcel statistics
    
    from rasterio.enums import Resampling  # reprojection algorithms
    
    import netCDF4 as nc  # to write netcdf4 files
    from tqdm import tqdm  # to follow progress
    
    from multiprocessing import Pool  # to parallelize reprojection
    
    from psutil import virtual_memory  # to check available ram
    
    from psutil import cpu_count  # to get number of physical cores available
    
    from modspa_pixel.config.config import config  # to import config file
    
    from modspa_pixel.source.modspa_samir import calculate_time_slices_to_load  # to optimise I/O operations
    
    import re  # for string comparison
    import warnings  # to suppress pandas warning
    
    # CDS API external library
    # source: https://pypi.org/project/cdsapi/
    import cdsapi  # to download cds data
    import requests  # to request data
    
    # FAO ET0 calculator external library
    # Notes
    # source: https://github.com/Evapotranspiration/ETo
    # documentation: https://eto.readthedocs.io/en/latest/
    import eto  # to calculate ET0
    
    
    
    def era5_enclosing_shp_aera(area: List[float], pas: float) -> Tuple[float, float, float, float]:
    
        Find the four coordinates including the boxbound scene
        to agree with gridsize resolution
        system projection: WGS84 lat/lon degree
       
        Arguments
        =========
           
        1. area: ``List[float]``
            bounding box of the demanded area
            list of floats: [lat north, lon west, lat south, lon east] in degree WGS84
        2. pas: ``float``
            gridsize
            
        Returns
        =======
        
        1. era5_area: ``Tuple[float, float, float, float]``
            coordinates list corresponding to N,W,S,E corners of the grid in decimal degree
            
        .. note:: 
        
    
            gdal coordinates reference upper left corner of pixel, ERA5 coordinates refere to center of grid. To resolve this difference an offset of pas/2 is applied
    
        lat_max, lon_min, lat_min, lon_max = area
    
        # North
    
        era5_lat_max = round((lat_max//pas+2)*pas, 2)
    
        # West
        era5_lon_min = round((lon_min//pas)*pas, 2)
        # South
        era5_lat_min = round((lat_min//pas)*pas, 2)
        # Est
    
        era5_lon_max = round((lon_max//pas+2)*pas, 2)
    
    
        era5_area = era5_lat_max, era5_lon_min, era5_lat_min, era5_lon_max
    
        return era5_area  # [N,W,S,E]
    
    
    
    def call_era5land_daily(args: Tuple[str, str, str, str, List[int], str]) -> None:
    
        Query of one month of daily ERA5-land data of a selected variable
    
        according to a selected statistic
    
        Documentation:
    
        `cds_climate <https://datastore.copernicus-climate.eu/documents/app-c3s-daily-era5-statistics/C3S_Application-Documentation_ERA5-daily-statistics-v2.pdf>`_
    
        =========
        
        (packed in args: ``tuple``)
        
        1. year: ``str``
    
        3. variable: ``str``
    
            user-selectable variable
            cf. Appendix A Table 3 for list of input variables availables.
    
        4. statistic: ``str``
    
            daily statistic choosed, 3 possibility
            daily_mean or daily_minimum or daily_maximum.
    
        5. area: ``List[int]``
            bounding box of the demanded area
    
            area = [lat_max, lon_min, lat_min, lon_max]
    
        6. output_path: ``str``
    
        """
        year, month, variable, statistic, area, output_path = args
        
        # set name of output file for each month (statistic, variable, year, month)
        output_filename = \
            output_path+os.sep +\
            "ERA5-land_"+year+"_"+month+"_"+variable+"_"+statistic+".nc"
    
        if os.path.isfile(output_filename):
            print(output_filename, ' already exist')
        else:
            try:
    
                c = cdsapi.Client(timeout=300)
    
                result = c.service("tool.toolbox.orchestrator.workflow",
                                   params={
                                       "realm": "c3s",
                                       "project": "app-c3s-daily-era5-statistics",
                                       "version": "master",
                                       "kwargs": {
                                           "dataset": "reanalysis-era5-land",
                                           "product_type": "reanalysis",
                                           "variable": variable,
                                           "statistic": statistic,
                                           "year": year,
                                           "month": month,
                                           "time_zone": "UTC+00:0",
                                           "frequency": "1-hourly",
                                           "grid": "0.1/0.1",
                                           "area": {"lat": [area[2], area[0]],
                                                    "lon": [area[1], area[3]]}
                                       },
                                       "workflow_name": "application"
                                   })
    
                location = result[0]['location']
                res = requests.get(location, stream=True)
                print("Writing data to " + output_filename)
                with open(output_filename, 'wb') as fh:
                    for r in res.iter_content(chunk_size=1024):
                        fh.write(r)
                fh.close()
            except:
                print('!! request', variable, '  failed !! -> year', year, 'month', month)
    
        return None
    
    
    def call_era5land_daily_for_MODSPA(start_date: str, end_date: str, area: List[float], output_path: str, processes: int = 9) -> None:
        """
        request ERA5-land daily variables needed for ET0 calculus and MODSPA forcing
    
        `reanalysis_era5  <https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview>`_
    
        Information on requested variables
        ----------------------------------
        
        called land surface variables :
            * **2m_temperature**
            * **surface_solar_radiation_downward**
            * **total_precipitation**
            * **10m_u_component_of_wind**
            * **10m_v_component_of_wind**
    
        =========
        
        1. start_date: ``str``
            start date in YYYY-MM-DD format
        2. end_date: ``str``
            end date in YYYY-MM-DD format
        3. area: ``List[float]``
            bounding box of the demanded area
            area = [lat_max, lon_min, lat_min, lon_max]
        4. output_path: ``str``
            output file name, ``.nc`` extension
        5. processes: ``int`` ``default = 9``
            number of logical processors on which to run the download command.
            can be higher than your actual number of processor cores,
            download operations have a low CPU demand.
    
        """
    
        # list of first day of each month date into period
        strt_dt = datetime.strptime(start_date, '%Y-%m-%d').replace(day=1)
        end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(day=1)
    
        periods = [dt for dt in rrule(
            freq=MONTHLY, dtstart=strt_dt, until=end_dt, bymonthday=1)]
    
        dico = {
            '2m_temperature': ['daily_minimum', 'daily_maximum'],
            '10m_u_component_of_wind': ['daily_mean'],
            '10m_v_component_of_wind': ['daily_mean'],
    
            'total_precipitation': ['daily_maximum'],
            'surface_solar_radiation_downwards': ['daily_maximum']
    
        args = []
        
        # loop on variable to upload
        for variable in dico.keys():
            # loop on statistic associated to variable to upload
            for statistic in dico[variable]:
                # loop on year and month
                for dt in periods:
                    year = str(dt.year)
                    month = '0'+str(dt.month)
                    month = month[-2:]
                    # Requete ERA5-land
                    args.append((year, month, variable, statistic, area, output_path))
                    
        # Start pool
        p_map(call_era5land_daily, args, **{"num_cpus": processes})
        
        return None
    
    
    def filename_to_datetime(filename: str) -> datetime.date:
        """
    
        filename_to_datetime returns a ``datetime.date`` object for the date of the given file name.
    
        Arguments
        =========
    
        1. filename: ``str``
    
        Returns
        =======
    
        1. date: ``datetime.date``
    
            datetime.date object, date of the product
        """
    
        # Search for a date pattern (yyyy_mm_dd) in the product name or path
        match = re.search('\d{4}_\d{2}', filename)
        format = '%Y_%m'
        datetime_object = datetime.strptime(match[0], format)
        return datetime_object.date()
    
    
    def concat_monthly_nc_file(list_era5land_monthly_files: List[str], list_variables: List[str], output_path: str) -> List[str]:
        """
        Concatenate monthly netcdf datasets into a single file for each given variable.
    
    
        Arguments
        =========
    
        1. list_era5land_monthly_files: ``List[str]``
    
        2. list_variables: ``List[str]``
    
            names of the required variables as written in the filename
    
        3. output_path: ``List[str]``
    
        Returns
        =======
    
        1. list_era5land_files: ``List[str]``
    
            the list of paths to the aggregated files
        """
        
        if not os.path.exists(output_path): os.mkdir(output_path)
        
        list_era5land_monthly_files.sort()
        
        list_era5land_files = []
        
        # concatenate all dates into a single file for each variable
        for variable in list_variables:
            curr_var_list = []
            dates = []
            for file in list_era5land_monthly_files:
                # find specific variable
                if fnmatch(file, '*' + variable + '*'):
                    curr_var_list.append(file)
                    dates.append(filename_to_datetime(file))
            
            curr_datasets = []
            for file in curr_var_list:
                # open all months for the given variable
                curr_datasets.append(xr.open_dataset(file))
    
            # Create file name
            try:
                concatenated_file = output_path + os.sep + 'era5-land_' + dates[0].strftime('%m-%Y') + '_' + dates[-1].strftime('%m-%Y') + '_' + variable + '.nc'
            except:
                print(variable)
            
            # Concatenate monthly datasets
            concatenated_dataset = xr.concat(curr_datasets, dim = 'time')
            
            # Save datasets
            concatenated_dataset.to_netcdf(path = concatenated_file, mode = 'w',)
            
            # Add filename to output list
            list_era5land_files.append(concatenated_file)
        
        return list_era5land_files
    
    
    def uz_to_u2(u_z: List[float], h: float) -> List[float]:
        """
        The wind speed measured at heights other than 2 m can be adjusted according
        to the follow equation
    
        Arguments
        ----------
        u_z : TYPE float array
            measured wind speed z m above the ground surface, ms- 1.
        h : TYPE float
            height of the measurement above the ground surface, m.
    
        Returns
        -------
        u2 : TYPE float array
            average daily wind speed in meters per second (ms- 1 ) measured at 2 m above the ground.
        """
    
        u2 = u_z*4.87/(np.log(67.8*h - 5.42))
        return u2
    
    
    def ea_calc(T: float) -> float:
        """
        comments
        Actual vapour pressure (ea) derived from dewpoint temperature '
        
        Arguments
        ----------
        T : Temperature in degree celsius.
    
        Returns
        -------
        e_a :the actual Vapour pressure in Kpa
        """
    
        e_a = 0.6108*np.exp(17.27*T/(T+237.15))
        return e_a
    
    
    def load_variable(file_name: str) -> xr.Dataset:
        """
        Loads an ERA5 meteorological variable into a xarray
        dataset according to the modspa architecture.
    
    
        Arguments
        =========
    
        1. file_name: ``str``
    
        Returns
        =======
    
        1. variable: ``xr.Dataset``
    
            output xarray dataset
        """
        
        # Rename temperature variables according to the statistic (max or min)
        if fnmatch(file_name, '*era5-land*2m_temperature_daily_maximum*'):  # maximum temperature
            variable = xr.open_dataset(file_name).rename({'t2m': 't2m_max'}).drop_vars('realization')  # netcdfs from ERA5 carry an unecessary 'realization' coordinate, so it is dropped 
            
        elif fnmatch(file_name, '*era5-land*2m_temperature_daily_minimum*'):  # minimum temperature
            variable = xr.open_dataset(file_name).rename({'t2m': 't2m_min'}).drop_vars('realization')
            
        elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_maximum*'):  # maximum dewpoint temperature
            variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_max'}).drop_vars('realization')
            
        elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_minimum*'):  # minimum temperature
            variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_min'}).drop_vars('realization')
            
        # Other variables can be loaded without modification
        else:
    
            try:
                variable = xr.open_dataset(file_name).drop_vars('realization')
                
            except:
                variable = xr.open_dataset(file_name)
    
    def combine_weather2netcdf(rain_file: str, ET0_tile: str, ndvi_path: str, save_path: str, available_ram: int) -> None:
    
        Convert the Rain and ET0 geotiffs into a single weather netcdf dataset.
    
    
        Arguments
        =========
    
        1. rain_file: ``str``
    
        2. ET0_tile: ``str``
    
        3. ndvi_path: ``str``
    
        4. save_path: ``str``
    
        5. available_ram: ``int``
    
        
        # Open tif files
        rain_tif = rio.open(rain_file)
        ET0_tif = rio.open(ET0_tile)
        
        # Open ndvi netcdf to get structure
        ndvi = xr.open_dataset(ndvi_path)
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        dates = ndvi.time
        
        # Get empty dimensions
        dimensions = ndvi.drop_sel(time = ndvi.time).dims  # create dataset with a time dimension of length 0
    
        weather = ndvi.drop_vars(['NDVI']).copy(deep = True)
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        weather = weather.drop_sel(time = weather.time)
    
        # Set dataset attributes
        weather.attrs['name'] = 'ModSpa Pixel weather dataset'
        weather.attrs['description'] = 'Weather variables (Rain and ET0) for the ModSpa SAMIR (FAO-56) model at the pixel scale. Variables are scaled to be stored as integers.'
    
        weather.attrs['scaling'] = "{'Rain': 1000, 'ET0': 1000}"
    
        
        # Set variable attributes
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        weather['Rain'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    
        weather['Rain'].attrs['units'] = 'mm'
        weather['Rain'].attrs['standard_name'] = 'total_precipitation'
        weather['Rain'].attrs['description'] = 'Accumulated daily precipitation in mm'
        weather['Rain'].attrs['scale factor'] = '1000'
    
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        weather['ET0'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    
        weather['ET0'].attrs['standard_name'] = 'evapotranspiration'
    
        weather['ET0'].attrs['description'] = 'Accumulated daily reference evapotranspiration in mm'
        weather['ET0'].attrs['scale factor'] = '1000'
        
        # Create encoding dictionnary
        for variable in list(weather.keys()):
            # Write encoding dict
            encoding_dict = {}
            encod = {}
            encod['dtype'] = 'u2'
    
            # TODO: figure out optimal file chunk size
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
            file_chunksize = (1, dimensions['y'], dimensions['x'])
    
            # TODO: check if compression affects reading speed
    
            encoding_dict[variable] = encod
    
        # Save empty output
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        print('\nWriting empty weather dataset')
        weather.to_netcdf(save_path, encoding = encoding_dict, unlimited_dims = 'time')
    
        weather.close()
    
        # Get geotiff dimensions (time, x, y)
    
        dims = (rain_tif.count, rain_tif.height, rain_tif.width)
    
        
        # Determine the memory requirement of operation
    
        nb_vars = 1  # one variable written at a time
    
        memory_requirement = ((dims[0] * dims[1] * dims[2]) * nb_vars * nb_bytes) / (1024**3)  # in GiB
    
        
        # Get the number of time bands that can be loaded at once
    
        time_slice, remainder, already_written = calculate_time_slices_to_load(dims[2], dims[1], dims[0], nb_vars, 0, 0, 0, nb_bytes, available_ram)
    
        print('\nApproximate memory requirement of conversion:', round(memory_requirement, 3), 'GiB\nAvailable memory:', available_ram, 'GiB\n\nLoading blocks of', time_slice, 'time bands.\n')
    
        
        # Open empty dataset
        weather = nc.Dataset(save_path, mode = 'r+')
        
        # Create progress bar
        progress_bar = tqdm(total = dims[0], desc='Writing weather data', unit=' bands')
    
    
        # Data variables
    
            if time_slice == dims[0] and not already_written:  # if whole dataset fits in memory and it has not already been loaded
    
                
                weather.variables['Rain'][:,:,:] = rain_tif.read()
                weather.variables['ET0'][:,:,:] = ET0_tif.read()
    
            elif i % time_slice == 0 and not already_written:  # load a time slice every time i is divisible by the size of the time slice
    
                if i + time_slice <= dims[0]:  # if the time slice does not gow over the dataset size
                    
                    weather.variables['Rain'][i: i + time_slice, :, :] = rain_tif.read(tuple(k+1 for k in range(i, i + time_slice)))
                    weather.variables['ET0'][i: i + time_slice, :, :] = ET0_tif.read(tuple(k+1 for k in range(i, i + time_slice)))
                
                else:  # load the remainder when the time slice would go over the dataset size
                    
                    weather.variables['Rain'][i: i + remainder, :, :] = rain_tif.read(tuple(k+1 for k in range(i, i + remainder)))
                    weather.variables['ET0'][i: i + remainder, :, :] = ET0_tif.read(tuple(k+1 for k in range(i, i + remainder)))
            
            progress_bar.update()
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
    
        # Write dates in weather dataset
        weather.variables['time'].units = f'days since {np.datetime_as_string(dates[0], unit = "D")} 00:00:00'  # set correct unit
        weather.variables['time'][:] = np.arange(0, len(dates))  # save dates as integers representing the number of days since the first day
        weather.sync() # flush data to disk
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        # Close progress bar
    
    Jeremy Auclair's avatar
    Jeremy Auclair committed
        # Close datasets
    
        rain_tif.close()
        ET0_tif.close()
        weather.close()
    
    def calculate_ET0_pixel(pixel_dataset: xr.Dataset, lat: float, lon: float, h: float = 10, safran: bool = False) -> np.ndarray:
    
        """
        Calculate ET0 over the year for a single pixel of the ERA5 weather dataset.
    
    
        Arguments
        =========
    
        1. pixel_dataset: ``xr.Dataset``
    
            extracted dataset that contains all information for a single pixel
    
            longitudinal coordinate of that pixel
    
        4. h: ``float`` ``default = 10``
    
            height of ERA5 wind measurement in meters
    
        5. safran: ``bool`` ``default = False``
            boolean to adapt to a custom SAFRAN weather dataset
    
        Returns
        =======
    
        1. ET0_values: ``np.ndarray``
    
            numpy array containing the ET0 values for each day
        """
        
    
        # Conversion of xarray dataset to dataframe for ET0 calculation
    
        # Conversion of temparature
        ET0 = pixel_dataset.t2m_min.to_dataframe().rename(columns = {'t2m_min' : 'T_min'}) - 273.15  # conversion of temperatures from K to °C
    
        ET0['T_max'] = pixel_dataset.t2m_max.to_dataframe()['t2m_max'].values - 273.15  # conversion of temperatures from K to °C
        
    
        ET0['R_s'] = pixel_dataset.ssrd.to_dataframe()['ssrd'].values / 1e6  # to convert downward total radiation from J/m² to MJ/m²
    
        
        if safran:
            # Add relative humidity
            ET0['RH_max'] = pixel_dataset.RH_max.to_dataframe()['RH_max'].values
            ET0['RH_min'] = pixel_dataset.RH_max.to_dataframe()['RH_min'].values
            
            # Add wind
            ET0['U_z'] = pixel_dataset.U_z.to_dataframe()['U_z'].values
        
        else:
            # Conversion of eastward and northward wind values to scalar wind
            ET0['U_z'] =  np.sqrt(pixel_dataset.u10.to_dataframe()['u10'].values**2 + pixel_dataset.v10.to_dataframe()['v10'].values**2)
    
        # Start ET0 calculation
        eto_calc = eto.ETo()
        warnings.filterwarnings('ignore')  # remove pandas warning
    
        # ET0 calculation for given pixel (lat, lon) values
        eto_calc.param_est(ET0,
                            freq = 'D',  # daily frequence
                            # Elevation of the met station above mean sea level (m) (only needed if P is not in df).
                            z_msl = 0.,
                            lat = lat,
                            lon = lon,
                            TZ_lon = None,
                            z_u = h)  # h: height of raw wind speed measurement
    
        # Retrieve ET0 values
    
        ET0_values = eto_calc.eto_fao(max_ETo = 15, min_ETo = 0, interp = True, maxgap = 10).values  # ETo_FAO_mm
    
    def convert_interleave_mode(args: Tuple[str, str, bool]) -> None:
        """
        Convert Geotiff files obtained from OTB to Band interleave mode for faster band reading.
    
    
        Arguments
        =========
        
        (packed in args: ``tuple``)
        
        1. input_image: ``str``
    
        2. output_image: ``str``
    
        3. remove: ``bool`` ``default = True``
    
        """
        
        input_image, output_image, remove = args
        
        # Open the input file in read mode
        with rio.open(input_image, "r") as src:
    
            # Open the output file in write mode
            with rio.open(output_image, 'w', driver = src.driver, height = src.height, width = src.width, count = src.count, dtype = src.dtypes[0], crs = src.crs, transform = src.transform, interleave = 'BAND',) as dst:
    
                # Loop over the blocks or windows of the input file
                for _, window in src.block_windows(1):
    
                    # Write the data to the output file
                    dst.write(src.read(window = window), window = window)
        
        # Remove unecessary image
        if remove:
            os.remove(input_image)
        
        return None
    
    
    
    def era5Land_daily_to_yearly_pixel(list_era5land_files: List[str], output_file: str, raw_S2_image_ref: str, ndvi_path: str, start_date: str, end_date: str, h: float = 10, max_ram: int = 8, use_OTB: bool = False, weather_overwrite: bool = False, safran: bool = False) -> str:
    
        """
        Calculate ET0 values from the ERA5 netcdf weather variables.
    
        Output netcdf contains the ET0 and precipitation values for
    
        each day in the selected time period and reprojected on the
        same grid as the NDVI values.
    
        1. list_era5land_files: ``List[str]``
    
            list of netcdf files containing the necessary variables
    
        2. output_file: ``str``
    
            output file name without extension
    
        3. raw_S2_image_ref: ``str``
    
            raw Sentinel 2 image at right resolution for reprojection
    
        4. ndvi_path: ``str``
    
            path to ndvi dataset, used for attributes and coordinates
    
        5. start_date: ``str``
            beginning of the time window to download (format: ``YYYY-MM-DD``)
        6. end_date: ``str``
            end of the time window to download (format: ``YYYY-MM-DD``)
        7. h: ``float`` ``default = 10``
    
            height of ERA5 wind measurements in meters
    
            max ram (in GiB) for reprojection and conversion. Two
            subprocesses are spawned for OTB, each receiviving 
            half of requested memory.
    
        9. use_OTB: ``bool`` ``default = False``
            boolean to choose to use OTB or not, tests will be added later
        10. weather_overwrite: ``bool`` ``default = False``
    
            boolean to choose to overwrite weather netCDF
    
            boolean to adapt to a custom SAFRAN weather dataset
    
        1. output_file_final: ``str``
            path to ``netCDF4`` file containing precipitation and ET0 data
    
        # Test if file exists
        if os.path.exists(output_file + '.nc') and not weather_overwrite:
            return output_file + '.nc'
        
    
        # Test if memory requirement is not loo large
        if np.ceil(virtual_memory().available / (1024**3)) < max_ram:
            print('\nRequested', max_ram, 'GiB of memory when available memory is approximately', round(virtual_memory().available / (1024**3), 1), 'GiB.\n\nExiting script.\n')
            return None
        
    
        # Load all monthly files into a single xarray dataset that contains all dates (daily frequency)
        raw_weather_ds = None
        for file in list_era5land_files:
            if not raw_weather_ds:
                raw_weather_ds = load_variable(file)
            else:
                temp = load_variable(file)
                raw_weather_ds = xr.merge([temp, raw_weather_ds])
        del temp
        
    
        # Clip extra dates
        raw_weather_ds = raw_weather_ds.sel({'time': slice(start_date, end_date)}).sortby(variables = 'time')
        
    
        # Create ET0 variable (that will be saved) and set attributes 
    
        raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = np.float32)))
    
        if safran:
            # Loop on y and x coordinates to calculate ET0 per "pixel"
            # Fast enough for small datasets (low resolution)
            for y in raw_weather_ds.coords['y'].values:
                for x in raw_weather_ds.coords['x'].values:
                    # Select whole time period for given (lat, lon) values
                    select_ds = raw_weather_ds.sel({'y' : era5Land_daily_to_yearly_parcel(), 'x' : x}).drop_vars(['y', 'x'])
    
                    # TODO: adapt for SAFRAN
                    # Calculate ET0 values for given pixel
                    ET0_values = calculate_ET0_pixel(select_ds, y, x, h)
                    
                    # Write ET0 values in xarray Dataset
                    raw_weather_ds['ET0'].loc[{'y' : y, 'x' : x}] = ET0_values
            
            # Get necessary data for final dataset
            final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 't2m_max', 't2m_min', 'RH_max', 'RH_min', 'U_z'])  # remove unwanted variables
            
        else:
            # Loop on lattitude and longitude coordinates to calculate ET0 per "pixel"
            # Fast enough for small datasets (low resolution)
            for lat in raw_weather_ds.coords['lat'].values:
                for lon in raw_weather_ds.coords['lon'].values:
                    # Select whole time period for given (lat, lon) values
                    select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon'])
    
                    # Calculate ET0 values for given pixel
                    ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h)
                    
                    # Write ET0 values in xarray Dataset
                    raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values
            
            # Get necessary data for final dataset
            final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min'])  # remove unwanted variables
            
        # Scale data and rewrite netcdf attributes
    
        final_weather_ds['tp'] = final_weather_ds['tp'] * 1000  # conversion from m to mm
        
        # Change datatype to reduce memory usage
    
        final_weather_ds['tp'] = (final_weather_ds['tp']  * 1000).astype('u2').chunk(chunks = {"time": 1})
        final_weather_ds['ET0'] = (final_weather_ds['ET0']  * 1000).astype('u2').chunk(chunks = {"time": 1})
    
        # Write projection
    
        final_weather_ds.rio.write_crs('EPSG:4326', inplace = True)
    
        
        # Set variable attributes 
        final_weather_ds['ET0'].attrs['units'] = 'mm'
        final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
    
        final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method (scale factor = 1000)'
    
    
        final_weather_ds['tp'].attrs['units'] = 'mm'
        final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
    
        final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters (scale factor = 1000)'
    
        # TODO: find how to test OTB installation from python
        if use_OTB:
            # Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
            output_file_rain = output_file + '_rain.tif'
            output_file_ET0 = output_file + '_ET0.tif'
            final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16')
            final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
            
            # Reprojected image paths
            output_file_rain_reproj = output_file + '_rain_reproj.tif'
            output_file_ET0_reproj = output_file + '_ET0_reproj.tif'
            
            # Converted image paths
            output_file_final = output_file + '.nc'
            
            # otbcli_SuperImpose commands
            OTB_command_reproj1 = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_rain + ' -out ' + output_file_rain_reproj + ' uint16 -interpolator linear -ram ' + str(int(max_ram * 1024/2))
            OTB_command_reproj2 = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_ET0 + ' -out ' + output_file_ET0_reproj + ' uint16 -interpolator linear -ram ' + str(int(max_ram * 1024/2))
            commands_reproj = [OTB_command_reproj1, OTB_command_reproj2]
            
            with Pool(2) as p:
                p.map(os.system, commands_reproj)
            
            # Combine to netCDF file
            combine_weather2netcdf(output_file_rain_reproj, output_file_ET0_reproj, ndvi_path, output_file_final, available_ram = max_ram)
                
            # remove old files and rename outputs
            os.remove(output_file_rain)
            os.remove(output_file_ET0)
            os.remove(output_file_rain_reproj)
            os.remove(output_file_ET0_reproj)
    
        else:
            # Set dataset attributes
            final_weather_ds.attrs['name'] = 'ModSpa Pixel weather dataset'
            final_weather_ds.attrs['description'] = 'Weather variables (Rain and ET0) for the ModSpa SAMIR (FAO-56) model at the pixel scale. Variables are scaled to be stored as integers.'
            final_weather_ds.attrs['scaling'] = "{'Rain': 1000, 'ET0': 1000}"
            
            # Set file names
            output_file_final = output_file + '.nc'
            
            # Open reference image
            ref = rioxarray.open_rasterio(raw_S2_image_ref)
            
    
            # Get metadata
            target_crs = ref.rio.crs
            spatial_ref = ref.spatial_ref.load()
            
            # Define ressources
            mem_limit = min([int(np.ceil(len(ref.x) * len(ref.y) * len(final_weather_ds.time) * len(final_weather_ds.data_vars) * np.dtype(np.float32).itemsize / (1024 ** 2)) * 1.1), 0.8 * virtual_memory().available / (1024**2), max_ram * 1024])
            nb_threads = min([cpu_count(logical = True), len(os.sched_getaffinity(0))])
            
            # Reproject
            final_weather_ds = final_weather_ds.rio.reproject(target_crs, transform = ref.rio.transform(), shape = (ref.rio.height, ref.rio.width), resampling = Resampling.bilinear, num_threads = nb_threads, warp_mem_limit = mem_limit)
    
            # Rename
            final_weather_ds = final_weather_ds.rename({'tp': 'Rain'})
            
            # Create encoding dictionnary
            for variable in list(final_weather_ds.keys()):
                # Write encoding dict
                encod = {}
                encod['dtype'] = 'u2'
                if '_FillValue' in final_weather_ds[variable].attrs:
                    del final_weather_ds[variable].attrs['_FillValue']
                encod['_FillValue'] = 0
                # TODO: figure out optimal file chunk size
                file_chunksize = (1, final_weather_ds.dims['y'], final_weather_ds.dims['x'])
                encod['chunksizes'] = file_chunksize
                # TODO: check if compression affects reading speed
                encod['zlib'] = True
                encod['complevel'] = 1
                final_weather_ds[variable].encoding.update(encod)
                
    
            # Rewrite georeferencing
            final_weather_ds.rio.write_crs(target_crs, inplace = True)
    
            final_weather_ds['spatial_ref'] = spatial_ref
    
            final_weather_ds.attrs['crs'] = final_weather_ds.rio.crs.to_string()
    
            final_weather_ds = final_weather_ds.set_coords('spatial_ref')
    
            # Save empty output
            print('\nReprojecting weather dataset')
            final_weather_ds.to_netcdf(output_file_final)
            final_weather_ds.close()
    
    def era5Land_daily_to_yearly_parcel(list_era5land_files: List[str], output_file: str, start_date: str, end_date: str, h: float = 10) -> str:
    
        """
        Calculate ET0 values from the ERA5 netcdf weather variables.
        Output netcdf contains the ET0 and precipitation values for
        each day in the selected time period.
    
        Arguments
        =========
    
        1. list_era5land_files: ``List[str]``
            list of netcdf files containing the necessary variables
        2. output_file: ``str``
            output file name without extension
    
        3. start_date: ``str``
            beginning of the time window to download (format: ``YYYY-MM-DD``)
        4. end_date: ``str``
            end of the time window to download (format: ``YYYY-MM-DD``)
        5. h: ``float`` ``default = 10``
    
            height of ERA5 wind measurements in meters
    
        Returns
        =======
    
        1. output_file_rain: ``str``
            path to ``Geotiff`` file containing precipitation data
        2. output_file_ET0: ``str``
            path to ``Geotiff`` file containing ET0 data
        """
        
        # Load all monthly files into a single xarray dataset that contains all dates (daily frequency)
        raw_weather_ds = None
        for file in list_era5land_files:
            if not raw_weather_ds:
                raw_weather_ds = load_variable(file)
            else:
                temp = load_variable(file)
                raw_weather_ds = xr.merge([temp, raw_weather_ds])
        del temp
        
    
        # Clip extra dates
        raw_weather_ds = raw_weather_ds.sel({'time': slice(start_date, end_date)})
        
    
        # Create ET0 variable (that will be saved) and set attributes 
        raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = 'float64')))
    
        # Loop on lattitude and longitude coordinates to calculate ET0 per "pixel"
        for lat in raw_weather_ds.coords['lat'].values:
            for lon in raw_weather_ds.coords['lon'].values:
                # Select whole time period for given (lat, lon) values
                select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon'])
    
                # Calculate ET0 values for given pixel
                ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h)
                
                # Write ET0 values in xarray Dataset
                raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values
        
        # Get necessary data for final dataset and rewrite netcdf attributes
    
        final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min'])  # remove unwanted variables
    
        final_weather_ds['tp'] = final_weather_ds['tp'] * 1000  # conversion from m to mm
        
        # Change datatype to reduce memory usage
    
        final_weather_ds['tp'] = (final_weather_ds['tp']  * 1000).astype('u2').chunk(chunks = {"time": 1})
        final_weather_ds['ET0'] = (final_weather_ds['ET0']  * 1000).astype('u2').chunk(chunks = {"time": 1})
    
        
        # Write projection
        final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326')
        
        # Set variable attributes 
        final_weather_ds['ET0'].attrs['units'] = 'mm'
        final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
        final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method (scale factor = 1000)'
    
        final_weather_ds['tp'].attrs['units'] = 'mm'
        final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
    
        final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters (scale factor = 1000)'
    
    
        # Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
        output_file_rain = output_file + '_rain.tif'
        output_file_ET0 = output_file + '_ET0.tif'
        final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16')
        final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
    
        return output_file_rain, output_file_ET0
    
    
    
    def extract_rasterstats(args: tuple) -> List[float]:
    
        Generate a dataframe for a given raster and a geopandas shapefile object. 
        It iterates over the features of the shapefile geometry (polygons). This
        information is stored in a list.
    
        It returns a list that contains the raster values, a feature ``id``
    
        and the date for the image and every polygon in the shapefile geometry.
        It also has identification data relative to the shapefile: landcover (``LC``),
        land cover identifier (``id``) This list is returned to be later agregated
        in a ``DataFrame``.
    
        This function is used to allow multiprocessing for weather extraction.
        
        Arguments (packed in args: ``tuple``)
        =====================================
    
    
            list containing weather values and feature information for every
            polygon in the shapefile
        """
        
        # Open arguments packed in args
    
        
        # Open config file
        config_params = config(config_file)
        
        # Create dataframe where zonal statistics will be stored
    
        
        # Get dates
        dates = pd.to_datetime(pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')).values
    
        # Open ndvi image and shapefile geometry