
Commit

Merge pull request #674 from zml1206/master
Add mlsql-stream english doc
allwefantasy authored Nov 16, 2018
2 parents 0e614f8 + 2fee313 commit 5d4232a
Showing 2 changed files with 178 additions and 41 deletions.
68 changes: 48 additions & 20 deletions docs/en/mlsql-stream.md
@@ -157,65 +157,93 @@ In the real world you can load a Kafka source like this:

```sql

-- if you are using kafka 0.10
load kafka.`pi-content-realtime-db` options
`kafka.bootstrap.servers`="---"
and `subscribe`="---"
as kafka_post_parquet;

-- if you are using kafka 0.8.0/0.9.0
load kafka8.`pi-content-realtime-db` options
`kafka.bootstrap.servers`="---"
and `topics`="---"
as kafka_post_parquet;

```
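
The key and value columns exposed by the Kafka source are raw bytes (this follows the Spark Kafka source), so you typically cast them before further processing. A minimal sketch, not part of the original doc; the output table name is illustrative:

```sql
select cast(key as string) as k, cast(value as string) as v
from kafka_post_parquet
as kafka_decoded;
```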

If you want to save data with dynamic partitioning:

```sql

save append table
as parquet.`table`
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/ckl1"
partitionBy partition_field;
```

If you want to save data with a static partition:

```sql

save append post_parquet
as parquet.`/table1/hp_stat_date=${pathDate.toString("yyyy-MM-dd")}`
options mode="Append"
and duration="30"
and checkpointLocation="/tmp/ckl1";
```

If you want to save data to MySQL:

```sql

-- connect mysql as the data sink.
connect jdbc where
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:3306/wow"
and user="---"
and password="----"
as mysql1;
-- Save the data to MySQL
save append table21
as jdbc.`mysql1.test1`
options mode="append"
-- executed in driver
and `driver-statement-0`="create table if not exists test1 ........."
-- executed in executor
and `statement-0`="replace into wow.test1(k,c) values(?,?)"
-- one batch duration
and duration="3"
and checkpointLocation="/tmp/cpl3";
```

If you want to use a window with a watermark on a table:

```sql

select ts, f1 from ... as table1;

-- register watermark for table1
register WaterMarkInPlace.`table1` as tmp1
options eventTimeCol="ts"
and delayThreshold="1 seconds";
and delayThreshold="10 seconds";

-- process table1
select f1,count(*) as num from table1
-- window size is 30 minutes, slide is 10 seconds
group by f1, window(ts,"30 minutes","10 seconds")
as table2;

save append table2 as ....
```
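
The final save is elided above; a minimal sketch of what it might look like, assuming a console sink (the sink, duration, and checkpoint path below are illustrative, not from the original doc):

```sql
save append table2
as console.``
options mode="Update"
and duration="10"
and checkpointLocation="/tmp/ckl-window";
```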

Streaming supports three output modes:

Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage. Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

## How to manage your stream jobs?

151 changes: 130 additions & 21 deletions docs/mlsql-stream.md
@@ -9,37 +9,146 @@ StreamingPro now also supports defining streaming computation with XQL

The name of the POST parameter is: sql

The following script is an example that uses a mock data source and writes the output to the console:

```sql
-- Set the name of this streaming job. The name must be globally unique.
set streamName="streamExample";

-- mock some data.
set data='''
{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
''';
-- load data as table
load jsonStr.`data` as datasource;
-- convert table as stream source
load mockStream.`datasource` options
stepSizeRange="0-3"
as newkafkatable1;
-- aggregation
select cast(key as string) as k,count(*) as c from newkafkatable1 group by key
as table21;
-- output the result to the console.
save append table21
as console.``
options mode="Complete"
and duration="10"
and checkpointLocation="/tmp/cpl3";
```

After the job is submitted, you can see output like this in the log:
```
-------------------------------------------
Batch: 6
-------------------------------------------
+-------+-------+-------+--------------------+
|column1|column2|column3| kafkaValue|
+-------+-------+-------+--------------------+
| m| m| m|[yes, 0, 5, 2008-...|
+-------+-------+-------+--------------------+
```

You can also use Kafka as the data source; the script looks like this:

```sql

-- if you are using kafka 0.10
load kafka.`pi-content-realtime-db` options
`kafka.bootstrap.servers`="---"
and `subscribe`="---"
as kafka_post_parquet;

-- if you are using kafka 0.8.0/0.9.0
load kafka8.`pi-content-realtime-db` options
`kafka.bootstrap.servers`="---"
and `topics`="---"
as kafka_post_parquet;

```

You can write the result data to MySQL:

```sql
-- connect mysql as the data sink.
connect jdbc where
driver="com.mysql.jdbc.Driver"
and url="jdbc:mysql://127.0.0.1:3306/wow"
and driver="com.mysql.jdbc.Driver"
and user="---"
and password="----"
as mysql1;
-- Save the data to MySQL
save append table21
as jdbc.`mysql1.test1`
options mode="append"
and `driver-statement-0`="create table if not exists test1 ........."
-- executed in executor
and `statement-0`="replace into wow.test1(k,c) values(?,?)"
-- one batch duration
and duration="3"
and checkpointLocation="/tmp/cpl3";
```

You can also write the result to a parquet table with static or dynamic partitions.

Static partition script:

```sql

save append table
as parquet.`/table/hp_stat_date=${pathDate.toString("yyyy-MM-dd")}`
options mode="Append"
and duration="30"
and checkpointLocation="/tmp/ckl1";
```

Dynamic partition script:

```sql

save append table
as parquet.`table`
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/ckl1"
partitionBy partition_field;
```

There are three output modes:

Append mode: as the name suggests, only newly appended rows are written out on each trigger. It applies, and only applies, to queries whose result rows never change once produced, so each row is guaranteed to be output exactly once.

Complete mode: the entire result table is written out in full on every trigger. This is clearly meant for computations over the whole data set, such as aggregations.

Update mode: in a sense the counterpart of Append mode; it only outputs the rows that were "updated" since the last trigger, which also includes newly added rows.
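
For example (an illustrative sketch, not from the original doc): a query with no aggregation can be written out in Append mode, while the aggregated table21 above must use Complete or Update mode:

```sql
-- pass-through query without aggregation, so Append mode is fine
select * from newkafkatable1 as raw_table;

save append raw_table
as json.`/tmp/stream-raw`
options mode="Append"
and duration="10"
and checkpointLocation="/tmp/ckl-raw";
```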

You can aggregate over windows and use a watermark to handle late data; the script looks like this:

```sql

select ts, f1 from ... as table1;

-- register watermark for table1
register WaterMarkInPlace.`table1` as tmp1
options eventTimeCol="ts"
and delayThreshold="10 seconds";

-- process table1
select f1,count(*) as num from table1
-- window size is 30 minutes, slide is 10 seconds
group by f1, window(ts,"30 minutes","10 seconds")
as table2;

save append table2 as ....
```

Through the HTTP API:

@@ -51,4 +51,4 @@ http://ip:port/stream/jobs/running
http://ip:port/stream/jobs/kill?groupId=....
can be used to kill a running streaming job.

Of course, you can also open the Spark UI to check the related information.
