diff --git a/sen2chain/download_and_process.py b/sen2chain/download_and_process.py
index 99532dcc383ee04526209f409133de912dbf2ba5..ae272ccfbf4bbfa7a30d80308e39253f0fe763fb 100644
--- a/sen2chain/download_and_process.py
+++ b/sen2chain/download_and_process.py
@@ -13,7 +13,6 @@ from datetime import datetime
 from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 from collections import defaultdict
 from sentinelsat import SentinelAPI
-from sentinelhub import AwsProductRequest
 from pprint import pprint
 
 # type annotations
@@ -31,38 +30,6 @@ logger = logging.getLogger(__name__)
 logging.basicConfig(level=logging.DEBUG)
 
 
-# FIXME: AWS download no longer available
-def download_aws(identifier: str, tile: str, hub: str) -> Tuple[str, bool]:
-    """Downloads L1C safe from AWS using sentinelhub package.
-
-    :param identifier:
-    :param tile:
-    :param hub:
-    """
-    logger.debug("download_aws {}".format(identifier))
-    temp_container = TempContainer(identifier, tile)
-    temp_container.create_temp_folder()
-    # downloaded = True
-
-    if (
-        not temp_container.l1c.path.exists()
-        and not temp_container.l1c.in_library
-    ):
-        product_request = AwsProductRequest(
-            product_id=identifier,
-            tile_list=["T" + tile],
-            data_folder=str(temp_container.temp_path),
-            safe_format=True,
-        )
-        product_request.save_data()
-        # archive_l1c(identifier, tile, hub)
-
-    # if not temp_container.l1c.path.exists():
-    #     downloaded = False
-
-    return identifier, hub
-
-
 def download_peps(
     identifier: str, tile: str, hub: str
 ) -> Tuple[str, str, str, bool]:
@@ -214,7 +181,6 @@ class DownloadAndProcess:
     :param identifiers: a dictionnary containing products identifiers as keys.
     :param hubs_limit: hub name as key, max of downloading threads as value.
-    :param aws_limit: max of downloading threads.
     :param process_products: process products after download.
     :param max_processes: number of parallel processes.
     :param indices_list: list of valid indices names that will be processed.
@@ -233,7 +199,6 @@ class DownloadAndProcess:
         self,
         identifiers: Dict[str, dict],
         hubs_limit: dict = None,
-        aws_limit: int = None,
         process_products: bool = False,
         max_processes: int = 2,
         indices_list: list = [],
@@ -252,18 +217,6 @@ class DownloadAndProcess:
         else:
             self.hubs_limit = hubs_limit
 
-        if aws_limit is None:
-            self.aws_limit = 3
-            logger.debug("aws_limit set to: {}".format(self.aws_limit))
-        else:
-            if aws_limit > 3:
-                self.aws_limit = 3
-                logger.warning(
-                    "aws limit too high, using default: {}".format(
-                        self.aws_limit
-                    )
-                )
-
         if not isinstance(process_products, bool):
             raise ValueError("process_product must be either True or False")
         else:
@@ -339,7 +292,7 @@ class DownloadAndProcess:
             while len(tasks) < limit and self.queue.qsize() > 0:
                 logger.debug("tasks < limit")
                 item = await self.queue.get()
-                tile = self.identifiers["hubs"][item]["tile"]
+                tile = self.identifiers[item]["tile"]
                 task = asyncio.ensure_future(
                     self.downloader_hubs(item, tile, hub)
                 )
@@ -412,25 +365,6 @@ class DownloadAndProcess:
 
         return (identifier, tile, hub)
 
-    # FIXME: AWS download no longer available
-    async def downloader_aws(
-        self, identifier: str, tile: str, hub: str
-    ) -> Tuple[str, str, str]:
-        """coroutine appelant la coroutine de téléchargement sur aws"""
-        logger.debug("downloader_aws {}".format(identifier))
-
-        async with asyncio.Semaphore(self.aws_limit):
-            fut = self.loop.run_in_executor(
-                self.threads_executor,
-                functools.partial(download_aws, identifier, tile, hub),
-            )
-            await fut
-            logger.info(
-                "--> --> {} downloaded from {}".format(identifier, hub)
-            )
-
-        return (identifier, tile, hub)
-
     async def archive_l1c(
         self, identifier: str, tile: str, hub: str
     ) -> Tuple[str, str, str]:
@@ -479,38 +413,13 @@ class DownloadAndProcess:
 
         return identifier, tile, hub
 
-    # FIXME: AWS download no longer available
-    async def download_process_aws(
-        self, identifier: str
-    ) -> Tuple[str, str, str]:
-        """
-        Coroutine for downloading and processing products from AWS.
-
-        :param identifier:
-        """
-        logger.info("download_process_aws")
-
-        tile = self.identifiers["aws"][identifier]["tile"]
-        downloads = await asyncio.ensure_future(
-            self.downloader_aws(identifier, tile, "aws")
-        )
-        if self.process_products:
-            fut = self.process(*downloads)
-            # if fut.cancelled():
-            #     return fut
-            await fut
-        return downloads
-
     async def main(self) -> None:
         """Main coroutine. Starts proxies and fills the queue."""
         logger.debug("main")
-
-        identifiers_aws = self.identifiers["aws"]
-        identifiers_hubs = self.identifiers["hubs"]
+        identifiers_hubs = self.identifiers
 
         print("Tiled: ", len(identifiers_hubs))
-        print("Non tiled: ", len(identifiers_aws))
 
         tasks = []
         # on lance les proxies
@@ -530,14 +439,6 @@ class DownloadAndProcess:
         # on attend que tout se termine
         await asyncio.gather(*tasks)
 
-        # on lance les DL aws sur la bonne liste
-        if identifiers_aws:
-            aws = [
-                asyncio.ensure_future(self.download_process_aws(identifier))
-                for (identifier, data) in identifiers_aws.items()
-            ]
-            await asyncio.gather(*aws)
-
         # shutting down executors
         try:
             self.threads_executor.shutdown(wait=True)
@@ -560,11 +461,7 @@ class DownloadAndProcess:
         try:
            self.loop.run_until_complete(self.main())
            end_time = datetime.now() - start_time
-            total_products = (
-                len(self.identifiers["aws"])
-                + len(self.identifiers["hubs"])
-                - len(self.failed_products)
-            )
+            total_products = len(self.identifiers) - len(self.failed_products)
            logger.info(
                "Downloaded and processed {} file(s) in {}".format(
                    total_products, end_time
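
Usage sketch (not part of the patch): with aws_limit removed and identifiers flattened to a single mapping, a caller might now look like the lines below. The import path follows the file path in this diff; the product identifier, tile value, and the "peps"/"scihub" hub keys are illustrative assumptions, and how the download loop is ultimately started is not shown in this hunk.

    from sen2chain.download_and_process import DownloadAndProcess

    # identifiers now maps each product identifier directly to a metadata dict
    # holding its tile, matching the self.identifiers[item]["tile"] lookup in the
    # patched code (no more separate "aws"/"hubs" sub-dictionaries).
    identifiers = {
        "S2A_MSIL1C_20190101T070221_N0207_R120_T38KQE_20190101T082511": {"tile": "38KQE"},
    }

    dap = DownloadAndProcess(
        identifiers=identifiers,
        hubs_limit={"peps": 3, "scihub": 2},  # hub name -> max parallel downloads (assumed hub names)
        process_products=True,
    )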