From cccdad5eb7c586afe5f0bbb395e790c1c1417ea1 Mon Sep 17 00:00:00 2001
From: Impact <pascal.mouquet@ird.fr>
Date: Fri, 4 Sep 2020 16:55:09 +0400
Subject: [PATCH] introducing new functions to manage Tile and Library and new
 TileSet class to compute l2a, cloudmasks and indices for multiple Tiles
 using multiprocessing

---
 sen2chain/__init__.py             |   2 +-
 sen2chain/download_and_process.py |   3 +-
 sen2chain/library.py              |  78 +++++++++++++++--
 sen2chain/tiles.py                | 138 ++++++++++++++++++++++++------
 sen2chain/tileset.py              | 117 +++++++++++++++++++++++++
 5 files changed, 305 insertions(+), 33 deletions(-)
 create mode 100644 sen2chain/tileset.py

diff --git a/sen2chain/__init__.py b/sen2chain/__init__.py
index fefbddb..9d15b50 100644
--- a/sen2chain/__init__.py
+++ b/sen2chain/__init__.py
@@ -31,6 +31,6 @@ from .automatization import Automatization
 from .utils import format_word, grouper, datetime_to_str, str_to_datetime, human_size_decimal, human_size
 from .geo_utils import serialise_tiles_index, get_processed_indices_vect, crop_product_by_shp
 from .multi_processing import l2a_multiprocessing, cldidx_multiprocessing, cld_multiprocessing, idx_multiprocessing
-
+from .tileset import TileSet
 __version__ = "0.1.0"
 __author__ = "Jérémy Commins <jebins@openmailbox.org> & Impact <pascal.mouquet@ird.fr>"
diff --git a/sen2chain/download_and_process.py b/sen2chain/download_and_process.py
index 159bcd0..63a45cc 100644
--- a/sen2chain/download_and_process.py
+++ b/sen2chain/download_and_process.py
@@ -89,6 +89,7 @@ def download_peps(identifier: str, tile: str, hub: str) -> Tuple[str, str, str,
         # archive_l1c(identifier, tile, hub)
     except Exception as e:
         logger.debug("{}".format(e))
+        temp_container.delete_temp_folder()
         pass
 
     if not temp_container.l1c.path.exists():
@@ -126,7 +127,7 @@ def download_scihub(identifier: str, tile: str, hub: str) -> Tuple[str, str, str
         #archive_l1c(identifier, tile, hub)
     except Exception as e:
         logger.debug("{}".format(e))
-        #temp_container.delete_temp_folder()
+        temp_container.delete_temp_folder()
         downloaded = False
         logger.info("Failed download: {}".format(identifier))
 
diff --git a/sen2chain/library.py b/sen2chain/library.py
index 5498896..a6fdbc7 100644
--- a/sen2chain/library.py
+++ b/sen2chain/library.py
@@ -60,6 +60,14 @@ class Library:
                   clean_list: list = [],
                   remove_indice_tif: bool = False,
                   remove: bool = False):
+        """Clean files corrupted by processing errors, for the whole library or for a
+        selection of tiles. Calls the clean_lib function of the Tile class.
+
+        :param clean_list: list of tiles to clean. If not provided, all l1c tiles are processed.
+        :param remove_indice_tif: bool. If True, also remove the TIFF files present in the indice folders. Default False.
+        :param remove: bool. If True, actually remove the corrupted files; if False, only list the identified problems. Default False.
+
+        """
         nb_id = 0
         nb_rm = 0
         if not clean_list:
@@ -77,8 +85,16 @@ class Library:
 
     def archive_l1c(self,
                     archive_list: list = [],
-                    size: bool = False,
+                    size_only: bool = False,
                     ):
+        """
+        Archive l1c products from the library folder to l1c_archive_path.
+        Calls the archive_l1c function of the Tile class, see there for details.
+
+        :param archive_list: list of tiles to archive. If not provided, all l1c tiles are processed.
+        :param size_only: if True, only report the total size to archive, without moving anything.
+        """
+        total_size = 0
         if not archive_list:
             archive_list = self.l1c
@@ -86,22 +102,70 @@ class Library:
             try:
                 logger.info(t)
                 til = Tile(t)
-                size_tile = til.archive_l1c(size = size)
-                if size:
+                size_tile = til.archive_l1c(size_only = size_only)
+                if size_only:
                     total_size += size_tile
-                    logger.info("Total size: {}".format(human_size_decimal(total_size)))
             except:
                 pass
-        #~ else:
-            #~ logger.info("Please specify a tile list to archive")
+        logger.info("Total l1c size to move: {}".format(human_size_decimal(total_size)))
 
     def update_latest_ql(self):
         """
-        Produce or update the latest quicklook for the tile
+        Produce or update the latest quicklook for the L2A library tiles
         """
         for tile in self.l2a:
             Tile(tile).update_latest_ql()
+
+    def archive_l2a(self,
+                    archive_list: list = [],
+                    size_only: bool = False,
+                    ):
+
+        """
+        Archive l2a products from the library folder to l2a_archive_path.
+        Calls the archive_l2a function of the Tile class, see there for details.
+
+        :param archive_list: list of tiles to archive. If not provided, all l2a tiles are processed.
+        :param size_only: if True, only report the total size to archive, without moving anything.
+        """
+        total_size = 0
+        if not archive_list:
+            archive_list = self.l2a
+        for t in archive_list:
+            try:
+                logger.info(t)
+                til = Tile(t)
+                size_tile = til.archive_l2a(size_only = size_only)
+                if size_only:
+                    total_size += size_tile
+            except:
+                pass
+        logger.info("Total l2a size to move: {}".format(human_size_decimal(total_size)))
+
+    def archive_all(self,
+                    archive_list: list = [],
+                    size_only: bool = False,
+                    ):
+        total_size = 0
+        if not archive_list:
+            archive_list = self.l1c
+        for t in archive_list:
+            try:
+                logger.info(t)
+                til = Tile(t)
+                size_tile = til.archive_all(size_only = size_only)
+                total_size += size_tile
+            except:
+                pass
+        logger.info("Total size to move: {}".format(human_size_decimal(total_size)))
+
+    def update_latest_ql(self):
+        """
+        Produce or update the latest quicklook for the L2A library tiles
+        """
+        for tile in self.l2a:
+            Tile(tile).update_latest_ql()
 
 class TempContainer:
     """Class for managing a downloaded L1C products.
diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py
index 705947e..501b415 100644
--- a/sen2chain/tiles.py
+++ b/sen2chain/tiles.py
@@ -17,12 +17,13 @@ from datetime import datetime
 from pprint import pformat
 # type annotations
 from typing import List, Dict, Iterable
+from itertools import chain
 
 from .config import Config, SHARED_DATA
 from .utils import str_to_datetime, human_size, getFolderSize
 from .indices import IndicesCollection
 from .products import L1cProduct, L2aProduct
-
+from .multi_processing import l2a_multiprocessing, cld_multiprocessing, idx_multiprocessing
 
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
@@ -93,7 +94,6 @@ class ProductsList:
                cover_min: int = 0,
                cover_max: int = 100
                ) -> "ProductsList":
         """Filters products list.
-        :param cover_min: minimum cloud coverage.
         :param cover_max: maximum cloud coverage.
         """
@@ -323,9 +323,10 @@ class Tile:
         return prods_list
 
     @property
-    def len(self):
+    def info(self):
         logger.info("l1c: {}".format(len(self.l1c)))
         logger.info("l2a: {}".format(len(self.l2a)))
+        logger.info("cloud_masks: {}".format(len(self.cloudmasks)))
         for indice, path in self._paths["indices"].items():
             logger.info("{} (raw / masked): {} / {}".format(indice,
                         len(getattr(self, indice).raws), len(getattr(self, indice).masks)))
@@ -362,16 +363,91 @@ class Tile:
             prods_list[prod] = {"date": self._products["l2a"][prod].date,
                                 "cloud_cover": self._products["l2a"][prod].cloud_cover}
         return prods_list
+
+    def compute_l2a(self,
+                    date_min: str = None,
+                    date_max: str = None,
+                    nb_proc: int = 8):
+        """
+        Compute all the missing l2a products for this tile's l1c products
+        """
+        l1c_process_list = []
+        l1c_process_list.append(list(p.identifier for p in self.l2a_missings.filter_dates(date_min = date_min, date_max = date_max)))
+        l1c_process_list = list(chain.from_iterable(l1c_process_list))
+        if l1c_process_list:
+            logger.info("{} l1c products to process:".format(len(l1c_process_list)))
+            logger.info("{}".format(l1c_process_list))
+        else:
+            logger.info("All l2a products already computed")
+        l2a_res = False
+        if l1c_process_list:
+            l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=nb_proc)
+
+    def compute_cloudmasks(self,
+                           date_min: str = None,
+                           date_max: str = None,
+                           nb_proc: int = 8):
+        """
+        Compute all the missing cloud masks for this tile's l2a products
+        """
+
+        cld_l2a_process_list = []
+        cld_l2a_process_list.append(list(p.identifier for p in self.cloudmasks_missings.filter_dates(date_min = date_min, date_max = date_max)))
+        cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list))
+        if cld_l2a_process_list:
+            logger.info("{} l2a products to process:".format(len(cld_l2a_process_list)))
+            logger.info("{}".format(cld_l2a_process_list))
+        else:
+            logger.info("All cloud masks already computed")
+        cld_res = False
+        if cld_l2a_process_list:
+            cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=nb_proc)
+
+    def compute_indices(self,
+                        indices: list = [],
+                        date_min: str = None,
+                        date_max: str = None,
+                        nb_proc: int = 8):
+        """
+        Compute all the missing indices for this tile's l2a products
+        - indices are given as a list
+        - if indices are not provided, only the missing dates of the already existing indices are computed (no new indice is added)
+        - indices won't be masked if no cloud masks are present; compute the cloud masks first
+        """
+        if not indices:
+            indices = list(self._paths["indices"].keys())
+        else:
+            indices = [indice.upper() for indice in indices]
+        indices_l2a_process_list = []
+        for i in indices:
+            l2a_list = [p.identifier for p in self.missing_indices(i).filter_dates(date_min = date_min, date_max = date_max)]
+            for j in l2a_list:
+                indices_l2a_process_list.append([j, i])
+        if indices_l2a_process_list:
+            logger.info("{} l2a products to process:".format(len(indices_l2a_process_list)))
+            logger.info("{}".format(indices_l2a_process_list))
+        else:
+            logger.info("All indices already computed")
+        indices_res = False
+        if indices_l2a_process_list:
+            indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc=nb_proc)
+
 
     #~ toto=24
     def clean_lib(self,
                   remove_indice_tif: bool = False,
                   remove: bool = False):
         """
-        Search and clean processing error products
+        Search for and clean files corrupted by processing errors on this tile:
+
         - unmoved error l2a products from l1c folder
         - moved error l2a products from l2a folder
         - cloud masks error (0kb)
         - indices error (0kb)
+
+        :param remove_indice_tif: bool. If True, also remove the TIFF files present in the indice folders. Default False.
+        :param remove: bool. If True, actually remove the corrupted files; if False, only list the identified problems. Default False.
+        :return: dict with the number of identified and removed problems.
+
         """
         #~ logger.info("Cleaning {} library".format(self.name))
@@ -495,7 +571,7 @@
         return {"identified_problems": nb_id, "removed_problems": nb_rm}
 
     def archive_l1c(self,
-                    size: bool = False,):
+                    size_only: bool = False,):
         """
         Check and move l1c products to archive folder
         """
         l1c_archive_path = Path(Config().get("l1c_archive_path"))
 
-        logger.info("Scanning L1C ready to move")
         prod_list = ProductsList()
         archive_l1c_set = {a for a in {identifier.replace("L2A_", "L1C_").replace("_USER_", "__OPER__") for identifier in self.l2a.products} if a in set(self.l1c.products)}
@@ -511,6 +586,7 @@
         for prod in archive_l1c_set:
             prod_list[prod] = {"date": self._products["l1c"][prod].date,
                                "cloud_cover": self._products["l1c"][prod].cloud_cover}
+
         if prod_list:
             count = 0
             total_size = 0
@@ -518,10 +594,8 @@
                 l1c = L1cProduct(prod.identifier)
                 if not l1c.path.is_symlink():
                     count += 1
-                    if size:
-                        total_size += getFolderSize(str(l1c.path))
-                        #~ logger.info("{}: {}".format(prod, human_size(getFolderSize(str(l1c.path)))))
-                    else:
+                    total_size += getFolderSize(str(l1c.path))
+                    if not size_only:
                         move_path = l1c_archive_path / l1c.tile / l1c.path.name
                         logger.info("archiving {}".format(l1c.identifier))
                         move_path.parent.mkdir(exist_ok=True)
@@ -529,22 +603,23 @@
                         distutils.dir_util.copy_tree(str(l1c.path), str(move_path))
                        distutils.dir_util.remove_tree(str(l1c.path))
                         l1c.path.symlink_to(move_path, target_is_directory = True)
-
-            logger.info("{} products archived".format(count))
-            if size:
-                logger.info("Total size to move: {}".format(human_size(total_size)))
-                return total_size
+            if size_only:
+                logger.info("{} l1c product(s) to archive ({})".format(count, human_size(total_size)))
+            else:
+                logger.info("{} l1c product(s) archived ({})".format(count, human_size(total_size)))
+            return total_size
         if not count:
             logger.info("No L1C products to archive")
+            return 0
 
-    def archive_l2a(self):
+    def archive_l2a(self,
+                    size_only: bool = False,):
         """
         Check errors and move l2a products to archive folder
         """
         if (self.clean_lib()['identified_problems']-self.clean_lib()['removed_problems']) == 0:
-            logger.info("No tile error processing l2a archive")
 
             l2a_archive_path = Path(Config().get("l2a_archive_path"))
 
@@ -552,21 +627,36 @@
             if prod_list:
                 count = 0
+                total_size = 0
                 for prod in prod_list:
                     l2a = L2aProduct(prod.identifier)
                     if not l2a.path.is_symlink():
                         count += 1
-                        move_path = l2a_archive_path / l2a.tile / l2a.path.name
-                        logger.info("archiving {}".format(l2a.identifier))
-                        move_path.parent.mkdir(exist_ok=True)
-                        shutil.move(str(l2a.path), str(move_path.parent))
-                        l2a.path.symlink_to(move_path, target_is_directory = True)
-                logger.info("{} products archived".format(count))
+                        total_size += getFolderSize(str(l2a.path))
+                        if not size_only:
+                            move_path = l2a_archive_path / l2a.tile / l2a.path.name
+                            logger.info("archiving {}".format(l2a.identifier))
+                            move_path.parent.mkdir(exist_ok=True)
+                            shutil.move(str(l2a.path), str(move_path.parent))
+                            l2a.path.symlink_to(move_path, target_is_directory = True)
+                if size_only:
+                    logger.info("{} l2a product(s) to archive ({})".format(count, human_size(total_size)))
+                else:
+                    logger.info("{} l2a product(s) archived ({})".format(count, human_size(total_size)))
+                return total_size
             else:
                 logger.info("No L2A products, nothing to archive")
+                return 0
         else:
             logger.info("Error(s) in l2a product(s), please correct them by running clean_lib(remove=True) before archiving")
-
+            return 0
+
+    def archive_all(self,
+                    size_only: bool = False,):
+        l1c_size = self.archive_l1c(size_only = size_only)
+        l2a_size = self.archive_l2a(size_only = size_only)
+        return l1c_size + l2a_size
+
 
     def update_latest_ql(self):
         """
         Produce or update the latest l2a quicklook for the tile
diff --git a/sen2chain/tileset.py b/sen2chain/tileset.py
new file mode 100644
index 0000000..89acc68
--- /dev/null
+++ b/sen2chain/tileset.py
@@ -0,0 +1,117 @@
+#!/usr/bin/python3
+# -*- coding: utf-8 -*-
+
+
+
+import os
+import shutil
+import logging
+import pathlib
+
+from pathlib import Path
+# type annotations
+from typing import List, Dict
+from itertools import chain
+
+from .config import Config, SHARED_DATA
+from .products import L1cProduct
+from .tiles import Tile
+from .utils import human_size, human_size_decimal
+from .multi_processing import l2a_multiprocessing, cld_multiprocessing, idx_multiprocessing
+
+s2_tiles_index = SHARED_DATA.get("tiles_index")
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class TileSet:
+    """
+    Class to process l2a products, cloud masks and indices for a list of tiles, using multiprocessing.
+    """
+
+    def __init__(self,
+                 tile_list: list = [],
+                 ):
+        self.tile_list = tile_list
+        if not self.tile_list:
+            logger.info("TileSet tile list is empty, please provide a list of tiles when initializing the class")
+
+    def __repr__(self) -> str:
+        return "TileSet: {}".format(self.tile_list)
+
+
+    def compute_l2a(self,
+                    date_min: str = None,
+                    date_max: str = None,
+                    nb_proc: int = 8):
+        """
+        Compute all the missing l2a products for the tile set's l1c products
+        """
+        l1c_process_list = []
+        for tile in self.tile_list:
+            t = Tile(tile)
+            l1c_process_list.append(list(p.identifier for p in t.l2a_missings.filter_dates(date_min = date_min, date_max = date_max)))
+        l1c_process_list = list(chain.from_iterable(l1c_process_list))
+        if l1c_process_list:
+            logger.info("{} L1C products to process:".format(len(l1c_process_list)))
+            logger.info("{}".format(l1c_process_list))
+            l2a_multiprocessing(l1c_process_list, nb_proc=nb_proc)
+        else:
+            logger.info("All L2A already computed")
+
+    def compute_cloudmasks(self,
+                           date_min: str = None,
+                           date_max: str = None,
+                           nb_proc: int = 8):
+        """
+        Compute all the missing cloud masks for the tile set's l2a products
+        """
+        cld_l2a_process_list = []
+        for tile in self.tile_list:
+            t = Tile(tile)
+            cld_l2a_process_list.append(list(p.identifier for p in t.cloudmasks_missings.filter_dates(date_min = date_min, date_max = date_max)))
+        cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list))
+        if cld_l2a_process_list:
+            logger.info("{} L2A products to process:".format(len(cld_l2a_process_list)))
+            logger.info("{}".format(cld_l2a_process_list))
+            cld_multiprocessing(cld_l2a_process_list, nb_proc=nb_proc)
+        else:
+            logger.info("All cloud masks already computed")
+
+    def compute_indices(self,
+                        indices: list = [],
+                        date_min: str = None,
+                        date_max: str = None,
+                        nb_proc: int = 8):
+        """
+        Compute all the missing indices for the tile set's l2a products
+        - indices are given as a list
+        - if indices are not provided, only the missing dates of the already existing indices are computed for each tile (no new indice is added)
+        - ! indices won't be masked if no cloud masks are present; compute the cloud masks first
+        """
+        indices_l2a_process_list = []
+        for tile in self.tile_list:
+            t = Tile(tile)
+            if not indices:
+                indices = list(t._paths["indices"].keys())
+            else:
+                indices = [indice.upper() for indice in indices]
+
+            for i in indices:
+                l2a_list = [p.identifier for p in t.missing_indices(i).filter_dates(date_min = date_min, date_max = date_max)]
+                for j in l2a_list:
+                    indices_l2a_process_list.append([j, i])
+        if indices_l2a_process_list:
+            logger.info("{} l2a products to process:".format(len(indices_l2a_process_list)))
+            logger.info("{}".format(indices_l2a_process_list))
+            idx_multiprocessing(indices_l2a_process_list, nb_proc=nb_proc)
+        else:
+            logger.info("All indices already computed")
+
+    @property
+    def info(self):
+        for t in self.tile_list:
+            logger.info(t)
+            Tile(t).info
+
--
GitLab
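
Usage sketch (illustrative only): a minimal example of how the new TileSet class is meant to be driven, based on the methods added in this patch. The tile identifiers, index names, date strings and nb_proc value below are assumptions, and a configured sen2chain library already containing the corresponding L1C products is assumed.

    from sen2chain import TileSet

    # hypothetical tile names, for illustration only
    tileset = TileSet(["40KCB", "38KQE"])

    # produce the missing L2A products, then the cloud masks, then two indices,
    # each step fanning work out over 4 worker processes
    tileset.compute_l2a(date_min="2020-01-01", date_max="2020-09-01", nb_proc=4)
    tileset.compute_cloudmasks(date_min="2020-01-01", date_max="2020-09-01", nb_proc=4)
    tileset.compute_indices(indices=["NDVI", "NDWIGAO"], date_min="2020-01-01", date_max="2020-09-01", nb_proc=4)

    # log a per-tile summary (l1c / l2a / cloud_masks / indices counts)
    tileset.info

Cloud masks are computed before indices on purpose: as noted in the compute_indices docstrings above, indices are not masked when no cloud mask is available.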
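
Similarly, a sketch of the reworked Library archiving entry points, using the new size_only flag as a dry run before actually moving anything. It assumes a configured library with l1c_archive_path and l2a_archive_path set, that Library takes no constructor arguments, and an illustrative tile name.

    from sen2chain import Library

    lib = Library()

    # remove corrupted products first; Tile.archive_l2a refuses to run
    # while clean_lib still reports identified problems
    lib.clean_lib(remove=True)

    # dry run: only log the number of products and the total size that would be moved
    lib.archive_l1c(size_only=True)
    lib.archive_l2a(size_only=True)

    # actually archive both levels, optionally restricted to a list of tiles;
    # moved product folders are replaced by symlinks to the archive location
    lib.archive_all(archive_list=["40KCB"], size_only=False)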