Skip to content

Commit

Permalink
catch sql exception & update test
Browse files Browse the repository at this point in the history
  • Loading branch information
Urmi Mustafi committed Jul 24, 2023
1 parent 1ecaafa commit 6244592
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,7 @@ protected interface CheckedFunction<T, R> {
// Need to define three separate statements to handle cases where row does not exist or has null values to check
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT = "INSERT INTO %s (flow_group, "
+ "flow_name, flow_execution_id, flow_action, event_timestamp, lease_acquisition_timestamp) "
+ "SELECT ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP WHERE NOT EXISTS (SELECT * FROM %s "
+ WHERE_CLAUSE_TO_MATCH_KEY + ")";
+ "VALUES(?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)";
protected static final String CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT = "UPDATE %s "
+ "SET event_timestamp=CURRENT_TIMESTAMP, lease_acquisition_timestamp=CURRENT_TIMESTAMP "
+ WHERE_CLAUSE_TO_MATCH_KEY + " AND event_timestamp=? AND lease_acquisition_timestamp is NULL";
Expand Down Expand Up @@ -226,7 +225,14 @@ public LeaseAttemptStatus tryAcquireLease(DagActionStore.DagAction flowAction, l
int numRowsUpdated = withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, flowAction);
return insertStatement.executeUpdate();
try {
return insertStatement.executeUpdate();
} catch (SQLIntegrityConstraintViolationException e) {
if (!e.getMessage().contains("Duplicate entry")) {
throw e;
}
}
return 0;
}, true);
return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, Optional.absent());
}
Expand Down Expand Up @@ -396,11 +402,6 @@ protected static void completeInsertPreparedStatement(PreparedStatement statemen
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
statement.setString(++i, flowAction.getFlowActionType().toString());
// Values to check if a row with this primary key exists
statement.setString(++i, flowAction.getFlowGroup());
statement.setString(++i, flowAction.getFlowName());
statement.setString(++i, flowAction.getFlowExecutionId());
statement.setString(++i, flowAction.getFlowActionType().toString());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.typesafe.config.Config;
import java.io.IOException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.Timestamp;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
Expand Down Expand Up @@ -174,13 +175,22 @@ public void testConditionallyAcquireLeaseIfNewRow() throws IOException {
return insertStatement.executeUpdate();
}, true);
Assert.assertEquals(numRowsUpdated, 1);
// Inserting the second time should update 0 rows but not throw any error
numRowsUpdated = this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, resumeDagAction);
return insertStatement.executeUpdate();
}, true);
Assert.assertEquals(numRowsUpdated, 0);
// Inserting the second time should throw an error
boolean wasExceptionThrown =
this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
insertStatement -> {
completeInsertPreparedStatement(insertStatement, resumeDagAction);
try {
insertStatement.executeUpdate();
return false;
} catch (SQLIntegrityConstraintViolationException e) {
if (e.getMessage().contains("Duplicate entry")) {
return true;
}
}
return false;
}, true);
Assert.assertTrue(wasExceptionThrown);
}

/*
Expand Down

0 comments on commit 6244592

Please sign in to comment.