Skip to content

Commit

Permalink
[GOBBLIN-1907] Handle Lease Completion From Other Multi-active Partic…
Browse files Browse the repository at this point in the history
…ipants (#3771)

* Handle Lease Completion From Other Multi-active Participants

* Use optional instead of sentinel value when lease acquisition timestamp null

* Use Java Optional instead of Guava

---------

Co-authored-by: Urmi Mustafi <[email protected]>
  • Loading branch information
umustafi and Urmi Mustafi authored Sep 14, 2023
1 parent d6f546e commit d600b52
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.gobblin.runtime.api;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
Expand All @@ -28,6 +27,7 @@
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;
import java.util.Optional;
import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -200,7 +200,7 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no existing row for this flow action, then go"
+ " ahead and insert", flowAction, eventTimeMillis);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent());
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.empty());
}

// Extract values from result set
Expand Down Expand Up @@ -273,7 +273,7 @@ protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAc
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
return Optional.<GetEventInfoResult>absent();
return Optional.<GetEventInfoResult>empty();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
Expand Down Expand Up @@ -367,11 +367,16 @@ protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throw
protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException {
try {
if (!resultSet.next()) {
throw new IOException("Expected num rows and lease_acquisition_timestamp returned from query but received nothing, so "
+ "providing empty result to lease evaluation code");
throw new IOException("Expected resultSet containing row information for the lease that was attempted but "
+ "received nothing.");
}
if (resultSet.getTimestamp(1) == null) {
throw new IOException("event_timestamp should never be null (it is always set to current timestamp)");
}
long eventTimeMillis = resultSet.getTimestamp(1).getTime();
long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
// Lease acquisition timestamp is null if another participant has completed the lease
Optional<Long> leaseAcquisitionTimeMillis = resultSet.getTimestamp(2) == null ? Optional.empty() :
Optional.of(resultSet.getTimestamp(2).getTime());
int dbLinger = resultSet.getInt(3);
return new SelectInfoResult(eventTimeMillis, leaseAcquisitionTimeMillis, dbLinger);
} catch (SQLException e) {
Expand Down Expand Up @@ -399,17 +404,21 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = getRowInfo(flowAction);
// Another participant won the lease in between
if (!selectInfoResult.getLeaseAcquisitionTimeMillis().isPresent()) {
return new NoLongerLeasingStatus();
}
if (numRowsUpdated == 1) {
log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", flowAction,
selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis());
selectInfoResult.getLeaseAcquisitionTimeMillis().get());
}
log.debug("Another participant acquired lease in between for [{}, eventTimestamp: {}] - num rows updated: ",
flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis() + selectInfoResult.getDbLinger()
selectInfoResult.getLeaseAcquisitionTimeMillis().get() + selectInfoResult.getDbLinger()
- (dbCurrentTimestamp.isPresent() ? dbCurrentTimestamp.get().getTime() : System.currentTimeMillis()));
}

Expand Down Expand Up @@ -545,7 +554,7 @@ static class GetEventInfoResult {
@Data
static class SelectInfoResult {
private final long eventTimeMillis;
private final long leaseAcquisitionTimeMillis;
private final Optional<Long> leaseAcquisitionTimeMillis;
private final int dbLinger;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import com.typesafe.config.Config;
import java.io.IOException;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
Expand Down Expand Up @@ -183,7 +185,7 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
// The following insert will fail since the eventTimestamp does not match
int numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true,
new Timestamp(99999), new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
new Timestamp(99999), new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertEquals(numRowsUpdated, 0);

// The following insert will fail since the leaseAcquisitionTimestamp does not match
Expand All @@ -196,7 +198,7 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.attemptLeaseIfExistingRow(
formattedAcquireLeaseIfMatchingAllStatement, resumeDagAction, true, true,
new Timestamp(selectInfoResult.getEventTimeMillis()),
new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis()));
new Timestamp(selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertEquals(numRowsUpdated, 1);
}

Expand All @@ -207,13 +209,16 @@ public void testConditionallyAcquireLeaseIfFMatchingAllColsStatement() throws IO
its prior lease, encouraging the current participant to acquire a lease for its event.
*/
@Test (dependsOnMethods = "testConditionallyAcquireLeaseIfFMatchingAllColsStatement")
public void testConditionallyAcquireLeaseIfFinishedLeasingStatement() throws IOException, InterruptedException {
public void testConditionallyAcquireLeaseIfFinishedLeasingStatement()
throws IOException, InterruptedException, SQLException {
// Mark the resume action lease from above as completed by fabricating a LeaseObtainedStatus
MysqlMultiActiveLeaseArbiter.SelectInfoResult selectInfoResult =
this.mysqlMultiActiveLeaseArbiter.getRowInfo(resumeDagAction);
boolean markedSuccess = this.mysqlMultiActiveLeaseArbiter.recordLeaseSuccess(new LeaseObtainedStatus(
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis()));
resumeDagAction, selectInfoResult.getEventTimeMillis(), selectInfoResult.getLeaseAcquisitionTimeMillis().get()));
Assert.assertTrue(markedSuccess);
// Ensure no NPE results from calling this after a lease has been completed and acquisition timestamp val is NULL
mysqlMultiActiveLeaseArbiter.evaluateStatusAfterLeaseAttempt(1, resumeDagAction, Optional.empty());

// Sleep enough time for event to be considered distinct
Thread.sleep(LINGER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,9 @@ public void runJob(Properties jobProps, JobListener jobListener) throws JobExcep
ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_NEVER_SET_VAL);
this.orchestrator.orchestrate(flowSpec, jobProps, Long.parseLong(triggerTimestampMillis));
} catch (Exception e) {
throw new JobException("Failed to run Spec: " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e);
String exceptionPrefix = "Failed to run Spec: " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
log.warn(exceptionPrefix + " because", e);
throw new JobException(exceptionPrefix, e);
}
}

Expand Down

0 comments on commit d600b52

Please sign in to comment.