From 2fee31395fbc6d9e31b21bb22435da36f5ceb872 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E6=9C=B1=E6=98=8E=E4=BA=AE?=
Date: Fri, 16 Nov 2018 11:47:06 +0800
Subject: [PATCH] update mlsql-stream.md

update mlsql-stream.md
---
 docs/en/mlsql-stream.md |  68 ++++++++++++------
 docs/mlsql-stream.md    | 151 ++++++++++++++++++++++++++++++++++------
 2 files changed, 178 insertions(+), 41 deletions(-)

diff --git a/docs/en/mlsql-stream.md b/docs/en/mlsql-stream.md
index 38f6b5f82..2ff5fb480 100644
--- a/docs/en/mlsql-stream.md
+++ b/docs/en/mlsql-stream.md
@@ -157,65 +157,93 @@ In real world you can load kafka source like this:
 
 ```sql
--- if you are using kafka 1.0
-load kafka.`pi-content-realtime-db` options
+-- if you are using kafka 0.10
+load kafka.`pi-content-realtime-db` options
 `kafka.bootstrap.servers`="---"
+and `subscribe`="---"
 as kafka_post_parquet;
 
 -- if you are using kafka 0.8.0/0.9.0
-load kafka9.`pi-content-realtime-db` options
+load kafka8.`pi-content-realtime-db` options
 `kafka.bootstrap.servers`="---"
+and `topics`="---"
 as kafka_post_parquet;
 
 ```
 
+If you want to save data with dynamic partitions:
+
+```sql
+
+save append table
+as parquet.`table`
+options mode="Append"
+and duration="10"
+and checkpointLocation="/tmp/ckl1"
+partitionBy partition_field;
+```
+
 If you want to save data with static partitions:
 
 ```sql
-save append post_parquet
-as newParquet.`/table1/hp_stat_date=${date.toString("yyyy-MM-dd")}`
-options mode="Append"
-and duration="30"
+save append post_parquet
+as parquet.`/table1/hp_stat_date=${pathDate.toString("yyyy-MM-dd")}`
+options mode="Append"
+and duration="30"
 and checkpointLocation="/tmp/ckl1";
 ```
 
 If you want to save data to MySQL:
 
 ```sql
-
+-- connect mysql as the data sink.
+connect jdbc where
+driver="com.mysql.jdbc.Driver"
+and url="jdbc:mysql://127.0.0.1:3306/wow"
+and user="---"
+and password="----"
+as mysql1;
 -- Save the data to MySQL
-save append table21
-as NONE.`mysql1.test1`
-options mode="Complete"
-and implClass="org.apache.spark.sql.execution.streaming.JDBCSinkProvider"
--- executed in driver
+save append table21
+as jdbc.`mysql1.test1`
+options mode="append"
 and `driver-statement-0`="create table if not exists test1 ........."
 -- executed in executor
-and `statement-0`="insert into wow.test1(k,c) values(?,?)"
+and `statement-0`="replace into wow.test1(k,c) values(?,?)"
+-- one batch duration
 and duration="3"
 and checkpointLocation="/tmp/cpl3";
 ```
 
-If you want to add watermark for a table:
+If you want to use a window with a watermark on a table:
 
 ```sql
 
-select ..... as table1;
+select ts,f1 as table1;
 
 -- register watermark for table1
 register WaterMarkInPlace.`table1` as tmp1
 options eventTimeCol="ts"
-and delayThreshold="1 seconds";
+and delayThreshold="10 seconds";
 
 -- process table1
-select count(*) as num from table1
-group by window(ts,"30 minutes","10 seconds")
+select f1,count(*) as num from table1
+-- window size is 30 minutes, slide is 10 seconds
+group by f1, window(ts,"30 minutes","10 seconds")
 as table2;
 
-save append ......
+save append table2 as ....
 ```
 
+Streaming supports three output modes:
+
+Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
+
+Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
+
+Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage. Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
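+
+A minimal sketch of how the `mode` option selects one of these semantics, assuming the engine simply forwards `mode` to Spark's output mode (as the Complete/Append examples above suggest). Here `table2` is the windowed aggregation defined earlier; the console sink and the checkpoint path are for illustration only:
+
+```sql
+-- print only the groups whose counts changed in this trigger;
+-- mode="Complete" would print the whole result table on every trigger instead.
+save append table2
+as console.``
+options mode="Update"
+and duration="10"
+and checkpointLocation="/tmp/cpl_update_demo";
+```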
 
 ## How to manage your stream jobs?
 
diff --git a/docs/mlsql-stream.md b/docs/mlsql-stream.md
index 643148158..16e895567 100644
--- a/docs/mlsql-stream.md
+++ b/docs/mlsql-stream.md
@@ -9,37 +9,146 @@ StreamingPro now also supports defining streaming computation with XQL
 
 The name of the POST parameter is: sql
 
-The script content is as follows:
+The following script is an example that uses a custom (mock) data source and writes the result to the console. The script is as follows:
 
-```
--- set the name of this streaming job. The name must be globally unique.
-set streamName="streamExample"
+```sql
+-- mock some data.
+set data='''
+{"key":"yes","value":"no","topic":"test","partition":0,"offset":0,"timestamp":"2008-01-24 18:01:01.001","timestampType":0}
+{"key":"yes","value":"no","topic":"test","partition":0,"offset":1,"timestamp":"2008-01-24 18:01:01.002","timestampType":0}
+{"key":"yes","value":"no","topic":"test","partition":0,"offset":2,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
+{"key":"yes","value":"no","topic":"test","partition":0,"offset":3,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
+{"key":"yes","value":"no","topic":"test","partition":0,"offset":4,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
+{"key":"yes","value":"no","topic":"test","partition":0,"offset":5,"timestamp":"2008-01-24 18:01:01.003","timestampType":0}
+''';
+
+-- load data as table
+load jsonStr.`data` as datasource;
+
+-- convert table as stream source
+load mockStream.`datasource` options
+stepSizeRange="0-3"
+as newkafkatable1;
 
--- load kafka data. For Kafka 0.8/0.9 use kafka8/kafka9 respectively; for 1.0 just
--- use kafka. Note that the options follow the original Spark conventions, so for
--- 1.0 you should check the Spark documentation.
+-- aggregation
+select cast(key as string) as k,count(*) as c from newkafkatable1 group by key
+as table21;
 
-load kafka9.`` options `kafka.bootstrap.servers`="127.0.0.1:9092"
-and `topics`="testM"
--- watermark settings
--- and eventTimeCol="" and delayThreshold=""
-as newkafkatable1;
+-- output the result to console.
+save append table21
+as console.``
+options mode="Complete"
+and duration="10"
+and checkpointLocation="/tmp/cpl3";
+```
 
--- simply fetch all the Kafka data of this period without any processing.
-select * from newkafkatable1
-as table21;
+After submitting the job, you can see output like this in the log:
+```
+-------------------------------------------
+Batch: 6
+-------------------------------------------
++-------+-------+-------+--------------------+
+|column1|column2|column3|          kafkaValue|
++-------+-------+-------+--------------------+
+|      m|      m|      m|[yes, 0, 5, 2008-...|
++-------+-------+-------+--------------------+
+```
+
+You can use Kafka as the data source. The script looks like this:
+
+```sql
+
+-- if you are using kafka 0.10
+load kafka.`pi-content-realtime-db` options
+`kafka.bootstrap.servers`="---"
+and `subscribe`="---"
+as kafka_post_parquet;
+
+-- if you are using kafka 0.8.0/0.9.0
+load kafka8.`pi-content-realtime-db` options
+`kafka.bootstrap.servers`="---"
+and `topics`="---"
+as kafka_post_parquet;
+
+```
+
+You can write the result data to MySQL:
+
+```sql
+-- connect mysql as the data sink.
+connect jdbc where
+driver="com.mysql.jdbc.Driver"
+and url="jdbc:mysql://127.0.0.1:3306/wow"
+and user="---"
+and password="----"
+as mysql1;
+-- Save the data to MySQL
+save append table21
+as jdbc.`mysql1.test1`
+options mode="append"
+and `driver-statement-0`="create table if not exists test1 ........."
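+-- note (an assumption based on the option names and the comments in this example):
+-- `driver-statement-N` statements are meant to run once on the driver before the
+-- stream starts (e.g. DDL), while `statement-N` runs on the executors with its ?
+-- placeholders bound from the result table's columns (k and c above).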
+-- executed in executor
+and `statement-0`="replace into wow.test1(k,c) values(?,?)"
+-- one batch duration
+and duration="3"
+and checkpointLocation="/tmp/cpl3";
+```
+
+You can also write the result to a Parquet table with static or dynamic partitions.
+
+Static partition script:
 
--- incrementally save the data to HDFS in JSON format. The trigger interval is 10s and the checkpoint directory is /tmp/cpl2
-save append table21
-as json.`/tmp/abc2`
+```sql
+
+save append table
+as parquet.`/table/hp_stat_date=${pathDate.toString("yyyy-MM-dd")}`
 options mode="Append"
-and duration="10"
-and checkpointLocation="/tmp/cpl2";
+and duration="30"
+and checkpointLocation="/tmp/ckl1";
+```
+
+Dynamic partition script:
+
+```sql
+save append table
+as parquet.`table`
+options mode="Append"
+and duration="10"
+and checkpointLocation="/tmp/ckl1"
+partitionBy partition_field;
 ```
 
-After submitting the job, wait a few seconds, and then you can use the API
+The mode option supports three output modes:
+
+Append mode: as the name suggests, only newly added rows are written each time, so it applies, and only applies, to queries whose result rows never change once they have been produced. This guarantees that every row is output exactly once.
+
+Complete mode: the whole result table is written in full on every trigger. This is meant to support computations over the complete data set, such as aggregations.
+
+Update mode: in a sense the counterpart of Append mode. It only outputs the rows that were "updated" since the last trigger, which covers both newly produced rows and rows whose values changed.
+
+You can aggregate with windows and use a watermark to handle late data. The script looks like this:
+
+```sql
+
+select ts,f1 as table1;
+
+-- register watermark for table1
+register WaterMarkInPlace.`table1` as tmp1
+options eventTimeCol="ts"
+and delayThreshold="10 seconds";
+
+-- process table1
+select f1,count(*) as num from table1
+-- window size is 30 minutes, slide is 10 seconds
+group by f1, window(ts,"30 minutes","10 seconds")
+as table2;
+
+save append table2 as ....
+```
 
 Via the API:
 
@@ -51,4 +160,4 @@ http://ip:port/stream/jobs/running
 
 http://ip:port/stream/jobs/kill?groupId=....
 Kills the running streaming job.
 
-Of course, you can also open the Spark UI to check the related information.
\ No newline at end of file
+Of course, you can also open the Spark UI to check the related information.