#!/usr/bin/env python
"""
Functions to call ECMWF reanalysis data with the CDS API
- ERA5-land hourly request
- ERA5-land daily request
- request a list of hourly variables dedicated to the calculation of ET0
and the generation of MODSPA daily forcing files
@author: rivalland
"""
import os, shutil # for path exploration and file management
from typing import List # to declare variables
import numpy as np # for math on arrays
import xarray as xr # to manage nc files
from datetime import datetime # to manage dates
from p_tqdm import p_map # for multiprocessing with progress bars
from dateutil.rrule import rrule, MONTHLY
from fnmatch import fnmatch # for file name matching
import rasterio # to manage geotiff images
from pandas import date_range
from rasterio.warp import reproject, Resampling # to reproject
from dask.diagnostics import ProgressBar
import rioxarray # to georeference xarray datasets with the .rio accessor
import re # for string comparison
import warnings # to suppress pandas warning
# CDS API external library
# source: https://pypi.org/project/cdsapi/
import cdsapi # to download cds data
import requests # to request data
# FAO ET0 calculator external library
# Notes
# source: https://github.com/Evapotranspiration/ETo
# documentation: https://eto.readthedocs.io/en/latest/
import eto # to calculate ET0
def era5_enclosing_shp_aera(area: List[float], pas: float) -> tuple:
"""
Find the four coordinates including the boxbound scene
to agree with gridsize resolution
system projection: WGS84 lat/lon degree
ARGS:
area: [lat north, lon west, lat south, lon east] in degree WGS84, float
pas: gridsize
RETURN:
-coordinates list corresponding to N,W,S,E corners of the grid in decimal degree
Note: gdal coordinates reference upper left corner of pixel
ERA5 coordinates refere to center of grid
To resolve this difference an offset of pas/2 is apply
"""
lat_max, lon_min, lat_min, lon_max = area
# North
era5_lat_max = round((lat_max//pas+1)*pas, 2)
# West
era5_lon_min = round((lon_min//pas)*pas, 2)
# South
era5_lat_min = round((lat_min//pas)*pas, 2)
# East
era5_lon_max = round((lon_max//pas+1)*pas, 2)
era5_area = era5_lat_max, era5_lon_min, era5_lat_min, era5_lon_max
return era5_area # [N,W,S,E]
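# Example (illustrative, hypothetical coordinates): snapping a small area of interest
# to the 0.1° ERA5-land grid.
# era5_enclosing_shp_aera([43.72, 1.15, 43.45, 1.61], 0.1)
# returns (43.8, 1.1, 43.4, 1.7), the [N, W, S, E] corners of the enclosing 0.1° grid.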
def call_era5land_daily(args: tuple) -> None:
"""
Query one month of daily ERA5-land data for a selected variable
and a selected daily statistic.
Documentation:
https://datastore.copernicus-climate.eu/documents/app-c3s-daily-era5-statistics/C3S_Application-Documentation_ERA5-daily-statistics-v2.pdf
Arguments
----------
year : TYPE str
year at YYYY format.
month : TYPE str
month at MM format.
variable : TYPE str
user-selectable variable
cf. Appendix A Table 3 for the list of available input variables.
statistic : TYPE str
daily statistic chosen, 3 possibilities:
daily_mean, daily_minimum or daily_maximum.
area : TYPE list of 4 int
area = [lat_max, lon_min, lat_min, lon_max]
output_path : TYPE str
path for output file.
Returns
-------
Netcdf File.
"""
year, month, variable, statistic, area, output_path = args
# set name of output file for each month (statistic, variable, year, month)
output_filename = \
output_path+os.sep +\
"ERA5-land_"+year+"_"+month+"_"+variable+"_"+statistic+".nc"
if os.path.isfile(output_filename):
print(output_filename, ' already exists')
else:
try:
c = cdsapi.Client(timeout=300)
result = c.service("tool.toolbox.orchestrator.workflow",
params={
"realm": "c3s",
"project": "app-c3s-daily-era5-statistics",
"version": "master",
"kwargs": {
"dataset": "reanalysis-era5-land",
"product_type": "reanalysis",
"variable": variable,
"statistic": statistic,
"year": year,
"month": month,
"time_zone": "UTC+00:0",
"frequency": "1-hourly",
"grid": "0.1/0.1",
"area": {"lat": [area[2], area[0]],
"lon": [area[1], area[3]]}
},
"workflow_name": "application"
})
location = result[0]['location']
res = requests.get(location, stream=True)
print("Writing data to " + output_filename)
with open(output_filename, 'wb') as fh:
for r in res.iter_content(chunk_size=1024):
fh.write(r)
except Exception:
print('!! request', variable, 'failed !! -> year', year, 'month', month)
return None
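# Example (illustrative, hypothetical path): the function is designed to be mapped over
# argument tuples of the form (year, month, variable, statistic, area, output_path), e.g.
# call_era5land_daily(('2021', '05', '2m_temperature', 'daily_maximum',
#                      [43.8, 1.1, 43.4, 1.7], '/path/to/era5_downloads'))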
def call_era5land_daily_for_MODSPA(start_date: str, end_date: str, area: List[float], output_path: str, processes: int = 9) -> None:
"""
request ERA5-land daily variables needed for ET0 calculation and MODSPA forcing
https://cds.climate.copernicus.eu/cdsapp#!/dataset/reanalysis-era5-land?tab=overview
ARGS:
start_date: YYYY-MM-DD string
end_date: YYYY-MM-DD string
area: [north, west, south, east] in degrees WGS84, float
output_path: output directory for the downloaded monthly files, string
processes: number of logical processors to use for parallel downloads
INFO:
requested land surface variables:
'2m_temperature' (daily_minimum, daily_maximum),
'2m_dewpoint_temperature' (daily_minimum, daily_maximum),
'surface_solar_radiation_downwards' (daily_mean),
'total_precipitation' (daily_mean),
'10m_u_component_of_wind' (daily_mean),
'10m_v_component_of_wind' (daily_mean)
Arguments
----------
start_date : TYPE str
start date in YYYY-MM-DD format.
end_date : TYPE str
end date in YYYY-MM-DD format.
area : TYPE list of 4 float
area = [lat_max, lon_min, lat_min, lon_max]
output_path : TYPE str
path to the output directory.
Returns
-------
None.
"""
# list of first day of each month date into period
strt_dt = datetime.strptime(start_date, '%Y-%m-%d').replace(day=1)
end_dt = datetime.strptime(end_date, '%Y-%m-%d').replace(day=1)
periods = [dt for dt in rrule(
freq=MONTHLY, dtstart=strt_dt, until=end_dt, bymonthday=1)]
dico = {
'2m_temperature': ['daily_minimum', 'daily_maximum'],
'10m_u_component_of_wind': ['daily_mean'],
'10m_v_component_of_wind': ['daily_mean'],
'total_precipitation': ['daily_mean'],
'surface_solar_radiation_downwards': ['daily_mean'],
'2m_dewpoint_temperature': ['daily_minimum', 'daily_maximum']
}
args = []
# loop on variable to upload
for variable in dico.keys():
# loop on statistic associated to variable to upload
for statistic in dico[variable]:
# loop on year and month
for dt in periods:
year = str(dt.year)
month = str(dt.month).zfill(2)
# ERA5-land request
args.append((year, month, variable, statistic, area, output_path))
# Start pool
p_map(call_era5land_daily, args, **{"num_cpus": processes})
return None
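# Example usage (illustrative, hypothetical path and dates):
# call_era5land_daily_for_MODSPA('2021-01-01', '2021-12-31',
#                                [43.8, 1.1, 43.4, 1.7],
#                                '/path/to/era5_downloads', processes = 4)
# downloads one netcdf file per month, variable and statistic into the output directory.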
def filename_to_datetime(filename: str) -> datetime.date:
"""
filename_to_datetime returns a `datetime.date` object for the date of the given file name.
## Arguments
1. filename: `str `
name or path of the product
## Returns
1. date: `datetime.date`
datetime.date object, date of the product
"""
# Search for a date pattern (yyyy_mm) in the product name or path
match = re.search(r'\d{4}_\d{2}', filename)
date_format = '%Y_%m'
datetime_object = datetime.strptime(match[0], date_format)
return datetime_object.date()
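# Example (hypothetical file name following the naming convention used above):
# filename_to_datetime('ERA5-land_2021_05_2m_temperature_daily_mean.nc')
# returns datetime.date(2021, 5, 1).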
def concat_monthly_nc_file(list_era5land_monthly_files: List[str], list_variables: List[str], output_path: str) -> List[str]:
"""
Concatenate monthly netcdf datasets into a single file for each given variable.
## Arguments
1. list_era5land_monthly_files: `List[str]`
list of daily files per month
2. list_variables: `List[str]`
names of the required variables as written in the filename
3. output_path: `str`
path where the aggregated files are saved
## Returns
1. list_era5land_files: `List[str]`
the list of paths to the aggregated files
"""
if not os.path.exists(output_path): os.mkdir(output_path)
list_era5land_monthly_files.sort()
list_era5land_files = []
# concatenate all dates into a single file for each variable
for variable in list_variables:
curr_var_list = []
dates = []
for file in list_era5land_monthly_files:
# find specific variable
if fnmatch(file, '*' + variable + '*'):
curr_var_list.append(file)
dates.append(filename_to_datetime(file))
curr_datasets = []
for file in curr_var_list:
# open all months for the given variable
curr_datasets.append(xr.open_dataset(file))
# Create file name
try:
concatenated_file = output_path + os.sep + 'era5-land_' + dates[0].strftime('%m-%Y') + '_' + dates[-1].strftime('%m-%Y') + '_' + variable + '.nc'
except IndexError:
print('No monthly file found for variable', variable)
continue
# Concatenate monthly datasets
concatenated_dataset = xr.concat(curr_datasets, dim = 'time')
# Save datasets
concatenated_dataset.to_netcdf(path = concatenated_file, mode = 'w',)
# Add filename to output list
list_era5land_files.append(concatenated_file)
return list_era5land_files
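# Example usage (illustrative, hypothetical paths):
# concat_monthly_nc_file(list_of_monthly_files,
#                        ['2m_temperature_daily_maximum', 'total_precipitation_daily_mean'],
#                        '/path/to/era5_concatenated')
# returns one aggregated netcdf path per variable, covering the whole period.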
def uz_to_u2(u_z: List[float], h: float) -> List[float]:
"""
The wind speed measured at a height other than 2 m can be adjusted to the
standard 2 m height with the following wind profile relationship (FAO-56, Eq. 47):
u2 = u_z * 4.87 / ln(67.8 * h - 5.42)
Arguments
----------
u_z : TYPE float array
measured wind speed z m above the ground surface, m s-1.
h : TYPE float
height of the measurement above the ground surface, m.
Returns
-------
u2 : TYPE float array
average daily wind speed measured at 2 m above the ground, m s-1.
"""
u2 = u_z*4.87/(np.log(67.8*h - 5.42))
return u2
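# Worked example (values rounded): for a 10 m wind speed of 3.2 m/s,
# u2 = 3.2 * 4.87 / ln(67.8 * 10 - 5.42) ≈ 2.39 m/s.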
def ea_calc(T: float) -> float:
"""
comments
Actual vapour pressure (ea) derived from dewpoint temperature '
Arguments
----------
T : Temperature in degree celsius.
Returns
-------
e_a :the actual Vapour pressure in Kpa
"""
e_a = 0.6108*np.exp(17.27*T/(T+237.15))
return e_a
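# Worked example: at T = 20 °C, ea_calc(20.) ≈ 0.6108 * exp(17.27 * 20 / 257.3)
# ≈ 2.34 kPa, matching the saturation vapour pressure tabulated in FAO-56.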
def load_variable(file_name: str) -> xr.Dataset:
"""
Loads an ERA5 meteorological variable into an xarray
dataset according to the MODSPA architecture.
## Arguments
1. file_name: `str`
netcdf file to load
## Returns
1. variable: `xr.Dataset`
output xarray dataset
"""
# Rename temperature variables according to the statistic (max or min)
if fnmatch(file_name, '*era5-land*2m_temperature_daily_maximum*'): # maximum temperature
variable = xr.open_dataset(file_name).rename({'t2m': 't2m_max'}).drop_vars('realization') # netcdfs from ERA5 carry an unnecessary 'realization' coordinate, so it is dropped
elif fnmatch(file_name, '*era5-land*2m_temperature_daily_minimum*'): # minimum temperature
variable = xr.open_dataset(file_name).rename({'t2m': 't2m_min'}).drop_vars('realization')
elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_maximum*'): # maximum dewpoint temperature
variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_max'}).drop_vars('realization')
elif fnmatch(file_name, '*era5-land*2m_dewpoint_temperature_daily_minimum*'): # minimum dewpoint temperature
variable = xr.open_dataset(file_name).rename({'d2m': 'd2m_min'}).drop_vars('realization')
# Other variables can be loaded without modification
else:
variable = xr.open_dataset(file_name).drop_vars('realization')
return variable
def calculate_ET0_pixel(pixel_dataset: xr.Dataset, lat: float, lon: float, h: float = 10) -> np.ndarray:
"""
Calculate ET0 over the year for a single pixel of the ERA5 weather dataset.
## Arguments
1. pixel_dataset: `xr.Dataset`
extracted dataset that contains all information for a single pixel
2. lat: `float`
latitudinal coordinate of that pixel
3. lon: `float`
longitudinal coordinate of that pixel
4. h: `float` `default = 10`
height of ERA5 wind measurement in meters
## Returns
1. ET0_values: `np.ndarray`
numpy array containing the ET0 values for each day
"""
# Conversion of xarray dataset to dataframe for ET0 calculation
ET0 = pixel_dataset.d2m_max.to_dataframe().rename(columns = {'d2m_max' : 'Dew_Point_T_max'}) - 273.15 # conversion of temperatures from K to °C
ET0['Dew_Point_T_min'] = pixel_dataset.d2m_min.to_dataframe()['d2m_min'].values - 273.15 # conversion of temperatures from K to °C
ET0['T_min'] = pixel_dataset.t2m_min.to_dataframe()['t2m_min'].values - 273.15 # conversion of temperatures from K to °C
ET0['T_max'] = pixel_dataset.t2m_max.to_dataframe()['t2m_max'].values - 273.15 # conversion of temperatures from K to °C
ET0['Rain'] = pixel_dataset.tp.to_dataframe()['tp'].values*1000 # conversion of total precipitation from meters to millimeters
# Conversion of eastward and northward wind components to scalar wind speed
ET0['U_z'] = np.sqrt(pixel_dataset.u10.to_dataframe()['u10'].values**2 + pixel_dataset.v10.to_dataframe()['v10'].values**2)
ET0['RH_max'] = 100 * ea_calc(ET0['Dew_Point_T_min']) / ea_calc(ET0['T_min']) # calculation of relative humidity from dew point temperature and temperature
ET0['RH_min'] = 100 * ea_calc(ET0['Dew_Point_T_max']) / ea_calc(ET0['T_max']) # calculation of relative humidity from dew point temperature and temperature
ET0['R_s'] = pixel_dataset.ssrd.to_dataframe()['ssrd'].values/1e6 # to convert downward total radiation from J/m² to MJ/m²
ET0.drop(columns = ['Dew_Point_T_max', 'Dew_Point_T_min'], inplace = True) # drop unnecessary columns
# Start ET0 calculation
eto_calc = eto.ETo()
warnings.filterwarnings('ignore') # remove pandas warning
# ET0 calculation for given pixel (lat, lon) values
eto_calc.param_est(ET0,
freq = 'D', # daily frequency
# Elevation of the met station above mean sea level (m) (only needed if P is not in df).
z_msl = 0.,
lat = lat,
lon = lon,
TZ_lon = None,
z_u = h) # h: height of raw wind speed measurement
# Retrieve ET0 values
ET0_values = eto_calc.eto_fao(max_ETo=15, min_ETo=0, interp=True, maxgap=10).values # ETo_FAO_mm
return ET0_values
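# Example (illustrative): extracting a single pixel from a merged ERA5-land dataset 'ds'
# (built with load_variable) and computing its daily ET0 series:
# pixel = ds.sel({'lat': 43.5, 'lon': 1.4}).drop_vars(['lat', 'lon'])
# et0_series = calculate_ET0_pixel(pixel, 43.5, 1.4, h = 10)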
def reproject_geotiff(source_image: str, destination_image: str, destination_crs: str) -> None:
"""
Reproject a GeoTIFF image into a new coordinate reference system.
Arguments: source_image (path to the input GeoTIFF), destination_image (path to the
output GeoTIFF), destination_crs (target CRS, e.g. 'EPSG:32631').
"""
# Open the original GeoTIFF file
with rasterio.open(source_image) as src:
# Get the source CRS and transform
src_crs = src.crs
src_transform = src.transform
# Read the data as a numpy array
source = src.read()
# Calculate the destination transform and shape based on the new CRS
dst_transform, dst_width, dst_height = rasterio.warp.calculate_default_transform(
src_crs, destination_crs, src.width, src.height, *src.bounds)
# Create an empty numpy array for the destination, keeping the source datatype
destination = np.zeros((src.count, dst_height, dst_width), dtype = src.dtypes[0])
# Reproject the source to the destination
reproject(
source,
destination,
src_transform=src_transform,
src_crs=src_crs,
dst_transform=dst_transform,
dst_crs=destination_crs,
resampling=Resampling.nearest)
# Update the metadata with the new CRS, transform and shape
dst_meta = src.meta.copy()
dst_meta.update(
crs=destination_crs,
transform=dst_transform,
width=dst_width,
height=dst_height)
# Save the reprojected data as a new GeoTIFF file
with rasterio.open(destination_image, "w", **dst_meta) as dst:
# Write the reprojected data to the file
dst.write(destination)
return None
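# Example usage (illustrative, hypothetical paths):
# reproject_geotiff('/path/to/ET0_wgs84.tif', '/path/to/ET0_utm.tif', 'EPSG:32631')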
def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_file: str, raw_S2_image_ref: str, h: float = 10, max_ram: int = 12288) -> None:
"""
Calculate ET0 values from the ERA5 netcdf weather variables.
Output netcdf contains the ET0 values for each day in the selected
time period and reprojected on the same grid as the NDVI values.
## Arguments
1. list_era5land_files: `List[str]`
list of netcdf files containing the necessary variables
2. output_file: `str`
output file name without extension
3. raw_S2_image_ref: `str`
raw Sentinel 2 image at right resolution for reprojection
4. h: `float` `default = 10`
height of ERA5 wind measurements in meters
5. max_ram: `int` `default = 12288`
max ram (in MiB) to give to OTB
## Returns
`None`
"""
# Load all monthly files into a single xarray dataset that contains all dates (daily frequency)
raw_weather_ds = None
for file in list_era5land_files:
if raw_weather_ds is None:
raw_weather_ds = load_variable(file)
else:
temp = load_variable(file)
raw_weather_ds = xr.merge([temp, raw_weather_ds])
del temp
# Create ET0 variable (that will be saved) and set attributes
raw_weather_ds = raw_weather_ds.assign(ET0 = (raw_weather_ds.dims, np.zeros(tuple(raw_weather_ds.dims[d] for d in list(raw_weather_ds.dims)), dtype = 'float32')))
# Loop on latitude and longitude coordinates to calculate ET0 per "pixel"
for lat in raw_weather_ds.coords['lat'].values:
for lon in raw_weather_ds.coords['lon'].values:
# Select whole time period for given (lat, lon) values
select_ds = raw_weather_ds.sel({'lat' : lat, 'lon' : lon}).drop_vars(['lat', 'lon'])
# Calculate ET0 values for given pixel
ET0_values = calculate_ET0_pixel(select_ds, lat, lon, h)
# Write ET0 values in xarray Dataset
raw_weather_ds['ET0'].loc[{'lat' : lat, 'lon' : lon}] = ET0_values
# Get necessary data for final dataset and rewrite netcdf attributes
final_weather_ds = raw_weather_ds.drop_vars(names = ['ssrd', 'v10', 'u10', 't2m_max', 't2m_min', 'd2m_max', 'd2m_min']) # remove unwanted variables
final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm
# Change datatype to reduce memory usage
final_weather_ds = (final_weather_ds * 1000).astype('u2')
# Write projection
final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326')
# Set variable attributes
final_weather_ds['ET0'].attrs['units'] = 'mm'
final_weather_ds['ET0'].attrs['standard_name'] = 'Potential evapotranspiration'
final_weather_ds['ET0'].attrs['comment'] = 'Potential evapotranspiration accumulated over the day, calculated with the FAO-56 method'
final_weather_ds['ET0'].attrs['scale factor'] = '1000'
final_weather_ds['tp'].attrs['units'] = 'mm'
final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in millimeters'
final_weather_ds['tp'].attrs['scale factor'] = '1000'
# Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
output_file_prec = output_file + '_prec.tif'
output_file_ET0 = output_file + '_ET0.tif'
final_weather_ds.tp.rio.to_raster(output_file_prec, dtype = 'uint16')
final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
# Reprojected image paths
output_file_prec_reproj = output_file + '_prec_reproj.tif'
output_file_ET0_reproj = output_file + '_ET0_reproj.tif'
# Run OTB Superimpose to reproject and resample the weather rasters onto the Sentinel-2 grid
OTB_command = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_prec + ' -out ' + output_file_prec_reproj + ' uint16 -interpolator linear -ram ' + str(max_ram)
os.system(OTB_command)
OTB_command = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_ET0 + ' -out ' + output_file_ET0_reproj + ' uint16 -interpolator linear -ram ' + str(max_ram)
os.system(OTB_command)
# remove old files and rename outputs
os.remove(output_file_prec)
shutil.move(output_file_prec_reproj, output_file_prec)
os.remove(output_file_ET0)
shutil.move(output_file_ET0_reproj, output_file_ET0)
return None
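# Example usage (illustrative, hypothetical paths): building the MODSPA weather forcing
# from previously concatenated ERA5-land files.
# era5Land_nc_daily_to_ET0(monthly_concatenated_files, '/path/to/output/era5_2021',
#                          '/path/to/S2_NDVI_reference.tif', h = 10)
# writes '/path/to/output/era5_2021_prec.tif' and '/path/to/output/era5_2021_ET0.tif',
# both reprojected on the Sentinel-2 grid.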