# -*- coding: UTF-8 -*-
# Python
"""
04-07-2023
@author: jeremy auclair

Calculate NDVI images with xarray
"""
import os  # for path exploration
import csv  # open csv files
from fnmatch import fnmatch  # for character string comparison
from typing import List, Union  # to declare variables

import pandas as pd  # to build datetime indexes for the time coordinate
import xarray as xr  # to manage dataset
import rasterio as rio  # to open geotiff files
import geopandas as gpd  # to manage shapefile crs projections
from shapely.geometry import box  # to create boundary box

from modspa_pixel.config.config import config  # to import config file
from modspa_pixel.preprocessing.input_toolbox import product_str_to_datetime
def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, boundary_shapefile_path: str, resolution: int = 20, chunk_size: dict = {'x': 4096, 'y': 4096, 'time': 2}, acorvi_corr: int = 500) -> str:
    """
    Calculate ndvi images in a xarray dataset (a data cube) and save it.
    ndvi values are scaled and saved as `uint8` (0 to 255).

    .. warning:: Current version for Copernicus Sentinel-2 images

    Arguments
    =========
    1. extracted_paths: ``Union[List[str], str]``
        list of paths to extracted sentinel-2 products
        or path to ``csv`` file containing those paths
    2. save_dir: ``str``
        directory in which to save the ndvi pre-cube
    3. boundary_shapefile_path: ``str``
        shapefile path to save the geographical bounds
        of the ndvi tile, used later to download weather
        products
    4. resolution: ``int`` ``default = 20``
        resolution in meters

        .. warning:: only 10 and 20 meters currently supported

    5. chunk_size: ``dict`` ``default = {'x': 4096, 'y': 4096, 'time': 2}``
        dictionnary containing the chunk size for
        the xarray dask calculation
    6. acorvi_corr: ``int`` ``default = 500``
        acorvi correction parameter to add to the red band
        to adjust ndvi values

    Returns
    =======
    1. ndvi_cube_path: ``str``
        path to save the ndvi pre-cube
    """

    # Check resolution for Sentinel-2
    if not resolution in [10, 20]:
        print('Resolution should be equal to 10 or 20 meters for sentinel-2')
        return None

    # If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
    if type(extracted_paths) == str:
        with open(extracted_paths, 'r') as file:
            extracted_paths = []
            csvreader = csv.reader(file, delimiter='\n')
            for row in csvreader:
                extracted_paths.append(row[0])

    # Sort by band
    red_paths = []
    nir_paths = []
    mask_paths = []
    if resolution == 10:
        for product in extracted_paths:
            if fnmatch(product, '*_B04_10m*'):
                red_paths.append(product)
            elif fnmatch(product, '*_B08_10m*'):
                nir_paths.append(product)
            elif fnmatch(product, '*_SCL_20m*'):
                mask_paths.append(product)
    else:
        for product in extracted_paths:
            # BUGFIX: the append for the red band was missing in the 20m branch,
            # leaving red_paths empty for the default resolution
            if fnmatch(product, '*_B04_20m*'):
                red_paths.append(product)
            elif fnmatch(product, '*_B8A_20m*'):
                nir_paths.append(product)
            elif fnmatch(product, '*_SCL_20m*'):
                mask_paths.append(product)

    # Create boundary shapefile from Sentinel-2 image for weather download
    ra = rio.open(red_paths[0])
    bounds = ra.bounds
    geom = box(*bounds)
    df = gpd.GeoDataFrame({"id": 1, "geometry": [geom]})
    df.crs = ra.crs
    df.geometry = df.geometry.to_crs('epsg:4326')
    df.to_file(boundary_shapefile_path)
    del df, ra, geom, bounds

    # Sort and get dates
    red_paths.sort()
    nir_paths.sort()
    mask_paths.sort()
    dates = [product_str_to_datetime(prod) for prod in red_paths]

    # Open all images as a single dataset per band, stacked along a new 'time' dimension
    red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'})
    nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'})
    mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'})

    # Binarize the SCL mask (keep classes 4 and 5) and align it on the red band grid.
    # BUGFIX: a second identical xr.where() call on the already-binarized mask was
    # removed — reapplying it would zero the whole mask (no 4/5 values remain).
    mask = xr.where((mask == 4) | (mask == 5), 1, 0).interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')

    # Set time coordinate
    red['time'] = pd.to_datetime(dates)
    nir['time'] = pd.to_datetime(dates)
    mask['time'] = pd.to_datetime(dates)

    # Create ndvi dataset and calculate ndvi
    ndvi = red
    ndvi = ndvi.drop('red')
    # acorvi_corr shifts the red band to adjust ndvi values; masked pixels become 0
    ndvi['NDVI'] = (((nir.nir - red.red - acorvi_corr)/(nir.nir + red.red + acorvi_corr))*mask.mask)

    # Clamp ndvi to [0, 1] then scale to the uint8 range
    ndvi['NDVI'] = xr.where(ndvi.NDVI < 0, 0, ndvi.NDVI)
    ndvi['NDVI'] = xr.where(ndvi.NDVI > 1, 1, ndvi.NDVI)
    ndvi['NDVI'] = (ndvi.NDVI*255)

    # Write attributes
    ndvi['NDVI'].attrs['units'] = 'None'
    ndvi['NDVI'].attrs['standard_name'] = 'NDVI'
    ndvi['NDVI'].attrs['description'] = 'Normalized difference Vegetation Index (of the near infrared and red band). A value of one is a high vegetation presence.'
    ndvi['NDVI'].attrs['scale factor'] = '255'

    # Create save path
    ndvi_cube_path = save_dir + os.sep + 'NDVI_precube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'

    # Save NDVI cube to netcdf, stored as unsigned 8-bit with 0 as fill value
    ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": (4, 1024, 1024)}})  #, 'zlib': True, "complevel": 4}})
    ndvi.close()

    return ndvi_cube_path
def interpolate_ndvi(ndvi_path: str, save_dir: str, config_file: str, chunk_size: dict = {'x': 512, 'y': 512, 'time': -1}) -> str:
    """
    Interpolate the ndvi cube to a daily frequency between the
    desired dates defined in the ``json`` config file.

    Arguments
    =========
    1. ndvi_path: ``str``
        path to ndvi pre-cube
    2. save_dir: ``str``
        path to save interpolated ndvi cube
    3. config_file: ``str``
        path to ``json`` config file
    4. chunk_size: ``dict`` ``default = {'x': 512, 'y': 512, 'time': -1}``
        chunk size to use by dask for calculation,
        ``'time' = -1`` means the chunk has the whole
        time dimension in it. The Dataset can't be
        divided along the time axis for interpolation.

    Returns
    =======
    1. ndvi_cube_path: ``str``
        path to the saved interpolated ndvi cube
    """

    # Open config_file
    config_params = config(config_file)

    # Open NDVI pre-cube
    ndvi = xr.open_dataset(ndvi_path, chunks = chunk_size)

    # Get Spatial reference
    spatial_ref = ndvi.spatial_ref.load()

    # Sort images by time
    ndvi = ndvi.sortby('time')

    # Interpolates on a daily frequency
    daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')

    # Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
    ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index)
    dates = pd.to_datetime(ndvi.time.values)

    # Interpolate the dataset along the time dimension to fill nan values
    ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').round(decimals = 0)

    # Clamp values to the uint8 range.
    # BUGFIX: the clamped result is assigned back into the 'NDVI' variable instead of
    # rebinding the whole dataset — the original `ndvi = xr.where(ndvi < 0, 0, ndvi.NDVI)`
    # replaced the Dataset, breaking the later spatial_ref assignment and the
    # to_netcdf encoding keyed on the "NDVI" variable.
    ndvi['NDVI'] = xr.where(ndvi.NDVI < 0, 0, ndvi.NDVI)
    # Set values above 255 (ndvi > 1) as 255 (ndvi = 1)
    ndvi['NDVI'] = xr.where(ndvi.NDVI > 255, 255, ndvi.NDVI)

    # Rewrite spatial reference
    ndvi['spatial_ref'] = spatial_ref

    # Create save path
    ndvi_cube_path = save_dir + os.sep + 'NDVI_cube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'

    # Save NDVI cube to netcdf
    ndvi.to_netcdf(ndvi_cube_path, encoding = {"NDVI": {"dtype": "u1", "_FillValue": 0, "chunksizes": (4, 1024, 1024)}})  #, 'zlib': True, "complevel": 4}})
    ndvi.close()

    return ndvi_cube_path