Skip to content
Snippets Groups Projects
Commit 802edbfb authored by Jérémy Commins's avatar Jérémy Commins
Browse files

Remove AWS support from download module

parent 25ad4518
No related branches found
No related tags found
No related merge requests found
...@@ -13,7 +13,6 @@ from datetime import datetime ...@@ -13,7 +13,6 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from collections import defaultdict from collections import defaultdict
from sentinelsat import SentinelAPI from sentinelsat import SentinelAPI
from sentinelhub import AwsProductRequest
from pprint import pprint from pprint import pprint
# type annotations # type annotations
...@@ -31,38 +30,6 @@ logger = logging.getLogger(__name__) ...@@ -31,38 +30,6 @@ logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
# FIXME: AWS download no longer available
def download_aws(identifier: str, tile: str, hub: str) -> Tuple[str, str]:
    """Download an L1C SAFE product from AWS using the sentinelhub package.

    A temporary container is created for the product and its temp folder is
    prepared. The AWS request is only issued when the container's L1C path
    does not exist yet and the L1C product is not flagged as being in the
    library.

    NOTE(review): the file marks AWS download as no longer available (FIXME
    above this function) — confirm the service state before relying on it.

    :param identifier: product identifier, forwarded as ``product_id``.
    :param tile: tile name; prefixed with "T" to build the request tile list.
    :param hub: hub name, returned unchanged to the caller.
    :return: the ``(identifier, hub)`` pair (both strings).
    """
    logger.debug("download_aws {}".format(identifier))
    temp_container = TempContainer(identifier, tile)
    temp_container.create_temp_folder()

    l1c = temp_container.l1c
    if not l1c.path.exists() and not l1c.in_library:
        product_request = AwsProductRequest(
            product_id=identifier,
            tile_list=["T" + tile],
            # sentinelhub is handed the folder as a plain string.
            data_folder=str(temp_container.temp_path),
            safe_format=True,
        )
        product_request.save_data()

    return identifier, hub
def download_peps( def download_peps(
identifier: str, tile: str, hub: str identifier: str, tile: str, hub: str
) -> Tuple[str, str, str, bool]: ) -> Tuple[str, str, str, bool]:
...@@ -214,7 +181,6 @@ class DownloadAndProcess: ...@@ -214,7 +181,6 @@ class DownloadAndProcess:
:param identifiers: a dictionary containing product identifiers as keys. :param identifiers: a dictionary containing product identifiers as keys.
:param hubs_limit: hub name as key, max of downloading threads as value. :param hubs_limit: hub name as key, max of downloading threads as value.
:param aws_limit: max of downloading threads.
:param process_products: process products after download. :param process_products: process products after download.
:param max_processes: number of parallel processes. :param max_processes: number of parallel processes.
:param indices_list: list of valid indices names that will be processed. :param indices_list: list of valid indices names that will be processed.
...@@ -233,7 +199,6 @@ class DownloadAndProcess: ...@@ -233,7 +199,6 @@ class DownloadAndProcess:
self, self,
identifiers: Dict[str, dict], identifiers: Dict[str, dict],
hubs_limit: dict = None, hubs_limit: dict = None,
aws_limit: int = None,
process_products: bool = False, process_products: bool = False,
max_processes: int = 2, max_processes: int = 2,
indices_list: list = [], indices_list: list = [],
...@@ -252,18 +217,6 @@ class DownloadAndProcess: ...@@ -252,18 +217,6 @@ class DownloadAndProcess:
else: else:
self.hubs_limit = hubs_limit self.hubs_limit = hubs_limit
if aws_limit is None:
self.aws_limit = 3
logger.debug("aws_limit set to: {}".format(self.aws_limit))
else:
if aws_limit > 3:
self.aws_limit = 3
logger.warning(
"aws limit too high, using default: {}".format(
self.aws_limit
)
)
if not isinstance(process_products, bool): if not isinstance(process_products, bool):
raise ValueError("process_product must be either True or False") raise ValueError("process_product must be either True or False")
else: else:
...@@ -339,7 +292,7 @@ class DownloadAndProcess: ...@@ -339,7 +292,7 @@ class DownloadAndProcess:
while len(tasks) < limit and self.queue.qsize() > 0: while len(tasks) < limit and self.queue.qsize() > 0:
logger.debug("tasks < limit") logger.debug("tasks < limit")
item = await self.queue.get() item = await self.queue.get()
tile = self.identifiers["hubs"][item]["tile"] tile = self.identifiers[item]["tile"]
task = asyncio.ensure_future( task = asyncio.ensure_future(
self.downloader_hubs(item, tile, hub) self.downloader_hubs(item, tile, hub)
) )
...@@ -412,25 +365,6 @@ class DownloadAndProcess: ...@@ -412,25 +365,6 @@ class DownloadAndProcess:
return (identifier, tile, hub) return (identifier, tile, hub)
# FIXME: AWS download no longer available
async def downloader_aws(
    self, identifier: str, tile: str, hub: str
) -> Tuple[str, str, str]:
    """Coroutine that runs the blocking AWS download in the thread executor.

    Concurrent AWS downloads are throttled by a semaphore shared by every
    call on this instance.

    :param identifier: product identifier to download.
    :param tile: tile name of the product.
    :param hub: hub name, used in the log message and returned unchanged.
    :return: the ``(identifier, tile, hub)`` tuple once the download is done.
    """
    logger.debug("downloader_aws {}".format(identifier))

    # A semaphore only limits concurrency when all callers share the same
    # object; creating a fresh one per call never makes anybody wait.
    # It is created lazily here, inside the running coroutine, and cached
    # on the instance.
    semaphore = getattr(self, "_aws_semaphore", None)
    if semaphore is None:
        # NOTE(review): the constructor code visible in this file only
        # assigns aws_limit when the argument is None or greater than 3, so
        # fall back to the same default (3) if the attribute is missing.
        semaphore = asyncio.Semaphore(getattr(self, "aws_limit", 3))
        self._aws_semaphore = semaphore

    async with semaphore:
        # download_aws is blocking, so it is pushed to the thread pool.
        await self.loop.run_in_executor(
            self.threads_executor,
            functools.partial(download_aws, identifier, tile, hub),
        )
        logger.info(
            "--> --> {} downloaded from {}".format(identifier, hub)
        )
    return (identifier, tile, hub)
async def archive_l1c( async def archive_l1c(
self, identifier: str, tile: str, hub: str self, identifier: str, tile: str, hub: str
) -> Tuple[str, str, str]: ) -> Tuple[str, str, str]:
...@@ -479,38 +413,13 @@ class DownloadAndProcess: ...@@ -479,38 +413,13 @@ class DownloadAndProcess:
return identifier, tile, hub return identifier, tile, hub
# FIXME: AWS download no longer available
async def download_process_aws(
    self, identifier: str
) -> Tuple[str, str, str]:
    """Download one product from AWS and, if enabled, process it.

    The tile is looked up under the "aws" entry of ``self.identifiers``.
    Processing is only awaited when ``self.process_products`` is truthy.

    :param identifier: key of the product in ``self.identifiers["aws"]``.
    :return: the value returned by ``downloader_aws`` for this product.
    """
    logger.info("download_process_aws")
    aws_entry = self.identifiers["aws"][identifier]
    download_task = asyncio.ensure_future(
        self.downloader_aws(identifier, aws_entry["tile"], "aws")
    )
    result = await download_task

    # Nothing more to do when post-download processing is disabled.
    if not self.process_products:
        return result

    await self.process(*result)
    return result
async def main(self) -> None: async def main(self) -> None:
"""Main coroutine. Starts proxies and fills the queue.""" """Main coroutine. Starts proxies and fills the queue."""
logger.debug("main") logger.debug("main")
identifiers_hubs = self.identifiers
identifiers_aws = self.identifiers["aws"]
identifiers_hubs = self.identifiers["hubs"]
print("Tiled: ", len(identifiers_hubs)) print("Tiled: ", len(identifiers_hubs))
print("Non tiled: ", len(identifiers_aws))
tasks = [] tasks = []
# on lance les proxies # on lance les proxies
...@@ -530,14 +439,6 @@ class DownloadAndProcess: ...@@ -530,14 +439,6 @@ class DownloadAndProcess:
# on attend que tout se termine # on attend que tout se termine
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
# on lance les DL aws sur la bonne liste
if identifiers_aws:
aws = [
asyncio.ensure_future(self.download_process_aws(identifier))
for (identifier, data) in identifiers_aws.items()
]
await asyncio.gather(*aws)
# shutting down executors # shutting down executors
try: try:
self.threads_executor.shutdown(wait=True) self.threads_executor.shutdown(wait=True)
...@@ -560,11 +461,7 @@ class DownloadAndProcess: ...@@ -560,11 +461,7 @@ class DownloadAndProcess:
try: try:
self.loop.run_until_complete(self.main()) self.loop.run_until_complete(self.main())
end_time = datetime.now() - start_time end_time = datetime.now() - start_time
total_products = ( total_products = len(self.identifiers) - len(self.failed_products)
len(self.identifiers["aws"])
+ len(self.identifiers["hubs"])
- len(self.failed_products)
)
logger.info( logger.info(
"Downloaded and processed {} file(s) in {}".format( "Downloaded and processed {} file(s) in {}".format(
total_products, end_time total_products, end_time
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment