Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8053 Handle empty shards #202

Merged
merged 5 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 4 additions & 39 deletions src/main/java/io/debezium/connector/vitess/VitessConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import io.debezium.relational.TableId;
import io.debezium.util.Strings;
import io.grpc.StatusRuntimeException;
import io.vitess.proto.Query;
import io.vitess.proto.Vtgate;

/** Vitess Connector entry point */
public class VitessConnector extends RelationalBaseSourceConnector {
Expand All @@ -43,12 +41,14 @@ public class VitessConnector extends RelationalBaseSourceConnector {

private Map<String, String> properties;
private VitessConnectorConfig connectorConfig;
private VitessMetadata vitessMetadata;

@Override
public void start(Map<String, String> props) {
LOGGER.info("Starting Vitess Connector");
this.properties = Collections.unmodifiableMap(props);
this.connectorConfig = new VitessConnectorConfig(Configuration.from(properties));
this.vitessMetadata = new VitessMetadata(connectorConfig);

}

Expand Down Expand Up @@ -115,7 +115,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
if (connectorConfig.offsetStoragePerTask()) {
shards = connectorConfig.getShard();
if (shards == null) {
shards = getVitessShards(connectorConfig);
shards = vitessMetadata.getShards();
}
}
return taskConfigs(maxTasks, shards);
Expand Down Expand Up @@ -279,22 +279,6 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
}
}

private static List<String> getRowsFromQuery(VitessConnectorConfig connectionConfig, String query) {
try (VitessReplicationConnection connection = new VitessReplicationConnection(connectionConfig, null)) {
Vtgate.ExecuteResponse response = connection.execute(query);
LOGGER.info("Got response: {} for query: {}", response, query);
assert response != null && !response.hasError() && response.hasResult()
: String.format("Error response: %s", response);
Query.QueryResult result = response.getResult();
List<Query.Row> rows = result.getRowsList();
assert !rows.isEmpty() : String.format("Empty response: %s", response);
return rows.stream().map(s -> s.getValues().toStringUtf8()).collect(Collectors.toList());
}
catch (Exception e) {
throw new RuntimeException(String.format("Unexpected error while running query: %s", query), e);
}
}

public static List<String> getIncludedTables(String keyspace, String tableIncludeList, List<String> allTables) {
// table.include.list are list of patterns, filter all the tables in the keyspace through those patterns
// to get the list of table names.
Expand All @@ -311,25 +295,6 @@ public static List<String> getIncludedTables(String keyspace, String tableInclud
return includedTables;
}

public static List<String> getKeyspaceTables(VitessConnectorConfig connectionConfig) {
String query = String.format("SHOW TABLES FROM %s", connectionConfig.getKeyspace());
List<String> tables = getRowsFromQuery(connectionConfig, query);
LOGGER.info("All tables from keyspace {} are: {}", connectionConfig.getKeyspace(), tables);
return tables;
}

public static List<String> getVitessShards(VitessConnectorConfig connectionConfig) {
String query = String.format("SHOW VITESS_SHARDS LIKE '%s/%%'", connectionConfig.getKeyspace());
List<String> rows = getRowsFromQuery(connectionConfig, query);
List<String> shards = rows.stream().map(fieldValue -> {
String[] parts = fieldValue.split("/");
assert parts != null && parts.length == 2 : String.format("Wrong field format: %s", fieldValue);
return parts[1];
}).collect(Collectors.toList());
LOGGER.info("Shards: {}", shards);
return shards;
}

@Override
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
LOGGER.info("Validating config: {}", config);
Expand Down Expand Up @@ -370,7 +335,7 @@ protected Map<String, ConfigValue> validateAllFields(Configuration config) {
public List<TableId> getMatchingCollections(Configuration configuration) {
VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(configuration);
String keyspace = vitessConnectorConfig.getKeyspace();
List<String> allTables = getKeyspaceTables(vitessConnectorConfig);
List<String> allTables = vitessMetadata.getTables();
List<String> includedTables = getIncludedTables(keyspace,
vitessConnectorConfig.tableIncludeList(), allTables);
return includedTables.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ " If not configured, the connector streams changes from the latest position for the given shard(s)."
+ " If snapshot.mode is INITIAL (default), the connector starts copying the tables for the given shard(s) first regardless of gtid value.");

public static final Field EXCLUDE_EMPTY_SHARDS = Field.create(VITESS_CONFIG_GROUP_PREFIX + "exclude.empty.shards")
.withDisplayName("exclude.empty.shards")
.withType(Type.BOOLEAN)
.withWidth(Width.SHORT)
.withDefault(false)
.withImportance(ConfigDef.Importance.LOW)
.withDescription("Auto-detects and excludes empty shards from queries & shard lists used for VStreams");

public static final Field TABLET_TYPE = Field.create(VITESS_CONFIG_GROUP_PREFIX + "tablet.type")
.withDisplayName("Tablet type to get data-changes")
.withType(Type.STRING)
Expand Down Expand Up @@ -452,7 +460,8 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
PREV_NUM_TASKS)
PREV_NUM_TASKS,
EXCLUDE_EMPTY_SHARDS)
.events(
INCLUDE_UNKNOWN_DATATYPES,
SOURCE_INFO_STRUCT_MAKER)
Expand Down Expand Up @@ -544,6 +553,10 @@ public String getVgtid() {
return (value != null && !VGTID.defaultValueAsString().equals(value)) ? value : Vgtid.CURRENT_GTID;
}

public boolean excludeEmptyShards() {
return getConfig().getBoolean(EXCLUDE_EMPTY_SHARDS);
}

private static int validateVgtids(Configuration config, Field field, ValidationOutput problems) {
// Get the GTID as a string so that the default value is used if GTID is not set
String vgtidString = config.getString(field);
Expand Down
201 changes: 201 additions & 0 deletions src/main/java/io/debezium/connector/vitess/VitessMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess;

import static java.lang.Math.toIntExact;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.ByteString;

import io.debezium.annotation.VisibleForTesting;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.vitess.proto.Query;
import io.vitess.proto.Vtgate;

/**
* Class for getting metadata on Vitess, e.g., tables, shards. Supports shard-specific queries.
*/
public class VitessMetadata {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to convert this class to be no-static? A lot of methods uses VitessConnectorConfig so maybe an instance can be created that will receive the config in constructor.


private static final Logger LOGGER = LoggerFactory.getLogger(VitessMetadata.class);
private VitessConnectorConfig config;

public VitessMetadata(VitessConnectorConfig config) {
this.config = config;
}

public List<String> getShards() {
List<String> shards;
if (config.excludeEmptyShards()) {
LOGGER.info("Excluding empty shards");
shards = getVitessShardsFromTablets();
}
else {
shards = getVitessShards();
}
LOGGER.info("Shards: {}", shards);
return shards;
}

public List<String> getTables() {
Vtgate.ExecuteResponse response;
String query;
if (config.excludeEmptyShards()) {
query = "SHOW TABLES";
List<String> shardsToQuery;
if (config.getShard() != null && !config.getShard().isEmpty()) {
LOGGER.info("Getting tables from one of the configured shards");
shardsToQuery = config.getShard();
}
else {
LOGGER.info("Getting tables from a non-empty shard");
shardsToQuery = getVitessShardsFromTablets();
}
String randomShard = shardsToQuery.get(new Random().nextInt(shardsToQuery.size()));
LOGGER.info("Get tables from shard: {}", randomShard);
response = executeQuery(query, randomShard);
}
else {
query = String.format("SHOW TABLES FROM %s", config.getKeyspace());
response = executeQuery(query);
}
logResponse(response, query);
List<String> tables = getFlattenedRowsFromResponse(response);
LOGGER.info("All tables from keyspace {} are: {}", config.getKeyspace(), tables);
return tables;
}

private static void logResponse(Vtgate.ExecuteResponse response, String query) {
LOGGER.debug("Got response: {} for query: {}", response, query);
}

private List<String> getVitessShards() {
String query = String.format("SHOW VITESS_SHARDS LIKE '%s/%%'", config.getKeyspace());
Vtgate.ExecuteResponse response = executeQuery(query);
logResponse(response, query);
List<String> rows = getFlattenedRowsFromResponse(response);
List<String> shards = rows.stream().map(fieldValue -> {
String[] parts = fieldValue.split("/");
assert parts != null && parts.length == 2 : String.format("Wrong field format: %s", fieldValue);
return parts[1];
}).collect(Collectors.toList());
return shards;
}

private List<String> getVitessShardsFromTablets() {
String query = "SHOW VITESS_TABLETS";
Vtgate.ExecuteResponse response = executeQuery(query);
// Do not log the response since there is no way to filter tablets: it includes all tablets of all shards of all keyspaces
List<List<String>> rowValues = getRowsFromResponse(response);
List<String> shards = VitessMetadata.getNonEmptyShards(rowValues, config.getKeyspace());
return shards;
}

private Vtgate.ExecuteResponse executeQuery(String query) {
return executeQuery(query, null);
}

@VisibleForTesting
protected Vtgate.ExecuteResponse executeQuery(String query, String shard) {
// Some tests need to be issue a shard-specific query, so make this visible
try (VitessReplicationConnection connection = new VitessReplicationConnection(config, null)) {
Vtgate.ExecuteResponse response;
if (shard != null) {
response = connection.execute(query, shard);
}
else {
response = connection.execute(query);
}
return response;
}
catch (Exception e) {
throw new RuntimeException(String.format("Unexpected error while running query: %s", query), e);
}
}

private static List<String> getFlattenedRowsFromResponse(Vtgate.ExecuteResponse response) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to make the rest of the methods non-static too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible, but I don't think we want to, since they don't rely on any instance variables. Static clearly conveys these methods do not rely on object state.

validateResponse(response);
Query.QueryResult result = response.getResult();
validateResult(result);
List<List<String>> rows = parseRows(result.getRowsList());
return flattenAndConcat(rows);
}

private static List<List<String>> getRowsFromResponse(Vtgate.ExecuteResponse response) {
validateResponse(response);
Query.QueryResult result = response.getResult();
validateResult(result);
return parseRows(result.getRowsList());
}

private static void validateResponse(Vtgate.ExecuteResponse response) {
assert response != null && !response.hasError() && response.hasResult()
: String.format("Error response: %s", response);
}

private static void validateResult(Query.QueryResult result) {
List<Query.Row> rows = result.getRowsList();
assert !rows.isEmpty() : String.format("Empty response: %s", result);
}

@VisibleForTesting
protected static List<List<String>> parseRows(List<Query.Row> rows) {
List<List<String>> allRowValues = new ArrayList<>();
for (Query.Row row : rows) {
List<String> currentRowValues = new ArrayList();
List<Integer> lengths = row.getLengthsList().stream().map(x -> toIntExact(x)).collect(Collectors.toList());
ByteString values = row.getValues();

int offset = 0;
for (int length : lengths) {
if (length == -1) {
currentRowValues.add(null); // Handle NULL values
}
else {
String value = values.substring(offset, offset + length).toStringUtf8();
currentRowValues.add(value);
offset += length;
}
}
allRowValues.add(currentRowValues);
}
return allRowValues;
}

@VisibleForTesting
protected static List<String> getNonEmptyShards(List<List<String>> vitessTabletRows, String keyspace) {
Set<String> shardSet = new HashSet<>();

for (List<String> row : vitessTabletRows) {
if (row.size() < 3) {
continue; // skip rows with insufficient data
}
String rowKeyspace = row.get(1);
if (rowKeyspace.equals(keyspace)) {
shardSet.add(row.get(2)); // add the shard value
}
}

return shardSet.stream().sorted().collect(Collectors.toList());
}

@VisibleForTesting
protected static List<String> flattenAndConcat(List<List<String>> nestedList) {
return nestedList.stream()
.map(innerList -> String.join("", innerList))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.debezium.connector.vitess.VitessConnector;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.connector.vitess.VitessMetadata;
import io.debezium.util.Strings;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
Expand Down Expand Up @@ -65,12 +66,27 @@ public VitessReplicationConnection(VitessConnectorConfig config, VitessDatabaseS
* @throws StatusRuntimeException if the connection is not valid, or SQL statement can not be successfully exected
*/
public Vtgate.ExecuteResponse execute(String sqlStatement) {
LOGGER.debug("Executing sqlStament {}", sqlStatement);
ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
managedChannel.compareAndSet(null, channel);

Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap()))
.build();
return newBlockingStub(channel).execute(request);
}

public Vtgate.ExecuteResponse execute(String sqlStatement, String shard) {
LOGGER.info("Executing sqlStament {}", sqlStatement);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOGGER.info("Executing sqlStament {}", sqlStatement);
LOGGER.debug("Executing sqlStament {}", sqlStatement);

ManagedChannel channel = newChannel(config.getVtgateHost(), config.getVtgatePort(), config.getGrpcMaxInboundMessageSize());
managedChannel.compareAndSet(null, channel);

String target = String.format("%s:%s@%s", config.getKeyspace(), shard, config.getTabletType());
Vtgate.Session session = Vtgate.Session.newBuilder().setTargetString(target).setAutocommit(true).build();
LOGGER.debug("Autocommit {}", session.getAutocommit());
Vtgate.ExecuteRequest request = Vtgate.ExecuteRequest.newBuilder()
.setQuery(Proto.bindQuery(sqlStatement, Collections.emptyMap()))
.setSession(session)
.build();
return newBlockingStub(channel).execute(request);
}
Expand Down Expand Up @@ -277,7 +293,7 @@ private void setError(String msg) {
Binlogdata.Filter.Builder filterBuilder = Binlogdata.Filter.newBuilder();
if (!Strings.isNullOrEmpty(config.tableIncludeList())) {
final String keyspace = config.getKeyspace();
final List<String> allTables = VitessConnector.getKeyspaceTables(config);
final List<String> allTables = new VitessMetadata(config).getTables();
List<String> includedTables = VitessConnector.getIncludedTables(config.getKeyspace(),
config.tableIncludeList(), allTables);
for (String table : includedTables) {
Expand Down Expand Up @@ -398,7 +414,7 @@ public static Vgtid defaultVgtid(VitessConnectorConfig config) {
if (config.getShard() == null || config.getShard().isEmpty()) {
// This case is not supported by the Vitess, so our workaround is to get all the shards from vtgate.
if (config.getVgtid() == Vgtid.EMPTY_GTID) {
List<String> shards = VitessConnector.getVitessShards(config);
List<String> shards = new VitessMetadata(config).getShards();
List<String> gtids = Collections.nCopies(shards.size(), config.getVgtid());
vgtid = buildVgtid(config.getKeyspace(), shards, gtids);
}
Expand Down
Loading