Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/1.8_test_3.10.x' into 1.8_releas…
Browse files Browse the repository at this point in the history
…e_3.10.x

# Conflicts:
#	core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java
#	core/src/main/java/com/dtstack/flink/sql/side/JoinNodeDealer.java
#	core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java
#	core/src/main/java/com/dtstack/flink/sql/side/SideSQLParser.java
#	core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java
#	oracle/oracle-side/oracle-async-side/src/main/java/com/dtstack/flink/sql/side/oracle/OracleAsyncSideInfo.java
#	rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/AbstractRdbSink.java
  • Loading branch information
todd5167 committed Apr 8, 2020
2 parents 22ac02a + e1736f3 commit b43aec0
Show file tree
Hide file tree
Showing 257 changed files with 3,576 additions and 2,641 deletions.
10 changes: 10 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
build:
stage: test
script:
- mvn clean org.jacoco:jacoco-maven-plugin:0.7.8:prepare-agent package -Dmaven.test.failure.ignore=true -q
- mvn sonar:sonar -Dsonar.projectKey="dt-insight-engine/flinkStreamSQL" -Dsonar.branch.name="v1.8.0_dev" -Dsonar.login=11974c5e9a29625efa09fdc3c3fdc031efb1aab1 -Dsonar.host.url=http://172.16.100.198:9000 -Dsonar.jdbc.url=jdbc:postgresql://172.16.100.198:5432/sonar -Dsonar.java.binaries=target/sonar
- sh ci/sonar_notify.sh
only:
- v1.8.0_dev
tags:
- dt-insight-engine
29 changes: 5 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,11 @@
> > * 支持原生FlinkSQL所有的语法
> > * 扩展了输入和输出的性能指标到prometheus
## 新特性:
* 1.kafka源表支持not null语法,支持字符串类型的时间转换。
* 2.rdb维表与DB建立连接时,周期进行连接,防止连接断开。rdbsink写入时,对连接进行检查。
* 3.异步维表支持非等值连接,比如:<>,<,>。
* 4.增加kafka数组解析
* 5.增加kafka1.0以上版本的支持
* 6.增加postgresql、kudu、clickhouse维表、结果表的支持
* 7.支持插件的依赖方式,参考pluginLoadMode参数
* 8.支持cep处理
* 9.支持udaf
* 10.支持谓词下推
* 11.支持状态的ttl

## BUG修复:
* 1.修复不能解析sql中orderby,union语法。
* 2.修复yarnPer模式提交失败的异常。
* 3.一些bug的修复

# 已支持
* 源表:kafka 0.9、0.10、0.11、1.x版本
* 维表:mysql, SQlServer,oracle, hbase, mongo, redis, cassandra, serversocket, kudu, postgresql, clickhouse, impala, db2, sqlserver
* 结果表:mysql, SQlServer, oracle, hbase, elasticsearch5.x, mongo, redis, cassandra, console, kudu, postgresql, clickhouse, impala, db2, sqlserver

# 后续开发计划
* 维表快照
* kafka avro格式
* topN

## 1 快速起步
### 1.1 运行模式

Expand Down Expand Up @@ -149,7 +126,10 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
* taskmanager.slots:per_job模式下指定每个taskmanager对应的slot数量(默认1)
* savePointPath:任务恢复点的路径(默认无)
* allowNonRestoredState:指示保存点是否允许非还原状态的标志(默认false)
* logLevel: 日志级别动态配置(默认info)
* restore.enable:是否失败重启(默认是true)
* failure.interval:衡量失败率的时间段,单位分钟(默认6m)
* delay.interval:连续两次重启尝试间的间隔,单位是秒(默认10s)
* logLevel: 日志级别动态配置(默认info)
* [prometheus 相关参数](docs/prometheus.md) per_job可指定metric写入到外部监控组件,以prometheus pushgateway举例


Expand Down Expand Up @@ -202,6 +182,7 @@ sh submit.sh -sql D:\sideSql.txt -name xctest -remoteSqlPluginPath /opt/dtstack
* [impala 结果表插件](docs/impalaSink.md)
* [db2 结果表插件](docs/db2Sink.md)
* [sqlserver 结果表插件](docs/sqlserverSink.md)
* [kafka 结果表插件](docs/kafkaSink.md)

### 2.3 维表插件
* [hbase 维表插件](docs/hbaseSide.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.side.AllReqRow;
import com.dtstack.flink.sql.side.BaseAllReqRow;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand All @@ -62,14 +62,12 @@
*
* @author xuqianjin
*/
public class CassandraAllReqRow extends AllReqRow {
public class CassandraAllReqRow extends BaseAllReqRow {

private static final long serialVersionUID = 54015343561288219L;

private static final Logger LOG = LoggerFactory.getLogger(CassandraAllReqRow.class);

private static final String cassandra_DRIVER = "com.cassandra.jdbc.Driver";

private static final int CONN_RETRY_NUM = 3;

private static final int FETCH_SIZE = 1000;
Expand All @@ -79,7 +77,7 @@ public class CassandraAllReqRow extends AllReqRow {

private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();

public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
public CassandraAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(new com.dtstack.flink.sql.side.cassandra.CassandraAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.BaseSideInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlNode;
Expand All @@ -37,16 +37,16 @@
*
* @author xuqianjin
*/
public class CassandraAllSideInfo extends SideInfo {
public class CassandraAllSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -8690814317653033557L;

public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
public CassandraAllSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

@Override
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

sqlCondition = "select ${selectField} from ${tableName} ";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.enums.ECacheContentType;
import com.dtstack.flink.sql.side.AsyncReqRow;
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
import com.dtstack.flink.sql.side.CacheMissVal;
import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cache.CacheObj;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.google.common.base.Function;
Expand All @@ -67,7 +67,7 @@
*
* @author xuqianjin
*/
public class CassandraAsyncReqRow extends AsyncReqRow {
public class CassandraAsyncReqRow extends BaseAsyncReqRow {

private static final long serialVersionUID = 6631584128079864735L;

Expand All @@ -83,7 +83,7 @@ public class CassandraAsyncReqRow extends AsyncReqRow {
private transient ListenableFuture session;
private transient CassandraSideTableInfo cassandraSideTableInfo;

public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
public CassandraAsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(new com.dtstack.flink.sql.side.cassandra.CassandraAsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
}

Expand Down Expand Up @@ -216,7 +216,7 @@ public void asyncInvoke(CRow input, ResultFuture<CRow> resultFuture) throws Exce
connCassandraDB(cassandraSideTableInfo);

String sqlCondition = sideInfo.getSqlCondition() + " " + sqlWhere + " ALLOW FILTERING ";
System.out.println("sqlCondition:" + sqlCondition);
LOG.info("sqlCondition:{}" + sqlCondition);

ListenableFuture<ResultSet> resultSet = Futures.transformAsync(session,
new AsyncFunction<Session, ResultSet>() {
Expand Down Expand Up @@ -265,7 +265,6 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
public void onFailure(Throwable t) {
LOG.error("Failed to retrieve the data: %s%n",
t.getMessage());
System.out.println("Failed to retrieve the data: " + t.getMessage());
cluster.closeAsync();
resultFuture.completeExceptionally(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import com.dtstack.flink.sql.side.FieldInfo;
import com.dtstack.flink.sql.side.JoinInfo;
import com.dtstack.flink.sql.side.SideInfo;
import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.BaseSideInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
import com.dtstack.flink.sql.util.ParseUtils;
import org.apache.calcite.sql.SqlBasicCall;
Expand All @@ -30,6 +30,8 @@
import org.apache.calcite.sql.SqlNode;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

Expand All @@ -39,16 +41,18 @@
*
* @author xuqianjin
*/
public class CassandraAsyncSideInfo extends SideInfo {
public class CassandraAsyncSideInfo extends BaseSideInfo {

private static final long serialVersionUID = -4403313049809013362L;
private static final Logger LOG = LoggerFactory.getLogger(CassandraAsyncSideInfo.class.getSimpleName());

public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {

public CassandraAsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
}

@Override
public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
CassandraSideTableInfo cassandraSideTableInfo = (CassandraSideTableInfo) sideTableInfo;

String sideTableName = joinInfo.getSideTableName();
Expand All @@ -63,9 +67,9 @@ public void buildEqualInfo(JoinInfo joinInfo, SideTableInfo sideTableInfo) {
}

sqlCondition = "select ${selectField} from ${tableName}";

sqlCondition = sqlCondition.replace("${tableName}", cassandraSideTableInfo.getDatabase()+"."+cassandraSideTableInfo.getTableName()).replace("${selectField}", sideSelectFields);
System.out.println("---------side_exe_sql-----\n" + sqlCondition);

LOG.info("---------side_exe_sql-----\n{}" + sqlCondition);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@

package com.dtstack.flink.sql.side.cassandra.table;

import com.dtstack.flink.sql.table.AbsSideTableParser;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.AbstractSideTableParser;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;

/**
* Reason:
* Date: 2018/11/22
*
* @author xuqianjin
*/
public class CassandraSideParser extends AbsSideTableParser {
public class CassandraSideParser extends AbstractSideTableParser {

private final static String SIDE_SIGN_KEY = "sideSignKey";

Expand Down Expand Up @@ -73,7 +71,7 @@ public CassandraSideParser() {
}

@Override
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo cassandraSideTableInfo = new com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo();
cassandraSideTableInfo.setName(tableName);
parseFieldsInfo(fieldsInfo, cassandraSideTableInfo);
Expand All @@ -96,9 +94,10 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O
return cassandraSideTableInfo;
}

private void dealSideSign(Matcher matcher, TableInfo tableInfo) {
private void dealSideSign(Matcher matcher, AbstractTableInfo tableInfo) {
}

@Override
public Class dbTypeConvertToJavaType(String fieldType) {
switch (fieldType.toLowerCase()) {
case "bigint":
Expand All @@ -121,6 +120,8 @@ public Class dbTypeConvertToJavaType(String fieldType) {
return Double.class;
case "timestamp":
return Timestamp.class;
default:
break;
}

throw new RuntimeException("不支持 " + fieldType + " 类型");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package com.dtstack.flink.sql.side.cassandra.table;

import com.dtstack.flink.sql.side.SideTableInfo;
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
import com.google.common.base.Preconditions;

/**
Expand All @@ -28,7 +28,7 @@
*
* @author xuqianjin
*/
public class CassandraSideTableInfo extends SideTableInfo {
public class CassandraSideTableInfo extends AbstractSideTableInfo {

private static final long serialVersionUID = -5556431094535478915L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import com.datastax.driver.core.SocketOptions;
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.dtstack.flink.sql.outputformat.DtRichOutputFormat;
import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -72,7 +72,7 @@
* @see Tuple
* @see DriverManager
*/
public class CassandraOutputFormat extends DtRichOutputFormat<Tuple2> {
public class CassandraOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
private static final long serialVersionUID = -7994311331389155692L;

private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormat.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import com.dtstack.flink.sql.sink.IStreamSinkGener;
import com.dtstack.flink.sql.sink.cassandra.table.CassandraTableInfo;
import com.dtstack.flink.sql.table.TargetTableInfo;
import com.dtstack.flink.sql.table.AbstractTargetTableInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
Expand Down Expand Up @@ -63,7 +63,7 @@ public CassandraSink() {
}

@Override
public CassandraSink genStreamSink(TargetTableInfo targetTableInfo) {
public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
CassandraTableInfo cassandraTableInfo = (CassandraTableInfo) targetTableInfo;
this.address = cassandraTableInfo.getAddress();
this.tableName = cassandraTableInfo.getTableName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@

package com.dtstack.flink.sql.sink.cassandra.table;

import com.dtstack.flink.sql.table.AbsTableParser;
import com.dtstack.flink.sql.table.TableInfo;
import com.dtstack.flink.sql.table.AbstractTableParser;
import com.dtstack.flink.sql.table.AbstractTableInfo;
import com.dtstack.flink.sql.util.MathUtil;

import java.util.Map;

import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY;
import static com.dtstack.flink.sql.table.AbstractTableInfo.PARALLELISM_KEY;

/**
* Reason:
* Date: 2018/11/22
*
* @author xuqianjin
*/
public class CassandraSinkParser extends AbsTableParser {
public class CassandraSinkParser extends AbstractTableParser {

public static final String ADDRESS_KEY = "address";

Expand All @@ -60,7 +60,7 @@ public class CassandraSinkParser extends AbsTableParser {
public static final String POOL_TIMEOUT_MILLIS_KEY = "poolTimeoutMillis";

@Override
public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, Object> props) {
CassandraTableInfo cassandraTableInfo = new CassandraTableInfo();
cassandraTableInfo.setName(tableName);
parseFieldsInfo(fieldsInfo, cassandraTableInfo);
Expand Down
Loading

0 comments on commit b43aec0

Please sign in to comment.