Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37224] Add the missing documents and parameters of MongoDB CDC. #3895

Merged
merged 1 commit into from
Feb 6, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,62 @@ public class MongoDBIncrementalSourceExample {
- 如果使用数据库正则表达式,则需要 `readAnyDatabase` 角色。
- 增量快照功能仅支持 MongoDB 4.0 之后的版本。

### 完整的 Changelog

MongoDB 6.0 以及更高的版本支持发送变更流事件,其中包含文档的更新前和更新后的内容(或者说数据的前后镜像)。

- 前镜像是指被替换、更新或删除之前的文档。对于插入操作没有前镜像。

- 后镜像是指被替换、更新或删除之后的文档。对于删除操作没有后镜像。

MongoDB CDC 能够使用前镜像和后镜像来生成完整的变更日志流,包括插入、更新前、更新后和删除的数据行,从而避免了额外的 `ChangelogNormalize` 下游节点。

为了启用此功能,你需要满足以下条件:

- MongoDB 的版本必须为 6.0 或更高版本。
- 启用 `preAndPostImages` 功能。

```javascript
db.runCommand({
setClusterParameter: {
changeStreamOptions: {
preAndPostImages: {
expireAfterSeconds: 'off' // replace with custom image expiration time
}
}
}
})
```

- 为希望监控的 collection 启用 `changeStreamPreAndPostImages` 功能:
```javascript
db.runCommand({
collMod: "<< collection name >>",
changeStreamPreAndPostImages: {
enabled: true
}
})
```

在 DataStream 中开启 MongoDB CDC 的 `scan.full-changelog` 功能:

```java
MongoDBSource.builder()
.scanFullChangelog(true)
...
.build()
```

或者使用 Flink SQL:

```SQL
CREATE TABLE mongodb_source (...) WITH (
'connector' = 'mongodb-cdc',
'scan.full-changelog' = 'true',
...
)
```

数据类型映射
----------------
[BSON](https://docs.mongodb.com/manual/reference/bson-types/) **二进制 JSON**的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。
Expand Down
Loading