From 79818e9fd48fb2ad011ce900f107498a75c980bb Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Thu, 6 Jun 2024 15:31:45 -0400 Subject: [PATCH] fix: adapt source task shutdown logic to work with newer versions of Kafka Connect --- .../connect/jdbc/source/JdbcSourceTask.java | 126 +++++++++--------- 1 file changed, 65 insertions(+), 61 deletions(-) diff --git a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java index 58910701..7065f4fb 100644 --- a/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/aiven/connect/jdbc/source/JdbcSourceTask.java @@ -59,12 +59,13 @@ public class JdbcSourceTask extends SourceTask { // Visible for testing public static final int MAX_QUERY_SLEEP_MS = 100; - private Time time; + private final Time time; private JdbcSourceTaskConfig config; private DatabaseDialect dialect; private CachedConnectionProvider cachedConnectionProvider; - private PriorityQueue tableQueue = new PriorityQueue(); + private final PriorityQueue tableQueue = new PriorityQueue(); private final AtomicBoolean running = new AtomicBoolean(false); + private final Object pollLock = new Object(); public JdbcSourceTask() { this.time = Time.SYSTEM; @@ -269,8 +270,14 @@ private List> possibleTablePartitions(final String table) { public void stop() throws ConnectException { log.info("Stopping JDBC source task"); running.set(false); - // All resources are closed at the end of 'poll()' when no longer running or - // if there is an error + // Wait for any in-progress polls to stop before closing resources + // On older versions of Kafka Connect, SourceTask::stop and SourceTask::poll may + // be called concurrently on different threads + // On more recent versions, SourceTask::stop is always called after the last invocation + // of SourceTask::poll + synchronized (pollLock) { + closeResources(); + } } protected void closeResources() { @@ -297,72 +304,69 @@ protected void closeResources() { @Override public List poll() throws InterruptedException { - log.trace("Polling for new data"); - - while (running.get()) { - final TableQuerier querier = tableQueue.peek(); - - if (!querier.querying()) { - // If not in the middle of an update, wait for next update time - final long nextUpdate = querier.getLastUpdate() - + config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG); - final long untilNext = nextUpdate - time.milliseconds(); - final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS); - if (sleepMs > 0) { - log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)", - sleepMs, querier.toString(), untilNext); - time.sleep(sleepMs); - // Return control to the Connect runtime periodically - // See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll(): - // "If no data is currently available, this method should block but return control to the caller - // regularly (by returning null)" - return null; + synchronized (pollLock) { + log.trace("Polling for new data"); + + while (running.get()) { + final TableQuerier querier = tableQueue.peek(); + + if (!querier.querying()) { + // If not in the middle of an update, wait for next update time + final long nextUpdate = querier.getLastUpdate() + + config.getInt(JdbcSourceTaskConfig.POLL_INTERVAL_MS_CONFIG); + final long untilNext = nextUpdate - time.milliseconds(); + final long sleepMs = Math.min(untilNext, MAX_QUERY_SLEEP_MS); + if (sleepMs > 0) { + log.trace("Waiting {} ms to poll {} next ({} ms total left to wait)", + sleepMs, querier.toString(), untilNext); + time.sleep(sleepMs); + // Return control to the Connect runtime periodically + // See https://kafka.apache.org/37/javadoc/org/apache/kafka/connect/source/SourceTask.html#poll(): + // "If no data is currently available, this method should block but return control to the caller + // regularly (by returning null)" + return null; + } } - } - final List results = new ArrayList<>(); - try { - log.debug("Checking for next block of results from {}", querier.toString()); - querier.maybeStartQuery(cachedConnectionProvider.getConnection()); + final List results = new ArrayList<>(); + try { + log.debug("Checking for next block of results from {}", querier.toString()); + querier.maybeStartQuery(cachedConnectionProvider.getConnection()); - final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG); - boolean hadNext = true; - while (results.size() < batchMaxRows && (hadNext = querier.next())) { - results.add(querier.extractRecord()); - } + final int batchMaxRows = config.getInt(JdbcSourceTaskConfig.BATCH_MAX_ROWS_CONFIG); + boolean hadNext = true; + while (results.size() < batchMaxRows && (hadNext = querier.next())) { + results.add(querier.extractRecord()); + } - if (!hadNext) { - // If we finished processing the results from the current query, we can reset and send - // the querier to the tail of the queue - resetAndRequeueHead(querier); - } + if (!hadNext) { + // If we finished processing the results from the current query, we can reset and send + // the querier to the tail of the queue + resetAndRequeueHead(querier); + } - if (results.isEmpty()) { - log.trace("No updates for {}", querier.toString()); - continue; - } + if (results.isEmpty()) { + log.trace("No updates for {}", querier.toString()); + continue; + } - log.debug("Returning {} records for {}", results.size(), querier.toString()); - return results; - } catch (final SQLException sqle) { - log.error("Failed to run query for table {}: {}", querier.toString(), sqle); - resetAndRequeueHead(querier); - return null; - } catch (final Throwable t) { - resetAndRequeueHead(querier); - // This task has failed, so close any resources (may be reopened if needed) before throwing - closeResources(); - throw t; + log.debug("Returning {} records for {}", results.size(), querier.toString()); + return results; + } catch (final SQLException sqle) { + log.error("Failed to run query for table {}: {}", querier.toString(), sqle); + resetAndRequeueHead(querier); + return null; + } catch (final Throwable t) { + resetAndRequeueHead(querier); + // This task has failed, so close any resources (may be reopened if needed) before throwing + closeResources(); + throw t; + } } - } - // Only in case of shutdown - final TableQuerier querier = tableQueue.peek(); - if (querier != null) { - resetAndRequeueHead(querier); + // Only in case of shutdown + return null; } - closeResources(); - return null; } private void resetAndRequeueHead(final TableQuerier expectedHead) {