Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport to branch(3.10) : Fix scan with limit behavior in DynamoDB adapter #2308

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions core/src/main/java/com/scalar/db/storage/dynamo/QueryScanner.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,34 @@
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

@NotThreadSafe
public class QueryScanner implements Scanner {

private final PaginatedRequest request;
private final ResultInterpreter resultInterpreter;

private Iterator<Map<String, AttributeValue>> itemsIterator;
@Nullable private Integer remainingLimit;
@Nullable private Map<String, AttributeValue> lastEvaluatedKey;
private int totalResultCount;

private ScannerIterator scannerIterator;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public QueryScanner(PaginatedRequest request, ResultInterpreter resultInterpreter) {
public QueryScanner(PaginatedRequest request, int limit, ResultInterpreter resultInterpreter) {
this.request = request;
this.resultInterpreter = resultInterpreter;

handleResponse(request.execute());
if (limit > 0) {
remainingLimit = limit;
handleResponse(request.execute(limit));
} else {
remainingLimit = null;
handleResponse(request.execute());
}

this.resultInterpreter = resultInterpreter;
}

@Override
Expand All @@ -49,18 +58,23 @@ private boolean hasNext() {
return true;
}
if (lastEvaluatedKey != null) {
handleResponse(request.execute(lastEvaluatedKey));
if (remainingLimit != null) {
handleResponse(request.execute(lastEvaluatedKey, remainingLimit));
} else {
handleResponse(request.execute(lastEvaluatedKey));
}
return itemsIterator.hasNext();
}
return false;
}

private void handleResponse(PaginatedRequestResponse response) {
List<Map<String, AttributeValue>> items = response.items();
totalResultCount += items.size();
if (remainingLimit != null) {
remainingLimit -= items.size();
}
itemsIterator = items.iterator();
if ((request.limit() == null || totalResultCount < request.limit())
&& response.hasLastEvaluatedKey()) {
if ((remainingLimit == null || remainingLimit > 0) && response.hasLastEvaluatedKey()) {
lastEvaluatedKey = response.lastEvaluatedKey();
} else {
lastEvaluatedKey = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ private Scanner executeScanWithIndex(Selection selection, TableMetadata tableMet

builder.expressionAttributeNames(expressionAttributeNames);

int limit = 0;
if (selection instanceof Scan) {
Scan scan = (Scan) selection;
if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}
limit = scan.getLimit();
}

com.scalar.db.storage.dynamo.request.QueryRequest request =
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
return new QueryScanner(
request, new ResultInterpreter(selection.getProjections(), tableMetadata));
request, limit, new ResultInterpreter(selection.getProjections(), tableMetadata));
}

private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
Expand All @@ -170,10 +170,6 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
}
}

if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}

if (!scan.getProjections().isEmpty()) {
Map<String, String> expressionAttributeNames = new HashMap<>();
projectionExpression(builder, scan, expressionAttributeNames);
Expand All @@ -183,20 +179,17 @@ private Scanner executeScan(Scan scan, TableMetadata tableMetadata) {
if (scan.getConsistency() != Consistency.EVENTUAL) {
builder.consistentRead(true);
}

com.scalar.db.storage.dynamo.request.QueryRequest queryRequest =
new com.scalar.db.storage.dynamo.request.QueryRequest(client, builder.build());
return new QueryScanner(
queryRequest, new ResultInterpreter(scan.getProjections(), tableMetadata));
queryRequest, scan.getLimit(), new ResultInterpreter(scan.getProjections(), tableMetadata));
}

private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
DynamoOperation dynamoOperation = new DynamoOperation(scan, tableMetadata);
ScanRequest.Builder builder = ScanRequest.builder().tableName(dynamoOperation.getTableName());

if (scan.getLimit() > 0) {
builder.limit(scan.getLimit());
}

if (!scan.getProjections().isEmpty()) {
Map<String, String> expressionAttributeNames = new HashMap<>();
projectionExpression(builder, scan, expressionAttributeNames);
Expand All @@ -206,10 +199,13 @@ private Scanner executeFullScan(ScanAll scan, TableMetadata tableMetadata) {
if (scan.getConsistency() != Consistency.EVENTUAL) {
builder.consistentRead(true);
}

com.scalar.db.storage.dynamo.request.ScanRequest requestWrapper =
new com.scalar.db.storage.dynamo.request.ScanRequest(client, builder.build());
return new QueryScanner(
requestWrapper, new ResultInterpreter(scan.getProjections(), tableMetadata));
requestWrapper,
scan.getLimit(),
new ResultInterpreter(scan.getProjections(), tableMetadata));
}

private void projectionExpression(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,34 @@ public interface PaginatedRequest {
/**
* Execute the request
*
* @return the request response
* @return the response
*/
PaginatedRequestResponse execute();

/**
* Execute the request with limit
*
* @param limit the maximum number of items to evaluate (not necessarily the number of matching
* items)
* @return the response
*/
PaginatedRequestResponse execute(int limit);

/**
* Execute the request that will be evaluated starting from the given start key
*
* @param exclusiveStartKey The primary key of the first item that this operation will evaluate.
* @return the request response
* @param exclusiveStartKey the primary key of the first item that this operation will evaluate.
* @return the response
*/
PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey);

/**
* Returns the request limit
* Execute the request that will be evaluated starting from the given start key with limit
*
* @return the request limit
* @param exclusiveStartKey the primary key of the first item that this operation will evaluate.
* @param limit the maximum number of items to evaluate (not necessarily the number of matching
* items)
* @return the response
*/
Integer limit();
PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey, int limit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ public QueryRequest(
this.dynamoRequest = dynamoRequest;
}

@Override
public PaginatedRequestResponse execute() {
QueryResponse response = client.query(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public PaginatedRequestResponse execute(int limit) {
QueryRequest request =
new QueryRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build());
return request.execute();
}

@Override
public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey) {
QueryRequest request =
Expand All @@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveSta
}

@Override
public PaginatedRequestResponse execute() {
QueryResponse response = client.query(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public Integer limit() {
return dynamoRequest.limit();
public PaginatedRequestResponse execute(
Map<String, AttributeValue> exclusiveStartKey, int limit) {
QueryRequest request =
new QueryRequest(
this.client,
this.dynamoRequest
.toBuilder()
.exclusiveStartKey(exclusiveStartKey)
.limit(limit)
.build());
return request.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ public ScanRequest(
this.dynamoRequest = dynamoRequest;
}

@Override
public PaginatedRequestResponse execute() {
ScanResponse response = client.scan(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public PaginatedRequestResponse execute(int limit) {
ScanRequest request =
new ScanRequest(this.client, this.dynamoRequest.toBuilder().limit(limit).build());
return request.execute();
}

@Override
public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveStartKey) {
ScanRequest request =
Expand All @@ -29,15 +44,16 @@ public PaginatedRequestResponse execute(Map<String, AttributeValue> exclusiveSta
}

@Override
public PaginatedRequestResponse execute() {
ScanResponse response = client.scan(dynamoRequest);

return new PaginatedRequestResponse(
response.items(), response.hasLastEvaluatedKey(), response.lastEvaluatedKey());
}

@Override
public Integer limit() {
return dynamoRequest.limit();
public PaginatedRequestResponse execute(
Map<String, AttributeValue> exclusiveStartKey, int limit) {
ScanRequest request =
new ScanRequest(
this.client,
this.dynamoRequest
.toBuilder()
.exclusiveStartKey(exclusiveStartKey)
.limit(limit)
.build());
return request.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ public void one_ShouldReturnResult() {
Map<String, AttributeValue> item = Collections.emptyMap();
List<Map<String, AttributeValue>> items = Arrays.asList(item, item, item);
when(request.execute()).thenReturn(response);
when(request.limit()).thenReturn(null);
when(response.items()).thenReturn(items);
when(resultInterpreter.interpret(item)).thenReturn(result);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
Optional<Result> actual1 = queryScanner.one();
Expand All @@ -71,11 +70,10 @@ public void all_ShouldReturnResults() {
Map<String, AttributeValue> item = Collections.emptyMap();
List<Map<String, AttributeValue>> items = Arrays.asList(item, item, item);
when(request.execute()).thenReturn(response);
when(request.limit()).thenReturn(null);
when(response.items()).thenReturn(items);
when(resultInterpreter.interpret(item)).thenReturn(result);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
List<Result> results1 = queryScanner.all();
Expand All @@ -100,9 +98,8 @@ public void iterator_ShouldReturnResults() {
when(response.items()).thenReturn(items);
when(resultInterpreter.interpret(item)).thenReturn(result);
when(request.execute()).thenReturn(response);
when(request.limit()).thenReturn(null);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
Iterator<Result> iterator = queryScanner.iterator();
Expand Down Expand Up @@ -134,9 +131,8 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() {
when(resultInterpreter.interpret(item)).thenReturn(result);
when(request.execute()).thenReturn(response);
when(request.execute(lastEvaluatedKey)).thenReturn(response);
when(request.limit()).thenReturn(null);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, 0, resultInterpreter);

// Act
Optional<Result> actual1 = queryScanner.one();
Expand Down Expand Up @@ -164,26 +160,27 @@ public void one_ResponseWithLastEvaluatedKey_ShouldReturnResults() {
@Test
public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResults() {
// Arrange
int limit = 3;

Map<String, AttributeValue> item = Collections.emptyMap();
List<Map<String, AttributeValue>> items = Arrays.asList(item, item);
List<Map<String, AttributeValue>> items1 = Arrays.asList(item, item);
List<Map<String, AttributeValue>> items2 = Collections.singletonList(item);
Map<String, AttributeValue> lastEvaluatedKey = Collections.emptyMap();

when(request.limit()).thenReturn(4);
when(response.items()).thenReturn(items);
when(response.items()).thenReturn(items1).thenReturn(items2);
when(response.hasLastEvaluatedKey()).thenReturn(true);
when(response.lastEvaluatedKey()).thenReturn(lastEvaluatedKey);
when(request.execute()).thenReturn(response);
when(request.execute(lastEvaluatedKey)).thenReturn(response);
when(request.execute(limit)).thenReturn(response);
when(request.execute(lastEvaluatedKey, limit - items1.size())).thenReturn(response);
when(resultInterpreter.interpret(item)).thenReturn(result);

QueryScanner queryScanner = new QueryScanner(request, resultInterpreter);
QueryScanner queryScanner = new QueryScanner(request, limit, resultInterpreter);

// Act
Optional<Result> actual1 = queryScanner.one();
Optional<Result> actual2 = queryScanner.one();
Optional<Result> actual3 = queryScanner.one();
Optional<Result> actual4 = queryScanner.one();
Optional<Result> actual5 = queryScanner.one();

// Assert
assertThat(actual1).isPresent();
Expand All @@ -192,12 +189,10 @@ public void one_RequestWithLimitAndResponseWithLastEvaluatedKey_ShouldReturnResu
assertThat(actual2.get()).isEqualTo(result);
assertThat(actual3).isPresent();
assertThat(actual3.get()).isEqualTo(result);
assertThat(actual4).isPresent();
assertThat(actual4.get()).isEqualTo(result);
assertThat(actual5).isNotPresent();
assertThat(actual4).isNotPresent();

verify(resultInterpreter, times(4)).interpret(item);
verify(request).execute(lastEvaluatedKey);
verify(request).execute();
verify(resultInterpreter, times(limit)).interpret(item);
verify(request).execute(limit);
verify(request).execute(lastEvaluatedKey, limit - items1.size());
}
}
Loading