Newer
Older
Jeremy Auclair
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# -*- coding: UTF-8 -*-
# Python
"""
03-10-2022 modified 04-07-2023
@author: jeremy auclair
Download S2 data pre-modspa
"""
import os # for path exploration
import shutil # for file management
from eodag import setup_logging # module that downloads S2 data
from eodag import EODataAccessGateway # module that downloads S2 data
import geopandas as gpd # to read shapefile
from typing import List # to declare variables
import csv # for loading and saving path results in csv format
import zipfile as zp # to open zip archives
from tqdm import tqdm # to print progress bars during code execution
from fnmatch import fnmatch # for character string comparison
def download_S2_data(start_date: str, end_date: str, preferred_provider: str, save_path: str, shapefile: str = None, cloud_cover_limit: int = 80) -> List[str]:
    """
    Search and download Sentinel-2 products with eodag for a given provider
    (copernicus or theia) over a time window, covering the full envelope of the
    given shapefile (which may span several Sentinel-2 tiles). Downloaded archive
    paths are returned and also written to a csv file for later use.

    ## Arguments
    1. start_date: `str`
        beginning of the time window to download (format: `yyyy-mm-dd`)
    2. end_date: `str`
        end of the time window to download (format: `yyyy-mm-dd`)
    3. preferred_provider: `str`
        chosen source of the Sentinel-2 data (`copernicus` or `theia`)
    4. save_path: `str`
        path where a csv file containing the product paths will be saved
    5. shapefile: `str`
        path to the shapefile (`.shp`) for which the data is downloaded
    6. cloud_cover_limit: `int` `default = 80`
        maximum percentage to pass the filter before download (between 0 and 100)

    ## Returns
    1. product_paths: `list[str]`
        a list of the paths to the downloaded data, or `None` when the search
        fails or finds nothing
    """
    setup_logging(2)  # 3 for even more information
    gateway = EODataAccessGateway()

    # Load the area of interest and force WGS84 so the bounds are in the
    # lon/lat coordinates eodag expects
    area_of_interest = gpd.read_file(shapefile).to_crs(epsg = '4326')
    extent = area_of_interest.geometry.total_bounds

    # The provider choice drives both the eodag product type and the
    # preferred provider setting
    if preferred_provider == 'theia':
        gateway.set_preferred_provider('theia')
        product_type = 'S2_MSI_L2A_MAJA'
    else:
        gateway.set_preferred_provider('scihub')
        product_type = 'S2_MSI_L2A'

    # A TypeError raised by search_all signals malformed search parameters
    try:
        found_products = gateway.search_all(productType = product_type, start = start_date, end = end_date, geom = list(extent))
    except TypeError:
        print('Something went wrong during the product search, check your inputs')
        return None

    # An empty result set means nothing matched the search criteria
    if not found_products:
        print('No products matching your search criteria were found')
        return None

    # Keep only products strictly below the cloud cover threshold,
    # then download them without extracting the archives
    filtered_products = found_products.filter_property(cloudCover = cloud_cover_limit, operator = 'lt')
    product_paths = gateway.download_all(filtered_products, extract = False)
    product_paths.sort()

    # Persist the sorted path list as a one-column csv for later pipeline steps
    with open(save_path, 'w', newline = '') as csv_file:
        writer = csv.writer(csv_file)
        for product in product_paths:
            writer.writerow([product])

    return product_paths
def extract_zip_archives(download_path: str, list_paths: List[str], bands_to_extract: List[str], save_path: str, remove_archive: bool = False) -> List[str]:
    """
    Extract specific bands from a list of zip archives.

    Each archive is extracted into a directory named after it (extension
    stripped) inside `download_path`; matched band files are moved to the root
    of that directory and the leftover directory tree is removed. The list of
    extracted file paths is returned and also saved as a csv file.

    ## Arguments
    1. download_path: `str`
        path in which the archives will be extracted (usually where the archives are located)
    2. list_paths: `List[str]`
        list of paths to the zip archives
    3. bands_to_extract: `List[str]`
        list of strings that will be used to match specific bands. For example if you are looking
        for bands B3 and B4 in a given archive, `bands_to_extract = ['*_B3.TIF', '*_B4.TIF']`. This
        depends on the product architecture.
    4. save_path: `str`
        path where a csv file containing the product paths will be saved
    5. remove_archive: `bool` `default = False`
        boolean to choose whether to remove the archive or not

    ## Returns
    1. product_list: `List[str]`
        list of the paths to the extracted products
    """
    # Final product list
    product_list = []
    progress_bar = tqdm(total = len(list_paths))
    for file_path in list_paths:
        # Change progress bar to print current file
        progress_bar.set_description_str(desc = '\rExtracting ' + os.path.basename(file_path) + '\ntotal progress')
        # Extraction directory named after the archive, extension stripped
        # (splitext instead of [:-4] so the extension length is not hard-coded)
        extract_path = download_path + os.sep + os.path.splitext(os.path.basename(file_path))[0]
        # Extract desired bands from the zip file
        with zp.ZipFile(file_path, mode = 'r') as archive:
            for member in archive.namelist():
                # any() guards against a member matching several patterns,
                # which previously extracted and appended it multiple times
                if any(fnmatch(member, band) for band in bands_to_extract):
                    # Extract file
                    archive.extract(member, path = extract_path)
                    # Move extracted file to the root of the directory
                    member_name = os.path.basename(member)
                    shutil.move(extract_path + os.sep + member, extract_path + os.sep + member_name)
                    product_list.append(extract_path + os.sep + member_name)
        # Remove unnecessary empty directories: remove every leftover
        # subfolder (not just the first), and tolerate archives whose
        # members sit at the root (no subfolder at all)
        subfolders = [entry.path for entry in os.scandir(extract_path) if entry.is_dir()]
        for subfolder in subfolders:
            shutil.rmtree(subfolder)
        if remove_archive:
            # Remove zip file
            os.remove(file_path)
        progress_bar.update(1)
    progress_bar.close()
    # Save list of paths as a csv file for later use
    with open(save_path, 'w', newline = '') as f:
        # using csv.writer method from CSV package
        write = csv.writer(f)
        for product in product_list:
            write.writerow([product])
    return product_list