Skip to content
Snippets Groups Projects
automatization.py 10.5 KiB
Newer Older
# coding: utf-8
Jeremy Commins's avatar
Jeremy Commins committed

"""
Module for automatized downloading, processing and time series computing of new 
Sentinel-2 images.
Jeremy Commins's avatar
Jeremy Commins committed
"""
Jeremy Commins's avatar
Jeremy Commins committed
import logging
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
Jeremy Commins's avatar
Jeremy Commins committed
# type annotations
from typing import Sequence, List, Set, Dict, Union, Tuple
Jeremy Commins's avatar
Jeremy Commins committed

from .config import Config
Jeremy Commins's avatar
Jeremy Commins committed
from .data_request import DataRequest
from .download_and_process import DownloadAndProcess
from .time_series import TimeSeries
from .library import Library
from .tiles import Tile

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


class Automatization:
Jeremy Commins's avatar
Jeremy Commins committed
    """Class to automate the downloading and processing of Sentinel-2 images.
Jeremy Commins's avatar
Jeremy Commins committed

    At first launch it will scan L1C tiles in the library, create a new CSV
    file in ``~/sen2chain/config/tiles_to_watch.csv``, and update the file.
    If the CSV file already exists, it will read it and download and process
    new images for the tiles listed in that file.
Jeremy Commins's avatar
Jeremy Commins committed
    """
Jeremy Commins's avatar
Jeremy Commins committed
    _csv_path = Config().tiles_to_watch

    def __init__(self) -> None:
        self._df = None
        self._tiles_to_update = dict()
        self._products_list = {"hubs": {}, "aws": {}}

        if not self._csv_path.exists():
            logger.info("Creating tiles_to_watch file")
Jeremy Commins's avatar
Jeremy Commins committed
            self._init()
        else:
Jeremy Commins's avatar
Jeremy Commins committed
            self._read_csv()
            self._update_df()

Jeremy Commins's avatar
Jeremy Commins committed
    def _init(self) -> None:
        """First launch."""
Jeremy Commins's avatar
Jeremy Commins committed
        self._create_df()
        self._load_library()
        self._save_csv()

Jeremy Commins's avatar
Jeremy Commins committed
    def _create_df(self) -> None:
        """Creates a pandas dataframe."""
        self._df = pd.DataFrame(
            index=Library().l1c,
            columns=["start_date", "last_date", "ignore", "tags"],
        )
Jeremy Commins's avatar
Jeremy Commins committed
        self._df.index.name = "tile"

Jeremy Commins's avatar
Jeremy Commins committed
    def _read_csv(self) -> None:
        """Reads the csv file."""
        self._df = pd.read_csv(
            self._csv_path,
            sep=",",
            converters={
                "tile": str.strip,
                "start_date": str.strip,
                "last_date": str.strip,
                "ignore": str.strip,
                "tags": str.strip,
            },
            index_col="tile",
            na_values="",
        )

        self._df["start_date"] = pd.to_datetime(
            self._df["start_date"], format="%Y-%m-%d"
        )
        self._df["last_date"] = pd.to_datetime(
            self._df["last_date"], format="%Y-%m-%d"
        )
Jeremy Commins's avatar
Jeremy Commins committed
        # bug sur replace:
Jeremy Commins's avatar
Jeremy Commins committed
        self._df.replace(r"^\s*$", np.nan, regex=True, inplace=True)
Jeremy Commins's avatar
Jeremy Commins committed
        # -> workaround:
        # for c in self._df.select_dtypes(include=["object"]).columns:
        #     self._df[c] = self._df[c].replace(r"^\s*$", np.nan, regex=True, inplace=True)
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
    def _save_csv(self) -> None:
        """Saves the dataframe to CSV."""
Jeremy Commins's avatar
Jeremy Commins committed
        logger.info("Saving database")
        self._df.to_csv(str(self._csv_path))

Jeremy Commins's avatar
Jeremy Commins committed
    def _load_library(self) -> None:
        """Loads library L1C tiles in the dataframe."""
Jeremy Commins's avatar
Jeremy Commins committed
        logger.info("Scanning L1C library")
        for tile in Library().l1c:
            if tile not in self._df.index:
                self._df.loc[tile] = [None, None, None, None]
        self._update_df()

Jeremy Commins's avatar
Jeremy Commins committed
    def _update_df(self) -> None:
        """Updates dataframe's tiles last dates."""
Jeremy Commins's avatar
Jeremy Commins committed
        logger.info("Updating database")
        for tile in self._df.index:
            if not pd.isnull(self._df.loc[tile, "ignore"]):
                continue
Jeremy Commins's avatar
Jeremy Commins committed
            self._get_tile_last_date(tile)
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
    def _get_tile_last_date(self, tile: str) -> None:
        """Updates a dataframe's tile last date.

        :param tile: tile which the date will be updated.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        self._df.loc[tile, "last_date"] = Tile(tile).l1c.last.date

    def _get_tile_request_date(self, tile: str) -> Union[datetime, None]:
Jeremy Commins's avatar
Jeremy Commins committed
        """For a tile, returns the date that will be used for the request.

        :param tile: tile for which the request date will be computed.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        start_date = self._df.loc[tile, "start_date"]
        last_date = self._df.loc[tile, "last_date"]
        request_date = None
Jeremy Commins's avatar
Jeremy Commins committed

        if not pd.isnull(start_date) and pd.isnull(last_date):
Jeremy Commins's avatar
Jeremy Commins committed
            request_date = start_date
        elif pd.isnull(start_date) and not pd.isnull(last_date):
            request_date = last_date
        elif not pd.isnull(start_date) and not pd.isnull(last_date):
            request_date = start_date if start_date > last_date else last_date

        if request_date:
            request_date = request_date + timedelta(days=1)
Jeremy Commins's avatar
Jeremy Commins committed
        return request_date

    def _get_tiles_to_update(self, tiles_list: List[str] = None) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        """Get the list of the tiles to update and assign a request date
        for each one.

        :param tiles_list: tiles to add.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        if not tiles_list:
            tiles = self._df.index
        else:
            tiles = tiles_list

        for tile in tiles:
            if not pd.isnull(self._df.loc[tile, "ignore"]):
                continue
Jeremy Commins's avatar
Jeremy Commins committed
            request_date = self._get_tile_request_date(tile)
Jeremy Commins's avatar
Jeremy Commins committed
            self._tiles_to_update[tile] = request_date
        return None
Jeremy Commins's avatar
Jeremy Commins committed

    @staticmethod
    def _ndays_since_date(date: datetime, ndays: int) -> bool:
        """Checks if at least ndays have passed since the date.
Jeremy Commins's avatar
Jeremy Commins committed

        :param date: date to check.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        time_period = datetime.today() - date
Jeremy Commins's avatar
Jeremy Commins committed
            return False
        return True

    def _get_products_list(self, revisit_period) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        """Merge each request dict into a single one."""
Jeremy Commins's avatar
Jeremy Commins committed
        for tile, request_date in self._tiles_to_update.items():

            # Don't overload the server with useless requests :
            # Sentinel-2 revisit time is 5 days
            if request_date:
                if not Automatization._ndays_since_date(
                    request_date, revisit_period
                ):
Jeremy Commins's avatar
Jeremy Commins committed
                    logger.info("Too early to check {}".format(tile))
                    continue

            logger.info("Checking tile: {}".format(tile))
            request = DataRequest(
                start_date=request_date if request_date else None,
                end_date=None,
            )
Jeremy Commins's avatar
Jeremy Commins committed
            request.from_tiles([tile])

            self._products_list["hubs"].update(request.products_list["hubs"])
            self._products_list["aws"].update(request.products_list["aws"])

    @staticmethod
Jeremy Commins's avatar
Jeremy Commins committed
    def _get_ignored_tiles(self) -> np.ndarray:
        """Returns ignored tiles."""
Jeremy Commins's avatar
Jeremy Commins committed
        return self._df[self._df["ignore"].notna()].index.values

    def run(
        self,
        tiles: List[str] = None,
        process_products: bool = False,
        indices_list: List[str] = None,
        nodata_clouds: bool = True,
        quicklook: bool = True,
        hubs_limit: Dict[str, int] = None,
        revisit_period: int = 2,
    ) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        """
Jeremy Commins's avatar
Jeremy Commins committed
        Runs automatization.

        :param tiles: tiles that will be updated. If none, all the non ignored.
        :param process_products: process products after download.
        :param indices_list: list of valid indices names that will be processed.
        :param nodata_clouds: mask indices output rasters with a cloud-mask.
        :param quicklook: creates a quicklook for each indice processed.
        :param revisit_period: number of days, since last date, to check again
            for new images.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        logger.info("Running automatization")
        logger.info("Ignored tiles: {}".format(self._get_ignored_tiles(self)))

        if tiles is None:
            tiles = []
        if indices_list is None:
            indices_list = []
        if hubs_limit is None:
            hubs_limit = {"peps": 3, "scihub": 2}
Jeremy Commins's avatar
Jeremy Commins committed
        self._get_tiles_to_update(tiles_list=tiles)
Jeremy Commins's avatar
Jeremy Commins committed
        if any(self._products_list.values()):
            prods = DownloadAndProcess(
                identifiers=self._products_list,
                hubs_limit=hubs_limit,
                aws_limit=2,
                process_products=process_products,
                max_processes=3,
                indices_list=indices_list,
                nodata_clouds=nodata_clouds,
                quicklook=quicklook,
            )
Jeremy Commins's avatar
Jeremy Commins committed
            failed = prods.failed_products
            if failed:
                print(failed)
        # When working on a local network storage, pause the process in
        # order to let the file to be checked by the filesystem (lags).
        time.sleep(2)
Jeremy Commins's avatar
Jeremy Commins committed

    def get_tiles_from_tags(self, tags: Sequence[str] = ()) -> Set[str]:
Jeremy Commins's avatar
Jeremy Commins committed
        """
        Returns tiles corresponding to tags.
Jeremy Commins's avatar
Jeremy Commins committed

        :param tags: tags.
        """
        tiles_set = set()
        for tile in self._df.index:
            tile_tags = self._df.loc[tile, "tags"]
            if not pd.isnull(tile_tags):
                for tag in tags:
                    if tag in self._df.loc[tile, "tags"].split():
Jeremy Commins's avatar
Jeremy Commins committed
                        tiles_set.add(tile)
        return tiles_set

Jeremy Commins's avatar
Jeremy Commins committed
    @property
Jeremy Commins's avatar
Jeremy Commins committed
    def data(self) -> pd.DataFrame:
        """Returns the automatization dataframe."""
Jeremy Commins's avatar
Jeremy Commins committed
        return self._df

    @property
    def products(self) -> Dict[str, dict]:
Jeremy Commins's avatar
Jeremy Commins committed
        """Returns the products to download on AWS and on hubs."""
Jeremy Commins's avatar
Jeremy Commins committed
        return self._products_list


class TimeSeriesAutomatization:
Jeremy Commins's avatar
Jeremy Commins committed
    """Time series automatization.
Jeremy Commins's avatar
Jeremy Commins committed

    Scans vectors files in the Time Series folder and computes a time series
    extraction for each of the files.
Jeremy Commins's avatar
Jeremy Commins committed
    """
Jeremy Commins's avatar
Jeremy Commins committed
    _time_series_path = Path(Config().get("time_series_path"))

    def __init__(self) -> None:
        self._vectors_files = list()
Jeremy Commins's avatar
Jeremy Commins committed
        self._list_vectors_files()
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
    def _list_vectors_files(self) -> None:
        """Scans vectors files found in the TIME_SERIES folder."""
        valid_types = (
            "*.geojson",
            "*.gpkg",
            "*.shp",
        )  # type: Tuple[str, ...]
Jeremy Commins's avatar
Jeremy Commins committed
        for valid_type in valid_types:
            self._vectors_files.extend(
                list(self._time_series_path.glob(valid_type))
            )
Jeremy Commins's avatar
Jeremy Commins committed

    def run(self, indices: Sequence[str] = ("NDVI",)) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        """Computes time series extraction for each indice and for each
        of the vectors file.

        :param indices: list of valid indices names that will be processed.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        for vectors_file in self._vectors_files:
            logger.info("Processing: {}".format(vectors_file.name))
            ts = TimeSeries(
                date_min=None,
                date_max=None,
                vectors_file=str(vectors_file),
                indices=indices,
            )
Jeremy Commins's avatar
Jeremy Commins committed
            ts.to_csv()