# -*- coding: UTF-8 -*-
# Python
"""
03-10-2022 modified 04-07-2023
@author: jeremy auclair
Download Sentinel-2 (S2) data prior to running modspa
"""
import os # for path exploration
import shutil # for file management
from eodag import setup_logging # to configure eodag logging verbosity
from eodag import EODataAccessGateway # eodag gateway used to search and download S2 data
import geopandas as gpd # to read shapefile
from typing import List, Union # to declare variables
from datetime import datetime # manage dates
from dateutil.relativedelta import relativedelta # date math
import csv # for loading and saving path results in csv format
import zipfile as zp # to open zip archives
from tqdm import tqdm # to print progress bars during code execution
from fnmatch import fnmatch # for character string comparison
from modspa_pixel.preprocessing.input_toolbox import read_product_info
def download_S2_data(start_date: str, end_date: str, preferred_provider: str, save_path: str, shapefile: str, mode: str = 'pixel', cloud_cover_limit: int = 80) -> List[str]:
    """
    download_S2_data uses the eodag module to search for all products of a given provider
    (copernicus or theia) during a specific time window that cover the whole shapefile
    envelope (several Sentinel-2 tiles might be needed; only one can be chosen in
    pixel mode). It then downloads the data into the download path set in the
    config file. Paths to the downloaded data are returned and saved as a ``csv`` file.
    An extra month of data is downloaded on each side of the time window for a better
    interpolation; it is then discarded so that the final NDVI cube covers the dates
    defined in the config file.

    Arguments
    =========

    1. start_date: ``str``
        beginning of the time window to download (format: ``YYYY-MM-DD``)
    2. end_date: ``str``
        end of the time window to download (format: ``YYYY-MM-DD``)
    3. preferred_provider: ``str``
        chosen source of the Sentinel-2 data (``copernicus`` or ``theia``)
    4. save_path: ``str``
        path where a csv file containing the product paths will be saved
    5. shapefile: ``str``
        path to the shapefile (``.shp``) for which the data is downloaded
    6. mode: ``str`` ``default = 'pixel'``
        run download code in 'pixel' or 'parcel' mode
    7. cloud_cover_limit: ``int`` ``default = 80``
        maximum cloud cover percentage allowed to pass the filter before download (between 0 and 100)

    Returns
    =======

    1. product_paths: ``list[str]``
        a list of the paths to the downloaded data
    """

    setup_logging(2) # 3 for even more information

    dag = EODataAccessGateway()

    # Open shapefile containing geometry
    geopandas_shape = gpd.read_file(shapefile)
    geopandas_shape = geopandas_shape.to_crs(epsg = 4326) # Force WGS84 projection
    bounds = geopandas_shape.geometry.total_bounds # In WGS84 projection
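    # Note: total_bounds is ordered [min_lon, min_lat, max_lon, max_lat] in WGS84, which matches
    # the bounding-box order eodag accepts for the 'geom' search parameter below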

    # Select product type based on preferred provider
    if preferred_provider == 'theia':
        product_type = 'S2_MSI_L2A_MAJA'
        dag.set_preferred_provider('theia')
    else:
        product_type = 'S2_MSI_L2A'
        dag.set_preferred_provider('scihub')

    # Extend the start and end dates by one month on each side to better cover the chosen period
    new_start_date = (datetime.strptime(start_date, '%Y-%m-%d') - relativedelta(months=1)).strftime('%Y-%m-%d')
    new_end_date = (datetime.strptime(end_date, '%Y-%m-%d') + relativedelta(months=1)).strftime('%Y-%m-%d')
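    # e.g. with start_date = '2021-04-01' and end_date = '2021-09-30' (illustrative values),
    # the search window becomes 2021-03-01 to 2021-10-30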

    # Create a search criteria to feed into the eodag search_all method
    search_criteria = {
        'productType': product_type,
        'start': new_start_date,
        'end': new_end_date,
        'geom': list(bounds)
    }

    # Try to search all products corresponding to the search criteria. If a type error occurs it
    # means there is an error in the search criteria parameters
    try:
        all_products = dag.search_all(**search_criteria)
    except TypeError:
        print('Something went wrong during the product search, check your inputs')
        return None

    # If the search result is empty, there is no product matching the search criteria
    if len(all_products) == 0:
        print('No products matching your search criteria were found')
        return None

    # Filter out products that have more clouds than desired
    products_to_download = all_products.filter_property(cloudCover = cloud_cover_limit, operator = 'lt')

    # Choose only one tile if pixel mode
    if mode == 'pixel':
        tiles = []
        for product in products_to_download:
            _, tile, _, _ = read_product_info(product.properties['title'])
            if tile not in tiles:
                tiles.append(tile)

        if len(tiles) > 1:
            tile_index = int(input(f'\nMultiple tiles cover your shapefile ({tiles}), which one do you want to choose? Type in the index from 0 to {len(tiles) - 1}: '))
            chosen_tile = tiles[tile_index]
            print(f'\nChosen tile: {chosen_tile}\n')

            # Iterate over a copy of the list so that products can safely be removed from the original
            for product in list(products_to_download):
                _, tile, _, _ = read_product_info(product.properties['title'])
                if tile != chosen_tile:
                    products_to_download.remove(product)

    # Download filtered products
    product_paths = dag.download_all(products_to_download, extract = False) # No archive extraction
    product_paths.sort()

    # Save list of paths as a csv file for later use
    with open(save_path, 'w', newline = '') as f:
        # using csv.writer method from CSV package
        write = csv.writer(f)
        for product in product_paths:
            write.writerow([product])

    return product_paths
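

# Example usage (illustrative only; the dates, paths and shapefile are placeholder values and
# assume eodag credentials and the download directory are already configured for the provider):
#
#     paths = download_S2_data('2021-04-01', '2021-09-30', 'copernicus',
#                              '/tmp/s2_products.csv', '/tmp/study_area.shp',
#                              mode = 'pixel', cloud_cover_limit = 80)
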
def extract_zip_archives(download_path: str, list_paths: Union[List[str], str], preferred_provider: str, save_path: str, remove_archive: bool = False) -> List[str]:
    """
    Extract specific bands from a list of zip archives.

    Arguments
    =========

    1. download_path: ``str``
        path in which the archives will be extracted (usually where the archives are located)
    2. list_paths: ``Union[List[str], str]``
        list of paths to the zip archives, or path to a csv file containing those paths
    3. preferred_provider: ``str``
        chosen source of the Sentinel-2 data (``copernicus`` or ``theia``); it determines
        which band files (red, near-infrared and mask) are extracted from the archives
    4. save_path: ``str``
        path where a csv file containing the product paths will be saved
    5. remove_archive: ``bool`` ``default = False``
        boolean to choose whether to remove the archive or not

    Returns
    =======

    1. product_list: ``List[str]``
        list of the paths to the extracted products
    """

    # Load csv file if input is a path
    if isinstance(list_paths, str):
        with open(list_paths, 'r') as file:
            list_paths = []
            csvreader = csv.reader(file, delimiter='\n')
            for row in csvreader:
                list_paths.append(row[0])

    # Choose the band files to extract based on the provider
    if preferred_provider == 'copernicus':
        bands_to_extract = ['*_B04_10m.jp2', '*_B08_10m.jp2', '*_SCL_20m.jp2']
    else:
        bands_to_extract = ['*_FRE_B4.tif', '*_FRE_B8.tif', '*_MG2_R1.tif']
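    # Note: these glob patterns assume the usual Copernicus (.SAFE) and Theia/MAJA band file
    # naming conventions; adjust them if your product structure differs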

    # Final product list
    product_list = []

    # Create progress bar
    print('')
    progress_bar = tqdm(total = len(list_paths))

    for file_path in list_paths:

        # Change progress bar to print current file
        progress_bar.set_description_str(desc = f'Extracting {os.path.basename(file_path)}, total progress')

        # Get path in which to extract the archive
        extract_path = download_path + os.sep + os.path.basename(file_path)[:-4] # archive name without the '.zip' extension

        # Extract desired bands from the zip file
        with zp.ZipFile(file_path, mode = 'r') as myzip:
            file_list = myzip.namelist()
            for f in file_list:
                for band in bands_to_extract:
                    if fnmatch(f, band):

                        # Check if already extracted
                        f_name = os.path.basename(f)
                        if not os.path.exists(extract_path + os.sep + f_name):

                            # Extract file
                            myzip.extract(f, path = extract_path)

                            # Move extracted file to the root of the directory
                            shutil.move(extract_path + os.sep + f, extract_path + os.sep + f_name)

        product_list.append(extract_path)

        # Remove unnecessary empty directories left over after moving the band files
        try:
            subfolder = [f.path for f in os.scandir(extract_path) if f.is_dir()][0]
            shutil.rmtree(subfolder)
        except (IndexError, OSError):
            pass

        if remove_archive:
            # Remove zip file
            os.remove(file_path)

        progress_bar.update(1)

    # Close progress bar
    progress_bar.set_description_str(desc = 'Done!')
    progress_bar.close()

    # Save list of paths as a csv file for later use
    with open(save_path, 'w', newline = '') as f:
        # using csv.writer method from CSV package
        write = csv.writer(f)
        for product in product_list:
            write.writerow([product])

    return product_list
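

# Example usage (illustrative only; the csv produced by download_S2_data is reused here and all
# paths are placeholder values):
#
#     extracted = extract_zip_archives('/tmp/s2_downloads', '/tmp/s2_products.csv',
#                                      'copernicus', '/tmp/s2_extracted.csv',
#                                      remove_archive = False)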