# -*- coding: UTF-8 -*-
# Python
"""
Contains functions to facilitate code workflow, directory creation, Notebook readability, logfile creation, etc.
04-07-2023
@author: jeremy auclair
"""
import os # for path management
from typing import List, Tuple # to declare argument types
import numpy as np # vectorized math
from fnmatch import fnmatch # to match character strings
import re # for character string search
import datetime # for date management
from scipy.signal import medfilt # scipy median filter
import ruamel.yaml as yml # for yaml file modification
from numba import jit, njit # to compile functions for faster execution
from modspa_pixel.config.config import config # to import config file
def product_str_to_datetime(product_name: str) -> datetime.date:
"""
product_str_to_datetime returns a ``datetime.date`` object for the date of the given product.
Works for both copernicus and theia providers.
Arguments
=========
1. product_name: ``str``
name or path of the product
Returns
=======
1. product_date: ``datetime.date``
datetime.date object, date of the product
"""
# Search for a date pattern (yyyymmdd) in the product name or path
try:
        match = re.search(r'\d{4}\d{2}\d{2}', product_name)
format = '%Y%m%d'
datetime_object = datetime.datetime.strptime(match[0], format)
return datetime_object.date()
except TypeError:
pass
# Search for a date pattern (yymmdd) in the product name or path
try:
        match = re.search(r'\d{2}\d{2}\d{2}', product_name)
format = '%y%m%d'
datetime_object = datetime.datetime.strptime(match[0], format)
return datetime_object.date()
except TypeError:
pass
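
# Usage sketch for product_str_to_datetime (the product names below are
# hypothetical, following the theia and copernicus naming schemes):
# >>> product_str_to_datetime('SENTINEL2B_20210704-105856-128_L2A_T31TCJ_C_V3-0')
# datetime.date(2021, 7, 4)
# >>> product_str_to_datetime('S2A_MSIL2A_20200116T105351_N0213_R051_T31TCJ.SAFE')
# datetime.date(2020, 1, 16)
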
def read_product_info(path_to_product: str) -> Tuple[datetime.date, str, str, str]:
"""
Read_product_info detects and returns the date, tile and provider of a given product.
Arguments
=========
1. path_to_product: ``str``
path to the product
Returns
=======
1. date, tile, provider): ``tuple``
``datetime.date`` object of the product
2. tile: ``str``
tile name of product
3. provider: ``str``
provider of product
4. satellite: ``str``
name of the satellite (ex: Sentinel-2, LandSat-8, SPOT-4)
"""
# Collect date of product
date = product_str_to_datetime(path_to_product)
# Test provider type
if fnmatch(path_to_product, '*S2?_MSIL2A_20*'):
provider = 'copernicus'
satellite = 'Sentinel-2'
# Try finding the tile pattern (_TXXXXX_) in the path to product. None is returned if the path is not in
# accordance with copernicus or theia naming schemes
try:
tile = re.findall('_T(.?.?.?.?.?)_', path_to_product)[0]
        except IndexError:
print('Error in provided path, no tile found in string')
tile = None
elif fnmatch(path_to_product, '*SENTINEL2?_20*'):
provider = 'theia'
satellite = 'Sentinel-2'
# Try finding the tile pattern (_TXXXXX_) in the path to product. None is returned if the path is not in
# accordance with copernicus or theia naming schemes
try:
tile = re.findall('_T(.?.?.?.?.?)_', path_to_product)[0]
        except IndexError:
print('Error in provided path, no tile found in string')
tile = None
elif fnmatch(path_to_product, '*LC08_L2SP_*'):
provider = 'usgs'
satellite = 'LandSat-8'
# Try finding the tile pattern (_XXXXXX_) in the path to product. None is returned if the path is not in
# accordance with usgs naming schemes
try:
tile = re.findall('_(.?.?.?.?.?.?)_', path_to_product)[0]
        except IndexError:
print('Error in provided path, no tile found in string')
tile = None
elif fnmatch(path_to_product, '*LE07_L2SP_*'):
provider = 'usgs'
satellite = 'LandSat-7'
# Try finding the tile pattern (_XXXXXX_) in the path to product. None is returned if the path is not in
# accordance with usgs naming schemes
try:
tile = re.findall('_(.?.?.?.?.?.?)_', path_to_product)[0]
        except IndexError:
print('Error in provided path, no tile found in string')
tile = None
elif fnmatch(path_to_product, '*SP4_OPER_*'):
provider = 'swh'
satellite = 'SPOT-4'
        # Try finding the tile pattern (TXXXXXX_) in the path to product. None is returned if the path is not in
        # accordance with SPOT naming schemes
try:
tile = re.findall('T(.?.?.?.?.?.?)_', path_to_product)[0]
        except IndexError:
print('Error in provided path, no tile found in string')
tile = None
elif fnmatch(path_to_product, '*SP5_OPER_*'):
provider = 'swh'
satellite = 'SPOT-5'
        # Try finding the tile pattern (TXXXXXX_) in the path to product. None is returned if the path is not in
        # accordance with SPOT naming schemes
try:
tile = re.findall('T(.?.?.?.?.?.?)_', path_to_product)[0]
        except IndexError:
print('Error in provided path, no tile found in string')
tile = None
elif fnmatch(path_to_product, '*SP_*'):
provider = 'swh'
        satellite = 'SPOT-5'
tile = ''
else:
tile, provider, satellite = '', '', ''
return date, tile, provider, satellite
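
# Usage sketch for read_product_info (hypothetical copernicus product name):
# >>> read_product_info('S2A_MSIL2A_20200116T105351_N0213_R051_T31TCJ_20200116T121020.SAFE')
# (datetime.date(2020, 1, 16), '31TCJ', 'copernicus', 'Sentinel-2')
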
def prepare_directories(config_file: str) -> None:
"""
Creates the directories for the data inputs and outputs.
Arguments
=========
1. config_file: ``str``
path to configuration file
Returns
=======
``None``
"""
# Open config file
config_params = config(config_file)
# Get parameters
download_path = config_params.download_path
era5_path = config_params.era5_path
run_name = config_params.run_name
output_path = config_params.output_path
    # Create all the directories for downloaded data if they do not exist
    os.makedirs(download_path, exist_ok=True)

    # Path for scihub data
    scihub_path = download_path + os.sep + 'SCIHUB'
    os.makedirs(scihub_path, exist_ok=True)
    ndvi_scihub_path = scihub_path + os.sep + 'NDVI'
    os.makedirs(ndvi_scihub_path, exist_ok=True)
    ndvi_run_name = ndvi_scihub_path + os.sep + run_name
    os.makedirs(ndvi_run_name, exist_ok=True)

    # Path for theia data
    theia_path = download_path + os.sep + 'THEIA'
    os.makedirs(theia_path, exist_ok=True)
    ndvi_theia_path = theia_path + os.sep + 'NDVI'
    os.makedirs(ndvi_theia_path, exist_ok=True)
    ndvi_run_name = ndvi_theia_path + os.sep + run_name
    os.makedirs(ndvi_run_name, exist_ok=True)

    # Path for usgs data
    usgs_path = download_path + os.sep + 'USGS'
    os.makedirs(usgs_path, exist_ok=True)
    ndvi_usgs_path = usgs_path + os.sep + 'NDVI'
    os.makedirs(ndvi_usgs_path, exist_ok=True)

    # Create weather data directories
    os.makedirs(era5_path, exist_ok=True)
    os.makedirs(era5_path + os.sep + run_name, exist_ok=True)

    # Create soil data directories
    soil_dir = download_path + os.sep + 'SOIL'
    os.makedirs(soil_dir, exist_ok=True)
    os.makedirs(soil_dir + os.sep + run_name, exist_ok=True)

    # Create land cover data directories
    landcover_dir = download_path + os.sep + 'LAND_COVER'
    os.makedirs(landcover_dir, exist_ok=True)
    os.makedirs(landcover_dir + os.sep + run_name, exist_ok=True)

    # Create output data directories
    os.makedirs(output_path, exist_ok=True)
    os.makedirs(output_path + os.sep + run_name, exist_ok=True)

return None
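
# Layout sketch of the tree created by prepare_directories, assuming
# download_path = '/data' and run_name = 'test' (era5_path and output_path
# are read from the same configuration file):
# /data/SCIHUB/NDVI/test
# /data/THEIA/NDVI/test
# /data/USGS/NDVI
# /data/SOIL/test
# /data/LAND_COVER/test
# <era5_path>/test
# <output_path>/test
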
@jit(nopython=False, forceobj=True)
def intersection(list1: list, list2: list) -> list:
"""
returns the intersection of two lists. Objects in lists need to be comparable.
Arguments
=========
1. list1: ``list``
first list
2. list2: ``list``
second list
Returns
=======
1. list3: ``list``
list of the intersection of list1 and list2
"""
list3 = [value for value in list1 if value in list2]
list3 = list(dict.fromkeys(list3))
return list3
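
# Usage sketch for intersection:
# >>> intersection([1, 2, 3, 3], [3, 2, 5])
# [2, 3]
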
@jit(nopython=False, forceobj=True)
def avg_2dates(date1: datetime.datetime, date2: datetime.datetime) -> datetime.datetime:
"""
Get the average date between to dates. For example, the average between 01/01/2020 and 07/01/2020 is 04/01/2020.
if the number of days between the two dates is odd, the average date has 12 hours added to it.
Arguments
=========
1. date1: ``datetime.datetime``
first date
2. date2: ``datetime.datetime``
second date
Returns
=======
1. averaged date: ``datetime.datetime``
averaged date as datetime format
"""
if date2 >= date1:
return date1 + datetime.timedelta((date2 - date1).days)/2
else:
return date2 + datetime.timedelta((date1 - date2).days)/2
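
# Usage sketch for avg_2dates (an even gap in days falls on a date, an odd gap adds 12 hours):
# >>> avg_2dates(datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 7))
# datetime.datetime(2020, 1, 4, 0, 0)
# >>> avg_2dates(datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 4))
# datetime.datetime(2020, 1, 2, 12, 0)
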
@jit(nopython=False, forceobj=True)
def calculate_time_derivative(values: np.ndarray, dates: List[datetime.datetime]) -> Tuple[List[float], List[datetime.datetime]]:
"""
Get a time derivative of a series of dated values (one value per day maximum). If values is of length N,
the returned lists are of length n-1
Arguments
=========
1. values: ``list[float]``
list of timed values
2. dates: ``list[np.datetime64]``
list of dates for the values
Returns
=======
1. derivative: ``list[float]``
derivative values
2. deriv_dates: ``list[np.datetime64]``
dates corresponding to the derivative values
"""
# Get derivative values
value_deltas, time_deltas = values[1:] - values[:-1], (dates[1:] - dates[:-1]).days
derivative = np.array(value_deltas/time_deltas)
# Get new date list for derivative values
deriv_dates = [avg_2dates(dates[i+1], dates[i]) for i in range(len(dates)-1)]
return derivative, deriv_dates
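
# Input sketch for calculate_time_derivative: the vectorized subtraction above
# assumes values is an np.ndarray and that dates supports elementwise subtraction
# with a .days attribute on the result (e.g. a pandas DatetimeIndex):
# import pandas as pd
# values = np.array([0.2, 0.4, 0.5])
# dates = pd.DatetimeIndex(['2020-01-01', '2020-01-03', '2020-01-04'])
# calculate_time_derivative(values, dates)
# -> (array([0.1, 0.1]), [Timestamp('2020-01-02 00:00:00'), Timestamp('2020-01-03 12:00:00')])
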
@njit
def detect_anomalies_deriv(derivative: np.ndarray, threshold: float) -> List[int]:
"""
Detects anomalies in a list of derivative of values, based on a threshold. The algorithm
looks for derivative high values (negative or positive) followed by another high value of
the opposite sign. Filters out two or three consecutive values to keep the one that looks
like a peak.
Arguments
=========
1. derivative: ``list[float]``
list of derivative values
2. threshold: ``float``
float value between two derivative values to flag as an anomaly
Returns
=======
1. list_anomalies: ``list[int]``
indexes (derivative values) of flagged values.
"""
list_anomalies = []
# Detect anomalies based on derivative value
for i in range(1, len(derivative)-1):
if abs(derivative[i+1] - derivative[i]) > threshold:
            # First condition finds high changes in derivative (peak) values that are not upward or downward curves
            # Second condition ensures that the consecutive derivative values change sign (indicator of a peak value)
            # Third condition ensures that the following derivative value isn't too small (meaning we are not selecting a "normal" curve)
if abs(derivative[i+1] - derivative[i-1]) <= abs(derivative[i+1] - derivative[i]) and np.sign(derivative[i+1]) != np.sign(derivative[i]) and abs(derivative[i+1]) > threshold/5:
list_anomalies.append(i)
    # Find series of 3 consecutive anomalies, keep the middle one
to_pop = []
for i in range(1, len(list_anomalies)-1):
if list_anomalies[i+1] - list_anomalies[i] == 1 and list_anomalies[i] - list_anomalies[i-1] == 1:
to_pop.append(i+1)
to_pop.append(i-1)
list_anomalies = [list_anomalies[i] for i in range(len(list_anomalies)) if i not in to_pop]
    # Find double consecutive anomalies, keep the one with the highest derivative change around it
to_pop = []
for i in range(len(list_anomalies)-1):
if list_anomalies[i+1] - list_anomalies[i] == 1:
if abs(derivative[list_anomalies[i]+1] - derivative[list_anomalies[i]]) > max(abs(derivative[list_anomalies[i]] - derivative[list_anomalies[i]-1]), abs(derivative[list_anomalies[i]+2] - derivative[list_anomalies[i]+1])):
to_pop.append(i+1)
else:
to_pop.append(i)
list_anomalies = [list_anomalies[i] for i in range(len(list_anomalies)) if i not in to_pop]
return list_anomalies
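
# Usage sketch for detect_anomalies_deriv: the two large consecutive derivative
# values of opposite sign flag index 1 as a peak (numbers are illustrative):
# >>> detect_anomalies_deriv(np.array([0.01, 0.3, -0.35, 0.02]), 0.2)
# [1]
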
def detect_anomalies_median(values: List[float], threshold_ratio: float, window: int = 3) -> List[int]:
"""
Detects anomalies in a NDVI time series based on a moving median filter. If one point is to
far from its corresponding point in the median filtered list, it is flagged as an anomaly. The
threshold is calculated as followed: ``threshold[i] = threshold_ratio*median(values[i-1], values[i], values[i+1])``
Arguments
=========
1. values: ``List[float]``
values in which to detect the anomalies
2. threshold_ratio: ``float``
ratio to apply to the current median value to get a threshold over which to flag values
3. window: ``int`` ``default = 3``
size of window for moving median filter
Returns
=======
1. list_anomalies: ``List[int]``
indexes of flagged values
"""
list_anomalies = []
    # Reduce the window to 1 if the list is too short for the moving window
    if len(values) < 3: window = 1
# Get median filter
median = medfilt(values, window)
# Calculate delta
delta = np.abs(values - median)
    # Return indexes where the condition is met, as a flat array of indexes
    list_anomalies = np.argwhere(delta > threshold_ratio*median).flatten()
return list_anomalies
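
# Usage sketch for detect_anomalies_median: the spike at index 2 deviates from
# its median-filtered counterpart by more than 10% (numbers are illustrative):
# >>> detect_anomalies_median(np.array([0.3, 0.31, 0.8, 0.32, 0.33]), 0.1)
# array([2])
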
def find_anomalies(values: List[float], dates: List[datetime.datetime], deriv_threshold_ratio: float = 25, median_threshold_ratio: float = 0.1, median_window: int = 3) -> List[int]:
"""
Detects anomalies with the intersection of two filters. The first (derivative filter)
finds anomalies by looking for consecuvite high derivative values of opposite sign (with
a few additional conditions). The second (median filter) finds anomalies by comparing each
point to its corresponding point in the median filter list, when the difference is over the
given threshold, the point is flagged. The intersection of these two filters is returned as
the list of detected anomalies.
Arguments
=========
1. values: ``List[float]``
list in which to detect anomalies
2. dates: ``List[datetime.datetime]``
list of dates of values
3. deriv_threshold_ratio: ``int`` ``default = 25``
ratio to divide the max of values to get a threshold for the derivative values.
25 is arbitrary but seems to work well for NDVI values
changes in NDVI values can seem "more brutal")
4. median_threshold_ratio: ``float`` ``default = 0.1``
threshold ratio for median filter
5. median_window: ``int`` ``default = 3``
size of window for median filter
Returns
=======
1. list_anomalies: ``List[int]``
indexes of values flagged as anomalies
"""
# Get values' derivative
values_deriv, _ = calculate_time_derivative(values, dates)
# Get threshold from data
deriv_threshold = max(values)/deriv_threshold_ratio
    # Get anomalies (derivative indexes are shifted by one to point back into values)
    deriv_anomalies = list(np.array(detect_anomalies_deriv(values_deriv, deriv_threshold)) + 1)
median_anomalies = detect_anomalies_median(values, median_threshold_ratio, median_window)
# Get intersection
list_anomalies = intersection(deriv_anomalies, median_anomalies)
return list_anomalies
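
# Usage sketch for find_anomalies (illustrative NDVI series; dates as a pandas
# DatetimeIndex so that calculate_time_derivative can subtract them elementwise):
# import pandas as pd
# ndvi = np.array([0.30, 0.31, 0.80, 0.32, 0.33, 0.35])
# dates = pd.date_range('2020-01-01', periods=6, freq='5D')
# find_anomalies(ndvi, dates)  # -> [2] (the spike at index 2 is flagged by both filters)
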
def set_eodag_config_file(path_to_config_file: str, download_dir: str, provider: str) -> None:
"""
Modifies the download path in the eodag `yaml` configuration file according to the code configuration file if needed.
If you add additionnal providers this function needs to be modified: add elif condition for new provider.
## Arguments
1. path_to_config_file: `str`
path to the eodag config file
2. download_dir: `str`
path to the download directory declared in the code config file
3. provider: `str`
provider for which tu update/change download directory
## Returns
`None`
"""
    # Check if the eodag config file exists (it is created on the first run of eodag)
    if not os.path.exists(path_to_config_file):
        print("First run of eodag, the eodag config file doesn't exist yet, run the script again.\n")
return None
    # Open eodag config file, guessing its indentation so it can be rewritten unchanged
    with open(path_to_config_file) as config_file_stream:
        parameter_list, ind, bsi = yml.util.load_yaml_guess_indent(config_file_stream)
# Adapt structure based on provider
    if provider == 'copernicus':
provider = 'scihub'
if parameter_list[provider]['api']['outputs_prefix'] != download_dir + os.sep + 'SCIHUB' + os.sep:
parameter_list[provider]['api']['outputs_prefix'] = download_dir + os.sep + 'SCIHUB' + os.sep
elif provider == 'theia':
if parameter_list[provider]['download']['outputs_prefix'] != download_dir + os.sep + 'THEIA' + os.sep:
parameter_list[provider]['download']['outputs_prefix'] = download_dir + os.sep + 'THEIA' + os.sep
elif provider == 'usgs':
if parameter_list[provider]['api']['outputs_prefix'] != download_dir + os.sep + 'USGS' + os.sep:
parameter_list[provider]['api']['outputs_prefix'] = download_dir + os.sep + 'USGS' + os.sep
# Save modified data
yaml = yml.YAML()
yaml.indent(mapping=ind, sequence=ind, offset=bsi)
with open(path_to_config_file, 'w') as fp:
yaml.dump(parameter_list, fp)
return None