From 1b45fe73caedddd1c5af6e467bf2973470eec0b0 Mon Sep 17 00:00:00 2001 From: JiHongKim98 Date: Mon, 11 Nov 2024 17:14:39 +0900 Subject: [PATCH 1/2] Add wait for container stop on cancellation --- .../data/redis/stream/StreamPollTask.java | 11 ++++++++++ ...sageListenerContainerIntegrationTests.java | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java index 6641a791c3..d6d5e01934 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java @@ -41,6 +41,7 @@ * {@link Task} that invokes a {@link BiFunction read function} to poll on a Redis Stream. * * @author Mark Paluch + * @author JiHongKim98 * @see 2.2 */ class StreamPollTask> implements Task { @@ -54,6 +55,8 @@ class StreamPollTask> implements Task { private final PollState pollState; private final TypeDescriptor targetType; + private final CountDownLatch cancelLatch = new CountDownLatch(1); + private volatile boolean isInEventLoop = false; StreamPollTask(StreamReadRequest streamRequest, StreamListener listener, ErrorHandler errorHandler, @@ -83,6 +86,12 @@ private static PollState createPollState(StreamReadRequest streamRequest) { @Override public void cancel() throws DataAccessResourceFailureException { this.pollState.cancel(); + + try { + cancelLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } @Override @@ -112,6 +121,7 @@ public void run() { doLoop(); } finally { isInEventLoop = false; + cancelLatch.countDown(); } } @@ -134,6 +144,7 @@ private void doLoop() { } catch (RuntimeException ex) { if (cancelSubscriptionOnError.test(ex)) { + cancelLatch.countDown(); cancel(); } diff --git a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java index 4e5d620ccf..fda3f94a93 100644 --- a/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/stream/AbstractStreamMessageListenerContainerIntegrationTests.java @@ -384,6 +384,27 @@ void containerRestartShouldRestartSubscription() throws InterruptedException { cancelAwait(subscription); } + @Test // GH-2261 + void containerShouldStopGracefully() throws InterruptedException { + StreamMessageListenerContainer> container = StreamMessageListenerContainer + .create(connectionFactory, containerOptions); + + BlockingQueue> queue = new LinkedBlockingQueue<>(); + container.start(); + Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), r -> { + try { + Thread.sleep(1500); + } catch (InterruptedException e) { + // ignore + } + queue.add(r); + }); + redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1")); + subscription.await(DEFAULT_TIMEOUT); + container.stop(); + assertThat(queue.poll(500, TimeUnit.MILLISECONDS)).isNotNull(); + } + private static void cancelAwait(Subscription subscription) { subscription.cancel(); From d474333c7021b9707ced3aac4b8ee78bb574b6d3 Mon Sep 17 00:00:00 2001 From: JiHongKim98 Date: Mon, 11 Nov 2024 18:14:34 +0900 Subject: [PATCH 2/2] Add cancel latch countdown on interrupt error --- .../org/springframework/data/redis/stream/StreamPollTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java index d6d5e01934..12ce1f5446 100644 --- a/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java +++ b/src/main/java/org/springframework/data/redis/stream/StreamPollTask.java @@ -139,6 +139,7 @@ private void doLoop() { } catch (InterruptedException ex) { + cancelLatch.countDown(); cancel(); Thread.currentThread().interrupt(); } catch (RuntimeException ex) {