{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import xarray as xr\n", "from dask.distributed import Client\n", "import os\n", "import numpy as np\n", "from typing import List, Tuple, Union\n", "import warnings\n", "import gc\n", "from parameters.params_samir_class import samir_parameters\n", "from config.config import config\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "\n", "def rasterize_samir_parameters(csv_param_file: str, empty_dataset: xr.Dataset, land_cover_raster: str, chunk_size: dict) -> Tuple[xr.Dataset, dict]:\n", " \"\"\"\n", " Creates a raster `xarray` dataset from the csv parameter file, the land cover raster and an empty dataset\n", " that contains the right structure (emptied ndvi dataset for example). For each parameter, the function loops\n", " on land cover classes to fill the raster.\n", "\n", " ## Arguments\n", " 1. csv_param_file: `str`\n", " path to csv paramter file\n", " 2. empty_dataset: `xr.Dataset`\n", " empty dataset that contains the right structure (emptied ndvi dataset for example).\n", " 3. land_cover_raster: `str`\n", " path to land cover netcdf raster\n", " 4. chunk_size: `dict`\n", " chunk_size for dask computation\n", "\n", " ## Returns\n", " 1. parameter_dataset: `xr.Dataset`\n", " the dataset containing all the rasterized Parameters\n", " 2. scale_factor: `dict`\n", " dictionnary containing the scale factors for each parameter\n", " \"\"\"\n", " \n", " # Load samir params into an object\n", " table_param = samir_parameters(csv_param_file)\n", " \n", " # Set general variables\n", " class_count = table_param.table.shape[1] - 2 # remove dtype and default columns\n", " \n", " # Open land cover raster\n", " land_cover = xr.open_dataarray(land_cover_raster, chunks = chunk_size)\n", " \n", " # Create dataset\n", " parameter_dataset = empty_dataset.copy(deep = True)\n", " \n", " # Create dictionnary containing the scale factors\n", " scale_factor = {}\n", " \n", " # Loop on samir parameters and create \n", " for parameter in table_param.table.index[1:]:\n", " \n", " # Create new variable and set attributes\n", " parameter_dataset[parameter] = land_cover.copy(deep = True).astype('f4')\n", " parameter_dataset[parameter].attrs['name'] = parameter\n", " parameter_dataset[parameter].attrs['description'] = 'cf SAMIR Doc for detail'\n", " parameter_dataset[parameter].attrs['scale factor'] = str(table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0])\n", " \n", " # Assigne value in dictionnary\n", " scale_factor[parameter] = 1/int(table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0])\n", " \n", " # Loop on classes to set parameter values for each class\n", " for class_val, class_name in zip(range(1, class_count + 1), table_param.table.columns[2:]):\n", " \n", " # Parameter values are multiplied by the scale factor in order to store all values as int16 types\n", " # These values are then rounded to make sure there isn't any decimal point issues when casting the values to int16\n", " parameter_dataset[parameter].values = np.where(parameter_dataset[parameter].values == class_val, round(table_param.table.loc[table_param.table.index == parameter][class_name].values[0]*table_param.table.loc[table_param.table.index == parameter]['scale_factor'].values[0]), parameter_dataset[parameter].values).astype('f4')\n", " \n", " # Return dataset converted to 'int16' data type to reduce memory usage\n", " # and scale_factor dictionnary for later conversion\n", " return parameter_dataset, scale_factor\n", "\n", "\n", "def setup_time_loop(calculation_variables_t1: List[str], calculation_variables_t2: List[str], empty_dataset: xr.Dataset) -> Tuple[xr.Dataset, xr.Dataset]:\n", " \"\"\"\n", " Creates two temporary `xarray Datasets` that will be used in the SAMIR time loop.\n", " `variables_t1` corresponds to the variables for the previous day and `variables_t2`\n", " corresponds to the variables for the current day. After each loop, `variables_t1`\n", " takes the value of `variables_t2` for the corresponding variables.\n", "\n", " ## Arguments\n", " 1. calculation_variables_t1: `List[str]`\n", " list of strings containing the variable names\n", " for the previous day dataset\n", " 2. calculation_variables_t2: `List[str]`\n", " list of strings containing the variable names\n", " for the current day dataset\n", " 3. empty_dataset: `xr.Dataset`\n", " empty dataset that contains the right structure\n", "\n", " ## Returns\n", " 1. variables_t1: `xr.Dataset`\n", " output dataset for previous day\n", " 2. variables_t2: `xr.Dataset`\n", " output dataset for current day\n", " \"\"\"\n", " \n", " # Create new dataset\n", " variables_t1 = empty_dataset.copy(deep = True)\n", " \n", " # Create empty DataArray for each variable\n", " for variable in calculation_variables_t1:\n", " \n", " # Assign new empty DataArray\n", " variables_t1[variable] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'float32'))\n", " variables_t1[variable].attrs['name'] = variable # set name in attributes\n", " \n", " # Create new dataset\n", " variables_t2 = empty_dataset.copy(deep = True)\n", " \n", " # Create empty DataArray for each variable\n", " for variable in calculation_variables_t2:\n", " \n", " # Assign new empty DataArray\n", " variables_t2[variable] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'float32'))\n", " variables_t2[variable].attrs['name'] = variable # set name in attributes\n", " \n", " return variables_t1, variables_t2\n", "\n", "\n", "def prepare_outputs(empty_dataset: xr.Dataset, additional_outputs: List[str] = None) -> xr.Dataset:\n", " \"\"\"\n", " Creates the `xarray Dataset` containing the outputs of the SAMIR model that will be saved.\n", " Additional variables can be saved by adding their names to the `additional_outputs` list.\n", "\n", " ## Arguments\n", " 1. empty_dataset: `xr.Dataset`\n", " empty dataset that contains the right structure\n", " 2. additional_outputs: `List[str]`\n", " list of additional variable names to be saved\n", "\n", " ## Returns\n", " 1. model_outputs: `xr.Dataset`\n", " model outputs to be saved\n", " \"\"\"\n", " \n", " # Evaporation and Transpiraion\n", " model_outputs = empty_dataset.copy(deep = True)\n", " \n", " model_outputs['E'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " model_outputs['E'].attrs['units'] = 'mm'\n", " model_outputs['E'].attrs['standard_name'] = 'Evaporation'\n", " model_outputs['E'].attrs['description'] = 'Accumulated daily evaporation in milimeters'\n", " model_outputs['E'].attrs['scale factor'] = '1'\n", " \n", " model_outputs['Tr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " model_outputs['Tr'].attrs['units'] = 'mm'\n", " model_outputs['Tr'].attrs['standard_name'] = 'Transpiration'\n", " model_outputs['Tr'].attrs['description'] = 'Accumulated daily plant transpiration in milimeters'\n", " model_outputs['Tr'].attrs['scale factor'] = '1'\n", " \n", " # Soil Water Content\n", " model_outputs['SWCe'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " model_outputs['SWCe'].attrs['units'] = 'mm'\n", " model_outputs['SWCe'].attrs['standard_name'] = 'Soil Water Content of the evaporative zone'\n", " model_outputs['SWCe'].attrs['description'] = 'Soil water content of the evaporative zone in milimeters'\n", " model_outputs['SWCe'].attrs['scale factor'] = '1'\n", " \n", " model_outputs['SWCr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " model_outputs['SWCr'].attrs['units'] = 'mm'\n", " model_outputs['SWCr'].attrs['standard_name'] = 'Soil Water Content of the root zone'\n", " model_outputs['SWCr'].attrs['description'] = 'Soil water content of the root zone in milimeters'\n", " model_outputs['SWCr'].attrs['scale factor'] = '1'\n", " \n", " # Irrigation\n", " model_outputs['Irr'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " model_outputs['Irr'].attrs['units'] = 'mm'\n", " model_outputs['Irr'].attrs['standard_name'] = 'Irrigation'\n", " model_outputs['Irr'].attrs['description'] = 'Simulated daily irrigation in milimeters'\n", " model_outputs['Irr'].attrs['scale factor'] = '1'\n", " \n", " # Deep Percolation\n", " model_outputs['DP'] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " model_outputs['DP'].attrs['units'] = 'mm'\n", " model_outputs['DP'].attrs['standard_name'] = 'Deep Percolation'\n", " model_outputs['DP'].attrs['description'] = 'Simulated daily Deep Percolation in milimeters'\n", " model_outputs['DP'].attrs['scale factor'] = '1'\n", " \n", " if additional_outputs:\n", " for var in additional_outputs:\n", " model_outputs[var] = (empty_dataset.dims, np.zeros(tuple(empty_dataset.dims[d] for d in list(empty_dataset.dims)), dtype = 'int16'))\n", " \n", " return model_outputs\n", "\n", "\n", "def xr_maximum(ds: xr.DataArray, value: Union[xr.DataArray, float, int]) -> xr.DataArray:\n", " \"\"\"\n", " Equivalent of `numpy.maximum(ds, value)` for xarray DataArrays\n", "\n", " ## Arguments\n", " 1. ds: `xr.DataArray`\n", " datarray to compare\n", " 2. value: `Union[xr.DataArray, float, int]`\n", " value (scalar or dataarray) to compare\n", "\n", " ## Returns\n", " 1. output: `xr.DataArray`\n", " resulting dataarray with maximum value element-wise\n", " \"\"\"\n", " return xr.where(ds <= value, value, ds, keep_attrs = True)\n", "\n", "\n", "def xr_minimum(ds: xr.DataArray, value: Union[xr.DataArray, float, int]) -> xr.DataArray:\n", " \"\"\"\n", " Equivalent of `numpy.minimum(ds, value)` for xarray DataArrays\n", "\n", " ## Arguments\n", " 1. ds: `xr.DataArray`\n", " datarray to compare\n", " 2. value: `Union[xr.DataArray, float, int]`\n", " value (scalar or dataarray) to compare\n", "\n", " ## Returns\n", " 1. output: `xr.DataArray`\n", " resulting dataarray with minimum value element-wise\n", " \"\"\"\n", " return xr.where(ds >= value, value, ds, keep_attrs = True)\n", "\n", "\n", "def calculate_diff_re(TAW: xr.DataArray, Dr: xr.DataArray, Zr: xr.DataArray, RUE: xr.DataArray, De: xr.DataArray, FCov: xr.DataArray, Ze_: xr.DataArray, DiffE_: xr.DataArray, scale_dict: dict) -> xr.DataArray:\n", " \"\"\"\n", " Calculates the diffusion between the top soil layer and the root layer.\n", "\n", " ## Arguments\n", " 1. TAW: `xr.DataArray`\n", " water capacity of root layer\n", " 2. Dr: `xr.DataArray`\n", " depletion of root layer\n", " 3. Zr: `xr.DataArray`\n", " height of root layer\n", " 4. RUE: `xr.DataArray`\n", " total available surface water\n", " 5. De: `xr.DataArray`\n", " depletion of the evaporative layer\n", " 6. FCov: `xr.DataArray`\n", " fraction cover of plants\n", " 7. Ze_: `xr.DataArray`\n", " height of evaporative layer (paramter)\n", " 8. DiffE_: `xr.DataArray`\n", " diffusion coefficient between evaporative\n", " and root layers (unitless, parameter)\n", " 9. scale_dict: `dict`\n", " dictionnary containing the scale factors for\n", " the rasterized parameters\n", "\n", " ## Returns\n", " 1. diff_re: `xr.Dataset`\n", " the diffusion between the top soil layer and\n", " the root layer\n", " \"\"\"\n", " \n", " # Temporary variables to make calculation easier to read\n", " tmp1 = (((TAW - Dr) / Zr - (RUE - De) / (scale_dict['Ze'] * Ze_)) / FCov) * (scale_dict['DiffE'] * DiffE_)\n", " tmp2 = ((TAW * scale_dict['Ze'] * Ze_) - (RUE - De - Dr) * Zr) / (Zr + scale_dict['Ze'] * Ze_) - Dr\n", " \n", " # Calculate diffusion according to SAMIR equation\n", " diff_re = xr.where(tmp1 < 0, xr_maximum(tmp1, tmp2), xr_minimum(tmp1, tmp2))\n", "\n", " # Return zero values where the 'DiffE' parameter is equal to 0\n", " return xr.where(DiffE_ == 0, 0, diff_re)\n", "\n", "\n", "def calculate_diff_dr(TAW: xr.DataArray, TDW: xr.DataArray, Dr: xr.DataArray, Zr: xr.DataArray, Dd: xr.DataArray, FCov: xr.DataArray, Zsoil_: xr.DataArray, DiffR_: xr.DataArray, scale_dict: dict) -> xr.DataArray:\n", " \"\"\"\n", " Calculates the diffusion between the root layer and the deep layer.\n", "\n", " ## Arguments\n", " 1. TAW: `xr.DataArray`\n", " water capacity of root layer\n", " 2. TDW: `xr.DataArray`\n", " water capacity of deep layer\n", " 3. Dr: `xr.DataArray`\n", " depletion of root layer\n", " 4. Zr: `xr.DataArray`\n", " height of root layer\n", " 5. Dd: `xr.DataArray`\n", " depletion of deep layer\n", " 6. FCov: `xr.DataArray`\n", " fraction cover of plants\n", " 7. Zsoil_: `xr.DataArray`\n", " total height of soil (paramter)\n", " 8. DiffR_: `xr.DataArray`\n", " Diffusion coefficient between root\n", " and deep layers (unitless, parameter)\n", " 9. scale_dict: `dict`\n", " dictionnary containing the scale factors for\n", " the rasterized parameters\n", "\n", " ## Returns\n", " 1. diff_dr: `xr.Dataset`\n", " the diffusion between the root layer and the\n", " deep layer\n", " \"\"\"\n", " \n", " # Temporary variables to make calculation easier to read\n", " tmp1 = (((TDW - Dd) / (scale_dict['Zsoil'] * Zsoil_ - Zr) - (TAW - Dr) / Zr) / FCov) * scale_dict['DiffR'] * DiffR_\n", " tmp2 = (TDW *Zr - (TAW - Dr - Dd) * (scale_dict['Zsoil'] * Zsoil_ - Zr)) / (scale_dict['Zsoil'] * Zsoil_) - Dd\n", " \n", " # Calculate diffusion according to SAMIR equation\n", " diff_dr = xr.where(tmp1 < 0, xr_maximum(tmp1, tmp2), xr_minimum(tmp1, tmp2))\n", " \n", " # Return zero values where the 'DiffR' parameter is equal to 0\n", " return xr.where(DiffR_ == 0, 0, diff_dr)\n", "\n", "\n", "def calculate_W(TEW: xr.DataArray, Dei: xr.DataArray, Dep: xr.DataArray, fewi: xr.DataArray, fewp: xr.DataArray) -> xr.DataArray:\n", " \"\"\"\n", " Calculate W, the weighting factor to split the energy available\n", " for evaporation depending on the difference in water availability\n", " in the two evaporation components, ensuring that the larger and\n", " the wetter, the more the evaporation occurs from that component\n", "\n", " ## Arguments\n", " 1. TEW: `xr.DataArray`\n", " water capacity of evaporative layer\n", " 2. Dei: `xr.DataArray`\n", " depletion of the evaporative layer\n", " (irrigation part)\n", " 3. Dep: `xr.DataArray`\n", " depletion of the evaporative layer\n", " (precipitation part)\n", " 4. fewi: `xr.DataArray`\n", " soil fraction which is wetted by irrigation\n", " and exposed to evaporation\n", " 5. fewp: `xr.DataArray`\n", " soil fraction which is wetted by precipitation\n", " and exposed to evaporation\n", "\n", " ## Returns\n", " 1. W: `xr.DataArray`\n", " weighting factor W\n", " \"\"\"\n", " \n", " # Temporary variables to make calculation easier to read\n", " tmp = fewi * (TEW - Dei)\n", " \n", " # Calculate the weighting factor to split the energy available for evaporation\n", " W = 1 / (1 + (fewp * (TEW - Dep) / tmp ))\n", "\n", " # Return W \n", " return xr.where(tmp > 0, W, 0)\n", "\n", "\n", "def calculate_Kr(TEW: xr.DataArray, De: xr.DataArray, REW_: xr.DataArray, scale_dict: dict) -> xr.DataArray:\n", " \"\"\"\n", " calculates of the reduction coefficient for evaporation dependent \n", " on the amount of water in the soil using the FAO-56 method\n", "\n", " ## Arguments\n", " 1. TEW: `xr.DataArray`\n", " water capacity of evaporative layer\n", " 2. De: `xr.DataArray`\n", " depletion of evaporative layer\n", " 3. REW_: `xr.DataArray`\n", " readily evaporable water\n", " 4. scale_dict: `dict`\n", " dictionnary containing the scale factors for\n", " the rasterized parameters\n", "\n", " ## Returns\n", " 1. Kr: `xr.DataArray`\n", " Kr coefficient\n", " \"\"\"\n", " \n", " # Formula for calculating Kr\n", " Kr = (TEW - De) / (TEW - scale_dict['REW'] * REW_)\n", " \n", " # Return Kr\n", " return xr_maximum(0, xr_minimum(Kr, 1))\n", "\n", "\n", "def update_Dr(TAW: xr.DataArray, TDW: xr.DataArray, Zr: xr.DataArray, TAW0: xr.DataArray, TDW0: xr.DataArray, Dr0: xr.DataArray, Dd0: xr.DataArray, Zr0: xr.DataArray) -> xr.DataArray:\n", " \"\"\"\n", " Return the updated depletion for the root layer\n", "\n", " ## Arguments\n", " 1. TAW: `xr.DataArray`\n", " water capacity of root layer for current day\n", " 2. TDW: `xr.DataArray`\n", " water capacity of deep layer for current day\n", " 3. Zr: `xr.DataArray`\n", " root layer height for current day\n", " 4. TAW0: `xr.DataArray`\n", " water capacity of root layer for previous day\n", " 5. TDW0: `xr.DataArray`\n", " water capacity of deep layer for previous day\n", " 6. Dr0: `xr.DataArray`\n", " depletion of the root layer for previous day\n", " 7. Dd0: `xr.DataArray`\n", " depletion of the deep laye for previous day\n", " 8. Zr0: `xr.DataArray`\n", " root layer height for previous day\n", "\n", " ## Returns\n", " 1. output: `xr.DataArray`\n", " updated depletion for the root layer\n", " \"\"\"\n", " \n", " # Temporary variables to make calculation easier to read\n", " tmp1 = xr_maximum(Dr0 + Dd0 * (TAW - TAW0) / TDW0, 0)\n", " tmp2 = xr_minimum(Dr0 + Dd0 * (TAW - TAW0) / TDW0, TDW)\n", "\n", " # Return updated Dr\n", " return xr.where(Zr > Zr0, tmp1, tmp2)\n", "\n", "\n", "def update_Dd(TAW: xr.DataArray, TDW: xr.DataArray, Zr: xr.DataArray, TAW0: xr.DataArray, TDW0: xr.DataArray, Dd0: xr.DataArray, Zr0: xr.DataArray) -> xr.DataArray:\n", " \"\"\"\n", " Return the updated depletion for the deep layer\n", "\n", " ## Arguments\n", " 1. TAW: `xr.DataArray`\n", " water capacity of root layer for current day\n", " 2. TDW: `xr.DataArray`\n", " water capacity of deep layer for current day\n", " 3. TAW0: `xr.DataArray`\n", " water capacity of root layer for previous day\n", " 5. TDW0: `xr.DataArray`\n", " water capacity of deep layer for previous day\n", " 6. Dd0: `xr.DataArray`\n", " depletion of the deep laye for previous day\n", " 7. Zr0: `xr.DataArray`\n", " root layer height for previous day\n", "\n", " ## Returns\n", " 1. output: `xr.DataArray`\n", " updated depletion for the deep layer\n", " \"\"\"\n", " \n", " # Temporary variables to make calculation easier to read\n", " tmp1 = xr_maximum(Dd0 - Dd0 * (TAW - TAW0) / TDW0, 0)\n", " tmp2 = xr_minimum(Dd0 - Dd0 * (TAW - TAW0) / TDW0, TDW)\n", " \n", " # Return updated Dd\n", " return xr.where(Zr > Zr0, tmp1, tmp2)\n", "\n", "\n", "def run_samir(json_config_file: str, csv_param_file: str, ndvi_cube_path: str, weather_cube_path: str, soil_params_path: str, land_cover_path: str, chunk_size: dict, save_path: str, max_GB: int = 2) -> None:\n", " \n", " # warnings.simplefilter(\"error\", category = RuntimeWarning())\n", " warnings.filterwarnings(\"ignore\", message=\"invalid value encountered in cast\")\n", " warnings.filterwarnings(\"ignore\", message=\"invalid value encountered in divide\")\n", " np.errstate(all = 'raise')\n", " gc.disable()\n", " \n", " #============ General parameters ============#\n", " config_params = config(json_config_file)\n", " calculation_variables_t2 = ['diff_rei', 'diff_rep', 'diff_dr' , 'Dd', 'De', 'Dei', 'Dep', 'Dr', 'FCov', 'Irrig', 'Kcb', 'Kei', 'Kep', 'Ks', 'Kti', 'Ktp', 'RUE', 'TAW', 'TDW', 'TEW', 'Tei', 'Tep', 'W', 'Zr', 'fewi', 'fewp']\n", " calculation_variables_t1 = ['Dr', 'Dd', 'TAW', 'TDW', 'Zr']\n", " \n", " #============ Manage inputs ============#\n", " # NDVI\n", " ndvi_cube = xr.open_dataset(ndvi_cube_path, chunks = chunk_size).astype('f4')\n", " \n", " # # Create a daily DateTimeIndex for the desired date range\n", " # daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')\n", "\n", " # # Resample the dataset to a daily frequency and reindex with the new DateTimeIndex\n", " # ndvi_cube = ndvi_cube.resample(time = '1D').asfreq().reindex(time = daily_index)\n", "\n", " # # Interpolate the dataset along the time dimension to fill nan values\n", " # ndvi_cube = ndvi_cube.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').astype('u1')\n", " \n", " # Weather\n", " weather_cube = xr.open_dataset(weather_cube_path, chunks = chunk_size).astype('f4')\n", " \n", " # Soil\n", " soil_params = xr.open_dataset(soil_params_path, chunks = chunk_size).astype('f4')\n", " \n", " # SAMIR Parameters\n", " param_dataset, scale_factor = rasterize_samir_parameters(csv_param_file, ndvi_cube.drop_vars(['ndvi', 'time']), land_cover_path, chunk_size = chunk_size)\n", " \n", " # SAMIR Variables\n", " variables_t1, variables_t2 = setup_time_loop(calculation_variables_t1, calculation_variables_t2, ndvi_cube.drop_vars(['ndvi', 'time']))\n", " \n", " # Manage loading of data based on disk size of inputs\n", " if ndvi_cube.nbytes < max_GB * (1024)**3:\n", " ndvi_cube.load()\n", " \n", " if weather_cube.nbytes < max_GB * (1024)**3:\n", " weather_cube.load()\n", "\n", " #============ Prepare outputs ============#\n", " model_outputs = prepare_outputs(ndvi_cube.drop_vars(['ndvi']))\n", " \n", " #============ Prepare time iterations ============#\n", " dates = ndvi_cube.time.values\n", " \n", " #============ Create aliases for better readability ============#\n", " \n", " # Variables for current day\n", " diff_rei = variables_t2.diff_rei\n", " diff_rep = variables_t2.diff_rep\n", " diff_dr = variables_t2.diff_dr\n", " Dd = variables_t2.Dd\n", " De = variables_t2.De\n", " Dei = variables_t2.Dei\n", " Dep = variables_t2.Dep\n", " Dr = variables_t2.Dr\n", " FCov = variables_t2.FCov\n", " Irrig = variables_t2.Irrig\n", " Kcb = variables_t2.Kcb\n", " Kei = variables_t2.Kei\n", " Kep = variables_t2.Kep\n", " Ks = variables_t2.Ks\n", " Kti = variables_t2.Kti\n", " Ktp = variables_t2.Ktp\n", " RUE = variables_t2.RUE\n", " TAW = variables_t2.TAW\n", " TDW = variables_t2.TDW\n", " TEW = variables_t2.TEW\n", " Tei = variables_t2.Tei\n", " Tep = variables_t2.Tep\n", " Zr = variables_t2.Zr\n", " W = variables_t2.W\n", " fewi = variables_t2.fewi\n", " fewp = variables_t2.fewp\n", " \n", " # Variables for previous day\n", " TAW0 = variables_t1.TAW\n", " TDW0 = variables_t1.TDW\n", " Dr0 = variables_t1.Dr\n", " Dd0 = variables_t1.Dd\n", " Zr0 = variables_t1.Zr\n", " \n", " # Parameters\n", " # Parameters have an underscore (_) behind their name for recognition \n", " DiffE_ = param_dataset.DiffE\n", " DiffR_ = param_dataset.DiffR\n", " FW_ = param_dataset.FW\n", " Fc_stop_ = param_dataset.Fc_stop\n", " FmaxFC_ = param_dataset.FmaxFC\n", " Foffset_ = param_dataset.Foffset\n", " Fslope_ = param_dataset.Fslope\n", " Init_RU_ = param_dataset.Init_RU\n", " Irrig_auto_ = param_dataset.Irrig_auto\n", " Kcmax_ = param_dataset.Kcmax\n", " KmaxKcb_ = param_dataset.KmaxKcb\n", " Koffset_ = param_dataset.Koffset\n", " Kslope_ = param_dataset.Kslope\n", " Lame_max_ = param_dataset.Lame_max\n", " REW_ = param_dataset.REW\n", " Ze_ = param_dataset.Ze\n", " Zsoil_ = param_dataset.Zsoil\n", " maxZr_ = param_dataset.maxZr\n", " minZr_ = param_dataset.minZr\n", " p_ = param_dataset.p\n", " \n", " # scale factors\n", " # Scale factors have the following name scheme : s_ + parameter_name\n", " s_DiffE = scale_factor['DiffE']\n", " s_DiffR = scale_factor['DiffR']\n", " s_FW = scale_factor['FW']\n", " s_Fc_stop = scale_factor['Fc_stop']\n", " s_FmaxFC = scale_factor['FmaxFC']\n", " s_Foffset = scale_factor['Foffset']\n", " s_Fslope = scale_factor['Fslope']\n", " s_Init_RU = scale_factor['Init_RU']\n", " # s_Irrig_auto = scale_factor['Irrig_auto']\n", " s_Kcmax = scale_factor['Kcmax']\n", " s_KmaxKcb = scale_factor['KmaxKcb']\n", " s_Koffset = scale_factor['Koffset']\n", " s_Kslope = scale_factor['Kslope']\n", " s_Lame_max = scale_factor['Lame_max']\n", " s_REW = scale_factor['REW']\n", " s_Ze = scale_factor['Ze']\n", " s_Zsoil = scale_factor['Zsoil']\n", " s_maxZr = scale_factor['maxZr']\n", " s_minZr = scale_factor['minZr']\n", " s_p = scale_factor['p']\n", " \n", " #============ First day initialization ============#\n", " # Fraction cover\n", " FCov = s_Fslope * Fslope_ * (ndvi_cube.ndvi.sel({'time': dates[0]})/255) + s_Foffset * Foffset_\n", " FCov = xr_minimum(xr_maximum(FCov, 0), s_Fc_stop * Fc_stop_)\n", " \n", " # Root depth upate\n", " Zr = s_minZr * minZr_ + (FCov / (s_FmaxFC * FmaxFC_)) * s_maxZr * (maxZr_ - minZr_)\n", " \n", " # Water capacities\n", " TEW = (soil_params.FC - soil_params.WP/2) * s_Ze * Ze_\n", " RUE = (soil_params.FC - soil_params.WP) * s_Ze * Ze_\n", " TAW = (soil_params.FC - soil_params.WP) * Zr\n", " TDW = (soil_params.FC - soil_params.WP) * (s_Zsoil * Zsoil_ - Zr) # Zd = Zsoil - Zr\n", " \n", " # Depletions\n", " Dei = RUE * (1 - s_Init_RU * Init_RU_)\n", " Dep = RUE * (1 - s_Init_RU * Init_RU_)\n", " Dr = TAW * (1 - s_Init_RU * Init_RU_)\n", " Dd = TDW * (1 - s_Init_RU * Init_RU_)\n", " \n", " # Irrigation ==============!!!!!\n", " Irrig = xr_minimum(xr_maximum(Dr - weather_cube.tp.sel({'time': dates[0]}) / 1000, 0), s_Lame_max * Lame_max_) * Irrig_auto_\n", " Irrig = xr.where(Dr > TAW * s_p * p_, Irrig, 0)\n", " \n", " # Kcb\n", " Kcb = xr_minimum(s_Kslope * Kslope_ * (ndvi_cube.ndvi.sel({'time': dates[0]}) / 255) + s_Koffset * Koffset_, s_KmaxKcb * KmaxKcb_)\n", " \n", " # Update depletions with rainfall and/or irrigation \n", " ## DP\n", " model_outputs.DP.loc[{'time': dates[0]}] = -xr_minimum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig, 0), 0)\n", " \n", " ## De\n", " Dei = xr_minimum(xr_maximum(Dei - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig / (s_FW * FW_ / 100), 0), TEW)\n", " Dep = xr_minimum(xr_maximum(Dep - (weather_cube.tp.sel({'time': dates[0]}) / 1000), 0), TEW)\n", " \n", " fewi = xr_minimum(1 - FCov, (s_FW * FW_ / 100))\n", " fewp = 1 - FCov - fewi\n", " \n", " De = (Dei * fewi + Dep * fewp) / (fewi + fewp)\n", " # variables_t1['De'] = xr.where(variables_t1['De'].isfinite(), variables_t1['De'], variables_t1['Dei'] * (s_FW * FW_ / 100) + variables_t1['Dep'] * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!\n", "\n", " ## Dr\n", " Dr = xr_minimum(xr_maximum(Dr - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig, 0), TAW)\n", " \n", " ## Dd\n", " Dd = xr_minimum(xr_maximum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[0]}) / 1000) - Irrig, 0), 0), TDW)\n", " \n", " # Diffusion coefficients\n", " diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, FCov, Ze_, DiffE_, scale_factor)\n", " diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, FCov, Ze_, DiffE_, scale_factor)\n", " diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, FCov, Zsoil_, DiffR_, scale_factor) \n", " \n", " # Weighing factor W\n", " W = calculate_W(TEW, Dei, Dep, fewi, fewp)\n", " \n", " # Soil water contents\n", " model_outputs.SWCe.loc[{'time': dates[0]}] = 1 - De/TEW\n", " model_outputs.SWCr.loc[{'time': dates[0]}] = 1 - Dr/TAW\n", " \n", " # Water Stress coefficient\n", " Ks = xr_minimum((TAW - Dr) / (TAW * (1 - s_p * p_)), 1)\n", " \n", " # Reduction coefficient for evaporation\n", " Kei = xr_minimum(W * calculate_Kr(TEW, Dei, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewi * s_Kcmax * Kcmax_)\n", " Kep = xr_minimum((1 - W) * calculate_Kr(TEW, Dep, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewp * s_Kcmax * Kcmax_)\n", " \n", " # Prepare coefficients for evapotranspiration\n", " Kti = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dei / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)\n", " Ktp = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dep / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)\n", " Tei = Kti * Ks * Kcb * weather_cube.ET0.sel({'time': dates[0]}) / 1000\n", " Tep = Ktp * Ks * Kcb * weather_cube.ET0.sel({'time': dates[0]}) / 1000\n", " \n", " # Update depletions\n", " Dei = xr.where(fewi > 0, xr_minimum(xr_maximum(Dei + (weather_cube.ET0.sel({'time': dates[0]}) / 1000) * Kei / fewi + Tei - diff_rei, 0), TEW), xr_minimum(xr_maximum(Dei + Tei - diff_rei, 0), TEW))\n", " Dep = xr.where(fewp > 0, xr_minimum(xr_maximum(Dep + (weather_cube.ET0.sel({'time': dates[0]}) / 1000) * Kep / fewp + Tep - diff_rep, 0), TEW), xr_minimum(xr_maximum(Dep + Tep - diff_rep, 0), TEW))\n", " \n", " De = (Dei * fewi + Dep * fewp) / (fewi + fewp)\n", " # De = xr.where(De.isfinite(), De, Dei * (s_FW * FW_ / 100) + Dep * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!\n", " \n", " # Evaporation\n", " model_outputs.E.loc[{'time': dates[0]}] = xr_maximum((Kei + Kep) * weather_cube.ET0.sel({'time': dates[0]}) / 1000, 0)\n", " \n", " # Transpiration\n", " model_outputs.Tr.loc[{'time': dates[0]}] = Kcb * Ks * weather_cube.ET0.sel({'time': dates[0]}) / 1000\n", " \n", " # Potential evapotranspiration and evaporative fraction ??\n", " \n", " # Update depletions (root and deep zones) at the end of the day\n", " Dr = xr_minimum(xr_maximum(Dr + model_outputs.E.loc[{'time': dates[0]}] + model_outputs.Tr.loc[{'time': dates[0]}] - diff_dr, 0), TAW)\n", " Dd = xr_minimum(xr_maximum(Dd + diff_dr, 0), TDW)\n", " \n", " # Write outputs\n", " model_outputs.Irr.loc[{'time': dates[0]}] = Irrig\n", " \n", " # Update variable_t1 values\n", " for variable in calculation_variables_t1:\n", " variables_t1[variable] = variables_t2[variable].copy(deep = True)\n", " \n", " #============ Time loop ============#\n", " for i in range(1, len(dates)):\n", " \n", " # Update variables\n", " ## Fraction cover\n", " FCov = s_Fslope * Fslope_ * (ndvi_cube.ndvi.sel({'time': dates[i]})/255) + s_Foffset * Foffset_\n", " FCov = xr_minimum(xr_maximum(FCov, 0), s_Fc_stop * Fc_stop_)\n", " \n", " ## Root depth upate\n", " Zr = s_minZr * minZr_ + (FCov / (s_FmaxFC * FmaxFC_)) * s_maxZr * (maxZr_ - minZr_)\n", " \n", " # Water capacities\n", " TAW = (soil_params.FC - soil_params.WP) * Zr\n", " TDW = (soil_params.FC - soil_params.WP) * (s_Zsoil * Zsoil_ - Zr)\n", " \n", " # Update depletions\n", " Dr = update_Dr(TAW, TDW, Zr, TAW0, TDW0, Dr0, Dd0, Zr0)\n", " Dd = update_Dd(TAW, TDW, Zr, TAW0, TDW0, Dd0, Zr0)\n", " \n", " # Update param p\n", " p_ = (xr_minimum(xr_maximum(s_p * p_ + 0.04 * (5 - weather_cube.ET0.sel({'time': dates[i-1]}) / 1000), 0.1), 0.8) * (1 / s_p)).round(0).astype('i2')\n", " \n", " # Irrigation ==============!!!!!\n", " Irrig = xr_minimum(xr_maximum(Dr - weather_cube.tp.sel({'time': dates[i]}) / 1000, 0), s_Lame_max * Lame_max_) * Irrig_auto_\n", " Irrig = xr.where(Dr > TAW * s_p * p_, Irrig, 0)\n", " \n", " # Kcb\n", " Kcb = xr_minimum(s_Kslope * Kslope_ * (ndvi_cube.ndvi.sel({'time': dates[i]}) / 255) + s_Koffset * Koffset_, s_KmaxKcb * KmaxKcb_)\n", " \n", " # Update depletions with rainfall and/or irrigation \n", " ## DP\n", " model_outputs.DP.loc[{'time': dates[i]}] = -xr_minimum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig, 0), 0)\n", " \n", " ## De\n", " Dei = xr_minimum(xr_maximum(Dei - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig / (s_FW * FW_ / 100), 0), TEW)\n", " Dep = xr_minimum(xr_maximum(Dep - (weather_cube.tp.sel({'time': dates[i]}) / 1000), 0), TEW)\n", " \n", " fewi = xr_minimum(1 - FCov, (s_FW * FW_ / 100))\n", " fewp = 1 - FCov - fewi\n", " \n", " De = (Dei * fewi + Dep * fewp) / (fewi + fewp)\n", " # variables_t1['De'] = xr.where(variables_t1['De'].isfinite(), variables_t1['De'], variables_t1['Dei'] * (s_FW * FW_ / 100) + variables_t1['Dep'] * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!\n", "\n", " ## Dr\n", " Dr = xr_minimum(xr_maximum(Dr - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig, 0), TAW)\n", " \n", " ## Dd\n", " Dd = xr_minimum(xr_maximum(Dd + xr_minimum(Dr - (weather_cube.tp.sel({'time': dates[i]}) / 1000) - Irrig, 0), 0), TDW)\n", " \n", " # Diffusion coefficients\n", " diff_rei = calculate_diff_re(TAW, Dr, Zr, RUE, Dei, FCov, Ze_, DiffE_, scale_factor)\n", " diff_rep = calculate_diff_re(TAW, Dr, Zr, RUE, Dep, FCov, Ze_, DiffE_, scale_factor)\n", " diff_dr = calculate_diff_dr(TAW, TDW, Dr, Zr, Dd, FCov, Zsoil_, DiffR_, scale_factor) \n", " \n", " # Weighing factor W\n", " W = calculate_W(TEW, Dei, Dep, fewi, fewp)\n", " \n", " # Soil water contents\n", " model_outputs.SWCe.loc[{'time': dates[i]}] = 1 - De/TEW\n", " model_outputs.SWCr.loc[{'time': dates[i]}] = 1 - Dr/TAW\n", " \n", " # Water Stress coefficient\n", " Ks = xr_minimum((TAW - Dr) / (TAW * (1 - s_p * p_)), 1)\n", " \n", " # Reduction coefficient for evaporation\n", " Kei = xr_minimum(W * calculate_Kr(TEW, Dei, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewi * s_Kcmax * Kcmax_)\n", " Kep = xr_minimum((1 - W) * calculate_Kr(TEW, Dei, REW_, scale_factor) * (s_Kcmax * Kcmax_ - Kcb), fewp * s_Kcmax * Kcmax_)\n", " \n", " # Prepare coefficients for evapotranspiration\n", " Kti = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dei / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)\n", " Ktp = xr_minimum(((s_Ze * Ze_ / Zr)**6) * (1 - Dep / TEW) / xr_maximum(1 - Dr / TAW, 0.001), 1)\n", " Tei = Kti * Ks * Kcb * weather_cube.ET0.sel({'time': dates[i]}) / 1000\n", " Tep = Ktp * Ks * Kcb * weather_cube.ET0.sel({'time': dates[i]}) / 1000\n", " \n", " # Update depletions\n", " Dei = xr.where(fewi > 0, xr_minimum(xr_maximum(Dei + (weather_cube.ET0.sel({'time': dates[i]}) / 1000) * Kei / fewi + Tei - diff_rei, 0), TEW), xr_minimum(xr_maximum(Dei + Tei - diff_rei, 0), TEW))\n", " Dep = xr.where(fewp > 0, xr_minimum(xr_maximum(Dep + (weather_cube.ET0.sel({'time': dates[i]}) / 1000) * Kep / fewp + Tep - diff_rep, 0), TEW), xr_minimum(xr_maximum(Dep + Tep - diff_rep, 0), TEW))\n", " \n", " De = (Dei * fewi + Dep * fewp) / (fewi + fewp)\n", " # De = xr.where(De.isfinite(), De, Dei * (s_FW * FW_ / 100) + Dep * (1 - (s_FW * FW_ / 100))) #================= find replacement for .isfinite() method !!!!!!!!!\n", " \n", " # Evaporation\n", " model_outputs.E.loc[{'time': dates[i]}] = xr_maximum((Kei + Kep) * weather_cube.ET0.sel({'time': dates[i]}) / 1000, 0)\n", " \n", " # Transpiration\n", " model_outputs.Tr.loc[{'time': dates[i]}] = Kcb * Ks * weather_cube.ET0.sel({'time': dates[i]}) / 1000\n", " \n", " # Potential evapotranspiration and evaporative fraction ??\n", " \n", " # Update depletions (root and deep zones) at the end of the day\n", " Dr = xr_minimum(xr_maximum(Dr + model_outputs.E.loc[{'time': dates[i]}] + model_outputs.Tr.loc[{'time': dates[i]}] - diff_dr, 0), TAW)\n", " Dd = xr_minimum(xr_maximum(Dd + diff_dr, 0), TDW)\n", " \n", " # Write outputs\n", " model_outputs.Irr.loc[{'time': dates[i]}] = Irrig\n", " \n", " # Update variable_t1 values\n", " for variable in calculation_variables_t1:\n", " variables_t1[variable] = variables_t2[variable].copy(deep = True)\n", " \n", " print('day ', i+1, '/', len(dates), ' ', end = '\\r')\n", " \n", " # Scale the model_outputs variable to save in int16 format\n", " model_outputs = model_outputs * 1000\n", " \n", " # Write encoding dict\n", " encoding_dict = {}\n", " for variable in list(model_outputs.keys()):\n", " encod = {}\n", " encod['dtype'] = 'i2'\n", " encoding_dict[variable] = encod\n", " \n", " # Save model outputs to netcdf\n", " model_outputs.to_netcdf(save_path, encoding = encoding_dict)\n", " \n", " return None" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = Client()\n", "client" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "data_path = '/mnt/e/DATA/DEV_inputs_test'\n", "\n", "ndvi_path = data_path + os.sep + 'ndvi.nc'\n", "weather_path = data_path + os.sep + 'weather.nc'\n", "land_cover_path = data_path + os.sep + 'land_cover.nc'\n", "json_config_file = '/home/auclairj/GIT/modspa-pixel/config/config_modspa.json'\n", "param_file = '/home/auclairj/GIT/modspa-pixel/parameters/csv_files/params_samir_test.csv'\n", "soil_path = data_path + os.sep + 'soil.nc'\n", "save_path = data_path + os.sep + 'outputs.nc'\n", "\n", "chunk_size = {'x': -1, 'y': -1, 'time': -1}\n", "\n", "\n", "run_samir(json_config_file, param_file, ndvi_path, weather_path, soil_path, land_cover_path, chunk_size, save_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client.close()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "modspa_pixel", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.9" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }