diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py index c8fa7dde18dad78963f26355df1080379e4b4363..020203e33eaadb997258033d55d0bfbf0d786404 100644 --- a/sen2chain/jobs.py +++ b/sen2chain/jobs.py @@ -10,7 +10,7 @@ import pandas as pd import datetime from itertools import chain import re -import distutils +import setuptools from crontab import CronTab from collections import OrderedDict from configparser import ConfigParser @@ -20,6 +20,10 @@ from .config import SHARED_DATA, Config from .indices import IndicesCollection from .library import Library from .tiles import Tile +from .utils import datetime_to_str +from .multi_processing import ( + l2a_multiprocessing, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -52,21 +56,25 @@ class Jobs: logger.disabled = False def __repr__(self): - return repr( - pd.DataFrame( - OrderedDict( - [ - ("job_id", list(self._jid_set)), - ("config_file", self._config_files_exist), - ("python_script", self._py_scripts_exist), - ("logging", self._logs), - ("timing", self._timing), - ("cron_status", self._cron_status), - ("cron_timing", self._cron_timing), - ] + if self._jobs_all: + return repr( + pd.DataFrame( + OrderedDict( + [ + ("job_id", list(self._jid_set)), + ("config_file", self._config_files_exist), + ("python_script", self._py_scripts_exist), + ("logging", self._logs), + ("timing", self._timing), + ("cron_status", self._cron_status), + ("cron_timing", self._cron_timing), + ] + ) ) ) - ) + else: + logger.info("No Jobs here") + return repr(None) # >>> job.render().split(' ')[0:4] @@ -120,8 +128,8 @@ class Job: first_row = OrderedDict( [ ("tile", ["40KCB"]), - ("date_min", ["2022-02-04"]), - ("date_max", ["2022-02-04"]), + ("date_min", [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')]), + ("date_max", [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')]), ("max_clouds", [100]), ("l1c", [False]), ("l2a", [False]), @@ -138,8 +146,8 @@ class Job: row = pd.DataFrame( { "tile": ["40KCB"], - "date_min": ["2022-02-04"], - "date_max": ["2022-02-04"], + "date_min": [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')], + "date_max": [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')], "max_clouds": [100], "l1c": [False], "l2a": [False], @@ -290,7 +298,7 @@ class Job: parser.read_string( "[top]\n" + stream.read() ) # This line does the trick. - self.logs = bool(distutils.util.strtobool(parser["top"]["logs"])) + self.logs = bool(setuptools.distutils.util.strtobool(parser["top"]["logs"])) self.timing = parser["top"]["timing"] self.tasks = pd.read_csv( @@ -385,11 +393,7 @@ class Job: f.flush() if not self.tasks.empty: - - # Telechargement - # todo - # keep list of downloaded l1c products - + # Nettoyage if clean_before: logger.info("Cleaning Tiles") @@ -401,12 +405,32 @@ class Job: clean_list, remove=True, remove_indice_tif=True ) # lib.clean(clean_list, remove=False) + + # Telechargement des L1C + logger.info("Downloading l1c") + for index, row in self.tasks.iterrows(): + if bool(setuptools.distutils.util.strtobool(str(row.l1c))): + t = Tile(row.tile) + logger.info("Tile: {}".format(t.name)) + downloaded = t.get_l1c( + provider = "peps", + download = True, + dl_mode = "multit", + start = row.date_min, + end = row.date_max, + new = False, + ) + logger.info("Downloaded list: {}".format(downloaded)) + + # todo + # keep list of downloaded l1c products + # Traitement des L1C en L2A logger.info("Computing l2a") l1c_process_list = [] for index, row in self.tasks.iterrows(): - if not row.l2a == False: + if bool(setuptools.distutils.util.strtobool(str(row.l2a))): t = Tile(row.tile) l1c_to_process = list( p.identifier @@ -557,6 +581,19 @@ class Job: # Remove L2A # todo + + + # Nettoyage + if clean_after: + logger.info("Cleaning Tiles") + clean_list = [] + for index, row in self.tasks.iterrows(): + clean_list.append(row.tile) + lib = Library() + pb_before = lib.clean( + clean_list, remove=True, remove_indice_tif=True + ) + # lib.clean(clean_list, remove=False) else: logger.info("No task defined for this job, doing nothing") diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index 8cf3b2f4575521c1ccf1643eaeeaf38187900635..89db6b4236da8599050dd0460f8147defd17231e 100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -10,7 +10,7 @@ import re import fiona import shutil from PIL import Image -import distutils +import setuptools import os from itertools import chain from pathlib import Path @@ -736,8 +736,7 @@ class Tile: start = datetime_to_str(self.l1c.last.date + timedelta(days=1), date_format = 'ymd') dag.search(start = start, end = end) - ##### to do - ## choix entre sequential / multithreading / multiprocessing default multithreading + before_list = [p.location for p in dag.products] if download: @@ -771,10 +770,9 @@ class Tile: pool.close() pool.join() - - # def get_l1c_new(): - # toto = 12 + after_list = [p.location for p in dag.products] + return [Path(after_list[i]).stem for i in range(len(before_list)) if before_list[i] != after_list[i]] def compute_l2a( @@ -1260,10 +1258,10 @@ class Tile: logger.info("archiving {}".format(l1c.identifier)) move_path.parent.mkdir(exist_ok=True) # shutil.move(str(l1c.path), str(move_path.parent)) - distutils.dir_util.copy_tree( + setuptools.distutils.dir_util.copy_tree( str(l1c.path), str(move_path) ) - distutils.dir_util.remove_tree(str(l1c.path)) + setuptools.distutils.dir_util.remove_tree(str(l1c.path)) l1c.path.symlink_to( move_path, target_is_directory=True )