From 87d11e5c71edd1a32d1d3180de010ee93aae7832 Mon Sep 17 00:00:00 2001
From: pmouquet <pascal.mouquet@ird.fr>
Date: Wed, 22 Mar 2023 15:17:03 +0400
Subject: [PATCH] New FamilyProduct class to get information about l1c, l2a, cloudmask and indice products in database from an identifier string

---
 sen2chain/__init__.py |   2 +
 sen2chain/products.py | 145 +++++++++++++++++++++++++++++++++++++++++-
 sen2chain/utils.py    | 101 +++++++++++++++++++++++++++++
 3 files changed, 247 insertions(+), 1 deletion(-)

diff --git a/sen2chain/__init__.py b/sen2chain/__init__.py
index ddb76c4..4679ab4 100644
--- a/sen2chain/__init__.py
+++ b/sen2chain/__init__.py
@@ -27,6 +27,7 @@ from .products import (
     OldCloudMaskProduct,
     NewCloudMaskProduct,
     IndiceProduct,
+    FamilyProduct,
 )
 from .library import Library
 from .data_request import DataRequest
@@ -46,6 +47,7 @@ from .utils import (
     get_Sen2Cor_version,
     get_latest_s2c_version_path,
     set_permissions,
+    get_cm_dict, get_cm_string_from_dict, get_indice_from_identifier
 )
 from .geo_utils import (
     serialise_tiles_index,
diff --git a/sen2chain/products.py b/sen2chain/products.py
index 22411e0..0847440 100755
--- a/sen2chain/products.py
+++ b/sen2chain/products.py
@@ -20,7 +20,15 @@ from typing import (
     Union
 )
 from packaging import version
-from .utils import grouper, set_permissions, get_Sen2Cor_version, get_latest_s2c_version_path
+from .utils import (
+    grouper,
+    set_permissions,
+    get_Sen2Cor_version,
+    get_latest_s2c_version_path,
+    get_cm_dict,
+    get_cm_string_from_dict,
+    get_indice_from_identifier
+)
 from .config import Config, SHARED_DATA
 from .xmlparser import MetadataParser, Sen2ChainMetadataParser
 from .sen2cor import process_sen2cor
@@ -1733,3 +1741,138 @@ class IndiceProduct:
                 )
             else:
                 Sen2ChainMetadataParser(self._info_path).init_metadata()
+
+
+class FamilyProduct(dict):
+    """Group the L1C, L2A, cloudmask and indice products sharing a family id.
+
+    The instance is a dict whose items mirror its attributes: ``family_id``,
+    ``tile``, ``l1c_id``, ``l2a_id``, ``cloudmasks`` and ``indices``. When
+    ``identifier`` holds no family id, the dict stays empty and the
+    attributes keep their empty defaults (None or an empty list).
+
+    :param identifier: string containing a family id as matched by
+        ``_family_id_pattern``, e.g. a product name or filename.
+    :raises ValueError: if ``identifier`` is empty.
+    """
+
+    _l1c_library_path = Path(Config().get("l1c_path"))
+    _l2a_library_path = Path(Config().get("l2a_path"))
+    _cloudmask_library_path = Path(Config().get("cloudmasks_path"))
+    _indice_library_path = Path(Config().get("indices_path"))
+
+    # Family id with its embedded tile name, compiled once for the class.
+    _family_id_pattern = re.compile(
+        r".*(?P<family_id>"
+        r"[0-9]{8}T[0-9]{6}"
+        r"_N[0-9]{4}_R[0-9]{3}"
+        r"_T(?P<tile>[0-9]{2}[A-Z]{3})"
+        r"_[0-9]{8}T[0-9]{6}"
+        r").*"
+    )
+
+    def __init__(
+        self,
+        identifier: str = None,
+    ):
+        super().__init__()
+        if not identifier:
+            raise ValueError(
+                "Identifier cannot be empty"
+            )
+        # Defaults first, so attribute access never fails even when the
+        # identifier holds no family id or no product is found.
+        self.family_id = None
+        self.tile = None
+        self.l1c_id = None
+        self.l2a_id = None
+        self.cloudmasks = []
+        self.indices = []
+        fid_tile = self.get_family_id_tile(identifier)
+        if not fid_tile:
+            # Unrecognised identifier: leave the dict empty.
+            return
+        self.family_id = fid_tile["family_id"]
+        self.tile = fid_tile["tile"]
+        self.l1c_id = self.get_l1c()
+        self.l2a_id = self.get_l2a()
+        self.cloudmasks = self.get_cloudmasks()
+        self.indices = self.get_indices()
+        # Mirror every attribute as a dict item.
+        self.update(vars(self))
+
+    def get_family_id_tile(self, identifier):
+        """Extract the family id and the tile name from ``identifier``.
+
+        :param identifier: string to parse.
+        :return: dict with ``family_id`` and ``tile`` keys, or None when
+            ``identifier`` is not a string holding a family id.
+        """
+        try:
+            found = self._family_id_pattern.match(identifier)
+        except TypeError:
+            # Not a string: nothing to extract.
+            return None
+        return found.groupdict() if found else None
+
+    def get_l1c(self):
+        """Return the stem of the first L1C product of this family found in
+        the tile folder of the L1C library, or None if there is none."""
+        library_path = self._l1c_library_path / self.tile
+        paths = list(library_path.glob("*L1C*" + self.family_id + "*"))
+        return paths[0].stem if paths else None
+
+    def get_l2a(self):
+        """Return the stem of the first L2A product of this family found in
+        the tile folder of the L2A library, or None if there is none."""
+        library_path = self._l2a_library_path / self.tile
+        paths = list(library_path.glob("*L2A*" + self.family_id + "*"))
+        return paths[0].stem if paths else None
+
+    def get_cloudmasks(self):
+        """List the cloudmasks stored for the L2A product of this family.
+
+        :return: list of ``get_cm_dict`` dicts, each with an additional
+            ``cm_string`` key; empty when no L2A product was found.
+        """
+        if not self.l2a_id:
+            # No L2A product, hence no cloudmask folder to scan.
+            return []
+        library_path = self._cloudmask_library_path / self.tile / self.l2a_id
+        cloudmasks = []
+        for item in library_path.glob("*L2A*" + self.family_id + "_CM*.jp2"):
+            cm_dict = get_cm_dict(item.stem)
+            if not cm_dict:
+                # Unknown cloudmask naming: skip it instead of failing.
+                continue
+            cm_dict["cm_string"] = get_cm_string_from_dict(cm_dict)
+            cloudmasks.append(cm_dict)
+        return cloudmasks
+
+    def get_indices(self):
+        """List the indices stored for the L2A product of this family.
+
+        :return: list of dicts with ``indice`` and ``indice_string`` keys,
+            plus the ``get_cm_dict`` keys and ``cm_string`` when the
+            filename holds a cloudmask version; empty when no L2A product
+            was found.
+        """
+        if not self.l2a_id:
+            # No L2A product, hence no indice folder to scan.
+            return []
+        indices = []
+        pattern = "*/" + self.tile + "/" + self.l2a_id + "/*.jp2"
+        for item in self._indice_library_path.glob(pattern):
+            logger.info(item)
+            indice_dict = {"indice": get_indice_from_identifier(item.stem)}
+            cm_dict = get_cm_dict(item.stem)
+            if cm_dict:
+                indice_dict.update(cm_dict)
+                indice_dict["cm_string"] = get_cm_string_from_dict(cm_dict)
+                indice_dict["indice_string"] = (
+                    indice_dict["indice"] + "_" + indice_dict["cm_string"]
+                )
+            else:
+                indice_dict["indice_string"] = indice_dict["indice"]
+            indices.append(indice_dict)
+        return indices
diff --git a/sen2chain/utils.py b/sen2chain/utils.py
index 8625519..8828b89 100644
--- a/sen2chain/utils.py
+++ b/sen2chain/utils.py
@@ -200,3 +200,104 @@ def get_latest_s2c_version_path(l1c_identifier):
     else:
         return current_path
 
+
+# Default parameters of the cloudmask versions that do not encode them.
+_CM_DEFAULTS = {
+    "probability": 1,
+    "iterations": 5,
+    "cld_shad": True,
+    "cld_med_prob": True,
+    "cld_hi_prob": True,
+    "thin_cir": True,
+}
+
+# Cloudmask naming schemes, tried in this order.
+_CM_PATTERNS = (
+    re.compile(r".*(?P<cm_version>CM00[1-2])"),
+    re.compile(
+        r".*(?P<cm_version>CM003)"
+        r"-PRB(?P<probability>[0-9]+)"
+        r"-ITER(?P<iterations>[0-9]+)"
+    ),
+    re.compile(
+        r".*(?P<cm_version>CM004)"
+        r"-CSH(?P<cld_shad>[^-]*)"
+        r"-CMP(?P<cld_med_prob>[^-]*)"
+        r"-CHP(?P<cld_hi_prob>[^-]*)"
+        r"-TCI(?P<thin_cir>[^-]*)"
+        r"-ITER(?P<iterations>[0-9]+)"
+    ),
+)
+
+
+def get_cm_dict(identifier) -> dict:
+    """Return the cloudmask parameters encoded in an identifier string.
+
+    :param identifier: cloudmask or indice identifier to parse.
+    :return: dict with ``cm_version``, ``probability``, ``iterations``
+        (ints), ``cld_shad``, ``cld_med_prob``, ``cld_hi_prob`` and
+        ``thin_cir`` (bools) keys, parameters absent from the name taking
+        their default value; None when no cloudmask version is found.
+    """
+    if not isinstance(identifier, str):
+        return None
+    for pattern in _CM_PATTERNS:
+        found = pattern.match(identifier)
+        if found:
+            break
+    else:
+        return None
+    cm_dict = found.groupdict()
+    # Fill in the parameters this naming scheme does not encode.
+    for key, default in _CM_DEFAULTS.items():
+        cm_dict.setdefault(key, default)
+    for key in ("probability", "iterations"):
+        cm_dict[key] = int(cm_dict[key])
+    for key in ("cld_shad", "cld_med_prob", "cld_hi_prob", "thin_cir"):
+        cm_dict[key] = cm_dict[key] in ("1", 1, True, "True")
+    return cm_dict
+
+
+def get_cm_string_from_dict(cm_dict) -> str:
+    """Build the cloudmask string (e.g. ``CM003-PRB1-ITER5``) from a dict.
+
+    :param cm_dict: dict as returned by ``get_cm_dict``.
+    :return: the cloudmask string, or None when ``cm_dict`` is empty or its
+        ``cm_version`` is not one of CM001 to CM004.
+    """
+    if not cm_dict:
+        return None
+    cm_version = cm_dict["cm_version"]
+    if cm_version == "CM001":
+        return "CM001"
+    if cm_version == "CM002":
+        return "CM002-B11"
+    if cm_version == "CM003":
+        return (
+            "CM003"
+            + "-PRB" + str(cm_dict["probability"])
+            + "-ITER" + str(cm_dict["iterations"])
+        )
+    if cm_version == "CM004":
+        return (
+            "CM004"
+            # ``1 * flag`` turns a bool into 0 or 1.
+            + "-CSH" + str(1 * cm_dict["cld_shad"])
+            + "-CMP" + str(1 * cm_dict["cld_med_prob"])
+            + "-CHP" + str(1 * cm_dict["cld_hi_prob"])
+            + "-TCI" + str(1 * cm_dict["thin_cir"])
+            + "-ITER" + str(cm_dict["iterations"])
+        )
+    return None
+
+
+def get_indice_from_identifier(identifier) -> str:
+    """Return the upper-cased indice name held in an indice identifier.
+
+    The name is the eighth field of ``identifier`` once split on ``_`` and
+    ``.``.
+
+    :param identifier: indice identifier to parse.
+    :raises IndexError: if ``identifier`` has fewer than eight fields.
+    """
+    return identifier.replace(".", "_").split("_")[7].upper()
-- 
GitLab