Skip to content
Snippets Groups Projects
Commit 1ee65309 authored by TGermain's avatar TGermain
Browse files

Merge branch 'master' of framagit.org:espace-dev/sen2chain

parents de90e43a edba9e6c
No related branches found
No related tags found
No related merge requests found
Pipeline #1024 passed
......@@ -14,3 +14,4 @@ geopandas>=0.5.0
scipy
pillow
python-crontab
packaging
......@@ -43,6 +43,9 @@ from .utils import (
human_size_decimal,
human_size,
get_current_Sen2Cor_version,
get_Sen2Cor_version,
get_latest_s2c_version_path,
set_permissions,
)
from .geo_utils import (
serialise_tiles_index,
......@@ -57,7 +60,6 @@ from .multi_processing import (
from .tileset import TileSet
from .jobs import Jobs, Job
__version__ = "0.7.0"
__author__ = (
"Jérémy Commins <jebins@laposte.net> & Impact <pascal.mouquet@ird.fr>"
......
......@@ -58,7 +58,10 @@ class Config:
"temporal_summaries_path": "",
"cloudmasks_path": "",
}
self._config_params["SEN2COR PATH"] = {"sen2cor_bashrc_path": ""}
self._config_params["SEN2COR PATH"] = {
"sen2cor_bashrc_path": "",
"sen2cor_alternative_bashrc_path": "",
}
self._config_params["HUBS LOGINS"] = {
"scihub_id": "",
"scihub_pwd": "",
......
shapefiles:
- name: s2_tile_centroid
path: /home/operateur/sen2chain/sen2chain/data/sentinel2_tiling_grid_centroids.shp
attr: tile_id
\ No newline at end of file
......@@ -194,7 +194,7 @@ class DataRequest:
"""Scihub API request using sentinelsat."""
logger.debug("_make_request")
logger.info(
f"Requesting images ranging from {self.start_date} to {self.end_date}"
"Requesting images ranging from {} to {}".format(self.start_date, self.end_date)
)
if self.tiles_to_keep is None:
......@@ -226,7 +226,7 @@ class DataRequest:
products = OrderedDict()
for tile in self.tiles_to_keep:
kw = query_kwargs.copy()
kw["filename"] = f"*_T{tile}_*"
kw["filename"] = "*_T{}_*".format(tile)
pp = self.api.query(**kw)
products.update(pp)
......
......@@ -6,6 +6,7 @@ Module for downloading Sentinel-2 images using EODAG
https://www.github.com/CS-SI/EODAG
"""
import logging
import shapefile
import os
......@@ -334,11 +335,5 @@ def S2cEodag_download(
pool.close()
pool.join()
......@@ -398,7 +398,6 @@ class Job:
f.flush()
if not self.tasks.empty:
## Cleaning before
if clean_before:
logger.info("Cleaning Tiles")
......@@ -492,13 +491,12 @@ class Job:
+ "\n".join(l1c_process_list)
+ "\n"
)
# Remove downloaded L1C
for index, row in self.tasks.iterrows():
if "l1c" in str(row.remove).lower():
t = Tile(row.tile)
t.remove_l1c([p for p in downloaded_products if row.tile in p])
# Traitement des L2A (clouds)
logger.info("Computing cloudmasks")
......@@ -613,9 +611,7 @@ class Job:
# Remove L2A
# todo
# Cleaning after
if clean_after:
logger.info("Cleaning Tiles")
......
......@@ -13,9 +13,14 @@ import os
from pathlib import Path
# type annotations
from typing import List, Tuple, Optional
from .utils import grouper, setPermissions, get_current_Sen2Cor_version
from typing import (
List,
Tuple,
Optional,
Union
)
from packaging import version
from .utils import grouper, set_permissions, get_Sen2Cor_version, get_latest_s2c_version_path
from .config import Config, SHARED_DATA
from .xmlparser import MetadataParser, Sen2ChainMetadataParser
from .sen2cor import process_sen2cor
......@@ -204,8 +209,12 @@ class L1cProduct(Product):
if not re.match(r".*L1C_.*", self.identifier):
raise ValueError("Invalid L1C product name")
def process_l2a(self, reprocess: bool = False) -> "L1cProduct":
"""process sen2cor"""
def process_l2a(
self,
reprocess: bool = False,
s2c_path: Union[str, pathlib.PosixPath] = None,
) -> "L1cProduct":
""" process with sen2cor """
logger.info("{}: processing L2A".format(self.identifier))
l2a_identifier = self.identifier.replace("L1C_", "L2A_").replace(
......@@ -213,13 +222,15 @@ class L1cProduct(Product):
)
l2a_path = self.path.parent / (l2a_identifier + ".SAFE")
l2a_prod = L2aProduct(l2a_identifier, self.tile, l2a_path.parent)
process_it = False
if not l2a_prod.in_library and not l2a_path.exists():
process_it = True
s2c_path = s2c_path or get_latest_s2c_version_path(self.identifier)
if s2c_path:
process_it = True
else:
if not reprocess:
logger.info("{} already exists.".format(l2a_identifier))
process_it = False
else:
if l2a_prod.in_library:
shutil.rmtree(str(l2a_prod.path))
......@@ -228,19 +239,27 @@ class L1cProduct(Product):
# if reprocessing from the temp folder
shutil.rmtree(str(l2a_path))
logger.info("Deleted {}".format(l2a_path))
process_it = True
#### check here if better to delete after sen2cor version check compatible... ??
s2c_path = s2c_path or get_latest_s2c_version_path(self.identifier)
if s2c_path:
process_it = True
if process_it:
process_sen2cor(self.path, l2a_path, self.processing_baseline)
process_sen2cor(
l1c_product_path = self.path,
l2a_product_path = l2a_path,
s2c_path = s2c_path,
pb = self.processing_baseline,
)
# sen2cor creates the L2A in the parent folder of the L1C.
# Therefore, if the L1C computed is in the L1C library folder,
# the newly L2A must be moved to his L2A library folder.
if self._library_product:
l2a_prod.archive()
l2a_prod = L2aProduct(l2a_identifier)
l2a_prod.setPermissions()
l2a_prod.update_md()
l2a_prod.set_permissions()
l2a_prod.update_md(sen2cor_version = get_Sen2Cor_version(s2c_path))
return self
def process_ql(
......@@ -905,8 +924,11 @@ class L2aProduct(Product):
return self
def setPermissions(self):
setPermissions(self.path)
def set_permissions(self):
try:
set_permissions(self.path)
except:
logger.info("Cannot set permissions to: {}".format(self.identifier))
# def has_indice(self, indice):
# """
......
......@@ -11,7 +11,7 @@ import subprocess
from typing import Union
from .config import Config
from .utils import get_current_Sen2Cor_version
from .utils import get_Sen2Cor_version, get_latest_s2c_version_path
logger = logging.getLogger(__name__)
......@@ -21,6 +21,7 @@ logging.basicConfig(level=logging.INFO)
def process_sen2cor(
l1c_product_path: Union[str, pathlib.PosixPath],
l2a_product_path: Union[str, pathlib.PosixPath],
s2c_path: Union[str, pathlib.PosixPath] = None,
pb: str = "99.99",
resolution: int = 10,
) -> None:
......@@ -36,67 +37,53 @@ def process_sen2cor(
# TODO: Add 60m resolution.
sen2cor_bashrc_path = Config().get("sen2cor_bashrc_path")
# ~ s2c_v = next(iter(re.findall('Sen2Cor-(\d{2}\.\d{2}\.\d{2})', str(sen2cor_bashrc_path))), None)
s2c_v = get_current_Sen2Cor_version()
l2a_product_path_tmp = l2a_product_path.parent / (
l2a_product_path.stem + ".tmp"
)
if s2c_v == "02.05.05":
logger.info(
"sen2cor {} processing: {}".format(s2c_v, l1c_product_path)
)
if resolution == 10:
logger.info("sen2cor processing 20 m: {}".format(l1c_product_path))
command1 = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --resolution {res} {l1c_folder}".format(
sen2cor_bashrc=str(sen2cor_bashrc_path),
res=20,
l1c_folder=str(l1c_product_path),
)
process1 = subprocess.run(command1.split(", "))
logger.info("sen2cor processing 10 m: {}".format(l1c_product_path))
command2 = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --resolution {res} {l1c_folder}".format(
sen2cor_bashrc=str(sen2cor_bashrc_path),
res=10,
l1c_folder=str(l1c_product_path),
)
process2 = subprocess.run(command2.split(", "))
else:
logger.debug("sen2cor processing: {}".format(l1c_product_path))
command = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --resolution {resolution} {l1c_folder}".format(
sen2cor_bashrc=str(sen2cor_bashrc_path),
resolution=resolution,
l1c_folder=str(l1c_product_path),
s2c_path = s2c_path or get_latest_s2c_version_path(pathlib.Path(l1c_product_path).stem)
if s2c_path:
s2c_v = get_Sen2Cor_version(s2c_path)
l2a_product_path_tmp = l2a_product_path.parent / (l2a_product_path.stem + '.tmp')
if s2c_v == '02.05.05':
logger.info("sen2cor {} processing: {}".format(s2c_v, l1c_product_path))
if resolution == 10:
logger.info("sen2cor processing 20 m: {}".format(l1c_product_path))
command1 = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --resolution {res} {l1c_folder}".format(
sen2cor_bashrc=str(s2c_path),
res=20,
l1c_folder=str(l1c_product_path)
)
process1 = subprocess.run(command1.split(", "))
logger.info("sen2cor processing 10 m: {}".format(l1c_product_path))
command2 = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --resolution {res} {l1c_folder}".format(
sen2cor_bashrc=str(s2c_path),
res=10,
l1c_folder=str(l1c_product_path)
)
process2 = subprocess.run(command2.split(", "))
else:
logger.debug("sen2cor processing: {}".format(l1c_product_path))
command = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --resolution {resolution} {l1c_folder}".format(
sen2cor_bashrc=str(s2c_path),
resolution=resolution,
l1c_folder=str(l1c_product_path)
)
process = subprocess.run(command.split(", "))
elif s2c_v in ['02.08.00','02.09.00', '02.10.01']:
logger.info("sen2cor {} processing: {}".format(s2c_v, l1c_product_path))
command = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --processing_baseline {processing_baseline} --output_dir {out_dir} {l1c_folder}".format(
sen2cor_bashrc = str(s2c_path),
processing_baseline = pb,
out_dir = l2a_product_path_tmp,
l1c_folder = str(l1c_product_path)
)
process = subprocess.run(command.split(", "))
elif s2c_v in ["02.08.00", "02.09.00", "02.10.01"]:
logger.info(
"sen2cor {} processing: {}".format(s2c_v, l1c_product_path)
)
command = "/bin/bash, -c, source {sen2cor_bashrc} && L2A_Process --processing_baseline {processing_baseline} --output_dir {out_dir} {l1c_folder}".format(
sen2cor_bashrc=str(sen2cor_bashrc_path),
processing_baseline=pb,
out_dir=l2a_product_path_tmp,
l1c_folder=str(l1c_product_path),
)
process = subprocess.run(command.split(", "))
sorted(l2a_product_path_tmp.glob("*.SAFE"))[0].rename(
l2a_product_path.parent / (l2a_product_path.stem + ".SAFE")
)
l2a_product_path_tmp.rmdir()
elif s2c_v is not None:
logger.info(
"Sen2Cor version {} is not compatible with Sen2Chain".format(s2c_v)
)
sorted(l2a_product_path_tmp.glob("*.SAFE"))[0].rename(l2a_product_path.parent / (l2a_product_path.stem + '.SAFE'))
l2a_product_path_tmp.rmdir()
elif s2c_v is not None:
logger.info('Provided Sen2Cor version {} is not compatible with Sen2Chain yet'.format(s2c_v))
else:
logger.info('Could not determine Sen2Cor version from provided path, please check pattern "Sen2Cor-**.**.**" is in path')
else:
logger.info(
'Could not determine sen2cor version from path, please check pattern "Sen2Cor-**.**.**" is in path'
)
logger.info('Could not determine a compatible installed sen2cor version to process this product')
......@@ -23,9 +23,10 @@ from threading import Thread
# type annotations
from typing import List, Dict, Iterable
from packaging import version
from .config import Config, SHARED_DATA
from .utils import str_to_datetime, human_size, getFolderSize
from .utils import str_to_datetime, human_size, getFolderSize, get_latest_s2c_version_path
from .indices import IndicesCollection
from .products import (
L1cProduct,
......@@ -91,10 +92,12 @@ class ProductsList:
return TileProduct(prod, max_date, self._dict[prod]["cloud_cover"])
def filter_dates(
self, date_min: str = None, date_max: str = None
self,
date_min: str = None,
date_max: str = None,
date_exact: str = None,
) -> "ProductsList":
"""Filters products list in a time range.
:param date_min: oldest date.
:param date_max: newest date.
"""
......@@ -105,14 +108,52 @@ class ProductsList:
str_to_datetime(date_max, "ymd") if date_max else self.last.date
)
filtered = ProductsList()
for k, v in self._dict.items():
if min_date.date() <= v["date"].date() <= max_date.date():
filtered[k] = {
"date": v["date"],
"cloud_cover": v["cloud_cover"],
}
return filtered
if date_exact:
if type(date_exact) is list:
for date in date_exact:
if len(date) == 2:
for k, v in self._dict.items():
if (datetime.strftime(v["date"].date(), "%m") == date) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
elif len(date) == 4:
for k, v in self._dict.items():
if (str(v["date"].date().year) == date) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
elif len(date) == 6:
for k, v in self._dict.items():
if (datetime.strftime(v["date"].date(), "%Y%m") == date) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
elif len(date) == 10:
for k, v in self._dict.items():
if (datetime.strftime(v["date"].date(), "%Y-%m-%d") == date) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
else:
if len(date_exact) == 2:
for k, v in self._dict.items():
if (datetime.strftime(v["date"].date(), "%m") == date_exact) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
elif len(date_exact) == 4:
for k, v in self._dict.items():
if (str(v["date"].date().year) == date_exact) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
elif len(date_exact) == 6:
for k, v in self._dict.items():
if (datetime.strftime(v["date"].date(), "%Y%m") == date_exact) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
elif len(date_exact) == 10:
for k, v in self._dict.items():
if (datetime.strftime(v["date"].date(), "%Y-%m-%d") == date_exact) and (min_date.date() <= v["date"].date() <= max_date.date()):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
else:
for k, v in self._dict.items():
if min_date.date() <= v["date"].date() <= max_date.date():
filtered[k] = {
"date": v["date"],
"cloud_cover": v["cloud_cover"]
}
return filtered
def filter_clouds(
self, cover_min: int = 0, cover_max: int = 100
) -> "ProductsList":
......@@ -135,7 +176,24 @@ class ProductsList:
"cloud_cover": v["cloud_cover"],
}
return filtered
def filter_processing_baseline(
self,
pb_min: str = "0.0",
pb_max: str = "9999",
) -> "ProductsList":
"""Filters products list with processing baseline version
:param pb_min: minimum processing baseline version
:param pb_max: maximum processing baseline version
"""
filtered = ProductsList()
for k, v in self._dict.items():
if "L1C" in k:
processing_baseline = L1cProduct(identifier=k).processing_baseline
if version.parse(pb_min) <= version.parse(processing_baseline) <= version.parse(pb_max):
filtered[k] = {"date": v["date"], "cloud_cover": v["cloud_cover"]}
return filtered
def __len__(self) -> int:
return len(self._dict)
......@@ -585,13 +643,35 @@ class Tile:
# @property
# def cloudmasks_missings(self) -> "ProductsList":
# """Returns tile's L2A products that don't have a cloud mask as a ProductsList."""
# prods_list = ProductsList()
# missings_l2a_set = set(self.l2a.products) - {identifier for identifier in self.cloudmasks.products}
# for prod in missings_l2a_set:
# prods_list[prod] = {"date": self._products["l2a"][prod].date,
# "cloud_cover": self._products["l2a"][prod].cloud_cover}
# return prods_list
# """Returns tile's L2A products that don't have a cloud mask as a ProductsList."""
# prods_list = ProductsList()
# missings_l2a_set = set(self.l2a.products) - {identifier for identifier in self.cloudmasks.products}
# for prod in missings_l2a_set:
# prods_list[prod] = {"date": self._products["l2a"][prod].date,
# "cloud_cover": self._products["l2a"][prod].cloud_cover}
# return prods_list
@property
def l1c_processable(self) -> "ProductsList":
"""Returns tile's L1C products that can be processes with sen2cor installation
"""
prod_list = ProductsList()
processable_l1c_set = {identifier for identifier in self.l1c.products if get_latest_s2c_version_path(identifier)}
for prod in processable_l1c_set:
prod_list[prod] = {"date": self._products["l1c"][prod].date,
"cloud_cover": self._products["l1c"][prod].cloud_cover}
return prod_list
# def cloudmasks_missing(self,
# cm_version: str = "cm001",
# probability: int = 1,
# iterations: int = 5,
# cld_shad: bool = True,
# cld_med_prob: bool = True,
# cld_hi_prob: bool = True,
# thin_cir: bool = True,
# ) -> "ProductsList":
# """Returns tile's L2A products that don't have a cloud mask as a ProductsList."""
def cloudmasks_missing(
self,
......@@ -720,7 +800,7 @@ class Tile:
"cloud_cover": self._products["l2a"][prod].cloud_cover,
}
return prodlist
def get_l1c(
self,
provider: str = "peps",
......@@ -825,6 +905,8 @@ class Tile:
p_60m_missing: bool = False,
date_min: str = None,
date_max: str = None,
cover_min: int = 0,
cover_max: int = 100,
nb_proc: int = 4,
):
"""
......@@ -838,6 +920,9 @@ class Tile:
product.identifier
for product in self.l2a.filter_dates(
date_min=date_min, date_max=date_max
).filter_clouds(
cover_min = cover_min,
cover_max = cover_max
)
if not L2aProduct(product.identifier).b01_60m
]
......@@ -846,6 +931,9 @@ class Tile:
product.identifier
for product in self.l2a.filter_dates(
date_min=date_min, date_max=date_max
).filter_clouds(
cover_min = cover_min,
cover_max = cover_max
)
]
if l2a_remove_list:
......@@ -856,6 +944,9 @@ class Tile:
p.identifier
for p in self.l2a_missings.filter_dates(
date_min=date_min, date_max=date_max
).filter_clouds(
cover_min = cover_min,
cover_max = cover_max
)
)
)
......@@ -871,29 +962,6 @@ class Tile:
if l1c_process_list:
l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=nb_proc)
# def compute_cloudmasks(self,
# version: str = "cm001",
# probability: int = 1,
# iterations: int = 5,
# date_min: str = None,
# date_max: str = None,
# nb_proc: int = 4):
# """
# Compute all missing cloud masks for l2a products
# """
# cld_l2a_process_list = []
# cld_l2a_process_list.append(list(p.identifier for p in self.cloudmasks_missing.filter_dates(date_min = date_min, date_max = date_max)))
# cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list))
# if cld_l2a_process_list:
# logger.info("{} l2a products to process:".format(len(cld_l2a_process_list)))
# logger.info("{}".format(cld_l2a_process_list))
# else:
# logger.info("All cloud masks already computed")
# cld_res = False
# if cld_l2a_process_list:
# cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=nb_proc)
def compute_cloudmasks(
self,
cm_version: str = "cm001",
......@@ -1303,7 +1371,8 @@ class Tile:
move_path.parent.mkdir(exist_ok=True)
# shutil.move(str(l1c.path), str(move_path.parent))
setuptools.distutils.dir_util.copy_tree(
str(l1c.path), str(move_path)
str(l1c.path),
str(move_path),
)
setuptools.distutils.dir_util.remove_tree(str(l1c.path))
l1c.path.symlink_to(
......
......@@ -8,10 +8,14 @@ import os, stat
import logging
from itertools import zip_longest
from datetime import datetime
from typing import Iterable
from typing import Iterable, Union
import re
import pathlib
from packaging import version
from .config import Config
# from .products import L1cProduct
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
......@@ -19,10 +23,12 @@ logging.basicConfig(level=logging.INFO)
# useful dates formats encountered when working with Sentinel-2 data
DATES_FORMATS = dict(
ymd="%Y-%m-%d", filename="%Y%m%dT%H%M%S", metadata="%Y-%m-%dT%H:%M:%S.%fZ"
ymd="%Y-%m-%d",
Ymd = "%Y%m%d",
filename="%Y%m%dT%H%M%S",
metadata="%Y-%m-%dT%H:%M:%S.%fZ"
)
def format_word(word: str) -> str:
"""
Formats a word to lower case and removes heading and trailing spaces.
......@@ -109,8 +115,7 @@ def getFolderSize(folder, follow_symlinks=False):
pass
return total_size
def setPermissions(path):
def set_permissions(path):
os.chmod(
str(path), os.stat(str(path)).st_mode | stat.S_IWGRP | stat.S_IWUSR
)
......@@ -127,11 +132,14 @@ def setPermissions(path):
name = dir_path + "/" + f
# os.chmod(name, os.stat(name).st_mode | 0o064)
# os.chmod(name, os.stat(name).st_mode & ~0o003)
os.chmod(name, os.stat(name).st_mode | stat.S_IWGRP | stat.S_IWUSR)
os.chmod(
name, os.stat(name).st_mode & ~stat.S_IXOTH & ~stat.S_IWOTH
name,
os.stat(name).st_mode | stat.S_IWGRP | stat.S_IWUSR
)
os.chmod(
name,
os.stat(name).st_mode & ~stat.S_IXOTH & ~stat.S_IWOTH
)
def get_current_Sen2Cor_version():
"""Returns your current Sen2Cor version"""
......@@ -145,10 +153,50 @@ def get_current_Sen2Cor_version():
None,
)
def get_tile(identifier) -> str:
"""Returns tile name from a string.
:param string: string from which to extract the tile name.
"""
return re.findall("_T([0-9]{2}[A-Z]{3})_", identifier)[0]
def get_Sen2Cor_version(path: Union[str, pathlib.PosixPath] = None):
""" Returns
- the current Sen2Cor version if no argument provided
- the Sen2Cor version if a Sen2Cor path is provided
"""
if not path:
path = Config().get("sen2cor_bashrc_path")
return next(iter(re.findall('Sen2Cor-(\d{2}\.\d{2}\.\d{2})', str(path))), None)
def get_latest_s2c_version_path(l1c_identifier):
""" Returns
- the path of the first Sen2Cor compatible version, starting from sen2cor_bashrc_path then sen2cor_alternative_bashrc_path
- None if no compatible Sen2Cor version is provided
"""
from .products import L1cProduct
current_path = Config().get("sen2cor_bashrc_path")
alternate_path = Config().get("sen2cor_alternative_bashrc_path")
if version.parse(L1cProduct(l1c_identifier).processing_baseline) >= version.parse("4.0"):
if version.parse(get_Sen2Cor_version(current_path)) >= version.parse("2.10.01"):
return current_path
else:
if version.parse(get_Sen2Cor_version(alternate_path)) >= version.parse("2.10.01"):
return alternate_path
else:
return None
elif version.parse(L1cProduct(l1c_identifier).processing_baseline) >= version.parse("2.06"):
return current_path
else:
if version.parse(get_Sen2Cor_version(current_path)) >= version.parse("2.10.01"):
if alternate_path:
if version.parse(get_Sen2Cor_version(alternate_path)) >= version.parse("2.10.01"):
return None
else:
return alternate_path
else:
return None
else:
return current_path
......@@ -10,7 +10,7 @@ import re
import xml.etree.ElementTree as et
import sen2chain
from .utils import get_current_Sen2Cor_version
from .utils import get_Sen2Cor_version
from .config import SHARED_DATA, Config
......@@ -221,7 +221,9 @@ class Sen2ChainMetadataParser:
)
self._root.find("SEN2COR_VERSION").text = get_current_Sen2Cor_version()
self._tree.write(
str(self.xml_path), encoding="UTF-8", xml_declaration=True
str(self.xml_path),
encoding="UTF-8",
xml_declaration=True
)
def set_metadata(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment