Squall REPL
The functional frontend provides a Scala REPL (read-eval-print loop) that allows you to construct query plans interactively. The REPL is based on the sbt console mode and can be started from the main Squall directory (where build.sbt is placed) with sbt "functional\console" on Windows or sbt functional/console on Linux/OSX.
This launches a console that automatically imports and sets up Squall. After sbt finishes compiling and packaging, you should see output similar to the following:
[info] Starting scala interpreter...
[info]
____ ___ _ _ _ _ _
/ ___| / _ \| | | | / \ | | | |
\___ \| | | | | | |/ _ \ | | | |
___) | |_| | |_| / ___ \| |___| |___
|____/ \__\_\\___/_/ \_|_____|_____|
Type "help" for Squall related help
import ch.epfl.data.squall.query_plans.QueryBuilder
import ch.epfl.data.squall.api.scala.SquallType._
import ch.epfl.data.squall.api.scala.Stream._
import ch.epfl.data.squall.api.scala.TPCHSchema._
REPL: ch.epfl.data.squall.api.scala.REPL = ch.epfl.data.squall.api.scala.REPL@5bcdcffb
import REPL._
Welcome to Scala version 2.11.6 (OpenJDK 64-Bit Server VM, Java 1.7.0_79).
Type in expressions to have them evaluated.
Type :help for more information.
scala>
As an example, we can type in the Hyracks query and submit it:
scala> val customers = Source[Customer]("customer").map { t => (t.custkey, t.mktsegment) }
customers: ch.epfl.data.squall.api.scala.Stream.Stream[(Int, String)] = MappedStream(Source(customer),<function1>)
scala> val orders = Source[Orders]("orders").map { _.custkey }
orders: ch.epfl.data.squall.api.scala.Stream.Stream[Int] = MappedStream(Source(orders),<function1>)
scala> val join = (customers join orders)(_._1)(x => x)
join: ch.epfl.data.squall.api.scala.Stream.JoinStream[((Int, String), Int)] = JoinedStream(MappedStream(Source(customer),<function1>),MappedStream(Source(orders),<function1>),<function1>,<function1>)
scala> val agg = join.groupByKey(x => 1, _._1._2)
agg: ch.epfl.data.squall.api.scala.Stream.TailStream[((Int, String), Int),String,Int] = GroupedStream(JoinedStream(MappedStream(Source(customer),<function1>),MappedStream(Source(orders),<function1>),<function1>,<function1>),<function1>,<function1>)
scala> val plan = agg.execute(conf)
plan: ch.epfl.data.squall.query_plans.QueryBuilder = ch.epfl.data.squall.query_plans.QueryBuilder@3450e6b9
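Read as a batch computation, the plan above joins customers and orders on custkey and, for each market segment, adds up a 1 per joined tuple, i.e. it counts orders per segment (this is our reading of the groupByKey call). The sketch below computes the same aggregate over small in-memory collections using only the Scala standard library, which can help when checking results by hand. The Customer and Order case classes are simplified, hypothetical stand-ins rather than Squall's TPCHSchema types, and the sketch assumes custkey is unique among customers, as it is in TPC-H.

```scala
// Batch reference for the streaming Hyracks plan, using plain Scala collections.
// Customer and Order are hypothetical simplified stand-ins, not Squall types.
object HyracksReference {
  final case class Customer(custkey: Int, mktsegment: String)
  final case class Order(custkey: Int)

  // Inner equi-join on custkey, then count joined tuples per market segment.
  def countOrdersBySegment(customers: Seq[Customer], orders: Seq[Order]): Map[String, Int] = {
    // Build side of the join: assumes custkey is unique (a TPC-H primary key).
    val segmentByKey: Map[Int, String] =
      customers.map(c => c.custkey -> c.mktsegment).toMap
    // Probe side: orders without a matching customer are dropped, as in an inner join.
    val segments: Seq[String] = orders.flatMap(o => segmentByKey.get(o.custkey))
    // Equivalent of summing a 1 per joined tuple, grouped by segment.
    segments.groupBy(identity).map { case (segment, hits) => segment -> hits.size }
  }

  def main(args: Array[String]): Unit = {
    val customers = Seq(Customer(1, "BUILDING"), Customer(2, "MACHINERY"), Customer(3, "BUILDING"))
    val orders = Seq(Order(1), Order(1), Order(2), Order(3), Order(4))
    println(countOrdersBySegment(customers, orders))
  }
}
```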
To submit the plan in local mode, use submitLocal(plan). This returns a Map that is filled with the results concurrently, while the query runs.
scala> submitLocal(plan)
res1: java.util.Map[String,String] = {}
As you can see, the map is empty at first. After a few seconds, the results start to appear:
scala> res1
res2: java.util.Map[String,String] = {MACHINERY=2536, AUTOMOBILE=2979, HOUSEHOLD=2772, BUILDING=3706, FURNITURE=3007}
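Because the map is filled while the query runs, it can be convenient to wait for results programmatically instead of re-evaluating res1 by hand. The helper below, awaitResults, is hypothetical (it is not part of Squall): it polls any java.util.Map until the map holds a minimum number of entries or a timeout expires. In a streaming query the values may keep changing after all keys have appeared, so a full map is not necessarily a final one.

```scala
import java.util.concurrent.ConcurrentHashMap

object ResultPolling {
  // Hypothetical helper (not part of Squall): block until `results` holds at
  // least `minEntries` entries or `timeoutMs` elapses. Returns true on success.
  def awaitResults(results: java.util.Map[String, String],
                   minEntries: Int,
                   timeoutMs: Long,
                   pollMs: Long = 200L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (results.size() < minEntries && System.currentTimeMillis() < deadline)
      Thread.sleep(pollMs)
    results.size() >= minEntries
  }

  def main(args: Array[String]): Unit = {
    // Simulate a local topology writing results from another thread.
    val results = new ConcurrentHashMap[String, String]()
    val writer = new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(300)
        results.put("MACHINERY", "2536")
        results.put("BUILDING", "3706")
      }
    })
    writer.start()
    println(awaitResults(results, minEntries = 2, timeoutMs = 5000))
    println(results)
  }
}
```

In the session above, it could be called as ResultPolling.awaitResults(res1, minEntries = 5, timeoutMs = 30000), since this query produces one entry per market segment.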
To submit a plan in distributed mode, you need to set the mode before defining the plan, by calling context.setDistributed. After creating the plan, submit it with submitDistributed(plan). This uses the configuration in ~/.storm/storm.yaml to connect to the cluster and submit the resulting topology.
The variable context contains the current SquallContext. This context can be used to manage the queries running either in local mode or on a cluster. The most useful methods are context.getQueries(), context.getQueryStatus(queryname), and context.killQuery(queryname).
Here is an example:
scala> context.getQueries()
res9: java.util.List[String] = [repl_2_933602501, username_0_01G_tpch4, repl_1_2136117608, repl_1_-419033134]
scala> context.getQueryStatus("repl_2_933602501")
res10: String = ACTIVE
scala> context.killQuery("repl_2_933602501")
scala> context.getQueryStatus("repl_2_933602501")
res12: String = KILLED
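These three methods are enough for simple housekeeping, such as killing every still-running query the REPL started (their names begin with repl_ in the listing above). The sketch below is written against a small hypothetical trait, QueryControl, that mirrors getQueries, getQueryStatus and killQuery as they appear in this session. It assumes that "ACTIVE" is the status string for running queries and that killQuery's result (if any) can be ignored. An in-memory fake stands in for SquallContext so the code runs outside Squall.

```scala
import scala.collection.mutable

object QueryCleanup {
  // Hypothetical stand-in mirroring the SquallContext methods shown above.
  trait QueryControl {
    def getQueries(): java.util.List[String]
    def getQueryStatus(name: String): String
    def killQuery(name: String): Unit
  }

  // Kill every ACTIVE query whose name starts with `prefix`; return the killed names.
  def killActive(ctx: QueryControl, prefix: String): List[String] = {
    val names = ctx.getQueries()
    val toKill = (0 until names.size()).map(i => names.get(i)).toList
      .filter(n => n.startsWith(prefix) && ctx.getQueryStatus(n) == "ACTIVE")
    toKill.foreach(n => ctx.killQuery(n))
    toKill
  }

  // In-memory fake used only to exercise killActive without a running Squall.
  final class FakeControl(initial: Map[String, String]) extends QueryControl {
    private val status = mutable.LinkedHashMap(initial.toSeq: _*)
    def getQueries(): java.util.List[String] = {
      val out = new java.util.ArrayList[String]()
      status.keys.foreach(k => out.add(k))
      out
    }
    def getQueryStatus(name: String): String = status.getOrElse(name, "UNKNOWN")
    def killQuery(name: String): Unit = status(name) = "KILLED"
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeControl(Map("repl_1_a" -> "ACTIVE", "username_0_01G_tpch4" -> "ACTIVE"))
    println(killActive(ctx, "repl_"))
  }
}
```

In the REPL itself, the same filtering can be applied directly to context by iterating over context.getQueries() and calling context.killQuery on each matching name.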
Here is an interactive example of using Squall through the REPL. It demonstrates a simple word count program on a stream of Twitter tweets.
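Independent of Squall, the core logic of a word count is small: split each tweet into words and count the occurrences of each word. The sketch below does this over an in-memory list with the Scala standard library only. The tokenization rule (lowercase, split on runs of non-word characters, so "#squall" counts as "squall") is an assumption made for illustration, and this is not the Squall plan used in the example.

```scala
object WordCountReference {
  // Lowercase each tweet, split on runs of non-word characters, drop empty
  // tokens (e.g. from a leading '#'), then count occurrences per word.
  def wordCount(tweets: Seq[String]): Map[String, Int] =
    tweets
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  def main(args: Array[String]): Unit = {
    val tweets = Seq("Squall runs on Storm", "Storm, storm, #storm!")
    println(wordCount(tweets))
  }
}
```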
By default, logging is disabled in the REPL. It is often useful to activate it for debugging, which can be done by calling activateLogging. It can be deactivated again by calling stopLogging.
The REPL is not limited to the functional frontend: you can import and use the imperative interface to build the plan and submit it in the same way.