-
Christophe R authoredChristophe R authored
process_l2a_and_indices_multiprocessing.py 3.47 KiB
# -*- coding:utf-8 -*-
""" Process L1C products
Input : - a tile list in a csv file (tile;start_time;end_time;max_clouds;site;nom;indices)
- a Sen2chain L1C library
Output : L2A / indices
If any argument is supplied nothiing is done, this script outputs only a product list to process but without any processing
This script uses multiprocessing to process files.
Adapt the number of process (lines 65, 79, 96) according to your hardware
Please refer to sen2chain wiki for more info : https://framagit.org/jebins/sen2chain/wikis/home
"""
import logging
import datetime
import pandas as pd
from sen2chain import Tile
from sen2chain import l2a_multiprocessing, cld_multiprocessing, idx_multiprocessing
import os, sys
from itertools import chain
try:
arg = sys.argv[1]
except:
arg = []
print(arg)
logger = logging.getLogger("L2A Prcessing")
logging.basicConfig(level=logging.INFO)
logger.setLevel(logging.INFO)
# default number of days to check before today
# if this field is empty in the csv file
delta_t = 15
# file path to tile list to process
fwd = os.path.dirname(os.path.realpath(__file__))
tiles_file = fwd + "/process_l2a_and_indices_multiprocessing.csv"
tiles_list = pd.read_csv(tiles_file, sep = ';', na_values="", na_filter=False, comment='#')
for index, row in tiles_list.iterrows():
if not row.start_time:
tiles_list.at[index, "start_time"] = (datetime.datetime.now()-datetime.timedelta(days=delta_t)).strftime('%Y-%m-%d')
if not row.end_time:
tiles_list.at[index, "end_time"] = (datetime.datetime.now()+datetime.timedelta(days=1)).strftime('%Y-%m-%d')
if not row.indices:
tiles_list.at[index, "indices"] = 'NDVI'
logger.info("\n{}".format(tiles_list))
# Processing L1C to L2A
l1c_process_list = []
for index, row in tiles_list.iterrows():
t = Tile(row.tile)
l1c_process_list.append(list(p.identifier for p in t.l2a_missings.filter_dates(date_min = row.start_time, date_max = row.end_time)))
l1c_process_list = list(chain.from_iterable(l1c_process_list))
logger.info("l1c_process_list ({} files): {}\n".format(len(l1c_process_list), l1c_process_list))
l2a_res = False
if l1c_process_list:
if not arg:
l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=4)
# Processing L2A cloud masks
cld_l2a_process_list = []
for index, row in tiles_list.iterrows():
t = Tile(row.tile)
cld_l2a_process_list.append(list(p.identifier for p in t.cloudmasks_missings.filter_dates(date_min = row.start_time, date_max = row.end_time)))
cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list))
logger.info("cld_l2a_process_list ({} files): {}\n".format(len(cld_l2a_process_list), cld_l2a_process_list))
cld_res = False
if cld_l2a_process_list:
if not arg:
cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=4)
# Processing L2A indices
indices_l2a_process_list = []
for index, row in tiles_list.iterrows():
t = Tile(row.tile)
indices_list = row.indices.split("/")
for i in indices_list:
l2a_list = [p.identifier for p in t.missing_indices(i).filter_dates(date_min = row.start_time, date_max = row.end_time)]
for j in l2a_list:
indices_l2a_process_list.append([j, i])
logger.info("indices_l2a_process_list ({} files): {}\n".format(len(indices_l2a_process_list), indices_l2a_process_list))
indices_res = False
if indices_l2a_process_list:
if not arg:
indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc=4)