Imperative Squall interface
Squall translates SQL queries to query plans. You can also write Squall plans by hand. Squall already contains several manually-specified query plans: TPC-H Q3, Q4, Q5, Q7, Q8, Q9 and Q10.
Testing code in Local Mode is much easier than in a cluster environment: in Local Mode all errors go to standard output/error, whereas in Cluster Mode one has to search for errors across many nodes. The same holds for presenting the final result.
Prerequisite: We assume that you have already set up Squall so that it works with SQL queries in Local Mode.
We will demonstrate running Squall with a manually specified query plan for the Hyracks query (in source code: `squall-examples/squall-java-examples/src/ch/epfl/data/squall/examples/imperative/shj/HyracksPlan.java`). We will use the config file `test/squall_plan_runner/confs/local/0_01G_hyracks`.
You can run Squall in Local Mode:
```
cd bin
./squall_local.sh PLAN_RUNNER ../test/squall_plan_runner/confs/local/0_01G_hyracks
```
Now let us look at the output of a Squall/Storm run. In Local Mode, all the output goes to the console. First, Storm produces some output of its own. You can safely ignore messages of the form `Task 0_01G_hyracks-1-1333023576:1 timed out`; this kind of message always appears when a task is started.
Then you will see something like:
```
... TopologyKiller: Received EOF message from: 2
... TopologyKiller: 1 remaining
... TopologyKiller: Received EOF message from: 3
... TopologyKiller: 0 remaining
... TopologyKiller: Received EOF from all spouts. Killing cluster...
```
These lines report Spouts that have finished processing their input. When all Spouts are done, Squall/Storm produces the final result. In this case, it is:
```
All the tasks of the last component in total received 15000 tuples.
FURNITURE = 3007
AUTOMOBILE = 2979
MACHINERY = 2536
BUILDING = 3706
HOUSEHOLD = 2772
```
The reported tuple count ("Iteration") refers to the number of tuples the last component (named `CUSTOMER_ORDERS`) received; as a sanity check, the five per-segment counts sum to exactly 15000.
In `test/squall_plan_runner/confs` you can find more example config files. To run Squall in Local Mode with another config file, you must set `DIP_DATA_PATH` in that config file so that it points to a database of the appropriate size on your machine, and then run the following commands:
```
cd bin
./squall_local.sh PLAN_RUNNER $CONFIG_FILE_PATH
```
where `$CONFIG_FILE_PATH` is an absolute or relative path to a config file.
The corresponding SQL for the Hyracks query is as follows:
```sql
SELECT C_MKTSEGMENT, COUNT(O_ORDERKEY)
FROM CUSTOMER JOIN ORDERS ON C_CUSTKEY = O_CUSTKEY
GROUP BY C_MKTSEGMENT
```
For the SQL query shown above, the corresponding query plan is presented below (it is in fact a part of the `squall-examples/squall-java-examples/src/ch/epfl/data/squall/examples/imperative/shj/HyracksPlan.java` class):
```java
// Scan CUSTOMER, keeping columns 0 (C_CUSTKEY) and 6 (C_MKTSEGMENT).
Component customer = new DataSourceComponent("customer", conf)
        .add(new ProjectOperator(0, 6));

// Scan ORDERS, keeping column 1 (O_CUSTKEY).
Component orders = new DataSourceComponent("orders", conf)
        .add(new ProjectOperator(1));

// Join on the customer key (column 0 on both sides); the joined tuple is
// [CUSTKEY, MKTSEGMENT], so the count is grouped on column 1 (MKTSEGMENT).
Component custOrders = new EquiJoinComponent(customer, 0, orders, 0)
        .add(new AggregateCountOperator(conf).setGroupByColumns(1));
```
You need not specify a schema when writing query plans manually, because the system refers to columns by their absolute position (index) within a tuple.
Note that we do not have to perform any projection in the `CUSTOMER_ORDERS` join component: the output tuple does not contain CUSTKEY from the right parent, since it is already included (with the same value) from the left parent. The hash of a parent relation (CUSTOMER, ORDERS) refers to position(s) in its tuple after the projection is performed; it denotes the columns of a tuple that serve as join keys in the join component. You can find more query plan examples in the package `plan_runner.query_plans`; their corresponding config files are in `test/squall_plan_runner/confs`.
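To make the positional style concrete, here is a minimal sketch of another hand-written plan that counts LINEITEM rows per order priority. It is not one of Squall's shipped examples; it reuses only the constructors shown above and assumes a `conf` object, a data source named "lineitem", and the TPC-H column positions O_ORDERKEY = 0 and O_ORDERPRIORITY = 5 in ORDERS, and L_ORDERKEY = 0 in LINEITEM:
```java
// Hypothetical sketch: count LINEITEM rows per O_ORDERPRIORITY.
// Scan ORDERS, keeping columns 0 (O_ORDERKEY) and 5 (O_ORDERPRIORITY).
Component orders = new DataSourceComponent("orders", conf)
        .add(new ProjectOperator(0, 5));

// Scan LINEITEM, keeping column 0 (L_ORDERKEY).
Component lineitem = new DataSourceComponent("lineitem", conf)
        .add(new ProjectOperator(0));

// Join on the order key (column 0 on both sides). The joined tuple is
// [ORDERKEY, ORDERPRIORITY], so the count is grouped on column 1.
Component ordersLineitem = new EquiJoinComponent(orders, 0, lineitem, 0)
        .add(new AggregateCountOperator(conf).setGroupByColumns(1));
```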
Now we discuss the parameters inside a config file. Once more, we will use `0_01G_hyracks` for illustration:
```
DIP_DISTRIBUTED false
DIP_QUERY_NAME hyracks
DIP_TOPOLOGY_NAME_PREFIX username
DIP_NUM_ACKERS 0
DIP_DATA_PATH ../test/data/tpch/0.01G/
DIP_RESULT_ROOT ../test/results/
CUSTOMER_PAR 1
ORDERS_PAR 1
CUSTOMER_ORDERS_PAR 1
#below are unlikely to change
DIP_EXTENSION .tbl
DIP_READ_SPLIT_DELIMITER \|
DIP_GLOBAL_ADD_DELIMITER |
DIP_GLOBAL_SPLIT_DELIMITER \|
DIP_KILL_AT_THE_END true
# Storage manager parameters
# Storage directory for local runs
STORAGE_LOCAL_DIR /tmp/ramdisk
# Storage directory for cluster runs
STORAGE_DIP_DIR /export/home/squalldata/storage
STORAGE_COLD_START true
MEMORY_SIZE_MB 4096
```
`DIP_DISTRIBUTED` must be false to execute the query plan in Local Mode.

`DIP_QUERY_NAME` must match, case-insensitively, the name of a query plan from the `plan_runner.query_plans` package, without the "Plan" suffix. For example, here the query named "hyracks" targets the `HyracksPlan` class.

The topology name is built by concatenating `DIP_TOPOLOGY_NAME_PREFIX` and the config file name (for example, prefix `username` and config file `1G_hyracks` yield the topology name `username_1G_hyracks`). `DIP_TOPOLOGY_NAME_PREFIX` exists to distinguish different users, but it needs to be set only in Cluster Mode.

`DIP_NUM_ACKERS` specifies the number of nodes used to ensure that each tuple is fully propagated, so that the final result and the full execution time can be acquired. If a positive value is set, each and every tuple is acked. If you set this parameter to 0, each Spout sends a special message as its last tuple. For more information about the implications of this parameter, please consult Squall query plans vs Storm topologies, section To ack or not to ack?.

`DIP_DATA_PATH` points to the location of your database.
The parallelism of a component is specified through `COMPONENT_NAME_PAR`. Note the naming convention for an EquiJoinComponent: it is named after its parents, so its parallelism parameter has the form `LEFTPARENT_RIGHTPARENT_PAR`, as illustrated below. Due to constrained main memory, you cannot run an arbitrarily large database with a small component parallelism. For information on detecting this behavior, please consult Squall query plans vs Storm topologies, section How to know we run out of memory?.
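For instance, the hypothetical ORDERS/LINEITEM sketch from earlier would be parallelized with config entries like the following (the names follow the convention; the values are arbitrary):
```
ORDERS_PAR 2
LINEITEM_PAR 2
ORDERS_LINEITEM_PAR 4
```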
Now we explain the parameters that you most likely will not need to change. `DIP_EXTENSION` refers to the file extension of your database files; in our case, the files were named `customer.tbl`, `orders.tbl`, etc.
`DIP_READ_SPLIT_DELIMITER` is a regular expression used for delimiting the columns of a tuple in a database file. `DIP_GLOBAL_ADD_DELIMITER` and `DIP_GLOBAL_SPLIT_DELIMITER` are used internally by Squall for serializing and deserializing tuples sent between different components. `DIP_KILL_AT_THE_END` ensures that your topology is killed after the final result is written to a file. If you set it to false, your topology will execute forever, consuming resources that could be used by other topologies executing at the same time.
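Note that both split delimiters are regular expressions, which is why the pipe character is escaped (`\|`) there, while the add delimiter is a literal `|`. A minimal plain-Java illustration (not Squall code):
```java
// Splitting is regex-based, so the '|' delimiter must be escaped.
String line = "1|Customer#000000001|BUILDING";
String[] columns = line.split("\\|");          // cf. DIP_READ_SPLIT_DELIMITER \|
String serialized = String.join("|", columns); // cf. DIP_GLOBAL_ADD_DELIMITER |
```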
Prerequisite: We assume that you have already set up Squall so that it works with SQL queries in Cluster Mode. Other than for reading logs, you do not need to ssh to any of the cluster machines.
We will illustrate the procedure with the `test/squall_plan_runner/confs/cluster/1G_hyracks` config file. You have to set `DIP_DATA_PATH` so that it points to a scale-factor-1 TPC-H database on the cluster.
You can run Squall with the following commands:
```
cd bin
./squall_cluster.sh PLAN_RUNNER ../test/squall_plan_runner/confs/cluster/1G_hyracks
```
You can run Squall in Cluster Mode with some other config file:
```
cd bin
./squall_cluster.sh PLAN_RUNNER $CONFIG_FILE_PATH
```
where `$CONFIG_FILE_PATH` is a path to a config file. You can also write config files from scratch. Keep in mind that for any config file that uses a different database size, you have to set `DIP_DATA_PATH` accordingly.
The config file `test/squall_plan_runner/confs/cluster/1G_hyracks` is presented here:
```
DIP_DISTRIBUTED true
DIP_QUERY_NAME hyracks
DIP_TOPOLOGY_NAME_PREFIX username
# the following two are optional, by default they use topology.workers and topology.ackers from storm.yaml
#DIP_NUM_WORKERS 176
#DIP_NUM_ACKERS 0
DIP_DATA_PATH /export/home/squalldata/tpchdb/1G/
CUSTOMER_PAR 8
ORDERS_PAR 8
CUSTOMER_ORDERS_PAR 8
#below are unlikely to change
DIP_EXTENSION .tbl
DIP_READ_SPLIT_DELIMITER \|
DIP_GLOBAL_ADD_DELIMITER |
DIP_GLOBAL_SPLIT_DELIMITER \|
DIP_KILL_AT_THE_END true
# Storage manager parameters
# Storage directory for local runs
STORAGE_LOCAL_DIR /tmp/ramdisk
# Storage directory for cluster runs
STORAGE_DIP_DIR /export/home/squalldata/storage
STORAGE_COLD_START true
MEMORY_SIZE_MB 4096
```
Now we explain the parameters that differ from their counterparts in Local Mode.

`DIP_DISTRIBUTED` is set to true.

`DIP_NUM_WORKERS` represents the number of worker processes across the cluster. We say worker processes rather than worker nodes because there may be multiple Storm processes on each node. It directly corresponds to `topology.workers` from storm.yaml and takes its default value from there. It also bounds the number of physical nodes you can use: you cannot use more than what the cluster offers, and if your topology requires more nodes than this parameter provides, some of the tasks will be collocated on a single node. More information about that can be found in Squall query plans vs Storm topologies, section Assigning Storm components to Nodes.

Due to constrained main memory, you cannot run an arbitrarily large database with a small component parallelism. For information on detecting this behavior, please consult Squall query plans vs Storm topologies, section How to know we run out of memory?.

In Cluster Mode, `DIP_NUM_ACKERS` is commented out because the default value (0) is set in storm.yaml. For more information about the implications of this parameter, please consult Squall query plans vs Storm topologies, section To ack or not to ack?.
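When those two parameters are commented out, the defaults come from the cluster's storm.yaml; the corresponding entries would look like this (the values are just an example):
```
topology.workers: 176
topology.ackers: 0
```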
When you run Squall in Cluster Mode, the command returns as soon as the topology is submitted to the cluster. You can monitor the execution of your topology at http://STORM_UI_SERVER:8080, where you can find information about active topologies, including the number of tuples sent between Spouts and Bolts. Note that you can monitor your topology only in Cluster Mode.
Your topology will be killed after the final result is produced. You can also kill it explicitly:
```
storm kill myTopologyName
```
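If you are unsure of the exact topology name (recall that it is the concatenation of `DIP_TOPOLOGY_NAME_PREFIX` and the config file name), you can list the running topologies with:
```
storm list
```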
Since we are now using the 1GB database, the correct result is:
```
The result for topology `username_1G_hyracks`
Component COUNTAGG:
Iteration 1500000:
FURNITURE = 299461
BUILDING = 303959
MACHINERY = 298980
HOUSEHOLD = 300147
AUTOMOBILE = 297453
```
As in Local Mode, the per-segment counts sum to the total number of tuples received (1500000, one per ORDERS row at scale factor 1).