# High-performance computing {#hpc}
```{r, message = FALSE, warning = FALSE, echo = FALSE}
knitr::opts_knit$set(root.dir = fs::dir_create(tempfile()))
knitr::opts_chunk$set(collapse = TRUE, comment = "#>", eval = TRUE)
```
```{r, message = FALSE, warning = FALSE, echo = FALSE, eval = TRUE}
library(targets)
```
`targets` supports high-performance computing with the `tar_make_clustermq()` and `tar_make_future()` functions. These functions are like `tar_make()`, but they allow multiple targets to run simultaneously over parallel workers. These workers can be processes on your local machine, or they can be jobs on a computing cluster. The main process automatically sends a target to a worker as soon as

1. The worker is available, and
1. All the target's upstream dependency targets have been checked or built.

Practical real-world examples of high-performance computing in `targets` are [linked from here](https://docs.ropensci.org/targets/index.html#examples). For the purposes of explaining the mechanics of the package, consider the following sketch of a pipeline.
```{r, echo = FALSE, eval = TRUE}
tar_script({
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
})
```
```{r, eval = FALSE}
# _targets.R
library(targets)
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
```{r, eval = TRUE}
# R console
tar_visnetwork()
```
When we run this pipeline with high-performance computing, `targets` automatically knows to wait for `data` to finish running before moving on to the other targets. Once `data` is finished, it moves on to targets `fast_fit` and `slow_fit`. If `fast_fit` finishes before `slow_fit`, target `plot_1` begins even as `slow_fit` is still running. Unlike [`drake`](https://github.com/ropensci/drake), `targets` applies this behavior not only to stem targets, but also to branches of patterns.
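For example, this scheduling also applies to dynamic branching: branches that do not depend on one another can build on separate workers at the same time. Below is a minimal sketch of that idea, where `analyze_split()` is a hypothetical user-defined function.

```{r, eval = FALSE}
# _targets.R
library(targets)
list(
  tar_target(index, seq_len(4)),
  # Each of the 4 branches below may run on a different parallel worker.
  tar_target(analysis, analyze_split(index), pattern = map(index))
)
```
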
The following sections cover the mechanics and configuration details of high-performance computing in `targets`.
## Clustermq
`tar_make_clustermq()` uses persistent workers. That means all the parallel processes launch together as soon as there is a target to build, and all the processes keep running until the pipeline winds down. The video clip below visualizes the concept.
<script src="https://fast.wistia.com/embed/medias/ycczhxwkjw.jsonp" async></script><script src="https://fast.wistia.com/assets/external/E-v1.js" async></script><div class="wistia_responsive_padding" style="padding:56.21% 0 0 0;position:relative;"><div class="wistia_responsive_wrapper" style="height:100%;left:0;position:absolute;top:0;width:100%;"><div class="wistia_embed wistia_async_ycczhxwkjw videoFoam=true" style="height:100%;position:relative;width:100%"><div class="wistia_swatch" style="height:100%;left:0;opacity:0;overflow:hidden;position:absolute;top:0;transition:opacity 200ms;width:100%;"><img src="https://fast.wistia.com/embed/medias/ycczhxwkjw/swatch" style="filter:blur(5px);height:100%;object-fit:contain;width:100%;" alt="" onload="this.parentNode.style.opacity=1;" /></div></div></div></div>
### Clustermq installation
Persistent workers require the [`clustermq`](https://github.com/mschubert/clustermq) R package, which in turn requires [ZeroMQ](http://zeromq.org/). Please refer to the [`clustermq` installation guide](https://github.com/mschubert/clustermq/blob/main/README.md#installation) for specific instructions.
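In many cases, installing `clustermq` from CRAN is enough; consult the guide above if ZeroMQ needs special handling on your system.

```{r, eval = FALSE}
install.packages("clustermq")
```
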
### Clustermq locally
When you write your `_targets.R` script, be sure to set the `clustermq.scheduler` global option to a local scheduler like `"multiprocess"`. Many of the supported schedulers and their configuration details are [listed here](https://mschubert.github.io/clustermq/articles/userguide.html#configuration).
```{r, eval = FALSE}
# _targets.R
options(clustermq.scheduler = "multiprocess")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_clustermq()` with the appropriate number of workers.
```{r, eval = FALSE}
# R console
tar_make_clustermq(workers = 2)
```
### Clustermq remotely
For parallel computing on a cluster,

1. Choose a [scheduler listed here](https://mschubert.github.io/clustermq/articles/userguide.html#configuration) that corresponds to your cluster's resource manager.
1. Create a [template file](https://github.com/mschubert/clustermq/tree/main/inst) that configures the computing requirements and other settings for the cluster.

Supply the scheduler and template file to the `clustermq.scheduler` and `clustermq.template` global options in your `_targets.R` file.
```{r, eval = FALSE}
# _targets.R
options(clustermq.scheduler = "sge", clustermq.template = "sge.tmpl")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Above, `sge.tmpl` refers to a template file like the one below.
```
## From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }} # Worker name.
#$ -t 1-{{ n_jobs }} # Submit workers as an array.
#$ -j y # Combine stdout and stderr into one worker log file.
#$ -o /dev/null # Worker log files.
#$ -cwd # Use project root as working directory.
#$ -V # Use environment variables.
module load R/3.6.3 # Needed if R is an environment module on the cluster.
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")' # Leave alone.
```
Then, run `tar_make_clustermq()` as before.
```{r, eval = FALSE}
# R console
tar_make_clustermq(workers = 2)
```
See the examples [linked from here](https://docs.ropensci.org/targets/index.html#examples) for how this setup works in real-world projects.
### Clustermq configuration
In addition to configuration options hard-coded in the template file, you can supply custom computing resources with the `resources` argument of `tar_option_set()`. As an example, let's use a wildcard for the number of cores per worker on an SGE cluster. In the template file, supply the `{{ num_cores }}` wildcard to the `-pe smp` flag.
```
#$ -pe smp {{ num_cores | 1 }} # Number of cores per worker (defaults to 1).
#$ -N {{ job_name }}
#$ -t 1-{{ n_jobs }}
#$ -j y
#$ -o /dev/null
#$ -cwd
#$ -V
module load R/3.6.3
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")'
```
Then, supply the value of `num_cores` to the `resources` option from within `_targets.R`.
```{r, eval = FALSE}
# _targets.R
tar_option_set(resources = list(num_cores = 2))
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```
Finally, call `tar_make_clustermq()` normally.
```{r, eval = FALSE}
# R console
tar_make_clustermq(workers = 2)
```
This particular use case comes up when individual targets run their own parallel computations and need multiple cores each.
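As a sketch of that scenario, a function used by a target could fan out over the allocated cores. Here, `fit_one_model()` is a hypothetical user-defined function, and the core count matches the `num_cores` resource above. (`parallel::mclapply()` forks processes, so it falls back to serial execution on Windows.)

```{r, eval = FALSE}
# functions.R (sourced by _targets.R)
fit_all_models <- function(data) {
  # Use the 2 cores requested via resources = list(num_cores = 2).
  parallel::mclapply(
    X = seq_len(2),
    FUN = function(i) fit_one_model(data, i), # fit_one_model() is hypothetical.
    mc.cores = 2
  )
}
```
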
## Future
`tar_make_future()` uses transient workers. That means each target gets its own worker, which initializes when the target begins and terminates when the target ends. The following video clip demonstrates the concept.
<script src="https://fast.wistia.com/embed/medias/340yvlp515.jsonp" async></script><script src="https://fast.wistia.com/assets/external/E-v1.js" async></script><div class="wistia_responsive_padding" style="padding:56.21% 0 0 0;position:relative;"><div class="wistia_responsive_wrapper" style="height:100%;left:0;position:absolute;top:0;width:100%;"><div class="wistia_embed wistia_async_340yvlp515 videoFoam=true" style="height:100%;position:relative;width:100%"><div class="wistia_swatch" style="height:100%;left:0;opacity:0;overflow:hidden;position:absolute;top:0;transition:opacity 200ms;width:100%;"><img src="https://fast.wistia.com/embed/medias/340yvlp515/swatch" style="filter:blur(5px);height:100%;object-fit:contain;width:100%;" alt="" onload="this.parentNode.style.opacity=1;" /></div></div></div></div><br>
### Future installation
Install the [`future`](https://github.com/HenrikBengtsson/future) package.
```{r, eval = FALSE}
install.packages("future")
```
If you intend to use a cluster, be sure to install the [`future.batchtools`](https://github.com/HenrikBengtsson/future.batchtools) package too.
```{r, eval = FALSE}
install.packages("future.batchtools")
```
The [`future`](https://github.com/HenrikBengtsson/future) ecosystem contains even more packages that extend [`future`](https://github.com/HenrikBengtsson/future)'s parallel computing functionality, such as [`future.callr`](https://github.com/HenrikBengtsson/future.callr).
```{r, eval = FALSE}
install.packages("future.callr")
```
### Future locally
To parallelize targets over multiple processes on your local machine, declare a `future` plan in your `_targets.R` file. The `callr` plan from the [`future.callr`](https://future.callr.futureverse.org) package is recommended.^[Some alternative local future plans are [listed here](https://github.com/HenrikBengtsson/future/#controlling-how-futures-are-resolved).]
```{r, eval = FALSE}
# _targets.R
library(future)
library(future.callr)
plan(callr)
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_future()` with the desired number of workers. Here, the `workers` argument specifies the maximum number of transient workers to allow at a given time. Some `future` plans also have optional `workers` arguments that set their own caps.
```{r, eval = FALSE}
# R console
tar_make_future(workers = 2)
```
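Note that the plan itself can impose its own cap. For example, the `callr` plan accepts a `workers` argument, so a configuration like the sketch below limits concurrency at both levels.

```{r, eval = FALSE}
# _targets.R
library(future)
library(future.callr)
plan(callr, workers = 2) # The plan's own limit on simultaneous workers.
```
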
### Future remotely
To run transient workers on a cluster, first install the [`future.batchtools`](https://github.com/HenrikBengtsson/future.batchtools) package. Then, set one of [these plans](https://github.com/HenrikBengtsson/future.batchtools#choosing-batchtools-backend) in your `_targets.R` file.
```{r, eval = FALSE}
# _targets.R
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Here, our template file `sge.tmpl` is configured for `batchtools`.
```
#!/bin/bash
#$ -cwd # Run in the current working directory.
#$ -j y # Direct stdout and stderr to the same file.
#$ -o <%= log.file %> # log file
#$ -V # Use environment variables.
#$ -N <%= job.name %> # job name
module load R/3.6.3 # Needed if R is an environment module on the cluster.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")' # Leave alone.
exit 0 # Leave alone.
```
### Future configuration
The `tar_target()`, `tar_target_raw()`, and `tar_option_set()` functions accept a `resources` argument. For example, if our `batchtools` template file has a wildcard for the number of cores for a job,
```
#!/bin/bash
#$ -pe smp <%= if (is.null(resources[["num_cores"]])) 1 else resources[["num_cores"]] %> # Cores per job, default 1.
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
module load R/3.6.3
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0
```
then you can set the number of cores for individual targets. In the example below, suppose the slow model needs 2 cores to run quickly enough.
```{r, eval = FALSE}
# _targets.R
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
tar_target(data, get_data()),
tar_target(fast_fit, fit_small_model(data)),
tar_target(slow_fit, fit_slow_model(data), resources = list(num_cores = 2)),
tar_target(plot_1, make_plot(fast_fit)),
tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_future()` as usual.
```{r, eval = FALSE}
# R console
tar_make_future(workers = 2)
```
The `resources` of `tar_target()` defaults to `tar_option_get("resources")`. You can set the default value for all targets using `tar_option_set()`.
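For example, the following sketch (using the hypothetical functions from earlier) sets a pipeline-wide default and overrides it for one expensive target.

```{r, eval = FALSE}
# _targets.R
tar_option_set(resources = list(num_cores = 1)) # Default for every target.
list(
  tar_target(data, get_data()),
  # Override the default for just this expensive target.
  tar_target(slow_fit, fit_slow_model(data), resources = list(num_cores = 2))
)
```
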
## Advanced
Functions `tar_target()`, `tar_target_raw()`, and `tar_option_set()` support advanced configuration options for heavy-duty pipelines that require high-performance computing. A combined sketch follows the list.

* `deployment`: With the `deployment` argument, you can choose to run some targets locally on the main process instead of on a high-performance computing worker. This option is suitable for lightweight targets such as R Markdown reports, where runtime is quick and a cluster would be excessive.
* `memory`: Choose whether to keep a target in memory or drop it from memory when it is not immediately needed. This is a tradeoff between memory consumption and storage read speeds, and like all of the options listed here, you can set it on a target-by-target basis. The default settings consume a lot of memory in order to avoid frequent reads from storage. To keep memory usage to a minimum, set `memory = "transient"` and `garbage_collection = TRUE` in `tar_target()` or `tar_option_set()`. For cloud-based dynamic files such as `format = "aws_file"`, this memory policy applies to temporary local copies of the file in `_targets/scratch/`: `"persistent"` means they remain until the end of the pipeline, and `"transient"` means they are deleted from the file system as soon as possible. The former conserves bandwidth, and the latter conserves local storage.
* `garbage_collection`: Choose whether to run `base::gc()` just before running the target.
* `storage`: Choose whether the parallel workers or the main process is responsible for saving the target's value. For slow network file systems on clusters, `storage = "main"` is often faster for small numbers of targets. For large numbers of targets or low-bandwidth connections between the main process and the workers, `storage = "worker"` is often faster. Always choose `storage = "main"` if the workers do not have access to the file system with the `_targets/` data store.
* `retrieval`: Choose whether the parallel workers or the main process is responsible for reading dependency targets from disk. This should usually match your choice of `storage` (the default behavior). Always choose `retrieval = "main"` if the workers do not have access to the file system with the `_targets/` data store.
* `format`: Pipelines with heavy computation often have large data as well. Consider setting the `format` argument to help `targets` store and retrieve your data faster.
* `error`: Set `error` to `"continue"` to let the rest of the pipeline keep running even if a target encounters an error.
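
The sketch below combines several of these options using the hypothetical functions from the earlier examples; the settings shown are illustrative, not recommendations.

```{r, eval = FALSE}
# _targets.R
tar_option_set(
  memory = "transient",      # Unload each target from memory when not needed.
  garbage_collection = TRUE, # Run gc() before each target.
  error = "continue"         # Keep building other targets after an error.
)
list(
  tar_target(data, get_data(), format = "qs"), # Fast serialization format.
  tar_target(
    slow_fit,
    fit_slow_model(data),
    storage = "worker",  # The worker saves the return value.
    retrieval = "worker" # The worker reads the dependencies.
  ),
  tar_target(plot_2, make_plot(slow_fit), deployment = "main") # Run locally.
)
```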