Commit 313c820f authored by pascal.mouquet_ird.fr

Merge branch 'code-formatting' into 'master'

Format code with Black

See merge request espace-dev/sen2chain!5
parents 6d458acb a4711f0b
Pipeline #702 canceled
Showing with 5299 additions and 3329 deletions
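For reference, a formatting pass like this one is produced by running Black over the package. The exact invocation is not recorded in the commit, but the wrapping is consistent with a reduced line length, e.g. (options are an assumption):

    black --line-length 79 sen2chain/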
# -*- coding: utf-8 -*-
# coding: utf-8
# Copyright (C) 2018 Jeremy Commins <jebins@openmailbox.org>
# Copyright (C) 2018 Jeremy Commins <jebins@laposte.net>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
......@@ -21,18 +21,42 @@ This module lists all externally useful classes and functions.
from .config import Config
from .tiles import Tile
from .products import L1cProduct, L2aProduct, OldCloudMaskProduct, NewCloudMaskProduct, IndiceProduct
from .products import (
L1cProduct,
L2aProduct,
OldCloudMaskProduct,
NewCloudMaskProduct,
IndiceProduct,
)
from .library import Library
from .data_request import DataRequest
from .indices import IndicesCollection
from .download_and_process import DownloadAndProcess
from .time_series import TimeSeries
from .automatization import Automatization
from .utils import format_word, grouper, datetime_to_str, str_to_datetime, human_size_decimal, human_size, get_current_Sen2Cor_version
from .geo_utils import serialise_tiles_index, get_processed_indices_vect, crop_product_by_shp
from .multi_processing import l2a_multiprocessing, cld_version_probability_iterations_reprocessing_multiprocessing, idx_multiprocessing
from .utils import (
format_word,
grouper,
datetime_to_str,
str_to_datetime,
human_size_decimal,
human_size,
get_current_Sen2Cor_version,
)
from .geo_utils import (
serialise_tiles_index,
get_processed_indices_vect,
crop_product_by_shp,
)
from .multi_processing import (
l2a_multiprocessing,
cld_version_probability_iterations_reprocessing_multiprocessing,
idx_multiprocessing,
)
from .tileset import TileSet
from .jobs import Jobs, Job
__version__ = "0.7.0"
__author__ = "Jérémy Commins <jebins@openmailbox.org> & Impact <pascal.mouquet@ird.fr>"
__author__ = (
"Jérémy Commins <jebins@laposte.net> & Impact <pascal.mouquet@ird.fr>"
)
# -*- coding: utf-8 -*-
# coding: utf-8
"""
Module for automated downloading, processing and time series computation of new Sentinel-2 images.
Module for automated downloading, processing and time series computation of new
Sentinel-2 images.
"""
import time
import logging
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
# type annotations
from typing import Sequence, List, Set, Dict, Union, Tuple
......@@ -26,11 +29,12 @@ logging.basicConfig(level=logging.INFO)
class Automatization:
"""Class to automate the downloading and processing of Sentinel-2 images.
At first launch it will scan L1C tiles in the library, create a new CSV file
in ``~/sen2chain/config/tiles_to_watch.csv``, and update the file.
If the CSV file already exists, it will read it and download and process new images for
the tiles listed in that file.
At first launch it will scan L1C tiles in the library, create a new CSV
file in ``~/sen2chain/config/tiles_to_watch.csv``, and update the file.
If the CSV file already exists, it will read it and download and process
new images for the tiles listed in that file.
"""
_csv_path = Config().tiles_to_watch
def __init__(self) -> None:
......@@ -54,32 +58,39 @@ class Automatization:
def _create_df(self) -> None:
"""Creates a pandas dataframe."""
self._df = pd.DataFrame(index=Library().l1c,
columns=["start_date",
"last_date",
"ignore",
"tags"])
self._df = pd.DataFrame(
index=Library().l1c,
columns=["start_date", "last_date", "ignore", "tags"],
)
self._df.index.name = "tile"
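For orientation, the CSV that this dataframe is saved to and read back from has one row per tile with exactly these columns; the row below is purely illustrative:

    tile,start_date,last_date,ignore,tags
    40KCB,2018-01-10,2018-01-31,,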
def _read_csv(self) -> None:
"""Reads the csv file."""
self._df = pd.read_csv(self._csv_path,
sep=",",
converters={"tile": str.strip,
"start_date": str.strip,
"last_date": str.strip,
"ignore": str.strip,
"tags": str.strip},
index_col="tile",
na_values="")
self._df["start_date"] = pd.to_datetime(self._df["start_date"], format="%Y-%m-%d")
self._df["last_date"] = pd.to_datetime(self._df["last_date"], format="%Y-%m-%d")
self._df = pd.read_csv(
self._csv_path,
sep=",",
converters={
"tile": str.strip,
"start_date": str.strip,
"last_date": str.strip,
"ignore": str.strip,
"tags": str.strip,
},
index_col="tile",
na_values="",
)
self._df["start_date"] = pd.to_datetime(
self._df["start_date"], format="%Y-%m-%d"
)
self._df["last_date"] = pd.to_datetime(
self._df["last_date"], format="%Y-%m-%d"
)
# bug with replace:
self._df.replace(r"^\s*$", np.nan, regex=True, inplace=True)
# -> workaround:
# for c in self._df.select_dtypes(include=["object"]).columns:
# self._df[c] = self._df[c].replace(r"^\s*$", np.nan, regex=True, inplace=True)
# self._df[c] = self._df[c].replace(r"^\s*$", np.nan, regex=True, inplace=True)
def _save_csv(self) -> None:
"""Saves the dataframe to CSV."""
......@@ -166,13 +177,17 @@ class Automatization:
# Don't overload the server with useless requests:
# Sentinel-2 revisit time is 5 days
if request_date:
if not Automatization._ndays_since_date(request_date, revisit_period):
if not Automatization._ndays_since_date(
request_date, revisit_period
):
logger.info("Too early to check {}".format(tile))
continue
logger.info("Checking tile: {}".format(tile))
request = DataRequest(start_date=request_date if request_date else None,
end_date=None)
request = DataRequest(
start_date=request_date if request_date else None,
end_date=None,
)
request.from_tiles([tile])
self._products_list["hubs"].update(request.products_list["hubs"])
......@@ -183,15 +198,16 @@ class Automatization:
"""Returns ignored tiles."""
return self._df[self._df["ignore"].notna()].index.values
def run(self,
tiles: List[str] = None,
process_products: bool = False,
indices_list: List[str] = None,
nodata_clouds: bool = True,
quicklook: bool = True,
hubs_limit: Dict[str, int] = None,
revisit_period: int = 2,
) -> None:
def run(
self,
tiles: List[str] = None,
process_products: bool = False,
indices_list: List[str] = None,
nodata_clouds: bool = True,
quicklook: bool = True,
hubs_limit: Dict[str, int] = None,
revisit_period: int = 2,
) -> None:
"""
Runs automatization.
......@@ -200,7 +216,8 @@ class Automatization:
:param indices_list: list of valid indice names that will be processed.
:param nodata_clouds: mask indices output rasters with a cloud-mask.
:param quicklook: creates a quicklook for each indice processed.
:param revisit_period: number of days, since last date, to check again for new images.
:param revisit_period: number of days, since last date, to check again
for new images.
"""
logger.info("Running automatization")
logger.info("Ignored tiles: {}".format(self._get_ignored_tiles(self)))
......@@ -215,18 +232,20 @@ class Automatization:
self._get_tiles_to_update(tiles_list=tiles)
self._get_products_list(revisit_period)
if any(self._products_list.values()):
prods = DownloadAndProcess(identifiers=self._products_list,
hubs_limit=hubs_limit,
aws_limit=2,
process_products=process_products,
max_processes=3,
indices_list=indices_list,
nodata_clouds=nodata_clouds,
quicklook=quicklook)
prods = DownloadAndProcess(
identifiers=self._products_list,
hubs_limit=hubs_limit,
aws_limit=2,
process_products=process_products,
max_processes=3,
indices_list=indices_list,
nodata_clouds=nodata_clouds,
quicklook=quicklook,
)
failed = prods.failed_products
if failed:
print(failed)
# When working on a local network storage, pause the process in
# When working on a local network storage, pause the process in
# order to let the file to be checked by the filesystem (lags).
time.sleep(2)
self._update_df()
......@@ -261,9 +280,10 @@ class Automatization:
class TimeSeriesAutomatization:
"""Time series automatization.
Scans vector files in the Time Series folder and computes a time
series extraction for each of the files.
Scans vector files in the Time Series folder and computes a time series
extraction for each of the files.
"""
_time_series_path = Path(Config().get("time_series_path"))
def __init__(self) -> None:
......@@ -272,9 +292,15 @@ class TimeSeriesAutomatization:
def _list_vectors_files(self) -> None:
"""Scans vectors files found in the TIME_SERIES folder."""
valid_types = ("*.geojson", "*.gpkg", "*.shp", ) # type: Tuple[str, ...]
valid_types = (
"*.geojson",
"*.gpkg",
"*.shp",
) # type: Tuple[str, ...]
for valid_type in valid_types:
self._vectors_files.extend(list(self._time_series_path.glob(valid_type)))
self._vectors_files.extend(
list(self._time_series_path.glob(valid_type))
)
def run(self, indices: Sequence[str] = ("NDVI",)) -> None:
"""Computes time series extraction for each indice and for each
......@@ -284,8 +310,10 @@ class TimeSeriesAutomatization:
"""
for vectors_file in self._vectors_files:
logger.info("Processing: {}".format(vectors_file.name))
ts = TimeSeries(date_min=None,
date_max=None,
vectors_file=str(vectors_file),
indices=indices)
ts = TimeSeries(
date_min=None,
date_max=None,
vectors_file=str(vectors_file),
indices=indices,
)
ts.to_csv()
# -*- coding: utf-8 -*-
# coding: utf-8
"""
Module for collecting configuration data from ``~/sen2chain_data/config/config.cfg``
Module for collecting configuration data from
``~/sen2chain_data/config/config.cfg``
"""
import os
......@@ -16,12 +17,12 @@ logging.basicConfig(level=logging.INFO)
ROOT = Path(os.path.realpath(__file__)).parent.parent
SHARED_DATA = dict(
tiles_index = ROOT / "sen2chain" / "data" / "tiles_index.gpkg",
tiles_index_dict = ROOT / "sen2chain" / "data" / "tiles_index_dict.p",
peps_download = ROOT / "sen2chain" / "peps_download3.py",
sen2chain_meta = ROOT / "sen2chain" / "data" / "sen2chain_info.xml",
raw_job_cfg = ROOT / "sen2chain" / "data" / "job_ini.cfg",
)
tiles_index=ROOT / "sen2chain" / "data" / "tiles_index.gpkg",
tiles_index_dict=ROOT / "sen2chain" / "data" / "tiles_index_dict.p",
peps_download=ROOT / "sen2chain" / "peps_download3.py",
sen2chain_meta=ROOT / "sen2chain" / "data" / "sen2chain_info.xml",
raw_job_cfg=ROOT / "sen2chain" / "data" / "job_ini.cfg",
)
class Config:
......@@ -32,6 +33,7 @@ class Config:
Usage::
>>> Config().get("l1c_path")
"""
# TODO: Implement the Config class as a singleton.
_USER_DIR = Path.home() / "sen2chain_data"
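The TODO above suggests making Config a singleton. One common Python pattern, shown purely as a sketch (not code from this repository), caches a single instance in __new__:

    class Config:
        _instance = None

        def __new__(cls):
            # Build the single shared instance only once.
            if cls._instance is None:
                cls._instance = super().__new__(cls)
            return cls._instance

Note that __init__ would still run on every instantiation unless it is guarded as well.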
......@@ -44,35 +46,44 @@ class Config:
def __init__(self) -> None:
self._config_params = ConfigParser()
self._config_params["DATA PATHS"] = {"temp_path": "",
"l1c_path": "",
"l1c_archive_path": "",
"l2a_path": "",
"l2a_archive_path": "",
"indices_path": "",
"time_series_path": "",
"temporal_summaries_path": "",
"cloudmasks_path": "",
}
self._config_params["DATA PATHS"] = {
"temp_path": "",
"l1c_path": "",
"l1c_archive_path": "",
"l2a_path": "",
"l2a_archive_path": "",
"indices_path": "",
"time_series_path": "",
"temporal_summaries_path": "",
"cloudmasks_path": "",
}
self._config_params["SEN2COR PATH"] = {"sen2cor_bashrc_path": ""}
self._config_params["HUBS LOGINS"] = {"scihub_id": "",
"scihub_pwd": "",
"peps_config_path": ""}
self._config_params["PROXY SETTINGS"] = {"proxy_http_url": "",
"proxy_https_url": ""}
self._config_params["SEN2CHAIN VERSIONS"] = {"sen2chain_processing_version": "xx.xx"}
self._config_params["LOG PATH"] = {"log_path": str(self._USER_DIR / "logs")}
self._config_params["HUBS LOGINS"] = {
"scihub_id": "",
"scihub_pwd": "",
"peps_config_path": "",
}
self._config_params["PROXY SETTINGS"] = {
"proxy_http_url": "",
"proxy_https_url": "",
}
self._config_params["SEN2CHAIN VERSIONS"] = {
"sen2chain_processing_version": "xx.xx"
}
self._config_params["LOG PATH"] = {
"log_path": str(self._USER_DIR / "logs")
}
if self._CONFIG_FILE.exists():
self._config_params.read(str(self._CONFIG_FILE))
self._config_params_disk = ConfigParser()
self._config_params_disk.read(str(self._CONFIG_FILE))
if self._config_params_disk != self._config_params:
if self._config_params_disk != self._config_params:
self._create_config()
else:
self._create_config()
self.config_dict = dict()
for section in self._config_params.sections():
......@@ -87,9 +98,9 @@ class Config:
self._CONFIG_DIR.mkdir(exist_ok=True)
self._DEFAULT_DATA_DIR.mkdir(exist_ok=True)
self._JOBS_DIR.mkdir(exist_ok=True)
Path(self._config_params["LOG PATH"]["log_path"]).mkdir(exist_ok = True)
#~ (self.__JOBS_DIR / "logs").mkdir(exist_ok=True)
Path(self._config_params["LOG PATH"]["log_path"]).mkdir(exist_ok=True)
# ~ (self.__JOBS_DIR / "logs").mkdir(exist_ok=True)
with open(str(self._CONFIG_FILE), "w") as cfg_file:
self._config_params.write(cfg_file)
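Written out by ConfigParser, a freshly created config.cfg therefore starts with empty values, roughly like this (abridged to a few of the keys shown above):

    [DATA PATHS]
    temp_path =
    l1c_path =
    l2a_path =
    indices_path =

    [SEN2COR PATH]
    sen2cor_bashrc_path =

    [HUBS LOGINS]
    scihub_id =
    scihub_pwd =
    peps_config_path =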
......@@ -98,6 +109,7 @@ class Config:
Checks whether data paths are provided and valid. If not, creates default
folders in sen2chain_data/DATA and updates the configuration file.
"""
def update_config(section, key, val):
"""
Update a setting in config.cfg
......@@ -113,16 +125,25 @@ class Config:
if value.rstrip() == "" or not Path(value).exists():
default_value = self._DEFAULT_DATA_DIR / path.replace("_path", "").upper()
default_value = (
self._DEFAULT_DATA_DIR / path.replace("_path", "").upper()
)
default_value.mkdir(parents=True, exist_ok=True)
update_config("DATA PATHS", path, str(default_value))
logger.info("{}: using default at {}".format(path, str(default_value)))
logger.info(
"{}: using default at {}".format(path, str(default_value))
)
sen2cor_bashrc_path_value = self.config_dict["sen2cor_bashrc_path"]
if sen2cor_bashrc_path_value.rstrip() == "" or not Path(sen2cor_bashrc_path_value).exists():
logging.error("Make sure the path to the sen2cor Bashrc file is valid.")
if (
sen2cor_bashrc_path_value.rstrip() == ""
or not Path(sen2cor_bashrc_path_value).exists()
):
logging.error(
"Make sure the path to the sen2cor Bashrc file is valid."
)
raise ValueError("Invalid sen2cor Bashrc")
def get(self, param: str) -> str:
......
# -*- coding: utf-8 -*-
# coding: utf-8
"""
Module for obtaining data from the scihub API.
......@@ -20,13 +20,19 @@ from shapely.wkt import loads
from shapely.ops import cascaded_union
from pprint import pprint
import itertools
# type annotations
from typing import List, Set, Dict, Tuple, Union
from .config import Config, SHARED_DATA
#from geo_utils import get_tiles_from_point, get_tiles_from_bbox, get_tiles_from_file
from .utils import grouper, str_to_datetime
from .geo_utils import get_tiles_from_point, get_tiles_from_bbox, get_tiles_from_file, serialise_tiles_index
from .geo_utils import (
get_tiles_from_point,
get_tiles_from_bbox,
get_tiles_from_file,
serialise_tiles_index,
)
# problem overview #
# it is not possible with the scihub API to perform a search
......@@ -48,15 +54,21 @@ class DataRequest:
:param start_date: query's start date (YYYY-MM-DD) or datetime object.
:param end_date: query's end date (YYYY-MM-DD) or datetime object.
:param land_only: keep only tiles that contain land.
:param cloud_cover_percentage: cloud cover percentage range (min, max) from 0 to 100.
:param cloud_cover_percentage: cloud cover percentage range (min, max)
from 0 to 100.
Usage:
>>> data_request.DataRequest(start_date="2018-01-10", end_date="2018-01-31",
land_only=True).from_tiles(["40KCB", "40KEC"])
>>> data_request.DataRequest(
start_date="2018-01-10",
end_date="2018-01-31",
land_only=True).from_tiles(["40KCB", "40KEC"])
"""
# Since "requester pays" was enabled on AWS for the Sentienl-2 L1C dataset
# (products are no longer free), downloading non-tiled products on AWS does'nt work anymore.
# Therefore, it's useless to make a complex request to separate non-tiled and tiled products.
# (products are no longer free), downloading non-tiled products on AWS
# does'nt work anymore.
# Therefore, it's useless to make a complex request to separate non-tiled
# and tiled products.
# This class needs to be refactored.
# Proxy settings
......@@ -64,17 +76,19 @@ class DataRequest:
proxy_https_url = Config().get("proxy_https_url").strip()
def __init__(
self,
start_date: Union[str, datetime] = None,
end_date: Union[str, datetime] = None,
land_only: bool = False,
cloud_cover_percentage: Tuple[int, int] = None
self,
start_date: Union[str, datetime] = None,
end_date: Union[str, datetime] = None,
land_only: bool = False,
cloud_cover_percentage: Tuple[int, int] = None,
) -> None:
if start_date is None:
# default start_date : first sentinel2 acquisition
self.start_date = str_to_datetime("2015-06-29", "ymd")
logger.info("Start date not provided, using {}.".format(self.start_date))
logger.info(
"Start date not provided, using {}.".format(self.start_date)
)
else:
if not isinstance(start_date, datetime):
start_date = str_to_datetime(start_date, "ymd")
......@@ -92,12 +106,21 @@ class DataRequest:
self.tiles_to_keep = None
self.tiles_to_keep_geom = dict()
self.products_list = {"aws": {}, "hubs": {}}
self.cloudcoverpercentage = cloud_cover_percentage if cloud_cover_percentage else (0,100)
self.api = SentinelAPI(Config().get("scihub_id"), Config().get("scihub_pwd"), "https://apihub.copernicus.eu/apihub/")
self.cloudcoverpercentage = (
cloud_cover_percentage if cloud_cover_percentage else (0, 100)
)
self.api = SentinelAPI(
Config().get("scihub_id"),
Config().get("scihub_pwd"),
"https://apihub.copernicus.eu/apihub/",
)
# Set proxy settings to the Requests session
if self.proxy_http_url or self.proxy_https_url:
proxies = {"http": self.proxy_http_url, "https": self.proxy_https_url}
proxies = {
"http": self.proxy_http_url,
"https": self.proxy_https_url,
}
self.api.session.proxies = proxies
def _get_tiles_geom(self) -> None:
......@@ -122,7 +145,9 @@ class DataRequest:
:param tiles: list of valid tile names.
"""
self.tiles_to_keep = [re.sub("^T", "", tile.upper()) for tile in set(tiles)]
self.tiles_to_keep = [
re.sub("^T", "", tile.upper()) for tile in set(tiles)
]
self._get_tiles_geom()
self._make_request()
return self.products_list
......@@ -133,15 +158,15 @@ class DataRequest:
:param lon: longitude.
:param lat: latitude.
"""
self.tiles_to_keep = get_tiles_from_point(lon, lat, land_only=self.land_only)
self.tiles_to_keep = get_tiles_from_point(
lon, lat, land_only=self.land_only
)
self._get_tiles_geom()
self.make_request()
return self.products_list
def from_bbox(
self,
lon_min: float, lat_min: float,
lon_max: float, lat_max: float
self, lon_min: float, lat_min: float, lon_max: float, lat_max: float
) -> Dict[str, Dict]:
"""akes request from a bbox.
......@@ -150,7 +175,9 @@ class DataRequest:
:param lon_max: longitude.
:param lat_max: latitude.
"""
self.tiles_to_keep = get_tiles_from_bbox(lon_min, lat_min, lon_max, lat_max, land_only=self.land_only)
self.tiles_to_keep = get_tiles_from_bbox(
lon_min, lat_min, lon_max, lat_max, land_only=self.land_only
)
self._get_tiles_geom()
self._make_request()
return self.products_list
......@@ -160,10 +187,12 @@ class DataRequest:
:param tiles: list of valid tile names.
"""
geom_tiles = get_tiles_from_file(vectors_file, land_only=self.land_only)
self.tiles_to_keep = list(set(itertools.chain.from_iterable(
geom_tiles.values()
)))
geom_tiles = get_tiles_from_file(
vectors_file, land_only=self.land_only
)
self.tiles_to_keep = list(
set(itertools.chain.from_iterable(geom_tiles.values()))
)
self._get_tiles_geom()
self._make_request()
return self.products_list
......@@ -172,14 +201,18 @@ class DataRequest:
"""Will call the right request method depending on products"""
logger.debug("_make_request")
logger.info("Requesting images ranging from {} to {}".format(self.start_date, self.end_date))
logger.info(
"Requesting images ranging from {} to {}".format(
self.start_date, self.end_date
)
)
if self.tiles_to_keep is None:
raise ValueError("Query tiles not provided")
# reset products_list
# should the products_list be updated or erased for each new request?
self.products_list = {"aws":{}, "hubs":{}}
self.products_list = {"aws": {}, "hubs": {}}
tileddate = str_to_datetime("2016-11-01", "ymd")
......@@ -201,12 +234,15 @@ class DataRequest:
# query by group of 3 tiles, otherwise getting error message
# "Request URI too long" from scihub
for tiles_to_keep_triplet, tiles_to_keep_triplet_geom \
in zip(grouper(self.tiles_to_keep, 3),
grouper(self.tiles_to_keep_geom.values(), 3)):
for tiles_to_keep_triplet, tiles_to_keep_triplet_geom in zip(
grouper(self.tiles_to_keep, 3),
grouper(self.tiles_to_keep_geom.values(), 3),
):
tiles_to_keep = [tile for tile in tiles_to_keep_triplet if tile]
tiles_to_keep_geom = [geom for geom in tiles_to_keep_triplet_geom if geom]
tiles_to_keep_geom = [
geom for geom in tiles_to_keep_triplet_geom if geom
]
print(tiles_to_keep)
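The grouper helper batches the tile list into triples for the request; assuming the usual itertools-recipe behaviour of padding the last group with None (which the two filtering comprehensions above then strip), for example:

    >>> list(grouper(["40KCB", "38KQV", "34HBJ", "35LLD"], 3))
    [('40KCB', '38KQV', '34HBJ'), ('35LLD', None, None)]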
......@@ -217,12 +253,13 @@ class DataRequest:
# scihub request
products = OrderedDict()
products = self.api.query(
query_geom,
date=(self.start_date, self.end_date),
order_by="+endposition",
platformname="Sentinel-2",
producttype="S2MSI1C",
cloudcoverpercentage=self.cloudcoverpercentage)
query_geom,
date=(self.start_date, self.end_date),
order_by="+endposition",
platformname="Sentinel-2",
producttype="S2MSI1C",
cloudcoverpercentage=self.cloudcoverpercentage,
)
# save products list as a pandas dataframe
products_df = self.api.to_dataframe(products)
......@@ -231,7 +268,9 @@ class DataRequest:
return
# a products dictionary for each server (AWS vs hubs)
# fill each dictionary depending on the acquisition date
for index, row in products_df[["title", "beginposition", "footprint"]].iterrows():
for index, row in products_df[
["title", "beginposition", "footprint"]
].iterrows():
# start date of the tiled S2 collection on the scihub server
tileddate = str_to_datetime("2016-11-01", "ymd")
......@@ -241,34 +280,55 @@ class DataRequest:
for tile_name, tile_geom in self.tiles_to_keep_geom.items():
# in case of duplicates on the server
if img_title not in self.products_list["hubs"].keys() and img_title not in self.products_list["aws"].keys():
if (
img_title not in self.products_list["hubs"].keys()
and img_title not in self.products_list["aws"].keys()
):
# tiled products are downloaded on hubs
if re.match(r".*_T[0-9]{2}[A-Z]{3}_.*", img_title):
if tile_name in img_title:
self.products_list["hubs"][img_title] = {"date": img_date,
"tile": tile_name}
self.products_list["hubs"][img_title] = {
"date": img_date,
"tile": tile_name,
}
continue
else:
continue
# non-tiled products will be downloaded on aws
# non-tiled products will be downloaded on aws
else:
if tile_geom.intersects(img_footprint):
self.products_list["aws"][img_title] = {"date": img_date,
"tile": tile_name}
self.products_list["aws"][img_title] = {
"date": img_date,
"tile": tile_name,
}
# pprint dicts in chronological order
print("\nFrom AWS")
pprint(list(OrderedDict(sorted(self.products_list["aws"].items(),
key=lambda t: t[1]["date"]))))
pprint(
list(
OrderedDict(
sorted(
self.products_list["aws"].items(),
key=lambda t: t[1]["date"],
)
)
)
)
print("\nFrom hubs")
pprint(list(OrderedDict(sorted(self.products_list["hubs"].items(),
key=lambda t: t[1]["date"]))))
### TILED PRODUCTS REQUEST (lighter)
pprint(
list(
OrderedDict(
sorted(
self.products_list["hubs"].items(),
key=lambda t: t[1]["date"],
)
)
)
)
# Tiled products request (lighter)
def _make_request_tiled_only(self) -> None:
"""Scihub API request using sentinelsat. This method is called if
products are a mix of tiled and non-tiled products."""
......@@ -276,7 +336,6 @@ class DataRequest:
logger.debug("_make_request_tiled_only")
print("Sentinel2 tiles:\n", self.tiles_to_keep)
products_from_hubs = dict()
products_from_aws = dict()
......@@ -293,15 +352,16 @@ class DataRequest:
print("Ignoring water-only tiles:", water_tiles)
query_kwargs = {
'platformname': 'Sentinel-2',
'producttype': 'S2MSI1C',
'cloudcoverpercentage': self.cloudcoverpercentage,
'date': (self.start_date, self.end_date)}
"platformname": "Sentinel-2",
"producttype": "S2MSI1C",
"cloudcoverpercentage": self.cloudcoverpercentage,
"date": (self.start_date, self.end_date),
}
products = OrderedDict()
for tile in self.tiles_to_keep:
kw = query_kwargs.copy()
kw['filename'] = '*_T{}_*'.format(tile)
kw["filename"] = "*_T{}_*".format(tile)
pp = self.api.query(**kw)
products.update(pp)
......@@ -315,58 +375,20 @@ class DataRequest:
for index, row in products_df[["title", "beginposition"]].iterrows():
img_title = row[0]
img_date = row[1].to_pydatetime()
self.products_list["hubs"][img_title] = {"date": img_date,
"tile": re.findall("_T([0-9]{2}[A-Z]{3})_", img_title)[0]}
self.products_list["hubs"][img_title] = {
"date": img_date,
"tile": re.findall("_T([0-9]{2}[A-Z]{3})_", img_title)[0],
}
# pprint dicts in chronological order
print("\nFrom hubs")
pprint(list(OrderedDict(sorted(self.products_list["hubs"].items(),
key=lambda t: t[1]["date"]))))
#instance_start_date = '2018-07-01'
#instance_end_date = '2018-07-16'
#instance_start_date, instance_end_date = '20160123', '20160424'
#x = 55.5331
#y = -21.1331
#bbox = (54.66796875, -21.647217065387817, 58.4033203125, -19.652934210612436)
#data = DataRequest(start_date=instance_start_date,
#end_date=instance_end_date,
#land_only=False).from_tiles("35LLD", "40KCB", "38KQV", "34HBJ")
#req = DataRequest(start_date=instance_start_date,
#end_date=instance_end_date,
#land_only=False)
#print("#######")
#pprint(req.from_tiles("35LLD"))
#print("-------")
#pprint(req.from_tiles("40KCB", "38KQV", "34HBJ"))
#data = DataRequest(start_date=instance_start_date,
#end_date=instance_end_date,
#land_only=False).from_point(lon=x, lat=y)
#data = DataRequest(start_date=instance_start_date,
#end_date=instance_end_date,
#land_only=True).from_bbox(54.66796875, -21.647217065387817, 58.4033203125, -19.652934210612436)
#data = DataRequest(start_date=instance_start_date,
#end_date=instance_end_date,
#land_only=True).from_file("/home/seas-oi/Documents/NDVI-MODIS-SENTINEL/SIG/sites_mada/34_sites_sentinelles.shp")
#land_only=True).from_file("/home/seas-oi/Téléchargements/map.geojson")
pprint(
list(
OrderedDict(
sorted(
self.products_list["hubs"].items(),
key=lambda t: t[1]["date"],
)
)
)
)
# -*- coding: utf-8 -*-
# coding: utf-8
import multiprocessing, subprocess
import os
from time import sleep
import logging
from functools import partial
#~ import psutil
# import psutil
from .products import L1cProduct, L2aProduct
......@@ -13,61 +14,72 @@ logger = logging.getLogger("Multiprocessing")
logging.basicConfig(level=logging.INFO)
logger.setLevel(logging.INFO)
def multi(product):
try:
fwd = os.path.dirname(os.path.realpath(__file__))
logger.info("Processing {}".format(product))
cmd= ["setsid", "/usr/bin/python3", fwd + "/multiprocess_l2a.py", product]
cmd = [
"setsid",
"/usr/bin/python3",
fwd + "/multiprocess_l2a.py",
product,
]
proc = subprocess.Popen(cmd)
l1c = L1cProduct(product)
l2a_identifier = l1c.identifier.replace("L1C_", "L2A_").replace("_OPER_", "_USER_")
l2a_identifier = l1c.identifier.replace("L1C_", "L2A_").replace(
"_OPER_", "_USER_"
)
l2a_prod = L2aProduct(l2a_identifier)
while not(l2a_prod.in_library):
while not (l2a_prod.in_library):
sleep(5)
logger.info("End {}".format(product))
except:
logger.info("Plante {}".format(product))
pass
def l2a_multiprocessing(process_list, nb_proc=4):
""" """
nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1)
pool = multiprocessing.Pool(nb_proc)
results = [pool.map(multi, process_list)]
pool.close()
pool.join()
return True
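A minimal usage sketch, assuming (as in the __main__ block further down the diff) that Tile("38LPM").l1c yields product objects with an identifier attribute:

    >>> from sen2chain import Tile, l2a_multiprocessing
    >>> l2a_multiprocessing([p.identifier for p in Tile("38LPM").l1c], nb_proc=4)

The requested nb_proc is capped at the number of available CPUs minus one, with a floor of one worker.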
#~ def multi_cldidx(indice_list, l2a_identifier):
#~ l2a = L2aProduct(l2a_identifier)
#~ l2a.process_cloud_mask_v2()
#~ l2a.process_indices(indice_list, True, True)
#~ def cldidx_multiprocessing(process_list, indice_list=["NDVI", "NDWIGAO", "NDWIMCF"], nb_proc=4):
#~ """ """
#~ nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
#~ pool = multiprocessing.Pool(nb_proc)
#~ results = [pool.map(partial(multi_cldidx, indice_list), process_list)]
#~ pool.close()
#~ pool.join()
#~ return True
#~ def multi_cld(l2a_identifier):
#~ l2a = L2aProduct(l2a_identifier)
#~ try:
#~ l2a.process_cloud_mask_v2()
#~ except:
#~ pass
#~ def cld_multiprocessing(process_list, nb_proc=4):
#~ """ """
#~ nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
#~ pool = multiprocessing.Pool(nb_proc)
#~ results = [pool.map(multi_cld, process_list)]
#~ pool.close()
#~ pool.join()
#~ return True
# def multi_cldidx(indice_list, l2a_identifier):
# l2a = L2aProduct(l2a_identifier)
# l2a.process_cloud_mask_v2()
# l2a.process_indices(indice_list, True, True)
# def cldidx_multiprocessing(process_list, indice_list=["NDVI", "NDWIGAO", "NDWIMCF"], nb_proc=4):
# """ """
# nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
# pool = multiprocessing.Pool(nb_proc)
# results = [pool.map(partial(multi_cldidx, indice_list), process_list)]
# pool.close()
# pool.join()
# return True
# def multi_cld(l2a_identifier):
# l2a = L2aProduct(l2a_identifier)
# try:
# l2a.process_cloud_mask_v2()
# except:
# pass
# def cld_multiprocessing(process_list, nb_proc=4):
# """ """
# nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
# pool = multiprocessing.Pool(nb_proc)
# results = [pool.map(multi_cld, process_list)]
# pool.close()
# pool.join()
# return True
def multi_cld_ver_pro_iter_repro(l2a_ver_pro_iter_repro):
l2a = L2aProduct(l2a_ver_pro_iter_repro[0])
......@@ -80,33 +92,39 @@ def multi_cld_ver_pro_iter_repro(l2a_ver_pro_iter_repro):
thin_cir = l2a_ver_pro_iter_repro[7]
reprocess = l2a_ver_pro_iter_repro[8]
try:
l2a.compute_cloud_mask(cm_version = cm_version,
probability = probability,
iterations = iterations,
cld_shad = cld_shad,
cld_med_prob = cld_med_prob,
cld_hi_prob = cld_hi_prob,
thin_cir = thin_cir,
reprocess = reprocess)
l2a.compute_cloud_mask(
cm_version=cm_version,
probability=probability,
iterations=iterations,
cld_shad=cld_shad,
cld_med_prob=cld_med_prob,
cld_hi_prob=cld_hi_prob,
thin_cir=thin_cir,
reprocess=reprocess,
)
except:
pass
def cld_version_probability_iterations_reprocessing_multiprocessing(process_list, nb_proc=4):
def cld_version_probability_iterations_reprocessing_multiprocessing(
process_list, nb_proc=4
):
""" """
nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1)
pool = multiprocessing.Pool(nb_proc)
results = [pool.map(multi_cld_ver_pro_iter_repro, process_list)]
pool.close()
pool.join()
return True
def multi_idx(l2a_id_idx):
l2a_identifier = l2a_id_idx[0]
indice = l2a_id_idx[1]
reprocess = l2a_id_idx[2]
nodata_clouds = l2a_id_idx[3]
quicklook = l2a_id_idx[4]
cm_version = l2a_id_idx[5]
quicklook = l2a_id_idx[4]
cm_version = l2a_id_idx[5]
probability = l2a_id_idx[6]
iterations = l2a_id_idx[7]
cld_shad = l2a_id_idx[8]
......@@ -115,24 +133,26 @@ def multi_idx(l2a_id_idx):
thin_cir = l2a_id_idx[11]
l2a = L2aProduct(l2a_identifier)
try:
l2a.compute_indice(indice = indice,
reprocess = reprocess,
nodata_clouds = nodata_clouds,
quicklook = quicklook,
cm_version = cm_version,
probability = probability,
iterations = iterations,
cld_shad = cld_shad,
cld_med_prob = cld_med_prob,
cld_hi_prob = cld_hi_prob,
thin_cir = thin_cir,
)
l2a.compute_indice(
indice=indice,
reprocess=reprocess,
nodata_clouds=nodata_clouds,
quicklook=quicklook,
cm_version=cm_version,
probability=probability,
iterations=iterations,
cld_shad=cld_shad,
cld_med_prob=cld_med_prob,
cld_hi_prob=cld_hi_prob,
thin_cir=thin_cir,
)
except:
pass
def idx_multiprocessing(process_list, nb_proc=4):
""" """
nb_proc = max(min(len(os.sched_getaffinity(0))-1, nb_proc), 1)
nb_proc = max(min(len(os.sched_getaffinity(0)) - 1, nb_proc), 1)
pool = multiprocessing.Pool(nb_proc)
results = [pool.map(multi_idx, process_list)]
pool.close()
......
# -*- coding:utf-8 -*-
# FIXME: delete file?
"""
This Python script should be run from a terminal for multiprocessing L1C -> L2A.
"""
......
# -*- coding: utf-8 -*-
# coding: utf-8
import concurrent.futures
import urllib.request
import time
#~ from .tiles import Tile
#~ from .products import L1cProduct, L2aProduct
# from .tiles import Tile
# from .products import L1cProduct, L2aProduct
from sen2chain import Tile, L1cProduct, L2aProduct
def process(l1c_identifier,
reprocess_l2a=False, reprocess_cloud_mask=False,
indices_list=[], reprocess_indices=False, nodata_clouds=False, quicklook=False):
def process(
l1c_identifier,
reprocess_l2a=False,
reprocess_cloud_mask=False,
indices_list=[],
reprocess_indices=False,
nodata_clouds=False,
quicklook=False,
):
"""Process a list of products in paralle.
:param l1c_identifier:
......@@ -27,47 +33,66 @@ def process(l1c_identifier,
L1cProduct(l1c_identifier).process_l2a(reprocess=reprocess_l2a)
l2a_identifier = l1c_identifier.replace("1C_", "2A_")
L2aProduct(l2a_identifier).process_cloud_mask(reprocess=reprocess_cloud_mask)
L2aProduct(l2a_identifier).process_indices(indices_list=indices_list,
nodata_clouds=nodata_clouds,
quicklook=quicklook,
reprocess=reprocess_indices,
)
L2aProduct(l2a_identifier).process_cloud_mask(
reprocess=reprocess_cloud_mask
)
L2aProduct(l2a_identifier).process_indices(
indices_list=indices_list,
nodata_clouds=nodata_clouds,
quicklook=quicklook,
reprocess=reprocess_indices,
)
except Exception as e:
return "FAILED", e
return "success", None
def parallel_processing(identifiers_list, max_workers=3,
reprocess_l2a=False, reprocess_cloud_mask=False,
indices_list=[], reprocess_indices=False,
nodata_clouds=False, quicklook=False):
def parallel_processing(
identifiers_list,
max_workers=3,
reprocess_l2a=False,
reprocess_cloud_mask=False,
indices_list=[],
reprocess_indices=False,
nodata_clouds=False,
quicklook=False,
):
""" """
prods = [L1cProduct(p.identifier) for p in identifiers_list]
with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
future_to_mot = {executor.submit(process, p.identifier, **kwargs): p for p in prods}
with concurrent.futures.ProcessPoolExecutor(
max_workers=max_workers
) as executor:
future_to_mot = {
executor.submit(process, p.identifier, **kwargs): p for p in prods
}
for future in concurrent.futures.as_completed(future_to_mot):
identifier = future_to_mot[future]
issue = future.result()
print("{} processing: {}. Errors: {}.".format(identifier, issue[0], issue[1]))
print(
"{} processing: {}. Errors: {}.".format(
identifier, issue[0], issue[1]
)
)
if __name__ == "__main__":
kwargs = {"reprocess_l2a": False,
"reprocess_cloud_mask": False,
"indices_list": ["NDVI", "NDWIGAO", "NDWIMCP"],
"nodata_clouds": True,
"quicklook": True,
"reprocess_indices": False}
kwargs = {
"reprocess_l2a": False,
"reprocess_cloud_mask": False,
"indices_list": ["NDVI", "NDWIGAO", "NDWIMCP"],
"nodata_clouds": True,
"quicklook": True,
"reprocess_indices": False,
}
identifiers_list = Tile("38LPM").l1c
parallel_processing(identifiers_list=identifiers_list, max_workers=8, **kwargs)
parallel_processing(
identifiers_list=identifiers_list, max_workers=8, **kwargs
)
#parallel(identifiers_list, process_indices
# parallel(identifiers_list, process_indices