diff --git a/sen2chain/download_eodag.py b/sen2chain/download_eodag.py index 2203117758785fbb14a576fa86f18c9a152fbda3..3589c009ff0484bd49d39b4c3dc5afe42a5852f5 100644 --- a/sen2chain/download_eodag.py +++ b/sen2chain/download_eodag.py @@ -21,8 +21,10 @@ from threading import Thread import multiprocessing from simplejson import JSONDecodeError from urllib3.exceptions import ( - ConnectTimeoutError, MaxRetryError, - NewConnectionError, SSLError, + ConnectTimeoutError, + MaxRetryError, + NewConnectionError, + SSLError, ReadTimeoutError, ) from requests.exceptions import ConnectTimeout @@ -36,14 +38,11 @@ logger = logging.getLogger(__name__) ROOT = Path(os.path.realpath(__file__)).parent.parent + class S2cEodag: - def __init__( - self, - name: str = "40KCB", - provider: str = "peps" - ): - + def __init__(self, name: str = "40KCB", provider: str = "peps"): + self.name = name self.provider = provider self.products = None @@ -53,19 +52,19 @@ class S2cEodag: logging.disable(logging.NOTSET) self.dag.set_preferred_provider(self.provider) - + def search( self, productType: str = "L1C", start: str = "2015-01-01", end: str = "9999-12-31", - ref = "l1c", + ref="l1c", min_cloudcover: int = 0, - max_cloudcover: int = 100, + max_cloudcover: int = 100, min_pb: str = 0, max_pb: str = 9999, ): - ######### ici faudrait virer le self.productType, qui ne doit pas être global pour le download... + ######### ici faudrait virer le self.productType, qui ne doit pas être global pour le download... if productType == "L1C": self.productType = "S2_MSI_L1C" outputs_prefix = Path(Config().get("l1c_path")) / self.name @@ -74,117 +73,128 @@ class S2cEodag: outputs_prefix = Path(Config().get("l2a_path")) / self.name default_search_criteria = dict( - productType = self.productType, - start = start, - end = end, - tileIdentifier = self.name, + productType=self.productType, + start=start, + end=end, + tileIdentifier=self.name, + ) + logger.info( + "Searching online products {}, provider {}, ref {}".format( + self.name, self.provider, ref + ) ) - logger.info("Searching online products {}, provider {}, ref {}".format(self.name, self.provider, ref)) logger.info("dates: {} - {}".format(start, end)) - logger.info("cloudcover: {} - {}".format(str(min_cloudcover), str(max_cloudcover))) + logger.info( + "cloudcover: {} - {}".format(str(min_cloudcover), str(max_cloudcover)) + ) logger.info("processing_baseline: {} - {}".format(min_pb, max_pb)) logging.disable(level=logging.WARNING) try: - self.products = self.dag.search_all( - **default_search_criteria - ) + self.products = self.dag.search_all(**default_search_criteria) logging.disable(logging.NOTSET) except (ConnectTimeout, MaxRetryError, TimeoutError, ConnectTimeoutError): logging.disable(logging.NOTSET) - logger.info("Issue with this tile search for now, should try to increase the timeout for this provider or check your internet connexion") - self.products = [] - pass + logger.exception( + "Issue with this tile search for now, should try to increase the timeout for this provider or check your internet connexion" + ) + return except Exception: logging.disable(logging.NOTSET) - logger.info("An error occured check everything") - self.products = [] - pass - + logger.exception("An error occured check everything") + return + if self.provider == "peps": for p in self.products: - p.properties["processingBaseline"] = float(p.properties["s2TakeId"][-5:]) - - self.products = self.products.crunch( - FilterProperty( - dict( - processingBaseline = min_pb, - operator = "ge" + p.properties["processingBaseline"] = float( + p.properties["s2TakeId"][-5:] ) - ) - ).crunch( - FilterProperty( - dict( - processingBaseline = max_pb, - operator = "le" - ) - ) - ) - + + self.products = self.products.crunch( + FilterProperty(dict(processingBaseline=min_pb, operator="ge")) + ).crunch(FilterProperty(dict(processingBaseline=max_pb, operator="le"))) + filtered = self.products[:] # logger.info([eop.properties["title"] for eop in filtered]) - ordered_products = sorted(filtered, key = lambda d: d.properties["title"][11:], reverse = True) + ordered_products = sorted( + filtered, key=lambda d: d.properties["title"][11:], reverse=True + ) # logger.info([eop.properties["title"] for eop in ordered_products]) - + for p in ordered_products: if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): l1c_presence = "PRESENT" else: l1c_presence = "ABSENT" - if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): + if ( + Path(Config().get("l2a_path")) + / self.name + / ( + p.properties["title"] + .replace("L1C_", "L2A_") + .replace("__OPER__", "_USER_") + + ".SAFE" + ) + ).exists(): l2a_presence = "PRESENT" else: l2a_presence = "ABSENT" - + ##### remonter la condition l1c - + if self.name not in p.properties["title"]: self.products.remove(p) - logger.info("{} - l1c {} - filtering (wrong Tile)".format(p.properties["title"], l1c_presence)) + logger.info( + "{} - l1c {} - filtering (wrong Tile)".format( + p.properties["title"], l1c_presence + ) + ) else: try: prop = p.properties["storageStatus"] except Exception: if p.properties["storage"]["mode"] == "tier2": p.properties["storageStatus"] = "ONLINE" - if not(min_cloudcover <= float(p.properties["cloudCover"] or 0) <= max_cloudcover): + if not ( + min_cloudcover + <= float(p.properties["cloudCover"] or 0) + <= max_cloudcover + ): self.products.remove(p) logger.info( "{} - l1c {} - filtering (CC = {}%)".format( - p.properties["title"], - l1c_presence, - int(p.properties["cloudCover"]) + p.properties["title"], + l1c_presence, + int(p.properties["cloudCover"]), ) ) else: if not ref: logger.info( "{} - * No ref set - remote {}".format( - p.properties["title"], - p.properties["storageStatus"] + p.properties["title"], p.properties["storageStatus"] ) ) - + elif ref == "l1c": if l1c_presence == "PRESENT": logger.info( "{} - l1c PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) else: logger.info( "{} - * l1c ABSENT - remote {}".format( - p.properties["title"], - p.properties["storageStatus"] + p.properties["title"], p.properties["storageStatus"] ) ) - + elif ref == "l2a": if l2a_presence == "PRESENT": logger.info( "{} - l2a PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -192,15 +202,15 @@ class S2cEodag: if l1c_presence == "PRESENT": logger.info( "{} - l1c PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) else: logger.info( "{} - * l2a ABSENT - l1c ABSENT - remote {}".format( - p.properties["title"], - p.properties["storageStatus"] + p.properties["title"], + p.properties["storageStatus"], ) ) elif ref == "cloudmasks": @@ -208,7 +218,7 @@ class S2cEodag: if fp.cloudmasks: logger.info( "{} - at least one cloudmask PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -216,7 +226,7 @@ class S2cEodag: if l2a_presence == "PRESENT": logger.info( "{} - l2a PRESENT - cloudmasks ABSENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -224,15 +234,15 @@ class S2cEodag: if l1c_presence == "PRESENT": logger.info( "{} - l1c PRESENT - cloudmasks ABSENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) else: logger.info( "{} - * l1c ABSENT, l2a ABSENT, cloudmasks ABSENT - remote {}".format( - p.properties["title"], - p.properties["storageStatus"] + p.properties["title"], + p.properties["storageStatus"], ) ) elif ref == "indices": @@ -240,7 +250,7 @@ class S2cEodag: if fp.indices: logger.info( "{} - at least one indice PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -248,7 +258,7 @@ class S2cEodag: if l2a_presence == "PRESENT": logger.info( "{} - l2a PRESENT - indices ABSENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -256,17 +266,17 @@ class S2cEodag: if l1c_presence == "PRESENT": logger.info( "{} - l1c PRESENT - indices ABSENT - filtering".format( - p.properties["title"], - l1c_presence, - p.properties["storageStatus"] + p.properties["title"], + l1c_presence, + p.properties["storageStatus"], ) ) self.products.remove(p) else: logger.info( "{} - * l1c ABSENT, l2a ABSENT, indices ABSENT - remote {}".format( - p.properties["title"], - p.properties["storageStatus"] + p.properties["title"], + p.properties["storageStatus"], ) ) else: @@ -279,7 +289,7 @@ class S2cEodag: if all(all_conditions): logger.info( "{} - all ref PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -287,7 +297,7 @@ class S2cEodag: if l2a_presence == "PRESENT": logger.info( "{} - some ref ABSENT - l2a PRESENT - filtering".format( - p.properties["title"], + p.properties["title"], ) ) self.products.remove(p) @@ -295,100 +305,104 @@ class S2cEodag: if l1c_presence == "PRESENT": logger.info( "{} - some ref ABSENT - l1c PRESENT - filtering".format( - p.properties["title"], - l1c_presence, - p.properties["storageStatus"] + p.properties["title"], + l1c_presence, + p.properties["storageStatus"], ) ) self.products.remove(p) else: logger.info( "{} - some ref ABSENT - l2a ABSENT - l1c ABSENT - downloading - remote {}".format( - p.properties["title"], - p.properties["storageStatus"] + p.properties["title"], + p.properties["storageStatus"], ) ) # logger.info(p.properties) - # clouds = self.products[:] # for p in clouds: - # if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover): - # self.products.remove(p) - # logger.info("{} - wrong cloud cover ({}%) - filtering".format(p.properties["title"], int(p.properties["cloudCover"]))) + # if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover): + # self.products.remove(p) + # logger.info("{} - wrong cloud cover ({}%) - filtering".format(p.properties["title"], int(p.properties["cloudCover"]))) # local = self.products[:] # for p in local: - # if ref == "l1c": - # if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): - # logger.info("{} - remote {} - local l1c PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) - # self.products.remove(p) - # else: - # logger.info("{} - remote {} - local l1c ABSENT".format(p.properties["title"], p.properties["storageStatus"])) - # elif ref == "l2a": - # if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): - # logger.info("{} - remote {} - local l2a PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) - # self.products.remove(p) - # else: - # logger.info("{} - remote {} - local l2a ABSENT".format(p.properties["title"], p.properties["storageStatus"])) - logger.info("Search returned {} products online, of which {} new product(s) to download".format(len(filtered), len(self.products))) ####### rajouter ici "- dont xx ABSENT - dont xx ONLINE / xx STAGING" - - + # if ref == "l1c": + # if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): + # logger.info("{} - remote {} - local l1c PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + # self.products.remove(p) + # else: + # logger.info("{} - remote {} - local l1c ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + # elif ref == "l2a": + # if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): + # logger.info("{} - remote {} - local l2a PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + # self.products.remove(p) + # else: + # logger.info("{} - remote {} - local l2a ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + logger.info( + "Search returned {} products online, of which {} new product(s) to download".format( + len(filtered), len(self.products) + ) + ) ####### rajouter ici "- dont xx ABSENT - dont xx ONLINE / xx STAGING" + def download( self, - product_id, + product_id, outputs_prefix: str = None, extract: bool = True, delete_archive: bool = True, remove_existing_zipfile: bool = True, ): if not outputs_prefix: - if "L1C" in product_id.properties['title']: + if "L1C" in product_id.properties["title"]: root_path = "l1c_path" - elif "L2A" in product_id.properties['title']: + elif "L2A" in product_id.properties["title"]: root_path = "l2a_path" outputs_prefix = str(Path(Config().get(root_path)) / self.name) - setup_logging(verbose = 2) + setup_logging(verbose=2) if remove_existing_zipfile: - if "L1C" in product_id.properties['title']: + if "L1C" in product_id.properties["title"]: zip_file = L1cProduct(product_id.properties["id"]).zip_path if zip_file: logger.info("Found existing zipfile {}".format(zip_file.name)) try: - L1cProduct(product_id.properties["id"]).remove(zipfile = True) + L1cProduct(product_id.properties["id"]).remove(zipfile=True) except Exception: logger.info("Issue with removing {}".format(zip_file)) pass - elif "L2A" in product_id.properties['title']: + elif "L2A" in product_id.properties["title"]: zip_file = L2aProduct(product_id.properties["id"]).zip_path if zip_file: logger.info("Found existing zipfile {}".format(zip_file.name)) try: - L2aProduct(product_id.properties["id"]).remove(zipfile = True) + L2aProduct(product_id.properties["id"]).remove(zipfile=True) except Exception: logger.info("Issue with removing {}".format(zip_file)) pass - + if product_id.properties["storageStatus"] == "ONLINE": # logging.disable(level=logging.WARNING) # try: downloaded_path = self.dag.download( - product_id, - outputs_prefix = outputs_prefix, - extract = extract, - delete_archive = delete_archive, + product_id, + outputs_prefix=outputs_prefix, + extract=extract, + delete_archive=delete_archive, wait=1, timeout=0, ) # except Exception: - # logger.info("Issue with dowloading {}, removing corrupted zipfile".format(product_id.properties["id"])) - # downloaded_path = None - # pass + # logger.info("Issue with dowloading {}, removing corrupted zipfile".format(product_id.properties["id"])) + # downloaded_path = None + # pass if downloaded_path: logger.info("dl_path {}".format(downloaded_path)) # logging.disable(logging.NOTSET) if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: - upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name + upper_location = ( + Path(downloaded_path).parent.parent / Path(downloaded_path).name + ) # upper_location.unlink(missing_ok=True) if upper_location.exists(): logger.info("Removing previous product") @@ -396,27 +410,27 @@ class S2cEodag: Path(downloaded_path).replace(upper_location) product_id.location = "file://" + str(upper_location) Path(downloaded_path).parent.rmdir() - downloaded_path=Path(upper_location) + downloaded_path = Path(upper_location) logger.info("Moving up SAVE file") # modification de l'arborescence si il y a un sous dossier en trop (provider onda par ex) - # for file in os.listdir(downloaded_path): - # shutil.move( - # downloaded_path + '/' + file, - # Path(downloaded_path).parent, - # ) - # shutil.rmtree(downloaded_path ,ignore_errors = True) - # logger.info("Moving up files and deleting old folder") - # downloaded_path=Path(downloaded_path).parent + # for file in os.listdir(downloaded_path): + # shutil.move( + # downloaded_path + '/' + file, + # Path(downloaded_path).parent, + # ) + # shutil.rmtree(downloaded_path ,ignore_errors = True) + # logger.info("Moving up files and deleting old folder") + # downloaded_path=Path(downloaded_path).parent set_permissions(Path(downloaded_path)) elif product_id.properties["storageStatus"] == "OFFLINE": try: - setup_logging(verbose = 0) + setup_logging(verbose=0) downloaded_path = self.dag.download( - product_id, - outputs_prefix = outputs_prefix, - extract = extract, - delete_archive = delete_archive, + product_id, + outputs_prefix=outputs_prefix, + extract=extract, + delete_archive=delete_archive, wait=1, timeout=0, ) @@ -428,36 +442,41 @@ class S2cEodag: def order_offline( self, - product_id, + product_id, ): - setup_logging(verbose = 0) + setup_logging(verbose=0) if product_id.properties["storageStatus"] == "OFFLINE": try: downloaded_path = self.dag.download( - product_id, + product_id, wait=1, timeout=0, ) except Exception: - logger.info("{} - remote OFFLINE, ordered".format(product_id.properties["title"])) - setup_logging(verbose = 2) + logger.info( + "{} - remote OFFLINE, ordered".format( + product_id.properties["title"] + ) + ) + setup_logging(verbose=2) + def S2cEodag_download_uniq( - product, + product, outputs_prefix: str = None, extract: bool = True, delete_archive: bool = True, ): - tile_name = get_tile(product.properties['title']) + tile_name = get_tile(product.properties["title"]) if not outputs_prefix: - if "L1C" in product.properties['title']: + if "L1C" in product.properties["title"]: root_path = "l1c_path" - elif "L2A" in product.properties['title']: + elif "L2A" in product.properties["title"]: root_path = "l2a_path" outputs_prefix = str(Path(Config().get(root_path)) / tile_name) - - setup_logging(verbose = 2) - + + setup_logging(verbose=2) + logging.disable(level=logging.WARNING) dag = EODataAccessGateway() logging.disable(logging.NOTSET) @@ -465,38 +484,44 @@ def S2cEodag_download_uniq( if product.properties["storageStatus"] == "ONLINE": try: # logging.disable(level=logging.WARNING) - setup_logging(verbose = 2) + setup_logging(verbose=2) downloaded_path = dag.download( - product, - outputs_prefix = outputs_prefix, - extract = extract, - delete_archive = delete_archive, + product, + outputs_prefix=outputs_prefix, + extract=extract, + delete_archive=delete_archive, wait=1, timeout=0, ) # logging.disable(logging.NOTSET) if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: - upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name + upper_location = ( + Path(downloaded_path).parent.parent / Path(downloaded_path).name + ) Path(downloaded_path).replace(upper_location) product.location = "file://" + str(upper_location) Path(downloaded_path).parent.rmdir() for file in os.listdir(downloaded_path): - shutil.move(downloaded_path + '/' + file, Path(downloaded_path).parent) - shutil.rmtree(downloaded_path ,ignore_errors = True) + shutil.move( + downloaded_path + "/" + file, Path(downloaded_path).parent + ) + shutil.rmtree(downloaded_path, ignore_errors=True) logger.info("Moving up files and deleting old folder") - downloaded_path=Path(downloaded_path).parent + downloaded_path = Path(downloaded_path).parent logger.info("Moving up SAVE file") # modification de l'arborescence si il y a un sous dossier en trop (provider onda par ex) try: - if os.listdir(downloaded_path)[0] == str(Path(downloaded_path).name): + if os.listdir(downloaded_path)[0] == str( + Path(downloaded_path).name + ): shutil.copytree( - downloaded_path + '/' + os.listdir(downloaded_path)[0], + downloaded_path + "/" + os.listdir(downloaded_path)[0], downloaded_path, dirs_exist_ok=True, ) shutil.rmtree( - downloaded_path + '/' + os.listdir(downloaded_path)[0], - ignore_errors = True, + downloaded_path + "/" + os.listdir(downloaded_path)[0], + ignore_errors=True, ) logger.info("Moving up files and deleting old folder") except Exception: @@ -504,67 +529,72 @@ def S2cEodag_download_uniq( pass set_permissions(Path(downloaded_path)) except Exception: - logger.info("Error: Something went wrong with download process, retry later") + logger.info( + "Error: Something went wrong with download process, retry later" + ) elif product.properties["storageStatus"] == "OFFLINE": try: - setup_logging(verbose = 0) + setup_logging(verbose=0) downloaded_path = dag.download( - product, - outputs_prefix = outputs_prefix, - extract = extract, - delete_archive = delete_archive, + product, + outputs_prefix=outputs_prefix, + extract=extract, + delete_archive=delete_archive, wait=1, timeout=0, ) - setup_logging(verbose = 2) + setup_logging(verbose=2) except Exception: logger.info("OFFLINE product, ordered, retry later") elif product.properties["storageStatus"] == "STAGING": - logger.info("STAGING product, retry later") - - + logger.info("STAGING product, retry later") + + def S2cEodag_download( download_list: list = None, dl_mode: str = "multit", outputs_prefix: str = None, extract: bool = True, delete_archive: bool = True, - nb_threads: int = 8 + nb_threads: int = 8, ): - + ## sequential - if dl_mode == "seq": + if dl_mode == "seq": for product in download_list: S2cEodag_download_uniq( product, - outputs_prefix = outputs_prefix, - extract = extract, - delete_archive = delete_archive, - ) - + outputs_prefix=outputs_prefix, + extract=extract, + delete_archive=delete_archive, + ) + ## multithreading elif dl_mode == "multit": q = Queue() + def do_stuff(q): - while True: - try: - S2cEodag_download_uniq(q.get()) - except Exception: - import traceback - logger.error(traceback.format_exc()) - #TODO should cleanup the tile - finally: - q.task_done() + while True: + try: + S2cEodag_download_uniq(q.get()) + except Exception: + import traceback + + logger.error(traceback.format_exc()) + # TODO should cleanup the tile + finally: + q.task_done() + for product in download_list: q.put(product) for t in range(nb_threads): - worker = Thread(target = do_stuff, args = (q,)) + worker = Thread(target=do_stuff, args=(q,)) worker.daemon = True worker.start() q.join() - + ## multiprocessing elif dl_mode == "multip": nb_proc = 8 @@ -573,6 +603,3 @@ def S2cEodag_download( results = [pool.map(S2cEodag_download_uniq, download_list)] pool.close() pool.join() - - -