From 2c7ae54fdabdb8da7b02d3b854558863549ad7d1 Mon Sep 17 00:00:00 2001
From: pmouquet <pascal.mouquet@ird.fr>
Date: Fri, 17 Mar 2023 16:59:14 +0400
Subject: [PATCH] added possibility to copy l2a side products for products,
 tile, multiprocessing, etc. Modifying Job to copy these side products while
 generating l2a

---
 sen2chain/data/job_ini.cfg    |  8 +++++-
 sen2chain/jobs.py             | 11 ++++++++-
 sen2chain/multi_processing.py | 27 +++++++++++++++-----
 sen2chain/multiprocess_l2a.py |  3 ++-
 sen2chain/products.py         | 46 ++++++++++++++++++++++++++++-------
 sen2chain/tiles.py            | 22 ++++++++++++++---
 6 files changed, 96 insertions(+), 21 deletions(-)

diff --git a/sen2chain/data/job_ini.cfg b/sen2chain/data/job_ini.cfg
index 4266c99..0d01137 100644
--- a/sen2chain/data/job_ini.cfg
+++ b/sen2chain/data/job_ini.cfg
@@ -5,13 +5,16 @@
 # tries: the number of times the download should loop before stopping, to download OFFLINE products
 # sleep: the time in min to wait between loops
 # nb_proc: the number of cpu cores to use for this job, default 8
+# copy_l2a_sideproducts: to duplicate msk_cldprb_20m and scl_20m from l2a folder to cloudmask folder after l2a production.
+#   Interesting if you plan to remove l2a to save disk space, but want to keep these 2 files for cloudmask generation and better extraction
+#   Possible values: True | False
 
 # tile: tile identifier, format ##XXX, comment line using ! before tile name
 # date_min the start date for this task, possible values:
 #    empty (2015-01-01 will be used) | any date | now-xx (xx nb of days before now to consider)
 # date_max the last date for this task, possible values:
 #    empty (9999-12-31 will be used) | any date | now
-#
+# max_clouds: max cloud cover to consider for downloading images
 # l1c: download l1c: True|False
 # l2a: compute l2a with sen2chain: True | False
 # cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0
@@ -22,6 +25,9 @@
 logs = False
 timing = 0 0 * * *
 provider = peps
+tries = 1
+sleep = 0
 nb_proc = 8
+copy_l2a_sideproducts = False
 
 tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;remove
diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py
index b4ea6c4..05b6984 100644
--- a/sen2chain/jobs.py
+++ b/sen2chain/jobs.py
@@ -136,6 +136,7 @@ class Job:
             self.tries,
             self.sleep,
             self.nb_proc,
+            self.copy_l2a_sideproducts,
             self.tasks,
         )
         
@@ -227,13 +228,16 @@ class Job:
                     "# tries: the number of times the download should loop before stopping, to download OFFLINE products",
                     "# sleep: the time in min to wait between loops",
                     "# nb_proc: the number of cpu cores to use for this job, default 8",
+                    "# copy_l2a_sideproducts: to duplicate msk_cldprb_20m and scl_20m from l2a folder to cloudmask folder after l2a production.",  # spelled like the "copy_l2a_sideproducts = " entry emitted with this header
+                    "#   Interesting if you plan to remove l2a to save disk space, but want to keep these 2 files for cloudmask generation and better extraction",
+                    "#   Possible values: True | False",
                     "#",
                     "# tile: tile identifier, format ##XXX, comment line using ! before tile name",
                     "# date_min the start date for this task, possible values:",
                     "#    empty (2015-01-01 will be used) | any date | now-xx (xx nb of days before now to consider)",
                     "# date_max the last date for this task, possible values:",
                     "#    empty (9999-12-31 will be used) | any date | now",
-                    "#",
+                    "# max_clouds: max cloud cover to consider for downloading images",
                     "# l1c: download l1c: True|False",
                     "# l2a: compute l2a with sen2chain: True | False", 
                     "# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0",
@@ -252,6 +256,7 @@ class Job:
                     "tries = " + str(self.tries),
                     "sleep = " + str(self.sleep),
                     "nb_proc = " + str(self.nb_proc),
+                    "copy_l2a_sideproducts = " + str(self.copy_l2a_sideproducts),
                     "",
                     "",
                 ]
@@ -374,6 +379,10 @@ class Job:
             self.nb_proc = int(parser["top"]["nb_proc"])
         except:
             self.nb_proc = 8
+        try:  # optional "copy_l2a_sideproducts" key of the "top" section, parsed as a boolean word
+            self.copy_l2a_sideproducts = bool(setuptools.distutils.util.strtobool(parser["top"]["copy_l2a_sideproducts"]))
+        except Exception:
+            self.copy_l2a_sideproducts = False  # key absent or not a boolean word: side products are not copied
         for i in range(1, 10):
             try:
                 self.tasks = pd.read_csv(
diff --git a/sen2chain/multi_processing.py b/sen2chain/multi_processing.py
index f33b580..c98ddc4 100644
--- a/sen2chain/multi_processing.py
+++ b/sen2chain/multi_processing.py
@@ -2,7 +2,7 @@
 
 import multiprocessing, subprocess
 import os
-from time import sleep
+import time
 import logging
 from functools import partial
 
@@ -15,7 +15,10 @@ logging.basicConfig(level=logging.INFO)
 logger.setLevel(logging.INFO)
 
 
-def multi(product):
+def multi(product_copyl2asideproducts):
+    product = product_copyl2asideproducts[0]
+    copy_l2a_sideproducts = product_copyl2asideproducts[1]
+    proc = None
     try:
         fwd = os.path.dirname(os.path.realpath(__file__))
         logger.info("Processing {}".format(product))
@@ -26,6 +29,7 @@ def multi(product):
                 "/usr/bin/python3",
                 fwd + "/multiprocess_l2a.py",
                 product,
+                "1" if copy_l2a_sideproducts else "",  # argv items must be str; "" stays falsy for the child script
             ]
             proc = subprocess.Popen(cmd)
 
@@ -34,14 +38,21 @@ def multi(product):
                 "_OPER_", "_USER_"
             )
             l2a_prod = L2aProduct(l2a_identifier)
+            timeout = time.time() + 2*60*60
             while not (l2a_prod.in_library):
-                sleep(5)
+                if time.time() > timeout:
+                    logger.info("Timeout (2h) reached for Sen2Cor processing, killing process {}".format(proc.pid))
+                    proc.terminate()  # SIGTERM to the child only: Popen(cmd) did not put it in its own process group
+                    break
+                time.sleep(5)
             logger.info("End {}".format(product))
     except:
         logger.info("Plante {}".format(product))
+        if proc:
+            logger.info("Killing process {}".format(proc.pid))
+            proc.terminate()  # killpg on the child's group would also signal this worker and its parent
         pass
 
-
 def l2a_multiprocessing(process_list, nb_proc=4):
     """ """
     nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1)
@@ -109,7 +120,8 @@ def multi_cld_ver_pro_iter_repro(l2a_ver_pro_iter_repro):
 
 
 def cld_version_probability_iterations_reprocessing_multiprocessing(
-    process_list, nb_proc=4
+    process_list, 
+    nb_proc=4
 ):
     """ """
     nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1)
@@ -152,7 +164,10 @@ def multi_idx(l2a_id_idx):
         pass
 
 
-def idx_multiprocessing(process_list, nb_proc=4):
+def idx_multiprocessing(
+    process_list, 
+    nb_proc=4,
+):
     """ """
     nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1)
     pool = multiprocessing.Pool(nb_proc)
diff --git a/sen2chain/multiprocess_l2a.py b/sen2chain/multiprocess_l2a.py
index db74f42..49e85d6 100644
--- a/sen2chain/multiprocess_l2a.py
+++ b/sen2chain/multiprocess_l2a.py
@@ -10,6 +10,7 @@ import sys
 from sen2chain import L1cProduct
 
 identifier = sys.argv[1]
+copy_l2a_sideproducts = len(sys.argv) > 2 and sys.argv[2].strip().lower() in ("1", "true", "yes")  # argv is text: a bare "False" would be truthy; a missing argument means False
 
 l1c = L1cProduct(identifier)
-l1c.process_l2a()
+l1c.process_l2a(copy_l2a_sideproducts = copy_l2a_sideproducts)
diff --git a/sen2chain/products.py b/sen2chain/products.py
index d63de43..beba99a 100755
--- a/sen2chain/products.py
+++ b/sen2chain/products.py
@@ -225,6 +225,7 @@ class L1cProduct(Product):
         self, 
         reprocess: bool = False,
         s2c_path: Union[str, pathlib.PosixPath] = None,
+        copy_l2a_sideproducts: bool = False,
     ) -> "L1cProduct":
         """ process with sen2cor """
         logger.info("{}: processing L2A".format(self.identifier))
@@ -245,6 +246,7 @@ class L1cProduct(Product):
         else:
             if not reprocess:
                 logger.info("{} already exists.".format(l2a_identifier))
+            
             else:
                 if l2a_prod.in_library:
                     shutil.rmtree(str(l2a_prod.path))
@@ -274,6 +276,17 @@ class L1cProduct(Product):
             l2a_prod = L2aProduct(l2a_identifier)
             l2a_prod.set_permissions()
             l2a_prod.update_md(sen2cor_version = get_Sen2Cor_version(s2c_path))
+            
+            if copy_l2a_sideproducts:
+                if l2a_prod.path.exists():
+                    cloudmask = NewCloudMaskProduct(l2a_identifier = l2a_identifier)
+                    cloudmask.path.parent.mkdir(parents = True, exist_ok = True)
+                    if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists():
+                        logger.info("Copying msk_cldprb_20m to cloumask folder")
+                        shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent)
+                    if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists():
+                        shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent)
+                        logger.info("Copying scl_20m to cloumask folder")
         return self
 
     def process_ql(
@@ -455,7 +468,10 @@ class L2aProduct(Product):
     _tiled_metadata = "MTD_MSIL2A.xml"
 
     def __init__(
-        self, identifier: str = None, tile: str = None, path: str = None
+        self, 
+        identifier: str = None, 
+        tile: str = None, 
+        path: str = None
     ) -> None:
         super().__init__(identifier=identifier, tile=tile, path=path)
         if not re.match(r".*L2A_.*", identifier):
@@ -991,14 +1007,15 @@ class L2aProduct(Product):
             sen2cor_version=sen2cor_version,
         )
 
-    def remove(self):
+    def remove(
+        self,
+        copy_l2a_sideproducts: bool = False,
+    ):
+        
+        if copy_l2a_sideproducts:
+            self.copy_l2a_sideproducts()
+
         if self.path.exists():
-            cloudmask = NewCloudMaskProduct(l2a_identifier = self.identifier)
-            if cloudmask.path.parent.exists():
-                if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists():
-                    shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent)
-                if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists():
-                    shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent)
             if self.path.is_symlink():
                 l2a_path = os.readlink(str(self.path))
                 logger.info("Removing: {}".format(l2a_path))
@@ -1010,7 +1027,18 @@ class L2aProduct(Product):
                 logger.info("Removing: {}".format(self.path))
         else:
             logger.info("L2A product not on disk")
-
+    
+    def copy_l2a_sideproducts(self):  # copy msk_cldprb_20m and scl_20m into the cloud mask product's parent folder
+        if self.path.exists():  # silently does nothing when this product's path is absent
+            cloudmask = NewCloudMaskProduct(l2a_identifier = self.identifier)
+            cloudmask.path.parent.mkdir(parents = True, exist_ok = True)  # make sure the destination folder exists
+            if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists():  # an existing copy is left untouched
+                logger.info("Copying msk_cldprb_20m to cloumask folder")
+                shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent)
+            if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists():  # same rule for the second file
+                logger.info("Copying scl_20m to cloumask folder")
+                shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent)
+        
     @property
     def sen2chain_version(self):
         return Sen2ChainMetadataParser(
diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py
index e8b5e22..a77f1f8 100644
--- a/sen2chain/tiles.py
+++ b/sen2chain/tiles.py
@@ -946,6 +946,7 @@ class Tile:
         cover_min: int = 0,
         cover_max: int = 100,        
         nb_proc: int = 4,
+        copy_l2a_sideproducts: bool = False,
     ):
         """
         Compute all missing l2a for l1c products between date_min and date_max
@@ -979,7 +980,10 @@ class Tile:
         l1c_process_list = []
         l1c_process_list.append(
             list(
-                p.identifier
+                [
+                    p.identifier,
+                    copy_l2a_sideproducts,
+                ]
                 for p in self.l2a_missings.filter_dates(
                     date_min=date_min, date_max=date_max
                 ).filter_clouds(
@@ -999,7 +1003,7 @@ class Tile:
         l2a_res = False
         if l1c_process_list:
             l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=nb_proc)
-
+            
     def compute_cloudmasks(
         self,
         cm_version: str = "CM001",
@@ -1678,6 +1682,7 @@ class Tile:
         self,
         product_list: list = [],
         entire: bool = False,
+        copy_l2a_sideproducts: bool = False,
     ):
         """
         Remove l2a files
@@ -1686,5 +1691,16 @@ class Tile:
             product_list = [product.identifier for product in self.l2a]
         for identifier in product_list:
             l2a = L2aProduct(identifier)
-            l2a.remove()
+            l2a.remove(copy_l2a_sideproducts = copy_l2a_sideproducts)
         logger.info("Removed: {} products".format(len(product_list)))
+    
+    def copy_l2a_sideproducts(  # call L2aProduct.copy_l2a_sideproducts() for each requested L2A identifier
+        self,
+        product_list: list = [],  # L2A identifiers; an empty list means every product in self.l2a
+    ):
+        if not product_list:
+            product_list = [product.identifier for product in self.l2a]  # rebinds the name, so the shared default list is never mutated
+        for identifier in product_list:
+            l2a = L2aProduct(identifier)
+            l2a.copy_l2a_sideproducts()
+    
-- 
GitLab