# -*- coding: utf-8 -*-

"""
Module for automatized downloading and processing of new Sentinel-2 images.
"""

import logging
from pathlib import Path
from datetime import datetime, timedelta
# type annotations
from typing import List, Optional

import pandas as pd
import numpy as np

from .config import Config
from .data_request import DataRequest
from .download_and_process import DownloadAndProcess
from .time_series import TimeSeries
from .library import Library
from .tiles import Tile


logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)


class Automatization:
    """Watches a list of Sentinel-2 tiles and automates the download and
    processing of the new products found for them.

    The watched tiles, their date ranges, ignore flags and tags are
    persisted in the tiles-to-watch CSV file defined in the configuration.
    """
    _csv_path = Config().tiles_to_watch

    def __init__(self) -> None:
        """Loads the tiles database from the CSV file if it exists,
        otherwise builds it from the L1C library.
        """
        self._df = None
        self._tiles_to_update = dict()
        self._products_list = {"hubs": {}, "aws": {}}

        if not self._csv_path.exists():
            self._init()
        else:
            self._read_csv()
            self._update_df()


    def _init(self):
        """Creates the tiles database from the L1C library and saves it to CSV.
        """
        self._create_df()
        self._load_library()
        self._save_csv()


    def _create_df(self):
        """Creates the tiles database as a pandas DataFrame indexed by tile name.
        """
        self._df = pd.DataFrame(index=Library().l1c,
                                columns=["start_date",
                                         "last_date",
                                         "ignore",
                                         "tags"])
        self._df.index.name = "tile"
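
    # The persisted CSV therefore looks like this (tile names and values
    # are hypothetical; a non-empty "ignore" field excludes the tile from
    # updates and "tags" is a whitespace-separated list):
    #
    #   tile,start_date,last_date,ignore,tags
    #   31TFJ,2019-01-01,2019-03-05,,vineyard south
    #   40KCB,,,x,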


    def _read_csv(self):
        """Loads the tiles database from the CSV file.
        """
        self._df = pd.read_csv(self._csv_path,
                               sep=",",
                               converters={"tile": str.strip,
                                           "start_date": str.strip,
                                           "last_date": str.strip,
                                           "ignore": str.strip,
                                           "tags": str.strip},
                               index_col="tile",
                               na_values="")

        self._df["start_date"] = pd.to_datetime(self._df["start_date"], format="%Y-%m-%d")
        self._df["last_date"] = pd.to_datetime(self._df["last_date"], format="%Y-%m-%d")
        # replace() on the whole DataFrame misbehaved here, so empty
        # strings are turned into NaN column by column instead:
        for c in self._df.select_dtypes(include=["object"]).columns:
            self._df[c] = self._df[c].replace(r"^\s*$", np.nan, regex=True)


    def _save_csv(self):
        """Writes the tiles database back to the CSV file.
        """
        logger.info("Saving database")
        self._df.to_csv(str(self._csv_path))


    def _load_library(self):
        """Adds to the database any tile found in the L1C library that is
        not already watched.
        """
        logger.info("Scanning L1C library")
        for tile in Library().l1c:
            if tile not in self._df.index:
                self._df.loc[tile] = [None, None, None, None]
        self._update_df()


    def _update_df(self):
        """Refreshes the last acquisition date of every non-ignored tile.
        """
        logger.info("Updating database")
        for tile in self._df.index:
            if not pd.isnull(self._df.loc[tile, "ignore"]):
                continue
            self.get_tile_last_date(tile)


    def get_tile_last_date(self, tile):
        """Stores the date of the latest L1C product available for a tile.
        """
        self._df.loc[tile, "last_date"] = Tile(tile).l1c.last.date


    def get_tile_request_date(self, tile):
        """Returns the date from which to request new products for a tile:
        the day after the most recent of start_date and last_date, or None
        if neither is set.
        """
        start_date = self._df.loc[tile, "start_date"]
        last_date = self._df.loc[tile, "last_date"]

        if pd.isnull(start_date) and pd.isnull(last_date):
            request_date = None
        elif pd.isnull(last_date):
            request_date = start_date
        elif pd.isnull(start_date):
            request_date = last_date
        else:
            request_date = max(start_date, last_date)

        # request products from the day after the reference date
        request_date = request_date + timedelta(days=1) if request_date else None
        return request_date
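
    # Worked example (hypothetical dates): with start_date = 2019-01-01 and
    # last_date = 2019-03-05, the most recent date wins and products are
    # requested from 2019-03-06 onwards; when both dates are missing, the
    # request is made without a date constraint.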


    def _get_tiles_to_update(self, tiles_list=None):
        """Computes the request date of each tile to update. Operates on
        the whole database when no tiles list is provided.
        """
        if not tiles_list:
            tiles = self._df.index
        else:
            tiles = tiles_list

        for tile in tiles:
            if not pd.isnull(self._df.loc[tile, "ignore"]):
                continue
            request_date = self.get_tile_request_date(tile)
            self._tiles_to_update[tile] = request_date


    @staticmethod
    def _five_days_since_date(date):
        """Returns True when enough time has passed since `date` to expect
        a new acquisition. `date` is a request date, i.e. already one day
        after the last known product, so 4 days here means 5 days since
        that product: the Sentinel-2 revisit time.
        """
        time_period = datetime.today() - date
        if time_period.days < 4:
            return False
        return True


    def _get_products_list(self) -> None:
        """Queries the data hubs for each tile to update and merges the
        per-tile results into a single products list.
        """
        for tile, request_date in self._tiles_to_update.items():

            # Don't overload the server with useless requests:
            # the Sentinel-2 revisit time is 5 days.
            if request_date and not Automatization._five_days_since_date(request_date):
                logger.info("Too early to check {}".format(tile))
                continue

            logger.info("Checking tile: {}".format(tile))
            request = DataRequest(start_date=request_date,
                                  end_date=None)
            request.from_tiles([tile])

            self._products_list["hubs"].update(request.products_list["hubs"])
            self._products_list["aws"].update(request.products_list["aws"])


    def get_tiles_from_tags(self, tags=()):
        """Returns the set of tiles whose tags column contains at least one
        of the requested tags.
        """
        tiles_set = set()
        for tile in self._df.index:
            tile_tags = self._df.loc[tile, "tags"]
            if not pd.isnull(tile_tags):
                for tag in tags:
                    if tag in tile_tags.split():
                        tiles_set.add(tile)
        return tiles_set
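
    # Example (hypothetical tags): a tile whose tags column holds
    # "vineyard south" is returned for tags=("south",) but not for
    # tags=("sou",), since tags are whitespace-split and matched whole.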


    def _get_ignored_tiles(self):
        """Returns the tiles flagged as ignored in the database.
        """
        return self._df[self._df["ignore"].notna()].index.values


    def run(self,
            tiles: Optional[List[str]] = None, process_products: bool = False,
            indices_list: Optional[List[str]] = None,
            nodata_clouds: bool = True, quicklook: bool = True) -> None:
        """Checks the watched tiles for new products, then downloads and
        optionally processes them.
        """
        logger.info("Running automatization")
        logger.info("Ignored tiles: {}".format(self._get_ignored_tiles()))
        self._get_tiles_to_update(tiles_list=tiles)
        self._get_products_list()
        if any(self._products_list.values()):
            prods = DownloadAndProcess(identifiers=self._products_list,
                                       hubs_limit={"peps": 3, "scihub": 2},
                                       aws_limit=2,
                                       process_products=process_products,
                                       max_processes=3,
                                       indices_list=indices_list or [],
                                       nodata_clouds=nodata_clouds,
                                       quicklook=quicklook)
            failed = prods.failed_products
            if failed:
                logger.error("Failed products: {}".format(failed))
            self._update_df()
            self._save_csv()


    @property
    def data(self):
        return self._df


    @property
    def products(self):
        return self._products_list
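

# Minimal usage sketch (assuming the configuration points at a valid L1C
# library and tiles-to-watch CSV; "31TFJ" is a hypothetical tile name):
#
#   auto = Automatization()
#   auto.run(tiles=["31TFJ"], process_products=True, indices_list=["NDVI"])
#   auto.data      # the tiles-to-watch DataFrame
#   auto.products  # the products found on the hubs and on AWS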


class TimeSeriesAutomatization:
    """Automates the extraction of time series for every vector file found
    in the time series folder.

    """
    _time_series_path = Path(Config().get("time_series_path"))

    def __init__(self) -> None:
        """Scans the time series folder for vector files.
        """
        self._vectors_files = list()
        self._list_files()

    def _list_files(self) -> None:
        """Lists the geojson and shapefile files found in the time series folder.
        """
        valid_types = ("*.geojson", "*.shp")
        for valid_type in valid_types:
            self._vectors_files.extend(list(self._time_series_path.glob(valid_type)))

    def run(self, indices: Optional[List[str]] = None) -> None:
        """Computes a time series for each vector file and saves it to CSV.
        """
        for vectors_file in self._vectors_files:
            logger.info("Processing: {}".format(vectors_file.name))
            # default to NDVI when no index is requested
            ts = TimeSeries(date_min=None,
                            date_max=None,
                            vectors_file=str(vectors_file),
                            indices=indices or ["NDVI"])
            ts.to_csv()


#TimeSeriesAutomatization().run()
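
# Minimal usage sketch (assuming "time_series_path" is set in the
# configuration and contains at least one .geojson or .shp file):
#
#   tsa = TimeSeriesAutomatization()
#   tsa.run(indices=["NDVI"])   # writes one CSV per vector file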