Skip to content

Commit

Permalink
Rename SnapshotHandler to PreparedSnapshotHook
Browse files Browse the repository at this point in the history
  • Loading branch information
komamitsu committed Oct 9, 2024
1 parent cfe0790 commit 0379139
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.scalar.db.exception.transaction.ValidationException;
import com.scalar.db.transaction.consensuscommit.Coordinator.State;
import com.scalar.db.transaction.consensuscommit.ParallelExecutor.ParallelExecutorTask;
import com.scalar.db.transaction.consensuscommit.SnapshotHandler.SnapshotHandleFuture;
import com.scalar.db.transaction.consensuscommit.PreparedSnapshotHook.PreparedSnapshotHookFuture;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -37,7 +37,7 @@ public class CommitHandler {
private final TransactionTableMetadataManager tableMetadataManager;
private final ParallelExecutor parallelExecutor;

@LazyInit @Nullable private SnapshotHandler snapshotHandler;
@LazyInit @Nullable private PreparedSnapshotHook preparedSnapshotHook;

@SuppressFBWarnings("EI_EXPOSE_REP2")
public CommitHandler(
Expand All @@ -56,9 +56,9 @@ protected void onPrepareFailure(Snapshot snapshot) {}
protected void onValidateFailure(Snapshot snapshot) {}

public void commit(Snapshot snapshot) throws CommitException, UnknownTransactionStatusException {
Optional<SnapshotHandleFuture> snapshotHandleFuture;
Optional<PreparedSnapshotHookFuture> snapshotHookFuture;
try {
snapshotHandleFuture = prepare(snapshot);
snapshotHookFuture = prepare(snapshot);
} catch (PreparationException e) {
abortState(snapshot.getId());
rollbackRecords(snapshot);
Expand All @@ -85,7 +85,7 @@ public void commit(Snapshot snapshot) throws CommitException, UnknownTransaction
throw e;
}

snapshotHandleFuture.ifPresent(SnapshotHandleFuture::get);
snapshotHookFuture.ifPresent(PreparedSnapshotHookFuture::get);

commitState(snapshot);
commitRecords(snapshot);
Expand Down Expand Up @@ -118,7 +118,8 @@ protected void handleCommitConflict(Snapshot snapshot, Exception cause)
}
}

public Optional<SnapshotHandleFuture> prepare(Snapshot snapshot) throws PreparationException {
public Optional<PreparedSnapshotHookFuture> prepare(Snapshot snapshot)
throws PreparationException {
try {
return prepareRecords(snapshot);
} catch (NoMutationException e) {
Expand All @@ -135,14 +136,14 @@ public Optional<SnapshotHandleFuture> prepare(Snapshot snapshot) throws Preparat
}
}

private Optional<SnapshotHandleFuture> prepareRecords(Snapshot snapshot)
private Optional<PreparedSnapshotHookFuture> prepareRecords(Snapshot snapshot)
throws ExecutionException, PreparationConflictException {

Optional<SnapshotHandleFuture> snapshotHandleFuture;
if (snapshotHandler == null) {
snapshotHandleFuture = Optional.empty();
Optional<PreparedSnapshotHookFuture> snapshotHookFuture;
if (preparedSnapshotHook == null) {
snapshotHookFuture = Optional.empty();
} else {
snapshotHandleFuture = Optional.of(snapshotHandler.handle(tableMetadataManager, snapshot));
snapshotHookFuture = Optional.of(preparedSnapshotHook.handle(tableMetadataManager, snapshot));
}
PrepareMutationComposer composer =
new PrepareMutationComposer(snapshot.getId(), tableMetadataManager);
Expand All @@ -156,7 +157,7 @@ private Optional<SnapshotHandleFuture> prepareRecords(Snapshot snapshot)
}
parallelExecutor.prepare(tasks, snapshot.getId());

return snapshotHandleFuture;
return snapshotHookFuture;
}

public void validate(Snapshot snapshot) throws ValidationException {
Expand Down Expand Up @@ -253,13 +254,13 @@ public void rollbackRecords(Snapshot snapshot) {
}

/**
* Sets the {@link SnapshotHandler}. This method must be called immediately after the constructor
* is invoked.
* Sets the {@link PreparedSnapshotHook}. This method must be called immediately after the
* constructor is invoked.
*
* @param snapshotHandler The snapshot handler to set.
* @param preparedSnapshotHook The snapshot hook to set.
* @throws NullPointerException If the argument is null.
*/
public void setSnapshotHandler(SnapshotHandler snapshotHandler) {
this.snapshotHandler = checkNotNull(snapshotHandler);
public void setPreparedSnapshotHook(PreparedSnapshotHook preparedSnapshotHook) {
this.preparedSnapshotHook = checkNotNull(preparedSnapshotHook);
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.scalar.db.transaction.consensuscommit;

public interface SnapshotHandler {
SnapshotHandleFuture handle(
public interface PreparedSnapshotHook {
PreparedSnapshotHookFuture handle(
TransactionTableMetadataManager transactionTableMetadataManager, Snapshot snapshot);

interface SnapshotHandleFuture {
interface PreparedSnapshotHookFuture {
void get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void prepare() throws PreparationException {
}

try {
// SnapshotHandler is not supported in two-phase commit interface.
// PreparedSnapshotHook is not supported in two-phase commit interface.
commit
.prepare(crud.getSnapshot())
.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.scalar.db.exception.transaction.UnknownTransactionStatusException;
import com.scalar.db.exception.transaction.ValidationConflictException;
import com.scalar.db.io.Key;
import com.scalar.db.transaction.consensuscommit.SnapshotHandler.SnapshotHandleFuture;
import com.scalar.db.transaction.consensuscommit.PreparedSnapshotHook.PreparedSnapshotHookFuture;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
Expand Down Expand Up @@ -57,8 +57,8 @@ public class CommitHandlerTest {
@Mock protected Coordinator coordinator;
@Mock protected TransactionTableMetadataManager tableMetadataManager;
@Mock protected ConsensusCommitConfig config;
@Mock protected SnapshotHandler snapshotHandler;
@Mock protected SnapshotHandleFuture snapshotHandleFuture;
@Mock protected PreparedSnapshotHook preparedSnapshotHook;
@Mock protected PreparedSnapshotHookFuture preparedSnapshotHookFuture;

private CommitHandler handler;
protected ParallelExecutor parallelExecutor;
Expand Down Expand Up @@ -153,53 +153,52 @@ private Snapshot prepareSnapshotWithSamePartitionPut() {
return snapshot;
}

private void setSnapshotHandlerIfNeeded(boolean withSnapshotHandler) {
if (withSnapshotHandler) {
doReturn(snapshotHandleFuture).when(snapshotHandler).handle(any(), any());
handler.setSnapshotHandler(snapshotHandler);
private void setPreparedSnapshotHookIfNeeded(boolean withSnapshotHook) {
if (withSnapshotHook) {
doReturn(preparedSnapshotHookFuture).when(preparedSnapshotHook).handle(any(), any());
handler.setPreparedSnapshotHook(preparedSnapshotHook);
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void commit_SnapshotWithDifferentPartitionPutsGiven_ShouldCommitRespectively(
boolean withSnapshotHandler)
boolean withSnapshotHook)
throws CommitException, UnknownTransactionStatusException, ExecutionException,
CoordinatorException {
// Arrange
Snapshot snapshot = prepareSnapshotWithDifferentPartitionPut();
doNothing().when(storage).mutate(anyList());
doNothingWhenCoordinatorPutState();
setSnapshotHandlerIfNeeded(withSnapshotHandler);
setPreparedSnapshotHookIfNeeded(withSnapshotHook);

// Act
handler.commit(snapshot);

// Assert
verify(storage, times(4)).mutate(anyList());
verifyCoordinatorPutState(TransactionState.COMMITTED);
verifySnapshotHandler(withSnapshotHandler, snapshot);
verifySnapshotHook(withSnapshotHook, snapshot);
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(
boolean withSnapshotHandler)
public void commit_SnapshotWithSamePartitionPutsGiven_ShouldCommitAtOnce(boolean withSnapshotHook)
throws CommitException, UnknownTransactionStatusException, ExecutionException,
CoordinatorException {
// Arrange
Snapshot snapshot = prepareSnapshotWithSamePartitionPut();
doNothing().when(storage).mutate(anyList());
doNothingWhenCoordinatorPutState();
setSnapshotHandlerIfNeeded(withSnapshotHandler);
setPreparedSnapshotHookIfNeeded(withSnapshotHook);

// Act
handler.commit(snapshot);

// Assert
verify(storage, times(2)).mutate(anyList());
verifyCoordinatorPutState(TransactionState.COMMITTED);
verifySnapshotHandler(withSnapshotHandler, snapshot);
verifySnapshotHook(withSnapshotHook, snapshot);
}

@Test
Expand Down Expand Up @@ -644,24 +643,24 @@ public void commit_ExceptionThrownInCoordinatorCommit_ShouldThrowUnknown()
}

@Test
public void commit_SnapshotHandlerGiven_ShouldWaitSnapshotHandlerFinishesBeforeCommitState()
public void commit_SnapshotHookGiven_ShouldWaitSnapshotHookFinishesBeforeCommitState()
throws ExecutionException, CoordinatorException {
// Arrange
Snapshot snapshot = prepareSnapshotWithDifferentPartitionPut();
doNothing().when(storage).mutate(anyList());
doThrowExceptionWhenCoordinatorPutState(TransactionState.COMMITTED, CoordinatorException.class);
// Lambda can be spied...
SnapshotHandler delayedSnapshotHandler =
// Lambda can't be spied...
PreparedSnapshotHook delayedPreparedSnapshotHook =
spy(
new SnapshotHandler() {
new PreparedSnapshotHook() {
@Override
public SnapshotHandleFuture handle(
public PreparedSnapshotHookFuture handle(
TransactionTableMetadataManager tableMetadataManager, Snapshot ss) {
Uninterruptibles.sleepUninterruptibly(Duration.ofSeconds(2));
return snapshotHandleFuture;
return preparedSnapshotHookFuture;
}
});
handler.setSnapshotHandler(delayedSnapshotHandler);
handler.setPreparedSnapshotHook(delayedPreparedSnapshotHook);

// Act
Instant start = Instant.now();
Expand All @@ -673,8 +672,8 @@ public SnapshotHandleFuture handle(
verify(storage, times(2)).mutate(anyList());
verifyCoordinatorPutState(TransactionState.COMMITTED);
verify(handler, never()).rollbackRecords(snapshot);
verify(delayedSnapshotHandler).handle(tableMetadataManager, snapshot);
// This means `commit()` waited until `SnapshotHandler.handle()` is completed before throwing
verify(delayedPreparedSnapshotHook).handle(tableMetadataManager, snapshot);
// This means `commit()` waited until the callback was completed before throwing
// an exception from `commitState()`.
assertThat(Duration.between(start, end)).isGreaterThanOrEqualTo(Duration.ofSeconds(2));
}
Expand All @@ -695,11 +694,11 @@ protected void verifyCoordinatorPutState(TransactionState expectedTransactionSta
verify(coordinator).putState(new Coordinator.State(anyId(), expectedTransactionState));
}

private void verifySnapshotHandler(boolean withSnapshotHandler, Snapshot snapshot) {
if (withSnapshotHandler) {
verify(snapshotHandler).handle(eq(tableMetadataManager), eq(snapshot));
private void verifySnapshotHook(boolean withSnapshotHook, Snapshot snapshot) {
if (withSnapshotHook) {
verify(preparedSnapshotHook).handle(eq(tableMetadataManager), eq(snapshot));
} else {
verify(snapshotHandler, never()).handle(any(), any());
verify(preparedSnapshotHook, never()).handle(any(), any());
}
}
}

0 comments on commit 0379139

Please sign in to comment.