Commit ba969ba9 authored by Jeremy Auclair's avatar Jeremy Auclair

input management almost finished

parent 9fff7850
tests.py
test_samir.py
test_numpy_xarray.py
*__pycache__*
*config_modspa.json
dl_S2.csv
......
%% Cell type:code id: tags:
``` python
import xarray as xr
from dask.distributed import Client
import os
import numpy as np
from typing import List, Tuple, Union
import warnings
import gc
from parameters.params_samir_class import samir_parameters
from config.config import config
```
%% Cell type:code id: tags:
``` python
def rasterize_samir_parameters(csv_param_file: str, empty_dataset: xr.Dataset, land_cover_raster: str, chunk_size: dict) -> Tuple[xr.Dataset, dict]:
"""
Creates a raster `xarray` dataset from the csv parameter file, the land cover raster and an empty dataset
that contains the right structure (emptied ndvi dataset for example). For each parameter, the function loops
on land cover classes to fill the raster.
## Arguments
1. csv_param_file: `str`
path to csv parameter file
2. empty_dataset: `xr.Dataset`
empty dataset that contains the right structure (emptied ndvi dataset for example).
3. land_cover_raster: `str`
path to land cover netcdf raster
4. chunk_size: `dict`
chunk_size for dask computation
## Returns
1. parameter_dataset: `xr.Dataset`
the dataset containing all the rasterized Parameters
2. scale_factor: `dict`
dictionary containing the scale factors for each parameter
"""
# Load samir params into an object
table_param = samir_parameters(csv_param_file)
# Set general variables
class_count = table_param.table.shape[1] - 2 # remove dtype and default columns
# Open land cover raster
land_cover = xr.open_dataarray(land_cover_raster, chunks = chunk_size)
# Create dataset
parameter_dataset = empty_dataset.copy(deep = True)
# Create dictionary containing the scale factors
scale_factor = {}
# Loop over SAMIR parameters and rasterize each one
for parameter in table_param.table.index[1:]:
# Create new variable and set attributes
parameter_dataset[parameter] = land_cover.copy(deep = True).astype('f4')
parameter_dataset[parameter].attrs['name'] = parameter
parameter_dataset[parameter].attrs['description'] = 'cf SAMIR Doc for detail'
parameter_dataset[parameter].attrs['scale factor'] = str(table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0])
# Assign the scale factor value in the dictionary
scale_factor[parameter] = 1/int(table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0])
# Loop on classes to set parameter values for each class
for class_val, class_name in zip(range(1, class_count + 1), table_param.table.columns[2:]):
# Parameter values are multiplied by the scale factor so they can later be
# stored as int16, and rounded to avoid decimal-point issues during the cast
row = table_param.table.loc[table_param.table.index == parameter]
scaled_value = round(row[class_name].values[0] * row['scale_factor'].values[0])
parameter_dataset[parameter].values = np.where(parameter_dataset[parameter].values == class_val, scaled_value, parameter_dataset[parameter].values).astype('f4')
# Return the parameter dataset (values pre-scaled for a later int16 conversion)
# and the scale_factor dictionary for converting back to physical values
return parameter_dataset, scale_factor
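# Hedged example (illustration only; the 0.55 value and the 1000 scale factor
# are assumed): the scale-factor mechanism above stores a parameter as a scaled
# integer and recovers it by multiplying with 1 / scale_factor.
_stored = np.int16(round(0.55 * 1000)) # -> 550, safely representable as int16
_recovered = _stored * (1 / 1000) # -> 0.55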
def setup_time_loop(calculation_variables_t1: List[str], calculation_variables_t2: List[str], empty_dataset: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]:
"""
Creates two temporary `xarray Datasets` that will be used in the SAMIR time loop.
`variables_t1` corresponds to the variables for the previous day and `variables_t2`
corresponds to the variables for the current day. After each loop, `variables_t1`
takes the value of `variables_t2` for the corresponding variables.
## Arguments
1. calculation_variables_t1: `List[str]`
list of strings containing the variable names
for the previous day dataset
2. calculation_variables_t2: `List[str]`
list of strings containing the variable names
for the current day dataset
3. empty_dataset: `xr.Dataset`
empty dataset that contains the right structure
## Returns
1. variables_t1: `xr.Dataset`
output dataset for previous day
2. variables_t2: `xr.Dataset`
output dataset for current day
"""
# Create new dataset
variables_t1 = empty_dataset.copy(deep = True)
# Create empty DataArray for each variable
for variable in calculation_variables_t1:
# Assign new empty DataArray
variables_t1[variable] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'float32'))
variables_t1[variable].attrs['name'] = variable # set name in attributes
# Create new dataset
variables_t2 = empty_dataset.copy(deep = True)
# Create empty DataArray for each variable
for variable in calculation_variables_t2:
# Assign new empty DataArray
variables_t2[variable] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'float32'))
variables_t2[variable].attrs['name'] = variable # set name in attributes
return variables_t1, variables_t2
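# Minimal hedged sketch (hypothetical 2x2 grid): build the two time-loop
# datasets from a bare dataset; variables listed in both t1 and t2 ('Dr' here)
# are the ones carried over from one day to the next.
_empty = xr.Dataset(coords = {'x': [0.0, 1.0], 'y': [0.0, 1.0]})
_t1, _t2 = setup_time_loop(['Dr'], ['Dr', 'Zr'], _empty)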
def prepare_outputs(empty_dataset: xr.Dataset, additional_outputs: List[str] = None) -> xr.Dataset:
"""
Creates the `xarray Dataset` containing the outputs of the SAMIR model that will be saved.
Additional variables can be saved by adding their names to the `additional_outputs` list.
## Arguments
1. empty_dataset: `xr.Dataset`
empty dataset that contains the right structure
2. additional_outputs: `List[str]`
list of additional variable names to be saved
## Returns
1. model_outputs: `xr.Dataset`
model outputs to be saved
"""
# Evaporation and Transpiration
model_outputs = empty_dataset.copy(deep = True)
model_outputs['E'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
model_outputs['E'].attrs['units'] = 'mm'
model_outputs['E'].attrs['standard_name'] = 'Evaporation'
model_outputs['E'].attrs['description'] = 'Accumulated daily evaporation in millimeters'
model_outputs['E'].attrs['scale factor'] = '1'
model_outputs['Tr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
model_outputs['Tr'].attrs['units'] = 'mm'
model_outputs['Tr'].attrs['standard_name'] = 'Transpiration'
model_outputs['Tr'].attrs['description'] = 'Accumulated daily plant transpiration in millimeters'
model_outputs['Tr'].attrs['scale factor'] = '1'
# Soil Water Content
model_outputs['SWCe'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
model_outputs['SWCe'].attrs['units'] = 'mm'
model_outputs['SWCe'].attrs['standard_name'] = 'Soil Water Content of the evaporative zone'
model_outputs['SWCe'].attrs['description'] = 'Soil water content of the evaporative zone in millimeters'
model_outputs['SWCe'].attrs['scale factor'] = '1'
model_outputs['SWCr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
model_outputs['SWCr'].attrs['units'] = 'mm'
model_outputs['SWCr'].attrs['standard_name'] = 'Soil Water Content of the root zone'
model_outputs['SWCr'].attrs['description'] = 'Soil water content of the root zone in millimeters'
model_outputs['SWCr'].attrs['scale factor'] = '1'
# Irrigation
model_outputs['Irr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
model_outputs['Irr'].attrs['units'] = 'mm'
model_outputs['Irr'].attrs['standard_name'] = 'Irrigation'
model_outputs['Irr'].attrs['description'] = 'Simulated daily irrigation in millimeters'
model_outputs['Irr'].attrs['scale factor'] = '1'
# Deep Percolation
model_outputs['DP'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
model_outputs['DP'].attrs['units'] = 'mm'
model_outputs['DP'].attrs['standard_name'] = 'Deep Percolation'
model_outputs['DP'].attrs['description'] = 'Simulated daily Deep Percolation in millimeters'
model_outputs['DP'].attrs['scale factor'] = '1'
if additional_outputs:
for var in additional_outputs:
model_outputs[var] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))
return model_outputs
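# Hedged usage note: extra variables are stored with the same int16 layout,
# e.g. prepare_outputs(empty_ds, additional_outputs = ['Ks']) would also save
# a 'Ks' output ('Ks' is an illustrative choice, not a required name).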
def xr_maximum(ds: xr.DataArray, value: Union[xr.DataArray, float, int]) -> xr.DataArray:
"""
Equivalent of `numpy.maximum(ds, value)` for xarray DataArrays
## Arguments
1. ds: `xr.DataArray`
DataArray to compare
2. value: `Union[xr.DataArray, float, int]`
value (scalar or DataArray) to compare
## Returns
1. output: `xr.DataArray`
resulting DataArray with the element-wise maximum
"""
return xr.where(ds <= value, value, ds, keep_attrs = True)
def xr_minimum(ds: xr.DataArray, value: Union[xr.DataArray, float, int]) -> xr.DataArray:
"""
Equivalent of `numpy.minimum(ds, value)` for xarray DataArrays
## Arguments
1. ds: `xr.DataArray`
DataArray to compare
2. value: `Union[xr.DataArray, float, int]`
value (scalar or DataArray) to compare
## Returns
1. output: `xr.DataArray`
resulting DataArray with the element-wise minimum
"""
return xr.where(ds >= value, value, ds, keep_attrs = True)
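# Hedged usage sketch (throwaway values): clip a DataArray to [0, 1] by
# combining the two helpers, mirroring how they are chained in the model.
_clipped = xr_minimum(xr_maximum(xr.DataArray([-0.2, 0.4, 1.3]), 0.0), 1.0) # -> [0.0, 0.4, 1.0]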
def calculate_diff_re(TAW: xr.DataArray, Dr: xr.DataArray, Zr: xr.DataArray, RUE: xr.DataArray, De: xr.DataArray, FCov: xr.DataArray, Ze_: xr.DataArray, DiffE_: xr.DataArray, scale_dict: dict) -> xr.DataArray:
"""
Calculates the diffusion between the top soil layer and the root layer.
## Arguments
1. TAW: `xr.DataArray`
water capacity of root layer
2. Dr: `xr.DataArray`
depletion of root layer
3. Zr: `xr.DataArray`
height of root layer
4. RUE: `xr.DataArray`
total available surface water
5. De: `xr.DataArray`
depletion of the evaporative layer
6. FCov: `xr.DataArray`
fraction cover of plants
7. Ze_: `xr.DataArray`
height of evaporative layer (parameter)
8. DiffE_: `xr.DataArray`
diffusion coefficient between evaporative
and root layers (unitless, parameter)
9. scale_dict: `dict`
dictionary containing the scale factors for
the rasterized parameters
## Returns
1. diff_re: `xr.DataArray`
the diffusion between the top soil layer and
the root layer
"""
# Temporary variables to make calculation easier to read
tmp1 = (((TAW - Dr) / Zr - (RUE - De) / (scale_dict['Ze'] * Ze_)) / FCov) * (scale_dict['DiffE'] * DiffE_)
tmp2 = ((TAW * scale_dict['Ze'] * Ze_) - (RUE - De - Dr) * Zr) / (Zr + scale_dict['Ze'] * Ze_) - Dr
# Calculate diffusion according to SAMIR equation
diff_re = xr.where(tmp1 < 0, xr_maximum(tmp1, tmp2), xr_minimum(tmp1, tmp2))
# Return zero values where the 'DiffE' parameter is equal to 0
return xr.where(DiffE_ == 0, 0, diff_re)
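# Hedged reading of the branching above: tmp1 is the gradient-driven diffusion
# rate and tmp2 the flux that would equalise the relative depletions of the two
# layers, so taking the maximum when tmp1 < 0 and the minimum otherwise bounds
# the flux magnitude by that equilibrium value in both directions.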
def calculate_diff_dr(TAW: xr.DataArray, TDW: xr.DataArray, Dr: xr.DataArray, Zr: xr.DataArray, Dd: xr.DataArray, FCov: xr.DataArray, Zsoil_: xr.DataArray, DiffR_: xr.DataArray, scale_dict: dict) -> xr.DataArray:
"""
Calculates the diffusion between the root layer and the deep layer.
## Arguments
1. TAW: `xr.DataArray`
water capacity of root layer
2. TDW: `xr.DataArray`
water capacity of deep layer
3. Dr: `xr.DataArray`
depletion of root layer
4. Zr: `xr.DataArray`
height of root layer
5. Dd: `xr.DataArray`
depletion of deep layer
6. FCov: `xr.DataArray`
fraction cover of plants
7. Zsoil_: `xr.DataArray`
total height of soil (parameter)
8. DiffR_: `xr.DataArray`
Diffusion coefficient between root
and deep layers (unitless, parameter)
9. scale_dict: `dict`
dictionary containing the scale factors for
the rasterized parameters
## Returns
1. diff_dr: `xr.DataArray`
the diffusion between the root layer and the
deep layer
"""
# Temporary variables to make calculation easier to read
tmp1 = (((TDW - Dd) / (scale_dict['Zsoil'] * Zsoil_ - Zr) - (TAW - Dr) / Zr) / FCov) * scale_dict['DiffR'] * DiffR_
tmp2 = (TDW * Zr - (TAW - Dr - Dd) * (scale_dict['Zsoil'] * Zsoil_ - Zr)) / (scale_dict['Zsoil'] * Zsoil_) - Dd
# Calculate diffusion according to SAMIR equation
diff_dr = xr.where(tmp1 < 0, xr_maximum(tmp1, tmp2), xr_minimum(tmp1, tmp2))
# Return zero values where the 'DiffR' parameter is equal to 0
return xr.where(DiffR_ == 0, 0, diff_dr)
def calculate_W(TEW: xr.DataArray, Dei: xr.DataArray, Dep: xr.DataArray, fewi: xr.DataArray, fewp: xr.DataArray) -> xr.DataArray:
"""
Calculate W, the weighting factor used to split the energy available
for evaporation between the two evaporation components according to
their water availability: the larger and wetter a component is, the
more of the evaporation it supplies.
## Arguments
1. TEW: `xr.DataArray`
water capacity of evaporative layer
2. Dei: `xr.DataArray`
depletion of the evaporative layer
(irrigation part)
3. Dep: `xr.DataArray`
depletion of the evaporative layer
(precipitation part)
4. fewi: `xr.DataArray`
soil fraction which is wetted by irrigation
and exposed to evaporation
5. fewp: `xr.DataArray`
soil fraction which is wetted by precipitation
and exposed to evaporation
## Returns
1. W: `xr.DataArray`
weighting factor W
"""
# Temporary variables to make calculation easier to read
tmp = fewi * (TEW - Dei)
# Calculate the weighting factor to split the energy available for evaporation
W = 1 / (1 + (fewp * (TEW - Dep) / tmp ))
# Return W
return xr.where(tmp > 0, W, 0)
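# Hedged example (illustrative values): a larger and wetter irrigation-wetted
# fraction pulls W towards 1, drawing more of the evaporation from it.
_W = calculate_W(xr.DataArray([20.0]), xr.DataArray([5.0]), xr.DataArray([15.0]),
xr.DataArray([0.3]), xr.DataArray([0.1])) # -> 1 / (1 + 0.1*5 / (0.3*15)) = 0.9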
def calculate_Kr(TEW: xr.DataArray, De: xr.DataArray, REW_: xr.DataArray, scale_dict: dict) -> xr.DataArray:
"""
Calculates the reduction coefficient for evaporation, which depends
on the amount of water in the soil, using the FAO-56 method
## Arguments
1. TEW: `xr.DataArray`
water capacity of evaporative layer
2. De: `xr.DataArray`
depletion of evaporative layer
3. REW_: `xr.DataArray`
readily evaporable water
4. scale_dict: `dict`
dictionary containing the scale factors for
the rasterized parameters
## Returns
1. Kr: `xr.DataArray`
Kr coefficient
"""
# Formula for calculating Kr
Kr = (TEW - De) / (TEW - scale_dict['REW'] * REW_)
# Return Kr
return xr_maximum(0, xr_minimum(Kr, 1))
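# Hedged example (illustrative values, scale factor of 1 assumed): Kr falls
# linearly from 1 at De = REW to 0 at De = TEW, clipped to [0, 1].
_Kr = calculate_Kr(xr.DataArray([20.0]), xr.DataArray([14.0]), xr.DataArray([8.0]), {'REW': 1}) # (20-14)/(20-8) = 0.5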
def update_Dr(TAW: xr.DataArray, TDW: xr.DataArray, Zr: xr.DataArray, TAW0: xr.DataArray, TDW0: xr.DataArray, Dr0: xr.DataArray, Dd0: xr.DataArray, Zr0: xr.DataArray) -> xr.DataArray:
"""
Return the updated depletion for the root layer
## Arguments
1. TAW: `xr.DataArray`
water capacity of root layer for current day
2. TDW: `xr.DataArray`
water capacity of deep layer for current day
3. Zr: `xr.DataArray`
root layer height for current day
4. TAW0: `xr.DataArray`
water capacity of root layer for previous day
5. TDW0: `xr.DataArray`
water capacity of deep layer for previous day
6. Dr0: `xr.DataArray`
depletion of the root layer for previous day
7. Dd0: `xr.DataArray`
depletion of the deep layer for previous day
8. Zr0: `xr.DataArray`
root layer height for previous day
## Returns
1. output: `xr.DataArray`
updated depletion for the root layer
"""
# Temporary variables to make calculation easier to read
tmp1 = xr_maximum(Dr0 + Dd0 * (TAW - TAW0) / TDW0, 0)
tmp2 = xr_minimum(Dr0 + Dd0 * (TAW - TAW0) / TDW0, TDW)
# Return updated Dr
return xr.where(Zr > Zr0, tmp1, tmp2)
def update_Dd(TAW: xr.DataArray, TDW: xr.DataArray, Zr: xr.DataArray, TAW0: xr.DataArray, TDW0: xr.DataArray, Dd0: xr.DataArray, Zr0: xr.DataArray) -> xr.DataArray:
"""
Return the updated depletion for the deep layer
## Arguments
1. TAW: `xr.DataArray`
water capacity of root layer for current day
2. TDW: `xr.DataArray`
water capacity of deep layer for current day
3. Zr: `xr.DataArray`
root layer height for current day
4. TAW0: `xr.DataArray`
water capacity of root layer for previous day
5. TDW0: `xr.DataArray`
water capacity of deep layer for previous day
6. Dd0: `xr.DataArray`
depletion of the deep layer for previous day
7. Zr0: `xr.DataArray`
root layer height for previous day
## Returns
1. output: `xr.DataArray`
updated depletion for the deep layer
"""
# Temporary variables to make calculation easier to read
tmp1 = xr_maximum(Dd0 - Dd0 * (TAW - TAW0) / TDW0, 0)
tmp2 = xr_minimum(Dd0 - Dd0 * (TAW - TAW0) / TDW0, TDW)
# Return updated Dd
return xr.where(Zr > Zr0, tmp1, tmp2)
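# Hedged example (illustrative values): when the root zone deepens (Zr > Zr0),
# update_Dr moves part of the previous deep-layer depletion into the root
# layer: Dr0 + Dd0 * (TAW - TAW0) / TDW0 = 10 + 5 * 10 / 50 = 11.
_Dr = update_Dr(xr.DataArray([60.0]), xr.DataArray([40.0]), xr.DataArray([0.6]),
xr.DataArray([50.0]), xr.DataArray([50.0]), xr.DataArray([10.0]),
xr.DataArray([5.0]), xr.DataArray([0.5])) # -> 11.0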
def run_samir(json_config_file: str, csv_param_file: str, ndvi_cube_path: str, weather_cube_path: str, soil_params_path: str, land_cover_path: str, chunk_size: dict, save_path: str, max_GB: int = 2) -> None:
# warnings.simplefilter("error", category = RuntimeWarning())
warnings.filterwarnings("ignore", message="invalid value encountered in cast")
warnings.filterwarnings("ignore", message="invalid value encountered in divide")
np.errstate(all = 'raise') # NB: a bare call has no effect; np.errstate only applies inside a 'with' block
gc.disable()
#============ General parameters ============#
config_params = config(json_config_file)
calculation_variables_t2 = ['diff_rei', 'diff_rep', 'diff_dr' , 'Dd', 'De', 'Dei', 'Dep', 'Dr', 'FCov', 'Irrig', 'Kcb', 'Kei', 'Kep', 'Ks', 'Kti', 'Ktp', 'RUE', 'TAW', 'TDW', 'TEW', 'Tei', 'Tep', 'W', 'Zr', 'fewi', 'fewp']
calculation_variables_t1 = ['Dr', 'Dd', 'TAW', 'TDW', 'Zr']
#============ Manage inputs ============#
# NDVI
ndvi_cube = xr.open_dataset(ndvi_cube_path, chunks = chunk_size).astype('f4')
# # Create a daily DateTimeIndex for the desired date range
# daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')
# # Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
# ndvi_cube = ndvi_cube.resample(time = '1D').asfreq().reindex(time = daily_index)
# # Interpolate the dataset along the time dimension to fill nan values
# ndvi_cube = ndvi_cube.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').astype('u1')
# Weather
weather_cube = xr.open_dataset(weather_cube_path, chunks = chunk_size).astype('f4')
# Soil
soil_params = xr.open_dataset(soil_params_path, chunks = chunk_size).astype('f4')
# SAMIR Parameters
param_dataset, scale_factor = rasterize_samir_parameters(csv_param_file, ndvi_cube.drop_vars(['ndvi', 'time']), land_cover_path, chunk_size = chunk_size)
# SAMIR Variables
variables_t1, variables_t2 = setup_time_loop(calculation_variables_t1, calculation_variables_t2, ndvi_cube.drop_vars(['ndvi', 'time']))
# Load inputs fully into memory if they are smaller than max_GB gibibytes, otherwise keep them as lazy dask arrays
if ndvi_cube.nbytes < max_GB * (1024)**3:
ndvi_cube.load()
if weather_cube.nbytes < max_GB * (1024)**3:
weather_cube.load()
#============ Prepare outputs ============#
model_outputs = prepare_outputs(ndvi_cube.drop_vars(['ndvi']))
#============ Prepare time iterations ============#
dates = ndvi_cube.time.values
#============ Create aliases for better readability ============#
# Variables for current day
diff_rei = variables_t2.diff_rei
diff_rep = variables_t2.diff_rep
diff_dr = variables_t2.diff_dr
Dd = variables_t2.Dd
De = variables_t2.De
Dei = variables_t2.Dei
Dep = variables_t2.Dep
Dr = variables_t2.Dr
FCov = variables_t2.FCov
Irrig = variables_t2.Irrig
Kcb = variables_t2.Kcb
Kei = variables_t2.Kei
Kep = variables_t2.Kep
Ks = variables_t2.Ks
Kti = variables_t2.Kti
Ktp = variables_t2.Ktp
RUE = variables_t2.RUE
TAW = variables_t2.TAW
TDW = variables_t2.TDW
TEW = variables_t2.TEW
Tei = variables_t2.Tei
Tep = variables_t2.Tep
Zr = variables_t2.Zr
W = variables_t2.W
fewi = variables_t2.fewi
fewp = variables_t2.fewp
# Variables for previous day
TAW0 = variables_t1.TAW
TDW0 = variables_t1.TDW
Dr0 = variables_t1.Dr
Dd0 = variables_t1.Dd
Zr0 = variables_t1.Zr
# Parameters
# Parameter aliases carry a trailing underscore (_) to distinguish them from model variables
DiffE_ = param_dataset.DiffE
DiffR_ = param_dataset.DiffR
FW_ = param_dataset.FW
Fc_stop_ = param_dataset.Fc_stop
FmaxFC_ = param_dataset.FmaxFC
Foffset_ = param_dataset.Foffset
Fslope_ = param_dataset.Fslope
Init_RU_ = param_dataset.Init_RU
Irrig_auto_ = param_dataset.Irrig_auto
Kcmax_ = param_dataset.Kcmax
KmaxKcb_ = param_dataset.KmaxKcb
Koffset_ = param_dataset.Koffset
Kslope_ = param_dataset.Kslope
Lame_max_ = param_dataset.Lame_max
REW_ = param_dataset.REW
Ze_ = param_dataset.Ze
Zsoil_ = param_dataset.Zsoil
maxZr_ = param_dataset.maxZr
minZr_ = param_dataset.minZr
p_ = param_dataset.p
# scale factors
# Scale factors follow the naming scheme: s_<parameter_name>
s_DiffE = scale_factor['DiffE']
s_DiffR = scale_factor['DiffR']
s_FW = scale_factor['FW']
s_Fc_stop = scale_factor['Fc_stop']
s_FmaxFC = scale_factor['FmaxFC']
s_Foffset = scale_factor['Foffset']
s_Fslope = scale_factor['Fslope']
s_Init_RU = scale_factor['Init_RU']
# s_Irrig_auto = scale_factor['Irrig_auto']
s_Kcmax = scale_factor['Kcmax']
s_KmaxKcb = scale_factor['KmaxKcb']
s_Koffset = scale_factor['Koffset']
s_Kslope = scale_factor['Kslope']
s_Lame_max = scale_factor['Lame_max']
s_REW = scale_factor['REW']
s_Ze = scale_factor['Ze']
s_Zsoil = scale_factor['Zsoil']
s_maxZr = scale_factor['maxZr']
s_minZr = scale_factor['minZr']
s_p = scale_factor['p']
#============ First day initialization ============#
# Fraction cover
FCov = s_Fslope * Fslope_ * (ndvi_cube.ndvi.sel({'time': dates[0]})/255) + s_Foffset * Foffset_
FCov = xr_minimum(xr_maximum(FCov, 0), s_Fc_stop * Fc_stop_)
# Root depth update
Zr = s_minZr * minZr_ + (FCov / (s_FmaxFC * FmaxFC_)) * s_maxZr * (maxZr_ - minZr_)
# Water capacities
TEW = (soil_params.FC - soil_params.WP/2) * s_Ze * Ze_
RUE = (soil_params.FC - soil_params.WP) * s_Ze * Ze_
TAW = (soil_params.FC - soil_params.WP) * Zr
TDW = (soil_params.FC - soil_params.WP) * (s_Zsoil * Zsoil_ - Zr) # Zd = Zsoil - Zr
# Depletions
Dei = RUE * (1 - s_Init_RU * Init_RU_)
Dep = RUE * (1 - s_Init_RU * Init_RU_)
Dr = TAW * (1 - s_Init_RU * Init_RU_)
Dd = TDW * (1 - s_Init_RU * Init_RU_)
# Irrigation (TODO: refine trigger)
Irrig = xr_minimum(xr_maximum(Dr - weather_cube.tp.sel({'time': dates[0]}) / 1000, 0), s_Lame_max * Lame_max_) * Irrig_auto_
Irrig = xr.where(Dr > TAW * s_p * p_, Irrig, 0)
# Kcb
Kcb = xr_minimum(s_Kslope * Kslope_ * (ndvi_cube.ndvi.sel({'time': dates[0]}) / 255) + s_Koffset * Koffset_, s_KmaxKcb * KmaxKcb_)
# Update depletions with rainfall and/or irrigation
## DP
model_outputs.DP.loc[{'time': dates[0]}] = -xr_minimum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig, 0), 0)
## De
Dei = xr_minimum(xr_maximum(Dei - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig / (s_FW * FW_ / 100), 0), TEW)
Dep = xr_minimum(xr_maximum(Dep - (weather_cube.tp.sel({'time': dates[0]}) / 1000), 0), TEW)
fewi = xr_minimum(1 - FCov, (s_FW * FW_ / 100))
fewp = 1 - FCov - fewi
De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
# variables_t1['De'] = xr.where(variables_t1['De'].isfinite(), variables_t1['De'], variables_t1['Dei'] * (s_FW * FW_ / 100) + variables_t1['Dep'] * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!
## Dr
Dr = xr_minimum(xr_maximum(Dr - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig, 0), TAW)
## Dd
Dd = xr_minimum(xr_maximum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig, 0), 0), TDW)
# Diffusion coefficients
diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, FCov, Ze_, DiffE_, scale_factor)
diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, FCov, Ze_, DiffE_, scale_factor)
diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, FCov, Zsoil_, DiffR_, scale_factor)
# Weighting factor W
W = calculate_W(TEW, Dei, Dep, fewi, fewp)
# Soil water contents
model_outputs.SWCe.loc[{'time': dates[0]}] = 1 - De/TEW
model_outputs.SWCr.loc[{'time': dates[0]}] = 1 - Dr/TAW
# Water Stress coefficient
Ks = xr_minimum((TAW - Dr) / (TAW * (1 - s_p * p_)), 1)
# Reduction coefficient for evaporation
Kei = xr_minimum(W * calculate_Kr(TEW, Dei, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewi * s_Kcmax * Kcmax_)
Kep = xr_minimum((1 - W) * calculate_Kr(TEW, Dep, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewp * s_Kcmax * Kcmax_)
# Prepare coefficients for evapotranspiration
Kti = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dei / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)
Ktp = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dep / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)
Tei = Kti * Ks * Kcb * weather_cube.ET0.sel({'time': dates[0]}) / 1000
Tep = Ktp * Ks * Kcb * weather_cube.ET0.sel({'time': dates[0]}) / 1000
# Update depletions
Dei = xr.where(fewi > 0, xr_minimum(xr_maximum(Dei + (weather_cube.ET0.sel({'time': dates[0]}) / 1000) * Kei / fewi + Tei - diff_rei, 0), TEW), xr_minimum(xr_maximum(Dei + Tei - diff_rei, 0), TEW))
Dep = xr.where(fewp > 0, xr_minimum(xr_maximum(Dep + (weather_cube.ET0.sel({'time': dates[0]}) / 1000) * Kep / fewp + Tep - diff_rep, 0), TEW), xr_minimum(xr_maximum(Dep + Tep - diff_rep, 0), TEW))
De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
# De = xr.where(De.isfinite(), De, Dei * (s_FW * FW_ / 100) + Dep * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!
# Evaporation
model_outputs.E.loc[{'time': dates[0]}] = xr_maximum((Kei + Kep) * weather_cube.ET0.sel({'time': dates[0]}) / 1000, 0)
# Transpiration
model_outputs.Tr.loc[{'time': dates[0]}] = Kcb * Ks * weather_cube.ET0.sel({'time': dates[0]}) / 1000
# Potential evapotranspiration and evaporative fraction ??
# Update depletions (root and deep zones) at the end of the day
Dr = xr_minimum(xr_maximum(Dr + model_outputs.E.loc[{'time': dates[0]}] + model_outputs.Tr.loc[{'time': dates[0]}] - diff_dr, 0), TAW)
Dd = xr_minimum(xr_maximum(Dd + diff_dr, 0), TDW)
# Write outputs
model_outputs.Irr.loc[{'time': dates[0]}] = Irrig
# Update variable_t1 values
for variable in calculation_variables_t1:
variables_t1[variable] = variables_t2[variable].copy(deep = True)
#============ Time loop ============#
for i in range(1, len(dates)):
# Update variables
## Fraction cover
FCov = s_Fslope * Fslope_ * (ndvi_cube.ndvi.sel({'time': dates[i]})/255) + s_Foffset * Foffset_
FCov = xr_minimum(xr_maximum(FCov, 0), s_Fc_stop * Fc_stop_)
## Root depth update
Zr = s_minZr * minZr_ + (FCov / (s_FmaxFC * FmaxFC_)) * s_maxZr * (maxZr_ - minZr_)
# Water capacities
TAW = (soil_params.FC - soil_params.WP) * Zr
TDW = (soil_params.FC - soil_params.WP) * (s_Zsoil * Zsoil_ - Zr)
# Update depletions
Dr = update_Dr(TAW, TDW, Zr, TAW0, TDW0, Dr0, Dd0, Zr0)
Dd = update_Dd(TAW, TDW, Zr, TAW0, TDW0, Dd0, Zr0)
# Update param p
p_ = (xr_minimum(xr_maximum(s_p * p_ + 0.04 * (5 - weather_cube.ET0.sel({'time': dates[i-1]}) / 1000), 0.1), 0.8) * (1 / s_p)).round(0).astype('i2')
# Irrigation (TODO: refine trigger)
Irrig = xr_minimum(xr_maximum(Dr - weather_cube.tp.sel({'time': dates[i]}) / 1000, 0), s_Lame_max * Lame_max_) * Irrig_auto_
Irrig = xr.where(Dr > TAW * s_p * p_, Irrig, 0)
# Kcb
Kcb = xr_minimum(s_Kslope * Kslope_ * (ndvi_cube.ndvi.sel({'time': dates[i]}) / 255) + s_Koffset * Koffset_, s_KmaxKcb * KmaxKcb_)
# Update depletions with rainfall and/or irrigation
## DP
model_outputs.DP.loc[{'time': dates[i]}] = -xr_minimum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig, 0), 0)
## De
Dei = xr_minimum(xr_maximum(Dei - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig / (s_FW * FW_ / 100), 0), TEW)
Dep = xr_minimum(xr_maximum(Dep - (weather_cube.tp.sel({'time': dates[i]}) / 1000), 0), TEW)
fewi = xr_minimum(1 - FCov, (s_FW * FW_ / 100))
fewp = 1 - FCov - fewi
De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
# variables_t1['De'] = xr.where(variables_t1['De'].isfinite(), variables_t1['De'], variables_t1['Dei'] * (s_FW * FW_ / 100) + variables_t1['Dep'] * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!
## Dr
Dr = xr_minimum(xr_maximum(Dr - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig, 0), TAW)
## Dd
Dd = xr_minimum(xr_maximum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig, 0), 0), TDW)
# Diffusion coefficients
diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, FCov, Ze_, DiffE_, scale_factor)
diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, FCov, Ze_, DiffE_, scale_factor)
diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, FCov, Zsoil_, DiffR_, scale_factor)
# Weighting factor W
W = calculate_W(TEW, Dei, Dep, fewi, fewp)
# Soil water contents
model_outputs.SWCe.loc[{'time': dates[i]}] = 1 - De/TEW
model_outputs.SWCr.loc[{'time': dates[i]}] = 1 - Dr/TAW
# Water Stress coefficient
Ks = xr_minimum((TAW - Dr) / (TAW * (1 - s_p * p_)), 1)
# Reduction coefficient for evaporation
Kei = xr_minimum(W * calculate_Kr(TEW, Dei, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewi * s_Kcmax * Kcmax_)
Kep = xr_minimum((1 - W) * calculate_Kr(TEW, Dep, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewp * s_Kcmax * Kcmax_)
# Prepare coefficients for evapotranspiration
Kti = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dei / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)
Ktp = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dep / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)
Tei = Kti * Ks * Kcb * weather_cube.ET0.sel({'time': dates[i]}) / 1000
Tep = Ktp * Ks * Kcb * weather_cube.ET0.sel({'time': dates[i]}) / 1000
# Update depletions
Dei = xr.where(fewi > 0, xr_minimum(xr_maximum(Dei + (weather_cube.ET0.sel({'time': dates[i]}) / 1000) * Kei / fewi + Tei - diff_rei, 0), TEW), xr_minimum(xr_maximum(Dei + Tei - diff_rei, 0), TEW))
Dep = xr.where(fewp > 0, xr_minimum(xr_maximum(Dep + (weather_cube.ET0.sel({'time': dates[i]}) / 1000) * Kep / fewp + Tep - diff_rep, 0), TEW), xr_minimum(xr_maximum(Dep + Tep - diff_rep, 0), TEW))
De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
# De = xr.where(De.isfinite(), De, Dei * (s_FW * FW_ / 100) + Dep * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!
# Evaporation
model_outputs.E.loc[{'time': dates[i]}] = xr_maximum((Kei + Kep) * weather_cube.ET0.sel({'time': dates[i]}) / 1000, 0)
# Transpiration
model_outputs.Tr.loc[{'time': dates[i]}] = Kcb * Ks * weather_cube.ET0.sel({'time': dates[i]}) / 1000
# Potential evapotranspiration and evaporative fraction ??
# Update depletions (root and deep zones) at the end of the day
Dr = xr_minimum(xr_maximum(Dr + model_outputs.E.loc[{'time': dates[i]}] + model_outputs.Tr.loc[{'time': dates[i]}] - diff_dr, 0), TAW)
Dd = xr_minimum(xr_maximum(Dd + diff_dr, 0), TDW)
# Write outputs
model_outputs.Irr.loc[{'time': dates[i]}] = Irrig
# Update variable_t1 values
for variable in calculation_variables_t1:
variables_t1[variable] = variables_t2[variable].copy(deep = True)
print('day ', i+1, '/', len(dates), ' ', end = '\r')
# Scale the model_outputs variable to save in int16 format
model_outputs = model_outputs * 1000
# Write encoding dict
encoding_dict = {}
for variable in list(model_outputs.keys()):
encod = {}
encod['dtype'] = 'i2'
encoding_dict[variable] = encod
# Save model outputs to netcdf
model_outputs.to_netcdf(save_path, encoding = encoding_dict)
return None
```
%% Cell type:code id: tags:
``` python
client = Client()
client
```
%% Output
/home/auclairj/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37667 instead
warnings.warn(
<Client: 'tcp://127.0.0.1:42111' processes=4 threads=8, memory=23.47 GiB>
%% Cell type:code id: tags:
``` python
data_path = '/mnt/e/DATA/DEV_inputs_test'
ndvi_path = data_path + os.sep + 'ndvi.nc'
weather_path = data_path + os.sep + 'weather.nc'
land_cover_path = data_path + os.sep + 'land_cover.nc'
json_config_file = '/home/auclairj/GIT/modspa-pixel/config/config_modspa.json'
param_file = '/home/auclairj/GIT/modspa-pixel/parameters/csv_files/params_samir_test.csv'
soil_path = data_path + os.sep + 'soil.nc'
save_path = data_path + os.sep + 'outputs.nc'
chunk_size = {'x': -1, 'y': -1, 'time': -1}
run_samir(json_config_file, param_file, ndvi_path, weather_path, soil_path, land_cover_path, chunk_size, save_path)
```
%% Output
2023-07-19 17:36:42,921 - distributed.scheduler - ERROR - Couldn't gather keys {"('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)": []} state: ['waiting'] workers: []
NoneType: None
2023-07-19 17:36:42,922 - distributed.scheduler - ERROR - Shut down workers that don't have promised key: [], ('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)
NoneType: None
2023-07-19 17:36:42,924 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {"('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)": ()}
2023-07-19 17:36:43,297 - distributed.scheduler - ERROR - Couldn't gather keys {"('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)": []} state: [None] workers: []
NoneType: None
2023-07-19 17:36:43,298 - distributed.scheduler - ERROR - Shut down workers that don't have promised key: [], ('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)
NoneType: None
2023-07-19 17:36:43,300 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {"('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)": ()}
2023-07-19 17:36:43,454 - distributed.scheduler - ERROR - Couldn't gather keys {"('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)": []} state: [None] workers: []
NoneType: None
2023-07-19 17:36:43,455 - distributed.scheduler - ERROR - Shut down workers that don't have promised key: [], ('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)
NoneType: None
2023-07-19 17:36:43,456 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling {"('astype-1dead4f4f28400d17d384d6a2b513c87', 0, 0)": ()}
/home/auclairj/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in divide
return func(*(_execute_task(a, cache) for a in args))
/home/auclairj/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in divide
return func(*(_execute_task(a, cache) for a in args))
/home/auclairj/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in divide
return func(*(_execute_task(a, cache) for a in args))
day 2 / 366
/home/auclairj/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/core.py:121: RuntimeWarning: invalid value encountered in divide
return func(*(_execute_task(a, cache) for a in args))
day 42 / 366
---------------------------------------------------------------------------
KeyboardInterrupt Traceback (most recent call last)
Cell In[4], line 14
9 save_path = data_path + os.sep + 'outputs.nc'
11 chunk_size = {'x': -1, 'y': -1, 'time': -1}
---> 14 run_samir(json_config_file, param_file, ndvi_path, weather_path, soil_path, land_cover_path, chunk_size, save_path)
Cell In[2], line 734, in run_samir(json_config_file, csv_param_file, ndvi_cube_path, weather_cube_path, soil_params_path, land_cover_path, chunk_size, save_path)
730 De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
731 # De = xr.where(De.isfinite(), De, Dei * (s_FW * FW_ / 100) + Dep * (1 - (s_FW * FW_ / 100)))
732
733 # Evaporation
--> 734 model_outputs['E'].loc[{'time': dates[i]}] = xr_maximum((Kei + Kep) * weather_cube['ET0'].sel({'time': dates[i]}) / 1000, 0)
736 # Transpiration
737 model_outputs['Tr'].loc[{'time': dates[i]}] = Kcb * Ks * weather_cube['ET0'].sel({'time': dates[i]}) / 1000
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/xarray/core/dataarray.py:223, in _LocIndexer.__setitem__(self, key, value)
220 key = dict(zip(self.data_array.dims, labels))
222 dim_indexers = map_index_queries(self.data_array, key).dim_indexers
--> 223 self.data_array[dim_indexers] = value
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/xarray/core/dataarray.py:840, in DataArray.__setitem__(self, key, value)
835 # DataArray key -> Variable key
836 key = {
837 k: v.variable if isinstance(v, DataArray) else v
838 for k, v in self._item_key_to_dict(key).items()
839 }
--> 840 self.variable[key] = value
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/xarray/core/variable.py:977, in Variable.__setitem__(self, key, value)
974 value = np.moveaxis(value, new_order, range(len(new_order)))
976 indexable = as_indexable(self._data)
--> 977 indexable[index_tuple] = value
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/xarray/core/indexing.py:1338, in NumpyIndexingAdapter.__setitem__(self, key, value)
1336 array, key = self._indexing_array_and_key(key)
1337 try:
-> 1338 array[key] = value
1339 except ValueError:
1340 # More informative exception if read-only view
1341 if not array.flags.writeable and not array.flags.owndata:
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/array/core.py:1699, in Array.__array__(self, dtype, **kwargs)
1698 def __array__(self, dtype=None, **kwargs):
-> 1699 x = self.compute()
1700 if dtype and x.dtype != dtype:
1701 x = x.astype(dtype)
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/base.py:381, in DaskMethodsMixin.compute(self, **kwargs)
357 def compute(self, **kwargs):
358 """Compute this dask collection
359
360 This turns a lazy Dask collection into its in-memory equivalent.
(...)
379 dask.compute
380 """
--> 381 (result,) = compute(self, traverse=False, **kwargs)
382 return result
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/base.py:660, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
652 return args
654 schedule = get_scheduler(
655 scheduler=scheduler,
656 collections=collections,
657 get=get,
658 )
--> 660 dsk = collections_to_dsk(collections, optimize_graph, **kwargs)
661 keys, postcomputes = [], []
662 for x in collections:
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/base.py:433, in collections_to_dsk(collections, optimize_graph, optimizations, **kwargs)
431 for opt, val in groups.items():
432 dsk, keys = _extract_graph_and_keys(val)
--> 433 dsk = opt(dsk, keys, **kwargs)
435 for opt_inner in optimizations:
436 dsk = opt_inner(dsk, keys, **kwargs)
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/array/optimization.py:49, in optimize(dsk, keys, fuse_keys, fast_functions, inline_functions_fast_functions, rename_fused_keys, **kwargs)
46 if not isinstance(dsk, HighLevelGraph):
47 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
---> 49 dsk = optimize_blockwise(dsk, keys=keys)
50 dsk = fuse_roots(dsk, keys=keys)
51 dsk = dsk.cull(set(keys))
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/blockwise.py:1080, in optimize_blockwise(graph, keys)
1078 while out.dependencies != graph.dependencies:
1079 graph = out
-> 1080 out = _optimize_blockwise(graph, keys=keys)
1081 return out
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/blockwise.py:1154, in _optimize_blockwise(full_graph, keys)
1151 stack.append(d)
1153 # Merge these Blockwise layers into one
-> 1154 new_layer = rewrite_blockwise([layers[l] for l in blockwise_layers])
1155 out[layer] = new_layer
1157 # Get the new (external) dependencies for this layer.
1158 # This corresponds to the dependencies defined in
1159 # full_graph.dependencies and are not in blockwise_layers
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/blockwise.py:1341, in rewrite_blockwise(inputs)
1339 sub = {}
1340 # Map from (id(key), inds or None) -> index in indices. Used to deduplicate indices.
-> 1341 index_map = {(id(k), inds): n for n, (k, inds) in enumerate(indices)}
1342 for ii, index in enumerate(new_indices):
1343 id_key = (id(index[0]), index[1])
File ~/anaconda3/envs/modspa_pixel/lib/python3.10/site-packages/dask/blockwise.py:1341, in <dictcomp>(.0)
1339 sub = {}
1340 # Map from (id(key), inds or None) -> index in indices. Used to deduplicate indices.
-> 1341 index_map = {(id(k), inds): n for n, (k, inds) in enumerate(indices)}
1342 for ii, index in enumerate(new_indices):
1343 id_key = (id(index[0]), index[1])
KeyboardInterrupt:
%% Cell type:code id: tags:
``` python
client.close()
```
%% Cell type:code id: tags:
``` python
```
......
......@@ -15,11 +15,16 @@ import xarray as xr # to manage dataset
import pandas as pd # to manage dataframes
import rasterio as rio # to open geotiff files
import geopandas as gpd # to manage shapefile crs projections
from numpy import nan # to use xr.interpolate_na()
from shapely.geometry import box # to create boundary box
from config.config import config # to import config file
from input.input_toolbox import product_str_to_datetime
def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, boundary_shapefile_path: str, resolution: int = 20, chunk_size: dict = {'x': 4000, 'y': 4000, 'time': 8}, acorvi_corr: int = 500) -> str:
def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, boundary_shapefile_path: str, config_file: str, resolution: int = 20, chunk_size: dict = {'x': 512, 'y': 256, 'time': -1}, acorvi_corr: int = 500) -> str:
# Open config_file
config_params = config(config_file)
# Check resolution for Sentinel-2
if not resolution in [10, 20]:
......@@ -72,13 +77,13 @@ def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, bounda
dates = [product_str_to_datetime(prod) for prod in red_paths]
# Open datasets with xarray
red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'}).astype('f4')
nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'}).astype('f4')
mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'}).astype('f4')
red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'})
nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'})
mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'})
if resolution == 10:
mask = xr.where((mask == 4) | (mask == 5), 1, 0).interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')
mask = xr.where((mask == 4) | (mask == 5), 1, nan).interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')
else:
mask = xr.where((mask == 4) | (mask == 5), 1, 0)
mask = xr.where((mask == 4) | (mask == 5), 1, nan)
# Set time coordinate
red['time'] = pd.to_datetime(dates)
......@@ -94,8 +99,20 @@ def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, bounda
# Mask and scale ndvi
ndvi['ndvi'] = xr.where(ndvi.ndvi < 0, 0, ndvi.ndvi)
ndvi['ndvi'] = xr.where(ndvi.ndvi > 1, 1, ndvi.ndvi)
ndvi['ndvi'] = (ndvi.ndvi*255).sortby('time')
ndvi['ndvi'] = (ndvi.ndvi*255).chunk(chunk_size)
# Sort images by time
ndvi = ndvi.sortby('time')
# Interpolates on a daily frequency
daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')
# Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index)
# Interpolate the dataset along the time dimension to fill nan values
ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').astype('u1')
# Write attributes
ndvi['ndvi'].attrs['units'] = 'None'
ndvi['ndvi'].attrs['standard_name'] = 'NDVI'
......@@ -103,7 +120,7 @@ def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, bounda
ndvi['ndvi'].attrs['scale factor'] = '255'
# Create save path
ndvi_cube_path = save_dir + os.sep + 'NDVI_precube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'
ndvi_cube_path = save_dir + os.sep + 'NDVI_cube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'
# Save NDVI cube to netcdf
ndvi.to_netcdf(ndvi_cube_path, encoding = {"ndvi": {"dtype": "u1", "_FillValue": 0}})
......
......@@ -18,7 +18,7 @@ import input.lib_era5_land_pixel as era5land # custom built functions for ERA5-
from config.config import config # to import config file
def request_ER5_weather(input_file: str, ndvi_path: str) -> str:
def request_ER5_weather(input_file: str, raw_S2_image_ref: str) -> str:
# Get config file
config_params = config(input_file)
......@@ -120,7 +120,7 @@ def request_ER5_weather(input_file: str, ndvi_path: str) -> str:
print('----------')
# Save daily weather data into ncfile
weather_daily_ncFile = save_dir + os.sep + config_params.start_date + '_' + config_params.end_date + '_' + config_params.run_name + '_era5-land-daily-meteo.nc'
weather_daily_ncFile = save_dir + os.sep + config_params.start_date + '_' + config_params.end_date + '_' + config_params.run_name + '_era5-land-daily-meteo'
# Temporary save directory for daily file merge
variable_list = ['2m_dewpoint_temperature_daily_maximum', '2m_dewpoint_temperature_daily_minimum', '2m_temperature_daily_maximum', '2m_temperature_daily_minimum', 'total_precipitation_daily_mean', '10m_u_component_of_wind_daily_mean', '10m_v_component_of_wind_daily_mean', 'surface_solar_radiation_downwards_daily_mean']
......@@ -129,7 +129,8 @@ def request_ER5_weather(input_file: str, ndvi_path: str) -> str:
aggregated_files = era5land.concat_monthly_nc_file(list_era5land_hourly_ncFiles, variable_list, save_dir)
# Calculate ET0 over the whole time period
era5land.era5Land_nc_daily_to_ET0(aggregated_files, weather_daily_ncFile, ndvi_path, h = wind_height)
print(weather_daily_ncFile)
era5land.era5Land_nc_daily_to_ET0(aggregated_files, weather_daily_ncFile, raw_S2_image_ref, config_params, h = wind_height)
print('\n', weather_daily_ncFile + '.nc', '\n')
return weather_daily_ncFile
\ No newline at end of file
return weather_daily_ncFile + '.nc'
\ No newline at end of file
......@@ -10,7 +10,7 @@ Functions to call ECMWF Reanalysis with CDS-api
@author: rivalland
"""
import os # for path exploration
import os, shutil # for path exploration and file management
from typing import List # to declare variables
import numpy as np # for math on arrays
import xarray as xr # to manage nc files
......@@ -18,6 +18,11 @@ from datetime import datetime # to manage dates
from p_tqdm import p_map # for multiprocessing with progress bars
from dateutil.rrule import rrule, MONTHLY
from fnmatch import fnmatch # for file name matching
import rasterio # to manage geotiff images
from pandas import date_range
from rasterio.warp import reproject, Resampling # to reproject
from dask.diagnostics import ProgressBar
import re # for string comparison
import warnings # to suppress pandas warning
......@@ -429,19 +434,64 @@ def calculate_ET0_pixel(pixel_dataset: xr.Dataset, lat: float, lon: float, h: fl
return ET0_values
def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_nc_file: str, h: float = 10) -> None:
def reproject_geotiff(source_image: str, destination_image: str, destination_crs: str):
# Open the original GeoTIFF file
with rasterio.open(source_image) as src:
# Get the source CRS and transform
src_crs = src.crs
src_transform = src.transform
# Read the data as a numpy array
source = src.read()
# Optionally, calculate the destination transform and shape based on the new CRS
dst_transform, dst_width, dst_height = rasterio.warp.calculate_default_transform(
src_crs, destination_crs, src.width, src.height, *src.bounds)
# Create an empty numpy array for the destination
destination = np.zeros((src.count, dst_height, dst_width))
# Reproject the source to the destination
reproject(
source,
destination,
src_transform=src_transform,
src_crs=src_crs,
dst_transform=dst_transform,
dst_crs=destination_crs,
resampling=Resampling.nearest)
# Save the reprojected data as a new GeoTIFF file
with rasterio.open(destination_image, "w", **src.meta) as dst:
# Update the metadata with the new CRS, transform and shape
dst.update(
crs=destination_crs,
transform=dst_transform,
width=dst_width,
height=dst_height)
# Write the reprojected data to the file
dst.write(destination)
return None
def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_file: str, raw_S2_image_ref: str, config_params, h: float = 10, max_ram: int = 12288) -> None:
"""
Calculate ET0 values from the ERA5 netcdf weather variables.
Output netcdf contains the ET0 values for each day in the selected
time period and for each ERA5 pixel covering the required area.
time period and reprojected on the same grid as the NDVI values.
## Arguments
1. list_era5land_files: `List[str]`
list of netcdf files containing the necessary variables
2. output_nc_file: `str`
output netcdf file to save
3. h: `float` `default = 10`
2. output_file: `str`
output file name without extension
3. raw_S2_image_ref: `str`
raw Sentinel 2 image at right resolution for reprojection
4. h: `float` `default = 10`
height of ERA5 wind measurements in meters
5. max_ram: `int` `default = 12288`
max ram (in MiB) to give to OTB
## Returns
`None`
......@@ -477,7 +527,10 @@ def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_nc_file: str
final_weather_ds['tp'] = final_weather_ds['tp'] * 1000 # conversion from m to mm
# Change datatype to reduce memory usage
final_weather_ds = (final_weather_ds * 1000).astype('u2')
final_weather_ds = (final_weather_ds * 1000).astype('u2')
# Write projection
final_weather_ds = final_weather_ds.rio.write_crs('EPSG:4326')
# Set variable attributes
final_weather_ds['ET0'].attrs['units'] = 'mm'
......@@ -487,9 +540,28 @@ def era5Land_nc_daily_to_ET0(list_era5land_files: List[str], output_nc_file: str
final_weather_ds['tp'].attrs['units'] = 'mm'
final_weather_ds['tp'].attrs['standard_name'] = 'Precipitation'
final_weather_ds['tp'].attrs['comment'] = 'Volume of total daily precipitation expressed as water height in millimeters'
final_weather_ds['tp'].attrs['scale factor'] = '1000'
final_weather_ds['tp'].attrs['scale factor'] = '1000'
# Save dataset to netcdf, still in wgs84 (lat, lon) coordinates
final_weather_ds.to_netcdf(path = output_nc_file, encoding = {"ET0": {"dtype": "u2"}, "tp": {"dtype": "u2"}})
# Save dataset to geotiff, still in wgs84 (lat, lon) coordinates
output_file_prec = output_file + '_prec.tif'
output_file_ET0 = output_file + '_ET0.tif'
final_weather_ds.tp.rio.to_raster(output_file_prec, dtype = 'uint16')
final_weather_ds.ET0.rio.to_raster(output_file_ET0, dtype = 'uint16')
# Reprojected image paths
output_file_prec_reproj = output_file + '_prec_reproj.tif'
output_file_ET0_reproj = output_file + '_ET0_reproj.tif'
# Run otbcli_SuperImpose
OTB_command = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_prec + ' -out ' + output_file_prec_reproj + ' uint16 -ram ' + str(max_ram)
os.system(OTB_command)
OTB_command = 'otbcli_Superimpose -inr ' + raw_S2_image_ref + ' -inm ' + output_file_ET0 + ' -out ' + output_file_ET0_reproj + ' uint16 -ram ' + str(max_ram)
os.system(OTB_command)
# remove old files and rename outputs
os.remove(output_file_prec)
shutil.move(output_file_prec_reproj, output_file_prec)
os.remove(output_file_ET0)
shutil.move(output_file_ET0_reproj, output_file_ET0)
return None
\ No newline at end of file