From 8426c3bfbbe4d292ed1b8a35ab42eb7a6a243e1a Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Fri, 26 Aug 2016 12:23:40 +0900 Subject: [PATCH] Bump Storm version to 1.0.2, current latest version * Storm 1.0.0+ provides various new features and great performance improvement * Storm 1.0.2 is recent version of Storm 1.x line which fixes various critical bugs --- bin/storm_env.sh | 2 +- build.sbt | 20 +++--- .../data/squall/components/Component.java | 4 +- .../components/DataSourceComponent.java | 4 +- .../squall/components/EquiJoinComponent.java | 4 +- .../squall/components/OperatorComponent.java | 4 +- .../hyper_cube/HyperCubeJoinComponent.java | 4 +- .../theta/AdaptiveThetaJoinComponent.java | 4 +- .../components/theta/ThetaJoinComponent.java | 4 +- .../connectors/hdfs/HDFSmaterializer.java | 6 +- .../squall/ewh/components/DummyComponent.java | 4 +- .../components/EWHSampleMatrixComponent.java | 4 +- .../EquiDepthHistogramComponent.java | 4 +- .../OkcanSampleMatrixComponent.java | 4 +- .../ewh/main/PullStatisticCollector.java | 2 +- .../SampleAsideAndForwardOperator.java | 6 +- .../ewh/storm_components/D2CombinerBolt.java | 20 +++--- .../ewh/storm_components/DummyBolt.java | 18 ++--- .../storm_components/EWHSampleMatrixBolt.java | 20 +++--- .../EquiDepthHistogramBolt.java | 18 ++--- .../OkcanSampleMatrixBolt.java | 18 ++--- .../S1ReservoirGenerator.java | 20 +++--- .../storm_components/S1ReservoirMerge.java | 20 +++--- .../RangeFilteredMulticastStreamGrouping.java | 4 +- .../RangeMulticastStreamGrouping.java | 6 +- .../java/ch/epfl/data/squall/main/Main.java | 4 +- .../data/squall/query_plans/QueryBuilder.java | 4 +- .../storm_components/StormBoltComponent.java | 16 ++--- .../storm_components/StormComponent.java | 2 +- .../storm_components/StormDataSource.java | 12 ++-- .../squall/storm_components/StormDstJoin.java | 8 +-- .../StormDstTupleStorageJoin.java | 8 +-- .../StormJoinerBoltComponent.java | 6 +- .../storm_components/StormOperator.java | 8 +-- .../storm_components/StormSpoutComponent.java | 14 ++-- .../storm_components/StormSrcHarmonizer.java | 20 +++--- .../squall/storm_components/StormSrcJoin.java | 4 +- .../storm_components/StormSrcStorage.java | 14 ++-- .../hyper_cube/StormHyperCubeJoin.java | 8 +-- .../stream_grouping/HyperCubeGrouping.java | 6 +- .../stream_grouping/BatchStreamGrouping.java | 10 +-- .../stream_grouping/HashStreamGrouping.java | 10 +-- .../ShuffleStreamGrouping.java | 6 +- .../synchronization/TopologyKiller.java | 26 +++---- .../theta/StormThetaJoin.java | 8 +-- .../ContentInsensitiveThetaJoinGrouping.java | 6 +- .../ContentSensitiveThetaJoinGrouping.java | 6 +- .../ThetaJoinerAdaptiveAdvisedEpochs.java | 18 ++--- .../ThetaReshufflerAdvisedEpochs.java | 18 ++--- ...ataMigrationJoinerToReshufflerMapping.java | 6 +- .../ThetaJoinAdaptiveMapping.java | 6 +- .../data/squall/utilities/MyUtilities.java | 20 +++--- .../data/squall/utilities/SquallContext.java | 14 ++-- .../SquallSerializationDelegate.java | 23 +++---- .../data/squall/utilities/StormWrapper.java | 54 +++++++-------- .../squall/utilities/SystemParameters.java | 4 +- .../thetajoin/dynamic/ThetaJoinUtilities.java | 2 +- .../WindowSemanticsManager.java | 2 +- .../src/test/resources/log4j2-test.xml | 14 ++++ .../src/test/resources/logback-test.xml | 12 ---- squall-core/src/test/scala/TestSuite.scala | 67 ++++++++++--------- .../ch/epfl/data/squall/api/scala/REPL.scala | 27 +++----- .../operators/predicates/ScalaPredicate.scala | 8 +-- .../DistributionSignalSpout.java | 10 +-- .../HarmonizerSignalSpout.java | 8 +-- .../SignaledDataSourceComponent.java | 4 +- .../StormSynchronizedSpoutComponent.java | 12 ++-- .../SynchronizedStormDataSource.java | 12 ++-- .../storm/BaseSignalBolt.java | 6 +- .../storm/BaseSignalSpout.java | 6 +- .../storm/StormSignalConnection.java | 2 +- 71 files changed, 393 insertions(+), 392 deletions(-) create mode 100644 squall-core/src/test/resources/log4j2-test.xml delete mode 100644 squall-core/src/test/resources/logback-test.xml diff --git a/bin/storm_env.sh b/bin/storm_env.sh index c7b4b5ba..255b50ca 100755 --- a/bin/storm_env.sh +++ b/bin/storm_env.sh @@ -1,7 +1,7 @@ #!/bin/bash #STORMNAME=storm-0.9.2-incubating -STORMNAME=apache-storm-0.9.4 +STORMNAME=apache-storm-1.0.2 # DBTOASTER installation folder. Only required if QueryPlan uses DBToasterJoinComponent DBTOASTER_HOME=../target/dbtoaster diff --git a/build.sbt b/build.sbt index b3b5b468..4e8a4122 100644 --- a/build.sbt +++ b/build.sbt @@ -80,7 +80,10 @@ lazy val squall = (project in file("squall-core")). unmanagedBase := baseDirectory.value / "../contrib", // We need to add Clojars as a resolver, as Storm depends on some // libraries from there. - resolvers += "clojars" at "https://clojars.org/repo", + resolvers ++= Seq( + "clojars" at "https://clojars.org/repo", + "confluent" at "http://packages.confluent.io/maven" + ), libraryDependencies ++= Seq( // Versions that were changed when migrating from Lein to sbt are // commented just before the library @@ -89,16 +92,17 @@ lazy val squall = (project in file("squall-core")). "net.sf.opencsv" % "opencsv" % "2.3", // bdb-je: 5.0.84 -> 5.0.73 //"com.sleepycat" % "je" % "5.0.73", - // storm-core: 0.9.2-incubating -> 0.9.4 - "org.apache.storm" % "storm-core" % "0.9.4" % "provided", + "org.apache.storm" % "storm-core" % "1.0.2" % "provided", "org.slf4j" % "log4j-over-slf4j" % "1.7.12", //"io.dropwizard" % "dropwizard-metrics" % "0.8.1", - //"org.apache.storm" % "storm-starter" % "0.9.4", + //"org.apache.storm" % "storm-starter" % "1.0.2", "junit" % "junit" % "4.12" % Test, "com.novocode" % "junit-interface" % "0.11" % Test, - "org.apache.hadoop" % "hadoop-client" % "2.2.0" exclude("org.slf4j", "slf4j-log4j12"), - "org.apache.hadoop" % "hadoop-hdfs" % "2.2.0" exclude("org.slf4j", "slf4j-log4j12"), - "org.apache.storm" % "storm-hdfs" % "0.10.0-beta1" + // storm-hdfs 1.0.2 includes hadoop 2.6.1 + // if you want to change hadoop version, please uncomment below dependencies and change version + "org.apache.storm" % "storm-hdfs" % "1.0.2" + //"org.apache.hadoop" % "hadoop-client" % "2.2.0" exclude("org.slf4j", "slf4j-log4j12"), + //"org.apache.hadoop" % "hadoop-hdfs" % "2.2.0" exclude("org.slf4j", "slf4j-log4j12"), //"com.github.ptgoetz" % "storm-signals" % "0.2.0", //"com.netflix.curator" % "curator-framework" % "1.0.1" ), @@ -183,7 +187,7 @@ lazy val functional = (project in file("squall-functional")). (test in Test).value }, name := "squall-frontend", - libraryDependencies += "org.apache.storm" % "storm-core" % "0.9.4" % "provided", + libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided", libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _), libraryDependencies += "org.scalatest" % "scalatest_2.11" % "2.2.4" % Test, // Interactive mode diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/Component.java b/squall-core/src/main/java/ch/epfl/data/squall/components/Component.java index aa1ec4c0..f61b49f6 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/Component.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/Component.java @@ -22,8 +22,8 @@ import java.io.Serializable; import java.util.List; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.expressions.ValueExpression; import ch.epfl.data.squall.operators.Operator; import ch.epfl.data.squall.storm_components.StormEmitter; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/DataSourceComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/components/DataSourceComponent.java index f74bdfa8..5c94835e 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/DataSourceComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/DataSourceComponent.java @@ -25,8 +25,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.StormDataSource; import ch.epfl.data.squall.storm_components.synchronization.TopologyKiller; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/EquiJoinComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/components/EquiJoinComponent.java index 07a4c826..6fb8013b 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/EquiJoinComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/EquiJoinComponent.java @@ -23,8 +23,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.operators.ProjectOperator; import ch.epfl.data.squall.predicates.Predicate; import ch.epfl.data.squall.storage.AggregationStore; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/OperatorComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/components/OperatorComponent.java index 3246ad1b..d1fc6579 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/OperatorComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/OperatorComponent.java @@ -24,8 +24,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.operators.Operator; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.StormOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/hyper_cube/HyperCubeJoinComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/components/hyper_cube/HyperCubeJoinComponent.java index 7d9c36e2..8eae612a 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/hyper_cube/HyperCubeJoinComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/hyper_cube/HyperCubeJoinComponent.java @@ -28,8 +28,8 @@ import ch.epfl.data.squall.types.Type; import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.AbstractJoinerComponent; import ch.epfl.data.squall.storm_components.synchronization.TopologyKiller; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/theta/AdaptiveThetaJoinComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/components/theta/AdaptiveThetaJoinComponent.java index e40156ef..8a3be47b 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/theta/AdaptiveThetaJoinComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/theta/AdaptiveThetaJoinComponent.java @@ -33,8 +33,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.AbstractJoinerComponent; import ch.epfl.data.squall.storm_components.StormComponent; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/components/theta/ThetaJoinComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/components/theta/ThetaJoinComponent.java index b0b6c8f7..bfa85c7b 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/components/theta/ThetaJoinComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/components/theta/ThetaJoinComponent.java @@ -23,8 +23,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.AbstractJoinerComponent; import ch.epfl.data.squall.predicates.Predicate; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/connectors/hdfs/HDFSmaterializer.java b/squall-core/src/main/java/ch/epfl/data/squall/connectors/hdfs/HDFSmaterializer.java index 2da2accd..b03c61c7 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/connectors/hdfs/HDFSmaterializer.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/connectors/hdfs/HDFSmaterializer.java @@ -35,9 +35,9 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.expressions.ValueExpression; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/DummyComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/DummyComponent.java index 7db3c0ee..52979ea5 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/DummyComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/DummyComponent.java @@ -26,8 +26,8 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.ewh.storm_components.DummyBolt; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EWHSampleMatrixComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EWHSampleMatrixComponent.java index 360176a7..19e5c658 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EWHSampleMatrixComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EWHSampleMatrixComponent.java @@ -25,8 +25,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.ewh.storm_components.D2CombinerBolt; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EquiDepthHistogramComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EquiDepthHistogramComponent.java index d0bf94eb..1e99523f 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EquiDepthHistogramComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/EquiDepthHistogramComponent.java @@ -24,8 +24,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.components.EquiJoinComponent; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/OkcanSampleMatrixComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/OkcanSampleMatrixComponent.java index 4dd41d45..4ddcad03 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/OkcanSampleMatrixComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/components/OkcanSampleMatrixComponent.java @@ -24,8 +24,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.components.EquiJoinComponent; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/main/PullStatisticCollector.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/main/PullStatisticCollector.java index 986c1129..235c2156 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/main/PullStatisticCollector.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/main/PullStatisticCollector.java @@ -31,7 +31,7 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; +import org.apache.storm.Config; import ch.epfl.data.squall.ewh.data_structures.JoinMatrix; import ch.epfl.data.squall.ewh.data_structures.Region; import ch.epfl.data.squall.ewh.data_structures.UJMPAdapterByteMatrix; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/operators/SampleAsideAndForwardOperator.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/operators/SampleAsideAndForwardOperator.java index ee678404..1e0f58b3 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/operators/SampleAsideAndForwardOperator.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/operators/SampleAsideAndForwardOperator.java @@ -27,9 +27,9 @@ import org.apache.log4j.Logger; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.operators.Operator; import ch.epfl.data.squall.operators.OneToOneOperator; import ch.epfl.data.squall.utilities.MyUtilities; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/D2CombinerBolt.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/D2CombinerBolt.java index 53b21183..650886e1 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/D2CombinerBolt.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/D2CombinerBolt.java @@ -28,16 +28,16 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.predicates.ComparisonPredicate; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.StormEmitter; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/DummyBolt.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/DummyBolt.java index ab31fe1a..5db50f80 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/DummyBolt.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/DummyBolt.java @@ -25,15 +25,15 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.StormEmitter; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EWHSampleMatrixBolt.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EWHSampleMatrixBolt.java index a9b7041e..79c70099 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EWHSampleMatrixBolt.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EWHSampleMatrixBolt.java @@ -30,16 +30,16 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.ewh.algorithms.BSPAlgorithm; import ch.epfl.data.squall.ewh.algorithms.InputOutputShallowCoarsener; import ch.epfl.data.squall.ewh.algorithms.ShallowCoarsener; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EquiDepthHistogramBolt.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EquiDepthHistogramBolt.java index f7c33b8f..c88e239b 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EquiDepthHistogramBolt.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/EquiDepthHistogramBolt.java @@ -26,15 +26,15 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.predicates.ComparisonPredicate; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.StormEmitter; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/OkcanSampleMatrixBolt.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/OkcanSampleMatrixBolt.java index dec74751..6e5af334 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/OkcanSampleMatrixBolt.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/OkcanSampleMatrixBolt.java @@ -26,15 +26,15 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.ewh.algorithms.OkcanCandidateInputAlgorithm; import ch.epfl.data.squall.ewh.algorithms.OkcanCandidateOutputAlgorithm; import ch.epfl.data.squall.ewh.algorithms.TilingAlgorithm; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirGenerator.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirGenerator.java index 1a53dae1..b49a9980 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirGenerator.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirGenerator.java @@ -31,16 +31,16 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.ewh.data_structures.FixedSizePriorityQueue; import ch.epfl.data.squall.ewh.data_structures.KeyPriorityProbability; import ch.epfl.data.squall.predicates.ComparisonPredicate; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirMerge.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirMerge.java index b7660e5f..c29c2e67 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirMerge.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/S1ReservoirMerge.java @@ -28,16 +28,16 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.ewh.data_structures.FixedSizePriorityQueue; import ch.epfl.data.squall.ewh.data_structures.KeyPriorityProbability; import ch.epfl.data.squall.predicates.ComparisonPredicate; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeFilteredMulticastStreamGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeFilteredMulticastStreamGrouping.java index 8890c1a9..97864e35 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeFilteredMulticastStreamGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeFilteredMulticastStreamGrouping.java @@ -26,8 +26,8 @@ import org.apache.log4j.Logger; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.predicates.ComparisonPredicate; import ch.epfl.data.squall.types.NumericType; import ch.epfl.data.squall.utilities.MyUtilities; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeMulticastStreamGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeMulticastStreamGrouping.java index ad317c60..2fe2c365 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeMulticastStreamGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/ewh/storm_components/stream_grouping/RangeMulticastStreamGrouping.java @@ -27,9 +27,9 @@ import org.apache.log4j.Logger; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.predicates.ComparisonPredicate; import ch.epfl.data.squall.types.NumericType; import ch.epfl.data.squall.utilities.DeepCopy; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/main/Main.java b/squall-core/src/main/java/ch/epfl/data/squall/main/Main.java index 07049317..be72224b 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/main/Main.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/main/Main.java @@ -27,8 +27,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.theta.AdaptiveThetaJoinComponent; import ch.epfl.data.squall.ewh.components.DummyComponent; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/query_plans/QueryBuilder.java b/squall-core/src/main/java/ch/epfl/data/squall/query_plans/QueryBuilder.java index ba8c8957..559ca841 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/query_plans/QueryBuilder.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/query_plans/QueryBuilder.java @@ -33,8 +33,8 @@ import ch.epfl.data.squall.ewh.components.DummyComponent; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.synchronization.TopologyKiller; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.Config; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.Config; import ch.epfl.data.squall.utilities.SystemParameters; import ch.epfl.data.squall.utilities.SquallContext; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormBoltComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormBoltComponent.java index d3663e5a..984ce30d 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormBoltComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormBoltComponent.java @@ -26,14 +26,14 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.ewh.main.PushStatisticCollector; import ch.epfl.data.squall.ewh.operators.SampleAsideAndForwardOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormComponent.java index a3040433..554142b7 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormComponent.java @@ -21,7 +21,7 @@ import java.util.List; -import backtype.storm.tuple.Tuple; +import org.apache.storm.tuple.Tuple; public interface StormComponent { diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDataSource.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDataSource.java index 3854f02a..b44a14aa 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDataSource.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDataSource.java @@ -29,12 +29,12 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.operators.AggregateOperator; import ch.epfl.data.squall.operators.ChainOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstJoin.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstJoin.java index 0ad5b446..26f1232f 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstJoin.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstJoin.java @@ -28,10 +28,10 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.operators.AggregateOperator; import ch.epfl.data.squall.operators.ChainOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstTupleStorageJoin.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstTupleStorageJoin.java index f5817157..1a38f0b1 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstTupleStorageJoin.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormDstTupleStorageJoin.java @@ -23,10 +23,10 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.predicates.Predicate; import ch.epfl.data.squall.storage.TupleStorage; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormJoinerBoltComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormJoinerBoltComponent.java index d922a8c2..9623a791 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormJoinerBoltComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormJoinerBoltComponent.java @@ -34,9 +34,9 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.operators.AggregateOperator; import ch.epfl.data.squall.operators.ChainOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormOperator.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormOperator.java index b0ff2383..d353b415 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormOperator.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormOperator.java @@ -25,10 +25,10 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.operators.AggregateOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSpoutComponent.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSpoutComponent.java index 830ea01c..39582c25 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSpoutComponent.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSpoutComponent.java @@ -25,13 +25,13 @@ import org.apache.log4j.Logger; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.ewh.operators.SampleAsideAndForwardOperator; import ch.epfl.data.squall.expressions.ValueExpression; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcHarmonizer.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcHarmonizer.java index 6960255b..b7af16d3 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcHarmonizer.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcHarmonizer.java @@ -24,16 +24,16 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.storm_components.synchronization.TopologyKiller; import ch.epfl.data.squall.utilities.MyUtilities; import ch.epfl.data.squall.utilities.SystemParameters; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcJoin.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcJoin.java index 01412370..60277298 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcJoin.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcJoin.java @@ -24,8 +24,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.expressions.ValueExpression; import ch.epfl.data.squall.operators.ProjectOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcStorage.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcStorage.java index 6f239913..01d684bf 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcStorage.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/StormSrcStorage.java @@ -25,13 +25,13 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.operators.AggregateOperator; import ch.epfl.data.squall.operators.ChainOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/StormHyperCubeJoin.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/StormHyperCubeJoin.java index 00b92269..97495a91 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/StormHyperCubeJoin.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/StormHyperCubeJoin.java @@ -39,10 +39,10 @@ import gnu.trove.list.array.TIntArrayList; import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.predicates.Predicate; import ch.epfl.data.squall.storage.TupleStorage; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/stream_grouping/HyperCubeGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/stream_grouping/HyperCubeGrouping.java index 87a9bb26..89fd8d43 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/stream_grouping/HyperCubeGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/hyper_cube/stream_grouping/HyperCubeGrouping.java @@ -1,8 +1,8 @@ package ch.epfl.data.squall.storm_components.hyper_cube.stream_grouping; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.thetajoin.matrix_assignment.HyperCubeAssignment; import ch.epfl.data.squall.utilities.MyUtilities; import org.apache.log4j.Logger; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/BatchStreamGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/BatchStreamGrouping.java index 14c1ac13..17313526 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/BatchStreamGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/BatchStreamGrouping.java @@ -23,16 +23,16 @@ import java.util.List; import java.util.Map; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.utilities.MyUtilities; import ch.epfl.data.squall.utilities.SystemParameters; public class BatchStreamGrouping implements CustomStreamGrouping { - /** - * + /** + * */ private static final long serialVersionUID = 1L; // the number of tasks on the level this stream grouping is sending to diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/HashStreamGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/HashStreamGrouping.java index c41cf3c6..b979a678 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/HashStreamGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/HashStreamGrouping.java @@ -23,9 +23,9 @@ import java.util.List; import java.util.Map; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.utilities.MyUtilities; /* @@ -36,8 +36,8 @@ */ public class HashStreamGrouping implements CustomStreamGrouping { - /** - * + /** + * */ private static final long serialVersionUID = 1L; // the number of tasks on the level this stream grouping is sending to diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/ShuffleStreamGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/ShuffleStreamGrouping.java index 57704fed..c02df898 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/ShuffleStreamGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/stream_grouping/ShuffleStreamGrouping.java @@ -24,9 +24,9 @@ import java.util.Map; import java.util.Random; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.utilities.MyUtilities; /* diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/synchronization/TopologyKiller.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/synchronization/TopologyKiller.java index dde08ba7..8d0fd835 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/synchronization/TopologyKiller.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/synchronization/TopologyKiller.java @@ -24,15 +24,15 @@ import org.apache.log4j.Logger; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.utilities.StormWrapper; import ch.epfl.data.squall.utilities.SystemParameters; @@ -42,8 +42,8 @@ * Otherwise, we receive from the Spouts when all of sent tuples are fully processed and acked */ public class TopologyKiller extends BaseRichBolt implements StormComponent { - /** - * + /** + * */ private static final long serialVersionUID = 1L; @@ -173,9 +173,9 @@ public void tupleSend(List tuple, Tuple stormTupleRcv, throw new UnsupportedOperationException( "These methods are not ment to be invoked for synchronizationStormComponents"); } - + public int getNumberRegisteredTasks() { - return _numberRegisteredTasks; + return _numberRegisteredTasks; } } diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/StormThetaJoin.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/StormThetaJoin.java index e9975ad6..dec00b4e 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/StormThetaJoin.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/StormThetaJoin.java @@ -23,10 +23,10 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Tuple; +import org.apache.storm.Config; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.predicates.Predicate; import ch.epfl.data.squall.storage.TupleStorage; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentInsensitiveThetaJoinGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentInsensitiveThetaJoinGrouping.java index 1db3c8b0..360fe390 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentInsensitiveThetaJoinGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentInsensitiveThetaJoinGrouping.java @@ -25,9 +25,9 @@ import org.apache.log4j.Logger; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.thetajoin.matrix_assignment.MatrixAssignment; import ch.epfl.data.squall.thetajoin.matrix_assignment.MatrixAssignment.Dimension; import ch.epfl.data.squall.utilities.MyUtilities; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentSensitiveThetaJoinGrouping.java b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentSensitiveThetaJoinGrouping.java index e6347bc3..8d7f98f8 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentSensitiveThetaJoinGrouping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/storm_components/theta/stream_grouping/ContentSensitiveThetaJoinGrouping.java @@ -25,9 +25,9 @@ import org.apache.log4j.Logger; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.thetajoin.matrix_assignment.MatrixAssignment; import ch.epfl.data.squall.thetajoin.matrix_assignment.MatrixAssignment.Dimension; import ch.epfl.data.squall.types.Type; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaJoinerAdaptiveAdvisedEpochs.java b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaJoinerAdaptiveAdvisedEpochs.java index 992c9c17..7cb443ac 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaJoinerAdaptiveAdvisedEpochs.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaJoinerAdaptiveAdvisedEpochs.java @@ -36,15 +36,15 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.operators.AggregateOperator; import ch.epfl.data.squall.operators.ChainOperator; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaReshufflerAdvisedEpochs.java b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaReshufflerAdvisedEpochs.java index c0cdd07c..59d77e3b 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaReshufflerAdvisedEpochs.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_component/ThetaReshufflerAdvisedEpochs.java @@ -31,15 +31,15 @@ import org.apache.log4j.Logger; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.storm_components.StormEmitter; import ch.epfl.data.squall.thetajoin.adaptive.advisor.Action; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaDataMigrationJoinerToReshufflerMapping.java b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaDataMigrationJoinerToReshufflerMapping.java index 2a3b2363..6273f5a6 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaDataMigrationJoinerToReshufflerMapping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaDataMigrationJoinerToReshufflerMapping.java @@ -24,9 +24,9 @@ import java.util.Map; import java.util.Random; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.utilities.SystemParameters; public class ThetaDataMigrationJoinerToReshufflerMapping implements diff --git a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaJoinAdaptiveMapping.java b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaJoinAdaptiveMapping.java index eb9a434c..bd75bd12 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaJoinAdaptiveMapping.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/thetajoin/adaptive/storm_matrix_mapping/ThetaJoinAdaptiveMapping.java @@ -24,9 +24,9 @@ import java.util.Map; import java.util.Random; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.task.WorkerTopologyContext; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.WorkerTopologyContext; import ch.epfl.data.squall.utilities.MyUtilities; public class ThetaJoinAdaptiveMapping implements CustomStreamGrouping { diff --git a/squall-core/src/main/java/ch/epfl/data/squall/utilities/MyUtilities.java b/squall-core/src/main/java/ch/epfl/data/squall/utilities/MyUtilities.java index 90bdb150..03d5f7c1 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/utilities/MyUtilities.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/utilities/MyUtilities.java @@ -39,16 +39,16 @@ import ch.epfl.data.squall.thetajoin.matrix_assignment.HyperCubeAssignment; import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.Constants; -import backtype.storm.generated.Grouping; -import backtype.storm.grouping.CustomStreamGrouping; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.InputDeclarer; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.generated.Grouping; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.InputDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.components.theta.ThetaJoinComponent; diff --git a/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallContext.java b/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallContext.java index 3d03830a..f0504fcb 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallContext.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallContext.java @@ -35,15 +35,15 @@ import ch.epfl.data.squall.utilities.ReaderProvider; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.operators.StoreOperator; -import backtype.storm.generated.Nimbus.Client; -import backtype.storm.generated.ClusterSummary; -import backtype.storm.generated.TopologySummary; -import backtype.storm.generated.TopologyInfo; -import backtype.storm.generated.NotAliveException; +import org.apache.storm.generated.Nimbus.Client; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.NotAliveException; import org.apache.log4j.Logger; -import backtype.storm.Config; +import org.apache.storm.Config; /* This class represents a context of execution. It provides a unified * interface for creating and submitting plans or running queries. @@ -58,7 +58,7 @@ public class SquallContext { public SquallContext() { this(new Config()); - Map stormConf = backtype.storm.utils.Utils.readStormConfig(); + Map stormConf = org.apache.storm.utils.Utils.readStormConfig(); conf.putAll(stormConf); // Load default values diff --git a/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallSerializationDelegate.java b/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallSerializationDelegate.java index e58650d5..b834f6d8 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallSerializationDelegate.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/utilities/SquallSerializationDelegate.java @@ -19,22 +19,19 @@ package ch.epfl.data.squall.utilities; -import backtype.storm.serialization.DefaultSerializationDelegate; - -import java.net.URLClassLoader; -import java.net.MalformedURLException; -import java.net.URL; import org.apache.commons.io.input.ClassLoaderObjectInputStream; -import java.io.File; +import org.apache.log4j.Logger; +import org.apache.storm.serialization.DefaultSerializationDelegate; + import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.util.Map; -import org.apache.log4j.Logger; - /* * Used in local mode to deserialize for the REPL and DBToaster @@ -64,9 +61,9 @@ public void prepare(Map stormConf) { } @Override - public Object deserialize(byte[] bytes) { + public T deserialize(byte[] bytes, Class clazz) { try { - return super.deserialize(bytes); + return super.deserialize(bytes, clazz); } catch (RuntimeException e) { try { if (classdir == null) throw e; @@ -79,7 +76,7 @@ public Object deserialize(byte[] bytes) { ObjectInputStream ois = new ClassLoaderObjectInputStream(classloader, bis); Object ret = ois.readObject(); ois.close(); - return ret; + return (T) ret; } catch (ClassNotFoundException error) { throw new RuntimeException(error); } catch (IOException error) { diff --git a/squall-core/src/main/java/ch/epfl/data/squall/utilities/StormWrapper.java b/squall-core/src/main/java/ch/epfl/data/squall/utilities/StormWrapper.java index 0d94ab5e..2c7779fa 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/utilities/StormWrapper.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/utilities/StormWrapper.java @@ -26,29 +26,29 @@ import java.util.concurrent.Semaphore; import org.apache.log4j.Logger; -import org.apache.thrift7.TException; -import org.apache.thrift7.transport.TTransportException; - -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.StormSubmitter; -import backtype.storm.generated.AlreadyAliveException; -import backtype.storm.generated.ClusterSummary; -import backtype.storm.generated.ErrorInfo; -import backtype.storm.generated.ExecutorInfo; -import backtype.storm.generated.ExecutorSpecificStats; -import backtype.storm.generated.ExecutorStats; -import backtype.storm.generated.ExecutorSummary; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Nimbus.Client; -import backtype.storm.generated.NotAliveException; -import backtype.storm.generated.TopologyInfo; -import backtype.storm.generated.TopologySummary; -import backtype.storm.generated.StormTopology; -import backtype.storm.generated.KillOptions; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.utils.NimbusClient; -import backtype.storm.utils.Utils; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.ClusterSummary; +import org.apache.storm.generated.ErrorInfo; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorSpecificStats; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.ExecutorSummary; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Nimbus.Client; +import org.apache.storm.generated.NotAliveException; +import org.apache.storm.generated.TopologyInfo; +import org.apache.storm.generated.TopologySummary; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.generated.KillOptions; +import org.apache.storm.thrift.TException; +import org.apache.storm.thrift.transport.TTransportException; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.NimbusClient; +import org.apache.storm.utils.Utils; import ch.epfl.data.squall.storage.BasicStore; import ch.epfl.data.squall.query_plans.QueryBuilder; @@ -89,7 +89,7 @@ private static Client getNimbusStub(Map conf) { try { Map securityMap = new HashMap(); securityMap.put("storm.thrift.transport", - "backtype.storm.security.auth.SimpleTransportPlugin"); + "org.apache.storm.security.auth.SimpleTransportPlugin"); nimbus = new NimbusClient(securityMap, nimbusHost, nimbusThriftPort); } catch (TTransportException e) { @@ -365,12 +365,12 @@ public static void writeStormStats(Map conf) { final String strStats = sb.toString(); LOG.info(strStats); - } catch (final TException ex) { - LOG.info("writeStats:" + MyUtilities.getStackTrace(ex)); } catch (final NotAliveException ex) { LOG.info(MyUtilities.getStackTrace(ex)); + } catch (final TException ex) { + LOG.info("writeStats:" + MyUtilities.getStackTrace(ex)); } - } + } public static ClusterSummary getClusterInfo(boolean local, Map conf) { if (local) { diff --git a/squall-core/src/main/java/ch/epfl/data/squall/utilities/SystemParameters.java b/squall-core/src/main/java/ch/epfl/data/squall/utilities/SystemParameters.java index d821a783..e141f2a3 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/utilities/SystemParameters.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/utilities/SystemParameters.java @@ -27,8 +27,8 @@ import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.utils.Utils; +import org.apache.storm.Config; +import org.apache.storm.utils.Utils; public class SystemParameters { // histogram types for building the partitioning scheme in our algorithm diff --git a/squall-core/src/main/java/ch/epfl/data/squall/utilities/thetajoin/dynamic/ThetaJoinUtilities.java b/squall-core/src/main/java/ch/epfl/data/squall/utilities/thetajoin/dynamic/ThetaJoinUtilities.java index 4bdea81f..4b67275c 100755 --- a/squall-core/src/main/java/ch/epfl/data/squall/utilities/thetajoin/dynamic/ThetaJoinUtilities.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/utilities/thetajoin/dynamic/ThetaJoinUtilities.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; -import backtype.storm.tuple.Tuple; +import org.apache.storm.tuple.Tuple; public class ThetaJoinUtilities { public static boolean assertDimensions(String inputDim, String originalDim, diff --git a/squall-core/src/main/java/ch/epfl/data/squall/window_semantics/WindowSemanticsManager.java b/squall-core/src/main/java/ch/epfl/data/squall/window_semantics/WindowSemanticsManager.java index 321e0dd4..1a8c8820 100644 --- a/squall-core/src/main/java/ch/epfl/data/squall/window_semantics/WindowSemanticsManager.java +++ b/squall-core/src/main/java/ch/epfl/data/squall/window_semantics/WindowSemanticsManager.java @@ -21,7 +21,7 @@ import java.util.List; -import backtype.storm.tuple.Tuple; +import org.apache.storm.tuple.Tuple; import ch.epfl.data.squall.storm_components.StormBoltComponent; import ch.epfl.data.squall.storm_components.StormComponent; import ch.epfl.data.squall.utilities.MyUtilities; diff --git a/squall-core/src/test/resources/log4j2-test.xml b/squall-core/src/test/resources/log4j2-test.xml new file mode 100644 index 00000000..888df462 --- /dev/null +++ b/squall-core/src/test/resources/log4j2-test.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + diff --git a/squall-core/src/test/resources/logback-test.xml b/squall-core/src/test/resources/logback-test.xml deleted file mode 100644 index 86e02c67..00000000 --- a/squall-core/src/test/resources/logback-test.xml +++ /dev/null @@ -1,12 +0,0 @@ - - - - - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n - - - - - - - diff --git a/squall-core/src/test/scala/TestSuite.scala b/squall-core/src/test/scala/TestSuite.scala index 61dff412..3c93287b 100644 --- a/squall-core/src/test/scala/TestSuite.scala +++ b/squall-core/src/test/scala/TestSuite.scala @@ -20,16 +20,16 @@ package ch.epfl.data.squall.test import ch.epfl.data.squall.main.Main -import ch.epfl.data.squall.query_plans.QueryBuilder import ch.epfl.data.squall.storage.{BasicStore, KeyValueStore} -import ch.epfl.data.squall.utilities.{LocalMergeResults, StormWrapper, SystemParameters, SquallContext} -import ch.qos.logback.classic.{Logger, LoggerContext} -import ch.qos.logback.classic.encoder.PatternLayoutEncoder -import ch.qos.logback.classic.spi.ILoggingEvent -import ch.qos.logback.core.FileAppender +import ch.epfl.data.squall.utilities.{LocalMergeResults, SquallContext, StormWrapper, SystemParameters} import java.io._ + +import org.apache.logging.log4j.LogManager +import org.apache.logging.log4j.core.{Appender, Logger, LoggerContext} +import org.apache.logging.log4j.core.appender.FileAppender +import org.apache.logging.log4j.core.config.Configuration +import org.apache.logging.log4j.core.layout.PatternLayout import org.scalatest._ -import org.slf4j.LoggerFactory class TestSuite extends FunSuite with BeforeAndAfterAll { @@ -57,43 +57,48 @@ class TestSuite extends FunSuite with BeforeAndAfterAll { } object Logging { - var fileAppender: FileAppender[ILoggingEvent] = null; - var logbackLogger: Logger = null; + var fileAppender: FileAppender = _ + var log4j2Logger: Logger = _ def beginLog(confName: String) = { - // http://stackoverflow.com/questions/7824620/logback-set-log-file-name-programatically - val loggerContext: LoggerContext = LoggerFactory.getILoggerFactory().asInstanceOf[LoggerContext] - logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) + val loggerContext: LoggerContext = LogManager.getContext(false).asInstanceOf[LoggerContext] - val encoder: PatternLayoutEncoder = new PatternLayoutEncoder() - encoder.setContext(loggerContext) - encoder.setPattern("%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n") - encoder.start() + log4j2Logger = loggerContext.getLogger(LogManager.ROOT_LOGGER_NAME) + val configuration: Configuration = loggerContext.getConfiguration - val verbosity = System.getenv("SQUALL_LOG_VERBOSE") + val verbosity = Option(System.getenv("SQUALL_LOG_VERBOSE")).getOrElse("FALSE") if (verbosity != "TRUE") { - logbackLogger.detachAppender("STDOUT") + val appender: Appender = log4j2Logger.getAppenders.get("STDOUT") + if (appender != null) { + log4j2Logger.removeAppender(appender) + } } - fileAppender = new FileAppender() - fileAppender.setContext(loggerContext) - fileAppender.setName(confName) - // set the file name val tempFile = File.createTempFile(confName, ".log") - println("\tWriting test output to " + tempFile.getAbsolutePath()) - fileAppender.setFile(tempFile.getAbsolutePath()) - fileAppender.setEncoder(encoder) + println("\tWriting test output to " + tempFile.getAbsolutePath) + + val layout: PatternLayout = PatternLayout + .newBuilder() + .withConfiguration(configuration) + .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n") + .build() + + fileAppender = FileAppender.createAppender(tempFile.getAbsolutePath, "false", "false", + confName, "true", "true", "true", "8192", layout, null, "true", null, + configuration) fileAppender.start() - logbackLogger.addAppender(fileAppender) + log4j2Logger.addAppender(fileAppender) - // OPTIONAL: print logback internal status messages - // StatusPrinter.print(loggerContext) + loggerContext.updateLoggers() } def endLog() = { - logbackLogger.detachAppender(fileAppender) + val loggerContext: LoggerContext = LogManager.getContext(false).asInstanceOf[LoggerContext] + + log4j2Logger.removeAppender(fileAppender) fileAppender.stop() + loggerContext.updateLoggers() } } @@ -104,7 +109,7 @@ class TestSuite extends FunSuite with BeforeAndAfterAll { val queryPlan = Main.chooseQueryPlan(conf) SystemParameters.putInMap(conf, "DIP_TOPOLOGY_NAME", confName) - val context = new SquallContext(conf); + val context = new SquallContext(conf) val builder = queryPlan.createTopology(context) @@ -123,7 +128,7 @@ class TestSuite extends FunSuite with BeforeAndAfterAll { val queryPlan = parser.generatePlan(conf) parser.putAckers(queryPlan, conf) SystemParameters.putInMap(conf, "DIP_TOPOLOGY_NAME", confName) - val context = new SquallContext(conf); + val context = new SquallContext(conf) val builder = queryPlan.createTopology(context) val result = StormWrapper.localSubmitAndWait(context, queryPlan) diff --git a/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/REPL.scala b/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/REPL.scala index 9939b973..ee4337af 100644 --- a/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/REPL.scala +++ b/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/REPL.scala @@ -24,8 +24,8 @@ import ch.epfl.data.squall.api.scala.Stream._ import ch.epfl.data.squall.api.scala.TPCHSchema._ import ch.epfl.data.squall.query_plans.QueryBuilder import ch.epfl.data.squall.utilities.SquallContext -import ch.qos.logback.classic.{Level, Logger, LoggerContext} -import org.slf4j.LoggerFactory +import org.apache.logging.log4j.core.LoggerContext +import org.apache.logging.log4j.{Level, LogManager} import scala.collection.JavaConversions._ @@ -75,21 +75,10 @@ Type "help" for Squall related help var count = 0 def prepareSubmit(): String = { if (context.isDistributed()) { - val jar = packClasses() - ////// Here comes the ugly part. We have to trick Storm, as we are doing ////// things that are not really standard. - //// TODO: HACK FOR STORM 0.9.3, if we ever go to 0.9.4 this won't be necessary (I think) - // In storm 0.9.3 once one jar is submitted, no other jar can be submitted - // as it assumes that it has already been submitted. - // We can use Java reflection to hack into StormSubmitter and "reset" it, - // so we can use submit multiple topologies during one run. - import backtype.storm.StormSubmitter - import java.lang.reflect.Field - val f : Field = (new StormSubmitter()).getClass().getDeclaredField("submittedJar"); - f.setAccessible(true); - f.set(new StormSubmitter(), null); + val jar = packClasses() // Now we have to trick storm into thinking we launched with the storm // script. This is easier! @@ -117,13 +106,17 @@ Type "help" for Squall related help context.submit(tpname, plan) } - private val loggerContext: LoggerContext = LoggerFactory.getILoggerFactory().asInstanceOf[LoggerContext] + private val loggerContext: LoggerContext = LogManager.getContext(false).asInstanceOf[LoggerContext] def activateLogging() = { - loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).setLevel(Level.INFO) + val loggerConfig = loggerContext.getConfiguration().getLoggerConfig(LogManager.ROOT_LOGGER_NAME); + loggerConfig.setLevel(Level.INFO) + loggerContext.updateLoggers() } def stopLogging() = { - loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).setLevel(Level.OFF) + val loggerConfig = loggerContext.getConfiguration().getLoggerConfig(LogManager.ROOT_LOGGER_NAME); + loggerConfig.setLevel(Level.OFF) + loggerContext.updateLoggers() } diff --git a/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/operators/predicates/ScalaPredicate.scala b/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/operators/predicates/ScalaPredicate.scala index 871e6db6..9fb4f98b 100644 --- a/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/operators/predicates/ScalaPredicate.scala +++ b/squall-functional/src/main/scala/ch/epfl/data/squall/api/scala/operators/predicates/ScalaPredicate.scala @@ -23,7 +23,7 @@ import ch.epfl.data.squall.predicates.Predicate import ch.epfl.data.squall.visitors.PredicateVisitor import scala.collection.JavaConverters._ import ch.epfl.data.squall.api.scala.SquallType._ -import backtype.storm.clojure.TupleValues +import org.apache.storm.clojure.TupleValues class ScalaPredicate[T: SquallType](fn: T => Boolean) extends Predicate { @@ -37,11 +37,11 @@ class ScalaPredicate[T: SquallType](fn: T => Boolean) extends Predicate { def test(tupleValues: java.util.List[String]): Boolean = { val squalType: SquallType[T] = implicitly[SquallType[T]] - //val x=seqAsJavaListConverter[String](tupleValues) - //println("At selection tuples are: "+tupleValues) + //val x=seqAsJavaListConverter[String](tupleValues) + //println("At selection tuples are: "+tupleValues) val scalaList = tupleValues.asScala.toList val squallTuple = squalType.convertBack(scalaList) - //println("The tuple is: "+squallTuple) + //println("The tuple is: "+squallTuple) val res = fn(squallTuple) res } diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/DistributionSignalSpout.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/DistributionSignalSpout.java index 33eaa59d..4cea747f 100644 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/DistributionSignalSpout.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/DistributionSignalSpout.java @@ -5,11 +5,11 @@ import org.apache.log4j.Logger; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.utils.Utils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.utils.Utils; import ch.epfl.data.squall.components.signal_components.storm.SignalClient; public class DistributionSignalSpout extends BaseRichSpout { diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/HarmonizerSignalSpout.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/HarmonizerSignalSpout.java index d16ea24c..39347142 100644 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/HarmonizerSignalSpout.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/HarmonizerSignalSpout.java @@ -15,10 +15,10 @@ import org.apache.log4j.Logger; import scala.Array; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.utils.Utils; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.utils.Utils; import ch.epfl.data.squall.components.signal_components.storm.BaseSignalSpout; import ch.epfl.data.squall.components.signal_components.storm.SignalClient; diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SignaledDataSourceComponent.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SignaledDataSourceComponent.java index a4992a33..a4beeca1 100644 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SignaledDataSourceComponent.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SignaledDataSourceComponent.java @@ -26,8 +26,8 @@ import org.apache.commons.lang.ArrayUtils; import org.apache.log4j.Logger; -import backtype.storm.Config; -import backtype.storm.topology.TopologyBuilder; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; import ch.epfl.data.squall.components.Component; import ch.epfl.data.squall.components.DataSourceComponent; import ch.epfl.data.squall.expressions.ValueExpression; diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/StormSynchronizedSpoutComponent.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/StormSynchronizedSpoutComponent.java index d57a6d61..88f86d3b 100644 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/StormSynchronizedSpoutComponent.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/StormSynchronizedSpoutComponent.java @@ -25,12 +25,12 @@ import org.apache.log4j.Logger; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.components.signal_components.storm.BaseSignalSpout; import ch.epfl.data.squall.ewh.operators.SampleAsideAndForwardOperator; diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SynchronizedStormDataSource.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SynchronizedStormDataSource.java index 8bc62420..b3e95346 100644 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SynchronizedStormDataSource.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/SynchronizedStormDataSource.java @@ -36,12 +36,12 @@ import com.esotericsoftware.minlog.Log; -import backtype.storm.Config; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import ch.epfl.data.squall.components.ComponentProperties; import ch.epfl.data.squall.components.signal_components.storm.SignalClient; import ch.epfl.data.squall.operators.ChainOperator; diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalBolt.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalBolt.java index 2c0a2a32..5e68f98e 100755 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalBolt.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalBolt.java @@ -7,9 +7,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichBolt; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseRichBolt; @SuppressWarnings("serial") public abstract class BaseSignalBolt extends BaseRichBolt implements diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalSpout.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalSpout.java index 639f63b3..260d0405 100755 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalSpout.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/BaseSignalSpout.java @@ -7,9 +7,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.base.BaseRichSpout; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.base.BaseRichSpout; @SuppressWarnings("serial") public abstract class BaseSignalSpout extends BaseRichSpout implements diff --git a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/StormSignalConnection.java b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/StormSignalConnection.java index cf327d24..9721e39a 100755 --- a/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/StormSignalConnection.java +++ b/squall-signals/src/main/java/ch/epfl/data/squall/components/signal_components/storm/StormSignalConnection.java @@ -9,7 +9,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.utils.Utils; +import org.apache.storm.utils.Utils; public class StormSignalConnection extends AbstractSignalConnection { private static final Logger LOG = LoggerFactory