diff --git a/sen2chain/__init__.py b/sen2chain/__init__.py index ddb76c4f6ddc822fbb2a455ad1532c7b07700c18..4679ab400f3dacdcd16f265358505e0fa5d6145a 100644 --- a/sen2chain/__init__.py +++ b/sen2chain/__init__.py @@ -27,6 +27,7 @@ from .products import ( OldCloudMaskProduct, NewCloudMaskProduct, IndiceProduct, + FamilyProduct, ) from .library import Library from .data_request import DataRequest @@ -46,6 +47,7 @@ from .utils import ( get_Sen2Cor_version, get_latest_s2c_version_path, set_permissions, + get_cm_dict, get_cm_string_from_dict, get_indice_from_identifier ) from .geo_utils import ( serialise_tiles_index, diff --git a/sen2chain/download_eodag.py b/sen2chain/download_eodag.py index 11d474a1cf199454caf7ce0ea62a6198b60eb748..2f28416fb34da077ac7c0fa1d0fb032b42f359e3 100644 --- a/sen2chain/download_eodag.py +++ b/sen2chain/download_eodag.py @@ -21,6 +21,7 @@ import multiprocessing from .config import SHARED_DATA, Config from .utils import get_tile +from .products import FamilyProduct logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -73,7 +74,7 @@ class S2cEodag: productType: str = "L1C", start: str = "2015-01-01", end: str = "9999-12-31", - ref: str = "l1c", + ref = "l1c", min_cloudcover: int = 0, max_cloudcover: int = 100, ): @@ -99,6 +100,12 @@ class S2cEodag: ) logging.disable(logging.NOTSET) + # if not isinstance(ref, list): + # ref_string = ref + # else: + # ref_string = "specific cloudmask(s) and / or indice(s) product(s)" + logger.info("Searching online products, ref: {}".format(ref)) + fitered = self.products[:] for p in fitered: if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): @@ -123,37 +130,159 @@ class S2cEodag: p.properties["storageStatus"] = "ONLINE" if not(min_cloudcover <= float(p.properties["cloudCover"] or 0) <= max_cloudcover): self.products.remove(p) - logger.info("{} - local l1c {} - filtering (CC = {}%)".format(p.properties["title"], l1c_presence, int(p.properties["cloudCover"]))) + logger.info( + "{} - local l1c {} - filtering (CC = {}%)".format( + p.properties["title"], + l1c_presence, + int(p.properties["cloudCover"]) + ) + ) else: if ref == "l1c": - if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): - logger.info("{} - local l1c {} - filtering".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) + if l1c_presence == "PRESENT": + logger.info( + "{} - l1c PRESENT - filtering".format( + p.properties["title"], + ) + ) self.products.remove(p) else: - logger.info("{} - local l1c {} - remote {}".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) + logger.info( + "{} - l1c ABSENT - remote {}".format( + p.properties["title"], + p.properties["storageStatus"] + ) + ) elif ref == "l2a": - if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): - logger.info("{} - local l1c {} - local l2a {} - filtering".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"])) + if l2a_presence == "PRESENT": + logger.info( + "{} - l2a PRESENT - filtering".format( + p.properties["title"], + ) + ) self.products.remove(p) else: - if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): - logger.info("{} - local l1c {} - filtering".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) + if l1c_presence == "PRESENT": + logger.info( + "{} - l1c PRESENT - filtering".format( + p.properties["title"], + ) + ) self.products.remove(p) else: - logger.info("{} - local l1c {} - local l2a {} - remote {}".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"])) - - elif ref == "indice": - toto = 12 - l2a_identifier = p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") - l2aProduct(l2a_identifier) - - - elif isinstance(ref, list): - toto = 24 - # toto = 13 - - + logger.info( + "{} - l2a ABSENT - l1c ABSENT - downloading - remote {}".format( + p.properties["title"], + p.properties["storageStatus"] + ) + ) + elif ref == "cloudmasks": + fp = FamilyProduct(p.properties["title"]) + if fp.cloudmasks: + logger.info( + "{} - at least one cloudmask PRESENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + if l2a_presence == "PRESENT": + logger.info( + "{} - l2a PRESENT - cloudmasks ABSENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + if l1c_presence == "PRESENT": + logger.info( + "{} - l1c PRESENT - cloudmasks ABSENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + logger.info( + "{} - l1c ABSENT, l2a ABSENT, cloudmasks ABSENT - remote {}".format( + p.properties["title"], + p.properties["storageStatus"] + ) + ) + elif ref == "indices": + fp = FamilyProduct(p.properties["title"]) + if fp.indices: + logger.info( + "{} - at least one indice PRESENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + if l2a_presence == "PRESENT": + logger.info( + "{} - l2a PRESENT - indices ABSENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + if l1c_presence == "PRESENT": + logger.info( + "{} - l1c PRESENT - indices ABSENT - filtering".format( + p.properties["title"], + l1c_presence, + p.properties["storageStatus"] + ) + ) + self.products.remove(p) + else: + logger.info( + "{} - l1c ABSENT, l2a ABSENT, indices ABSENT - remote {}".format( + p.properties["title"], + p.properties["storageStatus"] + ) + ) + else: + if not isinstance(ref, list): + ref = [ref] + fp = FamilyProduct(p.properties["title"]) + all_conditions = [] + for r in ref: + all_conditions.append(r in fp.cm_ind_string_list) + if all(all_conditions): + logger.info( + "{} - all ref PRESENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + if l2a_presence == "PRESENT": + logger.info( + "{} - some ref ABSENT - l2a PRESENT - filtering".format( + p.properties["title"], + ) + ) + self.products.remove(p) + else: + if l1c_presence == "PRESENT": + logger.info( + "{} - some ref ABSENT - l1c PRESENT - filtering".format( + p.properties["title"], + l1c_presence, + p.properties["storageStatus"] + ) + ) + self.products.remove(p) + else: + logger.info( + "{} - some ref ABSENT - l2a ABSENT - l1c ABSENT - downloading - remote {}".format( + p.properties["title"], + p.properties["storageStatus"] + ) + ) + # clouds = self.products[:] # for p in clouds: # if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover): diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py index 17e3a8b339e8ae0b17d1753ab87232c0a7b9c45f..439cd537d023e12ebfc9589a90b9544590663fda 100644 --- a/sen2chain/jobs.py +++ b/sen2chain/jobs.py @@ -624,6 +624,20 @@ class Job: download_list= [] for index, row in tasks.iterrows(): if bool(setuptools.distutils.util.strtobool(str(row.l1c))): + if not bool(setuptools.distutils.util.strtobool(str(row.remove))): + if "l1c" in str(row.remove).lower(): + if "l2a" not in str(row.remove).lower(): + ref = "l2a" + else: + ref = [] + for cm in row.cloudmasks: + if not (row.cloudmasks == "False" or not row.cloudmasks): + ref.append(cm) + for ind in row.indices: + if not (row.indices == "False" or not row.indices): + ref.extend([ind, ind + "_" + cm]) + ref = list(set(ref)) + t = Tile(row.tile) tile_download_list = t.get_l1c( provider = self.provider, @@ -631,6 +645,7 @@ class Job: start = row.date_min, end = row.date_max, new = False, + ref = ref, max_cloudcover = row.max_clouds, ) download_list.extend(tile_download_list) @@ -750,16 +765,13 @@ class Job: # Remove downloaded L1C for index, row in tasks.iterrows(): if "l1c" in str(row.remove).lower(): + logger.info("Removing downloaded l1c products for tile: {}".format(row.tile)) + if self.logs: + f.write("{}\nRemoving downloaded l1c products for tile: {}\n\n".format(datetime.datetime.now(), row.tile)) + f.flush() t = Tile(row.tile) prodlist = [p for p in l1c_process_list if row.tile in p] t.remove_l1c(prodlist) - logger.info("Removed {} downloaded l1c products: {}".format(len(prodlist), prodlist)) - if self.logs: - f.write("{}\nRemoved {} downloaded l1c products:\n".format(datetime.datetime.now(), len(prodlist))) - for l1c in prodlist: - f.write("{}\n".format(l1c)) - f.write("\n") - f.flush() # Comuting cloudmasks (from L2A) logger.info("Computing cloudmasks") @@ -952,15 +964,26 @@ class Job: if "l2a" in str(row.remove).lower(): t = Tile(row.tile) prodlist = [p for p in l2a_remove_list if row.tile in p] - t.remove_l2a(prodlist) - logger.info("Removed {} produced l2a product(s): {}".format(len(prodlist), prodlist)) + logger.info( + "Removing {} produced l2a for tile {}".format( + row.tile, + len(prodlist), + ) + ) if self.logs: - f.write("{}\nRemoved {} produced l2a products:\n".format(datetime.datetime.now(), len(prodlist))) + f.write( + "{}\nRemoving {} produced l2a for tile {}\n".format( + datetime.datetime.now(), + len(prodlist), + row.tile, + ) + ) for l2a in prodlist: f.write("{}\n".format(l2a)) f.write("\n") - f.flush() - + f.flush() + t.remove_l2a(prodlist) + # Cleaning after if clean_after: logger.info("Cleaning Tiles") diff --git a/sen2chain/products.py b/sen2chain/products.py index 22411e0967918f30569caac7b6e61ccc3412e076..98e28bbf20439aac6c8045e4a0383b3bdbc8f275 100755 --- a/sen2chain/products.py +++ b/sen2chain/products.py @@ -20,7 +20,15 @@ from typing import ( Union ) from packaging import version -from .utils import grouper, set_permissions, get_Sen2Cor_version, get_latest_s2c_version_path +from .utils import ( + grouper, + set_permissions, + get_Sen2Cor_version, + get_latest_s2c_version_path, + get_cm_dict, + get_cm_string_from_dict, + get_indice_from_identifier +) from .config import Config, SHARED_DATA from .xmlparser import MetadataParser, Sen2ChainMetadataParser from .sen2cor import process_sen2cor @@ -1733,3 +1741,117 @@ class IndiceProduct: ) else: Sen2ChainMetadataParser(self._info_path).init_metadata() + +class FamilyProduct(dict): + """Family product class + :param identifier: cloudmask filename. + """ + + _l1c_library_path = Path(Config().get("l1c_path")) + _l2a_library_path = Path(Config().get("l2a_path")) + _cloudmask_library_path = Path(Config().get("cloudmasks_path")) + _indice_library_path = Path(Config().get("indices_path")) + + def __init__( + self, + identifier: str = None, + ): + if not (identifier): + raise ValueError( + "Identifier cannot be empty" + ) + else: + fid_tile = self.get_family_id_tile(identifier) + if fid_tile: + self.update(fid_tile) + self.family_id = fid_tile["family_id"] + self.tile = fid_tile["tile"] + self.l1c_id = self.get_l1c() + self.l2a_id = self.get_l2a() + self.cloudmasks = self.get_cloudmasks() + self.indices = self.get_indices() + self.cm_ind_string_list = self.get_cm_ind_string_list() + else: + logger.info("Invalid identifier {}".format(identifier)) + for key, val in self.__dict__.items(): + self[key] = val + + def get_family_id_tile(self, identifier): + try: + pat = re.compile( + r".*(?P<family_id>" + + "[0-9]{8}T[0-9]{6}" + + "_N[0-9]{4}_R[0-9]{3}" + + "_T(?P<tile>[0-9]{2}[A-Z]{3})" + + "_[0-9]{8}T[0-9]{6}" + + ").*" + ) + return pat.match(identifier).groupdict() + except: + pass + + def get_l1c(self): + library_path = self._l1c_library_path / self.tile + paths = list(library_path.glob("*L1C*" + self.family_id + "*")) + if len(paths) > 0: + return paths[0].stem + else: + return None + + def get_l2a(self): + library_path = self._l2a_library_path / self.tile + paths = list(library_path.glob("*L2A*" + self.family_id + "*")) + if len(paths) > 0: + return paths[0].stem + else: + return None + + def get_cloudmasks(self): + cloudmasks = [] + try: + library_path = self._cloudmask_library_path / self.tile + paths = list(library_path.glob("*L2A*" + self.family_id + "/*L2A*" + self.family_id + "_CM*.jp2")) + for item in paths: + cm_dict = get_cm_dict(item.stem) + cm_string = get_cm_string_from_dict(cm_dict) + cm_dict["cm_string"] = cm_string + cloudmasks.append(cm_dict) + except: + pass + return cloudmasks + + def get_indices(self): + indices = [] + try: + library_path = self._indice_library_path + paths = list( + library_path.glob( + "*/" + + self.tile + "/" + + "*L2A*" + self.family_id + "/" + + "*.jp2" + ) + ) + for item in paths: + # logger.info(item) + indice_dict = {} + indice_dict["indice"] = get_indice_from_identifier(item.stem) + cm_dict = get_cm_dict(item.stem) + if cm_dict: + indice_dict.update(cm_dict) + indice_dict["cm_string"] = get_cm_string_from_dict(cm_dict) + indice_dict["indice_string"] = indice_dict["indice"] + "_" + indice_dict["cm_string"] + else: + indice_dict["indice_string"] = indice_dict["indice"] + indices.append(indice_dict) + except: + pass + return indices + + def get_cm_ind_string_list(self): + cm_ind_string_list = [] + if self.cloudmasks: + cm_ind_string_list.extend([cm["cm_string"] for cm in self.cloudmasks]) + if self.indices: + cm_ind_string_list.extend([ind["indice_string"] for ind in self.indices]) + return cm_ind_string_list diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index 37889193b9da11decf9dd430cad11d4ece7fac6c..fea7c4c03e288b4cc9e6cf04795d2b0dbcc5602e 100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -827,7 +827,7 @@ class Tile: start: str = "2015-01-01", end: str = "9999-12-31", new: bool = False, - ref: str = "l1c", + ref = "l1c", min_cloudcover: int = 0, max_cloudcover: int = 100, order_offline: bool = False, @@ -835,7 +835,7 @@ class Tile: sleep: int = 0, ): """ - function to download L1C products using EODAG + Function to download l1c products using EODAG """ if provider == "scihub": NUM_THREADS = 4 diff --git a/sen2chain/utils.py b/sen2chain/utils.py index 862551965a578b826b618a19e8c927cb7f3c314f..1d8c22c883990f303c9983a1045dbebdc6399bf5 100644 --- a/sen2chain/utils.py +++ b/sen2chain/utils.py @@ -200,3 +200,100 @@ def get_latest_s2c_version_path(l1c_identifier): else: return current_path +def get_cm_dict(identifier) -> dict: + """Returns cloudmask version from an identifier string. + :param string: string from which to extract the version name. + can be a cloudmask or an indice identifier + """ + returned_val = None + try: + pat = re.compile(r".*(?P<cm_version>CM00[1-2])") + returned_val = pat.match(identifier).groupdict() + + except: + try: + pat = re.compile( + r".*(?P<cm_version>CM003)" + + "-PRB(?P<probability>.*)" + + "-ITER(?P<iterations>.*)" + ) + returned_val = pat.match(identifier).groupdict() + + except: + try: + pat = re.compile( + r".*(?P<cm_version>CM004)" + + "-CSH(?P<cld_shad>.*)" + + "-CMP(?P<cld_med_prob>.*)" + + "-CHP(?P<cld_hi_prob>.*)" + + "-TCI(?P<thin_cir>.*)" + + "-ITER(?P<iterations>.*)" + ) + returned_val = pat.match(identifier).groupdict() + except: + pass + + if returned_val: + if "probability" not in returned_val: + returned_val["probability"] = 1 + if "iterations" not in returned_val: + returned_val["iterations"] = 5 + if "cld_shad" not in returned_val: + returned_val["cld_shad"] = True + if "cld_med_prob" not in returned_val: + returned_val["cld_med_prob"] = True + if "cld_hi_prob" not in returned_val: + returned_val["cld_hi_prob"] = True + if "thin_cir" not in returned_val: + returned_val["thin_cir"] = True + + returned_val["probability"] = int(returned_val["probability"]) + returned_val["iterations"] = int(returned_val["iterations"]) + returned_val["cld_shad"] = returned_val["cld_shad"] in ["1", 1, True, "True"] + returned_val["cld_med_prob"] = returned_val["cld_med_prob"] in ["1", 1, True, "True"] + returned_val["cld_hi_prob"] = returned_val["cld_hi_prob"] in ["1", 1, True, "True"] + returned_val["thin_cir"] = returned_val["thin_cir"] in ["1", 1, True, "True"] + + return returned_val + +def get_cm_string_from_dict(cm_dict) -> str: + if cm_dict: + if cm_dict["cm_version"] == "CM001": + cm_string = "CM001" + elif cm_dict["cm_version"] == "CM002": + cm_string = "CM002-B11" + elif cm_dict["cm_version"] == "CM003": + cm_string = ( + "CM003" + + "-PRB" + str(cm_dict["probability"]) + + "-ITER" + str(cm_dict["iterations"]) + ) + elif cm_dict["cm_version"] == "CM004": + cm_string = ( + "CM004" + + "-CSH" + str(1 * cm_dict["cld_shad"]) + + "-CMP" + str(1 * cm_dict["cld_med_prob"]) + + "-CHP" + str(1 * cm_dict["cld_hi_prob"]) + + "-TCI" + str(1 * cm_dict["thin_cir"]) + + "-ITER" + str(cm_dict["iterations"]) + ) + else: + cm_string = None + else: + cm_string = None + return cm_string + +def get_indice_from_identifier(identifier) -> str: + indice = (identifier.replace(".", "_").split("_")[7]).upper() + return indice + +# def get_cloudmask_indice_dict_from_strings( + # cm_string: str = None, + # indice_string: str = None, +# ) -> dict: + # """Returns a cloudmas or indice dict for FamilyProduct class + # :param string: string from which to extract the version name. + # can be a cloudmask or an indice identifier + # """ + # returned_val = None +