Skip to content
Snippets Groups Projects
Commit a723056a authored by pascal.mouquet_ird.fr's avatar pascal.mouquet_ird.fr
Browse files

Merge branch 'develop' into 'master'

Develop

See merge request espace-dev/sen2chain!1
parents 3c3b1e80 c346bf06
No related branches found
No related tags found
No related merge requests found
Pipeline #569 failed
...@@ -20,7 +20,7 @@ SHARED_DATA = dict( ...@@ -20,7 +20,7 @@ SHARED_DATA = dict(
tiles_index_dict = ROOT / "sen2chain" / "data" / "tiles_index_dict.p", tiles_index_dict = ROOT / "sen2chain" / "data" / "tiles_index_dict.p",
peps_download = ROOT / "sen2chain" / "peps_download3.py", peps_download = ROOT / "sen2chain" / "peps_download3.py",
sen2chain_meta = ROOT / "sen2chain" / "data" / "sen2chain_info.xml", sen2chain_meta = ROOT / "sen2chain" / "data" / "sen2chain_info.xml",
raw_job_cfg = ROOT / "sen2chain" / "data" / "job_0000000000.cfg", raw_job_cfg = ROOT / "sen2chain" / "data" / "job_ini.cfg",
) )
...@@ -62,6 +62,8 @@ class Config: ...@@ -62,6 +62,8 @@ class Config:
"proxy_https_url": ""} "proxy_https_url": ""}
self._config_params["SEN2CHAIN VERSIONS"] = {"sen2chain_processing_version": "xx.xx"} self._config_params["SEN2CHAIN VERSIONS"] = {"sen2chain_processing_version": "xx.xx"}
self._config_params["LOG PATH"] = {"log_path": str(self._USER_DIR / "logs")}
if self._CONFIG_FILE.exists(): if self._CONFIG_FILE.exists():
self._config_params.read(str(self._CONFIG_FILE)) self._config_params.read(str(self._CONFIG_FILE))
self._config_params_disk = ConfigParser() self._config_params_disk = ConfigParser()
...@@ -84,8 +86,9 @@ class Config: ...@@ -84,8 +86,9 @@ class Config:
self._USER_DIR.mkdir(exist_ok=True) self._USER_DIR.mkdir(exist_ok=True)
self._CONFIG_DIR.mkdir(exist_ok=True) self._CONFIG_DIR.mkdir(exist_ok=True)
self._DEFAULT_DATA_DIR.mkdir(exist_ok=True) self._DEFAULT_DATA_DIR.mkdir(exist_ok=True)
self.__JOBS_DIR.mkdir(exist_ok=True) self._JOBS_DIR.mkdir(exist_ok=True)
(self.__JOBS_DIR / "logs").mkdir(exist_ok=True) Path(self._config_params["LOG PATH"]["log_path"]).mkdir(exist_ok = True)
#~ (self.__JOBS_DIR / "logs").mkdir(exist_ok=True)
with open(str(self._CONFIG_FILE), "w") as cfg_file: with open(str(self._CONFIG_FILE), "w") as cfg_file:
self._config_params.write(cfg_file) self._config_params.write(cfg_file)
......
logs = False
timing = 0 0 * * *
tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;logs
### Parameters ### Parameters
# logs: True | False
# timing: in cron format
# tile: tile identifier, format ##XXX # tile: tile identifier, format ##XXX
# l1c: download l1c: True|False # l1c: download l1c: True|False
# l2a: True | False # l2a: compute l2a with sen2chain: True | False
# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0 # cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0
# indices: False | All | NDVI/NDWIGAO/etc. # indices: False | All | NDVI/NDWIGAO/etc.
# logs: False | True
logs = False
timing = 0 0 * * *
tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices
...@@ -12,6 +12,8 @@ import re ...@@ -12,6 +12,8 @@ import re
import distutils import distutils
from crontab import CronTab from crontab import CronTab
from collections import OrderedDict from collections import OrderedDict
from configparser import ConfigParser
from .config import SHARED_DATA, Config from .config import SHARED_DATA, Config
from .indices import IndicesCollection from .indices import IndicesCollection
...@@ -34,13 +36,19 @@ class Jobs(): ...@@ -34,13 +36,19 @@ class Jobs():
self._jid_set = set([t.stem[4:] for t in self._jobs_all]) self._jid_set = set([t.stem[4:] for t in self._jobs_all])
self._config_files_exist = [] self._config_files_exist = []
self._py_scripts_exist = [] self._py_scripts_exist = []
self._logs = []
self._timing = []
self._cron_status = [] self._cron_status = []
self._cron_timing = []
for jid in self._jid_set: for jid in self._jid_set:
logger.disabled = True logger.disabled = True
j = Job(jid) j = Job(jid)
self._config_files_exist.append(j._config_path.exists()) self._config_files_exist.append(j._config_path.exists())
self._py_scripts_exist.append(j._python_script_path.exists()) self._py_scripts_exist.append(j._python_script_path.exists())
self._logs.append(j.logs)
self._timing.append(j.timing)
self._cron_status.append(j.cron_status) self._cron_status.append(j.cron_status)
self._cron_timing.append(j.cron_timing)
logger.disabled = False logger.disabled = False
...@@ -48,8 +56,11 @@ class Jobs(): ...@@ -48,8 +56,11 @@ class Jobs():
return repr(pd.DataFrame(OrderedDict([("job_id", list(self._jid_set)), return repr(pd.DataFrame(OrderedDict([("job_id", list(self._jid_set)),
("config_file", self._config_files_exist), ("config_file", self._config_files_exist),
("python_script", self._py_scripts_exist), ("python_script", self._py_scripts_exist),
("logging", self._logs),
("timing", self._timing),
("cron_status", self._cron_status), ("cron_status", self._cron_status),
("cron_timing", self._cron_timing),
]))) ])))
###### >>> job.render().split(' ')[0:4] ###### >>> job.render().split(' ')[0:4]
...@@ -62,7 +73,7 @@ class Jobs(): ...@@ -62,7 +73,7 @@ class Jobs():
def list(self): def list(self):
return [t.stem[4:] for t in self._jobs_all] return [t.stem[4:] for t in self._jobs_all]
def erase(self, def remove(self,
jid: str = None): jid: str = None):
if jid in self.list: if jid in self.list:
logger.disabled = True logger.disabled = True
...@@ -92,12 +103,13 @@ class Job(): ...@@ -92,12 +103,13 @@ class Job():
if self._config_path.exists(): if self._config_path.exists():
logger.info("Reading existing config...") logger.info("Reading existing config...")
self.read(self._config_path) self.read(self._config_path)
#~ logger.info("\n{}".format(self.tasks))
else: else:
logger.info("Creating new job...") logger.info("Creating new job...")
self.init() self.init()
#~ logger.info("\n{}".format(self.tasks))
self._cron = CronTab(user=True) self._cron = CronTab(user=True)
self.cron_status = self.get_cron_status()
self._log_folder_path = Path(Config().get("log_path")) / ("job_" + jid)
def __repr__(self): def __repr__(self):
return repr(self.tasks) return repr(self.tasks)
...@@ -113,7 +125,7 @@ class Job(): ...@@ -113,7 +125,7 @@ class Job():
("l2a", [False]), ("l2a", [False]),
("cloudmask", [False]), ("cloudmask", [False]),
("indices", [False]), ("indices", [False]),
("logs", [False]), #~ ("logs", [False]),
]) ])
self.tasks = pd.DataFrame(first_row) self.tasks = pd.DataFrame(first_row)
...@@ -129,7 +141,7 @@ class Job(): ...@@ -129,7 +141,7 @@ class Job():
"l2a": [False], "l2a": [False],
"cloudmask": [False], "cloudmask": [False],
"indices": [False], "indices": [False],
"logs": [False], #~ "logs": [False],
}) })
#~ self.tasks = self.tasks.append(row, ignore_index=True) #~ self.tasks = self.tasks.append(row, ignore_index=True)
self.tasks = pd.concat([self.tasks, row], ignore_index = True)[self.tasks.columns] self.tasks = pd.concat([self.tasks, row], ignore_index = True)[self.tasks.columns]
...@@ -169,8 +181,8 @@ class Job(): ...@@ -169,8 +181,8 @@ class Job():
# save task to disk # save task to disk
with open(str(self._config_path), 'w') as ict: with open(str(self._config_path), 'w') as ict:
header = '\n'.join( header = '\n'.join(
['logs = False', ['logs = ' + str(self.logs),
'timing = * * * * *', 'timing = ' + self.timing,
'', '',
] ]
) )
...@@ -178,30 +190,22 @@ class Job(): ...@@ -178,30 +190,22 @@ class Job():
ict.write(line) ict.write(line)
#~ self.tasks.to_csv(ict) #~ self.tasks.to_csv(ict)
self.tasks.to_csv(ict, index = False, sep=';') self.tasks.to_csv(ict, index = False, sep=';')
@property def get_cron_status(self):
def cron_status(self):
# display cron status: disabled | enabled (frenquency)
iter = list(self._cron.find_comment("sen2chain_job_" + self.jid)) iter = list(self._cron.find_comment("sen2chain_job_" + self.jid))
if iter: if iter:
for job in iter: for job in iter:
if job.is_enabled(): if job.is_enabled():
logger.info("Job in cron - Enabled: {}".format(job))
status = "enabled" status = "enabled"
else: else:
logger.info("Job in cron - Disabled: {}".format(job))
status = "disabled" status = "disabled"
self.cron_job = job
self.cron_timing = str(self.cron_job.slices)
else: else:
logger.info("Job not installed in cron")
status = "absent" status = "absent"
self.cron_job = None
self.cron_timing = None
return status return status
#~ if iter:
#~ return True
#~ else:
#~ return False
#~ for entry in cron:
#~ print(entry)
def create_python_script(self): def create_python_script(self):
lines = ["# -*- coding:utf-8 -*-\n"] lines = ["# -*- coding:utf-8 -*-\n"]
...@@ -216,7 +220,6 @@ class Job(): ...@@ -216,7 +220,6 @@ class Job():
f.writelines(lines) f.writelines(lines)
def cron_enable(self, def cron_enable(self,
timing: str = None,
): ):
# enable job in cron # enable job in cron
self.save() self.save()
...@@ -225,13 +228,13 @@ class Job(): ...@@ -225,13 +228,13 @@ class Job():
if iter: if iter:
for job in iter: for job in iter:
logger.info("Enabling job") logger.info("Enabling job")
if timing: if self.timing:
job.setall(timing) job.setall(self.timing)
job.enable() job.enable()
else: else:
job = self._cron.new(command="/usr/bin/python3 " + str(self._python_script_path), comment="sen2chain_job_" + self.jid) job = self._cron.new(command="/usr/bin/python3 " + str(self._python_script_path), comment="sen2chain_job_" + self.jid)
if timing: if self.timing:
job.setall(timing) job.setall(self.timing)
else: else:
job.setall("0 0 * * *") job.setall("0 0 * * *")
job.enable() job.enable()
...@@ -240,7 +243,8 @@ class Job(): ...@@ -240,7 +243,8 @@ class Job():
#~ logger.info("Time: {}".format(job.time)) #~ logger.info("Time: {}".format(job.time))
self._cron.write() self._cron.write()
#~ new.enable(False) #~ new.enable(False)
self.cron_status self.get_cron_status()
def cron_disable(self): def cron_disable(self):
# disable / commenting job in cron # disable / commenting job in cron
...@@ -250,7 +254,7 @@ class Job(): ...@@ -250,7 +254,7 @@ class Job():
logger.info("Disabling job...") logger.info("Disabling job...")
job.enable(False) job.enable(False)
self._cron.write() self._cron.write()
self.cron_status self.get_cron_status()
def cron_remove(self): def cron_remove(self):
# remove job from cron # remove job from cron
...@@ -260,14 +264,20 @@ class Job(): ...@@ -260,14 +264,20 @@ class Job():
logger.info("Removing job from cron...") logger.info("Removing job from cron...")
self._cron.remove(job) self._cron.remove(job)
self._cron.write() self._cron.write()
self.cron_status self.get_cron_status()
def read(self, def read(self,
path): path):
parser = ConfigParser(allow_no_value=True)
with open(str(path)) as stream:
parser.read_string("[top]\n" + stream.read()) # This line does the trick.
self.logs = bool(distutils.util.strtobool(parser['top']['logs']))
self.timing = parser['top']['timing']
self.tasks = pd.read_csv(path, sep = ';', na_values="", na_filter=False, comment='#', dtype = str, header = 2) self.tasks = pd.read_csv(path, sep = ';', na_values="", na_filter=False, comment='#', dtype = str, header = 2)
if "logs" not in self.tasks: #~ if "logs" not in self.tasks:
self.tasks["logs"] = False #~ self.tasks["logs"] = False
for index, row in self.tasks.iterrows(): for index, row in self.tasks.iterrows():
if not row.date_min: if not row.date_min:
...@@ -320,6 +330,14 @@ class Job(): ...@@ -320,6 +330,14 @@ class Job():
clean_before: bool = False, clean_before: bool = False,
clean_after: bool = False): clean_after: bool = False):
if self.logs:
self._log_folder_path.mkdir(exist_ok = True)
self._log_file = self._log_folder_path / ("job_" + self.jid + "_run_"+ datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".log")
f = open(str(self._log_file), "w")
f.write("Debut : {}\n\n".format(datetime.datetime.now()))
f.write(repr(self) + "\n")
f.flush()
if not self.tasks.empty: if not self.tasks.empty:
# Telechargement # Telechargement
...@@ -350,6 +368,9 @@ class Job(): ...@@ -350,6 +368,9 @@ class Job():
if l1c_process_list: if l1c_process_list:
l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc = nb_proc) l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc = nb_proc)
#~ logger.info("je multiprocess les l1c en l2a") #~ logger.info("je multiprocess les l1c en l2a")
if self.logs:
f.write("\nTraitement des l1c : {}\n".format(l2a_res))
f.write("l1c_process_list: \n" + "\n".join(l1c_process_list) + "\n")
# Traitement des L2A (clouds) # Traitement des L2A (clouds)
logger.info("Computing cloudmasks") logger.info("Computing cloudmasks")
...@@ -377,6 +398,9 @@ class Job(): ...@@ -377,6 +398,9 @@ class Job():
cld_res = False cld_res = False
if cld_l2a_process_list: if cld_l2a_process_list:
cld_res = cld_version_probability_iterations_reprocessing_multiprocessing(cld_l2a_process_list, nb_proc = nb_proc) cld_res = cld_version_probability_iterations_reprocessing_multiprocessing(cld_l2a_process_list, nb_proc = nb_proc)
if self.logs:
f.write("\nTraitement des clouds : {}\n".format(cld_res))
f.write("cld_l2a_process_list: \n" + "\n".join(cld_l2a_process_list) + "\n")
# Traitement des L2A (indices) # Traitement des L2A (indices)
logger.info("Computing indices") logger.info("Computing indices")
...@@ -411,7 +435,14 @@ class Job(): ...@@ -411,7 +435,14 @@ class Job():
logger.info("l2a indices process list ({} products): {}".format(len(indices_l2a_process_list), indices_l2a_process_list)) logger.info("l2a indices process list ({} products): {}".format(len(indices_l2a_process_list), indices_l2a_process_list))
indices_res = False indices_res = False
if indices_l2a_process_list: if indices_l2a_process_list:
indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc = nb_proc) indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc = nb_proc)
if self.logs:
f.write("\nTraitement des indices: {}\n".format(indices_res))
f.write("indices_l2a_process_list: \n" + "\n".join(indices_l2a_process_list) + "\n")
else: else:
logger.info("No task defined for this job, doing nothing") logger.info("No task defined for this job, doing nothing")
if self.logs:
f.write("\nFin : {}\n".format(datetime.datetime.now()))
f.close()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment