Skip to content

Commit

Permalink
Optimize Coordinator#getStateForGroupCommit() by first looking up t…
Browse files Browse the repository at this point in the history
…he coordinator table with parent transaction ID (#2331)
  • Loading branch information
komamitsu committed Nov 14, 2024
1 parent 4fe9471 commit 2efc63e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,41 +90,30 @@ public Optional<Coordinator.State> getState(String id) throws CoordinatorExcepti

@VisibleForTesting
Optional<Coordinator.State> getStateForGroupCommit(String fullId) throws CoordinatorException {
// Reading a coordinator state is likely to occur during lazy recovery, as follows:
// 1. Transaction T1 starts and creates PREPARED state records but hasn't committed or aborted
// yet.
// 2. Transaction T2 starts and reads the PREPARED state records created by T1.
// 3. T2 reads the coordinator table record for T1 to decide whether to roll back or roll
// forward.
//
// The likelihood of step 2 would increase if T1 is delayed.
//
// With the group commit feature enabled, delayed transactions are isolated from a normal group
// that is looked up by a parent ID into a delayed group that is looked up by a full ID.
// Therefore, looking up with the full transaction ID should be tried first to minimize read
// operations as much as possible.

// Scan with the full ID for a delayed group that contains only a single transaction.
// The normal lookup logic can be used as is.
Optional<State> stateOfDelayedTxn = get(createGetWith(fullId));
if (stateOfDelayedTxn.isPresent()) {
return stateOfDelayedTxn;
}

// Scan with the parent ID for a normal group that contains multiple transactions.
Keys<String, String, String> idForGroupCommit = keyManipulator.keysFromFullKey(fullId);

String parentId = idForGroupCommit.parentKey;
String childId = idForGroupCommit.childKey;
Get get = createGetWith(parentId);
Optional<State> state = get(get);
return state.flatMap(
s -> {
if (s.getChildIds().contains(childId)) {
return state;
}
return Optional.empty();
});
// The current implementation is optimized for cases where most transactions are
// group-committed. It first looks up a transaction state using the parent ID with a single read
// operation. If no matching transaction state is found (i.e., the transaction was delayed and
// committed individually), it issues an additional read operation using the full ID.
Optional<State> stateContainingTargetTxId =
state.flatMap(
s -> {
if (s.getChildIds().contains(childId)) {
return state;
}
return Optional.empty();
});
if (stateContainingTargetTxId.isPresent()) {
return stateContainingTargetTxId;
}

return get(createGetWith(fullId));
}

public void putState(Coordinator.State state) throws CoordinatorException {
Expand Down Expand Up @@ -253,7 +242,8 @@ private void putStateForLazyRecoveryRollbackForGroupCommit(String id)
putState(new Coordinator.State(id, TransactionState.ABORTED));
}

private Get createGetWith(String id) {
@VisibleForTesting
Get createGetWith(String id) {
return new Get(new Key(Attribute.toIdValue(id)))
.withConsistency(Consistency.LINEARIZABLE)
.forNamespace(coordinatorNamespace)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,19 +264,9 @@ public void getState_TransactionIdForGroupCommitGivenAndParentIdAndChildIdMatch_
// The IDs used to find the state are:
// - parentId:childId1
// - parentId:childId2
doReturn(
// For the first call,
// - The first get with the full ID shouldn't find a state.
Optional.empty(),
// - The second get with the parent ID should return the state.
Optional.of(resultForGroupCommitState),
// For the second call,
// - The first get with the full ID shouldn't find a state.
Optional.empty(),
// - The second get with the parent ID should return the state.
Optional.of(resultForGroupCommitState))
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(any(Get.class));
.get(coordinator.createGetWith(parentId));

// Act
Optional<Coordinator.State> state1 = spiedCoordinator.getState(fullId1);
Expand All @@ -291,9 +281,9 @@ public void getState_TransactionIdForGroupCommitGivenAndParentIdAndChildIdMatch_
assertThat(state1.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
verify(spiedCoordinator).getStateForGroupCommit(fullId1);
verify(spiedCoordinator).getStateForGroupCommit(fullId2);
verify(storage, times(4)).get(getArgumentCaptor.capture());
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Arrays.asList(fullId1, parentId, fullId2, parentId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, parentId));
}

@ParameterizedTest
Expand All @@ -310,6 +300,20 @@ public void getState_TransactionIdForSingleCommitGivenAndFullIdMatches_ShouldRet
String childId = UUID.randomUUID().toString();
String fullId = keyManipulator.fullKey(parentId, childId);
List<String> childIds = Collections.emptyList();
String dummyChildId1 = UUID.randomUUID().toString();
String dummyChildId2 = UUID.randomUUID().toString();
List<String> dummyChildIds = Arrays.asList(dummyChildId1, dummyChildId2);

Result resultForGroupCommitState = mock(Result.class);
when(resultForGroupCommitState.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, parentId)));
when(resultForGroupCommitState.getValue(Attribute.CHILD_IDS))
.thenReturn(
Optional.of(new TextValue(Attribute.CHILD_IDS, Joiner.on(',').join(dummyChildIds))));
when(resultForGroupCommitState.getValue(Attribute.STATE))
.thenReturn(Optional.of(new IntValue(Attribute.STATE, transactionState.get())));
when(resultForGroupCommitState.getValue(Attribute.CREATED_AT))
.thenReturn(Optional.of(new BigIntValue(Attribute.CREATED_AT, ANY_TIME_1)));

Result resultForSingleCommitState = mock(Result.class);
when(resultForSingleCommitState.getValue(Attribute.ID))
Expand All @@ -323,13 +327,18 @@ public void getState_TransactionIdForSingleCommitGivenAndFullIdMatches_ShouldRet

// Assuming these states exist:
//
// id | child_ids | state
// ------------------+-----------+----------
// parentId:childId | [] | COMMITTED
// id | child_ids | state
// ------------------+----------------------+----------
// parentId:childId | [childId1, childId2] | COMMITTED
//
// The IDs used to find the state are:
// - parentId:childId
doReturn(Optional.of(resultForSingleCommitState)).when(storage).get(any(Get.class));
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(coordinator.createGetWith(parentId));
doReturn(Optional.of(resultForSingleCommitState))
.when(storage)
.get(coordinator.createGetWith(fullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(fullId);
Expand All @@ -341,9 +350,9 @@ public void getState_TransactionIdForSingleCommitGivenAndFullIdMatches_ShouldRet
Assertions.assertThat(state.get().getState()).isEqualTo(transactionState);
assertThat(state.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
verify(spiedCoordinator).getStateForGroupCommit(fullId);
verify(storage).get(getArgumentCaptor.capture());
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Collections.singletonList(fullId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, fullId));
}

@ParameterizedTest
Expand Down Expand Up @@ -381,14 +390,11 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
//
// The IDs used to find the state are:
// - parentId:childIdX
doReturn(
// The first get with the full ID should return empty.
Optional.empty(),
// The second get with the parent ID should return a state, but it doesn't contain the
// child ID.
Optional.of(resultForGroupCommitState))
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(any(Get.class));
.get(coordinator.createGetWith(parentId));

doReturn(Optional.empty()).when(storage).get(coordinator.createGetWith(targetFullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(targetFullId);
Expand All @@ -398,7 +404,7 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
verify(spiedCoordinator).getStateForGroupCommit(targetFullId);
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Arrays.asList(targetFullId, parentId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, targetFullId));
}

@ParameterizedTest
Expand All @@ -413,11 +419,23 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
CoordinatorGroupCommitKeyManipulator keyManipulator =
new CoordinatorGroupCommitKeyManipulator();
String parentId = keyManipulator.generateParentKey();
List<String> childIds =
Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString());

// Look up with the same parent ID and a wrong child ID.
// But the full ID matches the single committed state.
String targetFullId = keyManipulator.fullKey(parentId, UUID.randomUUID().toString());

Result resultForGroupCommitState = mock(Result.class);
when(resultForGroupCommitState.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, parentId)));
when(resultForGroupCommitState.getValue(Attribute.CHILD_IDS))
.thenReturn(Optional.of(new TextValue(Attribute.CHILD_IDS, Joiner.on(',').join(childIds))));
when(resultForGroupCommitState.getValue(Attribute.STATE))
.thenReturn(Optional.of(new IntValue(Attribute.STATE, transactionState.get())));
when(resultForGroupCommitState.getValue(Attribute.CREATED_AT))
.thenReturn(Optional.of(new BigIntValue(Attribute.CREATED_AT, ANY_TIME_1)));

Result resultForSingleCommitState = mock(Result.class);
when(resultForSingleCommitState.getValue(Attribute.ID))
.thenReturn(Optional.of(new TextValue(Attribute.ID, targetFullId)));
Expand All @@ -437,7 +455,12 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
//
// The IDs used to find the state are:
// - parentId:childIdX
doReturn(Optional.of(resultForSingleCommitState)).when(storage).get(any(Get.class));
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(coordinator.createGetWith(parentId));
doReturn(Optional.of(resultForSingleCommitState))
.when(storage)
.get(coordinator.createGetWith(targetFullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(targetFullId);
Expand All @@ -449,9 +472,9 @@ public void getState_TransactionIdForGroupCommitGivenAndOnlyParentIdMatches_Shou
Assertions.assertThat(state.get().getState()).isEqualTo(transactionState);
assertThat(state.get().getCreatedAt()).isEqualTo(ANY_TIME_1);
verify(spiedCoordinator).getStateForGroupCommit(targetFullId);
verify(storage).get(getArgumentCaptor.capture());
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Collections.singletonList(targetFullId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, targetFullId));
}

@ParameterizedTest
Expand Down Expand Up @@ -491,7 +514,10 @@ public void getState_TransactionIdGivenButNoIdMatches_ShouldReturnEmpty(
//
// The IDs used to find the state are:
// - parentId:childIdY
when(storage.get(any(Get.class))).thenReturn(Optional.empty());
doReturn(Optional.of(resultForGroupCommitState))
.when(storage)
.get(coordinator.createGetWith(parentId));
doReturn(Optional.empty()).when(storage).get(coordinator.createGetWith(targetFullId));

// Act
Optional<Coordinator.State> state = spiedCoordinator.getState(targetFullId);
Expand All @@ -501,7 +527,7 @@ public void getState_TransactionIdGivenButNoIdMatches_ShouldReturnEmpty(
verify(spiedCoordinator).getStateForGroupCommit(targetFullId);
verify(storage, times(2)).get(getArgumentCaptor.capture());
assertGetArgumentCaptorForGetState(
getArgumentCaptor.getAllValues(), Arrays.asList(targetFullId, parentId));
getArgumentCaptor.getAllValues(), Arrays.asList(parentId, targetFullId));
}

@Test
Expand Down

0 comments on commit 2efc63e

Please sign in to comment.