From 9009b2ade866227807718ab638380c1c9ff2dacd Mon Sep 17 00:00:00 2001 From: dzygoon <96854451+dzygoon@users.noreply.github.com> Date: Wed, 28 Feb 2024 17:20:55 +0800 Subject: [PATCH 1/7] [Fix][Doc]Seatunnel Engine/checkpoint-storage.md doc error(#6369) (#6404) Co-authored-by: dzy --- docs/en/seatunnel-engine/checkpoint-storage.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md index 29e16030588..332a3951bf0 100644 --- a/docs/en/seatunnel-engine/checkpoint-storage.md +++ b/docs/en/seatunnel-engine/checkpoint-storage.md @@ -143,7 +143,7 @@ seatunnel: fs.defaultFS: hdfs://localhost:9000 // if you used kerberos, you can config like this: kerberosPrincipal: your-kerberos-principal - kerberosKeytab: your-kerberos-keytab + kerberosKeytabFilePath: your-kerberos-keytab ``` if HDFS is in HA mode , you can config like this: From 48566458371df38548283d129d0b06f2fd6ebd43 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Thu, 29 Feb 2024 17:04:29 +0800 Subject: [PATCH 2/7] [Improve][CDC] Optimize memory allocation for snapshot split reading (#6281) --- .../reader/external/IncrementalSourceScanFetcher.java | 4 ++++ .../reader/external/IncrementalSourceStreamFetcher.java | 4 ++++ .../cdc/mongodb/source/fetch/MongodbFetchTaskContext.java | 7 ++++++- .../source/reader/fetch/MySqlSourceFetchTaskContext.java | 6 +++++- .../source/reader/fetch/OracleSourceFetchTaskContext.java | 5 ++++- .../source/reader/PostgresSourceFetchTaskContext.java | 5 ++++- .../reader/fetch/SqlServerSourceFetchTaskContext.java | 5 ++++- 7 files changed, 31 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java index 127b09406b6..da048b47e54 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceScanFetcher.java @@ -88,6 +88,10 @@ public void submitTask(FetchTask fetchTask) { executorService.submit( () -> { try { + log.info( + "Start snapshot read task for snapshot split: {} exactly-once: {}", + currentSnapshotSplit, + taskContext.isExactlyOnce()); snapshotSplitReadTask.execute(taskContext); } catch (Exception e) { log.error( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 31fdaaf2e50..e34970054b1 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -99,6 +99,10 @@ public void submitTask(FetchTask fetchTask) { executorService.submit( () -> { try { + log.info( + "Start incremental read task for incremental split: {} exactly-once: {}", + currentIncrementalSplit, + taskContext.isExactlyOnce()); streamFetchTask.execute(taskContext); } catch (Exception e) { log.error( diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java index 263b85d4931..9275c54bd2c 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/source/fetch/MongodbFetchTaskContext.java @@ -89,8 +89,13 @@ public MongodbFetchTaskContext( } public void configure(@Nonnull SourceSplitBase sourceSplitBase) { + // If in the snapshot read phase and enable exactly-once, the queue needs to be set to a + // maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise, + // use the configuration queue size. final int queueSize = - sourceSplitBase.isSnapshotSplit() ? Integer.MAX_VALUE : sourceConfig.getBatchSize(); + sourceSplitBase.isSnapshotSplit() && isExactlyOnce() + ? Integer.MAX_VALUE + : sourceConfig.getBatchSize(); this.changeEventQueue = new ChangeEventQueue.Builder() .pollInterval(Duration.ofMillis(sourceConfig.getPollAwaitTimeMillis())) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index 0b6ea40eaa9..e61dee226e5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -122,8 +122,12 @@ public void configure(SourceSplitBase sourceSplitBase) { this.taskContext = new MySqlTaskContextImpl(connectorConfig, databaseSchema, binaryLogClient); + + // If in the snapshot read phase and enable exactly-once, the queue needs to be set to a + // maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise, + // use the configuration queue size. final int queueSize = - sourceSplitBase.isSnapshotSplit() + sourceSplitBase.isSnapshotSplit() && isExactlyOnce() ? Integer.MAX_VALUE : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); this.queue = diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java index 3d7e7e3a6c7..184c2294a3f 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java @@ -119,8 +119,11 @@ public void configure(SourceSplitBase sourceSplitBase) { this.taskContext = new OracleTaskContext(connectorConfig, databaseSchema); + // If in the snapshot read phase and enable exactly-once, the queue needs to be set to a + // maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise, + // use the configuration queue size. final int queueSize = - sourceSplitBase.isSnapshotSplit() + sourceSplitBase.isSnapshotSplit() && isExactlyOnce() ? Integer.MAX_VALUE : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); this.queue = diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java index 091119ac50f..d5d725ed280 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/PostgresSourceFetchTaskContext.java @@ -145,8 +145,11 @@ public void configure(SourceSplitBase sourceSplitBase) { loadStartingOffsetState( new PostgresOffsetContext.Loader(connectorConfig), sourceSplitBase); + // If in the snapshot read phase and enable exactly-once, the queue needs to be set to a + // maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise, + // use the configuration queue size. final int queueSize = - sourceSplitBase.isSnapshotSplit() + sourceSplitBase.isSnapshotSplit() && isExactlyOnce() ? Integer.MAX_VALUE : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java index 200806f39b3..8178c9f30af 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/reader/fetch/SqlServerSourceFetchTaskContext.java @@ -122,8 +122,11 @@ public void configure(SourceSplitBase sourceSplitBase) { this.taskContext = new SqlServerTaskContext(connectorConfig, databaseSchema); + // If in the snapshot read phase and enable exactly-once, the queue needs to be set to a + // maximum size of `Integer.MAX_VALUE` (buffered a current snapshot all data). otherwise, + // use the configuration queue size. final int queueSize = - sourceSplitBase.isSnapshotSplit() + sourceSplitBase.isSnapshotSplit() && isExactlyOnce() ? Integer.MAX_VALUE : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); From 21a45935618dca42a48bc1365b3f1513856bdf1d Mon Sep 17 00:00:00 2001 From: zhongyuan du <96854451+dzygoon@users.noreply.github.com> Date: Thu, 29 Feb 2024 19:39:35 +0800 Subject: [PATCH 3/7] [Doc][Improve]fix doc start-v2/locally/quick-start-flink.md and start-v2/locally/quick-start-spark.md error (#6414) --- docs/en/start-v2/locally/quick-start-flink.md | 2 +- docs/en/start-v2/locally/quick-start-spark.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/start-v2/locally/quick-start-flink.md b/docs/en/start-v2/locally/quick-start-flink.md index 6b7b21f06af..2a7482ca222 100644 --- a/docs/en/start-v2/locally/quick-start-flink.md +++ b/docs/en/start-v2/locally/quick-start-flink.md @@ -61,7 +61,7 @@ sink { More information about config please check [config concept](../../concept/config.md) -## Step 3: Run SeaTunnel Application +## Step 4: Run SeaTunnel Application You could start the application by the following commands diff --git a/docs/en/start-v2/locally/quick-start-spark.md b/docs/en/start-v2/locally/quick-start-spark.md index 09786a22d6a..e057d479be2 100644 --- a/docs/en/start-v2/locally/quick-start-spark.md +++ b/docs/en/start-v2/locally/quick-start-spark.md @@ -62,7 +62,7 @@ sink { More information about config please check [config concept](../../concept/config.md) -## Step 3: Run SeaTunnel Application +## Step 4: Run SeaTunnel Application You could start the application by the following commands From 7b48a1674f0b2d6e978fb7c9ad30cbb2cc99cf87 Mon Sep 17 00:00:00 2001 From: lihjChina Date: Mon, 4 Mar 2024 19:42:49 +0800 Subject: [PATCH 4/7] add support chinese for transform-v2/* (#6384) --- docs/zh/transform-v2/common-options.md | 23 + docs/zh/transform-v2/copy.md | 65 ++ docs/zh/transform-v2/field-mapper.md | 64 ++ docs/zh/transform-v2/filter-rowkind.md | 68 ++ docs/zh/transform-v2/filter.md | 60 ++ docs/zh/transform-v2/jsonpath.md | 190 +++++ docs/zh/transform-v2/replace.md | 121 ++++ docs/zh/transform-v2/split.md | 72 ++ docs/zh/transform-v2/sql-functions.md | 966 +++++++++++++++++++++++++ docs/zh/transform-v2/sql-udf.md | 133 ++++ docs/zh/transform-v2/sql.md | 100 +++ 11 files changed, 1862 insertions(+) create mode 100644 docs/zh/transform-v2/common-options.md create mode 100644 docs/zh/transform-v2/copy.md create mode 100644 docs/zh/transform-v2/field-mapper.md create mode 100644 docs/zh/transform-v2/filter-rowkind.md create mode 100644 docs/zh/transform-v2/filter.md create mode 100644 docs/zh/transform-v2/jsonpath.md create mode 100644 docs/zh/transform-v2/replace.md create mode 100644 docs/zh/transform-v2/split.md create mode 100644 docs/zh/transform-v2/sql-functions.md create mode 100644 docs/zh/transform-v2/sql-udf.md create mode 100644 docs/zh/transform-v2/sql.md diff --git a/docs/zh/transform-v2/common-options.md b/docs/zh/transform-v2/common-options.md new file mode 100644 index 00000000000..9a756760f2c --- /dev/null +++ b/docs/zh/transform-v2/common-options.md @@ -0,0 +1,23 @@ +# 转换常见选项 + +> 源端连接器的常见参数 + +| 参数名称 | 参数类型 | 是否必须 | 默认值 | +|-------------------|--------|------|-----| +| result_table_name | string | no | - | +| source_table_name | string | no | - | + +### source_table_name [string] + +当未指定 `source_table_name` 时,当前插件在配置文件中处理由前一个插件输出的数据集 `(dataset)` ; + +当指定了 `source_table_name` 时,当前插件正在处理与该参数对应的数据集 + +### result_table_name [string] + +当未指定 `result_table_name` 时,此插件处理的数据不会被注册为其他插件可以直接访问的数据集,也不会被称为临时表 `(table)`; + +当指定了 `result_table_name` 时,此插件处理的数据将被注册为其他插件可以直接访问的数据集 `(dataset)`,或者被称为临时表 `(table)`。在这里注册的数据集可以通过指定 `source_table_name` 被其他插件直接访问。 + +## 示例 + diff --git a/docs/zh/transform-v2/copy.md b/docs/zh/transform-v2/copy.md new file mode 100644 index 00000000000..a4ca5c613a7 --- /dev/null +++ b/docs/zh/transform-v2/copy.md @@ -0,0 +1,65 @@ +# 复制 + +> 复制转换插件 + +## 描述 + +将字段复制到一个新字段。 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|--------|--------|------|-----| +| fields | Object | yes | | + +### fields [config] + +指定输入和输出之间的字段复制关系 + +### 常见选项 [string] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情。 + +## 示例 + +从源读取的数据是这样的一个表: + +| name | age | card | +|----------|-----|------| +| Joy Ding | 20 | 123 | +| May Ding | 20 | 123 | +| Kin Dom | 20 | 123 | +| Joy Dom | 20 | 123 | + +想要将字段 `name`、`age` 复制到新的字段 `name1`、`name2`、`age1`,我们可以像这样添加 `Copy` 转换: + +``` +transform { + Copy { + source_table_name = "fake" + result_table_name = "fake1" + fields { + name1 = name + name2 = name + age1 = age + } + } +} +``` + +那么结果表 `fake1` 中的数据将会像这样: + +| name | age | card | name1 | name2 | age1 | +|----------|-----|------|----------|----------|------| +| Joy Ding | 20 | 123 | Joy Ding | Joy Ding | 20 | +| May Ding | 20 | 123 | May Ding | May Ding | 20 | +| Kin Dom | 20 | 123 | Kin Dom | Kin Dom | 20 | +| Joy Dom | 20 | 123 | Joy Dom | Joy Dom | 20 | + +## 更新日志 + +### 新版本 + +- 添加复制转换连接器 +- 支持将字段复制到新字段 + diff --git a/docs/zh/transform-v2/field-mapper.md b/docs/zh/transform-v2/field-mapper.md new file mode 100644 index 00000000000..298d3fa72c9 --- /dev/null +++ b/docs/zh/transform-v2/field-mapper.md @@ -0,0 +1,64 @@ +# 字段映射 + +> 字段映射转换插件 + +## 描述 + +添加输入模式和输出模式映射 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|--------------|--------|------|-----| +| field_mapper | Object | yes | | + +### field_mapper [config] + +指定输入和输出之间的字段映射关系 + +### common options [config] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情 + +## 示例 + +源端数据读取的表格如下: + +| id | name | age | card | +|----|----------|-----|------| +| 1 | Joy Ding | 20 | 123 | +| 2 | May Ding | 20 | 123 | +| 3 | Kin Dom | 20 | 123 | +| 4 | Joy Dom | 20 | 123 | + +我们想要删除 `age` 字段,并更新字段顺序为 `id`、`card`、`name`,同时将 `name` 重命名为 `new_name`。我们可以像这样添加 `FieldMapper` 转换: + +``` +transform { + FieldMapper { + source_table_name = "fake" + result_table_name = "fake1" + field_mapper = { + id = id + card = card + name = new_name + } + } +} +``` + +那么结果表 `fake1` 中的数据将会像这样: + +| id | card | new_name | +|----|------|----------| +| 1 | 123 | Joy Ding | +| 2 | 123 | May Ding | +| 3 | 123 | Kin Dom | +| 4 | 123 | Joy Dom | + +## 更新日志 + +### 新版本 + +- 添加复制转换连接器 + diff --git a/docs/zh/transform-v2/filter-rowkind.md b/docs/zh/transform-v2/filter-rowkind.md new file mode 100644 index 00000000000..74d2b2d5b1e --- /dev/null +++ b/docs/zh/transform-v2/filter-rowkind.md @@ -0,0 +1,68 @@ +# 行类型过滤 + +> 行类型转换插件 + +## 描述 + +按行类型过滤数据 + +## 操作 + +| 名称 | 类型 | 是否必须 | 默认值 | +|---------------|-------|------|-----| +| include_kinds | array | yes | | +| exclude_kinds | array | yes | | + +### include_kinds [array] + +要包含的行类型 + +### exclude_kinds [array] + +要排除的行类型。 + +您只能配置 `include_kinds` 和 `exclude_kinds` 中的一个。 + +### common options [string] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情 + +## 示例 + +FakeSource 生成的数据的行类型是 `INSERT`。如果我们使用 `FilterRowKink` 转换并排除 `INSERT` 数据,我们将不会向接收器写入任何行。 + +```yaml + +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + age = "int" + } + } + } +} + +transform { + FilterRowKind { + source_table_name = "fake" + result_table_name = "fake1" + exclude_kinds = ["INSERT"] + } +} + +sink { + Console { + source_table_name = "fake1" + } +} +``` + diff --git a/docs/zh/transform-v2/filter.md b/docs/zh/transform-v2/filter.md new file mode 100644 index 00000000000..706a72ead12 --- /dev/null +++ b/docs/zh/transform-v2/filter.md @@ -0,0 +1,60 @@ +# 过滤器 + +> 过滤器转换插件 + +## 描述 + +过滤字段 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|--------|-------|------|-----| +| fields | array | yes | | + +### fields [array] + +需要保留的字段列表。不在列表中的字段将被删除。 + +### common options [string] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情 + +## 示例 + +源端数据读取的表格如下: + +| name | age | card | +|----------|-----|------| +| Joy Ding | 20 | 123 | +| May Ding | 20 | 123 | +| Kin Dom | 20 | 123 | +| Joy Dom | 20 | 123 | + +我们想要删除字段 `age`,我们可以像这样添加 `Filter` 转换 + +``` +transform { + Filter { + source_table_name = "fake" + result_table_name = "fake1" + fields = [name, card] + } +} +``` + +那么结果表 `fake1` 中的数据将会像这样: + +| name | card | +|----------|------| +| Joy Ding | 123 | +| May Ding | 123 | +| Kin Dom | 123 | +| Joy Dom | 123 | + +## 更新日志 + +### 新版本 + +- 添加过滤转器换连接器 + diff --git a/docs/zh/transform-v2/jsonpath.md b/docs/zh/transform-v2/jsonpath.md new file mode 100644 index 00000000000..449f0f6a77f --- /dev/null +++ b/docs/zh/transform-v2/jsonpath.md @@ -0,0 +1,190 @@ +# JsonPath + +> JSONPath 转换插件 + +## 描述 + +> 支持使用 JSONPath 选择数据 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|---------|-------|------|-----| +| Columns | Array | Yes | | + +### common options [string] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情 + +### fields[array] + +#### 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|------------|--------|------|--------| +| src_field | String | Yes | | +| dest_field | String | Yes | | +| path | String | Yes | | +| dest_type | String | No | String | + +#### src_field + +> 要解析的 JSON 源字段 + +支持的Seatunnel数据类型 + +* STRING +* BYTES +* ARRAY +* MAP +* ROW + +#### dest_field + +> 使用 JSONPath 后的输出字段 + +#### dest_type + +> 目标字段的类型 + +#### path + +> Jsonpath + +## 读取 JSON 示例 + +从源读取的数据是像这样的 JSON + +```json +{ + "data": { + "c_string": "this is a string", + "c_boolean": true, + "c_integer": 42, + "c_float": 3.14, + "c_double": 3.14, + "c_decimal": 10.55, + "c_date": "2023-10-29", + "c_datetime": "16:12:43.459", + "c_array":["item1", "item2", "item3"] + } +} +``` + +假设我们想要使用 JsonPath 提取属性。 + +```json +transform { + JsonPath { + source_table_name = "fake" + result_table_name = "fake1" + columns = [ + { + "src_field" = "data" + "path" = "$.data.c_string" + "dest_field" = "c1_string" + }, + { + "src_field" = "data" + "path" = "$.data.c_boolean" + "dest_field" = "c1_boolean" + "dest_type" = "boolean" + }, + { + "src_field" = "data" + "path" = "$.data.c_integer" + "dest_field" = "c1_integer" + "dest_type" = "int" + }, + { + "src_field" = "data" + "path" = "$.data.c_float" + "dest_field" = "c1_float" + "dest_type" = "float" + }, + { + "src_field" = "data" + "path" = "$.data.c_double" + "dest_field" = "c1_double" + "dest_type" = "double" + }, + { + "src_field" = "data" + "path" = "$.data.c_decimal" + "dest_field" = "c1_decimal" + "dest_type" = "decimal(4,2)" + }, + { + "src_field" = "data" + "path" = "$.data.c_date" + "dest_field" = "c1_date" + "dest_type" = "date" + }, + { + "src_field" = "data" + "path" = "$.data.c_datetime" + "dest_field" = "c1_datetime" + "dest_type" = "time" + }, + { + "src_field" = "data" + "path" = "$.data.c_array" + "dest_field" = "c1_array" + "dest_type" = "array" + } + ] + } +} +``` + +那么数据结果表 `fake1` 将会像这样 + +| data | c1_string | c1_boolean | c1_integer | c1_float | c1_double | c1_decimal | c1_date | c1_datetime | c1_array | +|------------------------------|------------------|------------|------------|----------|-----------|------------|------------|--------------|-----------------------------| +| too much content not to show | this is a string | true | 42 | 3.14 | 3.14 | 10.55 | 2023-10-29 | 16:12:43.459 | ["item1", "item2", "item3"] | + +## 读取 SeatunnelRow 示例 + +假设数据行中的一列的类型是 SeatunnelRow,列的名称为 col + + + + + +
SeatunnelRow(col)other
nameage....
a18....
+ +JsonPath 转换将 seatunnel 的值转换为一个数组。 + +```json +transform { + JsonPath { + source_table_name = "fake" + result_table_name = "fake1" + columns = [ + { + "src_field" = "col" + "path" = "$[0]" + "dest_field" = "name" + "dest_type" = "string" + }, + { + "src_field" = "col" + "path" = "$[1]" + "dest_field" = "age" + "dest_type" = "int" + } + ] + } +} +``` + +那么数据结果表 `fake1` 将会像这样: + +| name | age | col | other | +|------|-----|----------|-------| +| a | 18 | ["a",18] | ... | + +## 更新日志 + +* 添加 JsonPath 转换 + diff --git a/docs/zh/transform-v2/replace.md b/docs/zh/transform-v2/replace.md new file mode 100644 index 00000000000..99eef89a1ab --- /dev/null +++ b/docs/zh/transform-v2/replace.md @@ -0,0 +1,121 @@ +# 替换 + +> 替换转换插件 + +## 描述 + +检查给定字段中的字符串值,并用给定的替换项替换与给定字符串字面量或正则表达式匹配的字符串值的子字符串。 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|---------------|---------|------|-------| +| replace_field | string | yes | | +| pattern | string | yes | - | +| replacement | string | yes | - | +| is_regex | boolean | no | false | +| replace_first | boolean | no | false | + +### replace_field [string] + +需要替换的字段 + +### pattern [string] + +将被替换的旧字符串 + +### replacement [string] + +用于替换的新字符串 + +### is_regex [boolean] + +使用正则表达式进行字符串匹配 + +### replace_first [boolean] + +是否替换第一个匹配字符串。仅在 `is_regex = true` 时使用。 + +### common options [string] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情 + +## 示例 + +源端数据读取的表格如下: + +| name | age | card | +|----------|-----|------| +| Joy Ding | 20 | 123 | +| May Ding | 20 | 123 | +| Kin Dom | 20 | 123 | +| Joy Dom | 20 | 123 | + +我们想要将 `name` 字段中的字符 ``替换为 `_`。然后我们可以添加一个 `Replace` 转换,像这样: + +``` +transform { + Replace { + source_table_name = "fake" + result_table_name = "fake1" + replace_field = "name" + pattern = " " + replacement = "_" + is_regex = true + } +} +``` + +那么结果表 `fake1` 中的数据将会更新为: + +| name | age | card | +|----------|-----|------| +| Joy_Ding | 20 | 123 | +| May_Ding | 20 | 123 | +| Kin_Dom | 20 | 123 | +| Joy_Dom | 20 | 123 | + +## 作业配置示例 + +``` +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + } + } + } +} + +transform { + Replace { + source_table_name = "fake" + result_table_name = "fake1" + replace_field = "name" + pattern = ".+" + replacement = "b" + is_regex = true + } +} + +sink { + Console { + source_table_name = "fake1" + } +} +``` + +## 更新日志 + +### 新版本 + +- 添加替换转换连接器 + diff --git a/docs/zh/transform-v2/split.md b/docs/zh/transform-v2/split.md new file mode 100644 index 00000000000..ef8c3f58540 --- /dev/null +++ b/docs/zh/transform-v2/split.md @@ -0,0 +1,72 @@ +# 拆分 + +> 拆分转换插件 + +## 描述 + +拆分一个字段为多个字段。 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|---------------|--------|------|-----| +| separator | string | yes | | +| split_field | string | yes | | +| output_fields | array | yes | | + +### separator [string] + +拆分内容的分隔符 + +### split_field[string] + +需要拆分的字段 + +### output_fields[array] + +拆分后的结果字段 + +### common options [string] + +转换插件的常见参数, 请参考 [Transform Plugin](common-options.md) 了解详情 + +## 示例 + +源端数据读取的表格如下: + +| name | age | card | +|----------|-----|------| +| Joy Ding | 20 | 123 | +| May Ding | 20 | 123 | +| Kin Dom | 20 | 123 | +| Joy Dom | 20 | 123 | + +我们想要将 `name` 字段拆分为 `first_name` 和 `second_name`,我们可以像这样添加 `Split` 转换: + +``` +transform { + Split { + source_table_name = "fake" + result_table_name = "fake1" + separator = " " + split_field = "name" + output_fields = [first_name, second_name] + } +} +``` + +那么结果表 `fake1` 中的数据将会像这样: + +| name | age | card | first_name | last_name | +|----------|-----|------|------------|-----------| +| Joy Ding | 20 | 123 | Joy | Ding | +| May Ding | 20 | 123 | May | Ding | +| Kin Dom | 20 | 123 | Kin | Dom | +| Joy Dom | 20 | 123 | Joy | Dom | + +## 更新日志 + +### 新版本 + +- 添加拆分转换连接器 + diff --git a/docs/zh/transform-v2/sql-functions.md b/docs/zh/transform-v2/sql-functions.md new file mode 100644 index 00000000000..cd90b948674 --- /dev/null +++ b/docs/zh/transform-v2/sql-functions.md @@ -0,0 +1,966 @@ +# SQL函数 + +> SQL函数转换插件功能 + +## 字符串函数 + +### ASCII + +```ASCII(string)``` + +返回字符串中第一个字符的ASCII值。此方法返回一个整数。 + +示例: + +ASCII('Hi') + +### BIT_LENGTH + +```BIT_LENGTH(bytes)``` + +返回二进制字符串中的位数。该方法返回一个长整型 + +示例: + +BIT_LENGTH(NAME) + +### CHAR_LENGTH / LENGTH + +```CHAR_LENGTH | LENGTH (string)``` + +这个方法返回一个字符串中字符的数量,返回类型为 long。 + +示例: + +CHAR_LENGTH(NAME) + +### OCTET_LENGTH + +```OCTET_LENGTH(bytes)``` + +返回二进制字符串中字节的数量。此方法返回一个 long 类型的值。 + +示例: + +OCTET_LENGTH(NAME) + +### CHAR / CHR + +```CHAR | CHR (int)``` + +返回表示ASCII值的字符。该方法返回一个字符串。 + +示例: + +CHAR(65) + +### CONCAT + +```CONCAT(string, string[, string ...] )``` + +组合字符串。与运算符 `||` 不同,**NULL** 参数会被忽略,不会导致结果变为 **NULL**。如果所有参数都是 NULL,则结果是一个空字符串。该方法返回一个字符串。 + +示例: + +CONCAT(NAME, '_') + +### CONCAT_WS + +```CONCAT_WS(separatorString, string, string[, string ...] )``` + +使用分隔符组合字符串。如果分隔符为 **NULL**,则会被视为空字符串。其他 **NULL** 参数会被忽略。剩余的 **非NULL** 参数(如果有)将用指定的分隔符连接起来。如果没有剩余参数,则结果是一个空字符串。该方法返回一个字符串。 + +示例: + +CONCAT_WS(',', NAME, '_') + +### HEXTORAW + +```HEXTORAW(string)``` + +将字符串的十六进制表示转换为字符串。每个字符串字符使用4个十六进制字符。 + +示例: + +HEXTORAW(DATA) + +### RAWTOHEX + +```RAWTOHEX(string)``` + +```RAWTOHEX(bytes)``` + +将字符串或字节转换为十六进制表示。每个字符串字符使用4个十六进制字符。该方法返回一个字符串。 + +示例: + +RAWTOHEX(DATA) + +### INSERT + +```INSERT(originalString, startInt, lengthInt, addString)``` + +在原始字符串的指定起始位置插入额外的字符串。长度参数指定在原始字符串的起始位置删除的字符数。该方法返回一个字符串。 + +示例: + +INSERT(NAME, 1, 1, ' ') + +### LOWER / LCASE + +```LOWER | LCASE (string)``` + +将字符串转换为小写形式。 + +示例: + +LOWER(NAME) + +### UPPER / UCASE + +```UPPER | UCASE (string)``` + +将字符串转换为大写形式。 + +示例: + +UPPER(NAME) + +### LEFT + +```LEFT(string, int)``` + +返回最左边的一定数量的字符。 + +示例: + +LEFT(NAME, 3) + +### RIGHT + +```RIGHT(string, int)``` + +返回最右边的一定数量的字符。 + +示例: + +RIGHT(NAME, 3) + +### LOCATE / INSTR / POSITION + +```LOCATE(searchString, string[, startInit])``` + +```INSTR(string, searchString[, startInit])``` + +```POSITION(searchString, string)``` + +返回字符串中搜索字符串的位置。如果使用了起始位置参数,则忽略它之前的字符。如果位置参数是负数,则返回最右边的位置。如果未找到搜索字符串,则返回 0。请注意,即使参数不区分大小写,此函数也区分大小写。 + +示例: + +LOCATE('.', NAME) + +### LPAD + +```LPAD(string ,int[, string])``` + +将字符串左侧填充到指定的长度。如果长度比字符串短,则字符串将在末尾被截断。如果未设置填充字符串,则使用空格填充。 + +示例: + +LPAD(AMOUNT, 10, '*') + +### RPAD + +```RPAD(string, int[, string])``` + +将字符串右侧填充到指定的长度。如果长度比字符串短,则字符串将被截断。如果未设置填充字符串,则使用空格填充。 + +示例: + +RPAD(TEXT, 10, '-') + +### LTRIM + +```LTRIM(string[, characterToTrimString])``` + +移除字符串中所有前导空格或其他指定的字符。 + +此函数已被弃用,请使用 TRIM 替代。 + +示例: + +LTRIM(NAME) + +### RTRIM + +```RTRIM(string[, characterToTrimString])``` + +移除字符串中所有尾随空格或其他指定的字符。 + +此函数已被弃用,请使用 TRIM 替代。 + +示例: + +RTRIM(NAME) + +### TRIM + +```TRIM(string[, characterToTrimString])``` + +移除字符串中所有前导空格或其他指定的字符。 + +此函数已被弃用,请使用 TRIM 替代。 + +示例: + +LTRIM(NAME) + +### REGEXP_REPLACE + +```REGEXP_REPLACE(inputString, regexString, replacementString[, flagsString])``` + +替换与正则表达式匹配的每个子字符串。详情请参阅 Java String.replaceAll() 方法。如果任何参数为 null(除了可选的 flagsString 参数),则结果为 null。 + +标志值限于 'i'、'c'、'n'、'm'。其他符号会引发异常。可以在一个 flagsString 参数中使用多个符号(例如 'im')。后面的标志会覆盖前面的标志,例如 'ic' 等同于区分大小写匹配 'c'。 + +'i' 启用不区分大小写匹配(Pattern.CASE_INSENSITIVE) + +'c' 禁用不区分大小写匹配(Pattern.CASE_INSENSITIVE) + +'n' 允许句点匹配换行符(Pattern.DOTALL) + +'m' 启用多行模式(Pattern.MULTILINE) + +示例: + +REGEXP_REPLACE('Hello World', ' +', ' ') +REGEXP_REPLACE('Hello WWWWorld', 'w+', 'W', 'i') + +### REGEXP_LIKE + +```REGEXP_LIKE(inputString, regexString[, flagsString])``` + +将字符串与正则表达式匹配。详情请参阅 Java Matcher.find() 方法。如果任何参数为 null(除了可选的 flagsString 参数),则结果为 null。 + +标志值限于 'i'、'c'、'n'、'm'。其他符号会引发异常。可以在一个 flagsString 参数中使用多个符号(例如 'im')。后面的标志会覆盖前面的标志,例如 'ic' 等同于区分大小写匹配 'c'。 + +'i' 启用不区分大小写匹配(Pattern.CASE_INSENSITIVE) + +'c' 禁用不区分大小写匹配(Pattern.CASE_INSENSITIVE) + +'n' 允许句点匹配换行符(Pattern.DOTALL) + +'m' 启用多行模式(Pattern.MULTILINE) + +示例: + +REGEXP_LIKE('Hello World', '[A-Z ]*', 'i') + +### REGEXP_SUBSTR + +```REGEXP_SUBSTR(inputString, regexString[, positionInt, occurrenceInt, flagsString, groupInt])``` + +将字符串与正则表达式匹配,并返回匹配的子字符串。详情请参阅 java.util.regex.Pattern 和相关功能。 + +参数 position 指定匹配应该从 inputString 的哪里开始。Occurrence 指示在 inputString 中搜索 pattern 的哪个出现。 + +标志值限于 'i'、'c'、'n'、'm'。其他符号会引发异常。可以在一个 flagsString 参数中使用多个符号(例如 'im')。后面的标志会覆盖前面的标志,例如 'ic' 等同于区分大小写匹配 'c'。 + +'i' 启用不区分大小写匹配(Pattern.CASE_INSENSITIVE) + +'c' 禁用不区分大小写匹配(Pattern.CASE_INSENSITIVE) + +'n' 允许句点匹配换行符(Pattern.DOTALL) + +'m' 启用多行模式(Pattern.MULTILINE) + +如果模式具有组,则可以使用 group 参数指定要返回的组。 + +示例: + +REGEXP_SUBSTR('2020-10-01', '\d{4}') +REGEXP_SUBSTR('2020-10-01', '(\d{4})-(\d{2})-(\d{2})', 1, 1, NULL, 2) + +### REPEAT + +```REPEAT(string, int)``` + +Returns a string repeated some number of times. + +示例: + +REPEAT(NAME || ' ', 10) + +### REPLACE + +```REPLACE(string, searchString[, replacementString])``` + +在文本中替换所有出现的搜索字符串为另一个字符串。如果没有指定替换字符串,则从原始字符串中移除搜索字符串。如果任何参数为 null,则结果为 null。 + +示例: + +REPLACE(NAME, ' ') + +### SOUNDEX + +```SOUNDEX(string)``` + +表示字符串发音。此方法返回一个字符串,如果参数为 null,则返回 null。有关更多信息,请参阅 https://en.wikipedia.org/wiki/Soundex 。 + +示例: + +SOUNDEX(NAME) + +### SPACE + +```SPACE(int)``` + +返回由一定数量的空格组成的字符串。 + +示例: + +SPACE(80) + +### SUBSTRING / SUBSTR + +```SUBSTRING | SUBSTR (string, startInt[, lengthInt ])``` + +返回从指定位置开始的字符串的子串。如果起始索引为负数,则相对于字符串的末尾计算起始索引。长度是可选的。 + +示例: + +CALL SUBSTRING('[Hello]', 2); +CALL SUBSTRING('hour', 3, 2); + +### TO_CHAR + +```TO_CHAR(value[, formatString])``` + +Oracle 兼容的 TO_CHAR 函数可用于格式化时间戳、数字或文本。 + +示例: + +CALL TO_CHAR(SYS_TIME, 'yyyy-MM-dd HH:mm:ss') + +### TRANSLATE + +```TRANSLATE(value, searchString, replacementString)``` + +Oracle 兼容的 TRANSLATE 函数用于将字符串中的一系列字符替换为另一组字符。 + +示例: + +CALL TRANSLATE('Hello world', 'eo', 'EO') + +## Numeric Functions + +### ABS + +```ABS(numeric)``` + +返回指定值的绝对值。返回的值与参数的数据类型相同。 + +请注意,TINYINT、SMALLINT、INT 和 BIGINT 数据类型无法表示它们的最小负值的绝对值,因为它们的负值比正值多。例如,对于 INT 数据类型,允许的值范围是从 -2147483648 到 2147483647。ABS(-2147483648) 应该是 2147483648,但是这个值对于这个数据类型是不允许的。这会导致异常。为了避免这种情况,请将此函数的参数转换为更高的数据类型。 + +示例: + +ABS(I) + +### ACOS + +```ACOS(numeric)``` + +计算反余弦值。另请参阅 Java Math.acos。该方法返回一个双精度浮点数。 + +示例: + +ACOS(D) + +### ASIN + +```ASIN(numeric)``` + +计算反正弦值。另请参阅 Java Math.asin。该方法返回一个双精度浮点数。 + +示例: + +ASIN(D) + +### ATAN + +```ATAN(numeric)``` + +计算反正切值。另请参阅 Java Math.atan。该方法返回一个双精度浮点数。 + +示例: + +ATAN(D) + +### COS + +```COS(numeric)``` + +计算三角余弦值。另请参阅 Java Math.cos。该方法返回一个双精度浮点数。 + +示例: + +COS(ANGLE) + +### COSH + +```COSH(numeric)``` + +计算双曲余弦值。另请参阅 Java Math.cosh。该方法返回一个双精度浮点数。 + +示例: + +COSH(X) + +### COT + +```COT(numeric)``` + +计算三角余切值(1/TAN(角度))。另请参阅 Java Math.* 函数。该方法返回一个双精度浮点数。 + +示例: + +COT(ANGLE) + +### SIN + +```SIN(numeric)``` + +计算三角正弦值。另请参阅 Java Math.sin。该方法返回一个双精度浮点数。 + +示例: + +SIN(ANGLE) + +### SINH + +```SINH(numeric)``` + +计算双曲正弦值。另请参阅 Java Math.sinh。该方法返回一个双精度浮点数。 + +示例: + +SINH(ANGLE) + +### TAN + +```TAN(numeric)``` + +计算三角正切值。另请参阅 Java Math.tan。该方法返回一个双精度浮点数。 + +示例: + +TAN(ANGLE) + +### TANH + +```TANH(numeric)``` + +计算双曲正切值。另请参阅 Java Math.tanh。该方法返回一个双精度浮点数。 + +示例: + +TANH(X) + +### MOD + +```MOD(dividendNumeric, divisorNumeric )``` + +取模运算表达式。 + +结果与除数的类型相同。如果任一参数为 NULL,则结果为 NULL。如果除数为 0,则会引发异常。结果与被除数的符号相同,或者等于 0。 + +通常情况下,参数应具有标度 0,但 H2 并不要求。 + +示例: + +MOD(A, B) + +### CEIL / CEILING + +```CEIL | CEILING (numeric)``` + +返回大于或等于参数的最小整数值。该方法返回与参数相同类型的值,但标度设置为 0,并且如果适用,则调整精度。 + +示例: + +CEIL(A) + +### EXP + +```EXP(numeric)``` + +请参阅 Java Math.exp。该方法返回一个双精度浮点数。 + +示例: + +EXP(A) + +### FLOOR + +```FLOOR(numeric)``` + +返回小于或等于参数的最大整数值。该方法返回与参数相同类型的值,但标度设置为 0,并且如果适用,则调整精度。 + +示例: + +FLOOR(A) + +### LN + +```LN(numeric)``` + +计算自然对数(以 e 为底)的双精度浮点数值。参数必须是一个正数值。 + +示例: + +LN(A) + +### LOG + +```LOG(baseNumeric, numeric)``` + +计算以指定底数的对数,返回一个双精度浮点数。参数和底数必须是正数值。底数不能等于1。 + +默认底数是 e(自然对数),在 PostgreSQL 模式下,默认底数是 10。在 MSSQLServer 模式下,可选的底数在参数之后指定。 + +LOG 函数的单参数变体已被弃用,请使用 LN 或 LOG10 替代。 + +示例: + +LOG(2, A) + +### LOG10 + +```LOG10(numeric)``` + +计算以 10 为底的对数,返回一个双精度浮点数。参数必须是一个正数值。 + +示例: + +LOG10(A) + +### RADIANS + +```RADIANS(numeric)``` + +请参阅 Java Math.toRadians。该方法返回一个双精度浮点数。 + +示例: + +RADIANS(A) + +### SQRT + +```SQRT(numeric)``` + +请参阅 Java Math.sqrt。该方法返回一个双精度浮点数。 + +示例: + +SQRT(A) + +### PI + +```PI()``` + +请参阅 Java Math.PI。该方法返回一个双精度浮点数。 + +示例: + +PI() + +### POWER + +```POWER(numeric, numeric)``` + +请参阅 Java Math.pow。该方法返回一个双精度浮点数。 + +示例: + +POWER(A, B) + +### RAND / RANDOM + +```RAND | RANDOM([ int ])``` + +如果不带参数调用该函数,则返回下一个伪随机数。如果带有参数调用,则将会给该会话的随机数生成器设定种子。该方法返回一个介于 0(包括)和 1(不包括)之间的双精度浮点数。 + +示例: + +RAND() + +### ROUND + +```ROUND(numeric[, digitsInt])``` + +四舍五入到指定的小数位数。该方法返回与参数相同类型的值,但如果适用,则调整精度和标度。 + +示例: + +ROUND(N, 2) + +### SIGN + +```SIGN(numeric)``` + +如果值小于 0,则返回 -1;如果值为零或 NaN,则返回 0;否则返回 1。 + +示例: + +SIGN(N) + +### TRUNC + +```TRUNC | TRUNCATE(numeric[, digitsInt])``` + +当指定了一个数值参数时,将其截断为指定的数字位数(接近0的下一个值),并返回与参数相同类型的值,但如果适用,则调整精度和标度。 + +示例: + +TRUNC(N, 2) + +## Time and Date Functions + +### CURRENT_DATE + +```CURRENT_DATE [()]``` + +返回当前日期。 + +这些函数在事务(默认)或命令内部返回相同的值,具体取决于数据库模式。 + +示例: + +CURRENT_DATE + +### CURRENT_TIME + +```CURRENT_TIME [()]``` + +返回带有系统时区的当前时间。实际可用的最大精度取决于操作系统和 JVM,可以是 3(毫秒)或更高。在 Java 9 之前不支持更高的精度。 + +示例: + +CURRENT_TIME + +### CURRENT_TIMESTAMP / NOW + +```CURRENT_TIMESTAMP[()] | NOW()``` + +返回带有系统时区的当前时间戳。实际可用的最大精度取决于操作系统和 JVM,可以是 3(毫秒)或更高。在 Java 9 之前不支持更高的精度。 + +示例: + +CURRENT_TIMESTAMP + +### DATEADD / TIMESTAMPADD + +```DATEADD| TIMESTAMPADD(dateAndTime, addIntLong, datetimeFieldString)``` + +将单位添加到日期时间值中。datetimeFieldString 表示单位。使用负值来减去单位。当操作毫秒、微秒或纳秒时,addIntLong 可能是一个 long 值,否则其范围被限制为 int。如果单位与指定值兼容,则此方法返回与指定值相同类型的值。如果指定的字段是 HOUR、MINUTE、SECOND、MILLISECOND 等,而值是 DATE 值,DATEADD 返回组合的 TIMESTAMP。对于 TIME 值,不允许使用 DAY、MONTH、YEAR、WEEK 等字段。 + +示例: + +DATEADD(CREATED, 1, 'MONTH') + +### DATEDIFF + +```DATEDIFF(aDateAndTime, bDateAndTime, datetimeFieldString)``` + +返回两个日期时间值之间跨越的单位边界数。此方法返回一个 long 值。datetimeField 表示单位。 + +示例: + +DATEDIFF(T1.CREATED, T2.CREATED, 'MONTH') + +### DATE_TRUNC + +```DATE_TRUNC (dateAndTime, datetimeFieldString)``` + +将指定的日期时间值截断到指定的字段。 + +示例: + +DATE_TRUNC(CREATED, 'DAY'); + +### DAYNAME + +```DAYNAME(dateAndTime)``` + +返回星期几的名称(英文)。 + +示例: + +DAYNAME(CREATED) + +### DAY_OF_MONTH + +```DAY_OF_MONTH(dateAndTime)``` + +返回月份中的日期(1-31)。 + +示例: + +DAY_OF_MONTH(CREATED) + +### DAY_OF_WEEK + +```DAY_OF_WEEK(dateAndTime)``` + +返回星期几的数值(1-7)(星期一至星期日),根据本地化设置。 + +示例: + +DAY_OF_WEEK(CREATED) + +### DAY_OF_YEAR + +```DAY_OF_YEAR(dateAndTime)``` + +返回一年中的日期(1-366)。 + +示例: + +DAY_OF_YEAR(CREATED) + +### EXTRACT + +```EXTRACT ( datetimeField FROM dateAndTime)``` + +从日期/时间值中返回特定时间单位的值。该方法对于 EPOCH 字段返回一个数值,对于其他字段返回一个整数。 + +示例: + +EXTRACT(SECOND FROM CURRENT_TIMESTAMP) + +### FORMATDATETIME + +```FORMATDATETIME (dateAndTime, formatString)``` + +将日期、时间或时间戳格式化为字符串。最重要的格式字符包括:y(年)、M(月)、d(日)、H(时)、m(分)、s(秒)。有关格式的详细信息,请参阅 java.time.format.DateTimeFormatter。 + +该方法返回一个字符串。 + +示例: + +CALL FORMATDATETIME(CREATED, 'yyyy-MM-dd HH:mm:ss') + +### HOUR + +```HOUR(dateAndTime)``` + +从日期/时间值中返回小时(0-23)。 + +示例: + +HOUR(CREATED) + +### MINUTE + +```MINUTE(dateAndTime)``` + +从日期/时间值中返回分钟(0-59)。 + +该函数已经被弃用,请使用 EXTRACT 替代。 + +示例: + +MINUTE(CREATED) + +### MONTH + +```MONTH(dateAndTime)``` + +从日期/时间值中返回月份(1-12)。 + +该函数已经被弃用,请使用 EXTRACT 替代。 + +示例: + +MONTH(CREATED) + +### MONTHNAME + +```MONTHNAME(dateAndTime)``` + +返回月份的名称(英文)。 + +示例: + +MONTHNAME(CREATED) + +### PARSEDATETIME / TO_DATE + +```PARSEDATETIME | TO_DATE(string, formatString)``` +解析一个字符串并返回一个 TIMESTAMP WITH TIME ZONE 值。最重要的格式字符包括:y(年)、M(月)、d(日)、H(时)、m(分)、s(秒)。有关格式的详细信息,请参阅 java.time.format.DateTimeFormatter。 + +示例: + +CALL PARSEDATETIME('2021-04-08 13:34:45','yyyy-MM-dd HH:mm:ss') + +### QUARTER + +```QUARTER(dateAndTime)``` + +从日期/时间值中返回季度(1-4)。 + +示例: + +QUARTER(CREATED) + +### SECOND + +```SECOND(dateAndTime)``` + +从日期/时间值中返回秒数(0-59)。 + +该函数已经被弃用,请使用 EXTRACT 替代。 + +示例: + +SECOND(CREATED) + +### WEEK + +```WEEK(dateAndTime)``` + +返回日期/时间值中的周数(1-53)。 + +该函数使用当前系统的区域设置。 + +示例: + +WEEK(CREATED) + +### YEAR + +```YEAR(dateAndTime)``` + +返回日期/时间值中的年份。 + +示例: + +YEAR(CREATED) + +### FROM_UNIXTIME + +```FROM_UNIXTIME (unixtime, formatString,timeZone)``` + +将从 UNIX 纪元(1970-01-01 00:00:00 UTC)开始的秒数转换为表示该时刻时间戳的字符串。 + +最重要的格式字符包括:y(年)、M(月)、d(日)、H(时)、m(分)、s(秒)。有关格式的详细信息,请参阅 `java.time.format.DateTimeFormatter`。 + +`timeZone` 是可选的,默认值为系统的时区。`timezone` 的值可以是一个 `UTC+ 时区偏移`,例如,`UTC+8` 表示亚洲/上海时区,请参阅 `java.time.ZoneId`。 + +该方法返回一个字符串。 + +示例: + +// 使用默认时区 + +CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss') + +or + +// 使用指定时区 + +CALL FROM_UNIXTIME(1672502400, 'yyyy-MM-dd HH:mm:ss','UTC+6') + +## System Functions + +### CAST + +```CAST(value as dataType)``` + +将一个值转换为另一个数据类型。 + +支持的数据类型有:STRING | VARCHAR,INT | INTEGER,LONG | BIGINT,BYTE,FLOAT,DOUBLE,DECIMAL(p,s),TIMESTAMP,DATE,TIME + +示例: + +CONVERT(NAME AS INT) + +### COALESCE + +```COALESCE(aValue, bValue [,...])``` + +返回第一个非空值。 + +示例: + +COALESCE(A, B, C) + +### IFNULL + +```IFNULL(aValue, bValue)``` + +返回第一个非空值。 + +示例: + +IFNULL(A, B) + +### NULLIF + +```NULLIF(aValue, bValue)``` + +如果 'a' 等于 'b',则返回 NULL,否则返回 'a'。 + +示例: + +NULLIF(A, B) + +### CASE WHEN + +``` +select + case + when c_string in ('c_string') then 1 + else 0 + end as c_string_1, + case + when c_string not in ('c_string') then 1 + else 0 + end as c_string_0, + case + when c_tinyint = 117 + and TO_CHAR(c_boolean) = 'true' then 1 + else 0 + end as c_tinyint_boolean_1, + case + when c_tinyint != 117 + and TO_CHAR(c_boolean) = 'true' then 1 + else 0 + end as c_tinyint_boolean_0, + case + when c_tinyint != 117 + or TO_CHAR(c_boolean) = 'true' then 1 + else 0 + end as c_tinyint_boolean_or_1, + case + when c_int > 1 + and c_bigint > 1 + and c_float > 1 + and c_double > 1 + and c_decimal > 1 then 1 + else 0 + end as c_number_1, + case + when c_tinyint <> 117 then 1 + else 0 + end as c_number_0 +from + fake +``` + +用于确定条件是否有效,并根据不同的判断返回不同的值 + +示例: + +case when c_string in ('c_string') then 1 else 0 end diff --git a/docs/zh/transform-v2/sql-udf.md b/docs/zh/transform-v2/sql-udf.md new file mode 100644 index 00000000000..4c1a3777408 --- /dev/null +++ b/docs/zh/transform-v2/sql-udf.md @@ -0,0 +1,133 @@ +# SQL用户定义函数 + +> SQL 转换插件的用户定义函数 (UDF) + +## 描述 + +使用UDF SPI扩展SQL转换函数库。 + +## UDF API + +```java +package org.apache.seatunnel.transform.sql.zeta; + +public interface ZetaUDF { + /** + * Function name + * + * @return function name + */ + String functionName(); + + /** + * The type of function result + * + * @param argsType input arguments type + * @return result type + */ + SeaTunnelDataType resultType(List> argsType); + + /** + * Evaluate + * + * @param args input arguments + * @return result value + */ + Object evaluate(List args); +} +``` + +## UDF 实现示例 + +将这些依赖项添加到您的 Maven 项目,并使用 provided 作用域。 + +```xml + + + + org.apache.seatunnel + seatunnel-transforms-v2 + 2.3.2 + provided + + + org.apache.seatunnel + seatunnel-api + 2.3.2 + provided + + + com.google.auto.service + auto-service + 1.0.1 + provided + + + +``` + +添加一个 Java 类来实现 ZetaUDF,类似于以下的方式: + +```java + +@AutoService(ZetaUDF.class) +public class ExampleUDF implements ZetaUDF { + @Override + public String functionName() { + return "EXAMPLE"; + } + + @Override + public SeaTunnelDataType resultType(List> argsType) { + return BasicType.STRING_TYPE; + } + + @Override + public Object evaluate(List args) { + String arg = (String) args.get(0); + if (arg == null) return null; + return "UDF: " + arg; + } +} +``` + +打包UDF项目并将jar文件复制到路径:${SEATUNNEL_HOME}/lib + +## 示例 + +源端数据读取的表格如下: + +| id | name | age | +|----|----------|-----| +| 1 | Joy Ding | 20 | +| 2 | May Ding | 21 | +| 3 | Kin Dom | 24 | +| 4 | Joy Dom | 22 | + +我们使用SQL查询中的UDF来转换源数据,类似于以下方式: + +``` +transform { + Sql { + source_table_name = "fake" + result_table_name = "fake1" + query = "select id, example(name) as name, age from fake" + } +} +``` + +那么结果表 `fake1` 中的数据将会更新为 + +| id | name | age | +|----|---------------|-----| +| 1 | UDF: Joy Ding | 20 | +| 2 | UDF: May Ding | 21 | +| 3 | UDF: Kin Dom | 24 | +| 4 | UDF: Joy Dom | 22 | + +## 更新日志 + +### 新版本 + +- 添加SQL转换连接器的UDF + diff --git a/docs/zh/transform-v2/sql.md b/docs/zh/transform-v2/sql.md new file mode 100644 index 00000000000..ccbbc7f14cb --- /dev/null +++ b/docs/zh/transform-v2/sql.md @@ -0,0 +1,100 @@ +# SQL + +> SQL 转换插件 + +## 描述 + +使用 SQL 来转换给定的输入行。 + +SQL 转换使用内存中的 SQL 引擎,我们可以通过 SQL 函数和 SQL 引擎的能力来实现转换任务。 + +## 属性 + +| 名称 | 类型 | 是否必须 | 默认值 | +|-------------------|--------|------|-----| +| source_table_name | string | yes | - | +| result_table_name | string | yes | - | +| query | string | yes | - | + +### source_table_name [string] + +源表名称,查询 SQL 表名称必须与此字段匹配。 + +### query [string] + +查询 SQL,它是一个简单的 SQL,支持基本的函数和条件过滤操作。但是,复杂的 SQL 尚不支持,包括:多源表/行连接和聚合操作等。 + +## 示例 + +源端数据读取的表格如下: + +| id | name | age | +|----|----------|-----| +| 1 | Joy Ding | 20 | +| 2 | May Ding | 21 | +| 3 | Kin Dom | 24 | +| 4 | Joy Dom | 22 | + +我们使用 SQL 查询来转换源数据,类似这样: + +``` +transform { + Sql { + source_table_name = "fake" + result_table_name = "fake1" + query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0" + } +} +``` + +那么结果表 `fake1` 中的数据将会更新为: + +| id | name | age | +|----|-----------|-----| +| 1 | Joy Ding_ | 21 | +| 2 | May Ding_ | 22 | +| 3 | Kin Dom_ | 25 | +| 4 | Joy Dom_ | 23 | + +## 作业配置示例 + +``` +env { + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + row.num = 100 + schema = { + fields { + id = "int" + name = "string" + age = "int" + } + } + } +} + +transform { + Sql { + source_table_name = "fake" + result_table_name = "fake1" + query = "select id, concat(name, '_') as name, age+1 as age from fake where id>0" + } +} + +sink { + Console { + source_table_name = "fake1" + } +} +``` + +## 更新日志 + +### 新版本 + +- 添加SQL转换连接器 + From 289254f50d18c6ddbea03e7794b74cbe923d49fa Mon Sep 17 00:00:00 2001 From: Jarvis Date: Wed, 6 Mar 2024 12:04:59 +0800 Subject: [PATCH 5/7] [Docs] update hive doc (#6437) - move `abort_drop_partition_metadata` to sink, source doesn't support this parameter, it's only available in sink --- docs/en/connector-v2/sink/Hive.md | 27 +++++++++++++++---------- docs/en/connector-v2/source/Hive.md | 31 ++++++++++++----------------- 2 files changed, 29 insertions(+), 29 deletions(-) diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md index 2ede5d07893..eec92b46b1b 100644 --- a/docs/en/connector-v2/sink/Hive.md +++ b/docs/en/connector-v2/sink/Hive.md @@ -30,17 +30,18 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| name | type | required | default value | -|----------------------|--------|----------|----------------| -| table_name | string | yes | - | -| metastore_uri | string | yes | - | -| compress_codec | string | no | none | -| hdfs_site_path | string | no | - | -| hive_site_path | string | no | - | -| krb5_path | string | no | /etc/krb5.conf | -| kerberos_principal | string | no | - | -| kerberos_keytab_path | string | no | - | -| common-options | | no | - | +| name | type | required | default value | +|-------------------------------|---------|----------|----------------| +| table_name | string | yes | - | +| metastore_uri | string | yes | - | +| compress_codec | string | no | none | +| hdfs_site_path | string | no | - | +| hive_site_path | string | no | - | +| krb5_path | string | no | /etc/krb5.conf | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | +| abort_drop_partition_metadata | boolean | no | true | +| common-options | | no | - | ### table_name [string] @@ -70,6 +71,10 @@ The principal of kerberos The keytab path of kerberos +### abort_drop_partition_metadata [list] + +Flag to decide whether to drop partition metadata from Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore, the data in the partition will always be deleted(data generated during the synchronization process). + ### common options Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md index 14306ef953d..5d51a19f89c 100644 --- a/docs/en/connector-v2/source/Hive.md +++ b/docs/en/connector-v2/source/Hive.md @@ -33,20 +33,19 @@ Read all the data in a split in a pollNext call. What splits are read will be sa ## Options -| name | type | required | default value | -|-------------------------------|---------|----------|----------------| -| table_name | string | yes | - | -| metastore_uri | string | yes | - | -| krb5_path | string | no | /etc/krb5.conf | -| kerberos_principal | string | no | - | -| kerberos_keytab_path | string | no | - | -| hdfs_site_path | string | no | - | -| hive_site_path | string | no | - | -| read_partitions | list | no | - | -| read_columns | list | no | - | -| abort_drop_partition_metadata | boolean | no | true | -| compress_codec | string | no | none | -| common-options | | no | - | +| name | type | required | default value | +|----------------------|--------|----------|----------------| +| table_name | string | yes | - | +| metastore_uri | string | yes | - | +| krb5_path | string | no | /etc/krb5.conf | +| kerberos_principal | string | no | - | +| kerberos_keytab_path | string | no | - | +| hdfs_site_path | string | no | - | +| hive_site_path | string | no | - | +| read_partitions | list | no | - | +| read_columns | list | no | - | +| compress_codec | string | no | none | +| common-options | | no | - | ### table_name [string] @@ -87,10 +86,6 @@ The keytab file path of kerberos authentication The read column list of the data source, user can use it to implement field projection. -### abort_drop_partition_metadata [list] - -Flag to decide whether to drop partition metadata from Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore, the data in the partition will always be deleted(data generated during the synchronization process). - ### compress_codec [string] The compress codec of files and the details that supported as the following shown: From a93de4cc3220ae62068ba675f130ebec4145873e Mon Sep 17 00:00:00 2001 From: Chengyu Yan Date: Wed, 6 Mar 2024 12:05:29 +0800 Subject: [PATCH 6/7] [Hotfix][Doc][Chinese] Fix invalid link about configure logging related parameters (#6442) --- docs/zh/faq.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/faq.md b/docs/zh/faq.md index 7451d81b9bf..8c836a36124 100644 --- a/docs/zh/faq.md +++ b/docs/zh/faq.md @@ -293,7 +293,7 @@ log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{ 参考: -https://stackoverflow.com/questions/27781187how-to-stop-info-messages-displaying-on-spark-console +https://stackoverflow.com/questions/27781187/how-to-stop-info-messages-displaying-on-spark-console http://spark.apache.org/docs/latest/configuration.html#configuring-logging From 6ec16ac46fe8b36c733556f100ac7bb7e4a7e9f4 Mon Sep 17 00:00:00 2001 From: Jarvis Date: Wed, 6 Mar 2024 12:05:59 +0800 Subject: [PATCH 7/7] [Fix][FakeSource] fix random from template not include the latest value issue (#6438) --- .../connectors/seatunnel/fake/utils/FakeDataRandomUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java index 2c4449d21f9..9a02eb50be2 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/utils/FakeDataRandomUtils.java @@ -37,7 +37,7 @@ public FakeDataRandomUtils(FakeConfig fakeConfig) { } private static T randomFromList(List list) { - int index = RandomUtils.nextInt(0, list.size() - 1); + int index = RandomUtils.nextInt(0, list.size()); return list.get(index); }