Skip to content
Snippets Groups Projects
process_l2a_and_indices_by_identifier_multiprocessing.py 3.29 KiB
Newer Older
# -*- coding:utf-8 -*-

""" lance les traitements sur les fichiers L1C qui ont
été téléchargés mais qui n'ont pas été traités 
"""
import logging
import datetime
import pandas as pd
from sen2chain import Tile, L1cProduct, L2aProduct, Library
from sen2chain import l2a_multiprocessing, cldidx_multiprocessing, cld_multiprocessing, idx_multiprocessing
import os, shutil, sys
import glob
import math
from itertools import chain

try:
    arg = sys.argv[1]
except:
    arg = []
print(arg)

logger = logging.getLogger("Traitement L2A")
logging.basicConfig(level=logging.INFO)
logger.setLevel(logging.INFO)


# default nombre de jours à requeter à partir de today
# si le champs n'est pas renseingné ds le csv
delta_t = 15

# liste de tuiles à traiter
fwd = os.path.dirname(os.path.realpath(__file__))
tiles_file = fwd + "/97_02_traitement_l2a_identifier_multiprocessing.csv"

tiles_list = pd.read_csv(tiles_file, sep = '_', na_values="", na_filter=False, comment='#')

for index, row in tiles_list.iterrows():
    date = datetime.datetime.strptime(row.starttime, '%Y%m%dT%H%M%S')
    tiles_list.at[index, "starttime"] = date.strftime('%Y-%m-%d')
    tiles_list.at[index, "endtime"] = (date + datetime.timedelta(days=1)).strftime('%Y-%m-%d')
    tiles_list.at[index, "tile"] = row.tile[1:]
    if not row.indices:
            tiles_list.at[index, "indices"] = 'NDVI'

logger.info("\n{}".format(tiles_list))


# Nettoyage de la librairie
clean_list = []
for index, row in tiles_list.iterrows():
    clean_list.append(row.tile)
lib = Library()
lib.clean(clean_list, remove=True)


# Traitement des L1C en L2A
l1c_process_list =  []
for index, row in tiles_list.iterrows():
    t = Tile(row.tile)
    l1c_process_list.append(list(p.identifier for p in t.l2a_missings.filter_dates(date_min = row.starttime, date_max = row.endtime)))
l1c_process_list = list(chain.from_iterable(l1c_process_list))
logger.info("l1c_process_list ({} files): {}\n".format(len(l1c_process_list), l1c_process_list))

l2a_res = False
if l1c_process_list:
    if not arg:
        l2a_res = l2a_multiprocessing(l1c_process_list, nb_proc=12)


# Traitement des L2A (clouds)
cld_l2a_process_list =  []
for index, row in tiles_list.iterrows():
    t = Tile(row.tile)
    cld_l2a_process_list.append(list(p.identifier for p in t.cloudmasks_missings.filter_dates(date_min = row.starttime, date_max = row.endtime)))
cld_l2a_process_list = list(chain.from_iterable(cld_l2a_process_list))
logger.info("cld_l2a_process_list ({} files): {}\n".format(len(cld_l2a_process_list), cld_l2a_process_list))

cld_res = False
if cld_l2a_process_list:
    if not arg:
        cld_res = cld_multiprocessing(cld_l2a_process_list, nb_proc=4)


# Traitement des L2A (indices)
indices_l2a_process_list =  []
for index, row in tiles_list.iterrows():
    t = Tile(row.tile)
    indices_list = row.indices.split("/")
    for i in indices_list:
        l2a_list = [p.identifier for p in t.missing_indices(i).filter_dates(date_min = row.starttime, date_max = row.endtime)]
        for j in l2a_list:
            indices_l2a_process_list.append([j, i])
logger.info("indices_l2a_process_list ({} files): {}\n".format(len(indices_l2a_process_list), indices_l2a_process_list))

indices_res = False
if indices_l2a_process_list:
    if not arg:
        indices_res = idx_multiprocessing(indices_l2a_process_list, nb_proc=4)