diff --git a/requirements.txt b/requirements.txt index 6548cbcf39545d1c01f4755e781c58ebb8b7f99e..5d4c3e5f390c07e745cb358a39083c436e8e8058 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,4 @@ scipy pillow python-crontab packaging +eodag-sentinelsat diff --git a/sen2chain/config/eodag.yml b/sen2chain/config/eodag.yml deleted file mode 100644 index 46df0d4b852219543cfbf999ab5bd76ab6703edc..0000000000000000000000000000000000000000 --- a/sen2chain/config/eodag.yml +++ /dev/null @@ -1,98 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2021, CS GROUP - France, http://www.c-s.fr -# -# This file is part of EODAG project -# https://www.github.com/CS-SI/EODAG -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -peps: - priority: # Lower value means lower priority (Default: 1) - search: # Search parameters configuration - download: - extract: # whether to extract the downloaded products (true or false). - outputs_prefix: # where to store downloaded products. 
- dl_url_params: # additional parameters to pass over to the download url as an url parameter - auth: - credentials: - username: - password: -theia: - priority: # Lower value means lower priority (Default: 0) - search: # Search parameters configuration - download: - extract: - outputs_prefix: - dl_url_params: - auth: - credentials: - ident: - pass: -usgs: - priority: # Lower value means lower priority (Default: 0) - api: - extract: - outputs_prefix: - dl_url_params: - product_location_scheme: - credentials: - username: - password: -aws_eos: - priority: # Lower value means lower priority (Default: 0) - search: # Search parameters configuration - auth: - credentials: - apikey: - aws_access_key_id: - aws_secret_access_key: - download: - outputs_prefix: -sobloo: - priority: # Lower value means lower priority (Default: 0) - search: # Search parameters configuration - download: - extract: - outputs_prefix: - dl_url_params: - auth: - credentials: - apikey: -creodias: - priority: # Lower value means lower priority (Default: 0) - search: # Search parameters configuration - download: - extract: - outputs_prefix: - auth: - credentials: - username: - password: -mundi: - priority: # Lower value means lower priority (Default: 0) - search: # Search parameters configuration - download: - extract: - outputs_prefix: - auth: - credentials: - apikey: -onda: - priority: # Lower value means lower priority (Default: 0) - search: # Search parameters configuration - download: - extract: - outputs_prefix: - auth: - credentials: - username: - password: diff --git a/sen2chain/data/job_ini.cfg b/sen2chain/data/job_ini.cfg index 6ef215a872d5e022c78b317568ac22a4521481e2..41340bac6a333bddfffd1afa877d44dec681a1e0 100644 --- a/sen2chain/data/job_ini.cfg +++ b/sen2chain/data/job_ini.cfg @@ -1,12 +1,13 @@ ### Parameters # logs: True | False # timing: in cron format -# tile: tile identifier, format ##XXX +# tile: tile identifier, format ##XXX, comment line using ! 
before tile name # l1c: download l1c: True|False # l2a: compute l2a with sen2chain: True | False # cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0 # indices: False | All | NDVI/NDWIGAO/etc. -# remove: False | l1c/l2a +# remove: to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a +# comments: free user comments, ie tile name, etc. logs = False timing = 0 0 * * * diff --git a/sen2chain/download_eodag.py b/sen2chain/download_eodag.py index a9326e53421afcae0f5ce6801c96f2bd332ca11e..15699ad9c9436fbebe58cfd7bf4ae03590bf6afe 100644 --- a/sen2chain/download_eodag.py +++ b/sen2chain/download_eodag.py @@ -10,6 +10,7 @@ https://www.github.com/CS-SI/EODAG import logging import shapefile import os +import shutil from pathlib import Path from eodag import EODataAccessGateway from eodag import setup_logging @@ -100,27 +101,44 @@ class S2cEodag: fitered = self.products[:] for p in fitered: + if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): + l1c_presence = "PRESENT" + else: + l1c_presence = "ABSENT" + if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): + l2a_presence = "PRESENT" + else: + l2a_presence = "ABSENT" + + ##### remonter la condition l1c + if self.name not in p.properties["title"]: self.products.remove(p) - logger.info("{} - wrong Tile - filtering".format(p.properties["title"])) + logger.info("{} - local l1c {} - filtering (wrong Tile)".format(p.properties["title"], l1c_presence)) else: - if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover): + # logger.info(p.properties["cloudCover"]) + if not(min_cloudcover <= float(p.properties["cloudCover"] or 0) <= max_cloudcover): self.products.remove(p) - logger.info("{} - wrong cloud cover ({}%) - filtering".format(p.properties["title"], int(p.properties["cloudCover"]))) + logger.info("{} - local l1c {} - filtering (CC = 
{}%)".format(p.properties["title"], l1c_presence, int(p.properties["cloudCover"]))) else: if ref == "l1c": if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): - # p.location = "file://" + str(outputs_prefix / (p.properties["title"] + ".SAFE")) - logger.info("{} - remote {} - local l1c PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("{} - local l1c {} - filtering".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) self.products.remove(p) else: - logger.info("{} - remote {} - local l1c ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("{} - local l1c {} - remote {}".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) elif ref == "l2a": if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): - logger.info("{} - remote {} - local l2a PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("{} - local l1c {} - local l2a {} - filtering".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"])) self.products.remove(p) else: - logger.info("{} - remote {} - local l2a ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): + logger.info("{} - local l1c {} - filtering".format(p.properties["title"], l1c_presence, p.properties["storageStatus"])) + self.products.remove(p) + else: + logger.info("{} - local l1c {} - local l2a {} - remote {}".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"])) + + @@ -173,15 +191,20 @@ class S2cEodag: timeout=0, ) # logging.disable(logging.NOTSET) - if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name 
Path(downloaded_path).replace(upper_location) product_id.location = "file://" + str(upper_location) Path(downloaded_path).parent.rmdir() logger.info("Moving up SAVE file") + # modification de l'arborescence si il y a un sous dossier en trop (provider onda par ex) + if os.listdir(downloaded_path)[0] == str(Path(downloaded_path).name): + shutil.copytree(downloaded_path + '/' + os.listdir(downloaded_path)[0],downloaded_path,dirs_exist_ok=True) + logger.info("Moving up files") + shutil.rmtree(downloaded_path + '/' + os.listdir(downloaded_path)[0]) + logger.info("Deleting old folder") except: - logger.info("Error: ONLINE product but cannot be downloaded, retry later") + logger.info("Error: ONLINE product but cannot be downloaded, check your credentials or retry later") elif product_id.properties["storageStatus"] == "OFFLINE": try: @@ -198,8 +221,8 @@ class S2cEodag: logger.info("remote OFFLINE, ordered") elif product_id.properties["storageStatus"] == "STAGING": - logger.info("remote STAGING, retry later") - + logger.info("remote STAGING, retry later") + def order_offline( self, product_id, diff --git a/sen2chain/indices.py b/sen2chain/indices.py index 4a17247a2b1bf3b36ff20018ceab91041feb0f0d..7a967457b1ab780a13fa5d61a2cdbb492466a24c 100644 --- a/sen2chain/indices.py +++ b/sen2chain/indices.py @@ -24,6 +24,7 @@ from .indices_functions import ( create_raw_bigr, create_raw_birnir, create_raw_bibg, + create_raw_evi, create_masked_indice, index_tiff_2_jp2, ) @@ -68,7 +69,8 @@ class Indice(metaclass=ABCMeta): class Ndvi(Indice): """ - NDVI = (NIR-VIR) / (NIR+VIR) + $$NDVI = {{NIR-VIR}\over{NIR+VIR}}$$ + NIR: band 08 (10m) VIR: band 04 (10m) """ @@ -183,7 +185,7 @@ class Ndvi(Indice): class NdwiMcf(Indice): """ - NDWI(McFeeters) = (GREEN-NIR) / (GREEN+NIR) + $$NDWI(McFeeters) = {{GREEN-NIR}\over{GREEN+NIR}}$$ GREEN: band 03 NIR: band 08 @@ -295,10 +297,12 @@ class NdwiMcf(Indice): class NdwiGao(Indice): """ - NDWI(Gao) = (NIR-SWIR) / (NIR+SWIR) + $$NDWI(Gao) = 
class EVI(Indice):
    r"""
    $$Enhanced\,Vegetation\,Index = {2.5 * {( NIR - RED )}\over{( NIR + 6 * RED - 7.5 * BLUE ) + 1}}$$

    BLUE: band 02
    RED: band 04
    NIR: band 08

    The docstring is a raw string so that the LaTeX backslashes are not
    parsed as (invalid) Python escape sequences.
    """

    name = "EVI"
    filename_template = "{product_identifier}_EVI{ext}"
    ext = ".jp2"
    ext_raw = ".tif"
    colormap = cm.bone  # TODO: replace with a colormap better suited to EVI

    def __init__(self, l2a_product_object, cm_product_object):
        """
        :param l2a_product_object: L2A product the index is computed from.
        :param cm_product_object: cloud mask product used for masking and
            for the quicklook.
        :raises ValueError: if either object is None.
        """
        # ``(a or b) is None`` is only true when *both* are None; test each
        # argument so that a single missing object is reported right away.
        if l2a_product_object is None or cm_product_object is None:
            raise ValueError(
                "A L2aProduct and NewCloudMask objects must be provided"
            )
        self.l2a_product = l2a_product_object
        self.cm_product = cm_product_object

        # output path
        self.out_path = None

        # filenames
        self.indice_stem = self.filename_template.format(
            product_identifier=self.l2a_product.identifier, ext=""
        )
        self.indice_filename = self.indice_stem + self.ext
        self.indice_raw = self.indice_stem + self.ext_raw

    def process_indice(
        self,
        out_path: pathlib.PosixPath,
        reprocess: bool = False,
        nodata_clouds: bool = False,
        quicklook: bool = False,
    ) -> None:
        """
        Compute the EVI raster and, optionally, its cloud-masked version
        and a quicklook.

        :param out_path: folder in which the products are written.
        :param reprocess: recompute products that already exist.
        :param nodata_clouds: also produce a cloud-masked version.
        :param quicklook: also produce a quicklook image.
        :raises ValueError: if ``nodata_clouds`` is set and the cloud mask
            file does not exist.
        """
        self.out_path = out_path

        if (out_path / self.indice_filename).exists() and not reprocess:
            logger.info("{} already exists".format(self.indice_filename))
        else:
            create_raw_evi(
                blue_path=self.l2a_product.b02_10m,
                red_path=self.l2a_product.b04_10m,
                nir_path=self.l2a_product.b08_10m,
                out_path=(out_path / self.indice_raw),
            )
            index_tiff_2_jp2(
                img_path=(out_path / self.indice_raw),
                out_path=(out_path / self.indice_filename),
            )

        if nodata_clouds:
            if not self.cm_product.path.exists():
                logger.info("Cloudmask does not exist, indice not masked")
                raise ValueError("Cloud mask does not exist")
            masked_indice_filename = (
                self.indice_stem + "_" + self.cm_product.suffix + self.ext
            )
            masked_indice_raw = (
                self.indice_stem + "_" + self.cm_product.suffix + self.ext_raw
            )

            if (out_path / masked_indice_filename).exists() and not reprocess:
                logger.info("{} already exists".format(masked_indice_filename))
            else:
                # Prefer the freshly written raw tiff; fall back to the jp2
                # when the raw file is not there.
                if (out_path / self.indice_raw).exists():
                    evi_name = out_path / self.indice_raw
                else:
                    evi_name = out_path / self.indice_filename
                create_masked_indice(
                    indice_path=evi_name,
                    cloud_mask_path=self.cm_product.path,
                    out_path=(out_path / masked_indice_raw),
                )
                index_tiff_2_jp2(
                    img_path=(out_path / masked_indice_raw),
                    out_path=(out_path / masked_indice_filename),
                )
                os.remove(str(out_path / masked_indice_raw))

        # Best-effort cleanup of the intermediate raw tiff: it is absent
        # when the index was not recomputed during this call.
        try:
            os.remove(str(out_path / self.indice_raw))
        except OSError:
            pass
        else:
            logger.info("Removing {}".format(self.indice_raw))

        if quicklook:
            cmap = matplotlib_colormap_to_rgb(self.colormap, revers=False)

            quicklook_filename = (
                self.indice_stem + "_" + self.cm_product.suffix + "_QL.tif"
            )

            if (self.out_path / quicklook_filename).exists() and not reprocess:
                logger.info("{} already exists".format(quicklook_filename))
            else:
                logger.info("creating quicklook")
                create_rvb(
                    raster=(self.out_path / self.indice_filename),
                    cloud_mask=self.cm_product.path,
                    lut_dict=cmap,
                    clouds_color="white",
                    out_path=(self.out_path / quicklook_filename),
                    stretch=(0, 2500),
                )


class NBR(Indice):
    r"""
    $$Normalized\,Burn\,Ratio = {{NIR-SWIR}\over{NIR+SWIR}}$$

    NIR: band 08
    SWIR: band 12

    The docstring is a raw string so that the LaTeX backslashes are not
    parsed as (invalid) Python escape sequences.
    """

    name = "NBR"
    # "_" separates the product identifier from the index name, matching
    # the "_EVI" template of the EVI class in this file.
    filename_template = "{product_identifier}_NBR{ext}"
    ext = ".jp2"
    ext_raw = ".tif"
    colormap = cm.RdYlBu

    def __init__(self, l2a_product_object, cm_product_object):
        """
        :param l2a_product_object: L2A product the index is computed from.
        :param cm_product_object: cloud mask product used for masking and
            for the quicklook.
        :raises ValueError: if either object is None.
        """
        # ``(a or b) is None`` is only true when *both* are None; test each
        # argument so that a single missing object is reported right away.
        if l2a_product_object is None or cm_product_object is None:
            raise ValueError(
                "A L2aProduct and NewCloudMask objects must be provided"
            )
        self.l2a_product = l2a_product_object
        self.cm_product = cm_product_object

        # output path
        self.out_path = None

        # filenames
        self.indice_stem = self.filename_template.format(
            product_identifier=self.l2a_product.identifier, ext=""
        )
        self.indice_filename = self.indice_stem + self.ext
        self.indice_raw = self.indice_stem + self.ext_raw

    def process_indice(
        self,
        out_path: pathlib.PosixPath,
        reprocess: bool = False,
        nodata_clouds: bool = False,
        quicklook: bool = False,
    ) -> None:
        """
        Compute the NBR raster and, optionally, its cloud-masked version
        and a quicklook.

        :param out_path: folder in which the products are written.
        :param reprocess: recompute products that already exist.
        :param nodata_clouds: also produce a cloud-masked version.
        :param quicklook: also produce a quicklook image.
        :raises ValueError: if ``nodata_clouds`` is set and the cloud mask
            file does not exist.
        """
        self.out_path = out_path

        if (out_path / self.indice_filename).exists() and not reprocess:
            logger.info("{} already exists".format(self.indice_filename))
        else:
            create_raw_ndr(
                b1_path=self.l2a_product.b08_10m,
                b2_path=self.l2a_product.b12_20m,
                out_path=(out_path / self.indice_raw),
            )
            index_tiff_2_jp2(
                img_path=(out_path / self.indice_raw),
                out_path=(out_path / self.indice_filename),
                quality=20,
            )

        if nodata_clouds:
            if not self.cm_product.path.exists():
                logger.info("Cloudmask does not exist, indice not masked")
                raise ValueError("Cloud mask does not exist")
            masked_indice_filename = (
                self.indice_stem + "_" + self.cm_product.suffix + self.ext
            )
            masked_indice_raw = (
                self.indice_stem + "_" + self.cm_product.suffix + self.ext_raw
            )

            if (out_path / masked_indice_filename).exists() and not reprocess:
                logger.info("{} already exists".format(masked_indice_filename))
            else:
                # Prefer the freshly written raw tiff; fall back to the jp2
                # when the raw file is not there.
                if (out_path / self.indice_raw).exists():
                    nbr_name = out_path / self.indice_raw
                else:
                    nbr_name = out_path / self.indice_filename
                create_masked_indice(
                    indice_path=nbr_name,
                    cloud_mask_path=self.cm_product.path,
                    out_path=(out_path / masked_indice_raw),
                )
                index_tiff_2_jp2(
                    img_path=(out_path / masked_indice_raw),
                    out_path=(out_path / masked_indice_filename),
                    quality=20,
                )
                os.remove(str(out_path / masked_indice_raw))

        # Best-effort cleanup of the intermediate raw tiff: it is absent
        # when the index was not recomputed during this call.
        try:
            os.remove(str(out_path / self.indice_raw))
        except OSError:
            pass
        else:
            logger.info("Removing {}".format(self.indice_raw))

        if quicklook:
            cmap = matplotlib_colormap_to_rgb(self.colormap, revers=False)

            quicklook_filename = self.indice_stem + "_QUICKLOOK.tif"

            if (self.out_path / quicklook_filename).exists() and not reprocess:
                logger.info("{} already exists".format(quicklook_filename))
            else:
                logger.info("creating quicklook")
                create_rvb(
                    raster=(self.out_path / self.indice_filename),
                    cloud_mask=self.cm_product.path,
                    lut_dict=cmap,
                    clouds_color="white",
                    out_path=(self.out_path / quicklook_filename),
                )
def create_raw_evi(
    blue_path: Union[str, pathlib.PosixPath],
    red_path: Union[str, pathlib.PosixPath],
    nir_path: Union[str, pathlib.PosixPath],
    out_path: Union[str, pathlib.PosixPath] = "./raw_evi.tif",
) -> pathlib.PosixPath:
    """
    Creates an EVI raster from BLUE, RED, NIR rasters.

    EVI = 2.5 * (NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1), computed on
    the band values divided by 10000. The result is multiplied by 10000 and
    written as int16, with 32767 as the nodata value.

    :param blue_path: path to the BLUE raster
    :param red_path: path to the RED raster.
    :param nir_path: path to the NIR raster.
    :param out_path: path to the output raster.
    :return: absolute path of the written raster.
    """
    logger.info("creating raw EVI (tiff - int16)")

    nodata = 32767
    scale = 10000.0

    with rasterio.open(str(red_path)) as red_src, rasterio.open(
        str(blue_path)
    ) as blue_src, rasterio.open(str(nir_path)) as nir_src:
        red_profile = red_src.profile
        red_transform = red_src.transform
        red_dn = red_src.read(1).astype(np.float32)
        blue_dn = blue_src.read(1).astype(np.float32)
        nir_dn = nir_src.read(1).astype(np.float32)

    red = red_dn / scale
    blue = blue_dn / scale
    nir = nir_dn / scale

    # Silence divide-by-zero / invalid warnings for this computation only,
    # without changing numpy's global error state.
    with np.errstate(divide="ignore", invalid="ignore"):
        # The whole ratio stays in floating point: casting the denominator
        # to an integer type would truncate it to 0 or 1.
        evi = 2.5 * (nir - red) / ((nir + 6.0 * red - 7.5 * blue) + 1.0)
        # Keep one value free at the top of the int16 range for nodata.
        evi_scaled = np.clip(np.rint(evi * scale), -32768, nodata - 1)

    # nodata where the red band is 0 (same mask as before) and wherever the
    # ratio is not a finite number (zero denominator).
    valid = (red_dn != 0) & np.isfinite(evi)
    evi_masked = np.where(valid, evi_scaled, nodata).astype(np.int16)

    red_profile.update(
        driver="Gtiff",
        compress="DEFLATE",
        dtype=np.int16,
        nodata=nodata,
        transform=red_transform,
    )
    red_profile.pop("tiled", None)
    with rasterio.Env(GDAL_CACHEMAX=512):
        with rasterio.open(str(out_path), "w", **red_profile) as dst:
            dst.write(evi_masked, 1)
    return Path(str(out_path)).absolute()
.download_eodag import ( S2cEodag_download, @@ -36,7 +37,7 @@ logger = logging.getLogger(__name__) class Jobs: """ - Class to manage created jobs + Class to manage jobs routines """ def __init__(self): @@ -105,7 +106,16 @@ class Jobs: class Job: """ - Class to manage job + Class to create, edit or delete a job routine. A job consist of a group of tasks. A task is a succession of sen2chain main Tile processings : + - Download L1C + - Process L2A + - Process cloudmask (only 1 per task) + - Process Indices (Any, using previously produced cloudmask) + Both L1C and L2A can be removed once the task is performed. + + Job editing can be done manually in sen2chain_data/config/jobs/yourjobname.cfg + + A job can be launched immediately or scheduled with the cron command line. """ # logger.propagate = False @@ -136,16 +146,23 @@ class Job: ("date_min", [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')]), ("date_max", [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')]), ("max_clouds", [100]), - ("l1c", [False]), - ("l2a", [False]), - ("cloudmask", [False]), - ("indices", [False]), - ("remove", [False]), + ("l1c", ["False"]), + ("l2a", ["False"]), + ("cloudmask", ["False"]), + ("indices", ["False"]), + ("remove", ["False"]), + ("comments", [""]), ] ) self.tasks = pd.DataFrame(first_row) def task_add(self, row: dict = None): + """ + Add a task to the job. If row : dict = None, default settings are used + + :param row: Dictionnary of task attributes. 
def save(self):
    """
    Save all task edits to job. Job file located in sen2chain_data/config/jobs/yourjob.cfg

    The file is made of a commented parameter reference, the ``logs`` and
    ``timing`` settings, then the task table written as ";"-separated CSV.
    """
    # One list item per comment line: every item needs its trailing comma,
    # otherwise Python silently concatenates two adjacent string literals
    # and two comment lines end up merged into one.
    comments_header = "\n".join(
        [
            "### Parameters",
            "# logs: True | False",
            "# timing: in cron format",
            "# tile: tile identifier, format ##XXX, comment line using ! before tile name",
            "# l1c: download l1c: True|False",
            "# l2a: compute l2a with sen2chain: True | False",
            "# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0",
            "# indices: False | All | NDVI/NDWIGAO/etc.",
            "# remove: to remove downloaded L1C and/or produced L2A, possible values: False | l1c | l2a | l1c/l2a",
            "# comments: free user comments, ie tile name, etc.",
            "",
            "",
        ]
    )
    header = "\n".join(
        [
            "logs = " + str(self.logs),
            "timing = " + self.timing,
            "",
            "",
        ]
    )
    # Convert list-valued fields back to their on-disk text form before
    # the file is opened (and truncated), so a failure here cannot leave
    # a half-written job file behind.
    self.unclean_fields()

    # save task to disk
    with open(str(self._config_path), "w") as ict:
        ict.write(comments_header)
        ict.write(header)
        self.tasks.to_csv(ict, index=False, sep=";")
def unclean_fields(self):
    """
    Turn list-valued ``indices`` cells of ``self.tasks`` back into their
    on-disk text form (names joined with "/").

    Cells that already hold a string ("False", "All", or an already joined
    "NDVI/NDWIGAO") are left untouched, so calling this method several
    times in a row is harmless.
    """
    for index, row in self.tasks.iterrows():
        indices = row.indices
        # Joining a str would put "/" between every character
        # ("NDVI" -> "N/D/V/I"); non-iterable cells cannot be joined at all.
        if isinstance(indices, str) or not hasattr(indices, "__iter__"):
            continue
        logger.info(indices)
        self.tasks.at[index, "indices"] = "/".join(indices)
@@ -351,7 +429,7 @@ class Job: """ try: pat = re.compile(r"(?P<cm_version>CM00[1-2])") - return pat.match(identifier).groupdict() + # return pat.match(identifier).groupdict() except: pass try: @@ -360,7 +438,7 @@ class Job: + "-PRB(?P<probability>.*)" + "-ITER(?P<iterations>.*)" ) - return pat.match(identifier).groupdict() + # return pat.match(identifier).groupdict() except: pass try: @@ -372,9 +450,37 @@ class Job: + "-TCI(?P<thin_cir>.*)" + "-ITER(?P<iterations>.*)" ) - return pat.match(identifier).groupdict() + # return pat.match(identifier).groupdict() except: pass + try: + returned_val = pat.match(identifier).groupdict() + if "probability" not in returned_val: + returned_val["probability"] = 1 + if "iterations" not in returned_val: + returned_val["iterations"] = 5 + if "cld_shad" not in returned_val: + returned_val["cld_shad"] = True + if "cld_med_prob" not in returned_val: + returned_val["cld_med_prob"] = True + if "cld_hi_prob" not in returned_val: + returned_val["cld_hi_prob"] = True + if "thin_cir" not in returned_val: + returned_val["thin_cir"] = True + except: + returned_val = { + "cm_version": "cm001", + "probability": 1, + "iterations": 5, + "cld_shad": True, + "cld_med_prob": True, + "cld_hi_prob": True, + "thin_cir": True, + } + + + return returned_val + def run( self, @@ -382,7 +488,10 @@ class Job: clean_before: bool = False, clean_after: bool = False, ): - + """ + Run job. 
Tasks are executed on + """ + if self.logs: self._log_folder_path.mkdir(exist_ok=True) self._log_file = self._log_folder_path / ( @@ -393,16 +502,23 @@ class Job: + ".log" ) f = open(str(self._log_file), "w") - f.write("Debut : {}\n\n".format(datetime.datetime.now())) - f.write(repr(self) + "\n") + f.write("Start : {}\n\n".format(datetime.datetime.now())) + f.write(repr(self) + "\n\n") f.flush() - - if not self.tasks.empty: + + tasks = self.tasks + tasks = tasks[tasks["tile"].str.contains("!") == False] + + if not tasks.empty: ## Cleaning before if clean_before: logger.info("Cleaning Tiles") + if self.logs: + f.write("Cleaning Tiles\n") + f.flush() + clean_list = [] - for index, row in self.tasks.iterrows(): + for index, row in tasks.iterrows(): clean_list.append(row.tile) lib = Library() pb_before = lib.clean( @@ -413,7 +529,7 @@ class Job: ## L1C download - each tile sequential # download_list= [] # logger.info("Downloading l1c seq") - # for index, row in self.tasks.iterrows(): + # for index, row in tasks.iterrows(): # if bool(setuptools.distutils.util.strtobool(str(row.l1c))): # t = Tile(row.tile) # logger.info("Tile: {}".format(t.name)) @@ -431,11 +547,19 @@ class Job: ## L1C download - all tiles multi download_list= [] - logger.info("Searching l1c products") - for index, row in self.tasks.iterrows(): + logger.info("Downloading l1c products") + if self.logs: + f.write("Downloading l1c products\n") + f.flush() + + for index, row in tasks.iterrows(): if bool(setuptools.distutils.util.strtobool(str(row.l1c))): t = Tile(row.tile) - logger.info("Tile: {}".format(t.name)) + logger.info("Checking tile: {}".format(t.name)) + if self.logs: + f.write("Checking tile: {}\n".format(t.name)) + f.flush() + tile_download_list = t.get_l1c( provider = "peps", download = False, @@ -452,12 +576,25 @@ class Job: # S2cEodag_download(download_list, "multip") after_list = [p.location for p in download_list] downloaded_products = [Path(after_list[i]).stem for i in 
range(len(before_list)) if before_list[i] != after_list[i]] - logger.info("Downloaded product(s): {}".format(downloaded_products)) + logger.info("{} product(s) downloaded: {}\n\n".format(len(downloaded_products), downloaded_products)) + if self.logs: + f.write("{} product(s) downloaded: \n".format(len(downloaded_products))) + if downloaded_products: + for pro in downloaded_products: + f.write("{}\n\n".format(pro)) + else: + f.write("\n".format(cld)) + f.flush() + # Traitement des L1C en L2A logger.info("Computing l2a") + if self.logs: + f.write("Computing l2a\n") + f.flush() + l1c_process_list = [] - for index, row in self.tasks.iterrows(): + for index, row in tasks.iterrows(): if bool(setuptools.distutils.util.strtobool(str(row.l2a))): t = Tile(row.tile) l1c_to_process = list( @@ -468,43 +605,52 @@ class Job: ) l1c_process_list.append(l1c_to_process) logger.info( - "ajout {}: {} l1c files".format( + "ajout {}: {} l1c files\n".format( row.tile, len(l1c_to_process) ) ) l1c_process_list = list(chain.from_iterable(l1c_process_list)) logger.info( - "l1c Sen2Cor process list ({} files): {}".format( + "Process list ({} files): {}".format( len(l1c_process_list), l1c_process_list ) ) + if self.logs: + f.write( + "Process list ({} files): {}\n\n".format( + len(l1c_process_list), l1c_process_list + ) + ) + f.flush() + l2a_res = False if l1c_process_list: l2a_res = l2a_multiprocessing( l1c_process_list, nb_proc=nb_proc ) - # logger.info("je multiprocess les l1c en l2a") - if self.logs: - f.write("\nTraitement des l1c : {}\n".format(l2a_res)) - f.write( - "l1c_process_list: \n" - + "\n".join(l1c_process_list) - + "\n" - ) # Remove downloaded L1C - for index, row in self.tasks.iterrows(): + for index, row in tasks.iterrows(): if "l1c" in str(row.remove).lower(): t = Tile(row.tile) - t.remove_l1c([p for p in downloaded_products if row.tile in p]) - - # Traitement des L2A (clouds) + prodlist = [p for p in l1c_process_list if row.tile in p] + t.remove_l2a(prodlist) + 
logger.info("Removing downloaded l1c products: {}".format(prodlist)) + if self.logs: + f.write("Removing downloaded l1c products: {}\n\n".format(prodlist)) + f.flush() + + # Comuting cloudmasks (from L2A) logger.info("Computing cloudmasks") + if self.logs: + f.write("Computing cloudmasks\n") + f.flush() reprocess = False cld_l2a_process_list = [] - for index, row in self.tasks.iterrows(): + for index, row in tasks.iterrows(): # if not bool(distutils.util.strtobool(str(row.cloudmask))): if not (row.cloudmask == "False" or not row.cloudmask): + cloudmask = self.get_cm_version(row.cloudmask) t = Tile(row.tile) l2a_to_process = [ p.identifier @@ -515,45 +661,62 @@ class Job: for j in l2a_to_process: l2a_cm_details = [ j, - row.cloudmask["cm_version"], - row.cloudmask["probability"], - row.cloudmask["iterations"], - row.cloudmask["cld_shad"], - row.cloudmask["cld_med_prob"], - row.cloudmask["cld_hi_prob"], - row.cloudmask["thin_cir"], + cloudmask["cm_version"].lower(), + cloudmask["probability"], + cloudmask["iterations"], + cloudmask["cld_shad"], + cloudmask["cld_med_prob"], + cloudmask["cld_hi_prob"], + cloudmask["thin_cir"], reprocess, ] cld_l2a_process_list.append(l2a_cm_details) logger.info( - "ajout {}: {} l2a products".format( + "{} adding: {} l2a products to process".format( row.tile, len(l2a_to_process) ) ) + if self.logs: + f.write( + "{} adding: {} l2a products to process\n".format( + row.tile, len(l2a_to_process) + ) + ) + f.flush() + logger.info( - "l2a cloudmasks process list ({} products): {}".format( + "Cloudmask computing process list ({} l2a products): {}".format( len(cld_l2a_process_list), cld_l2a_process_list ) ) + if self.logs: + f.write( + "Cloudmask computing process list ({} l2a products):\n".format( + len(cld_l2a_process_list), ) + ) + if cld_l2a_process_list: + for cld in cld_l2a_process_list: + f.write("{}\n\n".format(cld)) + else: + f.write("\n".format(cld)) + + f.flush() + cld_res = False if cld_l2a_process_list: cld_res = 
cld_version_probability_iterations_reprocessing_multiprocessing( cld_l2a_process_list, nb_proc=nb_proc ) - if self.logs: - f.write("\nTraitement des clouds : {}\n".format(cld_res)) - f.write( - "cld_l2a_process_list: \n" - + "\n".join(cld_l2a_process_list) - + "\n" - ) # Traitement des L2A (indices) logger.info("Computing indices") + if self.logs: + f.write("Computing indices\n") + f.flush() nodata_clouds = True quicklook = False indices_l2a_process_list = [] - for index, row in self.tasks.iterrows(): + for index, row in tasks.iterrows(): if not (row.indices == "False" or not row.indices): t = Tile(row.tile) # indices_list = row.indices.split("/") @@ -575,48 +738,76 @@ class Job: reprocess, nodata_clouds, quicklook, - row.cloudmask["cm_version"], - row.cloudmask["probability"], - row.cloudmask["iterations"], - row.cloudmask["cld_shad"], - row.cloudmask["cld_med_prob"], - row.cloudmask["cld_hi_prob"], - row.cloudmask["thin_cir"], + cloudmask["cm_version"], + cloudmask["probability"], + cloudmask["iterations"], + cloudmask["cld_shad"], + cloudmask["cld_med_prob"], + cloudmask["cld_hi_prob"], + cloudmask["thin_cir"], ] indices_l2a_process_list.append(l2a_ind_details) logger.info( - "ajout {} - {}: {} l2a products".format( + "{}: adding - {}: {} l2a products".format( row.tile, i, len(l2a_to_process) ) ) + if self.logs: + f.write( + "{}: adding - {}: {} l2a products\n".format( + row.tile, i, len(l2a_to_process) + ) + ) + f.flush() logger.info( - "l2a indices process list ({} products): {}".format( + "Indice computing process list ({} l2a products): {}".format( len(indices_l2a_process_list), indices_l2a_process_list ) ) + if self.logs: + f.write( + "Indice computing process list ({} l2a products): \n".format( + len(indices_l2a_process_list), + ) + ) + for ind in indices_l2a_process_list: + f.write("{}\n".format(ind)) + f.write("\n") + f.flush() + indices_res = False if indices_l2a_process_list: indices_res = idx_multiprocessing( indices_l2a_process_list, nb_proc=nb_proc 
) - if self.logs: - f.write( - "\nTraitement des indices: {}\n".format(indices_res) - ) - f.write( - "indices_l2a_process_list: \n" - + "\n".join(indices_l2a_process_list) - + "\n" - ) # Remove L2A - # todo + l2a_remove_list = list( + set( + [cld[0] for cld in cld_l2a_process_list] + [ind[0] for ind in indices_l2a_process_list] + ) + ) + for index, row in tasks.iterrows(): + if "l2a" in str(row.remove).lower(): + t = Tile(row.tile) + prodlist = [p for p in l2a_remove_list if row.tile in p] + t.remove_l2a(prodlist) + logger.info("Removing {} produced l2a product(s): {}".format(len(prodlist), prodlist)) + if self.logs: + f.write("Removing {} produced l2a products: {}\n".format(len(prodlist))) + for l2a in prodlist: + f.write("{}\n\n".format(l2a)) + f.flush() # Cleaning after if clean_after: logger.info("Cleaning Tiles") + if self.logs: + f.write("Cleaning Tiles\n\n") + f.flush() + clean_list = [] - for index, row in self.tasks.iterrows(): + for index, row in tasks.iterrows(): clean_list.append(row.tile) lib = Library() pb_before = lib.clean( @@ -626,7 +817,10 @@ class Job: else: logger.info("No task defined for this job, doing nothing") + if self.logs: + f.write("No task defined for this job, doing nothing\n\n") + f.flush() if self.logs: - f.write("\nFin : {}\n".format(datetime.datetime.now())) + f.write("Fin : {}\n".format(datetime.datetime.now())) f.close() diff --git a/sen2chain/multi_processing.py b/sen2chain/multi_processing.py index b794ee27b25a3455d9380e25431537ab6b52044f..f33b580635b3ea49e391a08f838954345e8f27a1 100644 --- a/sen2chain/multi_processing.py +++ b/sen2chain/multi_processing.py @@ -19,22 +19,24 @@ def multi(product): try: fwd = os.path.dirname(os.path.realpath(__file__)) logger.info("Processing {}".format(product)) - cmd = [ - "setsid", - "/usr/bin/python3", - fwd + "/multiprocess_l2a.py", - product, - ] - proc = subprocess.Popen(cmd) - l1c = L1cProduct(product) - l2a_identifier = l1c.identifier.replace("L1C_", "L2A_").replace( - "_OPER_", 
"_USER_" - ) - l2a_prod = L2aProduct(l2a_identifier) - while not (l2a_prod.in_library): - sleep(5) - logger.info("End {}".format(product)) + if l1c.processable_to_l2a(): + cmd = [ + "setsid", + "/usr/bin/python3", + fwd + "/multiprocess_l2a.py", + product, + ] + proc = subprocess.Popen(cmd) + + # l1c = L1cProduct(product) + l2a_identifier = l1c.identifier.replace("L1C_", "L2A_").replace( + "_OPER_", "_USER_" + ) + l2a_prod = L2aProduct(l2a_identifier) + while not (l2a_prod.in_library): + sleep(5) + logger.info("End {}".format(product)) except: logger.info("Plante {}".format(product)) pass diff --git a/sen2chain/products.py b/sen2chain/products.py index 36a6e89fb79cfa86dd6723fc9b8cac325ffbc40b..79cc710727f2f68c18f82106a7b0f322078eef8a 100755 --- a/sen2chain/products.py +++ b/sen2chain/products.py @@ -203,12 +203,24 @@ class L1cProduct(Product): def __init__( self, identifier: str = None, tile: str = None, path: str = None ): - + super().__init__(identifier=identifier, tile=tile, path=path) - + if not re.match(r".*L1C_.*", self.identifier): raise ValueError("Invalid L1C product name") - + + def processable_to_l2a( + self, + s2c_path: Union[str, pathlib.PosixPath] = None, + ) -> "L1cProduct": + """ check if l1c can be processed to l2a with sen2cor """ + s2c_path = s2c_path or get_latest_s2c_version_path(self.identifier) + if s2c_path: + return True + else: + logger.info("{} not processable, check your Sen2Cor install".format(self.identifier)) + return False + def process_l2a( self, reprocess: bool = False, @@ -228,6 +240,8 @@ class L1cProduct(Product): s2c_path = s2c_path or get_latest_s2c_version_path(self.identifier) if s2c_path: process_it = True + else: + logger.info("{} not processable, check your Sen2Cor install".format(self.identifier)) else: if not reprocess: logger.info("{} already exists.".format(l2a_identifier)) diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index f07cf012ed9e67dfca52ac6ddadc3a75cc61bce1..cb999d76c3165a4ab3878b4db56161c7535b0159 
100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -839,6 +839,10 @@ class Tile: # elif ref == "ndvi": # toto = 13 ############# + if provider == "scihub": + NUM_THREADS = 4 + else: + NUM_THREADS = 8 if download: before_list = [p.location for p in dag.products] @@ -850,7 +854,7 @@ class Tile: ## multithreading elif dl_mode == "multit": - NUM_THREADS = 8 + #NUM_THREADS = 8 q = Queue() def do_stuff(q, dag): while True: diff --git a/sen2chain/xmlparser.py b/sen2chain/xmlparser.py index 8815a25a0dc3b39c6385f2f11261da9aaa12c82b..50610bad253b87b0110b8f01075d4924ca4e20fa 100644 --- a/sen2chain/xmlparser.py +++ b/sen2chain/xmlparser.py @@ -219,7 +219,7 @@ class Sen2ChainMetadataParser: self._root.find("SEN2CHAIN_PROCESSING_VERSION").text = Config().get( "sen2chain_processing_version" ) - self._root.find("SEN2COR_VERSION").text = get_current_Sen2Cor_version() + self._root.find("SEN2COR_VERSION").text = get_Sen2Cor_version() self._tree.write( str(self.xml_path), encoding="UTF-8", @@ -241,8 +241,10 @@ class Sen2ChainMetadataParser: "sen2chain_processing_version" ) self._root.find("SEN2COR_VERSION").text = ( - sen2cor_version or get_current_Sen2Cor_version() + sen2cor_version or get_Sen2Cor_version() ) self._tree.write( - str(self.xml_path), encoding="UTF-8", xml_declaration=True + str(self.xml_path), + encoding="UTF-8", + xml_declaration=True )