Commit 2d66b735 by thdurand4: Add files via upload (parent f7f9d527)
#
# Based on lsf CookieCutter.py
#
import os
import json

d = os.path.dirname(__file__)
with open(os.path.join(d, "settings.json")) as fh:
    settings = json.load(fh)


class CookieCutter:
    SBATCH_DEFAULTS = settings['SBATCH_DEFAULTS']
    CLUSTER_NAME = settings['CLUSTER_NAME']
    CLUSTER_CONFIG = settings['CLUSTER_CONFIG']
    ADVANCED_ARGUMENT_CONVERSION = settings['ADVANCED_ARGUMENT_CONVERSION']

    @staticmethod
    def get_cluster_option() -> str:
        cluster = CookieCutter.CLUSTER_NAME
        if cluster != "":
            return f"--cluster={cluster}"
        return ""

    @staticmethod
    def get_advanced_argument_conversion() -> bool:
        val = {"yes": True, "no": False}[
            CookieCutter.ADVANCED_ARGUMENT_CONVERSION
        ]
        return val
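
# A minimal usage sketch (illustrative; return values depend on the
# settings.json shipped with this profile):
#
#   from CookieCutter import CookieCutter
#   CookieCutter.get_cluster_option()                # "" when CLUSTER_NAME is empty,
#                                                    # else e.g. "--cluster=mycluster"
#   CookieCutter.get_advanced_argument_conversion()  # False when set to "no"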
__default__:
  cpus-per-task: 4
  mem-per-cpu: 10G
  partition: long
  output: 'slurm_logs/stdout/{rule}/{wildcards}.o'
  error: 'slurm_logs/error/{rule}/{wildcards}.e'
  job-name: '{rule}.{wildcards}'

rename_protein:
  cpus-per-task: 1
  partition: fast

phobius:
  cpus-per-task: 5
  mem-per-cpu: 10G
  partition: fast

signalP:
  cpus-per-task: 10
  mem-per-cpu: 10G
  partition: fast

targetp:
  cpus-per-task: 10
  mem-per-cpu: 10G
  partition: fast

predgpi:
  cpus-per-task: 10
  mem-per-cpu: 10G
  partition: fast

parse_phobius:
  cpus-per-task: 1
  partition: fast

parse_signalp:
  cpus-per-task: 1
  partition: fast

parse_targetp:
  cpus-per-task: 1
  partition: fast

parse_predgpi:
  cpus-per-task: 1
  partition: fast

intersect_tools:
  cpus-per-task: 1
  partition: fast

fasta_intersect:
  cpus-per-task: 1
  partition: fast

tmhmm:
  cpus-per-task: 5
  partition: fast

parse_tmhmm:
  cpus-per-task: 1
  partition: fast

tmhmm_fasta:
  cpus-per-task: 1
  partition: fast

wolfpsort:
  cpus-per-task: 10
  mem-per-cpu: 10G
  partition: fast

parse_wolfpsort:
  cpus-per-task: 1
  partition: fast

id_tofasta_secreted:
  cpus-per-task: 1
  partition: fast

hmmer_pfam:
  cpus-per-task: 8
  mem-per-cpu: 5G
  partition: fast

effectorP:
  cpus-per-task: 10
  mem-per-cpu: 10G
  partition: fast

count_effector:
  cpus-per-task: 1
  partition: fast
restart-times: 0
jobscript: "slurm-jobscript.sh"
cluster: "slurm-submit.py"
cluster-status: "slurm-status.py"
max-jobs-per-second: 1
max-status-checks-per-second: 10
local-cores: 1
jobs: 200
latency-wait: 60000000
use-envmodules: true
use-singularity: false
rerun-incomplete: false
printshellcmds: true
# Example resource configuration
# default-resources:
#   - runtime=100
#   - mem_mb=6000
#   - disk_mb=1000000
# # set-threads: map rule names to threads
# set-threads:
#   - single_core_rule=1
#   - multi_core_rule=10
# # set-resources: map rule names to resources in general
# set-resources:
#   - high_memory_rule:mem_mb=12000
#   - long_running_rule:runtime=1200
{
    "SBATCH_DEFAULTS": "--export=ALL",
    "CLUSTER_NAME": "",
    "CLUSTER_CONFIG": "/shared/home/tdurand/effector/cluster_config_SLURM.yaml",
    "ADVANCED_ARGUMENT_CONVERSION": "no"
}
#!/bin/bash
# properties = {properties}
{exec_job}
#!/usr/bin/env python3
import re
import subprocess as sp
import shlex
import sys
import time
import logging

from CookieCutter import CookieCutter

logger = logging.getLogger(__name__)

STATUS_ATTEMPTS = 20

jobid = sys.argv[1]
cluster = CookieCutter.get_cluster_option()

for i in range(STATUS_ATTEMPTS):
    # First ask sacct, which also knows about jobs that have finished
    try:
        sacct_res = sp.check_output(shlex.split(f"sacct {cluster} -P -b -j {jobid} -n"))
        res = {
            x.split("|")[0]: x.split("|")[1]
            for x in sacct_res.decode().strip().split("\n")
        }
        break
    except sp.CalledProcessError as e:
        logger.error("sacct process error")
        logger.error(e)
    except IndexError as e:
        logger.error(e)
    # Try getting job with scontrol instead in case sacct is misconfigured
    try:
        sctrl_res = sp.check_output(
            shlex.split(f"scontrol {cluster} -o show job {jobid}")
        )
        m = re.search(r"JobState=(\w+)", sctrl_res.decode())
        res = {jobid: m.group(1)}
        break
    except sp.CalledProcessError as e:
        logger.error("scontrol process error")
        logger.error(e)
    if i >= STATUS_ATTEMPTS - 1:
        print("failed")
        exit(0)
    else:
        time.sleep(1)

status = res[jobid]

# Map SLURM job states onto the three statuses Snakemake understands
if status == "COMPLETED":
    print("success")
elif status.startswith("CANCELLED") or status in (
    "BOOT_FAIL",
    "OUT_OF_MEMORY",
    "DEADLINE",
    "FAILED",
    "NODE_FAIL",
    "PREEMPTED",
    "TIMEOUT",
):
    print("failed")
else:
    # PENDING, RUNNING, SUSPENDED, COMPLETING, ... are all still in flight
    print("running")
#!/usr/bin/env python3
"""
Snakemake SLURM submit script.
"""
from snakemake.utils import read_job_properties

import slurm_utils
from CookieCutter import CookieCutter

# cookiecutter arguments
SBATCH_DEFAULTS = CookieCutter.SBATCH_DEFAULTS
CLUSTER = CookieCutter.get_cluster_option()
CLUSTER_CONFIG = CookieCutter.CLUSTER_CONFIG
ADVANCED_ARGUMENT_CONVERSION = CookieCutter.get_advanced_argument_conversion()

RESOURCE_MAPPING = {
    "time": ("time", "runtime", "walltime"),
    "mem": ("mem", "mem_mb", "ram", "memory"),
    "mem-per-cpu": ("mem-per-cpu", "mem_per_cpu", "mem_per_thread"),
    "nodes": ("nodes", "nnodes"),
    "partition": ("partition", "queue"),
}

# parse job
jobscript = slurm_utils.parse_jobscript()
job_properties = read_job_properties(jobscript)

sbatch_options = {}
cluster_config = slurm_utils.load_cluster_config(CLUSTER_CONFIG)

# 1) sbatch default arguments and cluster
sbatch_options.update(slurm_utils.parse_sbatch_defaults(SBATCH_DEFAULTS))
sbatch_options.update(slurm_utils.parse_sbatch_defaults(CLUSTER))

# 2) cluster_config defaults
sbatch_options.update(cluster_config["__default__"])

# 3) Convert resources (no unit conversion!) and threads
sbatch_options.update(
    slurm_utils.convert_job_properties(job_properties, RESOURCE_MAPPING)
)

# 4) cluster_config for particular rule
sbatch_options.update(cluster_config.get(job_properties.get("rule"), {}))

# 5) cluster_config options
sbatch_options.update(job_properties.get("cluster", {}))

# 6) Advanced conversion of parameters
if ADVANCED_ARGUMENT_CONVERSION:
    sbatch_options = slurm_utils.advanced_argument_conversion(sbatch_options)

# 7) Format pattern in snakemake style
sbatch_options = slurm_utils.format_values(sbatch_options, job_properties)

# ensure sbatch output dirs exist
for o in ("output", "error"):
    if o in sbatch_options:
        slurm_utils.ensure_dirs_exist(sbatch_options[o])

# submit job and echo id back to Snakemake (must be the only stdout)
print(slurm_utils.submit_job(jobscript, **sbatch_options))
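
# Snakemake invokes this script once per job with the generated jobscript as
# the sole argument, e.g. slurm-submit.py /tmp/snakejob.phobius.17.sh (path
# illustrative); the job id printed here is what slurm-status.py later polls.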
#!/usr/bin/env python3
import os
import sys
from os.path import dirname
import re
import math
import argparse
import subprocess as sp
from io import StringIO

from snakemake import io
from snakemake.io import Wildcards
from snakemake.utils import SequenceFormatter
from snakemake.utils import AlwaysQuotedFormatter
from snakemake.utils import QuotedFormatter
from snakemake.exceptions import WorkflowError
from snakemake.logging import logger

from CookieCutter import CookieCutter


def _convert_units_to_mb(memory):
    """If memory is specified with SI unit, convert to MB"""
    if isinstance(memory, (int, float)):
        return int(memory)
    siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6}
    regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys())))
    m = regex.match(memory)
    if m is None:
        logger.error(
            f"unsupported memory specification '{memory}';"
            " allowed suffixes: [K|M|G|T]"
        )
        sys.exit(1)
    factor = siunits[m.group(2)]
    return int(int(m.group(1)) * factor)
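
# Illustrative conversions, following the suffix table above:
#   _convert_units_to_mb("10G")  -> 10000
#   _convert_units_to_mb("512K") -> 0      (truncates below 1 MB)
#   _convert_units_to_mb(4000)   -> 4000   (bare numbers pass through as MB)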
def parse_jobscript():
    """Minimal CLI to require/only accept single positional argument."""
    p = argparse.ArgumentParser(description="SLURM snakemake submit script")
    p.add_argument("jobscript", help="Snakemake jobscript with job properties.")
    return p.parse_args().jobscript
def parse_sbatch_defaults(parsed):
    """Unpack SBATCH_DEFAULTS."""
    d = parsed.split() if isinstance(parsed, str) else parsed
    args = {}
    for keyval in [a.split("=") for a in d]:
        k = keyval[0].strip().strip("-")
        v = keyval[1].strip() if len(keyval) == 2 else None
        args[k] = v
    return args
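
# For example (hypothetical defaults string):
#   parse_sbatch_defaults("--export=ALL --partition=fast")
#   -> {"export": "ALL", "partition": "fast"}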
def load_cluster_config(path):
    """Load config to dict

    Load configuration to dict either from absolute path or relative
    to profile dir.
    """
    if path:
        path = os.path.join(dirname(__file__), os.path.expandvars(path))
        dcc = io.load_configfile(path)
    else:
        dcc = {}
    if "__default__" not in dcc:
        dcc["__default__"] = {}
    return dcc
# adapted from format function in snakemake.utils
def format(_pattern, _quote_all=False, **kwargs):  # noqa: A001
    """Format a pattern in Snakemake style.

    This means that keywords embedded in braces are replaced by any variable
    values that are available in the current namespace.
    """
    fmt = SequenceFormatter(separator=" ")
    if _quote_all:
        fmt.element_formatter = AlwaysQuotedFormatter()
    else:
        fmt.element_formatter = QuotedFormatter()
    try:
        return fmt.format(_pattern, **kwargs)
    except KeyError as ex:
        raise NameError(
            f"The name {ex} is unknown in this context. Please "
            "make sure that you defined that variable. "
            "Also note that braces not used for variable access "
            "have to be escaped by repeating them "
        )
# adapted from Job.format_wildcards in snakemake.jobs
def format_wildcards(string, job_properties):
    """Format a string with variables from the job."""

    class Job(object):
        def __init__(self, job_properties):
            for key in job_properties:
                setattr(self, key, job_properties[key])

    job = Job(job_properties)
    if "params" in job_properties:
        job._format_params = Wildcards(fromdict=job_properties["params"])
    else:
        job._format_params = None
    if "wildcards" in job_properties:
        job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"])
    else:
        job._format_wildcards = None
    _variables = dict()
    _variables.update(
        dict(params=job._format_params, wildcards=job._format_wildcards)
    )
    if hasattr(job, "rule"):
        _variables.update(dict(rule=job.rule))
    try:
        return format(string, **_variables)
    except NameError as ex:
        raise WorkflowError(
            "NameError with group job {}: {}".format(job.jobid, str(ex))
        )
    except IndexError as ex:
        raise WorkflowError(
            "IndexError with group job {}: {}".format(job.jobid, str(ex))
        )
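
# For example, with job_properties = {"rule": "phobius",
# "wildcards": {"sample": "A"}} (hypothetical values):
#   format_wildcards("logs/{rule}/{wildcards.sample}.log", job_properties)
#   -> "logs/phobius/A.log"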
# adapted from ClusterExecutor.cluster_params function in snakemake.executor
def format_values(dictionary, job_properties):
    formatted = dictionary.copy()
    for key, value in list(formatted.items()):
        if key == "mem":
            value = str(_convert_units_to_mb(value))
        if isinstance(value, str):
            try:
                formatted[key] = format_wildcards(value, job_properties)
            except NameError as e:
                msg = "Failed to format cluster config entry for job {}.".format(
                    job_properties["rule"]
                )
                raise WorkflowError(msg, e)
    return formatted
def convert_job_properties(job_properties, resource_mapping=None):
    options = {}
    if resource_mapping is None:
        resource_mapping = {}
    resources = job_properties.get("resources", {})
    # Map each matching resource name onto its canonical sbatch option
    # (if several aliases are set, the last one in the mapping wins)
    for k, v in resource_mapping.items():
        options.update({k: resources[i] for i in v if i in resources})
    if "threads" in job_properties:
        options["cpus-per-task"] = job_properties["threads"]
    return options
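
# For example, given a job with 4 threads and resources mem_mb=8000,
# runtime=120 (hypothetical values), the RESOURCE_MAPPING defined in
# slurm-submit.py yields:
#   {"time": 120, "mem": 8000, "cpus-per-task": 4}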
def ensure_dirs_exist(path):
    """Ensure output folder for Slurm log files exist."""
    di = dirname(path)
    if di == "":
        return
    os.makedirs(di, exist_ok=True)
def format_sbatch_options(**sbatch_options):
    """Format sbatch options as a flat list of "--key[=value]" strings."""
    options = []
    for k, v in sbatch_options.items():
        val = ""
        if v is not None:
            val = f"={v}"
        options.append(f"--{k}{val}")
    return options
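
# For example (hypothetical options):
#   format_sbatch_options(partition="fast", mem=4000, parsable=None)
#   -> ["--partition=fast", "--mem=4000", "--parsable"]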
def submit_job(jobscript, **sbatch_options):
    """Submit jobscript and return jobid."""
    options = format_sbatch_options(**sbatch_options)
    cmd = ["sbatch", "--parsable"] + options + [jobscript]
    res = sp.check_output(cmd).decode()
    # --parsable prints "jobid[;clustername]"; extract the numeric id
    jobid = re.search(r"(\d+)", res).group(1)
    return jobid
def advanced_argument_conversion(arg_dict):
    """Experimental adjustment of sbatch arguments to the given or default partition."""
    # Currently not adjusting for multiple node jobs
    nodes = int(arg_dict.get("nodes", 1))
    if nodes > 1:
        return arg_dict
    partition = arg_dict.get("partition", None) or _get_default_partition()
    constraint = arg_dict.get("constraint", None)
    ncpus = int(arg_dict.get("cpus-per-task", 1))
    runtime = arg_dict.get("time", None)
    memory = _convert_units_to_mb(arg_dict.get("mem", 0))
    config = _get_cluster_configuration(partition, constraint, memory)
    mem = arg_dict.get("mem", ncpus * min(config["MEMORY_PER_CPU"]))
    mem = _convert_units_to_mb(mem)
    if mem > max(config["MEMORY"]):
        logger.info(
            f"requested memory ({mem}) > max memory ({max(config['MEMORY'])}); "
            "adjusting memory settings"
        )
        mem = max(config["MEMORY"])
    # Calculate available memory as defined by the number of requested
    # cpus times memory per cpu
    AVAILABLE_MEM = ncpus * min(config["MEMORY_PER_CPU"])
    # Add additional cpus if memory is larger than AVAILABLE_MEM
    if mem > AVAILABLE_MEM:
        logger.info(
            f"requested memory ({mem}) > "
            f"ncpus x MEMORY_PER_CPU ({AVAILABLE_MEM}); "
            "trying to adjust number of cpus up"
        )
        ncpus = int(math.ceil(mem / min(config["MEMORY_PER_CPU"])))
    if ncpus > max(config["CPUS"]):
        logger.info(
            f"ncpus ({ncpus}) > available cpus ({max(config['CPUS'])}); "
            "adjusting number of cpus down"
        )
        ncpus = min(int(max(config["CPUS"])), ncpus)
    adjusted_args = {"mem": int(mem), "cpus-per-task": ncpus}
    # Update time. If requested time is larger than maximum allowed time, reset
    if runtime:
        runtime = time_to_minutes(runtime)
        time_limit = max(config["TIMELIMIT_MINUTES"])
        if runtime > time_limit:
            logger.info(
                f"time (runtime) > time limit {time_limit}; adjusting time down"
            )
            adjusted_args["time"] = time_limit
    # update and return
    arg_dict.update(adjusted_args)
    return arg_dict
timeformats = [
    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$"),
    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+)$"),
    re.compile(r"^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
    re.compile(r"^(?P<minutes>\d+):(?P<seconds>\d+)$"),
    re.compile(r"^(?P<minutes>\d+)$"),
]
def time_to_minutes(time):
    """Convert time string to minutes.

    According to slurm:
    Acceptable time formats include "minutes", "minutes:seconds",
    "hours:minutes:seconds", "days-hours", "days-hours:minutes"
    and "days-hours:minutes:seconds".
    """
    if not isinstance(time, str):
        time = str(time)
    d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0}
    regex = list(filter(lambda regex: regex.match(time) is not None, timeformats))
    if len(regex) == 0:
        return
    assert len(regex) == 1, "multiple time formats match"
    m = regex[0].match(time)
    d.update(m.groupdict())
    minutes = (
        int(d["days"]) * 24 * 60
        + int(d["hours"]) * 60
        + int(d["minutes"])
        + math.ceil(int(d["seconds"]) / 60)
    )
    assert minutes > 0, "minutes has to be greater than 0"
    return minutes
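
# Illustrative conversions, following the SLURM formats above:
#   time_to_minutes("30")         -> 30
#   time_to_minutes("1-00:30:00") -> 1470   (1 day + 30 min)
#   time_to_minutes("00:45:30")   -> 46     (seconds round up)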
def _get_default_partition():
    """Retrieve default partition for cluster"""
    cluster = CookieCutter.get_cluster_option()
    cmd = f"sinfo -O partition {cluster}"
    res = sp.check_output(cmd.split())
    m = re.search(r"(?P<partition>\S+)\*", res.decode(), re.M)
    partition = m.group("partition")
    return partition
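
# sinfo marks the default partition with a trailing "*", e.g. (output
# illustrative):
#   PARTITION
#   fast*
#   long
# so the regex above would capture "fast".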
def _get_cluster_configuration(partition, constraints=None, memory=0):
    """Retrieve cluster configuration.

    Retrieve cluster configuration for a partition filtered by
    constraints, memory and cpus
    """
    try:
        import pandas as pd
    except ImportError:
        print(
            "Error: currently advanced argument conversion "
            "depends on 'pandas'.", file=sys.stderr
        )
        sys.exit(1)
    cluster = CookieCutter.get_cluster_option()
    cmd = f"sinfo -e -o %all -p {partition} {cluster}".split()
    try:
        output = sp.Popen(" ".join(cmd), shell=True, stdout=sp.PIPE).communicate()
    except Exception as e:
        print(e)
        raise
    # Drop the leading "CLUSTER: ..." line and the spaces before separators
    data = re.sub("^CLUSTER:.+\n", "", re.sub(" \\|", "|", output[0].decode()))
    df = pd.read_csv(StringIO(data), sep="|")
    try:
        df["TIMELIMIT_MINUTES"] = df["TIMELIMIT"].apply(time_to_minutes)
        df["MEMORY_PER_CPU"] = df["MEMORY"] / df["CPUS"]
        df["FEATURE_SET"] = df["AVAIL_FEATURES"].str.split(",").apply(set)
    except Exception as e:
        print(e)
        raise
    # Keep only rows whose feature set intersects the requested constraints
    if constraints:
        constraint_set = set(constraints.split(","))
        i = df["FEATURE_SET"].apply(lambda x: len(x.intersection(constraint_set)) > 0)
        df = df.loc[i]
    memory = min(_convert_units_to_mb(memory), max(df["MEMORY"]))
    df = df.loc[df["MEMORY"] >= memory]
    return df