diff --git a/main_prepare_inputs.py b/main_prepare_inputs.py
index 29db89da9e30db66577ea2f96717c58629d40871..080cc3127dd596c04f3fba43575e4c0dbbfb016a 100644
--- a/main_prepare_inputs.py
+++ b/main_prepare_inputs.py
@@ -116,7 +116,7 @@ if __name__ == '__main__':
 
     # Extract NDVI values on shapefile features
     csv_ndvi_extract_file = os.path.join(ndvi_path, run_name, 'ndvi_extract.csv')
-    raw_ndvi = extract_ndvi_stats(ndvi_precube, shapefile_path, ndvi_dates, csv_ndvi_extract_file)
+    raw_ndvi = extract_ndvi_stats(ndvi_precube, shapefile_path, ndvi_dates, csv_ndvi_extract_file, max_cpu = config_params.max_cpu * 2)
 
     # Process and interpolate NDVI values to a daily time step
     csv_ndvi_interp_file = os.path.join(ndvi_path, run_name, 'ndvi_interp.csv')
diff --git a/parameters/params_samir_class.py b/parameters/params_samir_class.py
index 5913d6e2d4cf79d512826e5456c178966c2991dd..2beb44caad930c69b447a52ec7d88aeb2f8fc595 100644
--- a/parameters/params_samir_class.py
+++ b/parameters/params_samir_class.py
@@ -161,6 +161,20 @@ class samir_parameters:
 
                 # Equation: Koffset = - NDVIsol * Kslope
                 self.table.at['Koffset', class_name] = - np.round(self.table.at['NDVIsol', class_name] * self.table.at['Kslope', class_name], decimals = round(np.log10(self.table.at['Koffset', 'scale_factor'])))
+
+            if self.table.at['maxZr', class_name] > self.table.at['Zsoil', class_name]:
+
+                print(f'\nmaxZr is higher than Zsoil for class {class_name}')
+
+                # Set boolean to true to exit script
+                out_of_range_param = True
+
+            if self.table.at['minZr', class_name] > self.table.at['maxZr', class_name]:
+
+                print(f'\nminZr is higher than maxZr for class {class_name}')
+
+                # Set boolean to true to exit script
+                out_of_range_param = True
 
         # Test values of the parameters
         min_max_table = read_csv(min_max_param_file, index_col = 0)
diff --git a/preprocessing/download_ERA5_weather.py b/preprocessing/download_ERA5_weather.py
index f1373e4fbd71fd69e3a12b2dfebc2042bb7605f3..32360b6bd6a343a0229e45764cd524c4531e158f 100644
--- a/preprocessing/download_ERA5_weather.py
+++ b/preprocessing/download_ERA5_weather.py
@@ -175,7 +175,7 @@ def request_ER5_weather(config_file: str, ndvi_path: str, raw_S2_image_ref: str
     weather_daily_rain, weather_daily_ET0 = era5land.era5Land_daily_to_yearly_parcel(weather_files, variables, weather_daily_ncFile, start_date, end_date, h = wind_height)
 
     # Generate and save weather dataframe
-    era5land.extract_weather_dataframe(weather_daily_rain, weather_daily_ET0, shapefile, config_file, weather_datframe)
+    era5land.extract_weather_dataframe(weather_daily_rain, weather_daily_ET0, shapefile, config_file, weather_datframe, max_cpu = config_params.max_cpu)
 
     # Convert dataframe to xarray dataset
     convert_dataframe_to_xarray(weather_datframe, weather_dataset, variables = ['Rain', 'ET0'], data_types = ['u2', 'u2'])
diff --git a/preprocessing/extract_ndvi.py b/preprocessing/extract_ndvi.py
index c635d6f81cfc9039b11189b5a2b1273a396605de..5dfa7257914f11ce563273922985c607a2c8c22a 100644
--- a/preprocessing/extract_ndvi.py
+++ b/preprocessing/extract_ndvi.py
@@ -15,15 +15,146 @@ import rasterio as rio # to open geotiff files
 from datetime import datetime # manage dates
 from rasterio.mask import mask # to mask images
 import numpy as np # vectorized math
-from shapely.geometry import box # to extract parcel statistics
 from tqdm import tqdm # for classic progress bars
+from multiprocessing import Pool, Manager # to parallelize functions
 from p_tqdm import p_map # for multiprocessing with progress bars
 from psutil import cpu_count # to get number of physical cores available
 import warnings # To suppress some warnings
 from modspa_pixel.preprocessing.input_toolbox import find_anomalies
 
 
-def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str, save_raw_dataframe_path: str, scaling: int = 255, buffer_distance: int = -10) -> list:
+def init_worker(shared_value_, lock_) -> None:
+    """
+    Function to initialize the pool workers with shared value and lock.
+
+    Arguments
+    =========
+
+    1. shared_value_: ``float``
+        shared progress bar value
+    2. lock_: ``Lock``
+        lock to access shared value
+    """
+
+    global shared_value, lock
+    shared_value = shared_value_
+    lock = lock_
+
+
+def divide_geodataframe(gdf: gpd.GeoDataFrame, n: int) -> list[gpd.GeoDataFrame]:
+    """
+    Divide geodataframes into n equal parts.
+
+    Arguments
+    =========
+
+    1. gdf: ``gpd.GeoDataFrame``
+        input geodataframe
+    2. n: ``int``
+        number of parts to divide into
+
+    Returns
+    =======
+
+    1. divided_gdfs: ``list[gpd.GeoDataFrame]``
+        output geodataframes
+    """
+
+    # Calculate the size of each part
+    part_size = len(gdf) // n
+    remainder = len(gdf) % n
+
+    # Create a list to store the divided GeoDataFrames
+    divided_gdfs = []
+
+    start_idx = 0
+    for i in range(n):
+        end_idx = start_idx + part_size + (1 if i < remainder else 0)
+        divided_gdfs.append(gdf.iloc[start_idx:end_idx])
+        start_idx = end_idx
+
+    return divided_gdfs
+
+
+def extract_ndvi_multiprocess(args: tuple) -> list:
+    """
+    Wrapped extract ndvi function for multiprocessing.
+
+    Arguments (packed in args)
+    ==========================
+
+    1. shapefile: ``gpd.GeoDataFrame``
+        geodataframe containing polygons for extraction
+    2. ndvi_cube: ``str``
+        path to ndvi cube geotiff
+    3. nodata: ``float``
+        ndvi no data value
+    4. scaling: ``int``
+        ndvi scaling value
+    5. dates: ``list[datetime.date]``
+        list of ndvi datetimes
+
+    Returns
+    =======
+
+    1. ndvi_stats: ``list``
+        extracted ndvi stats
+    """
+
+    # Extract args
+    shapefile, ndvi_cube, nodata, scaling, dates = args
+
+    # Open dataset
+    ndvi_dataset = rio.open(ndvi_cube, mode = 'r')
+
+    ndvi_stats = []
+
+    for index, row in shapefile.iterrows():
+
+        # Get the feature geometry as a shapely object
+        geom = row.geometry
+
+        # id number of the current polygon
+        id = index + 1
+
+        # Get land cover
+        LC = row.LC
+
+        # Crop the raster using the bounding box
+        try:
+            masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
+        except:
+            ndvi_stats.extend([[dates[i], id, 0, 0, LC] for i in range(len(dates))])
+            continue
+
+        # Mask the raster using the geometry
+        masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
+
+        # Replace the nodata values with nan
+        masked_raster = masked_raster.astype(np.float32)
+        masked_raster[masked_raster == nodata] = np.nan
+
+        # Correct scaling
+        np.round(masked_raster * (scaling / (scaling - 1)) - 1, decimals = 0, out = masked_raster)
+
+        # Calculate the zonal statistics
+        mean = np.nanmean(masked_raster, axis = (1,2))
+        count = np.count_nonzero(~np.isnan(masked_raster), axis = (1,2))
+
+        # Append current statistics to dataframe
+        ndvi_stats.extend([[dates[i], id, mean[i], count[i], LC] for i in range(len(dates))])
+
+        # Update progress bar
+        with lock:
+            shared_value.value += 1
+
+    # Close dataset
+    ndvi_dataset.close()
+
+    return ndvi_stats
+
+
+def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str, save_raw_dataframe_path: str, scaling: int = 255, buffer_distance: int = -10, max_cpu: int = 4) -> list:
     """
     extract_ndvi_stats extracts ndvi statistics (``mean`` and ``count``) from a ndvi image and a geopandas shapefile object.
     It iterates over the features of the shapefile geometry (polygons).
@@ -49,6 +180,8 @@ def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str
     6. buffer_distance: ``int`` ``default = -10``
         distance to buffer shapefile polygon to prevent extracting pixels that are not entirely in a polygon,
         in meters for Sentinel-2 and LandSat 8 images, < 0 to reduce size
+    7. max_cpu: ``int`` ``default = 4``
+        max number of CPU cores to use for calculation
 
     Returns
     =======
@@ -66,62 +199,55 @@ def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str
     ndvi_stats = []
 
     # Open ndvi image and shapefile geometry
-    ndvi_dataset = rio.open(ndvi_cube, mode = 'r')
+    with rio.open(ndvi_cube, mode = 'r') as ndvi_dataset:
+        # Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
+        target_epsg = ndvi_dataset.crs
+
+        # Get no data value
+        nodata = ndvi_dataset.nodata
 
     # Open dates file
     dates = np.load(dates_file_path, allow_pickle = True)
 
-    # Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
-    target_epsg = ndvi_dataset.crs
-
     # Open shapefile with geopandas and reproject its geometry
     shapefile = gpd.read_file(shapefile_path)
     shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
     # TODO: decide how to manage small polygons
     # shapefile['geometry'] = shapefile['geometry'].buffer(buffer_distance)
-
-    # Get no data value
-    nodata = ndvi_dataset.nodata
-
-    # Loop on the individual polygons in the shapefile geometry
-    print('')
-    for index, row in tqdm(shapefile.iterrows(), desc = 'Extracting zonal statistics', total = len(shapefile), unit = ' polygons', dynamic_ncols = True):
-        # Get the feature geometry as a shapely object
-        geom = row.geometry
-
-        # id number of the current polygon
-        id = index + 1
-
-        # Get land cover
-        LC = row.LC
-
-        # Crop the raster using the bounding box
-        try:
-            masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
-        except:
-            ndvi_stats.extend([[dates[i], id, 0, 0, LC] for i in range(len(dates))])
-            continue
-
-        # Mask the raster using the geometry
-        masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
-
-        # Replace the nodata values with nan
-        masked_raster = masked_raster.astype(np.float32)
-        masked_raster[masked_raster == nodata] = np.nan
-
-        # Correct scaling
-        np.round(masked_raster * (scaling / (scaling - 1)) - 1, decimals = 0, out = masked_raster)
+    args = [(smaller_shapefile, ndvi_cube, nodata, scaling, dates) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
+
+    # Shared value and lock for controlling access to the value
+    manager = Manager()
+    shared_value = manager.Value('i', 0)
+    lock = manager.Lock()
+
+    # Total iterations
+    total_iterations = len(shapefile.index)
+
+    # Create and initialize the pool
+    pool = Pool(processes = max_cpu, initializer = init_worker, initargs = (shared_value, lock))
+
+    # Progress bar
+    with tqdm(desc = 'Extracting zonal statistics', total = total_iterations, unit = ' polygons', dynamic_ncols = True) as pbar:
 
-        # Calculate the zonal statistics
-        mean = np.nanmean(masked_raster, axis = (1,2))
+        # Start the worker processes
+        results = [pool.apply_async(extract_ndvi_multiprocess, args = (arg,)) for arg in args]
 
-        count = np.count_nonzero(~np.isnan(masked_raster), axis = (1,2))
+        while shared_value.value < total_iterations:
+            with lock:
+                pbar.update(shared_value.value - pbar.n)
+
+    # Wait for all processes to complete
+    for result in results:
+        result.wait()
+
+    # Close the pool
+    pool.close()
+    pool.join()
 
-        # Append current statistics to dataframe
-        ndvi_stats.extend([[dates[i], id, mean[i], count[i], LC] for i in range(len(dates))])
+    for result in results:
+        ndvi_stats.extend(result.get())
 
-    # Close dataset
-    ndvi_dataset.close()
 
     # Build DataFrame and put the dates as index
     ndvi_dataframe = pd.DataFrame(ndvi_stats, columns = ['date', 'id', 'NDVI', 'count', 'LC'])
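Editor's note, not part of the patch: a small sketch of what divide_geodataframe does with the polygons before they are handed to the pool above. The GeoDataFrame contents and the import path are assumptions made for the example; only the splitting behaviour is taken from the function introduced in this diff.

# Hypothetical usage example of divide_geodataframe (toy data, assumed import path).
import geopandas as gpd
from shapely.geometry import Point

from modspa_pixel.preprocessing.extract_ndvi import divide_geodataframe

# A made-up 10-feature GeoDataFrame standing in for the parcel shapefile
gdf = gpd.GeoDataFrame({'LC': list(range(10))},
                       geometry = [Point(x, x) for x in range(10)],
                       crs = 'EPSG:4326')

parts = divide_geodataframe(gdf, 4)
print([len(part) for part in parts])         # [3, 3, 2, 2]: the remainder goes to the first parts
print([list(part.index) for part in parts])  # original index values are preserved by .iloc slicing

Because the original index survives the split, each worker in extract_ndvi_multiprocess can still derive the polygon id as index + 1, exactly as the previous single-process loop did.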
@@ -295,9 +421,9 @@ def process_raw_ndvi(ndvi_dataframe: Union[pd.DataFrame, str], save_path: str,
     # Get number of CPU cores and limit max value (working on a cluster requires os.sched_getaffinity to get true number of available CPUs,
     # this is not true on a "personnal" computer, hence the use of the min function)
     try:
-        nb_cores = min([max_cpu, cpu_count(logical = False), len(os.sched_getaffinity(0))])
+        nb_cores = min([max_cpu * 2, cpu_count(logical = True), len(os.sched_getaffinity(0))])
     except:
-        nb_cores = min([max_cpu, cpu_count(logical = False)]) # os.sched_getaffinity won't work on windows
+        nb_cores = min([max_cpu * 2, cpu_count(logical = True)]) # os.sched_getaffinity won't work on windows
 
     # Prepare clean_dataframes_feature arguments for multiprocessing
     args = [(id_filter, start_date, end_date, min_pixel_ratio, interp_method, threshold_ratio_deriv, threshold_median, median_window, custom) for _, id_filter in ndvi_dataframe.groupby('id')]
diff --git a/preprocessing/lib_era5_land_pixel.py b/preprocessing/lib_era5_land_pixel.py
index 89cc726aa2050d61ce2ab63a3ebdf64b96907a02..a3bdd914dd9b05a3b9a2ebaaebb9e544cceb8d90 100644
--- a/preprocessing/lib_era5_land_pixel.py
+++ b/preprocessing/lib_era5_land_pixel.py
@@ -4,10 +4,10 @@
 Functions to call ECMWF Reanalysis with CDS-api
 
 - ERA5-land daily request
-- request a list of daily variables dedicated to the calculus of ET0
- and the generation of MODSPA daily forcing files
-
- heavily modified from @rivallandv's original file
+- request a list of hourly variables dedicated to the calculation of ET0
+and the generation of MODSPA daily forcing files
+
+heavily modified from @rivallandv's original file
 
 @author: auclairj
 """
@@ -25,7 +25,7 @@ from rasterio.mask import mask # to mask images
 from rasterio.enums import Resampling # reprojection algorithms
 import netCDF4 as nc # to write netcdf4 files
 from tqdm import tqdm # to follow progress
-from multiprocessing import Pool # to parallelize reprojection
+from multiprocessing import Pool, Manager # to parallelize functions
 from psutil import virtual_memory # to check available ram
 from psutil import cpu_count # to get number of physical cores available
 from modspa_pixel.config.config import config # to import config file
@@ -48,10 +48,10 @@ def era5_enclosing_shp_aera(bbox: list[float], pas: float) -> tuple[float, float
     Find the four coordinates including the boxbound scene
     to agree with gridsize resolution
     system projection: WGS84 lat/lon degree
-    
+
     Arguments
     =========
-    
+
     1. area: ``list[float]``
         bounding box of the demanded area
         list of floats: [lat north, lon west, lat south, lon east] in degree WGS84
@@ -839,21 +839,11 @@ def extract_rasterstats(args: tuple) -> list[float]:
     # Open ndvi image and shapefile geometry
     raster_dataset = rio.open(raster_path, mode = 'r')
 
-    # Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
-    target_epsg = raster_dataset.crs
-
-    # Open shapefile with geopandas and reproject its geometry
-    shapefile = gpd.read_file(shapefile)
-    shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
-
     # Get no data value
     nodata = raster_dataset.nodata
 
     # Get the number of bands
     nbands = raster_dataset.count
-
-    # Create progress bar
-    progress_bar = tqdm(total = len(shapefile.index), desc='Extracting zonal statistics', unit=' polygons')
 
     # Loop on the individual polygons in the shapefile geometry
     for index, row in shapefile.iterrows():
@@ -888,16 +878,68 @@ def extract_rasterstats(args: tuple) -> list[float]:
         raster_stats.extend([[dates[i], id, mean[i], LC] for i in range(nbands)])
 
         # Update progress bar
-        progress_bar.update(1)
+        with lock:
+            shared_value.value += 0.5
 
-    # Close dataset and progress bar
     raster_dataset.close()
-    progress_bar.close()
 
     return raster_stats
 
 
-def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, config_file: str, save_path: str) -> None:
+def init_worker(shared_value_, lock_) -> None:
+    """
+    Function to initialize the pool workers with shared value and lock.
+
+    Arguments
+    =========
+
+    1. shared_value_: ``float``
+        shared progress bar value
+    2. lock_: ``Lock``
+        lock to access shared value
+    """
+
+    global shared_value, lock
+    shared_value = shared_value_
+    lock = lock_
+
+
+def divide_geodataframe(gdf: gpd.GeoDataFrame, n: int) -> list[gpd.GeoDataFrame]:
+    """
+    Divide geodataframes into n equal parts.
+
+    Arguments
+    =========
+
+    1. gdf: ``gpd.GeoDataFrame``
+        input geodataframe
+    2. n: ``int``
+        number of parts to divide into
+
+    Returns
+    =======
+
+    1. divided_gdfs: ``list[gpd.GeoDataFrame]``
+        output geodataframes
+    """
+
+    # Calculate the size of each part
+    part_size = len(gdf) // n
+    remainder = len(gdf) % n
+
+    # Create a list to store the divided GeoDataFrames
+    divided_gdfs = []
+
+    start_idx = 0
+    for i in range(n):
+        end_idx = start_idx + part_size + (1 if i < remainder else 0)
+        divided_gdfs.append(gdf.iloc[start_idx:end_idx])
+        start_idx = end_idx
+
+    return divided_gdfs
+
+
+def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, config_file: str, save_path: str, max_cpu: int = 4) -> None:
     """
     Extract a weather dataframe for each variable (Rain, ET0) and merge them in one dataframe. This dataframe is
     saved as ``csv`` file.
@@ -915,6 +957,8 @@ def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, con
         path to config file
     5. save_path: ``str``
         save path for weather dataframe
+    6. max_cpu: ``int`` ``default = 4``
+        max number of CPU cores to use for calculation
 
     Returns
     =======
@@ -922,18 +966,59 @@ def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, con
     ``None``
     """
 
-    # Generate arguments for multiprocessing
-    args = [(rain_path, shapefile, config_file), (ET0_path, shapefile, config_file)]
+    print(f'\nStarting weather data extraction on {max_cpu} cores..\n')
 
-    print('\nStarting weather data extraction on two cores..\n')
+    # Shared value and lock for controlling access to the value
+    manager = Manager()
+    shared_value = manager.Value('i', 0)
+    lock = manager.Lock()
+
+    # Get target epsg
+    with rio.open(rain_path, mode = 'r') as src:
+        target_epsg = src.crs
+
+    # Total iterations (assuming each extract_rasterstats call represents one iteration)
+    shapefile = gpd.read_file(shapefile)
+    shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
+    total_iterations = len(shapefile.index)
+
+    args1 = [(rain_path, smaller_shapefile, config_file) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
+    args2 = [(ET0_path, smaller_shapefile, config_file) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
+
+    # # Generate arguments for multiprocessing
+    # args = [(rain_path, shapefile, config_file), (ET0_path, shapefile, config_file)]
+    args = args1 + args2
+
+    # Create and initialize the pool
+    pool = Pool(processes = 2 * max_cpu, initializer = init_worker, initargs = (shared_value, lock))
+
+    # Progress bar
+    with tqdm(desc = 'Extracting zonal statistics', total = total_iterations, unit = ' polygons', dynamic_ncols = True) as pbar:
+        # Start the worker processes
+        results = [pool.apply_async(extract_rasterstats, args=(arg,)) for arg in args]
+
+        while shared_value.value < total_iterations:
+            with lock:
+                pbar.update(round(shared_value.value - pbar.n))
+
+    # Wait for all processes to complete
+    for result in results:
+        result.wait()
+
+    # Close the pool
+    pool.close()
+    pool.join()
 
-    # Extract weather values for both weather varialbes
-    with Pool(2) as p:
-        results = p.map(extract_rasterstats, args)
+    rain_list = []
+    ET0_list = []
+    for i in range(max_cpu):
+        rain_list.extend(results[i].get())
+        ET0_list.extend(results[max_cpu + i].get())
+    del results, result
 
     # Collect results in a single dataframe
-    weather_dataframe = pd.DataFrame(results[0], columns = ['date', 'id', 'Rain', 'LC'])
-    weather_dataframe['ET0'] = pd.DataFrame(results[1], columns = ['date', 'id', 'ET0', 'LC'])['ET0']
+    weather_dataframe = pd.DataFrame(rain_list, columns = ['date', 'id', 'Rain', 'LC'])
+    weather_dataframe['ET0'] = pd.DataFrame(ET0_list, columns = ['date', 'id', 'ET0', 'LC'])['ET0']
 
     # Reorder columns
     weather_dataframe = weather_dataframe.reindex(columns = ['date', 'id', 'Rain', 'ET0', 'LC'])
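Editor's note, not part of the patch: below is a minimal, self-contained sketch of the Manager/Pool shared-counter pattern that both extract_ndvi_stats and extract_weather_dataframe now use to drive a single tqdm bar from several worker processes. The work function and chunk data are toy placeholders, not code from this repository. In extract_weather_dataframe the counter is advanced by 0.5 per polygon because every polygon is processed twice (once for Rain, once for ET0), so the two half-steps add up to one tick of the bar.

# Standalone sketch of the shared-counter progress pattern (toy data, not repository code).
import time
from multiprocessing import Pool, Manager

from tqdm import tqdm


def init_worker(shared_value_, lock_):
    # Expose the shared counter and lock as globals inside each worker process
    global shared_value, lock
    shared_value = shared_value_
    lock = lock_


def work(chunk):
    out = []
    for item in chunk:
        time.sleep(0.01)              # stand-in for one polygon extraction
        out.append(item * 2)
        with lock:                    # one increment per processed item
            shared_value.value += 1
    return out


if __name__ == '__main__':

    # Four chunks of 25 items each, mimicking divide_geodataframe output
    chunks = [list(range(i, i + 25)) for i in range(0, 100, 25)]
    total = sum(len(chunk) for chunk in chunks)

    manager = Manager()
    counter = manager.Value('i', 0)
    lock = manager.Lock()

    pool = Pool(processes = 4, initializer = init_worker, initargs = (counter, lock))
    results = [pool.apply_async(work, args = (chunk,)) for chunk in chunks]

    # The main process polls the shared counter to drive a single progress bar
    with tqdm(total = total, unit = ' items') as pbar:
        while counter.value < total:
            pbar.update(counter.value - pbar.n)
            time.sleep(0.1)
        pbar.update(total - pbar.n)   # final refresh once all workers have finished

    pool.close()
    pool.join()

    output = [value for result in results for value in result.get()]
    print(len(output))                # 100

One design point worth noting: because the result order of apply_async follows the submission order, the weather extraction can safely split results back into the first max_cpu entries (Rain) and the last max_cpu entries (ET0), as done above.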