Commit 5f2bdc5c authored by Jérémy AUCLAIR's avatar Jérémy AUCLAIR

Added multiprocessing to polygon extraction in parcel mode. Added one test to samir params.

parent a573a43e
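For context, the multiprocessing added below relies on a shared-counter progress pattern: each worker increments a Manager-backed counter under a lock, and the parent process polls that counter to drive a single tqdm bar. A minimal, self-contained sketch of that pattern (illustrative only, not code from this commit; function and variable names are placeholders):

# Sketch of the shared-counter progress pattern used by the new code below (not part of the commit)
from multiprocessing import Pool, Manager
from tqdm import tqdm
import time

def init_worker(shared_value_, lock_):
    # store the shared counter and its lock as globals in each worker
    global shared_value, lock
    shared_value = shared_value_
    lock = lock_

def work(chunk):
    out = []
    for item in chunk:
        out.append(item * 2)          # stand-in for the per-polygon extraction
        with lock:
            shared_value.value += 1   # report one finished item
    return out

if __name__ == '__main__':
    chunks = [list(range(10))] * 4
    total = sum(len(chunk) for chunk in chunks)
    manager = Manager()
    counter, counter_lock = manager.Value('i', 0), manager.Lock()
    pool = Pool(processes = 4, initializer = init_worker, initargs = (counter, counter_lock))
    results = [pool.apply_async(work, args = (chunk,)) for chunk in chunks]
    with tqdm(total = total, unit = ' items') as pbar:
        while counter.value < total:
            pbar.update(counter.value - pbar.n)
            time.sleep(0.1)
        pbar.update(total - pbar.n)
    pool.close()
    pool.join()
    output = [result.get() for result in results]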
@@ -116,7 +116,7 @@ if __name__ == '__main__':
# Extract NDVI values on shapefile features
csv_ndvi_extract_file = os.path.join(ndvi_path, run_name, 'ndvi_extract.csv')
raw_ndvi = extract_ndvi_stats(ndvi_precube, shapefile_path, ndvi_dates, csv_ndvi_extract_file)
raw_ndvi = extract_ndvi_stats(ndvi_precube, shapefile_path, ndvi_dates, csv_ndvi_extract_file, max_cpu = config_params.max_cpu * 2)
# Process and interpolate NDVI values to a daily time step
csv_ndvi_interp_file = os.path.join(ndvi_path, run_name, 'ndvi_interp.csv')
......
@@ -161,6 +161,20 @@ class samir_parameters:
# Equation: Koffset = - NDVIsol * Kslope
self.table.at['Koffset', class_name] = - np.round(self.table.at['NDVIsol', class_name] * self.table.at['Kslope', class_name], decimals = round(np.log10(self.table.at['Koffset', 'scale_factor'])))
if self.table.at['maxZr', class_name] > self.table.at['Zsoil', class_name]:
print(f'\nmaxZr is higher than Zsoil for class {class_name}')
# Set boolean to true to exit script
out_of_range_param = True
if self.table.at['minZr', class_name] > self.table.at['maxZr', class_name]:
print(f'\nminZr is higher than maxZr for class {class_name}')
# Set boolean to true to exit script
out_of_range_param = True
# Test values of the parameters
min_max_table = read_csv(min_max_param_file, index_col = 0)
......
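The parameter test added above can be illustrated on a toy table (toy values only, not from the repository's parameter files): classes are columns, parameters are rows, and any class whose maxZr exceeds Zsoil, or whose minZr exceeds maxZr, flags the run for exit.

import pandas as pd

# Toy parameter table: class_1 is invalid (maxZr > Zsoil), class_2 is valid
table = pd.DataFrame({'class_1': [2.5, 2.0, 0.3],
                      'class_2': [1.2, 2.0, 0.3]},
                     index = ['maxZr', 'Zsoil', 'minZr'])

out_of_range_param = False
for class_name in table.columns:
    if table.at['maxZr', class_name] > table.at['Zsoil', class_name]:
        print(f'\nmaxZr is higher than Zsoil for class {class_name}')
        out_of_range_param = True
    if table.at['minZr', class_name] > table.at['maxZr', class_name]:
        print(f'\nminZr is higher than maxZr for class {class_name}')
        out_of_range_param = True

print(out_of_range_param)  # True: class_1 should stop the script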
@@ -175,7 +175,7 @@ def request_ER5_weather(config_file: str, ndvi_path: str, raw_S2_image_ref: str
weather_daily_rain, weather_daily_ET0 = era5land.era5Land_daily_to_yearly_parcel(weather_files, variables, weather_daily_ncFile, start_date, end_date, h = wind_height)
# Generate and save weather dataframe
era5land.extract_weather_dataframe(weather_daily_rain, weather_daily_ET0, shapefile, config_file, weather_datframe)
era5land.extract_weather_dataframe(weather_daily_rain, weather_daily_ET0, shapefile, config_file, weather_datframe, max_cpu = config_params.max_cpu)
# Convert dataframe to xarray dataset
convert_dataframe_to_xarray(weather_datframe, weather_dataset, variables = ['Rain', 'ET0'], data_types = ['u2', 'u2'])
......
@@ -15,15 +15,146 @@ import rasterio as rio # to open geotiff files
from datetime import datetime # manage dates
from rasterio.mask import mask # to mask images
import numpy as np # vectorized math
from shapely.geometry import box # to extract parcel statistics
from tqdm import tqdm # for classic progress bars
from multiprocessing import Pool, Manager # to parallelize functions
from p_tqdm import p_map # for multiprocessing with progress bars
from psutil import cpu_count # to get number of physical cores available
import warnings # To suppress some warnings
from modspa_pixel.preprocessing.input_toolbox import find_anomalies
def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str, save_raw_dataframe_path: str, scaling: int = 255, buffer_distance: int = -10) -> list:
def init_worker(shared_value_, lock_) -> None:
"""
Function to initialize the pool workers with shared value and lock.
Arguments
=========
1. shared_value_: ``float``
shared progress bar value
2. lock_: ``Lock``
lock to access shared value
"""
global shared_value, lock
shared_value = shared_value_
lock = lock_
def divide_geodataframe(gdf: gpd.GeoDataFrame, n: int) -> list[gpd.GeoDataFrame]:
"""
Divide a geodataframe into n nearly equal parts.
Arguments
=========
1. gdf: ``gpd.GeoDataFrame``
input geodataframe
2. n: ``int``
number of parts to divide into
Returns
=======
1. divided_gdfs: ``list[gpd.GeoDataFrame]``
output geodataframes
"""
# Calculate the size of each part
part_size = len(gdf) // n
remainder = len(gdf) % n
# Create a list to store the divided GeoDataFrames
divided_gdfs = []
start_idx = 0
for i in range(n):
end_idx = start_idx + part_size + (1 if i < remainder else 0)
divided_gdfs.append(gdf.iloc[start_idx:end_idx])
start_idx = end_idx
return divided_gdfs
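Illustrative usage of the divide_geodataframe defined above (toy data only, not from the commit): a 7-row geodataframe split for 3 workers yields chunks of 3, 2 and 2 rows, the remainder being spread over the first chunks.

import geopandas as gpd
from shapely.geometry import Point

# Toy geodataframe with 7 features
gdf = gpd.GeoDataFrame({'LC': list(range(7))}, geometry = [Point(i, i) for i in range(7)])
parts = divide_geodataframe(gdf, 3)
print([len(part) for part in parts])  # [3, 2, 2]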
def extract_ndvi_multiprocess(args: tuple) -> list:
"""
NDVI extraction function wrapped for multiprocessing.
Arguments (packed in args)
==========================
1. shapefile: ``gpd.GeoDataFrame``
geodataframe containing polygons for extraction
2. ndvi_cube: ``str``
path to ndvi cube geotiff
3. nodata: ``float``
ndvi no data value
4. scaling: ``int``
ndvi scaling value
5. dates: ``list[datetime.date]``
list of ndvi datetimes
Returns
=======
1. ndvi_stats: ``list``
extracted ndvi stats
"""
# Extract args
shapefile, ndvi_cube, nodata, scaling, dates = args
# Open dataset
ndvi_dataset = rio.open(ndvi_cube, mode = 'r')
ndvi_stats = []
for index, row in shapefile.iterrows():
# Get the feature geometry as a shapely object
geom = row.geometry
# id number of the current polygon
id = index + 1
# Get land cover
LC = row.LC
# Crop the raster using the bounding box
try:
masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
except:
ndvi_stats.extend([[dates[i], id, 0, 0, LC] for i in range(len(dates))])
continue
# Mask the raster using the geometry
masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
# Replace the nodata values with nan
masked_raster = masked_raster.astype(np.float32)
masked_raster[masked_raster == nodata] = np.nan
# Correct scaling
np.round(masked_raster * (scaling / (scaling - 1)) - 1, decimals = 0, out = masked_raster)
# Calculate the zonal statistics
mean = np.nanmean(masked_raster, axis = (1,2))
count = np.count_nonzero(~np.isnan(masked_raster), axis = (1,2))
# Append current statistics to dataframe
ndvi_stats.extend([[dates[i], id, mean[i], count[i], LC] for i in range(len(dates))])
# Update progress bar
with lock:
shared_value.value += 1
# Close dataset
ndvi_dataset.close()
return ndvi_stats
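A note on the "Correct scaling" step above (an interpretation, not stated in the code): with scaling = 255, valid NDVI values are stored as 1..255 and 0 is reserved for nodata, so v * 255/254 - 1 maps the stored range back onto 0..255. A quick numeric check:

# Quick check of the rescaling used above (assumes scaling = 255 and 0 as nodata)
scaling = 255
for stored in (1, 128, 255):
    print(stored, round(stored * (scaling / (scaling - 1)) - 1))  # 0, 128, 255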
def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str, save_raw_dataframe_path: str, scaling: int = 255, buffer_distance: int = -10, max_cpu: int = 4) -> list:
"""
extract_ndvi_stats extracts ndvi statistics (``mean`` and ``count``) from an ndvi image and a
geopandas shapefile object. It iterates over the features of the shapefile geometry (polygons).
@@ -49,6 +180,8 @@ def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str
6. buffer_distance: ``int`` ``default = -10``
distance to buffer shapefile polygon to prevent extracting pixels that are not entirely in a polygon,
in meters for Sentinel-2 and LandSat 8 images, < 0 to reduce size
7. max_cpu: ``int`` ``default = 4``
max number of CPU cores to use for calculation
Returns
@@ -66,62 +199,55 @@ def extract_ndvi_stats(ndvi_cube: str, shapefile_path: str, dates_file_path: str
ndvi_stats = []
# Open ndvi image and shapefile geometry
ndvi_dataset = rio.open(ndvi_cube, mode = 'r')
with rio.open(ndvi_cube, mode = 'r') as ndvi_dataset:
# Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
target_epsg = ndvi_dataset.crs
# Get no data value
nodata = ndvi_dataset.nodata
# Open dates file
dates = np.load(dates_file_path, allow_pickle = True)
# Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
target_epsg = ndvi_dataset.crs
# Open shapefile with geopandas and reproject its geometry
shapefile = gpd.read_file(shapefile_path)
shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
# TODO: decide how to manage small polygons
# shapefile['geometry'] = shapefile['geometry'].buffer(buffer_distance)
# Get no data value
nodata = ndvi_dataset.nodata
# Loop on the individual polygons in the shapefile geometry
print('')
for index, row in tqdm(shapefile.iterrows(), desc = 'Extracting zonal statistics', total = len(shapefile), unit = ' polygons', dynamic_ncols = True):
# Get the feature geometry as a shapely object
geom = row.geometry
# id number of the current polygon
id = index + 1
# Get land cover
LC = row.LC
# Crop the raster using the bounding box
try:
masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
except:
ndvi_stats.extend([[dates[i], id, 0, 0, LC] for i in range(len(dates))])
continue
# Mask the raster using the geometry
masked_raster, _ = mask(ndvi_dataset, [geom], crop = True)
# Replace the nodata values with nan
masked_raster = masked_raster.astype(np.float32)
masked_raster[masked_raster == nodata] = np.nan
# Correct scaling
np.round(masked_raster * (scaling / (scaling - 1)) - 1, decimals = 0, out = masked_raster)
args = [(smaller_shapefile, ndvi_cube, nodata, scaling, dates) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
# Shared value and lock for controlling access to the value
manager = Manager()
shared_value = manager.Value('i', 0)
lock = manager.Lock()
# Total iterations
total_iterations = len(shapefile.index)
# Create and initialize the pool
pool = Pool(processes = max_cpu, initializer = init_worker, initargs = (shared_value, lock))
# Progress bar
with tqdm(desc = 'Extracting zonal statistics', total = total_iterations, unit = ' polygons', dynamic_ncols = True) as pbar:
# Calculate the zonal statistics
mean = np.nanmean(masked_raster, axis = (1,2))
count = np.count_nonzero(~np.isnan(masked_raster), axis = (1,2))
# Start the worker processes
results = [pool.apply_async(extract_ndvi_multiprocess, args = (arg,)) for arg in args]
# Append current statistics to dataframe
ndvi_stats.extend([[dates[i], id, mean[i], count[i], LC] for i in range(len(dates))])
while shared_value.value < total_iterations:
with lock:
pbar.update(shared_value.value - pbar.n)
# Wait for all processes to complete
for result in results:
result.wait()
# Close the pool
pool.close()
pool.join()
# Close dataset
ndvi_dataset.close()
for result in results:
ndvi_stats.extend(result.get())
# Build DataFrame and put the dates as index
ndvi_dataframe = pd.DataFrame(ndvi_stats, columns = ['date', 'id', 'NDVI', 'count', 'LC'])
@@ -295,9 +421,9 @@ def process_raw_ndvi(ndvi_dataframe: Union[pd.DataFrame, str], save_path: str,
# Get number of CPU cores and limit max value (working on a cluster requires os.sched_getaffinity to get true number of available CPUs,
# this is not true on a "personal" computer, hence the use of the min function)
try:
nb_cores = min([max_cpu, cpu_count(logical = False), len(os.sched_getaffinity(0))])
nb_cores = min([max_cpu * 2, cpu_count(logical = True), len(os.sched_getaffinity(0))])
except:
nb_cores = min([max_cpu, cpu_count(logical = False)]) # os.sched_getaffinity won't work on windows
nb_cores = min([max_cpu * 2, cpu_count(logical = True)]) # os.sched_getaffinity won't work on windows
# Prepare clean_dataframes_feature arguments for multiprocessing
args = [(id_filter, start_date, end_date, min_pixel_ratio, interp_method, threshold_ratio_deriv, threshold_median, median_window, custom) for _, id_filter in ndvi_dataframe.groupby('id')]
......
@@ -4,10 +4,10 @@
Functions to call ECMWF Reanalysis with CDS-api
- ERA5-land daily request
- request a list of daily variables dedicated to the calculation of ET0
and the generation of MODSPA daily forcing files
heavily modified from @rivallandv's original file
- request a list of hourly variables dedicated to the calculation of ET0
and the generation of MODSPA daily forcing files
heavily modified from @rivallandv's original file
@author: auclairj
"""
@@ -25,7 +25,7 @@ from rasterio.mask import mask # to mask images
from rasterio.enums import Resampling # reprojection algorithms
import netCDF4 as nc # to write netcdf4 files
from tqdm import tqdm # to follow progress
from multiprocessing import Pool # to parallelize reprojection
from multiprocessing import Pool, Manager # to parallelize functions
from psutil import virtual_memory # to check available ram
from psutil import cpu_count # to get number of physical cores available
from modspa_pixel.config.config import config # to import config file
@@ -48,10 +48,10 @@ def era5_enclosing_shp_aera(bbox: list[float], pas: float) -> tuple[float, float
Find the four coordinates enclosing the bounding box of the scene,
aligned with the grid resolution.
System projection: WGS84 lat/lon degrees.
Arguments
=========
1. area: ``list[float]``
bounding box of the requested area
list of floats: [lat north, lon west, lat south, lon east] in degree WGS84
@@ -839,21 +839,11 @@ def extract_rasterstats(args: tuple) -> list[float]:
# Open ndvi image and shapefile geometry
raster_dataset = rio.open(raster_path, mode = 'r')
# Get input raster spatial reference and epsg code to reproject shapefile in the same spatial reference
target_epsg = raster_dataset.crs
# Open shapefile with geopandas and reproject its geometry
shapefile = gpd.read_file(shapefile)
shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
# Get no data value
nodata = raster_dataset.nodata
# Get the number of bands
nbands = raster_dataset.count
# Create progress bar
progress_bar = tqdm(total = len(shapefile.index), desc='Extracting zonal statistics', unit=' polygons')
# Loop on the individual polygons in the shapefile geometry
for index, row in shapefile.iterrows():
@@ -888,16 +878,68 @@ def extract_rasterstats(args: tuple) -> list[float]:
raster_stats.extend([[dates[i], id, mean[i], LC] for i in range(nbands)])
# Update progress bar
progress_bar.update(1)
with lock:
shared_value.value += 0.5
# Close dataset and progress bar
raster_dataset.close()
progress_bar.close()
return raster_stats
def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, config_file: str, save_path: str) -> None:
def init_worker(shared_value_, lock_) -> None:
"""
Function to initialize the pool workers with shared value and lock.
Arguments
=========
1. shared_value_: ``float``
shared progress bar value
2. lock_: ``Lock``
lock to access shared value
"""
global shared_value, lock
shared_value = shared_value_
lock = lock_
def divide_geodataframe(gdf: gpd.GeoDataFrame, n: int) -> list[gpd.GeoDataFrame]:
"""
Divide a geodataframe into n nearly equal parts.
Arguments
=========
1. gdf: ``gpd.GeoDataFrame``
input geodataframe
2. n: ``int``
number of parts to divide into
Returns
=======
1. divided_gdfs: ``list[gpd.GeoDataFrame]``
output geodataframes
"""
# Calculate the size of each part
part_size = len(gdf) // n
remainder = len(gdf) % n
# Create a list to store the divided GeoDataFrames
divided_gdfs = []
start_idx = 0
for i in range(n):
end_idx = start_idx + part_size + (1 if i < remainder else 0)
divided_gdfs.append(gdf.iloc[start_idx:end_idx])
start_idx = end_idx
return divided_gdfs
def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, config_file: str, save_path: str, max_cpu: int = 4) -> None:
"""
Extract a weather dataframe for each variable (Rain, ET0) and merge them in one
dataframe. This dataframe is saved as ``csv`` file.
@@ -915,6 +957,8 @@ def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, con
path to config file
5. save_path: ``str``
save path for weather dataframe
6. max_cpu: ``int`` ``default = 4``
max number of CPU cores to use for calculation
Returns
=======
@@ -922,18 +966,59 @@ def extract_weather_dataframe(rain_path: str, ET0_path: str, shapefile: str, con
``None``
"""
# Generate arguments for multiprocessing
args = [(rain_path, shapefile, config_file), (ET0_path, shapefile, config_file)]
print(f'\nStarting weather data extraction on {max_cpu} cores..\n')
print('\nStarting weather data extraction on two cores..\n')
# Shared value and lock for controlling access to the value
manager = Manager()
shared_value = manager.Value('i', 0)
lock = manager.Lock()
# Get target epsg
with rio.open(rain_path, mode = 'r') as src:
target_epsg = src.crs
# Total iterations (assuming each extract_rasterstats call represents one iteration)
shapefile = gpd.read_file(shapefile)
shapefile['geometry'] = shapefile['geometry'].to_crs(target_epsg)
total_iterations = len(shapefile.index)
args1 = [(rain_path, smaller_shapefile, config_file) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
args2 = [(ET0_path, smaller_shapefile, config_file) for smaller_shapefile in divide_geodataframe(shapefile, max_cpu)]
# # Generate arguments for multiprocessing
# args = [(rain_path, shapefile, config_file), (ET0_path, shapefile, config_file)]
args = args1 + args2
# Create and initialize the pool
pool = Pool(processes = 2 * max_cpu, initializer = init_worker, initargs = (shared_value, lock))
# Progress bar
with tqdm(desc = 'Extracting zonal statistics', total = total_iterations, unit = ' polygons', dynamic_ncols = True) as pbar:
# Start the worker processes
results = [pool.apply_async(extract_rasterstats, args=(arg,)) for arg in args]
while shared_value.value < total_iterations:
with lock:
pbar.update(round(shared_value.value - pbar.n))
# Wait for all processes to complete
for result in results:
result.wait()
# Close the pool
pool.close()
pool.join()
# Extract weather values for both weather variables
with Pool(2) as p:
results = p.map(extract_rasterstats, args)
rain_list = []
ET0_list = []
for i in range(max_cpu):
rain_list.extend(results[i].get())
ET0_list.extend(results[max_cpu + i].get())
del results, result
# Collect results in a single dataframe
weather_dataframe = pd.DataFrame(results[0], columns = ['date', 'id', 'Rain', 'LC'])
weather_dataframe['ET0'] = pd.DataFrame(results[1], columns = ['date', 'id', 'ET0', 'LC'])['ET0']
weather_dataframe = pd.DataFrame(rain_list, columns = ['date', 'id', 'Rain', 'LC'])
weather_dataframe['ET0'] = pd.DataFrame(ET0_list, columns = ['date', 'id', 'ET0', 'LC'])['ET0']
# Reorder columns
weather_dataframe = weather_dataframe.reindex(columns = ['date', 'id', 'Rain', 'ET0', 'LC'])
......
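The final merge above relies on the rain and ET0 worker results being in the same polygon and date order, which holds because both variables are extracted over the same divided shapefile chunks. A toy illustration of that merge (toy values, not real weather data):

import pandas as pd
from datetime import date

# Per-variable stats as returned by the workers: [date, id, value, LC]
rain_list = [[date(2020, 1, 1), 1, 2.5, 'wheat'], [date(2020, 1, 1), 2, 0.0, 'maize']]
ET0_list = [[date(2020, 1, 1), 1, 3.1, 'wheat'], [date(2020, 1, 1), 2, 3.4, 'maize']]

weather_dataframe = pd.DataFrame(rain_list, columns = ['date', 'id', 'Rain', 'LC'])
weather_dataframe['ET0'] = pd.DataFrame(ET0_list, columns = ['date', 'id', 'ET0', 'LC'])['ET0']
weather_dataframe = weather_dataframe.reindex(columns = ['date', 'id', 'Rain', 'ET0', 'LC'])
print(weather_dataframe)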