# -*- coding: UTF-8 -*-
# Python
"""
04-07-2023
@author: jeremy auclair
Calculate NDVI images with xarray
"""
import os # for path exploration
import csv # open csv files
from fnmatch import fnmatch # for character string comparison
from typing import List, Union # to declare variables
import xarray as xr # to manage dataset
import pandas as pd # to manage dataframes
import rasterio as rio # to open geotiff files
import numpy as np # vectorized math
import geopandas as gpd # to manage shapefile crs projections
import datetime # for date management
from scipy.ndimage import zoom # to rescale different resolution images
from shapely.geometry import box # to create boundary box
from p_tqdm import p_map # for multiprocessing with progress bars
from psutil import cpu_count # to get number of physical cores available
from modspa_pixel.config.config import config # to import config file
from modspa_pixel.preprocessing.input_toolbox import product_str_to_datetime, read_product_info
def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, boundary_shapefile_path: str, resolution: int = 20, chunk_size: dict = {'x': 4096, 'y': 4096, 'time': 2}, acorvi_corr: int = 500) -> str:
"""
    Calculate ndvi images in an xarray dataset (a data cube) and save it.
    ndvi values are scaled and saved as ``uint8`` (0 to 255).
.. warning:: Current version for Copernicus Sentinel-2 images
Arguments
=========
    1. extracted_paths: ``Union[List[str], str]``
        list of paths to extracted Sentinel-2 products
        or path to a ``csv`` file containing those paths
2. save_dir: ``str``
directory in which to save the ndvi pre-cube
3. boundary_shapefile_path: ``str``
shapefile path to save the geographical bounds
of the ndvi tile, used later to download weather
products
4. resolution: ``int`` ``default = 20``
resolution in meters
.. warning:: only 10 and 20 meters currently supported
    5. chunk_size: ``dict`` ``default = {'x': 4096, 'y': 4096, 'time': 2}``
        dictionary containing the chunk size for
        the xarray dask calculation
6. acorvi_corr: ``int`` ``default = 500``
acorvi correction parameter to add to the red band
to adjust ndvi values
Returns
=======
    1. ndvi_cube_path: ``str``
        path to the saved ndvi pre-cube
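    Example
    =======
    A minimal usage sketch; the paths below are placeholders, not real products:

    .. code-block:: python

        # Hypothetical csv listing the extracted Sentinel-2 band paths
        ndvi_precube_path = calculate_ndvi(
            extracted_paths = '/outputs/extracted_paths.csv',
            save_dir = '/outputs/ndvi',
            boundary_shapefile_path = '/outputs/boundary.shp',
            resolution = 20)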
"""
# Check resolution for Sentinel-2
    if resolution not in [10, 20]:
        print('Resolution should be equal to 10 or 20 meters for Sentinel-2')
        return None
# If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
    if isinstance(extracted_paths, str):
with open(extracted_paths, 'r') as file:
extracted_paths = []
csvreader = csv.reader(file, delimiter='\n')
for row in csvreader:
extracted_paths.append(row[0])
# Sort by band
red_paths = []
nir_paths = []
mask_paths = []
if resolution == 10:
for product in extracted_paths:
if fnmatch(product, '*_B04_10m*'):
red_paths.append(product)
elif fnmatch(product, '*_B08_10m*'):
nir_paths.append(product)
elif fnmatch(product, '*_SCL_20m*'):
mask_paths.append(product)
else:
for product in extracted_paths:
if fnmatch(product, '*_B04_20m*'):
red_paths.append(product)
            elif fnmatch(product, '*_B08_10m*'):  # B08 only exists at 10 m and is coarsened to 20 m below
nir_paths.append(product)
elif fnmatch(product, '*_SCL_20m*'):
mask_paths.append(product)
# Create boundary shapefile from Sentinel-2 image for weather download
ra = rio.open(red_paths[0])
bounds = ra.bounds
geom = box(*bounds)
df = gpd.GeoDataFrame({"id":1,"geometry":[geom]})
df.crs = ra.crs
df.geometry = df.geometry.to_crs('epsg:4326')
df.to_file(boundary_shapefile_path)
del df, ra, geom, bounds
# Sort and get dates
red_paths.sort()
nir_paths.sort()
mask_paths.sort()
dates = [product_str_to_datetime(prod) for prod in red_paths]
# Open datasets with xarray
red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'})
nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'})
mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'})
    if resolution == 10:
        # Build a binary mask from the scene classification (4: vegetation, 5: not vegetated)
        # and resample it from 20 m to the 10 m grid of the red band
        mask = xr.where((mask == 4) | (mask == 5), 1, 0).interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')
    else:
        # Average the 10 m near infrared band down to the 20 m grid
        nir = nir.coarsen(x = 2, y = 2, boundary = 'exact').mean()
        mask = xr.where((mask == 4) | (mask == 5), 1, 0)
# Set time coordinate
red['time'] = pd.to_datetime(dates)
nir['time'] = pd.to_datetime(dates)
mask['time'] = pd.to_datetime(dates)
# Create ndvi dataset and calculate ndvi
    ndvi = red.drop_vars('red')
    ndvi['NDVI'] = ((nir.nir - red.red - acorvi_corr) / (nir.nir + red.red + acorvi_corr)) * mask.mask
del red, nir, mask
# Mask and scale ndvi
ndvi['NDVI'] = xr.where(ndvi.NDVI < 0, 0, ndvi.NDVI)
ndvi['NDVI'] = xr.where(ndvi.NDVI > 1, 1, ndvi.NDVI)
ndvi['NDVI'] = (ndvi.NDVI*255)
# Write attributes
ndvi['NDVI'].attrs['units'] = 'None'
ndvi['NDVI'].attrs['standard_name'] = 'NDVI'
    ndvi['NDVI'].attrs['description'] = 'Normalized Difference Vegetation Index (of the near infrared and red bands). A value of one indicates high vegetation presence.'
ndvi['NDVI'].attrs['scale factor'] = '255'
# Create save path
ndvi_cube_path = save_dir + os.sep + 'NDVI_precube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'
# Save NDVI cube to netcdf
ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": (4, 1024, 1024)}}) #, 'zlib': True, "complevel": 4}})
ndvi.close()
return ndvi_cube_path
def interpolate_ndvi(ndvi_path: str, save_dir: str, config_file: str, chunk_size: dict = {'x': 512, 'y': 512, 'time': -1}) -> str:
"""
Interpolate the ndvi cube to a daily frequency between the
desired dates defined in the ``json`` config file.
Arguments
=========
ndvi_path: ``str``
path to ndvi pre-cube
save_dir: ``str``
path to save interpolated ndvi cube
config_file: ``str``
path to ``json`` config file
chunk_size: ``dict`` ``default = {'x': 512, 'y': 512, 'time': -1}``
chunk size to use by dask for calculation,
``'time' = -1`` means the chunk has the whole
time dimension in it. The Dataset can't be
divided along the time axis for interpolation.
    Returns
    =======
    1. ndvi_cube_path: ``str``
        path to the saved interpolated ndvi cube
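    Example
    =======
    A minimal usage sketch; paths are placeholders:

    .. code-block:: python

        # Hypothetical pre-cube produced by calculate_ndvi and a json config file
        ndvi_cube_path = interpolate_ndvi(
            ndvi_path = '/outputs/ndvi/NDVI_precube_01-04-2023_30-09-2023.nc',
            save_dir = '/outputs/ndvi',
            config_file = '/config/config_modspa.json')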
"""
# Open config_file
config_params = config(config_file)
# Open NDVI pre-cube
ndvi = xr.open_dataset(ndvi_path, chunks = chunk_size)
# Get Spatial reference
spatial_ref = ndvi.spatial_ref.load()
# Sort images by time
ndvi = ndvi.sortby('time')
    # Create a daily date range between the start and end dates defined in the config file
    daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')
# Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index)
dates = pd.to_datetime(ndvi.time.values)
# Interpolate the dataset along the time dimension to fill nan values
ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').round(decimals = 0)
    # Set negative values to 0
    ndvi['NDVI'] = xr.where(ndvi.NDVI < 0, 0, ndvi.NDVI)
    # Set values above 255 (ndvi > 1) to 255 (ndvi = 1)
    ndvi['NDVI'] = xr.where(ndvi.NDVI > 255, 255, ndvi.NDVI)
# Rewrite spatial reference
ndvi['spatial_ref'] = spatial_ref
# Create save path
ndvi_cube_path = save_dir + os.sep + 'NDVI_cube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'
# Save NDVI cube to netcdf
ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": (4, 1024, 1024)}}) #, 'zlib': True, "complevel": 4}})
ndvi.close()
return ndvi_cube_path
def write_geotiff(path: str, data: np.ndarray, transform: tuple, projection: str, scaling: int = 255) -> None:
"""
    Writes a GeoTiff image using ``rasterio``. Takes an array and georeferencing data (obtained by ``rasterio``
    on an image with the same georeferencing) to write a well formatted image. Data format: ``uint8``
Arguments
=========
1. path: ``str``
true path of file to save data in (path = path + save name)
    2. data: ``np.ndarray``
        numpy array containing the data
    3. transform: ``tuple``
        tuple containing the geo-transform data
4. projection: ``str``
string containing the epsg projection data
5. scaling: ``int`` ``default = 255``
scaling factor for saving NDVI images as integer arrays
Returns
=======
``None``
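    Example
    =======
    A minimal sketch, assuming ``ndvi_array`` holds float ndvi values in [0, 1]; paths are placeholders:

    .. code-block:: python

        # Hypothetical: reuse the georeferencing of the source red band
        with rio.open('/data/T31TCJ_B04_10m.jp2') as src:
            write_geotiff('/outputs/example_NDVI.tif', ndvi_array, src.transform, src.crs)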
"""
    # Apply scaling, replace NaN (masked pixels) with the nodata value 0 and cast to uint8
    data = np.nan_to_num(data * scaling).astype(np.uint8)
# Write NDVI image with rasterio
with rio.open(path, mode = "w", driver = "GTiff", height = data.shape[0], width = data.shape[1], count = 1, dtype = np.uint8, crs = projection, transform = transform, nodata = 0) as new_dataset:
new_dataset.write(data, 1)
return None
def calculate_ndvi_image(args: tuple) -> str:
"""
    Opens the given extracted product directory to read the red, near-infrared and scene classification bands (without
    loading the rest of the product) and calculates the ndvi, applying a mask to keep only clear land data (no clouds, no snow,
    no water, no unclassified or erroneous pixels). The resulting array is saved in a GeoTiff image with
    the parent file name and a ``'_NDVI.tif'`` extension. If overwrite is false, already existing files won't be
    calculated and saved again (default value is ``false``).
    This function is called in the ``calculate_ndvi_parcel`` function as a subprocess to allow multiprocessing. Variables
    that have served their purpose are directly deleted to reduce memory usage.
Arguments (packed in args: ``tuple``)
=====================================
1. product_path: ``str``
path to the product to extract for ndvi calculation
2. save_dir: ``str``
directory in which to save ndvi images
3. overwrite: ``bool``
boolean to choose to rewrite already existing files
4. ACORVI_correction: ``int``
correction parameter to apply on the red band for stability of NDVI values
Returns
=======
1. ndvi_path: ``str``
path to the saved ndvi image
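    Example
    =======
    A minimal sketch of the packed argument tuple; the product path is a placeholder:

    .. code-block:: python

        # Hypothetical extracted product directory, overwrite disabled, default ACORVI correction
        args = ('/data/S2B_MSIL2A_20230407T104629.SAFE', '/outputs/ndvi', False, 500)
        ndvi_image_path = calculate_ndvi_image(args)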
"""
# Unpack arguments
product_path, save_dir, overwrite, ACORVI_correction = args
# Check provider
_, _, provider, _ = read_product_info(product_path)
# To ignore numpy warning when dividing by NaN
np.seterr(invalid='ignore', divide='ignore')
    if provider == 'copernicus':
        # Products processed on or after 25 January 2022 (processing baseline 04.00) include a
        # radiometric offset of 1000 on the L2A reflectance bands, which is compensated for here
        if product_str_to_datetime(product_path) >= datetime.date(year = 2022, month = 1, day = 25):
            offset = -1000
        else:
            offset = 0
# Create file name
file_name = os.path.basename(product_path)
# file_name = os.path.splitext(file_name)[0]
save_name = save_dir + os.sep + file_name + '_NDVI.tif'
        # If the file already exists and overwrite is false, return the existing path without recalculating it
        if os.path.exists(save_name) and not overwrite:
            return save_name
# Look for desired images in the directories
for f in os.listdir(product_path):
if fnmatch(f, '*_B04_10m.jp2'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_B08_10m.jp2'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_SCL_20m.jp2'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
        # Read bands, geometry and projection information
        red_band = rio.open(red_file, mode = 'r') # read red band
        transform = red_band.transform
        projection = red_band.crs
        del red_file
        nir_band = rio.open(nir_file, mode = 'r') # read nir band
        del nir_file
        # Read array data, casting to a signed type so the NDVI subtraction cannot underflow
        red_data = red_band.read(1).astype(np.int32) + ACORVI_correction
        red_band.close()
        nir_data = nir_band.read(1).astype(np.int32)
nir_band.close()
# Calculate ndvi
ndvi_data = np.divide((nir_data - red_data), (nir_data + red_data + 2*offset), dtype = np.float32)
del red_data, nir_data
# Read classif data
classif_band = rio.open(classif_file, mode = 'r') # read classif band
del classif_file
# Read array data
classif_data = classif_band.read(1)
        classif_data = zoom(classif_data, zoom = 2, order = 0, output = np.uint8) # nearest neighbour resampling from 20 m to the 10 m grid
classif_band.close()
        # Create binary mask (classes 4: vegetation and 5: not vegetated are kept as clear land pixels)
        binary_mask = np.where((classif_data == 4) | (classif_data == 5), np.float32(1), np.float32(np.nan))
del classif_data
# Apply mask
np.multiply(ndvi_data, binary_mask, out=ndvi_data, dtype = np.float32)
del binary_mask
# Mask areas with unwanted data
        ndvi_data = np.where((ndvi_data >= 1) | (ndvi_data <= 0), np.float32(np.nan), ndvi_data)
# Write image
        write_geotiff(save_name, ndvi_data, transform, projection)
        del ndvi_data, transform, projection
elif provider == 'theia':
# Create file name
file_name = os.path.basename(product_path)
# file_name = os.path.splitext(file_name)[0]
save_name = save_dir + os.sep + file_name + '_NDVI.tif'
        # If the file already exists and overwrite is false, return the existing path without recalculating it
        if os.path.exists(save_name) and not overwrite:
            return save_name
# Look for desired images in the directories
for f in os.listdir(product_path):
if fnmatch(f, '*_FRE_B4.tif'): # Red band
red_file = os.path.join(product_path, f)
elif fnmatch(f, '*_FRE_B8.tif'): # Near infrared band
nir_file = os.path.join(product_path, f)
elif fnmatch(f, '*_MG2_R1.tif'): # Scene classification for mask
classif_file = os.path.join(product_path, f)
        # Read bands, geometry and projection information
        red_band = rio.open(red_file, mode = 'r') # read red band
        transform = red_band.transform
        projection = red_band.crs
        del red_file
        nir_band = rio.open(nir_file, mode = 'r') # read nir band
        del nir_file
        # Read array data, masking the -10000 no data value on optical bands before applying the correction
        red_data = red_band.read(1)
        red_data = np.where(red_data == -10000, np.float32(np.nan), red_data + ACORVI_correction)
        red_band.close()
        nir_data = nir_band.read(1)
        nir_data = np.where(nir_data == -10000, np.float32(np.nan), nir_data)
nir_band.close()
# Calculate ndvi
ndvi_data = np.divide((nir_data - red_data), (nir_data + red_data), dtype = np.float32)
del red_data, nir_data
# Extract classif data
classif_band = rio.open(classif_file, mode = 'r') # read classif band
del classif_file
# Read array data
classif_data = classif_band.read(1)
# classif_data = zoom(classif_data, zoom = 2, order = 0, output = np.uint8)
classif_band.close()
# Create binary mask
        binary_mask = np.where((classif_data == 0), np.float32(1), np.float32(np.nan)) # an MG2 value of 0 means no cloud, snow, water or other defect flagged
del classif_data
# Apply mask
np.multiply(ndvi_data, binary_mask, dtype = np.float32, out = ndvi_data)
del binary_mask
# Mask unwanted data
        ndvi_data = np.where((ndvi_data >= 1) | (ndvi_data <= 0), np.float32(np.nan), ndvi_data)
# Write image
        write_geotiff(save_name, ndvi_data, transform, projection)
        del ndvi_data, transform, projection
return save_name
def calculate_ndvi_parcel(download_path: Union[List[str], str], save_dir: str, save_path: str, overwrite: bool = False, max_cpu: int = 4, ACORVI_correction: int = 500) -> List[str]:
"""
    Opens the red, near infrared and scene classification bands to calculate the ndvi and applies a mask to keep only clear land data (no clouds, no snow,
    no water, no unclassified or erroneous pixels). The resulting arrays are saved as GeoTiff images with
    the parent file name and a ``'_NDVI.tif'`` extension. If overwrite is false, already existing files won't be
    calculated and saved again (default value is ``false``).
This function calls the calculate_ndvi_image function as a subprocess to allow multiprocessing. A modified
version of the ``tqdm`` module (``p_tqdm``) is used for progress bars.
Arguments
=========
    1. download_path: ``list[str]`` or ``str``
        list of paths to the products to extract for ndvi calculation or path to a ``csv`` file that contains this list
2. save_dir: ``str``
directory in which to save ndvi images
3. save_path : ``str``
path to a csv file containing the paths to the saved ndvi images
4. overwrite: ``bool`` ``default = False``
boolean to choose to rewrite already existing files
    5. max_cpu: ``int`` ``default = 4``
        max number of CPUs that the pool can use; if max_cpu is bigger than the number of available CPUs, the pool only uses the available CPUs
6. ACORVI_correction: ``int`` ``default = 500``
correction parameter to apply on the red band for stability of NDVI values
Returns
=======
1. ndvi_path: ``list[str]``
list of paths to the saved ndvi images
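    Example
    =======
    A minimal usage sketch; paths are placeholders:

    .. code-block:: python

        # Hypothetical csv listing product directories, limited to 4 parallel workers
        ndvi_paths = calculate_ndvi_parcel(
            download_path = '/outputs/downloaded_products.csv',
            save_dir = '/outputs/ndvi',
            save_path = '/outputs/ndvi_paths.csv',
            max_cpu = 4)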
"""
# If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
    if isinstance(download_path, str):
with open(download_path, 'r') as file:
download_path = []
csvreader = csv.reader(file, delimiter='\n')
for row in csvreader:
download_path.append(row[0])
ndvi_path = [] # Where saved image paths will be stored
# Prepare arguments for multiprocessing
args = [(product, save_dir, overwrite, ACORVI_correction) for product in download_path]
    # Get the number of CPU cores and cap it (working on a cluster requires os.sched_getaffinity to get the true number of available CPUs,
    # which is not the case on a personal computer, hence the use of the min function)
nb_cores = min([max_cpu, cpu_count(logical = False), len(os.sched_getaffinity(0))])
print('\nStarting NDVI calculations with %d cores for %d images...\n' %(nb_cores, len(download_path)))
# Start pool and get results
results = p_map(calculate_ndvi_image, args, **{"num_cpus": nb_cores})
# Collect results and sort them
for result in results:
ndvi_path.append(result)
ndvi_path.sort()
# Save ndvi paths as a csv file
with open(save_path, 'w', newline='') as f:
# using csv.writer method from CSV package
write = csv.writer(f)
for ndvi_image in ndvi_path:
write.writerow([ndvi_image])
return ndvi_path