diff --git a/README.md b/README.md index db25164b6..0a42e30ce 100644 --- a/README.md +++ b/README.md @@ -1,328 +1,36 @@ -# flinkStreamSQL -> * 基于开源的flink,对其实时sql进行扩展 -> > * 自定义create table 语法(包括源表,输出表,维表) -> > * 自定义create view 语法 -> > * 自定义create function 语法 -> > * 实现了流与维表的join -> > * 支持原生FLinkSQL所有的语法 -> > * 扩展了输入和输出的性能指标到promethus - - ## 新特性: - * 1.kafka源表支持not null语法,支持字符串类型的时间转换。 - * 2.rdb维表与DB建立连接时,周期进行连接,防止连接断开。rdbsink写入时,对连接进行检查。 - * 3.异步维表支持非等值连接,比如:<>,<,>。 - * 4.增加kafka数组解析 - * 5.增加kafka1.0以上版本的支持 - * 6.增加postgresql、kudu、clickhouse维表、结果表的支持 - * 7.支持插件的依赖方式,参考pluginLoadMode参数 - * 8.支持cep处理 - * 9.支持udaf - * 10.支持谓词下移 - * 11.支持状态的ttl - - ## BUG修复: - * 1.修复不能解析sql中orderby,union语法。 - * 2.修复yarnPer模式提交失败的异常。 - * 3.一些bug的修复 - -# 已支持 - * 源表:kafka 0.9、0.10、0.11、1.x版本 - * 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver - * 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver - -# 后续开发计划 - * 维表快照 - * kafka avro格式 - * topN - -## 1 快速起步 -### 1.1 运行模式 - - -* 单机模式:对应Flink集群的单机模式 -* standalone模式:对应Flink集群的分布式模式 -* yarn模式:对应Flink集群的yarn模式 - -### 1.2 执行环境 - -* Java: JDK8及以上 -* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群) -* 操作系统:理论上不限 -* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例: -``` -## hadoop配置文件路径 -fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250 -security.kerberos.login.use-ticket-cache: true -security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab -security.kerberos.login.principal: maqi@DTSTACK.COM -security.kerberos.login.contexts: Client,KafkaClient -zookeeper.sasl.service-name: zookeeper -zookeeper.sasl.login-context-name: Client - -``` - -### 1.3 打包 - -进入项目根目录,使用maven打包: - -``` -mvn clean package -Dmaven.test.skip - -``` - -打包完成后的包结构: - -> * dt-center-flinkStreamSQL -> > * bin: 任务启动脚本 -> > * lib: launcher包存储路径,是任务提交的入口 -> > * plugins: 插件包存储路径 -> > * ........ : core及插件代码 - -### 1.4 启动 - -#### 1.4.1 启动命令 - -``` -sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"} -``` - -#### 1.4.2 命令行参数选项 - -* **mode** - * 描述:执行模式,也就是flink集群的工作模式 - * local: 本地模式 - * standalone: 提交到独立部署模式的flink集群 - * yarn: 提交到yarn模式的flink集群(即提交到已有flink集群) - * yarnPer: yarn per_job模式提交(即创建新flink application) - * 必选:否 - * 默认值:local - -* **name** - * 描述:flink 任务对应名称。 - * 必选:是 - * 默认值:无 - -* **sql** - * 描述:执行flink sql 的主体语句。 - * 必选:是 - * 默认值:无 - -* **localSqlPluginPath** - * 描述:本地插件根目录地址,也就是打包后产生的plugins目录。 - * 必选:是 - * 默认值:无 - -* **remoteSqlPluginPath** - * 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。 - * 必选:否 - * 默认值:无 - -* **addjar** - * 描述:扩展jar路径,当前主要是UDF定义的jar; - * 格式:json - * 必选:否 - * 默认值:无 +FlinkStreamSQL +============ +[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html) + +##技术交流 +- 招聘**大数据平台开发工程师**,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至[sishu@dtstack.com](mailto:sishu@dtstack.com) +- 我们使用[钉钉](https://www.dingtalk.com/)沟通交流,可以搜索群号[**30537511**]或者扫描下面的二维码进入钉钉群 +
+ +
+ +##介绍 + * 基于开源的flink,对其实时sql进行扩展 + * 自定义create table 语法(包括源表,输出表,维表) + * 自定义create view 语法 + * 自定义create function 语法 + * 实现了流与维表的join + * 支持原生FLinkSQL所有的语法 + * 扩展了输入和输出的性能指标到promethus -* **confProp** - * 描述:一些参数设置 - * 格式: json - * 必选:是 (如无参数填写空json即可) - * 默认值:无 - * 可选参数: - * sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒) - * sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟 - * state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。 - * state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。 - * state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。 - * sql.env.parallelism: 默认并行度设置 - * sql.max.env.parallelism: 最大并行度设置 - * time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime] - * sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms) - * sql.checkpoint.mode: 可选值[EXACTLY_ONCE|AT_LEAST_ONCE] - * sql.checkpoint.timeout: 生成checkpoint的超时时间(ms) - * sql.max.concurrent.checkpoints: 最大并发生成checkpoint数 - * sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)] - * flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file:// - * jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768) - * taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768) - * taskmanager.num: per_job模式下指定taskmanager的实例数(默认1) - * taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1) - * savePointPath:任务恢复点的路径(默认无) - * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false) - * logLevel: 日志级别动态配置(默认info) - * [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例 +##目录 + +[ 1.1 demo]((docs/demo.md)) +[ 1.2 快速开始](docs/quickStart.md) +[ 1.3 参数配置](docs/config.md) +[ 1.4 支持的插件介绍和demo](docs/pluginsInfo.md) +[ 1.5 指标参数](docs/newMetric.md) +[ 1.6 自定义函数](docs/function.md) +[ 1.7 视图定义](docs/createView.md) + - -* **flinkconf** - * 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf - * 必选:否 - * 默认值:无 - -* **yarnconf** - * 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop - * 必选:否 - * 默认值:无 - -* **flinkJarPath** - * 描述:per_job 模式提交需要指定本地的flink jar存放路径 - * 必选:否 - * 默认值:false - -* **queue** - * 描述:per_job 模式下指定的yarn queue - * 必选:否 - * 默认值:false - -* **pluginLoadMode** - * 描述:per_job 模式下的插件包加载方式。classpath:从每台机器加载插件包,shipfile:将需要插件从提交的节点上传到hdfs,不需要每台安装插件 - * 必选:否 - * 默认值:classpath - -* **yarnSessionConf** - * 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid - * 必选:否 - * 默认值:false - - -## 2 结构 -### 2.1 源表插件 -* [kafka 源表插件](docs/kafkaSource.md) - -### 2.2 结果表插件 -* [elasticsearch 结果表插件](docs/elasticsearchSink.md) -* [hbase 结果表插件](docs/hbaseSink.md) -* [mysql 结果表插件](docs/mysqlSink.md) -* [oracle 结果表插件](docs/oracleSink.md) -* [mongo 结果表插件](docs/mongoSink.md) -* [redis 结果表插件](docs/redisSink.md) -* [cassandra 结果表插件](docs/cassandraSink.md) -* [kudu 结果表插件](docs/kuduSink.md) -* [postgresql 结果表插件](docs/postgresqlSink.md) -* [clickhouse 结果表插件](docs/clickhouseSink.md) -* [impala 结果表插件](docs/impalaSink.md) -* [db2 结果表插件](docs/db2Sink.md) -* [sqlserver 结果表插件](docs/sqlserverSink.md) - -### 2.3 维表插件 -* [hbase 维表插件](docs/hbaseSide.md) -* [mysql 维表插件](docs/mysqlSide.md) -* [oracle 维表插件](docs/oracleSide.md) -* [mongo 维表插件](docs/mongoSide.md) -* [redis 维表插件](docs/redisSide.md) -* [cassandra 维表插件](docs/cassandraSide.md) -* [kudu 维表插件](docs/kuduSide.md) -* [postgresql 维表插件](docs/postgresqlSide.md) -* [clickhouse 维表插件](docs/clickhouseSide.md) -* [impala 维表插件](docs/impalaSide.md) -* [db2 维表插件](docs/db2Side.md) -* [sqlserver 维表插件](docs/sqlserverSide.md) - -## 3 性能指标(新增) - -### kafka插件 -* 业务延迟: flink_taskmanager_job_task_operator_dtEventDelay(单位s) - 数据本身的时间和进入flink的当前时间的差值. - -* 各个输入源的脏数据:flink_taskmanager_job_task_operator_dtDirtyData - 从kafka获取的数据解析失败的视为脏数据 - -* 各Source的数据输入TPS: flink_taskmanager_job_task_operator_dtNumRecordsInRate - kafka接受的记录数(未解析前)/s - -* 各Source的数据输入RPS: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate - kafka接受的记录数(解析后)/s - -* 各Source的数据输入BPS: flink_taskmanager_job_task_operator_dtNumBytesInRate - kafka接受的字节数/s - -* Kafka作为输入源的各个分区的延迟数: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag - 当前kafka10,kafka11有采集该指标 - -* 各个输出源RPS: flink_taskmanager_job_task_operator_dtNumRecordsOutRate - 写入的外部记录数/s - - -## 4 样例 - -``` - -CREATE (scala|table|aggregate) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun; - - -CREATE TABLE MyTable( - name varchar, - channel varchar, - pv int, - xctime bigint, - CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数 - )WITH( - type ='kafka09', - bootstrapServers ='172.16.8.198:9092', - zookeeperQuorum ='172.16.8.198:2181/kafka', - offsetReset ='latest', - topic ='nbTest1', - parallelism ='1' - ); - -CREATE TABLE MyResult( - channel varchar, - pv varchar - )WITH( - type ='mysql', - url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8', - userName ='dtstack', - password ='abc123', - tableName ='pv2', - parallelism ='1' - ); - -CREATE TABLE workerinfo( - cast(logtime as TIMESTAMP) AS rtime, - cast(logtime) AS rtime - )WITH( - type ='hbase', - zookeeperQuorum ='rdos1:2181', - tableName ='workerinfo', - rowKey ='ce,de', - parallelism ='1', - zookeeperParent ='/hbase' - ); - -CREATE TABLE sideTable( - cf:name varchar as name, - cf:info varchar as info, - PRIMARY KEY(name), - PERIOD FOR SYSTEM_TIME //维表标识 - )WITH( - type ='hbase', - zookeeperQuorum ='rdos1:2181', - zookeeperParent ='/hbase', - tableName ='workerinfo', - cache ='LRU', - cacheSize ='10000', - cacheTTLMs ='60000', - parallelism ='1' - ); - -insert -into - MyResult - select - d.channel, - d.info - from - ( select - a.*,b.info - from - MyTable a - join - sideTable b - on a.channel=b.name - where - a.channel = 'xc2' - and a.pv=10 ) as d -``` +## License -# 招聘 -1.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至sishu@dtstack.com。 - +FlinkStreamSQL is under the Apache 2.0 license. See the [LICENSE](http://www.apache.org/licenses/LICENSE-2.0) file for details. + + diff --git a/README.md.back b/README.md.back new file mode 100644 index 000000000..db25164b6 --- /dev/null +++ b/README.md.back @@ -0,0 +1,328 @@ +# flinkStreamSQL +> * 基于开源的flink,对其实时sql进行扩展 +> > * 自定义create table 语法(包括源表,输出表,维表) +> > * 自定义create view 语法 +> > * 自定义create function 语法 +> > * 实现了流与维表的join +> > * 支持原生FLinkSQL所有的语法 +> > * 扩展了输入和输出的性能指标到promethus + + ## 新特性: + * 1.kafka源表支持not null语法,支持字符串类型的时间转换。 + * 2.rdb维表与DB建立连接时,周期进行连接,防止连接断开。rdbsink写入时,对连接进行检查。 + * 3.异步维表支持非等值连接,比如:<>,<,>。 + * 4.增加kafka数组解析 + * 5.增加kafka1.0以上版本的支持 + * 6.增加postgresql、kudu、clickhouse维表、结果表的支持 + * 7.支持插件的依赖方式,参考pluginLoadMode参数 + * 8.支持cep处理 + * 9.支持udaf + * 10.支持谓词下移 + * 11.支持状态的ttl + + ## BUG修复: + * 1.修复不能解析sql中orderby,union语法。 + * 2.修复yarnPer模式提交失败的异常。 + * 3.一些bug的修复 + +# 已支持 + * 源表:kafka 0.9、0.10、0.11、1.x版本 + * 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver + * 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver + +# 后续开发计划 + * 维表快照 + * kafka avro格式 + * topN + +## 1 快速起步 +### 1.1 运行模式 + + +* 单机模式:对应Flink集群的单机模式 +* standalone模式:对应Flink集群的分布式模式 +* yarn模式:对应Flink集群的yarn模式 + +### 1.2 执行环境 + +* Java: JDK8及以上 +* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群) +* 操作系统:理论上不限 +* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例: +``` +## hadoop配置文件路径 +fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250 +security.kerberos.login.use-ticket-cache: true +security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab +security.kerberos.login.principal: maqi@DTSTACK.COM +security.kerberos.login.contexts: Client,KafkaClient +zookeeper.sasl.service-name: zookeeper +zookeeper.sasl.login-context-name: Client + +``` + +### 1.3 打包 + +进入项目根目录,使用maven打包: + +``` +mvn clean package -Dmaven.test.skip + +``` + +打包完成后的包结构: + +> * dt-center-flinkStreamSQL +> > * bin: 任务启动脚本 +> > * lib: launcher包存储路径,是任务提交的入口 +> > * plugins: 插件包存储路径 +> > * ........ : core及插件代码 + +### 1.4 启动 + +#### 1.4.1 启动命令 + +``` +sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin -localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins -addjar \["udf.jar\"\] -mode yarn -flinkconf D:\flink_home\kudu150etc -yarnconf D:\hadoop\etc\hadoopkudu -confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} -yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"} +``` + +#### 1.4.2 命令行参数选项 + +* **mode** + * 描述:执行模式,也就是flink集群的工作模式 + * local: 本地模式 + * standalone: 提交到独立部署模式的flink集群 + * yarn: 提交到yarn模式的flink集群(即提交到已有flink集群) + * yarnPer: yarn per_job模式提交(即创建新flink application) + * 必选:否 + * 默认值:local + +* **name** + * 描述:flink 任务对应名称。 + * 必选:是 + * 默认值:无 + +* **sql** + * 描述:执行flink sql 的主体语句。 + * 必选:是 + * 默认值:无 + +* **localSqlPluginPath** + * 描述:本地插件根目录地址,也就是打包后产生的plugins目录。 + * 必选:是 + * 默认值:无 + +* **remoteSqlPluginPath** + * 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。 + * 必选:否 + * 默认值:无 + +* **addjar** + * 描述:扩展jar路径,当前主要是UDF定义的jar; + * 格式:json + * 必选:否 + * 默认值:无 + +* **confProp** + * 描述:一些参数设置 + * 格式: json + * 必选:是 (如无参数填写空json即可) + * 默认值:无 + * 可选参数: + * sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒) + * sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟 + * state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。 + * state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。 + * state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。 + * sql.env.parallelism: 默认并行度设置 + * sql.max.env.parallelism: 最大并行度设置 + * time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime] + * sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms) + * sql.checkpoint.mode: 可选值[EXACTLY_ONCE|AT_LEAST_ONCE] + * sql.checkpoint.timeout: 生成checkpoint的超时时间(ms) + * sql.max.concurrent.checkpoints: 最大并发生成checkpoint数 + * sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)] + * flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file:// + * jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768) + * taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768) + * taskmanager.num: per_job模式下指定taskmanager的实例数(默认1) + * taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1) + * savePointPath:任务恢复点的路径(默认无) + * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false) + * logLevel: 日志级别动态配置(默认info) + * [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例 + + +* **flinkconf** + * 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf + * 必选:否 + * 默认值:无 + +* **yarnconf** + * 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop + * 必选:否 + * 默认值:无 + +* **flinkJarPath** + * 描述:per_job 模式提交需要指定本地的flink jar存放路径 + * 必选:否 + * 默认值:false + +* **queue** + * 描述:per_job 模式下指定的yarn queue + * 必选:否 + * 默认值:false + +* **pluginLoadMode** + * 描述:per_job 模式下的插件包加载方式。classpath:从每台机器加载插件包,shipfile:将需要插件从提交的节点上传到hdfs,不需要每台安装插件 + * 必选:否 + * 默认值:classpath + +* **yarnSessionConf** + * 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid + * 必选:否 + * 默认值:false + + +## 2 结构 +### 2.1 源表插件 +* [kafka 源表插件](docs/kafkaSource.md) + +### 2.2 结果表插件 +* [elasticsearch 结果表插件](docs/elasticsearchSink.md) +* [hbase 结果表插件](docs/hbaseSink.md) +* [mysql 结果表插件](docs/mysqlSink.md) +* [oracle 结果表插件](docs/oracleSink.md) +* [mongo 结果表插件](docs/mongoSink.md) +* [redis 结果表插件](docs/redisSink.md) +* [cassandra 结果表插件](docs/cassandraSink.md) +* [kudu 结果表插件](docs/kuduSink.md) +* [postgresql 结果表插件](docs/postgresqlSink.md) +* [clickhouse 结果表插件](docs/clickhouseSink.md) +* [impala 结果表插件](docs/impalaSink.md) +* [db2 结果表插件](docs/db2Sink.md) +* [sqlserver 结果表插件](docs/sqlserverSink.md) + +### 2.3 维表插件 +* [hbase 维表插件](docs/hbaseSide.md) +* [mysql 维表插件](docs/mysqlSide.md) +* [oracle 维表插件](docs/oracleSide.md) +* [mongo 维表插件](docs/mongoSide.md) +* [redis 维表插件](docs/redisSide.md) +* [cassandra 维表插件](docs/cassandraSide.md) +* [kudu 维表插件](docs/kuduSide.md) +* [postgresql 维表插件](docs/postgresqlSide.md) +* [clickhouse 维表插件](docs/clickhouseSide.md) +* [impala 维表插件](docs/impalaSide.md) +* [db2 维表插件](docs/db2Side.md) +* [sqlserver 维表插件](docs/sqlserverSide.md) + +## 3 性能指标(新增) + +### kafka插件 +* 业务延迟: flink_taskmanager_job_task_operator_dtEventDelay(单位s) + 数据本身的时间和进入flink的当前时间的差值. + +* 各个输入源的脏数据:flink_taskmanager_job_task_operator_dtDirtyData + 从kafka获取的数据解析失败的视为脏数据 + +* 各Source的数据输入TPS: flink_taskmanager_job_task_operator_dtNumRecordsInRate + kafka接受的记录数(未解析前)/s + +* 各Source的数据输入RPS: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate + kafka接受的记录数(解析后)/s + +* 各Source的数据输入BPS: flink_taskmanager_job_task_operator_dtNumBytesInRate + kafka接受的字节数/s + +* Kafka作为输入源的各个分区的延迟数: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag + 当前kafka10,kafka11有采集该指标 + +* 各个输出源RPS: flink_taskmanager_job_task_operator_dtNumRecordsOutRate + 写入的外部记录数/s + + +## 4 样例 + +``` + +CREATE (scala|table|aggregate) FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun; + + +CREATE TABLE MyTable( + name varchar, + channel varchar, + pv int, + xctime bigint, + CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数 + )WITH( + type ='kafka09', + bootstrapServers ='172.16.8.198:9092', + zookeeperQuorum ='172.16.8.198:2181/kafka', + offsetReset ='latest', + topic ='nbTest1', + parallelism ='1' + ); + +CREATE TABLE MyResult( + channel varchar, + pv varchar + )WITH( + type ='mysql', + url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8', + userName ='dtstack', + password ='abc123', + tableName ='pv2', + parallelism ='1' + ); + +CREATE TABLE workerinfo( + cast(logtime as TIMESTAMP) AS rtime, + cast(logtime) AS rtime + )WITH( + type ='hbase', + zookeeperQuorum ='rdos1:2181', + tableName ='workerinfo', + rowKey ='ce,de', + parallelism ='1', + zookeeperParent ='/hbase' + ); + +CREATE TABLE sideTable( + cf:name varchar as name, + cf:info varchar as info, + PRIMARY KEY(name), + PERIOD FOR SYSTEM_TIME //维表标识 + )WITH( + type ='hbase', + zookeeperQuorum ='rdos1:2181', + zookeeperParent ='/hbase', + tableName ='workerinfo', + cache ='LRU', + cacheSize ='10000', + cacheTTLMs ='60000', + parallelism ='1' + ); + +insert +into + MyResult + select + d.channel, + d.info + from + ( select + a.*,b.info + from + MyTable a + join + sideTable b + on a.channel=b.name + where + a.channel = 'xc2' + and a.pv=10 ) as d +``` + +# 招聘 +1.大数据平台开发工程师,想了解岗位详细信息可以添加本人微信号ysqwhiletrue,注明招聘,如有意者发送简历至sishu@dtstack.com。 + diff --git a/docs/config.md b/docs/config.md new file mode 100644 index 000000000..48aeefc5a --- /dev/null +++ b/docs/config.md @@ -0,0 +1,105 @@ +### 命令行参数选项 + +``` +sh submit.sh -key1 val1 -key2 val2 +``` +其中key的可选参数描述如下 + +* **mode** + * 描述:执行模式,也就是flink集群的工作模式 + * local: 本地模式 + * standalone: 提交到独立部署模式的flink集群 + * yarn: 提交到yarn模式的flink集群(即提交到已有flink集群) + * yarnPer: yarn per_job模式提交(即创建新flink application) + * 必选:否 + * 默认值:local + +* **pluginLoadMode** + * 描述:yarnPer 模式下的插件包加载方式。 + * classpath: 从节点机器加载指定remoteSqlPluginPath路径插件包,需要预先在每个运行节点下存放一份插件包。 + * shipfile: 将localSqlPluginPath路径下的插件从本地上传到hdfs,不需要集群的每台机器存放一份插件包。 + * 必选:否 + * 默认值:classpath + + +* **name** + * 描述:flink 任务对应名称。 + * 必选:是 + * 默认值:无 + +* **sql** + * 描述:执行flink sql 的主体语句。 + * 必选:是 + * 默认值:无 + +* **localSqlPluginPath** + * 描述:本地插件根目录地址,也就是打包后产生的plugins目录。 + * 必选:是 + * 默认值:无 + +* **remoteSqlPluginPath** + * 描述:flink执行集群上的插件根目录地址(将打包好的插件存放到各个flink节点上,如果是yarn集群需要存放到所有的nodemanager上)。 + * 必选:否 + * 默认值:无 + +* **addjar** + * 描述:扩展jar路径,当前主要是UDF定义的jar; + * 格式:json + * 必选:否 + * 默认值:无 + +* **confProp** + * 描述:一些参数设置 + * 格式: json + * 必选:是 (如无参数填写空json即可) + * 默认值:无 + * 可选参数: + * sql.ttl.min: 最小过期时间,大于0的整数,如1d、1h(d\D:天,h\H:小时,m\M:分钟,s\s:秒) + * sql.ttl.max: 最大过期时间,大于0的整数,如2d、2h(d\D:天,h\H:小时,m\M:分钟,s\s:秒),需同时设置最小时间,且比最小时间大5分钟 + * state.backend: 任务状态后端,可选为MEMORY,FILESYSTEM,ROCKSDB,默认为flinkconf中的配置。 + * state.checkpoints.dir: FILESYSTEM,ROCKSDB状态后端文件系统存储路径,例如:hdfs://ns1/dtInsight/flink180/checkpoints。 + * state.backend.incremental: ROCKSDB状态后端是否开启增量checkpoint,默认为true。 + * sql.env.parallelism: 默认并行度设置 + * sql.max.env.parallelism: 最大并行度设置 + * time.characteristic: 可选值[ProcessingTime|IngestionTime|EventTime] + * sql.checkpoint.interval: 设置了该参数表明开启checkpoint(ms) + * sql.checkpoint.mode: 可选值[EXACTLY_ONCE|AT_LEAST_ONCE] + * sql.checkpoint.timeout: 生成checkpoint的超时时间(ms) + * sql.max.concurrent.checkpoints: 最大并发生成checkpoint数 + * sql.checkpoint.cleanup.mode: 默认是不会将checkpoint存储到外部存储,[true(任务cancel之后会删除外部存储)|false(外部存储需要手动删除)] + * flinkCheckpointDataURI: 设置checkpoint的外部存储路径,根据实际的需求设定文件路径,hdfs://, file:// + * jobmanager.memory.mb: per_job模式下指定jobmanager的内存大小(单位MB, 默认值:768) + * taskmanager.memory.mb: per_job模式下指定taskmanager的内存大小(单位MB, 默认值:768) + * taskmanager.num: per_job模式下指定taskmanager的实例数(默认1) + * taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1) + * savePointPath:任务恢复点的路径(默认无) + * allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false) + * logLevel: 日志级别动态配置(默认info) + * [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例 + + +* **flinkconf** + * 描述:flink配置文件所在的目录(单机模式下不需要),如/hadoop/flink-1.4.0/conf + * 必选:否 + * 默认值:无 + +* **yarnconf** + * 描述:Hadoop配置文件(包括hdfs和yarn)所在的目录(单机模式下不需要),如/hadoop/etc/hadoop + * 必选:否 + * 默认值:无 + +* **flinkJarPath** + * 描述:yarnPer 模式提交需要指定本地的flink jar存放路径 + * 必选:否 + * 默认值:false + +* **queue** + * 描述:yarnPer 模式下指定的yarn queue + * 必选:否 + * 默认值:false + +* **yarnSessionConf** + * 描述:yarn session 模式下指定的运行的一些参数,[可参考](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/cli.html),目前只支持指定yid + * 必选:否 + * 默认值:false + \ No newline at end of file diff --git a/docs/demo.md b/docs/demo.md new file mode 100644 index 000000000..fe4b5c720 --- /dev/null +++ b/docs/demo.md @@ -0,0 +1,85 @@ +### 样例1: +注册自定义函数,并指定某一列作为eventTime; +kafkaSource join hbaseDim ==> hbaseOut + +``` +CREATE scala FUNCTION CHARACTER_LENGTH WITH com.dtstack.Kun; + +CREATE TABLE MyTable( + name varchar, + channel varchar, + pv int, + xctime bigint, + CHARACTER_LENGTH(channel) AS timeLeng //自定义的函数 + )WITH( + type ='kafka09', + bootstrapServers ='172.16.8.198:9092', + zookeeperQuorum ='172.16.8.198:2181/kafka', + offsetReset ='latest', + topic ='nbTest1', + parallelism ='1' + ); + +CREATE TABLE MyResult( + channel varchar, + pv varchar + )WITH( + type ='mysql', + url ='jdbc:mysql://172.16.8.104:3306/test?charset=utf8', + userName ='dtstack', + password ='abc123', + tableName ='pv2', + parallelism ='1' + ); + +CREATE TABLE workerinfo( + cast(logtime as TIMESTAMP) AS rtime, + cast(logtime) AS rtime + )WITH( + type ='hbase', + zookeeperQuorum ='rdos1:2181', + tableName ='workerinfo', + rowKey ='ce,de', + parallelism ='1', + zookeeperParent ='/hbase' + ); + +CREATE TABLE sideTable( + cf:name varchar as name, + cf:info varchar as info, + PRIMARY KEY(name), + PERIOD FOR SYSTEM_TIME //维表标识 + )WITH( + type ='hbase', + zookeeperQuorum ='rdos1:2181', + zookeeperParent ='/hbase', + tableName ='workerinfo', + cache ='LRU', + cacheSize ='10000', + cacheTTLMs ='60000', + parallelism ='1' + ); + +insert +into + MyResult + select + d.channel, + d.info + from + ( select + a.*,b.info + from + MyTable a + join + sideTable b + on a.channel=b.name + where + a.channel = 'xc2' + and a.pv=10 ) as d +``` + +### 创建视图demo: + +``` +``` diff --git a/docs/images/streamsql_dd.jpg b/docs/images/streamsql_dd.jpg new file mode 100644 index 000000000..4a32461c1 Binary files /dev/null and b/docs/images/streamsql_dd.jpg differ diff --git a/docs/newMetric.md b/docs/newMetric.md new file mode 100644 index 000000000..771689e5f --- /dev/null +++ b/docs/newMetric.md @@ -0,0 +1,32 @@ +### 1. 自定义的性能指标(新增) + +### 开启prometheus 需要设置的 confProp 参数 +* metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter +* metrics.reporter.promgateway.host: prometheus pushgateway的地址 +* metrics.reporter.promgateway.port:prometheus pushgateway的端口 +* metrics.reporter.promgateway.jobName: 实例名称 +* metrics.reporter.promgateway.randomJobNameSuffix: 是否在实例名称后面添加随机字符串(默认:true) +* metrics.reporter.promgateway.deleteOnShutdown: 是否在停止的时候删除数据(默认false) + +#### kafka插件 +* 业务延迟: flink_taskmanager_job_task_operator_dtEventDelay(单位s) + 数据本身的时间和进入flink的当前时间的差值. + +* 各个输入源的脏数据:flink_taskmanager_job_task_operator_dtDirtyData + 从kafka获取的数据解析失败的视为脏数据 + +* 各Source的数据输入TPS: flink_taskmanager_job_task_operator_dtNumRecordsInRate + kafka接受的记录数(未解析前)/s + +* 各Source的数据输入RPS: flink_taskmanager_job_task_operator_dtNumRecordsInResolveRate + kafka接受的记录数(解析后)/s + +* 各Source的数据输入BPS: flink_taskmanager_job_task_operator_dtNumBytesInRate + kafka接受的字节数/s + +* Kafka作为输入源的各个分区的延迟数: flink_taskmanager_job_task_operator_topic_partition_dtTopicPartitionLag + 当前kafka10,kafka11有采集该指标 + +* 各个输出源RPS: flink_taskmanager_job_task_operator_dtNumRecordsOutRate + 写入的外部记录数/s + \ No newline at end of file diff --git a/docs/cassandraSide.md b/docs/plugin/cassandraSide.md similarity index 96% rename from docs/cassandraSide.md rename to docs/plugin/cassandraSide.md index 131560047..f5210a900 100644 --- a/docs/cassandraSide.md +++ b/docs/plugin/cassandraSide.md @@ -9,7 +9,7 @@ )WITH( type ='cassandra', address ='ip:port[,ip:port]', - userName='dbUserName', + userNae='dbUserName', password='dbPwd', tableName='tableName', database='database', @@ -30,7 +30,7 @@ |----|---| | tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/cassandraSink.md b/docs/plugin/cassandraSink.md similarity index 96% rename from docs/cassandraSink.md rename to docs/plugin/cassandraSink.md index 8ea38e104..6d6dd2301 100644 --- a/docs/cassandraSink.md +++ b/docs/plugin/cassandraSink.md @@ -25,7 +25,7 @@ CREATE TABLE tableName( |----|---| | tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/clickhouseSide.md b/docs/plugin/clickhouseSide.md similarity index 97% rename from docs/clickhouseSide.md rename to docs/plugin/clickhouseSide.md index 63d3cc3da..57e4eef6b 100644 --- a/docs/clickhouseSide.md +++ b/docs/plugin/clickhouseSide.md @@ -29,7 +29,7 @@ |----|---| | tableName | clickhouse表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/clickhouseSink.md b/docs/plugin/clickhouseSink.md similarity index 94% rename from docs/clickhouseSink.md rename to docs/plugin/clickhouseSink.md index d9774727f..780ac77b0 100644 --- a/docs/clickhouseSink.md +++ b/docs/plugin/clickhouseSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| clickhouse表名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/consoleSink.md b/docs/plugin/consoleSink.md similarity index 92% rename from docs/consoleSink.md rename to docs/plugin/consoleSink.md index 206d7faaa..d756f5a36 100644 --- a/docs/consoleSink.md +++ b/docs/plugin/consoleSink.md @@ -20,7 +20,7 @@ CREATE TABLE tableName( |----|---| | tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/db2Side.md b/docs/plugin/db2Side.md similarity index 96% rename from docs/db2Side.md rename to docs/plugin/db2Side.md index ed08ecb7d..8381d6206 100644 --- a/docs/db2Side.md +++ b/docs/plugin/db2Side.md @@ -29,7 +29,7 @@ |----|---| | tableName | db2表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/db2Sink.md b/docs/plugin/db2Sink.md similarity index 93% rename from docs/db2Sink.md rename to docs/plugin/db2Sink.md index 04d745c83..0fcbdd3d3 100644 --- a/docs/db2Sink.md +++ b/docs/plugin/db2Sink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| db2表名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/elasticsearch6Side.md b/docs/plugin/elasticsearch6Side.md similarity index 97% rename from docs/elasticsearch6Side.md rename to docs/plugin/elasticsearch6Side.md index 234143cbd..62da6ca62 100644 --- a/docs/elasticsearch6Side.md +++ b/docs/plugin/elasticsearch6Side.md @@ -32,7 +32,7 @@ |----|---| | tableName | elasticsearch表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/elasticsearch6Sink.md b/docs/plugin/elasticsearch6Sink.md similarity index 96% rename from docs/elasticsearch6Sink.md rename to docs/plugin/elasticsearch6Sink.md index 59a4ab53a..0bab12cc0 100644 --- a/docs/elasticsearch6Sink.md +++ b/docs/plugin/elasticsearch6Sink.md @@ -25,7 +25,7 @@ CREATE TABLE tableName( |----|---| |tableName|在 sql 中使用的名称;即注册到flink-table-env上的名称| |colName|列名称| -|colType|列类型 [colType支持的类型](colType.md)| +|colType|列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: |参数名称|含义|是否必填|默认值| diff --git a/docs/elasticsearchSink.md b/docs/plugin/elasticsearchSink.md similarity index 95% rename from docs/elasticsearchSink.md rename to docs/plugin/elasticsearchSink.md index 8a47954e6..4908f8a05 100644 --- a/docs/elasticsearchSink.md +++ b/docs/plugin/elasticsearchSink.md @@ -22,7 +22,7 @@ CREATE TABLE tableName( |----|---| |tableName|在 sql 中使用的名称;即注册到flink-table-env上的名称| |colName|列名称| -|colType|列类型 [colType支持的类型](colType.md)| +|colType|列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: |参数名称|含义|是否必填|默认值| diff --git a/docs/hbaseSide.md b/docs/plugin/hbaseSide.md similarity index 100% rename from docs/hbaseSide.md rename to docs/plugin/hbaseSide.md diff --git a/docs/hbaseSink.md b/docs/plugin/hbaseSink.md similarity index 94% rename from docs/hbaseSink.md rename to docs/plugin/hbaseSink.md index cfe9de9c0..5bc917bb9 100644 --- a/docs/hbaseSink.md +++ b/docs/plugin/hbaseSink.md @@ -24,7 +24,7 @@ hbase2.0 |----|---| | tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称 | colFamily:colName | hbase中的列族名称和列名称 -| colType | 列类型 [colType支持的类型](colType.md) +| colType | 列类型 [colType支持的类型](docs/colType.md) ## 4.参数: diff --git a/docs/impalaSide.md b/docs/plugin/impalaSide.md similarity index 98% rename from docs/impalaSide.md rename to docs/plugin/impalaSide.md index 182731fba..5c7479b04 100644 --- a/docs/impalaSide.md +++ b/docs/plugin/impalaSide.md @@ -29,7 +29,7 @@ |----|---| | tableName | 注册到flink的表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/impalaSink.md b/docs/plugin/impalaSink.md similarity index 97% rename from docs/impalaSink.md rename to docs/plugin/impalaSink.md index a672fa98c..af3baa282 100644 --- a/docs/impalaSink.md +++ b/docs/plugin/impalaSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/kafkaSource.md b/docs/plugin/kafkaSource.md similarity index 98% rename from docs/kafkaSource.md rename to docs/plugin/kafkaSource.md index 76096aba6..035e39ffb 100644 --- a/docs/kafkaSource.md +++ b/docs/plugin/kafkaSource.md @@ -31,7 +31,7 @@ CREATE TABLE tableName( |----|---| | tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| | function(colNameX) as aliasName | 支持在定义列信息的时候根据已有列类型生成新的列(函数可以使用系统函数和已经注册的UDF)| | WATERMARK FOR colName AS withOffset( colName , delayTime ) | 标识输入流生的watermake生成规则,根据指定的colName(当前支持列的类型为Long | Timestamp) 和delayTime生成waterMark 同时会在注册表的使用附带上rowtime字段(如果未指定则默认添加proctime字段);注意:添加该标识的使用必须设置系统参数 time.characteristic:EventTime; delayTime: 数据最大延迟时间(ms)| @@ -48,7 +48,7 @@ CREATE TABLE tableName( |offsetReset | 读取的topic 的offset初始位置[latest|earliest|指定offset值({"0":12312,"1":12321,"2":12312},{"partition_no":offset_value})]|否|latest| |parallelism | 并行度设置|否|1| |sourcedatatype | 数据类型|否|json| -|timezone|时区设置[timezone支持的参数](timeZone.md)|否|'Asia/Shanghai' +|timezone|时区设置[timezone支持的参数](../timeZone.md)|否|'Asia/Shanghai' **kafka相关参数可以自定义,使用kafka.开头即可。** ``` kafka.consumer.id diff --git a/docs/kuduSide.md b/docs/plugin/kuduSide.md similarity index 97% rename from docs/kuduSide.md rename to docs/plugin/kuduSide.md index 22503f202..718d43117 100644 --- a/docs/kuduSide.md +++ b/docs/plugin/kuduSide.md @@ -55,7 +55,7 @@ kudu 1.10.0+cdh6.2.0 |----|---| | tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/kuduSink.md b/docs/plugin/kuduSink.md similarity index 95% rename from docs/kuduSink.md rename to docs/plugin/kuduSink.md index 4f3a63f4f..25f44ede1 100644 --- a/docs/kuduSink.md +++ b/docs/plugin/kuduSink.md @@ -27,7 +27,7 @@ kudu 1.9.0+cdh6.2.0 |----|---| | tableName | 在 sql 中使用的名称;即注册到flink-table-env上的名称 | colName | 列名称,redis中存储为 表名:主键名:主键值:列名]| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/mongoSide.md b/docs/plugin/mongoSide.md similarity index 96% rename from docs/mongoSide.md rename to docs/plugin/mongoSide.md index db557f441..90ffa2582 100644 --- a/docs/mongoSide.md +++ b/docs/plugin/mongoSide.md @@ -30,7 +30,7 @@ |----|---| | tableName | 注册到flink的表名称(可选填;不填默认和hbase对应的表名称相同)| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/mongoSink.md b/docs/plugin/mongoSink.md similarity index 94% rename from docs/mongoSink.md rename to docs/plugin/mongoSink.md index b0f916aab..9900dea12 100644 --- a/docs/mongoSink.md +++ b/docs/plugin/mongoSink.md @@ -25,7 +25,7 @@ CREATE TABLE tableName( |----|---| | tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/mysqlSide.md b/docs/plugin/mysqlSide.md similarity index 96% rename from docs/mysqlSide.md rename to docs/plugin/mysqlSide.md index f0eb16090..6a0e0aea9 100644 --- a/docs/mysqlSide.md +++ b/docs/plugin/mysqlSide.md @@ -29,7 +29,7 @@ |----|---| | tableName | mysql表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/mysqlSink.md b/docs/plugin/mysqlSink.md similarity index 93% rename from docs/mysqlSink.md rename to docs/plugin/mysqlSink.md index 3218f9371..300086e21 100644 --- a/docs/mysqlSink.md +++ b/docs/plugin/mysqlSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| mysql表名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/oracleSide.md b/docs/plugin/oracleSide.md similarity index 96% rename from docs/oracleSide.md rename to docs/plugin/oracleSide.md index 74fc56680..47b8e36c7 100644 --- a/docs/oracleSide.md +++ b/docs/plugin/oracleSide.md @@ -28,7 +28,7 @@ |----|---| | tableName | oracle表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/oracleSink.md b/docs/plugin/oracleSink.md similarity index 94% rename from docs/oracleSink.md rename to docs/plugin/oracleSink.md index 47ddd8371..c15a6109a 100644 --- a/docs/oracleSink.md +++ b/docs/plugin/oracleSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| oracle表名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/polardbSide.md b/docs/plugin/polardbSide.md similarity index 96% rename from docs/polardbSide.md rename to docs/plugin/polardbSide.md index c393d6836..625613f98 100644 --- a/docs/polardbSide.md +++ b/docs/plugin/polardbSide.md @@ -28,7 +28,7 @@ |----|---| | tableName | polardb表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/polardbSink.md b/docs/plugin/polardbSink.md similarity index 93% rename from docs/polardbSink.md rename to docs/plugin/polardbSink.md index 75582042c..d252dd712 100644 --- a/docs/polardbSink.md +++ b/docs/plugin/polardbSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| polardb表名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/postgresqlSide.md b/docs/plugin/postgresqlSide.md similarity index 96% rename from docs/postgresqlSide.md rename to docs/plugin/postgresqlSide.md index b38d7d1f2..6928042f6 100644 --- a/docs/postgresqlSide.md +++ b/docs/plugin/postgresqlSide.md @@ -29,7 +29,7 @@ |----|---| | tableName | 注册到flink的表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/postgresqlSink.md b/docs/plugin/postgresqlSink.md similarity index 94% rename from docs/postgresqlSink.md rename to docs/plugin/postgresqlSink.md index 7fa7cab2f..6d7d87834 100644 --- a/docs/postgresqlSink.md +++ b/docs/plugin/postgresqlSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| 在 sql 中使用的名称;即注册到flink-table-env上的名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/redisSide.md b/docs/plugin/redisSide.md similarity index 100% rename from docs/redisSide.md rename to docs/plugin/redisSide.md diff --git a/docs/redisSink.md b/docs/plugin/redisSink.md similarity index 100% rename from docs/redisSink.md rename to docs/plugin/redisSink.md diff --git a/docs/serverSocketSource.md b/docs/plugin/serverSocketSource.md similarity index 100% rename from docs/serverSocketSource.md rename to docs/plugin/serverSocketSource.md diff --git a/docs/sqlserverSide.md b/docs/plugin/sqlserverSide.md similarity index 96% rename from docs/sqlserverSide.md rename to docs/plugin/sqlserverSide.md index 7a21de778..1cab43cbd 100644 --- a/docs/sqlserverSide.md +++ b/docs/plugin/sqlserverSide.md @@ -29,7 +29,7 @@ |----|---| | tableName | sqlserver表名称| | colName | 列名称| - | colType | 列类型 [colType支持的类型](colType.md)| + | colType | 列类型 [colType支持的类型](docs/colType.md)| | PERIOD FOR SYSTEM_TIME | 关键字表明该定义的表为维表信息| | PRIMARY KEY(keyInfo) | 维表主键定义;多个列之间用逗号隔开| diff --git a/docs/sqlserverSink.md b/docs/plugin/sqlserverSink.md similarity index 94% rename from docs/sqlserverSink.md rename to docs/plugin/sqlserverSink.md index 8c2df2855..8ecb14308 100644 --- a/docs/sqlserverSink.md +++ b/docs/plugin/sqlserverSink.md @@ -24,7 +24,7 @@ CREATE TABLE tableName( |----|---| | tableName| sqlserver表名称| | colName | 列名称| -| colType | 列类型 [colType支持的类型](colType.md)| +| colType | 列类型 [colType支持的类型](docs/colType.md)| ## 4.参数: diff --git a/docs/pluginsInfo.md b/docs/pluginsInfo.md new file mode 100644 index 000000000..6baec3faf --- /dev/null +++ b/docs/pluginsInfo.md @@ -0,0 +1,32 @@ +### 1 插件列表 +#### 1.1 源表插件 +* [kafka 源表插件](docs/plugin/kafkaSource.md) + +#### 1.2 结果表插件 +* [elasticsearch 结果表插件](docs/plugin/elasticsearchSink.md) +* [hbase 结果表插件](docs/plugin/hbaseSink.md) +* [mysql 结果表插件](docs/plugin/mysqlSink.md) +* [oracle 结果表插件](docs/plugin/oracleSink.md) +* [mongo 结果表插件](docs/plugin/mongoSink.md) +* [redis 结果表插件](docs/plugin/redisSink.md) +* [cassandra 结果表插件](docs/plugin/cassandraSink.md) +* [kudu 结果表插件](docs/plugin/kuduSink.md) +* [postgresql 结果表插件](docs/plugin/postgresqlSink.md) +* [clickhouse 结果表插件](docs/plugin/clickhouseSink.md) +* [impala 结果表插件](docs/plugin/impalaSink.md) +* [db2 结果表插件](docs/plugin/db2Sink.md) +* [sqlserver 结果表插件](docs/plugin/sqlserverSink.md) + +#### 1.3 维表插件 +* [hbase 维表插件](docs/plugin/hbaseSide.md) +* [mysql 维表插件](docs/plugin/mysqlSide.md) +* [oracle 维表插件](docs/plugin/oracleSide.md) +* [mongo 维表插件](docs/plugin/mongoSide.md) +* [redis 维表插件](docs/plugin/redisSide.md) +* [cassandra 维表插件](docs/plugin/cassandraSide.md) +* [kudu 维表插件](docs/plugin/kuduSide.md) +* [postgresql 维表插件](docs/plugin/postgresqlSide.md) +* [clickhouse 维表插件](docs/plugin/clickhouseSide.md) +* [impala 维表插件](docs/plugin/impalaSide.md) +* [db2 维表插件](docs/plugin/db2Side.md) +* [sqlserver 维表插件](docs/plugin/sqlserverSide.md) diff --git a/docs/prometheus.md b/docs/prometheus.md deleted file mode 100644 index a36498b42..000000000 --- a/docs/prometheus.md +++ /dev/null @@ -1,7 +0,0 @@ -## 使用 prometheus pushgateway 需要设置的 confProp 参数 -* metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter -* metrics.reporter.promgateway.host: prometheus pushgateway的地址 -* metrics.reporter.promgateway.port:prometheus pushgateway的端口 -* metrics.reporter.promgateway.jobName: 实例名称 -* metrics.reporter.promgateway.randomJobNameSuffix: 是否在实例名称后面添加随机字符串(默认:true) -* metrics.reporter.promgateway.deleteOnShutdown: 是否在停止的时候删除数据(默认false) \ No newline at end of file diff --git a/docs/quickStart.md b/docs/quickStart.md new file mode 100644 index 000000000..dda046062 --- /dev/null +++ b/docs/quickStart.md @@ -0,0 +1,63 @@ +### 1.1 运行模式 + +* 本地模式:通常用于本地开发调试(--mode:local) +* standalone模式:Flink 本身提供到集群分布式模式(--mode:standalone) +* yarn-session模式:在yarn上已经预先启动了flink集群(--mode:yarn) +* yarn-perjob 模式:每个任务单独启动一个yarn application,推荐使用该模式.(--mode:yarnPer) + +### 1.2 执行环境 + +* Java: JDK8及以上 +* Flink集群: 1.4,1.5,1.8(单机模式不需要安装Flink集群) +* 操作系统:理论上不限 +* kerberos环境需要在flink-conf.yaml配置security.kerberos.login.keytab以及security.kerberos.login.principal参数,配置案例: +``` +#提交到hadoop环境一定要配置fs.hdfs.hadoopconf参数 +fs.hdfs.hadoopconf: /Users/maqi/tmp/hadoopconf/hadoop_250 +security.kerberos.login.use-ticket-cache: true +security.kerberos.login.keytab: /Users/maqi/tmp/hadoopconf/hadoop_250/maqi.keytab +security.kerberos.login.principal: maqi@DTSTACK.COM +security.kerberos.login.contexts: Client,KafkaClient +zookeeper.sasl.service-name: zookeeper +zookeeper.sasl.login-context-name: Client +``` + + +### 1.3 打包 + +进入项目根目录,使用maven打包: + +``` +mvn clean package -Dmaven.test.skip + +``` + +####可运行的目录结构: +``` +| +|-----bin +| |--- submit.sh 任务启动脚本 +|-----lib +| |--- sql.launcher.jar 包存储路径,是任务提交的入口(需要手动移动到该目录) +|-----plugins: 插件包存储路径(mvn 打包之后会自动将jar移动到该目录下) +| |--- core.jar +| |--- xxxsource +| |--- xxxsink +| |--- xxxside +``` +### 1.4 启动 + +#### 1.4.1 启动命令 + +``` +sh submit.sh -sql D:\sideSql.txt +-name xctest +-remoteSqlPluginPath /opt/dtstack/150_flinkplugin/sqlplugin +-localSqlPluginPath D:\gitspace\flinkStreamSQL\plugins +-addjar \["udf.jar\"\] +-mode yarn +-flinkconf D:\flink_home\kudu150etc +-yarnconf D:\hadoop\etc\hadoopkudu +-confProp \{\"time.characteristic\":\"EventTime\",\"sql.checkpoint.interval\":10000\} +-yarnSessionConf \{\"yid\":\"application_1564971615273_38182\"} +``` \ No newline at end of file