diff --git a/sen2chain/data/job_ini.cfg b/sen2chain/data/job_ini.cfg
index 0d011376302e6781aba09a4165838258766ee8ee..12df82d504587073abe7079f1909487c3c2a4f09 100644
--- a/sen2chain/data/job_ini.cfg
+++ b/sen2chain/data/job_ini.cfg
@@ -11,15 +11,15 @@
 # tile: tile identifier, format ##XXX, comment line using ! before tile name
 # date_min the start date for this task, possible values:
-# empty (2015-01-01 will be used) | any date | now-xx (xx nb of days before now to consider)
+# empty (2015-01-01 will be used) | any date | today-xx (xx nb of days before today to consider)
 # date_max the last date for this task, possible values:
-# empty (9999-12-31 will be used) | any date | now
-# max_clouds: max cloud cover to consider for downloading images
+# empty (9999-12-31 will be used) | any date | today
+# max_clouds: max cloud cover to consider for downloading images and computing l2a products
 # l1c: download l1c: True|False
 # l2a: compute l2a with sen2chain: True | False
 # cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0
 # indices: False | All | NDVI/NDWIGAO/etc.
-# remove: to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a
+# remove: used to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a
 # comments: free user comments, ie tile name, etc.

 logs = False
@@ -30,4 +30,4 @@
 sleep = 0
 nb_proc = 8
 copy_l2a_sideproducts = False
-tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;remove
+tile;date_min;date_max;max_clouds;l1c;l2a;cloudmasks;indices;remove;comments
diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py
index 05b69842ed180b4e3ae9333d9996ee30e7794818..6de767dc17dc5734e32f9b7cfa57a8e211807143 100644
--- a/sen2chain/jobs.py
+++ b/sen2chain/jobs.py
@@ -128,16 +128,25 @@ class Job:
         self._log_folder_path = Path(Config().get("log_path")) / ("job_" + jid)

     def __repr__(self):
-        return "logs: {}\ntiming: {}\ncron_status: {}\nprovider: {}\ndownload tries: {}x / sleep: {}min\nnb_proc: {}\ntasks:\n{}".format(
-            self.logs,
-            self.timing,
-            self.cron_status,
-            self.provider,
-            self.tries,
-            self.sleep,
-            self.nb_proc,
-            self.copy_l2a_sideproducts,
-            self.tasks,
+        return (
+            "logs: {}"
+            "\ntiming: {}"
+            "\ncron_status: {}"
+            "\nprovider: {}"
+            "\ndownload tries: {} x / sleep: {} min"
+            "\nnb_proc: {}"
+            "\ncopy_l2a_sideproducts: {}"
+            "\n\ntasks:\n{}".format(
+                self.logs,
+                self.timing,
+                self.cron_status,
+                self.provider,
+                self.tries,
+                self.sleep,
+                self.nb_proc,
+                self.copy_l2a_sideproducts,
+                self.tasks,
+            )
         )

     def init(self):
@@ -151,7 +160,7 @@ class Job:
             ("max_clouds", [100]),
             ("l1c", ["False"]),
             ("l2a", ["False"]),
-            ("cloudmask", ["False"]),
+            ("cloudmasks", ["False"]),
             ("indices", ["False"]),
             ("remove", ["False"]),
             ("comments", [""]),
@@ -170,7 +179,7 @@ class Job:
             "max_clouds": [100],
             "l1c": ["False"],
             "l2a": ["False"],
-            "cloudmask": ["False"],
+            "cloudmasks": ["False"],
             "indices": ["False"],
             "remove": ["False"],
             "comments": [""],
@@ -180,7 +189,7 @@ class Job:
         self.tasks = pd.concat([self.tasks, row], ignore_index=True)[
             self.tasks.columns
         ]
-        self.format_indices()
+        self.format_cm_indices()
         logger.info("\n{}".format(self.tasks))

     def task_edit(self, task_id: int = None, **kwargs):
@@ -200,7 +209,7 @@ class Job:
                     )
                 else:
                     logger.info("{} not found".format(arg))
-            self.format_indices()
+            self.format_cm_indices()
             logger.info("\n{}".format(self.tasks))
         else:
             logger.info("Task_number not found")
@@ -234,15 +243,15 @@ class Job:
             "#",
             "# tile: tile identifier, format ##XXX, comment line using ! before tile name",
             "# date_min the start date for this task, possible values:",
-            "# empty (2015-01-01 will be used) | any date | now-xx (xx nb of days before now to consider)",
+            "# empty (2015-01-01 will be used) | any date | today-xx (xx nb of days before today to consider)",
             "# date_max the last date for this task, possible values:",
-            "# empty (9999-12-31 will be used) | any date | now",
+            "# empty (9999-12-31 will be used) | any date | today",
             "# max_clouds: max cloud cover to consider for downloading images",
             "# l1c: download l1c: True|False",
             "# l2a: compute l2a with sen2chain: True | False",
             "# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0",
             "# indices: False | All | NDVI/NDWIGAO/etc.",
-            "# remove: to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a",
+            "# remove: used to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a",
             "# comments: free user comments, ie tile name, etc.",
             "",
             "",
@@ -268,9 +277,9 @@ class Job:
         for line in header:
             ict.write(line)
         # self.tasks.to_csv(ict)
-        self.unformat_indices()
+        self.unformat_cm_indices()
         self.tasks.to_csv(ict, index=False, sep=";")
-        self.format_indices()
+        self.format_cm_indices()


     def get_cron_status(self):
@@ -361,7 +370,11 @@ class Job:
             parser.read_string(
                 "[top]\n" + stream.read(),
             )  # This line does the trick.
-        self.logs = bool(setuptools.distutils.util.strtobool(parser["top"]["logs"]))
+        self.logs = bool(
+            setuptools.distutils.util.strtobool(
+                parser["top"]["logs"]
+            )
+        )
         self.timing = parser["top"]["timing"]
         try:
             self.provider = parser["top"]["provider"]
@@ -380,9 +393,14 @@ class Job:
         except:
             self.nb_proc = 8
         try:
-            self.copy_l2a_sideproducts = bool(setuptools.distutils.util.strtobool(parser["top"]["nb_proc"]))
+            self.copy_l2a_sideproducts = bool(
+                setuptools.distutils.util.strtobool(
+                    parser["top"]["copy_l2a_sideproducts"]
+                )
+            )
         except:
-            self.copy_l2a_sideproducts = False
+            self.copy_l2a_sideproducts = False
+
         for i in range(1, 10):
             try:
                 self.tasks = pd.read_csv(
@@ -395,17 +413,19 @@ class Job:
                     header = i,
                     skip_blank_lines = True,
                 )
-                # logger.info(i)
-                # logger.info(self.tasks)
                 if 'comments' not in self.tasks:
                     self.tasks['comments'] = ""
-                self.format_indices()
-                # logger.info(i)
+                if 'cloudmask' in self.tasks:
+                    self.tasks.rename(
+                        columns={"cloudmask": "cloudmasks"},
+                        inplace=True,
+                    )
+                self.format_cm_indices()
                 break
             except:
                 pass

-    def format_indices(self):
+    def format_cm_indices(self):
         for index, row in self.tasks.iterrows():
             if not row.indices == "False":
                 if row.indices == "All":
@@ -417,12 +437,19 @@ class Job:
                     self.tasks.at[index, "indices"] = str(row.indices).split(
                         "/"
                     )
+            if not row.cloudmasks == "False":
+                if not isinstance(self.tasks.at[index, "cloudmasks"] , list):
+                    self.tasks.at[index, "cloudmasks"] = str(row.cloudmasks).split(
+                        "/"
+                    )

-    def unformat_indices(self):
+    def unformat_cm_indices(self):
         for index, row in self.tasks.iterrows():
             if not((row.indices == "False") or (row.indices == "All")):
                 # logger.info(row.indices)
                 self.tasks.at[index, "indices"] = '/'.join(self.tasks.at[index, "indices"])
+            if not (row.cloudmasks == "False"):
+                self.tasks.at[index, "cloudmasks"] = '/'.join(self.tasks.at[index, "cloudmasks"])

     def fill_dates_clouds(
         self,
@@ -436,7 +463,7 @@ class Job:
                 tasks.at[index, "date_min"] = datetime.datetime.strptime(
                     "2015-01-01", "%Y-%m-%d"
                 ).strftime("%Y-%m-%d")
-            elif "now" in row.date_min:
+            elif "today" in row.date_min:
                 tasks.at[index, "date_min"] = (
"date_min"] = ( datetime.datetime.now() - datetime.timedelta( days = int(row.date_min.split("-")[1]) @@ -446,7 +473,7 @@ class Job: tasks.at[index, "date_max"] = datetime.datetime.strptime( "9999-12-31", "%Y-%m-%d" ).strftime("%Y-%m-%d") - elif row.date_max == "now": + elif row.date_max == "today": tasks.at[index, "date_max"] = ( datetime.datetime.now() + datetime.timedelta(days=1) ).strftime("%Y-%m-%d") @@ -601,12 +628,13 @@ class Job: start = row.date_min, end = row.date_max, new = False, + max_cloudcover = row.max_clouds, ) download_list.extend(tile_download_list) logger.info("Checked tile: {} - {} new product(s) to download".format(t.name, len(tile_download_list))) if self.logs: if len(tile_download_list): - f.write("{}\n - {} - {} new product(s) to download\n".format( + f.write("{}\n{} - {} new product(s) to download\n".format( datetime.datetime.now(), t.name, len(tile_download_list)), @@ -670,10 +698,13 @@ class Job: if bool(setuptools.distutils.util.strtobool(str(row.l2a))): t = Tile(row.tile) l1c_to_process = list( - p.identifier + [ + p.identifier, + self.copy_l2a_sideproducts, + ] for p in t.l2a_missings.filter_dates( date_min=row.date_min, date_max=row.date_max - ) + ).filter_clouds(cover_max = row.max_clouds) ) l1c_process_list.append(l1c_to_process) logger.info( @@ -681,11 +712,18 @@ class Job: row.tile, len(l1c_to_process) ) ) - l1c_process_list = list( - set( - chain.from_iterable(l1c_process_list) - ) - ) + # l1c_process_list = list( + # set( + # chain.from_iterable(l1c_process_list) + # ) + # ) + + ### remove duplicate l2a + l1c_process_list.sort() + uniq = list(k for k,_ in groupby(l1c_process_list)) + l1c_process_list = uniq + + logger.info( "Process list ({} files): {}".format( len(l1c_process_list), l1c_process_list @@ -710,7 +748,7 @@ class Job: if "l1c" in str(row.remove).lower(): t = Tile(row.tile) prodlist = [p for p in l1c_process_list if row.tile in p] - t.remove_l2a(prodlist) + t.remove_l1c(prodlist) logger.info("Removing downloaded l1c products: {}".format(prodlist)) if self.logs: f.write("{}\nRemoving downloaded l1c products: {}\n\n".format(datetime.datetime.now(), prodlist)) @@ -725,50 +763,56 @@ class Job: cld_l2a_process_list = [] for index, row in tasks.iterrows(): # if not bool(distutils.util.strtobool(str(row.cloudmask))): - if not (row.cloudmask == "False" or not row.cloudmask): - cloudmask = self.get_cm_version(row.cloudmask) - t = Tile(row.tile) - l2a_to_process = [ - p.identifier - for p in t.cloudmasks_missing( - cm_version = cloudmask["cm_version"].upper(), - probability = cloudmask["probability"], - iterations = cloudmask["iterations"], - cld_shad = cloudmask["cld_shad"], - cld_med_prob = cloudmask["cld_med_prob"], - cld_hi_prob = cloudmask["cld_hi_prob"], - thin_cir = cloudmask["thin_cir"], - ).filter_dates( - date_min=row.date_min, date_max=row.date_max - ) - ] - for j in l2a_to_process: - l2a_cm_details = [ - j, - cloudmask["cm_version"].upper(), - cloudmask["probability"], - cloudmask["iterations"], - cloudmask["cld_shad"], - cloudmask["cld_med_prob"], - cloudmask["cld_hi_prob"], - cloudmask["thin_cir"], - reprocess, + if not (row.cloudmasks == "False" or not row.cloudmasks): + for cm in row.cloudmasks: + cloudmask = self.get_cm_version(cm) + t = Tile(row.tile) + l2a_to_process = [ + p.identifier + for p in t.cloudmasks_missing( + cm_version = cloudmask["cm_version"].upper(), + probability = cloudmask["probability"], + iterations = cloudmask["iterations"], + cld_shad = cloudmask["cld_shad"], + cld_med_prob = cloudmask["cld_med_prob"], + 
+                                thin_cir = cloudmask["thin_cir"],
+                            ).filter_dates(
+                                date_min=row.date_min, date_max=row.date_max
+                            )
                         ]
-                    cld_l2a_process_list.append(l2a_cm_details)
-                    logger.info(
-                        "{} - {} l2a product(s) to process".format(
-                            row.tile, len(l2a_to_process)
-                        )
-                    )
-                    if self.logs:
+                        for j in l2a_to_process:
+                            l2a_cm_details = [
+                                j,
+                                cloudmask["cm_version"].upper(),
+                                cloudmask["probability"],
+                                cloudmask["iterations"],
+                                cloudmask["cld_shad"],
+                                cloudmask["cld_med_prob"],
+                                cloudmask["cld_hi_prob"],
+                                cloudmask["thin_cir"],
+                                reprocess,
+                            ]
+                            cld_l2a_process_list.append(l2a_cm_details)
                         if len(l2a_to_process):
-                        f.write(
-                            "{} - {} l2a product(s) to process\n".format(
-                                row.tile, len(l2a_to_process)
+                            logger.info(
+                                "{} - {} - {} l2a product(s) to process".format(
+                                    row.tile,
+                                    cm,
+                                    len(l2a_to_process),
                                 )
                             )
-                        f.flush()
-            # removing duplicate cloudmask computing
+                            if self.logs:
+                                if len(l2a_to_process):
+                                    f.write(
+                                        "{} - {} - {} l2a product(s) to process\n".format(
+                                            row.tile,
+                                            cm,
+                                            len(l2a_to_process),
+                                        )
+                                    )
+                                    f.flush()
+            # removing duplicate cloudmasks before computing
             cld_l2a_process_list.sort()
             uniq = list(k for k,_ in groupby(cld_l2a_process_list))
             cld_l2a_process_list = uniq
@@ -786,7 +830,6 @@ class Job:
                     for cld in cld_l2a_process_list:
                         f.write("{}\n".format(cld))
                     f.write("\n")
-                    f.flush()

             cld_res = False
@@ -805,66 +848,70 @@ class Job:
             indices_l2a_process_list = []
             for index, row in tasks.iterrows():
                 if not (row.indices == "False" or not row.indices):
-
-                    nodata_clouds = not (row.cloudmask == "False" or not row.cloudmask)
-                    cloudmask = self.get_cm_version(row.cloudmask)
-                    t = Tile(row.tile)
-                    # indices_list = row.indices.split("/")
-                    for i in row.indices:
-                        # l2a_list = [p.identifier for p in t.missing_indices(i).filter_dates(date_min = row.start_time, date_max = row.end_time)]
-                        l2a_to_process = [
-                            p.identifier
-                            for p in t.missing_indices(
-                                indice=i,
-                                nodata_clouds=nodata_clouds,
-                                cm_version = cloudmask["cm_version"].upper(),
-                                probability = cloudmask["probability"],
-                                iterations = cloudmask["iterations"],
-                                cld_shad = cloudmask["cld_shad"],
-                                cld_med_prob = cloudmask["cld_med_prob"],
-                                cld_hi_prob = cloudmask["cld_hi_prob"],
-                                thin_cir = cloudmask["thin_cir"],
-                            ).filter_dates(
-                                date_min=row.date_min, date_max=row.date_max
-                            )
-                        ]
-                        for j in l2a_to_process:
-                            l2a_ind_details = [
-                                j,
-                                i,
-                                reprocess,
-                                nodata_clouds,
-                                quicklook,
-                                cloudmask["cm_version"].upper(),
-                                cloudmask["probability"],
-                                cloudmask["iterations"],
-                                cloudmask["cld_shad"],
-                                cloudmask["cld_med_prob"],
-                                cloudmask["cld_hi_prob"],
-                                cloudmask["thin_cir"],
-                            ]
-                            indices_l2a_process_list.append(l2a_ind_details)
-                        logger.info(
-                            "{}: adding - {}: {} l2a products".format(
-                                row.tile, i, len(l2a_to_process)
-                            )
-                        )
-                        if self.logs:
-                            if len(l2a_to_process):
-                                f.write(
-                                    "{} - {} {}\n".format(
-                                        row.tile,
-                                        len(l2a_to_process),
+                    nodata_clouds = not (row.cloudmasks == "False" or not row.cloudmasks)
+                    if nodata_clouds:
+                        for cm in row.cloudmasks:
+                            cloudmask = self.get_cm_version(cm)
+                            t = Tile(row.tile)
+                            for i in row.indices:
+                                l2a_to_process = [
+                                    p.identifier
+                                    for p in t.missing_indices(
+                                        indice=i,
+                                        nodata_clouds=nodata_clouds,
+                                        cm_version = cloudmask["cm_version"].upper(),
+                                        probability = cloudmask["probability"],
+                                        iterations = cloudmask["iterations"],
+                                        cld_shad = cloudmask["cld_shad"],
+                                        cld_med_prob = cloudmask["cld_med_prob"],
+                                        cld_hi_prob = cloudmask["cld_hi_prob"],
+                                        thin_cir = cloudmask["thin_cir"],
+                                    ).filter_dates(
+                                        date_min=row.date_min, date_max=row.date_max
+                                    )
+                                ]
+                                for j in l2a_to_process:
+                                    l2a_ind_details = [
+                                        j, i,
+                                        reprocess,
+                                        nodata_clouds,
+                                        quicklook,
+                                        cloudmask["cm_version"].upper(),
+                                        cloudmask["probability"],
+                                        cloudmask["iterations"],
+                                        cloudmask["cld_shad"],
+                                        cloudmask["cld_med_prob"],
+                                        cloudmask["cld_hi_prob"],
+                                        cloudmask["thin_cir"],
+                                    ]
+                                    indices_l2a_process_list.append(l2a_ind_details)
+                                if len(l2a_to_process):
+                                    logger.info(
+                                        "{} - {} - {} - {} l2a product(s) to process".format(
+                                            row.tile,
+                                            cm,
+                                            i,
+                                            len(l2a_to_process)
+                                        )
+                                    )
                                     )
-                    )
-                    f.flush()
+                                if self.logs:
+                                    if len(l2a_to_process):
+                                        f.write(
+                                            "{} - {} - {} - {} l2a product(s) to process\n".format(
+                                                row.tile,
+                                                cm,
+                                                i,
+                                                len(l2a_to_process),
+                                            )
+                                        )
+                                        f.flush()
             # removing duplicate indice computing
             indices_l2a_process_list.sort()
             uniq = list(k for k,_ in groupby(indices_l2a_process_list))
-            indices_l2a_process_list = uniq
-
+            indices_l2a_process_list = uniq
+
             logger.info(
                 "Indice computing process list ({} l2a products): {}".format(
diff --git a/sen2chain/multi_processing.py b/sen2chain/multi_processing.py
index c98ddc4da67592fee60ab1e017230cf30a547374..43e42f0662b6a4504b01b6d13efb74850de60b45 100644
--- a/sen2chain/multi_processing.py
+++ b/sen2chain/multi_processing.py
@@ -29,7 +29,7 @@ def multi(product_copyl2asideproducts):
         "/usr/bin/python3",
         fwd + "/multiprocess_l2a.py",
         product,
-        copy_side_products,
+        copy_l2a_sideproducts,
     ]

     proc = subprocess.Popen(cmd)
diff --git a/sen2chain/products.py b/sen2chain/products.py
index beba99a17acb8a7ae756c32b5be482a5b11c0c93..22411e0967918f30569caac7b6e61ccc3412e076 100755
--- a/sen2chain/products.py
+++ b/sen2chain/products.py
@@ -1033,12 +1033,16 @@ class L2aProduct(Product):
         cloudmask = NewCloudMaskProduct(l2a_identifier = self.identifier)
         cloudmask.path.parent.mkdir(parents = True, exist_ok = True)
         if not (cloudmask.path.parent / Path(cloudmask.msk_cldprb_20m).name).exists():
-            logger.info("Copying msk_cldprb_20m to cloumask folder")
+            logger.info("{} - Copying msk_cldprb_20m to cloudmask folder".format(self.identifier))
             shutil.copy2(cloudmask.msk_cldprb_20m, cloudmask.path.parent)
+        else:
+            logger.info("{} - msk_cldprb_20m already copied to cloudmask folder".format(self.identifier))
         if not (cloudmask.path.parent / Path(cloudmask.scl_20m).name).exists():
-            logger.info("Copying scl_20m to cloumask folder")
+            logger.info("{} - Copying scl_20m to cloudmask folder".format(self.identifier))
             shutil.copy2(cloudmask.scl_20m, cloudmask.path.parent)
-
+        else:
+            logger.info("{} - scl_20m already copied to cloudmask folder".format(self.identifier))
+
     @property
     def sen2chain_version(self):
         return Sen2ChainMetadataParser(
diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py
index a77f1f8d76843152d92e17eaa5e08c29923aee04..475942ea834a96523377fc3e549017992a28dc13 100644
--- a/sen2chain/tiles.py
+++ b/sen2chain/tiles.py
@@ -799,7 +806,14 @@ class Tile:
                     getattr(getattr(self, indice.lower()), "masks"),
                     cm_version,
                 )
-                .params(probability=probability, iterations=iterations)
+                .params(
+                    probability = probability,
+                    iterations = iterations,
+                    cld_shad = cld_shad,
+                    cld_med_prob = cld_med_prob,
+                    cld_hi_prob = cld_hi_prob,
+                    thin_cir = thin_cir,
+                )
                 .products
             }
         except:
@@ -985,7 +992,8 @@ class Tile:
                     copy_l2a_sideproducts,
                 ]
                 for p in self.l2a_missings.filter_dates(
-                    date_min=date_min, date_max=date_max
+                    date_min=date_min,
+                    date_max=date_max,
                 ).filter_clouds(
                     cover_min = cover_min,
                     cover_max = cover_max
@@ -1694,13 +1702,23 @@ class Tile:
             l2a.remove(copy_l2a_sideproducts = copy_l2a_sideproducts)
         logger.info("Removed: {} products".format(len(product_list)))
products".format(len(product_list))) + def copy_sp(self, l2a_product): + l2a = L2aProduct(l2a_product) + l2a.copy_l2a_sideproducts() + def copy_l2a_sideproducts( self, product_list: list = [], + nb_proc: int = 4, ): if not product_list: product_list = [product.identifier for product in self.l2a] - for identifier in product_list: - l2a = L2aProduct(identifier) - l2a.copy_l2a_sideproducts() - + + nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) + pool = multiprocessing.Pool(nb_proc) + results = [pool.map(self.copy_sp, product_list)] + pool.close() + pool.join() + return True + +