Skip to content

Commit

Permalink
Backport to branch(3) : Revisit behavior of multiple mutations for sa…
Browse files Browse the repository at this point in the history
…me record in transaction in Consensus Commit (#2348)

Co-authored-by: Toshihiro Suzuki <[email protected]>
  • Loading branch information
feeblefakie and brfrn169 authored Nov 18, 2024
1 parent 0323d00 commit 6c9601c
Show file tree
Hide file tree
Showing 7 changed files with 1,665 additions and 494 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/com/scalar/db/common/error/CoreError.java
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,10 @@ public enum CoreError implements ScalarDbError {
+ "Primary-key columns must not contain any of the following characters in Cosmos DB: ':', '/', '\\', '#', '?'. Value: %s",
"",
""),
CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED(
Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""),
CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED(
Category.USER_ERROR, "0147", "Deleting already-inserted data is not allowed", "", ""),

//
// Errors for the concurrency error category
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -101,9 +100,9 @@ void read(Snapshot.Key key, Get get) throws CrudException {
// transaction read it first. However, we update it only if a get operation has no
// conjunction or the result exists. This is because we don’t know whether the record
// actually exists or not due to the conjunction.
snapshot.put(key, result);
snapshot.putIntoReadSet(key, result);
}
snapshot.put(get, result); // for re-read and validation
snapshot.putIntoGetSet(get, result); // for re-read and validation
return;
}
throw new UncommittedRecordException(
Expand All @@ -117,7 +116,7 @@ private Optional<Result> createGetResult(Snapshot.Key key, Get get, List<String>
throws CrudException {
TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable());
return snapshot
.mergeResult(key, snapshot.get(get), get.getConjunctions())
.getResult(key, get)
.map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled));
}

Expand All @@ -139,18 +138,13 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
List<String> originalProjections = new ArrayList<>(originalScan.getProjections());
Scan scan = (Scan) prepareStorageSelection(originalScan);

Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.get(scan);
Optional<Map<Snapshot.Key, TransactionResult>> resultsInSnapshot = snapshot.getResults(scan);
if (resultsInSnapshot.isPresent()) {
for (Entry<Snapshot.Key, TransactionResult> entry : resultsInSnapshot.get().entrySet()) {
snapshot
.mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}
return createScanResults(scan, originalProjections, results);
return createScanResults(scan, originalProjections, resultsInSnapshot.get());
}

Map<Snapshot.Key, TransactionResult> results = new LinkedHashMap<>();

Scanner scanner = null;
try {
scanner = scanFromStorage(scan);
Expand All @@ -169,9 +163,9 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
// We always update the read set to create before image by using the latest record (result)
// because another conflicting transaction might have updated the record after this
// transaction read it first.
snapshot.put(key, Optional.of(result));
snapshot.putIntoReadSet(key, Optional.of(result));

snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value));
snapshot.getResult(key).ifPresent(value -> results.put(key, value));
}
} finally {
if (scanner != null) {
Expand All @@ -182,7 +176,7 @@ private List<Result> scanInternal(Scan originalScan) throws CrudException {
}
}
}
snapshot.put(scan, results);
snapshot.putIntoScanSet(scan, results);

return createScanResults(scan, originalProjections, results);
}
Expand Down Expand Up @@ -213,10 +207,10 @@ public void put(Put put) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
put, snapshot.getFromReadSet(key).orElse(null));
put, snapshot.getResult(key).orElse(null));
}

snapshot.put(key, put);
snapshot.putIntoWriteSet(key, put);
}

public void delete(Delete delete) throws CrudException {
Expand All @@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException {
read(key, createGet(key));
}
mutationConditionsValidator.checkIfConditionIsSatisfied(
delete, snapshot.getFromReadSet(key).orElse(null));
delete, snapshot.getResult(key).orElse(null));
}

snapshot.put(key, delete);
snapshot.putIntoDeleteSet(key, delete);
}

public void readIfImplicitPreReadEnabled() throws CrudException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.scalar.db.api.Get;
import com.scalar.db.api.Operation;
import com.scalar.db.api.Put;
import com.scalar.db.api.PutBuilder;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.ScanAll;
Expand All @@ -30,6 +31,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -113,45 +115,62 @@ Isolation getIsolation() {

// Although this class is not thread-safe, this method is actually thread-safe because the readSet
// is a concurrent map
public void put(Key key, Optional<TransactionResult> result) {
public void putIntoReadSet(Key key, Optional<TransactionResult> result) {
readSet.put(key, result);
}

// Although this class is not thread-safe, this method is actually thread-safe because the getSet
// is a concurrent map
public void put(Get get, Optional<TransactionResult> result) {
public void putIntoGetSet(Get get, Optional<TransactionResult> result) {
getSet.put(get, result);
}

public void put(Scan scan, Map<Key, TransactionResult> results) {
public void putIntoScanSet(Scan scan, Map<Key, TransactionResult> results) {
scanSet.put(scan, results);
}

public void put(Key key, Put put) {
public void putIntoWriteSet(Key key, Put put) {
if (deleteSet.containsKey(key)) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_WRITING_ALREADY_DELETED_DATA_NOT_ALLOWED.buildMessage());
}
if (writeSet.containsKey(key)) {
if (put.isInsertModeEnabled()) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage());
}

// merge the previous put in the write set and the new put
Put originalPut = writeSet.get(key);
put.getColumns().values().forEach(originalPut::withValue);
PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut);
put.getColumns().values().forEach(putBuilder::value);

// If the implicit pre-read is enabled for the new put, it should also be enabled for the
// merged put. However, if the previous put is in insert mode, this doesn’t apply. This is
// because, in insert mode, the read set is not used during the preparation phase. Therefore,
// we only need to enable the implicit pre-read if the previous put is not in insert mode
if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) {
putBuilder.enableImplicitPreRead();
}

writeSet.put(key, putBuilder.build());
} else {
writeSet.put(key, put);
}
}

public void put(Key key, Delete delete) {
writeSet.remove(key);
deleteSet.put(key, delete);
}
public void putIntoDeleteSet(Key key, Delete delete) {
Put put = writeSet.get(key);
if (put != null) {
if (put.isInsertModeEnabled()) {
throw new IllegalArgumentException(
CoreError.CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED.buildMessage());
}

public boolean containsKeyInReadSet(Key key) {
return readSet.containsKey(key);
}
writeSet.remove(key);
}

public Optional<TransactionResult> getFromReadSet(Key key) {
return readSet.getOrDefault(key, Optional.empty());
deleteSet.put(key, delete);
}

public List<Put> getPutsInWriteSet() {
Expand All @@ -166,7 +185,39 @@ public ReadWriteSets getReadWriteSets() {
return new ReadWriteSets(id, readSet, writeSet.entrySet(), deleteSet.entrySet());
}

public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
public boolean containsKeyInReadSet(Key key) {
return readSet.containsKey(key);
}

public boolean containsKeyInGetSet(Get get) {
return getSet.containsKey(get);
}

public Optional<TransactionResult> getResult(Key key) throws CrudException {
Optional<TransactionResult> result = readSet.getOrDefault(key, Optional.empty());
return mergeResult(key, result);
}

public Optional<TransactionResult> getResult(Key key, Get get) throws CrudException {
Optional<TransactionResult> result = getSet.getOrDefault(get, Optional.empty());
return mergeResult(key, result, get.getConjunctions());
}

public Optional<Map<Snapshot.Key, TransactionResult>> getResults(Scan scan) throws CrudException {
if (!scanSet.containsKey(scan)) {
return Optional.empty();
}

Map<Key, TransactionResult> results = new LinkedHashMap<>();
for (Entry<Snapshot.Key, TransactionResult> entry : scanSet.get(scan).entrySet()) {
mergeResult(entry.getKey(), Optional.of(entry.getValue()))
.ifPresent(result -> results.put(entry.getKey(), result));
}

return Optional.of(results);
}

private Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResult> result)
throws CrudException {
if (deleteSet.containsKey(key)) {
return Optional.empty();
Expand All @@ -180,7 +231,7 @@ public Optional<TransactionResult> mergeResult(Key key, Optional<TransactionResu
}
}

public Optional<TransactionResult> mergeResult(
private Optional<TransactionResult> mergeResult(
Key key, Optional<TransactionResult> result, Set<Conjunction> conjunctions)
throws CrudException {
return mergeResult(key, result)
Expand Down Expand Up @@ -209,32 +260,6 @@ private TableMetadata getTableMetadata(Key key) throws CrudException {
}
}

private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
if (metadata == null) {
throw new IllegalArgumentException(
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
}
return metadata.getTableMetadata();
}

public boolean containsKeyInGetSet(Get get) {
return getSet.containsKey(get);
}

public Optional<TransactionResult> get(Get get) {
// We expect this method is called after putting the result of the get operation in the get set.
assert getSet.containsKey(get);
return getSet.get(get);
}

public Optional<Map<Key, TransactionResult>> get(Scan scan) {
if (scanSet.containsKey(scan)) {
return Optional.ofNullable(scanSet.get(scan));
}
return Optional.empty();
}

public void verify(Scan scan) {
if (isWriteSetOverlappedWith(scan)) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -536,6 +561,15 @@ void toSerializableWithExtraRead(DistributedStorage storage)
parallelExecutor.validate(tasks, getId());
}

private TableMetadata getTableMetadata(Scan scan) throws ExecutionException {
TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan);
if (metadata == null) {
throw new IllegalArgumentException(
CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get()));
}
return metadata.getTableMetadata();
}

private boolean isChanged(
Optional<TransactionResult> latestResult, Optional<TransactionResult> result) {
if (latestResult.isPresent() != result.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() {
// different partition
Put put1 = preparePut1();
Put put2 = preparePut2();
snapshot.put(new Snapshot.Key(put1), put1);
snapshot.put(new Snapshot.Key(put2), put2);
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2);

return snapshot;
}
Expand All @@ -148,8 +148,8 @@ private Snapshot prepareSnapshotWithSamePartitionPut() {
// same partition
Put put1 = preparePut1();
Put put3 = preparePut3();
snapshot.put(new Snapshot.Key(put1), put1);
snapshot.put(new Snapshot.Key(put3), put3);
snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1);
snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3);

return snapshot;
}
Expand Down
Loading

0 comments on commit 6c9601c

Please sign in to comment.