Skip to content

Commit

Permalink
Group MySQL and Postgres stream states
Browse files Browse the repository at this point in the history
Signed-off-by: Hai Yan <[email protected]>
  • Loading branch information
oeyh committed Jan 22, 2025
1 parent a3bec6f commit 00abf68
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 54 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;

public class MySqlStreamState {

@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;

public class PostgresStreamState {

@JsonProperty("currentLsn")
private String currentLsn;

@JsonProperty("replicationSlotName")
private String replicationSlotName;

public String getCurrentLsn() {
return currentLsn;
}

public void setCurrentLsn(String currentLsn) {
this.currentLsn = currentLsn;
}

public String getReplicationSlotName() {
return replicationSlotName;
}

public void setReplicationSlotName(String replicationSlotName) {
this.replicationSlotName = replicationSlotName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,12 @@
package org.opensearch.dataprepper.plugins.source.rds.coordination.state;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyRelation;

import java.util.List;
import java.util.Map;

public class StreamProgressState {

// TODO: separate MySQL and Postgres properties into different progress state classes
// Common
@JsonProperty("engineType")
private String engineType;

Expand All @@ -28,19 +24,11 @@ public class StreamProgressState {
@JsonProperty("primaryKeyMap")
private Map<String, List<String>> primaryKeyMap;

// For MySQL
@JsonProperty("currentPosition")
private BinlogCoordinate currentPosition;
@JsonProperty("mySqlStreamState")
private MySqlStreamState mySqlStreamState;

@JsonProperty("foreignKeyRelations")
private List<ForeignKeyRelation> foreignKeyRelations;

// For Postgres
@JsonProperty("currentLsn")
private String currentLsn;

@JsonProperty("replicationSlotName")
private String replicationSlotName;
@JsonProperty("postgresStreamState")
private PostgresStreamState postgresStreamState;

public String getEngineType() {
return engineType;
Expand All @@ -50,14 +38,6 @@ public void setEngineType(String engineType) {
this.engineType = engineType;
}

public BinlogCoordinate getCurrentPosition() {
return currentPosition;
}

public String getCurrentLsn() {
return currentLsn;
}

public Map<String, List<String>> getPrimaryKeyMap() {
return primaryKeyMap;
}
Expand All @@ -66,18 +46,6 @@ public void setPrimaryKeyMap(Map<String, List<String>> primaryKeyMap) {
this.primaryKeyMap = primaryKeyMap;
}

public String getReplicationSlotName() {
return replicationSlotName;
}

public void setCurrentPosition(BinlogCoordinate currentPosition) {
this.currentPosition = currentPosition;
}

public void setReplicationSlotName(String replicationSlotName) {
this.replicationSlotName = replicationSlotName;
}

public boolean shouldWaitForExport() {
return waitForExport;
}
Expand All @@ -86,11 +54,19 @@ public void setWaitForExport(boolean waitForExport) {
this.waitForExport = waitForExport;
}

public List<ForeignKeyRelation> getForeignKeyRelations() {
return foreignKeyRelations;
public MySqlStreamState getMySqlStreamState() {
return mySqlStreamState;
}

public void setMySqlStreamState(MySqlStreamState mySqlStreamState) {
this.mySqlStreamState = mySqlStreamState;
}

public PostgresStreamState getPostgresStreamState() {
return postgresStreamState;
}

public void setForeignKeyRelations(List<ForeignKeyRelation> foreignKeyRelations) {
this.foreignKeyRelations = foreignKeyRelations;
public void setPostgresStreamState(PostgresStreamState postgresStreamState) {
this.postgresStreamState = postgresStreamState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ExportProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.LeaderProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;
import org.opensearch.dataprepper.plugins.source.rds.model.DbTableMetadata;
Expand Down Expand Up @@ -166,15 +167,16 @@ private void createStreamPartition(RdsSourceConfig sourceConfig) {
progressState.setWaitForExport(sourceConfig.isExportEnabled());
progressState.setPrimaryKeyMap(getPrimaryKeyMap());
if (sourceConfig.getEngine() == EngineType.MYSQL) {
getCurrentBinlogPosition().ifPresent(progressState::setCurrentPosition);
progressState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames()));
final MySqlStreamState mySqlStreamState = progressState.getMySqlStreamState();
getCurrentBinlogPosition().ifPresent(mySqlStreamState::setCurrentPosition);
mySqlStreamState.setForeignKeyRelations(((MySqlSchemaManager)schemaManager).getForeignKeyRelations(sourceConfig.getTableNames()));
} else {
// Postgres
// Create replication slot, which will mark the starting point for stream
final String publicationName = generatePublicationName();
final String slotName = generateReplicationSlotName();
((PostgresSchemaManager)schemaManager).createLogicalReplicationSlot(sourceConfig.getTableNames(), publicationName, slotName);
progressState.setReplicationSlotName(slotName);
progressState.getPostgresStreamState().setReplicationSlotName(slotName);
}
StreamPartition streamPartition = new StreamPartition(sourceConfig.getDbIdentifier(), progressState);
sourceCoordinator.createPartition(streamPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Map<String, ParentTable> getParentTableMap(StreamPartition streamPartitio
return parentTableMap;
}

List<ForeignKeyRelation> foreignKeyRelations = streamPartition.getProgressState().get().getForeignKeyRelations();;
List<ForeignKeyRelation> foreignKeyRelations = streamPartition.getProgressState().get().getMySqlStreamState().getForeignKeyRelations();;

for (ForeignKeyRelation foreignKeyRelation : foreignKeyRelations) {
if (!ForeignKeyRelation.containsCascadingAction(foreignKeyRelation)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private BinaryLogClient createBinaryLogClient() {
}

private LogicalReplicationClient createLogicalReplicationClient(StreamPartition streamPartition) {
final String replicationSlotName = streamPartition.getProgressState().get().getReplicationSlotName();
final String replicationSlotName = streamPartition.getProgressState().get().getPostgresStreamState().getReplicationSlotName();
if (replicationSlotName == null) {
throw new NoSuchElementException("Replication slot name is not found in progress state.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public StreamCheckpointer(final EnhancedSourceCoordinator sourceCoordinator,
public void checkpoint(final BinlogCoordinate binlogCoordinate) {
LOG.debug("Checkpointing stream partition {} with binlog coordinate {}", streamPartition.getPartitionKey(), binlogCoordinate);
Optional<StreamProgressState> progressState = streamPartition.getProgressState();
progressState.get().setCurrentPosition(binlogCoordinate);
progressState.get().getMySqlStreamState().setCurrentPosition(binlogCoordinate);
sourceCoordinator.saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
checkpointCounter.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ private boolean isExportDone(StreamPartition streamPartition) {
}

private void setStartBinlogPosition(final StreamPartition streamPartition) {
final BinlogCoordinate startBinlogPosition = streamPartition.getProgressState().get().getCurrentPosition();
final BinlogCoordinate startBinlogPosition = streamPartition.getProgressState().get().getMySqlStreamState().getCurrentPosition();

// set start of binlog stream to current position if exists
if (startBinlogPosition != null) {
Expand All @@ -102,7 +102,7 @@ private void setStartBinlogPosition(final StreamPartition streamPartition) {
}

private void setStartLsn(final StreamPartition streamPartition) {
final String startLsn = streamPartition.getProgressState().get().getCurrentLsn();
final String startLsn = streamPartition.getProgressState().get().getPostgresStreamState().getCurrentLsn();

if (startLsn != null) {
LOG.debug("Will start logical replication from LSN {}", startLsn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.ResyncPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.ResyncProgressState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.ForeignKeyAction;
Expand Down Expand Up @@ -75,8 +76,10 @@ void testGetParentTableMap_returns_empty_list_if_stream_progress_state_is_empty(
@Test
void testGetParentTableMap_returns_only_foreign_relations_with_cascading_actions() {
final StreamProgressState progressState = mock(StreamProgressState.class);
final MySqlStreamState mySqlStreamState = mock(MySqlStreamState.class);
when(streamPartition.getProgressState()).thenReturn(Optional.of(progressState));
when(progressState.getForeignKeyRelations()).thenReturn(List.of(foreignKeyRelationWithCascading, foreignKeyRelationWithoutCascading));
when(progressState.getMySqlStreamState()).thenReturn(mySqlStreamState);
when(mySqlStreamState.getForeignKeyRelations()).thenReturn(List.of(foreignKeyRelationWithCascading, foreignKeyRelationWithoutCascading));

Map<String, ParentTable> actualParentTableMap = objectUnderTest.getParentTableMap(streamPartition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.dataprepper.plugins.source.rds.RdsSourceConfig;
import org.opensearch.dataprepper.plugins.source.rds.configuration.EngineType;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.PostgresStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.DbMetadata;
import software.amazon.awssdk.services.rds.RdsClient;
Expand Down Expand Up @@ -71,6 +72,7 @@ void test_create_logical_replication_client() {
final String username = UUID.randomUUID().toString();
final String password = UUID.randomUUID().toString();
final StreamProgressState streamProgressState = mock(StreamProgressState.class);
final PostgresStreamState postgresStreamState = mock(PostgresStreamState.class);
final String slotName = UUID.randomUUID().toString();
final List<String> tableNames = List.of("table1", "table2");

Expand All @@ -80,7 +82,8 @@ void test_create_logical_replication_client() {
when(sourceConfig.getAuthenticationConfig().getUsername()).thenReturn(username);
when(sourceConfig.getAuthenticationConfig().getPassword()).thenReturn(password);
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
when(streamProgressState.getReplicationSlotName()).thenReturn(slotName);
when(streamProgressState.getPostgresStreamState()).thenReturn(postgresStreamState);
when(postgresStreamState.getReplicationSlotName()).thenReturn(slotName);

replicationLogClientFactory = createObjectUnderTest();
ReplicationLogClient replicationLogClient = replicationLogClientFactory.create(streamPartition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;

Expand All @@ -35,6 +36,9 @@ class StreamCheckpointerTest {
@Mock
private StreamPartition streamPartition;

@Mock
private MySqlStreamState mySqlStreamState;

@Mock
private PluginMetrics pluginMetrics;

Expand All @@ -55,10 +59,11 @@ void test_checkpoint() {
final BinlogCoordinate binlogCoordinate = mock(BinlogCoordinate.class);
final StreamProgressState streamProgressState = mock(StreamProgressState.class);
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
when(streamProgressState.getMySqlStreamState()).thenReturn(mySqlStreamState);

streamCheckpointer.checkpoint(binlogCoordinate);

verify(streamProgressState).setCurrentPosition(binlogCoordinate);
verify(mySqlStreamState).setCurrentPosition(binlogCoordinate);
verify(sourceCoordinator).saveProgressStateForPartition(streamPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
verify(checkpointCounter).increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.rds.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.MySqlStreamState;
import org.opensearch.dataprepper.plugins.source.rds.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.rds.model.BinlogCoordinate;

Expand Down Expand Up @@ -54,10 +55,12 @@ void setUp() {
@Test
void test_processStream_with_given_binlog_coordinates() throws IOException {
final StreamProgressState streamProgressState = mock(StreamProgressState.class);
final MySqlStreamState mySqlStreamState = mock(MySqlStreamState.class);
final String binlogFilename = UUID.randomUUID().toString();
final long binlogPosition = 100L;
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
when(streamProgressState.getCurrentPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition));
when(streamProgressState.getMySqlStreamState()).thenReturn(mySqlStreamState);
when(mySqlStreamState.getCurrentPosition()).thenReturn(new BinlogCoordinate(binlogFilename, binlogPosition));
when(streamProgressState.shouldWaitForExport()).thenReturn(false);
when(binlogClientWrapper.getBinlogClient()).thenReturn(binaryLogClient);

Expand All @@ -70,11 +73,13 @@ void test_processStream_with_given_binlog_coordinates() throws IOException {

@Test
void test_processStream_without_current_binlog_coordinates() throws IOException {
StreamProgressState streamProgressState = mock(StreamProgressState.class);
final StreamProgressState streamProgressState = mock(StreamProgressState.class);
final MySqlStreamState mySqlStreamState = mock(MySqlStreamState.class);
when(streamPartition.getProgressState()).thenReturn(Optional.of(streamProgressState));
final String binlogFilename = "binlog-001";
final long binlogPosition = 100L;
when(streamProgressState.getCurrentPosition()).thenReturn(null);
when(streamProgressState.getMySqlStreamState()).thenReturn(mySqlStreamState);
when(mySqlStreamState.getCurrentPosition()).thenReturn(null);
when(streamProgressState.shouldWaitForExport()).thenReturn(false);

streamWorker.processStream(streamPartition);
Expand Down

0 comments on commit 00abf68

Please sign in to comment.