Commit 24f7e245 authored by juliettemiranville

Check what is already extracted and whether the data can be compiled

parent 6fac6d0a
......@@ -3,7 +3,10 @@
"""
Module for computing time series.
"""
import os
import csv
import hashlib
import zipfile
import logging
import pathlib
from pathlib import Path
......@@ -26,9 +29,10 @@ from typing import Sequence, List, Dict, Union
import matplotlib.pyplot as plt
import numpy as np
import math
from datetime import timedelta
from datetime import datetime, timedelta
import time
import multiprocessing
import shutil
import itertools
from functools import partial
......@@ -40,7 +44,11 @@ from .products import L2aProduct, IndiceProduct
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
logging.basicConfig(
level=logging.INFO,
filename=Config().get("extraction_path")+ "/app.log",
filemode="a"
)
# logging.basicConfig(format='%(process)s %(asctime)s %(levelname)s:%(module)s:%(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
logging.getLogger().handlers[0].setFormatter(
logging.Formatter(
......@@ -52,14 +60,12 @@ logging.getLogger().handlers[0].setFormatter(
class TimeSeries:
"""Class for time series extraction.
:param vectors_file: path to the vector file.
:param indices: list of valid indices names.
:param date_min: starting date of the time series (YYYY-MM-DD format).
:param date_max: ending date of the time series (YYYY-MM-DD format).
:param cover_min: minimum cloud cover value. Default: 0.
:param cover_max: minimum cloud cover value. Default: 100.
Usage:
>>> ts = TimeSeries(
"polygons.geojson",
......@@ -79,7 +85,7 @@ class TimeSeries:
cover_max: int = 100,
field_names: str = None,
multiproc: int = 0,
getstat: bool = True,
#getstat: bool = True,
cm_version: str = "CM001",
probability: int = 1,
iterations: int = 5,
......@@ -90,6 +96,7 @@ class TimeSeries:
) -> None:
self._vectors_file = Path(vectors_file)
self._checksum = self.calculate_checksum()
self._date_min = date_min
self._date_max = date_max
self._cover_min = cover_min
......@@ -105,6 +112,8 @@ class TimeSeries:
self.cld_hi_prob = cld_hi_prob
self.thin_cir = thin_cir
if (date_min is None and date_max is not None) or (date_min is not None and date_max is None):
raise ValueError("If you specify date_min or date_max, you must specify both")
if indices is None:
self._indices_list = IndicesCollection.list
else:
......@@ -117,24 +126,93 @@ class TimeSeries:
self._key = self._field_names
else:
self._key = "fid"
self._tiles_geoms = self._get_tiles_geom_dict()
self._df_dicts = dict()
if getstat is True:
path = Path(self._vectors_file)
existing_folder_name = None
for d in Path(self._out_path).glob('*'):
if d.is_dir() and self._checksum in d.name:
existing_folder_name = d.name
break
if existing_folder_name:
self._data_name = existing_folder_name.split("_")[0]
logger.info(" Vous avez déjà executer ce shp")
else:
self._data_name = self._vectors_file.stem
folder = self._data_name + "_" + self._checksum
logger.info(" Vous trouverez vos données dans le dossier {}".format(folder))
folder_cp = Path(self._out_path) / folder / "shp_files"
folder_cp.mkdir(parents=True, exist_ok=True)
for ext in ['.shp', '.cpg', '.dbf', '.prj', '.shx']:
file_to_cp = path.with_suffix(ext)
shutil.copy2(file_to_cp, folder_cp)
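# Resulting layout (hedged reconstruction from this commit): the extraction
# folder is "<vectors stem>_<sha256>" and contains shp_files/ (a copy of the
# input shapefile), raw_data/<indice>/<cm>/ (per-product CSVs written by
# to_csv) and compiled_data/<indice>/<cm>/ (all_data, daily, weekly, monthly,
# yearly and custom CSVs written by result_csv).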
def calculate_checksum(self):
# hash the shapefile and its sidecar files
temp_path = Config().get("temp_path")
path = Path(self._vectors_file)
sha256_hash = hashlib.sha256()
sha256_hash = hashlib.sha256()
for ext in ['.shp', '.cpg', '.dbf', '.prj', '.shx']:
with open(path.with_suffix(ext), "rb") as f:
while True:
data = f.read(65536)
if not data:
break
sha256_hash.update(data)
checksum = sha256_hash.hexdigest()
logger.info(checksum)
return checksum
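# For reference, a hedged standalone sketch of the chunked SHA-256 pattern used
# above, generalized into a helper (hypothetical name shapefile_digest; the
# sidecar extensions mirror the ones copied in __init__):

def shapefile_digest(shp_path, extensions=('.shp', '.cpg', '.dbf', '.prj', '.shx')):
    """Return a SHA-256 hex digest over a shapefile and its sidecar files."""
    import hashlib
    from pathlib import Path
    digest = hashlib.sha256()
    for ext in extensions:
        with open(Path(shp_path).with_suffix(ext), "rb") as f:
            # read in 64 KiB chunks to keep memory use constant
            for chunk in iter(lambda: f.read(65536), b""):
                digest.update(chunk)
    return digest.hexdigest()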
def extract_data(self, getstat: bool = True):  # to be tested with a very large dataset; with small date ranges, lists are faster
start = time.time()
if getstat:
if self._multiproc:
self._get_stats_multiproc()
else:
self._get_stats()
end = time.time()
logger.info(
"Execution time: {}".format(timedelta(seconds=end - start))
)
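# A hedged end-to-end usage sketch of the workflow introduced by this commit
# (file names and dates are placeholders): statistics are extracted once per
# product into raw_data, then compiled on demand and read back via .data.
#
# ts = TimeSeries(
#     "polygons.geojson",
#     indices=["NDVI"],
#     date_min="2023-01-01",
#     date_max="2023-06-30",
#     cover_max=30,
# )
# ts.extract_data()            # write per-product CSVs under raw_data/
# ts.result_csv(alldata=True)  # compile them into an all_data CSV
# dataframes = ts.data         # {"NDVI": pandas.DataFrame, ...}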
@property
def data(self) -> Dict[str, pd.DataFrame]:
"""Returns a dictionnary with indices as keys and pandas.DataFrame as values."""
return self._df_dicts
df = dict()
if self.cm_version == "CM001":
cm = "CM001"
elif self.cm_version == "CM002":
cm = "CM002-B11"
elif self.cm_version == "CM003":
cm = str("CM003" +
"-PRB" + str(self.probability) +
"-ITER" + str(self.iterations))
elif self.cm_version == "CM004":
cm = str("CM004" +
"-CSH" + str(1 * self.cld_shad) +
"-CMP" + str(1 * self.cld_med_prob) +
"-CHP" + str(1 * self.cld_hi_prob) +
"-TCI" + str(1 * self.thin_cir) +
"-ITER" + str(1 * self.iterations))
name= self._data_name + "_" + self._checksum
compiled_data = Path(self._out_path) / name / "compiled_data"
if not os.path.isdir(compiled_data):
self.result_csv(alldata=True)
for indice in os.listdir(compiled_data):
path_file = Path(compiled_data) / indice / cm / "all_data" / f'{self._data_name}_CC{self._cover_max}_all_data.csv'
if os.path.isfile(path_file):
df[indice] = pd.read_csv(str(path_file), dtype=object, index_col=0)
num_cols = ['count', 'nodata', 'nbcld', 'cldprb', 'min', 'max', 'mean', 'std', 'median', 'percentile_25', 'percentile_75', 'id']
df[indice][num_cols] = df[indice][num_cols].apply(pd.to_numeric, errors='coerce')
else:
logger.info("The all-data file doesn't exist for {}, {}, {}; please check the parameters or run extract_data".format(indice, cm, self._cover_max))
return df
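# The cm_version-to-label mapping above is repeated in _get_stats,
# _get_stats_multiproc and result_csv. A hedged refactor sketch (hypothetical
# helper, not part of the current module) that builds the same labels:

def _cm_label(self) -> str:
    """Build the cloud-mask folder label from the cloud-mask settings."""
    if self.cm_version == "CM002":
        return "CM002-B11"
    if self.cm_version == "CM003":
        return "CM003-PRB{}-ITER{}".format(self.probability, self.iterations)
    if self.cm_version == "CM004":
        return "CM004-CSH{}-CMP{}-CHP{}-TCI{}-ITER{}".format(
            1 * self.cld_shad,
            1 * self.cld_med_prob,
            1 * self.cld_hi_prob,
            1 * self.thin_cir,
            1 * self.iterations,
        )
    return "CM001"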
@staticmethod
def _is_indice_in_tile(indice: str, tile: Tile) -> bool:
"""Does the tile has a folder for this indice.
:param indice: indice name.
:param tile: Tile object.
"""
......@@ -181,7 +259,6 @@ class TimeSeries:
:param cover_min: minimum cloud coverage.
:param cover_max: maximum cloud coverage.
"""
#test filtering with cm
products = getattr(vars(tile)[
indice.lower()
].masks,cm_version).params(
......@@ -205,7 +282,6 @@ class TimeSeries:
cloud_proba_path: str = None,
) -> Dict:
"""Extracts statistics from a raster in a geometry.
:param feature: GeoJSON like object.
:param raster_path: path to the raster.
"""
......@@ -216,11 +292,6 @@ class TimeSeries:
feature["geometry"],
)
)
# with rasterio.open(str(raster_path)) as raster_src:
# raster_profile = raster_src.profile
# logger.info(raster_profile)
stats = zonal_stats(
geom,
str(raster_path),
......@@ -236,34 +307,14 @@ class TimeSeries:
"percentile_75",
],
)[0]
# logger.info(stats)
if cloud_path:
# def mycount(x):
# return np.count_nonzero(x == 1)
# with rasterio.open(str(cloud_path)) as cld_src:
# cld_profile = cld_src.profile
# cld_array = cld_src.read(1, out_shape = (10980, 10980), resampling=Resampling.nearest)
# cld_array = cld_src.read(1)
# logger.info("shape: {}".format(cld_array.shape))
# stats_cld = zonal_stats(geom, cld_array, affine=raster_profile["transform"] , stats=["count", "nodata"], add_stats={'cldcount':mycount})
# stats_cld = zonal_stats(geom, cld_array, affine=raster_profile["transform"], categorical=True)[0]
# stats_cld = zonal_stats(geom, str(cloud_path), nodata = 16383.0, categorical=True)[0]
# stats_cld = zonal_stats(geom, str(cloud_path), stats=["count", "nodata"], add_stats={'cldcount':mycount})
# stats_cld = zonal_stats(geom, str(cloud_path), categorical=True, nodata = 16383.0, category_map= {-999: 'tttttttttttt', 1.0: 'clds', 0.0:'noclds', 16383.0: 'nnnnnnn'})[0]
stats_cld = zonal_stats(
geom, str(cloud_path), stats=["count", "nodata"]
)[0]
# logger.info(stats_cld)
try:
nbcld = stats["nodata"] - stats_cld["nodata"]
except Exception:
nbcld = 0
# logger.info(stats_cld)
# logger.info(cld_pct)
stats["nbcld"] = nbcld
if cloud_proba_path:
......@@ -272,14 +323,27 @@ class TimeSeries:
)[0]
stats["cldprb"] = stats_cld_prb["mean"]
logger.info(stats)
# apply the cloud mask here!
return stats
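# A minimal, hedged usage sketch of the rasterstats.zonal_stats call pattern
# used above: one vector source against one raster, requesting the same
# statistics (the helper name and both paths are placeholders):

def _zonal_stats_example(vector_path: str, raster_path: str) -> dict:
    """Illustrative wrapper: return the stats dict for the first feature."""
    from rasterstats import zonal_stats
    return zonal_stats(
        vector_path,
        raster_path,
        stats=["count", "nodata", "min", "max", "mean", "std",
               "median", "percentile_25", "percentile_75"],
    )[0]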
def _get_stats(self) -> None:
"""Compute stats in polygons."""
start = time.time()
folder= self._data_name + "_" + self._checksum
if self.cm_version == "CM001":
cm = "CM001"
elif self.cm_version == "CM002":
cm = "CM002-B11"
elif self.cm_version == "CM003":
cm = str("CM003" +
"-PRB" + str(self.probability) +
"-ITER" + str(self.iterations))
elif self.cm_version == "CM004":
cm = str("CM004" +
"-CSH" + str(1 * self.cld_shad) +
"-CMP" + str(1 * self.cld_med_prob) +
"-CHP" + str(1 * self.cld_hi_prob) +
"-TCI" + str(1 * self.thin_cir) +
"-ITER" + str(1 * self.iterations))
with fiona.open(str(self._vectors_file), "r") as vectors:
features = {feat["id"]: feat for feat in vectors}
for index2, indice in enumerate(self._indices_list):
......@@ -289,8 +353,7 @@ class TimeSeries:
)
)
rows_list = []
# for tile, fid_list in self._tiles_geoms.items():
products_data = set(['_'.join(f.stem.split('_')[1:])+".jp2" for f in (Path(self._out_path) / folder / "raw_data" / indice / cm).glob('*') if f.is_file()])
for index3, tile in enumerate(self._tiles_geoms):
fid_list = self._tiles_geoms[tile]
tile_obj = Tile(tile)
......@@ -300,7 +363,6 @@ class TimeSeries:
index3 + 1, len(self._tiles_geoms), tile
)
)
if TimeSeries._is_indice_in_tile(indice, tile_obj):
tile_indice_path = tile_obj.paths["indices"][
indice.lower()
......@@ -320,24 +382,9 @@ class TimeSeries:
self.cld_hi_prob,
self.thin_cir,
)
products._dict = {cle: valeur for cle, valeur in products._dict.items() if cle not in products_data}
for index1, prod in enumerate(products):
indice_product = IndiceProduct(prod.identifier)
# prod_path = str(indice_product.path)
# prod_path_unmasked = prod_path.replace(
# "_CM001", ""
# )
# l2a = L2aProduct(indice_product.l2a)
# if l2a.path.exists():
# prod_path_cloud_proba = l2a.msk_cldprb_20m
# else:
# prod_path_cloud_proba = None
# prod_path = tile_indice_path / prod.identifier[:(-12 - len(indice))] / prod.identifier
# cloud_path = tile_obj.paths["l2a"] / (prod.identifier[:(-12 - len(indice))] + "_CLOUD_MASK.jp2")
# prod_path_unmasked = tile_indice_path / prod.identifier[:(-12 - len(indice))] / (prod.identifier[:-11] + '.jp2')
# prod_path_cloud_proba = L2aProduct(prod.identifier[:(-12 - len(indice))]).msk_cldprb_20m
# logger.info(prod_path_cloud_proba)
logger.info(
"Product {}/{}: {}".format(
index1 + 1, len(products), prod.identifier
......@@ -348,10 +395,6 @@ class TimeSeries:
for index, fid in enumerate(fid_list):
df_dict = OrderedDict()
df_dict["fid"] = fid
# feat_properties = features[fid]["properties"]
# if feat_properties:
# df_dict.update(feat_properties)
df_dict.update(
TimeSeries._get_raster_stats_in_geom(
features[fid],
......@@ -360,8 +403,6 @@ class TimeSeries:
indice_product.msk_cldprb_20m,
)
)
# df_properties = features[fid]["properties"]
df_dict["date"] = prod.date
df_dict["tile"] = tile
df_dict["filename"] = prod.identifier
......@@ -370,10 +411,12 @@ class TimeSeries:
df_dict[prop] = features[fid][
"properties"
][prop]
rows_list.append(df_dict)
if rows_list:
self._df_dicts[indice] = TimeSeries._list_to_df(rows_list)
self.to_csv()
rows_list.clear()
self._df_dicts.clear()
end = time.time()
logger.info(
"Execution time: {}".format(timedelta(seconds=end - start))
......@@ -381,24 +424,6 @@ class TimeSeries:
def _raster_stats_multi(self, features, shared_list, proc_item):
indice_product = IndiceProduct(proc_item[0].identifier)
# logger.info(
# "masque nuages path: {} - {}".format(
# indice_product.identifier,
# str(indice_product.msk_cldprb_20m)
# )
# )
# prod_path = str(indice_product.path)
# prod_path_unmasked = prod_path.replace("_CM001", "")
# l2a = L2aProduct(indice_product.l2a)
# if l2a.path.exists():
# prod_path_cloud_proba = l2a.msk_cldprb_20m
# else:
# prod_path_cloud_proba = None
# prod_path = proc_item[2] / proc_item[0].identifier[:(-12 - len(proc_item[3]))] / proc_item[0].identifier
# prod_path_unmasked = proc_item[2] / proc_item[0].identifier[:(-12 - len(proc_item[3]))] / (proc_item[0].identifier[:-11] + '.jp2')
# prod_path_cloud_proba = L2aProduct(proc_item[0].identifier[:(-12 - len(proc_item[3]))]).msk_cldprb_20m
# logger.info(prod_path_cloud_proba)
fid = proc_item[1]
result_dict = OrderedDict()
result_dict["fid"] = fid
......@@ -414,17 +439,28 @@ class TimeSeries:
result_dict["tile"] = proc_item[4]
result_dict["filename"] = proc_item[0].identifier
for prop in features[fid]["properties"]:
# if type(features[fid]["properties"][prop]) == float:
# result_dict[prop] = "{:.6f}".format(features[fid]["properties"][prop])
# logger.info("toto {}".format(result_dict[prop]))
# else:
# result_dict[prop] = features[fid]["properties"][prop]
# logger.info("tata {}".format(result_dict[prop]))
result_dict[prop] = features[fid]["properties"][prop]
shared_list.append(result_dict)
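# For reference, a hedged minimal sketch of the fan-out pattern used in
# _get_stats_multiproc below: a Manager list collects results appended by a
# worker that Pool.map runs over the cartesian product of the inputs
# (the worker and inputs here are stand-ins, not the real extraction):

def _example_worker(shared, item):
    """Stand-in for _raster_stats_multi: append one result dict per item."""
    product, fid = item
    shared.append({"product": product, "fid": fid})

def _multiproc_pattern_example(n_workers: int = 2):
    import itertools
    import multiprocessing
    from functools import partial
    manager = multiprocessing.Manager()
    shared = manager.list()
    items = list(itertools.product(["P1", "P2"], [0, 1, 2]))
    with multiprocessing.Pool(n_workers) as pool:
        pool.map(partial(_example_worker, shared), items)
    return list(shared)
# call from under `if __name__ == "__main__":` when the spawn start method is used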
def _get_stats_multiproc(self) -> None:
"""Compute stats in polygons."""
start = time.time()
folder = self._data_name + "_" + self._checksum
if self.cm_version == "CM001":
cm = "CM001"
elif self.cm_version == "CM002":
cm = "CM002-B11"
elif self.cm_version == "CM003":
cm = str("CM003" +
"-PRB" + str(self.probability) +
"-ITER" + str(self.iterations))
elif self.cm_version == "CM004":
cm = str("CM004" +
"-CSH" + str(1 * self.cld_shad) +
"-CMP" + str(1 * self.cld_med_prob) +
"-CHP" + str(1 * self.cld_hi_prob) +
"-TCI" + str(1 * self.thin_cir) +
"-ITER" + str(1 * self.iterations))
"""Compute stats in polygons."""
with fiona.open(str(self._vectors_file), "r") as vectors:
features = {feat["id"]: feat for feat in vectors}
......@@ -434,21 +470,18 @@ class TimeSeries:
index2 + 1, len(self._indices_list), indice
)
)
# rows_list = []
manager = multiprocessing.Manager()
shared_list = manager.list()
# for tile, fid_list in self._tiles_geoms.items():
# retrieve from raw_data the names of the products already processed
products_data = set(['_'.join(f.stem.split('_')[1:])+".jp2" for f in (Path(self._out_path) / folder / "raw_data" / indice / cm).glob('*') if f.is_file()])
for index3, tile in enumerate(self._tiles_geoms):
shared_list = manager.list()
fid_list = self._tiles_geoms[tile]
tile_obj = Tile(tile)
logger.info(
"Tile {}/{}: {}".format(
index3 + 1, len(self._tiles_geoms), tile
)
)
if TimeSeries._is_indice_in_tile(indice, tile_obj):
tile_indice_path = tile_obj.paths["indices"][
indice.lower()
......@@ -468,7 +501,7 @@ class TimeSeries:
self.cld_hi_prob,
self.thin_cir,
)
products._dict = {cle: valeur for cle, valeur in products._dict.items() if cle not in products_data}
proc_list = list(
itertools.product(
products,
......@@ -479,7 +512,6 @@ class TimeSeries:
)
)
logger.info("{} extractions".format(len(proc_list)))
pool = multiprocessing.Pool(self._multiproc)
results = [
pool.map(
......@@ -494,40 +526,18 @@ class TimeSeries:
pool.close()
pool.join()
# logger.info("{}".format(shared_list))
# for index1, prod in enumerate(products):
rows_list = list(shared_list)
# logger.info("rows_list {}".format(rows_list))
if rows_list:
self._df_dicts[indice] = TimeSeries._list_to_df(
rows_list
)
# prod_path = tile_indice_path / prod.identifier[:(-12 - len(indice))] / prod.identifier
# logger.info("Product {}/{}: {}".format(index1 + 1, len(products), prod.identifier))
# logger.info("{} features".format(len(fid_list)))
# for index, fid in enumerate(fid_list):
# df_dict = OrderedDict()
# df_dict["fid"] = fid
self.to_csv()
rows_list.clear()
self._df_dicts.clear()
# # feat_properties = features[fid]["properties"]
# # if feat_properties:
# # df_dict.update(feat_properties)
# df_dict.update(TimeSeries._get_raster_stats_in_geom(features[fid], prod_path))
# # df_properties = features[fid]["properties"]
# df_dict["date"] = prod.date
# df_dict["tile"] = tile
# df_dict["filename"] = prod.identifier
# for prop in features[fid]["properties"]:
# df_dict[prop] = features[fid]["properties"][prop]
# rows_list.append(df_dict)
# if rows_list:
# self._df_dicts[indice] = TimeSeries._list_to_df(rows_list)
end = time.time()
logger.info(
"Execution time: {}".format(timedelta(seconds=end - start))
)
......@@ -535,7 +545,6 @@ class TimeSeries:
@staticmethod
def _list_to_df(rows_list: List[Dict]) -> pd.DataFrame:
"""Converts a list of dictionaries as a pandas dataframe.
:param rows_list: list of dictionaries.
"""
df = pd.DataFrame.from_dict(rows_list)
......@@ -547,13 +556,13 @@ class TimeSeries:
def to_csv(self, out_path: Union[str, pathlib.PosixPath] = None) -> None:
"""Exports the time series to CSV format.
:param out_path: output folder. Default is DATA/EXTRACTION.
"""
if out_path is None:
out_path = self._out_path
out_path_folder = Path(out_path) / self._vectors_file.stem
out_path_folder.mkdir(parents=True, exist_ok=True)
folder= self._data_name + "_" + self._checksum
base_path_folder= Path(out_path) / folder / "raw_data"
base_path_folder.mkdir(parents=True, exist_ok=True)
list_order = [
"fid",
......@@ -571,26 +580,27 @@ class TimeSeries:
"percentile_25",
"percentile_75",
]
# b=[a for a in df.columns if a not in liste]
for df_name, df in self._df_dicts.items():
csv_path = out_path_folder / "{0}_{1}_{2}_{3}.csv".format(
self._vectors_file.stem,
df_name,
self._date_min,
self._date_max,
)
# df.sort_values(by=['fid', 'date', 'count', 'tile']).reindex(columns=(list_order + [a for a in df.columns if a not in list_order]), copy=False).to_csv(str(csv_path))
df["fid"] = pd.to_numeric(df["fid"])
parts = os.path.splitext(df["filename"][0])[0].split("_")
out_path_folder = Path(base_path_folder) / df_name / parts[8]
out_path_folder.mkdir(parents=True, exist_ok=True)
dates = df.index.unique()
for date in dates:
df_filtered = df[df.index == date]
csv_path = out_path_folder / "{0}_{1}.csv".format(
self._data_name,
os.path.splitext(df_filtered["filename"][0])[0]
)
df.loc[df.index == date,"fid"] = pd.to_numeric(df_filtered["fid"])
dg = (
df.sort_values(
by=["fid", "date", "count", "std"],
df_filtered.sort_values(
by=["date", "fid", "count", "std"],
ascending=[True, True, True, False],
)
.reindex(
columns=(
list_order
+ [a for a in df.columns if a not in list_order]
+ [a for a in df_filtered.columns if a not in list_order]
),
copy=False,
)
......@@ -606,25 +616,24 @@ class TimeSeries:
inplace=True,
)
dg.to_csv(str(csv_path))
logger.info("exported to csv: {}".format(csv_path))
logger.info("exported to csv: ({})".format(csv_path))
def plot_global(
self, out_path: Union[str, pathlib.PosixPath] = None
) -> None:
"""Exports the time series to CSV format.
:param out_path: output folder. Default is DATA/EXTRACTION.
"""
if out_path is None:
out_path = self._out_path
out_path_folder = Path(out_path) / self._vectors_file.stem
folder = self._data_name + "_" + self._checksum
out_path_folder = Path(out_path) / folder
out_path_folder.mkdir(parents=True, exist_ok=True)
for df_name, df in self._df_dicts.items():
for df_name, df in self.data.items():
png_path = (
out_path_folder
/ "{0}_plot-global_{1}_{2}_{3}.png".format(
self._vectors_file.stem,
self._data_name,
df_name,
self._date_min,
self._date_max,
......@@ -644,6 +653,9 @@ class TimeSeries:
plt.close()
logger.info("Plot saved to png: {}".format(png_path))
def plot_details(
self, out_path: Union[str, pathlib.PosixPath] = None
) -> None:
......@@ -653,13 +665,14 @@ class TimeSeries:
"""
if out_path is None:
out_path = self._out_path
out_path_folder = Path(out_path) / self._vectors_file.stem
folder= self._data_name + "_" + self._checksum
out_path_folder = Path(out_path) / folder
out_path_folder.mkdir(parents=True, exist_ok=True)
for df_name, df in self._df_dicts.items():
for df_name, df in self.data.items():
png_path = (
out_path_folder
/ "{0}_plot-details_{1}_{2}_{3}.png".format(
self._vectors_file.stem,
self._data_name,
df_name,
self._date_min,
self._date_max,
......@@ -668,7 +681,7 @@ class TimeSeries:
png_path_nonan = (
out_path_folder
/ "{0}_plot-details_{1}-nonan_{2}_{3}.png".format(
self._vectors_file.stem,
self._data_name,
df_name,
self._date_min,
self._date_max,
......@@ -717,9 +730,7 @@ class TimeSeries:
linestyle="",
legend=False,
)
# dfe.dropna().plot.bar(y=['na_ratio'], ax=ax2, color='red',legend=False)
ax.plot(np.nan, "-r", label="NaN ratio")
# ax.legend(loc=0, prop={'size': 6})
ax.legend(loc=0, labelspacing=0.05)
fig.tight_layout()
fig.suptitle(df_name, fontsize=16)
......@@ -765,7 +776,8 @@ class TimeSeries:
"""Extract ql images around vectors."""
if out_path is None:
out_path = Config().get("extraction_path")
out_path_folder = Path(out_path) / self._vectors_file.stem
folder= self._data_name + "_" + self._checksum
out_path_folder = Path(out_path) / folder
out_path_folder.mkdir(parents=True, exist_ok=True)
for df_name, df in self.data.items():
......@@ -799,7 +811,7 @@ class TimeSeries:
indice_png_path = (
out_path_fid_folder
/ "{0}_MOZ-QL_{1}_{2}_{3}_{4}.jpg".format(
self._vectors_file.stem,
self._data_name,
fidname,
df_name,
self._date_min,
......@@ -809,7 +821,7 @@ class TimeSeries:
l2a_png_path = (
out_path_fid_folder
/ "{0}_MOZ-QL_{1}_{2}_{3}_{4}.jpg".format(
self._vectors_file.stem,
self._data_name,
fidname,
"L2A",
self._date_min,
......@@ -841,13 +853,11 @@ class TimeSeries:
np.array(axs).flatten(),
np.array(bxs).flatten(),
):
# logger.info("row[filename]: {}".format(row['filename']))
prod_id = IndiceProduct(row["filename"]).l2a
# prod_id = row['filename'][:(-12 - len(df_name))]
indice_png_tile_path = (
out_path_fid_folder
/ "{0}_QL_{1}_{2}_{3}_{4}_{5}.jpg".format(
self._vectors_file.stem,
self._data_name,
fidname,
df_name,
prod_id[11:19],
......@@ -858,7 +868,7 @@ class TimeSeries:
l2a_png_tile_path = (
out_path_fid_folder
/ "{0}_QL_{1}_{2}_{3}_{4}_{5}.jpg".format(
self._vectors_file.stem,
self._data_name,
fidname,
"L2A",
prod_id[11:19],
......@@ -873,9 +883,6 @@ class TimeSeries:
tile_l2a_path = tile_obj.paths["l2a"]
prod_path = tile_indice_path / prod_id / row["filename"]
tci_path = L2aProduct(prod_id).tci_10m
# tci_path = list(Path(str(tile_l2a_path / row['filename'][:(-12 - len(df_name))])+ '.SAFE/')\
# .glob('GRANULE/*/IMG_DATA/R10m/*_TCI_10m.jp2'))
crop_extent = gpd.read_file(str(self._vectors_file))
logger.info(tci_path)
raster_tci = rasterio.open(tci_path)
......@@ -978,3 +985,327 @@ class TimeSeries:
figb.suptitle(fidname + " - " + df_name)
figb.savefig(str(indice_png_path), bbox_inches="tight")
plt.close(fig=figb)
def result_csv(
self,
alldata: bool = False,
daily: bool = False,
weekly: bool = False,
monthly: bool = False,
yearly: bool = False,
datemin: str = None,
datemax: str = None,
covermax: int = 100):
"""Compiles the raw per-product CSV files into all_data, daily, weekly,
monthly, yearly and/or custom date-range CSV files under compiled_data.
:param datemin: starting date of the custom range (YYYY-MM-DD format).
:param datemax: ending date of the custom range (YYYY-MM-DD format).
:param covermax: maximum cloud probability kept in the output. Default: 100.
"""
if (datemin is None and datemax is not None) or (datemin is not None and datemax is None):
raise ValueError("If you specify datemin or datemax, you must specify both")
# Define 'cm' according to 'self.cm_version'
if self.cm_version == "CM001":
cm = "CM001"
elif self.cm_version == "CM002":
cm = "CM002-B11"
elif self.cm_version == "CM003":
cm = str("CM003" +
"-PRB" + str(self.probability) +
"-ITER" + str(self.iterations))
elif self.cm_version == "CM004":
cm = str("CM004" +
"-CSH" + str(1 * self.cld_shad) +
"-CMP" + str(1 * self.cld_med_prob) +
"-CHP" + str(1 * self.cld_hi_prob) +
"-TCI" + str(1 * self.thin_cir) +
"-ITER" + str(1 * self.iterations))
name= self._data_name + "_" + self._checksum
raw_data= Path(self._out_path) / name / "raw_data"
compiled_data = Path(self._out_path) / name / "compiled_data"
list_order = ['date','fid', 'tile', 'filename', 'count', 'nodata', 'nbcld', 'cldprb', 'min', 'max', 'mean', 'std','median', 'percentile_25', 'percentile_75']
if os.path.isdir(raw_data):
compiled_data.mkdir(parents=True, exist_ok=True)
for indice in os.listdir(raw_data):
print("\n Indice ", indice, " : \n")
ind = Path(compiled_data) / indice
ind.mkdir(parents=True, exist_ok=True)
if cm in os.listdir(Path(raw_data)/indice):
cm_path = Path(ind) / cm
cm_path.mkdir(parents=True, exist_ok=True)
all_data_folder = Path(cm_path)/ "all_data"
if daily:
daily_folder = Path(ind) / cm / "daily"
if weekly:
weekly_folder = Path(ind) / cm / "weekly"
if monthly:
monthly_folder = Path(ind) / cm / "monthly"
if yearly:
yearly_folder = Path(ind) / cm / "yearly"
if datemin is not None and datemax is not None:
custom_folder = Path(ind) / cm / "custom"
path_file = Path(all_data_folder) /f'{self._data_name}_CC{covermax}_all_data.csv'
all_data = pd.DataFrame()
if os.path.isdir(all_data_folder) and os.path.isfile(path_file):
all_data = pd.concat([all_data,pd.read_csv(str(path_file), dtype=object)])
if covermax < 100:
all_data = all_data[pd.to_numeric(all_data['cldprb'], errors='coerce') < covermax]
else:
all_data_folder.mkdir(parents=True, exist_ok=True)
raw = Path(raw_data) / indice / cm
new_data = pd.DataFrame()
if daily:
daily_new_data = pd.DataFrame()
#getting all the data available and not already compiled
dates_by_week = {}
dates_by_month = {}
dates_by_year = {}
for f in os.listdir(raw):
parties = f.split('_')
tile = parties[6][1:]
date = f"{parties[3][:4]}-{parties[3][4:6]}-{parties[3][6:8]}"
correspondence = False
if alldata:
if not all_data.empty: # an all_data csv already exists
for index, row in all_data.iterrows():
if tile == row['tile'] and date == row['date'][:10]:
correspondence = True
if not correspondence:
new_data = pd.concat([new_data, pd.read_csv(str(Path(raw)/f), dtype=object)])
else: # there was no all_data csv yet
df = pd.read_csv(str(Path(raw)/f), dtype=object)
new_data = pd.concat([new_data,df])
if daily:
daily_folder.mkdir(parents=True, exist_ok=True)
daily_correspondence = False
daily_csv = daily_folder/ f'{self._data_name}_CC{covermax}_daily_{date}.csv'
if os.path.exists(daily_csv): # if a file for this date already exists
data = pd.read_csv(daily_csv, dtype=object)
for index, row in data.iterrows():
if tile == row['tile']:
logger.info(f"A file with this day ({date}) already exists, updating...")
daily_correspondence = True
break
else:
logger.info(f"A file with this day ({date}) doesn't exists, creating...")
if not daily_correspondence:
daily_new_data = pd.concat([daily_new_data, pd.read_csv(str(Path(raw) /f), dtype=object)])
else:
logger.info(f"No new data for this day ({date})")
if weekly:
weekly_folder.mkdir(parents=True, exist_ok=True)
week = datetime.strptime(date, "%Y-%m-%d").strftime('%Y-%U')
if week not in dates_by_week:
dates_by_week[week] = []
dates_by_week[week].append((date, tile, f))
for week in dates_by_week:
dates_by_week[week].sort(key=lambda x: x[0])  # sort by date (x[0])
if monthly:
month = f"{parties[3][:4]}-{parties[3][4:6]}"
if month not in dates_by_month:
dates_by_month[month] = []
dates_by_month[month].append((date, tile, f))
for month in dates_by_month:
dates_by_month[month].sort(key=lambda x: x[0])  # sort by date (x[0])
if yearly:
year = f"{parties[3][:4]}"
if year not in dates_by_year:
dates_by_year[year] = []
dates_by_year[year].append((date, tile, f))
for year in dates_by_year:
dates_by_year[year].sort(key=lambda x: x[0])  # sort by date (x[0])
if alldata:
if not new_data.empty:
csv = Path(all_data_folder) / f'{self._data_name}_CC{covermax}_all_data.csv'
new_data['fid'] = pd.to_numeric(new_data['fid'])
if not all_data.empty:
combined_data = pd.concat([all_data, new_data], ignore_index=True)
combined_data['fid'] = pd.to_numeric(combined_data['fid'])
dg= (combined_data.sort_values(by=['date','fid', 'count', 'std'], ascending=[True, True, True, False]).\
reindex(columns=(list_order + [a for a in combined_data.columns if a not in list_order]), copy=False).\
reset_index(drop=True).\
drop_duplicates(['date','fid'], keep='last')
)
if datemin is not None and datemax is not None:
dgcustom = dg[(dg['date'] >= datemin) & (dg['date'] <= datemax)]
else:
dg= (new_data.sort_values(by=['date','fid', 'count', 'std'], ascending=[True, True, True, False]).\
reindex(columns=(list_order + [a for a in new_data.columns if a not in list_order]), copy=False).\
reset_index(drop=True).\
drop_duplicates(['date','fid'], keep='last')
)
if datemin is not None and datemax is not None:
dgcustom = dg[(dg['date'] >= datemin) & (dg['date'] <= datemax)]
dg['date'] = pd.to_datetime(dg['date'])
if covermax < 100:
dg = dg[dg['cldprb'] < covermax]
dg.to_csv(csv, index=False)
logger.info(f"CSV created : ({csv})")
else:
logger.info("No data to compile in the all_data folder : {}, {}".format(cm, indice))
if datemin is not None and datemax is not None:
dgcustom = all_data[(all_data['date'] >= datemin) & (all_data['date'] <= datemax)]
if datemin is not None and datemax is not None:
custom_folder.mkdir(parents=True, exist_ok=True)
csv = Path(custom_folder) / f'{self._data_name}_CC{covermax}_{datemin}_{datemax}.csv'
if not dgcustom.empty:
dgcustom['date'] = pd.to_datetime(dgcustom['date'])
dgcustom.to_csv(csv, index=False)
logger.info(f"CSV created : ({csv})")
if daily:
daily_folder.mkdir(parents=True, exist_ok=True)
if not daily_new_data.empty:
daily_new_data['fid'] = pd.to_numeric(daily_new_data['fid'])
df = (daily_new_data.sort_values(by=['date','fid', 'count', 'std'], ascending=[True, True, True, False]).\
reindex(columns=(list_order + [a for a in daily_new_data.columns if a not in list_order]), copy=False).\
reset_index(drop=True).\
drop_duplicates(['date','fid'], keep='last'))
df['date'] = pd.to_datetime(df['date'])
groups_by_day = df.groupby('date')
for date, group in groups_by_day:
csv = daily_folder/ f'{self._data_name}_CC{covermax}_daily_{date.strftime("%Y-%m-%d")}.csv'
group.to_csv(csv, index=False)
logger.info(f"CSV created : ({csv})")
else: # no update needed
logger.info("No new data to compile in daily folder : {}, {}".format(cm,indice))
if weekly:
if not weekly_folder.exists():
weekly_folder.mkdir(parents=True, exist_ok=True)
weekly_new_data = pd.DataFrame()
for week, date_list in dates_by_week.items():
read = Path(weekly_folder) / f'{self._data_name}_CC{covermax}_weekly_{week}.csv'
if os.path.isfile(read):
logger.info(f"A file with this week ({week}) already exists, updating...")
data = pd.read_csv(str(read), dtype=object)
date_tile_set = set([(date[0][:10], date[1]) for date in date_list])
new_w_data = pd.concat([pd.read_csv(str(Path(raw) / date[2]), dtype=object) for date in date_list])
new_w_data = new_w_data[~(new_w_data.apply(lambda row: (row['date'][:10], row['tile']) in date_tile_set, axis=1))]
if not new_w_data.empty:
weekly_new_data = pd.concat([data, new_w_data])
else:
logger.info(f"No new data for this week ({week})")
else:
logger.info(f"A file with this week ({week}) doesn't exists, creating...")
weekly_new_data = pd.concat([weekly_new_data, pd.concat([pd.read_csv(str(Path(raw) / date[2]), dtype=object) for date in date_list])])
if not weekly_new_data.empty:
weekly_new_data['fid'] = pd.to_numeric(weekly_new_data['fid'])
df = (weekly_new_data.sort_values(by=['date','fid', 'count', 'std'], ascending=[True, True, True, False]).\
reindex(columns=(list_order + [a for a in weekly_new_data.columns if a not in list_order]), copy=False).\
reset_index(drop=True).\
drop_duplicates(['date','fid'], keep='last'))
df['date'] = pd.to_datetime(df['date'])
groups_by_week = df.groupby(df['date'].dt.strftime('%Y-%U'))
for week, group in groups_by_week:
csv = weekly_folder / f'{self._data_name}_CC{covermax}_weekly_{week}.csv'
group.to_csv(csv,index=False)
logger.info(f"CSV created : ({csv})")
if monthly:
# doing the processing here avoids opening the same file several times
if not monthly_folder.exists():
monthly_folder.mkdir(parents=True, exist_ok=True)
monthly_new_data = pd.DataFrame()
for month, date_list in dates_by_month.items():
read = Path(monthly_folder) / f'{self._data_name}_CC{covermax}_monthly_{month}.csv'
if os.path.isfile(read):
logger.info(f"A file with this month ({month}) already exists, updating...")
data = pd.read_csv(str(read), dtype=object)
data['datewouthtime'] = data['date'].str[0:10]
data_wntd = pd.DataFrame(date_list, columns=['datewouthtime', 'tile', 'file'])
merged = data_wntd.merge(data, on=['datewouthtime', 'tile'], how='left', indicator=True)
filtered_data_wntd = merged[merged['_merge'] != 'both'].drop(columns=['_merge'])
data.drop(columns = ["datewouthtime"], inplace=True)
if not filtered_data_wntd.empty:
new_data_list = []
for _, row in filtered_data_wntd.iterrows():
file_path = Path(raw) / row['file']
if os.path.isfile(file_path):
csv_data = pd.read_csv(file_path, dtype=object)
new_data_list.append(csv_data)
if new_data_list:
new_data_list.append(data)
monthly_new_data = pd.concat(new_data_list, ignore_index=True)
else:
logger.info("No new data to compile in monthly folder for this month : {}, {}, {}".format(cm,indice, month))
else:
logger.info(f"A file with this month ({month}) doesn't exists, creating...")
data_to_concatenate = [pd.read_csv(str(Path(raw) / date[2]), dtype=object) for date in date_list if os.path.isfile(str(Path(raw) / date[2]))]
if data_to_concatenate:
monthly_new_data = pd.concat([monthly_new_data] + data_to_concatenate, ignore_index=True)
if not monthly_new_data.empty:
#"not monthly_new_data.empty !"
monthly_new_data['fid'] = pd.to_numeric(monthly_new_data['fid'])
df = (monthly_new_data.sort_values(by=['date','fid', 'count', 'std'], ascending=[True, True, True, False]).\
reindex(columns=(list_order + [a for a in monthly_new_data.columns if a not in list_order]), copy=False).\
reset_index(drop=True).\
drop_duplicates(['date','fid'], keep='last'))
df['date'] = pd.to_datetime(df['date'])
groups_by_month = df.groupby(df['date'].dt.to_period('M'))
for period, group in groups_by_month:
csv = monthly_folder / f'{self._data_name}_CC{covermax}_monthly_{period.start_time.strftime("%Y-%m")}.csv'
group.to_csv(csv, index=False)
logger.info(f"CSV created : ({csv})")
if yearly:
if not yearly_folder.exists():
yearly_folder.mkdir(parents=True, exist_ok=True)
yearly_new_data = pd.DataFrame()
for year, date_list in dates_by_year.items():
read = Path(yearly_folder) / f'{self._data_name}_CC{covermax}_yearly_{year}.csv'
if os.path.isfile(read): # A file with this year already exists, so it needs to be updated
logger.info(f"A file with this year ({year}) already exists, updating...")
data = pd.read_csv(str(read), dtype=object)
data['datewouthtime'] = data['date'].str[:10]
data_wntd = pd.DataFrame(date_list, columns=['datewouthtime', 'tile', 'file'])
merged = data_wntd.merge(data, on=['datewouthtime', 'tile'], how='left', indicator=True)
filtered_data_wntd = merged[merged['_merge'] != 'both'].drop(columns=['_merge'])
data.drop(columns=["datewouthtime"], inplace=True)
if not filtered_data_wntd.empty:
new_data_list = [] # List to store new data
for _, row in filtered_data_wntd.iterrows():
file_path = Path(raw) / row['file']
if os.path.isfile(file_path):
csv_data = pd.read_csv(file_path, dtype=object)
new_data_list.append(csv_data)
if new_data_list:
new_data_list.append(data) # Add existing data
yearly_new_data = pd.concat(new_data_list, ignore_index=True)
else:
logger.info("No new data to compile in yearly folder for this year : {}, {}, {}".format(cm,indice, year))
else:
logger.info(f"The entire year ({year}) doesn't exist, creating... ")
yearly_new_data = pd.concat([yearly_new_data, pd.concat([pd.read_csv(str(Path(raw) / date[2]), dtype=object) for date in date_list])])
if not yearly_new_data.empty:
yearly_new_data['fid'] = pd.to_numeric(yearly_new_data['fid'])
df = (yearly_new_data.sort_values(by=['date','fid', 'count', 'std'], ascending=[True, True, True, False]).\
reindex(columns=(list_order + [a for a in yearly_new_data.columns if a not in list_order]), copy=False).\
reset_index(drop=True).\
drop_duplicates(['date','fid'], keep='last'))
df['date'] = pd.to_datetime(df['date'])
groups_by_year = df.groupby(df['date'].dt.strftime('%Y'))
for year, group in groups_by_year:
csv = yearly_folder / f'{self._data_name}_CC{covermax}_yearly_{year}.csv'
group.to_csv(csv, index=False)
logger.info(f"CSV created : ({csv})")