diff --git a/sen2chain/automatization.py b/sen2chain/automatization.py index af6bd365edd19837913faa320c2ba415f0dfbcdc..b1814cadd5141d2f269b9006ee21749e6449e571 100644 --- a/sen2chain/automatization.py +++ b/sen2chain/automatization.py @@ -40,7 +40,7 @@ class Automatization: def __init__(self) -> None: self._df = None self._tiles_to_update = dict() - self._products_list = {"hubs": {}, "aws": {}} + self._products_list = {} if not self._csv_path.exists(): logger.info("Creating tiles_to_watch file") @@ -190,8 +190,7 @@ class Automatization: ) request.from_tiles([tile]) - self._products_list["hubs"].update(request.products_list["hubs"]) - self._products_list["aws"].update(request.products_list["aws"]) + self._products_list.update(request.products_list) @staticmethod def _get_ignored_tiles(self) -> np.ndarray: @@ -235,7 +234,6 @@ class Automatization: prods = DownloadAndProcess( identifiers=self._products_list, hubs_limit=hubs_limit, - aws_limit=2, process_products=process_products, max_processes=3, indices_list=indices_list, @@ -273,7 +271,7 @@ class Automatization: @property def products(self) -> Dict[str, dict]: - """Returns the products to download on AWS and on hubs.""" + """Returns the products to download.""" return self._products_list diff --git a/sen2chain/data_request.py b/sen2chain/data_request.py index c718d5d3a42d78c2c1280dcd046cc62cc5bb2d5f..65403749512bdcd45fcde6c4dacd342563fb557f 100644 --- a/sen2chain/data_request.py +++ b/sen2chain/data_request.py @@ -64,13 +64,6 @@ class DataRequest: land_only=True).from_tiles["40KCB", "40KEC"] """ - # Since "requester pays" was enabled on AWS for the Sentienl-2 L1C dataset - # (products are no longer free), downloading non-tiled products on AWS - # does'nt work anymore. - # Therefore, it's useless to make a complex request to separate non-tiled - # and tiled products. - # This class needs to be refactored. - # Proxy settings proxy_http_url = Config().get("proxy_http_url").strip() proxy_https_url = Config().get("proxy_https_url").strip() @@ -105,7 +98,7 @@ class DataRequest: self.land_only = land_only self.tiles_to_keep = None self.tiles_to_keep_geom = dict() - self.products_list = {"aws": {}, "hubs": {}} + self.products_list = {} self.cloudcoverpercentage = ( cloud_cover_percentage if cloud_cover_percentage else (0, 100) ) @@ -198,146 +191,18 @@ class DataRequest: return self.products_list def _make_request(self) -> None: - """Will call the right request method depending on products""" - + """Scihub API request using sentinelsat.""" logger.debug("_make_request") logger.info( - "Requesting images ranging from {} to {}".format( - self.start_date, self.end_date - ) + f"Requesting images ranging from {self.start_date} to {self.end_date}" ) if self.tiles_to_keep is None: raise ValueError("Query tiles not provided") - # reset products_list - # should the products_list be updated or erased for each new request ? - self.products_list = {"aws": {}, "hubs": {}} - - tileddate = str_to_datetime("2016-11-01", "ymd") - - if self.start_date > tileddate: - self._make_request_tiled_only() - else: - self._make_request_not_tiled() - - def _make_request_not_tiled(self) -> None: - """Scihub API request using sentinelsat. This method is called for - tiled products only.""" - - logger.debug("_make_request_not_tiled") - - print("Tile:", self.tiles_to_keep) - - products_from_hubs = dict() - products_from_aws = dict() - - # query by group of 3 tiles, otherwise getting error message - # "Request URI too long" from scihub - for tiles_to_keep_triplet, tiles_to_keep_triplet_geom in zip( - grouper(self.tiles_to_keep, 3), - grouper(self.tiles_to_keep_geom.values(), 3), - ): - - tiles_to_keep = [tile for tile in tiles_to_keep_triplet if tile] - tiles_to_keep_geom = [ - geom for geom in tiles_to_keep_triplet_geom if geom - ] - - print(tiles_to_keep) - - # build a multipolygon from tiles geom - query_geom = MultiPolygon(tiles_to_keep_geom) - logging.debug("query geometry:\n{}".format(query_geom)) - - # scihub request - products = OrderedDict() - products = self.api.query( - query_geom, - date=(self.start_date, self.end_date), - order_by="+endposition", - platformname="Sentinel-2", - producttype="S2MSI1C", - cloudcoverpercentage=self.cloudcoverpercentage, - ) - - # save products list as a pandas dataframe - products_df = self.api.to_dataframe(products) - - if products_df.empty: - return - # a products dictionnay for each server (AWS vs hubs) - # fill each dictionnary depending on the acquisition date - for index, row in products_df[ - ["title", "beginposition", "footprint"] - ].iterrows(): - - # start date of the tiled S2 collection on the scihub server - tileddate = str_to_datetime("2016-11-01", "ymd") - img_title = row[0] - img_date = row[1].to_pydatetime() - img_footprint = loads(row[2]) - - for tile_name, tile_geom in self.tiles_to_keep_geom.items(): - # in case of duplicates on the server - if ( - img_title not in self.products_list["hubs"].keys() - and img_title not in self.products_list["aws"].keys() - ): - - # tiled products are downloaded on hubs - if re.match(r".*_T[0-9]{2}[A-Z]{3}_.*", img_title): - if tile_name in img_title: - self.products_list["hubs"][img_title] = { - "date": img_date, - "tile": tile_name, - } - continue - else: - continue - - # non-tiled products will be downloaded on aws - else: - if tile_geom.intersects(img_footprint): - self.products_list["aws"][img_title] = { - "date": img_date, - "tile": tile_name, - } - - # pprint dicts in chronological order - print("\nFrom AWS") - pprint( - list( - OrderedDict( - sorted( - self.products_list["aws"].items(), - key=lambda t: t[1]["date"], - ) - ) - ) - ) - print("\nFrom hubs") - pprint( - list( - OrderedDict( - sorted( - self.products_list["hubs"].items(), - key=lambda t: t[1]["date"], - ) - ) - ) - ) - - # Tiled products request (lighter) - def _make_request_tiled_only(self) -> None: - """Scihub API request using sentinelsat. This method is called if - products are a mix of tiled and non-tiled products.""" - - logger.debug("_make_request_tiled_only") print("Sentinel2 tiles:\n", self.tiles_to_keep) - products_from_hubs = dict() - products_from_aws = dict() + products = dict() # remove water-only tiles when land-only parameter is enabled if self.land_only: @@ -361,7 +226,7 @@ class DataRequest: products = OrderedDict() for tile in self.tiles_to_keep: kw = query_kwargs.copy() - kw["filename"] = "*_T{}_*".format(tile) + kw["filename"] = f"*_T{tile}_*" pp = self.api.query(**kw) products.update(pp) @@ -370,23 +235,21 @@ class DataRequest: if products_df.empty: return - # a products dictionnay for each server (AWS vs hubs) # fill each dictionnary depending on the acquisition date for index, row in products_df[["title", "beginposition"]].iterrows(): img_title = row[0] img_date = row[1].to_pydatetime() - self.products_list["hubs"][img_title] = { + self.products_list[img_title] = { "date": img_date, "tile": re.findall("_T([0-9]{2}[A-Z]{3})_", img_title)[0], } # pprint dicts in chronological order - print("\nFrom hubs") pprint( list( OrderedDict( sorted( - self.products_list["hubs"].items(), + self.products_list.items(), key=lambda t: t[1]["date"], ) ) diff --git a/sen2chain/download_and_process.py b/sen2chain/download_and_process.py index fa7aae9f9a1210d43f60f46cb63fc56163e255b4..ae272ccfbf4bbfa7a30d80308e39253f0fe763fb 100644 --- a/sen2chain/download_and_process.py +++ b/sen2chain/download_and_process.py @@ -13,7 +13,6 @@ from datetime import datetime from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from collections import defaultdict from sentinelsat import SentinelAPI -# from sentinelhub import AwsProductRequest from pprint import pprint # type annotations @@ -31,38 +30,6 @@ logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG) -# FIXME: AWS download no longer available -def download_aws(identifier: str, tile: str, hub: str) -> Tuple[str, bool]: - """Downloads L1C safe from AWS using sentinelhub package. - - :param identifier: - :param tile: - :param hub: - """ - logger.debug("download_aws {}".format(identifier)) - temp_container = TempContainer(identifier, tile) - temp_container.create_temp_folder() - # downloaded = True - - if ( - not temp_container.l1c.path.exists() - and not temp_container.l1c.in_library - ): - product_request = AwsProductRequest( - product_id=identifier, - tile_list=["T" + tile], - data_folder=str(temp_container.temp_path), - safe_format=True, - ) - product_request.save_data() - # archive_l1c(identifier, tile, hub) - - # if not temp_container.l1c.path.exists(): - # downloaded = False - - return identifier, hub - - def download_peps( identifier: str, tile: str, hub: str ) -> Tuple[str, str, str, bool]: @@ -214,7 +181,6 @@ class DownloadAndProcess: :param identifiers: a dictionnary containing products identifiers as keys. :param hubs_limit: hub name as key, max of downloading threads as value. - :param aws_limit: max of downloading threads. :param process_products: process products after download. :param max_processes: number of parallel processes. :param indices_list: list of valid indices names that will be processed. @@ -233,7 +199,6 @@ class DownloadAndProcess: self, identifiers: Dict[str, dict], hubs_limit: dict = None, - aws_limit: int = None, process_products: bool = False, max_processes: int = 2, indices_list: list = [], @@ -252,18 +217,6 @@ class DownloadAndProcess: else: self.hubs_limit = hubs_limit - if aws_limit is None: - self.aws_limit = 3 - logger.debug("aws_limit set to: {}".format(self.aws_limit)) - else: - if aws_limit > 3: - self.aws_limit = 3 - logger.warning( - "aws limit too high, using default: {}".format( - self.aws_limit - ) - ) - if not isinstance(process_products, bool): raise ValueError("process_product must be either True or False") else: @@ -339,7 +292,7 @@ class DownloadAndProcess: while len(tasks) < limit and self.queue.qsize() > 0: logger.debug("tasks < limit") item = await self.queue.get() - tile = self.identifiers["hubs"][item]["tile"] + tile = self.identifiers[item]["tile"] task = asyncio.ensure_future( self.downloader_hubs(item, tile, hub) ) @@ -412,25 +365,6 @@ class DownloadAndProcess: return (identifier, tile, hub) - # FIXME: AWS download no longer available - async def downloader_aws( - self, identifier: str, tile: str, hub: str - ) -> Tuple[str, str, str]: - """coroutine appelant la coroutine de téléchargement sur aws""" - logger.debug("downloader_aws {}".format(identifier)) - - async with asyncio.Semaphore(self.aws_limit): - fut = self.loop.run_in_executor( - self.threads_executor, - functools.partial(download_aws, identifier, tile, hub), - ) - await fut - logger.info( - "--> --> {} downloaded from {}".format(identifier, hub) - ) - - return (identifier, tile, hub) - async def archive_l1c( self, identifier: str, tile: str, hub: str ) -> Tuple[str, str, str]: @@ -479,38 +413,13 @@ class DownloadAndProcess: return identifier, tile, hub - # FIXME: AWS download no longer available - async def download_process_aws( - self, identifier: str - ) -> Tuple[str, str, str]: - """ - Coroutine for downloading and processing products from AWS. - - :param identifier: - """ - logger.info("download_process_aws") - - tile = self.identifiers["aws"][identifier]["tile"] - downloads = await asyncio.ensure_future( - self.downloader_aws(identifier, tile, "aws") - ) - if self.process_products: - fut = self.process(*downloads) - # if fut.cancelled(): - # return fut - await fut - return downloads - async def main(self) -> None: """Main coroutine. Starts proxies and fills the queue.""" logger.debug("main") - - identifiers_aws = self.identifiers["aws"] - identifiers_hubs = self.identifiers["hubs"] + identifiers_hubs = self.identifiers print("Tiled: ", len(identifiers_hubs)) - print("Non tiled: ", len(identifiers_aws)) tasks = [] # on lance les proxies @@ -530,14 +439,6 @@ class DownloadAndProcess: # on attend que tout se termine await asyncio.gather(*tasks) - # on lance les DL aws sur la bonne liste - if identifiers_aws: - aws = [ - asyncio.ensure_future(self.download_process_aws(identifier)) - for (identifier, data) in identifiers_aws.items() - ] - await asyncio.gather(*aws) - # shutting down executors try: self.threads_executor.shutdown(wait=True) @@ -560,11 +461,7 @@ class DownloadAndProcess: try: self.loop.run_until_complete(self.main()) end_time = datetime.now() - start_time - total_products = ( - len(self.identifiers["aws"]) - + len(self.identifiers["hubs"]) - - len(self.failed_products) - ) + total_products = len(self.identifiers) - len(self.failed_products) logger.info( "Downloaded and processed {} file(s) in {}".format( total_products, end_time diff --git a/sen2chain/products.py b/sen2chain/products.py index 5159cc160a9f77a5b3028c099e26d541d1c49659..010c21d01c07d9fe8e646d9d41884b56c94c753a 100755 --- a/sen2chain/products.py +++ b/sen2chain/products.py @@ -1043,10 +1043,6 @@ class L2aProduct(Product): def thin_cirrus_percentage(self): return self._get_metadata_value(key="THIN_CIRRUS_PERCENTAGE") - @property - def cloud_coverage_percentage(self): - return self._get_metadata_value(key="CLOUD_COVERAGE_PERCENTAGE") - # FOOTPRINT @property def footprint(self) -> List[Tuple[float, float]]: