Skip to content
Snippets Groups Projects
Commit a6a13b4d authored by pascal.mouquet_ird.fr's avatar pascal.mouquet_ird.fr
Browse files

updated jobs

parent 3c3b1e80
No related branches found
No related tags found
No related merge requests found
...@@ -20,7 +20,7 @@ SHARED_DATA = dict( ...@@ -20,7 +20,7 @@ SHARED_DATA = dict(
tiles_index_dict = ROOT / "sen2chain" / "data" / "tiles_index_dict.p", tiles_index_dict = ROOT / "sen2chain" / "data" / "tiles_index_dict.p",
peps_download = ROOT / "sen2chain" / "peps_download3.py", peps_download = ROOT / "sen2chain" / "peps_download3.py",
sen2chain_meta = ROOT / "sen2chain" / "data" / "sen2chain_info.xml", sen2chain_meta = ROOT / "sen2chain" / "data" / "sen2chain_info.xml",
raw_job_cfg = ROOT / "sen2chain" / "data" / "job_0000000000.cfg", raw_job_cfg = ROOT / "sen2chain" / "data" / "job_ini.cfg",
) )
......
logs = False
timing = 0 0 * * *
tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices;logs
### Parameters ### Parameters
# logs: True | False
# timing: in cron format
# tile: tile identifier, format ##XXX # tile: tile identifier, format ##XXX
# l1c: download l1c: True|False # l1c: download l1c: True|False
# l2a: True | False # l2a: True | False
# cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0 # cloudmasks: False | CM001 | CM002 | CM003-PRB1-ITER5 | CM004-CSH1-CMP1-CHP1-TCI1-ITER0
# indices: False | All | NDVI/NDWIGAO/etc. # indices: False | All | NDVI/NDWIGAO/etc.
# logs: False | True
logs = False
timing = 0 0 * * *
tile;date_min;date_max;max_clouds;l1c;l2a;cloudmask;indices
...@@ -12,6 +12,8 @@ import re ...@@ -12,6 +12,8 @@ import re
import distutils import distutils
from crontab import CronTab from crontab import CronTab
from collections import OrderedDict from collections import OrderedDict
from configparser import ConfigParser
from .config import SHARED_DATA, Config from .config import SHARED_DATA, Config
from .indices import IndicesCollection from .indices import IndicesCollection
...@@ -34,13 +36,19 @@ class Jobs(): ...@@ -34,13 +36,19 @@ class Jobs():
self._jid_set = set([t.stem[4:] for t in self._jobs_all]) self._jid_set = set([t.stem[4:] for t in self._jobs_all])
self._config_files_exist = [] self._config_files_exist = []
self._py_scripts_exist = [] self._py_scripts_exist = []
self._logs = []
self._timing = []
self._cron_status = [] self._cron_status = []
self._cron_timing = []
for jid in self._jid_set: for jid in self._jid_set:
logger.disabled = True logger.disabled = True
j = Job(jid) j = Job(jid)
self._config_files_exist.append(j._config_path.exists()) self._config_files_exist.append(j._config_path.exists())
self._py_scripts_exist.append(j._python_script_path.exists()) self._py_scripts_exist.append(j._python_script_path.exists())
self._logs.append(j.logs)
self._timing.append(j.timing)
self._cron_status.append(j.cron_status) self._cron_status.append(j.cron_status)
self._cron_timing.append(j.cron_timing)
logger.disabled = False logger.disabled = False
...@@ -48,8 +56,11 @@ class Jobs(): ...@@ -48,8 +56,11 @@ class Jobs():
return repr(pd.DataFrame(OrderedDict([("job_id", list(self._jid_set)), return repr(pd.DataFrame(OrderedDict([("job_id", list(self._jid_set)),
("config_file", self._config_files_exist), ("config_file", self._config_files_exist),
("python_script", self._py_scripts_exist), ("python_script", self._py_scripts_exist),
("logging", self._logs),
("timing", self._timing),
("cron_status", self._cron_status), ("cron_status", self._cron_status),
("cron_timing", self._cron_timing),
]))) ])))
###### >>> job.render().split(' ')[0:4] ###### >>> job.render().split(' ')[0:4]
...@@ -62,7 +73,7 @@ class Jobs(): ...@@ -62,7 +73,7 @@ class Jobs():
def list(self): def list(self):
return [t.stem[4:] for t in self._jobs_all] return [t.stem[4:] for t in self._jobs_all]
def erase(self, def remove(self,
jid: str = None): jid: str = None):
if jid in self.list: if jid in self.list:
logger.disabled = True logger.disabled = True
...@@ -98,6 +109,7 @@ class Job(): ...@@ -98,6 +109,7 @@ class Job():
self.init() self.init()
#~ logger.info("\n{}".format(self.tasks)) #~ logger.info("\n{}".format(self.tasks))
self._cron = CronTab(user=True) self._cron = CronTab(user=True)
self.cron_status = self.get_cron_status()
def __repr__(self): def __repr__(self):
return repr(self.tasks) return repr(self.tasks)
...@@ -113,7 +125,7 @@ class Job(): ...@@ -113,7 +125,7 @@ class Job():
("l2a", [False]), ("l2a", [False]),
("cloudmask", [False]), ("cloudmask", [False]),
("indices", [False]), ("indices", [False]),
("logs", [False]), #~ ("logs", [False]),
]) ])
self.tasks = pd.DataFrame(first_row) self.tasks = pd.DataFrame(first_row)
...@@ -129,7 +141,7 @@ class Job(): ...@@ -129,7 +141,7 @@ class Job():
"l2a": [False], "l2a": [False],
"cloudmask": [False], "cloudmask": [False],
"indices": [False], "indices": [False],
"logs": [False], #~ "logs": [False],
}) })
#~ self.tasks = self.tasks.append(row, ignore_index=True) #~ self.tasks = self.tasks.append(row, ignore_index=True)
self.tasks = pd.concat([self.tasks, row], ignore_index = True)[self.tasks.columns] self.tasks = pd.concat([self.tasks, row], ignore_index = True)[self.tasks.columns]
...@@ -169,8 +181,8 @@ class Job(): ...@@ -169,8 +181,8 @@ class Job():
# save task to disk # save task to disk
with open(str(self._config_path), 'w') as ict: with open(str(self._config_path), 'w') as ict:
header = '\n'.join( header = '\n'.join(
['logs = False', ['logs = ' + str(self.logs),
'timing = * * * * *', 'timing = ' + self.timing,
'', '',
] ]
) )
...@@ -178,30 +190,22 @@ class Job(): ...@@ -178,30 +190,22 @@ class Job():
ict.write(line) ict.write(line)
#~ self.tasks.to_csv(ict) #~ self.tasks.to_csv(ict)
self.tasks.to_csv(ict, index = False, sep=';') self.tasks.to_csv(ict, index = False, sep=';')
@property def get_cron_status(self):
def cron_status(self):
# display cron status: disabled | enabled (frenquency)
iter = list(self._cron.find_comment("sen2chain_job_" + self.jid)) iter = list(self._cron.find_comment("sen2chain_job_" + self.jid))
if iter: if iter:
for job in iter: for job in iter:
if job.is_enabled(): if job.is_enabled():
logger.info("Job in cron - Enabled: {}".format(job))
status = "enabled" status = "enabled"
else: else:
logger.info("Job in cron - Disabled: {}".format(job))
status = "disabled" status = "disabled"
self.cron_job = job
self.cron_timing = str(self.cron_job.slices)
else: else:
logger.info("Job not installed in cron")
status = "absent" status = "absent"
self.cron_job = None
self.cron_timing = None
return status return status
#~ if iter:
#~ return True
#~ else:
#~ return False
#~ for entry in cron:
#~ print(entry)
def create_python_script(self): def create_python_script(self):
lines = ["# -*- coding:utf-8 -*-\n"] lines = ["# -*- coding:utf-8 -*-\n"]
...@@ -216,7 +220,6 @@ class Job(): ...@@ -216,7 +220,6 @@ class Job():
f.writelines(lines) f.writelines(lines)
def cron_enable(self, def cron_enable(self,
timing: str = None,
): ):
# enable job in cron # enable job in cron
self.save() self.save()
...@@ -225,13 +228,13 @@ class Job(): ...@@ -225,13 +228,13 @@ class Job():
if iter: if iter:
for job in iter: for job in iter:
logger.info("Enabling job") logger.info("Enabling job")
if timing: if self.timing:
job.setall(timing) job.setall(self.timing)
job.enable() job.enable()
else: else:
job = self._cron.new(command="/usr/bin/python3 " + str(self._python_script_path), comment="sen2chain_job_" + self.jid) job = self._cron.new(command="/usr/bin/python3 " + str(self._python_script_path), comment="sen2chain_job_" + self.jid)
if timing: if self.timing:
job.setall(timing) job.setall(self.timing)
else: else:
job.setall("0 0 * * *") job.setall("0 0 * * *")
job.enable() job.enable()
...@@ -240,7 +243,8 @@ class Job(): ...@@ -240,7 +243,8 @@ class Job():
#~ logger.info("Time: {}".format(job.time)) #~ logger.info("Time: {}".format(job.time))
self._cron.write() self._cron.write()
#~ new.enable(False) #~ new.enable(False)
self.cron_status self.get_cron_status()
def cron_disable(self): def cron_disable(self):
# disable / commenting job in cron # disable / commenting job in cron
...@@ -250,7 +254,7 @@ class Job(): ...@@ -250,7 +254,7 @@ class Job():
logger.info("Disabling job...") logger.info("Disabling job...")
job.enable(False) job.enable(False)
self._cron.write() self._cron.write()
self.cron_status self.get_cron_status()
def cron_remove(self): def cron_remove(self):
# remove job from cron # remove job from cron
...@@ -260,14 +264,20 @@ class Job(): ...@@ -260,14 +264,20 @@ class Job():
logger.info("Removing job from cron...") logger.info("Removing job from cron...")
self._cron.remove(job) self._cron.remove(job)
self._cron.write() self._cron.write()
self.cron_status self.get_cron_status()
def read(self, def read(self,
path): path):
parser = ConfigParser(allow_no_value=True)
with open(str(path)) as stream:
parser.read_string("[top]\n" + stream.read()) # This line does the trick.
self.logs = parser['top']['logs']
self.timing = parser['top']['timing']
self.tasks = pd.read_csv(path, sep = ';', na_values="", na_filter=False, comment='#', dtype = str, header = 2) self.tasks = pd.read_csv(path, sep = ';', na_values="", na_filter=False, comment='#', dtype = str, header = 2)
if "logs" not in self.tasks: #~ if "logs" not in self.tasks:
self.tasks["logs"] = False #~ self.tasks["logs"] = False
for index, row in self.tasks.iterrows(): for index, row in self.tasks.iterrows():
if not row.date_min: if not row.date_min:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment