# download_S2.py
# -*- coding: UTF-8 -*-
# Python
"""
03-10-2022 modified 04-07-2023
@author: jeremy auclair

Download S2 data pre-modspa
"""

import os  # for path exploration
import shutil  # for file management
from eodag import setup_logging  # module that downloads S2 data
from eodag import EODataAccessGateway  # module that downloads S2 data
import geopandas as gpd  # to read shapefile
from typing import List  # to declare variables
import csv  # for loading and saving path results in csv format
import zipfile as zp  # to open zip archives
from tqdm import tqdm  # to print progress bars during code execution
from fnmatch import fnmatch  # for character string comparison


def download_S2_data(start_date: str, end_date: str, preferred_provider: str, save_path: str, shapefile: str = None, cloud_cover_limit: int = 80) -> List[str]:
    """
    Search and download Sentinel-2 products with eodag for a given provider
    (copernicus or theia) over a time window, covering the full envelope of the
    given shapefile (this can span several Sentinel-2 tiles). Products are
    downloaded as archives (not extracted) into the path configured in the eodag
    config file. The list of downloaded paths is returned and also written to a
    `csv` file.

    ## Arguments
    1. start_date: `str`
        beginning of the time window to download (format: `yyyy-mm-dd`)
    2. end_date: `str`
        end of the time window to download (format: `yyyy-mm-dd`)
    3. preferred_provider: `str`
        chosen source of the Sentinel-2 data (`copernicus` or `theia`)
    4. save_path: `str`
        path where a csv file containing the product paths will be saved
    5. shapefile: `str`
        path to the shapefile (`.shp`) for which the data is downloaded
    6. cloud_cover_limit: `int` `default = 80`
        maximum percentage to pass the filter before download (between 0 and 100)

    ## Returns
    1. product_paths: `list[str]`
        a list of the paths to the downloaded data, or `None` when the search
        fails or matches nothing
    """

    setup_logging(2)  # 3 for even more information
    dag = EODataAccessGateway()

    # Read the shapefile and force WGS84 so the bounding box is in lon/lat
    footprint = gpd.read_file(shapefile).to_crs(epsg = '4326')
    bbox = footprint.geometry.total_bounds

    # Pick product type and provider; anything other than 'theia' falls back to scihub
    if preferred_provider == 'theia':
        dag.set_preferred_provider('theia')
        product_type = 'S2_MSI_L2A_MAJA'
    else:
        dag.set_preferred_provider('scihub')
        product_type = 'S2_MSI_L2A'

    # A TypeError here means the search criteria themselves are malformed
    try:
        all_products = dag.search_all(productType = product_type,
                                      start = start_date,
                                      end = end_date,
                                      geom = list(bbox))
    except TypeError:
        print('Something went wrong during the product search, check your inputs')
        return None

    # Empty result set: nothing matched the criteria
    if len(all_products) == 0:
        print('No products matching your search criteria were found')
        return None

    # Keep only products strictly below the cloud cover threshold, then download
    products_to_download = all_products.filter_property(cloudCover = cloud_cover_limit, operator = 'lt')
    product_paths = dag.download_all(products_to_download, extract = False)  # No archive extraction
    product_paths.sort()

    # Persist the sorted path list as a one-column csv for later use
    with open(save_path, 'w', newline = '') as csv_file:
        csv.writer(csv_file).writerows([path] for path in product_paths)

    return product_paths


def extract_zip_archives(download_path: str, list_paths: List[str], bands_to_extract: List[str], save_path: str, remove_archive: bool = False) -> List[str]:
    """
    Extract specific bands from a list of zip archives.

    Each archive is extracted into a directory (under `download_path`) named
    after the archive; matched band files are then moved to the root of that
    directory and the emptied intermediate directories are removed.

    ## Arguments
    1. download_path: `str`
        path in which the archives will be extracted (usually where the archives are located)
    2. list_paths: `List[str]`
        list of paths to the zip archives
    3. bands_to_extract: `List[str]`
        list of fnmatch patterns used to match specific bands. For example if you are
        looking for bands B3 and B4 in a given archive,
        `bands_to_extract = ['*_B3.TIF', '*_B4.TIF']`. This depends on the product
        architecture.
    4. save_path: `str`
        path where a csv file containing the product paths will be saved
    5. remove_archive: `bool` `default = False`
        boolean to choose whether to remove the archive or not

    ## Returns
    1. product_list: `List[str]`
        list of the paths to the extracted products
    """

    # Final product list
    product_list = []

    progress_bar = tqdm(total = len(list_paths))

    for file_path in list_paths:

        # Change progress bar to print current file
        progress_bar.set_description_str(desc = '\rExtracting ' + os.path.basename(file_path) + '\ntotal progress')

        # Directory to extract into: archive name without its extension
        # (assumes a 4-character extension such as '.zip')
        extract_path = os.path.join(download_path, os.path.basename(file_path)[:-4])

        # Extract only the members matching one of the requested band patterns
        with zp.ZipFile(file_path, mode = 'r') as archive:
            for member in archive.namelist():
                if any(fnmatch(member, band) for band in bands_to_extract):

                    # Extract file
                    archive.extract(member, path = extract_path)

                    # Move extracted file to the root of the extraction directory
                    member_name = os.path.basename(member)
                    destination = os.path.join(extract_path, member_name)
                    shutil.move(os.path.join(extract_path, member), destination)
                    product_list.append(destination)

        # Remove the now-empty directory trees left behind by the archive layout.
        # Iterating all subdirectories (instead of indexing [0]) avoids an
        # IndexError when no subdirectory exists and cleans up every leftover.
        for entry in os.scandir(extract_path):
            if entry.is_dir():
                shutil.rmtree(entry.path)

        if remove_archive:
            # Remove zip file
            os.remove(file_path)
        progress_bar.update(1)
    progress_bar.close()

    # Save list of paths as a csv file for later use
    with open(save_path, 'w', newline = '') as f:
        # using csv.writer method from CSV package
        write = csv.writer(f)

        for product in product_list:
            write.writerow([product])

    return product_list