Newer
Older
Jeremy Auclair
committed
# -*- coding: UTF-8 -*-
# Python
Jeremy Auclair
committed
@author: jeremy auclair
Jeremy Auclair
committed
"""
import xarray as xr # to manage datasets
from numba import njit, float32, int16 # to compile functions in nopython mode for faster calculation
import numpy as np # for vectorized maths
from typing import List, Tuple # to declare argument types
import netCDF4 as nc # to efficiently read and write netCDF files
from tqdm import tqdm # to show a progress bar
from psutil import virtual_memory # to check available ram
from modspa_pixel.parameters.params_samir_class import samir_parameters # to load SAMIR parameters
from modspa_pixel.config.config import config # to load modspa config file
Jeremy Auclair
committed
from modspa_pixel.source.code_toolbox import format_Byte_size # to print memory requirements
def rasterize_samir_parameters(csv_param_file: str, land_cover_raster: str) -> dict:
Creates a dictionnary containing raster parameter values and the scale factors from the csv parameter file
and the land cover raster. For each parameter, the function loops on land cover classes
to fill the raster.
Jeremy Auclair
committed
Jeremy Auclair
committed
Arguments
=========
Jeremy Auclair
committed
path to csv paramter file
Jeremy Auclair
committed
path to land cover netcdf raster
Jeremy Auclair
committed
Returns
=======
the dictionnary containing all the rasterized Parameters
and the scale factors
Jeremy Auclair
committed
"""
Jeremy Auclair
committed
# Load samir params into an object
table_param = samir_parameters(csv_param_file)
Jeremy Auclair
committed
# Set general variables
class_count = table_param.table.shape[1] - 2 # remove dtype and default columns
Jeremy Auclair
committed
# Open land cover raster
land_cover = xr.open_dataarray(land_cover_raster).to_numpy()
# Create parameter dictionnary
parameter_dict = {}
# Loop on samir parameters and create
Jeremy Auclair
committed
for parameter in table_param.table.index[1:]:
value = land_cover.copy().astype('int16')
Jeremy Auclair
committed
# Assign scale factor
# Scale factors have the following name scheme : s_ + parameter_name
parameter_dict['s_' + parameter] = np.float32(1/int(table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0]))
Jeremy Auclair
committed
# Loop on classes to set parameter values for each class
for class_val, class_name in zip(range(1, class_count + 1), table_param.table.columns[2:]):
Jeremy Auclair
committed
# Parameter values are multiplied by the scale factor in order to store all values as int16 types
# These values are then rounded to make sure there isn't any decimal point issues when casting the values to int16
# TODO vr: formule trop longue, trouver un moyen de rendre lisible
value = np.where(value == class_val, round(table_param.table.loc[table_param.table.index == parameter][class_name].values[0] * table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0]), value).astype('int16')
Jeremy Auclair
committed
# Assign parameter value
# Parameters have an underscore (_) behind their name for recognition
parameter_dict[parameter + '_'] = value
Jeremy Auclair
committed
return parameter_dict
def prepare_output_dataset(empty_dataset: xr.Dataset, additional_outputs: dict = None) -> xr.Dataset:
Jeremy Auclair
committed
Creates the `xarray Dataset` containing the outputs of the SAMIR model that will be saved.
Additional variables can be saved by adding their names to the `additional_outputs` list.
Jeremy Auclair
committed
Arguments
=========
empty dataset that contains the right structure
list of additional variable names to be saved
Jeremy Auclair
committed
Returns
=======
model_outputs = empty_dataset.copy(deep=True)
model_outputs['E'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
model_outputs['E'].attrs['units'] = 'mm'
model_outputs['E'].attrs['standard_name'] = 'Evaporation'
model_outputs['E'].attrs['description'] = 'Accumulated daily evaporation in milimeters'
model_outputs['E'].attrs['scale factor'] = '1000'
model_outputs['Tr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
model_outputs['Tr'].attrs['units'] = 'mm'
model_outputs['Tr'].attrs['standard_name'] = 'Transpiration'
model_outputs['Tr'].attrs['description'] = 'Accumulated daily plant transpiration in milimeters'
model_outputs['Tr'].attrs['scale factor'] = '1000'
model_outputs['SWCe'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
model_outputs['SWCe'].attrs['units'] = 'mm'
model_outputs['SWCe'].attrs['standard_name'] = 'Soil Water Content of the evaporative zone'
model_outputs['SWCe'].attrs['description'] = 'Soil water content of the evaporative zone in milimeters'
model_outputs['SWCe'].attrs['scale factor'] = '1000'
model_outputs['SWCr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
model_outputs['SWCr'].attrs['units'] = 'mm'
model_outputs['SWCr'].attrs['standard_name'] = 'Soil Water Content of the root zone'
model_outputs['SWCr'].attrs['description'] = 'Soil water content of the root zone in milimeters'
model_outputs['SWCr'].attrs['scale factor'] = '1000'
model_outputs['Irr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
model_outputs['Irr'].attrs['units'] = 'mm'
model_outputs['Irr'].attrs['standard_name'] = 'Irrigation'
model_outputs['Irr'].attrs['description'] = 'Simulated daily irrigation in milimeters'
model_outputs['Irr'].attrs['scale factor'] = '1000'
model_outputs['DP'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
model_outputs['DP'].attrs['units'] = 'mm'
model_outputs['DP'].attrs['standard_name'] = 'Deep Percolation'
model_outputs['DP'].attrs['description'] = 'Simulated daily Deep Percolation in milimeters'
model_outputs['DP'].attrs['scale factor'] = '1000'
for var in additional_outputs.keys():
model_outputs[var] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_diff_re(TAW: np.ndarray, Dr: np.ndarray, Zr: np.ndarray, RUE: np.ndarray, De: np.ndarray, Wfc: np.ndarray, Ze: np.ndarray, DiffE: np.ndarray) -> np.ndarray:
Calculates the diffusion between the top soil layer and the root layer. Uses numba for faster and parallel calculation.
Jeremy Auclair
committed
Arguments
=========
water capacity of root layer
depletion of root layer
height of root layer
total available surface water = (Wfc-Wwp)*Ze
depletion of the evaporative layer
field capacity
height of evaporative layer (paramter)
diffusion coefficient between evaporative
and root layers (unitless, parameter)
Jeremy Auclair
committed
Returns
=======
the diffusion between the top soil layer and
the root layer
"""
# Temporary variables to make calculation easier to read
tmp1 = ((TAW - Dr) / Zr - (RUE - De) / Ze) / (Wfc * DiffE)
tmp2 = ((TAW * Ze) - (RUE - De - Dr) * Zr) / (Zr + Ze) - Dr
# Calculate diffusion according to SAMIR equation
# Return zero values where the 'DiffE' parameter is equal to 0
return np.where(DiffE == 0, np.float32(0), np.where(tmp1 < 0, np.maximum(tmp1, tmp2), np.minimum(tmp1, tmp2)))
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_diff_dr(TAW: np.ndarray, TDW: np.ndarray, Dr: np.ndarray, Zr: np.ndarray, Dd: np.ndarray, Wfc: np.ndarray, Zsoil: np.ndarray, DiffR: np.ndarray) -> np.ndarray:
Calculates the diffusion between the root layer and the deep layer. Uses numba for faster and parallel calculation.
Jeremy Auclair
committed
Arguments
=========
water capacity of root layer
water capacity of deep layer
depletion of root layer
height of root layer
depletion of deep layer
field capacity
total height of soil (paramter)
Diffusion coefficient between root
and deep layers (unitless, parameter)
Jeremy Auclair
committed
Returns
=======
the diffusion between the root layer and the
deep layer
"""
# Temporary variables to make calculation easier to read
tmp1 = (((TDW - Dd) / (Zsoil - Zr) - (TAW - Dr) / Zr) / Wfc) * DiffR
tmp2 = ((TDW * Zr - (TAW - Dr - Dd) * (Zsoil - Zr)) / Zsoil) - Dd
# Calculate diffusion according to SAMIR equation
# Return zero values where the 'DiffR' parameter is equal to 0
return np.where(DiffR == 0, np.float32(0), np.where(tmp1 < 0, np.maximum(tmp1, tmp2), np.minimum(tmp1, tmp2)))
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_W(TEW: np.ndarray, Dei: np.ndarray, Dep: np.ndarray, fewi: np.ndarray, fewp: np.ndarray) -> np.ndarray:
Calculate W, the weighting factor to split the energy available
for evaporation depending on the difference in water availability
in the two evaporation components, ensuring that the larger and
the wetter, the more the evaporation occurs from that component.
Uses numba for faster and parallel calculation.
Jeremy Auclair
committed
Arguments
=========
water capacity of evaporative layer
depletion of the evaporative layer
(irrigation part)
depletion of the evaporative layer
(precipitation part)
soil fraction which is wetted by irrigation
and exposed to evaporation
soil fraction which is wetted by precipitation
and exposed to evaporation
Jeremy Auclair
committed
Returns
=======
weighting factor W
# Calculate the weighting factor to split the energy available for evaporation
# * Equation: W = 1 / (1 + (fewp * (TEW - Dep) / fewi * (TEW - Dei)))
# Return W
# * Equation: W = where(fewi * (TEW - Dei) > 0, W, 0)
return np.where(fewi * (TEW - Dei) > 0, 1 / (1 + (fewp * (TEW - Dep) / fewi * (TEW - Dei))), np.float32(0))
@njit((float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_Kr(TEW: np.ndarray, De: np.ndarray, REW: np.ndarray) -> np.ndarray:
calculates of the reduction coefficient for evaporation dependent
on the amount of water in the soil using the FAO-56 method. Uses numba for faster and parallel calculation.
Jeremy Auclair
committed
Arguments
=========
water capacity of evaporative layer
depletion of evaporative layer
fixed readily evaporable water
Jeremy Auclair
committed
Returns
=======
Kr coefficient
"""
# Return Kr
return np.maximum(np.float32(0), np.minimum((TEW - De) / (TEW - REW), np.float32(1)))
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_Ks(Dr: np.ndarray, TAW: np.ndarray, p: np.ndarray, E0: np.ndarray, Tr0: np.ndarray) -> np.ndarray:
Calculate Ks coefficient after day 1. Uses numba for faster and parallel calculation.
Arguments
=========
1. Dr: ``np.ndarray``
depletion of the root layer
water capacity of the root layer
fraction of TAW available for plant without inducing stress
surface evaporation of previous day
plant transpiration of previous day
Ks coefficient
"""
# Return Ks
# * Equation: Ks = min((TAW - Dr) / (TAW * (1 - (p + 0.04 * (5 - (E0 + Tr0))))), 1)
return np.minimum((TAW - Dr) / (TAW * (np.float32(1) - (p + 0.04 * (np.float32(5) - (E0 + Tr0))))), np.float32(1)).astype(np.float32)
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], int16[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_irrig(Dr: np.ndarray, TAW: np.ndarray, p: np.ndarray, Rain: np.ndarray, Kcb: np.ndarray, Irrig_auto: np.ndarray, Kcb_stop_irrig: np.ndarray, E0: np.ndarray, Tr0: np.ndarray) -> np.ndarray:
Calculate automatic irrigation after day one. Uses numba for faster and parallel calculation.
Arguments
=========
1. Dr: ``np.ndarray``
depletion of the root layer
water capacity of the root layer
fraction of TAW available for plant without inducing stress
precipitation of the current day
crop coefficient value
parameter for automatic irrigation
Kcb threshold to stop irrigation
surface evaporation of previous day
plant transpiration of previous day
Returns
=======
1. Irrig: ``np.ndarray``
simulated irrigation for the current day
"""
# First step
Irrig = Irrig_auto * np.maximum(Dr - Rain, np.float32(0))
return np.where((Dr > TAW * (p + 0.04 * (np.float32(5) - (E0 + Tr0)))) & (Kcb > np.maximum(Kcb, np.float32(1)) * Kcb_stop_irrig), Irrig, np.float32(0))
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_Te(De: np.ndarray, Dr: np.ndarray, Ks: np.ndarray, Kcb: np.ndarray, Ze: np.ndarray, Zr: np.ndarray, TEW: np.ndarray, TAW: np.ndarray, ET0: np.ndarray) -> np.ndarray:
Calculate Te (root uptake of water) coefficient for current day. Uses numba for faster and parallel calculation.
Arguments
=========
1. De: ``np.ndarray``
depletion of the evaporative layer
depletion of the roor layer
stress coefficient
crop coefficient
height of the evaporative layer
heigh of the root layer
water capacity of the evaporative layer
water capacity of the root layer
reference evapotranspiration
Te coefficient
"""
# * Equation: Kt = min( ((Ze / Zr)**0.6) * (1 - De/TEW) / max(1 - Dr / TAW, 0.01), 1)
# * Equation: Te = Kt * Ks * Kcb * ET0
return (np.minimum(((Ze / Zr)**0.6) * (np.float32(1) - De / TEW) / np.maximum(np.float32(1) - Dr / TAW, 0.001), np.float32(1)) * Ks * Kcb * ET0).astype(np.float32)
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def update_De_from_Diff(De : np.ndarray, few: np.ndarray, Ke: np.ndarray, Te: np.ndarray, Diff_re: np.ndarray, TEW: np.ndarray, ET0: np.ndarray) -> np.ndarray:
Last update step for Dei and Dep depletions. Uses numba for faster and parallel calculation.
Arguments
=========
1. De: ``np.ndarray``
Dei or Dep, depletion of the evaporative layer
fewi or fewp, soil fraction which is wetted by
irrigation or precipitation and exposed to evaporation
Kei or Kep, evaporation coefficient for soil fraction
irrigated or rainfed and exposed to evaporation
root uptake of water
dffusion between the root and evaporation layers
water capacity of the evaporative layer
reference evapotranspiration of the current day
updated Dei or Dep
# Update Dei and Dep depletions
# * Equation: De = where(few > 0, min(max(De + ET0 * Ke / few + Te - Diff_re, 0), TEW), min(max(De + Te - Diff_re, 0), TEW))
return np.where(few > np.float32(0), np.minimum(np.maximum(De + ET0 * Ke / few + Te - Diff_re, np.float32(0)), TEW), np.minimum(np.maximum(De + Te - Diff_re, np.float32(0)), TEW))
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def update_Dr_from_root(Wfc: np.ndarray, Wwp: np.ndarray, Zr: np.ndarray, Zsoil: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray) -> np.ndarray:
Return the updated depletion for the root layer. Uses numba for faster and parallel calculation.
Jeremy Auclair
committed
Arguments
=========
field capacity
field wilting point
root depth for current day
total soil depth (parameter)
depletion of the root layer for previous day
depletion of the deep laye for previous day
root layer height for previous day
Jeremy Auclair
committed
Returns
=======
updated depletion for the root layer
"""
# Temporary variables to make calculation easier to read
tmp1 = np.maximum(Dr0 + Dd0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * (Zsoil - Zr0)), np.float32(0))
tmp2 = np.maximum(Dr0 + Dr0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * Zr0), np.float32(0))
# Return updated Dr
return np.where(Zr > Zr0, tmp1, tmp2)
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def update_Dd_from_root(Wfc: np.ndarray, Wwp: np.ndarray, Zr: np.ndarray, Zsoil: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray) -> np.ndarray:
Return the updated depletion for the deep layer. Uses numba for faster and parallel calculation.
Jeremy Auclair
committed
Arguments
=========
field capacity
field wilting point
root depth for current day
total soil depth (parameter)
depletion of the root layer for previous day
depletion of the deep laye for previous day
root layer height for previous day
Jeremy Auclair
committed
Returns
=======
updated depletion for the deep layer
"""
# Temporary variables to make calculation easier to read
tmp1 = np.maximum(Dd0 - Dd0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * (Zsoil - Zr0)), np.float32(0))
tmp2 = np.maximum(Dd0 - Dr0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * Zr0), np.float32(0))
# Return updated Dd
return np.where(Zr > Zr0, tmp1, tmp2)
def calculate_memory_requirement(x_size: int, y_size: int, time_size: int, nb_inputs: int, nb_outputs: int, nb_variables: int, nb_params: int, nb_bits: int) -> float:
Calculate memory requirement (GiB) of calculation if all datasets where loaded in memory.
Used to determine how to divide the datasets in times chunks for more efficient I/O
operations.
x size of dataset
y size of dataset
number of time bands
number of input variables
number of ouput variables
number of calculation variables
number of raster parameters
number of bits of datatype
Returns
=======
1. total_memory_requirement: ``float``
calculation memory requirement in GiB
"""
# Memory requirement of input datasets
input_memory_requirement = (x_size * y_size * time_size * nb_inputs * nb_bits) / (1024**3)
Jeremy Auclair
committed
# Memory requirement of calculation variables
calculation_memory_requirement = (x_size * y_size * (nb_params + nb_variables) * nb_bits) / (1024**3)
# Memory requirement of output datasets
output_memory_requirement = (x_size * y_size * time_size * nb_outputs * nb_bits) / (1024**3)
# Total memory requirement
total_memory_requirement = input_memory_requirement + calculation_memory_requirement + output_memory_requirement
return total_memory_requirement
Jeremy Auclair
committed
def calculate_time_slices_to_load(memory_requirement: float, time_size: int, security_factor: float, available_ram: int) -> Tuple[int, int, bool]:
Calculate how many time slices to load in memory (for input and output data)
based on available ram and calculation requirements.
Arguments
=========
1. memory_requirement: ``float``
amount of memory needed if whole input/output
datasets would ne loaded.
number of time slices in datasets
float between 0 and 1 to adjust memory requirements
available ram for computation in GiB
number of times slices to load for
input and output data
remainder of euclidian division for
the number of time slices to load
(last block of data to load)
used to know wheather data has been loaded
when the whole time values fit in memory, not
used otherwise
"""
# Possible division factors
division_factors = [1, 2, 3, 4, 8, 16, 32, 64, 128, 256]
Jeremy Auclair
committed
# Determine the time slice to load
for scale in division_factors:
if memory_requirement / scale < security_factor * available_ram:
if scale == 1:
time_slice = time_size
remainder_to_load = None
already_loaded = False
return time_slice, remainder_to_load, already_loaded
else:
time_slice = time_size // scale
Jeremy Auclair
committed
remainder_to_load = time_size % time_slice
already_loaded = None
return time_slice, remainder_to_load, already_loaded
# If dataset is to big, load only one time slice per loop
time_slice = 1
remainder_to_load = 1 # in order to correctly save last date
already_loaded = None
return time_slice, remainder_to_load, already_loaded
def get_empty_arrays(x_size: int, y_size: int, time_size: int, nb_arrays: int, list: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
Short function to make the `run_samir()` function easier to read.
Generates a varying number (`nb_arrays`) of empty numpy arrays of shape
`(x_size, y_size, time_size)` in list or tuple mode. Used to
store variable values before writing in in the output file.
x size of dataset
y size of dataset
number of time bands
number of arrays to generate
weather to return a tuple or a list
Jeremy Auclair
committed
output: ``Tuple[np.ndarray * nb_arrays]`` or ``List[np.ndarray * nb_arrays]``
output empty arrays
"""
# Return empty arrays into a list
if list:
Jeremy Auclair
committed
return [np.zeros((x_size, y_size, time_size), dtype = np.float32) for k in range(nb_arrays)]
# Return empty arrays into a tuple
Jeremy Auclair
committed
return tuple([np.zeros((x_size, y_size, time_size), dtype = np.float32) for k in range(nb_arrays)])
# @profile # type: ignore
def read_inputs(ndvi_cube_path: str, weather_path: str, i: int, time_slice: int, load_all: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
Read input data into numpy arrays based on loop conditions
Arguments
=========
1. ndvi_cube_path: ``str``
path of input NDVI file
path of input weather file
current loop counter
number of time slices to load
boolean to load the whole datasets
Returns
=======
1. NDVI: ``np.ndarray``
scaled NDVI values into numpy array
scaled Rain values into numpy array
scaled ET0 values into numpy array
"""
# Load whole dataset
if load_all:
with nc.Dataset(ndvi_cube_path, mode='r') as ds:
Jeremy Auclair
committed
# Dimensions of ndvi dataset : (time, y, x)
NDVI = np.asarray(ds.variables['NDVI'][:, :, :] / 255, dtype = np.float32)
with nc.Dataset(weather_path, mode='r') as ds:
# Dimensions of ndvi dataset : (time, x, y)
Rain = np.asarray(ds.variables['Rain'][:, :, :] / 100, dtype = np.float32)
ET0 = np.asarray(ds.variables['ET0'][:, :, :] / 1000, dtype = np.float32)
# Load given number of time slices
else:
with nc.Dataset(ndvi_cube_path, mode='r') as ds:
Jeremy Auclair
committed
# Dimensions of ndvi dataset : (time, y, x)
NDVI = np.asarray(ds.variables['NDVI'][i: i + time_slice, :, :] / 255, dtype = np.float32)
with nc.Dataset(weather_path, mode='r') as ds:
# Dimensions of ndvi dataset : (time, x, y)
Rain = np.asarray(ds.variables['Rain'][i: i + time_slice, :, :] / 100, dtype = np.float32)
ET0 = np.asarray(ds.variables['ET0'][i: i + time_slice, :, :] / 1000, dtype = np.float32)
return NDVI, Rain, ET0
def write_outputs(save_path: str, DP: np.ndarray, SWCe: np.ndarray, SWCr: np.ndarray, E: np.ndarray, Tr: np.ndarray, Irrig: np.ndarray, additional_outputs: dict, additional_outputs_data: List[np.ndarray], i: int, time_slice: int, write_all = False) -> None:
Write outputs to netcdf file based on conditions of current loop.
Arguments
=========
1. save_path: ``str``
output netcdf save path
2. DP: ``np.ndarray``
deep percolaton ``np.ndarray``
3. SWCe: ``np.ndarray``
soil water content of evaporative layer ``np.ndarray``
4. SWCr: ``np.ndarray``
soil water content of root layer ``np.ndarray``
5. E: ``np.ndarray``
surface evaporation ``np.ndarray``
6. Tr: ``np.ndarray``
plant transpiration ``np.ndarray``
7. Irrig: ``np.ndarray``
simulated irrigation ``np.ndarray``
8. additional_outputs: ``dict``
dictionnary containing additional outputs and their scale factors
9. additional_outputs_data: ``List[np.ndarray]``
list of additional output ``np.ndarray``. Is ``None`` if no additional ouputs
10. i: ``int``
current loop counter
number of loaded time slices
weather to write the whole output dataset
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
"""
# Write whole output dataset
if write_all:
with nc.Dataset(save_path, mode='r+') as outputs:
# Dimensions of output dataset : (x, y, time)
# Deep percolation
outputs.variables['DP'][:, :, :] = np.round(DP * 100)
# Soil water content of the evaporative layer
outputs.variables['SWCe'][:, :, :] = np.round(SWCe * 1000)
# Soil water content of the root layer
outputs.variables['SWCr'][:, :, :] = np.round(SWCr * 1000)
# Evaporation
outputs.variables['E'][:, :, :] = np.round(E * 1000)
# Transpiration
outputs.variables['Tr'][:, :, :] = np.round(Tr * 1000)
# Irrigation
outputs.variables['Irr'][:, :, :] = np.round(Irrig * 100)
# Additionnal outputs
if additional_outputs:
k = 0
for var, scale in zip(additional_outputs.keys(), additional_outputs.values()):
outputs.variables[var][:, :, :] = np.round(additional_outputs_data[k][:,:,:] * scale)
k+=1
else:
# Write given number of time slices
with nc.Dataset(save_path, mode='r+') as outputs:
# Dimensions of output dataset : (x, y, time)
# Deep percolation
outputs.variables['DP'][:, :, i - time_slice + 1: i + 1] = np.round(DP * 100)
# Soil water content of the evaporative layer
outputs.variables['SWCe'][:, :, i - time_slice + 1: i + 1] = np.round(SWCe * 1000)
# Soil water content of the root layer
outputs.variables['SWCr'][:, :, i - time_slice + 1: i + 1] = np.round(SWCr * 1000)
# Evaporation
outputs.variables['E'][:, :, i - time_slice + 1: i + 1] = np.round(E * 1000)
# Transpiration
outputs.variables['Tr'][:, :, i - time_slice + 1: i + 1] = np.round(Tr * 1000)
# Irrigation
outputs.variables['Irr'][:, :, i - time_slice + 1: i + 1] = np.round(Irrig * 100)
# Additionnal outputs
if additional_outputs:
k=0
for var, scale in zip(additional_outputs.keys(), additional_outputs.values()):
outputs.variables[var][:, :, i - time_slice + 1: i + 1] = np.round(additional_outputs_data[k][:,:,:] * scale)
k+=1
Jeremy Auclair
committed
return None
Jeremy Auclair
committed
# @profile # type: ignore
def samir_daily(NDVI: np.ndarray, ET0: np.ndarray, Rain: np.ndarray, Wfc: np.ndarray, Wwp: np.ndarray, params: dict, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray, E0: np.ndarray, Tr0: np.ndarray, Dei0: np.ndarray, Dep0: np.ndarray, iday: int) -> Tuple[np.ndarray]:
Run the SAMIR model on a single day. Inputs and outputs are `numpy.ndarray`.
Calls functions compiled with numba for intermediary calculations.
Jeremy Auclair
committed
1. NDVI: ``np.ndarray``
input NDVI
Jeremy Auclair
committed
2. ET0: ``np.ndarray``
input ET0
input Rain
field capacity
field wilting point
dictionnary containing the rasterized
samir parameters and their scale factors
previous day root layer depletion
previous day deep layer depletion
previous day root depth
previous day surface evaporation
previous day plant transpiration
previous day surface layer depletion
for irrigation part
previous day surface layer depletion
for precipitation part
current loop counter
Jeremy Auclair
committed
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
1. current_day_outouts: `Tuple[np.ndarray]`
multiple `numpy.ndarray` arrays are returned as a tuple for current day
"""
# Create aliases
DiffE_ = params['DiffE_']
DiffR_ = params['DiffR_']
FW_ = params['FW_']
FmaxFC_ = params['FmaxFC_']
Foffset_ = params['Foffset_']
Fslope_ = params['Fslope_']
Init_RU_ = params['Init_RU_']
Irrig_auto_ = params['Irrig_auto_']
Kcb_max_ = params['Kcb_max_']
Kslope_ = params['Kslope_']
Koffset_ = params['Koffset_']
Kcb_stop_irrig_ = params['Kcb_stop_irrig_']
REW_ = params['REW_'] # used in eval function to calculate Kri and Krp
Ze_ = params['Ze_']
Zsoil_ = params['Zsoil_']
maxZr_ = params['maxZr_']
minZr_ = params['minZr_']
p_ = params['p_']
s_DiffE = params['s_DiffE']
s_DiffR = params['s_DiffR']
s_FW = params['s_FW']
s_FmaxFC = params['s_FmaxFC']
s_Foffset = params['s_Foffset']
s_Fslope = params['s_Fslope']
s_Init_RU = params['s_Init_RU']
s_Kcb_max = params['s_Kcb_max']
s_Kslope = params['s_Kslope']
s_Koffset = params['s_Koffset']
s_Kcb_stop_irrig = params['s_Kcb_stop_irrig']
s_REW = params['s_REW'] # used in eval function to calculate Kri and Krp
s_Ze = params['s_Ze']
s_Zsoil = params['s_Zsoil']
s_maxZr = params['s_maxZr']
s_minZr = params['s_minZr']
s_p = params['s_p']
# Frequently used parameters
Ze = s_Ze * Ze_
FW = s_FW * FW_
# Update variables
# Fraction cover
# * Equation: Fslope * NDVI + Foffset
FCov = s_Fslope * Fslope_ * NDVI + s_Foffset * Foffset_
# * Equation: min(max(FCov, 0), FmaxFC)
FCov = np.minimum(np.maximum(FCov, 0, dtype = np.float32), s_FmaxFC * FmaxFC_, dtype = np.float32)
# Root depth upate
Jeremy Auclair
committed
# * Equation: Zr = max(minZr + (FCov / FmaxFC) * (maxZr - minZr), Ze + 0.001)
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
Zr = np.maximum(s_minZr * minZr_ + (FCov / (s_FmaxFC * FmaxFC_)) * (s_maxZr * maxZr_ - s_minZr * minZr_), Ze + 0.001, dtype = np.float32)
# Water capacities
TAW = (Wfc - Wwp) * Zr
TDW = (Wfc - Wwp) * (s_Zsoil * Zsoil_ - Zr)
TEW = (Wfc - Wwp/2) * Ze
RUE = (Wfc - Wwp) * Ze
# Update depletions from root increase
if iday == 0:
Dei = RUE * (1 - s_Init_RU * Init_RU_)
Dep = RUE * (1 - s_Init_RU * Init_RU_)
Dr = TAW * (1 - s_Init_RU * Init_RU_)
Dd = TDW * (1 - s_Init_RU * Init_RU_)
else:
Dei = Dei0
Dep = Dep0
Dr = update_Dr_from_root(Wfc, Wwp, Zr, s_Zsoil * Zsoil_, Dr0, Dd0, Zr0)
Dd = update_Dd_from_root(Wfc, Wwp, Zr, s_Zsoil * Zsoil_, Dr0, Dd0, Zr0)
# Kcb
# * Equation: Kslope * NDVI + Koffset
Kcb = np.minimum(np.maximum(s_Kslope * Kslope_ * NDVI + s_Koffset * Koffset_, 0, dtype = np.float32), s_Kcb_max * Kcb_max_, dtype = np.float32)
# Irrigation
if iday == 0: # First day of simulation
Irrig = Irrig_auto_ * np.maximum(Dr - Rain, 0, dtype = np.float32)
Irrig = np.where((Dr > TAW * s_p * p_) & (Kcb > np.maximum(Kcb, np.float32(1)) * s_Kcb_stop_irrig * Kcb_stop_irrig_), Irrig, np.float32(0))
else:
Irrig = calculate_irrig(Dr, TAW, s_p * p_, Rain, Kcb, Irrig_auto_, s_Kcb_stop_irrig * Kcb_stop_irrig_, E0, Tr0)
# DP (Deep percolation)
DP = - np.minimum(Dd + np.minimum(Dr - Rain - Irrig, 0, dtype = np.float32), 0, dtype = np.float32)
# Update depletions with Rainfall and/or irrigation
# De
# * Equation: Dei = min(max(Dei - Rain - Irrig / FW, 0), TEW)
Dei = np.minimum(np.maximum(Dei - Rain - Irrig / FW, 0, dtype = np.float32), TEW, dtype = np.float32)
# * Equation: Dep = min(max(Dep - Rain - Irrig / FW, 0), TEW)
Dep = np.minimum(np.maximum(Dep - Rain, 0, dtype = np.float32), TEW, dtype = np.float32)
fewi = np.minimum(1 - FCov, FW, dtype = np.float32)
fewp = 1 - FCov - fewi
# * Equation: De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
De = np.nansum([Dei * fewi, Dep * fewp], dtype = np.float32) / np.nansum([fewi, fewp], dtype = np.float32)
# * Equation: De = where(De.isfinite, De, Dei * FW + Dep * (1 - FW))
De = np.where(np.isfinite(De), De, Dei * FW + Dep * (1 - FW))
# Update depletions from rain and irrigation
Dr, Dd = np.minimum(np.maximum(Dr - Rain - Irrig, 0, dtype = np.float32), TAW, dtype = np.float32), np.minimum(np.maximum(Dd + np.minimum(Dr - Rain - Irrig, 0, dtype = np.float32), 0, dtype = np.float32), TDW, dtype = np.float32)
# Diffusion coefficients
# * Equation: check calculate_diff_re() and calculate_diff_dr functions
Diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, Wfc, Ze, s_DiffE * DiffE_)
Diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, Wfc, Ze, s_DiffE * DiffE_)
Diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, Wfc, s_Zsoil * Zsoil_, s_DiffR * DiffR_)
# Water Stress coefficient
if iday == 0:
Ks = np.minimum((TAW - Dr) / (TAW * (1 - s_p * p_)), 1, dtype = np.float32)
else:
# When not first day
Ks = calculate_Ks(Dr, TAW, s_p * p_, E0, Tr0)
# Reduction coefficient for evaporation
# Create string expressions that are calculated later, uses less memory
W = calculate_W(TEW, Dei, Dep, fewi, fewp)
Kri = 'calculate_Kr(TEW, Dei, s_REW * REW_)'
Krp = 'calculate_Kr(TEW, Dep, s_REW * REW_)'
# * Equation: Kei = np.minimum(W * Kri * (Kcb_max - Kcb), fewi * Kcb_max)
Kei = np.minimum(W * eval(Kri) * (s_Kcb_max * Kcb_max_ - Kcb), fewi * s_Kcb_max * Kcb_max_, dtype = np.float32)
# * Equation: Kep = np.minimum((1 - W) * Krp * (Kcb_max - Kcb), fewp * Kcb_max)
Kep = np.minimum((1 - W) * eval(Krp) * (s_Kcb_max * Kcb_max_ - Kcb), fewp * s_Kcb_max * Kcb_max_, dtype = np.float32)
# Prepare coefficients for evapotranspiration
Tei = calculate_Te(Dei, Dr, Ks, Kcb, s_Ze * Ze_, Zr, TEW, TAW, ET0)
Tep = calculate_Te(Dep, Dr, Ks, Kcb, s_Ze * Ze_, Zr, TEW, TAW, ET0)