Skip to content

Commit

Permalink
[GOBBLIN-1921] Properly handle reminder events (#3790)
Browse files Browse the repository at this point in the history
* Add millisecond level precision to timestamp cols & proper timezone conversion

	- existing tests pass with minor modifications

* Handle reminder events properly

* Fix compilation errors & add isReminder flag

* Add unit tests

* Address review comments

* Add newline to address comment

* Include reminder/original tag in logging

* Clarify timezone issues in comment

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Oct 3, 2023
1 parent 028b85f commit 2a2edfc
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public class ConfigurationKeys {
// Event time of flow action to orchestrate using the multi-active lease arbiter
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY = "orchestratorTriggerEventTimeMillis";
public static final String ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL = "-1";
public static final String FLOW_IS_REMINDER_EVENT_KEY = "isReminderEvent";
public static final String SCHEDULER_EVENT_EPSILON_MILLIS_KEY = MYSQL_LEASE_ARBITER_PREFIX + ".epsilonMillis";
public static final int DEFAULT_SCHEDULER_EVENT_EPSILON_MILLIS = 5000;
// Note: linger should be on the order of seconds even though we measure in millis
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

/**
* This interface defines a generic approach to a non-blocking, multiple active thread or host system, in which one or
* more active participants compete to take responsiblity for a particular flow's event. The type of flow event in
* more active participants compete to take responsibility for a particular flow's event. The type of flow event in
* question does not impact the algorithm other than to uniquely identify the flow event. Each participant uses the
* interface to initiate an attempt at ownership over the flow event and receives a response indicating the status of
* the attempt.
Expand All @@ -38,7 +38,8 @@
* b) LeasedToAnotherStatus -> another will attempt to carry out the required action before the lease expires
* c) NoLongerLeasingStatus -> flow event no longer needs to be acted upon (terminal state)
* 3. If another participant has acquired the lease before this one could, then the present participant must check back
* in at the time of lease expiry to see if it needs to attempt the lease again [status (b) above].
* in at the time of lease expiry to see if it needs to attempt the lease again [status (b) above]. We refer to this
* check-in as a 'reminder event'.
* 4. Once the participant which acquired the lease completes its work on the flow event, it calls recordLeaseSuccess
* to indicate to all other participants that the flow event no longer needs to be acted upon [status (c) above]
*/
Expand All @@ -51,10 +52,12 @@ public interface MultiActiveLeaseArbiter {
* determine the next action.
* @param flowAction uniquely identifies the flow and the present action upon it
* @param eventTimeMillis is the time this flow action was triggered
* @param isReminderEvent true if the flow action event we're checking on is a reminder event
* @return LeaseAttemptStatus
* @throws IOException
*/
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis) throws IOException;
LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis, boolean isReminderEvent)
throws IOException;

/**
* This method is used to indicate the owner of the lease has successfully completed required actions while holding
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@

@Slf4j
public class MysqlMultiActiveLeaseArbiterTest {
private static final int EPSILON = 30000;
private static final int LINGER = 80000;
private static final int EPSILON = 10000;
private static final int LINGER = 50000;
private static final String USER = "testUser";
private static final String PASSWORD = "testPassword";
private static final String TABLE = "mysql_multi_active_lease_arbiter_store";
Expand All @@ -49,6 +49,7 @@ public class MysqlMultiActiveLeaseArbiterTest {
private static DagActionStore.DagAction resumeDagAction =
new DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.RESUME);
private static final long eventTimeMillis = System.currentTimeMillis();
private static final Timestamp dummyTimestamp = new Timestamp(99999);
private MysqlMultiActiveLeaseArbiter mysqlMultiActiveLeaseArbiter;
private String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, TABLE);
Expand Down Expand Up @@ -82,7 +83,7 @@ Tests all cases of trying to acquire a lease (CASES 1-6 detailed below) for a fl
public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 1 of acquire lease for a flow action event not present in DB
MultiActiveLeaseArbiter.LeaseAttemptStatus firstLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(firstLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus firstObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) firstLaunchStatus;
Expand All @@ -95,7 +96,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
DagActionStore.DagAction killDagAction = new
DagActionStore.DagAction(flowGroup, flowName, flowExecutionId, DagActionStore.FlowActionType.KILL);
MultiActiveLeaseArbiter.LeaseAttemptStatus killStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(killDagAction, eventTimeMillis, false);
Assert.assertTrue(killStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus killObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) killStatus;
Expand All @@ -106,19 +107,19 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
// Very little time should have passed if this test directly follows the one above so this call will be considered
// the same as the previous event
MultiActiveLeaseArbiter.LeaseAttemptStatus secondLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(secondLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus secondLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) secondLaunchStatus;
Assert.assertTrue(secondLeasedToAnotherStatus.getEventTimeMillis() == firstObtainedStatus.getEventTimestamp());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() >= LINGER);
Assert.assertEquals(firstObtainedStatus.getEventTimestamp(), secondLeasedToAnotherStatus.getEventTimeMillis());
Assert.assertTrue(secondLeasedToAnotherStatus.getMinimumLingerDurationMillis() > 0);

// Tests CASE 3 of trying to acquire a lease for a distinct flow action event, while the previous event's lease is
// valid
// Allow enough time to pass for this trigger to be considered distinct, but not enough time so the lease expires
Thread.sleep(EPSILON * 3/2);
MultiActiveLeaseArbiter.LeaseAttemptStatus thirdLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(thirdLaunchStatus instanceof MultiActiveLeaseArbiter.LeasedToAnotherStatus);
MultiActiveLeaseArbiter.LeasedToAnotherStatus thirdLeasedToAnotherStatus =
(MultiActiveLeaseArbiter.LeasedToAnotherStatus) thirdLaunchStatus;
Expand All @@ -128,7 +129,7 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
// Tests CASE 4 of lease out of date
Thread.sleep(LINGER);
MultiActiveLeaseArbiter.LeaseAttemptStatus fourthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(fourthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus fourthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) fourthLaunchStatus;
Expand All @@ -141,14 +142,14 @@ public void testAcquireLeaseSingleParticipant() throws Exception {
Assert.assertTrue(mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(fourthObtainedStatus));
Assert.assertTrue(System.currentTimeMillis() - fourthObtainedStatus.getEventTimestamp() < EPSILON);
MultiActiveLeaseArbiter.LeaseAttemptStatus fifthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(fifthLaunchStatus instanceof MultiActiveLeaseArbiter.NoLongerLeasingStatus);

// Tests CASE 6 of no longer leasing a distinct event in DB
// Wait so this event is considered distinct and a new lease will be acquired
Thread.sleep(EPSILON * 3/2);
MultiActiveLeaseArbiter.LeaseAttemptStatus sixthLaunchStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis);
mysqlMultiActiveLeaseArbiter.tryAcquireLease(launchDagAction, eventTimeMillis, false);
Assert.assertTrue(sixthLaunchStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus);
MultiActiveLeaseArbiter.LeaseObtainedStatus sixthObtainedStatus =
(MultiActiveLeaseArbiter.LeaseObtainedStatus) sixthLaunchStatus;
Expand All @@ -166,9 +167,9 @@ Tests attemptLeaseIfNewRow() method to ensure a new row is inserted if no row ma
@Test (dependsOnMethods = "testAcquireLeaseSingleParticipant")
public void testAcquireLeaseIfNewRow() throws IOException {
// Inserting the first time should update 1 row
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction), 1);
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction), 1);
// Inserting the second time should not update any rows
Assert.assertEquals(this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction), 0);
Assert.assertEquals(mysqlMultiActiveLeaseArbiter.attemptLeaseIfNewRow(resumeDagAction), 0);
}

/*
Expand All @@ -180,22 +181,22 @@ public void testAcquireLeaseIfNewRow() throws IOException {
@Test (dependsOnMethods = "testAcquireLeaseIfNewRow")
public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IOException {
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);

// The following insert will fail since the eventTimestamp does not match
int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
int numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true,
new Timestamp(99999), new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
dummyTimestamp, new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertEquals(numRowsUpdated, 0);

// The following insert will fail since the leaseAcquisitionTimestamp does not match
numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true,
new Timestamp(selectInfoResult.getEventTimeMillis()), new Timestamp(99999));
new Timestamp(selectInfoResult.getEventTimeMillis()), dummyTimestamp);
Assert.assertEquals(numRowsUpdated, 0);

// This insert should work since the values match all the columns
numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true,
new Timestamp(selectInfoResult.getEventTimeMillis()),
new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Expand All @@ -213,26 +214,92 @@ public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
throws IOException, InterruptedException, SQLException {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess = this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction, Optional.empty());

// Sleep enough time for event to be considered distinct
Thread.sleep(LINGER);
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction,
Optional.empty(), false);

// The following insert will fail since eventTimestamp does not match the expected
int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
int numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
new Timestamp(99999), null);
dummyTimestamp, null);
Assert.assertEquals(numRowsUpdated, 0);

// This insert does match since we utilize the right eventTimestamp
numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
numRowsUpdated = mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfFinishedStatement, resumeDagAction, true, false,
new Timestamp(selectInfoResult.getEventTimeMillis()), null);
Assert.assertEquals(numRowsUpdated, 1);
}

/*
Tests calling `tryAcquireLease` for an older reminder event which should be immediately returned as `NoLongerLeasing`
*/
@Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFinishedLeasingStatement")
public void testOlderReminderEventAcquireLease() throws IOException {
// Read database to obtain existing db eventTimeMillis and use it to construct an older event
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
long olderEventTimestamp = selectInfoResult.getEventTimeMillis() - 1;
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, olderEventTimestamp, true);
Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
}

/*
Tests calling `tryAcquireLease` for a reminder event for which a valid lease exists in the database. We don't expect
this case to occur because the reminderEvent should be triggered after the lease expires, but ensure it's handled
correctly anyway.
*/
@Test (dependsOnMethods = "testOlderReminderEventAcquireLease")
public void testReminderEventAcquireLeaseOnValidLease() throws IOException {
// Read database to obtain existing db eventTimeMillis and re-use it for the reminder event time
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Assert.assertTrue(attemptStatus instanceof LeasedToAnotherStatus);
LeasedToAnotherStatus leasedToAnotherStatus = (LeasedToAnotherStatus) attemptStatus;
Assert.assertEquals(leasedToAnotherStatus.getEventTimeMillis(), selectInfoResult.getEventTimeMillis());
}

/*
Tests calling `tryAcquireLease` for a reminder event whose lease has expired in the database and should successfully
acquire a new lease
*/
@Test (dependsOnMethods = "testReminderEventAcquireLeaseOnValidLease")
public void testReminderEventAcquireLeaseOnInvalidLease() throws IOException, InterruptedException {
// Read database to obtain existing db eventTimeMillis and wait enough time for the lease to expire
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
Thread.sleep(LINGER);
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Assert.assertTrue(attemptStatus instanceof LeaseObtainedStatus);
LeaseObtainedStatus obtainedStatus = (LeaseObtainedStatus) attemptStatus;
Assert.assertTrue(obtainedStatus.getEventTimestamp() > selectInfoResult.getEventTimeMillis());
Assert.assertTrue(obtainedStatus.getLeaseAcquisitionTimestamp() > selectInfoResult.getLeaseAcquisitionTimeMillis().get().longValue());
}

/*
Tests calling `tryAcquireLease` for a reminder event whose lease has completed in the database and should return
`NoLongerLeasing` status
*/
@Test (dependsOnMethods = "testReminderEventAcquireLeaseOnInvalidLease")
public void testReminderEventAcquireLeaseOnCompletedLease() throws IOException {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess = mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);

// Now have a reminder event check-in on the completed lease
LeaseAttemptStatus attemptStatus =
mysqlMultiActiveLeaseArbiter.tryAcquireLease(resumeDagAction, selectInfoResult.getEventTimeMillis(), true);
Assert.assertTrue(attemptStatus instanceof NoLongerLeasingStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ public FlowTriggerHandler(Config config, Optional<MultiActiveLeaseArbiter> lease
* @param jobProps
* @param flowAction
* @param eventTimeMillis
* @param isReminderEvent
* @throws IOException
*/
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
public void handleTriggerEvent(Properties jobProps, DagActionStore.DagAction flowAction, long eventTimeMillis,
boolean isReminderEvent) throws IOException {
if (multiActiveLeaseArbiter.isPresent()) {
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(flowAction, eventTimeMillis);
MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus = multiActiveLeaseArbiter.get().tryAcquireLease(
flowAction, eventTimeMillis, isReminderEvent);
if (leaseAttemptStatus instanceof MultiActiveLeaseArbiter.LeaseObtainedStatus) {
MultiActiveLeaseArbiter.LeaseObtainedStatus leaseObtainedStatus = (MultiActiveLeaseArbiter.LeaseObtainedStatus) leaseAttemptStatus;
this.leaseObtainedCount.inc();
Expand Down Expand Up @@ -278,6 +280,8 @@ public static JobDataMap updatePropsInJobDataMap(JobDataMap jobDataMap,
// excess flows to be triggered by the reminder functionality.
prevJobProps.setProperty(ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY,
String.valueOf(leasedToAnotherStatus.getEventTimeMillis()));
// Use this boolean to indicate whether this is a reminder event
prevJobProps.setProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, String.valueOf(false));
// Update job data map and reset it in jobDetail
jobDataMap.put(GobblinServiceJobScheduler.PROPERTIES_KEY, prevJobProps);
return jobDataMap;
Expand Down
Loading

0 comments on commit 2a2edfc

Please sign in to comment.