"git@forge.ird.fr:diade/frangipane.git" did not exist on "467bb5d60827363808cdcf95c7032a126e28c971"
Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
#!/usr/bin/env python3
import os
import sys
from os.path import dirname
import re
import math
import argparse
import subprocess as sp
from io import StringIO
from snakemake import io
from snakemake.io import Wildcards
from snakemake.utils import SequenceFormatter
from snakemake.utils import AlwaysQuotedFormatter
from snakemake.utils import QuotedFormatter
from snakemake.exceptions import WorkflowError
from snakemake.logging import logger
from CookieCutter import CookieCutter
def _convert_units_to_mb(memory):
    """If memory is specified with SI unit, convert to MB"""
    if isinstance(memory, (int, float)):
        return int(memory)
    siunits = {"K": 1e-3, "M": 1, "G": 1e3, "T": 1e6}
    regex = re.compile(r"(\d+)({})$".format("|".join(siunits.keys())))
    m = regex.match(memory)
    if m is None:
        logger.error(
            f"unsupported memory specification '{memory}';"
            " allowed suffixes: [K|M|G|T]"
        )
        sys.exit(1)
    factor = siunits[m.group(2)]
    return int(int(m.group(1)) * factor)
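# Illustrative conversions (these follow directly from the siunits table above):
#   _convert_units_to_mb(4000)   -> 4000       (plain numbers are taken as MB)
#   _convert_units_to_mb("16G")  -> 16000
#   _convert_units_to_mb("2T")   -> 2000000
#   _convert_units_to_mb("4X")   -> logs an error and exits (unknown suffix)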
def parse_jobscript():
    """Minimal CLI to require/only accept single positional argument."""
    p = argparse.ArgumentParser(description="SLURM snakemake submit script")
    p.add_argument("jobscript", help="Snakemake jobscript with job properties.")
    return p.parse_args().jobscript
def parse_sbatch_defaults(parsed):
    """Unpack SBATCH_DEFAULTS."""
    d = parsed.split() if isinstance(parsed, str) else parsed
    args = {}
    for keyval in [a.split("=") for a in d]:
        k = keyval[0].strip().strip("-")
        v = keyval[1].strip() if len(keyval) == 2 else None
        args[k] = v
    return args
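# Illustrative example, assuming SBATCH_DEFAULTS is a space-separated string of
# sbatch options (leading dashes are optional, flags without "=value" map to None):
#   parse_sbatch_defaults("--partition=normal --time=120 exclusive")
#   -> {"partition": "normal", "time": "120", "exclusive": None}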
def load_cluster_config(path):
    """Load config to dict

    Load configuration to dict either from absolute path or relative
    to profile dir.
    """
    if path:
        path = os.path.join(dirname(__file__), os.path.expandvars(path))
        dcc = io.load_configfile(path)
    else:
        dcc = {}
    if "__default__" not in dcc:
        dcc["__default__"] = {}
    return dcc
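# Illustrative example, assuming a hypothetical cluster_config.yaml next to this profile:
#   __default__:
#     partition: normal
#   my_rule:
#     mem: 16G
# load_cluster_config("cluster_config.yaml") would then return
#   {"__default__": {"partition": "normal"}, "my_rule": {"mem": "16G"}}
# and a "__default__" key is always present, even when no path is given.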
# adapted from format function in snakemake.utils
def format(_pattern, _quote_all=False, **kwargs):  # noqa: A001
    """Format a pattern in Snakemake style.

    This means that keywords embedded in braces are replaced by any variable
    values that are available in the current namespace.
    """
    fmt = SequenceFormatter(separator=" ")
    if _quote_all:
        fmt.element_formatter = AlwaysQuotedFormatter()
    else:
        fmt.element_formatter = QuotedFormatter()
    try:
        return fmt.format(_pattern, **kwargs)
    except KeyError as ex:
        raise NameError(
            f"The name {ex} is unknown in this context. Please "
            "make sure that you defined that variable. "
            "Also note that braces not used for variable access "
            "have to be escaped by repeating them."
        )
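# Illustrative example: keywords in braces are looked up among the passed variables,
#   format("-p {partition} -n {ntasks}", partition="normal", ntasks=4)
#   -> "-p normal -n 4"
# while an undefined name such as {account} raises the NameError above.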
# adapted from Job.format_wildcards in snakemake.jobs
def format_wildcards(string, job_properties):
    """Format a string with variables from the job."""

    class Job(object):
        def __init__(self, job_properties):
            for key in job_properties:
                setattr(self, key, job_properties[key])

    job = Job(job_properties)
    if "params" in job_properties:
        job._format_params = Wildcards(fromdict=job_properties["params"])
    else:
        job._format_params = None
    if "wildcards" in job_properties:
        job._format_wildcards = Wildcards(fromdict=job_properties["wildcards"])
    else:
        job._format_wildcards = None
    _variables = dict()
    _variables.update(
        dict(params=job._format_params, wildcards=job._format_wildcards)
    )
    if hasattr(job, "rule"):
        _variables.update(dict(rule=job.rule))
    try:
        return format(string, **_variables)
    except NameError as ex:
        raise WorkflowError(
            "NameError with group job {}: {}".format(job.jobid, str(ex))
        )
    except IndexError as ex:
        raise WorkflowError(
            "IndexError with group job {}: {}".format(job.jobid, str(ex))
        )
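# Illustrative example, assuming a job_properties dict of the shape Snakemake passes
# to cluster submit scripts (the concrete values here are hypothetical):
#   props = {"rule": "bwa_map", "jobid": 1,
#            "wildcards": {"sample": "A"}, "params": {"account": "proj1"}}
#   format_wildcards("logs/{rule}/{wildcards.sample}-{params.account}.log", props)
#   -> "logs/bwa_map/A-proj1.log"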
# adapted from ClusterExecutor.cluster_params function in snakemake.executor
def format_values(dictionary, job_properties):
    """Format string values of a cluster config dict with job properties."""
    formatted = dictionary.copy()
    for key, value in list(formatted.items()):
        if key == "mem":
            value = str(_convert_units_to_mb(value))
        if isinstance(value, str):
            try:
                formatted[key] = format_wildcards(value, job_properties)
            except NameError as e:
                msg = "Failed to format cluster config entry for job {}.".format(
                    job_properties["rule"]
                )
                raise WorkflowError(msg, e)
    return formatted
def convert_job_properties(job_properties, resource_mapping=None):
    """Map Snakemake job properties and resources onto sbatch option names."""
    options = {}
    if resource_mapping is None:
        resource_mapping = {}
    resources = job_properties.get("resources", {})
    for k, v in resource_mapping.items():
        options.update({k: resources[i] for i in v if i in resources})
    if "threads" in job_properties:
        options["cpus-per-task"] = job_properties["threads"]
    return options
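# Illustrative example, assuming a resource_mapping like the one defined in the
# accompanying submit script (the names and values here are hypothetical):
#   mapping = {"time": ["time", "runtime", "walltime"], "mem": ["mem", "mem_mb"]}
#   props = {"threads": 4, "resources": {"mem_mb": 8000, "runtime": 120}}
#   convert_job_properties(props, mapping)
#   -> {"time": 120, "mem": 8000, "cpus-per-task": 4}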
def ensure_dirs_exist(path):
    """Ensure the output folder for Slurm log files exists."""
    di = dirname(path)
    if di == "":
        return
    if not os.path.exists(di):
        os.makedirs(di, exist_ok=True)
    return
def format_sbatch_options(**sbatch_options):
    """Format sbatch options"""
    options = []
    for k, v in sbatch_options.items():
        val = ""
        if v is not None:
            val = f"={v}"
        options.append(f"--{k}{val}")
    return options
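# Illustrative example (option names containing dashes have to be passed via a dict):
#   format_sbatch_options(**{"partition": "normal", "cpus-per-task": 4, "exclusive": None})
#   -> ["--partition=normal", "--cpus-per-task=4", "--exclusive"]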
def submit_job(jobscript, **sbatch_options):
    """Submit jobscript and return jobid."""
    options = format_sbatch_options(**sbatch_options)
    try:
        cmd = ["sbatch"] + ["--parsable"] + options + [jobscript]
        res = sp.check_output(cmd)
    except sp.CalledProcessError as e:
        raise e
    # Get jobid
    res = res.decode()
    try:
        jobid = re.search(r"(\d+)", res).group(1)
    except Exception as e:
        raise e
    return jobid
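# Illustrative example (path and options are hypothetical); this would run
#   sbatch --parsable --partition=normal --mem=4000 /path/to/jobscript.sh
# and return the job id parsed from sbatch's output as a string, e.g. "123456":
#   jobid = submit_job("/path/to/jobscript.sh", **{"partition": "normal", "mem": "4000"})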
def advanced_argument_conversion(arg_dict):
    """Experimental adjustment of sbatch arguments to the given or default partition."""
    # Currently not adjusting for multiple node jobs
    nodes = int(arg_dict.get("nodes", 1))
    if nodes > 1:
        return arg_dict

    partition = arg_dict.get("partition", None) or _get_default_partition()
    constraint = arg_dict.get("constraint", None)
    ncpus = int(arg_dict.get("cpus-per-task", 1))
    runtime = arg_dict.get("time", None)
    memory = _convert_units_to_mb(arg_dict.get("mem", 0))
    config = _get_cluster_configuration(partition, constraint, memory)
    mem = arg_dict.get("mem", ncpus * min(config["MEMORY_PER_CPU"]))
    mem = _convert_units_to_mb(mem)
    if mem > max(config["MEMORY"]):
        logger.info(
            f"requested memory ({mem}) > max memory ({max(config['MEMORY'])}); "
            "adjusting memory settings"
        )
        mem = max(config["MEMORY"])

    # Calculate available memory as defined by the number of requested
    # cpus times memory per cpu
    AVAILABLE_MEM = ncpus * min(config["MEMORY_PER_CPU"])
    # Add additional cpus if memory is larger than AVAILABLE_MEM
    if mem > AVAILABLE_MEM:
        logger.info(
            f"requested memory ({mem}) > "
            f"ncpus x MEMORY_PER_CPU ({AVAILABLE_MEM}); "
            "trying to adjust number of cpus up"
        )
        ncpus = int(math.ceil(mem / min(config["MEMORY_PER_CPU"])))

    if ncpus > max(config["CPUS"]):
        logger.info(
            f"ncpus ({ncpus}) > available cpus ({max(config['CPUS'])}); "
            "adjusting number of cpus down"
        )
        ncpus = min(int(max(config["CPUS"])), ncpus)

    adjusted_args = {"mem": int(mem), "cpus-per-task": ncpus}

    # Update time. If requested time is larger than maximum allowed time, reset
    if runtime:
        runtime = time_to_minutes(runtime)
        time_limit = max(config["TIMELIMIT_MINUTES"])
        if runtime > time_limit:
            logger.info(
                f"time ({runtime}) > time limit {time_limit}; "
                "adjusting time down"
            )
            adjusted_args["time"] = time_limit

    # update and return
    arg_dict.update(adjusted_args)
    return arg_dict
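# Illustrative example with hypothetical partition limits (min MEMORY_PER_CPU of
# 4000 MB, max CPUS of 16): a job requesting mem=16G with 1 cpu would have
# cpus-per-task raised to ceil(16000 / 4000) = 4, and a time request above the
# partition's TIMELIMIT_MINUTES would be capped at that limit.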
timeformats = [
    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+):(?P<minutes>\d+)$"),
    re.compile(r"^(?P<days>\d+)-(?P<hours>\d+)$"),
    re.compile(r"^(?P<hours>\d+):(?P<minutes>\d+):(?P<seconds>\d+)$"),
    re.compile(r"^(?P<minutes>\d+):(?P<seconds>\d+)$"),
    re.compile(r"^(?P<minutes>\d+)$"),
]
def time_to_minutes(time):
    """Convert time string to minutes.

    According to slurm:
    Acceptable time formats include "minutes", "minutes:seconds",
    "hours:minutes:seconds", "days-hours", "days-hours:minutes"
    and "days-hours:minutes:seconds".
    """
    if not isinstance(time, str):
        time = str(time)
    d = {"days": 0, "hours": 0, "minutes": 0, "seconds": 0}
    regex = list(filter(lambda regex: regex.match(time) is not None, timeformats))
    if len(regex) == 0:
        return
    assert len(regex) == 1, "multiple time formats match"
    m = regex[0].match(time)
    d.update(m.groupdict())
    minutes = (
        int(d["days"]) * 24 * 60
        + int(d["hours"]) * 60
        + int(d["minutes"])
        + math.ceil(int(d["seconds"]) / 60)
    )
    assert minutes > 0, "minutes has to be greater than 0"
    return minutes
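# Illustrative conversions (seconds are rounded up to a full minute):
#   time_to_minutes("30")          -> 30
#   time_to_minutes("02:00:00")    -> 120
#   time_to_minutes("1-00:30:00")  -> 1470
#   time_to_minutes("10:30")       -> 11
# Unrecognised formats return None.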
def _get_default_partition():
    """Retrieve default partition for cluster"""
    cluster = CookieCutter.get_cluster_option()
    cmd = f"sinfo -O partition {cluster}"
    res = sp.check_output(cmd.split())
    m = re.search(r"(?P<partition>\S+)\*", res.decode(), re.M)
    partition = m.group("partition")
    return partition
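# Illustrative example: sinfo marks the default partition with a trailing "*", so
# output along the lines of
#   PARTITION
#   debug
#   normal*
# would make this function return "normal".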
def _get_cluster_configuration(partition, constraints=None, memory=0):
    """Retrieve cluster configuration.

    Retrieve cluster configuration for a partition filtered by
    constraints, memory and cpus
    """
    try:
        import pandas as pd
    except ImportError:
        print(
            "Error: currently advanced argument conversion "
            "depends on 'pandas'.",
            file=sys.stderr,
        )
        sys.exit(1)

    cluster = CookieCutter.get_cluster_option()
    cmd = f"sinfo -e -o %all -p {partition} {cluster}".split()
    try:
        output = sp.Popen(" ".join(cmd), shell=True, stdout=sp.PIPE).communicate()
    except Exception as e:
        print(e)
        raise
    data = re.sub("^CLUSTER:.+\n", "", re.sub(" \\|", "|", output[0].decode()))
    df = pd.read_csv(StringIO(data), sep="|")
    try:
        df["TIMELIMIT_MINUTES"] = df["TIMELIMIT"].apply(time_to_minutes)
        df["MEMORY_PER_CPU"] = df["MEMORY"] / df["CPUS"]
        df["FEATURE_SET"] = df["AVAIL_FEATURES"].str.split(",").apply(set)
    except Exception as e:
        print(e)
        raise
    if constraints:
        constraint_set = set(constraints.split(","))
        i = df["FEATURE_SET"].apply(lambda x: len(x.intersection(constraint_set)) > 0)
        df = df.loc[i]
    memory = min(_convert_units_to_mb(memory), max(df["MEMORY"]))
    df = df.loc[df["MEMORY"] >= memory]
    return df
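# Illustrative example (partition name and constraints are hypothetical): the returned
# pandas DataFrame has one row per matching sinfo line, with the derived columns
# TIMELIMIT_MINUTES, MEMORY_PER_CPU and FEATURE_SET used by advanced_argument_conversion:
#   config = _get_cluster_configuration("normal", constraints="intel", memory="8G")
#   min(config["MEMORY_PER_CPU"]), max(config["CPUS"]), max(config["TIMELIMIT_MINUTES"])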