From eb50c6ea91e3697292d42906d4c6f142f0c86d36 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Sat, 25 Jan 2025 16:05:46 +0800 Subject: [PATCH] [FLINK-37224] Add the missing documents and parameters of MongoDB CDC. --- .../connectors/flink-sources/mongodb-cdc.md | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md index 636c229f1ad..9491de62212 100644 --- a/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md +++ b/docs/content.zh/docs/connectors/flink-sources/mongodb-cdc.md @@ -488,6 +488,62 @@ public class MongoDBIncrementalSourceExample { - 如果使用数据库正则表达式,则需要 `readAnyDatabase` 角色。 - 增量快照功能仅支持 MongoDB 4.0 之后的版本。 +### 完整的 Changelog + +MongoDB 6.0 以及更高的版本支持发送变更流事件,其中包含文档的更新前和更新后的内容(或者说数据的前后镜像)。 + +- 前镜像是指被替换、更新或删除之前的文档。对于插入操作没有前镜像。 + +- 后镜像是指被替换、更新或删除之后的文档。对于删除操作没有后镜像。 + +MongoDB CDC 能够使用前镜像和后镜像来生成完整的变更日志流,包括插入、更新前、更新后和删除的数据行,从而避免了额外的 `ChangelogNormalize` 下游节点。 + +为了启用此功能,你需要满足以下条件: + +- MongoDB 的版本必须为 6.0 或更高版本。 +- 启用 `preAndPostImages` 功能。 + +```javascript +db.runCommand({ + setClusterParameter: { + changeStreamOptions: { + preAndPostImages: { + expireAfterSeconds: 'off' // replace with custom image expiration time + } + } + } +}) +``` + +- 为希望监控的 collection 启用 `changeStreamPreAndPostImages` 功能: +```javascript +db.runCommand({ + collMod: "<< collection name >>", + changeStreamPreAndPostImages: { + enabled: true + } +}) +``` + +在 DataStream 中开启 MongoDB CDC 的 `scan.full-changelog` 功能: + +```java +MongoDBSource.builder() + .scanFullChangelog(true) + ... + .build() +``` + +或者使用 Flink SQL: + +```SQL +CREATE TABLE mongodb_source (...) WITH ( + 'connector' = 'mongodb-cdc', + 'scan.full-changelog' = 'true', + ... +) +``` + 数据类型映射 ---------------- [BSON](https://docs.mongodb.com/manual/reference/bson-types/) **二进制 JSON**的缩写是一种类似 JSON 格式的二进制编码序列,用于在 MongoDB 中存储文档和进行远程过程调用。