Skip to content
Snippets Groups Projects
products.py 25.8 KiB
Newer Older
Jeremy Commins's avatar
Jeremy Commins committed
# -*- coding: utf-8 -*-

"""
Module for managing products and tiles in the library and temp folders.
"""

import pathlib
import shutil
import logging
import re

from pathlib import Path
# type annotations
from typing import List, Tuple, Optional
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
from .config import Config, SHARED_DATA
from .xmlparser import MetadataParser
from .sen2cor import process_sen2cor
from .cloud_mask import create_cloud_mask, create_cloud_mask_v2, create_cloud_mask_b11
Jeremy Commins's avatar
Jeremy Commins committed
from .indices import IndicesCollection
from .colormap import create_l2a_ql, create_l1c_ql
Jeremy Commins's avatar
Jeremy Commins committed

s2_tiles_index = SHARED_DATA.get("tiles_index")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Product:
Jeremy Commins's avatar
Jeremy Commins committed
    """ Base product template class. This class is designed to be
    instanciated only by sub-classes.
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
    :param identifier: L1C product's identifier or SAFE filename.
    :param tile: tile's name.
    :param path: parent's SAFE folder path.
Jeremy Commins's avatar
Jeremy Commins committed
    """
    _library_path = Path("")
    _metadata_filename_templates = ()

    def __new__(cls, *args, **kwargs):
Jeremy Commins's avatar
Jeremy Commins committed
        """Prevents from direct instantiation."""
Jeremy Commins's avatar
Jeremy Commins committed
        if cls is Product:
            raise TypeError("Product may not be instantiated")
        return object.__new__(cls)

    def __init__(
            self,
            identifier: str = None,
            tile: str = None,
            path: pathlib.PosixPath = None
    ) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        if identifier is None:
            raise ValueError("Product identifier is empty")
        else:
            self.identifier = identifier.upper().replace(".SAFE", "")

        if tile is None:
            try:
                self.tile = Product.get_tile(identifier)
                self._tiled = True
            except IndexError:
                logger.error("Tile must be specified for old products")
                raise
        else:
            self.tile = tile
            try:
                Product.get_tile(identifier)
                self._tiled = True
            except IndexError:
                self._tiled = False

        self.safe = self.identifier + ".SAFE"
        self.library_path = self._library_path / self.tile / self.safe

        if path is None:
            self.path = self.library_path
            logger.debug("No path specified, using library's: {}".format(self.path))
        else:
            self.path = path / self.safe

        if self.path.absolute() is self.library_path.absolute():
            self._library_product = True
        else:
            self._library_product = False

        self._metadata_path = None
        self._metadata_parser = None

    @staticmethod
Jeremy Commins's avatar
Jeremy Commins committed
    def get_tile(string) -> str:
        """Returns tile name from a string.
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
        :param string: string from which to extract the tile name.
Jeremy Commins's avatar
Jeremy Commins committed
        """
Jeremy Commins's avatar
Jeremy Commins committed
        return re.findall("_T([0-9]{2}[A-Z]{3})_", string)[0]

    def _update_metadata_parser(self) -> None:
        """Instanciates the metadaparser object."""
Jeremy Commins's avatar
Jeremy Commins committed
        if self._metadata_parser is None:
            try:
                self._get_metadata_path()
                self._metadata_parser = MetadataParser(metadata_path=self._metadata_path, tile=self.tile)
            except Exception as e:
                logger.debug("{}".format(e))
Jeremy Commins's avatar
Jeremy Commins committed
                logger.error("{}: could not load metadata: {}".format(self.identifier, self._metadata_path))

    def _get_metadata_path(self) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        """Searchs and sets the metadata path of the product."""
Jeremy Commins's avatar
Jeremy Commins committed
        logger.debug("{}: looking for metadata path".format(self.identifier))
        if self._tiled:
            filename = self._tiled_metadata
        else:
            filename = self.identifier.replace("_PRD_", "_MTD_").replace("_MSI", "_SAF") + ".xml"
        metadata_path = self.path / filename
        if not metadata_path.exists():
            self._metadata_path = None
            logger.debug("{} metadata file not found".format(self.safe))
        self._metadata_path = metadata_path
        logger.debug("Found: {}".format(self._metadata_path))

Jeremy Commins's avatar
Jeremy Commins committed
    def _get_metadata_value(self, key: str) -> str:
        """Returns a metadata value.

        :param key: tag of the metadata to search.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        self._update_metadata_parser()
        return self._metadata_parser.get_metadata_value(key=key)

Jeremy Commins's avatar
Jeremy Commins committed
    def _get_band_path(self, key: str, res=None) -> str:
        """Returns a band path according to a key.

        :param key: tag of the metadata to search.
Jeremy Commins's avatar
Jeremy Commins committed
        """
        self._update_metadata_parser()
        return self._metadata_parser.get_band_path(key=key, res=res)

    @property
    def in_library(self) -> bool:
Jeremy Commins's avatar
Jeremy Commins committed
        """Is the product in the library folder."""
Jeremy Commins's avatar
Jeremy Commins committed
        return self.library_path.exists()

Jeremy Commins's avatar
Jeremy Commins committed
    def archive(self) -> "Product":
        """Moves the product's SAFE folder to the library folder."""
Jeremy Commins's avatar
Jeremy Commins committed
        logger.info("archiving {} to {}".format(self.path, self.library_path))
        self.library_path.parent.mkdir(exist_ok=True)
        shutil.move(str(self.path), str(self.library_path.parent))
Jeremy Commins's avatar
Jeremy Commins committed
        return self
Jeremy Commins's avatar
Jeremy Commins committed

    def __str__(self) -> Optional[str]:
Jeremy Commins's avatar
Jeremy Commins committed
        try:
            return str(self.safe)
        except TypeError:
            return None


class L1cProduct(Product):
Jeremy Commins's avatar
Jeremy Commins committed
    """L1C product class.
Jeremy Commins's avatar
Jeremy Commins committed

    :param identifier: L1C product's identifier or SAFE filename.
    :param tile: product's tile.
    :param path: parent folder.
    """
    _library_path = Path(Config().get("l1c_path"))
    _tiled_metadata = "MTD_MSIL1C.xml"

    def __init__(self,
                 identifier: str = None,
                 tile: str = None,
                 path: str = None):
Jeremy Commins's avatar
Jeremy Commins committed
        super().__init__(identifier=identifier, tile=tile, path=path)

        if not re.match(r".*L1C_.*", self.identifier):
            raise ValueError("Invalid L1C product name")

    def process_l2a(self, reprocess: bool = False) -> "L1cProduct":
Jeremy Commins's avatar
Jeremy Commins committed
        """ process sen2cor """
        logger.info("{}: processing L2A".format(self.identifier))

        l2a_identifier = self.identifier.replace("L1C_", "L2A_").replace("_OPER_", "_USER_")
Jeremy Commins's avatar
Jeremy Commins committed
        l2a_path = self.path.parent / (l2a_identifier + ".SAFE")
        l2a_path_tmp = l2a_path.parent / (l2a_path.stem + '.tmp')
Jeremy Commins's avatar
Jeremy Commins committed
        l2a_prod = L2aProduct(l2a_identifier, self.tile, l2a_path.parent)

        if not l2a_prod.in_library and not l2a_path.exists():
            process_it = True
        else:
            if not reprocess:
                logger.info("{} already exists.".format(l2a_identifier))
                process_it = False
            else:
                if l2a_prod.in_library:
                    shutil.rmtree(str(l2a_prod.path))
                    logger.info("Deleted {}".format(l2a_prod.path))
                else:
                    # if reprocessing from the temp folder
                    shutil.rmtree(str(l2a_path))
                    logger.info("Deleted {}".format(l2a_path))

                process_it = True

        if process_it:
            process_sen2cor(self.path, l2a_path_tmp, self.processing_baseline)
Jeremy Commins's avatar
Jeremy Commins committed
            # sen2cor creates the L2A in the parent folder of the L1C.
            # Therefore, if the L1C computed is in the L1C library folder,
            # the newly L2A must be moved to his L2A library folder.
            sorted(l2a_path_tmp.glob("*.SAFE"))[0].rename(l2a_path)
Jeremy Commins's avatar
Jeremy Commins committed
            if self._library_product:
                l2a_prod.archive()
Jeremy Commins's avatar
Jeremy Commins committed
        return self

    def process_ql(self,
                   reprocess: bool = False,
                   out_path: pathlib.PosixPath = None,
                   out_resolution: Tuple[int, int] = (100, 100),
                   jpg = False,
                   ) -> "L1cProduct":
        """
        """
        logger.info("{}: processing L1C Quicklook".format(self.identifier))

        if jpg:
            ql_filename = self.identifier + "_QL.jpg"
        else:
            ql_filename = self.identifier + "_QL.tif"
        
        if out_path is None:
            ql_folder = self.library_path.parent / "QL"
            ql_folder.mkdir(parents=True, exist_ok=True)
            ql_path = ql_folder / ql_filename
        else:
            ql_path = Path(out_path)
            
        if ql_path.exists() and not reprocess:
            logger.info("{} Quicklook already exists".format(self.identifier))
        else:
            create_l1c_ql(tci = self.tci,
                          out_path = ql_path,
                          out_resolution = out_resolution,
                          jpg = jpg)
            self.user_ql = ql_path

        return ql_path
Jeremy Commins's avatar
Jeremy Commins committed
    # METADATA

    @property
    def product_start_time(self):
        return self._get_metadata_value(key="PRODUCT_START_TIME")

    @property
    def product_stop_time(self):
        return self._get_metadata_value(key="PRODUCT_STOP_TIME")

    @property
    def product_uri(self):
        return self._get_metadata_value(key="PRODUCT_URI")

    @property
    def product_type(self):
        return self._get_metadata_value(key="PRODUCT_TYPE")

    @property
    def processing_baseline(self):
        return self._get_metadata_value(key="PROCESSING_BASELINE")

    @property
    def generation_time(self):
        return self._get_metadata_value(key="GENERATION_TIME")

    @property
    def spacecraft_name(self):
        return self._get_metadata_value(key="SPACECRAFT_NAME")

    @property
    def datatake_type(self):
        return self._get_metadata_value(key="DATATAKE_TYPE")

    @property
    def datatake_sensing_start(self):
        return self._get_metadata_value(key="DATATAKE_SENSING_START")

    @property
    def sensing_orbit_number(self):
        return self._get_metadata_value(key="SENSING_ORBIT_NUMBER")

    @property
    def sensing_orbit_direction(self):
        return self._get_metadata_value(key="SENSING_ORBIT_DIRECTION")

    @property
    def cloud_coverage_assessment(self):
        return self._get_metadata_value(key="Cloud_Coverage_Assessment")

    @property
    def footprint(self):
        return self._get_metadata_value(key="EXT_POS_LIST")

    # BANDS PATHS
    @property
    def b01(self):
        return self._get_band_path(key="B01")

    @property
    def b02(self):
        return self._get_band_path(key="B02")

    @property
    def b03(self):
        return self._get_band_path(key="B03")

    @property
    def b04(self):
        return self._get_band_path(key="B04")

    @property
    def b05(self):
        return self._get_band_path(key="B05")

    @property
    def b06(self):
        return self._get_band_path(key="B06")

    @property
    def b07(self):
        return self._get_band_path(key="B07")

    @property
    def b08(self):
        return self._get_band_path(key="B08")

    @property
    def b8a(self):
        return self._get_band_path(key="B8A")

    @property
    def b09(self):
        return self._get_band_path(key="B09")

    @property
    def b10(self):
        return self._get_band_path(key="B10")

    @property
    def b11(self):
        return self._get_band_path(key="B11")

    @property
    def b12(self):
        return self._get_band_path(key="B12")

    @property
    def tci(self):
        return self._get_band_path(key="TCI")


class L2aProduct(Product):
Jeremy Commins's avatar
Jeremy Commins committed
    """L2A product class.
Jeremy Commins's avatar
Jeremy Commins committed

Jeremy Commins's avatar
Jeremy Commins committed
    :param identifier: L2A product's identifier or SAFE filename.
    :param tile: tile's name.
    :param path: parent's SAFE folder path.
Jeremy Commins's avatar
Jeremy Commins committed
    """
    _library_path = Path(Config().get("l2a_path"))
    _indices_library_path = Path(Config().get("indices_path"))
    _tiled_metadata = "MTD_MSIL2A.xml"

    def __init__(
            self,
            identifier: str = None,
            tile: str = None,
            path: str = None
    ) -> None:
Jeremy Commins's avatar
Jeremy Commins committed
        super().__init__(identifier=identifier, tile=tile, path=path)
        if not re.match(r".*L2A_.*", identifier):
            raise ValueError("Invalid L2A product name")

        self.indices_path = self._indices_library_path

        # user cloud mask
        self.user_cloud_mask = self.path.parent / (self.identifier + "_CLOUD_MASK.jp2")
Jeremy Commins's avatar
Jeremy Commins committed
        if not self.user_cloud_mask.exists():
            self.user_cloud_mask = None

        # user cloud mask b11
        self.user_cloud_mask_b11 = self.path.parent / (self.identifier + "_CLOUD_MASK_B11.jp2")
        if not self.user_cloud_mask_b11.exists():
            self.user_cloud_mask_b11 = None

        # user QL
        self.user_ql = self.path.parent / (self.identifier + "_QL.tif")
        if not self.user_ql.exists():
            self.user_ql = None

    def process_ql(self,
                   reprocess: bool = False,
                   out_path: pathlib.PosixPath = None,
                   out_resolution: Tuple[int, int] = (100, 100),
pascal.mouquet_ird.fr's avatar
pascal.mouquet_ird.fr committed
                   jpg = False,
                   ) -> "L2aProduct":
        """
        """
        logger.info("{}: processing L2A Quicklook".format(self.identifier))

pascal.mouquet_ird.fr's avatar
pascal.mouquet_ird.fr committed
        if jpg:
            ql_filename = self.identifier + "_QL.jpg"
        else:
            ql_filename = self.identifier + "_QL.tif"
        
        if out_path is None:
            ql_folder = self.library_path.parent
            ql_folder.mkdir(parents=True, exist_ok=True)
            ql_path = ql_folder / ql_filename
        else:
            ql_path = Path(out_path)
            
        if ql_path.exists() and not reprocess:
            logger.info("{} Quicklook already exists".format(self.identifier))
        else:
            create_l2a_ql(b02 = self.b02_10m,
                          b03 = self.b03_10m,
                          b04 = self.b04_10m,
                          out_path = ql_path,
pascal.mouquet_ird.fr's avatar
pascal.mouquet_ird.fr committed
                          out_resolution = out_resolution,
                          jpg = jpg)
            self.user_ql = ql_path

        return self

Jeremy Commins's avatar
Jeremy Commins committed
    def process_cloud_mask(self,
                           buffering: bool = True,
                           reprocess: bool = False,
                           out_path: pathlib.PosixPath = None
                           ) -> "L2aProduct":
Jeremy Commins's avatar
Jeremy Commins committed
        """
        """
        logger.info("{}: processing cloud_mask".format(self.identifier))

        cloud_mask_filename = self.identifier + "_CLOUD_MASK.tif"

        if out_path is None:
            cloud_mask_folder = self.library_path.parent
            cloud_mask_folder.mkdir(parents=True, exist_ok=True)
            cloud_mask_path = cloud_mask_folder / cloud_mask_filename
        else:
            cloud_mask_path = Path(out_path)

        if cloud_mask_path.exists() and not reprocess:
            logger.info("{} cloud mask already exists".format(self.identifier))
        else:
            create_cloud_mask(self.msk_cldprb_20m,
                              buffering=buffering,
                              erosion=-40,
                              dilatation=100,
                              out_path=cloud_mask_path)
            self.user_cloud_mask = cloud_mask_path
        return self
                              buffering: bool = True,
                              reprocess: bool = False,
                              out_path_mask = None,
                              out_path_mask_b11 = None
                              ) -> "L2aProduct":
        """
        """
        logger.info("{}: processing cloud_mask_v2".format(self.identifier))

        cloud_mask_filename = self.identifier + "_CLOUD_MASK.jp2"
        cloud_mask_b11_filename = str(Path(cloud_mask_filename).parent/Path(cloud_mask_filename).stem) + "_B11.jp2"
        if out_path_mask is None:
            cloud_mask_folder = self.library_path.parent
            cloud_mask_folder.mkdir(parents=True, exist_ok=True)
            cloud_mask_path = cloud_mask_folder / cloud_mask_filename
        else:
            cloud_mask_path = Path(out_path_mask)

        if out_path_mask_b11 is None:
            cloud_mask_folder = self.library_path.parent
            cloud_mask_folder.mkdir(parents=True, exist_ok=True)
            cloud_mask_b11_path = cloud_mask_folder / cloud_mask_b11_filename
        else:
            cloud_mask_b11_path = Path(out_path_mask_b11)

        if cloud_mask_path.exists() and not reprocess:
            logger.info("{} cloud mask v2 already exists".format(self.identifier))
        else:
            create_cloud_mask_v2(self.msk_cldprb_20m,
                                 erosion=1,
                                 dilatation=5,
                                 out_path=cloud_mask_path)
            self.user_cloud_mask = cloud_mask_path

        if cloud_mask_b11_path.exists() and not reprocess:
            logger.info("{} cloud mask v2 masked b11 already exists".format(self.identifier))
        else:
            create_cloud_mask_b11(self.user_cloud_mask,
                                  self.b11_20m,
                                  dilatation=5,
                                  out_path=cloud_mask_b11_path)
            self.user_cloud_mask_b11 = cloud_mask_b11_path

Jeremy Commins's avatar
Jeremy Commins committed

    def process_indices(self,
                        indices_list: List[str] = [],
                        nodata_clouds: bool = False,
                        quicklook: bool = False,
                        reprocess: bool = False,
                        out_path: str = None
                        ) -> "L2aProduct":
Jeremy Commins's avatar
Jeremy Commins committed
        """
        Process indices.

        :param indices_list: list of valid indices names.
        :param nodata_clouds: mask indices rasters with the cloud-mask or not.
        :param quicklook: process a quicklook or not (if available for the indice).
        :param reprocess: erase old data.
        :param out_path: if specified, a folder {identifier}_INDICES containing \
        the processed indices will be created in the out_path.
        """
        logger.info("{}: processing_indices".format(self.identifier))

        if not isinstance(indices_list, list):
            raise TypeError("Indices must be provided as a list.")

        for indice in set(indices_list):
            logger.info("Processing {}: {}".format(indice, self.identifier))
            indice_cls = IndicesCollection.get_indice_cls(indice)
            if indice_cls is None:
                print("Indices available: {}".format(IndicesCollection.available_indices))
                raise ValueError("Indice not defined")

            if out_path is None:
                # logger.info(self.identifier)
                indice_path = self.indices_path / indice.upper() / self.tile / self.identifier
Jeremy Commins's avatar
Jeremy Commins committed
            else:
                indice_path = Path(out_path) / (self.identifier + "_INDICES") / indice.upper()
            indice_path.mkdir(parents=True, exist_ok=True)
            indice_obj = indice_cls(self)
            indice_obj.process_indice(out_path=indice_path,
                                      nodata_clouds=nodata_clouds,
                                      quicklook=quicklook,
                                      reprocess=reprocess)
Jeremy Commins's avatar
Jeremy Commins committed

        return self

    # def has_indice(self, indice):
    # """
    # Returns if the indice file exists on disk.
Jeremy Commins's avatar
Jeremy Commins committed

    # :param indice: name of the indice.
    # """
    # indice_cls = IndicesCollection().get_indice_cls(indice)
Jeremy Commins's avatar
Jeremy Commins committed

    # if indice_cls is None:
    # raise ValueError("{} is not a valid indice name.".format(indice))
Jeremy Commins's avatar
Jeremy Commins committed

    # template = indice_cls.filename_template
    # ext = indice_cls.ext
    # indice_filename = template.format(product_identifier=self.identifier,
    # ext=ext)
    # return (self.indices_path / indice / self.tile / indice_filename).exists()
Jeremy Commins's avatar
Jeremy Commins committed

    # METADATA
    @property
    def product_uri_1c(self):
        return self._get_metadata_value(key="PRODUCT_URI_1C")

    @property
    def product_uri_2a(self):
        return self._get_metadata_value(key="PRODUCT_URI_2A")

    @property
    def cloud_coverage_assessment(self):
        return self._get_metadata_value(key="Cloud_Coverage_Assessment")

    @property
    def nodata_pixel_percentage(self):
        return self._get_metadata_value(key="NODATA_PIXEL_PERCENTAGE")

    @property
    def saturated_defective_pixel_percentage(self):
        return self._get_metadata_value(key="SATURATED_DEFECTIVE_PIXEL_PERCENTAGE")

    @property
    def dark_features_percentage(self):
        return self._get_metadata_value(key="DARK_FEATURES_PERCENTAGE")

    @property
    def cloud_shadow_percentage(self):
        return self._get_metadata_value(key="CLOUD_SHADOW_PERCENTAGE")

    @property
    def vegetation_percentage(self):
        return self._get_metadata_value(key="VEGETATION_PERCENTAGE")

    @property
    def not_vegetated_percentage(self):
        return self._get_metadata_value(key="NOT_VEGETATED_PERCENTAGE")

    @property
    def water_percentage(self):
        return self._get_metadata_value(key="WATER_PERCENTAGE")

    @property
    def unclassified_percentage(self):
        return self._get_metadata_value(key="UNCLASSIFIED_PERCENTAGE")

    @property
    def medium_proba_clouds_percentage(self):
        return self._get_metadata_value(key="MEDIUM_PROBA_CLOUDS_PERCENTAGE")

    @property
    def high_proba_clouds_percentage(self):
        return self._get_metadata_value(key="HIGH_PROBA_CLOUDS_PERCENTAGE")

    @property
    def thin_cirrus_percentage(self):
        return self._get_metadata_value(key="THIN_CIRRUS_PERCENTAGE")

    @property
    def cloud_coverage_percentage(self):
        return self._get_metadata_value(key="CLOUD_COVERAGE_PERCENTAGE")

    # FOOTPRINT
    @property
    def footprint(self) -> List[Tuple[float, float]]:
        """Returns product's footprint as list of coordinates couples."""
        fp_str = self._get_metadata_value(key="EXT_POS_LIST")
        fp_list = fp_str.strip().split()
        fp_coords = grouper(fp_list, 2)
        fp_coords_float = [(float(lat), float(lon)) for lon, lat in fp_coords]
        return fp_coords_float

    # BANDS
    # 10m bands
    @property
    def b02_10m(self):
        return self._get_band_path(key="B02", res="10m")

    @property
    def b03_10m(self):
        return self._get_band_path(key="B03", res="10m")

    @property
    def b04_10m(self):
        return self._get_band_path(key="B04", res="10m")

    @property
    def b08_10m(self):
        return self._get_band_path(key="B08", res="10m")

    @property
    def aot_10m(self):
        return self._get_band_path(key="AOT", res="10m")

    @property
    def wvp_10m(self):
        return self._get_band_path(key="WVP", res="10m")

    @property
    def tci_10m(self):
        return self._get_band_path(key="TCI", res="10m")

    # 20m bands
    @property
    def b02_20m(self):
        return self._get_band_path(key="B02", res="20m")

    @property
    def b03_20m(self):
        return self._get_band_path(key="B03", res="20m")

    @property
    def b04_20m(self):
        return self._get_band_path(key="B04", res="20m")

    @property
    def b05_20m(self):
        return self._get_band_path(key="B05", res="20m")

    @property
    def b06_20m(self):
        return self._get_band_path(key="B06", res="20m")

    @property
    def b07_20m(self):
        return self._get_band_path(key="B07", res="20m")

    @property
    def b8a_20m(self):
        return self._get_band_path(key="B8A", res="20m")

    @property
    def b11_20m(self):
        return self._get_band_path(key="B11", res="20m")

    @property
    def b12_20m(self):
        return self._get_band_path(key="B12", res="20m")

    @property
    def scl_20m(self):
        return self._get_band_path(key="SCL", res="20m")

    @property
    def snw_20m(self):
        """old sen2cor snow-mask's name"""
Jeremy Commins's avatar
Jeremy Commins committed
        return self._get_band_path(key="SNW")
        # except:
        # logger.warning("'snw_20m deprecated', use: 'msk_snwprb_20m'")
        # return self.msk_snwprb_20m
Jeremy Commins's avatar
Jeremy Commins committed

    @property
    def msk_snwprb_20m(self):
        """new sen2cor cloud-mask's name"""
Jeremy Commins's avatar
Jeremy Commins committed
        return self._get_band_path(key="SNW")
        # except:
        # return self._get_band_path(key="SNWPRB", res="20m")
Jeremy Commins's avatar
Jeremy Commins committed

    @property
    def cld_20m(self):
        """old sen2cor cloud-mask's name"""
Jeremy Commins's avatar
Jeremy Commins committed
        return self._get_band_path(key="CLD")
        # Except:
        # logger.warning("'cld_20m' deprecated', use: 'msk_cldprb_20m'")
        # return self.msk_cldprb_20m
Jeremy Commins's avatar
Jeremy Commins committed

    @property
    def msk_cldprb_20m(self):
        """new sen2cor cloud-mask's name"""
        # try:
        # return self._get_band_path(key="MSK_CLDPRB", res="20m")
        # except:
Jeremy Commins's avatar
Jeremy Commins committed
        return self._get_band_path(key="CLD")

    @property
    def aot_20m(self):
        return self._get_band_path(key="AOT", res="20m")

    @property
    def wvp_20m(self):
        return self._get_band_path(key="WVP", res="20m")

    @property
    def tci_20m(self):
        return self._get_band_path(key="TCI", res="20m")

    # 60m bands
    @property
    def b01_60m(self):
        return self._get_band_path(key="B01", res="60m")

    @property
    def b02_60m(self):
        return self._get_band_path(key="B02", res="60m")

    @property
    def b03_60m(self):
        return self._get_band_path(key="B03", res="60m")

    @property
    def b04_60m(self):
        return self._get_band_path(key="B04", res="60m")

    @property
    def b05_60m(self):
        return self._get_band_path(key="B05", res="60m")

    @property
    def b06_60m(self):
        return self._get_band_path(key="B06", res="60m")

    @property
    def b07_60m(self):
        return self._get_band_path(key="B07", res="60m")

    @property
    def b8a_60m(self):
        return self._get_band_path(key="B8A", res="60m")

    @property
    def b09_60m(self):
        return self._get_band_path(key="B09", res="60m")

    @property
    def b11_60m(self):
        return self._get_band_path(key="B11", res="60m")

    @property
    def b12_60m(self):
        return self._get_band_path(key="B12", res="60m")

    @property
    def scl_60m(self):
        return self._get_band_path(key="SCL", res="60m")

    @property
    def snw_60m(self):
        return self._get_band_path(key="SNW", res="60m")

    @property
    def cld_60m(self):
        return self._get_band_path(key="CLD", res="60m")

    @property
    def aot_60m(self):
        return self._get_band_path(key="AOT", res="60m")

    @property
    def wvp_60m(self):
        return self._get_band_path(key="WVP", res="60m")

    @property
    def tci_60m(self):
        return self._get_band_path(key="TCI", res="60m")