Mohammed ElSeidy edited this page Jun 30, 2016 · 5 revisions

The functional frontend provides a Scala REPL (read-eval-print loop) for constructing query plans interactively. The REPL is based on sbt's console mode. Start it from the main Squall directory (the one that contains build.sbt) with sbt "functional\console" on Windows or sbt functional/console on Linux/OSX.

This launches a console and automatically imports and sets up Squall. Once sbt has finished compiling and packaging, you should see something like this:

[info] Starting scala interpreter...
[info] 

 ____   ___  _   _   _    _     _
/ ___| / _ \| | | | / \  | |   | |
\___ \| | | | | | |/ _ \ | |   | |
 ___) | |_| | |_| / ___ \| |___| |___
|____/ \__\_\\___/_/   \_|_____|_____|

Type "help" for Squall related help

import ch.epfl.data.squall.query_plans.QueryBuilder
import ch.epfl.data.squall.api.scala.SquallType._
import ch.epfl.data.squall.api.scala.Stream._
import ch.epfl.data.squall.api.scala.TPCHSchema._
REPL: ch.epfl.data.squall.api.scala.REPL = ch.epfl.data.squall.api.scala.REPL@5bcdcffb
import REPL._
Welcome to Scala version 2.11.6 (OpenJDK 64-Bit Server VM, Java 1.7.0_79).
Type in expressions to have them evaluated.
Type :help for more information.

scala>

As an example, we can type in the Hyracks query, which joins customers with orders on custkey and aggregates per market segment, and then submit it.

scala> val customers = Source[Customer]("customer").map { t => (t.custkey, t.mktsegment) }
customers: ch.epfl.data.squall.api.scala.Stream.Stream[(Int, String)] = MappedStream(Source(customer),<function1>)

scala> val orders = Source[Orders]("orders").map { _.custkey }
orders: ch.epfl.data.squall.api.scala.Stream.Stream[Int] = MappedStream(Source(orders),<function1>)

scala> val join = (customers join orders)(_._1)(x => x)
join: ch.epfl.data.squall.api.scala.Stream.JoinStream[((Int, String), Int)] = JoinedStream(MappedStream(Source(customer),<function1>),MappedStream(Source(orders),<function1>),<function1>,<function1>)

scala> val agg = join.groupByKey(x => 1, _._1._2)
agg: ch.epfl.data.squall.api.scala.Stream.TailStream[((Int, String), Int),String,Int] = GroupedStream(JoinedStream(MappedStream(Source(customer),<function1>),MappedStream(Source(orders),<function1>),<function1>,<function1>),<function1>,<function1>)

scala> val plan = agg.execute(conf)
plan: ch.epfl.data.squall.query_plans.QueryBuilder = ch.epfl.data.squall.query_plans.QueryBuilder@3450e6b9
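
To make the semantics of the query above concrete, here is a minimal sketch of the same computation on ordinary Scala collections, with no Squall involved. CustomerRow, OrderRow, and the sample rows are hypothetical stand-ins for the TPC-H types and data. The sketch also assumes that groupByKey sums the value produced by its first function (here 1) for each key produced by its second (here the market segment), which is consistent with the per-segment counts shown in the local-mode results. Run it in a plain Scala REPL rather than the Squall one.

```scala
// Hypothetical stand-ins: the real Customer and Orders types come from
// TPCHSchema and carry many more fields.
case class CustomerRow(custkey: Int, mktsegment: String)
case class OrderRow(orderkey: Int, custkey: Int)

val customerRows = Seq(
  CustomerRow(1, "BUILDING"),
  CustomerRow(2, "MACHINERY"),
  CustomerRow(3, "BUILDING"))
val orderRows = Seq(OrderRow(10, 1), OrderRow(11, 1), OrderRow(12, 2), OrderRow(13, 3))

// Same projections as the two map steps in the REPL session.
val customerPairs = customerRows.map(t => (t.custkey, t.mktsegment))
val orderKeys = orderRows.map(_.custkey)

// Equi-join on custkey: keep every (customer, order) pair whose keys match.
val joined = for {
  c <- customerPairs
  o <- orderKeys
  if c._1 == o
} yield (c, o)

// Group by market segment and add 1 per joined row,
// which counts the orders placed in each segment.
val counts: Map[String, Int] =
  joined.groupBy(_._1._2).map { case (segment, rows) => (segment, rows.map(_ => 1).sum) }
// counts == Map("BUILDING" -> 3, "MACHINERY" -> 1)
```

The nested loop is only for readability on a handful of rows; it says nothing about how Squall executes the join.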

Submitting

Local mode

To submit the plan in local mode, call submitLocal(plan). It returns a java.util.Map that is filled with the results concurrently while the query runs.

scala> submitLocal(plan)
res1: java.util.Map[String,String] = {}

The map is empty at first. After a few seconds, the results start to appear:

scala> res1
res2: java.util.Map[String,String] = {MACHINERY=2536, AUTOMOBILE=2979, HOUSEHOLD=2772, BUILDING=3706, FURNITURE=3007}
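
Because the map fills in the background, a script may prefer to wait for it instead of re-evaluating it by hand. The following is a minimal sketch of such a wait, not part of Squall: awaitNonEmpty and fakeSubmit are hypothetical names, and fakeSubmit only imitates submitLocal by returning an empty map that a background thread fills shortly afterwards. It assumes the returned map is safe to read while it is being written.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical helper (not part of Squall): block until the map has at least
// one entry or the timeout expires. Returns true if results arrived in time.
def awaitNonEmpty(results: java.util.Map[String, String],
                  timeoutMs: Long,
                  pollMs: Long = 50L): Boolean = {
  val deadline = System.currentTimeMillis() + timeoutMs
  while (results.isEmpty && System.currentTimeMillis() < deadline) {
    Thread.sleep(pollMs)
  }
  !results.isEmpty
}

// Stand-in for submitLocal(plan): returns an empty map right away and
// fills it from a background thread a little later.
def fakeSubmit(): java.util.Map[String, String] = {
  val results = new ConcurrentHashMap[String, String]()
  val writer = new Thread(new Runnable {
    def run(): Unit = {
      Thread.sleep(200L)
      results.put("BUILDING", "3706")
    }
  })
  writer.setDaemon(true)
  writer.start()
  results
}

val results = fakeSubmit()
val arrived = awaitNonEmpty(results, timeoutMs = 5000L)
```

In the Squall REPL the helper would be applied to the map returned by submitLocal. A non-empty map only means that some results have arrived, not that the query has finished.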

Distributed mode

To submit a plan in distributed mode, set the mode by calling context.setDistributed before you define the plan.

After creating the plan, it can be submitted with submitDistributed(plan). This will use the configuration in ~/.storm/storm.yaml to connect to the cluster and submit the resulting topology.

Managing queries

The variable context holds the current SquallContext. Use it to manage queries running either in local mode or on a cluster. The most useful methods are context.getQueries(), context.getQueryStatus(queryname), and context.killQuery(queryname).

Here is an example:

scala> context.getQueries()
res9: java.util.List[String] = [repl_2_933602501, username_0_01G_tpch4, repl_1_2136117608, repl_1_-419033134]

scala> context.getQueryStatus("repl_2_933602501")
res10: String = ACTIVE

scala> context.killQuery("repl_2_933602501")

scala> context.getQueryStatus("repl_2_933602501")
res12: String = KILLED
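
The three methods combine naturally into small helpers, for example to kill every query that is still active. The sketch below is an illustration under stated assumptions: QueryManager is a hypothetical trait that lists only the three methods named above with the types visible in the transcript (the return type of killQuery is a guess), and InMemoryManager is a stand-in used to exercise the helpers without a running cluster.

```scala
// Hypothetical trait listing only the three methods named above; the real
// SquallContext may declare different types, especially for killQuery.
trait QueryManager {
  def getQueries(): java.util.List[String]
  def getQueryStatus(queryName: String): String
  def killQuery(queryName: String): Unit
}

// Names of all queries whose status is currently "ACTIVE".
def activeQueries(ctx: QueryManager): List[String] = {
  val it = ctx.getQueries().iterator()
  var active = List.empty[String]
  while (it.hasNext) {
    val name = it.next()
    if (ctx.getQueryStatus(name) == "ACTIVE") active = name :: active
  }
  active.reverse
}

// Kill every active query and return the names that were killed.
def killAllActive(ctx: QueryManager): List[String] = {
  val targets = activeQueries(ctx)
  targets.foreach(name => ctx.killQuery(name))
  targets
}

// In-memory stand-in used only to exercise the helpers.
class InMemoryManager extends QueryManager {
  private val status = new java.util.LinkedHashMap[String, String]()
  status.put("repl_1", "ACTIVE")
  status.put("repl_2", "KILLED")
  status.put("repl_3", "ACTIVE")

  def getQueries(): java.util.List[String] =
    new java.util.ArrayList[String](status.keySet())
  def getQueryStatus(queryName: String): String = status.get(queryName)
  def killQuery(queryName: String): Unit = { status.put(queryName, "KILLED"); () }
}

val manager = new InMemoryManager
val killed = killAllActive(manager)
```

In the REPL, the same loop could be written directly against context instead of going through the trait.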

Running a Twitter query on REPL

Here, we give an interactive example of how to use Squall from the REPL. The example demonstrates a simple word count program on a stream of Twitter tweets.
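
For readers unfamiliar with the computation itself, a streaming word count can be sketched on plain Scala collections as follows. This is not Squall code and does not connect to Twitter: the tweets are hypothetical sample strings, and words and addTweet are illustrative names. The counts are updated one tweet at a time, the way a stream would deliver them.

```scala
// Hypothetical sample input standing in for a live stream of tweets.
val tweets = Seq(
  "squall runs on storm",
  "storm runs topologies",
  "squall joins streams")

// Split one tweet into lower-case words, dropping empty tokens.
def words(tweet: String): Seq[String] =
  tweet.toLowerCase.split("\\s+").filter(_.nonEmpty).toSeq

// Update the running counts with the words of a single tweet.
def addTweet(counts: Map[String, Int], tweet: String): Map[String, Int] =
  words(tweet).foldLeft(counts) { (acc, w) => acc.updated(w, acc.getOrElse(w, 0) + 1) }

// Process the tweets one at a time, starting from empty counts.
val wordCounts = tweets.foldLeft(Map.empty[String, Int])((counts, tweet) => addTweet(counts, tweet))
```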

Logging

By default, logging is disabled in the REPL. It is often useful to enable it for debugging, which you can do by calling activateLogging. Call stopLogging to disable it again.

Imperative interface

The REPL is not limited to the functional frontend: you can import and use the imperative interface to build the plan and submit it in the same way.