From 93929a7fcb42c277a76e226c50b79c4f08c89f75 Mon Sep 17 00:00:00 2001 From: DongLiang-0 <1747644936@qq.com> Date: Wed, 23 Aug 2023 17:39:36 +0800 Subject: [PATCH] [Improve]Format the code for the entire project --- .github/workflows/checkstyle.yaml | 38 ++ .../checkstyle/checkstyle-apache-header.txt | 16 + .../check/checkstyle/checkstyle.xml | 429 ++++++++++++++++++ .../check/checkstyle/import-control.xml | 43 ++ .../check/checkstyle/suppressions.xml | 63 +++ spark-doris-connector/pom.xml | 37 +- .../doris/spark/backend/BackendClient.java | 15 +- .../doris/spark/cfg/ConfigurationOptions.java | 7 +- .../doris/spark/cfg/PropertiesSettings.java | 4 +- .../org/apache/doris/spark/cfg/Settings.java | 29 +- .../apache/doris/spark/cfg/SparkSettings.java | 8 +- .../doris/spark/exception/DorisException.java | 8 +- .../exception/DorisInternalException.java | 3 +- .../spark/exception/StreamLoadException.java | 8 +- .../load/CachedDorisStreamLoadClient.java | 4 +- .../doris/spark/load/DorisStreamLoad.java | 67 +-- .../doris/spark/rest/PartitionDefinition.java | 36 +- .../apache/doris/spark/rest/RestService.java | 226 ++++----- .../doris/spark/rest/models/Backend.java | 2 + .../doris/spark/rest/models/BackendRow.java | 11 +- .../doris/spark/rest/models/BackendV2.java | 2 + .../apache/doris/spark/rest/models/Field.java | 27 +- .../doris/spark/rest/models/QueryPlan.java | 6 +- .../doris/spark/rest/models/RespContent.java | 1 + .../doris/spark/rest/models/Schema.java | 14 +- .../doris/spark/rest/models/Tablet.java | 8 +- .../doris/spark/serialization/Routing.java | 14 +- .../doris/spark/serialization/RowBatch.java | 49 +- .../doris/spark/util/ErrorMessages.java | 3 +- .../org/apache/doris/spark/util/IOUtils.java | 4 +- .../apache/doris/spark/util/ResponseUtil.java | 13 +- .../org/apache/doris/spark/package.scala | 4 +- .../doris/spark/rdd/AbstractDorisRDD.scala | 7 +- .../apache/doris/spark/rdd/DorisSpark.scala | 1 - .../doris/spark/rdd/ScalaDorisRDD.scala | 5 +- .../doris/spark/rdd/ScalaValueReader.scala | 12 +- .../doris/spark/sql/DorisRelation.scala | 9 +- .../doris/spark/sql/ScalaDorisRow.scala | 4 +- .../doris/spark/sql/ScalaDorisRowRDD.scala | 3 +- .../spark/sql/ScalaDorisRowValueReader.scala | 3 +- .../apache/doris/spark/sql/SchemaUtils.scala | 6 +- .../spark/rest/TestPartitionDefinition.java | 6 +- .../doris/spark/rest/TestRestService.java | 89 ++-- .../doris/spark/rest/models/TestSchema.java | 2 +- .../spark/serialization/TestRouting.java | 5 +- .../spark/serialization/TestRowBatch.java | 150 ++---- .../apache/doris/spark/util/DataUtilTest.java | 5 +- .../doris/spark/util/TestListUtils.java | 1 + .../doris/spark/sql/TestSparkConnector.scala | 3 +- 49 files changed, 1055 insertions(+), 455 deletions(-) create mode 100644 .github/workflows/checkstyle.yaml create mode 100644 spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt create mode 100644 spark-doris-connector/check/checkstyle/checkstyle.xml create mode 100644 spark-doris-connector/check/checkstyle/import-control.xml create mode 100644 spark-doris-connector/check/checkstyle/suppressions.xml diff --git a/.github/workflows/checkstyle.yaml b/.github/workflows/checkstyle.yaml new file mode 100644 index 00000000..284b5580 --- /dev/null +++ b/.github/workflows/checkstyle.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +--- +name: Code Style Checker + +on: + pull_request: + +jobs: + java-checkstyle: + name: "CheckStyle" + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + persist-credentials: false + submodules: recursive + + - name: Run java checkstyle + run: + cd spark-doris-connector && mvn clean checkstyle:check + diff --git a/spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt b/spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt new file mode 100644 index 00000000..6e778edd --- /dev/null +++ b/spark-doris-connector/check/checkstyle/checkstyle-apache-header.txt @@ -0,0 +1,16 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
\ No newline at end of file diff --git a/spark-doris-connector/check/checkstyle/checkstyle.xml b/spark-doris-connector/check/checkstyle/checkstyle.xml new file mode 100644 index 00000000..fca43b92 --- /dev/null +++ b/spark-doris-connector/check/checkstyle/checkstyle.xml @@ -0,0 +1,429 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-doris-connector/check/checkstyle/import-control.xml b/spark-doris-connector/check/checkstyle/import-control.xml new file mode 100644 index 00000000..9ff32f7d --- /dev/null +++ b/spark-doris-connector/check/checkstyle/import-control.xml @@ -0,0 +1,43 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-doris-connector/check/checkstyle/suppressions.xml b/spark-doris-connector/check/checkstyle/suppressions.xml new file mode 100644 index 00000000..20e5949e --- /dev/null +++ b/spark-doris-connector/check/checkstyle/suppressions.xml @@ -0,0 +1,63 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 74a53cc0..3b28b0c3 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -318,6 +318,42 @@ + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.1.2 + + + com.puppycrawl.tools + checkstyle + 9.3 + + + + check/checkstyle/checkstyle.xml + check/checkstyle/suppressions.xml + UTF-8 + true + true + false + true + + + + validate + validate + + check + + + + + + org.sonarsource.scanner.maven + sonar-maven-plugin + 3.9.1.2184 + @@ -349,7 +385,6 @@ - diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java index aaafe096..00a65fb1 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/backend/BackendClient.java @@ -26,12 +26,13 @@ import org.apache.doris.sdk.thrift.TScanOpenResult; import org.apache.doris.sdk.thrift.TStatusCode; import org.apache.doris.spark.cfg.ConfigurationOptions; +import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.exception.ConnectedFailedException; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.DorisInternalException; -import org.apache.doris.spark.util.ErrorMessages; -import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.serialization.Routing; +import org.apache.doris.spark.util.ErrorMessages; + import org.apache.thrift.TConfiguration; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; @@ -46,7 +47,7 @@ * Client to request Doris BE */ public class BackendClient { - private final static Logger logger = LoggerFactory.getLogger(BackendClient.class); + private static final Logger logger = 
LoggerFactory.getLogger(BackendClient.class); private Routing routing; @@ -78,7 +79,8 @@ private void open() throws ConnectedFailedException { logger.debug("Attempt {} to connect {}.", attempt, routing); try { TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory(); - transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, connectTimeout); + transport = new TSocket(new TConfiguration(), routing.getHost(), routing.getPort(), socketTimeout, + connectTimeout); TProtocol protocol = factory.getProtocol(transport); client = new TDorisExternalService.Client(protocol); logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected); @@ -115,6 +117,7 @@ private void close() { /** * Open a scanner for reading Doris data. + * * @param openParams thrift struct to required by request * @return scan open result * @throws ConnectedFailedException throw if cannot connect to Doris BE @@ -154,6 +157,7 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedF /** * get next row batch from Doris BE + * * @param nextBatchParams thrift struct to required by request * @return scan batch result * @throws ConnectedFailedException throw if cannot connect to Doris BE @@ -168,7 +172,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor for (int attempt = 0; attempt < retries; ++attempt) { logger.debug("Attempt {} to getNext {}.", attempt, routing); try { - result = client.getNext(nextBatchParams); + result = client.getNext(nextBatchParams); if (result == null) { logger.warn("GetNext result from {} is null.", routing); continue; @@ -196,6 +200,7 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Dor /** * close an scanner. + * * @param closeParams thrift struct to required by request */ public void closeScanner(TScanCloseParams closeParams) { diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java index 2ab200d8..3c22ad38 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java @@ -79,8 +79,11 @@ public interface ConfigurationOptions { String DORIS_SINK_TASK_PARTITION_SIZE = "doris.sink.task.partition.size"; /** - * Set doris sink task partition size. If you set a small coalesce size and you don't have the action operations, this may result in the same parallelism in your computation. - * To avoid this, you can use repartition operations. This will add a shuffle step, but means the current upstream partitions will be executed in parallel. + * Set doris sink task partition size. + * If you set a small coalesce size and you don't have the action operations, this may result in the same + * parallelism in your computation. + * To avoid this, you can use repartition operations. This will add a shuffle step, but means the current + * upstream partitions will be executed in parallel. 
*/ String DORIS_SINK_TASK_USE_REPARTITION = "doris.sink.task.use.repartition"; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java index cb027667..a87f14b8 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/PropertiesSettings.java @@ -17,10 +17,10 @@ package org.apache.doris.spark.cfg; -import java.util.Properties; - import com.google.common.base.Preconditions; +import java.util.Properties; + public class PropertiesSettings extends Settings { protected final Properties props; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java index 798ec8cf..f42d08d7 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/Settings.java @@ -17,28 +17,23 @@ package org.apache.doris.spark.cfg; -import java.util.Enumeration; -import java.util.Map; -import java.util.Properties; - -import org.apache.commons.lang3.StringUtils; import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.util.ErrorMessages; import org.apache.doris.spark.util.IOUtils; + +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Enumeration; +import java.util.Map; +import java.util.Properties; + public abstract class Settings { - private final static Logger logger = LoggerFactory.getLogger(Settings.class); + private static final Logger logger = LoggerFactory.getLogger(Settings.class); public abstract String getProperty(String name); - public abstract void setProperty(String name, String value); - - public abstract Properties asProperties(); - - public abstract Settings copy(); - public String getProperty(String name, String defaultValue) { String value = getProperty(name); if (StringUtils.isEmpty(value)) { @@ -47,6 +42,12 @@ public String getProperty(String name, String defaultValue) { return value; } + public abstract void setProperty(String name, String value); + + public abstract Properties asProperties(); + + public abstract Settings copy(); + public Integer getIntegerProperty(String name) { return getIntegerProperty(name, null); } @@ -54,7 +55,7 @@ public Integer getIntegerProperty(String name) { public Integer getIntegerProperty(String name, Integer defaultValue) { try { if (getProperty(name) != null) { - return Integer.parseInt(getProperty(name)); + return Integer.parseInt(getProperty(name)); } } catch (NumberFormatException e) { logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, name, getProperty(name)); @@ -80,7 +81,7 @@ public Settings merge(Properties properties) { Enumeration propertyNames = properties.propertyNames(); - for (; propertyNames.hasMoreElements();) { + for (; propertyNames.hasMoreElements(); ) { Object prop = propertyNames.nextElement(); if (prop instanceof String) { Object value = properties.get(prop); diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java index 39fcd75b..f64102e3 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java +++ 
b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/SparkSettings.java @@ -17,16 +17,14 @@ package org.apache.doris.spark.cfg; -import java.util.Properties; - -import org.apache.spark.SparkConf; - import com.google.common.base.Preconditions; - +import org.apache.spark.SparkConf; import scala.Option; import scala.Serializable; import scala.Tuple2; +import java.util.Properties; + public class SparkSettings extends Settings implements Serializable { private final SparkConf cfg; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java index 6c47db0b..91a69882 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisException.java @@ -21,18 +21,22 @@ public class DorisException extends Exception { public DorisException() { super(); } + public DorisException(String message) { super(message); } + public DorisException(String message, Throwable cause) { super(message, cause); } + public DorisException(Throwable cause) { super(cause); } + protected DorisException(String message, Throwable cause, - boolean enableSuppression, - boolean writableStackTrace) { + boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java index 05b560f2..99fb372f 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/DorisInternalException.java @@ -23,7 +23,8 @@ public class DorisInternalException extends DorisException { public DorisInternalException(String server, TStatusCode statusCode, List errorMsgs) { - super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs); + super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + + errorMsgs); } } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java index ec9f77f9..b725cc1f 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/exception/StreamLoadException.java @@ -21,18 +21,22 @@ public class StreamLoadException extends Exception { public StreamLoadException() { super(); } + public StreamLoadException(String message) { super(message); } + public StreamLoadException(String message, Throwable cause) { super(message, cause); } + public StreamLoadException(Throwable cause) { super(cause); } + protected StreamLoadException(String message, Throwable cause, - boolean enableSuppression, - boolean writableStackTrace) { + boolean enableSuppression, + boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java 
index d3dab491..8626068b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/CachedDorisStreamLoadClient.java @@ -17,10 +17,12 @@ package org.apache.doris.spark.load; +import org.apache.doris.spark.cfg.SparkSettings; + import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import org.apache.doris.spark.cfg.SparkSettings; + import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java index 4a7b1e05..2b742779 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + package org.apache.doris.spark.load; import org.apache.doris.spark.cfg.ConfigurationOptions; @@ -50,7 +51,6 @@ import java.io.IOException; import java.io.Serializable; import java.nio.charset.StandardCharsets; -import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; @@ -65,22 +65,23 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; - /** * DorisStreamLoad **/ public class DorisStreamLoad implements Serializable { - private String FIELD_DELIMITER; - private final String LINE_DELIMITER; - private static final String NULL_VALUE = "\\N"; - private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoad.class); - private final static List DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout")); + private static final String NULL_VALUE = "\\N"; + private static final List DORIS_SUCCESS_STATUS = new ArrayList<>( + Arrays.asList("Success", "Publish Timeout")); + private static final long cacheExpireTimeout = 4 * 60; private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?"; private static String abortUrlPattern = "http://%s/api/%s/%s/_stream_load_2pc?"; - + private final LoadingCache> cache; + private final String fileType; + private final String LINE_DELIMITER; + private String FIELD_DELIMITER; private String user; private String passwd; private String loadUrlStr; @@ -90,10 +91,6 @@ public class DorisStreamLoad implements Serializable { private String columns; private String maxFilterRatio; private Map streamLoadProp; - private static final long cacheExpireTimeout = 4 * 60; - private final LoadingCache> cache; - private final String fileType; - private boolean readJsonByLine = false; public DorisStreamLoad(SparkSettings settings) { @@ -106,7 +103,8 @@ public DorisStreamLoad(SparkSettings settings) { this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS); this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO); this.streamLoadProp = getStreamLoadProp(settings); - cache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES).build(new BackendCacheLoader(settings)); + cache = CacheBuilder.newBuilder().expireAfterWrite(cacheExpireTimeout, TimeUnit.MINUTES) + .build(new BackendCacheLoader(settings)); fileType = 
streamLoadProp.getOrDefault("format", "csv"); if ("csv".equals(fileType)) { FIELD_DELIMITER = escapeString(streamLoadProp.getOrDefault("column_separator", "\t")); @@ -114,7 +112,8 @@ public DorisStreamLoad(SparkSettings settings) { readJsonByLine = Boolean.parseBoolean(streamLoadProp.getOrDefault("read_json_by_line", "false")); boolean stripOuterArray = Boolean.parseBoolean(streamLoadProp.getOrDefault("strip_outer_array", "false")); if (readJsonByLine && stripOuterArray) { - throw new IllegalArgumentException("Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true"); + throw new IllegalArgumentException( + "Only one of options 'read_json_by_line' and 'strip_outer_array' can be set to true"); } else if (!readJsonByLine && !stripOuterArray) { LOG.info("set default json mode: strip_outer_array"); streamLoadProp.put("strip_outer_array", "true"); @@ -173,7 +172,8 @@ public String toString() { } } - public List loadV2(List> rows, String[] dfColumns, Boolean enable2PC) throws StreamLoadException, JsonProcessingException { + public List loadV2(List> rows, String[] dfColumns, Boolean enable2PC) + throws StreamLoadException, JsonProcessingException { List loadData = parseLoadData(rows, dfColumns); List txnIds = new ArrayList<>(loadData.size()); @@ -213,11 +213,13 @@ public int load(String value, Boolean enable2PC) throws StreamLoadException { HttpResponse httpResponse = httpClient.execute(httpPut); responseHttpStatus = httpResponse.getStatusLine().getStatusCode(); String respMsg = httpResponse.getStatusLine().getReasonPhrase(); - String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), StandardCharsets.UTF_8); + String response = EntityUtils.toString(new BufferedHttpEntity(httpResponse.getEntity()), + StandardCharsets.UTF_8); loadResponse = new LoadResponse(responseHttpStatus, respMsg, response); } catch (IOException e) { e.printStackTrace(); - String err = "http request exception,load url : " + loadUrlStr + ",failed to execute spark stream load with label: " + label; + String err = "http request exception,load url : " + loadUrlStr + + ",failed to execute spark stream load with label: " + label; LOG.warn(err, e); loadResponse = new LoadResponse(responseHttpStatus, e.getMessage(), err); } @@ -314,7 +316,8 @@ public void abort(int txnId) throws StreamLoadException { }); if (!"Success".equals(res.get("status"))) { if (ResponseUtil.isCommitted(res.get("msg"))) { - throw new StreamLoadException("try abort committed transaction, " + "do you recover from old savepoint?"); + throw new StreamLoadException( + "try abort committed transaction, " + "do you recover from old savepoint?"); } LOG.warn("Fail to abort transaction. 
txnId: {}, error: {}", txnId, res.get("msg")); } @@ -359,7 +362,8 @@ private static String getAuthEncoded(String user, String passwd) { /** * serializable be cache loader */ - private static class BackendCacheLoader extends CacheLoader> implements Serializable { + private static class BackendCacheLoader extends CacheLoader> + implements Serializable { private final SparkSettings settings; @@ -374,20 +378,18 @@ public List load(String key) throws Exception { } - private List parseLoadData(List> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException { + private List parseLoadData(List> rows, String[] dfColumns) + throws StreamLoadException, JsonProcessingException { List loadDataList; switch (fileType.toUpperCase()) { case "CSV": - loadDataList = Collections.singletonList( - rows.stream() - .map(row -> row.stream() - .map(DataUtil::handleColumnValue) - .map(Object::toString) - .collect(Collectors.joining(FIELD_DELIMITER)) - ).collect(Collectors.joining(LINE_DELIMITER))); + loadDataList = Collections.singletonList(rows.stream() + .map(row -> row.stream().map(DataUtil::handleColumnValue).map(Object::toString) + .collect(Collectors.joining(FIELD_DELIMITER))) + .collect(Collectors.joining(LINE_DELIMITER))); break; case "JSON": List> dataList = new ArrayList<>(); @@ -402,9 +404,11 @@ private List parseLoadData(List> rows, String[] dfColumns) dataList.add(dataMap); } } catch (Exception e) { - throw new StreamLoadException("The number of configured columns does not match the number of data columns."); + throw new StreamLoadException( + "The number of configured columns does not match the number of data columns."); } - // splits large collections to normal collection to avoid the "Requested array size exceeds VM limit" exception + // splits large collections to normal collection to avoid + // the "Requested array size exceeds VM limit" exception loadDataList = ListUtils.getSerializedList(dataList, readJsonByLine ? 
LINE_DELIMITER : null); break; default: @@ -419,7 +423,10 @@ private List parseLoadData(List> rows, String[] dfColumns) private String generateLoadLabel() { Calendar calendar = Calendar.getInstance(); - return String.format("spark_streamload_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), UUID.randomUUID().toString().replaceAll("-", "")); + return String.format("spark_streamload_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), + calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), + calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), + UUID.randomUUID().toString().replaceAll("-", "")); } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/PartitionDefinition.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/PartitionDefinition.java index 0c2aae31..b807aa53 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/PartitionDefinition.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/PartitionDefinition.java @@ -17,16 +17,16 @@ package org.apache.doris.spark.rest; +import org.apache.doris.spark.cfg.PropertiesSettings; +import org.apache.doris.spark.cfg.Settings; +import org.apache.doris.spark.exception.IllegalArgumentException; + import java.io.Serializable; import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Set; -import org.apache.doris.spark.cfg.PropertiesSettings; -import org.apache.doris.spark.cfg.Settings; -import org.apache.doris.spark.exception.IllegalArgumentException; - /** * Doris RDD partition info. 
*/ @@ -108,7 +108,7 @@ public int compareTo(PartitionDefinition o) { similar.retainAll(o.tabletIds); diffSelf.removeAll(similar); diffOther.removeAll(similar); - if (diffSelf.size() == 0) { + if (diffSelf.size() == 0) { return 0; } long diff = Collections.min(diffSelf) - Collections.min(diffOther); @@ -124,12 +124,12 @@ public boolean equals(Object o) { return false; } PartitionDefinition that = (PartitionDefinition) o; - return Objects.equals(database, that.database) && - Objects.equals(table, that.table) && - Objects.equals(beAddress, that.beAddress) && - Objects.equals(tabletIds, that.tabletIds) && - Objects.equals(queryPlan, that.queryPlan) && - Objects.equals(serializedSettings, that.serializedSettings); + return Objects.equals(database, that.database) + && Objects.equals(table, that.table) + && Objects.equals(beAddress, that.beAddress) + && Objects.equals(tabletIds, that.tabletIds) + && Objects.equals(queryPlan, that.queryPlan) + && Objects.equals(serializedSettings, that.serializedSettings); } @Override @@ -144,12 +144,12 @@ public int hashCode() { @Override public String toString() { - return "PartitionDefinition{" + - ", database='" + database + '\'' + - ", table='" + table + '\'' + - ", beAddress='" + beAddress + '\'' + - ", tabletIds=" + tabletIds + - ", queryPlan='" + queryPlan + '\'' + - '}'; + return "PartitionDefinition{" + + ", database='" + database + '\'' + + ", table='" + table + '\'' + + ", beAddress='" + beAddress + '\'' + + ", tabletIds=" + tabletIds + + ", queryPlan='" + queryPlan + '\'' + + '}'; } } diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java index a00385cd..588a009b 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -17,6 +17,26 @@ package org.apache.doris.spark.rest; +import org.apache.doris.spark.cfg.ConfigurationOptions; +import org.apache.doris.spark.cfg.Settings; +import org.apache.doris.spark.cfg.SparkSettings; +import org.apache.doris.spark.exception.ConnectedFailedException; +import org.apache.doris.spark.exception.DorisException; +import org.apache.doris.spark.exception.IllegalArgumentException; +import org.apache.doris.spark.exception.ShouldNeverHappenException; +import org.apache.doris.spark.rest.models.Backend; +import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.BackendV2; +import org.apache.doris.spark.rest.models.QueryPlan; +import org.apache.doris.spark.rest.models.Schema; +import org.apache.doris.spark.rest.models.Tablet; + +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FILTER_QUERY; import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD; @@ -30,6 +50,13 @@ import static org.apache.doris.spark.util.ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE; import static org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; +import 
org.apache.http.HttpStatus; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpRequestBase; +import org.apache.http.entity.StringEntity; +import org.slf4j.Logger; import java.io.BufferedReader; import java.io.IOException; @@ -40,50 +67,22 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.HashMap; -import java.util.Base64; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; -import java.util.ArrayList; +import java.util.Map; import java.util.Set; -import java.util.HashSet; import java.util.stream.Collectors; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.core.JsonParseException; -import com.fasterxml.jackson.databind.JsonMappingException; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.doris.spark.cfg.ConfigurationOptions; -import org.apache.doris.spark.cfg.Settings; -import org.apache.doris.spark.cfg.SparkSettings; -import org.apache.doris.spark.exception.ConnectedFailedException; -import org.apache.doris.spark.exception.DorisException; -import org.apache.doris.spark.exception.IllegalArgumentException; -import org.apache.doris.spark.exception.ShouldNeverHappenException; -import org.apache.doris.spark.rest.models.Backend; -import org.apache.doris.spark.rest.models.BackendRow; -import org.apache.doris.spark.rest.models.BackendV2; -import org.apache.doris.spark.rest.models.QueryPlan; -import org.apache.doris.spark.rest.models.Schema; -import org.apache.doris.spark.rest.models.Tablet; -import org.apache.http.HttpStatus; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpRequestBase; -import org.apache.http.entity.StringEntity; -import org.slf4j.Logger; - -import com.google.common.annotations.VisibleForTesting; - /** * Service for communicate with Doris FE. */ public class RestService implements Serializable { - public final static int REST_RESPONSE_STATUS_OK = 200; + public static final int REST_RESPONSE_STATUS_OK = 200; private static final String API_PREFIX = "/api"; private static final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; @@ -93,27 +92,25 @@ public class RestService implements Serializable { /** * send request to Doris FE and get response json string. 
+ * * @param cfg configuration of request * @param request {@link HttpRequestBase} real request * @param logger {@link Logger} * @return Doris FE response in json string * @throws ConnectedFailedException throw when cannot connect to Doris FE */ - private static String send(Settings cfg, HttpRequestBase request, Logger logger) throws - ConnectedFailedException { + private static String send(Settings cfg, HttpRequestBase request, Logger logger) throws ConnectedFailedException { int connectTimeout = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT); int socketTimeout = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS, ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT); int retries = cfg.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES, ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT); - logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", - connectTimeout, socketTimeout, retries); + logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.", connectTimeout, + socketTimeout, retries); - RequestConfig requestConfig = RequestConfig.custom() - .setConnectTimeout(connectTimeout) - .setSocketTimeout(socketTimeout) - .build(); + RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(connectTimeout) + .setSocketTimeout(socketTimeout).build(); request.setConfig(requestConfig); String user = cfg.getProperty(DORIS_REQUEST_AUTH_USER, ""); @@ -126,21 +123,20 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger) logger.debug("Attempt {} to request {}.", attempt, request.getURI()); try { String response; - if (request instanceof HttpGet){ - response = getConnectionGet(request.getURI().toString(), user, password,logger); + if (request instanceof HttpGet) { + response = getConnectionGet(request.getURI().toString(), user, password, logger); } else { - response = getConnectionPost(request,user, password,logger); + response = getConnectionPost(request, user, password, logger); } if (response == null) { - logger.warn("Failed to get response from Doris FE {}, http code is {}", - request.getURI(), statusCode); + logger.warn("Failed to get response from Doris FE {}, http code is {}", request.getURI(), + statusCode); continue; } - logger.trace("Success get response from Doris FE: {}, response is: {}.", - request.getURI(), response); + logger.trace("Success get response from Doris FE: {}, response is: {}.", request.getURI(), response); ObjectMapper mapper = new ObjectMapper(); Map map = mapper.readValue(response, Map.class); - //Handle the problem of inconsistent data format returned by http v1 and v2 + // Handle the problem of inconsistent data format returned by http v1 and v2 if (map.containsKey("code") && map.containsKey("msg")) { Object data = map.get("data"); return mapper.writeValueAsString(data); @@ -157,21 +153,23 @@ private static String send(Settings cfg, HttpRequestBase request, Logger logger) throw new ConnectedFailedException(request.getURI().toString(), statusCode, ex); } - private static String getConnectionGet(String request,String user, String passwd,Logger logger) throws IOException { + private static String getConnectionGet(String request, String user, String passwd, Logger logger) + throws IOException { URL realUrl = new URL(request); // open connection - HttpURLConnection connection = (HttpURLConnection)realUrl.openConnection(); - String 
authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + HttpURLConnection connection = (HttpURLConnection) realUrl.openConnection(); + String authEncoding = Base64.getEncoder() + .encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); connection.setRequestProperty("Authorization", "Basic " + authEncoding); connection.connect(); - return parseResponse(connection,logger); + return parseResponse(connection, logger); } - private static String parseResponse(HttpURLConnection connection,Logger logger) throws IOException { + private static String parseResponse(HttpURLConnection connection, Logger logger) throws IOException { if (connection.getResponseCode() != HttpStatus.SC_OK) { - logger.warn("Failed to get response from Doris {}, http code is {}", - connection.getURL(), connection.getResponseCode()); + logger.warn("Failed to get response from Doris {}, http code is {}", connection.getURL(), + connection.getResponseCode()); throw new IOException("Failed to get response from Doris"); } StringBuilder result = new StringBuilder(""); @@ -186,14 +184,16 @@ private static String parseResponse(HttpURLConnection connection,Logger logger) return result.toString(); } - private static String getConnectionPost(HttpRequestBase request,String user, String passwd,Logger logger) throws IOException { + private static String getConnectionPost(HttpRequestBase request, String user, String passwd, Logger logger) + throws IOException { URL url = new URL(request.getURI().toString()); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(false); conn.setRequestMethod(request.getMethod()); - String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + String authEncoding = Base64.getEncoder() + .encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); conn.setRequestProperty("Authorization", "Basic " + authEncoding); - InputStream content = ((HttpPost)request).getEntity().getContent(); + InputStream content = ((HttpPost) request).getEntity().getContent(); String res = IOUtils.toString(content); conn.setDoOutput(true); conn.setDoInput(true); @@ -203,10 +203,12 @@ private static String getConnectionPost(HttpRequestBase request,String user, Str // flush out.flush(); // read response - return parseResponse(conn,logger); + return parseResponse(conn, logger); } + /** * parse table identifier to array. + * * @param tableIdentifier table identifier string * @param logger {@link Logger} * @return first element is db name, second element is table name @@ -229,6 +231,7 @@ static String[] parseIdentifier(String tableIdentifier, Logger logger) throws Il /** * choice a Doris FE node to request. + * * @param feNodes Doris FE node list, separate be comma * @param logger slf4j logger * @return the chosen one Doris FE node @@ -248,6 +251,7 @@ static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgume /** * get a valid URI to connect Doris FE. 
+ * * @param cfg configuration of request * @param logger {@link Logger} * @return uri string @@ -256,43 +260,36 @@ static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgume @VisibleForTesting static String getUriStr(Settings cfg, Logger logger) throws IllegalArgumentException { String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger); - return "http://" + - randomEndpoint(cfg.getProperty(DORIS_FENODES), logger) + API_PREFIX + - "/" + identifier[0] + - "/" + identifier[1] + - "/"; + return "http://" + randomEndpoint(cfg.getProperty(DORIS_FENODES), logger) + API_PREFIX + "/" + identifier[0] + + "/" + identifier[1] + "/"; } @VisibleForTesting - static String getUriStr(String feNode,Settings cfg, Logger logger) throws IllegalArgumentException { + static String getUriStr(String feNode, Settings cfg, Logger logger) throws IllegalArgumentException { String[] identifier = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger); - return "http://" + - feNode + API_PREFIX + - "/" + identifier[0] + - "/" + identifier[1] + - "/"; + return "http://" + feNode + API_PREFIX + "/" + identifier[0] + "/" + identifier[1] + "/"; } - /** * discover Doris table schema from Doris FE. + * * @param cfg configuration of request * @param logger slf4j logger * @return Doris table schema * @throws DorisException throw when discover failed */ - public static Schema getSchema(Settings cfg, Logger logger) - throws DorisException { + public static Schema getSchema(Settings cfg, Logger logger) throws DorisException { logger.trace("Finding schema."); List feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger); - for (String feNode: feNodeList) { + for (String feNode : feNodeList) { try { - HttpGet httpGet = new HttpGet(getUriStr(feNode,cfg, logger) + SCHEMA); + HttpGet httpGet = new HttpGet(getUriStr(feNode, cfg, logger) + SCHEMA); String response = send(cfg, httpGet, logger); logger.debug("Find schema response is '{}'.", response); return parseSchema(response, logger); } catch (ConnectedFailedException e) { - logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage()); + logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, + e.getMessage()); } } String errMsg = "No Doris FE is available, please check configuration"; @@ -302,6 +299,7 @@ public static Schema getSchema(Settings cfg, Logger logger) /** * translate Doris FE response to inner {@link Schema} struct. + * * @param response Doris FE response * @param logger {@link Logger} * @return inner {@link Schema} struct @@ -344,6 +342,7 @@ public static Schema parseSchema(String response, Logger logger) throws DorisExc /** * find Doris RDD partitions from Doris FE. 
+ * * @param cfg configuration of request * @param logger {@link Logger} * @return an list of Doris RDD partitions @@ -351,18 +350,18 @@ public static Schema parseSchema(String response, Logger logger) throws DorisExc */ public static List findPartitions(Settings cfg, Logger logger) throws DorisException { String[] tableIdentifiers = parseIdentifier(cfg.getProperty(DORIS_TABLE_IDENTIFIER), logger); - String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") + - " from `" + tableIdentifiers[0] + "`.`" + tableIdentifiers[1] + "`"; + String sql = "select " + cfg.getProperty(DORIS_READ_FIELD, "*") + " from `" + tableIdentifiers[0] + "`.`" + + tableIdentifiers[1] + "`"; if (!StringUtils.isEmpty(cfg.getProperty(DORIS_FILTER_QUERY))) { sql += " where " + cfg.getProperty(DORIS_FILTER_QUERY); } logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); List feNodeList = allEndpoints(cfg.getProperty(DORIS_FENODES), logger); - for (String feNode: feNodeList) { + for (String feNode : feNodeList) { try { - HttpPost httpPost = new HttpPost(getUriStr(feNode,cfg, logger) + QUERY_PLAN); - String entity = "{\"sql\": \""+ sql +"\"}"; + HttpPost httpPost = new HttpPost(getUriStr(feNode, cfg, logger) + QUERY_PLAN); + String entity = "{\"sql\": \"" + sql + "\"}"; logger.debug("Post body Sending to Doris FE is: '{}'.", entity); StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); stringEntity.setContentEncoding("UTF-8"); @@ -373,15 +372,11 @@ public static List findPartitions(Settings cfg, Logger logg logger.debug("Find partition response is '{}'.", resStr); QueryPlan queryPlan = getQueryPlan(resStr, logger); Map> be2Tablets = selectBeForTablet(queryPlan, logger); - return tabletsMapToPartition( - cfg, - be2Tablets, - queryPlan.getOpaqued_query_plan(), - tableIdentifiers[0], - tableIdentifiers[1], - logger); + return tabletsMapToPartition(cfg, be2Tablets, queryPlan.getOpaqued_query_plan(), tableIdentifiers[0], + tableIdentifiers[1], logger); } catch (ConnectedFailedException e) { - logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage()); + logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, + e.getMessage()); } } String errMsg = "No Doris FE is available, please check configuration"; @@ -392,6 +387,7 @@ public static List findPartitions(Settings cfg, Logger logg /** * translate Doris FE response string to inner {@link QueryPlan} struct. + * * @param response Doris FE response string * @param logger {@link Logger} * @return inner {@link QueryPlan} struct @@ -433,13 +429,14 @@ static QueryPlan getQueryPlan(String response, Logger logger) throws DorisExcept /** * select which Doris BE to get tablet data. + * * @param queryPlan {@link QueryPlan} translated from Doris FE response * @param logger {@link Logger} * @return BE to tablets {@link Map} * @throws DorisException throw when select failed. 
*/ @VisibleForTesting - static Map> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException { + static Map> selectBeForTablet(QueryPlan queryPlan, Logger logger) throws DorisException { Map> be2Tablets = new HashMap<>(); for (Map.Entry part : queryPlan.getPartitions().entrySet()) { logger.debug("Parse tablet info: '{}'.", part); @@ -484,6 +481,7 @@ static Map> selectBeForTablet(QueryPlan queryPlan, Logger lo /** * tablet count limit for one Doris RDD partition + * * @param cfg configuration of request * @param logger {@link Logger} * @return tablet count limit @@ -499,8 +497,8 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) { } } if (tabletsSize < DORIS_TABLET_SIZE_MIN) { - logger.warn("{} is less than {}, set to default value {}.", - DORIS_TABLET_SIZE, DORIS_TABLET_SIZE_MIN, DORIS_TABLET_SIZE_MIN); + logger.warn("{} is less than {}, set to default value {}.", DORIS_TABLET_SIZE, DORIS_TABLET_SIZE_MIN, + DORIS_TABLET_SIZE_MIN); tabletsSize = DORIS_TABLET_SIZE_MIN; } logger.debug("Tablet size is set to {}.", tabletsSize); @@ -509,27 +507,29 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) { /** * choice a Doris BE node to request. + * * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal - * Deprecated, use randomBackendV2 instead + * Deprecated, use randomBackendV2 instead */ @Deprecated @VisibleForTesting - public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException { + public static String randomBackend(SparkSettings sparkSettings, Logger logger) throws DorisException { List backends = getBackendRows(sparkSettings, logger); Collections.shuffle(backends); BackendV2.BackendRowV2 backend = backends.get(0); - return backend.getIp()+ ":" + backend.getHttpPort(); + return backend.getIp() + ":" + backend.getHttpPort(); } /** * translate Doris FE response to inner {@link BackendRow} struct. + * * @param response Doris FE response * @param logger {@link Logger} * @return inner {@link List} struct * @throws DorisException,IOException throw when translate failed - * */ + */ @Deprecated @VisibleForTesting static List parseBackend(String response, Logger logger) throws DorisException, IOException { @@ -555,23 +555,26 @@ static List parseBackend(String response, Logger logger) throws Dori logger.error(SHOULD_NOT_HAPPEN_MESSAGE); throw new ShouldNeverHappenException(); } - List backendRows = backend.getRows().stream().filter(v -> v.getAlive()).collect(Collectors.toList()); + List backendRows = backend.getRows().stream().filter(v -> v.getAlive()) + .collect(Collectors.toList()); logger.debug("Parsing schema result is '{}'.", backendRows); return backendRows; } /** * get Doris BE node list. 
+ * * @param logger slf4j logger * @return the Doris BE node list * @throws IllegalArgumentException BE nodes is illegal */ @VisibleForTesting - public static List getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException { + public static List getBackendRows(SparkSettings sparkSettings, Logger logger) + throws DorisException { List feNodeList = allEndpoints(sparkSettings.getProperty(DORIS_FENODES), logger); - for (String feNode : feNodeList){ + for (String feNode : feNodeList) { try { - String beUrl = String.format("http://%s" + BACKENDS_V2, feNode); + String beUrl = String.format("http://%s" + BACKENDS_V2, feNode); HttpGet httpGet = new HttpGet(beUrl); String response = send(sparkSettings, httpGet, logger); logger.info("Backend Info:{}", response); @@ -583,7 +586,8 @@ public static List getBackendRows(SparkSettings sparkSet } return backends; } catch (ConnectedFailedException e) { - logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, e.getMessage()); + logger.info("Doris FE node {} is unavailable: {}, Request the next Doris FE node", feNode, + e.getMessage()); } } String errMsg = "No Doris FE is available, please check configuration"; @@ -593,6 +597,7 @@ public static List getBackendRows(SparkSettings sparkSet /** * choice a Doris BE node to request. + * * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal @@ -635,6 +640,7 @@ static List parseBackendV2(String response, Logger logge /** * translate BE tablets map to Doris RDD partition. + * * @param cfg configuration of request * @param be2Tablets BE to tablets {@link Map} * @param opaquedQueryPlan Doris BE execute plan getting from Doris FE @@ -646,8 +652,7 @@ static List parseBackendV2(String response, Logger logge */ @VisibleForTesting static List tabletsMapToPartition(Settings cfg, Map> be2Tablets, - String opaquedQueryPlan, String database, String table, Logger logger) - throws IllegalArgumentException { + String opaquedQueryPlan, String database, String table, Logger logger) throws IllegalArgumentException { int tabletsSize = tabletCountLimitForOnePartition(cfg, logger); List partitions = new ArrayList<>(); for (Map.Entry> beInfo : be2Tablets.entrySet()) { @@ -657,12 +662,11 @@ static List tabletsMapToPartition(Settings cfg, Map partitionTablets = new HashSet<>(beInfo.getValue().subList( - first, Math.min(beInfo.getValue().size(), first + tabletsSize))); + Set partitionTablets = new HashSet<>( + beInfo.getValue().subList(first, Math.min(beInfo.getValue().size(), first + tabletsSize))); first = first + tabletsSize; - PartitionDefinition partitionDefinition = - new PartitionDefinition(database, table, cfg, - beInfo.getKey(), partitionTablets, opaquedQueryPlan); + PartitionDefinition partitionDefinition = new PartitionDefinition(database, table, cfg, beInfo.getKey(), + partitionTablets, opaquedQueryPlan); logger.debug("Generate one PartitionDefinition '{}'.", partitionDefinition); partitions.add(partitionDefinition); } @@ -674,7 +678,7 @@ static List tabletsMapToPartition(Settings cfg, Map rowCountInOneBatch) { - String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + - rowCountInOneBatch; + String errMsg = "Get row offset: " + rowIndex + " larger than row size: " + rowCountInOneBatch; logger.error(errMsg); throw new NoSuchElementException(errMsg); } @@ -200,8 +192,9 @@ public void convertArrowToRowBatch() throws DorisException { } break; case "LARGEINT": - 
Preconditions.checkArgument(mt.equals(Types.MinorType.FIXEDSIZEBINARY) ||
-                    mt.equals(Types.MinorType.VARCHAR), typeMismatchMessage(currentType, mt));
+                Preconditions.checkArgument(
+                        mt.equals(Types.MinorType.FIXEDSIZEBINARY) || mt.equals(Types.MinorType.VARCHAR),
+                        typeMismatchMessage(currentType, mt));
                 if (mt.equals(Types.MinorType.FIXEDSIZEBINARY)) {
                     FixedSizeBinaryVector largeIntVector = (FixedSizeBinaryVector) curFieldVector;
                     for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
index 44ca28b4..298f52eb 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ErrorMessages.java
@@ -23,5 +23,6 @@ public abstract class ErrorMessages {
     public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} failed.";
     public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is illegal, value is '{}'.";
     public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come here.";
-    public static final String DORIS_INTERNAL_FAIL_MESSAGE = "Doris server '{}' internal failed, status is '{}', error message is '{}'";
+    public static final String DORIS_INTERNAL_FAIL_MESSAGE
+            = "Doris server '{}' internal failed, status is '{}', error message is '{}'";
 }
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/IOUtils.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/IOUtils.java
index 03a9e005..26557423 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/IOUtils.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/IOUtils.java
@@ -17,13 +17,13 @@
 package org.apache.doris.spark.util;

+import org.apache.doris.spark.exception.IllegalArgumentException;
+
 import java.io.IOException;
 import java.io.StringReader;
 import java.io.StringWriter;
 import java.util.Properties;

-import org.apache.doris.spark.exception.IllegalArgumentException;
-
 public class IOUtils {
     public static String propsToString(Properties props) throws IllegalArgumentException {
         StringWriter sw = new StringWriter();
diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ResponseUtil.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ResponseUtil.java
index 1b6a66b1..241b82fd 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ResponseUtil.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/util/ResponseUtil.java
@@ -20,14 +20,13 @@ import java.util.regex.Pattern;

 public class ResponseUtil {
-    public static final Pattern LABEL_EXIST_PATTERN =
-            Pattern.compile("errCode = 2, detailMessage = Label \\[(.*)\\] " +
-                    "has already been used, relate to txn \\[(\\d+)\\]");
-    public static final Pattern COMMITTED_PATTERN =
-            Pattern.compile("errCode = 2, detailMessage = transaction \\[(\\d+)\\] " +
-                    "is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");
+    public static final Pattern LABEL_EXIST_PATTERN = Pattern.compile(
+            "errCode = 2, detailMessage = Label \\[(.*)\\] " + "has already been used, relate to txn \\[(\\d+)\\]");
+    public static final Pattern COMMITTED_PATTERN = Pattern.compile(
+            "errCode = 2, detailMessage = transaction \\[(\\d+)\\] "
+                    + "is already \\b(COMMITTED|committed|VISIBLE|visible)\\b, not pre-committed.");

     public static boolean isCommitted(String msg) {
-        return COMMITTED_PATTERN.matcher(msg).matches();
+        return COMMITTED_PATTERN.matcher(msg).matches();
     }
 }
\ No newline at end of file
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/package.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/package.scala
index d08bdc0d..c326c6b5 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/package.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/package.scala
@@ -17,11 +17,11 @@
 package org.apache.doris

-import scala.language.implicitConversions
-
 import org.apache.doris.spark.rdd.DorisSpark
 import org.apache.spark.SparkContext

+import scala.language.implicitConversions
+
 package object spark {

   implicit def sparkContextFunctions(sc: SparkContext) = new SparkContextFunctions(sc)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
index 23a34c59..f4b806ff 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
@@ -17,15 +17,14 @@
 package org.apache.doris.spark.rdd

-import scala.collection.JavaConversions._
-import scala.reflect.ClassTag
-
 import org.apache.doris.spark.cfg.SparkSettings
 import org.apache.doris.spark.rest.{PartitionDefinition, RestService}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.{Partition, SparkContext}

+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
 private[spark] abstract class AbstractDorisRDD[T: ClassTag](
     @transient private var sc: SparkContext,
     val params: Map[String, String] = Map.empty)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/DorisSpark.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/DorisSpark.scala
index 9dc86aa6..7032c33d 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/DorisSpark.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/DorisSpark.scala
@@ -18,7 +18,6 @@
 package org.apache.doris.spark.rdd

 import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_FILTER_QUERY, DORIS_TABLE_IDENTIFIER}
-
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
index 0ff8bbdf..62f1595a 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaDorisRDD.scala
@@ -17,14 +17,13 @@
 package org.apache.doris.spark.rdd

-import scala.reflect.ClassTag
-
 import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_VALUE_READER_CLASS
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.rest.PartitionDefinition
-
 import org.apache.spark.{Partition, SparkContext, TaskContext}

+import scala.reflect.ClassTag
+
 private[spark] class ScalaDorisRDD[T: ClassTag](
     sc: SparkContext,
     params: Map[String, String] = Map.empty)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index 719b16be..c8cb5409 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -18,13 +18,6 @@
 package org.apache.doris.spark.rdd

 import org.apache.doris.sdk.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent._
-import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
-import scala.collection.JavaConversions._
-import scala.util.Try
-
 import org.apache.doris.spark.backend.BackendClient
 import org.apache.doris.spark.cfg.ConfigurationOptions._
 import org.apache.doris.spark.cfg.Settings
@@ -37,6 +30,11 @@ import org.apache.doris.spark.util.ErrorMessages
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.spark.internal.Logging

+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.{Condition, Lock, ReentrantLock}
+import scala.collection.JavaConversions._
+import scala.util.Try
 import scala.util.control.Breaks

 /**
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
index 4c9d3485..d736371e 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/DorisRelation.scala
@@ -17,19 +17,18 @@
 package org.apache.doris.spark.sql

-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.math.min
-
 import org.apache.doris.spark.cfg.ConfigurationOptions._
 import org.apache.doris.spark.cfg.{ConfigurationOptions, SparkSettings}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Row, SQLContext}

+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.math.min
+
 private[sql] class DorisRelation(
     val sqlContext: SQLContext, parameters: Map[String, String])
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRow.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRow.scala
index 06f5ca30..566a23b0 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRow.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRow.scala
@@ -17,10 +17,10 @@
 package org.apache.doris.spark.sql

-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.sql.Row

+import scala.collection.mutable.ArrayBuffer
+
 private[spark] class ScalaDorisRow(rowOrder: Seq[String]) extends Row {

   lazy val values: ArrayBuffer[Any] = ArrayBuffer.fill(rowOrder.size)(null)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowRDD.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowRDD.scala
index b31a54dc..99ed0d17 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowRDD.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowRDD.scala
@@ -21,10 +21,9 @@ import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_VALUE_READER_CLASS
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.rdd.{AbstractDorisRDD, AbstractDorisRDDIterator, DorisPartition}
 import org.apache.doris.spark.rest.PartitionDefinition
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.{Partition, SparkContext, TaskContext}

 private[spark] class ScalaDorisRowRDD(
     sc: SparkContext,
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala
index 5b018540..528213fb 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/ScalaDorisRowValueReader.scala
@@ -17,7 +17,6 @@
 package org.apache.doris.spark.sql

-import scala.collection.JavaConverters._
 import org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_READ_FIELD
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.ShouldNeverHappenException
@@ -26,6 +25,8 @@ import org.apache.doris.spark.rest.PartitionDefinition
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.slf4j.{Logger, LoggerFactory}

+import scala.collection.JavaConverters._
+
 class ScalaDorisRowValueReader(
     partition: PartitionDefinition,
     settings: Settings)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
index c8aa0349..2de74c02 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/SchemaUtils.scala
@@ -18,16 +18,16 @@
 package org.apache.doris.spark.sql

 import org.apache.doris.sdk.thrift.TScanColumnDesc
-
-import scala.collection.JavaConversions._
+import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
 import org.apache.doris.spark.cfg.Settings
 import org.apache.doris.spark.exception.DorisException
 import org.apache.doris.spark.rest.RestService
 import org.apache.doris.spark.rest.models.{Field, Schema}
-import org.apache.doris.spark.cfg.ConfigurationOptions.{DORIS_IGNORE_TYPE, DORIS_READ_FIELD}
 import org.apache.spark.sql.types._
 import org.slf4j.LoggerFactory

+import scala.collection.JavaConversions._
+
 private[spark] object SchemaUtils {

   private val logger = LoggerFactory.getLogger(SchemaUtils.getClass.getSimpleName.stripSuffix("$"))
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java
index 0bfa3aa5..8e170a8c 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestPartitionDefinition.java
@@ -17,12 +17,12 @@
 package org.apache.doris.spark.rest;

-import java.util.HashSet;
-import java.util.Set;
-
 import org.junit.Assert;
 import org.junit.Test;

+import java.util.HashSet;
+import java.util.Set;
+
 public class TestPartitionDefinition {
     private static final String DATABASE_1 = "database1";
     private static final String TABLE_1 = "table1";
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
index c87d94bc..cabfa820 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java
@@ -17,22 +17,6 @@
 package org.apache.doris.spark.rest;

-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
-import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
-import static org.hamcrest.core.StringStartsWith.startsWith;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.doris.spark.cfg.PropertiesSettings;
 import org.apache.doris.spark.cfg.Settings;
 import org.apache.doris.spark.exception.DorisException;
@@ -43,6 +27,13 @@ import org.apache.doris.spark.rest.models.QueryPlan;
 import org.apache.doris.spark.rest.models.Schema;
 import org.apache.doris.spark.rest.models.Tablet;
+
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_FENODES;
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE;
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT;
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLET_SIZE_MIN;
+import static org.apache.doris.spark.cfg.ConfigurationOptions.DORIS_TABLE_IDENTIFIER;
+import static org.hamcrest.core.StringStartsWith.startsWith;
 import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -51,8 +42,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class TestRestService {
-    private final static Logger logger = LoggerFactory.getLogger(TestRestService.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestRestService.class);

     @Rule
     public ExpectedException thrown = ExpectedException.none();
@@ -119,8 +119,9 @@ public void testGetUriStr() throws Exception {

     @Test
     public void testFeResponseToSchema() throws Exception {
-        String res = "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\",\"aggregation_type\":\"\"},{\"name\":\"k5\","
-                + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\",\"aggregation_type\":\"\"}],\"status\":200}";
+        String res =
+                "{\"properties\":[{\"type\":\"TINYINT\",\"name\":\"k1\",\"comment\":\"\",\"aggregation_type\":\"\"},{\"name\":\"k5\","
+                        + "\"scale\":\"0\",\"comment\":\"\",\"type\":\"DECIMALV2\",\"precision\":\"9\",\"aggregation_type\":\"\"}],\"status\":200}";
         Schema expected = new Schema();
         expected.setStatus(200);
         Field k1 = new Field("k1", "TINYINT", "", 0, 0, "");
@@ -232,9 +233,9 @@ public void testSelectTabletBe() throws Exception {
         thrown.expectMessage(startsWith("Cannot choice Doris BE for tablet"));
         RestService.selectBeForTablet(RestService.getQueryPlan(noBeRes, logger), logger);

-        String notNumberRes = "{\"partitions\":{"
-                + "\"11021xxx\":{\"routings\":[\"be1\"],\"version\":3,\"versionHash\":1,\"schemaHash\":1}},"
-                + "\"opaqued_query_plan\":\"query_plan\",\"status\":200}";
+        // String notNumberRes = "{\"partitions\":{"
+        //         + "\"11021xxx\":{\"routings\":[\"be1\"],\"version\":3,\"versionHash\":1,\"schemaHash\":1}},"
+        //         + "\"opaqued_query_plan\":\"query_plan\",\"status\":200}";
         thrown.expect(DorisException.class);
         thrown.expectMessage(startsWith("Parse tablet id "));
         RestService.selectBeForTablet(RestService.getQueryPlan(noBeRes, logger), logger);
@@ -269,29 +270,28 @@ public void testTabletsMapToPartition() throws Exception {

         Settings settings = new PropertiesSettings();
         String opaquedQueryPlan = "query_plan";
-        String cluster = "c";
         String database = "d";
         String table = "t";

         Set be1Tablet = new HashSet<>();
         be1Tablet.add(1L);
         be1Tablet.add(2L);
-        PartitionDefinition pd1 = new PartitionDefinition(
-                database, table, settings, "be1", be1Tablet, opaquedQueryPlan);
+        PartitionDefinition pd1 = new PartitionDefinition(database, table, settings, "be1", be1Tablet,
+                opaquedQueryPlan);

         Set be2Tablet = new HashSet<>();
         be2Tablet.add(3L);
         be2Tablet.add(4L);
-        PartitionDefinition pd2 = new PartitionDefinition(
-                database, table, settings, "be2", be2Tablet, opaquedQueryPlan);
+        PartitionDefinition pd2 = new PartitionDefinition(database, table, settings, "be2", be2Tablet,
+                opaquedQueryPlan);

         List expected = new ArrayList<>();
         expected.add(pd1);
         expected.add(pd2);
         Collections.sort(expected);
-        List actual = RestService.tabletsMapToPartition(
-                settings, beToTablets, opaquedQueryPlan, database, table, logger);
+        List actual = RestService.tabletsMapToPartition(settings, beToTablets, opaquedQueryPlan,
+                database, table, logger);
         Collections.sort(actual);

         Assert.assertEquals(expected, actual);
@@ -300,26 +300,29 @@ public void testTabletsMapToPartition() throws Exception {
     @Deprecated
     @Ignore
     public void testParseBackend() throws Exception {
-        String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
-                "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
-                "\"HttpPort\",\"BrpcPort\",\"LastStartTime\",\"LastHeartbeat\",\"Alive\",\"SystemDecommissioned\"," +
-                "\"ClusterDecommissioned\",\"TabletNum\",\"DataUsedCapacity\",\"AvailCapacity\",\"TotalCapacity\"," +
-                "\"UsedPct\",\"MaxDiskUsedPct\",\"Tag\",\"ErrMsg\",\"Version\",\"Status\"],\"rows\":[{\"HttpPort\":" +
-                "\"8040\",\"Status\":\"{\\\"lastSuccessReportTabletsTime\\\":\\\"N/A\\\",\\\"lastStreamLoadTime\\\":" +
-                "-1}\",\"SystemDecommissioned\":\"false\",\"LastHeartbeat\":\"\\\\N\",\"DataUsedCapacity\":\"0.000 " +
-                "\",\"ErrMsg\":\"\",\"IP\":\"127.0.0.1\",\"UsedPct\":\"0.00 %\",\"__hrefPaths\":[\"/rest/v1/system?" +
-                "path=//backends/10002\"],\"Cluster\":\"default_cluster\",\"Alive\":\"true\",\"MaxDiskUsedPct\":" +
-                "\"0.00 %\",\"BrpcPort\":\"-1\",\"BePort\":\"-1\",\"ClusterDecommissioned\":\"false\"," +
-                "\"AvailCapacity\":\"1.000 B\",\"Version\":\"\",\"BackendId\":\"10002\",\"HeartbeatPort\":\"9050\"," +
-                "\"LastStartTime\":\"\\\\N\",\"TabletNum\":\"0\",\"TotalCapacity\":\"0.000 \",\"Tag\":" +
-                "\"{\\\"location\\\" : \\\"default\\\"}\",\"HostName\":\"localhost\"}]}";
+        String response =
+                "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\",\"column_names\":"
+                        + "[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\",\"HttpPort\","
+                        + "\"BrpcPort\",\"LastStartTime\",\"LastHeartbeat\",\"Alive\",\"SystemDecommissioned\","
+                        + "\"ClusterDecommissioned\",\"TabletNum\",\"DataUsedCapacity\",\"AvailCapacity\",\"TotalCapacity\","
+                        + "\"UsedPct\",\"MaxDiskUsedPct\",\"Tag\",\"ErrMsg\",\"Version\",\"Status\"],\"rows\":[{\"HttpPort\":"
+                        + "\"8040\",\"Status\":\"{\\\"lastSuccessReportTabletsTime\\\":\\\"N/A\\\","
+                        + "\\\"lastStreamLoadTime\\\":-1}\",\"SystemDecommissioned\":\"false\",\"LastHeartbeat\":\"\\\\N\","
+                        + "\"DataUsedCapacity\":\"0.000 \",\"ErrMsg\":\"\",\"IP\":\"127.0.0.1\",\"UsedPct\":\"0.00 %\","
+                        + "\"__hrefPaths\":[\"/rest/v1/system?path=//backends/10002\"],\"Cluster\":\"default_cluster\","
+                        + "\"Alive\":\"true\",\"MaxDiskUsedPct\":\"0.00 %\",\"BrpcPort\":\"-1\",\"BePort\":\"-1\","
+                        + "\"ClusterDecommissioned\":\"false\",\"AvailCapacity\":\"1.000 B\",\"Version\":\"\","
+                        + "\"BackendId\":\"10002\",\"HeartbeatPort\":\"9050\",\"LastStartTime\":\"\\\\N\","
+                        + "\"TabletNum\":\"0\",\"TotalCapacity\":\"0.000 \",\"Tag\":\"{\\\"location\\\" : \\\"default\\\"}\","
+                        + "\"HostName\":\"localhost\"}]}\n";
         List backendRows = RestService.parseBackend(response, logger);
         Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
     }

     @Test
     public void testParseBackendV2() throws Exception {
-        String response = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}";
+        String response
+                = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}";
         List backendRows = RestService.parseBackendV2(response, logger);
         Assert.assertEquals(2, backendRows.size());
     }
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/models/TestSchema.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/models/TestSchema.java
index ba674d54..ddb7be4b 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/models/TestSchema.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/models/TestSchema.java
@@ -18,8 +18,8 @@
 package org.apache.doris.spark.rest.models;

 import org.junit.Assert;
-import org.junit.Test;
 import org.junit.Rule;
+import org.junit.Test;
 import org.junit.rules.ExpectedException;

 public class TestSchema {
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRouting.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRouting.java
index 4309bbf4..aabfb311 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRouting.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRouting.java
@@ -17,15 +17,14 @@
 package org.apache.doris.spark.serialization;

-import static org.hamcrest.core.StringStartsWith.startsWith;
-
 import org.apache.doris.spark.exception.IllegalArgumentException;
+
+import static org.hamcrest.core.StringStartsWith.startsWith;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;

-
 public class TestRouting {
     @Rule
     public ExpectedException thrown = ExpectedException.none();
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
index ace928f5..1fdbd3c6 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java
@@ -47,6 +47,7 @@ import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.spark.sql.types.Decimal;
+import static org.hamcrest.core.StringStartsWith.startsWith;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -63,10 +64,8 @@ import java.util.List;
 import java.util.NoSuchElementException;

-import static org.hamcrest.core.StringStartsWith.startsWith;
-
 public class TestRowBatch {
-    private final static Logger logger = LoggerFactory.getLogger(TestRowBatch.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestRowBatch.class);

     @Rule
     public ExpectedException thrown = ExpectedException.none();
@@ -80,8 +79,10 @@ public void testRowBatch() throws Exception {
         childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Int(16, true)), null));
         childrenBuilder.add(new Field("k3", FieldType.nullable(new ArrowType.Int(32, true)), null));
         childrenBuilder.add(new Field("k4", FieldType.nullable(new ArrowType.Int(64, true)), null));
-        childrenBuilder.add(new Field("k9", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null));
-        childrenBuilder.add(new Field("k8", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null));
+        childrenBuilder.add(
+                new Field("k9", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)), null));
+        childrenBuilder.add(
+                new Field("k8", FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null));
         childrenBuilder.add(new Field("k10", FieldType.nullable(new ArrowType.Utf8()), null));
         childrenBuilder.add(new Field("k11", FieldType.nullable(new ArrowType.Utf8()), null));
         childrenBuilder.add(new Field("k5", FieldType.nullable(new ArrowType.Utf8()), null));
@@ -91,16 +92,14 @@ public void testRowBatch() throws Exception {
                 new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
-                root,
-                new DictionaryProvider.MapDictionaryProvider(),
-                outputStream);
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root,
+                new DictionaryProvider.MapDictionaryProvider(), outputStream);

         arrowStreamWriter.start();
         root.setRowCount(3);

         FieldVector vector = root.getVector("k0");
-        BitVector bitVector = (BitVector)vector;
+        BitVector bitVector = (BitVector) vector;
         bitVector.setInitialCapacity(3);
         bitVector.allocateNew(3);
         bitVector.setSafe(0, 1);
@@ -109,7 +108,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k1");
-        TinyIntVector tinyIntVector = (TinyIntVector)vector;
+        TinyIntVector tinyIntVector = (TinyIntVector) vector;
         tinyIntVector.setInitialCapacity(3);
         tinyIntVector.allocateNew(3);
         tinyIntVector.setSafe(0, 1);
@@ -118,7 +117,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k2");
-        SmallIntVector smallIntVector = (SmallIntVector)vector;
+        SmallIntVector smallIntVector = (SmallIntVector) vector;
         smallIntVector.setInitialCapacity(3);
         smallIntVector.allocateNew(3);
         smallIntVector.setSafe(0, 1);
@@ -127,7 +126,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k3");
-        IntVector intVector = (IntVector)vector;
+        IntVector intVector = (IntVector) vector;
         intVector.setInitialCapacity(3);
         intVector.allocateNew(3);
         intVector.setSafe(0, 1);
@@ -136,7 +135,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k4");
-        BigIntVector bigIntVector = (BigIntVector)vector;
+        BigIntVector bigIntVector = (BigIntVector) vector;
         bigIntVector.setInitialCapacity(3);
         bigIntVector.allocateNew(3);
         bigIntVector.setSafe(0, 1);
@@ -145,7 +144,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k5");
-        VarCharVector varCharVector = (VarCharVector)vector;
+        VarCharVector varCharVector = (VarCharVector) vector;
         varCharVector.setInitialCapacity(3);
         varCharVector.allocateNew();
         varCharVector.setIndexDefined(0);
@@ -160,7 +159,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k6");
-        VarCharVector charVector = (VarCharVector)vector;
+        VarCharVector charVector = (VarCharVector) vector;
         charVector.setInitialCapacity(3);
         charVector.allocateNew();
         charVector.setIndexDefined(0);
@@ -175,7 +174,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k8");
-        Float8Vector float8Vector = (Float8Vector)vector;
+        Float8Vector float8Vector = (Float8Vector) vector;
         float8Vector.setInitialCapacity(3);
         float8Vector.allocateNew(3);
         float8Vector.setSafe(0, 1.1);
@@ -184,7 +183,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k9");
-        Float4Vector float4Vector = (Float4Vector)vector;
+        Float4Vector float4Vector = (Float4Vector) vector;
         float4Vector.setInitialCapacity(3);
         float4Vector.allocateNew(3);
         float4Vector.setSafe(0, 1.1f);
@@ -193,7 +192,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k10");
-        VarCharVector datecharVector = (VarCharVector)vector;
+        VarCharVector datecharVector = (VarCharVector) vector;
         datecharVector.setInitialCapacity(3);
         datecharVector.allocateNew();
         datecharVector.setIndexDefined(0);
@@ -208,7 +207,7 @@ public void testRowBatch() throws Exception {
         vector.setValueCount(3);

         vector = root.getVector("k11");
-        VarCharVector timecharVector = (VarCharVector)vector;
+        VarCharVector timecharVector = (VarCharVector) vector;
         timecharVector.setInitialCapacity(3);
         timecharVector.allocateNew();
         timecharVector.setIndexDefined(0);
@@ -248,47 +247,14 @@ public void testRowBatch() throws Exception {

         RowBatch rowBatch = new RowBatch(scanBatchResult, schema);

-        List expectedRow1 = Arrays.asList(
-                Boolean.TRUE,
-                (byte) 1,
-                (short) 1,
-                1,
-                1L,
-                (float) 1.1,
-                (double) 1.1,
-                Date.valueOf("2008-08-08"),
-                "2008-08-08 00:00:00",
-                Decimal.apply(1234L, 4, 2),
-                "char1"
-        );
-
-        List expectedRow2 = Arrays.asList(
-                Boolean.FALSE,
-                (byte) 2,
-                (short) 2,
-                null,
-                2L,
-                (float) 2.2,
-                (double) 2.2,
-                Date.valueOf("1900-08-08"),
-                "1900-08-08 00:00:00",
-                Decimal.apply(8888L, 4, 2),
-                "char2"
-        );
-
-        List expectedRow3 = Arrays.asList(
-                Boolean.TRUE,
-                (byte) 3,
-                (short) 3,
-                3,
-                3L,
-                (float) 3.3,
-                (double) 3.3,
-                Date.valueOf("2100-08-08"),
-                "2100-08-08 00:00:00",
-                Decimal.apply(10L, 2, 0),
-                "char3"
-        );
+        List expectedRow1 = Arrays.asList(Boolean.TRUE, (byte) 1, (short) 1, 1, 1L, (float) 1.1, (double) 1.1,
+                Date.valueOf("2008-08-08"), "2008-08-08 00:00:00", Decimal.apply(1234L, 4, 2), "char1");
+
+        List expectedRow2 = Arrays.asList(Boolean.FALSE, (byte) 2, (short) 2, null, 2L, (float) 2.2,
+                (double) 2.2, Date.valueOf("1900-08-08"), "1900-08-08 00:00:00", Decimal.apply(8888L, 4, 2), "char2");
+
+        List expectedRow3 = Arrays.asList(Boolean.TRUE, (byte) 3, (short) 3, 3, 3L, (float) 3.3, (double) 3.3,
+                Date.valueOf("2100-08-08"), "2100-08-08 00:00:00", Decimal.apply(10L, 2, 0), "char3");

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow1 = rowBatch.next();
@@ -321,10 +287,8 @@ public void testBinary() throws Exception {
                 new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
-                root,
-                new DictionaryProvider.MapDictionaryProvider(),
-                outputStream);
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root,
+                new DictionaryProvider.MapDictionaryProvider(), outputStream);

         arrowStreamWriter.start();
         root.setRowCount(3);
@@ -364,15 +328,15 @@ public void testBinary() throws Exception {

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow0 = rowBatch.next();
-        Assert.assertArrayEquals(binaryRow0, (byte[])actualRow0.get(0));
+        Assert.assertArrayEquals(binaryRow0, (byte[]) actualRow0.get(0));

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow1 = rowBatch.next();
-        Assert.assertArrayEquals(binaryRow1, (byte[])actualRow1.get(0));
+        Assert.assertArrayEquals(binaryRow1, (byte[]) actualRow1.get(0));

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow2 = rowBatch.next();
-        Assert.assertArrayEquals(binaryRow2, (byte[])actualRow2.get(0));
+        Assert.assertArrayEquals(binaryRow2, (byte[]) actualRow2.get(0));

         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
@@ -389,10 +353,8 @@ public void testDecimalV2() throws Exception {
                 new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
-                root,
-                new DictionaryProvider.MapDictionaryProvider(),
-                outputStream);
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root,
+                new DictionaryProvider.MapDictionaryProvider(), outputStream);

         arrowStreamWriter.start();
         root.setRowCount(3);
@@ -419,8 +381,7 @@ public void testDecimalV2() throws Exception {
         scanBatchResult.setRows(outputStream.toByteArray());

         String schemaStr = "{\"properties\":[{\"type\":\"DECIMALV2\",\"scale\": 0,"
-                + "\"precision\": 9, \"name\":\"k7\",\"comment\":\"\"}], "
-                + "\"status\":200}";
+                + "\"precision\": 9, \"name\":\"k7\",\"comment\":\"\"}], " + "\"status\":200}";

         Schema schema = RestService.parseSchema(schemaStr, logger);
@@ -428,15 +389,15 @@ public void testDecimalV2() throws Exception {

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow0 = rowBatch.next();
-        Assert.assertEquals(Decimal.apply(12340000000L, 11, 9), (Decimal)actualRow0.get(0));
+        Assert.assertEquals(Decimal.apply(12340000000L, 11, 9), (Decimal) actualRow0.get(0));

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow1 = rowBatch.next();
-        Assert.assertEquals(Decimal.apply(88880000000L, 11, 9), (Decimal)actualRow1.get(0));
+        Assert.assertEquals(Decimal.apply(88880000000L, 11, 9), (Decimal) actualRow1.get(0));

         Assert.assertTrue(rowBatch.hasNext());
         List actualRow2 = rowBatch.next();
-        Assert.assertEquals(Decimal.apply(10000000000L, 11, 9), (Decimal)actualRow2.get(0));
+        Assert.assertEquals(Decimal.apply(10000000000L, 11, 9), (Decimal) actualRow2.get(0));

         Assert.assertFalse(rowBatch.hasNext());
         thrown.expect(NoSuchElementException.class);
@@ -455,16 +416,14 @@ public void testDate() throws DorisException, IOException {
                 new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
-                root,
-                new DictionaryProvider.MapDictionaryProvider(),
-                outputStream);
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root,
+                new DictionaryProvider.MapDictionaryProvider(), outputStream);

         arrowStreamWriter.start();
         root.setRowCount(1);

         FieldVector vector = root.getVector("k1");
-        VarCharVector dateVector = (VarCharVector)vector;
+        VarCharVector dateVector = (VarCharVector) vector;
         dateVector.setInitialCapacity(1);
         dateVector.allocateNew();
         dateVector.setIndexDefined(0);
@@ -472,9 +431,8 @@ public void testDate() throws DorisException, IOException {
         dateVector.setSafe(0, "2023-08-09".getBytes());
         vector.setValueCount(1);

-
         vector = root.getVector("k2");
-        VarCharVector dateV2Vector = (VarCharVector)vector;
+        VarCharVector dateV2Vector = (VarCharVector) vector;
         dateV2Vector.setInitialCapacity(1);
         dateV2Vector.allocateNew();
         dateV2Vector.setIndexDefined(0);
@@ -494,11 +452,8 @@ public void testDate() throws DorisException, IOException {
         scanBatchResult.setEos(false);
         scanBatchResult.setRows(outputStream.toByteArray());

-
-        String schemaStr = "{\"properties\":[" +
-                "{\"type\":\"DATE\",\"name\":\"k1\",\"comment\":\"\"}, " +
-                "{\"type\":\"DATEV2\",\"name\":\"k2\",\"comment\":\"\"}" +
-                "], \"status\":200}";
+        String schemaStr = "{\"properties\":[" + "{\"type\":\"DATE\",\"name\":\"k1\",\"comment\":\"\"}, "
+                + "{\"type\":\"DATEV2\",\"name\":\"k2\",\"comment\":\"\"}" + "], \"status\":200}";

         Schema schema = RestService.parseSchema(schemaStr, logger);
@@ -527,16 +482,14 @@ public void testLargeInt() throws DorisException, IOException {
                 new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
                 new RootAllocator(Integer.MAX_VALUE));
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
-                root,
-                new DictionaryProvider.MapDictionaryProvider(),
-                outputStream);
+        ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(root,
+                new DictionaryProvider.MapDictionaryProvider(), outputStream);

         arrowStreamWriter.start();
         root.setRowCount(1);

         FieldVector vector = root.getVector("k1");
-        VarCharVector lageIntVector = (VarCharVector)vector;
+        VarCharVector lageIntVector = (VarCharVector) vector;
         lageIntVector.setInitialCapacity(1);
         lageIntVector.allocateNew();
         lageIntVector.setIndexDefined(0);
@@ -544,9 +497,8 @@ public void testLargeInt() throws DorisException, IOException {
         lageIntVector.setSafe(0, "9223372036854775808".getBytes());
         vector.setValueCount(1);

-
         vector = root.getVector("k2");
-        FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector)vector;
+        FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector) vector;
         lageIntVector1.setInitialCapacity(1);
         lageIntVector1.allocateNew();
         lageIntVector1.setIndexDefined(0);
@@ -569,10 +521,8 @@ public void testLargeInt() throws DorisException, IOException {
         scanBatchResult.setEos(false);
         scanBatchResult.setRows(outputStream.toByteArray());

-        String schemaStr = "{\"properties\":[" +
-                "{\"type\":\"LARGEINT\",\"name\":\"k1\",\"comment\":\"\"}, " +
-                "{\"type\":\"LARGEINT\",\"name\":\"k2\",\"comment\":\"\"}" +
-                "], \"status\":200}";
+        String schemaStr = "{\"properties\":[" + "{\"type\":\"LARGEINT\",\"name\":\"k1\",\"comment\":\"\"}, "
+                + "{\"type\":\"LARGEINT\",\"name\":\"k2\",\"comment\":\"\"}" + "], \"status\":200}";

         Schema schema = RestService.parseSchema(schemaStr, logger);
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java
index 020a241c..8422a0ca 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/DataUtilTest.java
@@ -26,7 +26,8 @@ public class DataUtilTest extends TestCase {

     public void testHandleColumnValue() {
-        Assert.assertEquals("2023-08-14 18:00:00.0", DataUtil.handleColumnValue(Timestamp.valueOf("2023-08-14 18:00:00")));
-        Assert.assertEquals("[1, 2, 3]", DataUtil.handleColumnValue(WrappedArray.make(new Integer[]{1,2,3})));
+        Assert.assertEquals("2023-08-14 18:00:00.0",
+                DataUtil.handleColumnValue(Timestamp.valueOf("2023-08-14 18:00:00")));
+        Assert.assertEquals("[1, 2, 3]", DataUtil.handleColumnValue(WrappedArray.make(new Integer[] {1, 2, 3})));
     }
 }
\ No newline at end of file
diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
index 4e364186..ed33958b 100644
--- a/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
+++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/util/TestListUtils.java
@@ -19,6 +19,7 @@

 import org.junit.Assert;
 import org.junit.Test;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
diff --git a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
index 54771df5..b4da9550 100644
--- a/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
+++ b/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala
@@ -19,8 +19,7 @@ package org.apache.doris.spark.sql

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.Ignore
-import org.junit.Test
+import org.junit.{Ignore, Test}

 // This test need real connect info to run.
 // Set the connect info before comment out this @Ignore