Skip to content
Snippets Groups Projects
Commit 9713f4e4 authored by pascal.mouquet_ird.fr's avatar pascal.mouquet_ird.fr
Browse files

resolved conflicts

parents 661a83c1 33880746
No related branches found
No related tags found
No related merge requests found
Pipeline #1494 passed
......@@ -27,6 +27,7 @@ from .products import (
OldCloudMaskProduct,
NewCloudMaskProduct,
IndiceProduct,
FamilyProduct,
)
from .library import Library
from .data_request import DataRequest
......@@ -46,6 +47,7 @@ from .utils import (
get_Sen2Cor_version,
get_latest_s2c_version_path,
set_permissions,
get_cm_dict, get_cm_string_from_dict, get_indice_from_identifier
)
from .geo_utils import (
serialise_tiles_index,
......
......@@ -21,6 +21,7 @@ import multiprocessing
from .config import SHARED_DATA, Config
from .utils import get_tile
from .products import FamilyProduct
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
......@@ -73,7 +74,7 @@ class S2cEodag:
productType: str = "L1C",
start: str = "2015-01-01",
end: str = "9999-12-31",
ref: str = "l1c",
ref = "l1c",
min_cloudcover: int = 0,
max_cloudcover: int = 100,
):
......@@ -99,6 +100,12 @@ class S2cEodag:
)
logging.disable(logging.NOTSET)
# if not isinstance(ref, list):
# ref_string = ref
# else:
# ref_string = "specific cloudmask(s) and / or indice(s) product(s)"
logger.info("Searching online products, ref: {}".format(ref))
fitered = self.products[:]
for p in fitered:
if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists():
......@@ -123,37 +130,159 @@ class S2cEodag:
p.properties["storageStatus"] = "ONLINE"
if not(min_cloudcover <= float(p.properties["cloudCover"] or 0) <= max_cloudcover):
self.products.remove(p)
logger.info("{} - local l1c {} - filtering (CC = {}%)".format(p.properties["title"], l1c_presence, int(p.properties["cloudCover"])))
logger.info(
"{} - local l1c {} - filtering (CC = {}%)".format(
p.properties["title"],
l1c_presence,
int(p.properties["cloudCover"])
)
)
else:
if ref == "l1c":
if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists():
logger.info("{} - local l1c {} - filtering".format(p.properties["title"], l1c_presence, p.properties["storageStatus"]))
if l1c_presence == "PRESENT":
logger.info(
"{} - l1c PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
logger.info("{} - local l1c {} - remote {}".format(p.properties["title"], l1c_presence, p.properties["storageStatus"]))
logger.info(
"{} - l1c ABSENT - remote {}".format(
p.properties["title"],
p.properties["storageStatus"]
)
)
elif ref == "l2a":
if (Path(Config().get("l2a_path")) / self.name / (p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_") + ".SAFE")).exists():
logger.info("{} - local l1c {} - local l2a {} - filtering".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"]))
if l2a_presence == "PRESENT":
logger.info(
"{} - l2a PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if (outputs_prefix / (p.properties["title"] + ".SAFE")).exists():
logger.info("{} - local l1c {} - filtering".format(p.properties["title"], l1c_presence, p.properties["storageStatus"]))
if l1c_presence == "PRESENT":
logger.info(
"{} - l1c PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
logger.info("{} - local l1c {} - local l2a {} - remote {}".format(p.properties["title"], l1c_presence, l2a_presence, p.properties["storageStatus"]))
elif ref == "indice":
toto = 12
l2a_identifier = p.properties["title"].replace("L1C_", "L2A_").replace("__OPER__", "_USER_")
l2aProduct(l2a_identifier)
elif isinstance(ref, list):
toto = 24
# toto = 13
logger.info(
"{} - l2a ABSENT - l1c ABSENT - downloading - remote {}".format(
p.properties["title"],
p.properties["storageStatus"]
)
)
elif ref == "cloudmasks":
fp = FamilyProduct(p.properties["title"])
if fp.cloudmasks:
logger.info(
"{} - at least one cloudmask PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if l2a_presence == "PRESENT":
logger.info(
"{} - l2a PRESENT - cloudmasks ABSENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if l1c_presence == "PRESENT":
logger.info(
"{} - l1c PRESENT - cloudmasks ABSENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
logger.info(
"{} - l1c ABSENT, l2a ABSENT, cloudmasks ABSENT - remote {}".format(
p.properties["title"],
p.properties["storageStatus"]
)
)
elif ref == "indices":
fp = FamilyProduct(p.properties["title"])
if fp.indices:
logger.info(
"{} - at least one indice PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if l2a_presence == "PRESENT":
logger.info(
"{} - l2a PRESENT - indices ABSENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if l1c_presence == "PRESENT":
logger.info(
"{} - l1c PRESENT - indices ABSENT - filtering".format(
p.properties["title"],
l1c_presence,
p.properties["storageStatus"]
)
)
self.products.remove(p)
else:
logger.info(
"{} - l1c ABSENT, l2a ABSENT, indices ABSENT - remote {}".format(
p.properties["title"],
p.properties["storageStatus"]
)
)
else:
if not isinstance(ref, list):
ref = [ref]
fp = FamilyProduct(p.properties["title"])
all_conditions = []
for r in ref:
all_conditions.append(r in fp.cm_ind_string_list)
if all(all_conditions):
logger.info(
"{} - all ref PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if l2a_presence == "PRESENT":
logger.info(
"{} - some ref ABSENT - l2a PRESENT - filtering".format(
p.properties["title"],
)
)
self.products.remove(p)
else:
if l1c_presence == "PRESENT":
logger.info(
"{} - some ref ABSENT - l1c PRESENT - filtering".format(
p.properties["title"],
l1c_presence,
p.properties["storageStatus"]
)
)
self.products.remove(p)
else:
logger.info(
"{} - some ref ABSENT - l2a ABSENT - l1c ABSENT - downloading - remote {}".format(
p.properties["title"],
p.properties["storageStatus"]
)
)
# clouds = self.products[:]
# for p in clouds:
# if not(min_cloudcover <= int(p.properties["cloudCover"]) <= max_cloudcover):
......
......@@ -624,6 +624,20 @@ class Job:
download_list= []
for index, row in tasks.iterrows():
if bool(setuptools.distutils.util.strtobool(str(row.l1c))):
if not bool(setuptools.distutils.util.strtobool(str(row.remove))):
if "l1c" in str(row.remove).lower():
if "l2a" not in str(row.remove).lower():
ref = "l2a"
else:
ref = []
for cm in row.cloudmasks:
if not (row.cloudmasks == "False" or not row.cloudmasks):
ref.append(cm)
for ind in row.indices:
if not (row.indices == "False" or not row.indices):
ref.extend([ind, ind + "_" + cm])
ref = list(set(ref))
t = Tile(row.tile)
tile_download_list = t.get_l1c(
provider = self.provider,
......@@ -631,6 +645,7 @@ class Job:
start = row.date_min,
end = row.date_max,
new = False,
ref = ref,
max_cloudcover = row.max_clouds,
)
download_list.extend(tile_download_list)
......@@ -750,16 +765,13 @@ class Job:
# Remove downloaded L1C
for index, row in tasks.iterrows():
if "l1c" in str(row.remove).lower():
logger.info("Removing downloaded l1c products for tile: {}".format(row.tile))
if self.logs:
f.write("{}\nRemoving downloaded l1c products for tile: {}\n\n".format(datetime.datetime.now(), row.tile))
f.flush()
t = Tile(row.tile)
prodlist = [p for p in l1c_process_list if row.tile in p]
t.remove_l1c(prodlist)
logger.info("Removed {} downloaded l1c products: {}".format(len(prodlist), prodlist))
if self.logs:
f.write("{}\nRemoved {} downloaded l1c products:\n".format(datetime.datetime.now(), len(prodlist)))
for l1c in prodlist:
f.write("{}\n".format(l1c))
f.write("\n")
f.flush()
# Comuting cloudmasks (from L2A)
logger.info("Computing cloudmasks")
......@@ -952,15 +964,26 @@ class Job:
if "l2a" in str(row.remove).lower():
t = Tile(row.tile)
prodlist = [p for p in l2a_remove_list if row.tile in p]
t.remove_l2a(prodlist)
logger.info("Removed {} produced l2a product(s): {}".format(len(prodlist), prodlist))
logger.info(
"Removing {} produced l2a for tile {}".format(
row.tile,
len(prodlist),
)
)
if self.logs:
f.write("{}\nRemoved {} produced l2a products:\n".format(datetime.datetime.now(), len(prodlist)))
f.write(
"{}\nRemoving {} produced l2a for tile {}\n".format(
datetime.datetime.now(),
len(prodlist),
row.tile,
)
)
for l2a in prodlist:
f.write("{}\n".format(l2a))
f.write("\n")
f.flush()
f.flush()
t.remove_l2a(prodlist)
# Cleaning after
if clean_after:
logger.info("Cleaning Tiles")
......
......@@ -20,7 +20,15 @@ from typing import (
Union
)
from packaging import version
from .utils import grouper, set_permissions, get_Sen2Cor_version, get_latest_s2c_version_path
from .utils import (
grouper,
set_permissions,
get_Sen2Cor_version,
get_latest_s2c_version_path,
get_cm_dict,
get_cm_string_from_dict,
get_indice_from_identifier
)
from .config import Config, SHARED_DATA
from .xmlparser import MetadataParser, Sen2ChainMetadataParser
from .sen2cor import process_sen2cor
......@@ -1733,3 +1741,117 @@ class IndiceProduct:
)
else:
Sen2ChainMetadataParser(self._info_path).init_metadata()
class FamilyProduct(dict):
"""Family product class
:param identifier: cloudmask filename.
"""
_l1c_library_path = Path(Config().get("l1c_path"))
_l2a_library_path = Path(Config().get("l2a_path"))
_cloudmask_library_path = Path(Config().get("cloudmasks_path"))
_indice_library_path = Path(Config().get("indices_path"))
def __init__(
self,
identifier: str = None,
):
if not (identifier):
raise ValueError(
"Identifier cannot be empty"
)
else:
fid_tile = self.get_family_id_tile(identifier)
if fid_tile:
self.update(fid_tile)
self.family_id = fid_tile["family_id"]
self.tile = fid_tile["tile"]
self.l1c_id = self.get_l1c()
self.l2a_id = self.get_l2a()
self.cloudmasks = self.get_cloudmasks()
self.indices = self.get_indices()
self.cm_ind_string_list = self.get_cm_ind_string_list()
else:
logger.info("Invalid identifier {}".format(identifier))
for key, val in self.__dict__.items():
self[key] = val
def get_family_id_tile(self, identifier):
try:
pat = re.compile(
r".*(?P<family_id>"
+ "[0-9]{8}T[0-9]{6}"
+ "_N[0-9]{4}_R[0-9]{3}"
+ "_T(?P<tile>[0-9]{2}[A-Z]{3})"
+ "_[0-9]{8}T[0-9]{6}"
+ ").*"
)
return pat.match(identifier).groupdict()
except:
pass
def get_l1c(self):
library_path = self._l1c_library_path / self.tile
paths = list(library_path.glob("*L1C*" + self.family_id + "*"))
if len(paths) > 0:
return paths[0].stem
else:
return None
def get_l2a(self):
library_path = self._l2a_library_path / self.tile
paths = list(library_path.glob("*L2A*" + self.family_id + "*"))
if len(paths) > 0:
return paths[0].stem
else:
return None
def get_cloudmasks(self):
cloudmasks = []
try:
library_path = self._cloudmask_library_path / self.tile
paths = list(library_path.glob("*L2A*" + self.family_id + "/*L2A*" + self.family_id + "_CM*.jp2"))
for item in paths:
cm_dict = get_cm_dict(item.stem)
cm_string = get_cm_string_from_dict(cm_dict)
cm_dict["cm_string"] = cm_string
cloudmasks.append(cm_dict)
except:
pass
return cloudmasks
def get_indices(self):
indices = []
try:
library_path = self._indice_library_path
paths = list(
library_path.glob(
"*/"
+ self.tile + "/"
+ "*L2A*" + self.family_id + "/"
+ "*.jp2"
)
)
for item in paths:
# logger.info(item)
indice_dict = {}
indice_dict["indice"] = get_indice_from_identifier(item.stem)
cm_dict = get_cm_dict(item.stem)
if cm_dict:
indice_dict.update(cm_dict)
indice_dict["cm_string"] = get_cm_string_from_dict(cm_dict)
indice_dict["indice_string"] = indice_dict["indice"] + "_" + indice_dict["cm_string"]
else:
indice_dict["indice_string"] = indice_dict["indice"]
indices.append(indice_dict)
except:
pass
return indices
def get_cm_ind_string_list(self):
cm_ind_string_list = []
if self.cloudmasks:
cm_ind_string_list.extend([cm["cm_string"] for cm in self.cloudmasks])
if self.indices:
cm_ind_string_list.extend([ind["indice_string"] for ind in self.indices])
return cm_ind_string_list
......@@ -827,7 +827,7 @@ class Tile:
start: str = "2015-01-01",
end: str = "9999-12-31",
new: bool = False,
ref: str = "l1c",
ref = "l1c",
min_cloudcover: int = 0,
max_cloudcover: int = 100,
order_offline: bool = False,
......@@ -835,7 +835,7 @@ class Tile:
sleep: int = 0,
):
"""
function to download L1C products using EODAG
Function to download l1c products using EODAG
"""
if provider == "scihub":
NUM_THREADS = 4
......
......@@ -200,3 +200,100 @@ def get_latest_s2c_version_path(l1c_identifier):
else:
return current_path
def get_cm_dict(identifier) -> dict:
"""Returns cloudmask version from an identifier string.
:param string: string from which to extract the version name.
can be a cloudmask or an indice identifier
"""
returned_val = None
try:
pat = re.compile(r".*(?P<cm_version>CM00[1-2])")
returned_val = pat.match(identifier).groupdict()
except:
try:
pat = re.compile(
r".*(?P<cm_version>CM003)"
+ "-PRB(?P<probability>.*)"
+ "-ITER(?P<iterations>.*)"
)
returned_val = pat.match(identifier).groupdict()
except:
try:
pat = re.compile(
r".*(?P<cm_version>CM004)"
+ "-CSH(?P<cld_shad>.*)"
+ "-CMP(?P<cld_med_prob>.*)"
+ "-CHP(?P<cld_hi_prob>.*)"
+ "-TCI(?P<thin_cir>.*)"
+ "-ITER(?P<iterations>.*)"
)
returned_val = pat.match(identifier).groupdict()
except:
pass
if returned_val:
if "probability" not in returned_val:
returned_val["probability"] = 1
if "iterations" not in returned_val:
returned_val["iterations"] = 5
if "cld_shad" not in returned_val:
returned_val["cld_shad"] = True
if "cld_med_prob" not in returned_val:
returned_val["cld_med_prob"] = True
if "cld_hi_prob" not in returned_val:
returned_val["cld_hi_prob"] = True
if "thin_cir" not in returned_val:
returned_val["thin_cir"] = True
returned_val["probability"] = int(returned_val["probability"])
returned_val["iterations"] = int(returned_val["iterations"])
returned_val["cld_shad"] = returned_val["cld_shad"] in ["1", 1, True, "True"]
returned_val["cld_med_prob"] = returned_val["cld_med_prob"] in ["1", 1, True, "True"]
returned_val["cld_hi_prob"] = returned_val["cld_hi_prob"] in ["1", 1, True, "True"]
returned_val["thin_cir"] = returned_val["thin_cir"] in ["1", 1, True, "True"]
return returned_val
def get_cm_string_from_dict(cm_dict) -> str:
if cm_dict:
if cm_dict["cm_version"] == "CM001":
cm_string = "CM001"
elif cm_dict["cm_version"] == "CM002":
cm_string = "CM002-B11"
elif cm_dict["cm_version"] == "CM003":
cm_string = (
"CM003" +
"-PRB" + str(cm_dict["probability"]) +
"-ITER" + str(cm_dict["iterations"])
)
elif cm_dict["cm_version"] == "CM004":
cm_string = (
"CM004" +
"-CSH" + str(1 * cm_dict["cld_shad"]) +
"-CMP" + str(1 * cm_dict["cld_med_prob"]) +
"-CHP" + str(1 * cm_dict["cld_hi_prob"]) +
"-TCI" + str(1 * cm_dict["thin_cir"]) +
"-ITER" + str(cm_dict["iterations"])
)
else:
cm_string = None
else:
cm_string = None
return cm_string
def get_indice_from_identifier(identifier) -> str:
indice = (identifier.replace(".", "_").split("_")[7]).upper()
return indice
# def get_cloudmask_indice_dict_from_strings(
# cm_string: str = None,
# indice_string: str = None,
# ) -> dict:
# """Returns a cloudmas or indice dict for FamilyProduct class
# :param string: string from which to extract the version name.
# can be a cloudmask or an indice identifier
# """
# returned_val = None
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment