Added batching of tuples at a disruptor queue level.
Robert (Bobby) Evans committed Oct 27, 2015
1 parent 1be78e7 commit c415d32
Showing 11 changed files with 330 additions and 339 deletions.
2 changes: 2 additions & 0 deletions conf/defaults.yaml
@@ -222,6 +222,8 @@ topology.classpath: null
 topology.environment: null
 topology.bolts.outgoing.overflow.buffer.enable: false
 topology.disruptor.wait.timeout.millis: 1000
+topology.disruptor.batch.size: 100
+topology.disruptor.batch.timeout.millis: 1

 # Configs for Resource Aware Scheduler
 topology.component.resources.onheap.memory.mb: 128.0
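The two new keys control the batching trade-off: a publisher accumulates up to topology.disruptor.batch.size tuples and hands them downstream as one batch, while a flush timer pushes out any partial batch after topology.disruptor.batch.timeout.millis so tuples on a quiet stream are not stranded. A minimal standalone sketch of that size-or-timeout policy in plain Java; this illustrates the idea only and is not Storm's actual DisruptorQueue (the BatchingPublisher class and its names are invented for the example):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: collect items until batchSize is reached, or until
// batchTimeoutMillis elapses, whichever comes first.
public class BatchingPublisher<T> {
    private final int batchSize;                // cf. topology.disruptor.batch.size
    private final Consumer<List<T>> downstream; // e.g. a ring-buffer publish
    private List<T> buffer = new ArrayList<>();
    private final ScheduledExecutorService flusher =
        Executors.newSingleThreadScheduledExecutor();

    public BatchingPublisher(int batchSize, long batchTimeoutMillis,
                             Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
        // The periodic flush bounds the latency of a partially filled batch,
        // cf. topology.disruptor.batch.timeout.millis.
        flusher.scheduleAtFixedRate(this::flush, batchTimeoutMillis,
                                    batchTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    public synchronized void publish(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flushNow();
        }
    }

    public synchronized void flush() {
        if (!buffer.isEmpty()) {
            flushNow();
        }
    }

    private void flushNow() { // caller holds the monitor
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        downstream.accept(batch); // hand one whole batch downstream
    }
}

With the defaults above, a saturated stream ships full batches of 100 tuples per publish, while an idle stream waits at most one millisecond before a partial batch is flushed.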
@@ -47,6 +47,13 @@ public class FastWordCountTopology {
   public static class FastRandomSentenceSpout extends BaseRichSpout {
     SpoutOutputCollector _collector;
     Random _rand;
+    private static final String[] CHOICES = {
+        "marry had a little lamb whos fleese was white as snow",
+        "and every where that marry went the lamb was sure to go",
+        "one two three four five six seven eight nine ten",
+        "this is a test of the emergency broadcast system this is only a test",
+        "peter piper picked a peck of pickeled peppers"
+    };

     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
@@ -56,9 +63,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect

     @Override
     public void nextTuple() {
-      String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
-        "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
-      String sentence = sentences[_rand.nextInt(sentences.length)];
+      String sentence = CHOICES[_rand.nextInt(CHOICES.length)];
       _collector.emit(new Values(sentence), sentence);
     }

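The spout change hoists the sentence pool out of nextTuple() into the static final CHOICES field, so the hot path no longer allocates a fresh array on every call and only indexes into shared, immutable data. A stripped-down sketch of the pattern (illustrative Java, not the full spout):

import java.util.Random;

// Sketch: pick from a shared immutable pool in the hot path, allocating
// nothing per call. Pool contents here are placeholders.
class SentencePicker {
    private static final String[] CHOICES = { "one", "two", "three" };
    private final Random rand = new Random();

    String next() {
        return CHOICES[rand.nextInt(CHOICES.length)];
    }
}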
8 changes: 4 additions & 4 deletions storm-core/src/clj/backtype/storm/cluster.clj
@@ -525,10 +525,10 @@
      (let [path (backpressure-path storm-id node port)
            existed (exists-node? cluster-state path false)]
        (if existed
-          (if (not on?)
-            (delete-node cluster-state path)) ;; delete the znode since the worker is not congested
-          (if on?
-            (set-ephemeral-node cluster-state path nil acls))))) ;; create the znode since worker is congested
+          (when (not on?)
+            (delete-node cluster-state path))
+          (when on?
+            (set-ephemeral-node cluster-state path nil acls)))))

    (topology-backpressure
      [this storm-id callback]
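Here the backpressure signal is simply the presence or absence of an ephemeral znode: a congested worker creates it, an uncongested worker deletes it, and a dead worker's session expiry removes it automatically. A hedged sketch of the same toggle against a raw ZooKeeper client; Storm routes this through its own cluster-state layer and ACL handling, so the class below is illustrative only:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

// Illustrative backpressure toggle: the znode's existence is the signal.
class BackpressureFlag {
    private final ZooKeeper zk;
    private final String path; // e.g. a per-worker path under a backpressure root

    BackpressureFlag(ZooKeeper zk, String path) {
        this.zk = zk;
        this.path = path;
    }

    void set(boolean on) throws KeeperException, InterruptedException {
        boolean exists = zk.exists(path, false) != null;
        if (exists && !on) {
            zk.delete(path, -1); // worker is no longer congested
        } else if (!exists && on) {
            // Ephemeral: vanishes with the session if the worker dies.
            zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
                      CreateMode.EPHEMERAL);
        }
    }
}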
104 changes: 24 additions & 80 deletions storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -197,23 +197,10 @@
 ;; in its own function so that it can be mocked out by tracked topologies
 (defn mk-executor-transfer-fn [batch-transfer->worker storm-conf]
   (fn this
-    ([task tuple block? ^ConcurrentLinkedQueue overflow-buffer]
-      (when (= true (storm-conf TOPOLOGY-DEBUG))
-        (log-message "TRANSFERING tuple TASK: " task " TUPLE: " tuple))
-      (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
-        (.add overflow-buffer [task tuple])
-        (try-cause
-          (disruptor/publish batch-transfer->worker [task tuple] block?)
-          (catch InsufficientCapacityException e
-            (if overflow-buffer
-              (.add overflow-buffer [task tuple])
-              (throw e))
-            ))))
-    ([task tuple overflow-buffer]
-      (this task tuple (nil? overflow-buffer) overflow-buffer))
-    ([task tuple]
-      (this task tuple nil)
-      )))
+    [task tuple]
+    (when (= true (storm-conf TOPOLOGY-DEBUG))
+      (log-message "TRANSFERING tuple TASK: " task " TUPLE: " tuple))
+    (disruptor/publish batch-transfer->worker [task tuple])))

 (defn mk-executor-data [worker executor-id]
   (let [worker-context (worker-context worker)
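With batching and a blocking wait inside the queue, the transfer function above no longer needs three arities, a non-blocking publish, or an overflow spill on InsufficientCapacityException; it can simply block until the send queue has room. An illustrative analogue using a plain BlockingQueue (not Storm's disruptor-backed queue):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative: a single code path that blocks when the queue is full,
// replacing the old overflow-buffer fallback.
class TransferFn {
    private final BlockingQueue<Object[]> sendQueue = new ArrayBlockingQueue<>(1024);

    void transfer(int task, Object tuple) throws InterruptedException {
        sendQueue.put(new Object[] { task, tuple }); // blocks until space frees
    }
}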
@@ -225,7 +212,9 @@
           (str "executor" executor-id "-send-queue")
           (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
           (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-          :producer-type :single-threaded)
+          :producer-type :single-threaded
+          :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+          :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
         ]
     (recursive-map
       :worker worker
@@ -283,6 +272,7 @@
         "When receive queue is below lowWaterMark"
         (if @(:backpressure executor-data)
           (do (reset! (:backpressure executor-data) false)
+              (log-debug "executor " (:executor-id executor-data) " is not-congested, set backpressure flag false")
               (WorkerBackpressureThread/notifyBackpressureChecker (:backpressure-trigger (:worker executor-data))))))))

 (defn start-batch-transfer->worker-handler! [worker executor-data]
@@ -316,7 +306,7 @@
       [[nil (TupleImpl. worker-context [interval] Constants/SYSTEM_TASK_ID Constants/METRICS_TICK_STREAM_ID)]]))))))

 (defn metrics-tick
-  ([executor-data task-data ^TupleImpl tuple overflow-buffer]
+  [executor-data task-data ^TupleImpl tuple]
   (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
         interval (.getInteger tuple 0)
         task-id (:task-id task-data)
@@ -336,11 +326,7 @@
              (filter identity)
              (into []))]
     (if (seq data-points)
-      (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points] overflow-buffer))))
-  ([executor-data task-data ^TupleImpl tuple]
-    (metrics-tick executor-data task-data tuple nil)
-    ))
-
+      (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))

 (defn setup-ticks! [worker executor-data]
   (let [storm-conf (:storm-conf executor-data)
@@ -403,8 +389,7 @@
           context (:worker-context executor-data)]
       (disruptor/publish
         receive-queue
-        [[nil (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID)]]
-        )))
+        [[nil (TupleImpl. context [creds] Constants/SYSTEM_TASK_ID Constants/CREDENTIALS_CHANGED_STREAM_ID)]])))
     (get-backpressure-flag [this]
       @(:backpressure executor-data))
     Shutdownable
@@ -480,7 +465,7 @@

 ;; Send sampled data to the eventlogger if the global or component level
 ;; debug flag is set (via nimbus api).
-(defn send-to-eventlogger [executor-data task-data values overflow-buffer component-id message-id random]
+(defn send-to-eventlogger [executor-data task-data values component-id message-id random]
   (let [c->d @(:storm-component->debug-atom executor-data)
         options (get c->d component-id (get c->d (:storm-id executor-data)))
         spct (if (and (not-nil? options) (:enable options)) (:samplingpct options) 0)]
@@ -490,8 +475,7 @@
       (task/send-unanchored
         task-data
         EVENTLOGGER-STREAM-ID
-        [component-id message-id (System/currentTimeMillis) values]
-        overflow-buffer))))
+        [component-id message-id (System/currentTimeMillis) values]))))

 (defmethod mk-threads :spout [executor-data task-datas initial-credentials]
   (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
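send-to-eventlogger forwards a tuple to the event logger only when a random draw falls below the sampling percentage configured through the Nimbus debug API; otherwise the tuple is skipped. The gate reduces to one comparison, sketched in illustrative Java:

import java.util.Random;

// Illustrative sampling gate: forward roughly samplingPct percent of tuples.
class EventLogSampler {
    private final Random random = new Random();

    boolean shouldLog(double samplingPct) {
        // A percentage of 0 disables logging; 100 logs every tuple.
        return samplingPct > 0 && random.nextDouble() * 100 < samplingPct;
    }
}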
@@ -501,15 +485,7 @@
         last-active (atom false)
         spouts (ArrayList. (map :object (vals task-datas)))
         rand (Random. (Utils/secureRandomLong))
-
-        ;; the overflow buffer is used to ensure that spouts never block when emitting
-        ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
-        ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
-        ;; buffers filled up)
-        ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
-        ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
-        ;; preventing memory issues
-        overflow-buffer (ConcurrentLinkedQueue.)
+        ^DisruptorQueue transfer-queue (executor-data :batch-transfer-queue)

         pending (RotatingMap.
                   2 ;; microoptimize for performance of .size method
@@ -522,7 +498,7 @@
                           (let [stream-id (.getSourceStreamId tuple)]
                             (condp = stream-id
                               Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
-                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple overflow-buffer)
+                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
                               Constants/CREDENTIALS_CHANGED_STREAM_ID
                                 (let [task-data (get task-datas task-id)
                                       spout-obj (:object task-data)]
@@ -578,11 +554,10 @@
                                                      out-stream-id
                                                      tuple-id)]
                           (transfer-fn out-task
-                                       out-tuple
-                                       overflow-buffer)
+                                       out-tuple)
                           ))
                       (if has-eventloggers?
-                        (send-to-eventlogger executor-data task-data values overflow-buffer component-id message-id rand))
+                        (send-to-eventlogger executor-data task-data values component-id message-id rand))
                       (if (and rooted?
                                (not (.isEmpty out-ids)))
                         (do
@@ ... @@
                             (if (sampler) (System/currentTimeMillis))])
                           (task/send-unanchored task-data
                                                 ACKER-INIT-STREAM-ID
-                                                [root-id (bit-xor-vals out-ids) task-id]
-                                                overflow-buffer))
+                                                [root-id (bit-xor-vals out-ids) task-id]))
                         (when message-id
                           (ack-spout-msg executor-data task-data message-id
                                          {:stream out-stream-id :values values}
@@ -628,20 +602,10 @@
         (log-message "Opened spout " component-id ":" (keys task-datas))
         (setup-metrics! executor-data)

-        (disruptor/consumer-started! (:receive-queue executor-data))
         (fn []
           ;; This design requires that spouts be non-blocking
           (disruptor/consume-batch receive-queue event-handler)
-
-          ;; try to clear the overflow-buffer
-          (try-cause
-            (while (not (.isEmpty overflow-buffer))
-              (let [[out-task out-tuple] (.peek overflow-buffer)]
-                (transfer-fn out-task out-tuple false nil)
-                (.poll overflow-buffer)))
-            (catch InsufficientCapacityException e
-              ))

           (let [active? @(:storm-active-atom executor-data)
                 curr-count (.get emitted-count)
                 backpressure-enabled ((:storm-conf executor-data) TOPOLOGY-BACKPRESSURE-ENABLE)
@@ -650,7 +614,7 @@
                 reached-max-spout-pending (and max-spout-pending
                                                (>= (.size pending) max-spout-pending))
                 ]
-            (if (and (.isEmpty overflow-buffer)
+            (if (and (not (.isFull transfer-queue))
                      (not throttle-on)
                      (not reached-max-spout-pending))
               (if active?
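The spout loop previously gated nextTuple on its private overflow buffer being empty; it now asks the shared batch transfer queue directly whether there is room, alongside the existing throttle and max-spout-pending checks. The decision reduces to something like this (illustrative Java, names invented):

// Illustrative decision for one pass of the spout loop: poll for new tuples
// only when emitting cannot block and nothing upstream is pushing back.
class SpoutGate {
    boolean canCallNextTuple(boolean transferQueueFull, boolean throttleOn,
                             boolean reachedMaxSpoutPending, boolean active) {
        return !transferQueueFull && !throttleOn && !reachedMaxSpoutPending && active;
    }
}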
@@ -706,14 +670,6 @@
                 open-or-prepare-was-called?]} executor-data
         rand (Random. (Utils/secureRandomLong))

-        ;; the overflow buffer is used to ensure that bolts do not block when emitting
-        ;; this ensures that the bolt can always clear the incoming messages, which
-        ;; prevents deadlock from occurs across the topology
-        ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
-        ;; buffers filled up)
-        ;; the overflow buffer is might gradually fill degrading the performance gradually
-        ;; eventually running out of memory, but at least prevent live-locks/deadlocks.
-        overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)
         tuple-action-fn (fn [task-id ^TupleImpl tuple]
                           ;; synchronization needs to be done with a key provided by this bolt, otherwise:
                           ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
@@ -738,7 +694,7 @@
                                   bolt-obj (:object task-data)]
                               (when (instance? ICredentialsListener bolt-obj)
                                 (.setCredentials bolt-obj (.getValue tuple 0))))
-                            Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple overflow-buffer)
+                            Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
                             (let [task-data (get task-datas task-id)
                                   ^IBolt bolt-obj (:object task-data)
                                   user-context (:user-context task-data)
@@ -795,10 +751,9 @@
                                                         values
                                                         task-id
                                                         stream
-                                                        (MessageId/makeId anchors-to-ids))
-                                          overflow-buffer)))
+                                                        (MessageId/makeId anchors-to-ids)))))
                         (if has-eventloggers?
-                          (send-to-eventlogger executor-data task-data values overflow-buffer component-id nil rand))
+                          (send-to-eventlogger executor-data task-data values component-id nil rand))
                         (or out-tasks [])))]]
           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
           (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))
@@ -830,7 +785,7 @@
                           (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
                             (task/send-unanchored task-data
                                                   ACKER-ACK-STREAM-ID
-                                                  [root (bit-xor id ack-val)] overflow-buffer)
+                                                  [root (bit-xor id ack-val)])
                             ))
                         (let [delta (tuple-time-delta! tuple)
                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
@@ -846,7 +801,7 @@
                         (fast-list-iter [root (.. tuple getMessageId getAnchors)]
                           (task/send-unanchored task-data
                                                 ACKER-FAIL-STREAM-ID
-                                                [root] overflow-buffer))
+                                                [root]))
                         (let [delta (tuple-time-delta! tuple)
                               debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                           (when debug?
@@ ... @@

     (let [receive-queue (:receive-queue executor-data)
           event-handler (mk-task-receiver executor-data tuple-action-fn)]
-      (disruptor/consumer-started! receive-queue)
       (fn []
         (disruptor/consume-batch-when-available receive-queue event-handler)
-        ;; try to clear the overflow-buffer
-        (try-cause
-          (while (and overflow-buffer (not (.isEmpty overflow-buffer)))
-            (let [[out-task out-tuple] (.peek overflow-buffer)]
-              (transfer-fn out-task out-tuple false nil)
-              (.poll overflow-buffer)))
-          (catch InsufficientCapacityException e
-            (when (= true (storm-conf TOPOLOGY-DEBUG))
-              (log-message "Insufficient Capacity on queue to emit by bolt " component-id ":" (keys task-datas) ))
-            ))
         0)))
     :kill-fn (:report-error-and-die executor-data)
     :factory? true
10 changes: 2 additions & 8 deletions storm-core/src/clj/backtype/storm/daemon/task.clj
@@ -105,7 +105,7 @@

 ;; TODO: this is all expensive... should be precomputed
 (defn send-unanchored
-  ([task-data stream values overflow-buffer]
+  [task-data stream values]
   (let [^TopologyContext topology-context (:system-context task-data)
         tasks-fn (:tasks-fn task-data)
         transfer-fn (-> task-data :executor-data :transfer-fn)
@@ -114,13 +114,7 @@
                               (.getThisTaskId topology-context)
                               stream)]
     (fast-list-iter [t (tasks-fn stream values)]
-      (transfer-fn t
-                   out-tuple
-                   overflow-buffer)
-      )))
-  ([task-data stream values]
-    (send-unanchored task-data stream values nil)
-    ))
+      (transfer-fn t out-tuple))))

 (defn mk-tasks-fn [task-data]
   (let [task-id (:task-id task-data)
12 changes: 8 additions & 4 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -159,8 +159,8 @@

     transfer-fn
     (fn [^KryoTupleSerializer serializer tuple-batch]
-      (let [local (ArrayList.)
-            remoteMap (HashMap.)]
+      (let [^ArrayList local (ArrayList.)
+            ^HashMap remoteMap (HashMap.)]
         (fast-list-iter [[task tuple :as pair] tuple-batch]
           (if (local-tasks task)
             (.add local pair)
@@ -190,7 +190,9 @@
        ;; TODO: this depends on the type of executor
        (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                          (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
-                          (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS))]))
+                          (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                          :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                          :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
        (into {})
        ))

@@ -231,7 +233,9 @@
   (let [assignment-versions (atom {})
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
-                         (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS))
+                         (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
+                         :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
+                         :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)

         receive-queue-map (->> executor-receive-queue-map
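Topologies can override the two batching defaults individually. A sketch using the raw key strings from conf/defaults.yaml; Storm's Config class presumably exposes constants for these keys, but the plain strings below are taken directly from this diff:

import java.util.HashMap;
import java.util.Map;

// Illustrative per-topology override of the new batching defaults.
public class BatchTuning {
    public static Map<String, Object> conf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.disruptor.batch.size", 100);         // tuples per batch
        conf.put("topology.disruptor.batch.timeout.millis", 1); // max batch delay
        return conf;
    }
}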
