Skip to content
Snippets Groups Projects
Commit ad7ce4ac authored by pascal.mouquet_ird.fr's avatar pascal.mouquet_ird.fr
Browse files

Many updates and bugfixes to Job

parent 6daec4f3
No related branches found
No related tags found
No related merge requests found
Pipeline #1236 passed
### Parameters
# logs: True | False
# timing: in cron format
# tile: tile identifier, format ##XXX
# tile: tile identifier, format ##XXX, comment line using ! before tile name
# l1c: download l1c: True|False
# l2a: compute l2a with sen2chain: True | False
# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0
......
......@@ -116,7 +116,7 @@ class S2cEodag:
self.products.remove(p)
logger.info("{} - local l1c {} - filtering (wrong Tile)".format(p.properties["title"], l1c_presence))
else:
logger.info(p.properties["cloudCover"])
# logger.info(p.properties["cloudCover"])
if not(min_cloudcover <= float(p.properties["cloudCover"] or 0) <= max_cloudcover):
self.products.remove(p)
logger.info("{} - local l1c {} - filtering (CC = {}%)".format(p.properties["title"], l1c_presence, int(p.properties["cloudCover"])))
......
......@@ -24,6 +24,7 @@ from .utils import datetime_to_str
from .multi_processing import (
l2a_multiprocessing,
idx_multiprocessing,
cld_version_probability_iterations_reprocessing_multiprocessing,
)
from .download_eodag import (
S2cEodag_download,
......@@ -211,7 +212,7 @@ class Job:
"### Parameters",
"# logs: True | False",
"# timing: in cron format",
"# tile: tile identifier, format ##XXX",
"# tile: tile identifier, format ##XXX, comment line using ! before tile name"
"# l1c: download l1c: True|False",
"# l2a: compute l2a with sen2chain: True | False",
"# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0",
......@@ -340,6 +341,7 @@ class Job:
dtype=str,
header=2,
)
# if "logs" not in self.tasks:
# self.tasks["logs"] = False
......@@ -424,7 +426,7 @@ class Job:
returned_val["thin_cir"] = True
except:
returned_val = {
"cm_version": "CM001",
"cm_version": "cm001",
"probability": 1,
"iterations": 5,
"cld_shad": True,
......@@ -443,7 +445,7 @@ class Job:
clean_before: bool = False,
clean_after: bool = False,
):
if self.logs:
self._log_folder_path.mkdir(exist_ok=True)
self._log_file = self._log_folder_path / (
......@@ -454,16 +456,23 @@ class Job:
+ ".log"
)
f = open(str(self._log_file), "w")
f.write("Debut : {}\n\n".format(datetime.datetime.now()))
f.write(repr(self) + "\n")
f.write("Start : {}\n\n".format(datetime.datetime.now()))
f.write(repr(self) + "\n\n")
f.flush()
if not self.tasks.empty:
tasks = self.tasks
tasks = tasks[tasks["tile"].str.contains("!") == False]
if not tasks.empty:
## Cleaning before
if clean_before:
logger.info("Cleaning Tiles")
if self.logs:
f.write("Cleaning Tiles\n")
f.flush()
clean_list = []
for index, row in self.tasks.iterrows():
for index, row in tasks.iterrows():
clean_list.append(row.tile)
lib = Library()
pb_before = lib.clean(
......@@ -474,7 +483,7 @@ class Job:
## L1C download - each tile sequential
# download_list= []
# logger.info("Downloading l1c seq")
# for index, row in self.tasks.iterrows():
# for index, row in tasks.iterrows():
# if bool(setuptools.distutils.util.strtobool(str(row.l1c))):
# t = Tile(row.tile)
# logger.info("Tile: {}".format(t.name))
......@@ -492,11 +501,19 @@ class Job:
## L1C download - all tiles multi
download_list= []
logger.info("Searching l1c products")
for index, row in self.tasks.iterrows():
logger.info("Downloading l1c products")
if self.logs:
f.write("Downloading l1c products\n")
f.flush()
for index, row in tasks.iterrows():
if bool(setuptools.distutils.util.strtobool(str(row.l1c))):
t = Tile(row.tile)
logger.info("Tile: {}".format(t.name))
if self.logs:
f.write("Tile: {}\n".format(t.name))
f.flush()
tile_download_list = t.get_l1c(
provider = "peps",
download = False,
......@@ -514,11 +531,18 @@ class Job:
after_list = [p.location for p in download_list]
downloaded_products = [Path(after_list[i]).stem for i in range(len(before_list)) if before_list[i] != after_list[i]]
logger.info("Downloaded product(s): {}".format(downloaded_products))
if self.logs:
f.write("{} downloaded product(s): {}\n\n".format(len(downloaded_products), downloaded_products))
f.flush()
# Traitement des L1C en L2A
logger.info("Computing l2a")
if self.logs:
f.write("Computing l2a\n")
f.flush()
l1c_process_list = []
for index, row in self.tasks.iterrows():
for index, row in tasks.iterrows():
if bool(setuptools.distutils.util.strtobool(str(row.l2a))):
t = Tile(row.tile)
l1c_to_process = list(
......@@ -529,41 +553,49 @@ class Job:
)
l1c_process_list.append(l1c_to_process)
logger.info(
"ajout {}: {} l1c files".format(
"ajout {}: {} l1c files\n".format(
row.tile, len(l1c_to_process)
)
)
l1c_process_list = list(chain.from_iterable(l1c_process_list))
logger.info(
"l1c Sen2Cor process list ({} files): {}".format(
"Process list ({} files): {}".format(
len(l1c_process_list), l1c_process_list
)
)
if self.logs:
f.write(
"Process list ({} files): {}\n\n".format(
len(l1c_process_list), l1c_process_list
)
)
f.flush()
l2a_res = False
if l1c_process_list:
l2a_res = l2a_multiprocessing(
l1c_process_list, nb_proc=nb_proc
)
# logger.info("je multiprocess les l1c en l2a")
if self.logs:
f.write("\nTraitement des l1c : {}\n".format(l2a_res))
f.write(
"l1c_process_list: \n"
+ "\n".join(l1c_process_list)
+ "\n"
)
# Remove downloaded L1C
for index, row in self.tasks.iterrows():
for index, row in tasks.iterrows():
if "l1c" in str(row.remove).lower():
t = Tile(row.tile)
t.remove_l1c([p for p in downloaded_products if row.tile in p])
# Traitement des L2A (clouds)
prodlist = [p for p in downloaded_products if row.tile in p]
t.remove_l1c(prodlist)
logger.info("Removing downloaded l1c products: {}".format(prodlist))
if self.logs:
f.write("Removing downloaded l1c products: {}\n\n".format(prodlist))
f.flush()
# Comuting cloudmasks (from L2A)
logger.info("Computing cloudmasks")
if self.logs:
f.write("Computing cloudmasks\n")
f.flush()
reprocess = False
cld_l2a_process_list = []
for index, row in self.tasks.iterrows():
for index, row in tasks.iterrows():
# if not bool(distutils.util.strtobool(str(row.cloudmask))):
if not (row.cloudmask == "False" or not row.cloudmask):
cloudmask = self.get_cm_version(row.cloudmask)
......@@ -588,34 +620,46 @@ class Job:
]
cld_l2a_process_list.append(l2a_cm_details)
logger.info(
"ajout {}: {} l2a products".format(
"{} adding: {} l2a products to process".format(
row.tile, len(l2a_to_process)
)
)
if self.logs:
f.write(
"{} adding: {} l2a products to process\n".format(
row.tile, len(l2a_to_process)
)
)
f.flush()
logger.info(
"l2a cloudmasks process list ({} products): {}".format(
"Cloudmask computing process list ({} l2a products): {}".format(
len(cld_l2a_process_list), cld_l2a_process_list
)
)
if self.logs:
f.write(
"Cloudmask computing process list ({} l2a products): {}\n\n".format(
len(cld_l2a_process_list), cld_l2a_process_list
)
)
f.flush()
cld_res = False
if cld_l2a_process_list:
cld_res = cld_version_probability_iterations_reprocessing_multiprocessing(
cld_l2a_process_list, nb_proc=nb_proc
)
if self.logs:
f.write("\nTraitement des clouds : {}\n".format(cld_res))
f.write(
"cld_l2a_process_list: \n"
+ "\n".join(cld_l2a_process_list)
+ "\n"
)
# Traitement des L2A (indices)
logger.info("Computing indices")
if self.logs:
f.write("Computing indices\n")
f.flush()
nodata_clouds = True
quicklook = False
indices_l2a_process_list = []
for index, row in self.tasks.iterrows():
for index, row in tasks.iterrows():
if not (row.indices == "False" or not row.indices):
t = Tile(row.tile)
# indices_list = row.indices.split("/")
......@@ -647,29 +691,35 @@ class Job:
]
indices_l2a_process_list.append(l2a_ind_details)
logger.info(
"ajout {} - {}: {} l2a products".format(
"{}: adding - {}: {} l2a products".format(
row.tile, i, len(l2a_to_process)
)
)
if self.logs:
f.write(
"{}: adding - {}: {} l2a products".format(
row.tile, i, len(l2a_to_process)
)
)
f.flush()
logger.info(
"l2a indices process list ({} products): {}".format(
"Indice computing process list ({} l2a products): {}".format(
len(indices_l2a_process_list), indices_l2a_process_list
)
)
if self.logs:
f.write(
"Indice computing process list ({} l2a products): {}\n\n".format(
len(indices_l2a_process_list), indices_l2a_process_list
)
)
f.flush()
indices_res = False
if indices_l2a_process_list:
indices_res = idx_multiprocessing(
indices_l2a_process_list, nb_proc=nb_proc
)
if self.logs:
f.write(
"\nTraitement des indices: {}\n".format(indices_res)
)
f.write(
"indices_l2a_process_list: \n"
+ "\n".join(indices_l2a_process_list)
+ "\n"
)
# Remove L2A
# todo
......@@ -677,8 +727,12 @@ class Job:
# Cleaning after
if clean_after:
logger.info("Cleaning Tiles")
if self.logs:
f.write("Cleaning Tiles\n\n")
f.flush()
clean_list = []
for index, row in self.tasks.iterrows():
for index, row in tasks.iterrows():
clean_list.append(row.tile)
lib = Library()
pb_before = lib.clean(
......@@ -688,7 +742,10 @@ class Job:
else:
logger.info("No task defined for this job, doing nothing")
if self.logs:
f.write("No task defined for this job, doing nothing\n\n")
f.flush()
if self.logs:
f.write("\nFin : {}\n".format(datetime.datetime.now()))
f.write("Fin : {}\n".format(datetime.datetime.now()))
f.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment