Newer
Older
Jeremy Auclair
committed
# -*- coding: UTF-8 -*-
# Python
Jeremy Auclair
committed
04-07-2023
@author: jeremy auclair
Calculate NDVI images with xarray
"""
Jeremy Auclair
committed
import csv # open csv files
from fnmatch import fnmatch # for character string comparison
Jeremy Auclair
committed
from typing import List, Union # to declare variables
import xarray as xr # to manage dataset
Jeremy Auclair
committed
import rasterio as rio # to open geotiff files
Jeremy Auclair
committed
import numpy as np # vectorized math
Jeremy Auclair
committed
import geopandas as gpd # to manage shapefile crs projections
Jeremy Auclair
committed
import datetime # for date management
from scipy.ndimage import zoom # to rescale different resolution images
Jeremy Auclair
committed
from shapely.geometry import box # to create boundary box
Jeremy Auclair
committed
from p_tqdm import p_map # for multiprocessing with progress bars
from psutil import cpu_count # to get number of physical cores available
from modspa_pixel.config.config import config # to import config file
Jeremy Auclair
committed
from modspa_pixel.preprocessing.input_toolbox import product_str_to_datetime, read_product_info
Jeremy Auclair
committed
def calculate_ndvi(extracted_paths: Union[List[str], str], config_file: str, chunk_size: dict = {'x': 1000, 'y': 1000, 'time': -1}, scaling: int = 255, acorvi_corr: int = 0) -> str:
Calculate ndvi images in a xarray dataset (a data cube) and save it.
Jeremy Auclair
committed
ndvi values are scaled and saved as ``uint8`` (0 to 255).
.. warning:: Current version for Copernicus Sentinel-2 images
Jeremy Auclair
committed
.. warning:: only 10 and 20 meters currently supported
Arguments
=========
1. extracted_paths: ``Union[List[str], str]``
list of paths to extracted sentinel-2 products
Jeremy Auclair
committed
or path to ``csv``` file containing those paths
Jeremy Auclair
committed
2. config_file: ``str``
path to configuration file
3. chunk_size: ``dict`` ``default = {'x': 1000, 'y': 1000, 'time': -1}``
dictionnary containing the chunk size for
the xarray dask calculation
4. scaling: ``int`` ``default = 255``
integer scaling to save NDVI data as integer values
acorvi correction parameter to add to the red band
Returns
=======
1. ndvi_precube_path: ``str``
path to save the ndvi pre-cube
"""
Jeremy Auclair
committed
Jeremy Auclair
committed
# Open config_file
config_params = config(config_file)
# Load parameters from config file
boundary_shapefile_path = config_params.shapefile_path
run_name = config_params.run_name
resolution = config_params.resolution
preferred_provider = config_params.preferred_provider
if preferred_provider == 'copernicus':
Jeremy Auclair
committed
save_dir = config_params.download_path + os.sep + 'SCIHUB' + os.sep + 'NDVI'
else:
save_dir = config_params.download_path + os.sep + 'THEIA' + os.sep + 'NDVI'
# Check resolution for Sentinel-2
if not resolution in [10, 20]:
print('Resolution should be equal to 10 or 20 meters for sentinel-2')
return None
Jeremy Auclair
committed
# If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
if type(extracted_paths) == str:
with open(extracted_paths, 'r') as file:
extracted_paths = []
Jeremy Auclair
committed
csvreader = csv.reader(file, delimiter='\n')
for row in csvreader:
extracted_paths.append(row[0])
# Sort by band
red_paths = []
nir_paths = []
mask_paths = []
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
# Check provider
if config_params.preferred_provider == 'copernicus':
# Integer offset has to be applied on optical bands for copernicus data
offset = 0
if resolution == 10:
for product in extracted_paths:
for file in os.listdir(product):
if fnmatch(file, '*_B04_10m*'):
red_paths.append(product + os.sep + file)
elif fnmatch(file, '*_B08_10m*'):
nir_paths.append(product + os.sep + file)
elif fnmatch(file, '*_SCL_20m*'):
mask_paths.append(product + os.sep + file)
else:
for product in extracted_paths:
for file in os.listdir(product):
if fnmatch(file, '*_B04_20m*'):
red_paths.append(product + os.sep + file)
elif fnmatch(file, '*_B08_10m*'):
nir_paths.append(product + os.sep + file)
elif fnmatch(file, '*_SCL_20m*'):
mask_paths.append(product + os.sep + file)
elif config_params.preferred_provider == 'copernicus':
print('Theia data handling not yet implemented')
return None
# Sort and get dates
red_paths.sort()
nir_paths.sort()
mask_paths.sort()
dates = [product_str_to_datetime(prod) for prod in red_paths]
Jeremy Auclair
committed
Jeremy Auclair
committed
# Get crs
with rio.open(red_paths[0]) as temp:
crs = temp.crs
# Open shapefile to clip the dataset
shapefile = gpd.read_file(boundary_shapefile_path)
shapefile = shapefile.to_crs(crs)
bounds = shapefile.bounds.values[0]
# Open datasets with xarray, select only data inside the bounds of the input shapefile
red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'}).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'}).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'}).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1]))
mask = xr.where((mask == 4) | (mask == 5), 1, np.NaN).interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')
Jeremy Auclair
committed
nir = nir.interp(x = red.coords['x'], y = red.coords['y'], method = 'linear')
mask = xr.where((mask == 4) | (mask == 5), 1, np.NaN)
# Set time coordinate
red['time'] = pd.to_datetime(dates)
nir['time'] = pd.to_datetime(dates)
mask['time'] = pd.to_datetime(dates)
Jeremy Auclair
committed
# Save single red band as reference tif for later use in reprojection algorithms
ref = xr.open_dataset(red_paths[0]).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'}).sel(x = slice(bounds[0], bounds[2]), y = slice(bounds[3], bounds[1])).rio.write_crs(crs)
ref.rio.to_raster(save_dir + os.sep + run_name + os.sep + run_name + '_grid_reference.tif')
# Create save path
ndvi_precube_path = save_dir + os.sep + run_name + os.sep + run_name + '_NDVI_precube_' + dates[0].strftime('%Y-%m-%d') + '_' + dates[-1].strftime('%Y-%m-%d') + '.nc'
# Check if file exists and ndvi overwrite is false
if os.path.exists(ndvi_precube_path) and not config_params.ndvi_overwrite:
return ndvi_precube_path
# Create ndvi dataset and calculate ndvi
ndvi = red
ndvi = ndvi.drop('red')
ndvi['NDVI'] = (((nir.nir - red.red - acorvi_corr)/(nir.nir + red.red + acorvi_corr + 2 * offset)) * mask.mask)
Jeremy Auclair
committed
Jeremy Auclair
committed
ndvi['NDVI'] = xr.where(ndvi.NDVI < 0, 0, ndvi.NDVI)
ndvi['NDVI'] = xr.where(ndvi.NDVI > 1, 1, ndvi.NDVI)
Jeremy Auclair
committed
ndvi['NDVI'] = ((ndvi.NDVI + 1/scaling) * (scaling - 1)).round()
Jeremy Auclair
committed
# Write attributes
Jeremy Auclair
committed
ndvi['NDVI'].attrs['units'] = 'None'
ndvi['NDVI'].attrs['standard_name'] = 'NDVI'
ndvi['NDVI'].attrs['description'] = 'Normalized difference Vegetation Index (of the near infrared and red band). A value of one is a high vegetation presence.'
ndvi['NDVI'].attrs['scale factor'] = str(scaling)
Jeremy Auclair
committed
# Save NDVI cube to netcdf
file_chunksize = (ndvi.dims['time'], min(500, ndvi.dims['y']), min(500, ndvi.dims['x']))
ndvi.to_netcdf(ndvi_precube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": file_chunksize}})
Jeremy Auclair
committed
ndvi.close()
return ndvi_precube_path
Jeremy Auclair
committed
Jeremy Auclair
committed
def interpolate_ndvi(ndvi_path: str, config_file: str, chunk_size: dict = {'x': 500, 'y': 500, 'time': -1}) -> str:
Interpolate the ndvi cube to a daily frequency between the
desired dates defined in the ``json`` config file. The extra
month of data downloaded is used for a better interpolation,
it is then discarded and the final NDVI cube has the dates
defined in the config file.
Arguments
=========
Jeremy Auclair
committed
1. ndvi_path: ``str``
path to ndvi pre-cube
Jeremy Auclair
committed
2. config_file: ``str``
path to ``json`` config file
Jeremy Auclair
committed
3. chunk_size: ``dict`` ``default = {'x': 500, 'y': 500, 'time': -1}``
chunk size to use by dask for calculation,
``'time' = -1`` means the chunk has the whole
time dimension in it. The Dataset can't be
divided along the time axis for interpolation.
Returns
=======
``None``
Jeremy Auclair
committed
# Open config_file
config_params = config(config_file)
Jeremy Auclair
committed
# Load parameters from config file
run_name = config_params.run_name
if config_params.preferred_provider == 'copernicus':
save_dir = config_params.download_path + os.sep + 'SCIHUB' + os.sep + 'NDVI'
else:
save_dir = config_params.download_path + os.sep + 'THEIA' + os.sep + 'NDVI'
# Create save path
ndvi_cube_path = save_dir + os.sep + run_name + os.sep + run_name + '_NDVI_cube_' + config_params.start_date + '_' + config_params.end_date + '.nc'
# Check if file exists and ndvi overwrite is false
if os.path.exists(ndvi_cube_path) and not config_params.ndvi_overwrite:
return ndvi_cube_path
Jeremy Auclair
committed
# Open NDVI pre-cube
ndvi = xr.open_dataset(ndvi_path, chunks = chunk_size)
# Get dates of NDVI precube
dates = ndvi.time.values
Jeremy Auclair
committed
# Get Spatial reference
spatial_ref = ndvi.spatial_ref.load()
# Sort images by time
ndvi = ndvi.sortby('time')
Jeremy Auclair
committed
# daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')
daily_index = pd.date_range(start = dates[0], end = dates[-1], freq = 'D')
# Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index)
Jeremy Auclair
committed
dates = pd.to_datetime(ndvi.time.values)
# Interpolate the dataset along the time dimension to fill nan values
Jeremy Auclair
committed
ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').round(decimals = 0)
# Remove extra dates, only keep selected window
ndvi = ndvi.sel({'time': slice(config_params.start_date, config_params.end_date)})
Jeremy Auclair
committed
# Rescale data
ndvi = (ndvi * (255 / 254) - 1).round()
# Set negative values as 0
Jeremy Auclair
committed
ndvi = xr.where(ndvi < 0, 0, ndvi.NDVI)
# Set values above 255 (ndvi > 1) as 255 (ndvi = 1)
Jeremy Auclair
committed
ndvi = xr.where(ndvi > 255, 255, ndvi.NDVI)
Jeremy Auclair
committed
Jeremy Auclair
committed
# Rewrite spatial reference
ndvi['spatial_ref'] = spatial_ref
Jeremy Auclair
committed
# Save NDVI cube to netcdf
Jeremy Auclair
committed
if ndvi.dims['y'] > 1500 and ndvi.dims['x'] > 1500:
file_chunksize = (1, 1000, 1000)
else:
file_chunksize = (1, ndvi.dims['y'], ndvi.dims['x'])
ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": file_chunksize}})
Jeremy Auclair
committed
Jeremy Auclair
committed
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
return ndvi_cube_path
def write_geotiff(path: str, data: np.ndarray, transform: tuple, projection: str, scaling: int = 255) -> None:
"""
Writes a GeoTiff image using ``rasterio``. Takes an array and georeferencing data (obtained by ``rasterio``
on an image with same georeferencing) to write a well formatted image. Data format: ``uint8``
Arguments
=========
1. path: ``str``
true path of file to save data in (path = path + save name)
2. data: ``np.ndarray``
numpy array containging the data
3. geotransform: ``tuple``
tuple containing the geo-transform data
4. projection: ``str``
string containing the epsg projection data
5. scaling: ``int`` ``default = 255``
scaling factor for saving NDVI images as integer arrays
Returns
=======
``None``
"""
# Apply scaling
Jeremy Auclair
committed
data = np.round((data + np.float32(1/scaling)) * (scaling - 1), 1)
Jeremy Auclair
committed
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
# Write NDVI image with rasterio
with rio.open(path, mode = "w", driver = "GTiff", height = data.shape[0], width = data.shape[1], count = 1, dtype = np.uint8, crs = projection, transform = transform, nodata = 0) as new_dataset:
new_dataset.write(data, 1)
return None
def calculate_ndvi_image(args: tuple) -> str:
"""
Opens zip file of the given product to extract the red, near-infrared and mask bands (without extracting
the whole archive) and calculate the ndvi, apply a mask to keep only clear land data (no clouds, no snow,
no water, no unclassified or erroneous pixel). This array is then saved in a GeoTiff image with
the parent name file and ``'_NDVI.tif'`` extension. If overwrite is false, already existing files won't be
calculated and saved again (default value is ``false``).
This function is called in the calculate_ndvi function as a subprocess to allow multiprocessing. Variables
that have served their purpose are directly deleted to reduce memory usage.
Arguments (packed in args: ``tuple``)
=====================================
1. product_path: ``str``
path to the product to extract for ndvi calculation
2. save_dir: ``str``
directory in which to save ndvi images
3. overwrite: ``bool``
boolean to choose to rewrite already existing files
4. ACORVI_correction: ``int``
correction parameter to apply on the red band for stability of NDVI values
Returns
=======
1. ndvi_path: ``str``
path to the saved ndvi image
"""
# Unpack arguments
product_path, save_dir, overwrite, ACORVI_correction = args
# Check provider
_, _, provider, _ = read_product_info(product_path)
# To ignore numpy warning when dividing by NaN
np.seterr(invalid='ignore', divide='ignore')
if provider == 'copernicus':
# Define offset for copernicus data
offset = -1000
Jeremy Auclair
committed
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# Create file name
file_name = os.path.basename(product_path)
# file_name = os.path.splitext(file_name)[0]
save_name = save_dir + os.sep + file_name + '_NDVI.tif'
# Check if file is already written. If override is false, no need to calculate and write ndvi again.
# If override is true: ndvi is calculated and saved again.
if not os.path.exists(save_name) or overwrite: # File does not exist or overwrite is true
pass
else:
# Collect save path for return
return save_name
# Look for desired images in the directories
for f in os.listdir(product_path):
if fnmatch(f, '*_B04_10m.jp2'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_B08_10m.jp2'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_SCL_20m.jp2'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
# Read bands, geometry and projection information
red_band = rio.open(red_file, mode = 'r') # read red band
transorm = red_band.transform
projection = red_band.crs
del red_file
nir_band = rio.open(nir_file, mode = 'r') # read nir band
del nir_file
# Read array data
red_data = red_band.read(1) + ACORVI_correction
red_band.close()
nir_data = nir_band.read(1)
nir_band.close()
# Calculate ndvi
ndvi_data = np.divide((nir_data - red_data), (nir_data + red_data + 2*offset), dtype = np.float32)
del red_data, nir_data
# Read classif data
classif_band = rio.open(classif_file, mode = 'r') # read classif band
del classif_file
# Read array data
classif_data = classif_band.read(1)
classif_data = zoom(classif_data, zoom = 2, order = 0, output = np.uint8)
classif_band.close()
# # Create binary mask
Jeremy Auclair
committed
binary_mask = np.where((classif_data == 4) | (classif_data == 5) | (classif_data == 6), np.float32(1), np.float32(np.NaN))
Jeremy Auclair
committed
del classif_data
# Apply mask
np.multiply(ndvi_data, binary_mask, out=ndvi_data, dtype = np.float32)
del binary_mask
Jeremy Auclair
committed
# Clip out of range data
np.minimum(ndvi_data, 1, out = ndvi_data, dtype = np.float32)
np.maximum(ndvi_data, 0, out = ndvi_data, dtype = np.float32)
Jeremy Auclair
committed
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
# Write image
write_geotiff(save_name, ndvi_data, transorm, projection)
del ndvi_data, transorm, projection
elif provider == 'theia':
# Create file name
file_name = os.path.basename(product_path)
# file_name = os.path.splitext(file_name)[0]
save_name = save_dir + os.sep + file_name + '_NDVI.tif'
# Check if file is already written. If override is false, no need to calculate and write ndvi again.
# If override is true: ndvi is calculated and saved again.
if not os.path.exists(save_name) or overwrite: # File does not exist or overwrite is true
pass
else:
# Collect save path for return
return save_name
# Look for desired images in the directories
for f in os.listdir(product_path):
if fnmatch(f, '*_FRE_B4.tif'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_FRE_B8.tif'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_MG2_R1.tif'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
# Read bands, geometry and projection information
red_band = rio.open(red_file, mode = 'r') # read red band
transorm = red_band.transform
projection = red_band.crs
del red_file
nir_band = rio.open(nir_file, mode = 'r') # read nir band
del nir_file
# Read array data, set -10000 as no data value on optical bands for correct masking
red_data = red_band.read(1) + ACORVI_correction
red_data = np.where((red_data == -10000), np.float32(np.NaN), red_data)
red_band.close()
nir_data = nir_band.read(1)
nir_data = np.where((nir_data == -10000), np.float32(np.NaN), nir_data)
nir_band.close()
# Calculate ndvi
ndvi_data = np.divide((nir_data - red_data), (nir_data + red_data), dtype = np.float32)
del red_data, nir_data
# Extract classif data
classif_band = rio.open(classif_file, mode = 'r') # read classif band
del classif_file
# Read array data
classif_data = classif_band.read(1)
# classif_data = zoom(classif_data, zoom = 2, order = 0, output = np.uint8)
classif_band.close()
# Create binary mask
Jeremy Auclair
committed
binary_mask = np.where((classif_data == 0) | (classif_data == 1), np.float32(1), np.float32(np.NaN))
Jeremy Auclair
committed
del classif_data
# Apply mask
np.multiply(ndvi_data, binary_mask, dtype = np.float32, out = ndvi_data)
del binary_mask
Jeremy Auclair
committed
# Clip out of range data
np.minimum(ndvi_data, 1, out = ndvi_data, dtype = np.float32)
np.maximum(ndvi_data, 0, out = ndvi_data, dtype = np.float32)
Jeremy Auclair
committed
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
# Write image
write_geotiff(save_name, ndvi_data, transorm, projection)
del ndvi_data, transorm, projection
return save_name
def calculate_ndvi_parcel(download_path: Union[List[str], str], save_dir: str, save_path: str, overwrite: bool = False, max_cpu: int = 4, ACORVI_correction: int = 500) -> List[str]:
"""
Opens the red, near infrared and scene classification to calculate the ndvi, apply a mask to keep only clear land data (no clouds, no snow,
no water, no unclassified or erroneous pixel). This array is then saved in a GeoTiff image with
the parrent name file and ``'_NDVI.tif'`` extension. If overwrite is false, already existing fils won't be
calculated and saved again (default value is ``false``).
This function calls the calculate_ndvi_image function as a subprocess to allow multiprocessing. A modified
version of the ``tqdm`` module (``p_tqdm``) is used for progress bars.
Arguments
=========
1. download_path: ``list[str]`` or ``str``
list or paths to the products to extract for ndvi calculation or path to ``csv`` file that contains this list
2. save_dir: ``str``
directory in which to save ndvi images
3. save_path : ``str``
path to a csv file containing the paths to the saved ndvi images
4. overwrite: ``bool`` ``default = False``
boolean to choose to rewrite already existing files
5. max_cpu: ``int`` `default = 4`
max number of CPUs that the pool can use, if max_cpu is bigger than available CPUs, the pool only uses availables CPUs
6. ACORVI_correction: ``int`` ``default = 500``
correction parameter to apply on the red band for stability of NDVI values
Returns
=======
1. ndvi_path: ``list[str]``
list of paths to the saved ndvi images
"""
# If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
if type(download_path) == str:
with open(download_path, 'r') as file:
download_path = []
csvreader = csv.reader(file, delimiter='\n')
for row in csvreader:
download_path.append(row[0])
ndvi_path = [] # Where saved image paths will be stored
# Prepare arguments for multiprocessing
args = [(product, save_dir, overwrite, ACORVI_correction) for product in download_path]
# Get number of CPU cores and limit max value (working on a cluster requires os.sched_getaffinity to get true number of available CPUs,
# this is not true on a "personnal" computer, hence the use of the min function)
try:
nb_cores = min([max_cpu, cpu_count(logical = False), len(os.sched_getaffinity(0))])
except:
nb_cores = min([max_cpu, cpu_count(logical = False)]) # os.sched_getaffinity won't work on windows
Jeremy Auclair
committed
print('\nStarting NDVI calculations with %d cores for %d images...\n' %(nb_cores, len(download_path)))
# Start pool and get results
results = p_map(calculate_ndvi_image, args, **{"num_cpus": nb_cores})
# Collect results and sort them
for result in results:
ndvi_path.append(result)
ndvi_path.sort()
# Save ndvi paths as a csv file
with open(save_path, 'w', newline='') as f:
# using csv.writer method from CSV package
write = csv.writer(f)
for ndvi_image in ndvi_path:
write.writerow([ndvi_image])
return ndvi_path