diff --git a/sen2chain/download_eodag.py b/sen2chain/download_eodag.py index 8ca4375f5ac0b21b5868e036abc6974fbea144a2..f3286d12b3861ca8188d51b2d321600928535fa5 100644 --- a/sen2chain/download_eodag.py +++ b/sen2chain/download_eodag.py @@ -11,6 +11,7 @@ import shapefile import os from pathlib import Path from eodag import EODataAccessGateway +from eodag import setup_logging from .config import SHARED_DATA, Config @@ -40,11 +41,12 @@ class S2cEodag: with open(eodag_custom_location_cfg, "w") as f_yml: f_yml.write(locations_yaml_content.strip()) - + logging.disable(level=logging.WARNING) self.dag = EODataAccessGateway(locations_conf_path = eodag_custom_location_cfg) + logging.disable(logging.NOTSET) + self.dag.set_preferred_provider("peps") - - logger.info(self.dag.available_providers("S2_MSI_L1C")) + # logger.info(self.dag.available_providers("S2_MSI_L1C")) targeted_tile = [ sr @@ -71,27 +73,31 @@ class S2cEodag: default_search_criteria = dict( productType = self.productType, start = start, - end = end + end = end, + # items_per_page=500, ) + logging.disable(level=logging.WARNING) self.products = self.dag.search_all( locations = dict(s2_tile_centroid = self.name), **default_search_criteria ) + logging.disable(logging.NOTSET) + fitered = self.products + for p in fitered: + if self.name not in p.properties["title"]: + self.products.remove(p) + logger.info("removing wrong tiles from search items: {}".format(p.properties["title"])) + for p in self.products: if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): p.location = "file://" + str(outputs_prefix / (p.properties["title"] + ".SAFE")) - logger.info("Product {} - {} - locally present".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("{} - remote {} - local PRESENT".format(p.properties["title"], p.properties["storageStatus"])) else: - logger.info("Product {} - {} - locally absent".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("{} - remote {} - local ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("Search returned {} products".format(len(self.products))) ####### rajouter ici "- dont xx ABSENT - dont xx ONLINE / xx STAGING" - # ~ logger.info("{} products were found given the above search criteria".format(len(self.products))) - # ~ logger.info(self.products) - - # ~ logger.info("{} products were found given the above search criteria".format(len(self.products))) - # ~ logger.info("{} products were found given the above search criteria, online".format(len(self.products.filter_online()))) - def download( self, product_id, @@ -109,18 +115,45 @@ class S2cEodag: ########## faudrait aussi mettre une condition sur le online ########## et si offline supprimer les retry # ~ logger.info([p.properties["storageStatus"] for p in self.products]) - downloaded_path = self.dag.download( - product_id, - outputs_prefix = outputs_prefix, - extract = extract, - delete_archive = delete_archive, - ) + + setup_logging(verbose = 2) + + if product_id.properties["storageStatus"] == "ONLINE": + try: + # logging.disable(level=logging.WARNING) + downloaded_path = self.dag.download( + product_id, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + wait=1, + timeout=0, + ) + # logging.disable(logging.NOTSET) + + if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: + upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name + Path(downloaded_path).replace(upper_location) + product_id.location = "file://" + str(upper_location) + Path(downloaded_path).parent.rmdir() + logger.info("Moving up SAVE file") + except: + logger.info("Error: ONLINE product but cannot be downloaded, retry later") - if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: - upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name - Path(downloaded_path).replace(upper_location) - product_id.location = "file://" + str(upper_location) - Path(downloaded_path).parent.rmdir() - logger.info("Moving up SAVE file") + elif product_id.properties["storageStatus"] == "OFFLINE": + try: + setup_logging(verbose = 0) + downloaded_path = self.dag.download( + product_id, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + wait=1, + timeout=0, + ) + except: + logger.info("OFFLINE product, ordered, retry later") + elif product_id.properties["storageStatus"] == "STAGING": + logger.info("STAGING product, retry later") diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index 12f286c34702874867f76d50c616e8346875e266..c0e7552c8a8ef383e472cebb53271adca627fd41 100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -17,6 +17,10 @@ from pathlib import Path from collections import namedtuple from datetime import datetime from pprint import pformat +# import multiprocessing +from queue import Queue +from threading import Thread +# import threading # type annotations from typing import List, Dict, Iterable @@ -36,6 +40,7 @@ from .multi_processing import ( cld_version_probability_iterations_reprocessing_multiprocessing, idx_multiprocessing, ) +from .download_eodag import S2cEodag logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -715,6 +720,57 @@ class Tile: "cloud_cover": self._products["l2a"][prod].cloud_cover, } return prodlist + + def get_l1c( + self, + download: bool = True, + dl_mode: str = "multit", + start: str = "2015-01-01", + end: str = "9999-12-31", + ): + dag = S2cEodag(self.name) + dag.search(start = start, end = end) + + ##### to do + ## choix entre sequential / multithreading / multiprocessing default multithreading + + if download: + + ## sequential + if dl_mode == "seq": + for p in dag.products: + dag.download(p) + + ## multithreading + elif dl_mode == "multit": + NUM_THREADS = 8 + q = Queue() + def do_stuff(q, dag): + while True: + dag.download(q.get()) + q.task_done() + for p in dag.products: + q.put(p) + for t in range(NUM_THREADS): + worker = Thread(target=do_stuff, args = (q, dag)) + worker.daemon = True + worker.start() + q.join() + + ## multiprocessing + elif dl_mode == "multip": + nb_proc = 8 + nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) + pool = multiprocessing.Pool(nb_proc) + results = [pool.map(dag.download, dag.products)] + pool.close() + pool.join() + + + # def get_l1c_new(): + # toto = 12 + + def compute_l2a( self,