diff --git a/sen2chain/config.py b/sen2chain/config.py index d7f2edf3f98f92bf9da17c988c77173aa1b91944..3454613b2e5b03375876a20b9bceefa5e5d4c0b5 100644 --- a/sen2chain/config.py +++ b/sen2chain/config.py @@ -20,7 +20,7 @@ SHARED_DATA = dict( tiles_index_dict = ROOT / "sen2chain" / "data" / "tiles_index_dict.p", peps_download = ROOT / "sen2chain" / "peps_download3.py", sen2chain_meta = ROOT / "sen2chain" / "data" / "sen2chain_info.xml", - raw_job_cfg = ROOT / "sen2chain" / "data" / "job_0000000000.cfg", + raw_job_cfg = ROOT / "sen2chain" / "data" / "job_ini.cfg", ) @@ -62,6 +62,8 @@ class Config: "proxy_https_url": ""} self._config_params["SEN2CHAIN VERSIONS"] = {"sen2chain_processing_version": "xx.xx"} + self._config_params["LOG PATH"] = {"log_path": str(self._USER_DIR / "logs")} + if self._CONFIG_FILE.exists(): self._config_params.read(str(self._CONFIG_FILE)) self._config_params_disk = ConfigParser() @@ -84,8 +86,9 @@ class Config: self._USER_DIR.mkdir(exist_ok=True) self._CONFIG_DIR.mkdir(exist_ok=True) self._DEFAULT_DATA_DIR.mkdir(exist_ok=True) - self.__JOBS_DIR.mkdir(exist_ok=True) - (self.__JOBS_DIR / "logs").mkdir(exist_ok=True) + self._JOBS_DIR.mkdir(exist_ok=True) + Path(self._config_params["LOG PATH"]["log_path"]).mkdir(exist_ok = True) + #~ (self.__JOBS_DIR / "logs").mkdir(exist_ok=True) with open(str(self._CONFIG_FILE), "w") as cfg_file: self._config_params.write(cfg_file) diff --git a/sen2chain/data/job_0000000000.cfg b/sen2chain/data/job_ini.cfg similarity index 61% rename from sen2chain/data/job_0000000000.cfg rename to sen2chain/data/job_ini.cfg index b6f356a0134778d19defc2ddba07a4ed14181e0d..098895841a09ddc8de791cdf1edcf0eec70a987e 100644 --- a/sen2chain/data/job_0000000000.cfg +++ b/sen2chain/data/job_ini.cfg @@ -1,12 +1,12 @@ -logs = False -timing = 0 0 * * * -tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;logs - ###Â Parameters +# logs: True | False +# timing: in cron format # tile: tile identifier, format ##XXX # l1c: download l1c: True|False -# l2a: True | False +# l2a: compute l2a with sen2chain: True | False # cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0 # indices: False | All | NDVI/NDWIGAO/etc. -# logs: False | True +logs = False +timing = 0 0 * * * +tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py index 0975dbd112269af15c27f9ddb73130e8f6ee6060..7b835a50b3f2ecc472f3449fe3f536238d8961e5 100644 --- a/sen2chain/jobs.py +++ b/sen2chain/jobs.py @@ -12,6 +12,8 @@ import re import distutils from crontab import CronTab from collections import OrderedDict +from configparser import ConfigParser + from .config import SHARED_DATA, Config from .indices import IndicesCollection @@ -34,13 +36,19 @@ class Jobs(): self._jid_set = set([t.stem[4:] for t in self._jobs_all]) self._config_files_exist = [] self._py_scripts_exist = [] + self._logs = [] + self._timing = [] self._cron_status = [] + self._cron_timing = [] for jid in self._jid_set: logger.disabled = True j = Job(jid) self._config_files_exist.append(j._config_path.exists()) self._py_scripts_exist.append(j._python_script_path.exists()) + self._logs.append(j.logs) + self._timing.append(j.timing) self._cron_status.append(j.cron_status) + self._cron_timing.append(j.cron_timing) logger.disabled = False @@ -48,8 +56,11 @@ class Jobs(): return repr(pd.DataFrame(OrderedDict([("job_id", list(self._jid_set)), ("config_file", self._config_files_exist), ("python_script", self._py_scripts_exist), + ("logging", self._logs), + ("timing", self._timing), ("cron_status", self._cron_status), - + ("cron_timing", self._cron_timing), + ]))) ###### >>> job.render().split(' ')[0:4] @@ -62,7 +73,7 @@ class Jobs(): def list(self): return [t.stem[4:] for t in self._jobs_all] - def erase(self, + def remove(self, jid: str = None): if jid in self.list: logger.disabled = True @@ -92,12 +103,13 @@ class Job(): if self._config_path.exists(): logger.info("Reading existing config...") self.read(self._config_path) - #~ logger.info("\n{}".format(self.tasks)) else: logger.info("Creating new job...") self.init() - #~ logger.info("\n{}".format(self.tasks)) self._cron = CronTab(user=True) + self.cron_status = self.get_cron_status() + + self._log_folder_path = Path(Config().get("log_path")) / ("job_" + jid) def __repr__(self): return repr(self.tasks) @@ -113,7 +125,7 @@ class Job(): ("l2a", [False]), ("cloudmask", [False]), ("indices", [False]), - ("logs", [False]), + #~ ("logs", [False]), ]) self.tasks = pd.DataFrame(first_row) @@ -129,7 +141,7 @@ class Job(): "l2a": [False], "cloudmask": [False], "indices": [False], - "logs": [False], + #~ "logs": [False], }) #~ self.tasks = self.tasks.append(row, ignore_index=True) self.tasks = pd.concat([self.tasks, row], ignore_index = True)[self.tasks.columns] @@ -169,8 +181,8 @@ class Job(): # save task to disk with open(str(self._config_path), 'w') as ict: header = '\n'.join( - ['logs = False', - 'timing = * * * * *', + ['logs = ' + str(self.logs), + 'timing = ' + self.timing, '', ] ) @@ -178,30 +190,22 @@ class Job(): ict.write(line) #~ self.tasks.to_csv(ict) self.tasks.to_csv(ict, index = False, sep=';') - - @property - def cron_status(self): - # display cron status: disabled | enabled (frenquency) + def get_cron_status(self): iter = list(self._cron.find_comment("sen2chain_job_" + self.jid)) if iter: for job in iter: if job.is_enabled(): - logger.info("Job in cron - Enabled: {}".format(job)) status = "enabled" else: - logger.info("Job in cron - Disabled: {}".format(job)) status = "disabled" + self.cron_job = job + self.cron_timing = str(self.cron_job.slices) else: - logger.info("Job not installed in cron") status = "absent" + self.cron_job = None + self.cron_timing = None return status - #~ if iter: - #~ return True - #~ else: - #~ return False - #~ for entry in cron: - #~ print(entry) def create_python_script(self): lines = ["# -*- coding:utf-8 -*-\n"] @@ -216,7 +220,6 @@ class Job(): f.writelines(lines) def cron_enable(self, - timing: str = None, ): # enable job in cron self.save() @@ -225,13 +228,13 @@ class Job(): if iter: for job in iter: logger.info("Enabling job") - if timing: - job.setall(timing) + if self.timing: + job.setall(self.timing) job.enable() else: job = self._cron.new(command="/usr/bin/python3 " + str(self._python_script_path), comment="sen2chain_job_" + self.jid) - if timing: - job.setall(timing) + if self.timing: + job.setall(self.timing) else: job.setall("0 0 * * *") job.enable() @@ -240,7 +243,8 @@ class Job(): #~ logger.info("Time: {}".format(job.time)) self._cron.write() #~ new.enable(False) - self.cron_status + self.get_cron_status() + def cron_disable(self): # disable / commenting job in cron @@ -250,7 +254,7 @@ class Job(): logger.info("Disabling job...") job.enable(False) self._cron.write() - self.cron_status + self.get_cron_status() def cron_remove(self): # remove job from cron @@ -260,14 +264,20 @@ class Job(): logger.info("Removing job from cron...") self._cron.remove(job) self._cron.write() - self.cron_status + self.get_cron_status() def read(self, path): + parser = ConfigParser(allow_no_value=True) + with open(str(path)) as stream: + parser.read_string("[top]\n" + stream.read()) # This line does the trick. + self.logs = bool(distutils.util.strtobool(parser['top']['logs'])) + self.timing = parser['top']['timing'] + self.tasks = pd.read_csv(path, sep = ';', na_values="", na_filter=False, comment='#', dtype = str, header = 2) - if "logs" not in self.tasks: - self.tasks["logs"] = False + #~ if "logs" not in self.tasks: + #~ self.tasks["logs"] = False for index, row in self.tasks.iterrows(): if not row.date_min: @@ -320,6 +330,14 @@ class Job(): clean_before: bool = False, clean_after: bool = False): + if self.logs: + self._log_folder_path.mkdir(exist_ok = True) + self._log_file = self._log_folder_path / ("job_" + self.jid + "_run_"+ datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".log") + f = open(str(self._log_file), "w") + f.write("Debut : {}\n\n".format(datetime.datetime.now())) + f.write(repr(self) + "\n") + f.flush() + if not self.tasks.empty: # Telechargement @@ -350,6 +368,9 @@ class Job(): if l1c_process_list: l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc = nb_proc) #~ logger.info("je multiprocess les l1c en l2a") + if self.logs: + f.write("\nTraitement des l1c : {}\n".format(l2a_res)) + f.write("l1c_process_list: \n" + "\n".join(l1c_process_list) + "\n") # Traitement des L2A (clouds) logger.info("Computing cloudmasks") @@ -377,6 +398,9 @@ class Job(): cld_res = False if cld_l2a_process_list: cld_res = cld_version_probability_iterations_reprocessing_multiprocessing(cld_l2a_process_list, nb_proc = nb_proc) + if self.logs: + f.write("\nTraitement des clouds : {}\n".format(cld_res)) + f.write("cld_l2a_process_list: \n" + "\n".join(cld_l2a_process_list) + "\n") # Traitement des L2A (indices) logger.info("Computing indices") @@ -411,7 +435,14 @@ class Job(): logger.info("l2a indices process list ({} products): {}".format(len(indices_l2a_process_list), indices_l2a_process_list)) indices_res = False if indices_l2a_process_list: - indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc = nb_proc) + indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc = nb_proc) + if self.logs: + f.write("\nTraitement des indices: {}\n".format(indices_res)) + f.write("indices_l2a_process_list: \n" + "\n".join(indices_l2a_process_list) + "\n") + else: logger.info("No task defined for this job, doing nothing") + if self.logs: + f.write("\nFin : {}\n".format(datetime.datetime.now())) + f.close()