Skip to content

Commit

Permalink
Merge branch 'apache:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
dzygoon authored Feb 28, 2024
2 parents 50f501b + 4d0ca15 commit 44dc2c4
Show file tree
Hide file tree
Showing 9 changed files with 593 additions and 54 deletions.
66 changes: 66 additions & 0 deletions docs/zh/connector-v2/sink/Feishu.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# 飞书

> 飞书 数据接收器
## 支持的引擎

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## 主要特性

- [ ] [精确一次](../../../en/concept/connector-v2-features.md)
- [ ] [变更数据捕获](../../../en/concept/connector-v2-features.md)

## 描述

用于通过数据调用飞书的web hooks。

> 例如,如果来自上游的数据是 [`年龄: 12, 姓名: tyrantlucifer`],则 body 内容如下:`{"年龄": 12, "姓名": "tyrantlucifer"}`
**提示:飞书接收器仅支持 `post json`类型的web hook,并且源数据将被视为web hook的正文内容。**

## 数据类型映射

| SeaTunnel 数据类型 | 飞书数据类型 |
|-----------------------------|------------|
| ROW<br/>MAP | Json |
| NULL | null |
| BOOLEAN | boolean |
| TINYINT | byte |
| SMALLINT | short |
| INT | int |
| BIGINT | long |
| FLOAT | float |
| DOUBLE | double |
| DECIMAL | BigDecimal |
| BYTES | byte[] |
| STRING | String |
| TIME<br/>TIMESTAMP<br/>TIME | String |
| ARRAY | JsonArray |

## 接收器选项

| 名称 | 类型 | 是否必需 | 默认值 | 描述 |
|----------------|--------|------|-----|------------------------------------------------------------------------------------|
| url | String || - | 飞书web hook URL |
| headers | Map || - | HTTP 请求头 |
| common-options | || - | 接收器插件常见参数,请参阅 [接收器通用选项](../../../en/connector-v2/source/common-options.md) 以获取详细信息 |

## 任务示例

### 简单示例:

```hocon
Feishu {
url = "https://www.feishu.cn/flow/api/trigger-webhook/108bb8f208d9b2378c8c7aedad715c19"
}
```

## 更新日志

### 2.2.0-beta 2022-09-26

- 添加飞书接收器

200 changes: 200 additions & 0 deletions docs/zh/connector-v2/sink/HdfsFile.md

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions docs/zh/connector-v2/source/HdfsFile.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Hdfs文件

> Hdfs文件 数据源连接器
## 支持的引擎

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>
## 主要特性

- [x] [批处理](../../../en/concept/connector-v2-features.md)
- [ ] [流处理](../../../en/concept/connector-v2-features.md)
- [x] [精确一次](../../../en/concept/connector-v2-features.md)

在一次 pollNext 调用中读取分片中的所有数据。将读取的分片保存在快照中。

- [x] [列投影](../../../en/concept/connector-v2-features.md)
- [x] [并行度](../../../en/concept/connector-v2-features.md)
- [ ] [支持用户定义的分片](../../../en/concept/connector-v2-features.md)
- [x] 文件格式
- [x] 文本
- [x] CSV
- [x] Parquet
- [x] ORC
- [x] JSON
- [x] Excel

## 描述

从Hdfs文件系统中读取数据。

## 支持的数据源信息

| 数据源 | 支持的版本 |
|--------|------------------|
| Hdfs文件 | hadoop 2.x 和 3.x |

## 源选项

| 名称 | 类型 | 是否必须 | 默认值 | 描述 |
|---------------------------|---------|------|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| path | string || - | 源文件路径。 |
| file_format_type | string || - | 我们支持以下文件类型:`text` `json` `csv` `orc` `parquet` `excel`。请注意,最终文件名将以文件格式的后缀结束,文本文件的后缀是 `txt`|
| fs.defaultFS | string || - |`hdfs://` 开头的 Hadoop 集群地址,例如:`hdfs://hadoopcluster`|
| read_columns | list || - | 数据源的读取列列表,用户可以使用它实现字段投影。支持的文件类型的列投影如下所示:[text,json,csv,orc,parquet,excel]。提示:如果用户在读取 `text` `json` `csv` 文件时想要使用此功能,必须配置 schema 选项。 |
| hdfs_site_path | string || - | `hdfs-site.xml` 的路径,用于加载 namenodes 的 ha 配置。 |
| delimiter/field_delimiter | string || \001 | 字段分隔符,用于告诉连接器在读取文本文件时如何切分字段。默认 `\001`,与 Hive 的默认分隔符相同。 |
| parse_partition_from_path | boolean || true | 控制是否从文件路径中解析分区键和值。例如,如果您从路径 `hdfs://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26` 读取文件,则来自文件的每条记录数据将添加这两个字段:[name:tyrantlucifer,age:26]。提示:不要在 schema 选项中定义分区字段。 |
| date_format | string || yyyy-MM-dd | 日期类型格式,用于告诉连接器如何将字符串转换为日期,支持的格式如下:`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`,默认 `yyyy-MM-dd`。日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持的格式如下:`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`,默认 `yyyy-MM-dd HH:mm:ss`|
| time_format | string || HH:mm:ss | 时间类型格式,用于告诉连接器如何将字符串转换为时间,支持的格式如下:`HH:mm:ss` `HH:mm:ss.SSS`,默认 `HH:mm:ss`|
| remote_user | string || - | 用于连接 Hadoop 的登录用户。它旨在用于 RPC 中的远程用户,不会有任何凭据。 |
| krb5_path | string || /etc/krb5.conf | kerberos 的 krb5 路径。 |
| kerberos_principal | string || - | kerberos 的 principal。 |
| kerberos_keytab_path | string || - | kerberos 的 keytab 路径。 |
| skip_header_row_number | long || 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件中的前两行。 |
| schema | config || - | 上游数据的模式字段。 |
| sheet_name | string || - | 读取工作簿的表格,仅在文件格式为 excel 时使用。 |
| compress_codec | string || none | 文件的压缩编解码器。 |
| common-options | || - | 源插件通用参数,请参阅 [源通用选项](../../../en/connector-v2/source/common-options.md) 获取详细信息。 |

### delimiter/field_delimiter [string]

**delimiter** 参数在版本 2.3.5 后将被弃用,请改用 **field_delimiter**

### compress_codec [string]

文件的压缩编解码器及支持的详细信息如下所示:

- txt:`lzo` `none`
- json:`lzo` `none`
- csv:`lzo` `none`
- orc/parquet:
自动识别压缩类型,无需额外设置。

### 提示

> 如果您使用 spark/flink,为了
使用此连接器,您必须确保您的 spark/flink 集群已经集成了 hadoop。测试过的 hadoop 版本是 2.x。如果您使用 SeaTunnel Engine,则在下载和安装 SeaTunnel Engine 时会自动集成 hadoop jar。您可以检查 `${SEATUNNEL_HOME}/lib` 下的 jar 包来确认这一点。

## 任务示例

### 简单示例:

> 此示例定义了一个 SeaTunnel 同步任务,从 Hdfs 中读取数据并将其发送到 Hdfs。
```
# 定义运行时环境
env {
parallelism = 1
job.mode = "BATCH"
}
source {
HdfsFile {
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
type = "json"
fs.defaultFS = "hdfs://namenode001"
}
# 如果您想获取有关如何配置 seatunnel 和查看源插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/category/source-v2
}
transform {
# 如果您想获取有关如何配置 seatunnel 和查看转换插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/category/transform-v2
}
sink {
HdfsFile {
fs.defaultFS = "hdfs://hadoopcluster"
path = "/tmp/hive/warehouse/test2"
file_format = "orc"
}
# 如果您想获取有关如何配置 seatunnel 和查看接收器插件完整列表的更多信息,
# 请访问 https://seatunnel.apache.org/docs/category/sink-v2
}
```

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class SinkConfig extends InfluxDBConfig {
public SinkConfig(Config config) {
super(config);
loadConfig(config);
}

public static final Option<String> KEY_TIME =
Expand Down Expand Up @@ -103,35 +104,32 @@ public SinkConfig(Config config) {
private int maxRetryBackoffMs;
private TimePrecision precision = DEFAULT_TIME_PRECISION;

public static SinkConfig loadConfig(Config config) {
SinkConfig sinkConfig = new SinkConfig(config);
public void loadConfig(Config config) {

if (config.hasPath(KEY_TIME.key())) {
sinkConfig.setKeyTime(config.getString(KEY_TIME.key()));
setKeyTime(config.getString(KEY_TIME.key()));
}
if (config.hasPath(KEY_TAGS.key())) {
sinkConfig.setKeyTags(config.getStringList(KEY_TAGS.key()));
setKeyTags(config.getStringList(KEY_TAGS.key()));
}
if (config.hasPath(MAX_RETRIES.key())) {
sinkConfig.setMaxRetries(config.getInt(MAX_RETRIES.key()));
setMaxRetries(config.getInt(MAX_RETRIES.key()));
}
if (config.hasPath(RETRY_BACKOFF_MULTIPLIER_MS.key())) {
sinkConfig.setRetryBackoffMultiplierMs(
config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
setRetryBackoffMultiplierMs(config.getInt(RETRY_BACKOFF_MULTIPLIER_MS.key()));
}
if (config.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
sinkConfig.setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
setMaxRetryBackoffMs(config.getInt(MAX_RETRY_BACKOFF_MS.key()));
}
if (config.hasPath(WRITE_TIMEOUT.key())) {
sinkConfig.setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
setWriteTimeout(config.getInt(WRITE_TIMEOUT.key()));
}
if (config.hasPath(RETENTION_POLICY.key())) {
sinkConfig.setRp(config.getString(RETENTION_POLICY.key()));
setRp(config.getString(RETENTION_POLICY.key()));
}
if (config.hasPath(EPOCH.key())) {
sinkConfig.setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
setPrecision(TimePrecision.getPrecision(config.getString(EPOCH.key())));
}
sinkConfig.setMeasurement(config.getString(KEY_MEASUREMENT.key()));
return sinkConfig;
setMeasurement(config.getString(KEY_MEASUREMENT.key()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,61 +17,36 @@

package org.apache.seatunnel.connectors.seatunnel.influxdb.sink;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.influxdb.exception.InfluxdbConnectorException;

import com.google.auto.service.AutoService;
import org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig;

import java.io.IOException;

import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.InfluxDBConfig.URL;
import static org.apache.seatunnel.connectors.seatunnel.influxdb.config.SinkConfig.KEY_MEASUREMENT;
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void>
implements SupportMultiTableSink {

@AutoService(SeaTunnelSink.class)
public class InfluxDBSink extends AbstractSimpleSink<SeaTunnelRow, Void> {

private Config pluginConfig;
private SeaTunnelRowType seaTunnelRowType;
private SinkConfig sinkConfig;

@Override
public String getPluginName() {
return "InfluxDB";
}

@Override
public void prepare(Config config) throws PrepareFailException {
CheckResult result =
CheckConfigUtil.checkAllExists(config, URL.key(), KEY_MEASUREMENT.key());
if (!result.isSuccess()) {
throw new InfluxdbConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format(
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SINK, result.getMsg()));
}
this.pluginConfig = config;
}

@Override
public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
this.seaTunnelRowType = seaTunnelRowType;
public InfluxDBSink(SinkConfig sinkConfig, CatalogTable catalogTable) {
this.sinkConfig = sinkConfig;
this.seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
}

@Override
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context)
throws IOException {
return new InfluxDBSinkWriter(pluginConfig, seaTunnelRowType);
return new InfluxDBSinkWriter(sinkConfig, seaTunnelRowType);
}
}
Loading

0 comments on commit 44dc2c4

Please sign in to comment.