Skip to content
Snippets Groups Projects
Commit 835b76d8 authored by pascal.mouquet_ird.fr's avatar pascal.mouquet_ird.fr
Browse files

Merge branch 'develop' into 'eodag_reformatted'

# Conflicts:
#   sen2chain/download_and_process.py
parents dc0f1e24 ebf34809
No related branches found
No related tags found
No related merge requests found
...@@ -40,7 +40,7 @@ class Automatization: ...@@ -40,7 +40,7 @@ class Automatization:
def __init__(self) -> None: def __init__(self) -> None:
self._df = None self._df = None
self._tiles_to_update = dict() self._tiles_to_update = dict()
self._products_list = {"hubs": {}, "aws": {}} self._products_list = {}
if not self._csv_path.exists(): if not self._csv_path.exists():
logger.info("Creating tiles_to_watch file") logger.info("Creating tiles_to_watch file")
...@@ -190,8 +190,7 @@ class Automatization: ...@@ -190,8 +190,7 @@ class Automatization:
) )
request.from_tiles([tile]) request.from_tiles([tile])
self._products_list["hubs"].update(request.products_list["hubs"]) self._products_list.update(request.products_list)
self._products_list["aws"].update(request.products_list["aws"])
@staticmethod @staticmethod
def _get_ignored_tiles(self) -> np.ndarray: def _get_ignored_tiles(self) -> np.ndarray:
...@@ -235,7 +234,6 @@ class Automatization: ...@@ -235,7 +234,6 @@ class Automatization:
prods = DownloadAndProcess( prods = DownloadAndProcess(
identifiers=self._products_list, identifiers=self._products_list,
hubs_limit=hubs_limit, hubs_limit=hubs_limit,
aws_limit=2,
process_products=process_products, process_products=process_products,
max_processes=3, max_processes=3,
indices_list=indices_list, indices_list=indices_list,
...@@ -273,7 +271,7 @@ class Automatization: ...@@ -273,7 +271,7 @@ class Automatization:
@property @property
def products(self) -> Dict[str, dict]: def products(self) -> Dict[str, dict]:
"""Returns the products to download on AWS and on hubs.""" """Returns the products to download."""
return self._products_list return self._products_list
......
...@@ -64,13 +64,6 @@ class DataRequest: ...@@ -64,13 +64,6 @@ class DataRequest:
land_only=True).from_tiles(["40KCB", "40KEC"]) land_only=True).from_tiles(["40KCB", "40KEC"])
""" """
# Since "requester pays" was enabled on AWS for the Sentinel-2 L1C dataset
# (products are no longer free), downloading non-tiled products on AWS
# doesn't work anymore.
# Therefore, it's useless to make a complex request to separate non-tiled
# and tiled products.
# This class needs to be refactored.
# Proxy settings # Proxy settings
proxy_http_url = Config().get("proxy_http_url").strip() proxy_http_url = Config().get("proxy_http_url").strip()
proxy_https_url = Config().get("proxy_https_url").strip() proxy_https_url = Config().get("proxy_https_url").strip()
...@@ -105,7 +98,7 @@ class DataRequest: ...@@ -105,7 +98,7 @@ class DataRequest:
self.land_only = land_only self.land_only = land_only
self.tiles_to_keep = None self.tiles_to_keep = None
self.tiles_to_keep_geom = dict() self.tiles_to_keep_geom = dict()
self.products_list = {"aws": {}, "hubs": {}} self.products_list = {}
self.cloudcoverpercentage = ( self.cloudcoverpercentage = (
cloud_cover_percentage if cloud_cover_percentage else (0, 100) cloud_cover_percentage if cloud_cover_percentage else (0, 100)
) )
...@@ -198,146 +191,18 @@ class DataRequest: ...@@ -198,146 +191,18 @@ class DataRequest:
return self.products_list return self.products_list
def _make_request(self) -> None: def _make_request(self) -> None:
"""Will call the right request method depending on products""" """Scihub API request using sentinelsat."""
logger.debug("_make_request") logger.debug("_make_request")
logger.info( logger.info(
"Requesting images ranging from {} to {}".format( f"Requesting images ranging from {self.start_date} to {self.end_date}"
self.start_date, self.end_date
)
) )
if self.tiles_to_keep is None: if self.tiles_to_keep is None:
raise ValueError("Query tiles not provided") raise ValueError("Query tiles not provided")
# reset products_list
# should the products_list be updated or erased for each new request ?
self.products_list = {"aws": {}, "hubs": {}}
tileddate = str_to_datetime("2016-11-01", "ymd")
if self.start_date > tileddate:
self._make_request_tiled_only()
else:
self._make_request_not_tiled()
def _make_request_not_tiled(self) -> None:
"""Scihub API request using sentinelsat. This method is called if
products are a mix of tiled and non-tiled products."""
logger.debug("_make_request_not_tiled")
print("Tile:", self.tiles_to_keep)
products_from_hubs = dict()
products_from_aws = dict()
# query by group of 3 tiles, otherwise getting error message
# "Request URI too long" from scihub
for tiles_to_keep_triplet, tiles_to_keep_triplet_geom in zip(
grouper(self.tiles_to_keep, 3),
grouper(self.tiles_to_keep_geom.values(), 3),
):
tiles_to_keep = [tile for tile in tiles_to_keep_triplet if tile]
tiles_to_keep_geom = [
geom for geom in tiles_to_keep_triplet_geom if geom
]
print(tiles_to_keep)
# build a multipolygon from tiles geom
query_geom = MultiPolygon(tiles_to_keep_geom)
logging.debug("query geometry:\n{}".format(query_geom))
# scihub request
products = OrderedDict()
products = self.api.query(
query_geom,
date=(self.start_date, self.end_date),
order_by="+endposition",
platformname="Sentinel-2",
producttype="S2MSI1C",
cloudcoverpercentage=self.cloudcoverpercentage,
)
# save products list as a pandas dataframe
products_df = self.api.to_dataframe(products)
if products_df.empty:
return
# a products dictionary for each server (AWS vs hubs)
# fill each dictionary depending on the acquisition date
for index, row in products_df[
["title", "beginposition", "footprint"]
].iterrows():
# start date of the tiled S2 collection on the scihub server
tileddate = str_to_datetime("2016-11-01", "ymd")
img_title = row[0]
img_date = row[1].to_pydatetime()
img_footprint = loads(row[2])
for tile_name, tile_geom in self.tiles_to_keep_geom.items():
# in case of duplicates on the server
if (
img_title not in self.products_list["hubs"].keys()
and img_title not in self.products_list["aws"].keys()
):
# tiled products are downloaded on hubs
if re.match(r".*_T[0-9]{2}[A-Z]{3}_.*", img_title):
if tile_name in img_title:
self.products_list["hubs"][img_title] = {
"date": img_date,
"tile": tile_name,
}
continue
else:
continue
# non-tiled products will be downloaded on aws
else:
if tile_geom.intersects(img_footprint):
self.products_list["aws"][img_title] = {
"date": img_date,
"tile": tile_name,
}
# pprint dicts in chronological order
print("\nFrom AWS")
pprint(
list(
OrderedDict(
sorted(
self.products_list["aws"].items(),
key=lambda t: t[1]["date"],
)
)
)
)
print("\nFrom hubs")
pprint(
list(
OrderedDict(
sorted(
self.products_list["hubs"].items(),
key=lambda t: t[1]["date"],
)
)
)
)
# Tiled products request (lighter)
def _make_request_tiled_only(self) -> None:
"""Scihub API request using sentinelsat. This method is called for
tiled products only."""
logger.debug("_make_request_tiled_only")
print("Sentinel2 tiles:\n", self.tiles_to_keep) print("Sentinel2 tiles:\n", self.tiles_to_keep)
products_from_hubs = dict() products = dict()
products_from_aws = dict()
# remove water-only tiles when land-only parameter is enabled # remove water-only tiles when land-only parameter is enabled
if self.land_only: if self.land_only:
...@@ -361,7 +226,7 @@ class DataRequest: ...@@ -361,7 +226,7 @@ class DataRequest:
products = OrderedDict() products = OrderedDict()
for tile in self.tiles_to_keep: for tile in self.tiles_to_keep:
kw = query_kwargs.copy() kw = query_kwargs.copy()
kw["filename"] = "*_T{}_*".format(tile) kw["filename"] = f"*_T{tile}_*"
pp = self.api.query(**kw) pp = self.api.query(**kw)
products.update(pp) products.update(pp)
...@@ -370,23 +235,21 @@ class DataRequest: ...@@ -370,23 +235,21 @@ class DataRequest:
if products_df.empty: if products_df.empty:
return return
# a products dictionary for each server (AWS vs hubs)
# fill each dictionary depending on the acquisition date # fill each dictionary depending on the acquisition date
for index, row in products_df[["title", "beginposition"]].iterrows(): for index, row in products_df[["title", "beginposition"]].iterrows():
img_title = row[0] img_title = row[0]
img_date = row[1].to_pydatetime() img_date = row[1].to_pydatetime()
self.products_list["hubs"][img_title] = { self.products_list[img_title] = {
"date": img_date, "date": img_date,
"tile": re.findall("_T([0-9]{2}[A-Z]{3})_", img_title)[0], "tile": re.findall("_T([0-9]{2}[A-Z]{3})_", img_title)[0],
} }
# pprint dicts in chronological order # pprint dicts in chronological order
print("\nFrom hubs")
pprint( pprint(
list( list(
OrderedDict( OrderedDict(
sorted( sorted(
self.products_list["hubs"].items(), self.products_list.items(),
key=lambda t: t[1]["date"], key=lambda t: t[1]["date"],
) )
) )
......
...@@ -13,7 +13,6 @@ from datetime import datetime ...@@ -13,7 +13,6 @@ from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from collections import defaultdict from collections import defaultdict
from sentinelsat import SentinelAPI from sentinelsat import SentinelAPI
# from sentinelhub import AwsProductRequest
from pprint import pprint from pprint import pprint
# type annotations # type annotations
...@@ -31,38 +30,6 @@ logger = logging.getLogger(__name__) ...@@ -31,38 +30,6 @@ logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
# FIXME: AWS download no longer available
def download_aws(identifier: str, tile: str, hub: str) -> Tuple[str, bool]:
"""Downloads L1C safe from AWS using sentinelhub package.
:param identifier:
:param tile:
:param hub:
"""
logger.debug("download_aws {}".format(identifier))
temp_container = TempContainer(identifier, tile)
temp_container.create_temp_folder()
# downloaded = True
if (
not temp_container.l1c.path.exists()
and not temp_container.l1c.in_library
):
product_request = AwsProductRequest(
product_id=identifier,
tile_list=["T" + tile],
data_folder=str(temp_container.temp_path),
safe_format=True,
)
product_request.save_data()
# archive_l1c(identifier, tile, hub)
# if not temp_container.l1c.path.exists():
# downloaded = False
return identifier, hub
def download_peps( def download_peps(
identifier: str, tile: str, hub: str identifier: str, tile: str, hub: str
) -> Tuple[str, str, str, bool]: ) -> Tuple[str, str, str, bool]:
...@@ -214,7 +181,6 @@ class DownloadAndProcess: ...@@ -214,7 +181,6 @@ class DownloadAndProcess:
:param identifiers: a dictionary containing product identifiers as keys. :param identifiers: a dictionary containing product identifiers as keys.
:param hubs_limit: hub name as key, max of downloading threads as value. :param hubs_limit: hub name as key, max of downloading threads as value.
:param aws_limit: max of downloading threads.
:param process_products: process products after download. :param process_products: process products after download.
:param max_processes: number of parallel processes. :param max_processes: number of parallel processes.
:param indices_list: list of valid indices names that will be processed. :param indices_list: list of valid indices names that will be processed.
...@@ -233,7 +199,6 @@ class DownloadAndProcess: ...@@ -233,7 +199,6 @@ class DownloadAndProcess:
self, self,
identifiers: Dict[str, dict], identifiers: Dict[str, dict],
hubs_limit: dict = None, hubs_limit: dict = None,
aws_limit: int = None,
process_products: bool = False, process_products: bool = False,
max_processes: int = 2, max_processes: int = 2,
indices_list: list = [], indices_list: list = [],
...@@ -252,18 +217,6 @@ class DownloadAndProcess: ...@@ -252,18 +217,6 @@ class DownloadAndProcess:
else: else:
self.hubs_limit = hubs_limit self.hubs_limit = hubs_limit
if aws_limit is None:
self.aws_limit = 3
logger.debug("aws_limit set to: {}".format(self.aws_limit))
else:
if aws_limit > 3:
self.aws_limit = 3
logger.warning(
"aws limit too high, using default: {}".format(
self.aws_limit
)
)
if not isinstance(process_products, bool): if not isinstance(process_products, bool):
raise ValueError("process_product must be either True or False") raise ValueError("process_product must be either True or False")
else: else:
...@@ -339,7 +292,7 @@ class DownloadAndProcess: ...@@ -339,7 +292,7 @@ class DownloadAndProcess:
while len(tasks) < limit and self.queue.qsize() > 0: while len(tasks) < limit and self.queue.qsize() > 0:
logger.debug("tasks < limit") logger.debug("tasks < limit")
item = await self.queue.get() item = await self.queue.get()
tile = self.identifiers["hubs"][item]["tile"] tile = self.identifiers[item]["tile"]
task = asyncio.ensure_future( task = asyncio.ensure_future(
self.downloader_hubs(item, tile, hub) self.downloader_hubs(item, tile, hub)
) )
...@@ -412,25 +365,6 @@ class DownloadAndProcess: ...@@ -412,25 +365,6 @@ class DownloadAndProcess:
return (identifier, tile, hub) return (identifier, tile, hub)
# FIXME: AWS download no longer available
async def downloader_aws(
self, identifier: str, tile: str, hub: str
) -> Tuple[str, str, str]:
"""coroutine appelant la coroutine de téléchargement sur aws"""
logger.debug("downloader_aws {}".format(identifier))
async with asyncio.Semaphore(self.aws_limit):
fut = self.loop.run_in_executor(
self.threads_executor,
functools.partial(download_aws, identifier, tile, hub),
)
await fut
logger.info(
"--> --> {} downloaded from {}".format(identifier, hub)
)
return (identifier, tile, hub)
async def archive_l1c( async def archive_l1c(
self, identifier: str, tile: str, hub: str self, identifier: str, tile: str, hub: str
) -> Tuple[str, str, str]: ) -> Tuple[str, str, str]:
...@@ -479,38 +413,13 @@ class DownloadAndProcess: ...@@ -479,38 +413,13 @@ class DownloadAndProcess:
return identifier, tile, hub return identifier, tile, hub
# FIXME: AWS download no longer available
async def download_process_aws(
self, identifier: str
) -> Tuple[str, str, str]:
"""
Coroutine for downloading and processing products from AWS.
:param identifier:
"""
logger.info("download_process_aws")
tile = self.identifiers["aws"][identifier]["tile"]
downloads = await asyncio.ensure_future(
self.downloader_aws(identifier, tile, "aws")
)
if self.process_products:
fut = self.process(*downloads)
# if fut.cancelled():
# return fut
await fut
return downloads
async def main(self) -> None: async def main(self) -> None:
"""Main coroutine. Starts proxies and fills the queue.""" """Main coroutine. Starts proxies and fills the queue."""
logger.debug("main") logger.debug("main")
identifiers_hubs = self.identifiers
identifiers_aws = self.identifiers["aws"]
identifiers_hubs = self.identifiers["hubs"]
print("Tiled: ", len(identifiers_hubs)) print("Tiled: ", len(identifiers_hubs))
print("Non tiled: ", len(identifiers_aws))
tasks = [] tasks = []
# on lance les proxies # on lance les proxies
...@@ -530,14 +439,6 @@ class DownloadAndProcess: ...@@ -530,14 +439,6 @@ class DownloadAndProcess:
# on attend que tout se termine # on attend que tout se termine
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
# on lance les DL aws sur la bonne liste
if identifiers_aws:
aws = [
asyncio.ensure_future(self.download_process_aws(identifier))
for (identifier, data) in identifiers_aws.items()
]
await asyncio.gather(*aws)
# shutting down executors # shutting down executors
try: try:
self.threads_executor.shutdown(wait=True) self.threads_executor.shutdown(wait=True)
...@@ -560,11 +461,7 @@ class DownloadAndProcess: ...@@ -560,11 +461,7 @@ class DownloadAndProcess:
try: try:
self.loop.run_until_complete(self.main()) self.loop.run_until_complete(self.main())
end_time = datetime.now() - start_time end_time = datetime.now() - start_time
total_products = ( total_products = len(self.identifiers) - len(self.failed_products)
len(self.identifiers["aws"])
+ len(self.identifiers["hubs"])
- len(self.failed_products)
)
logger.info( logger.info(
"Downloaded and processed {} file(s) in {}".format( "Downloaded and processed {} file(s) in {}".format(
total_products, end_time total_products, end_time
......
...@@ -1043,10 +1043,6 @@ class L2aProduct(Product): ...@@ -1043,10 +1043,6 @@ class L2aProduct(Product):
def thin_cirrus_percentage(self): def thin_cirrus_percentage(self):
return self._get_metadata_value(key="THIN_CIRRUS_PERCENTAGE") return self._get_metadata_value(key="THIN_CIRRUS_PERCENTAGE")
@property
def cloud_coverage_percentage(self):
return self._get_metadata_value(key="CLOUD_COVERAGE_PERCENTAGE")
# FOOTPRINT # FOOTPRINT
@property @property
def footprint(self) -> List[Tuple[float, float]]: def footprint(self) -> List[Tuple[float, float]]:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment