# -*- coding: UTF-8 -*-
# Python
"""
04-07-2023
@author: jeremy auclair

Calculate NDVI images with xarray.
"""
import os  # for path exploration
import csv  # open csv files
from fnmatch import fnmatch  # for character string comparison
from typing import List, Union  # to declare variables

import pandas as pd  # to build datetime indexes for the time coordinate
import xarray as xr  # to manage dataset
import rasterio as rio  # to open geotiff files
import geopandas as gpd  # to manage shapefile crs projections
from shapely.geometry import box  # to create boundary box

from config.config import config  # to import config file
from input.input_toolbox import product_str_to_datetime
def calculate_ndvi(extracted_paths: Union[List[str], str], save_dir: str, boundary_shapefile_path: str, resolution: int = 20, chunk_size: dict = {'x': 4096, 'y': 4096, 'time': 2}, acorvi_corr: int = 500) -> str:
    """
    Calculate ndvi images in a xarray dataset (a data cube) and save it.
    ndvi values are scaled and saved as `uint8` (0 to 255).

    /!\\ Current version for Copernicus Sentinel-2 images /!\\

    ## Arguments
    1. extracted_paths: `Union[List[str], str]`
        list of paths to extracted sentinel-2 products
        or path to `csv` file containing those paths
    2. save_dir: `str`
        directory in which to save the ndvi pre-cube
    3. boundary_shapefile_path: `str`
        shapefile path to save the geographical bounds
        of the ndvi tile, used later to download weather
        products
    4. resolution: `int` `default = 20`
        resolution in meters

        /!\\ only 10 and 20 meters currently supported /!\\
    5. chunk_size: `dict` `default = {'x': 4096, 'y': 4096, 'time': 2}`
        dictionnary containing the chunk size for
        the xarray dask calculation
    6. acorvi_corr: `int` `default = 500`
        acorvi correction parameter to add to the red band
        to adjust ndvi values

    ## Returns
    1. ndvi_cube_path: `str`
        path to save the ndvi pre-cube
    """

    # Check resolution for Sentinel-2
    if resolution not in (10, 20):
        print('Resolution should be equal to 10 or 20 meters for sentinel-2')
        return None

    # If a file name is provided instead of a list of paths, load the csv file that contains the list of paths
    if isinstance(extracted_paths, str):
        with open(extracted_paths, 'r') as file:
            extracted_paths = []
            csvreader = csv.reader(file, delimiter='\n')
            for row in csvreader:
                extracted_paths.append(row[0])

    # Sort by band: red, near-infrared and scene-classification (mask) products
    red_paths = []
    nir_paths = []
    mask_paths = []
    if resolution == 10:
        for product in extracted_paths:
            if fnmatch(product, '*_B04_10m*'):
                red_paths.append(product)
            elif fnmatch(product, '*_B08_10m*'):
                nir_paths.append(product)
            elif fnmatch(product, '*_SCL_20m*'):
                mask_paths.append(product)
    else:
        for product in extracted_paths:
            if fnmatch(product, '*_B04_20m*'):
                # fix: this append was missing, so no red band was ever
                # collected at 20 m resolution
                red_paths.append(product)
            elif fnmatch(product, '*_B8A_20m*'):
                nir_paths.append(product)
            elif fnmatch(product, '*_SCL_20m*'):
                mask_paths.append(product)

    # Create boundary shapefile from Sentinel-2 image for weather download
    ra = rio.open(red_paths[0])
    bounds = ra.bounds
    geom = box(*bounds)
    df = gpd.GeoDataFrame({"id": 1, "geometry": [geom]})
    df.crs = ra.crs
    df.geometry = df.geometry.to_crs('epsg:4326')
    df.to_file(boundary_shapefile_path)
    del df, ra, geom, bounds

    # Sort and get dates
    red_paths.sort()
    nir_paths.sort()
    mask_paths.sort()
    dates = [product_str_to_datetime(prod) for prod in red_paths]

    # Open all products of each band as a single dataset concatenated on time
    red = xr.open_mfdataset(red_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'red'})
    nir = xr.open_mfdataset(nir_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'nir'})
    mask = xr.open_mfdataset(mask_paths, combine = 'nested', concat_dim = 'time', chunks = chunk_size, parallel = True).squeeze(dim = ['band'], drop = True).rename({'band_data': 'mask'})

    # Keep only SCL classes 4 (vegetation) and 5 (not vegetated) as a binary
    # mask, and resample the 20 m mask onto the red band grid (needed when
    # resolution == 10).
    # fix: the threshold was applied a second time on the already-binary mask,
    # which zeroed it entirely (and with it every ndvi value)
    mask = xr.where((mask == 4) | (mask == 5), 1, 0).interp(x = red.coords['x'], y = red.coords['y'], method = 'nearest')

    # Set time coordinate
    red['time'] = pd.to_datetime(dates)
    nir['time'] = pd.to_datetime(dates)
    mask['time'] = pd.to_datetime(dates)

    # Create ndvi dataset and calculate ndvi (masked outside vegetation/soil)
    ndvi = red
    ndvi = ndvi.drop('red')
    ndvi['ndvi'] = (((nir.nir - red.red - acorvi_corr)/(nir.nir + red.red + acorvi_corr))*mask.mask)
    del red, nir, mask

    # Mask and scale ndvi: clamp to [0, 1] then stretch to the uint8 range
    ndvi['ndvi'] = xr.where(ndvi.ndvi < 0, 0, ndvi.ndvi)
    ndvi['ndvi'] = xr.where(ndvi.ndvi > 1, 1, ndvi.ndvi)
    ndvi['ndvi'] = (ndvi.ndvi*255)

    # Write attributes
    ndvi['ndvi'].attrs['units'] = 'None'
    ndvi['ndvi'].attrs['standard_name'] = 'NDVI'
    ndvi['ndvi'].attrs['description'] = 'Normalized difference Vegetation Index (of the near infrared and red band). A value of one is a high vegetation presence.'
    ndvi['ndvi'].attrs['scale factor'] = '255'

    # Create save path
    ndvi_cube_path = save_dir + os.sep + 'NDVI_precube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'

    # Save NDVI cube to netcdf
    ndvi.to_netcdf(ndvi_cube_path, encoding = {"ndvi": {"dtype": "u1", "_FillValue": 0, "chunksizes": (4, 1024, 1024)}})
    ndvi.close()

    return ndvi_cube_path
def interpolate_ndvi(ndvi_path: str, save_dir: str, config_file: str, chunk_size: dict = {'x': 512, 'y': 512, 'time': -1}) -> None:
    """
    Interpolate the ndvi cube to a daily frequency between the
    desired dates defined in the `json` config file.

    ## Arguments
    ndvi_path: `str`
        path to ndvi pre-cube
    save_dir: `str`
        path to save interpolated ndvi cube
    config_file: `str`
        path to `json` config file
    chunk_size: `dict` `default = {'x': 512, 'y': 512, 'time': -1}`
        chunk size to use by dask for calculation,
        `'time' = -1` means the chunk has the whole
        time dimension in it. The Dataset can't be
        divided along the time axis for interpolation.

    ## Returns
    `None`
    """

    # Open config_file
    config_params = config(config_file)

    # Open NDVI pre-cube
    ndvi = xr.open_dataset(ndvi_path, chunks = chunk_size)

    # Sort images by time
    ndvi = ndvi.sortby('time')

    # Keep the pre-cube dates for the output file name
    dates = pd.to_datetime(ndvi.time.values)

    # Interpolates on a daily frequency
    daily_index = pd.date_range(start = config_params.start_date, end = config_params.end_date, freq = 'D')

    # Resample the dataset to a daily frequency and reindex with the new DateTimeIndex
    ndvi = ndvi.resample(time = '1D').asfreq().reindex(time = daily_index)

    # Interpolate the dataset along the time dimension to fill nan values
    ndvi = ndvi.interpolate_na(dim = 'time', method = 'linear', fill_value = 'extrapolate').round(decimals = 0)

    # Clamp extrapolated values to the valid uint8 range.
    # fix: the previous code rebound `ndvi` to `xr.where(ndvi < 0, 0, ndvi.ndvi)`,
    # clipping a whole-Dataset condition against a single variable; operate on
    # the 'ndvi' variable in place so the Dataset keeps the "ndvi" key that the
    # netcdf encoding below relies on
    ndvi['ndvi'] = xr.where(ndvi.ndvi < 0, 0, ndvi.ndvi)
    # Set values above 255 (ndvi > 1) as 255 (ndvi = 1)
    ndvi['ndvi'] = xr.where(ndvi.ndvi > 255, 255, ndvi.ndvi)

    # Create save path
    ndvi_cube_path = save_dir + os.sep + 'NDVI_cube_' + dates[0].strftime('%d-%m-%Y') + '_' + dates[-1].strftime('%d-%m-%Y') + '.nc'

    # Save NDVI cube to netcdf and release the underlying file handle
    ndvi.to_netcdf(ndvi_cube_path, encoding = {"ndvi": {"dtype": "u1", "_FillValue": 0, "chunksizes": (4, 1024, 1024)}})
    ndvi.close()

    return None