-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrunutil.py
132 lines (95 loc) · 3.66 KB
/
runutil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
A collection of utilities for running commands and snakemake targets.
"""
import os
import subprocess
import sys
from eeepy import fileutil
def run_cmd(args, env=None):
"""
Run a command with the proper environment set.
:param args: A tuple of arguments starting with the command name.
:param env: A dictionary of environment variables for the process to run in. If `None`, the current environment is
used.
:return: The return code of the command.
"""
sys.stdout.flush()
p = subprocess.Popen(args, env=env)
p.wait()
return p.returncode
class SnakeRunner:
"""
Executes Snakemake targets.
"""
def __init__(self, snakefile='snakefile', params=None, env=None, snake_cmd='snakemake', timestamp=True, rerun=True):
"""
Initialize the snakemake runner.
:param snakefile: Snakemake file. Assumed to be an absolute path or relative to the current working
directory.
:param env: A dictionary of environment variables the snakemake process will be executed in. If `None`, the
current environment is used.
:param snake_cmd: The snakemake command to run. May be a full path to the snakemake executable.
:param timestamp: Log with timestamps.
:param rerun: Rerun incomplete targets.
"""
# Convert snakefile to a normalized absolute file name and raise IOError if it is not a regular file
snakefile = fileutil.make_abs(snakefile)
# Get environment snakemake will run in
if env is None:
env = os.environ.copy()
if params is None:
params = dict()
# Assign fields
self.snakefile = snakefile
self.params = params
self.env = env
self.snake_cmd = snake_cmd
self.timestamp = timestamp
self.rerun = rerun
def run(self, target, target_opts=None, params=None, dryrun=False):
# Initialize run command
snakemake_cmd = [
self.snake_cmd
]
# Set timestamp option
if self.timestamp:
snakemake_cmd.append('-T')
# Set rerun option
if self.rerun:
snakemake_cmd.append('--rerun-incomplete')
# Set dry-run option
if dryrun:
snakemake_cmd.append('--dryrun')
# Set snakefile and target
snakemake_cmd.extend((
'--snakefile',
self.snakefile,
target
))
# Set target options
if target_opts is not None:
snakemake_cmd.extend(target_opts)
# Set parameters
snakemake_cmd.extend([val for val in self._param_list_iter(params)])
# Run snakemake command
return run_cmd(snakemake_cmd, self.env)
def _param_list_iter(self, params):
"""
Return an iterator over parameters as "key=value" strings.
:param params: Run target parameters. These will be added to the objects parameters.
:return: An iterator over the parameters "key=value" strings.
"""
# Get a dictionary of parameters
run_params = self.params.copy()
if params is not None:
run_params.update(params)
for key in params:
yield print('{0}={1}'.format(key, params[key]))
def _get_param_list(self, params, delim=' '):
"""
Return a list of parameters as delimited list.
:param params: Run target parameters. These will be added to the objects parameters.
:param delim: Join parameter "key=value" strings on this string.
:return: A joined string of parameter "key=value" pairs.
"""
return delim.join(self._param_list_iter(params))