Skip to content

Commit

Permalink
change to saga-in-workflow solution
Browse files Browse the repository at this point in the history
Signed-off-by: Sky Ao <[email protected]>
  • Loading branch information
skyao committed Nov 16, 2023
1 parent 6c8076c commit 21b1b34
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

import org.slf4j.Logger;

import com.microsoft.durabletask.OrchestratorBlockedException;

import io.dapr.quickstarts.saga.activities.DeliveryActivity;
import io.dapr.quickstarts.saga.activities.NotifyActivity;
import io.dapr.quickstarts.saga.activities.ProcessPaymentActivity;
import io.dapr.quickstarts.saga.activities.ProcessPaymentCompensatationActivity;
import io.dapr.quickstarts.saga.activities.RequestApprovalActivity;
import io.dapr.quickstarts.saga.activities.ReserveInventoryActivity;
import io.dapr.quickstarts.saga.activities.UpdateInventoryActivity;
import io.dapr.quickstarts.saga.activities.UpdateInventoryCompensatationActivity;
import io.dapr.quickstarts.saga.models.ApprovalResult;
import io.dapr.quickstarts.saga.models.InventoryRequest;
import io.dapr.quickstarts.saga.models.InventoryResult;
Expand All @@ -19,7 +19,6 @@
import io.dapr.quickstarts.saga.models.PaymentRequest;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;
import io.dapr.workflows.saga.Saga;
import io.dapr.workflows.saga.SagaConfiguration;

public class OrderProcessingWorkflow extends Workflow {
Expand All @@ -33,106 +32,93 @@ public WorkflowStub create() {
logger.info("Instance ID(order ID): " + orderId);
logger.info("Current Orchestration Time: " + ctx.getCurrentInstant());

SagaConfiguration config = SagaConfiguration.newBuilder()
.setParallelCompensation(false)
.setContinueWithError(true).build();
Saga saga = new Saga(config);

OrderPayload order = ctx.getInput(OrderPayload.class);
logger.info("Received Order: " + order.toString());
OrderResult orderResult = new OrderResult();

try {

// step1: notify the user that an order has come through
Notification notification = new Notification();
notification.setMessage("Received Order: " + order.toString());
// step1: notify the user that an order has come through
Notification notification = new Notification();
notification.setMessage("Received Order: " + order.toString());
ctx.callActivity(NotifyActivity.class.getName(), notification).await();

// step2: determine if there is enough of the item available for purchase by
// checking the inventory
InventoryRequest inventoryRequest = new InventoryRequest();
inventoryRequest.setRequestId(orderId);
inventoryRequest.setItemName(order.getItemName());
inventoryRequest.setQuantity(order.getQuantity());
InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(),
inventoryRequest, InventoryResult.class).await();

// If there is insufficient inventory, fail and let the user know
if (!inventoryResult.isSuccess()) {
notification.setMessage("Insufficient inventory for order : " + order.getItemName());
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
ctx.complete(orderResult);
return;
}

// step2: determine if there is enough of the item available for purchase by
// checking
// the inventory
InventoryRequest inventoryRequest = new InventoryRequest();
inventoryRequest.setRequestId(orderId);
inventoryRequest.setItemName(order.getItemName());
inventoryRequest.setQuantity(order.getQuantity());
InventoryResult inventoryResult = ctx.callActivity(ReserveInventoryActivity.class.getName(),
inventoryRequest, InventoryResult.class).await();

// If there is insufficient inventory, fail and let the user know
if (!inventoryResult.isSuccess()) {
notification.setMessage("Insufficient inventory for order : " + order.getItemName());
// step3: require orders over a certain threshold to be approved
if (order.getTotalCost() > 5000) {
ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(),
order, ApprovalResult.class).await();
if (approvalResult != ApprovalResult.Approved) {
notification.setMessage("Order " + order.getItemName() + " was not approved.");
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
ctx.complete(orderResult);
return;
}
}

// step3: require orders over a certain threshold to be approved
if (order.getTotalCost() > 5000) {
ApprovalResult approvalResult = ctx.callActivity(RequestApprovalActivity.class.getName(),
order, ApprovalResult.class).await();
if (approvalResult != ApprovalResult.Approved) {
notification.setMessage("Order " + order.getItemName() + " was not approved.");
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
ctx.complete(orderResult);
return;
}
}

// There is enough inventory available so the user can purchase the item(s).
// step4: Process their payment (need compensation)
PaymentRequest paymentRequest = new PaymentRequest();
paymentRequest.setRequestId(orderId);
paymentRequest.setItemBeingPurchased(order.getItemName());
paymentRequest.setQuantity(order.getQuantity());
paymentRequest.setAmount(order.getTotalCost());
boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(),
paymentRequest, boolean.class).await();
if (!isOK) {
notification.setMessage("Payment failed for order : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
ctx.complete(orderResult);
return;
}
// payment activity is processed, register for compensation
saga.registerCompensation(ProcessPaymentActivity.class.getName(), paymentRequest, isOK);

// step5: Update the inventory (need compensation)
inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(),
inventoryRequest, InventoryResult.class).await();
if (!inventoryResult.isSuccess()) {
// Let users know their payment processing failed
notification.setMessage("Order failed to update inventory! : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
// There is enough inventory available so the user can purchase the item(s).
// step4: Process their payment (need compensation)
PaymentRequest paymentRequest = new PaymentRequest();
paymentRequest.setRequestId(orderId);
paymentRequest.setItemBeingPurchased(order.getItemName());
paymentRequest.setQuantity(order.getQuantity());
paymentRequest.setAmount(order.getTotalCost());
boolean isOK = ctx.callActivity(ProcessPaymentActivity.class.getName(),
paymentRequest, boolean.class).await();
if (!isOK) {
notification.setMessage("Payment failed for order : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
ctx.complete(orderResult);
return;
}
ctx.getSaga().registerCompensation(ProcessPaymentCompensatationActivity.class.getName(), paymentRequest);

// step5: Update the inventory (need compensation)
inventoryResult = ctx.callActivity(UpdateInventoryActivity.class.getName(),
inventoryRequest, InventoryResult.class).await();
if (!inventoryResult.isSuccess()) {
// Let users know their payment processing failed
notification.setMessage("Order failed to update inventory! : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();

// throw exception to trigger compensation
throw new RuntimeException("Failed to update inventory");
}
// Update Inventory activity is succeed, register for compensation
saga.registerCompensation(UpdateInventoryActivity.class.getName(), inventoryRequest, inventoryResult);
// throw exception to trigger compensation
throw new RuntimeException("Failed to update inventory");
}
ctx.getSaga().registerCompensation(UpdateInventoryCompensatationActivity.class.getName(), inventoryRequest);

// step6: delevery (allways be failed to trigger compensation)
ctx.callActivity(DeliveryActivity.class.getName(), null).await();
// step6: delevery (allways be failed to trigger compensation)
ctx.callActivity(DeliveryActivity.class.getName()).await();

// step7: Let user know their order was processed(won't be executed if step6
// failed)
notification.setMessage("Order completed! : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();
// step7: Let user know their order was processed(won't be executed if step6
// failed)
notification.setMessage("Order completed! : " + orderId);
ctx.callActivity(NotifyActivity.class.getName(), notification).await();

// Complete the workflow with order result is processed(won't be executed if
// step6 failed)
orderResult.setProcessed(true);
ctx.complete(orderResult);
} catch (OrchestratorBlockedException e) {
//TODO: try to improve design and remove this exception catch
throw e;
} catch (Exception e) {
orderResult.setCompensated(true);
ctx.complete(orderResult);

saga.compensate();
}
// Complete the workflow with order result is processed(won't be executed if
// step6 failed)
orderResult.setProcessed(true);
ctx.complete(orderResult);
};
}

@Override
public SagaConfiguration getSagaConfiguration() {
return SagaConfiguration.newBuilder().setParallelCompensation(false)
.setContinueWithError(true).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import io.dapr.quickstarts.saga.activities.DeliveryActivity;
import io.dapr.quickstarts.saga.activities.NotifyActivity;
import io.dapr.quickstarts.saga.activities.ProcessPaymentActivity;
import io.dapr.quickstarts.saga.activities.ProcessPaymentCompensatationActivity;
import io.dapr.quickstarts.saga.activities.RequestApprovalActivity;
import io.dapr.quickstarts.saga.activities.ReserveInventoryActivity;
import io.dapr.quickstarts.saga.activities.UpdateInventoryActivity;
import io.dapr.quickstarts.saga.activities.UpdateInventoryCompensatationActivity;
import io.dapr.quickstarts.saga.models.InventoryItem;
import io.dapr.quickstarts.saga.models.OrderPayload;
import io.dapr.workflows.client.DaprWorkflowClient;
Expand Down Expand Up @@ -56,6 +58,9 @@ public static void main(String[] args) throws Exception {
builder.registerActivity(UpdateInventoryActivity.class);
builder.registerActivity(DeliveryActivity.class);

builder.registerActivity(ProcessPaymentCompensatationActivity.class);
builder.registerActivity(UpdateInventoryCompensatationActivity.class);

// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
Expand Down Expand Up @@ -116,8 +121,17 @@ private static InventoryItem prepareInventoryAndOrder() {
inventory.setName("cars");
inventory.setPerItemCost(15000);
inventory.setQuantity(100);

DaprClient daprClient = new DaprClientBuilder().build();
restockInventory(daprClient, inventory);
try {
String key = inventory.getName();
daprClient.saveState(STATE_STORE_NAME, key, inventory).block();
} finally {
try{
daprClient.close();
} catch (Exception e) {
}
}

// prepare order for 10 cars
InventoryItem order = new InventoryItem();
Expand All @@ -126,9 +140,4 @@ private static InventoryItem prepareInventoryAndOrder() {
order.setQuantity(10);
return order;
}

private static void restockInventory(DaprClient daprClient, InventoryItem inventory) {
String key = inventory.getName();
daprClient.saveState(STATE_STORE_NAME, key, inventory).block();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public Object run(WorkflowActivityContext ctx) {
// in this quickstart, we assume that the Delivery will be failed
// So that the workflow will be failed and compensated
logger.info("Delivery failed");
throw new RuntimeException("Distribution failed");
throw new RuntimeException("Delivery failed");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import io.dapr.quickstarts.saga.models.PaymentRequest;
import io.dapr.workflows.runtime.WorkflowActivity;
import io.dapr.workflows.runtime.WorkflowActivityContext;
import io.dapr.workflows.saga.CompensatableWorkflowActivity;

public class ProcessPaymentActivity implements WorkflowActivity, CompensatableWorkflowActivity {
public class ProcessPaymentActivity implements WorkflowActivity {
private static Logger logger = LoggerFactory.getLogger(ProcessPaymentActivity.class);

@Override
Expand All @@ -26,21 +25,4 @@ public Object run(WorkflowActivityContext ctx) {

return true;
}

@Override
public void compensate(Object activityInput, Object activityOutput) {
PaymentRequest input = (PaymentRequest) activityInput;

logger.info("Compensating payment for request ID '{}' at ${}",
input.getRequestId(), input.getAmount());

// Simulate slow processing
try {
Thread.sleep(3 * 1000);
} catch (InterruptedException e) {
}

logger.info("Compensated payment for request ID '{}' at ${}",
input.getRequestId(), input.getAmount());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.dapr.quickstarts.saga.activities;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.dapr.quickstarts.saga.models.PaymentRequest;
import io.dapr.workflows.runtime.WorkflowActivityContext;
import io.dapr.workflows.saga.CompensatableWorkflowActivity;

public class ProcessPaymentCompensatationActivity implements CompensatableWorkflowActivity {
private static Logger logger = LoggerFactory.getLogger(ProcessPaymentCompensatationActivity.class);

@Override
public Object run(WorkflowActivityContext ctx) {
PaymentRequest input = ctx.getInput(PaymentRequest.class);

logger.info("Compensating payment for request ID '{}' at ${}",
input.getRequestId(), input.getAmount());

// Simulate slow processing
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException e) {
}

logger.info("Compensated payment for request ID '{}' at ${}",
input.getRequestId(), input.getAmount());
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ public class ReserveInventoryActivity implements WorkflowActivity {

private static final String STATE_STORE_NAME = "statestore";

private DaprClient daprClient;

public ReserveInventoryActivity() {
this.daprClient = new DaprClientBuilder().build();
}

@Override
public Object run(WorkflowActivityContext ctx) {
InventoryRequest inventoryRequest = ctx.getInput(InventoryRequest.class);
logger.info("Reserving inventory for order '{}' of {} {}",
inventoryRequest.getRequestId(), inventoryRequest.getQuantity(), inventoryRequest.getItemName());

State<InventoryItem> inventoryState = daprClient.getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block();
State<InventoryItem> inventoryState;
try {
DaprClient daprClient = new DaprClientBuilder().build();
inventoryState = daprClient.getState(STATE_STORE_NAME, inventoryRequest.getItemName(), InventoryItem.class).block();
} catch (Exception e) {
throw e;
}

InventoryItem inventory = inventoryState.getValue();

logger.info("There are {} {} available for purchase",
inventory.getQuantity(), inventory.getName());

Expand Down
Loading

0 comments on commit 21b1b34

Please sign in to comment.