From 00abf6868b0466d66942229c782982224601d251 Mon Sep 17 00:00:00 2001 From: Hai Yan Date: Tue, 21 Jan 2025 23:56:00 -0600 Subject: [PATCH] Group MySQL and Postgres stream states Signed-off-by: Hai Yan --- .../coordination/state/MySqlStreamState.java | 41 ++++++++++++++ .../state/PostgresStreamState.java | 37 ++++++++++++ .../state/StreamProgressState.java | 56 ++++++------------- .../source/rds/leader/LeaderScheduler.java | 8 ++- .../rds/resync/CascadingActionDetector.java | 2 +- .../stream/ReplicationLogClientFactory.java | 2 +- .../source/rds/stream/StreamCheckpointer.java | 2 +- .../source/rds/stream/StreamWorker.java | 4 +- .../resync/CascadingActionDetectorTest.java | 5 +- .../ReplicationLogClientFactoryTest.java | 5 +- .../rds/stream/StreamCheckpointerTest.java | 7 ++- .../source/rds/stream/StreamWorkerTest.java | 11 +++- 12 files changed, 126 insertions(+), 54 deletions(-) create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/MySqlStreamState.java create mode 100644 data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/PostgresStreamState.java diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/MySqlStreamState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/MySqlStreamState.java new file mode 100644 index 0000000000..a42d45f073 --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/MySqlStreamState.java @@ -0,0 +1,41 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; +import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation; + +import java.util.List; + +public class MySqlStreamState { + + @JsonProperty("currentPosition") + private BinlogCoordinate currentPosition; + + @JsonProperty("foreignKeyRelations") + private List foreignKeyRelations; + + public BinlogCoordinate getCurrentPosition() { + return currentPosition; + } + + public void setCurrentPosition(BinlogCoordinate currentPosition) { + this.currentPosition = currentPosition; + } + + public List getForeignKeyRelations() { + return foreignKeyRelations; + } + + public void setForeignKeyRelations(List foreignKeyRelations) { + this.foreignKeyRelations = foreignKeyRelations; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/PostgresStreamState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/PostgresStreamState.java new file mode 100644 index 0000000000..011b1d37de --- /dev/null +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/PostgresStreamState.java @@ -0,0 +1,37 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.dataprepper.plugins.source.rds.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class PostgresStreamState { + + @JsonProperty("currentLsn") + private String currentLsn; + + @JsonProperty("replicationSlotName") + private String replicationSlotName; + + public String getCurrentLsn() { + return currentLsn; + } + + public void setCurrentLsn(String currentLsn) { + this.currentLsn = currentLsn; + } + + public String getReplicationSlotName() { + return replicationSlotName; + } + + public void setReplicationSlotName(String replicationSlotName) { + this.replicationSlotName = replicationSlotName; + } +} diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java index 80615bdebf..10423cb15b 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/coordination/state/StreamProgressState.java @@ -6,16 +6,12 @@ package org.opensearch.dataprepper.plugins.source.rds.coordination.state; import com.fasterxml.jackson.annotation.JsonProperty; -import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; -import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation; import java.util.List; import java.util.Map; public class StreamProgressState { - // TODO: separate MySQL and Postgres properties into different progress state classes - // Common @JsonProperty("engineType") private String engineType; @@ -28,19 +24,11 @@ public class StreamProgressState { @JsonProperty("primaryKeyMap") private Map> primaryKeyMap; - // For MySQL - @JsonProperty("currentPosition") - private BinlogCoordinate currentPosition; + @JsonProperty("mySqlStreamState") + private MySqlStreamState mySqlStreamState; - @JsonProperty("foreignKeyRelations") - private List foreignKeyRelations; - - // For Postgres - @JsonProperty("currentLsn") - private String currentLsn; - - @JsonProperty("replicationSlotName") - private String replicationSlotName; + @JsonProperty("postgresStreamState") + private PostgresStreamState postgresStreamState; public String getEngineType() { return engineType; @@ -50,14 +38,6 @@ public void setEngineType(String engineType) { this.engineType = engineType; } - public BinlogCoordinate getCurrentPosition() { - return currentPosition; - } - - public String getCurrentLsn() { - return currentLsn; - } - public Map> getPrimaryKeyMap() { return primaryKeyMap; } @@ -66,18 +46,6 @@ public void setPrimaryKeyMap(Map> primaryKeyMap) { this.primaryKeyMap = primaryKeyMap; } - public String getReplicationSlotName() { - return replicationSlotName; - } - - public void setCurrentPosition(BinlogCoordinate currentPosition) { - this.currentPosition = currentPosition; - } - - public void setReplicationSlotName(String replicationSlotName) { - this.replicationSlotName = replicationSlotName; - } - public boolean shouldWaitForExport() { return waitForExport; } @@ -86,11 +54,19 @@ public void setWaitForExport(boolean waitForExport) { this.waitForExport = waitForExport; } - public List getForeignKeyRelations() { - return foreignKeyRelations; + public MySqlStreamState getMySqlStreamState() { + return mySqlStreamState; + } + + public void setMySqlStreamState(MySqlStreamState mySqlStreamState) { + this.mySqlStreamState = mySqlStreamState; + } + + public PostgresStreamState getPostgresStreamState() { + return postgresStreamState; } - public void setForeignKeyRelations(List foreignKeyRelations) { - this.foreignKeyRelations = foreignKeyRelations; + public void setPostgresStreamState(PostgresStreamState postgresStreamState) { + this.postgresStreamState = postgresStreamState; } } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java index f3beb3e12c..31ac120ce7 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/leader/LeaderScheduler.java @@ -15,6 +15,7 @@ import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata; @@ -166,15 +167,16 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) { progressState.setWaitForExport(sourceConfig.isExportEnabled()); progressState.setPrimaryKeyMap(getPrimaryKeyMap()); if (sourceConfig.getEngine() == EngineType.MYSQL) { - getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition); - progressState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames())); + final MySqlStreamState mySqlStreamState = progressState.getMySqlStreamState(); + getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition); + mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames())); } else { // Postgres // Create replication slot, which will mark the starting point for stream final String publicationName = generatePublicationName(); final String slotName = generateReplicationSlotName(); ((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName); - progressState.setReplicationSlotName(slotName); + progressState.getPostgresStreamState().setReplicationSlotName(slotName); } StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState); sourceCoordinator.createPartition(streamPartition); diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetector.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetector.java index 98df985ccb..b8e5a21662 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetector.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetector.java @@ -50,7 +50,7 @@ public Map getParentTableMap(StreamPartition streamPartitio return parentTableMap; } - List foreignKeyRelations = streamPartition.getProgressState().get().getForeignKeyRelations();; + List foreignKeyRelations = streamPartition.getProgressState().get().getMySqlStreamState().getForeignKeyRelations();; for (ForeignKeyRelation foreignKeyRelation : foreignKeyRelations) { if (!ForeignKeyRelation.containsCascadingAction(foreignKeyRelation)) { diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java index d9bc54570a..183ce05299 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactory.java @@ -67,7 +67,7 @@ private BinaryLogClient createBinaryLogClient() { } private LogicalReplicationClient createLogicalReplicationClient(StreamPartition streamPartition) { - final String replicationSlotName = streamPartition.getProgressState().get().getReplicationSlotName(); + final String replicationSlotName = streamPartition.getProgressState().get().getPostgresStreamState().getReplicationSlotName(); if (replicationSlotName == null) { throw new NoSuchElementException("Replication slot name is not found in progress state."); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java index b76dbab7c9..1f60f9715f 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointer.java @@ -41,7 +41,7 @@ public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator, public void checkpoint(final BinlogCoordinate binlogCoordinate) { LOG.debug("Checkpointing stream partition {} with binlog coordinate {}", streamPartition.getPartitionKey(), binlogCoordinate); Optional progressState = streamPartition.getProgressState(); - progressState.get().setCurrentPosition(binlogCoordinate); + progressState.get().getMySqlStreamState().setCurrentPosition(binlogCoordinate); sourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); checkpointCounter.increment(); } diff --git a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java index d6404a42ef..4da8798c90 100644 --- a/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java +++ b/data-prepper-plugins/rds-source/src/main/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorker.java @@ -88,7 +88,7 @@ private boolean isExportDone(StreamPartition streamPartition) { } private void setStartBinlogPosition(final StreamPartition streamPartition) { - final BinlogCoordinate startBinlogPosition = streamPartition.getProgressState().get().getCurrentPosition(); + final BinlogCoordinate startBinlogPosition = streamPartition.getProgressState().get().getMySqlStreamState().getCurrentPosition(); // set start of binlog stream to current position if exists if (startBinlogPosition != null) { @@ -102,7 +102,7 @@ private void setStartBinlogPosition(final StreamPartition streamPartition) { } private void setStartLsn(final StreamPartition streamPartition) { - final String startLsn = streamPartition.getProgressState().get().getCurrentLsn(); + final String startLsn = streamPartition.getProgressState().get().getPostgresStreamState().getCurrentLsn(); if (startLsn != null) { LOG.debug("Will start logical replication from LSN {}", startLsn); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetectorTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetectorTest.java index c17e003782..1bac04249d 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetectorTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/resync/CascadingActionDetectorTest.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyAction; @@ -75,8 +76,10 @@ void testGetParentTableMap_returns_empty_list_if_stream_progress_state_is_empty( @Test void testGetParentTableMap_returns_only_foreign_relations_with_cascading_actions() { final StreamProgressState progressState = mock(StreamProgressState.class); + final MySqlStreamState mySqlStreamState = mock(MySqlStreamState.class); when(streamPartition.getProgressState()).thenReturn(Optional.of(progressState)); - when(progressState.getForeignKeyRelations()).thenReturn(List.of(foreignKeyRelationWithCascading, foreignKeyRelationWithoutCascading)); + when(progressState.getMySqlStreamState()).thenReturn(mySqlStreamState); + when(mySqlStreamState.getForeignKeyRelations()).thenReturn(List.of(foreignKeyRelationWithCascading, foreignKeyRelationWithoutCascading)); Map actualParentTableMap = objectUnderTest.getParentTableMap(streamPartition); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java index 43978eaea4..83dafe9887 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/ReplicationLogClientFactoryTest.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig; import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata; import software.amazon.awssdk.services.rds.RdsClient; @@ -71,6 +72,7 @@ void test_create_logical_replication_client() { final String username = UUID.randomUUID().toString(); final String password = UUID.randomUUID().toString(); final StreamProgressState streamProgressState = mock(StreamProgressState.class); + final PostgresStreamState postgresStreamState = mock(PostgresStreamState.class); final String slotName = UUID.randomUUID().toString(); final List tableNames = List.of("table1", "table2"); @@ -80,7 +82,8 @@ void test_create_logical_replication_client() { when(sourceConfig.getAuthenticationConfig().getUsername()).thenReturn(username); when(sourceConfig.getAuthenticationConfig().getPassword()).thenReturn(password); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); - when(streamProgressState.getReplicationSlotName()).thenReturn(slotName); + when(streamProgressState.getPostgresStreamState()).thenReturn(postgresStreamState); + when(postgresStreamState.getReplicationSlotName()).thenReturn(slotName); replicationLogClientFactory = createObjectUnderTest(); ReplicationLogClient replicationLogClient = replicationLogClientFactory.create(streamPartition); diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java index 2fdac1065f..3327e847f5 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamCheckpointerTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; @@ -35,6 +36,9 @@ class StreamCheckpointerTest { @Mock private StreamPartition streamPartition; + @Mock + private MySqlStreamState mySqlStreamState; + @Mock private PluginMetrics pluginMetrics; @@ -55,10 +59,11 @@ void test_checkpoint() { final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class); final StreamProgressState streamProgressState = mock(StreamProgressState.class); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); + when(streamProgressState.getMySqlStreamState()).thenReturn(mySqlStreamState); streamCheckpointer.checkpoint(binlogCoordinate); - verify(streamProgressState).setCurrentPosition(binlogCoordinate); + verify(mySqlStreamState).setCurrentPosition(binlogCoordinate); verify(sourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE); verify(checkpointCounter).increment(); } diff --git a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java index 1eaf719cf5..ede13a40ec 100644 --- a/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java +++ b/data-prepper-plugins/rds-source/src/test/java/org/opensearch/dataprepper/plugins/source/rds/stream/StreamWorkerTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition; +import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState; import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate; @@ -54,10 +55,12 @@ void setUp() { @Test void test_processStream_with_given_binlog_coordinates() throws IOException { final StreamProgressState streamProgressState = mock(StreamProgressState.class); + final MySqlStreamState mySqlStreamState = mock(MySqlStreamState.class); final String binlogFilename = UUID.randomUUID().toString(); final long binlogPosition = 100L; when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); - when(streamProgressState.getCurrentPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition)); + when(streamProgressState.getMySqlStreamState()).thenReturn(mySqlStreamState); + when(mySqlStreamState.getCurrentPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition)); when(streamProgressState.shouldWaitForExport()).thenReturn(false); when(binlogClientWrapper.getBinlogClient()).thenReturn(binaryLogClient); @@ -70,11 +73,13 @@ void test_processStream_with_given_binlog_coordinates() throws IOException { @Test void test_processStream_without_current_binlog_coordinates() throws IOException { - StreamProgressState streamProgressState = mock(StreamProgressState.class); + final StreamProgressState streamProgressState = mock(StreamProgressState.class); + final MySqlStreamState mySqlStreamState = mock(MySqlStreamState.class); when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState)); final String binlogFilename = "binlog-001"; final long binlogPosition = 100L; - when(streamProgressState.getCurrentPosition()).thenReturn(null); + when(streamProgressState.getMySqlStreamState()).thenReturn(mySqlStreamState); + when(mySqlStreamState.getCurrentPosition()).thenReturn(null); when(streamProgressState.shouldWaitForExport()).thenReturn(false); streamWorker.processStream(streamPartition);