Skip to content

Commit

Permalink
workflow executor
Browse files Browse the repository at this point in the history
  • Loading branch information
manuraf committed Jan 22, 2024
1 parent e6cf5de commit 8536088
Show file tree
Hide file tree
Showing 10 changed files with 340 additions and 357 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,37 @@
import com.microsoft.durabletask.azurefunctions.DurableClientContext;
import com.microsoft.durabletask.azurefunctions.DurableClientInput;
import com.microsoft.durabletask.azurefunctions.DurableOrchestrationTrigger;
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
import it.pagopa.selfcare.onboarding.common.WorkflowType;
import it.pagopa.selfcare.onboarding.config.RetryPolicyConfig;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.exception.ResourceNotFoundException;
import it.pagopa.selfcare.onboarding.functions.utils.SaveOnboardingStatusInput;
import it.pagopa.selfcare.onboarding.service.CompletionService;
import it.pagopa.selfcare.onboarding.service.OnboardingService;
import it.pagopa.selfcare.onboarding.workflow.*;

import java.time.Duration;
import java.util.Optional;

import static it.pagopa.selfcare.onboarding.functions.CommonFunctions.FORMAT_LOGGER_ONBOARDING_STRING;
import static it.pagopa.selfcare.onboarding.functions.CommonFunctions.SAVE_ONBOARDING_STATUS_ACTIVITY;
import static it.pagopa.selfcare.onboarding.utils.Utils.getOnboardingString;
import static it.pagopa.selfcare.onboarding.functions.utils.ActivityName.*;
import static it.pagopa.selfcare.onboarding.utils.Utils.readOnboardingValue;

/**
* Azure Functions with HTTP Trigger integrated with Quarkus
*/
public class OnboardingFunctions {
public static final String CREATED_NEW_ONBOARDING_ORCHESTRATION_WITH_INSTANCE_ID_MSG = "Created new Onboarding orchestration with instance ID = ";
public static final String SAVE_TOKEN_WITH_CONTRACT_ACTIVITY_NAME = "SaveTokenWithContract";
public static final String BUILD_CONTRACT_ACTIVITY_NAME = "BuildContract";
public static final String SEND_MAIL_REGISTRATION_WITH_CONTRACT_ACTIVITY = "SendMailRegistrationWithContract";

public static final String SEND_MAIL_REGISTRATION_WITH_CONTRACT_WHEN_APPROVE_ACTIVITY = "SendMailRegistrationWithContractWhenApprove";
public static final String SEND_MAIL_REGISTRATION_REQUEST_ACTIVITY = "SendMailRegistrationRequest";
public static final String SEND_MAIL_REGISTRATION_APPROVE_ACTIVITY = "SendMailRegistrationApprove";
public static final String SEND_MAIL_ONBOARDING_APPROVE_ACTIVITY = "SendMailOnboardingApprove";
public static final String SEND_MAIL_CONFIRMATION_ACTIVITY = "SendMailConfirmation";

private final OnboardingService service;
private final CompletionService completionService;

private final ObjectMapper objectMapper;
private final TaskOptions optionsRetry;

public OnboardingFunctions(OnboardingService service, ObjectMapper objectMapper, RetryPolicyConfig retryPolicyConfig) {
public OnboardingFunctions(OnboardingService service, ObjectMapper objectMapper, RetryPolicyConfig retryPolicyConfig, CompletionService completionService) {
this.service = service;
this.objectMapper = objectMapper;
this.completionService = completionService;

final int maxAttempts = retryPolicyConfig.maxAttempts();
final Duration firstRetryInterval = Duration.ofSeconds(retryPolicyConfig.firstRetryInterval());
Expand Down Expand Up @@ -92,45 +84,18 @@ public void onboardingsOrchestrator(
String onboardingId = ctx.getInput(String.class);
Onboarding onboarding = service.getOnboarding(onboardingId)
.orElseThrow(() -> new ResourceNotFoundException(String.format("Onboarding with id %s not found!", onboardingId)));
String onboardingString = getOnboardingString(objectMapper, onboarding);

switch (onboarding.getWorkflowType()) {
case CONTRACT_REGISTRATION -> workflowContractRegistration(ctx, onboardingString);
case FOR_APPROVE -> workflowForApprove(ctx, onboardingString, onboarding.getStatus());
case FOR_APPROVE_PT -> workflowRegistrationRequestAndApprove(ctx, onboardingString);
case CONFIRMATION -> workflowForConfirmation(ctx, onboardingString);
}

//Last activity consist of saving pending status
OnboardingStatus nextStatus = nextProcessOnboardingStatus(onboarding.getStatus(), onboarding.getWorkflowType());
String saveOnboardingStatusInput = SaveOnboardingStatusInput.buildAsJsonString(onboardingId, nextStatus.name());

ctx.callActivity(SAVE_ONBOARDING_STATUS_ACTIVITY, saveOnboardingStatusInput, optionsRetry, String.class).await();
}
WorkflowExecutor workflowExecutor;

private void workflowContractRegistration(TaskOrchestrationContext ctx, String onboardingString){
ctx.callActivity(BUILD_CONTRACT_ACTIVITY_NAME, onboardingString, optionsRetry, String.class).await();
ctx.callActivity(SAVE_TOKEN_WITH_CONTRACT_ACTIVITY_NAME, onboardingString, optionsRetry, String.class).await();
ctx.callActivity(SEND_MAIL_REGISTRATION_WITH_CONTRACT_ACTIVITY, onboardingString, optionsRetry, String.class).await();
}

private void workflowForApprove(TaskOrchestrationContext ctx, String onboardingString, OnboardingStatus onboardingStatus){
if (OnboardingStatus.REQUEST.equals(onboardingStatus)) {
ctx.callActivity(SEND_MAIL_ONBOARDING_APPROVE_ACTIVITY, onboardingString, optionsRetry, String.class).await();
} else if (OnboardingStatus.TO_BE_VALIDATED.equals(onboardingStatus)) {
ctx.callActivity(BUILD_CONTRACT_ACTIVITY_NAME, onboardingString, optionsRetry, String.class).await();
ctx.callActivity(SAVE_TOKEN_WITH_CONTRACT_ACTIVITY_NAME, onboardingString, optionsRetry, String.class).await();
ctx.callActivity(SEND_MAIL_REGISTRATION_WITH_CONTRACT_WHEN_APPROVE_ACTIVITY, onboardingString, optionsRetry, String.class).await();
switch (onboarding.getWorkflowType()) {
case CONTRACT_REGISTRATION -> workflowExecutor = new WorkflowExecutorContractRegistration(objectMapper, optionsRetry);
case FOR_APPROVE -> workflowExecutor = new WorkflowExecutorForApprove(objectMapper, optionsRetry);
case FOR_APPROVE_PT -> workflowExecutor = new WorkflowExecutorForApprovePt(objectMapper, optionsRetry);
case CONFIRMATION -> workflowExecutor = new WorkflowExecutorConfirmation(objectMapper, optionsRetry);
default -> throw new IllegalArgumentException("Workflow options not found!");
}
}

private void workflowRegistrationRequestAndApprove(TaskOrchestrationContext ctx, String onboardingString){
ctx.callActivity(SEND_MAIL_REGISTRATION_REQUEST_ACTIVITY, onboardingString, optionsRetry, String.class).await();
ctx.callActivity(SEND_MAIL_REGISTRATION_APPROVE_ACTIVITY, onboardingString, optionsRetry, String.class).await();
}

private void workflowForConfirmation(TaskOrchestrationContext ctx, String onboardingString){
ctx.callActivity(SEND_MAIL_CONFIRMATION_ACTIVITY, onboardingString, optionsRetry, String.class).await();
workflowExecutor.execute(ctx, onboarding);
}

/**
Expand Down Expand Up @@ -192,12 +157,23 @@ public String sendMailConfirmation(@DurableActivityTrigger(name = "onboardingStr
return onboardingString;
}

private OnboardingStatus nextProcessOnboardingStatus(OnboardingStatus previous, WorkflowType workflowType) {
if(OnboardingStatus.REQUEST.equals(previous) &&
(WorkflowType.FOR_APPROVE.equals(workflowType) || WorkflowType.FOR_APPROVE_PT.equals(workflowType))) {
return OnboardingStatus.TO_BE_VALIDATED;
}

return OnboardingStatus.PENDING;

@FunctionName(CREATE_INSTITUTION_ACTIVITY)
public String createInstitutionAndPersistInstitutionId(@DurableActivityTrigger(name = "onboardingString") String onboardingString, final ExecutionContext context) {
context.getLogger().info(String.format(FORMAT_LOGGER_ONBOARDING_STRING, CREATE_INSTITUTION_ACTIVITY, onboardingString));
return completionService.createInstitutionAndPersistInstitutionId(readOnboardingValue(objectMapper, onboardingString));
}

@FunctionName(CREATE_ONBOARDING_ACTIVITY)
public void createOnboarding(@DurableActivityTrigger(name = "onboardingString") String onboardingString, final ExecutionContext context) {
context.getLogger().info(String.format(FORMAT_LOGGER_ONBOARDING_STRING, CREATE_ONBOARDING_ACTIVITY, onboardingString));
completionService.persistOnboarding(readOnboardingValue(objectMapper, onboardingString));
}

@FunctionName(SEND_MAIL_COMPLETION_ACTIVITY)
public void sendMailCompletion(@DurableActivityTrigger(name = "onboardingString") String onboardingString, final ExecutionContext context) {
context.getLogger().info(String.format(FORMAT_LOGGER_ONBOARDING_STRING, SEND_MAIL_COMPLETION_ACTIVITY, onboardingString));
completionService.sendCompletedEmail(readOnboardingValue(objectMapper, onboardingString));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package it.pagopa.selfcare.onboarding.functions.utils;

public class ActivityName {

public static final String SAVE_TOKEN_WITH_CONTRACT_ACTIVITY_NAME = "SaveTokenWithContract";
public static final String BUILD_CONTRACT_ACTIVITY_NAME = "BuildContract";
public static final String SEND_MAIL_REGISTRATION_WITH_CONTRACT_ACTIVITY = "SendMailRegistrationWithContract";

public static final String SEND_MAIL_REGISTRATION_WITH_CONTRACT_WHEN_APPROVE_ACTIVITY = "SendMailRegistrationWithContractWhenApprove";
public static final String SEND_MAIL_REGISTRATION_REQUEST_ACTIVITY = "SendMailRegistrationRequest";
public static final String SEND_MAIL_REGISTRATION_APPROVE_ACTIVITY = "SendMailRegistrationApprove";
public static final String SEND_MAIL_ONBOARDING_APPROVE_ACTIVITY = "SendMailOnboardingApprove";
public static final String SEND_MAIL_CONFIRMATION_ACTIVITY = "SendMailConfirmation";


public static final String ONBOARDING_COMPLETION_ACTIVITY = "OnboardingCompletion";
public static final String CREATE_INSTITUTION_ACTIVITY = "CreateInstitution";
public static final String CREATE_ONBOARDING_ACTIVITY = "CreateOnboarding";
public static final String SEND_MAIL_COMPLETION_ACTIVITY = "SendMailCompletion";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package it.pagopa.selfcare.onboarding.workflow;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.microsoft.durabletask.TaskOptions;
import com.microsoft.durabletask.TaskOrchestrationContext;
import it.pagopa.selfcare.onboarding.common.OnboardingStatus;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.functions.utils.SaveOnboardingStatusInput;

import static it.pagopa.selfcare.onboarding.functions.CommonFunctions.SAVE_ONBOARDING_STATUS_ACTIVITY;
import static it.pagopa.selfcare.onboarding.functions.utils.ActivityName.*;
import static it.pagopa.selfcare.onboarding.utils.Utils.getOnboardingString;

public interface WorkflowExecutor {

void executeRequestState(TaskOrchestrationContext ctx, Onboarding onboarding);
void executeToBeValidatedState(TaskOrchestrationContext ctx, Onboarding onboarding);

ObjectMapper objectMapper();

TaskOptions optionsRetry();

default void execute(TaskOrchestrationContext ctx, Onboarding onboarding){
switch (onboarding.getStatus()){
case REQUEST -> executeRequestState(ctx, onboarding);
case TO_BE_VALIDATED -> executeToBeValidatedState(ctx, onboarding);
case PENDING -> executePendingState(ctx, onboarding);
}
}

default void executePendingState(TaskOrchestrationContext ctx, Onboarding onboarding) {
String onboardingString = getOnboardingString(objectMapper(), onboarding);

//CreateInstitution activity return an institutionId that is used by CreateOnboarding activity
String institutionId = ctx.callActivity(CREATE_INSTITUTION_ACTIVITY, onboardingString, optionsRetry(), String.class).await();
onboarding.getInstitution().setId(institutionId);
onboardingString = getOnboardingString(objectMapper(), onboarding);

ctx.callActivity(CREATE_ONBOARDING_ACTIVITY, onboardingString, optionsRetry(), String.class).await();
ctx.callActivity(SEND_MAIL_COMPLETION_ACTIVITY, onboardingString, optionsRetry(), String.class).await();

//Last activity consist of saving pending status
String saveOnboardingStatusInput = SaveOnboardingStatusInput.buildAsJsonString(onboarding.getOnboardingId(), OnboardingStatus.COMPLETED.name());
ctx.callActivity(SAVE_ONBOARDING_STATUS_ACTIVITY, saveOnboardingStatusInput, optionsRetry(), String.class).await();
}
}
Loading

0 comments on commit 8536088

Please sign in to comment.