Skip to content

Commit

Permalink
Merge branch 'feat/data-loader/tablemetadata-service' into feat/data-…
Browse files Browse the repository at this point in the history
…loader/scalardb-connection
  • Loading branch information
ypeckstadt committed May 26, 2024
2 parents 630cf95 + cee97dc commit 7702a21
Show file tree
Hide file tree
Showing 6 changed files with 446 additions and 1 deletion.
2 changes: 1 addition & 1 deletion data-loader/core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ archivesBaseName = "scalardb-data-loader-core"

dependencies {
// ScalarDB core

implementation project(':core')

// for SpotBugs
compileOnly "com.github.spotbugs:spotbugs-annotations:${spotbugsVersion}"
testCompileOnly "com.github.spotbugs:spotbugs-annotations:${spotbugsVersion}"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.scalar.db.dataloader.core.tablemetadata;

/** Represents the request for metadata for a single ScalarDB table */
public class TableMetadataRequest {

private final String namespace;
private final String tableName;

/**
* Class constructor
*
* @param namespace ScalarDB namespace
* @param tableName ScalarDB table name
*/
public TableMetadataRequest(String namespace, String tableName) {
this.namespace = namespace;
this.tableName = tableName;
}

public String getNamespace() {
return namespace;
}

public String getTableName() {
return tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@

import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.error.CoreError;
import com.scalar.db.dataloader.core.util.TableMetadataUtils;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.service.StorageFactory;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* A service class that provides methods to get TableMetadata for a given namespace and table name.
*/
public class TableMetadataService {

private final StorageFactory storageFactory;
Expand Down Expand Up @@ -41,4 +48,26 @@ public TableMetadata getTableMetadata(String namespace, String tableName)
CoreError.DATA_LOADER_MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName));
}
}

/**
* Returns the TableMetadata for the given list of TableMetadataRequest.
*
* @param requests List of TableMetadataRequest
* @return Map of TableMetadata
* @throws TableMetadataException if the namespace or table is missing
*/
public Map<String, TableMetadata> getTableMetadata(Collection<TableMetadataRequest> requests)
throws TableMetadataException {
Map<String, TableMetadata> metadataMap = new HashMap<>();

for (TableMetadataRequest request : requests) {
String namespace = request.getNamespace();
String tableName = request.getTableName();
TableMetadata tableMetadata = getTableMetadata(namespace, tableName);
String key = TableMetadataUtils.getTableLookupKey(namespace, tableName);
metadataMap.put(key, tableMetadata);
}

return metadataMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package com.scalar.db.dataloader.core.util;

import com.scalar.db.api.TableMetadata;
import com.scalar.db.io.DataType;
import com.scalar.db.transaction.consensuscommit.Attribute;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Utils for ScalarDB table metadata */
public final class TableMetadataUtils {

/** Format for the lookup key for a table in a namespace */
public static final String TABLE_LOOKUP_KEY_FORMAT = "%s.%s";

/** Private constructor to prevent instantiation */
private TableMetadataUtils() {}

/**
* Check if the field is a ScalarDB transaction metadata column or not
*
* @param columnName Table column name
* @param metadataColumns Fixed list of metadata columns
* @param columnNames List of all column names in a table
* @return The field is metadata or not
*/
public static boolean isMetadataColumn(
String columnName, Set<String> metadataColumns, Set<String> columnNames) {
// Skip field if it can be ignored
if (metadataColumns.contains(columnName)) {
return true;
}

// Skip if the field is a "before_" field
return columnName.startsWith(Attribute.BEFORE_PREFIX)
&& !columnNames.contains(Attribute.BEFORE_PREFIX + columnName);
}

/**
* Check if the field is a ScalarDB transaction metadata column or not
*
* @param columnName ScalarDB table column name5
* @param tableMetadata Metadata for a single ScalarDB
* @return is the field a metadata column or not
*/
public static boolean isMetadataColumn(String columnName, TableMetadata tableMetadata) {
Set<String> metadataColumns = getMetadataColumns();
LinkedHashSet<String> columnNames = tableMetadata.getColumnNames();

// Skip field if it can be ignored
if (metadataColumns.contains(columnName)) {
return true;
}

// Skip if the field is a "before_" field
return columnName.startsWith(Attribute.BEFORE_PREFIX)
&& !columnNames.contains(Attribute.BEFORE_PREFIX + columnName);
}

/**
* Return a list of fixed ScalarDB transaction metadata columns
*
* @return Set of columns
*/
public static Set<String> getMetadataColumns() {
return Stream.of(
Attribute.ID,
Attribute.STATE,
Attribute.VERSION,
Attribute.PREPARED_AT,
Attribute.COMMITTED_AT,
Attribute.BEFORE_ID,
Attribute.BEFORE_STATE,
Attribute.BEFORE_VERSION,
Attribute.BEFORE_PREPARED_AT,
Attribute.BEFORE_COMMITTED_AT)
.collect(Collectors.toCollection(HashSet::new));
}

/**
* Return a map with the data types for all columns in a ScalarDB table
*
* @param tableMetadata Metadata for a single ScalarDB table
* @return data types map
*/
public static Map<String, DataType> extractColumnDataTypes(TableMetadata tableMetadata) {
Map<String, DataType> columnDataTypes = new HashMap<>();
tableMetadata
.getColumnNames()
.forEach(
columnName ->
columnDataTypes.computeIfAbsent(columnName, tableMetadata::getColumnDataType));
return columnDataTypes;
}

/**
* Return lookup key for a table in a namespace
*
* @param namespace Namespace
* @param tableName Table name
* @return Table metadata lookup key
*/
public static String getTableLookupKey(String namespace, String tableName) {
return String.format(TABLE_LOOKUP_KEY_FORMAT, namespace, tableName);
}

/**
* Populate the projection columns with ScalarDB transaction metadata columns
*
* @param tableMetadata Metadata for a single ScalarDB table
* @param projections List of projection columns
* @return List of projection columns with metadata columns
*/
public static List<String> populateProjectionsWithMetadata(
TableMetadata tableMetadata, List<String> projections) {
List<String> projectionMetadata = new ArrayList<>();

// Add projection columns along with metadata columns
projections.forEach(
projection -> {
projectionMetadata.add(projection);
if (!isKeyColumn(projection, tableMetadata)) {
// Add metadata column before the projection if it's not a key column
projectionMetadata.add(Attribute.BEFORE_PREFIX + projection);
}
});

// Add fixed metadata columns
projectionMetadata.addAll(getMetadataColumns());

return projectionMetadata;
}

/**
* Checks if a column is a key column (partition key or clustering key) in the table.
*
* @param column The column name to check.
* @param tableMetadata The metadata of the ScalarDB table.
* @return True if the column is a key column, false otherwise.
*/
private static boolean isKeyColumn(String column, TableMetadata tableMetadata) {
return tableMetadata.getPartitionKeyNames().contains(column)
|| tableMetadata.getClusteringKeyNames().contains(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
import com.scalar.db.common.error.CoreError;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.service.StorageFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
Expand Down Expand Up @@ -82,4 +86,72 @@ void getTableMetadata_WithExecutionException_ShouldThrowTableMetadataException()
exception.getMessage());
verify(storageAdmin).getTableMetadata(namespace, tableName);
}

@Test
void getTableMetadata_WithValidRequests_ShouldReturnTableMetadataMap()
throws ExecutionException, TableMetadataException {
// Arrange
String namespace1 = "test_namespace1";
String tableName1 = "test_table1";
String namespace2 = "test_namespace2";
String tableName2 = "test_table2";
TableMetadataRequest request1 = new TableMetadataRequest(namespace1, tableName1);
TableMetadataRequest request2 = new TableMetadataRequest(namespace2, tableName2);
Collection<TableMetadataRequest> requests = Arrays.asList(request1, request2);
TableMetadata expectedMetadata1 = mock(TableMetadata.class);
TableMetadata expectedMetadata2 = mock(TableMetadata.class);
when(storageAdmin.getTableMetadata(namespace1, tableName1)).thenReturn(expectedMetadata1);
when(storageAdmin.getTableMetadata(namespace2, tableName2)).thenReturn(expectedMetadata2);

// Act
Map<String, TableMetadata> actualMetadataMap = tableMetadataService.getTableMetadata(requests);

// Assert
assertEquals(2, actualMetadataMap.size());
assertSame(expectedMetadata1, actualMetadataMap.get(namespace1 + "." + tableName1));
assertSame(expectedMetadata2, actualMetadataMap.get(namespace2 + "." + tableName2));
verify(storageAdmin).getTableMetadata(namespace1, tableName1);
verify(storageAdmin).getTableMetadata(namespace2, tableName2);
}

@Test
void getTableMetadata_WithMissingNamespaceOrTableInRequests_ShouldThrowTableMetadataException()
throws ExecutionException {
// Arrange
String namespace = "test_namespace";
String tableName = "test_table";
TableMetadataRequest request = new TableMetadataRequest(namespace, tableName);
Collection<TableMetadataRequest> requests = Collections.singletonList(request);
when(storageAdmin.getTableMetadata(namespace, tableName)).thenReturn(null);

// Act & Assert
TableMetadataException exception =
assertThrows(
TableMetadataException.class, () -> tableMetadataService.getTableMetadata(requests));
assertEquals(
CoreError.DATA_LOADER_MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName),
exception.getMessage());
verify(storageAdmin).getTableMetadata(namespace, tableName);
}

@Test
void getTableMetadata_WithExecutionExceptionInRequests_ShouldThrowTableMetadataException()
throws ExecutionException {
// Arrange
String namespace = "test_namespace";
String tableName = "test_table";
TableMetadataRequest request = new TableMetadataRequest(namespace, tableName);
Collection<TableMetadataRequest> requests = Collections.singletonList(request);
when(storageAdmin.getTableMetadata(namespace, tableName))
.thenThrow(new ExecutionException("error"));

// Act & Assert
TableMetadataException exception =
assertThrows(
TableMetadataException.class, () -> tableMetadataService.getTableMetadata(requests));
assertEquals(
CoreError.DATA_LOADER_MISSING_NAMESPACE_OR_TABLE.buildMessage(namespace, tableName),
exception.getMessage());
verify(storageAdmin).getTableMetadata(namespace, tableName);
}
}
Loading

0 comments on commit 7702a21

Please sign in to comment.