diff --git a/sen2chain/data/job_ini.cfg b/sen2chain/data/job_ini.cfg index 4266c99f2ddfded0b1e84fac0f49aeb9bea097d3..268165625806e34d8d44ff2b0f31d761804ba9d9 100644 --- a/sen2chain/data/job_ini.cfg +++ b/sen2chain/data/job_ini.cfg @@ -5,23 +5,30 @@ # tries: the number of times the download should loop before stopping, to download OFFLINE products # sleep: the time in min to wait between loops # nb_proc: the number of cpu cores to use for this job, default 8 +# copy_l2a_side_products: to duplicate msk_cldprb_20m and scl_20m from l2a folder to cloudmask folder after l2a production. +# Interesting if you plan to remove l2a to save disk space, but want to keep these 2 files for cloudmask generation and better extraction +# Possible values: True | False # tile: tile identifier, format ##XXX, comment line using ! before tile name # date_min the start date for this task, possible values: -# empty (2015-01-01 will be used) | any date | now-xx (xx nb of days before now to consider) +# empty (2015-01-01 will be used) | any date | today-xx (xx nb of days before today to consider) # date_max the last date for this task, possible values: -# empty (9999-12-31 will be used) | any date | now -# +# empty (9999-12-31 will be used) | any date | today +# max_clouds: max cloud cover to consider for downloading images, and computing l2a products # l1c: download l1c: True|False # l2a: compute l2a with sen2chain: True | False -# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0 +# cloudmasks: the cloudmask(s) to compute and use to mask indice(s). Possible values range from none (False) to multiple cloudmasks: +# False | CM001/CM002/CM003-PRB1-ITER5/CM004-CSH1-CMP1-CHP1-TCI1-ITER0/etc. # indices: False | All | NDVI/NDWIGAO/etc. -# remove: to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a +# remove: used to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a # comments: free user comments, ie tile name, etc. logs = False timing = 0 0 * * * provider = peps +tries = 1 +sleep = 0 nb_proc = 8 +copy_l2a_sideproducts = False -tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;remove +tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;remove;comments diff --git a/sen2chain/download_eodag.py b/sen2chain/download_eodag.py index 72a6117b8063cc2593c8c2c401bb23914306f68c..3283dffa2b5b974cbef9cf3e75a6bd91a2a5a442 100644 --- a/sen2chain/download_eodag.py +++ b/sen2chain/download_eodag.py @@ -116,7 +116,11 @@ class S2cEodag: self.products.remove(p) logger.info("{} - local l1c {} - filtering (wrong Tile)".format(p.properties["title"], l1c_presence)) else: - # logger.info(p.properties["cloudCover"]) + try: + prop = p.properties["storageStatus"] + except: + if p.properties["storage"]["mode"] == "tier2": + p.properties["storageStatus"] = "ONLINE" if not(min_cloudcover <= float(p.properties["cloudCover"] or 0) <= max_cloudcover): self.products.remove(p) logger.info("{} - local l1c {} - filtering (CC = {}%)".format(p.properties["title"], l1c_presence, int(p.properties["cloudCover"]))) @@ -127,6 +131,7 @@ class S2cEodag: self.products.remove(p) else: logger.info("{} - local l1c {} - remote {}".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) + elif ref == "l2a": if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): logger.info("{} - local l1c {} - local l2a {} - filtering".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"])) diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py index 37e499ef9400f3710eb440a265ae6c52c48fd265..22e491225918b832c2b72a47a4738154e80055ee 100644 --- a/sen2chain/jobs.py +++ b/sen2chain/jobs.py @@ -128,15 +128,25 @@ class Job: self._log_folder_path = Path(Config().get("log_path")) / ("job_" + jid) def __repr__(self): - return "logs: {}\ntiming: {}\ncron_status: {}\nprovider: {}\ndownload tries: {}x / sleep: {}min\nnb_proc: {}\ntasks:\n{}".format( - self.logs, - self.timing, - self.cron_status, - self.provider, - self.tries, - self.sleep, - self.nb_proc, - self.tasks, + return ( + "logs: {}" + "\ntiming: {}" + "\ncron_status: {}" + "\nprovider: {}" + "\ndownload tries: {} x / sleep: {} min" + "\nnb_proc: {}" + "\ncopy_l2a_sideproducts: {}" + "\n\ntasks:\n{}".format( + self.logs, + self.timing, + self.cron_status, + self.provider, + self.tries, + self.sleep, + self.nb_proc, + self.copy_l2a_sideproducts, + self.tasks, + ) ) def init(self): @@ -150,7 +160,7 @@ class Job: ("max_clouds", [100]), ("l1c", ["False"]), ("l2a", ["False"]), - ("cloudmask", ["False"]), + ("cloudmasks", ["False"]), ("indices", ["False"]), ("remove", ["False"]), ("comments", [""]), @@ -169,7 +179,7 @@ class Job: "max_clouds": [100], "l1c": ["False"], "l2a": ["False"], - "cloudmask": ["False"], + "cloudmasks": ["False"], "indices": ["False"], "remove": ["False"], "comments": [""], @@ -179,7 +189,7 @@ class Job: self.tasks = pd.concat([self.tasks, row], ignore_index=True)[ self.tasks.columns ] - self.format_indices() + self.format_cm_indices() logger.info("\n{}".format(self.tasks)) def task_edit(self, task_id: int = None, **kwargs): @@ -199,7 +209,7 @@ class Job: ) else: logger.info("{} not found".format(arg)) - self.format_indices() + self.format_cm_indices() logger.info("\n{}".format(self.tasks)) else: logger.info("Task_number not found") @@ -227,18 +237,22 @@ class Job: "# tries: the number of times the download should loop before stopping, to download OFFLINE products", "# sleep: the time in min to wait between loops", "# nb_proc: the number of cpu cores to use for this job, default 8", + "# copy_l2a_side_products: to duplicate msk_cldprb_20m and scl_20m from l2a folder to cloudmask folder after l2a production.", + "# Interesting if you plan to remove l2a to save disk space, but want to keep these 2 files for cloudmask generation and better extraction", + "# Possible values: True | False", "#", "# tile: tile identifier, format ##XXX, comment line using ! before tile name", "# date_min the start date for this task, possible values:", - "# empty (2015-01-01 will be used) | any date | now-xx (xx nb of days before now to consider)", + "# empty (2015-01-01 will be used) | any date | today-xx (xx nb of days before today to consider)", "# date_max the last date for this task, possible values:", - "# empty (9999-12-31 will be used) | any date | now", - "#", + "# empty (9999-12-31 will be used) | any date | today", + "# max_clouds: max cloud cover to consider for downloading images", "# l1c: download l1c: True|False", "# l2a: compute l2a with sen2chain: True | False", - "# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0", + "# cloudmasks: the cloudmask(s) to compute and use to mask indice(s). Possible values range from none (False) to multiple cloudmasks:", + "# False | CM001/CM002/CM003-PRB1-ITER5/CM004-CSH1-CMP1-CHP1-TCI1-ITER0/etc.", "# indices: False | All | NDVI/NDWIGAO/etc.", - "# remove: to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a", + "# remove: used to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a", "# comments: free user comments, ie tile name, etc.", "", "", @@ -252,6 +266,7 @@ class Job: "tries = " + str(self.tries), "sleep = " + str(self.sleep), "nb_proc = " + str(self.nb_proc), + "copy_l2a_sideproducts = " + str(self.copy_l2a_sideproducts), "", "", ] @@ -263,9 +278,9 @@ class Job: for line in header: ict.write(line) # self.tasks.to_csv(ict) - self.unformat_indices() + self.unformat_cm_indices() self.tasks.to_csv(ict, index=False, sep=";") - self.format_indices() + self.format_cm_indices() def get_cron_status(self): @@ -356,7 +371,11 @@ class Job: parser.read_string( "[top]\n" + stream.read(), ) # This line does the trick. - self.logs = bool(setuptools.distutils.util.strtobool(parser["top"]["logs"])) + self.logs = bool( + setuptools.distutils.util.strtobool( + parser["top"]["logs"] + ) + ) self.timing = parser["top"]["timing"] try: self.provider = parser["top"]["provider"] @@ -374,6 +393,15 @@ class Job: self.nb_proc = int(parser["top"]["nb_proc"]) except: self.nb_proc = 8 + try: + self.copy_l2a_sideproducts = bool( + setuptools.distutils.util.strtobool( + parser["top"]["copy_l2a_sideproducts"] + ) + ) + except: + self.copy_l2a_sideproducts = False + for i in range(1, 10): try: self.tasks = pd.read_csv( @@ -386,17 +414,19 @@ class Job: header = i, skip_blank_lines = True, ) - # logger.info(i) - # logger.info(self.tasks) if 'comments' not in self.tasks: self.tasks['comments'] = "" - self.format_indices() - # logger.info(i) + if 'cloudmask' in self.tasks: + self.tasks.rename( + columns={"cloudmask": "cloudmasks"}, + inplace=True, + ) + self.format_cm_indices() break except: pass - def format_indices(self): + def format_cm_indices(self): for index, row in self.tasks.iterrows(): if not row.indices == "False": if row.indices == "All": @@ -408,12 +438,19 @@ class Job: self.tasks.at[index, "indices"] = str(row.indices).split( "/" ) + if not row.cloudmasks == "False": + if not isinstance(self.tasks.at[index, "cloudmasks"] , list): + self.tasks.at[index, "cloudmasks"] = str(row.cloudmasks).split( + "/" + ) - def unformat_indices(self): + def unformat_cm_indices(self): for index, row in self.tasks.iterrows(): if not((row.indices == "False") or (row.indices == "All")): # logger.info(row.indices) self.tasks.at[index, "indices"] = '/'.join(self.tasks.at[index, "indices"]) + if not (row.cloudmasks == "False"): + self.tasks.at[index, "cloudmasks"] = '/'.join(self.tasks.at[index, "cloudmasks"]) def fill_dates_clouds( self, @@ -427,7 +464,7 @@ class Job: tasks.at[index, "date_min"] = datetime.datetime.strptime( "2015-01-01", "%Y-%m-%d" ).strftime("%Y-%m-%d") - elif "now" in row.date_min: + elif "today" in row.date_min: tasks.at[index, "date_min"] = ( datetime.datetime.now() - datetime.timedelta( days = int(row.date_min.split("-")[1]) @@ -437,7 +474,7 @@ class Job: tasks.at[index, "date_max"] = datetime.datetime.strptime( "9999-12-31", "%Y-%m-%d" ).strftime("%Y-%m-%d") - elif row.date_max == "now": + elif row.date_max == "today": tasks.at[index, "date_max"] = ( datetime.datetime.now() + datetime.timedelta(days=1) ).strftime("%Y-%m-%d") @@ -592,12 +629,13 @@ class Job: start = row.date_min, end = row.date_max, new = False, + max_cloudcover = row.max_clouds, ) download_list.extend(tile_download_list) logger.info("Checked tile: {} - {} new product(s) to download".format(t.name, len(tile_download_list))) if self.logs: if len(tile_download_list): - f.write("{}\n - {} - {} new product(s) to download\n".format( + f.write("{}\n{} - {} new product(s) to download\n".format( datetime.datetime.now(), t.name, len(tile_download_list)), @@ -661,10 +699,13 @@ class Job: if bool(setuptools.distutils.util.strtobool(str(row.l2a))): t = Tile(row.tile) l1c_to_process = list( - p.identifier + [ + p.identifier, + self.copy_l2a_sideproducts, + ] for p in t.l2a_missings.filter_dates( date_min=row.date_min, date_max=row.date_max - ) + ).filter_clouds(cover_max = row.max_clouds) ) l1c_process_list.append(l1c_to_process) logger.info( @@ -672,11 +713,18 @@ class Job: row.tile, len(l1c_to_process) ) ) - l1c_process_list = list( - set( - chain.from_iterable(l1c_process_list) - ) - ) + # l1c_process_list = list( + # set( + # chain.from_iterable(l1c_process_list) + # ) + # ) + + ### remove duplicate l2a + l1c_process_list.sort() + uniq = list(k for k,_ in groupby(l1c_process_list)) + l1c_process_list = uniq + + logger.info( "Process list ({} files): {}".format( len(l1c_process_list), l1c_process_list @@ -701,7 +749,7 @@ class Job: if "l1c" in str(row.remove).lower(): t = Tile(row.tile) prodlist = [p for p in l1c_process_list if row.tile in p] - t.remove_l2a(prodlist) + t.remove_l1c(prodlist) logger.info("Removing downloaded l1c products: {}".format(prodlist)) if self.logs: f.write("{}\nRemoving downloaded l1c products: {}\n\n".format(datetime.datetime.now(), prodlist)) @@ -716,50 +764,56 @@ class Job: cld_l2a_process_list = [] for index, row in tasks.iterrows(): # if not bool(distutils.util.strtobool(str(row.cloudmask))): - if not (row.cloudmask == "False" or not row.cloudmask): - cloudmask = self.get_cm_version(row.cloudmask) - t = Tile(row.tile) - l2a_to_process = [ - p.identifier - for p in t.cloudmasks_missing( - cm_version = cloudmask["cm_version"].lower(), - probability = cloudmask["probability"], - iterations = cloudmask["iterations"], - cld_shad = cloudmask["cld_shad"], - cld_med_prob = cloudmask["cld_med_prob"], - cld_hi_prob = cloudmask["cld_hi_prob"], - thin_cir = cloudmask["thin_cir"], - ).filter_dates( - date_min=row.date_min, date_max=row.date_max - ) - ] - for j in l2a_to_process: - l2a_cm_details = [ - j, - cloudmask["cm_version"].upper(), - cloudmask["probability"], - cloudmask["iterations"], - cloudmask["cld_shad"], - cloudmask["cld_med_prob"], - cloudmask["cld_hi_prob"], - cloudmask["thin_cir"], - reprocess, + if not (row.cloudmasks == "False" or not row.cloudmasks): + for cm in row.cloudmasks: + cloudmask = self.get_cm_version(cm) + t = Tile(row.tile) + l2a_to_process = [ + p.identifier + for p in t.cloudmasks_missing( + cm_version = cloudmask["cm_version"].upper(), + probability = cloudmask["probability"], + iterations = cloudmask["iterations"], + cld_shad = cloudmask["cld_shad"], + cld_med_prob = cloudmask["cld_med_prob"], + cld_hi_prob = cloudmask["cld_hi_prob"], + thin_cir = cloudmask["thin_cir"], + ).filter_dates( + date_min=row.date_min, date_max=row.date_max + ) ] - cld_l2a_process_list.append(l2a_cm_details) - logger.info( - "{} - {} l2a product(s) to process".format( - row.tile, len(l2a_to_process) - ) - ) - if self.logs: + for j in l2a_to_process: + l2a_cm_details = [ + j, + cloudmask["cm_version"].upper(), + cloudmask["probability"], + cloudmask["iterations"], + cloudmask["cld_shad"], + cloudmask["cld_med_prob"], + cloudmask["cld_hi_prob"], + cloudmask["thin_cir"], + reprocess, + ] + cld_l2a_process_list.append(l2a_cm_details) if len(l2a_to_process): - f.write( - "{} - {} l2a product(s) to process\n".format( - row.tile, len(l2a_to_process) + logger.info( + "{} - {} - {} l2a product(s) to process".format( + row.tile, + cm, + len(l2a_to_process), ) ) - f.flush() - # removing duplicate cloudmask computing + if self.logs: + if len(l2a_to_process): + f.write( + "{} - {} - {} l2a product(s) to process\n".format( + row.tile, + cm, + len(l2a_to_process), + ) + ) + f.flush() + # removing duplicate cloudmasks before computing cld_l2a_process_list.sort() uniq = list(k for k,_ in groupby(cld_l2a_process_list)) cld_l2a_process_list = uniq @@ -777,7 +831,6 @@ class Job: for cld in cld_l2a_process_list: f.write("{}\n".format(cld)) f.write("\n") - f.flush() cld_res = False @@ -796,66 +849,70 @@ class Job: indices_l2a_process_list = [] for index, row in tasks.iterrows(): if not (row.indices == "False" or not row.indices): - - nodata_clouds = not (row.cloudmask == "False" or not row.cloudmask) - cloudmask = self.get_cm_version(row.cloudmask) - t = Tile(row.tile) - # indices_list = row.indices.split("/") - for i in row.indices: - # l2a_list = [p.identifier for p in t.missing_indices(i).filter_dates(date_min = row.start_time, date_max = row.end_time)] - l2a_to_process = [ - p.identifier - for p in t.missing_indices( - indice=i, - nodata_clouds=nodata_clouds, - cm_version = cloudmask["cm_version"].lower(), - probability = cloudmask["probability"], - iterations = cloudmask["iterations"], - cld_shad = cloudmask["cld_shad"], - cld_med_prob = cloudmask["cld_med_prob"], - cld_hi_prob = cloudmask["cld_hi_prob"], - thin_cir = cloudmask["thin_cir"], - ).filter_dates( - date_min=row.date_min, date_max=row.date_max - ) - ] - for j in l2a_to_process: - l2a_ind_details = [ - j, - i, - reprocess, - nodata_clouds, - quicklook, - cloudmask["cm_version"].upper(), - cloudmask["probability"], - cloudmask["iterations"], - cloudmask["cld_shad"], - cloudmask["cld_med_prob"], - cloudmask["cld_hi_prob"], - cloudmask["thin_cir"], - ] - indices_l2a_process_list.append(l2a_ind_details) - logger.info( - "{}: adding - {}: {} l2a products".format( - row.tile, i, len(l2a_to_process) - ) - ) - if self.logs: - if len(l2a_to_process): - f.write( - "{} - {} {}\n".format( - row.tile, - len(l2a_to_process), + nodata_clouds = not (row.cloudmasks == "False" or not row.cloudmasks) + if nodata_clouds: + for cm in row.cloudmasks: + cloudmask = self.get_cm_version(cm) + t = Tile(row.tile) + for i in row.indices: + l2a_to_process = [ + p.identifier + for p in t.missing_indices( + indice=i, + nodata_clouds=nodata_clouds, + cm_version = cloudmask["cm_version"].upper(), + probability = cloudmask["probability"], + iterations = cloudmask["iterations"], + cld_shad = cloudmask["cld_shad"], + cld_med_prob = cloudmask["cld_med_prob"], + cld_hi_prob = cloudmask["cld_hi_prob"], + thin_cir = cloudmask["thin_cir"], + ).filter_dates( + date_min=row.date_min, date_max=row.date_max + ) + ] + for j in l2a_to_process: + l2a_ind_details = [ + j, i, + reprocess, + nodata_clouds, + quicklook, + cloudmask["cm_version"].upper(), + cloudmask["probability"], + cloudmask["iterations"], + cloudmask["cld_shad"], + cloudmask["cld_med_prob"], + cloudmask["cld_hi_prob"], + cloudmask["thin_cir"], + ] + indices_l2a_process_list.append(l2a_ind_details) + if len(l2a_to_process): + logger.info( + "{} - {} - {} - {} l2a product(s) to process".format( + row.tile, + cm, + i, + len(l2a_to_process) + ) ) - ) - f.flush() + if self.logs: + if len(l2a_to_process): + f.write( + "{} - {} - {} - {} l2a product(s) to process\n".format( + row.tile, + cm, + i, + len(l2a_to_process), + ) + ) + f.flush() # removing duplicate indice computing indices_l2a_process_list.sort() uniq = list(k for k,_ in groupby(indices_l2a_process_list)) indices_l2a_process_list = uniq - + logger.info( "Indice computing process list ({} l2a products): {}".format( diff --git a/sen2chain/multi_processing.py b/sen2chain/multi_processing.py index 17c9cf474877e794d475d8a72d1385d6df3331cb..43e42f0662b6a4504b01b6d13efb74850de60b45 100644 --- a/sen2chain/multi_processing.py +++ b/sen2chain/multi_processing.py @@ -2,7 +2,7 @@ import multiprocessing, subprocess import os -from time import sleep +import time import logging from functools import partial @@ -15,7 +15,10 @@ logging.basicConfig(level=logging.INFO) logger.setLevel(logging.INFO) -def multi(product): +def multi(product_copyl2asideproducts): + product = product_copyl2asideproducts[0] + copy_l2a_sideproducts = product_copyl2asideproducts[1] + proc = None try: fwd = os.path.dirname(os.path.realpath(__file__)) logger.info("Processing {}".format(product)) @@ -26,6 +29,7 @@ def multi(product): "/usr/bin/python3", fwd + "/multiprocess_l2a.py", product, + copy_l2a_sideproducts, ] proc = subprocess.Popen(cmd) @@ -34,14 +38,21 @@ def multi(product): "_OPER_", "_USER_" ) l2a_prod = L2aProduct(l2a_identifier) + timeout = time.time() + 2*60*60 while not (l2a_prod.in_library): - sleep(5) + if time.time() > timeout: + logger.info("Timeout (2h) reached for Sen2Cor processing, killing process {}".format(proc.pid)) + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) + break + time.sleep(5) logger.info("End {}".format(product)) except: logger.info("Plante {}".format(product)) + if proc: + logger.info("Killing process {}".format(proc.pid)) + os.killpg(os.getpgid(proc.pid), signal.SIGTERM) pass - def l2a_multiprocessing(process_list, nb_proc=4): """ """ nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) @@ -95,7 +106,7 @@ def multi_cld_ver_pro_iter_repro(l2a_ver_pro_iter_repro): reprocess = l2a_ver_pro_iter_repro[8] try: l2a.compute_cloud_mask( - cm_version=cm_version.lower(), + cm_version=cm_version, probability=probability, iterations=iterations, cld_shad=cld_shad, @@ -109,7 +120,8 @@ def multi_cld_ver_pro_iter_repro(l2a_ver_pro_iter_repro): def cld_version_probability_iterations_reprocessing_multiprocessing( - process_list, nb_proc=4 + process_list, + nb_proc=4 ): """ """ nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) @@ -140,7 +152,7 @@ def multi_idx(l2a_id_idx): reprocess=reprocess, nodata_clouds=nodata_clouds, quicklook=quicklook, - cm_version=cm_version.lower(), + cm_version=cm_version, probability=probability, iterations=iterations, cld_shad=cld_shad, @@ -152,7 +164,10 @@ def multi_idx(l2a_id_idx): pass -def idx_multiprocessing(process_list, nb_proc=4): +def idx_multiprocessing( + process_list, + nb_proc=4, +): """ """ nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) pool = multiprocessing.Pool(nb_proc) diff --git a/sen2chain/multiprocess_l2a.py b/sen2chain/multiprocess_l2a.py index db74f4202c3ec6692feed3071da037dc893cd7a1..49e85d62d4c3c095de0bc597a71132eadda1e807 100644 --- a/sen2chain/multiprocess_l2a.py +++ b/sen2chain/multiprocess_l2a.py @@ -10,6 +10,7 @@ import sys from sen2chain import L1cProduct identifier = sys.argv[1] +copy_l2a_sideproducts = sys.argv[2] l1c = L1cProduct(identifier) -l1c.process_l2a() +l1c.process_l2a(copy_l2a_sideproducts = copy_l2a_sideproducts) diff --git a/sen2chain/products.py b/sen2chain/products.py index e4bcbfc3fb09dbad0b97422f1ede84fd74b75092..22411e0967918f30569caac7b6e61ccc3412e076 100755 --- a/sen2chain/products.py +++ b/sen2chain/products.py @@ -225,6 +225,7 @@ class L1cProduct(Product): self, reprocess: bool = False, s2c_path: Union[str, pathlib.PosixPath] = None, + copy_l2a_sideproducts: bool = False, ) -> "L1cProduct": """ process with sen2cor """ logger.info("{}: processing L2A".format(self.identifier)) @@ -245,6 +246,7 @@ class L1cProduct(Product): else: if not reprocess: logger.info("{} already exists.".format(l2a_identifier)) + else: if l2a_prod.in_library: shutil.rmtree(str(l2a_prod.path)) @@ -274,6 +276,17 @@ class L1cProduct(Product): l2a_prod = L2aProduct(l2a_identifier) l2a_prod.set_permissions() l2a_prod.update_md(sen2cor_version = get_Sen2Cor_version(s2c_path)) + + if copy_l2a_sideproducts: + if l2a_prod.path.exists(): + cloudmask = NewCloudMaskProduct(l2a_identifier = l2a_identifier) + cloudmask.path.parent.mkdir(parents = True, exist_ok = True) + if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists(): + logger.info("Copying msk_cldprb_20m to cloumask folder") + shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent) + if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists(): + shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent) + logger.info("Copying scl_20m to cloumask folder") return self def process_ql( @@ -455,7 +468,10 @@ class L2aProduct(Product): _tiled_metadata = "MTD_MSIL2A.xml" def __init__( - self, identifier: str = None, tile: str = None, path: str = None + self, + identifier: str = None, + tile: str = None, + path: str = None ) -> None: super().__init__(identifier=identifier, tile=tile, path=path) if not re.match(r".*L2A_.*", identifier): @@ -636,6 +652,7 @@ class L2aProduct(Product): # out_path_mask_b11 = None ) -> "L2aProduct": """ """ + cm_version = cm_version.upper() if cm_version == "CM003": logger.info( "Computing cloudmask version {}, probability {}%, iteration(s) {}: {}".format( @@ -721,7 +738,7 @@ class L2aProduct(Product): "No cloudmask version cm001 found, please compute this one first" ) else: - logger.info("Skipping: no L2A") + logger.info("Skipping: no B11 from L2A") elif cm_version == "CM003": if cloudmask.msk_cldprb_20m: cloudmask.path.unlink(missing_ok = True) @@ -990,14 +1007,15 @@ class L2aProduct(Product): sen2cor_version=sen2cor_version, ) - def remove(self): + def remove( + self, + copy_l2a_sideproducts: bool = False, + ): + + if copy_l2a_sideproducts: + self.copy_l2a_sideproducts() + if self.path.exists(): - cloudmask = NewCloudMaskProduct(l2a_identifier = self.identifier) - if cloudmask.path.parent.exists(): - if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists(): - shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent) - if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists(): - shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent) if self.path.is_symlink(): l2a_path = os.readlink(str(self.path)) logger.info("Removing: {}".format(l2a_path)) @@ -1009,7 +1027,22 @@ class L2aProduct(Product): logger.info("Removing: {}".format(self.path)) else: logger.info("L2A product not on disk") - + + def copy_l2a_sideproducts(self): + if self.path.exists(): + cloudmask = NewCloudMaskProduct(l2a_identifier = self.identifier) + cloudmask.path.parent.mkdir(parents = True, exist_ok = True) + if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists(): + logger.info("{} - Copying msk_cldprb_20m to cloumask folder".format(self.identifier)) + shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent) + else: + logger.info("{} - msk_cldprb_20m already copied to cloumask folder".format(self.identifier)) + if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists(): + logger.info("{} - Copying scl_20m to cloumask folder".format(self.identifier)) + shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent) + else: + logger.info("{} - scl_20m already copied to cloumask folder".format(self.identifier)) + @property def sen2chain_version(self): return Sen2ChainMetadataParser( diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index 51361c8ffb57ebf3cc1ae4e70604c42933b1cecd..475942ea834a96523377fc3e549017992a28dc13 100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -227,7 +227,7 @@ class CloudMaskList(ProductsList): """Class for managing mask product list""" @property - def cm001(self) -> "CloudMaskList": + def CM001(self) -> "CloudMaskList": filtered = CloudMaskList() for k, v in self._dict.items(): if "_CM001" in k: @@ -238,7 +238,7 @@ class CloudMaskList(ProductsList): return filtered @property - def cm002(self) -> "CloudMaskList": + def CM002(self) -> "CloudMaskList": filtered = CloudMaskList() for k, v in self._dict.items(): if "_CM002" in k: @@ -249,7 +249,7 @@ class CloudMaskList(ProductsList): return filtered @property - def cm003(self) -> "CloudMaskList": + def CM003(self) -> "CloudMaskList": filtered = CloudMaskList() for k, v in self._dict.items(): if "_CM003" in k: @@ -260,7 +260,7 @@ class CloudMaskList(ProductsList): return filtered @property - def cm004(self) -> "CloudMaskList": + def CM004(self) -> "CloudMaskList": filtered = CloudMaskList() for k, v in self._dict.items(): if "_CM004" in k: @@ -675,7 +675,7 @@ class Tile: return prod_list # def cloudmasks_missing(self, - # cm_version: str = "cm001", + # cm_version: str = "CM001", # probability: int = 1, # iterations: int = 5, # cld_shad: bool = True, @@ -687,7 +687,7 @@ class Tile: def cloudmasks_missing( self, - cm_version: str = "cm001", + cm_version: str = "CM001", probability: int = 1, iterations: int = 5, cld_shad: bool = True, @@ -769,7 +769,7 @@ class Tile: self, indice: str, nodata_clouds: bool = False, - cm_version: list = "cm001", + cm_version: list = "CM001", probability: int = 1, iterations: int = 5, cld_shad: bool = True, @@ -799,7 +799,14 @@ class Tile: getattr(getattr(self, indice.lower()), "masks"), cm_version, ) - .params(probability=probability, iterations=iterations) + .params( + probability = probability, + iterations = iterations, + cld_shad = cld_shad, + cld_med_prob = cld_med_prob, + cld_hi_prob = cld_hi_prob, + thin_cir = thin_cir, + ) .products } except: @@ -946,6 +953,7 @@ class Tile: cover_min: int = 0, cover_max: int = 100, nb_proc: int = 4, + copy_l2a_sideproducts: bool = False, ): """ Compute all missing l2a for l1c products between date_min and date_max @@ -979,9 +987,13 @@ class Tile: l1c_process_list = [] l1c_process_list.append( list( - p.identifier + [ + p.identifier, + copy_l2a_sideproducts, + ] for p in self.l2a_missings.filter_dates( - date_min=date_min, date_max=date_max + date_min=date_min, + date_max=date_max, ).filter_clouds( cover_min = cover_min, cover_max = cover_max @@ -999,10 +1011,10 @@ class Tile: l2a_res = False if l1c_process_list: l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=nb_proc) - + def compute_cloudmasks( self, - cm_version: str = "cm001", + cm_version: str = "CM001", probability: int = 1, iterations: int = 5, cld_shad: bool = True, @@ -1016,11 +1028,11 @@ class Tile: ): """Compute all (missing) cloud masks for l2a products. - :param cm_version: version of cloudmask to compute. Can be either cm001, - cm002, cm003, or cm004. - :param probability: only used by cm003: threshold probability of clouds + :param cm_version: version of cloudmask to compute. Can be either CM001, + CM002, CM003, or CM004. + :param probability: only used by CM003: threshold probability of clouds to be considered. - :param iterations: only used by cm003: number of iterations for + :param iterations: only used by CM003: number of iterations for dilatation process while computing cloudmask. :param reprocess: if False (default), only missing cloudmasks will be computed. if True already processed cloudmask will be computed again. @@ -1031,7 +1043,8 @@ class Tile: :param nb_proc: number of parallel process, limited to the number of proc of your PC (default 4). """ - + + cm_version = cm_version.upper() if not reprocess: cld_l2a_process_list = list( [ @@ -1074,7 +1087,7 @@ class Tile: ) if cld_l2a_process_list: logger.info( - "{} l2a products to process:".format(len(cld_l2a_process_list)) + "{} l2a product(s) to process".format(len(cld_l2a_process_list)) ) # logger.info("{}".format(cld_l2a_process_list)) cld_version_probability_iterations_reprocessing_multiprocessing( @@ -1090,7 +1103,7 @@ class Tile: reprocess: bool = False, nodata_clouds: bool = True, quicklook: bool = False, - cm_version: list = "cm001", + cm_version: list = "CM001", probability: int = 1, iterations: int = 5, cld_shad: bool = True, @@ -1107,6 +1120,8 @@ class Tile: - if indices not provided, will compute missing dates of already existing indices for this tile (no new indice computed) - indices won't be masked if no cloud masks are present, you have to compute cloudmasks first """ + + cm_version = cm_version.upper() if not indices: indices = list(self._paths["indices"].keys()) # else: @@ -1615,7 +1630,7 @@ class Tile: """ Rename old indices to match new cloudmask nomenclature """ - # Rename old indices to default cm_version cm001 + # Rename old indices to default cm_version CM001 logger.info("Moving and renaming old indices") for indice, path in self._paths["indices"].items(): @@ -1675,6 +1690,7 @@ class Tile: self, product_list: list = [], entire: bool = False, + copy_l2a_sideproducts: bool = False, ): """ Remove l2a files @@ -1683,5 +1699,26 @@ class Tile: product_list = [product.identifier for product in self.l2a] for identifier in product_list: l2a = L2aProduct(identifier) - l2a.remove() + l2a.remove(copy_l2a_sideproducts = copy_l2a_sideproducts) logger.info("Removed: {} products".format(len(product_list))) + + def copy_sp(self, l2a_product): + l2a = L2aProduct(l2a_product) + l2a.copy_l2a_sideproducts() + + def copy_l2a_sideproducts( + self, + product_list: list = [], + nb_proc: int = 4, + ): + if not product_list: + product_list = [product.identifier for product in self.l2a] + + nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) + pool = multiprocessing.Pool(nb_proc) + results = [pool.map(self.copy_sp, product_list)] + pool.close() + pool.join() + return True + +