From 66ce0157ec56be4e35746701ba2e65763822bac6 Mon Sep 17 00:00:00 2001
From: wudi <676366545@qq.com>
Date: Fri, 11 Oct 2024 14:16:09 +0800
Subject: [PATCH] [Chore] improve dep and add builder comment (#496)
---
flink-doris-connector/pom.xml | 10 +-
.../flink/cfg/DorisExecutionOptions.java | 115 ++++++++++++++++++
.../apache/doris/flink/cfg/DorisOptions.java | 54 +++++++-
.../doris/flink/cfg/DorisReadOptions.java | 101 ++++++++++++++-
.../apache/doris/flink/sink/DorisSink.java | 30 +++++
.../doris/flink/source/DorisSource.java | 24 ++++
6 files changed, 315 insertions(+), 19 deletions(-)
diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml
index d773339b3..775242b05 100644
--- a/flink-doris-connector/pom.xml
+++ b/flink-doris-connector/pom.xml
@@ -105,14 +105,6 @@ under the License.
thrift-service
${thrift-service.version}
-
-
- org.apache.flink
- flink-clients
- ${flink.version}
- provided
-
-
org.apache.flink
flink-table-planner-loader
@@ -369,7 +361,7 @@ under the License.
org.apache.flink
flink-runtime-web
${flink.version}
- provided
+ test
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 7ad8ba971..831a317ee 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -292,51 +292,112 @@ public static class Builder {
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;
+ /**
+ * Sets the interval at which exceptions are checked while loading. The default is
+ * 0, which disables the checker thread.
+ *
+ * @param checkInterval
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
return this;
}
+ /**
+ * Sets the maxRetries to load data. In batch mode, this parameter is the number of stream
+ * load retries. In non-batch mode, this parameter is the number of retries in the commit
+ * phase.
+ *
+ * @param maxRetries
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setMaxRetries(Integer maxRetries) {
this.maxRetries = maxRetries;
return this;
}
+ /**
+ * Sets the buffer size to cache data for stream load. Only valid in non-batch mode.
+ *
+ * @param bufferSize
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
+ /**
+ * Sets the buffer count to cache data for stream load. Only valid in non-batch mode.
+ *
+ * @param bufferCount
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferCount(int bufferCount) {
this.bufferCount = bufferCount;
return this;
}
+ /**
+ * Sets the unique label prefix for stream load.
+ *
+ * @param labelPrefix
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setLabelPrefix(String labelPrefix) {
this.labelPrefix = labelPrefix;
return this;
}
+ /**
+ * Sets whether to use cache for stream load. Only valid in non-batch mode.
+ *
+ * @param useCache
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setUseCache(boolean useCache) {
this.useCache = useCache;
return this;
}
+ /**
+ * Sets the properties for stream load.
+ *
+ * @param streamLoadProp
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setStreamLoadProp(Properties streamLoadProp) {
this.streamLoadProp = streamLoadProp;
return this;
}
+ /**
+ * Sets whether to perform the deletion operation for stream load.
+ *
+ * @param enableDelete
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setDeletable(Boolean enableDelete) {
this.enableDelete = enableDelete;
return this;
}
+ /**
+ * Disables 2pc (two-phase commit) for stream load.
+ *
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder disable2PC() {
this.enable2PC = false;
return this;
}
+ /**
+ * Forces 2pc (two-phase commit) on. By default, 2pc is turned off for the uniq model.
+ *
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder enable2PC() {
this.enable2PC = true;
// Force open 2pc
@@ -344,6 +405,12 @@ public Builder enable2PC() {
return this;
}
+ /**
+ * Set whether to use batch mode to stream load.
+ *
+ * @param enableBatchMode
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBatchMode(Boolean enableBatchMode) {
this.enableBatchMode = enableBatchMode;
if (enableBatchMode.equals(Boolean.TRUE)) {
@@ -352,41 +419,89 @@ public Builder setBatchMode(Boolean enableBatchMode) {
return this;
}
+ /**
+ * Set queue size in batch mode.
+ *
+ * @param flushQueueSize
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setFlushQueueSize(int flushQueueSize) {
this.flushQueueSize = flushQueueSize;
return this;
}
+ /**
+ * Sets the flush interval in milliseconds for stream load in batch mode.
+ *
+ * @param bufferFlushIntervalMs
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
this.bufferFlushIntervalMs = bufferFlushIntervalMs;
return this;
}
+ /**
+ * Set the max flush rows for stream load in batch mode.
+ *
+ * @param bufferFlushMaxRows
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushMaxRows(int bufferFlushMaxRows) {
this.bufferFlushMaxRows = bufferFlushMaxRows;
return this;
}
+ /**
+ * Set the max flush bytes for stream load in batch mode.
+ *
+ * @param bufferFlushMaxBytes
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setBufferFlushMaxBytes(int bufferFlushMaxBytes) {
this.bufferFlushMaxBytes = bufferFlushMaxBytes;
return this;
}
+ /**
+ * Sets whether to ignore the updateBefore event.
+ *
+ * @param ignoreUpdateBefore
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setIgnoreUpdateBefore(boolean ignoreUpdateBefore) {
this.ignoreUpdateBefore = ignoreUpdateBefore;
return this;
}
+ /**
+ * Sets the writing mode. Only STREAM_LOAD and STREAM_LOAD_BATCH are supported.
+ *
+ * @param writeMode
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setWriteMode(WriteMode writeMode) {
this.writeMode = writeMode;
return this;
}
+ /**
+ * Set whether to ignore commit failure errors. This is only valid in non-batch mode 2pc.
+ * When ignored, data loss may occur.
+ *
+ * @param ignoreCommitError
+ * @return this DorisExecutionOptions.builder.
+ */
public Builder setIgnoreCommitError(boolean ignoreCommitError) {
this.ignoreCommitError = ignoreCommitError;
return this;
}
+ /**
+ * Build the {@link DorisExecutionOptions}.
+ *
+ * @return a DorisExecutionOptions with the settings made for this builder.
+ */
public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
index bf6c7a28c..69273c9e0 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisOptions.java
@@ -101,47 +101,89 @@ public static class Builder {
private boolean autoRedirect = true;
private String tableIdentifier;
- /** required, tableIdentifier. */
+ /**
+ * Sets the tableIdentifier for the DorisOptions.
+ *
+ * @param tableIdentifier Doris's database name and table name, such as db.tbl
+ * @return this DorisOptions.builder.
+ */
public Builder setTableIdentifier(String tableIdentifier) {
this.tableIdentifier = tableIdentifier;
return this;
}
- /** optional, user name. */
+ /**
+ * Sets the username of doris cluster.
+ *
+ * @param username Doris cluster username
+ * @return this DorisOptions.builder.
+ */
public Builder setUsername(String username) {
this.username = username;
return this;
}
- /** optional, password. */
+ /**
+ * Sets the password of doris cluster.
+ *
+ * @param password Doris cluster password
+ * @return this DorisOptions.builder.
+ */
public Builder setPassword(String password) {
this.password = password;
return this;
}
- /** required, Frontend Http Rest url. */
+ /**
+ * Sets the doris frontend http rest url, such as 127.0.0.1:8030,127.0.0.2:8030
+ *
+ * @param fenodes
+ * @return this DorisOptions.builder.
+ */
public Builder setFenodes(String fenodes) {
this.fenodes = fenodes;
return this;
}
- /** optional, Backend Http Port. */
+ /**
+ * Sets the doris backend http rest url, such as 127.0.0.1:8040,127.0.0.2:8040
+ *
+ * @param benodes
+ * @return this DorisOptions.builder.
+ */
public Builder setBenodes(String benodes) {
this.benodes = benodes;
return this;
}
- /** not required, fe jdbc url, for lookup query. */
+ /**
+ * Sets the doris fe jdbc url for lookup query, such as jdbc:mysql://127.0.0.1:9030
+ *
+ * @param jdbcUrl
+ * @return this DorisOptions.builder.
+ */
public Builder setJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
return this;
}
+ /**
+ * Sets the autoRedirect for DorisOptions. If true, stream load will be written directly to
+ * fe. If false, it will first get the be list and write directly to be.
+ *
+ * @param autoRedirect
+ * @return this DorisOptions.builder.
+ */
public Builder setAutoRedirect(boolean autoRedirect) {
this.autoRedirect = autoRedirect;
return this;
}
+ /**
+ * Build the {@link DorisOptions}.
+ *
+ * @return a DorisOptions with the settings made for this builder.
+ */
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
// multi table load, don't need check
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 937d32866..0448d60a9 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -223,76 +223,169 @@ public static class Builder {
private Boolean useFlightSql = false;
private Integer flightSqlPort;
+ /**
+ * Sets the readFields for doris table to push down projection, such as name,age.
+ *
+ * @param readFields
+ * @return this DorisReadOptions.builder.
+ */
public Builder setReadFields(String readFields) {
this.readFields = readFields;
return this;
}
+ /**
+ * Sets the filterQuery for doris table to push down filter, such as age=18.
+ *
+ * @param filterQuery
+ * @return this DorisReadOptions.builder.
+ */
public Builder setFilterQuery(String filterQuery) {
this.filterQuery = filterQuery;
return this;
}
+ /**
+ * Sets the requestTabletSize for DorisReadOptions. The number of Doris Tablets
+ * corresponding to a Partition, the smaller this value is set, the more Partitions will be
+ * generated. This improves the parallelism on the Flink side, but at the same time puts
+ * more pressure on Doris.
+ *
+ * @param requestTabletSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestTabletSize(Integer requestTabletSize) {
this.requestTabletSize = requestTabletSize;
return this;
}
+ /**
+ * Sets the request connect timeout for DorisReadOptions.
+ *
+ * @param requestConnectTimeoutMs
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestConnectTimeoutMs(Integer requestConnectTimeoutMs) {
this.requestConnectTimeoutMs = requestConnectTimeoutMs;
return this;
}
+ /**
+ * Sets the request read timeout for DorisReadOptions.
+ *
+ * @param requestReadTimeoutMs
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestReadTimeoutMs(Integer requestReadTimeoutMs) {
this.requestReadTimeoutMs = requestReadTimeoutMs;
return this;
}
+ /**
+ * Sets the timeout time for querying Doris for DorisReadOptions.
+ *
+ * @param requesQueryTimeoutS
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestQueryTimeoutS(Integer requesQueryTimeoutS) {
this.requestQueryTimeoutS = requesQueryTimeoutS;
return this;
}
+ /**
+ * Sets the number of retries to send requests to Doris for DorisReadOptions.
+ *
+ * @param requestRetries
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestRetries(Integer requestRetries) {
this.requestRetries = requestRetries;
return this;
}
+ /**
+ * Sets the read batch size for DorisReadOptions.
+ *
+ * @param requestBatchSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setRequestBatchSize(Integer requestBatchSize) {
this.requestBatchSize = requestBatchSize;
return this;
}
+ /**
+ * Sets the Memory limit for a single query for DorisReadOptions.
+ *
+ * @param execMemLimit
+ * @return this DorisReadOptions.builder.
+ */
public Builder setExecMemLimit(Long execMemLimit) {
this.execMemLimit = execMemLimit;
return this;
}
+ /**
+ * Sets the size of the internal queue for asynchronous Arrow format conversion.
+ *
+ * @param deserializeQueueSize
+ * @return this DorisReadOptions.builder.
+ */
public Builder setDeserializeQueueSize(Integer deserializeQueueSize) {
this.deserializeQueueSize = deserializeQueueSize;
return this;
}
+ /**
+ * Sets whether to support asynchronous conversion of Arrow format to RowBatch needed for
+ * connector iterations.
+ *
+ * @param deserializeArrowAsync
+ * @return this DorisReadOptions.builder.
+ */
public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
this.deserializeArrowAsync = deserializeArrowAsync;
return this;
}
- public Builder setUseFlightSql(Boolean useFlightSql) {
- this.useFlightSql = useFlightSql;
+ /**
+ * Sets whether to use the legacy source api.
+ *
+ * @param useOldApi
+ * @return this DorisReadOptions.builder.
+ */
+ public Builder setUseOldApi(Boolean useOldApi) {
+ this.useOldApi = useOldApi;
return this;
}
- public Builder setUseOldApi(Boolean useOldApi) {
- this.useOldApi = useOldApi;
+ /**
+ * Sets whether to use arrow flight sql for query. Only supports Doris 2.1 and above.
+ *
+ * @param useFlightSql
+ * @return this DorisReadOptions.builder.
+ */
+ public Builder setUseFlightSql(Boolean useFlightSql) {
+ this.useFlightSql = useFlightSql;
return this;
}
+ /**
+ * Sets the flight sql port for DorisReadOptions.
+ *
+ * @param flightSqlPort
+ * @return this DorisReadOptions.builder.
+ */
public Builder setFlightSqlPort(Integer flightSqlPort) {
this.flightSqlPort = flightSqlPort;
return this;
}
+ /**
+ * Build the {@link DorisReadOptions}.
+ *
+ * @return a DorisReadOptions with the settings made for this builder.
+ */
public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index fd61d7fd9..d8e0d8277 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -161,26 +161,56 @@ public static class Builder {
private DorisExecutionOptions dorisExecutionOptions;
private DorisRecordSerializer serializer;
+ /**
+ * Sets the DorisOptions for the DorisSink.
+ *
+ * @param dorisOptions the common options of the doris cluster.
+ * @return this DorisSink.Builder.
+ */
public Builder setDorisOptions(DorisOptions dorisOptions) {
this.dorisOptions = dorisOptions;
return this;
}
+ /**
+ * Sets the DorisReadOptions for the DorisSink.
+ *
+ * @param dorisReadOptions the read options of the DorisSink.
+ * @return this DorisSink.Builder.
+ */
public Builder setDorisReadOptions(DorisReadOptions dorisReadOptions) {
this.dorisReadOptions = dorisReadOptions;
return this;
}
+ /**
+ * Sets the DorisExecutionOptions for the DorisSink.
+ *
+ * @param dorisExecutionOptions the execution options of the DorisSink.
+ * @return this DorisSink.Builder.
+ */
public Builder setDorisExecutionOptions(DorisExecutionOptions dorisExecutionOptions) {
this.dorisExecutionOptions = dorisExecutionOptions;
return this;
}
+ /**
+ * Sets the {@link DorisRecordSerializer serializer} that transforms incoming records to
+ * DorisRecord
+ *
+ * @param serializer
+ * @return this DorisSink.Builder.
+ */
public Builder setSerializer(DorisRecordSerializer serializer) {
this.serializer = serializer;
return this;
}
+ /**
+ * Build the {@link DorisSink}.
+ *
+ * @return a DorisSink with the settings made for this builder.
+ */
public DorisSink build() {
Preconditions.checkNotNull(dorisOptions);
Preconditions.checkNotNull(dorisExecutionOptions);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
index 1b05453ad..19a7fe36d 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java
@@ -168,21 +168,40 @@ public static class DorisSourceBuilder {
boundedness = Boundedness.BOUNDED;
}
+ /**
+ * Sets the DorisOptions for the DorisSource.
+ *
+ * @param options the common options of the doris cluster.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder setDorisOptions(DorisOptions options) {
this.options = options;
return this;
}
+ /**
+ * Sets the DorisReadOptions for the DorisSource.
+ *
+ * @param readOptions the read options of the DorisSource.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder setDorisReadOptions(DorisReadOptions readOptions) {
this.readOptions = readOptions;
return this;
}
+ /** Sets the Boundedness for the DorisSource. Currently only BOUNDED is supported. */
public DorisSourceBuilder setBoundedness(Boundedness boundedness) {
this.boundedness = boundedness;
return this;
}
+ /**
+ * Sets the {@link DorisDeserializationSchema deserializer} of the Record for DorisSource.
+ *
+ * @param deserializer the deserializer for Doris Record.
+ * @return this DorisSourceBuilder.
+ */
public DorisSourceBuilder setDeserializer(
DorisDeserializationSchema deserializer) {
this.deserializer = deserializer;
@@ -194,6 +213,11 @@ public DorisSourceBuilder setResolvedFilterQuery(List resolvedFilte
return this;
}
+ /**
+ * Build the {@link DorisSource}.
+ *
+ * @return a DorisSource with the settings made for this builder.
+ */
public DorisSource build() {
if (readOptions == null) {
readOptions = DorisReadOptions.builder().build();