From b78a00511222379d99f2b7cb19f3ac63dbfd2779 Mon Sep 17 00:00:00 2001 From: zoudaokoulife Date: Tue, 10 Mar 2020 16:09:31 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=90=88=E5=B9=B6=E5=86=B2?= =?UTF-8?q?=E7=AA=81=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sink/cassandra/CassandraOutputFormat.java | 2 +- .../flink/sql/side/kudu/KuduAllReqRow.java | 6 ++++++ .../flink/sql/side/kudu/KuduAsyncReqRow.java | 19 +++++++++---------- .../flink/sql/side/redis/RedisAllReqRow.java | 13 ++++++------- .../sql/sink/redis/RedisOutputFormat.java | 5 +---- 5 files changed, 23 insertions(+), 22 deletions(-) diff --git a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java index 9cb180f8e..db9b87d8d 100644 --- a/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java +++ b/cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraOutputFormat.java @@ -54,7 +54,7 @@ import com.datastax.driver.core.SocketOptions; import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; import com.datastax.driver.core.policies.RetryPolicy; -import com.dtstack.flink.sql.outputformat.DtRichOutputFormat; +import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java b/kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java index 505b21479..31f9aa13d 100644 --- a/kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java +++ b/kudu/kudu-side/kudu-all-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAllReqRow.java @@ -7,11 +7,17 @@ import com.dtstack.flink.sql.side.AbstractSideTableInfo; import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo; import com.dtstack.flink.sql.side.kudu.utils.KuduUtil; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.calcite.sql.JoinType; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.KuduClient; diff --git a/kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java b/kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java index 5d3d10a79..e534a85f9 100644 --- a/kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java +++ b/kudu/kudu-side/kudu-async-side/src/main/java/com/dtstack/flink/sql/side/kudu/KuduAsyncReqRow.java @@ -1,20 +1,12 @@ package com.dtstack.flink.sql.side.kudu; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.async.ResultFuture; -import org.apache.flink.table.runtime.types.CRow; -import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; -import org.apache.flink.types.Row; -import org.apache.flink.util.Preconditions; - import com.dtstack.flink.sql.enums.ECacheContentType; -import com.dtstack.flink.sql.side.AsyncReqRow; +import com.dtstack.flink.sql.side.AbstractSideTableInfo; +import com.dtstack.flink.sql.side.BaseAsyncReqRow; import com.dtstack.flink.sql.side.CacheMissVal; import com.dtstack.flink.sql.side.FieldInfo; import com.dtstack.flink.sql.side.JoinInfo; import com.dtstack.flink.sql.side.PredicateInfo; -import com.dtstack.flink.sql.side.SideTableInfo; import com.dtstack.flink.sql.side.cache.CacheObj; import com.dtstack.flink.sql.side.kudu.table.KuduSideTableInfo; import com.dtstack.flink.sql.side.kudu.utils.KuduUtil; @@ -24,6 +16,13 @@ import com.stumbleupon.async.Deferred; import io.vertx.core.json.JsonArray; import org.apache.commons.lang3.StringUtils; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.AsyncKuduClient; diff --git a/redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java b/redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java index 7bc4fe60e..02f480105 100644 --- a/redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java +++ b/redis5/redis5-side/redis-all-side/src/main/java/com/dtstack/flink/sql/side/redis/RedisAllReqRow.java @@ -18,15 +18,10 @@ package com.dtstack.flink.sql.side.redis; -import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.table.runtime.types.CRow; -import org.apache.flink.types.Row; -import org.apache.flink.util.Collector; - -import com.dtstack.flink.sql.side.AllReqRow; +import com.dtstack.flink.sql.side.AbstractSideTableInfo; +import com.dtstack.flink.sql.side.BaseAllReqRow; import com.dtstack.flink.sql.side.FieldInfo; import com.dtstack.flink.sql.side.JoinInfo; -import com.dtstack.flink.sql.side.SideTableInfo; import com.dtstack.flink.sql.side.redis.table.RedisSideReqRow; import com.dtstack.flink.sql.side.redis.table.RedisSideTableInfo; import com.esotericsoftware.minlog.Log; @@ -34,6 +29,10 @@ import org.apache.calcite.sql.JoinType; import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.runtime.types.CRow; +import org.apache.flink.types.Row; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.HostAndPort; diff --git a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java index 8e807d0a9..ae4fe5a4b 100644 --- a/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java +++ b/redis5/redis5-sink/src/main/java/com/dtstack/flink/sql/sink/redis/RedisOutputFormat.java @@ -19,15 +19,12 @@ package com.dtstack.flink.sql.sink.redis; import com.dtstack.flink.sql.outputformat.AbstractDtRichOutputFormat; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.types.Row; - -import com.dtstack.flink.sql.outputformat.DtRichOutputFormat; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.HostAndPort;