From 56f13e920d9e4fa03855a00c438a0b51e18ae575 Mon Sep 17 00:00:00 2001 From: Bibo <33744252+531651225@users.noreply.github.com> Date: Tue, 27 Feb 2024 11:37:59 +0800 Subject: [PATCH 1/2] Support multi-table sink feature for influxdb (#6278) --- .../seatunnel/influxdb/config/SinkConfig.java | 24 +++--- .../seatunnel/influxdb/sink/InfluxDBSink.java | 45 +++------- .../influxdb/sink/InfluxDBSinkFactory.java | 26 +++++- .../influxdb/sink/InfluxDBSinkWriter.java | 12 +-- .../e2e/connector/influxdb/InfluxdbIT.java | 62 ++++++++++++++ .../fake_to_infuxdb_with_multipletable.conf | 85 +++++++++++++++++++ 6 files changed, 200 insertions(+), 54 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java index 806309bffeba..071e3c235fcb 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/config/SinkConfig.java @@ -34,6 +34,7 @@ public class SinkConfig extends InfluxDBConfig { public SinkConfig(Config config) { super(config); + loadConfig(config); } public static final Option KEY_TIME = @@ -103,35 +104,32 @@ public SinkConfig(Config config) { private int maxRetryBackoffMs; private TimePrecision precision = DEFAULT_TIME_PRECISION; - public static SinkConfig loadConfig(Config config) { - SinkConfig sinkConfig = new SinkConfig(config); + public void loadConfig(Config config) { if (config.hasPath(KEY_TIME.key())) { - sinkConfig.setKeyTime(config.getString(KEY_TIME.key())); + setKeyTime(config.getString(KEY_TIME.key())); } if (config.hasPath(KEY_TAGS.key())) { - sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key())); + setKeyTags(config.getStringList(KEY_TAGS.key())); } if (config.hasPath(MAX_RETRIES.key())) { - sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key())); + setMaxRetries(config.getInt(MAX_RETRIES.key())); } if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) { - sinkConfig.setRetryBackoffMultiplierMs( - config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key())); + setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key())); } if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) { - sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key())); + setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key())); } if (config.hasPath(WRITE_TIMEOUT.key())) { - sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key())); + setWriteTimeout(config.getInt(WRITE_TIMEOUT.key())); } if (config.hasPath(RETENTION_POLICY.key())) { - sinkConfig.setRp(config.getString(RETENTION_POLICY.key())); + setRp(config.getString(RETENTION_POLICY.key())); } if (config.hasPath(EPOCH.key())) { - sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key()))); + setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key()))); } - sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key())); - return sinkConfig; + setMeasurement(config.getString(KEY_MEASUREMENT.key())); } } diff --git 
a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java index 9cc03272d1ef..da7ba20f91d6 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSink.java @@ -17,61 +17,36 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.PrepareFailException; -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.sink.SeaTunnelSink; import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; -import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException; - -import com.google.auto.service.AutoService; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import java.io.IOException; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL; -import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT; +public class InfluxDBSink extends AbstractSimpleSink + implements SupportMultiTableSink { -@AutoService(SeaTunnelSink.class) -public class InfluxDBSink extends AbstractSimpleSink { - - private Config pluginConfig; private SeaTunnelRowType seaTunnelRowType; + private SinkConfig sinkConfig; @Override public String getPluginName() { return "InfluxDB"; } - @Override - public void prepare(Config config) throws PrepareFailException { - CheckResult result = - CheckConfigUtil.checkAllExists(config, URL.key(), KEY_MEASUREMENT.key()); - if (!result.isSuccess()) { - throw new InfluxdbConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SINK, result.getMsg())); - } - this.pluginConfig = config; - } - - @Override - public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) { + this.sinkConfig = sinkConfig; + this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType(); } @Override public AbstractSinkWriter createWriter(SinkWriter.Context context) throws IOException { - return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType); + return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType); } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java 
b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java index 3d44158e78b1..81a294e95bc6 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkFactory.java @@ -17,11 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; import org.apache.seatunnel.api.table.factory.Factory; import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; +import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.CONNECT_TIMEOUT_MS; import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.DATABASES; @@ -36,6 +45,7 @@ import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS; @AutoService(Factory.class) +@Slf4j public class InfluxDBSinkFactory implements TableSinkFactory { @Override @@ -46,10 +56,11 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { return OptionRule.builder() - .required(URL, DATABASES, KEY_MEASUREMENT) + .required(URL, DATABASES) .bundled(USERNAME, PASSWORD) .optional( CONNECT_TIMEOUT_MS, + KEY_MEASUREMENT, KEY_TAGS, KEY_TIME, BATCH_SIZE, @@ -57,4 +68,17 @@ public OptionRule optionRule() { RETRY_BACKOFF_MULTIPLIER_MS) .build(); } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + ReadonlyConfig config = context.getOptions(); + CatalogTable catalogTable = context.getCatalogTable(); + if (!config.getOptional(KEY_MEASUREMENT).isPresent()) { + Map map = config.toMap(); + map.put(KEY_MEASUREMENT.key(), catalogTable.getTableId().toTablePath().getFullName()); + config = ReadonlyConfig.fromMap(new HashMap<>(map)); + } + SinkConfig sinkConfig = new SinkConfig(config.toConfig()); + return () -> new InfluxDBSink(sinkConfig, catalogTable); + } } diff --git a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java index f2d401db5151..b0d23c7e7991 100644 --- a/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java +++ b/seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/sink/InfluxDBSinkWriter.java @@ -17,11 +17,11 @@ package org.apache.seatunnel.connectors.seatunnel.influxdb.sink; -import org.apache.seatunnel.shade.com.typesafe.config.Config; - +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import 
org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.common.utils.JsonUtils; import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter; import org.apache.seatunnel.connectors.seatunnel.influxdb.client.InfluxDBClient; import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig; @@ -44,7 +44,8 @@ import java.util.Optional; @Slf4j -public class InfluxDBSinkWriter extends AbstractSinkWriter { +public class InfluxDBSinkWriter extends AbstractSinkWriter + implements SupportMultiTableSinkWriter { private final Serializer serializer; private InfluxDB influxdb; @@ -52,9 +53,10 @@ public class InfluxDBSinkWriter extends AbstractSinkWriter { private final List batchList; private volatile Exception flushException; - public InfluxDBSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) + public InfluxDBSinkWriter(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) throws ConnectException { - this.sinkConfig = SinkConfig.loadConfig(pluginConfig); + this.sinkConfig = sinkConfig; + log.info("sinkConfig is {}", JsonUtils.toJsonString(sinkConfig)); this.serializer = new DefaultSerializer( seaTunnelRowType, diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java index ddc7afadacbd..c139afc5e87f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/influxdb/InfluxdbIT.java @@ -25,7 +25,9 @@ import org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; @@ -51,10 +53,12 @@ import java.net.ConnectException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import java.util.stream.Stream; @Slf4j @@ -242,6 +246,64 @@ public void testInfluxdbWithTz(TestContainer container) } } + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently SPARK/FLINK do not support multiple table read") + public void testInfluxdbMultipleWrite(TestContainer container) + throws IOException, InterruptedException { + Container.ExecResult execResult = + container.executeJob("/fake_to_infuxdb_with_multipletable.conf"); + + Assertions.assertEquals(0, execResult.getExitCode()); + Assertions.assertAll( + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + 1627529632356l, + "label_1", + "sink_1", + 4.3, + 200, + 2.5, + 2, + 5, + true)) + .collect(Collectors.toList()), + readData("infulxdb_sink_1")); + }, + () -> { + Assertions.assertIterableEquals( + Stream.>of( + Arrays.asList( + 1627529632357l, + "label_2", + "sink_2", + 4.3, + 200, + 2.5, + 2, + 5, + true)) 
+ .collect(Collectors.toList()), + readData("infulxdb_sink_2")); + }); + } + + public List> readData(String tableName) { + String sinkSql = + String.format( + "select time, label, c_string, c_double, c_bigint, c_float,c_int, c_smallint, c_boolean from %s order by time", + tableName); + QueryResult sinkQueryResult = influxDB.query(new Query(sinkSql, INFLUXDB_DATABASE)); + + List> sinkValues = + sinkQueryResult.getResults().get(0).getSeries().get(0).getValues(); + return sinkValues; + } + private void initializeInfluxDBClient() throws ConnectException { InfluxDBConfig influxDBConfig = new InfluxDBConfig(influxDBConnectUrl); influxDB = InfluxDBClient.getInfluxDB(influxDBConfig); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf new file mode 100644 index 000000000000..eda13ff7040a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-influxdb-e2e/src/test/resources/fake_to_infuxdb_with_multipletable.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set engine configuration here + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + tables_configs = [ + { + schema = { + table = "infulxdb_sink_1" + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356] + } + ] + }, + { + schema = { + table = "infulxdb_sink_2" + fields { + label = STRING + c_string = STRING + c_double = DOUBLE + c_bigint = BIGINT + c_float = FLOAT + c_int = INT + c_smallint = SMALLINT + c_boolean = BOOLEAN + time = BIGINT + } + } + rows = [ + { + kind = INSERT + fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357] + } + ] + } + ] + } +} + +sink { + InfluxDB { + url = "http://influxdb-host:8086" + database = "test" + key_time = "time" + batch_size = 1 + } +} \ No newline at end of file From 4d0ca15fb7906a9baa6363313a51acacd5f9a6b2 Mon Sep 17 00:00:00 2001 From: KingsleyY <42668765+CallMeKingsley97@users.noreply.github.com> Date: Wed, 28 Feb 2024 13:24:41 +0800 Subject: [PATCH 2/2] [Doc][Improve] support chinese for Feishu.md and HdfsFile.md (#6379) --- docs/zh/connector-v2/sink/Feishu.md | 66 ++++++++ docs/zh/connector-v2/sink/HdfsFile.md | 200 ++++++++++++++++++++++++ docs/zh/connector-v2/source/HdfsFile.md | 127 +++++++++++++++ 3 files changed, 393 insertions(+) create mode 100644 docs/zh/connector-v2/sink/Feishu.md create mode 100644 docs/zh/connector-v2/sink/HdfsFile.md create mode 100644 docs/zh/connector-v2/source/HdfsFile.md diff --git a/docs/zh/connector-v2/sink/Feishu.md b/docs/zh/connector-v2/sink/Feishu.md new file mode 100644 index 000000000000..01cdc7a48269 --- /dev/null +++ b/docs/zh/connector-v2/sink/Feishu.md @@ -0,0 +1,66 @@ +# 飞书 + +> 飞书 数据接收器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [ ] [精确一次](../../../en/concept/connector-v2-features.md) +- [ ] [变更数据捕获](../../../en/concept/connector-v2-features.md) + +## 描述 + +用于通过数据调用飞书的web hooks。 + +> 例如,如果来自上游的数据是 [`年龄: 12, 姓名: tyrantlucifer`],则 body 内容如下:`{"年龄": 12, "姓名": "tyrantlucifer"}` + +**提示:飞书接收器仅支持 `post json`类型的web hook,并且源数据将被视为web hook的正文内容。** + +## 数据类型映射 + +| SeaTunnel 数据类型 | 飞书数据类型 | +|-----------------------------|------------| +| ROW
MAP | Json |
+| NULL | null |
+| BOOLEAN | boolean |
+| TINYINT | byte |
+| SMALLINT | short |
+| INT | int |
+| BIGINT | long |
+| FLOAT | float |
+| DOUBLE | double |
+| DECIMAL | BigDecimal |
+| BYTES | byte[] |
+| STRING | String |
+| TIME
TIMESTAMP
TIME | String | +| ARRAY | JsonArray | + +## 接收器选项 + +| 名称 | 类型 | 是否必需 | 默认值 | 描述 | +|----------------|--------|------|-----|------------------------------------------------------------------------------------| +| url | String | 是 | - | 飞书web hook URL | +| headers | Map | 否 | - | HTTP 请求头 | +| common-options | | 否 | - | 接收器插件常见参数,请参阅 [接收器通用选项](../../../en/connector-v2/source/common-options.md) 以获取详细信息 | + +## 任务示例 + +### 简单示例: + +```hocon +Feishu { + url = "https://www.feishu.cn/flow/api/trigger-webhook/108bb8f208d9b2378c8c7aedad715c19" + } +``` + +## 更新日志 + +### 2.2.0-beta 2022-09-26 + +- 添加飞书接收器 + diff --git a/docs/zh/connector-v2/sink/HdfsFile.md b/docs/zh/connector-v2/sink/HdfsFile.md new file mode 100644 index 000000000000..1ed6ea2d5692 --- /dev/null +++ b/docs/zh/connector-v2/sink/HdfsFile.md @@ -0,0 +1,200 @@ +# Hdfs文件 + +> Hdfs文件 数据接收器 + +## 支持的引擎 + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [x] [精确一次](../../../en/concept/connector-v2-features.md) + +默认情况下,我们使用2PC提交来确保"精确一次" + +- [x] 文件格式类型 + - [x] 文本 + - [x] CSV + - [x] Parquet + - [x] ORC + - [x] JSON + - [x] Excel +- [x] 压缩编解码器 + - [x] lzo + +## 描述 + +将数据输出到Hdfs文件 + +## 支持的数据源信息 + +| 数据源 | 支持的版本 | +|--------|------------------| +| Hdfs文件 | hadoop 2.x 和 3.x | + +## 接收器选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|----------------------------------|---------|------|--------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| fs.defaultFS | string | 是 | - | 以 `hdfs://` 开头的 Hadoop 集群地址,例如:`hdfs://hadoopcluster` | +| path | string | 是 | - | 目标目录路径是必需的。 | +| tmp_path | string | 是 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 `mv` 命令将临时目录提交到目标目录。需要一个Hdfs路径。 | +| hdfs_site_path | string | 否 | - | `hdfs-site.xml` 的路径,用于加载 namenodes 的 ha 配置。 | +| custom_filename | boolean | 否 | false | 是否需要自定义文件名 | +| file_name_expression | string | 否 | "${transactionId}" | 仅在 `custom_filename` 为 `true` 时使用。`file_name_expression` 描述将创建到 `path` 中的文件表达式。我们可以在 `file_name_expression` 中添加变量 `${now}` 或 `${uuid}`,例如 `test_${uuid}_${now}`,`${now}` 表示当前时间,其格式可以通过指定选项 `filename_time_format` 来定义。请注意,如果 `is_enable_transaction` 为 `true`,我们将在文件头部自动添加 `${transactionId}_`。 | +| filename_time_format | string | 否 | "yyyy.MM.dd" | 仅在 `custom_filename` 为 `true` 时使用。当 `file_name_expression` 参数中的格式为 `xxxx-${now}` 时,`filename_time_format` 可以指定路径的时间格式,默认值为 `yyyy.MM.dd`。常用的时间格式如下所示:[y:年,M:月,d:月中的一天,H:一天中的小时(0-23),m:小时中的分钟,s:分钟中的秒] | +| file_format_type | string | 否 | "csv" | 我们支持以下文件类型:`text` `json` `csv` `orc` `parquet` `excel`。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 `txt`。 | +| field_delimiter | string | 否 | '\001' | 仅在 file_format 为 text 时使用,数据行中列之间的分隔符。仅需要 `text` 文件格式。 | +| row_delimiter | string | 否 | "\n" | 仅在 file_format 为 text 时使用,文件中行之间的分隔符。仅需要 `text` 文件格式。 | +| have_partition | boolean | 否 | false | 是否需要处理分区。 | +| partition_by | array | 否 | - | 仅在 have_partition 为 true 时使用,根据选定的字段对数据进行分区。 | +| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 仅在 have_partition 为 true 时使用,如果指定了 `partition_by`,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。默认 `partition_dir_expression` 为 `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`。`k0` 是第一个分区字段,`v0` 是第一个分区字段的值。 | +| is_partition_field_write_in_file | boolean | 否 | false | 仅当 `have_partition` 为 `true` 时使用。如果 `is_partition_field_write_in_file` 为 `true`,则分区字段及其值将写入数据文件中。例如,如果要写入Hive数据文件,则其值应为 `false`。 | +| sink_columns | array | 否 | | 当此参数为空时,所有字段都是接收器列。需要写入文件的列,默认值是从 `Transform` 或 `Source` 获取的所有列。字段的顺序确定了实际写入文件时的顺序。 | +| is_enable_transaction | boolean | 否 | true | 如果 `is_enable_transaction` 为 true,则在将数据写入目标目录时,我们将确保数据不会丢失或重复。请注意,如果 `is_enable_transaction` 为 `true`,我们将在文件头部自动添加 `${transactionId}_`。目前仅支持 `true`。 | +| batch_size | int | 否 | 1000000 | 文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 `batch_size` 和 `checkpoint.interval` 共同决定。如果 `checkpoint.interval` 的值足够大,则接收器写入器将在文件中写入行,直到文件中的行大于 `batch_size`。如果 `checkpoint.interval` 很小,则接收器写入器将在新检查点触发时创建一个新文件。 | +| compress_codec | string | 否 | none | 文件的压缩编解码器及其支持的细节如下所示:[txt: `lzo` `none`,json: `lzo` `none`,csv: `lzo` `none`,orc: `lzo` `snappy` `lz4` `zlib` `none`,parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`]。提示:excel类型不支持任何压缩格式。 | +| krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径 | 
+| kerberos_principal | string | 否 | - | kerberos 的主体 | +| kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径 | +| compress_codec | string | 否 | none | 压缩编解码器 | +| common-options | object | 否 | - | 接收器插件通用参数,请参阅 [接收器通用选项](../../../en/connector-v2/source/common-options.md) 了解详情 | +| max_rows_in_memory | int | 否 | - | 仅当 file_format 为 excel 时使用。当文件格式为 Excel 时,可以缓存在内存中的最大数据项数。 | +| sheet_name | string | 否 | Sheet${Random number} | 仅当 file_format 为 excel 时使用。将工作簿的表写入指定的表名 | + +### 提示 + +> 如果您使用 spark/flink,为了使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。 + +## 任务示例 + +### 简单示例: + +> 此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并将其发送到 Hdfs。 + +``` +# 定义运行时环境 +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + # 这是一个示例源插件 **仅用于测试和演示功能源插件** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的源端插件列表, + # 请访问 https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的转换插件列表, + # 请访问 https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + HdfsFile { + fs.defaultFS = "hdfs://hadoopcluster" + path = "/tmp/hive/warehouse/test2" + file_format_type = "orc" + } + # 如果您想获取有关如何配置 seatunnel 的更多信息和查看完整的接收器插件列表, + # 请访问 https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + +### orc 文件格式的简单配置 + +``` +HdfsFile { + fs.defaultFS = "hdfs://hadoopcluster" + path = "/tmp/hive/warehouse/test2" + file_format_type = "orc" +} +``` + +### text 文件格式的配置,包括 `have_partition`、`custom_filename` 和 `sink_columns` + +``` +HdfsFile { + fs.defaultFS = "hdfs://hadoopcluster" + path = "/tmp/hive/warehouse/test2" + file_format_type = "text" + field_delimiter = "\t" + row_delimiter = "\n" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + filename_time_format = "yyyy.MM.dd" + sink_columns = ["name","age"] + is_enable_transaction = true +} +``` + +### parquet 文件格式的配置,包括 `have_partition`、`custom_filename` 和 `sink_columns` + +``` +HdfsFile { + fs.defaultFS = "hdfs://hadoopcluster" + path = "/tmp/hive/warehouse/test2" + have_partition = true + partition_by = ["age"] + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + custom_filename = true + file_name_expression = "${transactionId}_${now}" + filename_time_format = "yyyy.MM.dd" + file_format_type = "parquet" + sink_columns = ["name","age"] + is_enable_transaction = true +} +``` + +### kerberos 的简单配置 + +``` +HdfsFile { + fs.defaultFS = "hdfs://hadoopcluster" + path = "/tmp/hive/warehouse/test2" + hdfs_site_path = "/path/to/your/hdfs_site_path" + kerberos_principal = "your_principal@EXAMPLE.COM" + kerberos_keytab_path = "/path/to/your/keytab/file.keytab" +} +``` + +### 压缩的简单配置 + +``` +HdfsFile { + fs.defaultFS = "hdfs://hadoopcluster" + path = "/tmp/hive/warehouse/test2" + compress_codec = "lzo" +} +``` + diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md new file 
mode 100644
index 000000000000..c9c1667da0a8
--- /dev/null
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -0,0 +1,127 @@
+# Hdfs文件
+
+> Hdfs文件 数据源连接器
+
+## 支持的引擎
+
+> Spark
+> Flink
+> SeaTunnel Zeta
+ +## 主要特性 + +- [x] [批处理](../../../en/concept/connector-v2-features.md) +- [ ] [流处理](../../../en/concept/connector-v2-features.md) +- [x] [精确一次](../../../en/concept/connector-v2-features.md) + +在一次 pollNext 调用中读取分片中的所有数据。将读取的分片保存在快照中。 + +- [x] [列投影](../../../en/concept/connector-v2-features.md) +- [x] [并行度](../../../en/concept/connector-v2-features.md) +- [ ] [支持用户定义的分片](../../../en/concept/connector-v2-features.md) +- [x] 文件格式 + - [x] 文本 + - [x] CSV + - [x] Parquet + - [x] ORC + - [x] JSON + - [x] Excel + +## 描述 + +从Hdfs文件系统中读取数据。 + +## 支持的数据源信息 + +| 数据源 | 支持的版本 | +|--------|------------------| +| Hdfs文件 | hadoop 2.x 和 3.x | + +## 源选项 + +| 名称 | 类型 | 是否必须 | 默认值 | 描述 | +|---------------------------|---------|------|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| path | string | 是 | - | 源文件路径。 | +| file_format_type | string | 是 | - | 我们支持以下文件类型:`text` `json` `csv` `orc` `parquet` `excel`。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 `txt`。 | +| fs.defaultFS | string | 是 | - | 以 `hdfs://` 开头的 Hadoop 集群地址,例如:`hdfs://hadoopcluster`。 | +| read_columns | list | 是 | - | 数据源的读取列列表,用户可以使用它实现字段投影。支持的文件类型的列投影如下所示:[text,json,csv,orc,parquet,excel]。提示:如果用户在读取 `text` `json` `csv` 文件时想要使用此功能,必须配置 schema 选项。 | +| hdfs_site_path | string | 否 | - | `hdfs-site.xml` 的路径,用于加载 namenodes 的 ha 配置。 | +| delimiter/field_delimiter | string | 否 | \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认 `\001`,与 Hive 的默认分隔符相同。 | +| parse_partition_from_path | boolean | 否 | true | 控制是否从文件路径中解析分区键和值。例如,如果您从路径 `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` 读取文件,则来自文件的每条记录数据将添加这两个字段:[name:tyrantlucifer,age:26]。提示:不要在 schema 选项中定义分区字段。 | +| date_format | string | 否 | yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持的格式如下:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`,默认 `yyyy-MM-dd`。日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持的格式如下:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`,默认 `yyyy-MM-dd HH:mm:ss`。 | +| time_format | string | 否 | HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持的格式如下:`HH:mm:ss` `HH:mm:ss.SSS`,默认 `HH:mm:ss`。 | +| remote_user | string | 否 | - | 用于连接 Hadoop 的登录用户。它旨在用于 RPC 中的远程用户,不会有任何凭据。 | +| krb5_path | string | 否 | /etc/krb5.conf | kerberos 的 krb5 路径。 | +| kerberos_principal | string | 否 | - | kerberos 的 principal。 | +| kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径。 | +| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件中的前两行。 | +| schema | config | 否 | - | 上游数据的模式字段。 | +| sheet_name | string | 否 | - | 读取工作簿的表格,仅在文件格式为 excel 时使用。 | +| compress_codec | string | 否 | none | 文件的压缩编解码器。 | +| common-options | | 否 | - | 源插件通用参数,请参阅 [源通用选项](../../../en/connector-v2/source/common-options.md) 获取详细信息。 | + +### delimiter/field_delimiter [string] + +**delimiter** 参数在版本 2.3.5 后将被弃用,请改用 **field_delimiter**。 + +### compress_codec [string] + +文件的压缩编解码器及支持的详细信息如下所示: + +- txt:`lzo` `none` +- json:`lzo` `none` +- csv:`lzo` `none` +- orc/parquet: + 自动识别压缩类型,无需额外设置。 + +### 提示 + +> 如果您使用 spark/flink,为了 + +使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。 + +## 任务示例 + +### 简单示例: + +> 此示例定义了一个 SeaTunnel 同步任务,从 Hdfs 中读取数据并将其发送到 Hdfs。 + +``` +# 定义运行时环境 +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + HdfsFile { + 
    schema {
+      fields {
+        name = string
+        age = int
+      }
+    }
+    path = "/apps/hive/demo/student"
+    file_format_type = "json"
+    fs.defaultFS = "hdfs://namenode001"
+  }
+  # 如果您想获取有关如何配置 seatunnel 和查看源插件完整列表的更多信息,
+  # 请访问 https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+  # 如果您想获取有关如何配置 seatunnel 和查看转换插件完整列表的更多信息,
+  # 请访问 https://seatunnel.apache.org/docs/category/transform-v2
+}
+
+sink {
+  HdfsFile {
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/hive/warehouse/test2"
+    file_format_type = "orc"
+  }
+  # 如果您想获取有关如何配置 seatunnel 和查看接收器插件完整列表的更多信息,
+  # 请访问 https://seatunnel.apache.org/docs/category/sink-v2
+}
+```
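+
+### 列投影示例
+
+> 下面是一个示意性的配置片段(其中的集群地址、路径和字段名均为假设值,仅供参考),演示如何按照上面选项表的说明,结合 `schema`、`read_columns` 和 `skip_header_row_number` 读取 csv 文件并只投影需要的列;读取 `text`、`json`、`csv` 文件并使用列投影时,必须配置 schema。
+
+```
+source {
+  HdfsFile {
+    # 假设的 Hadoop 集群地址与源目录
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/seatunnel/csv"
+    file_format_type = "csv"
+    # 跳过 csv 文件的表头行
+    skip_header_row_number = 1
+    # 使用列投影时必须配置 schema
+    schema = {
+      fields {
+        name = string
+        age = int
+        score = double
+      }
+    }
+    # 只读取 name 和 score 两列
+    read_columns = ["name", "score"]
+  }
+}
+```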