Skip to content
Snippets Groups Projects
process_l2a_and_indices_by_identifier_multiprocessing.py 3.54 KiB
Newer Older
""" Process L1C products by identifiers 
    
    Input : - an identifier list in a csv file
            - a Sen2chain L1C library 
    Output : L2A / indices
    
    If any argument is supplied nothiing is done, this script outputs only a product list to process but without any processing
    
    This script uses multiprocessing to process files. 
    Please adapt the number of process lines 65, 79, 96 according to your hardware
    
    Please refer to sen2chain wiki for more info : https://framagit.org/jebins/sen2chain/wikis/home
import logging
import datetime
import pandas as pd
from sen2chain import Tile, L1cProduct, L2aProduct, Library
from sen2chain import l2a_multiprocessing, cldidx_multiprocessing, cld_multiprocessing, idx_multiprocessing
import os, shutil, sys
import glob
import math
from itertools import chain

try:
    arg = sys.argv[1]
except:
    arg = []
print(arg)

logger = logging.getLogger("L2A Processing")
logging.basicConfig(level=logging.INFO)
logger.setLevel(logging.INFO)


# default number of days to check before today 
# if this field is empty in the csv file
# file path to tile identifiers list to process
fwd = os.path.dirname(os.path.realpath(__file__))
tiles_file = fwd + "/process_l2a_and_indices_by_identifier_multiprocessing.csv"

tiles_list = pd.read_csv(tiles_file, sep = '_', na_values="", na_filter=False, comment='#')

for index, row in tiles_list.iterrows():
    date = datetime.datetime.strptime(row.starttime, '%Y%m%dT%H%M%S')
    tiles_list.at[index, "starttime"] = date.strftime('%Y-%m-%d')
    tiles_list.at[index, "endtime"] = (date + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    tiles_list.at[index, "tile"] = row.tile[1:]
    if not row.indices:
            tiles_list.at[index, "indices"] = 'NDVI'

logger.info("\n{}".format(tiles_list))


# Processing L1C to L2A
l1c_process_list =  []
for index, row in tiles_list.iterrows():
    t = Tile(row.tile)
    l1c_process_list.append(list(p.identifier for p in t.l2a_missings.filter_dates(date_min = row.starttime, date_max = row.endtime)))
l1c_process_list = list(chain.from_iterable(l1c_process_list))
logger.info("l1c_process_list ({} files): {}\n".format(len(l1c_process_list), l1c_process_list))

l2a_res = False
if l1c_process_list:
    if not arg:
        l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=12)


# Processing L2A cloud masks
cld_l2a_process_list =  []
for index, row in tiles_list.iterrows():
    t = Tile(row.tile)
    cld_l2a_process_list.append(list(p.identifier for p in t.cloudmasks_missings.filter_dates(date_min = row.starttime, date_max = row.endtime)))
cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list))
logger.info("cld_l2a_process_list ({} files): {}\n".format(len(cld_l2a_process_list), cld_l2a_process_list))

cld_res = False
if cld_l2a_process_list:
    if not arg:
        cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=4)


# Processing L2A indices
indices_l2a_process_list =  []
for index, row in tiles_list.iterrows():
    t = Tile(row.tile)
    indices_list = row.indices.split("/")
    for i in indices_list:
        l2a_list = [p.identifier for p in t.missing_indices(i).filter_dates(date_min = row.starttime, date_max = row.endtime)]
        for j in l2a_list:
            indices_l2a_process_list.append([j, i])
logger.info("indices_l2a_process_list ({} files): {}\n".format(len(indices_l2a_process_list), indices_l2a_process_list))

indices_res = False
if indices_l2a_process_list:
    if not arg:
        indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc=4)