# -*- coding: UTF-8 -*-
# Python
"""
03-10-2022 modified 04-07-2023
@author: jeremy auclair

Download S2 data pre-modspa
"""

import os  # for path exploration
import shutil  # for file management
from eodag import setup_logging  # module that downloads S2 data
from eodag import EODataAccessGateway  # module that downloads S2 data
import geopandas as gpd  # to read shapefile
from typing import List, Union  # to declare variables
from datetime import datetime  # manage dates
from dateutil.relativedelta import relativedelta  # date math
import csv  # for loading and saving path results in csv format
import zipfile as zp  # to open zip archives
from tqdm import tqdm  # to print progress bars during code execution
from fnmatch import fnmatch  # for character string comparison
from modspa_pixel.preprocessing.input_toolbox import read_product_info


def download_S2_data(start_date: str, end_date: str, preferred_provider: str, save_path: str, shapefile: str, mode: str = 'pixel', cloud_cover_limit: int = 80) -> List[str]:
    """
    download_S2_data uses the eodag module to look for all products of a given provider
    (copernicus or theia) during a specific time window and covering the whole shapefile
    enveloppe (several Sentinel-2 tiles might be needed, only one can be chosen for the
    pixel mode). It then downloads that data into the download path parametered in the
    config file. Paths to the downloaded data are returned and saved as a ``csv`` file.

    An extra month of data is downloaded on each side of the time window for a better
    interpolation, it is then discarded and the final NDVI cube has the dates defined
    in the config file.

    Arguments
    =========

    1. start_date: ``str``
        beginning of the time window to download (format: ``YYYY-MM-DD``)
    2. end_date: ``str``
        end of the time window to download (format: ``YYYY-MM-DD``)
    3. preferred_provider: ``str``
        chosen source of the Sentinel-2 data (``copernicus`` or ``theia``)
    4. save_path: ``str``
        path where a csv file containing the product paths will be saved
    5. shapefile: ``str``
        path to the shapefile (``.shp``) for which the data is downloaded
    6. mode: ``str`` ``default = 'pixel'``
        run download code in 'pixel' or 'parcel' mode
    7. cloud_cover_limit: ``int`` ``default = 80``
        maximum cloud cover percentage to pass the filter before download (between 0 and 100)

    Returns
    =======

    1. product_paths: ``list[str]``
        a list of the paths to the downloaded data (``None`` if the search failed or
        found no product)
    """

    setup_logging(2)  # 3 for even more information

    dag = EODataAccessGateway()

    # Open shapefile containing geometry
    geopandas_shape = gpd.read_file(shapefile)
    geopandas_shape = geopandas_shape.to_crs(epsg='4326')  # Force WGS84 projection
    bounds = geopandas_shape.geometry.total_bounds  # In WGS84 projection

    # Select product type based on preferred provider
    if preferred_provider == 'theia':
        product_type = 'S2_MSI_L2A_MAJA'
        dag.set_preferred_provider('theia')
    else:
        product_type = 'S2_MSI_L2A'
        dag.set_preferred_provider('scihub')

    # Widen the period by one month on each side so interpolation at the edges of the
    # requested window has data to work with
    new_start_date = (datetime.strptime(start_date, '%Y-%m-%d') - relativedelta(months=1)).strftime('%Y-%m-%d')
    new_end_date = (datetime.strptime(end_date, '%Y-%m-%d') + relativedelta(months=1)).strftime('%Y-%m-%d')

    # Create a search criteria to feed into the eodag search_all method
    search_criteria = {
        'productType': product_type,
        'start': new_start_date,
        'end': new_end_date,
        'geom': list(bounds)
    }

    # Try to search all products corresponding to the search criteria. If a type error
    # occurs it means there is an error in the search criteria parameters
    try:
        all_products = dag.search_all(**search_criteria)
    except TypeError:
        print('Something went wrong during the product search, check your inputs')
        return None

    # If the search returns an empty result, there is no product matching the
    # search criteria
    if len(all_products) == 0:
        print('No products matching your search criteria were found')
        return None

    # Filter products that have more clouds than desired
    products_to_download = all_products.filter_property(cloudCover=cloud_cover_limit, operator='lt')

    # Choose only one tile if pixel mode
    if mode == 'pixel':
        tiles = []
        for product in products_to_download:
            _, tile, _, _ = read_product_info(product.properties['title'])
            if tile not in tiles:
                tiles.append(tile)

        if len(tiles) > 1:
            tile_index = int(input(f'\nMultiple tiles cover your shapefile ({tiles}), which one do you want to choose ? Type in the index from 0 to {len(tiles) - 1}'))
            chosen_tile = tiles[tile_index]
            print(f'\nChosen tile: {chosen_tile}\n')

            # Iterate over a *copy* of the collection: removing items from the
            # collection being iterated skips elements and would leave some
            # products of the non-chosen tiles in the download list
            for product in list(products_to_download):
                _, tile, _, _ = read_product_info(product.properties['title'])
                if not tile == chosen_tile:
                    products_to_download.remove(product)

    # Download filtered products
    product_paths = dag.download_all(products_to_download, extract=False)  # No archive extraction
    product_paths.sort()

    # Save list of paths as a csv file for later use (one path per row)
    with open(save_path, 'w', newline='') as f:

        # using csv.writer method from CSV package
        write = csv.writer(f)

        for product in product_paths:
            write.writerow([product])

    return product_paths


def extract_zip_archives(download_path: str, list_paths: Union[List[str], str], preferred_provider: str, save_path: str, remove_archive: bool = False) -> List[str]:
    """
    Extract the bands needed by modspa (red, near infrared and cloud/scene mask) from
    a list of Sentinel-2 zip archives.

    Arguments
    =========

    1. download_path: ``str``
        path in which the archives will be extracted (usually where the archives are located)
    2. list_paths: ``List[str]`` or ``str``
        list of paths to the zip archives, or path to a csv file containing those paths
        (one path per row, as written by ``download_S2_data``)
    3. preferred_provider: ``str``
        provider of the archives (``copernicus`` or ``theia``); selects the band file
        patterns to extract, which depend on the product architecture
    4. save_path: ``str``
        path where a csv file containing the extracted product paths will be saved
    5. remove_archive: ``bool`` ``default = False``
        boolean to choose whether to remove the archive or not

    Returns
    =======

    1. product_list: ``List[str]``
        list of the paths to the extracted products
    """

    # Load csv file if input is a path. Read with the same (default) csv dialect used
    # by the writer in download_S2_data, so quoted rows (e.g. paths containing commas)
    # are parsed back correctly
    if isinstance(list_paths, str):
        with open(list_paths, 'r', newline='') as file:
            list_paths = [row[0] for row in csv.reader(file) if row]

    # Band name patterns differ between providers
    if preferred_provider == 'copernicus':
        bands_to_extract = ['*_B04_10m.jp2', '*_B08_10m.jp2', '*_SCL_20m.jp2']
    else:
        bands_to_extract = ['*_FRE_B4.tif', '*_FRE_B8.tif', '*_MG2_R1.tif']

    # Final product list
    product_list = []

    # Create progress bar
    print('')
    progress_bar = tqdm(total=len(list_paths))

    for file_path in list_paths:

        # Change progress bar to print current file
        progress_bar.set_description_str(desc=f'Extracting {os.path.basename(file_path)}, total progress')

        # Get path in which to extract the archive ('.zip' extension stripped)
        extract_path = download_path + os.sep + os.path.basename(file_path)[:-4]

        # Extract desired bands from zip file
        with zp.ZipFile(file_path, mode='r') as myzip:

            file_list = myzip.namelist()

            for f in file_list:
                for band in bands_to_extract:
                    if fnmatch(f, band):

                        # Check if already extracted
                        f_name = os.path.basename(f)
                        if not os.path.exists(extract_path + os.sep + f_name):

                            # Extract file
                            myzip.extract(f, path=extract_path)

                            # Move extracted file to the root of the directory
                            shutil.move(extract_path + os.sep + f, extract_path + os.sep + f_name)

        product_list.append(extract_path)

        # Remove unecessary empty directories left behind by the extraction.
        # Best effort only: no subfolder (IndexError) or a filesystem failure
        # (OSError) is ignored rather than aborting the whole extraction
        try:
            subfolder = [d.path for d in os.scandir(extract_path) if d.is_dir()][0]
            shutil.rmtree(subfolder)
        except (IndexError, OSError):
            pass

        if remove_archive:
            # Remove zip file
            os.remove(file_path)

        progress_bar.update(1)

    # Close progress bar
    progress_bar.set_description_str(desc='Done!')
    progress_bar.close()

    # Save list of paths as a csv file for later use
    with open(save_path, 'w', newline='') as f:

        # using csv.writer method from CSV package
        write = csv.writer(f)

        for product in product_list:
            write.writerow([product])

    return product_list