Skip to content

Commit

Permalink
Refactor to create better api for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
Urmi Mustafi committed Aug 4, 2023
1 parent 6244592 commit f6ad3d7
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,19 @@
package org.apache.gobblin.runtime.api;

import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;
import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;

import com.google.inject.Inject;
import com.typesafe.config.Config;
import com.zaxxer.hikari.HikariDataSource;

import javax.sql.DataSource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
Expand Down Expand Up @@ -90,6 +87,8 @@ protected interface CheckedFunction<T, R> {
private final int linger;
private String thisTableGetInfoStatement;
private String thisTableSelectAfterInsertStatement;
private String thisTableAcquireLeaseIfMatchingAllStatement;
private String thisTableAcquireLeaseIfFinishedStatement;

// TODO: define retention on this table
private static final String CREATE_LEASE_ARBITER_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s ("
Expand All @@ -102,8 +101,8 @@ protected interface CheckedFunction<T, R> {
private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s "
+ "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY (primary_key))";
// Only insert epsilon and linger values from config if this table does not contain a pre-existing values already.
private static final String INSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
+ "VALUES(1, ?, ?)";
private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO %s (primary_key, epsilon, linger) "
+ "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), linger=VALUES(linger)";
protected static final String WHERE_CLAUSE_TO_MATCH_KEY = "WHERE flow_group=? AND flow_name=? AND flow_execution_id=?"
+ " AND flow_action=?";
protected static final String WHERE_CLAUSE_TO_MATCH_ROW = WHERE_CLAUSE_TO_MATCH_KEY
Expand All @@ -121,9 +120,9 @@ protected interface CheckedFunction<T, R> {
+ "ELSE 3 END as lease_validity_status, linger, CURRENT_TIMESTAMP FROM %s, %s " + WHERE_CLAUSE_TO_MATCH_KEY;
// Insert or update row to acquire lease if values have not changed since the previous read
// Need to define three separate statements to handle cases where row does not exist or has null values to check
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, "
+ "flow_name, flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) "
+ "VALUES(?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, flow_name, "
+ "flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) VALUES(?, ?, ?, ?, "
+ "CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
Expand Down Expand Up @@ -155,6 +154,10 @@ public MysqlMultiActiveLeaseArbiter(Config config) throws IOException {
this.constantsTableName);
this.thisTableSelectAfterInsertStatement = String.format(SELECT_AFTER_INSERT_STATEMENT, this.leaseArbiterTableName,
this.constantsTableName);
this.thisTableAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, this.leaseArbiterTableName);
this.thisTableAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName);
this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
String createArbiterStatement = String.format(
CREATE_LEASE_ARBITER_TABLE_STATEMENT, leaseArbiterTableName);
Expand All @@ -175,65 +178,26 @@ private void initializeConstantsTable() throws IOException {
String createConstantsStatement = String.format(CREATE_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(createConstantsStatement, createStatement -> createStatement.executeUpdate(), true);

String insertConstantsStatement = String.format(INSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
String insertConstantsStatement = String.format(UPSERT_CONSTANTS_TABLE_STATEMENT, this.constantsTableName);
withPreparedStatement(insertConstantsStatement, insertStatement -> {
int i = 0;
insertStatement.setInt(++i, epsilon);
insertStatement.setInt(++i, linger);
try {
return insertStatement.executeUpdate();
} catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry '1' for key 'PRIMARY")) {
throw e;
}
}
return null;
return insertStatement.executeUpdate();
}, true);
}

@Override
public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, long eventTimeMillis)
throws IOException {
// Check table for an existing entry for this flow action and event time
Optional<GetEventInfoResult> getResult = withPreparedStatement(thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
return Optional.absent();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
// Query lease arbiter table about this flow action
Optional<GetEventInfoResult> getResult = getExistingEventInfo(flowAction);

try {
if (!getResult.isPresent()) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no existing row for this flow action, then go"
+ " ahead and insert", flowAction, eventTimeMillis);
String formattedAcquireLeaseNewRowStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName,
this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
try {
return insertStatement.executeUpdate();
} catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
throw e;
}
}
return 0;
}, true);
int numRowsUpdated = attemptLeaseIfNewRow(flowAction);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent());
}

Expand Down Expand Up @@ -273,14 +237,8 @@ else if (leaseValidityStatus == 2) {
dbEventTimestamp, dbLeaseAcquisitionTimestamp, dbLinger);
}
// Use our event to acquire lease, check for previous db eventTimestamp and leaseAcquisitionTimestamp
String formattedAcquireLeaseIfMatchingAllStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, true,
true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfMatchingAllStatement, flowAction,
true,true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp));
} // No longer leasing this event
if (isWithinEpsilon) {
Expand All @@ -291,20 +249,39 @@ else if (leaseValidityStatus == 2) {
log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: Distinct event, no longer leasing event in "
+ "db", flowAction, dbCurrentTimestamp.getTime());
// Use our event to acquire lease, check for previous db eventTimestamp and NULL leaseAcquisitionTimestamp
String formattedAcquireLeaseIfFinishedStatement =
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, this.leaseArbiterTableName);
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, true,
false, dbEventTimestamp, null);
return insertStatement.executeUpdate();
}, true);
int numRowsUpdated = attemptLeaseIfExistingRow(thisTableAcquireLeaseIfFinishedStatement, flowAction,
true, false, dbEventTimestamp, null);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.of(dbCurrentTimestamp));
} catch (SQLException e) {
throw new RuntimeException(e);
}
}

/**
* Checks leaseArbiterTable for an existing entry for this flow action and event time
*/
protected Optional<GetEventInfoResult> getExistingEventInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableGetInfoStatement,
getInfoStatement -> {
int i = 0;
getInfoStatement.setString(++i, flowAction.getFlowGroup());
getInfoStatement.setString(++i, flowAction.getFlowName());
getInfoStatement.setString(++i, flowAction.getFlowExecutionId());
getInfoStatement.setString(++i, flowAction.getFlowActionType().toString());
ResultSet resultSet = getInfoStatement.executeQuery();
try {
if (!resultSet.next()) {
return Optional.absent();
}
return Optional.of(createGetInfoResult(resultSet));
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
}

protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOException {
try {
// Extract values from result set
Expand All @@ -329,6 +306,62 @@ protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws IOE
}
}

/**
* Called by participant to try to acquire lease for a flow action that does not have an attempt in progress or in
* near past for it.
* @return int corresponding to number of rows updated by INSERT statement to acquire lease
*/
protected int attemptLeaseIfNewRow(DagActionStore.DagAction flowAction) throws IOException {
String formattedAcquireLeaseNewRowStatement =
String.format(ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, this.leaseArbiterTableName);
return withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
try {
return insertStatement.executeUpdate();
} catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
throw e;
}
return 0;
}
}, true);
}

/**
* Called by participant to try to acquire lease for a flow action that has an existing, completed, or expired lease
* attempt for the flow action in the table.
* @return int corresponding to number of rows updated by INSERT statement to acquire lease
*/
protected int attemptLeaseIfExistingRow(String acquireLeaseStatement, DagActionStore.DagAction flowAction,
boolean needEventTimeCheck, boolean needLeaseAcquisition, Timestamp dbEventTimestamp,
Timestamp dbLeaseAcquisitionTimestamp) throws IOException {
return withPreparedStatement(acquireLeaseStatement,
insertStatement -> {
completeUpdatePreparedStatement(insertStatement, flowAction, needEventTimeCheck,
needLeaseAcquisition, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
return insertStatement.executeUpdate();
}, true);
}

/**
* Checks leaseArbiter table for a row corresponding to this flow action to determine if the lease acquisition attempt
* was successful or not.
*/
protected SelectInfoResult getRowInfo(DagActionStore.DagAction flowAction) throws IOException {
return withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
try {
return createSelectInfoResult(resultSet);
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
}
protected static SelectInfoResult createSelectInfoResult(ResultSet resultSet) throws IOException {
try {
if (!resultSet.next()) {
Expand Down Expand Up @@ -363,25 +396,15 @@ protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int numRowsUpdated,
DagActionStore.DagAction flowAction, Optional<Timestamp> dbCurrentTimestamp)
throws SQLException, IOException {
// Fetch values in row after attempted insert
SelectInfoResult selectInfoResult = withPreparedStatement(thisTableSelectAfterInsertStatement,
selectStatement -> {
completeWhereClauseMatchingKeyPreparedStatement(selectStatement, flowAction);
ResultSet resultSet = selectStatement.executeQuery();
try {
return createSelectInfoResult(resultSet);
} finally {
if (resultSet != null) {
resultSet.close();
}
}
}, true);
SelectInfoResult selectInfoResult = getRowInfo(flowAction);
if (numRowsUpdated == 1) {
log.debug("Obtained lease for [{}, eventTimestamp: {}] successfully!", flowAction,
selectInfoResult.eventTimeMillis);
return new LeaseObtainedStatus(flowAction, selectInfoResult.eventTimeMillis,
selectInfoResult.getLeaseAcquisitionTimeMillis());
}
log.debug("Another participant acquired lease in between for [{}, eventTimestamp: {}] - num rows updated: ", flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
log.debug("Another participant acquired lease in between for [{}, eventTimestamp: {}] - num rows updated: ",
flowAction, selectInfoResult.eventTimeMillis, numRowsUpdated);
// Another participant acquired lease in between
return new LeasedToAnotherStatus(flowAction, selectInfoResult.getEventTimeMillis(),
selectInfoResult.getLeaseAcquisitionTimeMillis() + selectInfoResult.getDbLinger()
Expand Down
Loading

0 comments on commit f6ad3d7

Please sign in to comment.