From e04cfd95407abb4995d9096e814e205e1d372eee Mon Sep 17 00:00:00 2001 From: Impact <pascal.mouquet@ird.fr> Date: Tue, 13 Oct 2020 13:15:48 +0400 Subject: [PATCH] Update multiprocessing functions for cloudmask versions. Updated NDVI masking --- sen2chain/cloud_mask.py | 23 +-- sen2chain/indices.py | 33 ++-- sen2chain/multi_processing.py | 23 +++ sen2chain/products.py | 301 +++++++++++++++++---------------- sen2chain/tiles.py | 303 ++++++++++++++++++++++------------ sen2chain/xmlparser.py | 6 - 6 files changed, 392 insertions(+), 297 deletions(-) diff --git a/sen2chain/cloud_mask.py b/sen2chain/cloud_mask.py index eb70fbe..eadd536 100755 --- a/sen2chain/cloud_mask.py +++ b/sen2chain/cloud_mask.py @@ -256,21 +256,15 @@ def create_cloud_mask_v2( ) -> None: """ create cloud mask - uint8 - :param cloud_mask: path to the cloud mask raster. :param out_path: path to the output. :param erosion: size of the outer buffer in px. :param dilatation: size of the inner buffer in px. """ - logger.info("Creating cloud-mask_v2") out_temp_path = Path(Config().get("temp_path")) - #~ out_bin = str(out_temp_path / (out_path.stem + "_tmp_bin.tif")) - #~ out_erode = str(out_temp_path / (out_path.stem + "_tmp_erode.tif")) out_dilate = str(out_temp_path / (out_path.stem + "_tmp_dilate.tif")) - - logger.info('Loading cloud_prb...') + CLD_seuil = 25 with rasterio.open(str(cloud_mask)) as cld_src: cld = cld_src.read(1).astype(np.int16) @@ -290,7 +284,6 @@ def create_cloud_mask_v2( [0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0]]) cld_dilate = ndimage.binary_dilation(cld_erode, kernel).astype(cld_erode.dtype) - logger.info("saving TIFF file...") cld_profile.update(driver="Gtiff", compress="NONE", tiled=False, @@ -305,7 +298,6 @@ def create_cloud_mask_v2( dst.write(cld_dilate.astype(np.int8), 1) # Save to JP2000 - logger.info("converting to JP2000 file...") src_ds = gdal.Open(out_dilate) driver = gdal.GetDriverByName("JP2OpenJPEG") dst_ds = driver.CreateCopy(str(out_path), src_ds, @@ -313,9 +305,8 @@ def create_cloud_mask_v2( dst_ds = None src_ds = None - #~ os.remove(out_bin) - #~ os.remove(out_erode) os.remove(out_dilate) + logger.info("Done: {}".format(out_path.name)) def create_cloud_mask_b11( @@ -333,14 +324,10 @@ def create_cloud_mask_b11( :param erosion: size of the outer buffer in px. :param dilatation: size of the inner buffer in px. """ - logger.info("Masking cloud-mask_v2 with B11") out_temp_path = Path(Config().get("temp_path")) - #~ out_bin = str(out_temp_path / (out_path.stem + "_tmp_bin.tif")) - #~ out_dilate = str(out_temp_path / (out_path.stem + "_tmp_dilate.tif")) out_mask = str(out_temp_path / (out_path.stem + "_tmp_mask.tif")) - logger.info('Loading B11...') b11_seuil = 1500 with rasterio.open(str(b11_path)) as b11_src: b11 = b11_src.read(1).astype(np.int16) @@ -379,7 +366,6 @@ def create_cloud_mask_b11( [0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0]]) cld_mskd_dilate = ndimage.binary_dilation(cld_mskd, kernel).astype(cld_mskd.dtype) - logger.info("saving TIFF file...") cld_profile.update(driver="Gtiff", compress="NONE", tiled=False, @@ -394,7 +380,6 @@ def create_cloud_mask_b11( dst.write(cld_mskd_dilate.astype(np.int8), 1) # Save to JP2000 - logger.info("converting to JP2000 file...") src_ds = gdal.Open(out_mask) driver = gdal.GetDriverByName("JP2OpenJPEG") dst_ds = driver.CreateCopy(str(out_path), src_ds, @@ -402,6 +387,6 @@ def create_cloud_mask_b11( dst_ds = None src_ds = None - #~ os.remove(out_bin) - #~ os.remove(out_dilate) os.remove(out_mask) + logger.info("Done: {}".format(out_path.name)) + diff --git a/sen2chain/indices.py b/sen2chain/indices.py index 2c075f4..1bea2f8 100644 --- a/sen2chain/indices.py +++ b/sen2chain/indices.py @@ -18,6 +18,7 @@ from .indices_functions import (create_raw_ndvi, create_raw_ndwi, create_raw_ndw create_raw_mndwi, create_masked_indice, index_tiff_2_jp2) from .colormap import matplotlib_colormap_to_rgb, create_colormap, create_rvb +#~ from .products import NewCloudMaskProduct logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO) @@ -58,9 +59,8 @@ class Indice(metaclass=ABCMeta): class Ndvi(Indice): """ NDVI = (NIR-VIR) / (NIR+VIR) - - NIR: band 08 - VIR: band 04 + NIR: band 08 (10m) + VIR: band 04 (10m) """ name = "NDVI" filename_template = "{product_identifier}_NDVI{ext}" @@ -69,11 +69,12 @@ class Ndvi(Indice): colormap = cm.RdYlGn # _colors_table_path = functions_data_path = Path("../share/data/RdYlGn.lut") - def __init__(self, l2a_product_object): - if l2a_product_object is None: - raise ValueError("A L2aProduct object must be provided") + def __init__(self, l2a_product_object, cm_product_object): + if (l2a_product_object or cm_product_object) is None: + raise ValueError("A L2aProduct and NewCloudMask objects must be provided") else: self.l2a_product = l2a_product_object + self.cm_product = cm_product_object # output path self.out_path = None @@ -88,6 +89,7 @@ class Ndvi(Indice): out_path: pathlib.PosixPath, reprocess: bool = False, nodata_clouds: bool = False, + #~ cm_version: str = "cm001", quicklook: bool = False ) -> None: """ process @@ -108,10 +110,17 @@ class Ndvi(Indice): out_path=(out_path / self.indice_filename)) if nodata_clouds: - if self.l2a_product.user_cloud_mask is None: + + #~ cm_product = NewCloudMaskProduct(l2a_identifier = self.l2a_product.identifier, version = cm_version) + if not self.cm_product.path.exists(): raise ValueError("Cloud mask does not exist") - masked_indice_filename = self.indice_stem + "_MASKED" + self.ext - masked_indice_raw = self.indice_stem + "_MASKED" + self.ext_raw + masked_indice_filename = self.indice_stem + "_" + self.cm_product.suffix + self.ext + masked_indice_raw = self.indice_stem + "_" + self.cm_product.suffix + self.ext_raw + + #~ if self.l2a_product.user_cloud_mask is None: + #~ raise ValueError("Cloud mask does not exist") + #~ masked_indice_filename = self.indice_stem + "_MASKED" + self.ext + #~ masked_indice_raw = self.indice_stem + "_MASKED" + self.ext_raw if (out_path / masked_indice_filename).exists() and not reprocess: logger.info("{} already exists".format(masked_indice_filename)) @@ -121,7 +130,7 @@ class Ndvi(Indice): else: ndvi_name = (out_path / self.indice_filename) create_masked_indice(indice_path=ndvi_name, - cloud_mask_path=self.l2a_product.user_cloud_mask, + cloud_mask_path=self.cm_product.path, out_path=(out_path / masked_indice_raw)) index_tiff_2_jp2(img_path=(out_path / masked_indice_raw), out_path=(out_path / masked_indice_filename)) @@ -136,7 +145,7 @@ class Ndvi(Indice): if quicklook: cmap = matplotlib_colormap_to_rgb(self.colormap, revers=False) - quicklook_filename = self.indice_stem + "_QUICKLOOK.tif" + quicklook_filename = self.indice_stem + "_" + self.cm_product.suffix + "_QL.tif" if (self.out_path / quicklook_filename).exists() and not reprocess: logger.info("{} already exists".format(quicklook_filename)) @@ -153,7 +162,7 @@ class Ndvi(Indice): #~ lut_dict=cmap, clouds_color="white", #~ out_path=(self.out_path / quicklook_filename)) create_rvb(raster=(self.out_path / self.indice_filename), - cloud_mask=self.l2a_product.user_cloud_mask, + cloud_mask=self.cm_product.path, # lut_dict=get_colormap(self._colors_table_path.absolute()), lut_dict=cmap, clouds_color="white", out_path=(self.out_path / quicklook_filename)) diff --git a/sen2chain/multi_processing.py b/sen2chain/multi_processing.py index 72a5f8b..134b4a4 100644 --- a/sen2chain/multi_processing.py +++ b/sen2chain/multi_processing.py @@ -69,6 +69,29 @@ def cld_multiprocessing(process_list, nb_proc=4): pool.join() return True +def multi_cld_ver_pro_iter_repro(l2a_ver_pro_iter_repro): + l2a = L2aProduct(l2a_ver_pro_iter_repro[0]) + version = l2a_ver_pro_iter_repro[1] + probability = l2a_ver_pro_iter_repro[2] + iterations = l2a_ver_pro_iter_repro[3] + reprocess = l2a_ver_pro_iter_repro[4] + try: + l2a.compute_cloud_mask(version = version, + probability = probability, + iterations = iterations, + reprocess = reprocess) + except: + pass + +def cld_version_probability_iterations_reprocessing_multiprocessing(process_list, nb_proc=4): + """ """ + nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1) + pool = multiprocessing.Pool(nb_proc) + results = [pool.map(multi_cld_ver_pro_iter_repro, process_list)] + pool.close() + pool.join() + return True + def multi_idx(l2a_id_idx): l2a_identifier = l2a_id_idx[0] indice = [l2a_id_idx[1]] diff --git a/sen2chain/products.py b/sen2chain/products.py index 3a35bc2..0519e87 100755 --- a/sen2chain/products.py +++ b/sen2chain/products.py @@ -375,36 +375,29 @@ class L2aProduct(Product): tile: str = None, path: str = None ) -> None: - logger.info("l2a-001") super().__init__(identifier=identifier, tile=tile, path=path) if not re.match(r".*L2A_.*", identifier): raise ValueError("Invalid L2A product name") self.indices_path = self._indices_library_path - logger.info("l2a-002") + # user cloud mask - self.user_cloud_mask = self.path.parent / (self.identifier + "_CLOUD_MASK.jp2") - if not self.user_cloud_mask.exists(): - self.user_cloud_mask = None - logger.info("l2a-003") - # user cloud mask b11 - self.user_cloud_mask_b11 = self.path.parent / (self.identifier + "_CLOUD_MASK_B11.jp2") - if not self.user_cloud_mask_b11.exists(): - self.user_cloud_mask_b11 = None - logger.info("l2a-004") + #~ self.user_cloud_mask = self.path.parent / (self.identifier + "_CLOUD_MASK.jp2") + #~ if not self.user_cloud_mask.exists(): + #~ self.user_cloud_mask = None + #~ # user cloud mask b11 + #~ self.user_cloud_mask_b11 = self.path.parent / (self.identifier + "_CLOUD_MASK_B11.jp2") + #~ if not self.user_cloud_mask_b11.exists(): + #~ self.user_cloud_mask_b11 = None # user QL self.user_ql = self.path.parent / (self.identifier + "_QL.tif") if not self.user_ql.exists(): self.user_ql = None - logger.info("l2a-005") # versions self._sen2chain_info_path = self.path / "sen2chain_info.xml" - logger.info("l2a-005-00") if self._sen2chain_info_path.parent.exists() and not self._sen2chain_info_path.exists(): - logger.info("l2a-005-01") Sen2ChainMetadataParser(self._sen2chain_info_path).init_metadata() - logger.info("l2a-006") def process_ql(self, reprocess: bool = False, @@ -519,17 +512,17 @@ class L2aProduct(Product): def compute_cloud_mask(self, - version: str = "cm001", - probability: int = 1, - iterations: int = 5, - #~ buffering: bool = True, - reprocess: bool = False, - out_path_mask = None, - out_path_mask_b11 = None - ) -> "L2aProduct": + version: str = "cm001", + probability: int = 1, + iterations: int = 5, + #~ buffering: bool = True, + reprocess: bool = False, + out_path_mask = None, + out_path_mask_b11 = None + ) -> "L2aProduct": """ """ - logger.info("{}: processing cloud_mask_v3".format(self.identifier)) + logger.info("Computing cloudmask version {}: {}".format(version, self.identifier)) cloudmask = NewCloudMaskProduct(l2a_identifier = self.identifier, sen2chain_processing_version = self.sen2chain_processing_version, @@ -537,41 +530,42 @@ class L2aProduct(Product): probability = probability, iterations = iterations) - if cloudmask.path and not reprocess: - logger.info("{} cloud mask already exists".format(cloudmask.identifier)) + if cloudmask.path.exists() and not reprocess: + logger.info("{} cloud mask already computed".format(cloudmask.identifier)) else: - if cloudmask.updated_path: - if version == "cm001": - if cloudmask.path: - cloudmask.path.unlink() # in version 3.8 will be updated using missing_ok = True - create_cloud_mask_v2(self.cld_20m, - erosion=1, - dilatation=5, - out_path=cloudmask.updated_path) - self.user_cloud_mask = cloudmask.updated_path - elif version == "cm002": - cloudmask_cm001 = NewCloudMaskProduct(l2a_identifier = self.identifier, - sen2chain_processing_version = self.sen2chain_processing_version, - version = "cm001") - if cloudmask_cm001.path: - if cloudmask.path: - cloudmask.path.unlink() # in version 3.8 will be updated using missing_ok = True - create_cloud_mask_b11(cloudmask_cm001.path, - self.b11_20m, - dilatation=5, - out_path=Path(str(cloudmask_cm001.path).replace("CM001", "CM002-B11"))) - else: - logger.info("No cloudmask version cm001 found, please compute this one first") - elif version == "cm003": - toto=12 - elif version == "cm004": - toto=12 - else: - logger.info("Wrong cloudmask version {}".format(version)) + if version == "cm001": + if cloudmask.path.exists(): # in version 3.8 will be updated using missing_ok = True + cloudmask.path.unlink() + cloudmask._info_path.unlink() + create_cloud_mask_v2(self.cld_20m, + erosion=1, + dilatation=5, + out_path=cloudmask.path) + #~ self.user_cloud_mask = cloudmask.updated_path + elif version == "cm002": + cloudmask_cm001 = NewCloudMaskProduct(l2a_identifier = self.identifier, + sen2chain_processing_version = self.sen2chain_processing_version, + version = "cm001") + if cloudmask_cm001.path.exists(): + if cloudmask.path.exists(): # in version 3.8 will be updated using missing_ok = True + cloudmask.path.unlink() + cloudmask._info_path.unlink() + create_cloud_mask_b11(cloudmask_cm001.path, + self.b11_20m, + dilatation=5, + out_path=Path(str(cloudmask_cm001.path).replace("CM001", "CM002-B11"))) + else: + logger.info("No cloudmask version cm001 found, please compute this one first") + elif version == "cm003": + toto=12 + elif version == "cm004": + toto=12 else: - logger.info("Impossible to compute cloudmask (no L2a product)") + logger.info("Wrong cloudmask version {}".format(version)) + + cloudmask.init_md() - return self + #~ return self def process_indices(self, indices_list: List[str] = [], @@ -613,6 +607,50 @@ class L2aProduct(Product): nodata_clouds=nodata_clouds, quicklook=quicklook, reprocess=reprocess) + return self + + def compute_indice(self, + indice: str = None, + nodata_clouds: bool = True, + cm_version: str = "cm001", + quicklook: bool = False, + reprocess: bool = False, + out_path: str = None + ) -> "L2aProduct": + """ + compute and mask indice specified cloudmask version + :param indice: a valid indice name + :param nodata_clouds: mask indices rasters with the cloud-mask or not + :param cm_version: cloudmask version to use for masking + :param quicklook: process a quicklook or not (if available for the indice). + :param reprocess: if true reprocess + :param out_path: if specified, a folder {identifier}_INDICES containing \ + the processed indices will be created in the out_path. + """ + logger.info("Computing indice {}: {}".format(indice, self.identifier)) + + #~ if not isinstance(indices_list, list): + #~ raise TypeError("Indices must be provided as a list.") + + #~ for indice in set(indices_list): + #~ logger.info("Processing {}: {}".format(indice, self.identifier)) + indice_cls = IndicesCollection.get_indice_cls(indice) + if indice_cls is None: + print("Indices available: {}".format(IndicesCollection.available_indices)) + raise ValueError("Indice not defined") + + if out_path is None: + # logger.info(self.identifier) + indice_path = self.indices_path / indice.upper() / self.tile / self.identifier + else: + indice_path = Path(out_path) / (self.identifier + "_INDICES") / indice.upper() + indice_path.mkdir(parents=True, exist_ok=True) + indice_obj = indice_cls(self, NewCloudMaskProduct(l2a_identifier = self.identifier, version = cm_version)) + indice_obj.process_indice(out_path = indice_path, + nodata_clouds = nodata_clouds, + #~ cm_version = cm_version, + quicklook = quicklook, + reprocess = reprocess) return self @@ -667,7 +705,6 @@ class L2aProduct(Product): @property def sen2chain_processing_version(self): - logger.info("SPV-001") return Sen2ChainMetadataParser(self._sen2chain_info_path).get_metadata_value('SEN2CHAIN_PROCESSING_VERSION') @property @@ -990,101 +1027,13 @@ class NewCloudMaskProduct: raise ValueError("Product or L2a identifier cannot be empty") else: self.tile = self.get_tile(identifier or l2a_identifier) - self.l2a = l2a_identifier.replace(".SAFE", "") or self.get_l2a(identifier) - self.path = None - self.updated_path = None - self.spv = None - self.uptodate = False - self.updatable = False - #~ self.theorical_path = None - if identifier: - self.identifier = identifier - else: - self.identifier = None - if version == "cm001": - suffix = "_CM001" - elif version == "cm002": - suffix = "_CM002-B11" - elif version == "cm003": - suffix = "_CM003-PRB" + str(probability) + "ITR" + str(iterations) - else: - raise ValueError("Cloud Mask version cmxxx is not defined") - logger.info("002") - - self.path = next(iter(list((self._library_path / self.tile / self.l2a).glob(self.l2a.replace(".SAFE", "") + \ - "_SPV*" + suffix + ".jp2"))), \ - None) - if self.path: - self.identifier = self.path.name - self.spv = re.findall("_SPV([0-9]{4})_", str(self.path))[0] - if self.spv == Config().get("sen2chain_processing_version"): - self.uptodate = True - else: - l2a = L2aProduct(self.l2a) - if l2a.path.exists() and int(l2a.sen2chain_processing_version) > int(self.spv): - self.updatable = True - - l2a = L2aProduct(self.l2a) - if l2a.path.exists(): - self.updated_path = self._library_path /\ - self.tile /\ - self.l2a /\ - (self.l2a + "_SPV" + l2a.sen2chain_processing_version +\ - suffix + ".jp2") - - - #~ self.uptodate = # vrai ou faux si les deux sont égaux et si spv sup - #~ self.updatable = # vrai si uptidater et l2a exist - - - - - #~ l2a_product = L2aProduct(l2a_identifier) - #~ if not sen2chain_processing_version: - #~ logger.info("002-01") - #~ path = list((self._library_path / l2a_product.tile / self.l2a).glob(self.l2a.replace(".SAFE", "") + \ - #~ "_SPV*" + \ - #~ suffix + \ - #~ ".jp2")) - #~ logger.info(path) - #~ if not path: - #~ try: - #~ sen2chain_processing_version = l2a_product.sen2chain_processing_version - #~ logger.info("sen2chain_processing_version {}".format(sen2chain_processing_version)) - #~ self.identifier = self.l2a.replace(".SAFE", "") + \ - #~ "_SPV" + str(sen2chain_processing_version or Config().get("sen2chain_processing_version")) + \ - #~ suffix + \ - #~ ".jp2" - #~ self.l2a_present = False - - #~ except: - #~ self.l2a_present = True - #~ self.spv=None - #~ self.identifier = self.l2a.replace(".SAFE", "") + \ - #~ "_SPV" + Config().get("sen2chain_processing_version") + \ - #~ suffix + \ - #~ ".jp2" - - #~ else: - #~ self.identifier = path[0].name - #~ self.spv = re.findall("_SPV([0-9]{4})_", self.identifier)[0] - #~ logger.info("003") - - - - #~ else: - #~ self.identifier = identifier - #~ self.spv = re.findall("_SPV([0-9]{4})_", identifier)[0] - #~ self.l2a = self.get_l2a(self.identifier) - #~ logger.info("004") - - #~ logger.info("005") - - #~ logger.info("006") - #~ self.path = self._library_path / self.tile / self.l2a / self.identifier - #~ logger.info(self.path) - #~ if self.path.exists() and - + self.l2a = (l2a_identifier or self.get_l2a(identifier)).replace(".SAFE", "") + self.suffix = [i for i in ["CM001", "CM002-B11", "CM003-PRB" + str(probability) + "ITR" + str(iterations)] if version.upper() in i][0] + self.identifier = identifier or self.l2a + "_" + self.suffix + ".jp2" + self.version = self.get_version(self.identifier) or version + self.path = self._library_path / self.tile / self.l2a / self.identifier + self._info_path = self.path.parent / (self.path.stem + ".xml") + self.init_md() @staticmethod def get_tile(identifier) -> str: @@ -1094,10 +1043,56 @@ class NewCloudMaskProduct: """ return re.findall("_T([0-9]{2}[A-Z]{3})_", identifier)[0] + @staticmethod + def get_identifier(l2a_identifier) -> str: + """Returns l2a name from a old cloud mask identifier string. + :param string: string from which to extract the l2a name. + """ + return re.findall(r"(S2.+)_CM.+jp2", identifier)[0] + @staticmethod def get_l2a(identifier) -> str: """Returns l2a name from a old cloud mask identifier string. :param string: string from which to extract the l2a name. """ - return re.findall(r"(S2.+)_SPV.+jp2", identifier)[0] + return re.findall(r"(S2.+)_CM.+jp2", identifier)[0] + + @staticmethod + def get_version(identifier) -> str: + """Returns cloudmask version from a cloud mask identifier string. + :param string: string from which to extract the version name. + """ + return re.findall(r"S2.+_(CM[0-9]{3}).*jp2", identifier)[0] + + @property + def sen2chain_version(self): + return Sen2ChainMetadataParser(self._info_path).get_metadata_value('SEN2CHAIN_VERSION') + + @property + def sen2chain_processing_version(self): + return Sen2ChainMetadataParser(self._info_path).get_metadata_value('SEN2CHAIN_PROCESSING_VERSION') + + @property + def sen2cor_version(self): + return Sen2ChainMetadataParser(self._info_path).get_metadata_value('SEN2COR_VERSION') + + def init_md(self): + if self.path.exists() and not self._info_path.exists(): + l2a = L2aProduct(self.l2a) + if l2a._sen2chain_info_path.exists(): + Sen2ChainMetadataParser(self._info_path).set_metadata(sen2chain_version = l2a.sen2chain_version, + sen2chain_processing_version = l2a.sen2chain_processing_version, + sen2cor_version = l2a.sen2cor_version) + else: + Sen2ChainMetadataParser(self._info_path).init_metadata() + + def update_md(self, + sen2chain_version: str = None, + sen2chain_processing_version: str = None, + sen2cor_version: str = None, + ): + """ Set custom sen2chain, sen2chain_processing and sen2cor versions """ + Sen2ChainMetadataParser(self._info_path).set_metadata(sen2chain_version = sen2chain_version, + sen2chain_processing_version = sen2chain_processing_version, + sen2cor_version = sen2cor_version) diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index d0ae050..759d8f4 100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -23,7 +23,7 @@ from .config import Config, SHARED_DATA from .utils import str_to_datetime, human_size, getFolderSize from .indices import IndicesCollection from .products import L1cProduct, L2aProduct, OldCloudMaskProduct, NewCloudMaskProduct -from .multi_processing import l2a_multiprocessing, cld_multiprocessing, idx_multiprocessing +from .multi_processing import l2a_multiprocessing, cld_multiprocessing, cld_version_probability_iterations_reprocessing_multiprocessing, idx_multiprocessing logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -126,35 +126,6 @@ class ProductsList: self._time_index[value["date"]] = item -class IndicesList(ProductsList): - """Class for managing indices products lists. - - """ - @property - def raws(self) -> "ProductsList": - filtered = ProductsList() - for k, v in self._dict.items(): - if not("MASK" in k) and not("QUICKLOOK" in k): - filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} - return filtered - - @property - def masks(self) -> "ProductsList": - filtre = ProductsList() - for k, v in self._dict.items(): - if "MASK" in k: - filtre[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} - return filtre - - @property - def quicklooks(self) -> "ProductsList": - filtered = ProductsList() - for k, v in self._dict.items(): - if "QUICKLOOK" in k: - filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} - return filtered - - class CloudMaskList(ProductsList): """Class for managing mask product list @@ -182,6 +153,14 @@ class CloudMaskList(ProductsList): if "_CM003" in k: filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} return filtered + + @property + def cm004(self) -> "CloudMaskList": + filtered = CloudMaskList() + for k, v in self._dict.items(): + if "_CM004" in k: + filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + return filtered def params(self, probability: int = 1, @@ -194,7 +173,64 @@ class CloudMaskList(ProductsList): filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} else: filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} - return filtered + return filtered + + +#~ class IndicesList(ProductsList): + #~ """Class for managing indices products lists. + + #~ """ + #~ @property + #~ def raws(self) -> "ProductsList": + #~ filtered = ProductsList() + #~ for k, v in self._dict.items(): + #~ if not("MASK" in k) and not("QUICKLOOK" in k): + #~ filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + #~ return filtered + + #~ @property + #~ def masks(self) -> "ProductsList": + #~ filtre = ProductsList() + #~ for k, v in self._dict.items(): + #~ if "MASK" in k: + #~ filtre[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + #~ return filtre + + #~ @property + #~ def quicklooks(self) -> "ProductsList": + #~ filtered = ProductsList() + #~ for k, v in self._dict.items(): + #~ if "QUICKLOOK" in k: + #~ filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + #~ return filtered + +class NewIndiceList(CloudMaskList): + """Class for managing indices products lists. + + """ + @property + def raws(self) -> "ProductsList": + filtered = NewIndiceList() + for k, v in self._dict.items(): + if not("_CM" in k): + filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + return filtered + + @property + def masks(self) -> "ProductsList": + filtred = NewIndiceList() + for k, v in self._dict.items(): + if ("_CM" in k) and not("_QL" in k): + filtred[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + return filtred + + @property + def quicklooks(self) -> "ProductsList": + filtered = NewIndiceList() + for k, v in self._dict.items(): + if "_QL" in k: + filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]} + return filtered class Tile: """Class for managing tiles in the library. @@ -217,15 +253,15 @@ class Tile: self._products = {"l1c": ProductsList(), "l2a": ProductsList(), "cloudmasks" : ProductsList(), - "cloudmasks2" : ProductsList(), + #~ "cloudmasks2" : ProductsList(), "indices": dict()} self._get_indices_paths() self._get_l1c_list() self._get_l2a_list() self._get_cloudmasks() - self._get_cloudmasks2() - self._get_indices_list() + #~ self._get_indices_list() + self._get_new_indice_list() def _get_indices_paths(self) -> None: """Updates self._paths""" @@ -250,21 +286,21 @@ class Tile: cloud_cover = L2aProduct(f.name, self.name).cloud_coverage_assessment self._products["l2a"][f.name] = {"date": date, "cloud_cover": float(cloud_cover)} - def _get_cloudmasks(self) -> None: - """Scans L2A folder for cloud masks and adds corresponding L2A products in a ProductsList.""" - for f in self._paths["l2a"].glob("*L2A*_CLOUD_MASK.jp2"): - l1c_name = f.name.replace("L2A_", "L1C_").replace("_USER_", "_OPER_").replace("_CLOUD_MASK.jp2", ".SAFE") - try: - date = self._products["l1c"][l1c_name].date - cloud_cover = self._products["l1c"][l1c_name].cloud_cover - except KeyError: - date = Tile._get_date(f.name.replace("_CLOUD_MASK.jp2", ".SAFE")) - cloud_cover = L2aProduct(f.name.replace("_CLOUD_MASK.jp2", ".SAFE"), self.name).cloud_coverage_assessment - self._products["cloudmasks"][f.name.replace("_CLOUD_MASK.jp2", ".SAFE")] = {"date": date, "cloud_cover": float(cloud_cover)} + #~ def _get_cloudmasks(self) -> None: + #~ """Scans L2A folder for cloud masks and adds corresponding L2A products in a ProductsList.""" + #~ for f in self._paths["l2a"].glob("*L2A*_CLOUD_MASK.jp2"): + #~ l1c_name = f.name.replace("L2A_", "L1C_").replace("_USER_", "_OPER_").replace("_CLOUD_MASK.jp2", ".SAFE") + #~ try: + #~ date = self._products["l1c"][l1c_name].date + #~ cloud_cover = self._products["l1c"][l1c_name].cloud_cover + #~ except KeyError: + #~ date = Tile._get_date(f.name.replace("_CLOUD_MASK.jp2", ".SAFE")) + #~ cloud_cover = L2aProduct(f.name.replace("_CLOUD_MASK.jp2", ".SAFE"), self.name).cloud_coverage_assessment + #~ self._products["cloudmasks"][f.name.replace("_CLOUD_MASK.jp2", ".SAFE")] = {"date": date, "cloud_cover": float(cloud_cover)} - def _get_cloudmasks2(self) -> None: + def _get_cloudmasks(self) -> None: """Scans cloudmasks folder for cloud masks and adds corresponding L2A products in a ProductsList.""" - self._products["cloudmasks2"] = CloudMaskList() + self._products["cloudmasks"] = CloudMaskList() for f in self._paths["cloudmasks"].glob("*L2A*/*_CM*.jp2"): l1c_name = f.parent.name.replace("L2A_", "L1C_").replace("_USER_", "_OPER_") + ".SAFE" try: @@ -273,19 +309,46 @@ class Tile: except KeyError: date = Tile._get_date(f.parent.name) cloud_cover = L2aProduct(f.parent.name, self.name).cloud_coverage_assessment - self._products["cloudmasks2"][f.name] = {"date": date, + self._products["cloudmasks"][f.name] = {"date": date, "cloud_cover": float(cloud_cover), #~ "version": re.findall(r"_(CM...)", f.name)[0] } - def _get_indices_list(self) -> None: - """Scans indices folders and adds products in a IndicesList.""" + #~ def _get_indices_list(self) -> None: + #~ """Scans indices folders and adds products in a IndicesList.""" + #~ for indice, path in self._paths["indices"].items(): + #~ if path.is_dir(): + #~ self._products["indices"][indice] = IndicesList() + #~ indice_template = IndicesCollection.get_indice_cls(indice.upper()).filename_template + #~ indice_ext = IndicesCollection.get_indice_cls(indice.upper()).ext + #~ file_patterns = [indice_ext, 'QUICKLOOK.tif'] + #~ files_selected = [] + #~ for p in file_patterns: + #~ files_selected.extend(path.glob("*/*{}".format(p))) + #~ for f in files_selected: + #~ try: + #~ indice_pattern = re.sub("{.*?}", "", indice_template) + #~ remove_pattern = "{}.*".format(indice_pattern) + #~ l2a_name = re.sub(remove_pattern, '', f.name) + ".SAFE" + #~ date = self._products["l2a"][l2a_name].date + #~ cloud_cover = self._products["l2a"][l2a_name].cloud_cover + #~ except KeyError: + #~ date = Tile._get_date(f.name) + #~ cloud_cover = None + #~ self._products["indices"][indice][f.name] = {"date": date, "cloud_cover": cloud_cover} + #~ self.__dict__[indice] = self._products["indices"][indice] + + def _get_new_indice_list(self) -> None: + """Scans indice folders and adds products in a NewIndiceList.""" for indice, path in self._paths["indices"].items(): if path.is_dir(): - self._products["indices"][indice] = IndicesList() + self._products["indices"][indice] = NewIndiceList() indice_template = IndicesCollection.get_indice_cls(indice.upper()).filename_template indice_ext = IndicesCollection.get_indice_cls(indice.upper()).ext - file_patterns = [indice_ext, 'QUICKLOOK.tif'] + #~ file_patterns = [indice_ext, '_QL.tif'] + file_patterns = [indice.upper() + indice_ext, + indice.upper() + "_CM*" + indice_ext, + indice.upper() + "_CM*" + "_QL.tif"] files_selected = [] for p in file_patterns: files_selected.extend(path.glob("*/*{}".format(p))) @@ -346,16 +409,16 @@ class Tile: """Returns tile's L2A products as a ProductsList.""" return self._products["l2a"] + #~ @property + #~ def cloudmasks(self) -> "ProductsList": + #~ """Returns tile's cloud masks products as a ProductsList.""" + #~ return self._products["cloudmasks"] + @property def cloudmasks(self) -> "ProductsList": """Returns tile's cloud masks products as a ProductsList.""" return self._products["cloudmasks"] - @property - def cloudmasks2(self) -> "ProductsList": - """Returns tile's cloud masks products as a ProductsList.""" - return self._products["cloudmasks2"] - @property def l1c_missings(self) -> "ProductsList": """Returns tile's L2A products that don't have a L1C as a ProductsList.""" @@ -378,16 +441,16 @@ class Tile: "cloud_cover": self._products["l1c"][prod].cloud_cover} return prods_list - @property - def cloudmasks_missings(self) -> "ProductsList": - """Returns tile's L2A products that don't have a cloud mask as a ProductsList.""" - prods_list = ProductsList() - missings_l2a_set = set(self.l2a.products) - {identifier for identifier in self.cloudmasks.products} - for prod in missings_l2a_set: - prods_list[prod] = {"date": self._products["l2a"][prod].date, - "cloud_cover": self._products["l2a"][prod].cloud_cover} - return prods_list - + #~ @property + #~ def cloudmasks_missings(self) -> "ProductsList": + #~ """Returns tile's L2A products that don't have a cloud mask as a ProductsList.""" + #~ prods_list = ProductsList() + #~ missings_l2a_set = set(self.l2a.products) - {identifier for identifier in self.cloudmasks.products} + #~ for prod in missings_l2a_set: + #~ prods_list[prod] = {"date": self._products["l2a"][prod].date, + #~ "cloud_cover": self._products["l2a"][prod].cloud_cover} + #~ return prods_list + def cloudmasks_missing(self, version: str = "cm001", probability: int = 1, @@ -395,8 +458,8 @@ class Tile: ) -> "ProductsList": """Returns tile's L2A products that don't have a cloud mask as a ProductsList.""" prods_list = ProductsList() - missings_l2a_set = set(self.l2a.products) - {(re.findall(r"(S2.+)_SPV.+.jp2", identifier)[0] + ".SAFE") \ - for identifier in getattr(self.cloudmasks2, version).\ + missings_l2a_set = set(self.l2a.products) - {(re.findall(r"(S2.+)_CM.+.jp2", identifier)[0] + ".SAFE") \ + for identifier in getattr(self.cloudmasks, version).\ params(probability = probability, iterations = iterations).\ products} for prod in missings_l2a_set: @@ -439,7 +502,8 @@ class Tile: """Returns tile's L2A products that don't have indices as a ProductsList.""" prods_list = ProductsList() try: - missings_indice_set = set(self.l2a.products) - {identifier.replace("_" + indice.upper() + "_MASKED.jp2", ".SAFE") \ + missings_indice_set = set(self.l2a.products) - {re.sub("_" + indice.upper() + "_CM.+jp2", ".SAFE", identifier) \ + #~ identifier.replace("_" + indice.upper() + "_CM*.jp2", ".SAFE") \ for identifier in getattr(getattr(self, indice.lower()), 'masks').products} except: missings_indice_set = set(self.l2a.products) @@ -467,30 +531,7 @@ class Tile: if l1c_process_list: l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=nb_proc) - def compute_cloudmasks(self, - version: str = "cm001", - probability: int = 1, - iterations: int = 5, - date_min: str = None, - date_max: str = None, - nb_proc: int = 4): - """ - Compute all missing cloud masks for l2a products - """ - - cld_l2a_process_list = [] - cld_l2a_process_list.append(list(p.identifier for p in self.cloudmasks_missings.filter_dates(date_min = date_min, date_max = date_max))) - cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list)) - if cld_l2a_process_list: - logger.info("{} l2a products to process:".format(len(cld_l2a_process_list))) - logger.info("{}".format(cld_l2a_process_list)) - else: - logger.info("All cloud masks already computed") - cld_res = False - if cld_l2a_process_list: - cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=nb_proc) - - #~ def compute_cloudmasks2(self, + #~ def compute_cloudmasks(self, #~ version: str = "cm001", #~ probability: int = 1, #~ iterations: int = 5, @@ -502,7 +543,7 @@ class Tile: #~ """ #~ cld_l2a_process_list = [] - #~ cld_l2a_process_list.append(list(p.identifier for p in self.cloudmasks_missings.filter_dates(date_min = date_min, date_max = date_max))) + #~ cld_l2a_process_list.append(list(p.identifier for p in self.cloudmasks_missing.filter_dates(date_min = date_min, date_max = date_max))) #~ cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list)) #~ if cld_l2a_process_list: #~ logger.info("{} l2a products to process:".format(len(cld_l2a_process_list))) @@ -512,10 +553,47 @@ class Tile: #~ cld_res = False #~ if cld_l2a_process_list: #~ cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=nb_proc) - + + def compute_cloudmasks(self, + version: str = "cm001", + probability: int = 1, + iterations: int = 5, + reprocess: bool = False, + date_min: str = None, + date_max: str = None, + nb_proc: int = 4): + """ + Compute all (missing) cloud masks for l2a products + :param version: version of cloudmask to compute. Can be either cm001, cm002, cm003, or cm004 + :param probability: only used by cm003: threshold probability of clouds to be considered + :param iterations: only used by cm003: number of iterations for dilatation process while computing cloudmask + :param reprocess: if False (default), only missing cloudmasks will be computed. if True already processed cloudmask will be computed again. + :param date_min: products before this date wont be processed. Defaut None = no limit + :param date_max: product after this date wont be processed. Defaut None = no limit + :param nb_proc: number of parallel process, limited to the number of proc of your PC (default 4) + """ + + if not reprocess: + cld_l2a_process_list = list([p.identifier, version, probability, iterations, reprocess] \ + for p in self.cloudmasks_missing(version = version, + probability = probability, + iterations = iterations, + )\ + .filter_dates(date_min = date_min, date_max = date_max)) + else: + cld_l2a_process_list = list([p.identifier, version, probability, iterations, reprocess] \ + for p in self.l2a.filter_dates(date_min = date_min, date_max = date_max)) + if cld_l2a_process_list: + logger.info("{} l2a products to process:".format(len(cld_l2a_process_list))) + logger.info("{}".format(cld_l2a_process_list)) + cld_version_probability_iterations_reprocessing_multiprocessing(cld_l2a_process_list, nb_proc=nb_proc) + else: + logger.info("All cloud masks already computed") + #~ return False def compute_indices(self, indices: list = [], + cm_version: list = ["cm001"], date_min: str = None, date_max: str = None, nb_proc: int = 4): @@ -785,7 +863,7 @@ class Tile: else: l2a.process_ql(out_path = outfullpath, out_resolution=(750,750), jpg = True) - def update_cloudmasks(self): + def update_old_cloudmasks(self): """ Move and rename old cloudmasks to new cloudmask folder cloudmask xmls are removed @@ -794,19 +872,30 @@ class Tile: logger.info("Moving and renaming old masks") for f in self._paths["l2a"].glob("*L2A*_CLOUD_MASK*.jp2"): p = OldCloudMaskProduct(f.name) - f_renamed = f.name.replace("CLOUD_MASK_B11", - "SPV" + \ - L2aProduct(p.l2a).sen2chain_processing_version.replace(".", "") +\ - "_CM002-B11")\ - .replace("CLOUD_MASK", - "SPV" + \ - L2aProduct(p.l2a).sen2chain_processing_version.replace(".", "") +\ - "_CM001")\ - p_new = NewCloudMaskProduct(f_renamed) - p_new.path.parent.mkdir(exist_ok=True) + f_renamed = f.name.replace("CLOUD_MASK_B11", "CM002-B11")\ + .replace("CLOUD_MASK", "CM001") + logger.info(f_renamed) + p_new = NewCloudMaskProduct(identifier = f_renamed) + p_new.path.parent.mkdir(exist_ok=True, parents=True) p.path.replace(p_new.path) + p_new.init_md() #Remove xml logger.info("Removing xmls") for f in self._paths["l2a"].glob("*L2A*_CLOUD_MASK*.jp2.aux.xml"): f.unlink() + + def update_old_indices(self): + """ + Rename old indices to match new cloudmask nomenclature + """ + #Rename old indices to default cm_version cm001 + logger.info("Moving and renaming old indices") + + for indice, path in self._paths["indices"].items(): + logger.info("Processing: {}".format(indice.upper())) + for f in list(Path(path).glob("*/*MASKED*")) + list(Path(path).glob("*/*QUICKLOOK*")): + f_renamed = f.name.replace("MASKED", "CM001").replace("QUICKLOOK", "CM001_QL") + f.rename(str(Path(f.parent / f_renamed))) + logger.info(f_renamed) + diff --git a/sen2chain/xmlparser.py b/sen2chain/xmlparser.py index b3f20d6..ed0cac4 100644 --- a/sen2chain/xmlparser.py +++ b/sen2chain/xmlparser.py @@ -139,18 +139,13 @@ class Sen2ChainMetadataParser: xml_path, ): SEN2CHAIN_META = SHARED_DATA.get("sen2chain_meta") - logger.info("Sen2ChainMetadataParser-001") self.xml_path = xml_path if xml_path.exists(): - logger.info("Sen2ChainMetadataParser-002") self._tree = et.parse(str(xml_path)) - logger.info("Sen2ChainMetadataParser-003") else: - logger.info("Sen2ChainMetadataParser-004") self._tree = et.parse(str(SEN2CHAIN_META)) - logger.info("Sen2ChainMetadataParser-005") self._root = self._tree.getroot() @@ -167,7 +162,6 @@ class Sen2ChainMetadataParser: :param key: metadata tag name. """ try: - logger.info("get_metadata_value-001") return [v.text for v in self._root.findall(".//{0}".format(key))][0] except IndexError: logger.error("Metadata value not found: {}".format(key)) -- GitLab