"README.md" did not exist on "62619e4d80d03fc9bdbe8ca3215fda675bec8dcb"
Newer
Older
"""
Module for automated downloading, processing and time series computation of
new Sentinel-2 images.
"""
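# Typical use, as a minimal sketch (assuming this module is importable as
# sen2chain.automatization; the tiles_to_watch file is created on first run):
#
#     from sen2chain.automatization import Automatization
#
#     autom = Automatization()
#     autom.run(process_products=True, indices_list=["NDVI"])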
import time
import logging
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from typing import Sequence, List, Set, Dict, Union, Tuple
from .config import Config
from .data_request import DataRequest
from .download_and_process import DownloadAndProcess
from .time_series import TimeSeries
from .library import Library
from .tiles import Tile
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
class Automatization:
"""Class to automate the downloading and processing of Sentinel-2 images.
At first launch it will scan L1C tiles in the library, create a new CSV
file in ``~/sen2chain/config/tiles_to_watch.csv``, and update the file.
If the CSV file already exists, it will read it and download and process
new images for the tiles listed in that file.
_csv_path = Config().tiles_to_watch
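    # The watch file holds one row per tile: "start_date" and "last_date"
    # bound the next request, "ignore" excludes a tile from updates, and
    # "tags" carries free-form labels used by get_tiles_from_tags().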
def __init__(self) -> None:
self._df = None
self._tiles_to_update = dict()
self._products_list = {"hubs": {}, "aws": {}}
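        # Candidate products are tracked per download source: DataRequest
        # returns one dict for the scientific hubs and one for AWS, which
        # are merged tile by tile in _get_products_list().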
        if not self._csv_path.exists():
            logger.info("Creating tiles_to_watch file")
            self._create_df()
        else:
            logger.info("tiles_to_watch found")
            self._read_csv()

        self._load_library()
        self._save_csv()
def _create_df(self) -> None:
"""Creates a pandas dataframe."""
self._df = pd.DataFrame(
index=Library().l1c,
columns=["start_date", "last_date", "ignore", "tags"],
)
def _read_csv(self) -> None:
"""Reads the csv file."""
self._df = pd.read_csv(
self._csv_path,
sep=",",
converters={
"tile": str.strip,
"start_date": str.strip,
"last_date": str.strip,
"ignore": str.strip,
"tags": str.strip,
},
index_col="tile",
na_values="",
)
self._df["start_date"] = pd.to_datetime(
self._df["start_date"], format="%Y-%m-%d"
)
self._df["last_date"] = pd.to_datetime(
self._df["last_date"], format="%Y-%m-%d"
)
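        # The column converters take precedence over na_values, so blank
        # cells arrive as empty strings; normalize them to NaN so the
        # pd.isnull() checks used elsewhere behave as expected.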
self._df.replace(r"^\s*$", np.nan, regex=True, inplace=True)
def _save_csv(self) -> None:
"""Saves the dataframe to CSV."""
logger.info("Saving database")
self._df.to_csv(str(self._csv_path))
def _load_library(self) -> None:
"""Loads library L1C tiles in the dataframe."""
logger.info("Scanning L1C library")
for tile in Library().l1c:
if tile not in self._df.index:
self._df.loc[tile] = [None, None, None, None]
self._update_df()
def _update_df(self) -> None:
"""Updates dataframe's tiles last dates."""
logger.info("Updating database")
        for tile in self._df.index:
            # Tiles flagged in the "ignore" column are left untouched.
            if not pd.isnull(self._df.loc[tile, "ignore"]):
                continue
            self._get_tile_last_date(tile)
def _get_tile_last_date(self, tile: str) -> None:
"""Updates a dataframe's tile last date.
:param tile: tile which the date will be updated.
"""
self._df.loc[tile, "last_date"] = Tile(tile).l1c.last.date
    def _get_tile_request_date(self, tile: str) -> Union[datetime, None]:
        """For a tile, returns the date that will be used for the request.

        :param tile: tile for which the request date will be computed.
        """
        start_date = self._df.loc[tile, "start_date"]
        last_date = self._df.loc[tile, "last_date"]

        request_date = None
        if not pd.isnull(start_date) and pd.isnull(last_date):
            request_date = start_date
        elif pd.isnull(start_date) and not pd.isnull(last_date):
            request_date = last_date
        elif not pd.isnull(start_date) and not pd.isnull(last_date):
            request_date = start_date if start_date > last_date else last_date

        # Request from the day after the reference date so the latest product
        # already in the library is not downloaded again.
        if request_date:
            request_date = request_date + timedelta(days=1)
        return request_date
def _get_tiles_to_update(self, tiles_list: List[str] = None) -> None:
"""Get the list of the tiles to update and assign a request date
for each one.
:param tiles_list: tiles to add.
"""
if not tiles_list:
tiles = self._df.index
else:
tiles = tiles_list
        for tile in tiles:
            if not pd.isnull(self._df.loc[tile, "ignore"]):
                continue
            request_date = self._get_tile_request_date(tile)
            self._tiles_to_update[tile] = request_date
    @staticmethod
    def _ndays_since_date(date: datetime, ndays: int) -> bool:
        """Checks if at least ndays have passed since the date.

        :param date: reference date.
        :param ndays: number of days.
        """
        time_period = datetime.now() - date
        if time_period.days < ndays:
            return False
        return True
    def _get_products_list(self, revisit_period: int) -> None:
        """Merges each tile's request dict into a single one."""
for tile, request_date in self._tiles_to_update.items():
# Don't overload the server with useless requests :
# Sentinel-2 revisit time is 5 days
if request_date:
if not Automatization._ndays_since_date(
request_date, revisit_period
):
logger.info("Too early to check {}".format(tile))
continue
logger.info("Checking tile: {}".format(tile))
request = DataRequest(
start_date=request_date if request_date else None,
end_date=None,
)
request.from_tiles([tile])
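            # from_tiles() populates request.products_list; fold this tile's
            # results into the global per-source dicts.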
self._products_list["hubs"].update(request.products_list["hubs"])
self._products_list["aws"].update(request.products_list["aws"])
    def _get_ignored_tiles(self) -> np.ndarray:
"""Returns ignored tiles."""
return self._df[self._df["ignore"].notna()].index.values
def run(
self,
tiles: List[str] = None,
process_products: bool = False,
indices_list: List[str] = None,
nodata_clouds: bool = True,
quicklook: bool = True,
hubs_limit: Dict[str, int] = None,
revisit_period: int = 2,
) -> None:
        """Runs the automatization.

        :param tiles: tiles that will be updated. If None, all the
            non-ignored tiles are updated.
        :param process_products: process products after download.
        :param indices_list: list of valid indices names that will be processed.
        :param nodata_clouds: mask indices output rasters with a cloud-mask.
        :param quicklook: creates a quicklook for each indice processed.
        :param hubs_limit: maximum number of concurrent downloads per hub.
        :param revisit_period: number of days, since the last date, before
            checking again for new images.
        """
logger.info("Running automatization")
logger.info("Ignored tiles: {}".format(self._get_ignored_tiles(self)))
if tiles is None:
tiles = []
if indices_list is None:
indices_list = []
if hubs_limit is None:
hubs_limit = {"peps": 3, "scihub": 2}
        self._get_tiles_to_update(tiles_list=tiles)
self._get_products_list(revisit_period)
prods = DownloadAndProcess(
identifiers=self._products_list,
hubs_limit=hubs_limit,
aws_limit=2,
process_products=process_products,
max_processes=3,
indices_list=indices_list,
nodata_clouds=nodata_clouds,
quicklook=quicklook,
)
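        # DownloadAndProcess drives the downloads (and optional processing)
        # at instantiation; products that could not be retrieved are listed
        # in failed_products afterwards.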
        failed = prods.failed_products
        if failed:
            logger.warning("Failed products: {}".format(failed))

        # When working on a network storage, pause the process in order to
        # give the filesystem time to register the new files (lags).
time.sleep(2)
self._update_df()
self._save_csv()
    def get_tiles_from_tags(self, tags: Sequence[str] = ()) -> Set[str]:
        """Returns the set of tiles whose tags field contains any of the
        given tags.

        :param tags: tags.
        """
        tiles_set = set()
        for tile in self._df.index:
            tile_tags = self._df.loc[tile, "tags"]
            if not pd.isnull(tile_tags):
                for tag in tags:
                    if tag in tile_tags.split():
                        tiles_set.add(tile)
        return tiles_set
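    # Example (hypothetical tag value): if a tile's "tags" cell contains
    # "coastal north", get_tiles_from_tags(("coastal",)) returns that tile.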
    @property
    def data(self) -> pd.DataFrame:
        """Returns the automatization dataframe."""
        return self._df

    @property
    def products(self) -> Dict[str, dict]:
        """Returns the products to download on AWS and on hubs."""
        return self._products_list
class TimeSeriesAutomatization:
    """Scans vector files in the time series folder and computes a time
    series extraction for each of them.
    """

    _time_series_path = Path(Config().get("time_series_path"))

    def __init__(self) -> None:
        self._vectors_files = list()  # type: List[Path]
        self._list_vectors_files()
def _list_vectors_files(self) -> None:
"""Scans vectors files found in the TIME_SERIES folder."""
valid_types = (
"*.geojson",
"*.gpkg",
"*.shp",
) # type: Tuple[str, ...]
        # Collect every supported vector format found in the folder.
        for valid_type in valid_types:
            self._vectors_files.extend(
                list(self._time_series_path.glob(valid_type))
            )
def run(self, indices: Sequence[str] = ("NDVI",)) -> None:
"""Computes time series extraction for each indice and for each
of the vectors file.
:param indices: list of valid indices names that will be processed.
"""
for vectors_file in self._vectors_files:
logger.info("Processing: {}".format(vectors_file.name))
ts = TimeSeries(
date_min=None,
date_max=None,
vectors_file=str(vectors_file),
indices=indices,
)
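
# Typical use, as a minimal sketch (assuming "time_series_path" is set in
# the sen2chain configuration and contains at least one vector file):
#
#     tsa = TimeSeriesAutomatization()
#     tsa.run(indices=("NDVI",))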