Skip to content

Commit

Permalink
Merge pull request #2404 from danhaywood/CAUSEWAY-3738
Browse files Browse the repository at this point in the history
Causeway 3738
  • Loading branch information
danhaywood authored May 10, 2024
2 parents 82c1d18 + 88736f4 commit 88bfe60
Showing 1 changed file with 97 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@

import javax.inject.Inject;

import org.apache.causeway.commons.functional.Try;
import org.apache.causeway.extensions.commandlog.applib.dom.CommandLogEntryRepository;

import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;

import org.springframework.dao.DeadlockLoserDataAccessException;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;

Expand Down Expand Up @@ -67,6 +70,9 @@
@Log4j2
public class RunBackgroundCommandsJob implements Job {

final static int RETRY_COUNT = 3;
final static long RETRY_INTERVAL_MILLIS = 1000;

@Inject InteractionService interactionService;
@Inject TransactionService transactionService;
@Inject ClockService clockService;
Expand All @@ -83,61 +89,106 @@ public void execute(final JobExecutionContext quartzContext) {
return;
}

UserMemento user = UserMemento.ofNameAndRoleNames("scheduler_user", "admin_role");
InteractionContext interactionContext = InteractionContext.builder().user(user).build();
val userMemento = UserMemento.ofNameAndRoleNames("scheduler_user", "admin_role");
val interactionContext = InteractionContext.builder().user(userMemento).build();

// we obtain the list of Commands first; we use their CommandDto as it is serializable across transactions
final Optional<List<CommandDto>> commandDtosIfAny =
interactionService.callAndCatch(interactionContext, () ->
transactionService.callTransactional(Propagation.REQUIRES_NEW, () ->
commandLogEntryRepository.findBackgroundAndNotYetStarted()
.stream()
.map(CommandLogEntry::getCommandDto)
.collect(Collectors.toList())
)
.ifFailureFail()
.valueAsNonNullElseFail()
)
.ifFailureFail() // we give up if unable to find these
.getValue();
final Optional<List<CommandDto>> commandDtosIfAny = pendingCommandDtos(interactionContext);

// for each command, we execute within its own transaction. Failure of one should not impact the next.
commandDtosIfAny.ifPresent(commandDtos -> {
for (val commandDto : commandDtos) {
interactionService.runAndCatch(interactionContext, () -> {
transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
// look up the CommandLogEntry again because we are within a new transaction.
val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));

// finally, we execute
commandLogEntryIfAny.ifPresent(commandLogEntry ->
{
commandExecutorService.executeCommand(
CommandExecutorService.InteractionContextPolicy.NO_SWITCH, commandDto);
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
});
})
.ifFailureFail();
})
.ifFailure(throwable -> {
log.error("Failed to execute command: " + CommandDtoUtils.dtoMapper().toString(commandDto), throwable);
// update this command as having failed.
interactionService.runAndCatch(interactionContext, () -> {
transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
// look up the CommandLogEntry again because we are within a new transaction.
val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));

// capture the error
commandLogEntryIfAny.ifPresent(commandLogEntry ->
{
commandLogEntry.setException(throwable);
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
});
});
});
executeCommandWithinOwnTransaction(commandDto, interactionContext);
}
});
}

private Optional<List<CommandDto>> pendingCommandDtos(final InteractionContext interactionContext) {
return interactionService.callAndCatch(interactionContext, () ->
transactionService.callTransactional(Propagation.REQUIRES_NEW, () ->
commandLogEntryRepository.findBackgroundAndNotYetStarted()
.stream()
.map(CommandLogEntry::getCommandDto)
.collect(Collectors.toList())
)
.ifFailureFail()
.valueAsNonNullElseFail()
)
.ifFailureFail() // we give up if unable to find these
.getValue();
}

private void executeCommandWithinOwnTransaction(
final CommandDto commandDto,
final InteractionContext interactionContext
) {
int retryCount = RETRY_COUNT;
while(retryCount > 0) {
Try<?> result = interactionService.runAndCatch(interactionContext, () -> {
executeCommandWithinOwnTransactionElseFail(commandDto);
});
if (isEncounteredDeadlock(result)) {
retryCount--;
log.debug("Deadlock occurred, retrying command: " + CommandDtoUtils.dtoMapper().toString(commandDto));
sleep(RETRY_INTERVAL_MILLIS);
} else {
retryCount=0; // ie break
result.ifFailure(throwable -> {
logAndCaptureFailure(throwable, commandDto, interactionContext);
});
}
}
}

private void executeCommandWithinOwnTransactionElseFail(CommandDto commandDto) {
transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
// look up the CommandLogEntry again because we are within a new transaction.
val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));

// finally, we execute
commandLogEntryIfAny.ifPresent(commandLogEntry ->
{
commandExecutorService.executeCommand(
CommandExecutorService.InteractionContextPolicy.NO_SWITCH, commandDto);
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
});
})
.ifFailureFail();
}

private void logAndCaptureFailure(Throwable throwable, CommandDto commandDto, InteractionContext interactionContext) {
log.error("Failed to execute command: " + CommandDtoUtils.dtoMapper().toString(commandDto), throwable);
// update this command as having failed.
interactionService.runAndCatch(interactionContext, () -> {
transactionService.runTransactional(Propagation.REQUIRES_NEW, () -> {
// look up the CommandLogEntry again because we are within a new transaction.
val commandLogEntryIfAny = commandLogEntryRepository.findByInteractionId(UUID.fromString(commandDto.getInteractionId()));

// capture the error
commandLogEntryIfAny.ifPresent(commandLogEntry ->
{
commandLogEntry.setException(throwable);
commandLogEntry.setCompletedAt(clockService.getClock().nowAsJavaSqlTimestamp());
});
});
});
}

private static boolean isEncounteredDeadlock(Try<?> result) {
if (!result.isFailure()) {
return false;
}
return result.getFailure()
.map(throwable -> throwable instanceof DeadlockLoserDataAccessException)
.orElse(false);
}

private static void sleep(long retryIntervalMs) {
try {
Thread.sleep(retryIntervalMs);
} catch (InterruptedException e) {
// do nothing - continue
}
}

}

0 comments on commit 88bfe60

Please sign in to comment.