diff --git a/effector/CookieCutter.py b/effector/CookieCutter.py
deleted file mode 100644
index 19d61df791e1daf3c602d245bc683bf815189d93..0000000000000000000000000000000000000000
--- a/effector/CookieCutter.py
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Based on lsf CookieCutter.py
-#
-import os
-import json
-
-d = os.path.dirname(__file__)
-with open(os.path.join(d, "settings.json")) as fh:
-    settings = json.load(fh)
-
-
-class CookieCutter:
-
-    SBATCH_DEFAULTS = settings['SBATCH_DEFAULTS']
-    CLUSTER_NAME = settings['CLUSTER_NAME']
-    CLUSTER_CONFIG = settings['CLUSTER_CONFIG']
-    ADVANCED_ARGUMENT_CONVERSION = settings['ADVANCED_ARGUMENT_CONVERSION']
-
-    @staticmethod
-    def get_cluster_option() -> str:
-        cluster = CookieCutter.CLUSTER_NAME
-        if cluster != "":
-            return f"--cluster={cluster}"
-        return ""
-
-    @staticmethod
-    def get_advanced_argument_conversion() -> bool:
-        val = {"yes": True, "no": False}[
-            CookieCutter.ADVANCED_ARGUMENT_CONVERSION
-        ]
-        return val
diff --git a/effector/__pycache__/CookieCutter.cpython-37.pyc b/effector/__pycache__/CookieCutter.cpython-37.pyc
deleted file mode 100644
index df18a126968440b7a86cf5b8997acc5f4d3c19fd..0000000000000000000000000000000000000000
Binary files a/effector/__pycache__/CookieCutter.cpython-37.pyc and /dev/null differ
diff --git a/effector/__pycache__/slurm_utils.cpython-37.pyc b/effector/__pycache__/slurm_utils.cpython-37.pyc
deleted file mode 100644
index 38a06d13029b3157be5a5898bc352b16f1119838..0000000000000000000000000000000000000000
Binary files a/effector/__pycache__/slurm_utils.cpython-37.pyc and /dev/null differ
diff --git a/effector/cluster_config_SLURM.yaml b/effector/cluster_config_SLURM.yaml
deleted file mode 100644
index 1f045d507ef77fe27a9f335f5ee2bb7fd962b76f..0000000000000000000000000000000000000000
--- a/effector/cluster_config_SLURM.yaml
+++ /dev/null
@@ -1,95 +0,0 @@
-__default__:
-    cpus-per-task : 4
-    mem-per-cpu : 10G
-    partition : long
-    output : 'slurm_logs/stdout/{rule}/{wildcards}.o'
-    error : 'slurm_logs/error/{rule}/{wildcards}.e'
-    job-name : '{rule}.{wildcards}'
-
-
-rename_protein:
-    cpus-per-task: 1
-    partition: fast
-
-phobius:
-    cpus-per-task: 5
-    mem-per-cpu: 10G
-    partition: fast
-
-signalP:
-    cpus-per-task: 10
-    mem-per-cpu: 10G
-    partition: fast
-
-targetp:
-    cpus-per-task: 10
-    mem-per-cpu: 10G
-    partition: fast
-
-predgpi:
-    cpus-per-task: 10
-    mem-per-cpu: 10G
-    partition: fast
-
-parse_phobius:
-    cpus-per-task: 1
-    partition: fast
-
-parse_signalp:
-    cpus-per-task: 1
-    partition: fast
-
-parse_targetp:
-    cpus-per-task: 1
-    partition: fast
-
-parse_predgpi:
-    cpus-per-task: 1
-    partition: fast
-
-intersect_tools:
-    cpus-per-task: 1
-    partition: fast
-
-fasta_intersect:
-    cpus-per-task: 1
-    partition: fast
-
-tmhmm:
-    cpus-per-task: 5
-    partition: fast
-
-parse_tmhmm:
-    cpus-per-task: 1
-    partition: fast
-
-tmhmm_fasta:
-    cpus-per-task: 1
-    partition: fast
-
-wolfpsort:
-    cpus-per-task: 10
-    mem-per-cpu: 10G
-    partition: fast
-
-parse_wolfpsort:
-    cpus-per-task: 1
-    partition: fast
-
-id_tofasta_secreted:
-    cpus-per-task: 1
-    partition: fast
-
-hmmer_pfam:
-    cpus-per-task: 8
-    mem-per-cpu: 5G
-    partition: fast
-
-effectorP:
-    cpus-per-task: 10
-    mem-per-cpu: 10G
-    partition: fast
-
-count_effector:
-    cpus-per-task: 1
-    partition: fast
diff --git a/effector/config.yaml b/effector/config.yaml
deleted file mode 100644
index 7ea6574c445df10e496732f3840db4fb57a598eb..0000000000000000000000000000000000000000
--- a/effector/config.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-restart-times: 0
-jobscript: "slurm-jobscript.sh"
-cluster: "slurm-submit.py"
-cluster-status: "slurm-status.py"
-max-jobs-per-second: 1
-max-status-checks-per-second: 10
-local-cores: 1
-jobs: 200
-latency-wait: 60000000
-use-envmodules: true
-use-singularity: false
-rerun-incomplete: false
-printshellcmds: true
-
-# Example resource configuration
-# default-resources:
-#   - runtime=100
-#   - mem_mb=6000
-#   - disk_mb=1000000
-# # set-threads: map rule names to threads
-# set-threads:
-#   - single_core_rule=1
-#   - multi_core_rule=10
-# # set-resources: map rule names to resources in general
-# set-resources:
-#   - high_memory_rule:mem_mb=12000
-#   - long_running_rule:runtime=1200
diff --git a/effector/settings.json b/effector/settings.json
deleted file mode 100644
index daf60247422dd4b93b91063f93e2e7f63d44d461..0000000000000000000000000000000000000000
--- a/effector/settings.json
+++ /dev/null
@@ -1,6 +0,0 @@
-{
-    "SBATCH_DEFAULTS": "--export=ALL",
-    "CLUSTER_NAME": "",
-    "CLUSTER_CONFIG": "/shared/home/tdurand/effector/cluster_config_SLURM.yaml",
-    "ADVANCED_ARGUMENT_CONVERSION": "no"
-}
diff --git a/effector/slurm-jobscript.sh b/effector/slurm-jobscript.sh
deleted file mode 100644
index 391741ef8824f4b691752e68651f097395d17f70..0000000000000000000000000000000000000000
--- a/effector/slurm-jobscript.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-# properties = {properties}
-{exec_job}
diff --git a/effector/slurm-status.py b/effector/slurm-status.py
deleted file mode 100644
index 6dc23237c0982578cf013ca0bfd25f787fd04938..0000000000000000000000000000000000000000
--- a/effector/slurm-status.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/usr/bin/env python3
-import re
-import subprocess as sp
-import shlex
-import sys
-import time
-import logging
-from CookieCutter import CookieCutter
-
-logger = logging.getLogger("__name__")
-
-STATUS_ATTEMPTS = 20
-
-jobid = sys.argv[1]
-
-cluster = CookieCutter.get_cluster_option()
-
-for i in range(STATUS_ATTEMPTS):
-    try:
-        sacct_res = sp.check_output(shlex.split(f"sacct {cluster} -P -b -j {jobid} -n"))
-        res = {
-            x.split("|")[0]: x.split("|")[1]
-            for x in sacct_res.decode().strip().split("\n")
-        }
-        break
-    except sp.CalledProcessError as e:
-        logger.error("sacct process error")
-        logger.error(e)
-    except IndexError as e:
-        logger.error(e)
-        pass
-    # Try getting job with scontrol instead in case sacct is misconfigured
-    try:
-        sctrl_res = sp.check_output(
-            shlex.split(f"scontrol {cluster} -o show job {jobid}")
-        )
-        m = re.search(r"JobState=(\w+)", sctrl_res.decode())
-        res = {jobid: m.group(1)}
-        break
-    except sp.CalledProcessError as e:
-        logger.error("scontrol process error")
-        logger.error(e)
-    if i >= STATUS_ATTEMPTS - 1:
-        print("failed")
-        exit(0)
-    else:
-        time.sleep(1)
-
-status = res[jobid]
-
-if status == "BOOT_FAIL":
-    print("failed")
-elif status == "OUT_OF_MEMORY":
-    print("failed")
-elif status.startswith("CANCELLED"):
-    print("failed")
-elif status == "COMPLETED":
-    print("success")
-elif status == "DEADLINE":
-    print("failed")
-elif status == "FAILED":
-    print("failed")
-elif status == "NODE_FAIL":
-    print("failed")
-elif status == "PREEMPTED":
-    print("failed")
-elif status == "TIMEOUT":
-    print("failed")
-elif status == "SUSPENDED":
-    print("running")
-else:
-    print("running")
diff --git a/effector/slurm-submit.py b/effector/slurm-submit.py
deleted file mode 100644
index a8a780b11c45fd129f8d1345dc54ae0e3b0b8e0b..0000000000000000000000000000000000000000
--- a/effector/slurm-submit.py
+++ /dev/null
@@ -1,61 +0,0 @@
-#!/usr/bin/env python3
-"""
-Snakemake SLURM submit script.
-"""
-from snakemake.utils import read_job_properties
-
-import slurm_utils
-from CookieCutter import CookieCutter
-
-# cookiecutter arguments
-SBATCH_DEFAULTS = CookieCutter.SBATCH_DEFAULTS
-CLUSTER = CookieCutter.get_cluster_option()
-CLUSTER_CONFIG = CookieCutter.CLUSTER_CONFIG
-ADVANCED_ARGUMENT_CONVERSION = CookieCutter.get_advanced_argument_conversion()
-
-RESOURCE_MAPPING = {
-    "time": ("time", "runtime", "walltime"),
-    "mem": ("mem", "mem_mb", "ram", "memory"),
-    "mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"),
-    "nodes": ("nodes", "nnodes"),
-    "partition": ("partition", "queue"),
-}
-
-# parse job
-jobscript = slurm_utils.parse_jobscript()
-job_properties = read_job_properties(jobscript)
-
-sbatch_options = {}
-cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG)
-
-# 1) sbatch default arguments and cluster
-sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS))
-sbatch_options.update(slurm_utils.parse_sbatch_defaults(CLUSTER))
-
-# 2) cluster_config defaults
-sbatch_options.update(cluster_config["__default__"])
-
-# 3) Convert resources (no unit conversion!) and threads
-sbatch_options.update(
-    slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING)
-)
-
-# 4) cluster_config for particular rule
-sbatch_options.update(cluster_config.get(job_properties.get("rule"), {}))
-
-# 5) cluster_config options
-sbatch_options.update(job_properties.get("cluster", {}))
-
-# 6) Advanced conversion of parameters
-if ADVANCED_ARGUMENT_CONVERSION:
-    sbatch_options = slurm_utils.advanced_argument_conversion(sbatch_options)
-
-# 7) Format pattern in snakemake style
-sbatch_options = slurm_utils.format_values(sbatch_options, job_properties)
-
-# ensure sbatch output dirs exist
-for o in ("output", "error"):
-    slurm_utils.ensure_dirs_exist(sbatch_options[o]) if o in sbatch_options else None
-
-# submit job and echo id back to Snakemake (must be the only stdout)
-print(slurm_utils.submit_job(jobscript, **sbatch_options))
diff --git a/effector/slurm_utils.py b/effector/slurm_utils.py
deleted file mode 100644
index d43c0703dad64faac4b5e70cbe3b83636c3aea39..0000000000000000000000000000000000000000
--- a/effector/slurm_utils.py
+++ /dev/null
@@ -1,345 +0,0 @@
-#!/usr/bin/env python3
-import os
-import sys
-from os.path import dirname
-import re
-import math
-import argparse
-import subprocess as sp
-from io import StringIO
-
-from snakemake import io
-from snakemake.io import Wildcards
-from snakemake.utils import SequenceFormatter
-from snakemake.utils import AlwaysQuotedFormatter
-from snakemake.utils import QuotedFormatter
-from snakemake.exceptions import WorkflowError
-from snakemake.logging import logger
-
-from CookieCutter import CookieCutter
-
-
-def _convert_units_to_mb(memory):
-    """If memory is specified with SI unit, convert to MB"""
-    if isinstance(memory, int) or isinstance(memory, float):
-        return int(memory)
-    siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6}
-    regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys())))
-    m = regex.match(memory)
-    if m is None:
-        logger.error(
-            (
-                f"unsupported memory specification '{memory}';"
-                " allowed suffixes: [K|M|G|T]"
-            )
-        )
-        sys.exit(1)
-    factor = siunits[m.group(2)]
-    return int(int(m.group(1)) * factor)
-
-
-def parse_jobscript():
-    """Minimal CLI to require/only accept single positional argument."""
-    p = argparse.ArgumentParser(description="SLURM snakemake submit script")
-    p.add_argument("jobscript", help="Snakemake jobscript with job properties.")
-    return p.parse_args().jobscript
-
-
-def parse_sbatch_defaults(parsed):
-    """Unpack SBATCH_DEFAULTS."""
-    d = parsed.split() if type(parsed) == str else parsed
-    args = {}
-    for keyval in [a.split("=") for a in d]:
-        k = keyval[0].strip().strip("-")
-        v = keyval[1].strip() if len(keyval) == 2 else None
-        args[k] = v
-    return args
-
-
-def load_cluster_config(path):
-    """Load config to dict
-
-    Load configuration to dict either from absolute path or relative
-    to profile dir.
-    """
-    if path:
-        path = os.path.join(dirname(__file__), os.path.expandvars(path))
-        dcc = io.load_configfile(path)
-    else:
-        dcc = {}
-    if "__default__" not in dcc:
-        dcc["__default__"] = {}
-    return dcc
-
-
-# adapted from format function in snakemake.utils
-def format(_pattern, _quote_all=False, **kwargs):  # noqa: A001
-    """Format a pattern in Snakemake style.
-    This means that keywords embedded in braces are replaced by any variable
-    values that are available in the current namespace.
-    """
-    fmt = SequenceFormatter(separator=" ")
-    if _quote_all:
-        fmt.element_formatter = AlwaysQuotedFormatter()
-    else:
-        fmt.element_formatter = QuotedFormatter()
-    try:
-        return fmt.format(_pattern, **kwargs)
-    except KeyError as ex:
-        raise NameError(
-            f"The name {ex} is unknown in this context. Please "
-            "make sure that you defined that variable. "
-            "Also note that braces not used for variable access "
-            "have to be escaped by repeating them "
-        )
-
-
-# adapted from Job.format_wildcards in snakemake.jobs
-def format_wildcards(string, job_properties):
-    """ Format a string with variables from the job. """
-
-    class Job(object):
-        def __init__(self, job_properties):
-            for key in job_properties:
-                setattr(self, key, job_properties[key])
-
-    job = Job(job_properties)
-    if "params" in job_properties:
-        job._format_params = Wildcards(fromdict=job_properties["params"])
-    else:
-        job._format_params = None
-    if "wildcards" in job_properties:
-        job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"])
-    else:
-        job._format_wildcards = None
-    _variables = dict()
-    _variables.update(
-        dict(params=job._format_params, wildcards=job._format_wildcards)
-    )
-    if hasattr(job, "rule"):
-        _variables.update(dict(rule=job.rule))
-    try:
-        return format(string, **_variables)
-    except NameError as ex:
-        raise WorkflowError(
-            "NameError with group job {}: {}".format(job.jobid, str(ex))
-        )
-    except IndexError as ex:
-        raise WorkflowError(
-            "IndexError with group job {}: {}".format(job.jobid, str(ex))
-        )
-
-
-# adapted from ClusterExecutor.cluster_params function in snakemake.executor
-def format_values(dictionary, job_properties):
-    formatted = dictionary.copy()
-    for key, value in list(formatted.items()):
-        if key == "mem":
-            value = str(_convert_units_to_mb(value))
-        if isinstance(value, str):
-            try:
-                formatted[key] = format_wildcards(value, job_properties)
-            except NameError as e:
-                msg = "Failed to format cluster config " "entry for job {}.".format(
-                    job_properties["rule"]
-                )
-                raise WorkflowError(msg, e)
-    return formatted
-
-
-def convert_job_properties(job_properties, resource_mapping=None):
-    options = {}
-    if resource_mapping is None:
-        resource_mapping = {}
-    resources = job_properties.get("resources", {})
-    for k, v in resource_mapping.items():
-        options.update({k: resources[i] for i in v if i in resources})
-
-    if "threads" in job_properties:
-        options["cpus-per-task"] = job_properties["threads"]
-    return options
-
-
-def ensure_dirs_exist(path):
-    """Ensure output folder for Slurm log files exist."""
-    di = dirname(path)
-    if di == "":
-        return
-    if not os.path.exists(di):
-        os.makedirs(di, exist_ok=True)
-    return
-
-
-def format_sbatch_options(**sbatch_options):
-    """Format sbatch options"""
-    options = []
-    for k, v in sbatch_options.items():
-        val = ""
-        if v is not None:
-            val = f"={v}"
-        options.append(f"--{k}{val}")
-    return options
-
-
-def submit_job(jobscript, **sbatch_options):
-    """Submit jobscript and return jobid."""
-    options = format_sbatch_options(**sbatch_options)
-    try:
-        cmd = ["sbatch"] + ["--parsable"] + options + [jobscript]
-        res = sp.check_output(cmd)
-    except sp.CalledProcessError as e:
-        raise e
-    # Get jobid
-    res = res.decode()
-    try:
-        jobid = re.search(r"(\d+)", res).group(1)
-    except Exception as e:
-        raise e
-    return jobid
-
-
-def advanced_argument_conversion(arg_dict):
-    """Experimental adjustment of sbatch arguments to the given or default partition."""
-    # Currently not adjusting for multiple node jobs
-    nodes = int(arg_dict.get("nodes", 1))
-    if nodes > 1:
-        return arg_dict
-    partition = arg_dict.get("partition", None) or _get_default_partition()
-    constraint = arg_dict.get("constraint", None)
-    ncpus = int(arg_dict.get("cpus-per-task", 1))
-    runtime = arg_dict.get("time", None)
-    memory = _convert_units_to_mb(arg_dict.get("mem", 0))
-    config = _get_cluster_configuration(partition, constraint, memory)
-    mem = arg_dict.get("mem", ncpus * min(config["MEMORY_PER_CPU"]))
-    mem = _convert_units_to_mb(mem)
-    if mem > max(config["MEMORY"]):
-        logger.info(
-            f"requested memory ({mem}) > max memory ({max(config['MEMORY'])}); "
-            "adjusting memory settings"
-        )
-        mem = max(config["MEMORY"])
-
-    # Calculate available memory as defined by the number of requested
-    # cpus times memory per cpu
-    AVAILABLE_MEM = ncpus * min(config["MEMORY_PER_CPU"])
-    # Add additional cpus if memory is larger than AVAILABLE_MEM
-    if mem > AVAILABLE_MEM:
-        logger.info(
-            f"requested memory ({mem}) > "
-            f"ncpus x MEMORY_PER_CPU ({AVAILABLE_MEM}); "
-            "trying to adjust number of cpus up"
-        )
-        ncpus = int(math.ceil(mem / min(config["MEMORY_PER_CPU"])))
-    if ncpus > max(config["CPUS"]):
-        logger.info(
-            f"ncpus ({ncpus}) > available cpus ({max(config['CPUS'])}); "
-            "adjusting number of cpus down"
-        )
-        ncpus = min(int(max(config["CPUS"])), ncpus)
-    adjusted_args = {"mem": int(mem), "cpus-per-task": ncpus}
-
-    # Update time. If requested time is larger than maximum allowed time, reset
-    if runtime:
-        runtime = time_to_minutes(runtime)
-        time_limit = max(config["TIMELIMIT_MINUTES"])
-        if runtime > time_limit:
-            logger.info(
-                f"time (runtime) > time limit {time_limit}; " "adjusting time down"
-            )
-            adjusted_args["time"] = time_limit
-
-    # update and return
-    arg_dict.update(adjusted_args)
-    return arg_dict
-
-
-timeformats = [
-    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
-    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$"),
-    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+)$"),
-    re.compile(r"^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
-    re.compile(r"^(?P<minutes>\d+):(?P<seconds>\d+)$"),
-    re.compile(r"^(?P<minutes>\d+)$"),
-]
-
-
-def time_to_minutes(time):
-    """Convert time string to minutes.
-
-    According to slurm:
-
-    Acceptable time formats include "minutes", "minutes:seconds",
-    "hours:minutes:seconds", "days-hours", "days-hours:minutes"
-    and "days-hours:minutes:seconds".
-
-    """
-    if not isinstance(time, str):
-        time = str(time)
-    d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0}
-    regex = list(filter(lambda regex: regex.match(time) is not None, timeformats))
-    if len(regex) == 0:
-        return
-    assert len(regex) == 1, "multiple time formats match"
-    m = regex[0].match(time)
-    d.update(m.groupdict())
-    minutes = (
-        int(d["days"]) * 24 * 60
-        + int(d["hours"]) * 60
-        + int(d["minutes"])
-        + math.ceil(int(d["seconds"]) / 60)
-    )
-    assert minutes > 0, "minutes has to be greater than 0"
-    return minutes
-
-
-def _get_default_partition():
-    """Retrieve default partition for cluster"""
-    cluster = CookieCutter.get_cluster_option()
-    cmd = f"sinfo -O partition {cluster}"
-    res = sp.check_output(cmd.split())
-    m = re.search(r"(?P<partition>\S+)\*", res.decode(), re.M)
-    partition = m.group("partition")
-    return partition
-
-
-def _get_cluster_configuration(partition, constraints=None, memory=0):
-    """Retrieve cluster configuration.
-
-    Retrieve cluster configuration for a partition filtered by
-    constraints, memory and cpus
-
-    """
-    try:
-        import pandas as pd
-    except ImportError:
-        print(
-            "Error: currently advanced argument conversion "
-            "depends on 'pandas'.", file=sys.stderr
-        )
-        sys.exit(1)
-
-    if constraints:
-        constraint_set = set(constraints.split(","))
-    cluster = CookieCutter.get_cluster_option()
-    cmd = f"sinfo -e -o %all -p {partition} {cluster}".split()
-    try:
-        output = sp.Popen(" ".join(cmd), shell=True, stdout=sp.PIPE).communicate()
-    except Exception as e:
-        print(e)
-        raise
-    data = re.sub("^CLUSTER:.+\n", "", re.sub(" \\|", "|", output[0].decode()))
-    df = pd.read_csv(StringIO(data), sep="|")
-    try:
-        df["TIMELIMIT_MINUTES"] = df["TIMELIMIT"].apply(time_to_minutes)
-        df["MEMORY_PER_CPU"] = df["MEMORY"] / df["CPUS"]
-        df["FEATURE_SET"] = df["AVAIL_FEATURES"].str.split(",").apply(set)
-    except Exception as e:
-        print(e)
-        raise
-    if constraints:
-        constraint_set = set(constraints.split(","))
-        i = df["FEATURE_SET"].apply(lambda x: len(x.intersection(constraint_set)) > 0)
-        df = df.loc[i]
-    memory = min(_convert_units_to_mb(memory), max(df["MEMORY"]))
-    df = df.loc[df["MEMORY"] >= memory]
-    return df