"""
Usage of the SAMIR model in the Modspa framework.
"""

import xarray as xr  # to manage datasets
import rioxarray  # to better manage dataset projections
from numba import njit, types, boolean, uint8, uint16, int16, float32, int64, set_num_threads  # to compile functions in nopython mode for faster calculation
from numba.typed import List  #type: ignore, to type lists
import numpy as np  # for vectorized maths
import pandas as pd  # to manage dataframes
import netCDF4 as nc  # to efficiently read and write netCDF files
from tqdm import tqdm  # to show a progress bar
from psutil import virtual_memory  # to check available ram
from psutil import cpu_count  # to get number of physical cores available
from gc import collect  # to free up unused memory
import os  # to build and manage file paths
from modspa_pixel.parameters.params_samir_class import samir_parameters  # to load SAMIR parameters
from modspa_pixel.source.code_toolbox import format_Byte_size  # to print memory requirements


def rasterize_samir_parameters(csv_param_file: str, land_cover_raster: str, irrigation_raster: str, init_RU_raster: str) -> tuple[np.ndarray, np.ndarray, np.ndarray, list]:
    """
    Creates an array containing the rasterized parameter values and a list of parameter names
    from the csv parameter file and the land cover raster. For each parameter, the function
    loops on land cover classes to fill the raster.

    Before creating the array, it updates the parameter ``DataFrame`` and verifies the parameter range with
    the ``test_samir_parameter()`` function.

    Arguments
    =========

    1. csv_param_file: ``str``
        path to the SAMIR csv parameter file
    2. land_cover_raster: ``str``
        path to the land cover raster
    3. irrigation_raster: ``str``
        path to the irrigation mask raster. None if no input.
    4. init_RU_raster: ``str``
        path to initial soil water content. None if no input.

    Returns
    =======

    1. parameter_array: ``np.ndarray``
        array containing all the rasterized parameters
    2. Irrig_auto: ``np.ndarray``
        rasterized irrigation mask
    3. Init_RU: ``np.ndarray``
        rasterized initial soil water content
    4. param_list: ``List[str]``
        list of parameter names, gives the indexing of ``parameter_array``
    """

    # Get the path of the min/max parameter file
    min_max_param_file = os.path.join(os.path.dirname(csv_param_file), 'params_samir_min_max.csv')

    # Load samir params into an object
    table_param = samir_parameters(csv_param_file)

    # Set general variables
    class_count = table_param.table.shape[1] - 2  # remove dtype and Default columns

    # Open land cover raster and get the raster shape
    land_cover = xr.open_dataarray(land_cover_raster).to_numpy()
    shape = land_cover.shape

    # Modify parameter table based on its content
    table_param.test_samir_parameter(min_max_param_file)
    # If test_samir_parameter returns 0, parameters are incorrect
    if not isinstance(table_param.table, pd.DataFrame):
        import sys  # to exit script
        print(f'\nModify the SAMIR parameter file: {csv_param_file}\n')
        sys.exit()
    # Loop through parameters to count them
    count = 0
    for parameter in table_param.table.index[1:]:
        if parameter == 'Irrig_auto' and table_param.table.at[parameter, 'load_raster'] == 1:
            # The irrigation mask is loaded from a raster, it does not take a slot in the parameter array
            continue
        count += 1
    
    # Create the parameter array (one 2D band per parameter) and the list of parameter names
    parameter_array = np.zeros((count,) + shape, dtype = np.float32, order = 'C')
    param_list = []

    # Loop on parameters to fill the parameter array
    i = 0
    for parameter in table_param.table.index[1:]:

        if parameter == 'Irrig_auto' and table_param.table.at[parameter, 'load_raster'] == 1:

            # Load the irrigation mask directly from the provided raster
            Irrig_auto = np.ascontiguousarray(xr.open_dataarray(irrigation_raster, decode_coords = 'all').astype(np.bool_).values, dtype = np.bool_)

            continue

        elif parameter == 'Init_RU' and table_param.table.at[parameter, 'load_raster'] == 1:

            # Load the initial soil water content directly from the provided raster
            Init_RU = (xr.open_dataarray(init_RU_raster) / 1000).astype(np.float32).values

            # Keep the parameter array and the name list aligned
            parameter_array[i] = Init_RU
            param_list.append(parameter)
            i += 1

            continue

        # If scale_factor == 0, then the parameter does not have to be spatialized, it can stay scalar
        if table_param.table.at[parameter, 'scale_factor'] == 0:

            parameter_array[i] = np.ones(shape = shape, dtype = np.float32) * np.float32(table_param.table.at[parameter, 'Default'])
            param_list.append(parameter)
            i += 1

            continue

        # Otherwise spatialize the parameter based on the land cover classes
        dtype = np.float32
        if (parameter != 'Irrig_auto') and (parameter != 'Init_RU'):
            param_list.append(parameter)

        value = land_cover.copy().astype(dtype)

        # Loop on classes to set parameter values for each class
        for class_val, class_name in zip(range(1, class_count + 1), table_param.table.columns[2:]):

            # Get parameter value
            param_value = table_param.table.at[parameter, class_name]

            # Load parameter value for each class
            value = np.where(land_cover == class_val, np.float32(param_value), value)

        # Dispatch the rasterized parameter
        if parameter == 'Irrig_auto':
            Irrig_auto = value.astype(np.bool_)
        elif parameter == 'Init_RU':
            Init_RU = value.astype(np.float32)
            parameter_array[i] = value.astype(dtype)
            param_list.append(parameter)
            i += 1
        else:
            parameter_array[i] = value.astype(dtype)
            i += 1
    
    # Ensure parameter array is contiguous in memory
    parameter_array = np.ascontiguousarray(parameter_array, dtype = np.float32)
    
    # Convert param_list to a numba typed list for use in jitted functions
    param_list = List(param_list)
    return parameter_array, Irrig_auto, Init_RU, param_list
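

# Example (sketch, not part of the model): how a parameter band is built from
# the land cover raster, with hypothetical class values. Each land cover class
# receives the corresponding value from the csv parameter table:
#
#     import numpy as np
#     land_cover = np.array([[1, 2], [2, 1]], dtype = np.uint8)
#     value = np.full(land_cover.shape, np.float32(0))
#     for class_val, param_value in [(1, 0.15), (2, 0.55)]:
#         value = np.where(land_cover == class_val, np.float32(param_value), value)
#     # value is now [[0.15, 0.55], [0.55, 0.15]], one band of parameter_array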


def prepare_output_dataset(ndvi_path: str, dimensions: dict[str, int], scaling_dict: dict[str, int] = {'E': 1000, 'Tr': 1000, 'SWCe': 1000, 'SWCr': 1000, 'DP': 100, 'Irr': 100}, additional_outputs: dict[str, int] = None) -> xr.Dataset:
    """
    Creates the `xarray Dataset` containing the outputs of the SAMIR model that will be saved.
    Additional variables can be saved by adding their names and scale factors to the
    `additional_outputs` dictionary.

    Arguments
    =========

    1. ndvi_path: ``str``
        path to the NDVI cube, used as a template for the output dataset
    2. dimensions: ``dict[str, int]``
        frozen dictionary containing the dimensions of the output dataset
    3. scaling_dict: ``Dict[str, int]`` ``default = {'E': 1000, 'Tr': 1000, 'SWCe': 1000, 'SWCr': 1000, 'DP': 100, 'Irr': 100}``
        scaling dictionary for the nominal outputs
    4. additional_outputs: ``Dict[str, int]`` ``default = None``
        dictionary of additional variable names and their scale factors

    Returns
    =======

    1. model_outputs: ``xr.Dataset``
        model outputs to be saved
    """

    # Create the output dataset from the NDVI cube structure
    model_outputs = xr.open_dataset(ndvi_path).drop_vars(['NDVI']).copy(deep = True)
    model_outputs = model_outputs.drop_sel(time = model_outputs.time)

    # Global attributes
    model_outputs.attrs['name'] = 'ModSpa Pixel SAMIR output'
    model_outputs.attrs['description'] = 'Outputs of the ModSpa SAMIR (FAO-56) model at the pixel scale. Variables are upscaled to be stored as integers.'
    model_outputs.attrs['scaling'] = str(scaling_dict)

    # Evaporation and Transpiration
    model_outputs['E'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    model_outputs['E'].attrs['standard_name'] = 'Evaporation'
    model_outputs['E'].attrs['description'] = 'Accumulated daily evaporation in millimeters'
    model_outputs['E'].attrs['scale factor'] = scaling_dict['E']
    model_outputs['Tr'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    model_outputs['Tr'].attrs['standard_name'] = 'Transpiration'
    model_outputs['Tr'].attrs['description'] = 'Accumulated daily plant transpiration in millimeters'
    model_outputs['Tr'].attrs['scale factor'] = scaling_dict['Tr']
    # Soil Water Content
    model_outputs['SWCe'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    model_outputs['SWCe'].attrs['standard_name'] = 'Soil Water Content of the evaporative layer'
    model_outputs['SWCe'].attrs['description'] = 'Soil water content of the evaporative layer in millimeters'
    model_outputs['SWCe'].attrs['scale factor'] = scaling_dict['SWCe']
    model_outputs['SWCr'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    model_outputs['SWCr'].attrs['standard_name'] = 'Soil Water Content of the root layer'
    model_outputs['SWCr'].attrs['description'] = 'Soil water content of the root layer in millimeters'
    model_outputs['SWCr'].attrs['scale factor'] = scaling_dict['SWCr']
    # Irrigation
    model_outputs['Irr'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    model_outputs['Irr'].attrs['standard_name'] = 'Irrigation'
    model_outputs['Irr'].attrs['description'] = 'Simulated daily irrigation in millimeters'
    model_outputs['Irr'].attrs['scale factor'] = scaling_dict['Irr']
    # Deep Percolation
    model_outputs['DP'] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.uint16))
    model_outputs['DP'].attrs['standard_name'] = 'Deep Percolation'
    model_outputs['DP'].attrs['description'] = 'Simulated daily deep percolation in millimeters'
    model_outputs['DP'].attrs['scale factor'] = scaling_dict['DP']
    if additional_outputs:
        for var, scale in zip(additional_outputs.keys(), additional_outputs.values()):
            model_outputs[var] = (dimensions, np.zeros(tuple(dimensions[d] for d in list(dimensions)), dtype = np.int16))
            model_outputs[var].attrs['scale factor'] = scale
    return model_outputs
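

# Example (sketch, not part of the model): reading a saved output back in
# physical units. Variables are stored as scaled integers, so values must be
# divided by their scale factor ('outputs.nc' is a hypothetical file produced
# by run_samir()):
#
#     import xarray as xr
#     ds = xr.open_dataset('outputs.nc')
#     evap_mm = ds['E'].astype('float32') / ds['E'].attrs['scale factor']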


@njit(nogil = True, cache = True)
def find_index(numba_list: list, value: any) -> int:
    """
    Equivalent of the ``.index()`` method of python lists, adapted for numba typed lists.

    Arguments
    =========

    1. numba_list: ``list``
        input numba typed list
    2. value: ``any``
        value to find in list

    Returns
    =======

    1. index: ``int``
        index of value in list
    """
    
    # Loop through list elements
    for i in range(len(numba_list)):
        if numba_list[i] == value:
            # Return index of value in numba_list
            return i
    
    # Return -1 if element is not found
    return -1
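

# Example (sketch, not part of the model): find_index() mirrors list.index()
# for numba typed lists and returns -1 instead of raising when not found:
#
#     from numba.typed import List
#     names = List(['Ze', 'Zsoil', 'REW'])
#     find_index(names, 'Zsoil')  # -> 1
#     find_index(names, 'Kcb')    # -> -1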


@njit((float32[:,:,:], types.ListType(types.unicode_type), uint8[:,:], float32[:,:], float32[:,:], float32[:,:], int64, int64), nogil = True, parallel = True, fastmath = True, cache = True)
def get_starting_conditions(parameter_array: np.ndarray, param_list: list[str], NDVI: np.ndarray, Init_RU: np.ndarray, Wfc: np.ndarray, Wwp: np.ndarray, y_size: int, x_size: int) -> tuple[np.ndarray]:
    """
    Calculate SAMIR starting variables.

    Arguments
    =========

    1. parameter_array: ``np.ndarray``
        parameter array
    2. param_list: ``list[str]``
        list of param names
    3. NDVI: ``np.ndarray``
        NDVI for first date
    4. Init_RU: ``np.ndarray``
        initial RU
    5. Wfc: ``np.ndarray``
        field capacity array
    6. Wwp: ``np.ndarray``
        wilting point array
    7. y_size: ``int``
        y_size of arrays
    8. x_size: ``int``
        x_size of arrays

    Returns
    =======

    1. output_arrays: ``tuple[np.ndarray]``
        E0, Tr0, Zr0, Dei0, Dep0, Dr0, Dd0, Irrig_test
    """
    
    E0 = np.zeros((y_size, x_size), dtype = np.uint16)
    Tr0 = np.zeros((y_size, x_size), dtype = np.uint16)
    Irrig_test = np.zeros((y_size, x_size), dtype = np.bool_)
    # Fraction cover and root depth for the first date
    FCov = np.minimum(np.maximum(parameter_array[find_index(param_list, 'Fslope')] * (NDVI / np.float32(255)).astype(np.float32) + parameter_array[find_index(param_list, 'Foffset')], np.float32(0)), parameter_array[find_index(param_list, 'FCmax')])
    Zr0 = np.maximum(parameter_array[find_index(param_list, 'minZr')] + (FCov / parameter_array[find_index(param_list, 'FCmax')]) * (parameter_array[find_index(param_list, 'maxZr')] - parameter_array[find_index(param_list, 'minZr')]), parameter_array[find_index(param_list, 'Ze')] + np.float32(0.001))
    Dei0 = (Wfc - Wwp) * parameter_array[find_index(param_list, 'Ze')] * (np.float32(1) - Init_RU)
    Dep0 = (Wfc - Wwp) * parameter_array[find_index(param_list, 'Ze')] * (np.float32(1) - Init_RU)
    Dr0 = (Wfc - Wwp) * Zr0 * (np.float32(1) - Init_RU)
    Dd0 = (Wfc - Wwp) * (parameter_array[find_index(param_list, 'Zsoil')] - Zr0) * (np.float32(1) - Init_RU)
    
    return E0, Tr0, Zr0, Dei0, Dep0, Dr0, Dd0, Irrig_test
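

# Example (sketch, not part of the model): initial root zone depletion with
# toy values. With Wfc = 0.30 and Wwp = 0.15 (m3/m3), Zr0 = 300 (mm) and an
# initial soil water content Init_RU = 0.5:
#
#     Dr0 = (0.30 - 0.15) * 300 * (1 - 0.5)  # = 22.5 mm of depletion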


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_diff_re(TAW: np.ndarray, Dr: np.ndarray, Zr: np.ndarray, RUE: np.ndarray, De: np.ndarray, Wfc: np.ndarray, Ze: np.ndarray, DiffE: np.ndarray) -> np.ndarray:
    """
    Calculates the diffusion between the top soil layer and the root layer. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. TAW: ``np.ndarray``
        total available water in the root layer
    2. Dr: ``np.ndarray``
        depletion of the root layer
    3. Zr: ``np.ndarray``
        root depth
    4. RUE: ``np.ndarray``
        total available surface water = (Wfc-Wwp)*Ze
    5. De: ``np.ndarray``
        depletion of the evaporative layer
    6. Wfc: ``np.ndarray``
        field capacity
    7. Ze: ``np.ndarray``
        height of the evaporative layer (parameter)
    8. DiffE: ``np.ndarray``
        diffusion coefficient between the evaporative
        and root layers (unitless, parameter)

    Returns
    =======

    1. diff_re: ``np.ndarray``
        the diffusion between the top soil layer and
        the root layer
    """
    # Temporary variables to make calculation easier to read
    # TODO: check equation
    tmp1 = (((TAW - Dr) / Zr - (RUE - De) / Ze) / Wfc) * DiffE
    tmp2 = ((TAW * Ze) - (RUE - De - Dr) * Zr) / (Zr + Ze) - Dr
    
    # Calculate diffusion according to SAMIR equation
    # Return zero values where the 'DiffE' parameter is equal to 0
    return np.where(DiffE == 0, np.float32(0), np.where(tmp1 < 0, np.maximum(tmp1, tmp2), np.minimum(tmp1, tmp2)))


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_diff_dr(TAW: np.ndarray, TDW: np.ndarray, Dr: np.ndarray, Zr: np.ndarray, Dd: np.ndarray, Wfc: np.ndarray, Zsoil: np.ndarray, DiffR: np.ndarray) -> np.ndarray:
    """
    Calculates the diffusion between the root layer and the deep layer. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. TAW: ``np.ndarray``
        total available water in the root layer
    2. TDW: ``np.ndarray``
        total available water in the deep layer
    3. Dr: ``np.ndarray``
        depletion of the root layer
    4. Zr: ``np.ndarray``
        root depth
    5. Dd: ``np.ndarray``
        depletion of the deep layer
    6. Wfc: ``np.ndarray``
        field capacity
    7. Zsoil: ``np.ndarray``
        total soil depth (parameter)
    8. DiffR: ``np.ndarray``
        diffusion coefficient between the root
        and deep layers (unitless, parameter)

    Returns
    =======

    1. Diff_dr: ``np.ndarray``
        the diffusion between the root layer and the
        deep layer
    """

    # Temporary variables to make calculation easier to read
    tmp1 = (((TDW - Dd) / (Zsoil - Zr) - (TAW - Dr) / Zr) / Wfc) * DiffR
    tmp2 = ((TDW * Zr - (TAW - Dr - Dd) * (Zsoil - Zr)) / Zsoil) - Dd

    # Calculate diffusion according to SAMIR equation
    # Return zero values where the 'DiffR' parameter is equal to 0
    return np.where(DiffR == 0, np.float32(0), np.where(tmp1 < 0, np.maximum(tmp1, tmp2), np.minimum(tmp1, tmp2)))


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_W(TEW: np.ndarray, Dei: np.ndarray, Dep: np.ndarray, fewi: np.ndarray, fewp: np.ndarray) -> np.ndarray:
    """
    Calculate W, the weighting factor to split the energy available
    for evaporation depending on the difference in water availability
    in the two evaporation components, ensuring that the larger and
    the wetter, the more the evaporation occurs from that component.
    Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. TEW: ``np.ndarray``
        water capacity of the evaporative layer
    2. Dei: ``np.ndarray``
        depletion of the evaporative layer
        (irrigation part)
    3. Dep: ``np.ndarray``
        depletion of the evaporative layer
        (precipitation part)
    4. fewi: ``np.ndarray``
        soil fraction which is wetted by irrigation
        and exposed to evaporation
    5. fewp: ``np.ndarray``
        soil fraction which is wetted by precipitation
        and exposed to evaporation

    Returns
    =======

    1. W: ``np.ndarray``
        weighting factor W
    """

    # Calculate the weighting factor to split the energy available for evaporation
    # Equation: W = 1 / (1 + (fewp * (TEW - Dep)) / (fewi * (TEW - Dei)))
    # Equation: W = where(fewi * (TEW - Dei) > 0, W, 0)
    return np.where(fewi * (TEW - Dei) > 0, np.float32(1) / (np.float32(1) + (fewp * (TEW - Dep)) / (fewi * (TEW - Dei))), np.float32(0))
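

# Example (sketch, not part of the model): with TEW = 20, Dei = 5, Dep = 10,
# fewi = 0.1 and fewp = 0.3:
#
#     W = 1 / (1 + (0.3 * (20 - 10)) / (0.1 * (20 - 5)))  # = 1 / 3
#
# so one third of the evaporative energy goes to the irrigation-wetted
# fraction and two thirds to the precipitation-wetted fraction.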


@njit((float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_Kr(TEW: np.ndarray, De: np.ndarray, REW: np.ndarray) -> np.ndarray:
    """
    Calculates the reduction coefficient for evaporation dependent
    on the amount of water in the soil using the FAO-56 method. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. TEW: ``np.ndarray``
        water capacity of the evaporative layer
    2. De: ``np.ndarray``
        depletion of the evaporative layer
    3. REW: ``np.ndarray``
        fixed readily evaporable water

    Returns
    =======

    1. Kr: ``np.ndarray``
        Kr coefficient
    """

    # Return Kr
    return np.maximum(np.float32(0), np.minimum((TEW - De) / (TEW - REW), np.float32(1)))
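

# Example (sketch, not part of the model): with TEW = 20 mm, REW = 8 mm and a
# current depletion De = 14 mm:
#
#     Kr = max(0, min((20 - 14) / (20 - 8), 1))  # = 0.5, evaporation is halved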


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_Ks(Dr: np.ndarray, TAW: np.ndarray, p: np.ndarray, E0: np.ndarray, Tr0: np.ndarray) -> np.ndarray:
    """
    Calculate the Ks water stress coefficient after day 1. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. Dr: ``np.ndarray``
        depletion of the root layer
    2. TAW: ``np.ndarray``
        total available water in the root layer
    3. p: ``np.ndarray``
        fraction of TAW available for the plant without inducing stress
    4. E0: ``np.ndarray``
        previous day evaporation
    5. Tr0: ``np.ndarray``
        previous day transpiration

    Returns
    =======

    1. Ks: ``np.ndarray``
        stress coefficient
    """

    # The p fraction is adjusted with the previous day evapotranspiration (FAO-56 recommendation)
    # Equation: Ks = min((TAW - Dr) / (TAW * (1 - (p + 0.04 * (5 - (E0 + Tr0))))), 1)
    return np.minimum((TAW - Dr) / (TAW * (np.float32(1) - (p + 0.04 * (np.float32(5) - (E0 + Tr0))))), np.float32(1)).astype(np.float32)
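

# Example (sketch, not part of the model): with TAW = 100 mm, Dr = 60 mm,
# p = 0.55 and a previous day evapotranspiration E0 + Tr0 = 5 mm:
#
#     Ks = min((100 - 60) / (100 * (1 - (0.55 + 0.04 * (5 - 5)))), 1)  # = 40 / 45 ~ 0.89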


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_Ke(W: np.ndarray, De: np.ndarray, TEW: np.ndarray, REW: np.ndarray, Kcmax: np.ndarray, Kcb: np.ndarray, few: np.ndarray) -> np.ndarray:
    """
    Calculate the evaporation Ke coefficient.

    Arguments
    =========

    1. W: ``np.ndarray``
        weighting factor to split the energy available
        for evaporation
    2. De: ``np.ndarray``
        Dei or Dep, depletion of the evaporative layer
    3. TEW: ``np.ndarray``
        water capacity of the evaporative layer
    4. REW: ``np.ndarray``
        fixed readily evaporable water
    5. Kcmax: ``np.ndarray``
        maximum possible evaporation in the atmosphere
    6. Kcb: ``np.ndarray``
        crop coefficient
    7. few: ``np.ndarray``
        fewi or fewp, soil fraction which is wetted by
        irrigation or precipitation and exposed to evaporation

    Returns
    =======

    1. Ke: ``np.ndarray``
        evaporation coefficient
    """
    
    # Equation: Kei = np.minimum(W * Kri * (Kc_max - Kcb), fewi * Kc_max)
    # Equation: Kep = np.minimum((1 - W) * Krp * (Kc_max - Kcb), fewp * Kc_max)
    return np.minimum(W * calculate_Kr(TEW, De, REW) * (Kcmax - Kcb), few * Kcmax)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], boolean[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], boolean[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_irrig(Dr: np.ndarray, TAW: np.ndarray, Rain: np.ndarray, Kcb: np.ndarray, Irrig_auto: np.ndarray, Lame_max: np.ndarray, Lame_min: np.ndarray, Kcb_min_start_Irrig: np.ndarray, frac_Kcb_stop_irrig: np.ndarray, Irrig_test: np.ndarray, frac_TAW: np.ndarray, Kcb_max_obs: np.ndarray) -> np.ndarray:
    """
    Calculate automatic irrigation after day one. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. Dr: ``np.ndarray``
        depletion of the root layer
    2. TAW: ``np.ndarray``
        total available water in the root layer
    3. Rain: ``np.ndarray``
        rainfall of the current day
    4. Kcb: ``np.ndarray``
        crop coefficient
    5. Irrig_auto: ``np.ndarray`` ``boolean``
        mask of pixels where automatic irrigation is simulated
    6. Lame_max: ``np.ndarray``
        maximum amount of possible irrigation in mm
    7. Lame_min: ``np.ndarray``
        minimum amount of possible irrigation in mm
    8. Kcb_min_start_Irrig: ``np.ndarray``
        minimum value of Kcb under which no irrigation is allowed
    9. frac_Kcb_stop_irrig: ``np.ndarray``
        Kcb threshold to stop irrigation after reaching maximum
    10. Irrig_test: ``np.ndarray``
        boolean array that is true after the Kcb has reached
        frac_Kcb_stop_irrig of its maximum
    11. frac_TAW: ``np.ndarray``
        fraction of depleted TAW under which irrigation is triggered
    12. Kcb_max_obs: ``np.ndarray``
        observed maximum value of Kcb

    Returns
    =======

    1. Irrig: ``np.ndarray``
        simulated irrigation for the current day
    """

    # First step: calculate irrigation to fill the depletion up to a maximum value of Lame_max
    Irrig = Irrig_auto * np.maximum(np.minimum(Dr - Rain, Lame_max), Lame_min)
    
    # Second step: only keep irrigation where the depletion is higher than a fraction (frac_TAW) of TAW and Kcb is higher than a minimum value
    Irrig = np.where((Dr > frac_TAW * TAW) & (Kcb > Kcb_min_start_Irrig), Irrig, np.float32(0))
    
    # Third step: if Kcb has already gone higher than a fraction (frac_Kcb_stop_irrig) of its maximum value,
    # turn off irrigation once Kcb goes back under that fraction of its maximum
    return np.where((Irrig_test) & (Kcb < frac_Kcb_stop_irrig * Kcb_max_obs), np.float32(0), Irrig)
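

# Example (sketch, not part of the model): with Dr = 50 mm, Rain = 0 mm,
# Lame_max = 30 mm and Lame_min = 10 mm, the first step gives
# max(min(50 - 0, 30), 10) = 30 mm. The dose is then kept only where
# Dr > frac_TAW * TAW and Kcb > Kcb_min_start_Irrig, and zeroed once Kcb
# drops back below frac_Kcb_stop_irrig * Kcb_max_obs.
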
@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_Te(De: np.ndarray, Dr: np.ndarray, Ks: np.ndarray, Kcb: np.ndarray, Ze: np.ndarray, Zr: np.ndarray, TEW: np.ndarray, TAW: np.ndarray, ET0: np.ndarray) -> np.ndarray:
    """
    Calculate Te (root uptake of water) for the current day. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. De: ``np.ndarray``
        Dei or Dep, depletion of the evaporative layer
    2. Dr: ``np.ndarray``
        depletion of the root layer
    3. Ks: ``np.ndarray``
        water stress coefficient
    4. Kcb: ``np.ndarray``
        crop coefficient
    5. Ze: ``np.ndarray``
        height of the evaporative layer (parameter)
    6. Zr: ``np.ndarray``
        root depth
    7. TEW: ``np.ndarray``
        water capacity of the evaporative layer
    8. TAW: ``np.ndarray``
        total available water in the root layer
    9. ET0: ``np.ndarray``
        reference evapotranspiration of the current day

    Returns
    =======

    1. Te: ``np.ndarray``
        root uptake of water for the current day
    """

    # Equation: Kt = min( ((Ze / Zr)**0.6) * (1 - De / TEW) / max(1 - Dr / TAW, 0.001), 1)
    # Equation: Te = Kt * Ks * Kcb * ET0
    return (np.minimum(((Ze / Zr) ** 0.6) * (np.float32(1) - De / TEW) / np.maximum(np.float32(1) - Dr / TAW, 0.001), np.float32(1)) * Ks * Kcb * ET0).astype(np.float32)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def update_De_from_Diff(De: np.ndarray, few: np.ndarray, Ke: np.ndarray, Te: np.ndarray, Diff_re: np.ndarray, TEW: np.ndarray, ET0: np.ndarray) -> np.ndarray:
    """
    Last update step for the Dei and Dep depletions. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. De: ``np.ndarray``
        Dei or Dep, depletion of the evaporative layer
    2. few: ``np.ndarray``
        fewi or fewp, soil fraction which is wetted by
        irrigation or precipitation and exposed to evaporation
    3. Ke: ``np.ndarray``
        Kei or Kep, evaporation coefficient for the soil fraction
        irrigated or rainfed and exposed to evaporation
    4. Te: ``np.ndarray``
        root uptake of water for the current day
    5. Diff_re: ``np.ndarray``
        diffusion between the root and evaporative layers
    6. TEW: ``np.ndarray``
        water capacity of the evaporative layer
    7. ET0: ``np.ndarray``
        reference evapotranspiration of the current day

    Returns
    =======

    1. De: ``np.ndarray``
        updated depletion for the evaporative layer
    """

    # Equation: De = where(few > 0, min(max(De + ET0 * Ke / few + Te - Diff_re, 0), TEW), min(max(De + Te - Diff_re, 0), TEW))
    return np.where(few > np.float32(0), np.minimum(np.maximum(De + ET0 * Ke / few + Te - Diff_re, np.float32(0)), TEW), np.minimum(np.maximum(De + Te - Diff_re, np.float32(0)), TEW))
        

@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def update_Dr_from_root(Wfc: np.ndarray, Wwp: np.ndarray, Zr: np.ndarray, Zsoil: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray) -> np.ndarray:
    """
    Return the updated depletion for the root layer. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. Wfc: ``np.ndarray``
        field capacity
    2. Wwp: ``np.ndarray``
        wilting point
    3. Zr: ``np.ndarray``
        root depth for the current day
    4. Zsoil: ``np.ndarray``
        total soil depth (parameter)
    5. Dr0: ``np.ndarray``
        depletion of the root layer for the previous day
    6. Dd0: ``np.ndarray``
        depletion of the deep layer for the previous day
    7. Zr0: ``np.ndarray``
        root depth for the previous day

    Returns
    =======

    1. output: ``np.ndarray``
        updated depletion for the root layer
    """

    # Temporary variables to make calculation easier to read
    tmp1 = np.maximum(Dr0 + Dd0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * (Zsoil - Zr0)), np.float32(0))
    tmp2 = np.maximum(Dr0 + Dr0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * Zr0), np.float32(0))

    # Return updated Dr
    return np.where(Zr > Zr0, tmp1, tmp2)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def update_Dd_from_root(Wfc: np.ndarray, Wwp: np.ndarray, Zr: np.ndarray, Zsoil: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray) -> np.ndarray:
    """
    Return the updated depletion for the deep layer. Uses numba for faster and parallel calculation.

    Arguments
    =========

    1. Wfc: ``np.ndarray``
        field capacity
    2. Wwp: ``np.ndarray``
        wilting point
    3. Zr: ``np.ndarray``
        root depth for the current day
    4. Zsoil: ``np.ndarray``
        total soil depth (parameter)
    5. Dr0: ``np.ndarray``
        depletion of the root layer for the previous day
    6. Dd0: ``np.ndarray``
        depletion of the deep layer for the previous day
    7. Zr0: ``np.ndarray``
        root depth for the previous day

    Returns
    =======

    1. output: ``np.ndarray``
        updated depletion for the deep layer
    """

    # Temporary variables to make calculation easier to read
    tmp1 = np.maximum(Dd0 - Dd0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * (Zsoil - Zr0)), np.float32(0))
    tmp2 = np.maximum(Dd0 - Dr0 * ((Wfc - Wwp) * (Zr - Zr0)) / ((Wfc - Wwp) * Zr0), np.float32(0))

    # Return updated Dd
    return np.where(Zr > Zr0, tmp1, tmp2)


@njit((float32[:,:], float32[:,:], float32[:,:], float32[:,:], float32[:,:]), nogil = True, parallel = True, fastmath = True, cache = True)
def calculate_SWCe(Dei: np.ndarray, Dep: np.ndarray, fewi: np.ndarray, fewp: np.ndarray, TEW: np.ndarray) -> np.ndarray:
    """
    Calculate the soil water content of the evaporative layer.

    Arguments
    =========

    1. Dei: ``np.ndarray``
        depletion of the evaporative layer for the irrigated part
    2. Dep: ``np.ndarray``
        depletion of the evaporative layer for the rainfed part
    3. fewi: ``np.ndarray``
        soil fraction which is wetted by irrigation and exposed
        to evaporation
    4. fewp: ``np.ndarray``
        soil fraction which is wetted by precipitation and exposed
        to evaporation
    5. TEW: ``np.ndarray``
        water capacity of the evaporative layer

    Returns
    =======

    1. SWCe: ``np.ndarray``
        soil water content of the evaporative layer
    """
    
    # Return SWCe
    return np.where((fewi + fewp) > np.float32(0), np.float32(1) - ((Dei * fewi + Dep * fewp) / ((fewi + fewp) * TEW)), np.float32(1) - ((Dei + Dep) / (np.float32(2) * TEW)))
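

# Example (sketch, not part of the model): with Dei = 6 mm, Dep = 12 mm,
# fewi = 0.1, fewp = 0.3 and TEW = 20 mm:
#
#     SWCe = 1 - (6 * 0.1 + 12 * 0.3) / ((0.1 + 0.3) * 20)  # = 1 - 4.2 / 8 = 0.475
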
def calculate_memory_requirement(x_size: int, y_size: int, time_size: int, nb_inputs: int, nb_outputs: int, nb_variables: int, nb_params: int, nb_bytes: int) -> float:
    """
    Calculate the memory requirement (GiB) of the calculation if all datasets were loaded in memory.
    Used to determine how to divide the datasets in time chunks for more efficient I/O
    operations.

    Arguments
    =========

    1. x_size: ``int``
        x size of dataset (pixels)
    2. y_size: ``int``
        y size of dataset (pixels)
    3. time_size: ``int``
        number of time bands (dates)
    4. nb_inputs: ``int``
        number of input variables
    5. nb_outputs: ``int``
        number of output variables
    6. nb_variables: ``int``
        number of calculation variables
    7. nb_params: ``int``
        number of raster parameters
    8. nb_bytes: ``int``
        number of bytes of datatype for inputs and outputs

    Returns
    =======

    1. total_memory_requirement: ``float``
        calculation memory requirement in GiB
    """

    # Memory requirement of input datasets
    input_memory_requirement = (x_size * y_size * time_size * nb_inputs * nb_bytes) / (1024**3)

    # Memory requirement of calculation variables
    calculation_memory_requirement = (x_size * y_size * (nb_params * 4 + nb_variables * 4)) / (1024**3)  # calculation done in float32, params in float32

    # Memory requirement of output datasets
    output_memory_requirement = (x_size * y_size * time_size * nb_outputs * nb_bytes) / (1024**3)

    # Total memory requirement
    total_memory_requirement = (input_memory_requirement + calculation_memory_requirement + output_memory_requirement) * 1.05  # 5% adjustment factor

    return total_memory_requirement


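# Example (sketch, not part of the model): for a 1000 x 1000 pixel area over
# 365 days with nb_inputs = 3, nb_outputs = 6, nb_variables = 38, nb_bytes = 2
# and a hypothetical nb_params = 20, inputs take ~2.04 GiB, outputs ~4.08 GiB
# and calculation variables ~0.22 GiB, i.e. ~6.65 GiB with the 5% margin.

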
def calculate_time_slices_to_load(x_size: int, y_size: int, time_size: int, nb_inputs: int, nb_outputs: int, nb_variables: int, nb_params: int, nb_bytes: int, available_ram: int) -> tuple[int, int, bool]:
    """
    Calculate how many time slices to load in memory (for input and output data)
    based on available ram and calculation requirements.

    Arguments
    =========

    1. x_size: ``int``
        x size of dataset (pixels)
    2. y_size: ``int``
        y size of dataset (pixels)
    3. time_size: ``int``
        number of time bands (dates)
    4. nb_inputs: ``int``
        number of input variables
    5. nb_outputs: ``int``
        number of output variables
    6. nb_variables: ``int``
        number of calculation variables
    7. nb_params: ``int``
        number of raster parameters
    8. nb_bytes: ``int``
        number of bytes of datatype for inputs and outputs
    9. available_ram: ``int``
        available memory in GiB for the model

    Returns
    =======

    1. time_slice: ``int``
        number of time slices to load for
        input and output data
    2. remainder_to_load: ``int``
        remainder of the Euclidean division for
        the number of time slices to load
        (last block of data to load)
    3. already_loaded: ``bool``
        used to know whether data has been loaded
        when the whole time values fit in memory, not
        used otherwise
    """
    
    if calculate_memory_requirement(x_size, y_size, 1, nb_inputs, nb_outputs, nb_variables, nb_params, nb_bytes) > available_ram:
        import sys
        print('Calculation area is too large for available memory.')
        sys.exit()
    
    # Boolean to get out of while loop
    not_finished = True

    # Divisor of the time dimension, increased by one at every loop
    divisor = 1
    while not_finished and divisor < time_size / 2:

        memory_requirement = calculate_memory_requirement(x_size, y_size, time_size // divisor, nb_inputs, nb_outputs, nb_variables, nb_params, nb_bytes)

        # Test if the memory requirement divided by divisor fits in the available ram
        if memory_requirement < available_ram:

            # If all the inputs and outputs can be loaded
            if divisor == 1:

                time_slice = time_size  # whole dataset is loaded
                remainder_to_load = None  # no remainder to load
                already_loaded = False  # this boolean is set to true once the whole inputs have been loaded
                not_finished = False  # boolean set to false to get out of while loop

                return time_slice, remainder_to_load, already_loaded

            # Otherwise return the number of time bands that can be loaded
            else:

                time_slice = time_size // divisor
                remainder_to_load = time_size % time_slice  # remainder, used to load the last block of inputs
                already_loaded = None  # this boolean is set to None if the whole dataset can't be loaded
                not_finished = False  # boolean set to false to get out of while loop

                return time_slice, remainder_to_load, already_loaded

        divisor += 1

    # If the dataset is too big, load only one time slice per loop
    time_slice = 1
    remainder_to_load = 1  # in order to correctly save the last date
    already_loaded = None  # this boolean is set to None if the whole dataset can't be loaded

    return time_slice, remainder_to_load, already_loaded
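

# Example (sketch, not part of the model): with time_size = 365 and a dataset
# that only fits in memory when halved, the loop returns divisor = 2, i.e.
# time_slice = 365 // 2 = 182 and remainder_to_load = 365 % 182 = 1.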


def get_empty_arrays(x_size: int, y_size: int, time_size: int, nb_arrays: int, array: bool = False) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """
    Short function to make the `run_samir()` function easier to read.
    Generates a varying number (`nb_arrays`) of empty numpy arrays of shape
    `(time_size, y_size, x_size)` in tuple mode, or a single 4-dimensional
    array in array mode. Used to store variable values before writing them
    in the output file.

    Arguments
    =========

    1. x_size: ``int``
        x size of dataset (pixels)
    2. y_size: ``int``
        y size of dataset (pixels)
    3. time_size: ``int``
        number of time bands (dates)
    4. nb_arrays: ``int``
        number of arrays to generate
    5. array: ``bool`` ``default = False``
        if True, return a single ``np.ndarray`` of shape
        ``(nb_arrays, time_size, y_size, x_size)`` instead of a tuple

    Returns
    =======

    output: ``Tuple[np.ndarray * nb_arrays]`` or ``np.ndarray``
        output empty arrays
    """
    
    # Return a 4D array containing all the empty arrays
    if array:
        return np.empty((nb_arrays, time_size, y_size, x_size), dtype = np.int16, order = 'C')

    # Return a tuple of empty arrays
    return tuple([np.empty((time_size, y_size, x_size), dtype = np.uint16) for k in range(nb_arrays)])


def read_inputs(ndvi_cube_path: str, weather_path: str, i: int, time_slice: int, load_all: bool = False) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Read input data into numpy arrays based on loop conditions.

    Arguments
    =========

    1. ndvi_cube_path: ``str``
        path to the NDVI cube
    2. weather_path: ``str``
        path to the weather cube
    3. i: ``int``
        current time index in the main loop
    4. time_slice: ``int``
        number of time slices to load
    5. load_all: ``bool`` ``default = False``
        whether to load the whole dataset

    Returns
    =======

    1. NDVI: ``np.ndarray``
        NDVI values
    2. Rain: ``np.ndarray``
        rainfall values
    3. ET0: ``np.ndarray``
        reference evapotranspiration values
    """
    
    # Load whole dataset
    if load_all:
        
        with nc.Dataset(ndvi_cube_path, mode='r') as ds:
            # Dimensions of ndvi dataset : (time, y, x)
            ds.variables['NDVI'].set_auto_mask(False)
            NDVI = ds.variables['NDVI'][:, :, :]
        with nc.Dataset(weather_path, mode='r') as ds:
            # Dimensions of weather dataset : (time, y, x)
            ds.variables['Rain'].set_auto_mask(False)
            Rain = ds.variables['Rain'][:, :, :]
            ds.variables['ET0'].set_auto_mask(False)
            ET0 = ds.variables['ET0'][:, :, :]
    
    # Load given number of time slices
    else:
        
        with nc.Dataset(ndvi_cube_path, mode='r') as ds:
            # Dimensions of ndvi dataset : (time, y, x)
            ds.variables['NDVI'].set_auto_mask(False)
            NDVI = ds.variables['NDVI'][i: i + time_slice, :, :]
        with nc.Dataset(weather_path, mode='r') as ds:
            # Dimensions of weather dataset : (time, y, x)
            ds.variables['Rain'].set_auto_mask(False)
            Rain = ds.variables['Rain'][i: i + time_slice, :, :]
            ds.variables['ET0'].set_auto_mask(False)
            ET0 = ds.variables['ET0'][i: i + time_slice, :, :]

    return NDVI, Rain, ET0


def write_outputs(save_path: str, DP: np.ndarray, SWCe: np.ndarray, SWCr: np.ndarray, E: np.ndarray, Tr: np.ndarray, Irrig: np.ndarray, additional_outputs: dict[str, int], additional_outputs_data: np.ndarray, i: int, time_slice: int, write_all = False) -> None:
    """
    Write outputs to the netCDF file based on the conditions of the current loop.

    Arguments
    =========

    1. save_path: ``str``
        path to the output netCDF file
    2. DP: ``np.ndarray``
        deep percolation
    3. SWCe: ``np.ndarray``
        soil water content of the evaporative layer
    4. SWCr: ``np.ndarray``
        soil water content of the root layer
    5. E: ``np.ndarray``
        surface evaporation
    6. Tr: ``np.ndarray``
        plant transpiration
    7. Irrig: ``np.ndarray``
        simulated irrigation
    8. additional_outputs: ``Dict[str, int]``
        dictionary containing additional outputs and their scale factors
    9. additional_outputs_data: ``np.ndarray``
        array of additional output data. Is ``None`` if no additional outputs
    10. i: ``int``
        current time index in the main loop
    11. time_slice: ``int``
        number of loaded time slices
    12. write_all: ``bool`` ``default = False``
        whether to write the whole output arrays at once

    Returns
    =======

    ``None``
    """

    # Write the whole dataset at once
    if write_all:

        with nc.Dataset(save_path, mode = 'a') as outputs:
            # Dimensions of output dataset : (time, y, x)
            # Deep percolation
            outputs.variables['DP'][:, :, :] = DP
            # Soil water content of the evaporative layer
            outputs.variables['SWCe'][:, :, :] = SWCe
            # Soil water content of the root layer
            outputs.variables['SWCr'][:, :, :] = SWCr
            # Evaporation
            outputs.variables['E'][:, :, :] = E
            # Transpiration
            outputs.variables['Tr'][:, :, :] = Tr
            # Irrigation
            outputs.variables['Irr'][:, :, :] = Irrig
            # Additional outputs
            if additional_outputs:
                k = 0
                for var in additional_outputs.keys():
                    outputs.variables[var][:, :, :] = additional_outputs_data[k, :, :, :]
                    k += 1

    # Write the current block of time slices
    else:

        with nc.Dataset(save_path, mode = 'a') as outputs:
            # Dimensions of output dataset : (time, y, x)
            outputs.variables['DP'][i - time_slice + 1: i + 1, :, :] = DP
            # Soil water content of the evaporative layer
            outputs.variables['SWCe'][i - time_slice + 1: i + 1, :, :] = SWCe
            # Soil water content of the root layer
            outputs.variables['SWCr'][i - time_slice + 1: i + 1, :, :] = SWCr
            outputs.variables['E'][i - time_slice + 1: i + 1, :, :] = E
            outputs.variables['Tr'][i - time_slice + 1: i + 1, :, :] = Tr
            outputs.variables['Irr'][i - time_slice + 1: i + 1, :, :] = Irrig
            # Additional outputs
            if additional_outputs:
                k = 0
                for var in additional_outputs.keys():
                    outputs.variables[var][i - time_slice + 1: i + 1, :, :] = additional_outputs_data[k, :, :, :]
                    k += 1


@njit((uint8[:, :], uint16[:, :], uint16[:, :], float32[:, :], float32[:, :], float32[:, :], boolean[:, :], boolean[:, :], float32[:, :], float32[:, :], float32[:, :], uint16[:, :], uint16[:, :], float32[:, :], float32[:, :], int64, float32[:, :, :], types.ListType(types.unicode_type), types.ListType(types.unicode_type), int16[:], boolean, int16[:], types.ListType(types.unicode_type), int16[:], int16[:, :, :, :], int64), nogil = True, parallel = True, fastmath = True, cache = True)
def samir_daily(NDVI: np.ndarray, ET0: np.ndarray, Rain: np.ndarray, Wfc: np.ndarray, Wwp: np.ndarray, Kcb_max_obs: np.ndarray, Irrig_auto: np.ndarray, Irrig_test: np.ndarray, Dr0: np.ndarray, Dd0: np.ndarray, Zr0: np.ndarray, E0: np.ndarray, Tr0: np.ndarray, Dei0: np.ndarray, Dep0: np.ndarray, iday: int, params: np.ndarray, param_list: list[str], scaling_names: list[str], scaling_array: np.ndarray, write_additional_data: bool, additional_outputs: np.ndarray, additional_names: list[str], additional_idx: list[int], additional_outputs_data: np.ndarray, time_slice: int) -> tuple[np.ndarray]:
    """
    Run the SAMIR model on a single day. Inputs and outputs are `numpy.ndarray`.
    Calls functions compiled with numba for intermediary calculations.

    Arguments
    =========

    1. NDVI: ``np.ndarray``
        NDVI for the current day
    2. ET0: ``np.ndarray``
        reference evapotranspiration of the current day
    3. Rain: ``np.ndarray``
        rainfall of the current day
    4. Wfc: ``np.ndarray``
        field capacity array
    5. Wwp: ``np.ndarray``
        wilting point array
    6. Kcb_max_obs: ``np.ndarray``
        observed maximum value of Kcb
    7. Irrig_auto: ``np.ndarray`` ``boolean``
        mask of pixels where automatic irrigation is simulated
    8. Irrig_test: ``np.ndarray``
        boolean array that is true after the Kcb has reached
        frac_Kcb_stop_irrig of its maximum
    9. Dr0: ``np.ndarray``
        previous day depletion of the root layer
    10. Dd0: ``np.ndarray``
        previous day depletion of the deep layer
    11. Zr0: ``np.ndarray``
        previous day root depth
    12. E0: ``np.ndarray``
        previous day evaporation
    13. Tr0: ``np.ndarray``
        previous day transpiration
    14. Dei0: ``np.ndarray``
        previous day surface layer depletion
        for the irrigation part
    15. Dep0: ``np.ndarray``
        previous day surface layer depletion
        for the precipitation part
    16. iday: ``int``
        index of the current day in the loaded time slice
    17. params: ``np.ndarray``
        array containing the rasterized
        samir parameters
    18. param_list: ``list[str]``
        list of parameter names, gives the indexing of ``params``
    19. scaling_names: ``list[str]``
        names of the nominal outputs, gives the indexing of ``scaling_array``
    20. scaling_array: ``np.ndarray``
        scaling factors for the nominal outputs
    21. write_additional_data: ``bool``
        whether to store additional output data for the current day
    22. additional_outputs: ``np.ndarray``
        integer scale factors of the additional outputs
    23. additional_names: ``list[str]``
        list of names of additional outputs
    24. additional_idx: ``list[int]``
        list of indices corresponding to additional names
    25. additional_outputs_data: ``np.ndarray``
        array in which the scaled values of the
        additional outputs are stored
    26. time_slice: ``int``
        value of the current time slice, used to
        correctly write additional output data

    Returns
    =======

    1. current_day_outputs: `Tuple[np.ndarray]`
        multiple `numpy.ndarray` arrays are returned as a tuple for the current day
    """
    
    # Create aliases
    DiffE = params[find_index(param_list, 'DiffE')]
    DiffR = params[find_index(param_list, 'DiffR')]
    FW = params[find_index(param_list, 'FW')]
    FCmax = params[find_index(param_list, 'FCmax')]
    Foffset = params[find_index(param_list, 'Foffset')]
    Fslope = params[find_index(param_list, 'Fslope')]
    frac_TAW = params[find_index(param_list, 'frac_TAW')]
    Init_RU = params[find_index(param_list, 'Init_RU')]
    Kcbmax = params[find_index(param_list, 'Kcbmax')]
    Kcmax = params[find_index(param_list, 'Kcmax')]
    Kslope = params[find_index(param_list, 'Kslope')]
    Koffset = params[find_index(param_list, 'Koffset')]
    Kcb_min_start_irrig = params[find_index(param_list, 'Kcb_min_start_irrig')]
    frac_Kcb_stop_irrig = params[find_index(param_list, 'frac_Kcb_stop_irrig')]
    Lame_max = params[find_index(param_list, 'Lame_max')]
    Lame_min = params[find_index(param_list, 'Lame_min')]
    REW = params[find_index(param_list, 'REW')]
    Ze = params[find_index(param_list, 'Ze')]
    Zsoil = params[find_index(param_list, 'Zsoil')]
    minZr = params[find_index(param_list, 'minZr')]
    maxZr = params[find_index(param_list, 'maxZr')]
    p = params[find_index(param_list, 'p')]

    # Scale input data back to floating point values
    NDVI = NDVI.astype(np.float32) / np.float32(255)
    Rain = Rain.astype(np.float32) / np.float32(100)
    ET0 = ET0.astype(np.float32) / np.float32(1000)
    E0 = E0.astype(np.float32) / scaling_array[find_index(scaling_names, 'E')]
    Tr0 = Tr0.astype(np.float32) / scaling_array[find_index(scaling_names, 'Tr')]

    # Fraction cover
    # Equation: FCov = Fslope * NDVI + Foffset
    FCov = Fslope * NDVI + Foffset
    # Equation: FCov = min(max(FCov, 0), FCmax)
    FCov = np.minimum(np.maximum(FCov, np.float32(0)), FCmax)

    # Root depth update
    # Equation: Zr = max(minZr + (FCov / FCmax) * (maxZr - minZr), Ze + 0.001)
    Zr = np.maximum(minZr + (FCov / FCmax) * (maxZr - minZr), Ze + np.float32(0.001))

    # Water capacities
    TAW = (Wfc - Wwp) * Zr
    TDW = (Wfc - Wwp) * (Zsoil - Zr)
    TEW = (Wfc - (Wwp / np.float32(2))) * Ze
    RUE = (Wfc - Wwp) * Ze

    # Update depletions from root increase
    Dei = Dei0
    Dep = Dep0
    Dr = update_Dr_from_root(Wfc, Wwp, Zr, Zsoil, Dr0, Dd0, Zr0)
    Dd = update_Dd_from_root(Wfc, Wwp, Zr, Zsoil, Dr0, Dd0, Zr0)

    # Kcb
    # Equation: Kcb = min(max(Kslope * NDVI + Koffset, 0), Kcbmax)
    Kcb = np.minimum(np.maximum(Kslope * NDVI + Koffset, np.float32(0)), Kcbmax)

    # Irrigation
    Irrig_test = np.where(np.invert(Irrig_test) & (Kcb > frac_Kcb_stop_irrig * Kcb_max_obs), True, Irrig_test)  # set Irrig_test to true when Kcb goes over frac_Kcb_stop_irrig of its maximum, and keep it true
    Irrig = calculate_irrig(Dr, TAW, Rain, Kcb, Irrig_auto, Lame_max, Lame_min, Kcb_min_start_irrig, frac_Kcb_stop_irrig, Irrig_test, frac_TAW, Kcb_max_obs)

    # Create temporary variable used multiple times
    # Equation: temp = Dr - Rain - Irrig
    temp = Dr - Rain - Irrig

    # Deep percolation
    # Equation: DP = - min(Dd + min(temp, 0), 0)
    DP = - np.minimum(Dd + np.minimum(temp, np.float32(0)), np.float32(0))

    # Update depletions with Rainfall and/or irrigation
    # Equation: Dei = min(max(Dei - Rain - Irrig / FW, 0), TEW)
    Dei = np.minimum(np.maximum(Dei - Rain - (Irrig / FW), np.float32(0)), TEW)
    # Equation: Dep = min(max(Dep - Rain, 0), TEW)
    Dep = np.minimum(np.maximum(Dep - Rain, np.float32(0)), TEW)

    # Fractions of soil exposed to evaporation and wetted by irrigation / precipitation
    fewi = np.minimum(np.float32(1) - FCov, FW)
    fewp = np.float32(1) - FCov - fewi

    # Equation: De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
    De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
    # Equation: De = where(De.isfinite, De, Dei * FW + Dep * (1 - FW))
    De = np.where(np.isfinite(De), De, Dei * FW + Dep * (np.float32(1) - FW))

    # Update depletions from rain and irrigation
    Dr = np.minimum(np.maximum(temp, np.float32(0)), TAW)
    Dd = np.minimum(np.maximum(Dd + np.minimum(temp, np.float32(0)), np.float32(0)), TDW)

    # Diffusion coefficients
    # Equation: check the calculate_diff_re() and calculate_diff_dr() functions
    Diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, Wfc, Ze, DiffE)
    Diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, Wfc, Ze, DiffE)
    Diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, Wfc, Zsoil, DiffR)

    # Water stress coefficient
    if iday == 0:
        # Equation: Ks = min((TAW - Dr) / (TAW * (1 - p)), 1)
        Ks = np.minimum((TAW - Dr) / (TAW * (np.float32(1) - p)), np.float32(1))
    else:
        Ks = calculate_Ks(Dr, TAW, p, E0, Tr0)

    # Weighting factor for evaporation
    W = calculate_W(TEW, Dei, Dep, fewi, fewp)

    # Equation: Kei = np.minimum(W * Kri * (Kc_max - Kcb), fewi * Kc_max)
    Kei = calculate_Ke(W, Dei, TEW, REW, Kcmax, Kcb, fewi)
    # Equation: Kep = np.minimum((1 - W) * Krp * (Kc_max - Kcb), fewp * Kc_max)
    Kep = calculate_Ke((np.float32(1)-W), Dep, TEW, REW, Kcmax, Kcb, fewp)

    # Prepare coefficients for evapotranspiration
    Tei = calculate_Te(Dei, Dr, Ks, Kcb, Ze, Zr, TEW, TAW, ET0)
    Tep = calculate_Te(Dep, Dr, Ks, Kcb, Ze, Zr, TEW, TAW, ET0)

    # Update depletions
    Dei = update_De_from_Diff(Dei, fewi, Kei, Tei, Diff_rei, TEW, ET0)
    Dep = update_De_from_Diff(Dep, fewp, Kep, Tep, Diff_rep, TEW, ET0)

    # Equation: De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
    De = (Dei * fewi + Dep * fewp) / (fewi + fewp)
    # Equation: De = where(De.isfinite, De, Dei * FW + Dep * (1 - FW))
    De = np.where(np.isfinite(De), De, Dei * FW + Dep * (np.float32(1) - FW))

    # Evaporation
    E = (Kei + Kep) * ET0

    # Transpiration
    Tr = Kcb * Ks * ET0

    # Update depletions (root and deep zones) at the end of the day
    Dr = np.minimum(np.maximum(Dr + E + Tr - Diff_dr, np.float32(0)), TAW)
    Dd = np.minimum(np.maximum(Dd + Diff_dr, np.float32(0)), TDW)

    # Soil water content of the evaporative layer
    SWCe = calculate_SWCe(Dei, Dep, fewi, fewp, TEW)

    # Soil water content of the root layer
    SWCr = np.float32(1) - Dr / TAW

    # Scale the outputs to store them as integers
    DP = (DP * scaling_array[find_index(scaling_names, 'DP')]).astype(np.int16)
    Irrig = (Irrig * scaling_array[find_index(scaling_names, 'Irr')]).astype(np.int16)
    SWCe = (SWCe * scaling_array[find_index(scaling_names, 'SWCe')]).astype(np.int16)
    SWCr = (SWCr * scaling_array[find_index(scaling_names, 'SWCr')]).astype(np.int16)
    E = (E * scaling_array[find_index(scaling_names, 'E')]).astype(np.uint16)
    Tr = (Tr * scaling_array[find_index(scaling_names, 'Tr')]).astype(np.uint16)
    if write_additional_data:
        variable_mapping = {
            'FCov': FCov, 'Zr': Zr, 'TAW': TAW, 'TDW': TDW, 'TEW': TEW, 'RUE': RUE, 'Dei': Dei, 'Dep': Dep,
            'De': De, 'Dr': Dr, 'Dd': Dd, 'Kcb': Kcb, 'fewi': fewi, 'fewp': fewp, 'Diff_rei': Diff_rei,
            'Diff_rep': Diff_rep, 'Diff_dr': Diff_dr, 'Ks': Ks, 'W': W, 'Kei': Kei, 'Kep': Kep, 'Tei': Tei,
            'Tep': Tep}
        
        for var, idx in zip(additional_names, additional_idx):
            additional_outputs_data[idx, iday % time_slice, :, :] = np.round(variable_mapping[var] * additional_outputs[idx]).astype(np.int16)
    
    return DP, Dd, Dei, Dep, Dr, E, Irrig, Irrig_test, SWCe, SWCr, Tr, Zr
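

# Example (sketch, not part of the model): chaining samir_daily() over a time
# loop. Each day's outputs become the next day's state arrays (variable names
# follow run_samir() below):
#
#     for iday in range(time_size):
#         DP_d, Dd, Dei, Dep, Dr, E, Irrig_d, Irrig_test, SWCe_d, SWCr_d, Tr, Zr = samir_daily(
#             NDVI[iday], ET0[iday], Rain[iday], Wfc, Wwp, Kcb_max_obs, Irrig_auto,
#             Irrig_test, Dr, Dd, Zr, E, Tr, Dei, Dep, iday, parameter_array,
#             param_list, scaling_names, scaling_array, write_additional_data,
#             additional_scaling_array, additional_names, additional_idx,
#             additional_outputs_data, time_slice)
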
def run_samir(csv_param_file: str, ndvi_cube_path: str, weather_path: str, soil_params_path: str, land_cover_path: str, irrigation_raster: str, init_RU_path: str, Kcb_max_obs_path: str, save_path: str, scaling_dict: dict[str, int] = {'E': 1000, 'Tr': 1000, 'SWCe': 1000, 'SWCr': 1000, 'DP': 100, 'Irr': 100}, additional_outputs: dict[str, int] = None, available_ram: int = 8, available_cpu: int = 4, compress_outputs: bool = False) -> None:
    """
    Run the *SAMIR* model on the prepared inputs. Calls the ``samir_daily()`` in a time loop.
    Maximizes memory usage with given limits to run faster.

    Arguments
    =========

    1. csv_param_file: ``str``
        SAMIR csv parameter file
    2. ndvi_cube_path: ``str``
        path to ndvi cube
    3. weather_path: ``str``
        path to weather cube
    4. soil_params_path: ``str``
        path to soil dataset
    5. land_cover_path: ``str``
        path to land cover raster
    6. irrigation_raster: ``str``
        path to netCDF file containing an irrigation mask
    7. init_RU_path: ``str``
        path to netCDF file containing initial soil water content raster
    8. Kcb_max_obs_path: ``str``
        path to netCDF containing a single
        array of maximum observed Kcb values
    9. save_path: ``str``
        path to save the model outputs
    10. scaling_dict: ``Dict[str, int]`` ``default = {'E': 1000, 'Tr': 1000, 'SWCe': 1000, 'SWCr': 1000, 'DP': 100, 'Irr': 100}``
        scaling dictionary for the nominal outputs
    11. additional_outputs: ``Dict[str, int]`` ``default = None``
        dictionary containing the names and scale
        factors of potential additional outputs
    12. available_ram: ``int`` ``default = 8``
        available RAM in **GiB** for the model
    13. available_cpu: ``int`` ``default = 4``
        number of available **physical** CPU cores
    14. compress_outputs: ``bool`` ``default = False``
        choice to compress output file (takes longer)

    Returns
    =======

    ``None``
    """

    # Turn off numpy warnings
    np.seterr(divide = 'ignore', invalid = 'ignore')

    # Test if memory requirement is not too large
    if np.ceil(virtual_memory().available / (1024**3)) < available_ram:
        print('\nRequested', available_ram, 'GiB of memory when available memory is approximately', round(virtual_memory().available / (1024**3), 1), 'GiB.\n\nExiting script.\n')
        return None

    # Set maximum number of usable CPU cores
    # Get number of CPU cores and limit max value (working on a cluster requires os.sched_getaffinity to get the true number of available CPUs,
    # this is not true on a "personal" computer, hence the use of the min function)
    try:
        nb_threads = min([available_cpu * 2, cpu_count(logical = True), len(os.sched_getaffinity(0))])
    except AttributeError:
        nb_threads = min([available_cpu * 2, cpu_count(logical = True)])  # os.sched_getaffinity does not exist on windows
    set_num_threads(nb_threads)

    # ============ Manage inputs ============ #

    # NDVI (to have a correct empty dataset structure)
    ndvi_cube = xr.open_dataset(ndvi_cube_path)
    ndvi_cube_empty = ndvi_cube.drop_sel(time = ndvi_cube.time)  # create dataset with a time dimension of length 0

    # SAMIR parameters
    parameter_array, Irrig_auto, Init_RU, param_list = rasterize_samir_parameters(csv_param_file, land_cover_path, irrigation_raster, init_RU_path)

    # ============ Get size of dataset ============ #
    x_size = ndvi_cube.sizes['x']
    y_size = ndvi_cube.sizes['y']
    time_size = ndvi_cube.sizes['time']
    dates = ndvi_cube.time.values  # dates of the time loop
    dimensions = ndvi_cube_empty.sizes  # to create empty output dataset
    # ============ Scaling factors ============ #
    # Standard scaling dict
    scaling_array = []
    for var in scaling_dict.keys():
        scaling_array.append(scaling_dict[var])
    scaling_array = np.ascontiguousarray(np.array(scaling_array, dtype = np.int16))
    scaling_names = List(list(scaling_dict.keys()))
    
    # Manage additional output parameters
    if additional_outputs is not None:
        # Additional output scaling dict
        additional_scaling_array = []
        for var in additional_outputs.keys():
            additional_scaling_array.append(additional_outputs[var])
        additional_scaling_array = np.array(additional_scaling_array, dtype = np.int16)
        additional_names = list(additional_outputs.keys())
        field_name_to_index = {name: idx for idx, name in enumerate(additional_outputs)}
        additional_idx = np.ascontiguousarray(np.array([field_name_to_index[var] for var in additional_names], dtype = np.int16))
        additional_names = List(additional_names)
        write_additional_data = True
    else:
        # Create empty variables for correct numba parsing
        additional_scaling_array = np.array([0], dtype = np.int16)
        additional_idx = np.array([0], dtype = np.int16)
        additional_names = List(['0'])
        additional_outputs_data = np.empty(shape = (1,1,1,1), dtype = np.int16)
        write_additional_data = False
    
    # ============ Memory handling ============ #
    # Check how much memory the calculation would take if all the inputs would be loaded in memory
    # Unit is GiB
    # Datatype of variables is float32 for calculation
    nb_bytes = 2  # uint16 or int16
    nb_inputs = 3  # NDVI, Rain, ET0
    if additional_outputs:
        nb_outputs = 6 + len(additional_outputs)  # DP, E, Irr, SWCe, SWCr, Tr and additional outputs
    else:
        nb_outputs = 6  # DP, E, Irr, SWCe, SWCr, Tr
    nb_variables = 38  # calculation variables (e.g: Dd, Diff_rei, FCov, etc.)
    nb_params = parameter_array.shape[0]  # number of raster parameters
    total_memory_requirement = calculate_memory_requirement(x_size, y_size, time_size, nb_inputs, nb_outputs, nb_variables, nb_params, nb_bytes)
    
    # Determine how many time slices can be loaded in memory at once
    # This will allow faster I/O operations and a faster runtime
    time_slice, remainder_to_load, already_loaded = calculate_time_slices_to_load(x_size, y_size, time_size, nb_inputs, nb_outputs, nb_variables, nb_params, nb_bytes, available_ram)
    actual_memory_requirement = calculate_memory_requirement(x_size, y_size, time_slice, nb_inputs, nb_outputs, nb_variables, nb_params, nb_bytes)
    print_size1, print_unit1 = format_Byte_size(total_memory_requirement, decimals = 1)
    print_size2, print_unit2 = format_Byte_size(actual_memory_requirement, decimals = 1)
    print('\nApproximate memory requirement of calculation:', print_size1, print_unit1 + ', available memory:', available_ram, 'GiB\n\nLoading blocks of', time_slice, 'time bands, actual memory requirement:', print_size2, print_unit2, '\n')
    
    # ============ Prepare outputs ============ #
    model_outputs = prepare_output_dataset(ndvi_cube_path, dimensions, scaling_dict, additional_outputs = additional_outputs)
    encoding_dict = {}  # per-variable netCDF encoding, filled in the loop below
    for variable in list(model_outputs.keys()):
        # Write encoding dict
        encod = {}
        if variable not in scaling_dict.keys():  # additional outputs are not in the scaling dict
            encod['dtype'] = 'i2'
        else:
            encod['dtype'] = 'u2'
        # TODO: figure out optimal file chunk size
        if compress_outputs:
            encod['zlib'] = True
            encod['complevel'] = 1
        encoding_dict[variable] = encod
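    # e.g. a compressed standard output ends up with encoding {'dtype': 'u2', 'zlib': True, 'complevel': 1};
    # complevel 1 favours write speed over compression ratio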

    # Save empty output
    model_outputs.to_netcdf(save_path, encoding = encoding_dict, unlimited_dims = 'time')  # making time an unlimited dimension allows appending data along it
    model_outputs.close()
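    # With time unlimited, the file can later grow along the time dimension; write_outputs presumably
    # relies on the netCDF4 pattern below (a sketch, with 'DP' as an example variable):
    #   with nc.Dataset(save_path, mode = 'a') as ds:
    #       ds.variables['DP'][t0:t0 + block_size, :, :] = block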

    # ============ Prepare time iterations ============ #

    # Input soil data: soil water content at field capacity (Wfc) and at wilting point (Wwp)
    with nc.Dataset(soil_params_path, mode = 'r') as ds:
        ds.variables['Wfc'].set_auto_mask(False)
        Wfc = ds.variables['Wfc'][:, :]
        ds.variables['Wwp'].set_auto_mask(False)
        Wwp = ds.variables['Wwp'][:, :]
    
    # Observed maximum basal crop coefficient (Kcb) data
    with nc.Dataset(Kcb_max_obs_path, mode = 'r') as ds:
        ds.variables['Kcb_max_obs'].set_auto_mask(False)
        Kcb_max_obs = ds.variables['Kcb_max_obs'][:, :]

    # Initialize progress bar
    progress_bar = tqdm(total = len(dates), desc = '', unit = ' days')

    for i in range(len(dates)):

        # ============ Load input data and prepare output data ============ #
        # Based on available memory and previous calculation of time slices to load,
        # load a given number of time slices
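        # Worked example (hypothetical sizes): with time_size = 365 and time_slice = 100, blocks of 100
        # bands are read at i = 0, 100 and 200, and at i = 300 the remaining 65 bands (remainder_to_load)
        # are read instead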
        if time_slice == time_size and not already_loaded:  # if whole dataset fits in memory and it has not already been loaded
            
            # Update progress bar
            progress_bar.set_description_str(desc = 'Reading inputs')
            
            NDVI, Rain, ET0 = read_inputs(ndvi_cube_path, weather_path, i, time_slice, load_all = True)
            already_loaded = True
            
            # Prepare output data
            # Standard outputs
            DP, E, Irrig, SWCe, SWCr, Tr = get_empty_arrays(x_size, y_size, time_slice, 6)
            # Additional outputs
            if additional_outputs:
                additional_outputs_data = get_empty_arrays(x_size, y_size, time_slice, len(additional_outputs.keys()), array = True)
                
            # Update progress bar
            progress_bar.set_description_str(desc = 'Running model')
            
        elif i % time_slice == 0:  # load a time slice every time i is divisible by the size of the time slice
            if i + time_slice <= time_size:  # if the time slice does not go over the dataset size
                
                # Update progress bar
                progress_bar.set_description_str(desc = 'Reading inputs')
                
                NDVI, Rain, ET0 = read_inputs(ndvi_cube_path, weather_path, i, time_slice)
                
                # Prepare output data
                DP, E, Irrig, SWCe, SWCr, Tr = get_empty_arrays(x_size, y_size, time_slice, 6)
                # Additional outputs
                if additional_outputs:
                    additional_outputs_data = get_empty_arrays(x_size, y_size, time_slice, len(additional_outputs.keys()), array = True)
                
                # Update progress bar
                progress_bar.set_description_str(desc = 'Running model')

            else:  # load the remainder when the time slice would go over the dataset size
                
                # Update progress bar
                progress_bar.set_description_str(desc = 'Reading inputs')
                
                NDVI, Rain, ET0 = read_inputs(ndvi_cube_path, weather_path, i, remainder_to_load)
                
                # Prepare output data
                DP, E, Irrig, SWCe, SWCr, Tr = get_empty_arrays(x_size, y_size, remainder_to_load, 6)
                # Additional outputs
                if additional_outputs:
                    additional_outputs_data = get_empty_arrays(x_size, y_size, remainder_to_load, len(additional_outputs.keys()), array = True)
                
                # Update progress bar
                progress_bar.set_description_str(desc = 'Running model')

        # Compute starting conditions once, on the first day of the simulation
        if i == 0:
            E0, Tr0, Zr0, Dei0, Dep0, Dr0, Dd0, Irrig_test = get_starting_conditions(parameter_array, param_list, NDVI[0], Init_RU, Wfc, Wwp, y_size, x_size)

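        # Model state carried from one day to the next (rough legend, following SAMIR / FAO-56 conventions):
        # Dr0 = root zone depletion, Dd0 = deep layer depletion, Dei0 / Dep0 = evaporation layer depletion of
        # the irrigation- and precipitation-wetted fractions, Zr0 = root depth, E0 / Tr0 = previous day's
        # evaporation and transpiration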
        # Run the SAMIR water balance for the current day; i % time_slice indexes inside the loaded block
        DP[i % time_slice], Dd0, Dei0, Dep0, Dr0, E[i % time_slice], Irrig[i % time_slice], Irrig_test, SWCe[i % time_slice], SWCr[i % time_slice], Tr[i % time_slice], Zr0 = samir_daily(NDVI[i % time_slice], ET0[i % time_slice], Rain[i % time_slice], Wfc, Wwp, Kcb_max_obs, Irrig_auto, Irrig_test, Dr0, Dd0, Zr0, E0, Tr0, Dei0, Dep0, i, parameter_array, param_list, scaling_names, scaling_array, write_additional_data, additional_scaling_array, additional_names, additional_idx, additional_outputs_data, time_slice)
        E0, Tr0 = E[i % time_slice], Tr[i % time_slice]  # carry today's E and Tr over as tomorrow's previous-day values
        
        # ============ Write outputs ============ #
        # Based on available memory and the previously calculated time slice size, write outputs in blocks
        if time_slice == time_size and i == time_size - 1:  # if whole dataset fits in memory, write data at the end of the loop
            
            # Remove unnecessary data
            del NDVI, Rain, ET0
            collect()  # free up unused memory
            
            # Update progress bar
            progress_bar.set_description_str(desc = 'Writing outputs')
            
            write_outputs(save_path, DP, SWCe, SWCr, E, Tr, Irrig, additional_outputs, additional_outputs_data, i, time_slice, write_all = True)
            
            # Remove written data
            del DP, SWCe, SWCr, E, Tr, Irrig, additional_outputs_data
            additional_outputs_data = np.empty(shape = (1, 1, 1, 1), dtype = np.int16)  # re-bind a typed placeholder so samir_daily never receives None
        elif (i % time_slice == time_slice - 1) and (remainder_to_load is not None):  # write a full time slice once its last day has been processed
            
            # Remove unnecessary data
            del NDVI, Rain, ET0
            collect()  # free up unused memory
            
            # Update progress bar
            progress_bar.set_description_str(desc = 'Writing outputs')
            
            write_outputs(save_path, DP, SWCe, SWCr, E, Tr, Irrig, additional_outputs, additional_outputs_data, i, time_slice)
            
            # Remove written data
            del DP, SWCe, SWCr, E, Tr, Irrig, additional_outputs_data
            additional_outputs_data = np.empty(shape = (1, 1, 1, 1), dtype = np.int16)  # re-bind a typed placeholder so samir_daily never receives None
        elif i == time_size - 1 and write_remainder:  # write the remainder when the time slice would go over the dataset size
            # Remove unnecessary data
            del NDVI, Rain, ET0
            collect()  # free up unused memory
            
            # Update progress bar
            progress_bar.set_description_str(desc = 'Writing outputs')
            
            write_outputs(save_path, DP, SWCe, SWCr, E, Tr, Irrig, additional_outputs, additional_outputs_data, i, remainder_to_load)

            # Remove written data
            del DP, SWCe, SWCr, E, Tr, Irrig, additional_outputs_data
            additional_outputs_data = np.empty(shape = (1, 1, 1, 1), dtype = np.int16)  # re-bind a typed placeholder so samir_daily never receives None
        # Update progress bar
        progress_bar.update()

    # Close progress bar
    progress_bar.close()
    
    print('\nWriting output dataset:', save_path)
    # Reopen output model file to reinsert dates  
    with nc.Dataset(save_path, mode = 'a') as model_outputs:
        model_outputs.variables['time'].units = f'days since {np.datetime_as_string(dates[0], unit = "D")} 00:00:00'  # set correct unit
        model_outputs.variables['time'][:] = np.arange(0, len(dates))  # save dates as integers representing the number of days since the first day
        model_outputs.sync() # flush data to disk
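    # On re-opening, xarray should decode these CF-style units back to datetimes, e.g. (sketch):
    #   xr.open_dataset(save_path).time  # dates[0] ... dates[-1] as datetime64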