diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java index 24462c50b12..390ea64d708 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java @@ -123,8 +123,7 @@ protected interface CheckedFunction { // Need to define three separate statements to handle cases where row does not exist or has null values to check protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, " + "flow_name, flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) " - + "SELECT ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP WHERE NOT EXISTS (SELECT * FROM %s " - + WHERE_CLAUSE_TO_MATCH_KEY + ")"; + + "VALUES(?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)"; protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s " + "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP " + WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL"; @@ -226,7 +225,14 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement, insertStatement -> { completeInsertPreparedStatement(insertStatement, flowAction); - return insertStatement.executeUpdate(); + try { + return insertStatement.executeUpdate(); + } catch (SQLIntegrityConstraintViolationException e) { + if (!e.getMessage().contains("Duplicate entry")) { + throw e; + } + } + return 0; }, true); return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent()); } @@ -396,11 +402,6 @@ protected static void completeInsertPreparedStatement(PreparedStatement statemen statement.setString(++i, flowAction.getFlowName()); statement.setString(++i, flowAction.getFlowExecutionId()); statement.setString(++i, flowAction.getFlowActionType().toString()); - // Values to check if a row with this primary key exists - statement.setString(++i, flowAction.getFlowGroup()); - statement.setString(++i, flowAction.getFlowName()); - statement.setString(++i, flowAction.getFlowExecutionId()); - statement.setString(++i, flowAction.getFlowActionType().toString()); } /** diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java index b9920c91de1..9c7b0554d81 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java @@ -19,6 +19,7 @@ import com.typesafe.config.Config; import java.io.IOException; +import java.sql.SQLIntegrityConstraintViolationException; import java.sql.Timestamp; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.config.ConfigBuilder; @@ -174,13 +175,22 @@ public void testConditionallyAcquireLeaseIfNewRow() throws IOException { return insertStatement.executeUpdate(); }, true); Assert.assertEquals(numRowsUpdated, 1); - // Inserting the second time should update 0 rows but not throw any error - numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement, - insertStatement -> { - completeInsertPreparedStatement(insertStatement, resumeDagAction); - return insertStatement.executeUpdate(); - }, true); - Assert.assertEquals(numRowsUpdated, 0); + // Inserting the second time should throw an error + boolean wasExceptionThrown = + this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement, + insertStatement -> { + completeInsertPreparedStatement(insertStatement, resumeDagAction); + try { + insertStatement.executeUpdate(); + return false; + } catch (SQLIntegrityConstraintViolationException e) { + if (e.getMessage().contains("Duplicate entry")) { + return true; + } + } + return false; + }, true); + Assert.assertTrue(wasExceptionThrown); } /*