Skip to content
Snippets Groups Projects
modspa_samir.py 48.1 KiB
Newer Older
Jeremy Auclair's avatar
Jeremy Auclair committed
"""11-07-2023
Jeremy Auclair's avatar
Jeremy Auclair committed
Usage of the SAMIR model in the Modspa framework. 

import xarray as xr  # to manage datasets
from numba import njit, float32, int16  # to compile functions in nopython mode for faster calculation
import numpy as np  # for vectorized maths
from typing import List, Tuple  # to declare argument types
import netCDF4 as nc  # to efficiently read and write netCDF files
from tqdm import tqdm  # to show a progress bar
from psutil import virtual_memory  # to check available ram
Jeremy Auclair's avatar
Jeremy Auclair committed
from modspa_pixel.parameters.params_samir_class import samir_parameters  # to load SAMIR parameters
from modspa_pixel.config.config import config  # to load modspa config file
from modspa_pixel.source.code_toolbox import format_Byte_size  # to print memory requirements
def rasterize_samir_parameters(csv_param_file: str, land_cover_raster: str) -> dict:
    Creates a dictionnary containing raster parameter values and the scale factors from the csv parameter file
    and the land cover raster. For each parameter, the function loops on land cover classes
    to fill the raster.
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. csv_param_file: ``str``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. land_cover_raster: ``str``
Jeremy Auclair's avatar
Jeremy Auclair committed
    parameter_dict: ``dict``
        the dictionnary containing all the rasterized Parameters
        and the scale factors

    # Load samir params into an object
    table_param = samir_parameters(csv_param_file)
    # Set general variables
    class_count = table_param.table.shape[1] - 2  # remove dtype and default columns
    land_cover = xr.open_dataarray(land_cover_raster).to_numpy()
    # Create parameter dictionnary
    parameter_dict = {}
    for parameter in table_param.table.index[1:]:
        value = land_cover.copy().astype('int16')
        # Assign scale factor
        # Scale factors have the following name scheme : s_ + parameter_name
        parameter_dict['s_' + parameter] = np.float32(1/int(table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0]))
        
        # Loop on classes to set parameter values for each class
        for class_val, class_name in zip(range(1, class_count + 1), table_param.table.columns[2:]):
            # Parameter values are multiplied by the scale factor in order to store all values as int16 types
            # These values are then rounded to make sure there isn't any decimal point issues when casting the values to int16
            # TODO vr: formule trop longue, trouver un moyen de rendre lisible
            value = np.where(value == class_val, round(table_param.table.loc[table_param.table.index == parameter][class_name].values[0] * table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0]), value).astype('int16')
        # Assign parameter value
        # Parameters have an underscore (_) behind their name for recognition
        parameter_dict[parameter + '_'] = value
def prepare_output_dataset(empty_dataset: xr.Dataset, additional_outputs: dict = None) -> xr.Dataset:
    Creates the `xarray Dataset` containing the outputs of the SAMIR model that will be saved.
    Additional variables can be saved by adding their names to the `additional_outputs` list.

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. empty_dataset: ``xr.Dataset``
        empty dataset that contains the right structure
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. additional_outputs: ``List[str]``
        list of additional variable names to be saved

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. model_outputs: ``xr.Dataset``
        model outputs to be saved
    """
    # Evaporation and Transpiraion
    model_outputs = empty_dataset.copy(deep=True)

    model_outputs['E'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
    model_outputs['E'].attrs['units'] = 'mm'
    model_outputs['E'].attrs['standard_name'] = 'Evaporation'
    model_outputs['E'].attrs['description'] = 'Accumulated daily evaporation in milimeters'
    model_outputs['E'].attrs['scale factor'] = '1000'

    model_outputs['Tr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
    model_outputs['Tr'].attrs['units'] = 'mm'
    model_outputs['Tr'].attrs['standard_name'] = 'Transpiration'
    model_outputs['Tr'].attrs['description'] = 'Accumulated daily plant transpiration in milimeters'
    model_outputs['Tr'].attrs['scale factor'] = '1000'

    # Soil Water Content
    model_outputs['SWCe'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
    model_outputs['SWCe'].attrs['units'] = 'mm'
    model_outputs['SWCe'].attrs['standard_name'] = 'Soil Water Content of the evaporative zone'
    model_outputs['SWCe'].attrs['description'] = 'Soil water content of the evaporative zone in milimeters'
    model_outputs['SWCe'].attrs['scale factor'] = '1000'

    model_outputs['SWCr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
    model_outputs['SWCr'].attrs['units'] = 'mm'
    model_outputs['SWCr'].attrs['standard_name'] = 'Soil Water Content of the root zone'
    model_outputs['SWCr'].attrs['description'] = 'Soil water content of the root zone in milimeters'
    model_outputs['SWCr'].attrs['scale factor'] = '1000'

    # Irrigation
    model_outputs['Irr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
    model_outputs['Irr'].attrs['units'] = 'mm'
    model_outputs['Irr'].attrs['standard_name'] = 'Irrigation'
    model_outputs['Irr'].attrs['description'] = 'Simulated daily irrigation in milimeters'
    model_outputs['Irr'].attrs['scale factor'] = '1000'

    # Deep Percolation
    model_outputs['DP'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))
    model_outputs['DP'].attrs['units'] = 'mm'
    model_outputs['DP'].attrs['standard_name'] = 'Deep Percolation'
    model_outputs['DP'].attrs['description'] = 'Simulated daily Deep Percolation in milimeters'
    model_outputs['DP'].attrs['scale factor'] = '1000'

    if additional_outputs:
        for var in additional_outputs.keys():
            model_outputs[var] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = np.int16))

    return model_outputs


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_diff_re(TAW: np.ndarray, Dr: np.ndarray, Zr: np.ndarray, RUE: np.ndarray, De: np.ndarray, Wfc: np.ndarray, Ze: np.ndarray, DiffE: np.ndarray) -> np.ndarray:
    Calculates the diffusion between the top soil layer and the root layer. Uses numba for faster and parallel calculation.
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. TAW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. Dr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Zr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. RUE: ``np.ndarray``
        total available surface water = (Wfc-Wwp)*Ze
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. De: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. Wfc: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. Ze: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    8. DiffE: ``np.ndarray``
        diffusion coefficient between evaporative
        and root layers (unitless, parameter)
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. diff_re: ``np.ndarray``
        the diffusion between the top soil layer and
        the root layer
    """
    # Temporary variables to make calculation easier to read
    tmp1 = ((TAW - Dr) / Zr - (RUE - De) / Ze) / (Wfc * DiffE)
    tmp2 = ((TAW * Ze) - (RUE - De - Dr) * Zr) / (Zr + Ze) - Dr
    
    # Calculate diffusion according to SAMIR equation
    # Return zero values where the 'DiffE' parameter is equal to 0
    return np.where(DiffE == 0, np.float32(0), np.where(tmp1 < 0, np.maximum(tmp1, tmp2), np.minimum(tmp1, tmp2)))


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_diff_dr(TAW: np.ndarray, TDW: np.ndarray, Dr: np.ndarray, Zr: np.ndarray, Dd: np.ndarray, Wfc: np.ndarray, Zsoil: np.ndarray, DiffR: np.ndarray) -> np.ndarray:
    Calculates the diffusion between the root layer and the deep layer. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. TAW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. TDW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Dr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Zr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Dd: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. Wfc: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. Zsoil: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    8. DiffR: ``np.ndarray``
        Diffusion coefficient between root
        and deep layers (unitless, parameter)

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. Diff_dr: ``np.ndarray``
        the diffusion between the root layer and the
        deep layer
    """

    # Temporary variables to make calculation easier to read
    tmp1 = (((TDW - Dd) / (Zsoil - Zr) - (TAW - Dr) / Zr) / Wfc) * DiffR
    tmp2 = ((TDW * Zr - (TAW - Dr - Dd) * (Zsoil - Zr)) / Zsoil) - Dd

    # Calculate diffusion according to SAMIR equation
    # Return zero values where the 'DiffR' parameter is equal to 0
    return np.where(DiffR == 0, np.float32(0), np.where(tmp1 < 0, np.maximum(tmp1, tmp2), np.minimum(tmp1, tmp2)))


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_W(TEW: np.ndarray, Dei: np.ndarray, Dep: np.ndarray, fewi: np.ndarray, fewp: np.ndarray) -> np.ndarray:
    Calculate W, the weighting factor to split the energy available
    for evaporation depending on the difference in water availability
    in the two evaporation components, ensuring that the larger and
    the wetter, the more the evaporation occurs from that component.
    Uses numba for faster and parallel calculation.
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. TEW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. Dei: ``np.ndarray``
        depletion of the evaporative layer
        (irrigation part)
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Dep: ``np.ndarray``
        depletion of the evaporative layer
        (precipitation part)
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. fewi: ``np.ndarray``
        soil fraction which is wetted by irrigation
        and exposed to evaporation
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. fewp: ``np.ndarray``
        soil fraction which is wetted by precipitation
        and exposed to evaporation
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. W: ``np.ndarray``

    # Calculate the weighting factor to split the energy available for evaporation
    # * Equation: W = 1 / (1 + (fewp * (TEW - Dep) / fewi * (TEW - Dei)))
    # Return W
    # * Equation: W = where(fewi * (TEW - Dei) > 0, W, 0)
    return np.where(fewi * (TEW - Dei) > 0, 1 / (1 + (fewp * (TEW - Dep) / fewi * (TEW - Dei))), np.float32(0))


@njit((float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_Kr(TEW: np.ndarray, De: np.ndarray, REW: np.ndarray) -> np.ndarray:
    calculates of the reduction coefficient for evaporation dependent
    on the amount of water in the soil using the FAO-56 method. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. TEW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. De: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. REW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. Kr: ``np.ndarray``
        Kr coefficient
    """

    # Return Kr
    return np.maximum(np.float32(0), np.minimum((TEW - De) / (TEW - REW), np.float32(1)))


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_Ks(Dr: np.ndarray, TAW: np.ndarray, p: np.ndarray, E0: np.ndarray, Tr0: np.ndarray) -> np.ndarray:
    Calculate Ks coefficient after day 1. Uses numba for faster and parallel calculation.
Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. Dr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. TAW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. p: ``np.ndarray``
        fraction of TAW available for plant without inducing stress
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. E0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Tr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    1. Ks: ``np.ndarray``
        Ks coefficient
    """

    # Return Ks
    # * Equation: Ks = min((TAW - Dr) / (TAW * (1 - (p + 0.04 * (5 - (E0 + Tr0))))), 1)
    return np.minimum((TAW - Dr) / (TAW * (np.float32(1) - (p + 0.04 * (np.float32(5) - (E0 + Tr0))))), np.float32(1)).astype(np.float32)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], int16[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_irrig(Dr: np.ndarray, TAW: np.ndarray, p: np.ndarray, Rain: np.ndarray, Kcb: np.ndarray, Irrig_auto: np.ndarray, Kcb_stop_irrig: np.ndarray, E0: np.ndarray, Tr0: np.ndarray) -> np.ndarray:
    Calculate automatic irrigation after day one. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. Dr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. TAW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. p: ``np.ndarray``
        fraction of TAW available for plant without inducing stress
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Rain: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Kcb: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. Irrig_auto: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. Kcb_stop_irrig: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    8. E0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    9. Tr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    1. Irrig: ``np.ndarray``
        simulated irrigation for the current day
    """
    
    # First step
    Irrig = Irrig_auto * np.maximum(Dr - Rain, np.float32(0))
    
    return np.where((Dr > TAW * (p + 0.04 * (np.float32(5) - (E0 + Tr0)))) & (Kcb > np.maximum(Kcb, np.float32(1)) * Kcb_stop_irrig), Irrig, np.float32(0))
    

@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def calculate_Te(De: np.ndarray, Dr: np.ndarray, Ks: np.ndarray, Kcb: np.ndarray, Ze: np.ndarray, Zr: np.ndarray, TEW: np.ndarray, TAW: np.ndarray, ET0: np.ndarray) -> np.ndarray:
    Calculate Te (root uptake of water) coefficient for current day. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. De: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. Dr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Ks: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Kcb: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Ze: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. Zr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. TEW: ``np.ndarray``
        water capacity of the evaporative layer
Jeremy Auclair's avatar
Jeremy Auclair committed
    8. TAW: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    9. ET0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    1. Te: ``np.ndarray``
        Te coefficient
    """
    
    # * Equation: Kt = min( ((Ze / Zr)**0.6) * (1 - De/TEW) / max(1 - Dr / TAW, 0.01), 1)
    # * Equation: Te = Kt * Ks * Kcb * ET0
    return (np.minimum(((Ze / Zr)**0.6) * (np.float32(1) - De / TEW) / np.maximum(np.float32(1) - Dr / TAW, 0.001), np.float32(1)) * Ks * Kcb * ET0).astype(np.float32)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def update_De_from_Diff(De : np.ndarray, few: np.ndarray, Ke: np.ndarray, Te: np.ndarray, Diff_re: np.ndarray, TEW: np.ndarray, ET0: np.ndarray) -> np.ndarray:
    Last update step for Dei and Dep depletions. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. De: ``np.ndarray``
        Dei or Dep, depletion of the evaporative layer
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. few: ``np.ndarray``
        fewi or fewp, soil fraction which is wetted by
        irrigation or precipitation and exposed to evaporation
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Ke: ``np.ndarray``
        Kei or Kep, evaporation coefficient for soil fraction
        irrigated or rainfed and exposed to evaporation
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Te: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Diff_re: ``np.ndarray``
        dffusion between the root and evaporation layers
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. TEW: ``np.ndarray``
        water capacity of the evaporative layer
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. ET0: ``np.ndarray``
        reference evapotranspiration of the current day

Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    De: ``np.ndarray``
    # Update Dei and Dep depletions
    # * Equation: De = where(few > 0, min(max(De + ET0 * Ke / few + Te - Diff_re, 0), TEW), min(max(De + Te - Diff_re, 0), TEW))
    return np.where(few > np.float32(0), np.minimum(np.maximum(De + ET0 * Ke / few + Te - Diff_re, np.float32(0)), TEW), np.minimum(np.maximum(De + Te - Diff_re, np.float32(0)), TEW))
        

@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def update_Dr_from_root(Wfc: np.ndarray, Wwp: np.ndarray, Zr: np.ndarray, Zsoil: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray) -> np.ndarray:
    Return the updated depletion for the root layer. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. Wfc: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. Wwp: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Zr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Zsoil: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Dr0: ``np.ndarray``
        depletion of the root layer for previous day
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. Dd0: ``np.ndarray``
        depletion of the deep laye for previous day
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. Zr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. output: ``np.ndarray``
    # Temporary variables to make calculation easier to read
    tmp1 = np.maximum(Dr0 + Dd0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * (Zsoil - Zr0)), np.float32(0))
    tmp2 = np.maximum(Dr0 + Dr0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * Zr0), np.float32(0))

    # Return updated Dr
    return np.where(Zr > Zr0, tmp1, tmp2)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True)
def update_Dd_from_root(Wfc: np.ndarray, Wwp: np.ndarray, Zr: np.ndarray, Zsoil: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray) -> np.ndarray:
    Return the updated depletion for the deep layer. Uses numba for faster and parallel calculation.

Jeremy Auclair's avatar
Jeremy Auclair committed
    1. Wfc: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. Wwp: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Zr: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Zsoil: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Dr0: ``np.ndarray``
        depletion of the root layer for previous day
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. Dd0: ``np.ndarray``
        depletion of the deep laye for previous day
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. Zr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    1. output: ``np.ndarray``
        updated depletion for the deep layer
    """

    # Temporary variables to make calculation easier to read
    tmp1 = np.maximum(Dd0 - Dd0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * (Zsoil - Zr0)), np.float32(0))
    tmp2 = np.maximum(Dd0 - Dr0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * Zr0), np.float32(0))

    # Return updated Dd
    return np.where(Zr > Zr0, tmp1, tmp2)


def calculate_memory_requirement(x_size: int, y_size: int, time_size: int, nb_inputs: int, nb_outputs: int, nb_variables: int, nb_params: int, nb_bits: int) -> float:
    Calculate memory requirement (GiB) of calculation if all datasets where loaded in memory.
    Used to determine how to divide the datasets in times chunks for more efficient I/O
    operations.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. x_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. y_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. time_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. nb_inputs: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. nb_outputs: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. nb_variables: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. nb_params: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    8. nb_bits: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    1. total_memory_requirement: ``float``
    # Memory requirement of input datasets
    input_memory_requirement = (x_size * y_size * time_size * nb_inputs * nb_bits) / (1024**3)
    # Memory requirement of calculation variables
    calculation_memory_requirement = (x_size * y_size * (nb_params + nb_variables) * nb_bits) / (1024**3)
    
    # Memory requirement of output datasets
    output_memory_requirement = (x_size * y_size * time_size * nb_outputs * nb_bits) / (1024**3)
    
    # Total memory requirement
    total_memory_requirement = input_memory_requirement + calculation_memory_requirement + output_memory_requirement

    return total_memory_requirement
def calculate_time_slices_to_load(memory_requirement: float, time_size: int, security_factor: float, available_ram: int) -> Tuple[int, int, bool]:
    Calculate how many time slices to load in memory (for input and output data)
    based on available ram and calculation requirements.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. memory_requirement: ``float``
        amount of memory needed if whole input/output
        datasets would ne loaded.
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. time_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. security_factor: ``float``
        float between 0 and 1 to adjust memory requirements
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. available_ram: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    1. time_slice: ``int``
        number of times slices to load for
        input and output data
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. remainder_to_load: ``int``
        remainder of euclidian division for
        the number of time slices to load
        (last block of data to load)
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. already_loaded: ``bool``
        used to know wheather data has been loaded
        when the whole time values fit in memory, not
        used otherwise
    """
    
    # Possible division factors
    division_factors = [1, 2, 3, 4, 8, 16, 32, 64, 128, 256]
    # Determine the time slice to load
    for scale in division_factors:
        
        if memory_requirement / scale < security_factor * available_ram:
            
            if scale == 1:
                
                time_slice = time_size
                remainder_to_load = None
                already_loaded = False
                
                return time_slice, remainder_to_load, already_loaded
            
            else:
                time_slice = time_size // scale
                remainder_to_load = time_size % time_slice
                already_loaded = None
                
                return time_slice, remainder_to_load, already_loaded
    
    # If dataset is to big, load only one time slice per loop
    time_slice = 1
    remainder_to_load = 1  # in order to correctly save last date
    already_loaded = None
        
    return time_slice, remainder_to_load, already_loaded


def get_empty_arrays(x_size: int, y_size: int, time_size: int, nb_arrays: int, list: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    Short function to make the `run_samir()` function easier to read.
    Generates a varying number (`nb_arrays`) of empty numpy arrays of shape 
    `(x_size, y_size, time_size)` in list or tuple mode. Used to
    store variable values before writing in in the output file.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. x_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. y_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. time_size: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. nb_arrays: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. list: ``bool`` ``default = False``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    output: ``Tuple[np.ndarray * nb_arrays]`` or ``List[np.ndarray * nb_arrays]``
        output empty arrays
    """
    
    # Return empty arrays into a list
    if list:
        return [np.zeros((x_size, y_size, time_size), dtype = np.float32) for k in range(nb_arrays)]
    return tuple([np.zeros((x_size, y_size, time_size), dtype = np.float32) for k in range(nb_arrays)])


# @profile  # type: ignore
def read_inputs(ndvi_cube_path: str, weather_path: str, i: int, time_slice: int, load_all: bool = False) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    Read input data into numpy arrays based on loop conditions

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. ndvi_cube_path: ``str``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. weather_path: ``str``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. i: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. time_slice: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. load_all: ``bool`` ``default = False``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    1. NDVI: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. Rain: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. ET0: ``np.ndarray``
        scaled ET0 values into numpy array
        
    """
    
    # Load whole dataset
    if load_all:
        
        with nc.Dataset(ndvi_cube_path, mode='r') as ds:
            # Dimensions of ndvi dataset : (time, y, x)
            NDVI = np.asarray(ds.variables['NDVI'][:, :, :] / 255, dtype = np.float32)
        with nc.Dataset(weather_path, mode='r') as ds:
            # Dimensions of ndvi dataset : (time, x, y)
            Rain = np.asarray(ds.variables['Rain'][:, :, :] / 100, dtype = np.float32)
            ET0 = np.asarray(ds.variables['ET0'][:, :, :] / 1000, dtype = np.float32)
    
    # Load given number of time slices
    else:
        
        with nc.Dataset(ndvi_cube_path, mode='r') as ds:
            # Dimensions of ndvi dataset : (time, y, x)
            NDVI = np.asarray(ds.variables['NDVI'][i: i + time_slice, :, :] / 255, dtype = np.float32)
        with nc.Dataset(weather_path, mode='r') as ds:
            # Dimensions of ndvi dataset : (time, x, y)
            Rain = np.asarray(ds.variables['Rain'][i: i + time_slice, :, :] / 100, dtype = np.float32)
            ET0 = np.asarray(ds.variables['ET0'][i: i + time_slice, :, :] / 1000, dtype = np.float32)
    
    return NDVI, Rain, ET0


def write_outputs(save_path: str, DP: np.ndarray, SWCe: np.ndarray, SWCr: np.ndarray, E: np.ndarray, Tr: np.ndarray, Irrig: np.ndarray, additional_outputs: dict, additional_outputs_data: List[np.ndarray], i: int, time_slice: int, write_all = False) -> None:
    Write outputs to netcdf file based on conditions of current loop.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========

    1. save_path: ``str``
Jeremy Auclair's avatar
Jeremy Auclair committed
    2. DP: ``np.ndarray``
        deep percolaton ``np.ndarray``
    3. SWCe: ``np.ndarray``
        soil water content of evaporative layer ``np.ndarray``
    4. SWCr: ``np.ndarray``
        soil water content of root layer ``np.ndarray``
    5. E: ``np.ndarray``
        surface evaporation ``np.ndarray``
    6. Tr: ``np.ndarray``
        plant transpiration ``np.ndarray``
    7. Irrig: ``np.ndarray``
        simulated irrigation ``np.ndarray``
    8. additional_outputs: ``dict``
        dictionnary containing additional outputs and their scale factors
Jeremy Auclair's avatar
Jeremy Auclair committed
    9. additional_outputs_data: ``List[np.ndarray]``
        list of additional output ``np.ndarray``. Is ``None`` if no additional ouputs
    10. i: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    11. time_slice: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    12. write_all: ``bool`` ``default = False``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======

    ``None``
    """
    
    
    # Write whole output dataset
    if write_all:
        
        with nc.Dataset(save_path, mode='r+') as outputs:
            # Dimensions of output dataset : (x, y, time)
            # Deep percolation
            outputs.variables['DP'][:, :, :] = np.round(DP * 100)
            # Soil water content of the evaporative layer
            outputs.variables['SWCe'][:, :, :] = np.round(SWCe * 1000)
            # Soil water content of the root layer
            outputs.variables['SWCr'][:, :, :] = np.round(SWCr * 1000)
            # Evaporation
            outputs.variables['E'][:, :, :] = np.round(E * 1000)
            # Transpiration
            outputs.variables['Tr'][:, :, :] = np.round(Tr * 1000)
            # Irrigation
            outputs.variables['Irr'][:, :, :] = np.round(Irrig * 100)
            # Additionnal outputs
            if additional_outputs:
                k = 0
                for var, scale in zip(additional_outputs.keys(), additional_outputs.values()):
                    outputs.variables[var][:, :, :] = np.round(additional_outputs_data[k][:,:,:] * scale)
                    k+=1
    
    else:
        
        # Write given number of time slices
        with nc.Dataset(save_path, mode='r+') as outputs:
            # Dimensions of output dataset : (x, y, time)
            # Deep percolation
            outputs.variables['DP'][:, :, i - time_slice + 1: i + 1] = np.round(DP * 100)
            # Soil water content of the evaporative layer
            outputs.variables['SWCe'][:, :, i - time_slice + 1: i + 1] = np.round(SWCe * 1000)
            # Soil water content of the root layer
            outputs.variables['SWCr'][:, :, i - time_slice + 1: i + 1] = np.round(SWCr * 1000)
            # Evaporation
            outputs.variables['E'][:, :, i - time_slice + 1: i + 1] = np.round(E * 1000)
            # Transpiration
            outputs.variables['Tr'][:, :, i - time_slice + 1: i + 1] = np.round(Tr * 1000)
            # Irrigation
            outputs.variables['Irr'][:, :, i - time_slice + 1: i + 1] = np.round(Irrig * 100)
            # Additionnal outputs
            if additional_outputs:
                k=0
                for var, scale in zip(additional_outputs.keys(), additional_outputs.values()):
                    outputs.variables[var][:, :, i - time_slice + 1: i + 1] = np.round(additional_outputs_data[k][:,:,:] * scale)
                    k+=1
        
def samir_daily(NDVI: np.ndarray, ET0: np.ndarray, Rain: np.ndarray, Wfc: np.ndarray, Wwp: np.ndarray, params: dict, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray, E0: np.ndarray, Tr0: np.ndarray, Dei0: np.ndarray, Dep0: np.ndarray, iday: int) -> Tuple[np.ndarray]:
    Run the SAMIR model on a single day. Inputs and outputs are `numpy.ndarray`.
    Calls functions compiled with numba for intermediary calculations.

Jeremy Auclair's avatar
Jeremy Auclair committed
    Arguments
    =========
Jeremy Auclair's avatar
Jeremy Auclair committed
    3. Rain: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    4. Wfc: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    5. Wwp: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    6. params: ``dict``
        dictionnary containing the rasterized
        samir parameters and their scale factors
Jeremy Auclair's avatar
Jeremy Auclair committed
    7. Dr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    8. Dd0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    9. Zr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    10. E0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    11. Tr0: ``np.ndarray``
Jeremy Auclair's avatar
Jeremy Auclair committed
    12. Dei0: ``np.ndarray``
        previous day surface layer depletion
        for irrigation part
Jeremy Auclair's avatar
Jeremy Auclair committed
    13. Dep0: ``np.ndarray``
        previous day surface layer depletion
        for precipitation part
Jeremy Auclair's avatar
Jeremy Auclair committed
    14. iday: ``int``
Jeremy Auclair's avatar
Jeremy Auclair committed
    Returns
    =======
    1. current_day_outouts: `Tuple[np.ndarray]`
        multiple `numpy.ndarray` arrays are returned as a tuple for current day
    """
    
    # Create aliases
    DiffE_ = params['DiffE_']
    DiffR_ = params['DiffR_']
    FW_ = params['FW_']
    FmaxFC_ = params['FmaxFC_']
    Foffset_ = params['Foffset_']
    Fslope_ = params['Fslope_']
    Init_RU_ = params['Init_RU_']
    Irrig_auto_ = params['Irrig_auto_']
    Kcb_max_ = params['Kcb_max_']
    Kslope_ = params['Kslope_']
    Koffset_ = params['Koffset_']
    Kcb_stop_irrig_ = params['Kcb_stop_irrig_']
    REW_ = params['REW_']  # used in eval function to calculate Kri and Krp
    Ze_ = params['Ze_']
    Zsoil_ = params['Zsoil_']
    maxZr_ = params['maxZr_']
    minZr_ = params['minZr_']
    p_ = params['p_']
    
    s_DiffE = params['s_DiffE']
    s_DiffR = params['s_DiffR']
    s_FW = params['s_FW']
    s_FmaxFC = params['s_FmaxFC']
    s_Foffset = params['s_Foffset']
    s_Fslope = params['s_Fslope']
    s_Init_RU = params['s_Init_RU']
    s_Kcb_max = params['s_Kcb_max']
    s_Kslope = params['s_Kslope']
    s_Koffset = params['s_Koffset']
    s_Kcb_stop_irrig = params['s_Kcb_stop_irrig']
    s_REW = params['s_REW']  # used in eval function to calculate Kri and Krp
    s_Ze = params['s_Ze']
    s_Zsoil = params['s_Zsoil']
    s_maxZr = params['s_maxZr']
    s_minZr = params['s_minZr']
    s_p = params['s_p']
    
    # Frequently used parameters
    Ze = s_Ze * Ze_
    FW = s_FW * FW_

    # Update variables
    # Fraction cover
    # * Equation: Fslope * NDVI + Foffset
    FCov = s_Fslope * Fslope_ * NDVI + s_Foffset * Foffset_
    # * Equation: min(max(FCov, 0), FmaxFC)
    FCov = np.minimum(np.maximum(FCov, 0, dtype = np.float32), s_FmaxFC * FmaxFC_, dtype = np.float32)

    # Root depth upate
    # * Equation: Zr = max(minZr + (FCov / FmaxFC) * (maxZr - minZr), Ze + 0.001)
    Zr = np.maximum(s_minZr * minZr_ + (FCov / (s_FmaxFC * FmaxFC_)) * (s_maxZr * maxZr_ - s_minZr * minZr_), Ze + 0.001, dtype = np.float32)

    # Water capacities
    TAW = (Wfc - Wwp) * Zr
    TDW = (Wfc - Wwp) * (s_Zsoil * Zsoil_ - Zr)
    TEW = (Wfc - Wwp/2) * Ze
    RUE = (Wfc - Wwp) * Ze

    # Update depletions from root increase
    if iday == 0:
        Dei = RUE * (1 - s_Init_RU * Init_RU_)
        Dep = RUE * (1 - s_Init_RU * Init_RU_)
        Dr = TAW * (1 - s_Init_RU * Init_RU_)
        Dd = TDW * (1 - s_Init_RU * Init_RU_)
    else:
        Dei = Dei0
        Dep = Dep0
        Dr = update_Dr_from_root(Wfc, Wwp, Zr, s_Zsoil * Zsoil_, Dr0, Dd0, Zr0)
        Dd = update_Dd_from_root(Wfc, Wwp, Zr, s_Zsoil * Zsoil_, Dr0, Dd0, Zr0)
    
    # Kcb
    # * Equation: Kslope * NDVI + Koffset
    Kcb = np.minimum(np.maximum(s_Kslope * Kslope_ * NDVI + s_Koffset * Koffset_, 0, dtype = np.float32), s_Kcb_max * Kcb_max_, dtype = np.float32)

    # Irrigation 
    if iday == 0:  # First day of simulation
        Irrig = Irrig_auto_ * np.maximum(Dr - Rain, 0, dtype = np.float32)
        Irrig = np.where((Dr > TAW * s_p * p_) & (Kcb > np.maximum(Kcb, np.float32(1)) * s_Kcb_stop_irrig * Kcb_stop_irrig_), Irrig, np.float32(0))
    else:
        Irrig = calculate_irrig(Dr, TAW, s_p * p_, Rain, Kcb, Irrig_auto_, s_Kcb_stop_irrig * Kcb_stop_irrig_, E0, Tr0)

    # DP (Deep percolation)
    DP = - np.minimum(Dd + np.minimum(Dr - Rain - Irrig, 0, dtype = np.float32), 0, dtype = np.float32)

    # Update depletions with Rainfall and/or irrigation
    # De
    # * Equation: Dei = min(max(Dei - Rain - Irrig / FW, 0), TEW)
    Dei = np.minimum(np.maximum(Dei - Rain - Irrig / FW, 0, dtype = np.float32), TEW, dtype = np.float32)
    # * Equation: Dep = min(max(Dep - Rain - Irrig / FW, 0), TEW)
    Dep = np.minimum(np.maximum(Dep - Rain, 0, dtype = np.float32), TEW, dtype = np.float32)

    fewi = np.minimum(1 - FCov, FW, dtype = np.float32)
    fewp = 1 - FCov - fewi

    # * Equation: De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
    De = np.nansum([Dei * fewi, Dep * fewp], dtype = np.float32) / np.nansum([fewi, fewp], dtype = np.float32)
    # * Equation: De = where(De.isfinite, De, Dei * FW + Dep * (1 - FW))
    De = np.where(np.isfinite(De), De, Dei * FW + Dep * (1 - FW))

    # Update depletions from rain and irrigation
    Dr, Dd = np.minimum(np.maximum(Dr - Rain - Irrig, 0, dtype = np.float32), TAW, dtype = np.float32), np.minimum(np.maximum(Dd + np.minimum(Dr - Rain - Irrig, 0, dtype = np.float32), 0, dtype = np.float32), TDW, dtype = np.float32)

    # Diffusion coefficients
    # * Equation: check calculate_diff_re() and calculate_diff_dr functions
    Diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, Wfc, Ze, s_DiffE * DiffE_)
    Diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, Wfc, Ze, s_DiffE * DiffE_)
    Diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, Wfc, s_Zsoil * Zsoil_, s_DiffR * DiffR_)

    # Water Stress coefficient
    if iday == 0:
        Ks = np.minimum((TAW - Dr) / (TAW * (1 - s_p * p_)), 1, dtype = np.float32)
    else:
        # When not first day
        Ks = calculate_Ks(Dr, TAW, s_p * p_, E0, Tr0)

    # Reduction coefficient for evaporation
    # Create string expressions that are calculated later, uses less memory
    W = calculate_W(TEW, Dei, Dep, fewi, fewp)
    Kri = 'calculate_Kr(TEW, Dei, s_REW * REW_)'
    Krp = 'calculate_Kr(TEW, Dep, s_REW * REW_)'

    # * Equation: Kei = np.minimum(W * Kri * (Kcb_max - Kcb), fewi * Kcb_max)
    Kei = np.minimum(W * eval(Kri) * (s_Kcb_max * Kcb_max_ - Kcb), fewi * s_Kcb_max * Kcb_max_, dtype = np.float32)

    # * Equation: Kep = np.minimum((1 - W) * Krp * (Kcb_max - Kcb), fewp * Kcb_max)
    Kep = np.minimum((1 - W) * eval(Krp) * (s_Kcb_max * Kcb_max_ - Kcb), fewp * s_Kcb_max * Kcb_max_, dtype = np.float32)

    # Prepare coefficients for evapotranspiration
    Tei = calculate_Te(Dei, Dr, Ks, Kcb, s_Ze * Ze_, Zr, TEW, TAW, ET0)
    Tep = calculate_Te(Dep, Dr, Ks, Kcb, s_Ze * Ze_, Zr, TEW, TAW, ET0)