by Alex Robbins
Your long-running Cascalog jobs throw errors and then have to be restarted from the beginning. You waste time waiting for early steps to rerun when the problem was in a later part of the workflow.
Use Cascalog Checkpoint, a library that adds checkpoints to your Cascalog jobs. If a step fails, the job restarts at that step instead of from the beginning.
In an existing Cascalog project, such as the one generated by [sec_cascalog_etl], add [cascalog/cascalog-checkpoint "1.10.2"] to your project’s dependencies and set the cookbook.checkpoint namespace to be AOT-compiled.
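A minimal project.clj sketch of that setup might look like the following; the project name and the Clojure and Hadoop versions shown here are illustrative, so match them to whatever your project already uses:

;; project.clj (hypothetical excerpt)
(defproject cookbook "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [cascalog "1.10.2"]
                 [cascalog/cascalog-checkpoint "1.10.2"]]
  ;; Hadoop is provided by the cluster at runtime
  :profiles {:provided {:dependencies [[org.apache.hadoop/hadoop-core "1.1.2"]]}}
  ;; defmain generates a class, so the namespace must be AOT-compiled
  :aot [cookbook.checkpoint])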
Then use Cascalog Checkpoint’s workflow macro to set up your job. A hypothetical four-step job would look something like this:
(ns cookbook.checkpoint
  (:require [cascalog.api :refer :all]
            [cascalog.checkpoint :refer [workflow]]))

(defmain Main [in-path out-path & args]
  (workflow ["/tmp/log-parsing"]
    step-1 ([:temp-dirs parsed-logs-path]
            (parse-logs in-path parsed-logs-path))

    step-2 ([:temp-dirs [min-path max-path]]
            (get-min parsed-logs-path min-path)
            (get-max parsed-logs-path max-path))

    step-3 ([:deps step-1 :temp-dirs log-sample-path]
            (sample-logs parsed-logs-path log-sample-path))

    step-4 ([:deps :all]
            (summary parsed-logs-path
                     min-path
                     max-path
                     log-sample-path
                     out-path))))
Cascalog jobs often take hours to run. There are few things more frustrating than a typo in the last step breaking a job that has been running all weekend. Cascalog Checkpoint provides the workflow macro, which allows you to restart a job from the last step that successfully completed.
The workflow macro expects its first argument, checkpoint-dir, to be a vector with a path for temporary files. The output of each step is temporarily stored in folders inside this path, along with some files to keep track of what steps have successfully completed.
After the first argument, workflow expects pairs of step names and step definitions. A step definition is a vector of options, followed by as many Cascalog queries as desired for that step. For example:
step-3 ([:deps step-1 :temp-dirs [log-sample-path log-other-sample-path]]
        (sample-logs parsed-logs-path log-sample-path)
        (other-sample-logs parsed-logs-path log-other-sample-path))
This step definition defines step-3. It depends on step-1, so it won’t run until step-1 has completed. This step creates two temporary directories for its queries. Both :deps and :temp-dirs can be either a symbol or a vector of symbols, or can be omitted. After the options vector, you can include one or many Cascalog queries; in this case, there are two queries.
:deps can take several different values. :last, which is the default value, makes the step depend on the step before it. :all makes the step depend on all previously defined steps. Providing a symbol, or vector of symbols, makes that step depend on that particular step or steps. A step won’t run until everything it depends upon has completed. If several steps have their dependencies met, they will all run in parallel.
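To make the options concrete, here is a hypothetical workflow skeleton showing each form of :deps; the query functions and path symbols are placeholders, not part of the library:

(workflow ["/tmp/deps-example"]
  step-a ([:temp-dirs a-path]              ; no :deps, so it defaults to :last
          (query-a in-path a-path))
  step-b ([:deps step-a :temp-dirs b-path] ; waits for step-a only
          (query-b a-path b-path))
  step-c ([:deps [step-a step-b] :temp-dirs c-path] ; waits for both named steps
          (query-c a-path b-path c-path))
  step-d ([:deps :all]                     ; waits for every step defined above
          (query-d c-path out-path)))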
Every symbol provided to :temp-dirs is turned into a directory within the temp directory. Later steps can use these directories to read data output by earlier steps. These directories are cleaned up once the workflow successfully runs all the way through. Until then, these directories hold the output from the different steps so the workflow can resume from the last incomplete step.
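For example, a later step could hand parsed-logs-path to a query like the following hypothetical one, which reads the parsed logs back out of the earlier step’s temp dir and writes URL counts into its own temp dir (this sketch assumes the parsed logs are two-field tuples and that cascalog.ops is required as c):

(defn count-urls [parsed-logs-path counts-path]
  (?<- (hfs-seqfile counts-path)
       [?url ?count]
       ;; read the tuples step-1 left in its temp dir
       ((hfs-seqfile parsed-logs-path) ?url ?status)
       (c/count ?count)))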
Note
If you want to restart a step that successfully completed, delete the file at <checkpoint-dir>/<step-name>. The :temp-dirs from the step definitions can be found in <checkpoint-dir>/data/<temp-dir>, in case you need to delete or modify the data there.
Another method for dealing with errors is providing error taps for your Cascalog queries. Cascalog will put the input tuples that cause errors in a query into the error tap (for different processing or to dump for manual inspection). With error taps in place, a couple of malformed inputs won’t bring down your entire workflow.
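As a sketch of what that looks like, the hypothetical parse-logs query from the example above could take an extra error-path argument and trap bad input lines there (parse-line stands in for your own parsing operation):

(defn parse-logs [in-path out-path error-path]
  (?<- (hfs-seqfile out-path)
       [?ip ?status]
       ((hfs-textline in-path) ?line)
       (parse-line ?line :> ?ip ?status)
       ;; tuples that make parse-line throw land here instead of failing the job
       (:trap (hfs-seqfile error-path))))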
Checkpointing your Cascalog jobs is a little bit of extra work initially, but it’ll save you a lot of time. Things will go wrong. The cluster will go down. You’ll discover typos and edge cases. It is wonderful to be able to restart your job from the last step that worked, instead of waiting for the entire thing to rerun every time.
- The cascalog.checkpoint project page on GitHub