From 734b0d4ed152e476e7571ba03c70a92fc7d0411c Mon Sep 17 00:00:00 2001
From: Jeremy Commins <jebins@openaliasbox.org>
Date: Fri, 5 Oct 2018 13:39:26 +0400
Subject: [PATCH] Removed dev folder.

---
 dev/dev_automatization.py       |  13 -
 dev/dev_download_and_process.py | 490 --------------------------------
 dev/dev_products_manager.py     |  53 ----
 dev/dev_timeseries.py           |  29 --
 dev/dev_xmlparser.py            |  63 ----
 5 files changed, 648 deletions(-)
 delete mode 100644 dev/dev_automatization.py
 delete mode 100644 dev/dev_download_and_process.py
 delete mode 100644 dev/dev_products_manager.py
 delete mode 100644 dev/dev_timeseries.py
 delete mode 100644 dev/dev_xmlparser.py

diff --git a/dev/dev_automatization.py b/dev/dev_automatization.py
deleted file mode 100644
index 23cf181..0000000
--- a/dev/dev_automatization.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- coding: utf-8 -*-
-import pandas as pd
-
-from sen2chain import Automatization
-from sen2chain.automatization import TimeSeriesAutomatization
-
-
-#tsauto = TimeSeriesAutomatization()
-#tsauto.run()
-
-auto = Automatization()
-print(auto.data)
-auto.run()
diff --git a/dev/dev_download_and_process.py b/dev/dev_download_and_process.py
deleted file mode 100644
index ea9d160..0000000
--- a/dev/dev_download_and_process.py
+++ /dev/null
@@ -1,490 +0,0 @@
-# -*- coding: utf-8 -*-
-
-"""
-Module for downloading and processing Sentinel-2 images in parallel.
-"""
-
-import logging
-import asyncio
-import random
-import time
-import functools
-import subprocess
-import shutil
-from pathlib import Path
-from datetime import datetime
-from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
-from collections import OrderedDict, defaultdict
-from sentinelsat import SentinelAPI
-from sentinelhub import AwsProductRequest
-from pprint import pprint
-# type annotations
-from typing import List, Set, Dict, Tuple, Optional
-
-from sen2chain import Config, DataRequest
-from sen2chain.config import SHARED_DATA
-from sen2chain.library import TempContainer
-from sen2chain.products import L1cProduct, L2aProduct
-
-peps_download_script = SHARED_DATA.get("peps_download")
-peps_download_config = Path(Config().get("peps_config_path"))
-
-
-logger = logging.getLogger(__name__)
-logging.basicConfig(level=logging.DEBUG)
-
-
-def download_aws(identifier: str, tile: str, hub: str) -> Tuple[str, bool]:
-    """Downloads L1C safe from AWS using sentinelhub package.
-
-    :param identifier:
-    :param tile:
-    :param hub:
-    """
-    logger.debug("download_aws {}".format(identifier))
-    temp_container = TempContainer(identifier, tile)
-    temp_container.create_temp_folder()
-    downloaded = True
-
-    if not temp_container.l1c.path.exists() and not temp_container.l1c.in_library:
-        product_request = AwsProductRequest(product_id=identifier,
-                                            tile_list=["T"+tile],
-                                            data_folder=str(temp_container.temp_path),
-                                            safe_format=True)
-        product_request.save_data()
-        archive_l1c(identifier, tile, hub)
-
-    if not temp_container.l1c.path.exists():
-        downloaded = False
-
-    return (identifier, hub)
-
-
-def download_peps(identifier: str, tile: str, hub: str) -> Tuple[str, str, str, bool]:
-    """Downloads L1C safe zip file from PEPS using peps_downloader.py.
-
-    :param identifier:
-    :param tile:
-    :param hub:
-    """
-    logger.debug("download_peps {}".format(identifier))
-
-    temp_container = TempContainer(identifier, tile)
-    temp_container.create_temp_folder()
-    downloaded = True
-
-    if not temp_container.l1c.path.exists() and not temp_container.l1c.in_library:
-        try:
-            command = "python3 {peps_download_script} -a {peps_download_config} -c S2ST -i {identifier}".format(
-                peps_download_script=peps_download_script,
-                peps_download_config=str(peps_download_config),
-                identifier=identifier)
-
-            process = subprocess.run(command.split(), cwd=str(temp_container.temp_path), check=True, stderr=subprocess.PIPE)
-        except subprocess.CalledProcessError as e:
-            logger.error("download_peps - peps_download FAILED: {}".format(e))
-            logger.warning("File does not exist on PEPS yet: {}".format(identifier))
-
-        try:
-            temp_container.unzip_l1c()
-            #archive_l1c(identifier, tile, hub)
-        except:
-            pass
-
-    if not temp_container.l1c.path.exists():
-        downloaded = False
-        logger.info("Failed download: {}".format(identifier))
-
-    return (identifier, tile, hub, downloaded)
-
-
-def download_scihub(identifier: str, tile: str, hub: str) -> Tuple[str, str, str, bool]:
-    """Downloads L1C safe zip file from Scihub using sentinelsat package.
-
-    :param identifier:
-    :param tile:
-    :param hub:
-    """
-    logger.debug("download_scihub {}".format(identifier))
-
-    temp_container = TempContainer(identifier, tile)
-    temp_container.create_temp_folder()
-    downloaded = True
-
-    if not temp_container.l1c.path.exists() and not temp_container.l1c.in_library:
-        api = SentinelAPI(Config().get("scihub_id"), Config().get("scihub_pwd"))
-        product_request = api.query(identifier='*{}*'.format(identifier))
-        api.download_all(product_request, directory_path=str(temp_container.temp_path))
-
-        try:
-            temp_container.unzip_l1c()
-            #archive_l1c(identifier, tile, hub)
-        except:
-            #temp_container.delete_temp_folder()
-            downloaded = False
-            logger.info("Failed download: {}".format(identifier))
-
-    return (identifier, tile, hub, downloaded)
-
-
-def processing(identifier: str, tile: str, hub: str, indices_list: List[str],
-               nodata_clouds: bool, quicklook: bool) -> Tuple[str, str, str]:
-    """Process each L1C downloaded.
-
-    :tile:
-    :hub:
-    :indices_list:
-    :nodata_clouds:
-    :quicklook:
-    """
-    logger.debug("processing {}".format(identifier))
-
-    l1c_product = L1cProduct(identifier, tile)
-    l1c_product.process_l2a()
-
-    l2a_identifier = identifier.replace("L1C_","L2A_").replace("_USER_", "_OPER_")
-    l2a_product = L2aProduct(l2a_identifier, tile)
-    l2a_product.process_cloud_mask()
-    l2a_product.process_indices(indices_list, nodata_clouds, quicklook)
-
-    return (identifier, tile, hub)
-
-
-def archive_l1c(identifier, tile, hub):
-    """
-    """
-    temp = TempContainer(identifier)
-    l1c_prod = L1cProduct(identifier, tile, path=temp.temp_path)
-    l1c_prod.archive()
-    temp.delete_temp_folder()
-
-    return (identifier, tile, hub)
-
-
-class DownloadAndProcess:
-    """Class for downloading and processing products from :class:`data_request.DataRequest`.
-
-    :param identifiers: a dictionnary containing products identifiers as keys.
-    :param hubs_limit: hub name as key, max of downloading threads as value.
-    :param aws_limit: max of downloading threads.
-    :param process_products: process products after download.
-    :param max_processes: number of parallel processes.
-    :param indices_list: list of valid indices names that will be processed.
-    :param nodata_clouds: mask indices output rasters with a cloud-mask.
-    :param quicklook: creates a quicklook for each indice processed.
-    """
-
-    def __init__(self, identifiers: str,
-                 hubs_limit: dict = None, aws_limit: int = None,
-                 process_products: bool = False, max_processes: int = 2, indices_list: list = [],
-                 nodata_clouds: bool = False, quicklook: bool = False) -> None:
-
-        if not isinstance(identifiers, dict):
-            raise ValueError("identifiers needs to be a dict")
-        else:
-            self.identifiers = identifiers
-
-        if hubs_limit is None:
-            self.hubs_limit={"peps":3, "scihub":2}
-            logger.debug("hubs_limit set to: {}".format(self.hubs_limit))
-        else:
-            self.hubs_limit = hubs_limit
-
-        if aws_limit is None:
-            self.aws_limit = 3
-            logger.debug("aws_limit set to: {}".format(self.aws_limit))
-        else:
-            if aws_limit > 3:
-                self.aws_limit = 3
-                logger.warning("aws limit too high, using default: {}".format(self.aws_limit))
-
-        if not isinstance(process_products, bool):
-            raise ValueError("process_product must be either True or False")
-        else:
-            self.process_products = process_products
-
-        if max_processes >= 1:
-            self.max_processes = max_processes + 1 # + 1 pour process principal
-            if max_processes > 2:
-                logger.warning("max_processes set to: {}, don't run out of memory!".format(max_processes))
-        else:
-            raise ValueError("max_processes must be an unsigned number > 0.")
-
-        if indices_list is None:
-            self.indices_list = []
-        else:
-            if not isinstance(indices_list, list):
-                raise ValueError("indices_list must be a list of indices names")
-            self.indices_list = indices_list
-
-        if not isinstance(nodata_clouds, bool):
-            raise ValueError("nodata_clouds must be either True or False")
-        else:
-            self.nodata_clouds = nodata_clouds
-
-        if not isinstance(quicklook, bool):
-            raise ValueError("quicklook must be either True or False")
-        else:
-            self.quicklook = quicklook
-
-
-        self.loop = asyncio.get_event_loop()
-        self.queue = asyncio.Queue()
-
-        # executors
-        self.threads_executor = ThreadPoolExecutor(
-            max_workers=sum(self.hubs_limit.values())
-        )
-        self.processes_executor = ProcessPoolExecutor(
-            max_workers=self.max_processes
-        ) if self.process_products else None
-
-        # failed downloads
-        self._products_attempts = defaultdict(lambda: 0)
-        self.failed_products = set()
-
-        # run main coroutine
-        self.run_loop()
-
-
-    async def proxy_pool(self, hub: str, limit: int) -> None:
-        """
-        gestionnaire de "proxies" : permet de switcher entre peps et scihub
-        de façon à ce que les éléments de la queue soient traités le plus
-        rapidement possible
-
-        :param hub:
-        :param limit:
-        """
-        logger.debug("proxy_pool")
-
-        tasks = []
-
-        while self.queue.qsize() > 0:
-            logger.debug("while True")
-            while len(tasks) < limit and self.queue.qsize() > 0:
-                logger.debug("tasks < limit")
-                item = await self.queue.get()
-                tile = self.identifiers["hubs"][item]["tile"]
-                task = asyncio.ensure_future(self.downloader_hubs(item, tile, hub))
-                tasks.append(task)
-            done, pending = await asyncio.wait(tasks,
-                                               return_when=asyncio.FIRST_COMPLETED)
-
-            for each in done:
-                tasks.remove(each)
-
-            if self.queue.empty():
-                logger.debug("queue empty")
-                if tasks:
-                    await asyncio.gather(*tasks)
-                return
-
-
-    async def downloader_hubs(self,
-                              identifier: str, tile: str, hub: str) -> Tuple[str, str, str]:
-        """Coroutine for starting
-        coroutine de téléchargement sur peps et scihub : appelle la fonction qui va télécharger
-
-        """
-        if hub == "scihub":
-            logger.info("--> downloading {} from {}".format(identifier, hub))
-            fut = self.loop.run_in_executor(self.threads_executor,
-                                            functools.partial(download_scihub,
-                                                              identifier,
-                                                              tile,
-                                                              hub))
-        elif hub == "peps":
-            logger.info("--> downloading {} from {}".format(identifier, hub))
-            fut = self.loop.run_in_executor(self.threads_executor,
-                                            functools.partial(download_peps,
-                                                              identifier,
-                                                              tile,
-                                                              hub))
-        await fut
-        # if download was successful, process the file
-        if fut.result()[3]:
-            logger.info("--> --> {} downloaded from {}".format(identifier, hub))
-            arch = asyncio.ensure_future(self.archive_l1c(identifier, tile, hub))
-            await arch
-
-            if self.process_products:
-                fut = asyncio.ensure_future(self.process(identifier, tile, hub))
-                await fut
-
-        # if download failed, try again on another hub if first retry
-        elif not fut.result()[3] and self._products_attempts[identifier] < 3:
-            self._products_attempts[identifier] += 1
-            logger.info("{} download failed, will try again".format(identifier))
-            # increase the number of seconds to wait with the number of attempts
-            # will retry in 2, 4 then 6 minutes before giving up
-            seconds_to_wait = self._products_attempts[identifier] * 120
-            await asyncio.sleep(seconds_to_wait)
-            await self.queue.put(identifier)
-
-        # if the download failed again, return without processing
-        else:
-            self.failed_products.add(identifier)
-            logger.error("Download failed: {}".format(identifier))
-
-        return (identifier, tile, hub)
-
-
-
-    async def downloader_aws(self,
-                             identifier: str, tile: str, hub: str) -> Tuple[str, str, str]:
-        """coroutine appelant la coroutine de téléchargement sur aws
-
-        """
-        logger.debug("downloader_aws {}".format(identifier))
-
-        async with asyncio.Semaphore(self.aws_limit):
-            fut = self.loop.run_in_executor(self.threads_executor,
-                                            functools.partial(download_aws,
-                                                              identifier,
-                                                              tile,
-                                                              hub))
-            await fut
-            logger.info("--> --> {} downloaded from {}".format(identifier, hub))
-
-        return (identifier, tile, hub)
-
-
-    async def archive_l1c(self,
-                          identifier: str, tile: str, hub: str) -> Tuple[str, str, str]:
-        """
-        """
-        #fut = self.loop.run_in_executor(self.processes_executor,
-        fut = self.loop.run_in_executor(None,
-                                        functools.partial(archive_l1c,
-                                                          identifier,
-                                                          tile,
-                                                          hub))
-        if fut.cancelled:
-            return fut
-        await fut
-        logger.info("--> --> --> --> {} archived".format(identifier, hub))
-
-        return (identifier, tile, hub)
-
-
-
-    async def process(self,
-                      identifier: str, tile: str, hub: str) -> Tuple[str, str, str]:
-        """
-        coroutine qui va appeler la fonction permettant de traiter le fichier pour scihub et peps
-
-        :param identifier:
-        :param tile:
-        :param hub:
-        """
-        logger.debug("process {}".format(identifier))
-
-        #async with asyncio.Semaphore(self.max_processes):
-        logger.info("--> --> --> processing {}".format(identifier))
-        fut = self.loop.run_in_executor(self.processes_executor,
-                                        functools.partial(processing,
-                                                          identifier,
-                                                          tile,
-                                                          hub,
-                                                          self.indices_list,
-                                                          self.nodata_clouds,
-                                                          self.quicklook))
-        if fut.cancelled:
-            return fut
-        await fut
-        logger.info("--> --> --> --> {} processed".format(identifier, hub))
-
-        return (identifier, tile, hub)
-
-
-    async def download_process_aws(self, identifier: str) -> Tuple[str, str, str]:
-        """
-        Coroutine for downloading and processing products from AWS.
-
-        :param identifier:
-        """
-        logger.info("download_process_aws")
-
-        tile = self.identifiers["aws"][identifier]["tile"]
-        downloads = await asyncio.ensure_future(self.downloader_aws(identifier, tile, "aws"))
-        if self.process_products:
-            fut = self.process(*downloads)
-            #if fut.cancelled():
-                #return fut
-            await fut
-        return downloads
-
-
-    async def main(self) -> None:
-        """
-        Main coroutine.
- """ - logger.debug("main") - - identifiers_aws = self.identifiers["aws"] - identifiers_hubs = self.identifiers["hubs"] - - print("Tiled: ",len(identifiers_hubs)) - print("Non tiled: ", len(identifiers_aws)) - - tasks = [] - # on lance les proxies - for hub, limit in self.hubs_limit.items(): - if limit > 0: - tasks.append(asyncio.ensure_future(self.proxy_pool(hub, limit))) - - # on remplit la queue - if identifiers_hubs: - for identifier, data in identifiers_hubs.items(): - if TempContainer(identifier).l1c.in_library : - continue - await self.queue.put(identifier) - - # on attend que tout se termine - await asyncio.gather(*tasks) - - # on lance les DL aws sur la bonne liste - if identifiers_aws: - aws = [asyncio.ensure_future(self.download_process_aws(identifier)) - for (identifier, data) in identifiers_aws.items()] - await asyncio.gather(*aws) - - # shutting down executors - try: - self.threads_executor.shutdown(wait=True) - logger.debug("Shutting down threads_executor") - except: - pass - try: - self.processes_executor.shutdown(wait=True) - logger.debug("Shutting down processes_executor") - except: - pass - - logger.debug("exiting main") - - - def run_loop(self) -> None: - """ - run loop - """ - start_time = datetime.now() - try: - self.loop.run_until_complete(self.main()) - end_time = datetime.now() - start_time - total_products = len(self.identifiers["aws"]) + len(self.identifiers["hubs"]) - len(self.failed_products) - logger.info("Downloaded and processed {} file(s) in {}".format(total_products, end_time)) - finally: - logger.debug("closing loop") - #self.loop.close() - if self.failed_products: - print("Failed downloads:") - pprint(self.failed_products) - - -#tiles_list = ["10TEP", "13SDT", "13TEF", "14SQJ", "16TCS", "19MHP", "30PXU", "32UPC", "33KUA", "35KRR", "36JUT", "38KQB", "38KQV", "48PWU", "52VDN"] - -prods = DataRequest("2018-09-10", "2018-09-28").from_tiles(["13TEF"]) - -DownloadAndProcess(prods, process_products=True, indices_list=["NDVI"], nodata_clouds=True, quicklook=True) diff --git a/dev/dev_products_manager.py b/dev/dev_products_manager.py deleted file mode 100644 index 317c471..0000000 --- a/dev/dev_products_manager.py +++ /dev/null @@ -1,53 +0,0 @@ -# -*-coding:utf-8-*- - -from pathlib import Path -from collections import namedtuple -from pprint import pprint, pformat -from datetime import datetime -import re -# type annotations -from typing import List, Set, Dict, Tuple, Optional - -from sen2chain import (Config, - L1cProduct, L2aProduct, Tile, - IndicesCollection, - str_to_datetime) - -data = Path("data") - -t = Tile("38KQV") - -#print(t.l2a.last.date) -#print(t.l2a.filter_by_cloud_cover(0,50)) - -#l1c = L1cProduct("S2A_MSIL1C_20180904T063511_N0206_R134_T40KCB_20180904T090928.SAFE") -#print(l1c.in_library) - - -class Library: - """ - """ - _temp_path = Path(Config().get("temp_path")) - _l1c_path = Path(Config().get("l1c_path")) - _l2a_path = Path(Config().get("l2a_path")) - _indices_path = Path(Config().get("indices_path")) - - def __init__(self) -> None: - self._l1c_tiles = [f.name for f in self._l1c_path.glob("*")] - self._l2a_tiles = [f.name for f in self._l2a_path.glob("*")] - self._indices = [f.name for f in self._indices_path.glob("*")] - - @property - def l1c_tiles(self): - return self._l1c_tiles - - @property - def l2a_tiles(self): - return self._l2a_tiles - - @property - def indices(self): - return self._indicess - - -print(Library().l_tiles) diff --git a/dev/dev_timeseries.py b/dev/dev_timeseries.py deleted file mode 100644 index 375deca..0000000 --- 
+++ /dev/null
@@ -1,29 +0,0 @@
-# -*- coding: utf-8 -*-
-
-import fiona
-import rasterio
-import csv
-from fiona import transform
-from pathlib import Path
-from shapely.geometry import shape
-from rasterstats import zonal_stats
-from collections import OrderedDict
-from osgeo import ogr
-# type annotations
-from typing import List, Set, Dict, Tuple, Union
-import pandas as pd
-from pprint import pprint
-
-from sen2chain import TimeSeries, Tile
-from sen2chain.indices import IndicesCollection
-from sen2chain.geo_utils import get_tiles_from_file
-from sen2chain.utils import datetime_to_str
-
-
-
-
-
-#TimeSeries("/media/seas-oi/92DAD912DAD8F387/jeremy/DATA_S2/TIME_SERIES/35LLD_kafue.geojson", indices=["NDVI"], date_min="2018-01-01", date_max="2018-09-01")
-auto = TimeSeries("/home/jiji/sen2chain/data/TIME_SERIES/test.shp") #, indices=["NDVI"], date_min="2016-01-01", date_max="2016-09-01")
-
-auto = TimeSeries()
diff --git a/dev/dev_xmlparser.py b/dev/dev_xmlparser.py
deleted file mode 100644
index 2f69e67..0000000
--- a/dev/dev_xmlparser.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# coding:utf-8
-
-import sys
-import os
-sys.path.insert(0, os.path.abspath('..'))
-
-import re
-from pathlib import Path
-from pprint import pprint
-
-from sen2chain import xmlparser, L1cProduct
-
-tile = "40KCB"
-#mtd = Path("data/S2A_USER_MTD_SAFL2A_PDMC_20160119T023918_R134_V20160118T063524_20160118T063524.xml")
-mtd = Path("data/S2A_OPER_MTD_SAFL1C_PDMC_20151201T164959_R134_V20151129T063527_20151129T063527.xml")
-
-#parser = xmlparser.MetadataParser(mtd, tile)
-
-p = L1cProduct("S2A_MSIL1C_20161213T063502_N0204_R134_T40KCB_20161213T063504.SAFE", path=Path()/"data")
-
-print(p._update_metadata_parser())
-
-#def test_xmlparser_get_granule():
-
-
-#print(parser._psd_version)
-#print(parser._root)
-
-#schema_location = parser._root.attrib["{http://www.w3.org/2001/XMLSchema-instance}schemaLocation"]
-
-
-#version = re.findall("https://psd-([0-9]{2}).sentinel2.eo.esa.int", schema_location)[0]
-
-#print(parser._psd_version)
-
-#pprint(parser.__dict__)
-
-#pprint([g.attrib['granuleIdentifier'] for g in parser._root.findall('.//Granules') if tile in g.attrib["granuleIdentifier"]][0])
-
-
-#print(parser._granule)
-
-#granules = [g.attrib["granuleIdentifier"] for g in parser._root.findall('.//Granule')]
-
-#print(parser._granule_string)
-
-#print(parser.get_band_path("B04"))
-
-
-#parser._root.findall(".//[@granuleIdentifier={}]"
-
-#print(
-    #[f.text + ".jp2"
-    #for f in parser._root.findall(".//{}[@granuleIdentifier='{}']/{}".format(parser._granule_string, parser._granule, parser._image_string))
-    #if "B04" in f.text][0]
-#)
-
-
-#print(granules)
-
-#for child in granules[0]:
-    #print(child.text)
-
--
GitLab