From 93929a7fcb42c277a76e226c50b79c4f08c89f75 Mon Sep 17 00:00:00 2001
From: DongLiang-0 <1747644936@qq.com>
Date: Wed, 23 Aug 2023 17:39:36 +0800
Subject: [PATCH] [Improve] Format the code for the entire project
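
Introduce Checkstyle for the Spark connector and reformat the existing
sources so that they pass the check:

- Add a GitHub Actions workflow that runs checkstyle on every pull request.
- Add checkstyle.xml, suppressions.xml, import-control.xml and the Apache
  license header template under spark-doris-connector/check/checkstyle.
- Bind maven-checkstyle-plugin 3.1.2 (with checkstyle 9.3) to the validate
  phase in spark-doris-connector/pom.xml.
- Reorder imports, fix modifier order, and wrap over-long lines in the
  Java and Scala sources and tests.

The check can also be run locally:

    cd spark-doris-connector && mvn clean checkstyle:check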
---
.github/workflows/checkstyle.yaml | 38 ++
.../checkstyle/checkstyle-apache-header.txt | 16 +
.../check/checkstyle/checkstyle.xml | 429 ++++++++++++++++++
.../check/checkstyle/import-control.xml | 43 ++
.../check/checkstyle/suppressions.xml | 63 +++
spark-doris-connector/pom.xml | 37 +-
.../doris/spark/backend/BackendClient.java | 15 +-
.../doris/spark/cfg/ConfigurationOptions.java | 7 +-
.../doris/spark/cfg/PropertiesSettings.java | 4 +-
.../org/apache/doris/spark/cfg/Settings.java | 29 +-
.../apache/doris/spark/cfg/SparkSettings.java | 8 +-
.../doris/spark/exception/DorisException.java | 8 +-
.../exception/DorisInternalException.java | 3 +-
.../spark/exception/StreamLoadException.java | 8 +-
.../load/CachedDorisStreamLoadClient.java | 4 +-
.../doris/spark/load/DorisStreamLoad.java | 67 +--
.../doris/spark/rest/PartitionDefinition.java | 36 +-
.../apache/doris/spark/rest/RestService.java | 226 ++++-----
.../doris/spark/rest/models/Backend.java | 2 +
.../doris/spark/rest/models/BackendRow.java | 11 +-
.../doris/spark/rest/models/BackendV2.java | 2 +
.../apache/doris/spark/rest/models/Field.java | 27 +-
.../doris/spark/rest/models/QueryPlan.java | 6 +-
.../doris/spark/rest/models/RespContent.java | 1 +
.../doris/spark/rest/models/Schema.java | 14 +-
.../doris/spark/rest/models/Tablet.java | 8 +-
.../doris/spark/serialization/Routing.java | 14 +-
.../doris/spark/serialization/RowBatch.java | 49 +-
.../doris/spark/util/ErrorMessages.java | 3 +-
.../org/apache/doris/spark/util/IOUtils.java | 4 +-
.../apache/doris/spark/util/ResponseUtil.java | 13 +-
.../org/apache/doris/spark/package.scala | 4 +-
.../doris/spark/rdd/AbstractDorisRDD.scala | 7 +-
.../apache/doris/spark/rdd/DorisSpark.scala | 1 -
.../doris/spark/rdd/ScalaDorisRDD.scala | 5 +-
.../doris/spark/rdd/ScalaValueReader.scala | 12 +-
.../doris/spark/sql/DorisRelation.scala | 9 +-
.../doris/spark/sql/ScalaDorisRow.scala | 4 +-
.../doris/spark/sql/ScalaDorisRowRDD.scala | 3 +-
.../spark/sql/ScalaDorisRowValueReader.scala | 3 +-
.../apache/doris/spark/sql/SchemaUtils.scala | 6 +-
.../spark/rest/TestPartitionDefinition.java | 6 +-
.../doris/spark/rest/TestRestService.java | 89 ++--
.../doris/spark/rest/models/TestSchema.java | 2 +-
.../spark/serialization/TestRouting.java | 5 +-
.../spark/serialization/TestRowBatch.java | 150 ++----
.../apache/doris/spark/util/DataUtilTest.java | 5 +-
.../doris/spark/util/TestListUtils.java | 1 +
.../doris/spark/sql/TestSparkConnector.scala | 3 +-
49 files changed, 1055 insertions(+), 455 deletions(-)
create mode 100644 .github/workflows/checkstyle.yaml
create mode 100644 spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt
create mode 100644 spark-doris-connector/check/checkstyle/checkstyle.xml
create mode 100644 spark-doris-connector/check/checkstyle/import-control.xml
create mode 100644 spark-doris-connector/check/checkstyle/suppressions.xml
diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml
new file mode 100644
index 00000000..284b5580
--- /dev/null
+++ b/.github/workflows/checkstyle.yaml
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: Code Style Checker
+
+on:
+ pull_request:
+
+jobs:
+ java-checkstyle:
+ name: "CheckStyle"
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v3
+ with:
+ persist-credentials: false
+ submodules: recursive
+
+ - name: Run java checkstyle
+ run:
+ cd spark-doris-connector && mvn clean checkstyle:check
+
diff --git a/spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt b/spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt
new file mode 100644
index 00000000..6e778edd
--- /dev/null
+++ b/spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt
@@ -0,0 +1,16 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
\ No newline at end of file
diff --git a/spark-doris-connector/check/checkstyle/checkstyle.xml b/spark-doris-connector/check/checkstyle/checkstyle.xml
new file mode 100644
index 00000000..fca43b92
--- /dev/null
+++ b/spark-doris-connector/check/checkstyle/checkstyle.xml
@@ -0,0 +1,429 @@
+    [429-line Checkstyle rule configuration; XML content not recoverable]
diff --git a/spark-doris-connector/check/checkstyle/import-control.xml b/spark-doris-connector/check/checkstyle/import-control.xml
new file mode 100644
index 00000000..9ff32f7d
--- /dev/null
+++ b/spark-doris-connector/check/checkstyle/import-control.xml
@@ -0,0 +1,43 @@
+    [43-line import-control definition; XML content not recoverable]
diff --git a/spark-doris-connector/check/checkstyle/suppressions.xml b/spark-doris-connector/check/checkstyle/suppressions.xml
new file mode 100644
index 00000000..20e5949e
--- /dev/null
+++ b/spark-doris-connector/check/checkstyle/suppressions.xml
@@ -0,0 +1,63 @@
+    [63-line Checkstyle suppressions list; XML content not recoverable]
diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml
index 74a53cc0..3b28b0c3 100644
--- a/spark-doris-connector/pom.xml
+++ b/spark-doris-connector/pom.xml
@@ -318,6 +318,42 @@
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>3.1.2</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>com.puppycrawl.tools</groupId>
+                        <artifactId>checkstyle</artifactId>
+                        <version>9.3</version>
+                    </dependency>
+                </dependencies>
+                <configuration>
+                    <configLocation>check/checkstyle/checkstyle.xml</configLocation>
+                    <suppressionsLocation>check/checkstyle/suppressions.xml</suppressionsLocation>
+                    <inputEncoding>UTF-8</inputEncoding>
+                    <consoleOutput>true</consoleOutput>
+                    <failsOnError>true</failsOnError>
+                    <linkXRef>false</linkXRef>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>validate</id>
+                        <phase>validate</phase>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.sonarsource.scanner.maven</groupId>
+                <artifactId>sonar-maven-plugin</artifactId>
+                <version>3.9.1.2184</version>
+            </plugin>
@@ -349,7 +385,6 @@
-
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
index aaafe096..00a65fb1 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java
@@ -26,12 +26,13 @@
import org.apache.doris.sdk.thrift.TScanOpenResult;
import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.doris.spark.cfg.ConfigurationOptions;
+import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.DorisInternalException;
-import org.apache.doris.spark.util.ErrorMessages;
-import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.serialization.Routing;
+import org.apache.doris.spark.util.ErrorMessages;
+
import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -46,7 +47,7 @@
* Client to request Doris BE
*/
public class BackendClient {
- private final static Logger logger = LoggerFactory.getLogger(BackendClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(BackendClient.class);
private Routing routing;
@@ -78,7 +79,8 @@ private void open() throws ConnectedFailedException {
logger.debug("Attempt {} to connect {}.", attempt, routing);
try {
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
- transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
+ transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout,
+ connectTimeout);
TProtocol protocol = factory.getProtocol(transport);
client = new TDorisExternalService.Client(protocol);
logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected);
@@ -115,6 +117,7 @@ private void close() {
/**
* Open a scanner for reading Doris data.
+ *
* @param openParams thrift struct to required by request
* @return scan open result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -154,6 +157,7 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedF
/**
* get next row batch from Doris BE
+ *
* @param nextBatchParams thrift struct to required by request
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
@@ -168,7 +172,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
- result = client.getNext(nextBatchParams);
+ result = client.getNext(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@@ -196,6 +200,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor
/**
* close a scanner.
+ *
* @param closeParams thrift struct to required by request
*/
public void closeScanner(TScanCloseParams closeParams) {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 2ab200d8..3c22ad38 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -79,8 +79,11 @@ public interface ConfigurationOptions {
String DORIS_SINK_TASK_PARTITION_SIZE = "doris.sink.task.partition.size";
/**
- * Set doris sink task partition size. If you set a small coalesce size and you don't have the action operations, this may result in the same parallelism in your computation.
- * To avoid this, you can use repartition operations. This will add a shuffle step, but means the current upstream partitions will be executed in parallel.
+ * Set doris sink task partition size.
+ * If you set a small coalesce size and you don't have the action operations, this may result in the same
+ * parallelism in your computation.
+ * To avoid this, you can use repartition operations. This will add a shuffle step, but means the current
+ * upstream partitions will be executed in parallel.
*/
String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition";
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java
index cb027667..a87f14b8 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java
@@ -17,10 +17,10 @@
package org.apache.doris.spark.cfg;
-import java.util.Properties;
-
import com.google.common.base.Preconditions;
+import java.util.Properties;
+
public class PropertiesSettings extends Settings {
protected final Properties props;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
index 798ec8cf..f42d08d7 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java
@@ -17,28 +17,23 @@
package org.apache.doris.spark.cfg;
-import java.util.Enumeration;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.lang3.StringUtils;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.doris.spark.util.IOUtils;
+
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.Properties;
+
public abstract class Settings {
- private final static Logger logger = LoggerFactory.getLogger(Settings.class);
+ private static final Logger logger = LoggerFactory.getLogger(Settings.class);
public abstract String getProperty(String name);
- public abstract void setProperty(String name, String value);
-
- public abstract Properties asProperties();
-
- public abstract Settings copy();
-
public String getProperty(String name, String defaultValue) {
String value = getProperty(name);
if (StringUtils.isEmpty(value)) {
@@ -47,6 +42,12 @@ public String getProperty(String name, String defaultValue) {
return value;
}
+ public abstract void setProperty(String name, String value);
+
+ public abstract Properties asProperties();
+
+ public abstract Settings copy();
+
public Integer getIntegerProperty(String name) {
return getIntegerProperty(name, null);
}
@@ -54,7 +55,7 @@ public Integer getIntegerProperty(String name) {
public Integer getIntegerProperty(String name, Integer defaultValue) {
try {
if (getProperty(name) != null) {
- return Integer.parseInt(getProperty(name));
+ return Integer.parseInt(getProperty(name));
}
} catch (NumberFormatException e) {
logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, name, getProperty(name));
@@ -80,7 +81,7 @@ public Settings merge(Properties properties) {
Enumeration<?> propertyNames = properties.propertyNames();
- for (; propertyNames.hasMoreElements();) {
+ for (; propertyNames.hasMoreElements(); ) {
Object prop = propertyNames.nextElement();
if (prop instanceof String) {
Object value = properties.get(prop);
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
index 39fcd75b..f64102e3 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java
@@ -17,16 +17,14 @@
package org.apache.doris.spark.cfg;
-import java.util.Properties;
-
-import org.apache.spark.SparkConf;
-
import com.google.common.base.Preconditions;
-
+import org.apache.spark.SparkConf;
import scala.Option;
import scala.Serializable;
import scala.Tuple2;
+import java.util.Properties;
+
public class SparkSettings extends Settings implements Serializable {
private final SparkConf cfg;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java
index 6c47db0b..91a69882 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java
@@ -21,18 +21,22 @@ public class DorisException extends Exception {
public DorisException() {
super();
}
+
public DorisException(String message) {
super(message);
}
+
public DorisException(String message, Throwable cause) {
super(message, cause);
}
+
public DorisException(Throwable cause) {
super(cause);
}
+
protected DorisException(String message, Throwable cause,
- boolean enableSuppression,
- boolean writableStackTrace) {
+ boolean enableSuppression,
+ boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java
index 05b560f2..99fb372f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java
@@ -23,7 +23,8 @@
public class DorisInternalException extends DorisException {
public DorisInternalException(String server, TStatusCode statusCode, List<String> errorMsgs) {
- super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs);
+ super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is "
+ + errorMsgs);
}
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java
index ec9f77f9..b725cc1f 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java
@@ -21,18 +21,22 @@ public class StreamLoadException extends Exception {
public StreamLoadException() {
super();
}
+
public StreamLoadException(String message) {
super(message);
}
+
public StreamLoadException(String message, Throwable cause) {
super(message, cause);
}
+
public StreamLoadException(Throwable cause) {
super(cause);
}
+
protected StreamLoadException(String message, Throwable cause,
- boolean enableSuppression,
- boolean writableStackTrace) {
+ boolean enableSuppression,
+ boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
index d3dab491..8626068b 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java
@@ -17,10 +17,12 @@
package org.apache.doris.spark.load;
+import org.apache.doris.spark.cfg.SparkSettings;
+
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
-import org.apache.doris.spark.cfg.SparkSettings;
+
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
index 4a7b1e05..2b742779 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java
@@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+
package org.apache.doris.spark.load;
import org.apache.doris.spark.cfg.ConfigurationOptions;
@@ -50,7 +51,6 @@
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
-import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
@@ -65,22 +65,23 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
/**
* DorisStreamLoad
**/
public class DorisStreamLoad implements Serializable {
- private String FIELD_DELIMITER;
- private final String LINE_DELIMITER;
- private static final String NULL_VALUE = "\\N";
-
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class);
- private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
+ private static final String NULL_VALUE = "\\N";
+ private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(
+ Arrays.asList("Success", "Publish Timeout"));
+ private static final long cacheExpireTimeout = 4 * 60;
private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
private static String abortUrlPattern = "http://%s/api/%s/%s/_stream_load_2pc?";
-
+ private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
+ private final String fileType;
+ private final String LINE_DELIMITER;
+ private String FIELD_DELIMITER;
private String user;
private String passwd;
private String loadUrlStr;
@@ -90,10 +91,6 @@ public class DorisStreamLoad implements Serializable {
private String columns;
private String maxFilterRatio;
private Map<String, String> streamLoadProp;
- private static final long cacheExpireTimeout = 4 * 60;
- private final LoadingCache<String, List<BackendV2.BackendRowV2>> cache;
- private final String fileType;
-
private boolean readJsonByLine = false;
public DorisStreamLoad(SparkSettings settings) {
@@ -106,7 +103,8 @@ public DorisStreamLoad(SparkSettings settings) {
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
this.streamLoadProp = getStreamLoadProp(settings);
- cache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES).build(new BackendCacheLoader(settings));
+ cache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES)
+ .build(new BackendCacheLoader(settings));
fileType = streamLoadProp.getOrDefault("format", "csv");
if ("csv".equals(fileType)) {
FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t"));
@@ -114,7 +112,8 @@ public DorisStreamLoad(SparkSettings settings) {
readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false"));
boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false"));
if (readJsonByLine && stripOuterArray) {
- throw new IllegalArgumentException("Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true");
+ throw new IllegalArgumentException(
+ "Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true");
} else if (!readJsonByLine && !stripOuterArray) {
LOG.info("set default json mode: strip_outer_array");
streamLoadProp.put("strip_outer_array", "true");
@@ -173,7 +172,8 @@ public String toString() {
}
}
- public List loadV2(List<List<Object>> rows, String[] dfColumns, Boolean enable2PC) throws StreamLoadException, JsonProcessingException {
+ public List<Integer> loadV2(List<List<Object>> rows, String[] dfColumns, Boolean enable2PC)
+ throws StreamLoadException, JsonProcessingException {
List<String> loadData = parseLoadData(rows, dfColumns);
List<Integer> txnIds = new ArrayList<>(loadData.size());
@@ -213,11 +213,13 @@ public int load(String value, Boolean enable2PC) throws StreamLoadException {
HttpResponse httpResponse = httpClient.execute(httpPut);
responseHttpStatus = httpResponse.getStatusLine().getStatusCode();
String respMsg = httpResponse.getStatusLine().getReasonPhrase();
- String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8);
+ String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()),
+ StandardCharsets.UTF_8);
loadResponse = new LoadResponse(responseHttpStatus, respMsg, response);
} catch (IOException e) {
e.printStackTrace();
- String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark stream load with label: " + label;
+ String err = "http request exception,load url : " + loadUrlStr
+ + ",failed to execute spark stream load with label: " + label;
LOG.warn(err, e);
loadResponse = new LoadResponse(responseHttpStatus, e.getMessage(), err);
}
@@ -314,7 +316,8 @@ public void abort(int txnId) throws StreamLoadException {
});
if (!"Success".equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
- throw new StreamLoadException("try abort committed transaction, " + "do you recover from old savepoint?");
+ throw new StreamLoadException(
+ "try abort committed transaction, " + "do you recover from old savepoint?");
}
LOG.warn("Fail to abort transaction. txnId: {}, error: {}", txnId, res.get("msg"));
}
@@ -359,7 +362,8 @@ private static String getAuthEncoded(String user, String passwd) {
/**
* serializable be cache loader
*/
- private static class BackendCacheLoader extends CacheLoader<String, List<BackendV2.BackendRowV2>> implements Serializable {
+ private static class BackendCacheLoader extends CacheLoader<String, List<BackendV2.BackendRowV2>>
+ implements Serializable {
private final SparkSettings settings;
@@ -374,20 +378,18 @@ public List<BackendV2.BackendRowV2> load(String key) throws Exception {
}
- private List parseLoadData(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException {
+ private List<String> parseLoadData(List<List<Object>> rows, String[] dfColumns)
+ throws StreamLoadException, JsonProcessingException {
List<String> loadDataList;
switch (fileType.toUpperCase()) {
case "CSV":
- loadDataList = Collections.singletonList(
- rows.stream()
- .map(row -> row.stream()
- .map(DataUtil::handleColumnValue)
- .map(Object::toString)
- .collect(Collectors.joining(FIELD_DELIMITER))
- ).collect(Collectors.joining(LINE_DELIMITER)));
+ loadDataList = Collections.singletonList(rows.stream()
+ .map(row -> row.stream().map(DataUtil::handleColumnValue).map(Object::toString)
+ .collect(Collectors.joining(FIELD_DELIMITER)))
+ .collect(Collectors.joining(LINE_DELIMITER)));
break;
case "JSON":
List