
Commit

STORM-350: Upgrade to newer version of disruptor
Robert (Bobby) Evans committed Oct 26, 2015
1 parent f582b8f commit 48c55c8
Showing 11 changed files with 295 additions and 84 deletions.
1 change: 0 additions & 1 deletion conf/defaults.yaml
@@ -210,7 +210,6 @@ topology.executor.send.buffer.size: 1024 #individual messages
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
-topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
185 changes: 185 additions & 0 deletions examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java
@@ -0,0 +1,185 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.starter;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.*;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

/**
 * WordCount, but the spout does not stop, and the bolts are implemented in
 * Java. This can show how fast the word count can run.
*/
public class FastWordCountTopology {
    public static class FastRandomSentenceSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        Random _rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = ThreadLocalRandom.current();
        }

        @Override
        public void nextTuple() {
            String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
                "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
            String sentence = sentences[_rand.nextInt(sentences.length)];
            _collector.emit(new Values(sentence), sentence);
        }

        @Override
        public void ack(Object id) {
            //Ignored
        }

        @Override
        public void fail(Object id) {
            _collector.emit(new Values(id), id);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String sentence = tuple.getString(0);
            for (String word: sentence.split("\\s+")) {
                collector.emit(new Values(word, 1));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void printMetrics(Nimbus.Client client, String name) throws Exception {
        ClusterSummary summary = client.getClusterInfo();
        String id = null;
        for (TopologySummary ts: summary.get_topologies()) {
            if (name.equals(ts.get_name())) {
                id = ts.get_id();
            }
        }
        if (id == null) {
            throw new Exception("Could not find a topology named "+name);
        }
        TopologyInfo info = client.getTopologyInfo(id);
        int uptime = info.get_uptime_secs();
        long acked = 0;
        double weightedAvgTotal = 0.0;
        for (ExecutorSummary exec: info.get_executors()) {
            if ("spout".equals(exec.get_component_id())) {
                SpoutStats stats = exec.get_stats().get_specific().get_spout();
                Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
                Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
                for (String key: ackedMap.keySet()) {
                    long ackVal = ackedMap.get(key);
                    double latVal = avgLatMap.get(key) * ackVal;
                    acked += ackVal;
                    weightedAvgTotal += latVal;
                }
            }
        }
        double avgLatency = weightedAvgTotal/acked;
        System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime));
    }

    public static void kill(Nimbus.Client client, String name) throws Exception {
        KillOptions opts = new KillOptions();
        opts.set_wait_secs(0);
        client.killTopologyWithOpts(name, opts);
    }

    public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new FastRandomSentenceSpout(), 4);

        builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class);

        String name = "wc-test";
        if (args != null && args.length > 0) {
            name = args[0];
        }

        conf.setNumWorkers(1);
        StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());

        Map clusterConf = Utils.readStormConfig();
        clusterConf.putAll(Utils.readCommandLineOpts());
        Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();

        //Sleep for 5 mins
        for (int i = 0; i < 10; i++) {
            Thread.sleep(30 * 1000);
            printMetrics(client, name);
        }
        kill(client, name);
    }
}
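
For quick local experimentation, the same spout and bolts can also be exercised in-process instead of being submitted through Nimbus. The sketch below is not part of this commit; it assumes the standard backtype.storm.LocalCluster API, and the class name FastWordCountLocal is illustrative:

package storm.starter;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class FastWordCountLocal {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new FastWordCountTopology.FastRandomSentenceSpout(), 1);
        builder.setBolt("split", new FastWordCountTopology.SplitSentence(), 1).shuffleGrouping("spout");
        builder.setBolt("count", new FastWordCountTopology.WordCount(), 1).fieldsGrouping("split", new Fields("word"));

        // Run everything inside this JVM instead of submitting to a cluster.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("wc-local", new Config(), builder.createTopology());
        Thread.sleep(30 * 1000); // let it run briefly
        cluster.killTopology("wc-local");
        cluster.shutdown();
    }
}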
4 changes: 2 additions & 2 deletions pom.xml
@@ -191,7 +191,7 @@
<snakeyaml.version>1.11</snakeyaml.version>
<httpclient.version>4.3.3</httpclient.version>
<clojure.tools.cli.version>0.2.4</clojure.tools.cli.version>
-<disruptor.version>2.10.4</disruptor.version>
+<disruptor.version>3.3.2</disruptor.version>
<jgrapht.version>0.9.0</jgrapht.version>
<guava.version>16.0.1</guava.version>
<netty.version>3.9.0.Final</netty.version>
@@ -549,7 +549,7 @@
<version>${clojure.tools.cli.version}</version>
</dependency>
<dependency>
-<groupId>com.googlecode.disruptor</groupId>
+<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>
</dependency>
2 changes: 1 addition & 1 deletion storm-core/pom.xml
@@ -205,7 +205,7 @@
<scope>compile</scope>
</dependency>
<dependency>
-<groupId>com.googlecode.disruptor</groupId>
+<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<dependency>
3 changes: 1 addition & 2 deletions storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -225,8 +225,7 @@
(str "executor" executor-id "-send-queue")
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-:claim-strategy :single-threaded
-:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
+:producer-type :single-threaded)
]
(recursive-map
:worker worker
6 changes: 2 additions & 4 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -190,8 +190,7 @@
;; TODO: this depends on the type of executor
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
-(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]))
+(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS))]))
(into {})
))

@@ -232,8 +231,7 @@
(let [assignment-versions (atom {})
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
-(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-:wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
+(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors)

receive-queue-map (->> executor-receive-queue-map
Expand Down
32 changes: 7 additions & 25 deletions storm-core/src/clj/backtype/storm/disruptor.clj
@@ -16,39 +16,21 @@

(ns backtype.storm.disruptor
(:import [backtype.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback])
-  (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy
-            BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
-            BusySpinWaitStrategy])
+  (:import [com.lmax.disruptor.dsl ProducerType])
(:require [clojure [string :as str]])
(:require [clojure [set :as set]])
(:use [clojure walk])
(:use [backtype.storm util log]))

-(def CLAIM-STRATEGY
-  {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size)))
-   :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))})
+(def PRODUCER-TYPE
+  {:multi-threaded ProducerType/MULTI
+   :single-threaded ProducerType/SINGLE})

-(def WAIT-STRATEGY
-  {:block (fn [] (BlockingWaitStrategy.))
-   :yield (fn [] (YieldingWaitStrategy.))
-   :sleep (fn [] (SleepingWaitStrategy.))
-   :spin (fn [] (BusySpinWaitStrategy.))})
-
-(defn- mk-wait-strategy
-  [spec]
-  (if (keyword? spec)
-    ((WAIT-STRATEGY spec))
-    (-> (str spec) new-instance)))

;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue.
;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
;; unblocking the consumer
(defnk disruptor-queue
-  [^String queue-name buffer-size timeout :claim-strategy :multi-threaded :wait-strategy :block]
+  [^String queue-name buffer-size timeout :producer-type :multi-threaded]
  (DisruptorQueue. queue-name
-                   ((CLAIM-STRATEGY claim-strategy) buffer-size)
-                   (mk-wait-strategy wait-strategy) timeout))
+                   (PRODUCER-TYPE producer-type) buffer-size
+                   timeout))

(defn clojure-handler
[afn]
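For reference, the Disruptor 2.x claim-strategy and wait-strategy classes removed above no longer exist in 3.x: a queue wrapper now supplies a ProducerType and a wait strategy when the ring buffer is created. The sketch below only illustrates that 3.3.2 API shape; the class and field names are illustrative and this is not the actual backtype.storm.utils.DisruptorQueue implementation. It assumes a timeout-capable blocking wait, since the Clojure wrapper above keeps its timeout argument:

import java.util.concurrent.TimeUnit;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

public class RingBufferSketch {
    // Simple mutable holder used as the pre-allocated ring-buffer event.
    static class Slot {
        Object value;
    }

    static RingBuffer<Slot> create(ProducerType producerType, int bufferSize, long timeoutMillis) {
        EventFactory<Slot> factory = new EventFactory<Slot>() {
            @Override
            public Slot newInstance() {
                return new Slot();
            }
        };
        // In Disruptor 3.x the producer type and the wait strategy are supplied when the
        // ring buffer is created, replacing the 2.x claim-strategy/wait-strategy objects.
        // bufferSize must be a power of two.
        return RingBuffer.create(producerType, factory, bufferSize,
                new TimeoutBlockingWaitStrategy(timeoutMillis, TimeUnit.MILLISECONDS));
    }
}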
11 changes: 6 additions & 5 deletions storm-core/src/jvm/backtype/storm/Config.java
@@ -1479,10 +1479,12 @@ public class Config extends HashMap<String, Object> {
@isInteger
public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";

-/**
- * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
- * vs. throughput
- */
+/**
+ * @deprecated this is no longer supported
+ * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency
+ * vs. throughput
+ */
+@Deprecated
@isString
public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy";

@@ -1658,7 +1660,6 @@
* vs. CPU usage
*/
@isInteger
-@isPositiveNumber
@NotNull
public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis";

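With topology.disruptor.wait.strategy deprecated above, the timeout is the remaining per-topology knob for this latency versus CPU trade-off. A minimal sketch of setting it from topology code; the value is illustrative, not a recommendation:

import backtype.storm.Config;

public class DisruptorTimeoutExample {
    public static Config tunedConf() {
        Config conf = new Config();
        // Maximum time a consumer blocks on an empty disruptor queue before re-checking;
        // lower values reduce worst-case latency at the cost of extra wake-ups (CPU).
        conf.put(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS, 100);
        return conf;
    }
}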
(The remaining 3 changed files are not shown above.)
