Newer
Older
# -*- coding: UTF-8 -*-
# Python
Jeremy Auclair
committed
"""
Functions to call ECMWF Reanalysis with CDS-api
- ERA5-land daily request
Jeremy Auclair
committed
- request a list of daily variables dedicated to the calculus of ET0
Jeremy Auclair
committed
and the generation of MODSPA daily forcing files
Jeremy Auclair
committed
heavily modified from @rivallandv's original file
Jeremy Auclair
committed
Jeremy Auclair
committed
@author: auclairj
Jeremy Auclair
committed
"""
import os # for path exploration and file management
Jeremy Auclair
committed
from typing import List, Tuple # to declare variables
Jeremy Auclair
committed
import numpy as np # for math on arrays
import xarray as xr # to manage nc files
from datetime import datetime # to manage dates
from p_tqdm import p_map # for multiprocessing with progress bars
from dateutil.rrule import rrule, MONTHLY
from fnmatch import fnmatch # for file name matching
Jeremy Auclair
committed
import pandas as pd # to manage dataframes
import rasterio as rio # to manage geotiff images
Jeremy Auclair
committed
import geopandas as gpd # to manage shapefile crs projections
from rasterio.mask import mask # to mask images
from shapely.geometry import box # to extract parcel statistics
import netCDF4 as nc # to write netcdf4 files
from tqdm import tqdm # to follow progress
Jeremy Auclair
committed
from multiprocessing import Pool # to parallelize reprojection
Jeremy Auclair
committed
from psutil import virtual_memory # to check available ram
Jeremy Auclair
committed
from modspa_pixel.config.config import config # to import config file
Jeremy Auclair
committed
from modspa_pixel.source.modspa_samir import calculate_time_slices_to_load # to optimise I/O operations
Jeremy Auclair
committed
import re # for string comparison
import warnings # to suppress pandas warning
# CDS API external library
# source: https://pypi.org/project/cdsapi/
import cdsapi # to download cds data
import requests # to request data
# FAO ET0 calculator external library
# Notes
# source: https://github.com/Evapotranspiration/ETo
# documentation: https://eto.readthedocs.io/en/latest/
import eto # to calculate ET0
def era5_enclosing_shp_aera(area: List[float], pas: float) -> Tuple[float, float, float, float]:
Jeremy Auclair
committed
"""
Find the four coordinates including the boxbound scene
to agree with gridsize resolution
system projection: WGS84 lat/lon degree
Arguments
=========
1. area: ``List[float]``
bounding box of the demanded area
list of floats: [lat north, lon west, lat south, lon east] in degree WGS84
2. pas: ``float``
gridsize
Returns
=======
1. era5_area: ``Tuple[float, float, float, float]``
coordinates list corresponding to N,W,S,E corners of the grid in decimal degree
.. note::
Jeremy Auclair
committed
gdal coordinates reference upper left corner of pixel, ERA5 coordinates refere to center of grid. To resolve this difference an offset of pas/2 is applied
Jeremy Auclair
committed
"""
Jeremy Auclair
committed
lat_max, lon_min, lat_min, lon_max = area
# North
Jeremy Auclair
committed
era5_lat_max = round((lat_max//pas+2)*pas, 2)
Jeremy Auclair
committed
# West
era5_lon_min = round((lon_min//pas)*pas, 2)
# South
era5_lat_min = round((lat_min//pas)*pas, 2)
# Est
Jeremy Auclair
committed
era5_lon_max = round((lon_max//pas+2)*pas, 2)
Jeremy Auclair
committed
era5_area = era5_lat_max, era5_lon_min, era5_lat_min, era5_lon_max
return era5_area # [N,W,S,E]
def call_era5land_daily(args: Tuple[str, str, str, str, List[int], str]) -> None:
Jeremy Auclair
committed
"""
Query of one month of daily ERA5-land data of a selected variable
Jeremy Auclair
committed
according to a selected statistic
Documentation:
`cds_climate <https://datastore.copernicus-climate.eu/documents/app-c3s-daily-era5-statistics/C3S_Application-Documentation_ERA5-daily-statistics-v2.pdf>`_
Jeremy Auclair
committed
Arguments
=========
(packed in args: ``tuple``)
1. year: ``str``
Jeremy Auclair
committed
year at YYYY format.
Jeremy Auclair
committed
month at MM format.
Jeremy Auclair
committed
user-selectable variable
cf. Appendix A Table 3 for list of input variables availables.
4. statistic: ``str``
Jeremy Auclair
committed
daily statistic choosed, 3 possibility
daily_mean or daily_minimum or daily_maximum.
5. area: ``List[int]``
bounding box of the demanded area
Jeremy Auclair
committed
area = [lat_max, lon_min, lat_min, lon_max]
6. output_path: ``str``
Jeremy Auclair
committed
path for output file.
Returns
=======
``None``
Jeremy Auclair
committed
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
"""
year, month, variable, statistic, area, output_path = args
# set name of output file for each month (statistic, variable, year, month)
output_filename = \
output_path+os.sep +\
"ERA5-land_"+year+"_"+month+"_"+variable+"_"+statistic+".nc"
if os.path.isfile(output_filename):
print(output_filename, ' already exist')
else:
try:
c = cdsapi.Client(timeout=300)
result = c.service("tool.toolbox.orchestrator.workflow",
params={
"realm": "c3s",
"project": "app-c3s-daily-era5-statistics",
"version": "master",
"kwargs": {
"dataset": "reanalysis-era5-land",
"product_type": "reanalysis",
"variable": variable,
"statistic": statistic,
"year": year,
"month": month,
"time_zone": "UTC+00:0",
"frequency": "1-hourly",
"grid": "0.1/0.1",
"area": {"lat": [area[2], area[0]],
"lon": [area[1], area[3]]}
},
"workflow_name": "application"
})
location = result[0]['location']
res = requests.get(location, stream=True)
print("Writing data to " + output_filename)
with open(output_filename, 'wb') as fh:
for r in res.iter_content(chunk_size=1024):
fh.write(r)
fh.close()
except:
print('!! request', variable, ' failed !! -> year', year, 'month', month)
Jeremy Auclair
committed
return None
def call_era5land_daily_for_MODSPA(start_date: str, end_date: str, area: List[float], output_path: str, processes: int = 9) -> None:
"""
request ERA5-land daily variables needed for ET0 calculus and MODSPA forcing
`reanalysis_era5 <https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview>`_
Information on requested variables
----------------------------------
called land surface variables :
* **2m_temperature**
* **2m_dewpoint_temperature**
* **surface_solar_radiation_downward**
* **surface_net_solar_radiation**
* **surface_pressure**
* **mean_sea_level_pressure**
* **potential_evaporation**
* **evaporation**
* **total_evaporation**
* **total_precipitation**
* **snowfall**
* **10m_u_component_of_wind**
* **10m_v_component_of_wind**
Jeremy Auclair
committed
Arguments
=========
1. start_date: ``str``
start date in YYYY-MM-DD format
2. end_date: ``str``
end date in YYYY-MM-DD format
3. area: ``List[float]``
bounding box of the demanded area
area = [lat_max, lon_min, lat_min, lon_max]
4. output_path: ``str``
output file name, ``.nc`` extension
5. processes: ``int`` ``default = 9``
number of logical processors on which to run the download command.
can be higher than your actual number of processor cores,
download operations have a low CPU demand.
Jeremy Auclair
committed
Returns
=======
``None``
Jeremy Auclair
committed
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""
# list of first day of each month date into period
strt_dt = datetime.strptime(start_date, '%Y-%m-%d').replace(day=1)
end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(day=1)
periods = [dt for dt in rrule(
freq=MONTHLY, dtstart=strt_dt, until=end_dt, bymonthday=1)]
dico = {
'2m_temperature': ['daily_minimum', 'daily_maximum'],
'10m_u_component_of_wind': ['daily_mean'],
'10m_v_component_of_wind': ['daily_mean'],
'total_precipitation': ['daily_mean'],
'surface_solar_radiation_downwards': ['daily_mean'],
'2m_dewpoint_temperature': ['daily_minimum', 'daily_maximum']
}
args = []
# loop on variable to upload
for variable in dico.keys():
# loop on statistic associated to variable to upload
for statistic in dico[variable]:
# loop on year and month
for dt in periods:
year = str(dt.year)
month = '0'+str(dt.month)
month = month[-2:]
# Requete ERA5-land
args.append((year, month, variable, statistic, area, output_path))
# Start pool
p_map(call_era5land_daily, args, **{"num_cpus": processes})
return None
def filename_to_datetime(filename: str) -> datetime.date:
"""
filename_to_datetime returns a ``datetime.date`` object for the date of the given file name.
Arguments
=========
Jeremy Auclair
committed
Jeremy Auclair
committed
name or path of the product
Returns
=======
1. date: ``datetime.date``
Jeremy Auclair
committed
datetime.date object, date of the product
"""
# Search for a date pattern (yyyy_mm_dd) in the product name or path
match = re.search('\d{4}_\d{2}', filename)
format = '%Y_%m'
datetime_object = datetime.strptime(match[0], format)
return datetime_object.date()
def concat_monthly_nc_file(list_era5land_monthly_files: List[str], list_variables: List[str], output_path: str) -> List[str]:
"""
Concatenate monthly netcdf datasets into a single file for each given variable.
Arguments
=========
1. list_era5land_monthly_files: ``List[str]``
Jeremy Auclair
committed
list of daily files per month
2. list_variables: ``List[str]``
Jeremy Auclair
committed
names of the required variables as written in the filename
3. output_path: ``List[str]``
Jeremy Auclair
committed
path to which save the aggregated files
Returns
=======
1. list_era5land_files: ``List[str]``
Jeremy Auclair
committed
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
the list of paths to the aggregated files
"""
if not os.path.exists(output_path): os.mkdir(output_path)
list_era5land_monthly_files.sort()
list_era5land_files = []
# concatenate all dates into a single file for each variable
for variable in list_variables:
curr_var_list = []
dates = []
for file in list_era5land_monthly_files:
# find specific variable
if fnmatch(file, '*' + variable + '*'):
curr_var_list.append(file)
dates.append(filename_to_datetime(file))
curr_datasets = []
for file in curr_var_list:
# open all months for the given variable
curr_datasets.append(xr.open_dataset(file))
# Create file name
try:
concatenated_file = output_path + os.sep + 'era5-land_' + dates[0].strftime('%m-%Y') + '_' + dates[-1].strftime('%m-%Y') + '_' + variable + '.nc'
except:
print(variable)
# Concatenate monthly datasets
concatenated_dataset = xr.concat(curr_datasets, dim = 'time')
# Save datasets
concatenated_dataset.to_netcdf(path = concatenated_file, mode = 'w',)
# Add filename to output list
list_era5land_files.append(concatenated_file)
return list_era5land_files
def uz_to_u2(u_z: List[float], h: float) -> List[float]:
"""
The wind speed measured at heights other than 2 m can be adjusted according
to the follow equation
Arguments
----------
u_z : TYPE float array
measured wind speed z m above the ground surface, ms- 1.
h : TYPE float
height of the measurement above the ground surface, m.
Returns
-------
u2 : TYPE float array
average daily wind speed in meters per second (ms- 1 ) measured at 2 m above the ground.
"""
u2 = u_z*4.87/(np.log(67.8*h - 5.42))
return u2
def ea_calc(T: float) -> float:
"""
comments
Actual vapour pressure (ea) derived from dewpoint temperature '
Arguments
----------
T : Temperature in degree celsius.
Returns
-------
e_a :the actual Vapour pressure in Kpa
"""
e_a = 0.6108*np.exp(17.27*T/(T+237.15))
return e_a
def load_variable(file_name: str) -> xr.Dataset:
"""
Loads an ERA5 meteorological variable into a xarray
dataset according to the modspa architecture.
Arguments
=========
1. file_name: ``str``
Jeremy Auclair
committed
netcdf file to load
Returns
=======
1. variable: ``xr.Dataset``
Jeremy Auclair
committed
output xarray dataset
"""
# Rename temperature variables according to the statistic (max or min)
if fnmatch(file_name, '*era5-land*2m_temperature_daily_maximum*'): # maximum temperature
variable = xr.open_dataset(file_name).rename({'t2m': 't2m_max'}).drop_vars('realization') # netcdfs from ERA5 carry an unecessary 'realization' coordinate, so it is dropped
elif fnmatch(file_name, '*era5-land*2m_temperature_daily_minimum*'): # minimum temperature
variable = xr.open_dataset(file_name).rename({'t2m': 't2m_min'}).drop_vars('realization')
elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_maximum*'): # maximum dewpoint temperature
variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_max'}).drop_vars('realization')
elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_minimum*'): # minimum temperature
variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_min'}).drop_vars('realization')
# Other variables can be loaded without modification
else:
variable = xr.open_dataset(file_name).drop_vars('realization')
return variable
Jeremy Auclair
committed
def combine_weather2netcdf(rain_file: str, ET0_tile: str, ndvi_path: str, save_path: str, available_ram: int) -> None:
Jeremy Auclair
committed
Convert the Rain and ET0 geotiffs into a single weather netcdf dataset.
Arguments
=========
1. rain_file: ``str``
Jeremy Auclair
committed
path to Rain tif
Jeremy Auclair
committed
path to ET0 tif
3. ndvi_path: ``str``
Jeremy Auclair
committed
path to ndvi cube
4. save_path: ``str``
Jeremy Auclair
committed
save path of weather netcdf dataset
5. available_ram: ``int``
Jeremy Auclair
committed
available ram in GiB for conversion
Returns
=======
``None``
Jeremy Auclair
committed
"""
# Open tif files
rain_tif = rio.open(rain_file)
ET0_tif = rio.open(ET0_tile)
# Open ndvi netcdf to get structure
ndvi = xr.open_dataset(ndvi_path)
dates = ndvi.time
# Get empty dimensions
dimensions = ndvi.drop_sel(time = ndvi.time).dims # create dataset with a time dimension of length 0
Jeremy Auclair
committed
# Create empty dataset with same structure
Jeremy Auclair
committed
weather = ndvi.drop_vars(['NDVI']).copy(deep = True)
Jeremy Auclair
committed
weather['Rain'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
Jeremy Auclair
committed
weather['Rain'].attrs['units'] = 'mm'
weather['Rain'].attrs['standard_name'] = 'total_precipitation'
weather['Rain'].attrs['description'] = 'Accumulated daily precipitation in mm'
weather['Rain'].attrs['scale factor'] = '1000'
weather['ET0'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
Jeremy Auclair
committed
weather['ET0'].attrs['units'] = 'mm'
weather['ET0'].attrs['standard_name'] = 'Transpiration'
weather['ET0'].attrs['description'] = 'Accumulated daily reference evapotranspiration in mm'
weather['ET0'].attrs['scale factor'] = '1000'
# Create encoding dictionnary
for variable in list(weather.keys()):
# Write encoding dict
encoding_dict = {}
encod = {}
encod['dtype'] = 'u2'
Jeremy Auclair
committed
encod['_FillValue'] = 0
file_chunksize = (1, dimensions['y'], dimensions['x'])
Jeremy Auclair
committed
encod['chunksizes'] = file_chunksize
Jeremy Auclair
committed
# TODO: check if compression affects reading speed
Jeremy Auclair
committed
encod['zlib'] = True
encod['complevel'] = 1
Jeremy Auclair
committed
encoding_dict[variable] = encod
# Save empty output
print('\nWriting empty weather dataset')
weather.to_netcdf(save_path, encoding = encoding_dict, unlimited_dims = 'time')
Jeremy Auclair
committed
weather.close()
# Get geotiff dimensions (time, x, y)
dims = (rain_tif.count, rain_tif.height, rain_tif.width)
Jeremy Auclair
committed
# Determine the memory requirement of operation
Jeremy Auclair
committed
nb_bytes = 2 # int16
Jeremy Auclair
committed
nb_vars = 1 # one variable written at a time
Jeremy Auclair
committed
memory_requirement = ((dims[0] * dims[1] * dims[2]) * nb_vars * nb_bytes) / (1024**3) # in GiB
Jeremy Auclair
committed
security_factor = 0.8 # it is difficult to estimate true memory usage with compression algorithms, apply a security factor to prevent memory overload
# Get the number of time bands that can be loaded at once
Jeremy Auclair
committed
time_slice, remainder, already_written = calculate_time_slices_to_load(dims[2], dims[1], dims[0], nb_vars, 0, 0, 0, nb_bytes, security_factor, available_ram)
Jeremy Auclair
committed
Jeremy Auclair
committed
print('\nApproximate memory requirement of conversion:', round(memory_requirement, 3), 'GiB\nAvailable memory:', available_ram, 'GiB\n\nLoading blocks of', time_slice, 'time bands.\n')
Jeremy Auclair
committed
# Open empty dataset
weather = nc.Dataset(save_path, mode = 'r+')
# Create progress bar
progress_bar = tqdm(total = dims[0], desc='Writing weather data', unit=' bands')
Jeremy Auclair
committed
for i in range(dims[0]):
Jeremy Auclair
committed
if time_slice == dims[0] and not already_written: # if whole dataset fits in memory and it has not already been loaded
Jeremy Auclair
committed
weather.variables['Rain'][:,:,:] = rain_tif.read()
weather.variables['ET0'][:,:,:] = ET0_tif.read()
Jeremy Auclair
committed
already_written = True
Jeremy Auclair
committed
Jeremy Auclair
committed
elif i % time_slice == 0 and not already_written: # load a time slice every time i is divisible by the size of the time slice
Jeremy Auclair
committed
if i + time_slice <= dims[0]: # if the time slice does not gow over the dataset size
weather.variables['Rain'][i: i + time_slice, :, :] = rain_tif.read(tuple(k+1 for k in range(i, i + time_slice)))
weather.variables['ET0'][i: i + time_slice, :, :] = ET0_tif.read(tuple(k+1 for k in range(i, i + time_slice)))
else: # load the remainder when the time slice would go over the dataset size
weather.variables['Rain'][i: i + remainder, :, :] = rain_tif.read(tuple(k+1 for k in range(i, i + remainder)))
weather.variables['ET0'][i: i + remainder, :, :] = ET0_tif.read(tuple(k+1 for k in range(i, i + remainder)))
progress_bar.update()
# Write dates in weather dataset
weather.variables['time'].units = f'days since {np.datetime_as_string(dates[0], unit = "D")} 00:00:00' # set correct unit
weather.variables['time'][:] = np.arange(0, len(dates)) # save dates as integers representing the number of days since the first day
weather.sync() # flush data to disk
Jeremy Auclair
committed
Jeremy Auclair
committed
progress_bar.close()
Jeremy Auclair
committed
rain_tif.close()
ET0_tif.close()
weather.close()
Jeremy Auclair
committed
def calculate_ET0_pixel(pixel_dataset: xr.Dataset, lat: float, lon: float, h: float = 10) -> np.ndarray:
"""
Calculate ET0 over the year for a single pixel of the ERA5 weather dataset.
Arguments
=========
1. pixel_dataset: ``xr.Dataset``
Jeremy Auclair
committed
extracted dataset that contains all information for a single pixel
Jeremy Auclair
committed
latitudinal coordinate of that pixel
Jeremy Auclair
committed
longitudinal coordinate of that pixel
4. h: ``float`` ``default = 10``
Jeremy Auclair
committed
height of ERA5 wind measurement in meters
Returns
=======
1. ET0_values: ``np.ndarray``
Jeremy Auclair
committed
numpy array containing the ET0 values for each day
"""
# Conversion of xarray dataset to dataframe for ET0 calculation
ET0 = pixel_dataset.d2m_max.to_dataframe().rename(columns = {'d2m_max' : 'Dew_Point_T_max'}) - 273.15 # conversion of temperatures from K to °C
ET0['Dew_Point_T_min'] = pixel_dataset.d2m_min.to_dataframe()['d2m_min'].values - 273.15 # conversion of temperatures from K to °C
ET0['T_min'] = pixel_dataset.t2m_min.to_dataframe()['t2m_min'].values - 273.15 # conversion of temperatures from K to °C
ET0['T_max'] = pixel_dataset.t2m_max.to_dataframe()['t2m_max'].values - 273.15 # conversion of temperatures from K to °C
ET0['Rain'] = pixel_dataset.tp.to_dataframe()['tp'].values*1000 # conversion of total precipitation from meters to milimeters
# Conversion of easward and northward wind values to scalar wind
ET0['U_z'] = np.sqrt(pixel_dataset.u10.to_dataframe()['u10'].values**2 + pixel_dataset.v10.to_dataframe()['v10'].values**2)
ET0['RH_max'] = 100 * ea_calc(ET0['Dew_Point_T_min']) / ea_calc(ET0['T_min']) # calculation of relative humidity from dew point temperature and temperature
ET0['RH_min'] = 100 * ea_calc(ET0['Dew_Point_T_max']) / ea_calc(ET0['T_max']) # calculation of relative humidity from dew point temperature and temperature
ET0['R_s'] = pixel_dataset.ssrd.to_dataframe()['ssrd'].values/1e6 # to convert downward total radiation from J/m² to MJ/m²
ET0.drop(columns = ['Dew_Point_T_max', 'Dew_Point_T_min'], inplace = True) # drop unecessary columns
Jeremy Auclair
committed
Jeremy Auclair
committed
# Start ET0 calculation
eto_calc = eto.ETo()
warnings.filterwarnings('ignore') # remove pandas warning
# ET0 calculation for given pixel (lat, lon) values
eto_calc.param_est(ET0,
freq = 'D', # daily frequence
# Elevation of the met station above mean sea level (m) (only needed if P is not in df).
z_msl = 0.,
lat = lat,
lon = lon,
TZ_lon = None,
z_u = h) # h: height of raw wind speed measurement
# Retrieve ET0 values
Jeremy Auclair
committed
ET0_values = eto_calc.eto_fao(max_ETo = 15, min_ETo = 0, interp = True, maxgap = 10).values # ETo_FAO_mm
Jeremy Auclair
committed
return ET0_values
Jeremy Auclair
committed
def convert_interleave_mode(args: Tuple[str, str, bool]) -> None:
"""
Convert Geotiff files obtained from OTB to Band interleave mode for faster band reading.
Arguments
=========
(packed in args: ``tuple``)
1. input_image: ``str``
Jeremy Auclair
committed
input image to convert
2. output_image: ``str``
Jeremy Auclair
committed
output image to save
3. remove: ``bool`` ``default = True``
Jeremy Auclair
committed
weather to remove input image
Returns
=======
``None``
Jeremy Auclair
committed
"""
input_image, output_image, remove = args
# Open the input file in read mode
with rio.open(input_image, "r") as src:
# Open the output file in write mode
with rio.open(output_image, 'w', driver = src.driver, height = src.height, width = src.width, count = src.count, dtype = src.dtypes[0], crs = src.crs, transform = src.transform, interleave = 'BAND',) as dst:
# Loop over the blocks or windows of the input file
for _, window in src.block_windows(1):
# Write the data to the output file
dst.write(src.read(window = window), window = window)
# Remove unecessary image
if remove:
os.remove(input_image)
return None
Jeremy Auclair
committed
def era5Land_daily_to_yearly_pixel(list_era5land_files: List[str], output_file: str, raw_S2_image_ref: str, ndvi_path: str, start_date: str, end_date: str, h: float = 10, max_ram: int = 8, weather_overwrite: bool = False) -> str:
Jeremy Auclair
committed
"""
Calculate ET0 values from the ERA5 netcdf weather variables.
Output netcdf contains the ET0 and precipitation values for
each day in the selected time period and reprojected
(reprojection run on two processors) on the same grid as the
NDVI values.
Arguments
=========
Jeremy Auclair
committed
1. list_era5land_files: ``List[str]``
Jeremy Auclair
committed
list of netcdf files containing the necessary variables
2. output_file: ``str``
3. raw_S2_image_ref: ``str``
raw Sentinel 2 image at right resolution for reprojection
4. ndvi_path: ``str``
Jeremy Auclair
committed
path to ndvi dataset, used for attributes and coordinates
Jeremy Auclair
committed
5. start_date: ``str``
beginning of the time window to download (format: ``YYYY-MM-DD``)
6. end_date: ``str``
end of the time window to download (format: ``YYYY-MM-DD``)
7. h: ``float`` ``default = 10``
Jeremy Auclair
committed
height of ERA5 wind measurements in meters
Jeremy Auclair
committed
8. max_ram: ``int`` ``default = 8``
Jeremy Auclair
committed
max ram (in GiB) for reprojection and conversion. Two
subprocesses are spawned for OTB, each receiviving
half of requested memory.
Jeremy Auclair
committed
9. weather_overwrite: ``bool`` ``default = False``
Jeremy Auclair
committed
boolean to choose to overwrite weather netCDF
Jeremy Auclair
committed
Returns
=======
Jeremy Auclair
committed
1. output_file_final: ``str``
path to ``netCDF4`` file containing precipitation and ET0 data
Jeremy Auclair
committed
"""
Jeremy Auclair
committed
# Test if file exists
if os.path.exists(output_file + '.nc') and not weather_overwrite:
return output_file + '.nc'
Jeremy Auclair
committed
# Test if memory requirement is not loo large
if np.ceil(virtual_memory().available / (1024**3)) < max_ram:
print('\nRequested', max_ram, 'GiB of memory when available memory is approximately', round(virtual_memory().available / (1024**3), 1), 'GiB.\n\nExiting script.\n')
return None
Jeremy Auclair
committed
# Load all monthly files into a single xarray dataset that contains all dates (daily frequency)
raw_weather_ds = None
for file in list_era5land_files:
if not raw_weather_ds:
raw_weather_ds = load_variable(file)
else:
temp = load_variable(file)
raw_weather_ds = xr.merge([temp, raw_weather_ds])
del temp
Jeremy Auclair
committed
# Clip extra dates
raw_weather_ds = raw_weather_ds.sel({'time': slice(start_date, end_date)}).sortby(variables = 'time')
Jeremy Auclair
committed
# Create ET0 variable (that will be saved) and set attributes
Jeremy Auclair
committed
raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = 'float64')))
Jeremy Auclair
committed
# Loop on lattitude and longitude coordinates to calculate ET0 per "pixel"
Jeremy Auclair
committed
# Fast enough for small datasets (low resolution)
Jeremy Auclair
committed
for lat in raw_weather_ds.coords['lat'].values:
for lon in raw_weather_ds.coords['lon'].values:
# Select whole time period for given (lat, lon) values
select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon'])
# Calculate ET0 values for given pixel
ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h)
# Write ET0 values in xarray Dataset
raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values
# Get necessary data for final dataset and rewrite netcdf attributes
final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min', 'd2m_max', 'd2m_min']) # remove unwanted variables
final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm
# Change datatype to reduce memory usage
Jeremy Auclair
committed
final_weather_ds['tp'] = (final_weather_ds['tp'] * 1000).astype('u2').chunk(chunks = {"time": 1})
final_weather_ds['ET0'] = (final_weather_ds['ET0'] * 1000).astype('u2').chunk(chunks = {"time": 1})
# Write projection
final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326')
Jeremy Auclair
committed
# Set variable attributes
final_weather_ds['ET0'].attrs['units'] = 'mm'
final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
Jeremy Auclair
committed
final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method (scale factor = 1000)'
Jeremy Auclair
committed
final_weather_ds['tp'].attrs['units'] = 'mm'
final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
Jeremy Auclair
committed
final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters (scale factor = 1000)'
Jeremy Auclair
committed
# Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
output_file_rain = output_file + '_rain.tif'
output_file_ET0 = output_file + '_ET0.tif'
Jeremy Auclair
committed
final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16')
final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
output_file_rain_reproj = output_file + '_rain_reproj.tif'
output_file_ET0_reproj = output_file + '_ET0_reproj.tif'
Jeremy Auclair
committed
# Converted image paths
Jeremy Auclair
committed
output_file_final = output_file + '.nc'
Jeremy Auclair
committed
# otbcli_SuperImpose commands
Jeremy Auclair
committed
OTB_command_reproj1 = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_rain + ' -out ' + output_file_rain_reproj + ' uint16 -interpolator linear -ram ' + str(int(max_ram * 1024/2))
OTB_command_reproj2 = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_ET0 + ' -out ' + output_file_ET0_reproj + ' uint16 -interpolator linear -ram ' + str(int(max_ram * 1024/2))
Jeremy Auclair
committed
commands_reproj = [OTB_command_reproj1, OTB_command_reproj2]
Jeremy Auclair
committed
with Pool(2) as p:
p.map(os.system, commands_reproj)
Jeremy Auclair
committed
# Combine to netCDF file
combine_weather2netcdf(output_file_rain_reproj, output_file_ET0_reproj, ndvi_path, output_file_final, available_ram = max_ram)
Jeremy Auclair
committed
Jeremy Auclair
committed
os.remove(output_file_rain_reproj)
os.remove(output_file_ET0_reproj)
Jeremy Auclair
committed
return output_file_final
Jeremy Auclair
committed
Jeremy Auclair
committed
def era5Land_daily_to_yearly_parcel(list_era5land_files: List[str], output_file: str, start_date: str, end_date: str, h: float = 10) -> str:
Jeremy Auclair
committed
"""
Calculate ET0 values from the ERA5 netcdf weather variables.
Output netcdf contains the ET0 and precipitation values for
each day in the selected time period.
Arguments
=========
1. list_era5land_files: ``List[str]``
list of netcdf files containing the necessary variables
2. output_file: ``str``
output file name without extension
Jeremy Auclair
committed
3. start_date: ``str``
beginning of the time window to download (format: ``YYYY-MM-DD``)
4. end_date: ``str``
end of the time window to download (format: ``YYYY-MM-DD``)
5. h: ``float`` ``default = 10``
Jeremy Auclair
committed
height of ERA5 wind measurements in meters
Returns
=======
1. output_file_rain: ``str``
path to ``Geotiff`` file containing precipitation data
2. output_file_ET0: ``str``
path to ``Geotiff`` file containing ET0 data
"""
# Load all monthly files into a single xarray dataset that contains all dates (daily frequency)
raw_weather_ds = None
for file in list_era5land_files:
if not raw_weather_ds:
raw_weather_ds = load_variable(file)
else:
temp = load_variable(file)
raw_weather_ds = xr.merge([temp, raw_weather_ds])
del temp
Jeremy Auclair
committed
# Clip extra dates
raw_weather_ds = raw_weather_ds.sel({'time': slice(start_date, end_date)})
Jeremy Auclair
committed
# Create ET0 variable (that will be saved) and set attributes
raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = 'float64')))
# Loop on lattitude and longitude coordinates to calculate ET0 per "pixel"
for lat in raw_weather_ds.coords['lat'].values:
for lon in raw_weather_ds.coords['lon'].values:
# Select whole time period for given (lat, lon) values
select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon'])
# Calculate ET0 values for given pixel
ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h)
# Write ET0 values in xarray Dataset
raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values
# Get necessary data for final dataset and rewrite netcdf attributes
final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min', 'd2m_max', 'd2m_min']) # remove unwanted variables
final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm
# Change datatype to reduce memory usage
Jeremy Auclair
committed
final_weather_ds['tp'] = (final_weather_ds['tp'] * 1000).astype('u2').chunk(chunks = {"time": 1})
final_weather_ds['ET0'] = (final_weather_ds['ET0'] * 1000).astype('u2').chunk(chunks = {"time": 1})
Jeremy Auclair
committed
# Write projection
final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326')
# Set variable attributes
final_weather_ds['ET0'].attrs['units'] = 'mm'
final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method (scale factor = 1000)'
final_weather_ds['tp'].attrs['units'] = 'mm'
final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
Jeremy Auclair
committed
final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in milimeters (scale factor = 1000)'
Jeremy Auclair
committed
# Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
output_file_rain = output_file + '_rain.tif'
output_file_ET0 = output_file + '_ET0.tif'
final_weather_ds.tp.rio.to_raster(output_file_rain, dtype = 'uint16')
final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
return output_file_rain, output_file_ET0
Jeremy Auclair
committed
def extract_rasterstats(args: tuple) -> List[float]:
Jeremy Auclair
committed
"""
Jeremy Auclair
committed
Generate a dataframe for a given raster and a geopandas shapefile object.
It iterates over the features of the shapefile geometry (polygons). This
information is stored in a list.
Jeremy Auclair
committed
Jeremy Auclair
committed
It returns a list that contains the raster values, a feature ``id``
Jeremy Auclair
committed
and the date for the image and every polygon in the shapefile geometry.
It also has identification data relative to the shapefile: landcover (``LC``),
land cover identifier (``id``) This list is returned to be later agregated
in a ``DataFrame``.
This function is used to allow multiprocessing for weather extraction.
Arguments (packed in args: ``tuple``)
=====================================
Jeremy Auclair
committed
1. raster_path: ``str``
Jeremy Auclair
committed
path to multiband Geotiff
Jeremy Auclair
committed
2. shapefile: ``str``
Jeremy Auclair
committed
path to shapefile
Jeremy Auclair
committed
3. config_file: ``str``
Jeremy Auclair
committed
path to config file
Returns
=======
Jeremy Auclair
committed
1. raster_stats: ``List[float]``
Jeremy Auclair
committed
list containing weather values and feature information for every
polygon in the shapefile
"""
# Open arguments packed in args
Jeremy Auclair
committed
raster_path, shapefile, config_file = args
Jeremy Auclair
committed
# Open config file
config_params = config(config_file)
# Create dataframe where zonal statistics will be stored
Jeremy Auclair
committed
raster_stats = []
Jeremy Auclair
committed
# Get dates
dates = pd.to_datetime(pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')).values
# Open ndvi image and shapefile geometry
Jeremy Auclair
committed
raster_dataset = rio.open(raster_path)
Jeremy Auclair
committed
# Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
Jeremy Auclair
committed
target_epsg = raster_dataset.crs
Jeremy Auclair
committed
# Open shapefile with geopandas and reproject its geometry
shapefile = gpd.read_file(shapefile)
shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
# Get no data value
Jeremy Auclair
committed
nodata = raster_dataset.nodata
Jeremy Auclair
committed
# Get the number of bands
Jeremy Auclair
committed
nbands = raster_dataset.count
Jeremy Auclair
committed
# Create progress bar
progress_bar = tqdm(total = len(shapefile.index), desc='Extracting polygon values', unit=' polygons')
# Loop on the individual polygons in the shapefile geometry
for index, row in shapefile.iterrows():
# Get the feature geometry as a shapely object
geom = row.geometry
# id number of the current parcel geometry
id = index + 1
# Get land cover
LC = row.LC
# Create a bounding box around the geometry
bbox = box(*geom.bounds)
# Crop the raster using the bounding box
try:
Jeremy Auclair
committed
cropped_raster, _ = mask(raster_dataset, [bbox], crop = True, all_touched = True)
Jeremy Auclair
committed
except:
print('\nShapefile bounds are not contained in weather dataset bounds.\n\nExiting script.')
return None
# Mask the raster using the geometry
Jeremy Auclair
committed
masked_raster, _ = mask(raster_dataset, [geom], crop = True, all_touched = True)
Jeremy Auclair
committed
# Replace the nodata values with nan
cropped_raster = cropped_raster.astype(np.float32)
cropped_raster[cropped_raster == nodata] = np.NaN
masked_raster = masked_raster.astype(np.float32)
masked_raster[masked_raster == nodata] = np.NaN
# Calculate the zonal statistics
Jeremy Auclair
committed
raster_stats.extend([[dates[i], id, np.nanmean(masked_raster[i]), LC] for i in range(nbands)])
Jeremy Auclair
committed
# Update progress bar
progress_bar.update(1)
# Close dataset and progress bar
Jeremy Auclair
committed
raster_dataset.close()
Jeremy Auclair
committed
progress_bar.close()
Jeremy Auclair
committed
return raster_stats
Jeremy Auclair
committed
def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, config_file: str, save_path: str) -> None:
"""
Extract a weather dataframe for each variable (Rain, ET0) and merge them in one
dataframe. This dataframe is saved as ``csv`` file.
Arguments
=========
Jeremy Auclair
committed
path to rain Geotiff file
Jeremy Auclair
committed
path to ET0 Geotiff file
Jeremy Auclair
committed
path to shapefile
Jeremy Auclair
committed
path to config file
Jeremy Auclair
committed
save path for weather dataframe
Returns
=======
``None``
"""