Newer
Older
Jeremy Auclair
committed
# -*- coding: UTF-8 -*-
# Python
Jeremy Auclair
committed
04-07-2023
@author: jeremy auclair
Calculate NDVI images with xarray
"""
Jeremy Auclair
committed
import csv # open csv files
from fnmatch import fnmatch # for character string comparison
Jeremy Auclair
committed
from typing import List, Union # to declare variables
import xarray as xr # to manage dataset
Jeremy Auclair
committed
import rasterio as rio # to open geotiff files
Jeremy Auclair
committed
import numpy as np # vectorized math
Jeremy Auclair
committed
import geopandas as gpd # to manage shapefile crs projections
Jeremy Auclair
committed
from p_tqdm import p_map # for multiprocessing with progress bars
from psutil import cpu_count # to get number of physical cores available
from modspa_pixel.config.config import config # to import config file
Jeremy Auclair
committed
from modspa_pixel.preprocessing.input_toolbox import product_str_to_datetime, read_product_info, get_band_paths
Jeremy Auclair
committed
def calculate_ndvi(extracted_paths: Union[List[str], str], config_file: str, chunk_size: dict = {'x': 1000, 'y': 1000, 'time': -1}, scaling: int = 255, acorvi_corr: int = 0) -> str:
Calculate ndvi images in a xarray dataset, interpolate is to a daily time step (a data cube) and save it.
Jeremy Auclair
committed
ndvi values are scaled and saved as ``uint8`` (0 to 255).
Jeremy Auclair
committed
.. warning:: only 10 and 20 meters currently supported
Arguments
=========
1. extracted_paths: ``Union[List[str], str]``
list of paths to extracted sentinel-2 products
Jeremy Auclair
committed
or path to ``csv``` file containing those paths
Jeremy Auclair
committed
2. config_file: ``str``
path to configuration file
3. chunk_size: ``dict`` ``default = {'x': 1000, 'y': 1000, 'time': -1}``
dictionnary containing the chunk size for
the xarray dask calculation
4. scaling: ``int`` ``default = 255``
integer scaling to save NDVI data as integer values
acorvi correction parameter to add to the red band
Returns
=======
1. ndvi_precube_path: ``str``
path to save the ndvi pre-cube
"""
Jeremy Auclair
committed
Jeremy Auclair
committed
# Open config_file
config_params = config(config_file)
# Load parameters from config file
boundary_shapefile_path = config_params.shapefile_path
run_name = config_params.run_name
resolution = config_params.resolution
preferred_provider = config_params.preferred_provider
if preferred_provider == 'copernicus':
Jeremy Auclair
committed
save_dir = config_params.download_path + os.sep + 'SCIHUB' + os.sep + 'NDVI'
Jeremy Auclair
committed
elif preferred_provider == 'theia':
Jeremy Auclair
committed
save_dir = config_params.download_path + os.sep + 'THEIA' + os.sep + 'NDVI'
Jeremy Auclair
committed
elif preferred_provider == 'usgs':
save_dir = config_params.download_path + os.sep + 'USGS' + os.sep + 'NDVI'
# Check resolution for Sentinel-2
if not resolution in [10, 20]:
print('Resolution should be equal to 10 or 20 meters for sentinel-2')
return None
Jeremy Auclair
committed
# If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
if type(extracted_paths) == str:
with open(extracted_paths, 'r') as file:
extracted_paths = []
Jeremy Auclair
committed
csvreader = csv.reader(file, delimiter='\n')
for row in csvreader:
Jeremy Auclair
committed
# Get list of paths to red, nir and mask images
red_paths, nir_paths, mask_paths = get_band_paths(config_params, extracted_paths)
dates = [product_str_to_datetime(prod) for prod in red_paths]
Jeremy Auclair
committed
Jeremy Auclair
committed
# Get crs
with rio.open(red_paths[0]) as temp:
crs = temp.crs
# Open shapefile to clip the dataset
shapefile = gpd.read_file(boundary_shapefile_path)
shapefile = shapefile.to_crs(crs)
Jeremy Auclair
committed
bounds = shapefile.total_bounds
Jeremy Auclair
committed
Jeremy Auclair
committed
# Open datasets with xarray
red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'}).sortby('y', ascending = False)
nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'}).sortby('y', ascending = False)
mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'}).sortby('y', ascending = False)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Get masking condition and resolution management based on provider, select only data inside the bounds of the input shapefile
Jeremy Auclair
committed
if preferred_provider == 'copernicus':
Jeremy Auclair
committed
red = red.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
Jeremy Auclair
committed
if resolution == 10:
Jeremy Auclair
committed
nir = nir.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
mask = mask.interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')
Jeremy Auclair
committed
else:
nir = nir.interp(x = red.coords['x'], y = red.coords['y'], method = 'linear')
Jeremy Auclair
committed
mask = xr.where((mask == 4) | (mask == 5), 1, np.NaN)
Jeremy Auclair
committed
elif preferred_provider == 'theia':
if resolution == 10:
Jeremy Auclair
committed
red = red.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
nir = nir.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
mask = mask.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
elif resolution == 20:
mask = mask.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
Jeremy Auclair
committed
red = red.interp(x = mask.coords['x'], y = mask.coords['y'], method = 'linear')
nir = nir.interp(x = mask.coords['x'], y = mask.coords['y'], method = 'linear')
Jeremy Auclair
committed
red = xr.where(red == -10000, np.nan, red)
nir = xr.where(nir == -10000, np.nan, nir)
mask = xr.where((mask == 0), 1, np.NaN)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Rescale optical data for LandSat
Jeremy Auclair
committed
elif preferred_provider == 'usgs':
Jeremy Auclair
committed
red = red.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1])) * 2.75e-5 - 0.2
nir = nir.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1])) * 2.75e-5 - 0.2
mask = mask.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
Jeremy Auclair
committed
mask = xr.where((mask == 21824), 1, np.NaN)
# Set time coordinate
red['time'] = pd.to_datetime(dates)
nir['time'] = pd.to_datetime(dates)
mask['time'] = pd.to_datetime(dates)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Sort by date
red = red.sortby(variables = 'time')
nir = nir.sortby(variables = 'time')
mask = mask.sortby(variables = 'time')
dates.sort()
Jeremy Auclair
committed
# Save single red band as reference tif for later use in reprojection algorithms
Jeremy Auclair
committed
ref = xr.open_dataset(red_paths[0]).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'}).sortby('y', ascending = False).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1])).rio.write_crs(crs)
ref.rio.to_raster(save_dir + os.sep + run_name + os.sep + run_name + '_grid_reference.tif')
# Create save path
ndvi_cube_path = save_dir + os.sep + run_name + os.sep + run_name + '_NDVI_cube_' + config_params.start_date + '_' + config_params.end_date + '.nc'
# Check if file exists and ndvi overwrite is false
if os.path.exists(ndvi_cube_path) and not config_params.ndvi_overwrite:
return ndvi_cube_path
# Create ndvi dataset and calculate ndvi
ndvi = red
ndvi = ndvi.drop('red')
Jeremy Auclair
committed
ndvi['NDVI'] = (((nir.nir - red.red - acorvi_corr)/(nir.nir + red.red + acorvi_corr)) * mask.mask)
Jeremy Auclair
committed
# Mask NDVI
ndvi['NDVI'] = xr.where(ndvi.NDVI > 1, np.NaN, ndvi.NDVI)
Jeremy Auclair
committed
ndvi['NDVI'] = xr.where(ndvi.NDVI < 0, 0, ndvi.NDVI)
# Interpolates on a daily frequency
daily_index = pd.date_range(start = dates[0], end = dates[-1], freq = 'D')
# Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index).chunk(chunks = {'x': 250, 'y': 250, 'time': -1})
dates = pd.to_datetime(ndvi.time.values)
# Interpolate the dataset along the time dimension to fill nan values
ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate')
# Remove extra dates, only keep selected window
ndvi = ndvi.sel({'time': slice(config_params.start_date, config_params.end_date)})

Jérémy AUCLAIR
committed
# TODO: understand why dimensions are not in the right order anymore
# Reorder dimensions
ndvi = ndvi[['time', 'y', 'x', 'NDVI']]
# Scale ndvi
ndvi['NDVI'] = (ndvi.NDVI * scaling).round()
Jeremy Auclair
committed
# Write attributes
Jeremy Auclair
committed
ndvi['NDVI'].attrs['units'] = 'None'
ndvi['NDVI'].attrs['standard_name'] = 'NDVI'
ndvi['NDVI'].attrs['description'] = 'Normalized difference Vegetation Index (of the near infrared and red band). A value of one is a high vegetation presence.'
ndvi['NDVI'].attrs['scale factor'] = str(scaling)
Jeremy Auclair
committed
# Save NDVI cube to netcdf
if ndvi.dims['y'] > 1500 and ndvi.dims['x'] > 1500:
file_chunksize = (1, 1000, 1000)
else:
file_chunksize = (1, ndvi.dims['y'], ndvi.dims['x'])
ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": file_chunksize}})
Jeremy Auclair
committed
ndvi.close()
return ndvi_cube_path
Jeremy Auclair
committed
Jeremy Auclair
committed
def interpolate_ndvi(ndvi_path: str, config_file: str, chunk_size: dict = {'x': 250, 'y': 250, 'time': -1}, scaling: int = 255,) -> str:
Interpolate the ndvi cube to a daily frequency between the
desired dates defined in the ``json`` config file. The extra
month of data downloaded is used for a better interpolation,
it is then discarded and the final NDVI cube has the dates
defined in the config file.
Arguments
=========
Jeremy Auclair
committed
1. ndvi_path: ``str``
path to ndvi pre-cube
Jeremy Auclair
committed
2. config_file: ``str``
path to ``json`` config file
Jeremy Auclair
committed
3. chunk_size: ``dict`` ``default = {'x': 250, 'y': 250, 'time': -1}``
chunk size to use by dask for calculation,
``'time' = -1`` means the chunk has the whole
time dimension in it. The Dataset can't be
divided along the time axis for interpolation.
Jeremy Auclair
committed
4. scaling: ``int`` ``default = 255``
integer scaling to save NDVI data as integer values
Returns
=======
``None``
Jeremy Auclair
committed
# Open config_file
config_params = config(config_file)
Jeremy Auclair
committed
# Load parameters from config file
run_name = config_params.run_name
if config_params.preferred_provider == 'copernicus':
save_dir = config_params.download_path + os.sep + 'SCIHUB' + os.sep + 'NDVI'
Jeremy Auclair
committed
elif config_params.preferred_provider == 'theia':
Jeremy Auclair
committed
save_dir = config_params.download_path + os.sep + 'THEIA' + os.sep + 'NDVI'
Jeremy Auclair
committed
elif config_params.preferred_provider == 'usgs':
save_dir = config_params.download_path + os.sep + 'USGS' + os.sep + 'NDVI'
Jeremy Auclair
committed
# Create save path
ndvi_cube_path = save_dir + os.sep + run_name + os.sep + run_name + '_NDVI_cube_' + config_params.start_date + '_' + config_params.end_date + '.nc'
# Check if file exists and ndvi overwrite is false
if os.path.exists(ndvi_cube_path) and not config_params.ndvi_overwrite:
return ndvi_cube_path
Jeremy Auclair
committed
# Open NDVI pre-cube
ndvi = xr.open_dataset(ndvi_path, chunks = chunk_size)
# Get dates of NDVI precube
dates = ndvi.time.values
Jeremy Auclair
committed
# Get Spatial reference
spatial_ref = ndvi.spatial_ref.load()
daily_index = pd.date_range(start = dates[0], end = dates[-1], freq = 'D')
# Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index)
Jeremy Auclair
committed
dates = pd.to_datetime(ndvi.time.values)
# Interpolate the dataset along the time dimension to fill nan values
Jeremy Auclair
committed
ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').round(decimals = 0)
# Remove extra dates, only keep selected window
ndvi = ndvi.sel({'time': slice(config_params.start_date, config_params.end_date)})
Jeremy Auclair
committed
# Rescale data
Jeremy Auclair
committed
ndvi = (ndvi * (scaling / (scaling - 1)) - 1).round()
Jeremy Auclair
committed
# Set negative values as 0
Jeremy Auclair
committed
ndvi = xr.where(ndvi < 0, 0, ndvi.NDVI)
# Set values above 255 (ndvi > 1) as 255 (ndvi = 1)
Jeremy Auclair
committed
ndvi = xr.where(ndvi > scaling, scaling, ndvi.NDVI)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Rewrite spatial reference
ndvi['spatial_ref'] = spatial_ref
Jeremy Auclair
committed
# Save NDVI cube to netcdf
Jeremy Auclair
committed
if ndvi.dims['y'] > 1500 and ndvi.dims['x'] > 1500:
file_chunksize = (1, 1000, 1000)
else:
file_chunksize = (1, ndvi.dims['y'], ndvi.dims['x'])
ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": file_chunksize}})
Jeremy Auclair
committed
Jeremy Auclair
committed
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
return ndvi_cube_path
def write_geotiff(path: str, data: np.ndarray, transform: tuple, projection: str, scaling: int = 255) -> None:
"""
Writes a GeoTiff image using ``rasterio``. Takes an array and georeferencing data (obtained by ``rasterio``
on an image with same georeferencing) to write a well formatted image. Data format: ``uint8``
Arguments
=========
1. path: ``str``
true path of file to save data in (path = path + save name)
2. data: ``np.ndarray``
numpy array containging the data
3. geotransform: ``tuple``
tuple containing the geo-transform data
4. projection: ``str``
string containing the epsg projection data
5. scaling: ``int`` ``default = 255``
scaling factor for saving NDVI images as integer arrays
Returns
=======
``None``
"""
# Apply scaling
Jeremy Auclair
committed
data = np.round((data + np.float32(1/scaling)) * (scaling - 1), 1)
Jeremy Auclair
committed
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
# Write NDVI image with rasterio
with rio.open(path, mode = "w", driver = "GTiff", height = data.shape[0], width = data.shape[1], count = 1, dtype = np.uint8, crs = projection, transform = transform, nodata = 0) as new_dataset:
new_dataset.write(data, 1)
return None
def calculate_ndvi_image(args: tuple) -> str:
"""
Opens zip file of the given product to extract the red, near-infrared and mask bands (without extracting
the whole archive) and calculate the ndvi, apply a mask to keep only clear land data (no clouds, no snow,
no water, no unclassified or erroneous pixel). This array is then saved in a GeoTiff image with
the parent name file and ``'_NDVI.tif'`` extension. If overwrite is false, already existing files won't be
calculated and saved again (default value is ``false``).
This function is called in the calculate_ndvi function as a subprocess to allow multiprocessing. Variables
that have served their purpose are directly deleted to reduce memory usage.
Arguments (packed in args: ``tuple``)
=====================================
1. product_path: ``str``
path to the product to extract for ndvi calculation
2. save_dir: ``str``
directory in which to save ndvi images
Jeremy Auclair
committed
3. shapefile_path: ``str``
path to the shapefile (``.shp``) for which the data is calculated. Used to clip
satellite imagery
4. overwrite: ``bool``
Jeremy Auclair
committed
boolean to choose to rewrite already existing files
4. ACORVI_correction: ``int``
correction parameter to apply on the red band for stability of NDVI values
Returns
=======
1. ndvi_path: ``str``
path to the saved ndvi image
"""
# Unpack arguments
Jeremy Auclair
committed
product_path, save_dir, shapefile_path, overwrite, ACORVI_correction = args
Jeremy Auclair
committed
# Check provider
_, _, provider, _ = read_product_info(product_path)
# To ignore numpy warning when dividing by NaN
np.seterr(invalid='ignore', divide='ignore')
Jeremy Auclair
committed
# Define offset for copernicus data
offset = 0
Jeremy Auclair
committed
Jeremy Auclair
committed
# Create file name
file_name = os.path.basename(product_path)
# file_name = os.path.splitext(file_name)[0]
save_name = save_dir + os.sep + file_name + '_NDVI.tif'
Jeremy Auclair
committed
Jeremy Auclair
committed
# Check if file is already written. If override is false, no need to calculate and write ndvi again.
# If override is true: ndvi is calculated and saved again.
if not os.path.exists(save_name) or overwrite: # File does not exist or overwrite is true
pass
else:
# Collect save path for return
return save_name
Jeremy Auclair
committed
Jeremy Auclair
committed
# Look for desired images in the directories
if provider == 'copernicus':
Jeremy Auclair
committed
for f in os.listdir(product_path):
if fnmatch(f, '*_B04_10m.jp2'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_B08_10m.jp2'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_SCL_20m.jp2'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
Jeremy Auclair
committed
Jeremy Auclair
committed
elif provider == 'theia':
for f in os.listdir(product_path):
if fnmatch(f, '*_FRE_B4.tif'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_FRE_B8.tif'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_MG2_R1.tif'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
Jeremy Auclair
committed
elif provider == 'usgs':
for f in os.listdir(product_path):
if fnmatch(f, '*_SR_B4.TIF'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_SR_B5.TIF'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_QA_PIXEL.TIF'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Read bands, geometry and projection information
red_band = xr.open_dataarray(red_file) # read red band
projection = red_band.rio.crs
del red_file
# Open shapefile to clip the dataset
shapefile = gpd.read_file(shapefile_path)
shapefile = shapefile.to_crs(projection)
bounds = shapefile.total_bounds
Jeremy Auclair
committed
del shapefile
Jeremy Auclair
committed
red_band = red_band.sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
transorm = red_band.rio.transform(recalc = True)
nir_band = xr.open_dataarray(nir_file).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1])) # read nir band
del nir_file
# Read array data
red_data = red_band.values + ACORVI_correction
nir_data = nir_band.values
nir_band.close()
# Adjust data for specific providers
if provider == 'theia':
Jeremy Auclair
committed
red_data = np.where((red_data == -10000), np.float32(np.NaN), red_data)
nir_data = np.where((nir_data == -10000), np.float32(np.NaN), nir_data)
Jeremy Auclair
committed
elif provider == 'usgs':
red_data = red_data * 2.75e-5 - 0.2
Jeremy Auclair
committed
nir_data = nir_data * 2.75e-5 - 0.2
Jeremy Auclair
committed
Jeremy Auclair
committed
# Calculate ndvi
ndvi_data = np.divide((nir_data - red_data), (nir_data + red_data + 2*offset), dtype = np.float32)
del red_data, nir_data
Jeremy Auclair
committed
Jeremy Auclair
committed
# Read classif data
classif_band = xr.open_dataarray(classif_file).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1])) # read classif band
del classif_file
Jeremy Auclair
committed
Jeremy Auclair
committed
# Adjust mask based on provider
if provider == 'copernicus':
classif_band = xr.where((classif_band == 4) | (classif_band == 5), 1, np.NaN).interp(x = red_band.coords['x'], y = red_band.coords['y'], method = 'nearest')
elif provider == 'theia':
classif_band = xr.where((classif_band == 0), 1, np.NaN)
elif provider == 'usgs':
classif_band = xr.where((classif_band == 21824), 1, np.NaN)
# Read array data
binary_mask = classif_band.values
classif_band.close()
red_band.close()
Jeremy Auclair
committed
Jeremy Auclair
committed
# Apply mask
np.multiply(ndvi_data, binary_mask, out=ndvi_data, dtype = np.float32)
del binary_mask
Jeremy Auclair
committed
Jeremy Auclair
committed
# Clip out of range data
np.minimum(ndvi_data, 1, out = ndvi_data, dtype = np.float32)
np.maximum(ndvi_data, 0, out = ndvi_data, dtype = np.float32)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Write image
write_geotiff(save_name, ndvi_data[0], transorm, projection)
del ndvi_data, transorm, projection
Jeremy Auclair
committed
return save_name
Jeremy Auclair
committed
def calculate_ndvi_parcel(download_path: Union[List[str], str], save_dir: str, save_path: str, shapefile_path: str, overwrite: bool = False, max_cpu: int = 4, ACORVI_correction: int = 0) -> List[str]:
Jeremy Auclair
committed
"""
Opens the red, near infrared and scene classification to calculate the ndvi, apply a mask to keep only clear land data (no clouds, no snow,
no water, no unclassified or erroneous pixel). This array is then saved in a GeoTiff image with
the parrent name file and ``'_NDVI.tif'`` extension. If overwrite is false, already existing fils won't be
calculated and saved again (default value is ``false``).
This function calls the calculate_ndvi_image function as a subprocess to allow multiprocessing. A modified
version of the ``tqdm`` module (``p_tqdm``) is used for progress bars.
Arguments
=========
1. download_path: ``list[str]`` or ``str``
list or paths to the products to extract for ndvi calculation or path to ``csv`` file that contains this list
2. save_dir: ``str``
directory in which to save ndvi images
3. save_path : ``str``
path to a csv file containing the paths to the saved ndvi images
Jeremy Auclair
committed
4. shapefile_path: ``str``
path to the shapefile (``.shp``) for which the data is calculated. Used to clip
satellite imagery
5. overwrite: ``bool`` ``default = False``
Jeremy Auclair
committed
boolean to choose to rewrite already existing files
5. max_cpu: ``int`` `default = 4`
max number of CPUs that the pool can use, if max_cpu is bigger than available CPUs, the pool only uses availables CPUs
6. ACORVI_correction: ``int`` ``default = 500``
correction parameter to apply on the red band for stability of NDVI values
Returns
=======
1. ndvi_path: ``list[str]``
list of paths to the saved ndvi images
"""
# If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
if type(download_path) == str:
with open(download_path, 'r') as file:
download_path = []
csvreader = csv.reader(file, delimiter='\n')
for row in csvreader:
download_path.append(row[0])
ndvi_path = [] # Where saved image paths will be stored
# Prepare arguments for multiprocessing
Jeremy Auclair
committed
args = [(product, save_dir, shapefile_path, overwrite, ACORVI_correction) for product in download_path]
Jeremy Auclair
committed
# Get number of CPU cores and limit max value (working on a cluster requires os.sched_getaffinity to get true number of available CPUs,
# this is not true on a "personnal" computer, hence the use of the min function)
try:
nb_cores = min([max_cpu, cpu_count(logical = False), len(os.sched_getaffinity(0))])
except:
nb_cores = min([max_cpu, cpu_count(logical = False)]) # os.sched_getaffinity won't work on windows
Jeremy Auclair
committed
print('\nStarting NDVI calculations with %d cores for %d images...\n' %(nb_cores, len(download_path)))
# Start pool and get results
results = p_map(calculate_ndvi_image, args, **{"num_cpus": nb_cores})
# Collect results and sort them
for result in results:
ndvi_path.append(result)
ndvi_path.sort()
# Save ndvi paths as a csv file
with open(save_path, 'w', newline='') as f:
# using csv.writer method from CSV package
write = csv.writer(f)
for ndvi_image in ndvi_path:
write.writerow([ndvi_image])
return ndvi_path