You need to sign in or sign up before continuing.
Newer
Older
# -*- coding: UTF-8 -*-
# Python
Jeremy Auclair
committed
"""
Functions to call ECMWF Reanalysis with CDS-api
- ERA5-land daily request

Jérémy AUCLAIR
committed
- request a list of hourly variables dedicated to the calculus of ET0
and the generation of MODSPA daily forcing files
heavily modified from @rivallandv's original file
Jeremy Auclair
committed
Jeremy Auclair
committed
@author: auclairj
Jeremy Auclair
committed
"""
import os # for path exploration and file management
Jeremy Auclair
committed
import numpy as np # for math on arrays
import xarray as xr # to manage nc files

Jérémy AUCLAIR
committed
import rioxarray # to manage georeferenced images
Jeremy Auclair
committed
from datetime import datetime # to manage dates
from fnmatch import fnmatch # for file name matching
Jeremy Auclair
committed
import pandas as pd # to manage dataframes
import rasterio as rio # to manage geotiff images
Jeremy Auclair
committed
import geopandas as gpd # to manage shapefile crs projections
from rasterio.mask import mask # to mask images

Jérémy AUCLAIR
committed
from rasterio.enums import Resampling # reprojection algorithms
import netCDF4 as nc # to write netcdf4 files
from tqdm import tqdm # to follow progress

Jérémy AUCLAIR
committed
from multiprocessing import Pool, Manager # to parallelize functions
Jeremy Auclair
committed
from psutil import virtual_memory # to check available ram

Jérémy AUCLAIR
committed
from psutil import cpu_count # to get number of physical cores available
Jeremy Auclair
committed
from modspa_pixel.config.config import config # to import config file
Jeremy Auclair
committed
from modspa_pixel.source.modspa_samir import calculate_time_slices_to_load # to optimise I/O operations
Jeremy Auclair
committed
import warnings # to suppress pandas warning
# CDS API external library
# source: https://pypi.org/project/cdsapi/
import cdsapi # to download cds data
# FAO ET0 calculator external library
# Notes
# source: https://github.com/Evapotranspiration/ETo
# documentation: https://eto.readthedocs.io/en/latest/
import eto # to calculate ET0
def era5_enclosing_shp_aera(bbox: list[float], pas: float) -> tuple[float, float, float, float]:
Jeremy Auclair
committed
"""
Find the four coordinates including the boxbound scene
to agree with gridsize resolution
system projection: WGS84 lat/lon degree

Jérémy AUCLAIR
committed
Arguments
=========

Jérémy AUCLAIR
committed
1. area: ``list[float]``
bounding box of the demanded area
list of floats: [lat north, lon west, lat south, lon east] in degree WGS84
2. pas: ``float``
gridsize
Returns
=======
1. era5_area: ``tuple[float, float, float, float]``
coordinates list corresponding to N,W,S,E corners of the grid in decimal degree
.. note::
Jeremy Auclair
committed
gdal coordinates reference upper left corner of pixel, ERA5 coordinates refere to center of grid. To resolve this difference an offset of pas/2 is applied
Jeremy Auclair
committed
"""
lat_max, lon_min, lat_min, lon_max = bbox[3], bbox[0], bbox[1], bbox[2]
Jeremy Auclair
committed
# North
era5_lat_max = round((lat_max // pas + 1) * pas, 2)
Jeremy Auclair
committed
# West
era5_lon_min = round((lon_min // pas) * pas, 2)
Jeremy Auclair
committed
# South
era5_lat_min = round((lat_min // pas) * pas, 2)
Jeremy Auclair
committed
# Est
era5_lon_max = round((lon_max // pas + 1) * pas, 2)
Jeremy Auclair
committed
era5_area = [era5_lat_max, era5_lon_min, era5_lat_min, era5_lon_max]
Jeremy Auclair
committed
return era5_area # [N,W,S,E]
def split_dates_by_year(start_date: str, end_date: str) -> list[tuple[str, str]] | list:
Jeremy Auclair
committed
"""
Given a start and end date, returns tuples of start and end dates IN THE SAME YEAR.
Jeremy Auclair
committed
Arguments

Jérémy AUCLAIR
committed
1. start_date: ``str``
start date in YYYY-MM-DD format
2. end_date: ``str``
end date in YYYY-MM-DD format
Jeremy Auclair
committed
Returns
1. dates: ``list[tuple[str, str]] | list``
output tuples of start and end dates
Jeremy Auclair
committed
"""

Jérémy AUCLAIR
committed
start = datetime.strptime(start_date, '%Y-%m-%d')
end = datetime.strptime(end_date, '%Y-%m-%d')
if start.year == end.year:
return [(start_date, end_date)]
Jeremy Auclair
committed
dates = []
current_start = start
while current_start.year <= end.year:
if current_start.year == end.year:
current_end = end
else:
current_end = datetime(current_start.year, 12, 31)
Jeremy Auclair
committed
dates.append((current_start.strftime('%Y-%m-%d'), current_end.strftime('%Y-%m-%d')))
current_start = datetime(current_start.year + 1, 1, 1)
Jeremy Auclair
committed
return dates
Jeremy Auclair
committed
def call_era5landhourly(args: tuple) -> None:
Jeremy Auclair
committed
"""
Download weather data for the given variable. Arguments are packed in a tuple for multiprocessing.
Arguments
=========
Jeremy Auclair
committed
1. variable: ``str``
name of ER5-Land weather variable
2. output_path: ``str``
output path to download netcdf file
3. start_date: ``str``
start date in YYYY-MM-DD format
(start and end date must be in the same
year to reduce data to download)
4. end_date: ``str``
end date in YYYY-MM-DD format
(start and end date must be in the same
year to reduce data to download)
5. bbox: ``list[float, float, float, float]``
bounding box of area to download data
6. gridsize: ``float`` ``default = 0.1``
gridsize of data to download
Jeremy Auclair
committed
Returns
=======
1. output_filename: ``str``
output file name
"""
Jeremy Auclair
committed
variable, output_path, start_date, end_date, bbox, gridsize = args
Jeremy Auclair
committed
# full path name of the output file
output_filename = os.path.join(output_path, 'ERA5-land_' + variable + '_' + start_date + '_' + end_date + '.nc')
Jeremy Auclair
committed
# Get time periods for download
start_date = datetime.strptime(start_date, '%Y-%m-%d')
end_date = datetime.strptime(end_date, '%Y-%m-%d')
Jeremy Auclair
committed
# Generate time inputs
months = []
current = start_date
while current <= end_date:
month_str = current.strftime('%m')
if month_str not in months:
months.append(month_str)
# Move to the next month
if current.month == 12:
current = current.replace(year=current.year + 1, month=1)
else:
current = current.replace(month=current.month + 1)
Jeremy Auclair
committed
# Generate the list of days
days = [f'{day:02}' for day in range(1, 32)]
Jeremy Auclair
committed
# Generate time
time = [f'{hour:02}:00' for hour in range(0, 24)]
Jeremy Auclair
committed
# Get modified bbox
area = era5_enclosing_shp_aera(bbox, gridsize)
Jeremy Auclair
committed
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# Check if file already exists
if os.path.isfile(output_filename):
print('\n', output_filename, 'already exist !\n')
else:
# cds api request
client = cdsapi.Client(timeout = 300)
try:
client.retrieve('reanalysis-era5-single-levels',
request = {
'product_type': ['reanalysis'],
'variable': [variable],
'year': [start_date.strftime(format = '%Y')],
'month': months,
'day': days,
'time': time,
'data_format': 'netcdf',
'download_format': 'unarchived',
'area': area,
'grid': [gridsize, gridsize],
},
target = output_filename)
print('\n', output_filename, ' downloaded !\n')
except Exception as e:
print('\nRequest failed, error message:\n\n', e, '\n')
return output_filename
def uz_to_u2(u_z: list[float], h: float) -> list[float]:
Jeremy Auclair
committed
"""
The wind speed measured at heights other than 2 m can be adjusted according
to the follow equation
Arguments
----------
u_z : TYPE float array
measured wind speed z m above the ground surface, ms- 1.
h : TYPE float
height of the measurement above the ground surface, m.
Returns
-------
u2 : TYPE float array
average daily wind speed in meters per second (ms- 1 ) measured at 2 m above the ground.
"""

Jérémy AUCLAIR
committed
return u_z * 4.87/(np.log(67.8 * h - 5.42))
Jeremy Auclair
committed
def ea_calc(T: float) -> float:
"""
comments
Actual vapour pressure (ea) derived from dewpoint temperature '
Arguments
----------
T : Temperature in degree celsius.
Returns
-------
e_a :the actual Vapour pressure in Kpa
"""

Jérémy AUCLAIR
committed
return 0.6108 * np.exp(17.27 * T / (T + 237.15))
Jeremy Auclair
committed
Jeremy Auclair
committed
def combine_weather2netcdf(rain_file: str, ET0_tile: str, ndvi_path: str, save_path: str, available_ram: int) -> None:
Jeremy Auclair
committed
Convert the Rain and ET0 geotiffs into a single weather netcdf dataset.
Arguments
=========
1. rain_file: ``str``
Jeremy Auclair
committed
path to Rain tif
Jeremy Auclair
committed
path to ET0 tif
3. ndvi_path: ``str``
Jeremy Auclair
committed
path to ndvi cube
4. save_path: ``str``
Jeremy Auclair
committed
save path of weather netcdf dataset
5. available_ram: ``int``
Jeremy Auclair
committed
available ram in GiB for conversion
Returns
=======
``None``
Jeremy Auclair
committed
"""
# Open tif files
rain_tif = rio.open(rain_file)
ET0_tif = rio.open(ET0_tile)
# Open ndvi netcdf to get structure
ndvi = xr.open_dataset(ndvi_path)
dimensions = ndvi.drop_sel(time = ndvi.time).sizes # create dataset with a time dimension of length 0
Jeremy Auclair
committed
# Create empty dataset with same structure
Jeremy Auclair
committed
weather = ndvi.drop_vars(['NDVI']).copy(deep = True)
Jeremy Auclair
committed
# Set dataset attributes
weather.attrs['name'] = 'ModSpa Pixel weather dataset'
weather.attrs['description'] = 'Weather variables (Rain and ET0) for the ModSpa SAMIR (FAO-56) model at the pixel scale. Variables are scaled to be stored as integers.'

Jérémy AUCLAIR
committed
weather.attrs['scaling'] = "{'Rain': 100, 'ET0': 1000}"
weather['Rain'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
Jeremy Auclair
committed
weather['Rain'].attrs['units'] = 'mm'
weather['Rain'].attrs['standard_name'] = 'total_precipitation'
weather['Rain'].attrs['description'] = 'Accumulated daily precipitation in mm'

Jérémy AUCLAIR
committed
weather['Rain'].attrs['scale factor'] = '100'
Jeremy Auclair
committed
weather['ET0'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
Jeremy Auclair
committed
weather['ET0'].attrs['units'] = 'mm'
weather['ET0'].attrs['standard_name'] = 'evapotranspiration'
Jeremy Auclair
committed
weather['ET0'].attrs['description'] = 'Accumulated daily reference evapotranspiration in mm'
weather['ET0'].attrs['scale factor'] = '1000'
# Create encoding dictionnary
for variable in list(weather.keys()):
# Write encoding dict
encoding_dict = {}
encod = {}
encod['dtype'] = 'u2'
Jeremy Auclair
committed
encod['_FillValue'] = 0
# TODO: figure out optimal file chunk size
file_chunksize = (1, dimensions['y'], dimensions['x'])
Jeremy Auclair
committed
encod['chunksizes'] = file_chunksize
Jeremy Auclair
committed
# TODO: check if compression affects reading speed
Jeremy Auclair
committed
encod['zlib'] = True
encod['complevel'] = 1
Jeremy Auclair
committed
encoding_dict[variable] = encod
# Save empty output
print('\nWriting empty weather dataset')
weather.to_netcdf(save_path, encoding = encoding_dict, unlimited_dims = 'time')
Jeremy Auclair
committed
weather.close()
# Get geotiff dimensions (time, x, y)
dims = (rain_tif.count, rain_tif.height, rain_tif.width)
Jeremy Auclair
committed
# Determine the memory requirement of operation
Jeremy Auclair
committed
nb_bytes = 2 # int16
Jeremy Auclair
committed
nb_vars = 1 # one variable written at a time
Jeremy Auclair
committed
memory_requirement = ((dims[0] * dims[1] * dims[2]) * nb_vars * nb_bytes) / (1024**3) # in GiB
Jeremy Auclair
committed
# Get the number of time bands that can be loaded at once
time_slice, remainder, already_written = calculate_time_slices_to_load(dims[2], dims[1], dims[0], nb_vars, 0, 0, 0, nb_bytes, available_ram)
Jeremy Auclair
committed
Jeremy Auclair
committed
print('\nApproximate memory requirement of conversion:', round(memory_requirement, 3), 'GiB\nAvailable memory:', available_ram, 'GiB\n\nLoading blocks of', time_slice, 'time bands.\n')
Jeremy Auclair
committed
# Open empty dataset
weather = nc.Dataset(save_path, mode = 'r+')
# Create progress bar
progress_bar = tqdm(total = dims[0], desc='Writing weather data', unit=' bands')
Jeremy Auclair
committed
for i in range(dims[0]):
Jeremy Auclair
committed
if time_slice == dims[0] and not already_written: # if whole dataset fits in memory and it has not already been loaded
Jeremy Auclair
committed
weather.variables['Rain'][:,:,:] = rain_tif.read()
weather.variables['ET0'][:,:,:] = ET0_tif.read()
Jeremy Auclair
committed
already_written = True
Jeremy Auclair
committed
Jeremy Auclair
committed
elif i % time_slice == 0 and not already_written: # load a time slice every time i is divisible by the size of the time slice
Jeremy Auclair
committed
if i + time_slice <= dims[0]: # if the time slice does not gow over the dataset size
weather.variables['Rain'][i: i + time_slice, :, :] = rain_tif.read(tuple(k+1 for k in range(i, i + time_slice)))
weather.variables['ET0'][i: i + time_slice, :, :] = ET0_tif.read(tuple(k+1 for k in range(i, i + time_slice)))
else: # load the remainder when the time slice would go over the dataset size
weather.variables['Rain'][i: i + remainder, :, :] = rain_tif.read(tuple(k+1 for k in range(i, i + remainder)))
weather.variables['ET0'][i: i + remainder, :, :] = ET0_tif.read(tuple(k+1 for k in range(i, i + remainder)))
progress_bar.update()
# Write dates in weather dataset
weather.variables['time'].units = f'days since {np.datetime_as_string(dates[0], unit = "D")} 00:00:00' # set correct unit
weather.variables['time'][:] = np.arange(0, len(dates)) # save dates as integers representing the number of days since the first day
weather.sync() # flush data to disk
Jeremy Auclair
committed
Jeremy Auclair
committed
progress_bar.close()
Jeremy Auclair
committed
rain_tif.close()
ET0_tif.close()
weather.close()

Jérémy AUCLAIR
committed
def calculate_ET0_pixel(input_dataset: xr.Dataset, h: float = 10, safran: bool = False) -> xr.DataArray:
Jeremy Auclair
committed
"""
Calculate ET0 over the year for a single pixel of the ERA5 weather dataset.
Arguments
=========

Jérémy AUCLAIR
committed
1. input_dataset: ``xr.Dataset``
extracted dataset chunked to contain only one spatial pixel
2. h: ``float`` ``default = 10``
Jeremy Auclair
committed
height of ERA5 wind measurement in meters

Jérémy AUCLAIR
committed
3. safran: ``bool`` ``default = False``

Jérémy AUCLAIR
committed
boolean to adapt to a custom SAFRAN weather dataset
Jeremy Auclair
committed
Returns
=======
1. ET0_values: ``np.ndarray``
Jeremy Auclair
committed
numpy array containing the ET0 values for each day
"""

Jérémy AUCLAIR
committed
# Adapt dataset structure
if safran:
lat, lon = input_dataset.coords['y'].values, input_dataset.coords['x'].values
pixel_dataset = input_dataset.squeeze(dim = ['x', 'y'], drop = True)
else:
lat, lon = input_dataset.coords['lat'].values, input_dataset.coords['lon'].values
pixel_dataset = input_dataset.squeeze(dim = ['lat', 'lon'], drop = True)

Jérémy AUCLAIR
committed
# TODO: adapt for safran
Jeremy Auclair
committed
# Conversion of xarray dataset to dataframe for ET0 calculation
# Conversion of temparature
ET0 = pixel_dataset.t2m.resample(time = '1D').min().to_dataframe().rename(columns = {'t2m' : 'T_min'}) - 273.15 # conversion of temperatures from K to °C
ET0['T_max'] = pixel_dataset.t2m.resample(time = '1D').max().to_dataframe()['t2m'].values - 273.15 # conversion of temperatures from K to °C
ET0['R_s'] = pixel_dataset.ssrd.resample(time = '1D').sum().to_dataframe()['ssrd'].values / 1e6 # to convert downward total radiation from J/m² to MJ/m²
Jeremy Auclair
committed
# Calculate relative humidity
pixel_dataset['ea'] = ea_calc(pixel_dataset.t2m - 273.15)
pixel_dataset['es'] = ea_calc(pixel_dataset.d2m - 273.15)
pixel_dataset['rh'] = np.clip(100.*(pixel_dataset.es / pixel_dataset.ea), a_min = 0, a_max = 100)
ET0['RH_max'] = pixel_dataset.rh.resample(time = '1D').max().to_dataframe()['rh'].values
ET0['RH_min'] = pixel_dataset.rh.resample(time = '1D').min().to_dataframe()['rh'].values

Jérémy AUCLAIR
committed
if safran:
# Add wind
ET0['U_z'] = pixel_dataset.U_z.to_dataframe()['U_z'].values
else:
# Conversion of eastward and northward wind values to scalar wind

Jérémy AUCLAIR
committed
pixel_dataset['uz'] = np.sqrt(pixel_dataset.u10 ** 2 + pixel_dataset.v10 ** 2)
ET0['U_z'] = pixel_dataset.uz.resample(time = '1D').mean().to_dataframe()['uz'].values
Jeremy Auclair
committed
Jeremy Auclair
committed
# Start ET0 calculation
eto_calc = eto.ETo()
warnings.filterwarnings('ignore') # remove pandas warning
# ET0 calculation for given pixel (lat, lon) values
eto_calc.param_est(ET0,
freq = 'D', # daily frequence
# Elevation of the met station above mean sea level (m) (only needed if P is not in df).
z_msl = 0.,
lat = lat,
lon = lon,
TZ_lon = None,
z_u = h) # h: height of raw wind speed measurement
# Retrieve ET0 values

Jérémy AUCLAIR
committed
ET0_values = np.reshape(eto_calc.eto_fao(max_ETo = 15, min_ETo = 0, interp = True, maxgap = 10).values, [len(ET0.index), 1, 1]) # ETo_FAO_mm
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
output_coords = input_dataset.resample(time = '1D').sum().coords
output_dims = input_dataset.resample(time = '1D').sum().sizes
output = xr.DataArray(data = ET0_values, coords = output_coords, dims = output_dims, name = 'ET0')
return output
Jeremy Auclair
committed
def convert_interleave_mode(args: tuple[str, str, bool]) -> None:
Jeremy Auclair
committed
"""
Convert Geotiff files obtained from OTB to Band interleave mode for faster band reading.
Arguments
=========
(packed in args: ``tuple``)
1. input_image: ``str``
Jeremy Auclair
committed
input image to convert
2. output_image: ``str``
Jeremy Auclair
committed
output image to save
3. remove: ``bool`` ``default = True``
Jeremy Auclair
committed
weather to remove input image
Returns
=======
``None``
Jeremy Auclair
committed
"""
input_image, output_image, remove = args
# Open the input file in read mode
with rio.open(input_image, "r") as src:
# Open the output file in write mode
with rio.open(output_image, 'w', driver = src.driver, height = src.height, width = src.width, count = src.count, dtype = src.dtypes[0], crs = src.crs, transform = src.transform, interleave = 'BAND',) as dst:
# Loop over the blocks or windows of the input file
for _, window in src.block_windows(1):
# Write the data to the output file
dst.write(src.read(window = window), window = window)
# Remove unecessary image
if remove:
os.remove(input_image)
return None
def era5Land_daily_to_yearly_pixel(weather_files: list[str], variables: list[str], output_file: str, raw_S2_image_ref: str, ndvi_path: str, start_date: str, end_date: str, h: float = 10, max_ram: int = 8, use_OTB: bool = False, weather_overwrite: bool = False, safran: bool = False) -> str:
Jeremy Auclair
committed
"""
Calculate ET0 values from the ERA5 netcdf weather variables.
Output netcdf contains the ET0 and precipitation values for

Jérémy AUCLAIR
committed
each day in the selected time period and reprojected on the
same grid as the NDVI values.
Arguments
=========
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
1. weather_file: ``str``
path to netCDF raw weather files
2. variables: ``list[str]``
list of variables downloaded from era5
3. output_file: ``str``
4. raw_S2_image_ref: ``str``
raw Sentinel 2 image at right resolution for reprojection
5. ndvi_path: ``str``
Jeremy Auclair
committed
path to ndvi dataset, used for attributes and coordinates
6. start_date: ``str``
Jeremy Auclair
committed
beginning of the time window to download (format: ``YYYY-MM-DD``)
7. end_date: ``str``
Jeremy Auclair
committed
end of the time window to download (format: ``YYYY-MM-DD``)
8. h: ``float`` ``default = 10``
Jeremy Auclair
committed
height of ERA5 wind measurements in meters
9. max_ram: ``int`` ``default = 8``
Jeremy Auclair
committed
max ram (in GiB) for reprojection and conversion. Two
subprocesses are spawned for OTB, each receiviving
half of requested memory.
10. use_OTB: ``bool`` ``default = False``

Jérémy AUCLAIR
committed
boolean to choose to use OTB or not, tests will be added later
11. weather_overwrite: ``bool`` ``default = False``
Jeremy Auclair
committed
boolean to choose to overwrite weather netCDF
12. safran: ``bool`` ``default = False``

Jérémy AUCLAIR
committed
boolean to adapt to a custom SAFRAN weather dataset
Jeremy Auclair
committed
Returns
=======
Jeremy Auclair
committed
1. output_file_final: ``str``
path to ``netCDF4`` file containing precipitation and ET0 data
Jeremy Auclair
committed
"""
Jeremy Auclair
committed
# Test if file exists
if os.path.exists(output_file + '.nc') and not weather_overwrite:
return output_file + '.nc'
Jeremy Auclair
committed
# Test if memory requirement is not loo large
if np.ceil(virtual_memory().available / (1024**3)) < max_ram:
print('\nRequested', max_ram, 'GiB of memory when available memory is approximately', round(virtual_memory().available / (1024**3), 1), 'GiB.\n\nExiting script.\n')
return None
# Load all weather files in a single dataset
raw_weather_ds = xr.Dataset()
for var in variables:
temp = []
for file in weather_files:
if fnmatch(file, '*' + var + '*'):
temp.append(file)
raw_weather_ds = xr.merge([raw_weather_ds, xr.open_mfdataset(temp).drop_vars(['number', 'expver']).rename({'valid_time': 'time', 'latitude': 'lat', 'longitude': 'lon'})])
Jeremy Auclair
committed
Jeremy Auclair
committed
# Clip extra dates
raw_weather_ds = raw_weather_ds.sel({'time': slice(start_date, end_date)}).sortby(variables = 'time')
resampled_weather_ds = raw_weather_ds.resample(time = '1D').sum()
Jeremy Auclair
committed
Jeremy Auclair
committed
# Create ET0 variable (that will be saved) and set attributes
resampled_weather_ds = resampled_weather_ds.assign(ET0 = (resampled_weather_ds.sizes, np.zeros(tuple(resampled_weather_ds.sizes[d] for d in list(resampled_weather_ds.sizes)), dtype = np.float32)))

Jérémy AUCLAIR
committed
if safran:

Jérémy AUCLAIR
committed
# Chunk weather dataset
raw_weather_ds = raw_weather_ds.chunk({'time': -1, 'y': 1, 'x': 1})
# Apply ET0 function
resampled_weather_ds['ET0'] = raw_weather_ds.map_blocks(calculate_ET0_pixel, args = (h, safran), template = resampled_weather_ds.ET0.chunk({'time': -1, 'y': 1, 'x': 1}))

Jérémy AUCLAIR
committed
# Get necessary data for final dataset
final_weather_ds = resampled_weather_ds.drop_vars(names = ['ssrd', 't2m', 'd2m', 'RH_max', 'RH_min', 'U_z']) # remove unwanted variables

Jérémy AUCLAIR
committed
else:

Jérémy AUCLAIR
committed
# Chunk weather dataset
raw_weather_ds = raw_weather_ds.chunk({'time': -1, 'lon': 1, 'lat': 1})
# Apply ET0 function
resampled_weather_ds['ET0'] = raw_weather_ds.map_blocks(calculate_ET0_pixel, args = (h, safran), template = resampled_weather_ds.ET0.chunk({'time': -1, 'lon': 1, 'lat': 1}))

Jérémy AUCLAIR
committed
# Get necessary data for final dataset
final_weather_ds = resampled_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m', 'd2m']) # remove unwanted variables

Jérémy AUCLAIR
committed
# Scale data and rewrite netcdf attributes
Jeremy Auclair
committed
final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm
# Change datatype to reduce memory usage

Jérémy AUCLAIR
committed
final_weather_ds['tp'] = (final_weather_ds['tp'] * 100).astype('u2').chunk(chunks = {"time": 1})
Jeremy Auclair
committed
final_weather_ds['ET0'] = (final_weather_ds['ET0'] * 1000).astype('u2').chunk(chunks = {"time": 1})

Jérémy AUCLAIR
committed
# TODO: fix for safran

Jérémy AUCLAIR
committed
final_weather_ds.rio.write_crs('EPSG:4326', inplace = True)
Jeremy Auclair
committed
# Set variable attributes

Jérémy AUCLAIR
committed
final_weather_ds['ET0'].attrs['unit'] = 'mm'
Jeremy Auclair
committed
final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
Jeremy Auclair
committed
final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method (scale factor = 1000)'

Jérémy AUCLAIR
committed
final_weather_ds['tp'].attrs['unit'] = 'mm'
Jeremy Auclair
committed
final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'

Jérémy AUCLAIR
committed
final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters (scale factor = 100)'
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
# TODO: find how to test OTB installation from python
if use_OTB:
# Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
output_file_rain = output_file + '_rain.tif'
output_file_ET0 = output_file + '_ET0.tif'
final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16')
final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
# Reprojected image paths
output_file_rain_reproj = output_file + '_rain_reproj.tif'
output_file_ET0_reproj = output_file + '_ET0_reproj.tif'
# Converted image paths
output_file_final = output_file + '.nc'
# otbcli_SuperImpose commands
OTB_command_reproj1 = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_rain + ' -out ' + output_file_rain_reproj + ' uint16 -interpolator linear -ram ' + str(int(max_ram * 1024/2))
OTB_command_reproj2 = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_ET0 + ' -out ' + output_file_ET0_reproj + ' uint16 -interpolator linear -ram ' + str(int(max_ram * 1024/2))
commands_reproj = [OTB_command_reproj1, OTB_command_reproj2]
with Pool(2) as p:
p.map(os.system, commands_reproj)
# Combine to netCDF file
combine_weather2netcdf(output_file_rain_reproj, output_file_ET0_reproj, ndvi_path, output_file_final, available_ram = max_ram)
# remove old files and rename outputs
os.remove(output_file_rain)
os.remove(output_file_ET0)
os.remove(output_file_rain_reproj)
os.remove(output_file_ET0_reproj)
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
else:
# Set dataset attributes
final_weather_ds.attrs['name'] = 'ModSpa Pixel weather dataset'
final_weather_ds.attrs['description'] = 'Weather variables (Rain and ET0) for the ModSpa SAMIR (FAO-56) model at the pixel scale. Variables are scaled to be stored as integers.'

Jérémy AUCLAIR
committed
final_weather_ds.attrs['scaling'] = "{'Rain': 100, 'ET0': 1000}"

Jérémy AUCLAIR
committed
# Set file names
output_file_final = output_file + '.nc'
# Open reference image
ref = rioxarray.open_rasterio(raw_S2_image_ref)

Jérémy AUCLAIR
committed
print('\nReprojecting weather dataset')

Jérémy AUCLAIR
committed

Jérémy AUCLAIR
committed
# Get metadata
target_crs = ref.rio.crs
spatial_ref = ref.spatial_ref.load()
# Define ressources
mem_limit = min([int(np.ceil(len(ref.x) * len(ref.y) * len(final_weather_ds.time) * len(final_weather_ds.data_vars) * np.dtype(np.float32).itemsize / (1024 ** 2)) * 1.1), 0.8 * virtual_memory().available / (1024**2), max_ram * 1024])
nb_threads = min([cpu_count(logical = True), len(os.sched_getaffinity(0))])
# Reproject
final_weather_ds = final_weather_ds.rio.reproject(target_crs, transform = ref.rio.transform(), shape = (ref.rio.height, ref.rio.width), resampling = Resampling.bilinear, num_threads = nb_threads, warp_mem_limit = mem_limit)
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
# Rename
final_weather_ds = final_weather_ds.rename({'tp': 'Rain'})
# Create encoding dictionnary
for variable in list(final_weather_ds.keys()):
# Write encoding dict
encod = {}
encod['dtype'] = 'u2'
if '_FillValue' in final_weather_ds[variable].attrs:
del final_weather_ds[variable].attrs['_FillValue']
encod['_FillValue'] = 0
# TODO: figure out optimal file chunk size
file_chunksize = (1, final_weather_ds.sizes['y'], final_weather_ds.sizes['x'])

Jérémy AUCLAIR
committed
encod['chunksizes'] = file_chunksize
# TODO: check if compression affects reading speed
encod['zlib'] = True
encod['complevel'] = 1
final_weather_ds[variable].encoding.update(encod)

Jérémy AUCLAIR
committed
# Rewrite georeferencing
final_weather_ds.rio.write_crs(target_crs, inplace = True)

Jérémy AUCLAIR
committed
final_weather_ds.rio.write_transform(inplace = True)

Jérémy AUCLAIR
committed
final_weather_ds['spatial_ref'] = spatial_ref
final_weather_ds.attrs['crs'] = final_weather_ds.rio.crs.to_string()

Jérémy AUCLAIR
committed
final_weather_ds = final_weather_ds.set_coords('spatial_ref')
# Save empty output
final_weather_ds.to_netcdf(output_file_final)
final_weather_ds.close()
Jeremy Auclair
committed
return output_file_final
Jeremy Auclair
committed
def era5Land_daily_to_yearly_parcel(weather_files: list[str], variables: list[str], output_file: str, start_date: str, end_date: str, h: float = 10) -> str:
Jeremy Auclair
committed
"""
Calculate ET0 values from the ERA5 netcdf weather variables.
Output netcdf contains the ET0 and precipitation values for
each day in the selected time period.
Arguments
=========
1. weather_file: ``list[str]``

Jérémy AUCLAIR
committed
path to netCDF raw weather files
2. variables: ``list[str]``
list of variables downloaded from era5
3. output_file: ``str``
Jeremy Auclair
committed
output file name without extension
Jeremy Auclair
committed
3. start_date: ``str``
beginning of the time window to download (format: ``YYYY-MM-DD``)
4. end_date: ``str``
end of the time window to download (format: ``YYYY-MM-DD``)
5. h: ``float`` ``default = 10``
Jeremy Auclair
committed
height of ERA5 wind measurements in meters
Returns
=======
1. output_file_rain: ``str``
path to ``Geotiff`` file containing precipitation data
2. output_file_ET0: ``str``
path to ``Geotiff`` file containing ET0 data
"""
# Load all weather files in a single dataset
raw_weather_ds = xr.Dataset()
for var in variables:
temp = []
for file in weather_files:
if fnmatch(file, '*' + var + '*'):
temp.append(file)
raw_weather_ds = xr.merge([raw_weather_ds, xr.open_mfdataset(temp).drop_vars(['number', 'expver']).rename({'valid_time': 'time', 'latitude': 'lat', 'longitude': 'lon'})])
Jeremy Auclair
committed
Jeremy Auclair
committed
# Clip extra dates

Jérémy AUCLAIR
committed
raw_weather_ds = raw_weather_ds.sel({'time': slice(start_date, end_date)}).sortby(variables = 'time')
resampled_weather_ds = raw_weather_ds.resample(time = '1D').sum()
Jeremy Auclair
committed
Jeremy Auclair
committed
# Create ET0 variable (that will be saved) and set attributes

Jérémy AUCLAIR
committed
resampled_weather_ds = resampled_weather_ds.assign(ET0 = (resampled_weather_ds.sizes, np.zeros(tuple(resampled_weather_ds.sizes[d] for d in list(resampled_weather_ds.sizes)), dtype = np.float32)))
Jeremy Auclair
committed
# Loop on lattitude and longitude coordinates to calculate ET0 per "pixel"

Jérémy AUCLAIR
committed
# Chunk weather dataset
raw_weather_ds = raw_weather_ds.chunk({'time': -1, 'lon': 1, 'lat': 1})
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
# Apply ET0 function
resampled_weather_ds['ET0'] = raw_weather_ds.map_blocks(calculate_ET0_pixel, args = (h, False), template = resampled_weather_ds.ET0.chunk({'time': -1, 'lon': 1, 'lat': 1}))
Jeremy Auclair
committed
# Get necessary data for final dataset and rewrite netcdf attributes

Jérémy AUCLAIR
committed
final_weather_ds = resampled_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m', 'd2m']) # remove unwanted variables
Jeremy Auclair
committed
final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm
# Change datatype to reduce memory usage

Jérémy AUCLAIR
committed
final_weather_ds['tp'] = (final_weather_ds['tp'] * 100).astype('u2').chunk(chunks = {"time": 1})
Jeremy Auclair
committed
final_weather_ds['ET0'] = (final_weather_ds['ET0'] * 1000).astype('u2').chunk(chunks = {"time": 1})
Jeremy Auclair
committed
# Write projection
final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326')
# Set variable attributes

Jérémy AUCLAIR
committed
final_weather_ds['ET0'].attrs['unit'] = 'mm'
Jeremy Auclair
committed
final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method (scale factor = 1000)'

Jérémy AUCLAIR
committed
final_weather_ds['tp'].attrs['unit'] = 'mm'
Jeremy Auclair
committed
final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'

Jérémy AUCLAIR
committed
final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters (scale factor = 100)'
Jeremy Auclair
committed
# Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
output_file_rain = output_file + '_rain.tif'
output_file_ET0 = output_file + '_ET0.tif'
final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16')
final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
return output_file_rain, output_file_ET0
def extract_rasterstats(args: tuple) -> list[float]:
Jeremy Auclair
committed
"""
Jeremy Auclair
committed
Generate a dataframe for a given raster and a geopandas shapefile object.
It iterates over the features of the shapefile geometry (polygons). This
information is stored in a list.
Jeremy Auclair
committed
Jeremy Auclair
committed
It returns a list that contains the raster values, a feature ``id``
Jeremy Auclair
committed
and the date for the image and every polygon in the shapefile geometry.
It also has identification data relative to the shapefile: landcover (``LC``),
land cover identifier (``id``) This list is returned to be later agregated
in a ``DataFrame``.
This function is used to allow multiprocessing for weather extraction.
Arguments (packed in args: ``tuple``)
=====================================
Jeremy Auclair
committed
1. raster_path: ``str``
Jeremy Auclair
committed
path to multiband Geotiff
Jeremy Auclair
committed
2. shapefile: ``str``
Jeremy Auclair
committed
path to shapefile
Jeremy Auclair
committed
3. config_file: ``str``
Jeremy Auclair
committed
path to config file
Returns
=======
1. raster_stats: ``list[float]``
Jeremy Auclair
committed
list containing weather values and feature information for every
polygon in the shapefile
"""
# Open arguments packed in args
Jeremy Auclair
committed
raster_path, shapefile, config_file = args
Jeremy Auclair
committed
# Open config file
config_params = config(config_file)
# Create dataframe where zonal statistics will be stored
Jeremy Auclair
committed
raster_stats = []
Jeremy Auclair
committed
# Get dates
dates = pd.to_datetime(pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')).values
# Open ndvi image and shapefile geometry

Jérémy AUCLAIR
committed
raster_dataset = rio.open(raster_path, mode = 'r')
Jeremy Auclair
committed
# Get no data value
Jeremy Auclair
committed
nodata = raster_dataset.nodata
Jeremy Auclair
committed
# Get the number of bands
Jeremy Auclair
committed
nbands = raster_dataset.count
Jeremy Auclair
committed
# Loop on the individual polygons in the shapefile geometry
for index, row in shapefile.iterrows():
# Get the feature geometry as a shapely object
geom = row.geometry
# id number of the current parcel geometry
id = index + 1
# Get land cover
LC = row.LC
# Crop the raster using the bounding box
try:

Jérémy AUCLAIR
committed
masked_raster, _ = mask(raster_dataset, [geom], crop = True)
Jeremy Auclair
committed
except:
print('\nShapefile bounds are not contained in weather dataset bounds.\n\nExiting script.')
return None
# Mask the raster using the geometry

Jérémy AUCLAIR
committed
masked_raster, _ = mask(raster_dataset, [geom], crop = True, all_touched = True)
Jeremy Auclair
committed
# Replace the nodata values with nan
masked_raster = masked_raster.astype(np.float32)
masked_raster[masked_raster == nodata] = np.nan
Jeremy Auclair
committed
# Calculate the zonal statistics

Jérémy AUCLAIR
committed
mean = np.nanmean(masked_raster, axis = (1,2))
# Add statistics to output list
raster_stats.extend([[dates[i], id, mean[i], LC] for i in range(nbands)])
Jeremy Auclair
committed
# Update progress bar

Jérémy AUCLAIR
committed
with lock:
shared_value.value += 0.5
Jeremy Auclair
committed
Jeremy Auclair
committed
raster_dataset.close()
Jeremy Auclair
committed
Jeremy Auclair
committed
return raster_stats
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
def init_worker(shared_value_, lock_) -> None:
"""
Function to initialize the pool workers with shared value and lock.
Arguments
=========
1. shared_value_: ``float``
shared progress bar value
2. lock_: ``Lock``
lock to access shared value
"""
global shared_value, lock
shared_value = shared_value_
lock = lock_
def divide_geodataframe(gdf: gpd.GeoDataFrame, n: int) -> list[gpd.GeoDataFrame]:
"""
Divide geodataframes into n equal parts.
Arguments
=========
1. gdf: ``gpd.GeoDataFrame``
input geodataframe
2. n: ``int``
number of parts to divide into
Returns
=======
1. divided_gdfs: ``list[gpd.GeoDataFrame]``
output geodataframes
"""
# Calculate the size of each part
part_size = len(gdf) // n
remainder = len(gdf) % n
# Create a list to store the divided GeoDataFrames
divided_gdfs = []
start_idx = 0
for i in range(n):
end_idx = start_idx + part_size + (1 if i < remainder else 0)
divided_gdfs.append(gdf.iloc[start_idx:end_idx])
start_idx = end_idx
return divided_gdfs
def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, config_file: str, save_path: str, max_cpu: int = 4) -> None:
Jeremy Auclair
committed
"""
Extract a weather dataframe for each variable (Rain, ET0) and merge them in one
dataframe. This dataframe is saved as ``csv`` file.
Arguments
=========
Jeremy Auclair
committed
path to rain Geotiff file
Jeremy Auclair
committed
path to ET0 Geotiff file
Jeremy Auclair
committed
path to shapefile
Jeremy Auclair
committed
path to config file
Jeremy Auclair
committed
save path for weather dataframe

Jérémy AUCLAIR
committed
6. max_cpu: ``int`` ``default = 4``
max number of CPU cores to use for calculation
Jeremy Auclair
committed
Returns
=======
``None``
"""

Jérémy AUCLAIR
committed
print(f'\nStarting weather data extraction on {max_cpu} cores..\n')
Jeremy Auclair
committed

Jérémy AUCLAIR
committed
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Shared value and lock for controlling access to the value
manager = Manager()
shared_value = manager.Value('i', 0)
lock = manager.Lock()
# Get target epsg
with rio.open(rain_path, mode = 'r') as src:
target_epsg = src.crs
# Total iterations (assuming each extract_rasterstats call represents one iteration)
shapefile = gpd.read_file(shapefile)
shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
total_iterations = len(shapefile.index)
args1 = [(rain_path, smaller_shapefile, config_file) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
args2 = [(ET0_path, smaller_shapefile, config_file) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
# # Generate arguments for multiprocessing
# args = [(rain_path, shapefile, config_file), (ET0_path, shapefile, config_file)]
args = args1 + args2
# Create and initialize the pool
pool = Pool(processes = 2 * max_cpu, initializer = init_worker, initargs = (shared_value, lock))
# Progress bar
with tqdm(desc = 'Extracting zonal statistics', total = total_iterations, unit = ' polygons', dynamic_ncols = True) as pbar:
# Start the worker processes
results = [pool.apply_async(extract_rasterstats, args=(arg,)) for arg in args]
while shared_value.value < total_iterations: