Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
Sen2Chain
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ESPACE-DEV
Sen2Chain
Commits
d05895f0
Commit
d05895f0
authored
5 years ago
by
pascal.mouquet_ird.fr
Browse files
Options
Downloads
Patches
Plain Diff
Finished new indices integration
parent
e9f51503
No related branches found
No related tags found
No related merge requests found
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
sen2chain/colormap.py
+1
-1
1 addition, 1 deletion
sen2chain/colormap.py
sen2chain/indices.py
+241
-172
241 additions, 172 deletions
sen2chain/indices.py
sen2chain/indices_functions.py
+118
-0
118 additions, 0 deletions
sen2chain/indices_functions.py
with
360 additions
and
173 deletions
sen2chain/colormap.py
+
1
−
1
View file @
d05895f0
...
@@ -264,7 +264,7 @@ def create_rvb(
...
@@ -264,7 +264,7 @@ def create_rvb(
#~ cmap = (128 * (cmap/10000 + 1) * ((cmap+10000) > 0)).astype(np.uint8)
#~ cmap = (128 * (cmap/10000 + 1) * ((cmap+10000) > 0)).astype(np.uint8)
cmap
=
np
.
where
(
raster_band
==
16383
,
stretch
[
0
],
raster_band
)
cmap
=
np
.
where
(
raster_band
==
16383
,
stretch
[
0
],
raster_band
)
cmap
=
(
255
*
(
cmap
-
stretch
[
0
]).
astype
(
np
.
float
)
/
(
stretch
[
1
]
-
stretch
[
0
])).
astype
(
np
.
uint8
)
cmap
=
np
.
clip
(
(
255
*
(
cmap
-
stretch
[
0
]).
astype
(
np
.
float
)
/
(
stretch
[
1
]
-
stretch
[
0
]))
,
0
,
255
)
.
astype
(
np
.
uint8
)
cmap
=
np
.
where
(
cld_reproj
==
1
,
cld_val
,
cmap
)
cmap
=
np
.
where
(
cld_reproj
==
1
,
cld_val
,
cmap
)
# compute default transform, width and height to fit the out resolution
# compute default transform, width and height to fit the out resolution
...
...
This diff is collapsed.
Click to expand it.
sen2chain/indices.py
+
241
−
172
View file @
d05895f0
...
@@ -14,7 +14,8 @@ import os
...
@@ -14,7 +14,8 @@ import os
from
typing
import
Union
,
List
from
typing
import
Union
,
List
from
.indices_functions
import
(
create_raw_ndvi
,
create_raw_ndwi
,
create_raw_ndwimcf
,
from
.indices_functions
import
(
create_raw_ndvi
,
create_raw_ndwi
,
create_raw_ndwimcf
,
create_raw_bigr
,
create_raw_bigr
,
create_raw_birnir
,
create_raw_bibg
,
create_raw_mndwi
,
create_masked_indice
,
index_tiff_2_jp2
)
create_masked_indice
,
index_tiff_2_jp2
)
from
.colormap
import
matplotlib_colormap_to_rgb
,
create_colormap
,
create_rvb
from
.colormap
import
matplotlib_colormap_to_rgb
,
create_colormap
,
create_rvb
...
@@ -327,7 +328,7 @@ class NdwiMcf(Indice):
...
@@ -327,7 +328,7 @@ class NdwiMcf(Indice):
class
BIGR
(
Indice
):
class
BIGR
(
Indice
):
"""
"""
Brightness Index Green Red = ( GREEN² + RED² ) ^ 0.5
Brightness Index Green Red = (
(
GREEN² + RED²
)/2
) ^ 0.5
GREEN: band 03
GREEN: band 03
RED: band 04
RED: band 04
...
@@ -401,10 +402,6 @@ class BIGR(Indice):
...
@@ -401,10 +402,6 @@ class BIGR(Indice):
logger
.
info
(
"
{} already exists
"
.
format
(
quicklook_filename
))
logger
.
info
(
"
{} already exists
"
.
format
(
quicklook_filename
))
else
:
else
:
logger
.
info
(
"
creating quicklook
"
)
logger
.
info
(
"
creating quicklook
"
)
# create_colormap(raster=(self.out_path / self.indice_filename),
# cloud_mask=self.l2a_product.user_cloud_mask,
# lut_dict=cmap, clouds_color="white",
# out_path=(self.out_path / quicklook_filename))
create_rvb
(
raster
=
(
self
.
out_path
/
self
.
indice_filename
),
create_rvb
(
raster
=
(
self
.
out_path
/
self
.
indice_filename
),
cloud_mask
=
self
.
l2a_product
.
user_cloud_mask
,
cloud_mask
=
self
.
l2a_product
.
user_cloud_mask
,
lut_dict
=
cmap
,
clouds_color
=
"
white
"
,
lut_dict
=
cmap
,
clouds_color
=
"
white
"
,
...
@@ -412,178 +409,250 @@ class BIGR(Indice):
...
@@ -412,178 +409,250 @@ class BIGR(Indice):
stretch
=
(
0
,
2500
))
stretch
=
(
0
,
2500
))
#~
class
IbRP
IR(Indice):
class
BIRN
IR
(
Indice
):
#~
"""
"""
#~ NDWI(McFeeters) = (GREEN-NIR) /
(
G
RE
EN+NIR)
Brightness Index Red Near InfraRed = (
(RE
D² + NIR²)/2 ) ^ 0.5
#~ GREEN
: band 0
3
NIR
: band 0
8
#~ NIR
: band 0
8
RED
: band 0
4
#~
"""
"""
#~
name = "
NDWIMCF
"
name
=
"
BIRNIR
"
#~
filename_template = "{product_identifier}_
NDWIMCF
{ext}"
filename_template
=
"
{product_identifier}_
BIRNIR
{ext}
"
#~
ext = ".jp2"
ext
=
"
.jp2
"
#~
ext_raw = ".tif"
ext_raw
=
"
.tif
"
#
colormap = cm.
RdYlBu
colormap
=
cm
.
afmhot
#~ colormap = cm.colors.LinearSegmentedColormap.from_list("", ["
green
", "white"
, "blue"
])
#~ colormap = cm.colors.LinearSegmentedColormap.from_list("", ["
black
", "white"])
#~
def __init__(self, l2a_product_object):
def
__init__
(
self
,
l2a_product_object
):
#~
if l2a_product_object is None:
if
l2a_product_object
is
None
:
#~
raise ValueError("A L2aProduct object must be provided")
raise
ValueError
(
"
A L2aProduct object must be provided
"
)
#~
else:
else
:
#~
self.l2a_product = l2a_product_object
self
.
l2a_product
=
l2a_product_object
#~
# output path
# output path
#~
self.out_path = None
self
.
out_path
=
None
#~
# filenames
# filenames
#~
self.indice_stem = self.filename_template.format(product_identifier=self.l2a_product.identifier, ext="")
self
.
indice_stem
=
self
.
filename_template
.
format
(
product_identifier
=
self
.
l2a_product
.
identifier
,
ext
=
""
)
#~
self.indice_filename = self.indice_stem + self.ext
self
.
indice_filename
=
self
.
indice_stem
+
self
.
ext
#~
self.indice_raw = self.indice_stem + self.ext_raw
self
.
indice_raw
=
self
.
indice_stem
+
self
.
ext_raw
#~
def process_indice(
def
process_indice
(
#~
self,
self
,
#~
out_path: pathlib.PosixPath,
out_path
:
pathlib
.
PosixPath
,
#~
reprocess: bool = False,
reprocess
:
bool
=
False
,
#~
nodata_clouds: bool = False,
nodata_clouds
:
bool
=
False
,
#~
quicklook: bool = False
quicklook
:
bool
=
False
#~
) -> None:
)
->
None
:
#~
""" process """
"""
process
"""
#~
self.out_path = out_path
self
.
out_path
=
out_path
#~
if (out_path / self.indice_filename).exists() and not reprocess:
if
(
out_path
/
self
.
indice_filename
).
exists
()
and
not
reprocess
:
#~
logger.info("{} already exists".format(self.indice_filename))
logger
.
info
(
"
{} already exists
"
.
format
(
self
.
indice_filename
))
#~
else:
else
:
#~
create_raw_
ndwimcf(nir
_path=self.l2a_product.b0
8
_10m,
create_raw_
birnir
(
red
_path
=
self
.
l2a_product
.
b0
4
_10m
,
#~ green
_path=self.l2a_product.b0
3
_10m,
nir
_path
=
self
.
l2a_product
.
b0
8
_10m
,
#~
out_path=(out_path / self.indice_raw))
out_path
=
(
out_path
/
self
.
indice_raw
))
#~
index_tiff_2_jp2(img_path=(out_path / self.indice_raw),
index_tiff_2_jp2
(
img_path
=
(
out_path
/
self
.
indice_raw
),
#~
out_path=(out_path / self.indice_filename))
out_path
=
(
out_path
/
self
.
indice_filename
))
#~
if nodata_clouds:
if
nodata_clouds
:
#~
if self.l2a_product.user_cloud_mask is None:
if
self
.
l2a_product
.
user_cloud_mask
is
None
:
#~
raise ValueError("Cloud mask does not exist")
raise
ValueError
(
"
Cloud mask does not exist
"
)
#~
masked_indice_filename = self.indice_stem + "_MASKED" + self.ext
masked_indice_filename
=
self
.
indice_stem
+
"
_MASKED
"
+
self
.
ext
#~
masked_indice_raw = self.indice_stem + "_MASKED" + self.ext_raw
masked_indice_raw
=
self
.
indice_stem
+
"
_MASKED
"
+
self
.
ext_raw
#~
if (out_path / masked_indice_filename).exists() and not reprocess:
if
(
out_path
/
masked_indice_filename
).
exists
()
and
not
reprocess
:
#~
logger.info("{} already exists".format(masked_indice_filename))
logger
.
info
(
"
{} already exists
"
.
format
(
masked_indice_filename
))
#~
else:
else
:
#~
if (out_path / self.indice_raw).exists():
if
(
out_path
/
self
.
indice_raw
).
exists
():
#~ ndwimcf
_name = (out_path / self.indice_raw)
birnir
_name
=
(
out_path
/
self
.
indice_raw
)
#~
else:
else
:
#~ ndwimcf
_name = (out_path / self.indice_filename)
birnir
_name
=
(
out_path
/
self
.
indice_filename
)
#~
create_masked_indice(indice_path=
ndwimcf
_name,
create_masked_indice
(
indice_path
=
birnir
_name
,
#~
cloud_mask_path=self.l2a_product.user_cloud_mask,
cloud_mask_path
=
self
.
l2a_product
.
user_cloud_mask
,
#~
out_path=(out_path / masked_indice_raw))
out_path
=
(
out_path
/
masked_indice_raw
))
#~
index_tiff_2_jp2(img_path=(out_path / masked_indice_raw),
index_tiff_2_jp2
(
img_path
=
(
out_path
/
masked_indice_raw
),
#~
out_path=(out_path / masked_indice_filename))
out_path
=
(
out_path
/
masked_indice_filename
))
#~
os.remove(str(out_path / masked_indice_raw))
os
.
remove
(
str
(
out_path
/
masked_indice_raw
))
#~ if quicklook:
if
quicklook
:
#~ cmap = matplotlib_colormap_to_rgb(self.colormap, revers=False)
cmap
=
matplotlib_colormap_to_rgb
(
self
.
colormap
,
revers
=
False
)
#~ quicklook_filename = self.indice_stem + "_QUICKLOOK.tif"
quicklook_filename
=
self
.
indice_stem
+
"
_QUICKLOOK.tif
"
#~ if (self.out_path / quicklook_filename).exists() and not reprocess:
if
(
self
.
out_path
/
quicklook_filename
).
exists
()
and
not
reprocess
:
#~ logger.info("{} already exists".format(quicklook_filename))
logger
.
info
(
"
{} already exists
"
.
format
(
quicklook_filename
))
#~ else:
else
:
#~ logger.info("creating quicklook")
logger
.
info
(
"
creating quicklook
"
)
#~ # create_colormap(raster=(self.out_path / self.indice_filename),
create_rvb
(
raster
=
(
self
.
out_path
/
self
.
indice_filename
),
#~ # cloud_mask=self.l2a_product.user_cloud_mask,
cloud_mask
=
self
.
l2a_product
.
user_cloud_mask
,
#~ # lut_dict=cmap, clouds_color="white",
lut_dict
=
cmap
,
clouds_color
=
"
white
"
,
#~ # out_path=(self.out_path / quicklook_filename))
out_path
=
(
self
.
out_path
/
quicklook_filename
),
#~ create_rvb(raster=(self.out_path / self.indice_filename),
stretch
=
(
0
,
5000
))
#~ cloud_mask=self.l2a_product.user_cloud_mask,
#~ lut_dict=cmap, clouds_color="white",
#~ out_path=(self.out_path / quicklook_filename))
class
BIBG
(
Indice
):
"""
Brightness Index Blue Green = ( (BLUE² + GREEN²)/2 ) ^ 0.5
BLUE: band 02
GREEN: band 03
"""
name
=
"
BIBG
"
filename_template
=
"
{product_identifier}_BIBG{ext}
"
ext
=
"
.jp2
"
ext_raw
=
"
.tif
"
colormap
=
cm
.
bone
#~ colormap = cm.colors.LinearSegmentedColormap.from_list("", ["black", "white"])
def
__init__
(
self
,
l2a_product_object
):
if
l2a_product_object
is
None
:
raise
ValueError
(
"
A L2aProduct object must be provided
"
)
else
:
self
.
l2a_product
=
l2a_product_object
# output path
self
.
out_path
=
None
# filenames
self
.
indice_stem
=
self
.
filename_template
.
format
(
product_identifier
=
self
.
l2a_product
.
identifier
,
ext
=
""
)
self
.
indice_filename
=
self
.
indice_stem
+
self
.
ext
self
.
indice_raw
=
self
.
indice_stem
+
self
.
ext_raw
def
process_indice
(
self
,
out_path
:
pathlib
.
PosixPath
,
reprocess
:
bool
=
False
,
nodata_clouds
:
bool
=
False
,
quicklook
:
bool
=
False
)
->
None
:
"""
process
"""
self
.
out_path
=
out_path
if
(
out_path
/
self
.
indice_filename
).
exists
()
and
not
reprocess
:
logger
.
info
(
"
{} already exists
"
.
format
(
self
.
indice_filename
))
else
:
create_raw_bibg
(
blue_path
=
self
.
l2a_product
.
b02_10m
,
green_path
=
self
.
l2a_product
.
b03_10m
,
out_path
=
(
out_path
/
self
.
indice_raw
))
index_tiff_2_jp2
(
img_path
=
(
out_path
/
self
.
indice_raw
),
out_path
=
(
out_path
/
self
.
indice_filename
))
if
nodata_clouds
:
if
self
.
l2a_product
.
user_cloud_mask
is
None
:
raise
ValueError
(
"
Cloud mask does not exist
"
)
masked_indice_filename
=
self
.
indice_stem
+
"
_MASKED
"
+
self
.
ext
masked_indice_raw
=
self
.
indice_stem
+
"
_MASKED
"
+
self
.
ext_raw
#~ class Mndwi(Indice):
if
(
out_path
/
masked_indice_filename
).
exists
()
and
not
reprocess
:
#~ """
logger
.
info
(
"
{} already exists
"
.
format
(
masked_indice_filename
))
#~ NDWI(McFeeters) = (GREEN-NIR) / (GREEN+NIR)
else
:
if
(
out_path
/
self
.
indice_raw
).
exists
():
#~ GREEN: band 03
bibg_name
=
(
out_path
/
self
.
indice_raw
)
#~ NIR: band 08
else
:
#~ """
bibg_name
=
(
out_path
/
self
.
indice_filename
)
#~ name = "NDWIMCF"
create_masked_indice
(
indice_path
=
bibg_name
,
#~ filename_template = "{product_identifier}_NDWIMCF{ext}"
cloud_mask_path
=
self
.
l2a_product
.
user_cloud_mask
,
#~ ext = ".jp2"
out_path
=
(
out_path
/
masked_indice_raw
))
#~ ext_raw = ".tif"
index_tiff_2_jp2
(
img_path
=
(
out_path
/
masked_indice_raw
),
#colormap = cm.RdYlBu
out_path
=
(
out_path
/
masked_indice_filename
))
#~ colormap = cm.colors.LinearSegmentedColormap.from_list("", ["green", "white", "blue"])
os
.
remove
(
str
(
out_path
/
masked_indice_raw
))
#~ def __init__(self, l2a_product_object):
#~ if l2a_product_object is None:
#~ raise ValueError("A L2aProduct object must be provided")
#~ else:
#~ self.l2a_product = l2a_product_object
#~ # output path
#~ self.out_path = None
#~ # filenames
#~ self.indice_stem = self.filename_template.format(product_identifier=self.l2a_product.identifier, ext="")
#~ self.indice_filename = self.indice_stem + self.ext
#~ self.indice_raw = self.indice_stem + self.ext_raw
#~ def process_indice(
#~ self,
#~ out_path: pathlib.PosixPath,
#~ reprocess: bool = False,
#~ nodata_clouds: bool = False,
#~ quicklook: bool = False
#~ ) -> None:
#~ """ process """
#~ self.out_path = out_path
#~ if (out_path / self.indice_filename).exists() and not reprocess:
#~ logger.info("{} already exists".format(self.indice_filename))
#~ else:
#~ create_raw_ndwimcf(nir_path=self.l2a_product.b08_10m,
#~ green_path=self.l2a_product.b03_10m,
#~ out_path=(out_path / self.indice_raw))
#~ index_tiff_2_jp2(img_path=(out_path / self.indice_raw),
#~ out_path=(out_path / self.indice_filename))
#~ if nodata_clouds:
#~ if self.l2a_product.user_cloud_mask is None:
#~ raise ValueError("Cloud mask does not exist")
#~ masked_indice_filename = self.indice_stem + "_MASKED" + self.ext
#~ masked_indice_raw = self.indice_stem + "_MASKED" + self.ext_raw
#~ if (out_path / masked_indice_filename).exists() and not reprocess:
#~ logger.info("{} already exists".format(masked_indice_filename))
#~ else:
#~ if (out_path / self.indice_raw).exists():
#~ ndwimcf_name = (out_path / self.indice_raw)
#~ else:
#~ ndwimcf_name = (out_path / self.indice_filename)
#~ create_masked_indice(indice_path=ndwimcf_name,
#~ cloud_mask_path=self.l2a_product.user_cloud_mask,
#~ out_path=(out_path / masked_indice_raw))
#~ index_tiff_2_jp2(img_path=(out_path / masked_indice_raw),
#~ out_path=(out_path / masked_indice_filename))
#~ os.remove(str(out_path / masked_indice_raw))
#~ if quicklook:
if
quicklook
:
#~ cmap = matplotlib_colormap_to_rgb(self.colormap, revers=False)
cmap
=
matplotlib_colormap_to_rgb
(
self
.
colormap
,
revers
=
False
)
#~ quicklook_filename = self.indice_stem + "_QUICKLOOK.tif"
quicklook_filename
=
self
.
indice_stem
+
"
_QUICKLOOK.tif
"
#~ if (self.out_path / quicklook_filename).exists() and not reprocess:
if
(
self
.
out_path
/
quicklook_filename
).
exists
()
and
not
reprocess
:
#~ logger.info("{} already exists".format(quicklook_filename))
logger
.
info
(
"
{} already exists
"
.
format
(
quicklook_filename
))
#~ else:
else
:
#~ logger.info("creating quicklook")
logger
.
info
(
"
creating quicklook
"
)
#~ # create_colormap(raster=(self.out_path / self.indice_filename),
create_rvb
(
raster
=
(
self
.
out_path
/
self
.
indice_filename
),
#~ # cloud_mask=self.l2a_product.user_cloud_mask,
cloud_mask
=
self
.
l2a_product
.
user_cloud_mask
,
#~ # lut_dict=cmap, clouds_color="white",
lut_dict
=
cmap
,
clouds_color
=
"
white
"
,
#~ # out_path=(self.out_path / quicklook_filename))
out_path
=
(
self
.
out_path
/
quicklook_filename
),
#~ create_rvb(raster=(self.out_path / self.indice_filename),
stretch
=
(
0
,
2500
))
#~ cloud_mask=self.l2a_product.user_cloud_mask,
#~ lut_dict=cmap, clouds_color="white",
#~ out_path=(self.out_path / quicklook_filename))
class
Mndwi
(
Indice
):
"""
MNDWI = (GREEN-SWIR) / (GREEN+SWIR)
GREEN: band 03
SWIR: band 11
"""
name
=
"
MNDWI
"
filename_template
=
"
{product_identifier}_MNDWI{ext}
"
ext
=
"
.jp2
"
ext_raw
=
"
.tif
"
colormap
=
cm
.
BrBG
def
__init__
(
self
,
l2a_product_object
):
if
l2a_product_object
is
None
:
raise
ValueError
(
"
A L2aProduct object must be provided
"
)
else
:
self
.
l2a_product
=
l2a_product_object
# output path
self
.
out_path
=
None
# filenames
self
.
indice_stem
=
self
.
filename_template
.
format
(
product_identifier
=
self
.
l2a_product
.
identifier
,
ext
=
""
)
self
.
indice_filename
=
self
.
indice_stem
+
self
.
ext
self
.
indice_raw
=
self
.
indice_stem
+
self
.
ext_raw
def
process_indice
(
self
,
out_path
:
pathlib
.
PosixPath
,
reprocess
:
bool
=
False
,
nodata_clouds
:
bool
=
False
,
quicklook
:
bool
=
False
)
->
None
:
"""
process
"""
self
.
out_path
=
out_path
if
(
out_path
/
self
.
indice_filename
).
exists
()
and
not
reprocess
:
logger
.
info
(
"
{} already exists
"
.
format
(
self
.
indice_filename
))
else
:
create_raw_mndwi
(
green_path
=
self
.
l2a_product
.
b03_10m
,
swir_path
=
self
.
l2a_product
.
b11_20m
,
out_path
=
(
out_path
/
self
.
indice_raw
))
index_tiff_2_jp2
(
img_path
=
(
out_path
/
self
.
indice_raw
),
out_path
=
(
out_path
/
self
.
indice_filename
))
if
nodata_clouds
:
if
self
.
l2a_product
.
user_cloud_mask
is
None
:
raise
ValueError
(
"
Cloud mask does not exist
"
)
masked_indice_filename
=
self
.
indice_stem
+
"
_MASKED
"
+
self
.
ext
masked_indice_raw
=
self
.
indice_stem
+
"
_MASKED
"
+
self
.
ext_raw
if
(
out_path
/
masked_indice_filename
).
exists
()
and
not
reprocess
:
logger
.
info
(
"
{} already exists
"
.
format
(
masked_indice_filename
))
else
:
if
(
out_path
/
self
.
indice_raw
).
exists
():
mndwi_name
=
(
out_path
/
self
.
indice_raw
)
else
:
mndwi_name
=
(
out_path
/
self
.
indice_filename
)
create_masked_indice
(
indice_path
=
mndwi_name
,
cloud_mask_path
=
self
.
l2a_product
.
user_cloud_mask
,
out_path
=
(
out_path
/
masked_indice_raw
))
index_tiff_2_jp2
(
img_path
=
(
out_path
/
masked_indice_raw
),
out_path
=
(
out_path
/
masked_indice_filename
))
os
.
remove
(
str
(
out_path
/
masked_indice_raw
))
if
quicklook
:
cmap
=
matplotlib_colormap_to_rgb
(
self
.
colormap
,
revers
=
False
)
quicklook_filename
=
self
.
indice_stem
+
"
_QUICKLOOK.tif
"
if
(
self
.
out_path
/
quicklook_filename
).
exists
()
and
not
reprocess
:
logger
.
info
(
"
{} already exists
"
.
format
(
quicklook_filename
))
else
:
logger
.
info
(
"
creating quicklook
"
)
create_rvb
(
raster
=
(
self
.
out_path
/
self
.
indice_filename
),
cloud_mask
=
self
.
l2a_product
.
user_cloud_mask
,
lut_dict
=
cmap
,
clouds_color
=
"
white
"
,
out_path
=
(
self
.
out_path
/
quicklook_filename
))
class
IndicesCollectionMeta
(
type
):
class
IndicesCollectionMeta
(
type
):
...
...
This diff is collapsed.
Click to expand it.
sen2chain/indices_functions.py
+
118
−
0
View file @
d05895f0
...
@@ -151,6 +151,7 @@ def create_raw_ndwimcf(nir_path: Union[str, pathlib.PosixPath],
...
@@ -151,6 +151,7 @@ def create_raw_ndwimcf(nir_path: Union[str, pathlib.PosixPath],
dst
.
write
(
ndwimcf_masked
,
1
)
dst
.
write
(
ndwimcf_masked
,
1
)
return
Path
(
str
(
out_path
)).
absolute
return
Path
(
str
(
out_path
)).
absolute
def
create_raw_bigr
(
red_path
:
Union
[
str
,
pathlib
.
PosixPath
],
def
create_raw_bigr
(
red_path
:
Union
[
str
,
pathlib
.
PosixPath
],
green_path
:
Union
[
str
,
pathlib
.
PosixPath
],
green_path
:
Union
[
str
,
pathlib
.
PosixPath
],
out_path
:
Union
[
str
,
pathlib
.
PosixPath
]
=
"
./raw_bigr.tif
"
)
->
pathlib
.
PosixPath
:
out_path
:
Union
[
str
,
pathlib
.
PosixPath
]
=
"
./raw_bigr.tif
"
)
->
pathlib
.
PosixPath
:
...
@@ -185,6 +186,123 @@ def create_raw_bigr(red_path: Union[str, pathlib.PosixPath],
...
@@ -185,6 +186,123 @@ def create_raw_bigr(red_path: Union[str, pathlib.PosixPath],
return
Path
(
str
(
out_path
)).
absolute
return
Path
(
str
(
out_path
)).
absolute
def
create_raw_birnir
(
red_path
:
Union
[
str
,
pathlib
.
PosixPath
],
nir_path
:
Union
[
str
,
pathlib
.
PosixPath
],
out_path
:
Union
[
str
,
pathlib
.
PosixPath
]
=
"
./raw_birnir.tif
"
)
->
pathlib
.
PosixPath
:
"""
Creates a BI (Red, NIR) raster from RED and NIR rasters.
:param red_path: path to the RED raster.
:param nir_path: path to the NIR raster.
:param out_path: path to the output raster.
"""
logger
.
info
(
"
creating raw BIRNIR (tiff - int16)
"
)
with
rasterio
.
open
(
str
(
red_path
))
as
red_src
,
\
rasterio
.
open
(
str
(
nir_path
))
as
nir_src
:
red_profile
=
red_src
.
profile
red
=
red_src
.
read
(
1
).
astype
(
np
.
float32
)
nir
=
nir_src
.
read
(
1
).
astype
(
np
.
float32
)
np
.
seterr
(
divide
=
'
ignore
'
,
invalid
=
'
ignore
'
)
# ignore warnings when dividing by zero
birnir
=
((((
nir
)
**
2
+
(
red
)
**
2
)
/
2
)
**
0.5
).
astype
(
np
.
int16
)
birnir_masked
=
np
.
where
(
red
!=
0
,
birnir
,
32767
)
red_profile
.
update
(
driver
=
"
Gtiff
"
,
compress
=
"
DEFLATE
"
,
tiled
=
False
,
dtype
=
np
.
int16
,
nodata
=
32767
,
transform
=
red_src
.
transform
)
red_profile
.
pop
(
'
tiled
'
,
None
)
with
rasterio
.
Env
(
GDAL_CACHEMAX
=
512
)
as
env
:
with
rasterio
.
open
(
str
(
out_path
),
"
w
"
,
**
red_profile
)
as
dst
:
dst
.
write
(
birnir_masked
,
1
)
return
Path
(
str
(
out_path
)).
absolute
def
create_raw_bibg
(
blue_path
:
Union
[
str
,
pathlib
.
PosixPath
],
green_path
:
Union
[
str
,
pathlib
.
PosixPath
],
out_path
:
Union
[
str
,
pathlib
.
PosixPath
]
=
"
./raw_bibg.tif
"
)
->
pathlib
.
PosixPath
:
"""
Creates a BI (Blue, Green) raster from BLUE and GREEN rasters.
:param blue_path: path to the BLUE raster.
:param green_path: path to the GREEN raster.
:param out_path: path to the output raster.
"""
logger
.
info
(
"
creating raw BIBG (tiff - int16)
"
)
with
rasterio
.
open
(
str
(
blue_path
))
as
blue_src
,
\
rasterio
.
open
(
str
(
green_path
))
as
green_src
:
blue_profile
=
blue_src
.
profile
blue
=
blue_src
.
read
(
1
).
astype
(
np
.
float32
)
green
=
green_src
.
read
(
1
).
astype
(
np
.
float32
)
np
.
seterr
(
divide
=
'
ignore
'
,
invalid
=
'
ignore
'
)
# ignore warnings when dividing by zero
bibg
=
((((
green
)
**
2
+
(
blue
)
**
2
)
/
2
)
**
0.5
).
astype
(
np
.
int16
)
bibg_masked
=
np
.
where
(
blue
!=
0
,
bibg
,
32767
)
blue_profile
.
update
(
driver
=
"
Gtiff
"
,
compress
=
"
DEFLATE
"
,
tiled
=
False
,
dtype
=
np
.
int16
,
nodata
=
32767
,
transform
=
blue_src
.
transform
)
blue_profile
.
pop
(
'
tiled
'
,
None
)
with
rasterio
.
Env
(
GDAL_CACHEMAX
=
512
)
as
env
:
with
rasterio
.
open
(
str
(
out_path
),
"
w
"
,
**
blue_profile
)
as
dst
:
dst
.
write
(
bibg_masked
,
1
)
return
Path
(
str
(
out_path
)).
absolute
def
create_raw_mndwi
(
green_path
:
Union
[
str
,
pathlib
.
PosixPath
],
swir_path
:
Union
[
str
,
pathlib
.
PosixPath
],
out_path
:
Union
[
str
,
pathlib
.
PosixPath
]
=
"
./raw_mndwi.tif
"
)
->
pathlib
.
PosixPath
:
"""
Creates a MNDWI raster from GREEN and SWIR rasters.
:param green_path: path to the GREEN raster.
:param swir_path: path to the SWIR raster.
:param out_path: path to the output raster.
"""
logger
.
info
(
"
creating raw MNDWI (tiff - int16)
"
)
with
rasterio
.
open
(
str
(
green_path
))
as
green_src
,
\
rasterio
.
open
(
str
(
swir_path
))
as
swir_src
:
green_profile
=
green_src
.
profile
# swir_profile = swir_src.profile
green
=
green_src
.
read
(
1
).
astype
(
np
.
float32
)
swir
=
swir_src
.
read
(
1
).
astype
(
np
.
float32
)
# reproject swir band (20m) to nir band resolution (10m)
swir_reproj
=
np
.
empty
(
green
.
shape
,
dtype
=
np
.
float32
)
reproject
(
source
=
swir
,
destination
=
swir_reproj
,
src_transform
=
swir_src
.
transform
,
src_crs
=
swir_src
.
crs
,
dst_transform
=
green_src
.
transform
,
dst_crs
=
green_src
.
crs
,
resampling
=
Resampling
.
nearest
)
np
.
seterr
(
divide
=
'
ignore
'
,
invalid
=
'
ignore
'
)
# ignore warnings when dividing by zero
ndwi
=
((
green
-
swir_reproj
)
/
(
green
+
swir_reproj
)
*
10000
).
astype
(
np
.
int16
)
ndwi_masked
=
np
.
where
(
green
!=
0
,
ndwi
,
32767
)
green_profile
.
update
(
driver
=
"
Gtiff
"
,
compress
=
"
DEFLATE
"
,
tiled
=
False
,
dtype
=
np
.
int16
,
nodata
=
32767
,
transform
=
green_src
.
transform
)
green_profile
.
pop
(
'
tiled
'
,
None
)
with
rasterio
.
Env
(
GDAL_CACHEMAX
=
512
)
as
env
:
with
rasterio
.
open
(
str
(
out_path
),
"
w
"
,
**
green_profile
)
as
dst
:
dst
.
write
(
ndwi_masked
,
1
)
return
Path
(
str
(
out_path
)).
absolute
def
create_masked_indice
(
def
create_masked_indice
(
indice_path
:
Union
[
str
,
pathlib
.
PosixPath
],
indice_path
:
Union
[
str
,
pathlib
.
PosixPath
],
cloud_mask_path
:
Union
[
str
,
pathlib
.
PosixPath
],
cloud_mask_path
:
Union
[
str
,
pathlib
.
PosixPath
],
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment