From 6c9601c690d498de87bd73aa435946c6fa94f846 Mon Sep 17 00:00:00 2001 From: Hiroyuki Yamada Date: Mon, 18 Nov 2024 23:44:15 +0900 Subject: [PATCH] Backport to branch(3) : Revisit behavior of multiple mutations for same record in transaction in Consensus Commit (#2348) Co-authored-by: Toshihiro Suzuki --- .../com/scalar/db/common/error/CoreError.java | 4 + .../consensuscommit/CrudHandler.java | 34 +- .../transaction/consensuscommit/Snapshot.java | 118 ++- .../consensuscommit/CommitHandlerTest.java | 8 +- .../consensuscommit/CrudHandlerTest.java | 144 ++- .../consensuscommit/SnapshotTest.java | 945 +++++++++++------- .../ConsensusCommitIntegrationTestBase.java | 906 +++++++++++++++++ 7 files changed, 1665 insertions(+), 494 deletions(-) diff --git a/core/src/main/java/com/scalar/db/common/error/CoreError.java b/core/src/main/java/com/scalar/db/common/error/CoreError.java index a0dc398dfc..7b7bb29d05 100644 --- a/core/src/main/java/com/scalar/db/common/error/CoreError.java +++ b/core/src/main/java/com/scalar/db/common/error/CoreError.java @@ -666,6 +666,10 @@ public enum CoreError implements ScalarDbError { + "Primary-key columns must not contain any of the following characters in Cosmos DB: ':', '/', '\\', '#', '?'. Value: %s", "", ""), + CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED( + Category.USER_ERROR, "0146", "Inserting already-written data is not allowed", "", ""), + CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED( + Category.USER_ERROR, "0147", "Deleting already-inserted data is not allowed", "", ""), // // Errors for the concurrency error category diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java index 8f9544f5e2..b79d93582f 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/CrudHandler.java @@ -26,7 +26,6 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; import java.util.stream.Collectors; import javax.annotation.concurrent.NotThreadSafe; @@ -101,9 +100,9 @@ void read(Snapshot.Key key, Get get) throws CrudException { // transaction read it first. However, we update it only if a get operation has no // conjunction or the result exists. This is because we don’t know whether the record // actually exists or not due to the conjunction. - snapshot.put(key, result); + snapshot.putIntoReadSet(key, result); } - snapshot.put(get, result); // for re-read and validation + snapshot.putIntoGetSet(get, result); // for re-read and validation return; } throw new UncommittedRecordException( @@ -117,7 +116,7 @@ private Optional createGetResult(Snapshot.Key key, Get get, List throws CrudException { TableMetadata metadata = getTableMetadata(key.getNamespace(), key.getTable()); return snapshot - .mergeResult(key, snapshot.get(get), get.getConjunctions()) + .getResult(key, get) .map(r -> new FilteredResult(r, projections, metadata, isIncludeMetadataEnabled)); } @@ -139,18 +138,13 @@ private List scanInternal(Scan originalScan) throws CrudException { List originalProjections = new ArrayList<>(originalScan.getProjections()); Scan scan = (Scan) prepareStorageSelection(originalScan); - Map results = new LinkedHashMap<>(); - - Optional> resultsInSnapshot = snapshot.get(scan); + Optional> resultsInSnapshot = snapshot.getResults(scan); if (resultsInSnapshot.isPresent()) { - for (Entry entry : resultsInSnapshot.get().entrySet()) { - snapshot - .mergeResult(entry.getKey(), Optional.of(entry.getValue())) - .ifPresent(result -> results.put(entry.getKey(), result)); - } - return createScanResults(scan, originalProjections, results); + return createScanResults(scan, originalProjections, resultsInSnapshot.get()); } + Map results = new LinkedHashMap<>(); + Scanner scanner = null; try { scanner = scanFromStorage(scan); @@ -169,9 +163,9 @@ private List scanInternal(Scan originalScan) throws CrudException { // We always update the read set to create before image by using the latest record (result) // because another conflicting transaction might have updated the record after this // transaction read it first. - snapshot.put(key, Optional.of(result)); + snapshot.putIntoReadSet(key, Optional.of(result)); - snapshot.mergeResult(key, Optional.of(result)).ifPresent(value -> results.put(key, value)); + snapshot.getResult(key).ifPresent(value -> results.put(key, value)); } } finally { if (scanner != null) { @@ -182,7 +176,7 @@ private List scanInternal(Scan originalScan) throws CrudException { } } } - snapshot.put(scan, results); + snapshot.putIntoScanSet(scan, results); return createScanResults(scan, originalProjections, results); } @@ -213,10 +207,10 @@ public void put(Put put) throws CrudException { read(key, createGet(key)); } mutationConditionsValidator.checkIfConditionIsSatisfied( - put, snapshot.getFromReadSet(key).orElse(null)); + put, snapshot.getResult(key).orElse(null)); } - snapshot.put(key, put); + snapshot.putIntoWriteSet(key, put); } public void delete(Delete delete) throws CrudException { @@ -227,10 +221,10 @@ public void delete(Delete delete) throws CrudException { read(key, createGet(key)); } mutationConditionsValidator.checkIfConditionIsSatisfied( - delete, snapshot.getFromReadSet(key).orElse(null)); + delete, snapshot.getResult(key).orElse(null)); } - snapshot.put(key, delete); + snapshot.putIntoDeleteSet(key, delete); } public void readIfImplicitPreReadEnabled() throws CrudException { diff --git a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java index dce0413b50..99481d2670 100644 --- a/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java +++ b/core/src/main/java/com/scalar/db/transaction/consensuscommit/Snapshot.java @@ -9,6 +9,7 @@ import com.scalar.db.api.Get; import com.scalar.db.api.Operation; import com.scalar.db.api.Put; +import com.scalar.db.api.PutBuilder; import com.scalar.db.api.Result; import com.scalar.db.api.Scan; import com.scalar.db.api.ScanAll; @@ -30,6 +31,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -113,45 +115,62 @@ Isolation getIsolation() { // Although this class is not thread-safe, this method is actually thread-safe because the readSet // is a concurrent map - public void put(Key key, Optional result) { + public void putIntoReadSet(Key key, Optional result) { readSet.put(key, result); } // Although this class is not thread-safe, this method is actually thread-safe because the getSet // is a concurrent map - public void put(Get get, Optional result) { + public void putIntoGetSet(Get get, Optional result) { getSet.put(get, result); } - public void put(Scan scan, Map results) { + public void putIntoScanSet(Scan scan, Map results) { scanSet.put(scan, results); } - public void put(Key key, Put put) { + public void putIntoWriteSet(Key key, Put put) { if (deleteSet.containsKey(key)) { throw new IllegalArgumentException( CoreError.CONSENSUS_COMMIT_WRITING_ALREADY_DELETED_DATA_NOT_ALLOWED.buildMessage()); } if (writeSet.containsKey(key)) { + if (put.isInsertModeEnabled()) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_INSERTING_ALREADY_WRITTEN_DATA_NOT_ALLOWED.buildMessage()); + } + // merge the previous put in the write set and the new put Put originalPut = writeSet.get(key); - put.getColumns().values().forEach(originalPut::withValue); + PutBuilder.BuildableFromExisting putBuilder = Put.newBuilder(originalPut); + put.getColumns().values().forEach(putBuilder::value); + + // If the implicit pre-read is enabled for the new put, it should also be enabled for the + // merged put. However, if the previous put is in insert mode, this doesn’t apply. This is + // because, in insert mode, the read set is not used during the preparation phase. Therefore, + // we only need to enable the implicit pre-read if the previous put is not in insert mode + if (put.isImplicitPreReadEnabled() && !originalPut.isInsertModeEnabled()) { + putBuilder.enableImplicitPreRead(); + } + + writeSet.put(key, putBuilder.build()); } else { writeSet.put(key, put); } } - public void put(Key key, Delete delete) { - writeSet.remove(key); - deleteSet.put(key, delete); - } + public void putIntoDeleteSet(Key key, Delete delete) { + Put put = writeSet.get(key); + if (put != null) { + if (put.isInsertModeEnabled()) { + throw new IllegalArgumentException( + CoreError.CONSENSUS_COMMIT_DELETING_ALREADY_INSERTED_DATA_NOT_ALLOWED.buildMessage()); + } - public boolean containsKeyInReadSet(Key key) { - return readSet.containsKey(key); - } + writeSet.remove(key); + } - public Optional getFromReadSet(Key key) { - return readSet.getOrDefault(key, Optional.empty()); + deleteSet.put(key, delete); } public List getPutsInWriteSet() { @@ -166,7 +185,39 @@ public ReadWriteSets getReadWriteSets() { return new ReadWriteSets(id, readSet, writeSet.entrySet(), deleteSet.entrySet()); } - public Optional mergeResult(Key key, Optional result) + public boolean containsKeyInReadSet(Key key) { + return readSet.containsKey(key); + } + + public boolean containsKeyInGetSet(Get get) { + return getSet.containsKey(get); + } + + public Optional getResult(Key key) throws CrudException { + Optional result = readSet.getOrDefault(key, Optional.empty()); + return mergeResult(key, result); + } + + public Optional getResult(Key key, Get get) throws CrudException { + Optional result = getSet.getOrDefault(get, Optional.empty()); + return mergeResult(key, result, get.getConjunctions()); + } + + public Optional> getResults(Scan scan) throws CrudException { + if (!scanSet.containsKey(scan)) { + return Optional.empty(); + } + + Map results = new LinkedHashMap<>(); + for (Entry entry : scanSet.get(scan).entrySet()) { + mergeResult(entry.getKey(), Optional.of(entry.getValue())) + .ifPresent(result -> results.put(entry.getKey(), result)); + } + + return Optional.of(results); + } + + private Optional mergeResult(Key key, Optional result) throws CrudException { if (deleteSet.containsKey(key)) { return Optional.empty(); @@ -180,7 +231,7 @@ public Optional mergeResult(Key key, Optional mergeResult( + private Optional mergeResult( Key key, Optional result, Set conjunctions) throws CrudException { return mergeResult(key, result) @@ -209,32 +260,6 @@ private TableMetadata getTableMetadata(Key key) throws CrudException { } } - private TableMetadata getTableMetadata(Scan scan) throws ExecutionException { - TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan); - if (metadata == null) { - throw new IllegalArgumentException( - CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get())); - } - return metadata.getTableMetadata(); - } - - public boolean containsKeyInGetSet(Get get) { - return getSet.containsKey(get); - } - - public Optional get(Get get) { - // We expect this method is called after putting the result of the get operation in the get set. - assert getSet.containsKey(get); - return getSet.get(get); - } - - public Optional> get(Scan scan) { - if (scanSet.containsKey(scan)) { - return Optional.ofNullable(scanSet.get(scan)); - } - return Optional.empty(); - } - public void verify(Scan scan) { if (isWriteSetOverlappedWith(scan)) { throw new IllegalArgumentException( @@ -536,6 +561,15 @@ void toSerializableWithExtraRead(DistributedStorage storage) parallelExecutor.validate(tasks, getId()); } + private TableMetadata getTableMetadata(Scan scan) throws ExecutionException { + TransactionTableMetadata metadata = tableMetadataManager.getTransactionTableMetadata(scan); + if (metadata == null) { + throw new IllegalArgumentException( + CoreError.TABLE_NOT_FOUND.buildMessage(scan.forFullTableName().get())); + } + return metadata.getTableMetadata(); + } + private boolean isChanged( Optional latestResult, Optional result) { if (latestResult.isPresent() != result.isPresent()) { diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java index bac8fd847c..fdf84d0c15 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CommitHandlerTest.java @@ -130,8 +130,8 @@ private Snapshot prepareSnapshotWithDifferentPartitionPut() { // different partition Put put1 = preparePut1(); Put put2 = preparePut2(); - snapshot.put(new Snapshot.Key(put1), put1); - snapshot.put(new Snapshot.Key(put2), put2); + snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); + snapshot.putIntoWriteSet(new Snapshot.Key(put2), put2); return snapshot; } @@ -148,8 +148,8 @@ private Snapshot prepareSnapshotWithSamePartitionPut() { // same partition Put put1 = preparePut1(); Put put3 = preparePut3(); - snapshot.put(new Snapshot.Key(put1), put1); - snapshot.put(new Snapshot.Key(put3), put3); + snapshot.putIntoWriteSet(new Snapshot.Key(put1), put1); + snapshot.putIntoWriteSet(new Snapshot.Key(put3), put3); return snapshot; } diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java index bacc87ca1c..2d2c50c463 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/CrudHandlerTest.java @@ -3,7 +3,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -160,11 +159,10 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept // Arrange Get get = prepareGet(); Get getForStorage = toGetForStorageFrom(get); + Snapshot.Key key = new Snapshot.Key(get); Optional expected = Optional.of(prepareResult(TransactionState.COMMITTED)); when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(true); - when(snapshot.get(getForStorage)).thenReturn(expected); - when(snapshot.mergeResult(new Snapshot.Key(getForStorage), expected, Collections.emptySet())) - .thenReturn(expected); + when(snapshot.getResult(key, getForStorage)).thenReturn(expected); // Act Optional actual = handler.get(get); @@ -188,13 +186,8 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept Optional transactionResult = expected.map(e -> (TransactionResult) e); Snapshot.Key key = new Snapshot.Key(getForStorage); when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false); - doNothing() - .when(snapshot) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(storage.get(getForStorage)).thenReturn(expected); - when(snapshot.get(getForStorage)).thenReturn(transactionResult); - when(snapshot.mergeResult(key, transactionResult, Collections.emptySet())) - .thenReturn(transactionResult); + when(snapshot.getResult(key, getForStorage)).thenReturn(transactionResult); // Act Optional result = handler.get(get); @@ -206,8 +199,8 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept new FilteredResult( expected.get(), Collections.emptyList(), TABLE_METADATA, false))); verify(storage).get(getForStorage); - verify(snapshot).put(key, Optional.of((TransactionResult) expected.get())); - verify(snapshot).put(get, Optional.of((TransactionResult) expected.get())); + verify(snapshot).putIntoReadSet(key, Optional.of((TransactionResult) expected.get())); + verify(snapshot).putIntoGetSet(get, Optional.of((TransactionResult) expected.get())); } @Test @@ -233,8 +226,8 @@ public void get_GetExistsInSnapshot_ShouldReturnFromSnapshot() throws CrudExcept assertThat(exception.getResults().get(0)).isEqualTo(result); }); - verify(snapshot, never()) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); + verify(snapshot, never()).putIntoReadSet(any(), ArgumentMatchers.any()); + verify(snapshot, never()).putIntoGetSet(any(), ArgumentMatchers.any()); } @Test @@ -277,15 +270,9 @@ public void get_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() Get get2 = prepareGet(); Result result = prepareResult(TransactionState.COMMITTED); Optional expected = Optional.of(new TransactionResult(result)); - doNothing() - .when(snapshot) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); Snapshot.Key key = new Snapshot.Key(getForStorage); when(snapshot.containsKeyInGetSet(getForStorage)).thenReturn(false).thenReturn(true); - when(snapshot.get(getForStorage)).thenReturn(expected).thenReturn(expected); - when(snapshot.mergeResult(key, expected, Collections.emptySet())) - .thenReturn(expected) - .thenReturn(expected); + when(snapshot.getResult(key, getForStorage)).thenReturn(expected).thenReturn(expected); when(storage.get(getForStorage)).thenReturn(Optional.of(result)); // Act @@ -293,7 +280,7 @@ public void get_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() Optional results2 = handler.get(get2); // Assert - verify(snapshot).put(key, expected); + verify(snapshot).putIntoReadSet(key, expected); assertThat(results1) .isEqualTo( Optional.of( @@ -363,18 +350,16 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); TransactionResult expected = new TransactionResult(result); - doNothing() - .when(snapshot) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scanForStorage)).thenReturn(scanner); - when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(expected)); + when(snapshot.getResult(any())).thenReturn(Optional.of(expected)); // Act List results = handler.scan(scan); // Assert - verify(snapshot).put(key, Optional.of(expected)); + verify(snapshot).putIntoReadSet(key, Optional.of(expected)); + verify(snapshot).putIntoScanSet(scan, ImmutableMap.of(key, expected)); verify(snapshot).verify(scan); assertThat(results.size()).isEqualTo(1); assertThat(results.get(0)) @@ -403,8 +388,8 @@ public void scan_ResultGivenFromStorage_ShouldUpdateSnapshotAndReturn() assertThat(exception.getResults().get(0)).isEqualTo(result); }); - verify(snapshot, never()) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); + verify(snapshot, never()).putIntoReadSet(any(), ArgumentMatchers.any()); + verify(snapshot, never()).putIntoScanSet(any(), ArgumentMatchers.any()); } @Test @@ -417,23 +402,21 @@ public void scan_CalledTwice_SecondTimeShouldReturnTheSameFromSnapshot() Scan scan2 = prepareScan(); result = prepareResult(TransactionState.COMMITTED); TransactionResult expected = new TransactionResult(result); - doNothing() - .when(snapshot) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scanForStorage)).thenReturn(scanner); Snapshot.Key key = new Snapshot.Key(scanForStorage, result); - when(snapshot.get(scanForStorage)) + when(snapshot.getResults(scanForStorage)) .thenReturn(Optional.empty()) - .thenReturn(Optional.of(Collections.singletonMap(key, expected))); - when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(expected)); + .thenReturn(Optional.of(ImmutableMap.of(key, expected))); + when(snapshot.getResult(key)).thenReturn(Optional.of(expected)); // Act List results1 = handler.scan(scan1); List results2 = handler.scan(scan2); // Assert - verify(snapshot).put(key, Optional.of(expected)); + verify(snapshot).putIntoReadSet(key, Optional.of(expected)); + verify(snapshot).putIntoScanSet(scanForStorage, ImmutableMap.of(key, expected)); assertThat(results1.size()).isEqualTo(1); assertThat(results1.get(0)) .isEqualTo(new FilteredResult(expected, Collections.emptyList(), TABLE_METADATA, false)); @@ -478,20 +461,15 @@ public void scan_GetCalledAfterScan_ShouldReturnFromStorage() Scan scan = prepareScan(); Scan scanForStorage = toScanForStorageFrom(scan); result = prepareResult(TransactionState.COMMITTED); - doNothing() - .when(snapshot) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scanForStorage)).thenReturn(scanner); - Get get = prepareGet(); + Snapshot.Key key = new Snapshot.Key(get); Get getForStorage = toGetForStorageFrom(get); Optional transactionResult = Optional.of(new TransactionResult(result)); when(storage.get(getForStorage)).thenReturn(Optional.of(result)); - when(snapshot.get(get)).thenReturn(transactionResult); - when(snapshot.mergeResult(any(), any())).thenReturn(transactionResult); - when(snapshot.mergeResult(new Snapshot.Key(get), transactionResult, Collections.emptySet())) - .thenReturn(transactionResult); + when(snapshot.getResult(key, get)).thenReturn(transactionResult); + when(snapshot.getResult(key)).thenReturn(transactionResult); // Act List results = handler.scan(scan); @@ -499,7 +477,10 @@ public void scan_GetCalledAfterScan_ShouldReturnFromStorage() // Assert verify(storage).scan(scanForStorage); + verify(storage).get(getForStorage); + assertThat(results.size()).isEqualTo(1); + assertThat(result).isPresent(); assertThat(results.get(0)).isEqualTo(result.get()); } @@ -514,7 +495,7 @@ public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorage() handler = new CrudHandler(storage, snapshot, tableMetadataManager, false, parallelExecutor); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(scan)).thenReturn(scanner); - Get get = toGetForStorageFrom(prepareGet()); + Get get = prepareGet(); when(storage.get(get)).thenReturn(Optional.of(result)); // Act @@ -524,6 +505,8 @@ public void scan_GetCalledAfterScanUnderRealSnapshot_ShouldReturnFromStorage() // Assert verify(storage).scan(scan); verify(storage).get(get); + assertThat(results.size()).isEqualTo(1); + assertThat(result).isPresent(); assertThat(results.get(0)).isEqualTo(result.get()); } @@ -607,19 +590,17 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe Scan scan = prepareCrossPartitionScan(); result = prepareResult(TransactionState.COMMITTED); Snapshot.Key key = new Snapshot.Key(scan, result); - doNothing() - .when(snapshot) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); when(scanner.iterator()).thenReturn(Collections.singletonList(result).iterator()); when(storage.scan(any(ScanAll.class))).thenReturn(scanner); TransactionResult transactionResult = new TransactionResult(result); - when(snapshot.mergeResult(any(), any())).thenReturn(Optional.of(transactionResult)); + when(snapshot.getResult(key)).thenReturn(Optional.of(transactionResult)); // Act List results = handler.scan(scan); // Assert - verify(snapshot).put(key, Optional.of(transactionResult)); + verify(snapshot).putIntoReadSet(key, Optional.of(transactionResult)); + verify(snapshot).putIntoScanSet(scan, ImmutableMap.of(key, transactionResult)); verify(snapshot).verify(scan); assertThat(results.size()).isEqualTo(1); assertThat(results.get(0)) @@ -648,8 +629,7 @@ public void scan_CalledAfterDeleteUnderRealSnapshot_ShouldReturnResultsWithoutDe assertThat(exception.getResults().get(0)).isEqualTo(result); }); - verify(snapshot, never()) - .put(any(Snapshot.Key.class), ArgumentMatchers.>any()); + verify(snapshot, never()).putIntoReadSet(any(Snapshot.Key.class), ArgumentMatchers.any()); verify(snapshot, never()).verify(any()); } @@ -666,9 +646,9 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied, never()).readUnread(any(), any()); - verify(snapshot, never()).getFromReadSet(any()); + verify(snapshot, never()).getResult(any()); verify(mutationConditionsValidator, never()).checkIfConditionIsSatisfied(any(Put.class), any()); - verify(snapshot).put(new Snapshot.Key(put), put); + verify(snapshot).putIntoWriteSet(new Snapshot.Key(put), put); } @Test @@ -685,10 +665,10 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .enableImplicitPreRead() .build(); Snapshot.Key key = new Snapshot.Key(put); - when(snapshot.containsKeyInReadSet(any())).thenReturn(true); + when(snapshot.containsKeyInReadSet(key)).thenReturn(true); TransactionResult result = mock(TransactionResult.class); when(result.isCommitted()).thenReturn(true); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.of(result)); + when(snapshot.getResult(key)).thenReturn(Optional.of(result)); Get getForKey = Get.newBuilder() @@ -704,9 +684,9 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied, never()).readUnread(key, getForKey); - verify(snapshot).getFromReadSet(key); + verify(snapshot).getResult(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result); - verify(snapshot).put(key, put); + verify(snapshot).putIntoWriteSet(key, put); } @Test @@ -723,10 +703,10 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .enableImplicitPreRead() .build(); Snapshot.Key key = new Snapshot.Key(put); - when(snapshot.containsKeyInReadSet(any())).thenReturn(false); + when(snapshot.containsKeyInReadSet(key)).thenReturn(false); TransactionResult result = mock(TransactionResult.class); when(result.isCommitted()).thenReturn(true); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.of(result)); + when(snapshot.getResult(key)).thenReturn(Optional.of(result)); Get getForKey = toGetForStorageFrom( @@ -744,9 +724,9 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied).read(key, getForKey); - verify(snapshot).getFromReadSet(key); + verify(snapshot).getResult(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result); - verify(snapshot).put(key, put); + verify(snapshot).putIntoWriteSet(key, put); } @Test @@ -762,10 +742,10 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C .condition(ConditionBuilder.putIfExists()) .build(); Snapshot.Key key = new Snapshot.Key(put); - when(snapshot.containsKeyInReadSet(any())).thenReturn(true); + when(snapshot.containsKeyInReadSet(key)).thenReturn(true); TransactionResult result = mock(TransactionResult.class); when(result.isCommitted()).thenReturn(true); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.of(result)); + when(snapshot.getResult(key)).thenReturn(Optional.of(result)); Get getForKey = Get.newBuilder() @@ -781,9 +761,9 @@ public void put_PutWithoutConditionGiven_ShouldCallAppropriateMethods() throws C // Assert verify(spied, never()).readUnread(key, getForKey); - verify(snapshot).getFromReadSet(key); + verify(snapshot).getResult(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(put, result); - verify(snapshot).put(key, put); + verify(snapshot).putIntoWriteSet(key, put); } @Test @@ -821,10 +801,10 @@ public void delete_DeleteWithoutConditionGiven_ShouldCallAppropriateMethods() // Assert verify(spied, never()).readUnread(any(), any()); - verify(snapshot, never()).getFromReadSet(any()); + verify(snapshot, never()).getResult(any()); verify(mutationConditionsValidator, never()) .checkIfConditionIsSatisfied(any(Delete.class), any()); - verify(snapshot).put(new Snapshot.Key(delete), delete); + verify(snapshot).putIntoDeleteSet(new Snapshot.Key(delete), delete); } @Test @@ -839,10 +819,10 @@ public void delete_DeleteWithConditionGiven_WithResultInReadSet_ShouldCallApprop .condition(ConditionBuilder.deleteIfExists()) .build(); Snapshot.Key key = new Snapshot.Key(delete); - when(snapshot.containsKeyInReadSet(any())).thenReturn(true); + when(snapshot.containsKeyInReadSet(key)).thenReturn(true); TransactionResult result = mock(TransactionResult.class); when(result.isCommitted()).thenReturn(true); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.of(result)); + when(snapshot.getResult(key)).thenReturn(Optional.of(result)); Get getForKey = Get.newBuilder() @@ -858,9 +838,9 @@ public void delete_DeleteWithConditionGiven_WithResultInReadSet_ShouldCallApprop // Assert verify(spied, never()).readUnread(key, getForKey); - verify(snapshot).getFromReadSet(key); + verify(snapshot).getResult(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, result); - verify(snapshot).put(key, delete); + verify(snapshot).putIntoDeleteSet(key, delete); } @Test @@ -875,8 +855,8 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp .condition(ConditionBuilder.deleteIfExists()) .build(); Snapshot.Key key = new Snapshot.Key(delete); - when(snapshot.containsKeyInReadSet(any())).thenReturn(false); - when(snapshot.getFromReadSet(any())).thenReturn(Optional.empty()); + when(snapshot.containsKeyInReadSet(key)).thenReturn(false); + when(snapshot.getResult(key)).thenReturn(Optional.empty()); Get getForKey = toGetForStorageFrom( @@ -894,9 +874,9 @@ public void delete_DeleteWithConditionGiven_WithoutResultInReadSet_ShouldCallApp // Assert verify(spied).read(key, getForKey); - verify(snapshot).getFromReadSet(key); + verify(snapshot).getResult(key); verify(mutationConditionsValidator).checkIfConditionIsSatisfied(delete, null); - verify(snapshot).put(key, delete); + verify(snapshot).putIntoDeleteSet(key, delete); } @SuppressWarnings("unchecked") @@ -921,7 +901,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() // Assert verify(storage, never()).get(any()); - verify(snapshot, never()).put(any(Get.class), any(Optional.class)); + verify(snapshot, never()).putIntoGetSet(any(Get.class), any(Optional.class)); } @Test @@ -947,8 +927,8 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() // Assert verify(storage).get(any()); - verify(snapshot).put(key, Optional.empty()); - verify(snapshot).put(getForKey, Optional.empty()); + verify(snapshot).putIntoReadSet(key, Optional.empty()); + verify(snapshot).putIntoGetSet(getForKey, Optional.empty()); } @Test @@ -975,8 +955,8 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() // Assert verify(storage).get(any()); - verify(snapshot, never()).put(key, Optional.empty()); - verify(snapshot).put(getForKey, Optional.empty()); + verify(snapshot, never()).putIntoReadSet(key, Optional.empty()); + verify(snapshot).putIntoGetSet(getForKey, Optional.empty()); } @Test @@ -1006,7 +986,7 @@ public void readUnread_GetContainedInGetSet_ShouldCallAppropriateMethods() // Assert verify(storage).get(any()); - verify(snapshot).put(key, Optional.of(new TransactionResult(result))); + verify(snapshot).putIntoReadSet(key, Optional.of(new TransactionResult(result))); } @Test diff --git a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java index 856eca5eea..0918162902 100644 --- a/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java +++ b/core/src/test/java/com/scalar/db/transaction/consensuscommit/SnapshotTest.java @@ -27,7 +27,6 @@ import com.scalar.db.api.Scan; import com.scalar.db.api.ScanAll; import com.scalar.db.api.Scanner; -import com.scalar.db.api.Selection.Conjunction; import com.scalar.db.api.TableMetadata; import com.scalar.db.common.ResultImpl; import com.scalar.db.exception.storage.ExecutionException; @@ -47,9 +46,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.junit.jupiter.api.BeforeEach; @@ -294,80 +293,205 @@ private void configureBehavior() throws ExecutionException { } @Test - public void put_ResultGiven_ShouldHoldWhatsGivenInReadSet() { + public void putIntoReadSet_ResultGiven_ShouldHoldWhatsGivenInReadSet() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Snapshot.Key key = new Snapshot.Key(prepareGet()); TransactionResult result = prepareResult(ANY_ID); // Act - snapshot.put(key, Optional.of(result)); + snapshot.putIntoReadSet(key, Optional.of(result)); // Assert assertThat(readSet.get(key)).isEqualTo(Optional.of(result)); } @Test - public void put_PutGiven_ShouldHoldWhatsGivenInWriteSet() { + public void putIntoGetSet_ResultGiven_ShouldHoldWhatsGivenInReadSet() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Get get = prepareGet(); + TransactionResult result = prepareResult(ANY_ID); + + // Act + snapshot.putIntoGetSet(get, Optional.of(result)); + + // Assert + assertThat(getSet.get(get)).isEqualTo(Optional.of(result)); + } + + @Test + public void putIntoWriteSet_PutGiven_ShouldHoldWhatsGivenInWriteSet() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Put put = preparePut(); Snapshot.Key key = new Snapshot.Key(put); // Act - snapshot.put(key, put); + snapshot.putIntoWriteSet(key, put); // Assert assertThat(writeSet.get(key)).isEqualTo(put); } @Test - public void put_PutGivenTwice_ShouldHoldMergedPut() { + public void putIntoWriteSet_PutGivenTwice_ShouldHoldMergedPut() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Put put1 = preparePut(); Snapshot.Key key = new Snapshot.Key(put1); - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); + Key partitionKey = Key.ofText(ANY_NAME_1, ANY_TEXT_1); + Key clusteringKey = Key.ofText(ANY_NAME_2, ANY_TEXT_2); Put put2 = - new Put(partitionKey, clusteringKey) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME) - .withValue(ANY_NAME_3, ANY_TEXT_5) - .withTextValue(ANY_NAME_4, null); + Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .textValue(ANY_NAME_3, ANY_TEXT_5) + .textValue(ANY_NAME_4, null) + .enableImplicitPreRead() + .build(); + + // Act + snapshot.putIntoWriteSet(key, put1); + snapshot.putIntoWriteSet(key, put2); + + // Assert + Put mergedPut = writeSet.get(key); + assertThat(mergedPut.getColumns()) + .isEqualTo( + ImmutableMap.of( + ANY_NAME_3, + TextColumn.of(ANY_NAME_3, ANY_TEXT_5), + ANY_NAME_4, + TextColumn.ofNull(ANY_NAME_4))); + assertThat(mergedPut.isImplicitPreReadEnabled()).isTrue(); + } + + @Test + public void putIntoWriteSet_PutGivenAfterDelete_ShouldThrowIllegalArgumentException() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Delete delete = prepareDelete(); + Snapshot.Key deleteKey = new Snapshot.Key(prepareDelete()); + snapshot.putIntoDeleteSet(deleteKey, delete); + + Put put = preparePut(); + Snapshot.Key putKey = new Snapshot.Key(preparePut()); + + // Act Assert + assertThatThrownBy(() -> snapshot.putIntoWriteSet(putKey, put)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void + putIntoWriteSet_PutWithInsertModeEnabledGivenAfterPut_ShouldThrowIllegalArgumentException() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePut(); + Put putWithInsertModeEnabled = Put.newBuilder(put).enableInsertMode().build(); + Snapshot.Key key = new Snapshot.Key(put); + + // Act Assert + snapshot.putIntoWriteSet(key, put); + assertThatThrownBy(() -> snapshot.putIntoWriteSet(key, putWithInsertModeEnabled)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void + putIntoWriteSet_PutWithImplicitPreReadEnabledGivenAfterWithInsertModeEnabled_ShouldHoldMergedPutWithoutImplicitPreRead() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put putWithInsertModeEnabled = Put.newBuilder(preparePut()).enableInsertMode().build(); + + Key partitionKey = Key.ofText(ANY_NAME_1, ANY_TEXT_1); + Key clusteringKey = Key.ofText(ANY_NAME_2, ANY_TEXT_2); + Put putWithImplicitPreReadEnabled = + Put.newBuilder() + .namespace(ANY_NAMESPACE_NAME) + .table(ANY_TABLE_NAME) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .textValue(ANY_NAME_3, ANY_TEXT_5) + .textValue(ANY_NAME_4, null) + .enableImplicitPreRead() + .build(); + + Snapshot.Key key = new Snapshot.Key(putWithInsertModeEnabled); // Act - snapshot.put(key, put1); - snapshot.put(key, put2); + snapshot.putIntoWriteSet(key, putWithInsertModeEnabled); + snapshot.putIntoWriteSet(key, putWithImplicitPreReadEnabled); // Assert - assertThat(writeSet.get(key).getColumns()) + Put mergedPut = writeSet.get(key); + assertThat(mergedPut.getColumns()) .isEqualTo( ImmutableMap.of( ANY_NAME_3, TextColumn.of(ANY_NAME_3, ANY_TEXT_5), ANY_NAME_4, TextColumn.ofNull(ANY_NAME_4))); + assertThat(mergedPut.isInsertModeEnabled()).isTrue(); + assertThat(mergedPut.isImplicitPreReadEnabled()).isFalse(); } @Test - public void put_DeleteGiven_ShouldHoldWhatsGivenInDeleteSet() { + public void putIntoDeleteSet_DeleteGiven_ShouldHoldWhatsGivenInDeleteSet() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Delete delete = prepareDelete(); Snapshot.Key key = new Snapshot.Key(delete); // Act - snapshot.put(key, delete); + snapshot.putIntoDeleteSet(key, delete); // Assert assertThat(deleteSet.get(key)).isEqualTo(delete); } @Test - public void put_ScanGiven_ShouldHoldWhatsGivenInScanSet() { + public void putIntoDeleteSet_DeleteGivenAfterPut_PutSupercedesDelete() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePut(); + Snapshot.Key putKey = new Snapshot.Key(preparePut()); + snapshot.putIntoWriteSet(putKey, put); + + Delete delete = prepareDelete(); + Snapshot.Key deleteKey = new Snapshot.Key(prepareDelete()); + + // Act + snapshot.putIntoDeleteSet(deleteKey, delete); + + // Assert + assertThat(writeSet.size()).isEqualTo(0); + assertThat(deleteSet.size()).isEqualTo(1); + assertThat(deleteSet.get(deleteKey)).isEqualTo(delete); + } + + @Test + public void + putIntoDeleteSet_DeleteGivenAfterPutWithInsertModeEnabled_ShouldThrowIllegalArgumentException() { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Delete delete = prepareDelete(); + Snapshot.Key key = new Snapshot.Key(delete); + + Put putWithInsertModeEnabled = Put.newBuilder(preparePut()).enableInsertMode().build(); + snapshot.putIntoWriteSet(key, putWithInsertModeEnabled); + + // Act Assert + assertThatThrownBy(() -> snapshot.putIntoDeleteSet(key, delete)) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void putIntoScanSet_ScanGiven_ShouldHoldWhatsGivenInScanSet() { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Scan scan = prepareScan(); @@ -376,33 +500,64 @@ public void put_ScanGiven_ShouldHoldWhatsGivenInScanSet() { Map expected = Collections.singletonMap(key, result); // Act - snapshot.put(scan, expected); + snapshot.putIntoScanSet(scan, expected); // Assert assertThat(scanSet.get(scan)).isEqualTo(expected); } @Test - public void mergeResult_KeyGivenContainedInWriteSet_ShouldReturnMergedResult() + public void getResult_KeyNeitherContainedInWriteSetNorReadSet_ShouldReturnEmpty() throws CrudException { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Key partitionKey = new Key(ANY_NAME_1, ANY_TEXT_1); - Key clusteringKey = new Key(ANY_NAME_2, ANY_TEXT_2); - Put put = - new Put(partitionKey, clusteringKey) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME) - .withValue(ANY_NAME_3, ANY_TEXT_5) - .withTextValue(ANY_NAME_4, null); + Snapshot.Key key = new Snapshot.Key(prepareGet()); + + // Act + Optional actual = snapshot.getResult(key); + + // Assert + assertThat(actual).isNotPresent(); + } + + @Test + public void getResult_KeyContainedInWriteSetButNotContainedInReadSet_ShouldReturnProperResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePut(); + Snapshot.Key key = new Snapshot.Key(prepareGet()); + snapshot.putIntoWriteSet(key, put); + + // Act + Optional actual = snapshot.getResult(key); + + // Assert + assertThat(actual).isPresent(); + + assertThat(actual.get().contains(ANY_NAME_1)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); + assertThat(actual.get().contains(ANY_NAME_2)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); + assertThat(actual.get().contains(ANY_NAME_3)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_3)).isEqualTo(ANY_TEXT_3); + assertThat(actual.get().contains(ANY_NAME_4)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_4)).isEqualTo(ANY_TEXT_4); + } + + @Test + public void getResult_KeyContainedInWriteSetAndReadSetGiven_ShouldReturnMergedResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePutForMergeTest(); Snapshot.Key key = new Snapshot.Key(prepareGet()); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); - snapshot.put(key, put); + snapshot.putIntoReadSet(key, Optional.of(result)); + snapshot.putIntoWriteSet(key, put); // Act - Optional actual = snapshot.mergeResult(key, Optional.of(result)); + Optional actual = snapshot.getResult(key); // Assert assertThat(actual).isPresent(); @@ -410,16 +565,18 @@ public void mergeResult_KeyGivenContainedInWriteSet_ShouldReturnMergedResult() } @Test - public void mergeResult_KeyGivenContainedInDeleteSet_ShouldReturnEmpty() throws CrudException { + public void getResult_KeyContainedInDeleteSetAndReadSetGiven_ShouldReturnEmpty() + throws CrudException { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Delete delete = prepareDelete(); Snapshot.Key key = new Snapshot.Key(delete); - snapshot.put(key, delete); TransactionResult result = prepareResult(ANY_ID); + snapshot.putIntoReadSet(key, Optional.of(result)); + snapshot.putIntoDeleteSet(key, delete); // Act - Optional actual = snapshot.mergeResult(key, Optional.of(result)); + Optional actual = snapshot.getResult(key); // Assert assertThat(actual).isNotPresent(); @@ -427,44 +584,314 @@ public void mergeResult_KeyGivenContainedInDeleteSet_ShouldReturnEmpty() throws @Test public void - mergeResult_KeyGivenNeitherContainedInDeleteSetNorWriteSet_ShouldReturnOriginalResult() + getResult_KeyNeitherContainedInDeleteSetNorWriteSetButContainedInAndReadSetGiven_ShouldReturnOriginalResult() throws CrudException { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Snapshot.Key key = new Snapshot.Key(prepareGet()); TransactionResult result = prepareResult(ANY_ID); + snapshot.putIntoReadSet(key, Optional.of(result)); // Act - Optional actual = snapshot.mergeResult(key, Optional.of(result)); + Optional actual = snapshot.getResult(key); // Assert assertThat(actual).isEqualTo(Optional.of(result)); } + @Test + public void getResult_KeyContainedInWriteSetAndGetNotContainedInGetSet_ShouldReturnEmpty() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Get get = prepareGet(); + Snapshot.Key key = new Snapshot.Key(get); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isNotPresent(); + } + + @Test + public void getResult_KeyContainedInWriteSetAndGetNotContainedInGetSet_ShouldReturnProperResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePut(); + Get get = prepareGet(); + Snapshot.Key key = new Snapshot.Key(get); + snapshot.putIntoWriteSet(key, put); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isPresent(); + + assertThat(actual.get().contains(ANY_NAME_1)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_1)).isEqualTo(ANY_TEXT_1); + assertThat(actual.get().contains(ANY_NAME_2)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_2)).isEqualTo(ANY_TEXT_2); + assertThat(actual.get().contains(ANY_NAME_3)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_3)).isEqualTo(ANY_TEXT_3); + assertThat(actual.get().contains(ANY_NAME_4)).isTrue(); + assertThat(actual.get().getText(ANY_NAME_4)).isEqualTo(ANY_TEXT_4); + } + @Test public void - mergeResult_MatchedConjunctionAndKeyContainedInWriteSetGiven_ShouldReturnMergedResult() + getResult_KeyContainedInWriteSetAndGetContainedInGetSetGiven_ShouldReturnMergedResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePutForMergeTest(); + Get get = prepareGet(); + Snapshot.Key key = new Snapshot.Key(get); + TransactionResult result = prepareResult(ANY_ID); + snapshot.putIntoGetSet(get, Optional.of(result)); + snapshot.putIntoWriteSet(key, put); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isPresent(); + assertMergedResultIsEqualTo(actual.get()); + } + + @Test + public void getResult_KeyContainedInDeleteSetAndGetContainedInGetSetGiven_ShouldReturnEmpty() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Delete delete = prepareDelete(); + Get get = prepareGet(); + Snapshot.Key key = new Snapshot.Key(get); + TransactionResult result = prepareResult(ANY_ID); + snapshot.putIntoGetSet(get, Optional.of(result)); + snapshot.putIntoDeleteSet(key, delete); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isNotPresent(); + } + + @Test + public void + getResult_KeyNeitherContainedInDeleteSetNorWriteSetAndGetContainedInGetSetGiven_ShouldReturnOriginalResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Get get = prepareGet(); + Snapshot.Key key = new Snapshot.Key(get); + TransactionResult result = prepareResult(ANY_ID); + snapshot.putIntoGetSet(get, Optional.of(result)); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isEqualTo(Optional.of(result)); + } + + @Test + public void + getResult_KeyContainedInWriteSetAndGetContainedInGetSetWithMatchedConjunctionGiven_ShouldReturnMergedResult() throws CrudException { // Arrange snapshot = prepareSnapshot(Isolation.SNAPSHOT); Put put = preparePutForMergeTest(); ConditionalExpression condition = ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_5); - Set conjunctions = ImmutableSet.of(Conjunction.of(condition)); Get get = Get.newBuilder(prepareGet()).where(condition).build(); Snapshot.Key key = new Snapshot.Key(get); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); - snapshot.put(key, put); + snapshot.putIntoGetSet(get, Optional.of(result)); + snapshot.putIntoWriteSet(key, put); // Act - Optional actual = - snapshot.mergeResult(key, Optional.of(result), conjunctions); + Optional actual = snapshot.getResult(key, get); // Assert assertThat(actual).isPresent(); assertMergedResultIsEqualTo(actual.get()); } + @Test + public void + getResult_KeyNeitherContainedInDeleteSetNorWriteSetAndGetContainedInGetSetWithUnmatchedConjunction_ShouldReturnOriginalResult() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Snapshot.Key key = new Snapshot.Key(prepareGet()); + TransactionResult result = prepareResult(ANY_ID); + ConditionalExpression condition = ConditionBuilder.column(ANY_NAME_1).isEqualToText(ANY_TEXT_2); + Get get = Get.newBuilder(prepareGet()).where(condition).build(); + snapshot.putIntoGetSet(get, Optional.of(result)); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isEqualTo(Optional.of(result)); + } + + @Test + public void + getResult_KeyContainedInWriteSetAndGetContainedInGetSetWithUnmatchedConjunctionGiven_ShouldReturnEmpty() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePutForMergeTest(); + ConditionalExpression condition = ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_3); + Get get = Get.newBuilder(prepareGet()).where(condition).build(); + Snapshot.Key key = new Snapshot.Key(get); + TransactionResult result = prepareResult(ANY_ID); + snapshot.putIntoGetSet(get, Optional.of(result)); + snapshot.putIntoWriteSet(key, put); + + // Act + Optional actual = snapshot.getResult(key, get); + + // Assert + assertThat(actual).isEmpty(); + } + + @Test + public void getResults_ScanNotContainedInScanSetGiven_ShouldReturnEmpty() throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Scan scan = prepareScan(); + + // Act + Optional> results = snapshot.getResults(scan); + + // Assert + assertThat(results.isPresent()).isFalse(); + } + + @Test + public void getResults_ScanContainedInScanSetGiven_ShouldReturnProperResults() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Scan scan = prepareScan(); + + TransactionResult result1 = mock(TransactionResult.class); + TransactionResult result2 = mock(TransactionResult.class); + TransactionResult result3 = mock(TransactionResult.class); + Snapshot.Key key1 = mock(Snapshot.Key.class); + Snapshot.Key key2 = mock(Snapshot.Key.class); + Snapshot.Key key3 = mock(Snapshot.Key.class); + scanSet.put(scan, ImmutableMap.of(key1, result1, key2, result2, key3, result3)); + + // Act + Optional> results = snapshot.getResults(scan); + + // Assert + assertThat(results).isPresent(); + + Iterator> entryIterator = + results.get().entrySet().iterator(); + + Map.Entry entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key1); + assertThat(entry.getValue()).isEqualTo(result1); + + entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key2); + assertThat(entry.getValue()).isEqualTo(result2); + + entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key3); + assertThat(entry.getValue()).isEqualTo(result3); + + assertThat(entryIterator.hasNext()).isFalse(); + } + + @Test + public void getResults_ScanContainedInScanSetGivenAndPutInWriteSet_ShouldReturnProperResults() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Put put = preparePutForMergeTest(); + Scan scan = prepareScan(); + + TransactionResult result1 = prepareResult(ANY_ID); + TransactionResult result2 = mock(TransactionResult.class); + TransactionResult result3 = mock(TransactionResult.class); + Snapshot.Key key1 = new Snapshot.Key(put); + Snapshot.Key key2 = mock(Snapshot.Key.class); + Snapshot.Key key3 = mock(Snapshot.Key.class); + scanSet.put(scan, ImmutableMap.of(key1, result1, key2, result2, key3, result3)); + + snapshot.putIntoWriteSet(key1, put); + + // Act + Optional> results = snapshot.getResults(scan); + + // Assert + assertThat(results).isPresent(); + + Iterator> entryIterator = + results.get().entrySet().iterator(); + + Map.Entry entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key1); + assertMergedResultIsEqualTo(entry.getValue()); + + entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key2); + assertThat(entry.getValue()).isEqualTo(result2); + + entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key3); + assertThat(entry.getValue()).isEqualTo(result3); + + assertThat(entryIterator.hasNext()).isFalse(); + } + + @Test + public void getResults_ScanContainedInScanSetGivenAndDeleteInDeleteSet_ShouldReturnProperResults() + throws CrudException { + // Arrange + snapshot = prepareSnapshot(Isolation.SNAPSHOT); + Delete delete = prepareDelete(); + Scan scan = prepareScan(); + + TransactionResult result1 = prepareResult(ANY_ID); + TransactionResult result2 = mock(TransactionResult.class); + TransactionResult result3 = mock(TransactionResult.class); + Snapshot.Key key1 = new Snapshot.Key(delete); + Snapshot.Key key2 = mock(Snapshot.Key.class); + Snapshot.Key key3 = mock(Snapshot.Key.class); + scanSet.put(scan, ImmutableMap.of(key1, result1, key2, result2, key3, result3)); + + snapshot.putIntoDeleteSet(key1, delete); + + // Act + Optional> results = snapshot.getResults(scan); + + // Assert + assertThat(results).isPresent(); + + Iterator> entryIterator = + results.get().entrySet().iterator(); + + Map.Entry entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key2); + assertThat(entry.getValue()).isEqualTo(result2); + + entry = entryIterator.next(); + assertThat(entry.getKey()).isEqualTo(key3); + assertThat(entry.getValue()).isEqualTo(result3); + + assertThat(entryIterator.hasNext()).isFalse(); + } + private void assertMergedResultIsEqualTo(TransactionResult result) { assertThat(result.getValues()) .isEqualTo( @@ -533,60 +960,6 @@ private void assertMergedResultIsEqualTo(TransactionResult result) { assertThat(result.getAsObject(Attribute.VERSION)).isEqualTo(ANY_VERSION); } - @Test - public void - mergeResult_UnmatchedConjunctionAndKeyNeitherContainedInDeleteSetNorWriteSet_ShouldReturnOriginalResult() - throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Snapshot.Key key = new Snapshot.Key(prepareGet()); - TransactionResult result = prepareResult(ANY_ID); - ConditionalExpression condition = ConditionBuilder.column(ANY_NAME_1).isEqualToText(ANY_TEXT_2); - Set conjunctions = ImmutableSet.of(Conjunction.of(condition)); - - // Act - Optional actual = - snapshot.mergeResult(key, Optional.of(result), conjunctions); - - // Assert - assertThat(actual).isEqualTo(Optional.of(result)); - } - - @Test - public void mergeResult_UnmatchedConjunctionAndKeyContainedInWriteSetGiven_ShouldReturnEmpty() - throws CrudException { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Put put = preparePutForMergeTest(); - ConditionalExpression condition = ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_3); - Set conjunctions = ImmutableSet.of(Conjunction.of(condition)); - Get get = Get.newBuilder(prepareGet()).where(condition).build(); - Snapshot.Key key = new Snapshot.Key(get); - TransactionResult result = prepareResult(ANY_ID); - snapshot.put(key, Optional.of(result)); - snapshot.put(key, put); - - // Act - Optional actual = - snapshot.mergeResult(key, Optional.of(result), conjunctions); - - // Assert - assertThat(actual).isEmpty(); - } - - @Test - public void get_ScanNotContainedInSnapshotGiven_ShouldReturnEmptyList() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Scan scan = prepareScan(); - - // Act - Optional> results = snapshot.get(scan); - - // Assert - assertThat(results.isPresent()).isFalse(); - } - @Test public void to_PrepareMutationComposerGivenAndSnapshotIsolationSet_ShouldCallComposerProperly() throws PreparationConflictException, ExecutionException { @@ -595,10 +968,10 @@ public void to_PrepareMutationComposerGivenAndSnapshotIsolationSet_ShouldCallCom Put put = preparePut(); Delete delete = prepareAnotherDelete(); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(prepareGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); - snapshot.put(new Snapshot.Key(delete), delete); + snapshot.putIntoReadSet(new Snapshot.Key(prepareGet()), Optional.of(result)); + snapshot.putIntoReadSet(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); configureBehavior(); // Act @@ -617,9 +990,9 @@ public void to_PrepareMutationComposerGivenAndSnapshotIsolationSet_ShouldCallCom snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_WRITE); Put put = preparePut(); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(prepareGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(prepareGet()), Optional.of(result)); + snapshot.putIntoReadSet(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); configureBehavior(); // Act @@ -640,10 +1013,10 @@ public void to_CommitMutationComposerGiven_ShouldCallComposerProperly() Put put = preparePut(); Delete delete = prepareAnotherDelete(); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(prepareGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); - snapshot.put(new Snapshot.Key(delete), delete); + snapshot.putIntoReadSet(new Snapshot.Key(prepareGet()), Optional.of(result)); + snapshot.putIntoReadSet(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); // Act snapshot.to(commitComposer); @@ -662,10 +1035,10 @@ public void to_CommitMutationComposerGiven_ShouldCallComposerProperly() Put put = preparePut(); Delete delete = prepareAnotherDelete(); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(prepareGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); - snapshot.put(new Snapshot.Key(delete), delete); + snapshot.putIntoReadSet(new Snapshot.Key(prepareGet()), Optional.of(result)); + snapshot.putIntoReadSet(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); configureBehavior(); // Act @@ -686,10 +1059,10 @@ public void to_RollbackMutationComposerGiven_ShouldCallComposerProperly() Put put = preparePut(); Delete delete = prepareAnotherDelete(); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(prepareGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); - snapshot.put(new Snapshot.Key(delete), delete); + snapshot.putIntoReadSet(new Snapshot.Key(prepareGet()), Optional.of(result)); + snapshot.putIntoReadSet(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); configureBehavior(); // Act @@ -709,10 +1082,10 @@ public void to_RollbackMutationComposerGiven_ShouldCallComposerProperly() Put put = preparePut(); Delete delete = prepareAnotherDelete(); TransactionResult result = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(prepareGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); - snapshot.put(new Snapshot.Key(delete), delete); + snapshot.putIntoReadSet(new Snapshot.Key(prepareGet()), Optional.of(result)); + snapshot.putIntoReadSet(new Snapshot.Key(prepareAnotherGet()), Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); + snapshot.putIntoDeleteSet(new Snapshot.Key(delete), delete); configureBehavior(); // Act @@ -734,9 +1107,9 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea Put put = preparePut(); TransactionResult result = prepareResult(ANY_ID); TransactionResult txResult = new TransactionResult(result); - snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); - snapshot.put(get, Optional.of(txResult)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.putIntoGetSet(get, Optional.of(txResult)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); // Act Assert assertThatCode(() -> snapshot.toSerializableWithExtraWrite(prepareComposer)) @@ -761,9 +1134,9 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_WRITE); Get get = prepareAnotherGet(); Put put = preparePut(); - snapshot.put(new Snapshot.Key(get), Optional.empty()); - snapshot.put(get, Optional.empty()); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.empty()); + snapshot.putIntoGetSet(get, Optional.empty()); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.toSerializableWithExtraWrite(prepareComposer)); @@ -787,9 +1160,9 @@ public void toSerializableWithExtraWrite_UnmutatedReadSetExists_ShouldConvertRea TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); Put put = preparePut(); - snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonMap(key, txResult)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(key, Optional.of(txResult)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, txResult)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.toSerializableWithExtraWrite(prepareComposer)); @@ -807,9 +1180,9 @@ public void toSerializableWithExtraRead_ReadSetNotChanged_ShouldProcessWithoutEx Put put = preparePut(); TransactionResult result = prepareResult(ANY_ID); TransactionResult txResult = new TransactionResult(result); - snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); - snapshot.put(get, Optional.of(txResult)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.putIntoGetSet(get, Optional.of(txResult)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = prepareAnotherGet().withProjection(Attribute.ID).withProjection(Attribute.VERSION); @@ -830,9 +1203,9 @@ public void toSerializableWithExtraRead_ReadSetUpdated_ShouldThrowValidationConf Get get = prepareAnotherGet(); Put put = preparePut(); TransactionResult txResult = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(get), Optional.of(txResult)); - snapshot.put(get, Optional.of(txResult)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.of(txResult)); + snapshot.putIntoGetSet(get, Optional.of(txResult)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult changedTxResult = prepareResult(ANY_ID + "x"); Get getWithProjections = @@ -854,9 +1227,9 @@ public void toSerializableWithExtraRead_ReadSetExtended_ShouldThrowValidationCon snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Get get = prepareAnotherGet(); Put put = preparePut(); - snapshot.put(new Snapshot.Key(get), Optional.empty()); - snapshot.put(get, Optional.empty()); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.empty()); + snapshot.putIntoGetSet(get, Optional.empty()); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult txResult = prepareResult(ANY_ID); Get getWithProjections = @@ -880,9 +1253,9 @@ public void toSerializableWithExtraRead_ScanSetNotChanged_ShouldProcessWithoutEx Put put = preparePut(); TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); - snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonMap(key, txResult)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(key, Optional.of(txResult)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, txResult)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Scanner scanner = mock(Scanner.class); when(scanner.iterator()).thenReturn(Collections.singletonList((Result) txResult).iterator()); @@ -908,9 +1281,9 @@ public void toSerializableWithExtraRead_ScanSetUpdated_ShouldThrowValidationConf Put put = preparePut(); TransactionResult txResult = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, txResult); - snapshot.put(key, Optional.of(txResult)); - snapshot.put(scan, Collections.singletonMap(key, txResult)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(key, Optional.of(txResult)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, txResult)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult changedTxResult = prepareResult(ANY_ID + "x"); Scanner scanner = mock(Scanner.class); @@ -938,8 +1311,8 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Scan scan = prepareScan(); Put put = preparePut(); TransactionResult result = prepareResult(ANY_ID + "x"); - snapshot.put(scan, Collections.emptyMap()); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); TransactionResult txResult = new TransactionResult(result); Scanner scanner = mock(Scanner.class); @@ -1009,10 +1382,10 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Snapshot.Key key1 = new Snapshot.Key(scan1, result1); Snapshot.Key key2 = new Snapshot.Key(scan2, result2); - snapshot.put(scan1, Collections.singletonMap(key1, new TransactionResult(result1))); - snapshot.put(scan2, Collections.singletonMap(key2, new TransactionResult(result2))); - snapshot.put(key1, Optional.of(new TransactionResult(result1))); - snapshot.put(key2, Optional.of(new TransactionResult(result2))); + snapshot.putIntoScanSet(scan1, Collections.singletonMap(key1, new TransactionResult(result1))); + snapshot.putIntoScanSet(scan2, Collections.singletonMap(key2, new TransactionResult(result2))); + snapshot.putIntoReadSet(key1, Optional.of(new TransactionResult(result1))); + snapshot.putIntoReadSet(key2, Optional.of(new TransactionResult(result2))); DistributedStorage storage = mock(DistributedStorage.class); @@ -1054,9 +1427,9 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Put put = preparePut(); TransactionResult result = prepareResultWithNullMetadata(); TransactionResult txResult = new TransactionResult(result); - snapshot.put(new Snapshot.Key(get), Optional.of(result)); - snapshot.put(get, Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.of(result)); + snapshot.putIntoGetSet(get, Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = Get.newBuilder(get).projections(Attribute.ID, Attribute.VERSION).build(); @@ -1079,9 +1452,9 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Put put = preparePut(); TransactionResult result = prepareResultWithNullMetadata(); TransactionResult changedResult = prepareResult(ANY_ID); - snapshot.put(new Snapshot.Key(get), Optional.of(result)); - snapshot.put(get, Optional.of(result)); - snapshot.put(new Snapshot.Key(put), put); + snapshot.putIntoReadSet(new Snapshot.Key(get), Optional.of(result)); + snapshot.putIntoGetSet(get, Optional.of(result)); + snapshot.putIntoWriteSet(new Snapshot.Key(put), put); DistributedStorage storage = mock(DistributedStorage.class); Get getWithProjections = Get.newBuilder(get).projections(Attribute.ID, Attribute.VERSION).build(); @@ -1104,8 +1477,8 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon Scan scan = prepareScan(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(key, Optional.of(result)); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoReadSet(key, Optional.of(result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); DistributedStorage storage = mock(DistributedStorage.class); Scan scanWithProjections = Scan.newBuilder(scan) @@ -1123,83 +1496,6 @@ public void toSerializableWithExtraRead_ScanSetExtended_ShouldThrowValidationCon verify(storage).scan(scanWithProjections); } - @Test - public void put_DeleteGivenAfterPut_PutSupercedesDelete() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Put put = preparePut(); - Snapshot.Key putKey = new Snapshot.Key(preparePut()); - snapshot.put(putKey, put); - - Delete delete = prepareDelete(); - Snapshot.Key deleteKey = new Snapshot.Key(prepareDelete()); - - // Act - snapshot.put(deleteKey, delete); - - // Assert - assertThat(writeSet.size()).isEqualTo(0); - assertThat(deleteSet.size()).isEqualTo(1); - assertThat(deleteSet.get(deleteKey)).isEqualTo(delete); - } - - @Test - public void put_PutGivenAfterDelete_ShouldThrowIllegalArgumentException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Delete delete = prepareDelete(); - Snapshot.Key deleteKey = new Snapshot.Key(prepareDelete()); - snapshot.put(deleteKey, delete); - - Put put = preparePut(); - Snapshot.Key putKey = new Snapshot.Key(preparePut()); - - // Act Assert - assertThatThrownBy(() -> snapshot.put(putKey, put)) - .isInstanceOf(IllegalArgumentException.class); - } - - @Test - public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - // "text2" - Put put = preparePut(); - Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); - Scan scan = - new Scan(new Key(ANY_NAME_1, ANY_TEXT_1)) - // ["text3", "text4"] - .withStart(new Key(ANY_NAME_2, ANY_TEXT_3), true) - .withEnd(new Key(ANY_NAME_2, ANY_TEXT_4), true) - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME) - .forTable(ANY_TABLE_NAME); - - // Act Assert - Throwable thrown = catchThrowable(() -> snapshot.get(scan)); - - // Assert - assertThat(thrown).doesNotThrowAnyException(); - } - - @Test - public void - get_ScanGivenAndPutWithSamePartitionKeyWithoutClusteringKeyInWriteSet_ShouldNotThrowException() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Put put = preparePutWithPartitionKeyOnly(); - Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); - Scan scan = prepareScan(); - - // Act Assert - Throwable thrown = catchThrowable(() -> snapshot.get(scan)); - - // Assert - assertThat(thrown).doesNotThrowAnyException(); - } - @Test public void verify_ScanGivenAndPutKeyAlreadyPresentInScanSet_ShouldThrowIllegalArgumentException() { @@ -1207,12 +1503,12 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc snapshot = prepareSnapshot(Isolation.SNAPSHOT); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = prepareScan(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(key, Optional.of(result)); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoReadSet(key, Optional.of(result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1228,9 +1524,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc snapshot = prepareSnapshot(Isolation.SNAPSHOT); Put put = preparePutWithPartitionKeyOnly(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = prepareScan(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1247,14 +1543,14 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // "text2" Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = new Scan(new Key(ANY_NAME_1, ANY_TEXT_1)) // (-infinite, infinite) .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1270,7 +1566,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc snapshot = prepareSnapshot(Isolation.SNAPSHOT); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder() .namespace(ANY_NAMESPACE_NAME) @@ -1279,7 +1575,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .consistency(Consistency.LINEARIZABLE) .where(ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_4)) .build(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1296,7 +1592,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // "text2" Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan1 = prepareScan() // ["text1", "text3"] @@ -1322,11 +1618,11 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // ["text1", "text2") .withStart(new Key(ANY_NAME_2, ANY_TEXT_1), true) .withEnd(new Key(ANY_NAME_2, ANY_TEXT_2), false); - snapshot.put(scan1, Collections.emptyMap()); - snapshot.put(scan2, Collections.emptyMap()); - snapshot.put(scan3, Collections.emptyMap()); - snapshot.put(scan4, Collections.emptyMap()); - snapshot.put(scan5, Collections.emptyMap()); + snapshot.putIntoScanSet(scan1, Collections.emptyMap()); + snapshot.putIntoScanSet(scan2, Collections.emptyMap()); + snapshot.putIntoScanSet(scan3, Collections.emptyMap()); + snapshot.putIntoScanSet(scan4, Collections.emptyMap()); + snapshot.putIntoScanSet(scan5, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1351,7 +1647,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // "text2" Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan1 = new Scan(new Key(ANY_NAME_1, ANY_TEXT_1)) // (-infinite, "text3"] @@ -1373,9 +1669,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); - snapshot.put(scan1, Collections.emptyMap()); - snapshot.put(scan2, Collections.emptyMap()); - snapshot.put(scan3, Collections.emptyMap()); + snapshot.putIntoScanSet(scan1, Collections.emptyMap()); + snapshot.putIntoScanSet(scan2, Collections.emptyMap()); + snapshot.putIntoScanSet(scan3, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1396,7 +1692,7 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc // "text2" Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan1 = new Scan(new Key(ANY_NAME_1, ANY_TEXT_1)) // ["text1", infinite) @@ -1418,9 +1714,9 @@ public void get_ScanGivenAndPutInWriteSetNotOverlappedWithScan_ShouldNotThrowExc .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); - snapshot.put(scan1, Collections.emptyMap()); - snapshot.put(scan2, Collections.emptyMap()); - snapshot.put(scan3, Collections.emptyMap()); + snapshot.putIntoScanSet(scan1, Collections.emptyMap()); + snapshot.putIntoScanSet(scan2, Collections.emptyMap()); + snapshot.putIntoScanSet(scan3, Collections.emptyMap()); // Act Assert Throwable thrown1 = catchThrowable(() -> snapshot.verify(scan1)); @@ -1439,7 +1735,7 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInSameTable_ShouldThrowExce snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder() .namespace(ANY_NAMESPACE_NAME) @@ -1448,7 +1744,7 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInSameTable_ShouldThrowExce .build(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1469,7 +1765,7 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInDifferentTable_ShouldNotT .clusteringKey(Key.ofText(ANY_NAME_2, ANY_TEXT_2)) .build(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder() .namespace(ANY_NAMESPACE_NAME) @@ -1478,7 +1774,7 @@ public void verify_ScanWithIndexGivenAndPutInWriteSetInDifferentTable_ShouldNotT .build(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1509,8 +1805,8 @@ public void verify_ScanWithIndexAndPutWithSameIndexKeyGiven_ShouldThrowException .build(); Snapshot.Key putKey1 = new Snapshot.Key(put1); Snapshot.Key putKey2 = new Snapshot.Key(put2); - snapshot.put(putKey1, put1); - snapshot.put(putKey2, put2); + snapshot.putIntoWriteSet(putKey1, put1); + snapshot.putIntoWriteSet(putKey2, put2); Scan scan = Scan.newBuilder() .namespace(ANY_NAMESPACE_NAME) @@ -1519,7 +1815,7 @@ public void verify_ScanWithIndexAndPutWithSameIndexKeyGiven_ShouldThrowException .build(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1552,8 +1848,8 @@ public void verify_ScanWithIndexAndPutWithSameIndexKeyGiven_ShouldThrowException .build(); Snapshot.Key putKey1 = new Snapshot.Key(put1); Snapshot.Key putKey2 = new Snapshot.Key(put2); - snapshot.put(putKey1, put1); - snapshot.put(putKey2, put2); + snapshot.putIntoWriteSet(putKey1, put1); + snapshot.putIntoWriteSet(putKey2, put2); Scan scan = Scan.newBuilder() .namespace(ANY_NAMESPACE_NAME) @@ -1563,7 +1859,7 @@ public void verify_ScanWithIndexAndPutWithSameIndexKeyGiven_ShouldThrowException .build(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1579,7 +1875,7 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( // "text2" Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); ScanAll scanAll = new ScanAll() .withConsistency(Consistency.LINEARIZABLE) @@ -1587,7 +1883,7 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( .forTable(ANY_TABLE_NAME); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scanAll, result); - snapshot.put(scanAll, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scanAll, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scanAll)); @@ -1604,7 +1900,7 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( // "text2" Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); ScanAll scanAll = new ScanAll() .withConsistency(Consistency.LINEARIZABLE) @@ -1612,7 +1908,7 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( .forTable(ANY_TABLE_NAME_2); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scanAll, result); - snapshot.put(scanAll, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scanAll, Collections.singletonMap(key, result)); // Act Assert Throwable thrown = catchThrowable(() -> snapshot.verify(scanAll)); @@ -1621,60 +1917,17 @@ public void verify_ScanAllGivenAndPutInWriteSetInSameTable_ShouldThrowException( assertThat(thrown).doesNotThrowAnyException(); } - @Test - public void get_GetGivenAndAlreadyPresentInGetSet_ShouldReturnResult() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - Get get = prepareGet(); - TransactionResult expected = prepareResult(ANY_ID); - snapshot.put(get, Optional.of(expected)); - - // Act - Optional actual = snapshot.get(get); - - // Assert - assertThat(actual).isPresent(); - assertThat(actual.get()).isEqualTo(expected); - } - - @Test - public void get_ScanAllGivenAndAlreadyPresentInScanSet_ShouldReturnKeys() { - // Arrange - snapshot = prepareSnapshot(Isolation.SNAPSHOT); - // "text2" - Put put = preparePut(); - Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); - - ScanAll scanAll = - new ScanAll() - .withConsistency(Consistency.LINEARIZABLE) - .forNamespace(ANY_NAMESPACE_NAME_2) - .forTable(ANY_TABLE_NAME_2); - TransactionResult result = prepareResult(ANY_ID); - Snapshot.Key key = new Snapshot.Key(scanAll, result); - snapshot.put(scanAll, Collections.singletonMap(key, result)); - - // Act Assert - Optional> results = snapshot.get(scanAll); - - // Assert - assertThat(results).isNotEmpty(); - assertThat(results.get()).containsKey(key); - assertThat(results.get().get(key)).isEqualTo(result); - } - @Test public void verify_CrossPartitionScanGivenAndPutInSameTable_ShouldThrowException() { // Arrange snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = prepareCrossPartitionScan(); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1689,11 +1942,11 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentNamespace_ShouldNotTh snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = prepareCrossPartitionScan(ANY_NAMESPACE_NAME_2, ANY_TABLE_NAME); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1708,11 +1961,11 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = prepareCrossPartitionScan(ANY_NAMESPACE_NAME, ANY_TABLE_NAME_2); TransactionResult result = prepareResult(ANY_ID); Snapshot.Key key = new Snapshot.Key(scan, result); - snapshot.put(scan, Collections.singletonMap(key, result)); + snapshot.putIntoScanSet(scan, Collections.singletonMap(key, result)); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1728,7 +1981,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePutWithIntColumns(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder(prepareCrossPartitionScan()) .clearConditions() @@ -1746,7 +1999,7 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE ConditionBuilder.column(ANY_NAME_8).isNullInt())) .build()) .build(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1762,14 +2015,14 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder(prepareCrossPartitionScan()) .clearConditions() .where(ConditionBuilder.column(ANY_NAME_3).isEqualToText(ANY_TEXT_1)) .or(ConditionBuilder.column(ANY_NAME_4).isEqualToText(ANY_TEXT_4)) .build(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1785,14 +2038,14 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder(prepareCrossPartitionScan()) .clearConditions() .where(ConditionBuilder.column(ANY_NAME_3).isLikeText("text%")) .and(ConditionBuilder.column(ANY_NAME_4).isNotLikeText("text")) .build(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1808,14 +2061,14 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePut(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder(prepareCrossPartitionScan()) .clearConditions() .where(ConditionBuilder.column(ANY_NAME_4).isEqualToText(ANY_TEXT_1)) .or(ConditionBuilder.column(ANY_NAME_5).isEqualToText(ANY_TEXT_1)) .build(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1831,9 +2084,9 @@ public void verify_CrossPartitionScanGivenAndPutInDifferentTable_ShouldNotThrowE snapshot = prepareSnapshot(Isolation.SERIALIZABLE, SerializableStrategy.EXTRA_READ); Put put = preparePutWithIntColumns(); Snapshot.Key putKey = new Snapshot.Key(put); - snapshot.put(putKey, put); + snapshot.putIntoWriteSet(putKey, put); Scan scan = Scan.newBuilder(prepareCrossPartitionScan()).clearConditions().build(); - snapshot.put(scan, Collections.emptyMap()); + snapshot.putIntoScanSet(scan, Collections.emptyMap()); // Act Throwable thrown = catchThrowable(() -> snapshot.verify(scan)); @@ -1850,18 +2103,18 @@ void getReadWriteSet_ReadSetAndWriteSetGiven_ShouldReturnProperValue() { Get get1 = prepareGet(); TransactionResult result1 = prepareResult("t1"); Snapshot.Key readKey1 = new Snapshot.Key(get1); - snapshot.put(readKey1, Optional.of(result1)); + snapshot.putIntoReadSet(readKey1, Optional.of(result1)); Get get2 = prepareAnotherGet(); TransactionResult result2 = prepareResult("t2"); Snapshot.Key readKey2 = new Snapshot.Key(get2); - snapshot.put(readKey2, Optional.of(result2)); + snapshot.putIntoReadSet(readKey2, Optional.of(result2)); Put put1 = preparePut(); Snapshot.Key putKey1 = new Snapshot.Key(put1); - snapshot.put(putKey1, put1); + snapshot.putIntoWriteSet(putKey1, put1); Put put2 = prepareAnotherPut(); Snapshot.Key putKey2 = new Snapshot.Key(put2); - snapshot.put(putKey2, put2); + snapshot.putIntoWriteSet(putKey2, put2); // Act ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); @@ -1873,7 +2126,7 @@ void getReadWriteSet_ReadSetAndWriteSetGiven_ShouldReturnProperValue() { .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); TransactionResult delayedResult = prepareResult("t3"); - snapshot.put(new Snapshot.Key(delayedGet), Optional.of(delayedResult)); + snapshot.putIntoReadSet(new Snapshot.Key(delayedGet), Optional.of(delayedResult)); Put delayedPut = new Put(new Key(ANY_NAME_1, ANY_TEXT_2), new Key(ANY_NAME_2, ANY_TEXT_1)) @@ -1881,7 +2134,7 @@ void getReadWriteSet_ReadSetAndWriteSetGiven_ShouldReturnProperValue() { .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME) .withValue(ANY_NAME_3, ANY_TEXT_3); - snapshot.put(new Snapshot.Key(delayedPut), delayedPut); + snapshot.putIntoWriteSet(new Snapshot.Key(delayedPut), delayedPut); } // Assert @@ -1917,18 +2170,18 @@ void getReadWriteSet_ReadSetAndDeleteSetGiven_ShouldReturnProperValue() { Get get1 = prepareGet(); TransactionResult result1 = prepareResult("t1"); Snapshot.Key readKey1 = new Snapshot.Key(get1); - snapshot.put(readKey1, Optional.of(result1)); + snapshot.putIntoReadSet(readKey1, Optional.of(result1)); Get get2 = prepareAnotherGet(); TransactionResult result2 = prepareResult("t2"); Snapshot.Key readKey2 = new Snapshot.Key(get2); - snapshot.put(readKey2, Optional.of(result2)); + snapshot.putIntoReadSet(readKey2, Optional.of(result2)); Delete delete1 = prepareDelete(); Snapshot.Key deleteKey1 = new Snapshot.Key(delete1); - snapshot.put(deleteKey1, delete1); + snapshot.putIntoDeleteSet(deleteKey1, delete1); Delete delete2 = prepareAnotherDelete(); Snapshot.Key deleteKey2 = new Snapshot.Key(delete2); - snapshot.put(deleteKey2, delete2); + snapshot.putIntoDeleteSet(deleteKey2, delete2); // Act ReadWriteSets readWriteSets = snapshot.getReadWriteSets(); @@ -1940,14 +2193,14 @@ void getReadWriteSet_ReadSetAndDeleteSetGiven_ShouldReturnProperValue() { .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); TransactionResult delayedResult = prepareResult("t3"); - snapshot.put(new Snapshot.Key(delayedGet), Optional.of(delayedResult)); + snapshot.putIntoReadSet(new Snapshot.Key(delayedGet), Optional.of(delayedResult)); Delete delayedDelete = new Delete(new Key(ANY_NAME_1, ANY_TEXT_2), new Key(ANY_NAME_2, ANY_TEXT_1)) .withConsistency(Consistency.LINEARIZABLE) .forNamespace(ANY_NAMESPACE_NAME) .forTable(ANY_TABLE_NAME); - snapshot.put(new Snapshot.Key(delayedDelete), delayedDelete); + snapshot.putIntoDeleteSet(new Snapshot.Key(delayedDelete), delayedDelete); } // Assert diff --git a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java index 49b6d86ce0..bf5abaae05 100644 --- a/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java +++ b/integration-test/src/main/java/com/scalar/db/transaction/consensuscommit/ConsensusCommitIntegrationTestBase.java @@ -1,7 +1,21 @@ package com.scalar.db.transaction.consensuscommit; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.scalar.db.api.Delete; +import com.scalar.db.api.DistributedTransaction; import com.scalar.db.api.DistributedTransactionIntegrationTestBase; +import com.scalar.db.api.Insert; +import com.scalar.db.api.Result; +import com.scalar.db.api.Update; +import com.scalar.db.api.Upsert; +import com.scalar.db.exception.transaction.CommitConflictException; +import com.scalar.db.exception.transaction.TransactionException; +import com.scalar.db.io.Key; +import java.util.Optional; import java.util.Properties; +import org.junit.jupiter.api.Test; public abstract class ConsensusCommitIntegrationTestBase extends DistributedTransactionIntegrationTestBase { @@ -23,4 +37,896 @@ protected final Properties getProperties(String testName) { } protected abstract Properties getProps(String testName); + + @Test + public void + insertAndInsert_forSameRecord_whenRecordNotExists_shouldThrowIllegalArgumentExceptionOnSecondInsert() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + assertThatThrownBy( + () -> + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void + insertAndInsert_forSameRecord_whenRecordExists_shouldThrowIllegalArgumentExceptionOnSecondInsert() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + assertThatThrownBy( + () -> + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void insertAndUpsert_forSameRecord_whenRecordNotExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void + insertAndUpsert_forSameRecord_whenRecordExists_shouldThrowCommitConflictExceptionOnCommit() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void insertAndUpdate_forSameRecord_whenRecordNotExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void + insertAndUpdate_forSameRecord_whenRecordExists_shouldThrowCommitConflictExceptionOnCommit() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + assertThatThrownBy(transaction::commit).isInstanceOf(CommitConflictException.class); + } + + @Test + public void + insertAndDelete_forSameRecord_whenRecordNotExists_shouldThrowIllegalArgumentExceptionOnDelete() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + assertThatThrownBy( + () -> + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void + insertAndDelete_forSameRecord_whenRecordExists_shouldThrowIllegalArgumentExceptionOnDelete() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + assertThatThrownBy( + () -> + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void upsertAndInsert_forSameRecord_shouldThrowIllegalArgumentExceptionOnInsert() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + assertThatThrownBy( + () -> + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void upsertAndUpsert_forSameRecord_shouldWorkCorrectly() throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void upsertAndUpdate_forSameRecord_shouldWorkCorrectly() throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void upsertAndDelete_forSameRecord_shouldWorkCorrectly() throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isNotPresent(); + } + + @Test + public void updateAndInsert_forSameRecord_whenRecordNotExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void + updateAndInsert_forSameRecord_whenRecordExists_shouldThrowIllegalArgumentExceptionOnInsert() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + assertThatThrownBy( + () -> + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void updateAndUpsert_forSameRecord_whenRecordNotExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void updateAndUpsert_forSameRecord_whenRecordExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void updateAndUpdate_forSameRecord_whenRecordNotExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isNotPresent(); + } + + @Test + public void updateAndUpdate_forSameRecord_whenRecordExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + int expectedBalance = 100; + int expectedSomeColumn = 200; + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, expectedBalance) + .intValue(SOME_COLUMN, expectedSomeColumn) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isPresent(); + Result result = optResult.get(); + assertThat(result.getInt(ACCOUNT_ID)).isEqualTo(0); + assertThat(result.getInt(ACCOUNT_TYPE)).isEqualTo(0); + assertThat(result.getInt(BALANCE)).isEqualTo(expectedBalance); + assertThat(result.getInt(SOME_COLUMN)).isEqualTo(expectedSomeColumn); + } + + @Test + public void updateAndDelete_forSameRecord_whenRecordNotExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isNotPresent(); + } + + @Test + public void updateAndDelete_forSameRecord_whenRecordExists_shouldWorkCorrectly() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isNotPresent(); + } + + @Test + public void + deleteAndInsert_forSameRecord_whenRecordExists_shouldThrowIllegalArgumentExceptionOnInsert() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + assertThatThrownBy( + () -> + transaction.insert( + Insert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void + deleteAndUpsert_forSameRecord_whenRecordExists_shouldThrowIllegalArgumentExceptionOnUpsert() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act Assert + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + assertThatThrownBy( + () -> + transaction.upsert( + Upsert.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build())) + .isInstanceOf(IllegalArgumentException.class); + + transaction.rollback(); + } + + @Test + public void deleteAndUpdate_forSameRecord_whenRecordExists_shouldDoNothing() + throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + transaction.update( + Update.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .intValue(BALANCE, INITIAL_BALANCE) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isNotPresent(); + } + + @Test + public void deleteAndDelete_forSameRecord_shouldWorkCorrectly() throws TransactionException { + // Arrange + put(preparePut(0, 0)); + + Key partitionKey = Key.ofInt(ACCOUNT_ID, 0); + Key clusteringKey = Key.ofInt(ACCOUNT_TYPE, 0); + + DistributedTransaction transaction = manager.start(); + + // Act + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + transaction.delete( + Delete.newBuilder() + .namespace(namespace) + .table(TABLE) + .partitionKey(partitionKey) + .clusteringKey(clusteringKey) + .build()); + + transaction.commit(); + + // Assert + Optional optResult = get(prepareGet(0, 0)); + assertThat(optResult).isNotPresent(); + } }