diff --git a/sen2chain/__init__.py b/sen2chain/__init__.py index 83cd50323fe9fb2ce950a214f32d892e9bce4028..1651d4b478d76f59aa8505c7b57ed7be14c84e35 100644 --- a/sen2chain/__init__.py +++ b/sen2chain/__init__.py @@ -32,6 +32,7 @@ from .library import Library from .data_request import DataRequest from .indices import IndicesCollection from .download_and_process import DownloadAndProcess +from .download_eodag import S2cEodag from .time_series import TimeSeries from .automatization import Automatization from .utils import ( @@ -56,6 +57,7 @@ from .multi_processing import ( from .tileset import TileSet from .jobs import Jobs, Job + __version__ = "0.7.0" __author__ = ( "Jérémy Commins <jebins@laposte.net> & Impact <pascal.mouquet@ird.fr>" diff --git a/sen2chain/config.py b/sen2chain/config.py index d44d7c6feeb42420d9c13efe2ae0f03ea98c544c..27b47680ad293f22c8b11136a5f3318c99822987 100644 --- a/sen2chain/config.py +++ b/sen2chain/config.py @@ -22,6 +22,7 @@ SHARED_DATA = dict( peps_download=ROOT / "sen2chain" / "peps_download3.py", sen2chain_meta=ROOT / "sen2chain" / "data" / "sen2chain_info.xml", raw_job_cfg=ROOT / "sen2chain" / "data" / "job_ini.cfg", + eodag_centroids_shp=ROOT / "sen2chain" / "data" / "eodag_workspace_locations_tiles" / "sentinel2_tiling_grid_centroids.shp" ) diff --git a/sen2chain/data/eodag_workspace_locations_tiles/custom_locations.yml b/sen2chain/data/eodag_workspace_locations_tiles/custom_locations.yml deleted file mode 100644 index 32a529959bcf8e00afc1b85fc7aad99e798587d1..0000000000000000000000000000000000000000 --- a/sen2chain/data/eodag_workspace_locations_tiles/custom_locations.yml +++ /dev/null @@ -1,4 +0,0 @@ -shapefiles: - - name: s2_tile_centroid - path: /home/operateur/sen2chain/sen2chain/data/sentinel2_tiling_grid_centroids.shp - attr: tile_id \ No newline at end of file diff --git a/sen2chain/download_eodag.py b/sen2chain/download_eodag.py index b81b490970f9f7ed63b7ce37f9650bc2abe4466b..63cbf67c2cab85155f071fc322eef69ee58fdbae 100644 --- a/sen2chain/download_eodag.py +++ b/sen2chain/download_eodag.py @@ -6,6 +6,322 @@ Module for downloading Sentinel-2 images using EODAG https://www.github.com/CS-SI/EODAG """ +import logging +import shapefile +import os +from pathlib import Path +from eodag import EODataAccessGateway +from eodag import setup_logging +from queue import Queue +from eodag.utils.logging import get_logging_verbose +from threading import Thread +import multiprocessing + +from .config import SHARED_DATA, Config +from .utils import get_tile + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +ROOT = Path(os.path.realpath(__file__)).parent.parent + +class S2cEodag: + + def __init__( + self, + name: str = None, + provider: str = "peps" + ): + + self.name = name + self.provider = provider + self.products = None + eodag_centroids_shp = SHARED_DATA.get("eodag_centroids_shp") + + with shapefile.Reader(eodag_centroids_shp) as shp: + shaperecs = shp.shapeRecords() + + locations_yaml_content = """ + shapefiles: + - name: s2_tile_centroid + path: {} + attr: tile_id + """.format(os.path.abspath(eodag_centroids_shp)) + + eodag_custom_location_cfg = os.path.abspath(Config()._CONFIG_DIR / "eodag_custom_locations.yml") + + with open(eodag_custom_location_cfg, "w") as f_yml: + f_yml.write(locations_yaml_content.strip()) + logging.disable(level=logging.WARNING) + self.dag = EODataAccessGateway(locations_conf_path = eodag_custom_location_cfg) + logging.disable(logging.NOTSET) + + self.dag.set_preferred_provider(self.provider) + # logger.info(self.dag.available_providers("S2_MSI_L1C")) + + targeted_tile = [ + sr + for sr in shaperecs + if sr.record["tile_id"] == name + ][0] + + + def search( + self, + productType: str = "L1C", + start: str = "2015-01-01", + end: str = "9999-12-31", + ref: str = "l1c", + min_cloudcover: int = 0, + max_cloudcover: int = 100, + ): + ######### ici faudrait virer le self.productType, qui ne doit pas être global pour le download... + if productType == "L1C": + self.productType = "S2_MSI_L1C" + outputs_prefix = Path(Config().get("l1c_path")) / self.name + elif productType == "L2A": + self.productType = "S2_MSI_L2A" + outputs_prefix = Path(Config().get("l2a_path")) / self.name + + default_search_criteria = dict( + productType = self.productType, + start = start, + end = end, + # items_per_page=500, + ) + + logging.disable(level=logging.WARNING) + self.products = self.dag.search_all( + locations = dict(s2_tile_centroid = self.name), + **default_search_criteria + ) + logging.disable(logging.NOTSET) + + fitered = self.products[:] + for p in fitered: + if self.name not in p.properties["title"]: + self.products.remove(p) + logger.info("{} - wrong Tile - filtering".format(p.properties["title"])) + else: + if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover): + self.products.remove(p) + logger.info("{} - wrong cloud cover ({}%) - filtering".format(p.properties["title"], int(p.properties["cloudCover"]))) + else: + if ref == "l1c": + if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): + # p.location = "file://" + str(outputs_prefix / (p.properties["title"] + ".SAFE")) + logger.info("{} - remote {} - local l1c PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + self.products.remove(p) + else: + logger.info("{} - remote {} - local l1c ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + elif ref == "l2a": + if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): + logger.info("{} - remote {} - local l2a PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + self.products.remove(p) + else: + logger.info("{} - remote {} - local l2a ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + + + + # clouds = self.products[:] + # for p in clouds: + # if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover): + # self.products.remove(p) + # logger.info("{} - wrong cloud cover ({}%) - filtering".format(p.properties["title"], int(p.properties["cloudCover"]))) + # local = self.products[:] + # for p in local: + # if ref == "l1c": + # if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists(): + # logger.info("{} - remote {} - local l1c PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + # self.products.remove(p) + # else: + # logger.info("{} - remote {} - local l1c ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + # elif ref == "l2a": + # if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists(): + # logger.info("{} - remote {} - local l2a PRESENT - filtering".format(p.properties["title"], p.properties["storageStatus"])) + # self.products.remove(p) + # else: + # logger.info("{} - remote {} - local l2a ABSENT".format(p.properties["title"], p.properties["storageStatus"])) + logger.info("Search returned {} new product(s) to download".format(len(self.products))) ####### rajouter ici "- dont xx ABSENT - dont xx ONLINE / xx STAGING" + + def download( + self, + product_id, + outputs_prefix: str = None, + extract: bool = True, + delete_archive: bool = True, + ): + if not outputs_prefix: + if "L1C" in product_id.properties['title']: + root_path = "l1c_path" + elif "L2A" in product_id.properties['title']: + root_path = "l2a_path" + outputs_prefix = str(Path(Config().get(root_path)) / self.name) + + setup_logging(verbose = 2) + + if product_id.properties["storageStatus"] == "ONLINE": + try: + # logging.disable(level=logging.WARNING) + downloaded_path = self.dag.download( + product_id, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + wait=1, + timeout=0, + ) + # logging.disable(logging.NOTSET) + + if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: + upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name + Path(downloaded_path).replace(upper_location) + product_id.location = "file://" + str(upper_location) + Path(downloaded_path).parent.rmdir() + logger.info("Moving up SAVE file") + except: + logger.info("Error: ONLINE product but cannot be downloaded, retry later") + + elif product_id.properties["storageStatus"] == "OFFLINE": + try: + setup_logging(verbose = 0) + downloaded_path = self.dag.download( + product_id, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + wait=1, + timeout=0, + ) + except: + logger.info("remote OFFLINE, ordered") + + elif product_id.properties["storageStatus"] == "STAGING": + logger.info("remote STAGING, retry later") + + def order_offline( + self, + product_id, + ): + setup_logging(verbose = 0) + if product_id.properties["storageStatus"] == "OFFLINE": + try: + downloaded_path = self.dag.download( + product_id, + wait=1, + timeout=0, + ) + except: + logger.info("{} - remote OFFLINE, ordered".format(product_id.properties["title"])) + setup_logging(verbose = 2) + +def S2cEodag_download_uniq( + product, + outputs_prefix: str = None, + extract: bool = True, + delete_archive: bool = True, +): + tile_name = get_tile(product.properties['title']) + if not outputs_prefix: + if "L1C" in product.properties['title']: + root_path = "l1c_path" + elif "L2A" in product.properties['title']: + root_path = "l2a_path" + outputs_prefix = str(Path(Config().get(root_path)) / tile_name) + + setup_logging(verbose = 2) + + logging.disable(level=logging.WARNING) + dag = EODataAccessGateway() + logging.disable(logging.NOTSET) + + if product.properties["storageStatus"] == "ONLINE": + try: + # logging.disable(level=logging.WARNING) + downloaded_path = dag.download( + product, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + wait=1, + timeout=0, + ) + # logging.disable(logging.NOTSET) + + if Path(downloaded_path).stem == Path(downloaded_path).parent.stem: + upper_location = Path(downloaded_path).parent.parent / Path(downloaded_path).name + Path(downloaded_path).replace(upper_location) + product.location = "file://" + str(upper_location) + Path(downloaded_path).parent.rmdir() + logger.info("Moving up SAVE file") + except: + logger.info("Error: ONLINE product but cannot be downloaded, retry later") + + elif product.properties["storageStatus"] == "OFFLINE": + try: + setup_logging(verbose = 0) + downloaded_path = dag.download( + product, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + wait=1, + timeout=0, + ) + except: + logger.info("OFFLINE product, ordered, retry later") + + elif product.properties["storageStatus"] == "STAGING": + logger.info("STAGING product, retry later") + + +def S2cEodag_download( + download_list: list = None, + dl_mode: str = "multit", + outputs_prefix: str = None, + extract: bool = True, + delete_archive: bool = True, +): + + ## sequential + if dl_mode == "seq": + for product in download_list: + S2cEodag_download_uniq( + product, + outputs_prefix = outputs_prefix, + extract = extract, + delete_archive = delete_archive, + ) + + ## multithreading + elif dl_mode == "multit": + NUM_THREADS = 8 + q = Queue() + def do_stuff(q): + while True: + S2cEodag_download_uniq(q.get()) + q.task_done() + for product in download_list: + q.put(product) + for t in range(NUM_THREADS): + worker = Thread(target = do_stuff, args = (q,)) + worker.daemon = True + worker.start() + q.join() + + ## multiprocessing + elif dl_mode == "multip": + nb_proc = 8 + nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) + pool = multiprocessing.Pool(nb_proc) + results = [pool.map(S2cEodag_download_uniq, download_list)] + pool.close() + pool.join() + + + + + diff --git a/sen2chain/jobs.py b/sen2chain/jobs.py index c8fa7dde18dad78963f26355df1080379e4b4363..662ffbe0079ebec89efde2456df8b12beff4dfdd 100644 --- a/sen2chain/jobs.py +++ b/sen2chain/jobs.py @@ -10,7 +10,7 @@ import pandas as pd import datetime from itertools import chain import re -import distutils +import setuptools from crontab import CronTab from collections import OrderedDict from configparser import ConfigParser @@ -20,6 +20,15 @@ from .config import SHARED_DATA, Config from .indices import IndicesCollection from .library import Library from .tiles import Tile +from .utils import datetime_to_str +from .multi_processing import ( + l2a_multiprocessing, + idx_multiprocessing, +) +from .download_eodag import ( + S2cEodag_download, + S2cEodag_download_uniq, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -52,21 +61,25 @@ class Jobs: logger.disabled = False def __repr__(self): - return repr( - pd.DataFrame( - OrderedDict( - [ - ("job_id", list(self._jid_set)), - ("config_file", self._config_files_exist), - ("python_script", self._py_scripts_exist), - ("logging", self._logs), - ("timing", self._timing), - ("cron_status", self._cron_status), - ("cron_timing", self._cron_timing), - ] + if self._jobs_all: + return repr( + pd.DataFrame( + OrderedDict( + [ + ("job_id", list(self._jid_set)), + ("config_file", self._config_files_exist), + ("python_script", self._py_scripts_exist), + ("logging", self._logs), + ("timing", self._timing), + ("cron_status", self._cron_status), + ("cron_timing", self._cron_timing), + ] + ) ) ) - ) + else: + logger.info("No Jobs here") + return repr(None) # >>> job.render().split(' ')[0:4] @@ -120,8 +133,8 @@ class Job: first_row = OrderedDict( [ ("tile", ["40KCB"]), - ("date_min", ["2022-02-04"]), - ("date_max", ["2022-02-04"]), + ("date_min", [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')]), + ("date_max", [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')]), ("max_clouds", [100]), ("l1c", [False]), ("l2a", [False]), @@ -138,8 +151,8 @@ class Job: row = pd.DataFrame( { "tile": ["40KCB"], - "date_min": ["2022-02-04"], - "date_max": ["2022-02-04"], + "date_min": [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')], + "date_max": [datetime_to_str(datetime.datetime.now(), date_format = 'ymd')], "max_clouds": [100], "l1c": [False], "l2a": [False], @@ -290,7 +303,7 @@ class Job: parser.read_string( "[top]\n" + stream.read() ) # This line does the trick. - self.logs = bool(distutils.util.strtobool(parser["top"]["logs"])) + self.logs = bool(setuptools.distutils.util.strtobool(parser["top"]["logs"])) self.timing = parser["top"]["timing"] self.tasks = pd.read_csv( @@ -385,12 +398,8 @@ class Job: f.flush() if not self.tasks.empty: - - # Telechargement - # todo - # keep list of downloaded l1c products - - # Nettoyage + + ## Cleaning before if clean_before: logger.info("Cleaning Tiles") clean_list = [] @@ -401,12 +410,56 @@ class Job: clean_list, remove=True, remove_indice_tif=True ) # lib.clean(clean_list, remove=False) + + ## L1C download - each tile sequential + # download_list= [] + # logger.info("Downloading l1c seq") + # for index, row in self.tasks.iterrows(): + # if bool(setuptools.distutils.util.strtobool(str(row.l1c))): + # t = Tile(row.tile) + # logger.info("Tile: {}".format(t.name)) + # downloaded = t.get_l1c( + # provider = "peps", + # download = True, + # dl_mode = "multit", + # start = row.date_min, + # end = row.date_max, + # new = False, + # ) + # logger.info("Downloaded list: {}".format(downloaded)) + # download_list.extend(downloaded) + # logger.info("download_list: {}".format(download_list)) + + ## L1C download - all tiles multi + download_list= [] + logger.info("Searching l1c products") + for index, row in self.tasks.iterrows(): + if bool(setuptools.distutils.util.strtobool(str(row.l1c))): + t = Tile(row.tile) + logger.info("Tile: {}".format(t.name)) + tile_download_list = t.get_l1c( + provider = "peps", + download = False, + start = row.date_min, + end = row.date_max, + new = False, + ) + download_list.extend(tile_download_list) + + ### download + before_list = [p.location for p in download_list] + # S2cEodag_download(download_list, "seq") + S2cEodag_download(download_list, "multit") + # S2cEodag_download(download_list, "multip") + after_list = [p.location for p in download_list] + downloaded_products = [Path(after_list[i]).stem for i in range(len(before_list)) if before_list[i] != after_list[i]] + logger.info("Downloaded product(s): {}".format(downloaded_products)) # Traitement des L1C en L2A logger.info("Computing l2a") l1c_process_list = [] for index, row in self.tasks.iterrows(): - if not row.l2a == False: + if bool(setuptools.distutils.util.strtobool(str(row.l2a))): t = Tile(row.tile) l1c_to_process = list( p.identifier @@ -439,10 +492,13 @@ class Job: + "\n".join(l1c_process_list) + "\n" ) - - # Remove L1C - # todo - # remove list of downloaded l1c products + + # Remove downloaded L1C + for index, row in self.tasks.iterrows(): + if "l1c" in str(row.remove).lower(): + t = Tile(row.tile) + t.remove_l1c([p for p in downloaded_products if row.tile in p]) + # Traitement des L2A (clouds) logger.info("Computing cloudmasks") @@ -557,6 +613,20 @@ class Job: # Remove L2A # todo + + + + # Cleaning after + if clean_after: + logger.info("Cleaning Tiles") + clean_list = [] + for index, row in self.tasks.iterrows(): + clean_list.append(row.tile) + lib = Library() + pb_before = lib.clean( + clean_list, remove=True, remove_indice_tif=True + ) + # lib.clean(clean_list, remove=False) else: logger.info("No task defined for this job, doing nothing") diff --git a/sen2chain/tiles.py b/sen2chain/tiles.py index 12f286c34702874867f76d50c616e8346875e266..c67a057c3b157edccf8f0f60f68b03e0648e4890 100644 --- a/sen2chain/tiles.py +++ b/sen2chain/tiles.py @@ -10,13 +10,16 @@ import re import fiona import shutil from PIL import Image -import distutils +import setuptools import os from itertools import chain from pathlib import Path from collections import namedtuple -from datetime import datetime +from datetime import datetime, timedelta from pprint import pformat +import multiprocessing +from queue import Queue +from threading import Thread # type annotations from typing import List, Dict, Iterable @@ -36,6 +39,8 @@ from .multi_processing import ( cld_version_probability_iterations_reprocessing_multiprocessing, idx_multiprocessing, ) +from .download_eodag import S2cEodag +from .utils import datetime_to_str logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -715,6 +720,100 @@ class Tile: "cloud_cover": self._products["l2a"][prod].cloud_cover, } return prodlist + + def get_l1c( + self, + provider: str = "peps", + download: bool = True, + dl_mode: str = "multit", + start: str = "2015-01-01", + end: str = "9999-12-31", + new: bool = False, + ref: str = "l1c", + min_cloudcover: int = 0, + max_cloudcover: int = 100, + order_offline: bool = False, + ): + dag = S2cEodag(self.name, provider = provider) + if new: + if self.l1c: + start = datetime_to_str(self.l1c.last.date + timedelta(days=1), date_format = 'ymd') + dag.search(start = start, end = end, ref = ref, min_cloudcover = min_cloudcover, max_cloudcover = max_cloudcover) + + ############## virer ici de la liste des téléchargements les produits qui ont déjà des l2a ou indices (pour ne pas retélécharger des produits l1c à chaque fois) + # if ref == "l2a": + # new_list = [p for p in dag.products if p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE" not in [l2a.identifier for l2a in self.l2a]] + # logger.info("new list: {}".format(new_list)) + + # local = dag.products[:] + # l2a_identifiers = [Path(prod.identifier).stem for prod in self.l2a] + # logger.info("l2a_identifiers: {}".format(l2a_identifiers)) + + # for p in local: + # l2a_corresp = p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + # logger.info("l2a_corresp: {}".format(l2a_corresp)) + # if l2a_corresp in l2a_identifiers: + # logger.info("found l2a {} removing l1c {} from download list".format(l2a_corresp, p.properties["title"])) + # dag.products.remove(p) + + # elif ref == "ndvi": + # toto = 13 + ############# + + if download: + before_list = [p.location for p in dag.products] + + ## sequential + if dl_mode == "seq": + for p in dag.products: + dag.download(p) + + ## multithreading + elif dl_mode == "multit": + NUM_THREADS = 8 + q = Queue() + def do_stuff(q, dag): + while True: + dag.download(q.get()) + q.task_done() + for p in dag.products: + q.put(p) + for t in range(NUM_THREADS): + worker = Thread(target=do_stuff, args = (q, dag)) + worker.daemon = True + worker.start() + q.join() + + ## multiprocessing + elif dl_mode == "multip": + nb_proc = 8 + nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1) + pool = multiprocessing.Pool(nb_proc) + results = [pool.map(dag.download, dag.products)] + pool.close() + pool.join() + + after_list = [p.location for p in dag.products] + + return [Path(after_list[i]).stem for i in range(len(before_list)) if before_list[i] != after_list[i]] + elif order_offline: + # for p in dag.products: + # if p.properties["storageStatus"] == "OFFLINE": + # dag.order_offline(p) + NUM_THREADS = 8 + q = Queue() + def do_stuff(q, dag): + while True: + dag.order_offline(q.get()) + q.task_done() + for p in dag.products: + q.put(p) + for t in range(NUM_THREADS): + worker = Thread(target=do_stuff, args = (q, dag)) + worker.daemon = True + worker.start() + q.join() + return dag.products def compute_l2a( self, @@ -1199,10 +1298,10 @@ class Tile: logger.info("archiving {}".format(l1c.identifier)) move_path.parent.mkdir(exist_ok=True) # shutil.move(str(l1c.path), str(move_path.parent)) - distutils.dir_util.copy_tree( + setuptools.distutils.dir_util.copy_tree( str(l1c.path), str(move_path) ) - distutils.dir_util.remove_tree(str(l1c.path)) + setuptools.distutils.dir_util.remove_tree(str(l1c.path)) l1c.path.symlink_to( move_path, target_is_directory=True ) @@ -1448,11 +1547,12 @@ class Tile: def remove_l1c( self, product_list: list = [], + entire: bool = False, ): """ Remove l1c files """ - if not product_list: + if entire: product_list = [product.identifier for product in self.l1c] for identifier in product_list: l1c = L1cProduct(identifier) @@ -1463,11 +1563,12 @@ class Tile: def remove_l2a( self, product_list: list = [], + entire: bool = False, ): """ Remove l2a files """ - if not product_list: + if entire: product_list = [product.identifier for product in self.l2a] for identifier in product_list: l2a = L2aProduct(identifier) diff --git a/sen2chain/utils.py b/sen2chain/utils.py index f54d141f0878968241779dd395e8a02448aa4e1a..12d0f6ca785b0f5421cf312927c4cbfccb2be2b6 100644 --- a/sen2chain/utils.py +++ b/sen2chain/utils.py @@ -144,3 +144,11 @@ def get_current_Sen2Cor_version(): ), None, ) + + +def get_tile(identifier) -> str: + """Returns tile name from a string. + + :param string: string from which to extract the tile name. + """ + return re.findall("_T([0-9]{2}[A-Z]{3})_", identifier)[0]