From 101493f3a37c7b6ca2ff882fec090589ea413e21 Mon Sep 17 00:00:00 2001 From: litiliu Date: Fri, 17 Jan 2025 11:29:32 +0800 Subject: [PATCH 1/4] fix npe --- .../source/reader/snapshot/PostgresSnapshotSplitReadTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java index 43ba38eb855..267fe05a24a 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java @@ -166,7 +166,7 @@ private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId t EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); log.debug("Snapshotting table {}", tableId); - TableId newTableId = new TableId(null, tableId.schema(), tableId.table()); + TableId newTableId = new TableId(tableId.catalog(), tableId.schema(), tableId.table()); createDataEventsForTable( snapshotContext, snapshotReceiver, databaseSchema.tableFor(newTableId)); snapshotReceiver.completeSnapshot(); From 5577a2fe455d2a7552069f0bf01576d0fdc3e2e6 Mon Sep 17 00:00:00 2001 From: litiliu Date: Fri, 17 Jan 2025 13:43:49 +0800 Subject: [PATCH 2/4] fix npe --- .../source/reader/snapshot/PostgresSnapshotSplitReadTask.java | 3 +-- .../connectors/seatunnel/cdc/postgres/PostgresCDCIT.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java index 267fe05a24a..208758afe23 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java @@ -166,9 +166,8 @@ private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId t EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); log.debug("Snapshotting table {}", tableId); - TableId newTableId = new TableId(tableId.catalog(), tableId.schema(), tableId.table()); createDataEventsForTable( - snapshotContext, snapshotReceiver, databaseSchema.tableFor(newTableId)); + snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId)); snapshotReceiver.completeSnapshot(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index d171d104056..06f1a2a67c0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -407,8 +407,8 @@ public void testMultiTableWithRestore(TestContainer container) log.info("****************** container logs start ******************"); String containerLogs = container.getServerLogs(); log.info(containerLogs); - // pg cdc logs contain ERROR - // Assertions.assertFalse(containerLogs.contains("ERROR")); + // pg cdc logs contain NullPointerException + Assertions.assertFalse(containerLogs.contains("NullPointerException")); log.info("****************** container logs end ******************"); } finally { // Clear related content to ensure that multiple operations are not affected From 647206d2ce57738a1e8fad9909ea961549c1b2f7 Mon Sep 17 00:00:00 2001 From: litiliu Date: Mon, 20 Jan 2025 10:31:03 +0800 Subject: [PATCH 3/4] fix npe --- .../source/reader/snapshot/PostgresSnapshotSplitReadTask.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java index 208758afe23..267fe05a24a 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java @@ -166,8 +166,9 @@ private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId t EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); log.debug("Snapshotting table {}", tableId); + TableId newTableId = new TableId(tableId.catalog(), tableId.schema(), tableId.table()); createDataEventsForTable( - snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId)); + snapshotContext, snapshotReceiver, databaseSchema.tableFor(newTableId)); snapshotReceiver.completeSnapshot(); } From ccf7b521874d6692b8b67811be43a545a53a7392 Mon Sep 17 00:00:00 2001 From: litiliu Date: Wed, 22 Jan 2025 16:57:23 +0800 Subject: [PATCH 4/4] add debug log --- .../reader/snapshot/PostgresSnapshotSplitReadTask.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java index 267fe05a24a..4864c53ca82 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java @@ -165,8 +165,13 @@ private void createDataEvents(PostgresSnapshotContext snapshotContext, TableId t throws Exception { EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); - log.debug("Snapshotting table {}", tableId); TableId newTableId = new TableId(tableId.catalog(), tableId.schema(), tableId.table()); + log.info("Snapshotting table {}", newTableId); + databaseSchema.tableIds().stream() + .forEach( + tid -> { + log.info("table id: {} in databaseSchema", tid); + }); createDataEventsForTable( snapshotContext, snapshotReceiver, databaseSchema.tableFor(newTableId)); snapshotReceiver.completeSnapshot();