diff --git a/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java b/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java deleted file mode 100644 index 294c69d0..00000000 --- a/src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java +++ /dev/null @@ -1,51 +0,0 @@ -// Copyright © 2012-2021 VLINGO LABS. All rights reserved. -// -// This Source Code Form is subject to the terms of the -// Mozilla Public License, v. 2.0. If a copy of the MPL -// was not distributed with this file, You can obtain -// one at https://mozilla.org/MPL/2.0/. - -package io.vlingo.xoom.actors; - -public class ResumingMailbox implements Mailbox { - private final Message message; - - public ResumingMailbox(final Message message) { - this.message = message; - } - - @Override - public void run() { - message.deliver(); - } - - @Override - public void close() { } - - @Override - public boolean isClosed() { return false; } - - @Override - public boolean isDelivering() { return true; } - - @Override - public int concurrencyCapacity() { return 0; } - - @Override - public void resume(final String name) { } - - @Override - public void send(final Message message) { } - - @Override - public void suspendExceptFor(String name, Class... overrides) { } - - @Override - public boolean isSuspended() { return false; } - - @Override - public Message receive() { return null; } - - @Override - public int pendingMessages() { return 1; } -} diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index aeb47c6b..e7731aeb 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -10,25 +10,26 @@ import io.vlingo.xoom.actors.Dispatcher; import io.vlingo.xoom.actors.Mailbox; import io.vlingo.xoom.actors.Message; -import io.vlingo.xoom.actors.ResumingMailbox; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; public class ConcurrentQueueMailbox implements Mailbox, Runnable { private AtomicBoolean delivering; private final Dispatcher dispatcher; - private AtomicReference suspendedDeliveryOverrides; - private AtomicReference suspendedDeliveryQueue; + private final AtomicReference suspendedDeliveryOverrides; + private final AtomicReference suspendedDeliveryQueue; private final Queue queue; private final byte throttlingCount; @Override public void close() { queue.clear(); + suspendedDeliveryQueue.get().clear(); } @Override @@ -44,6 +45,7 @@ public int concurrencyCapacity() { @Override public void resume(final String name) { if (suspendedDeliveryOverrides.get().pop(name)) { + suspendedDeliveryQueue.get().putBack(this::queue); dispatcher.execute(this); } } @@ -112,7 +114,7 @@ public void run() { /* @see io.vlingo.xoom.actors.Mailbox#pendingMessages() */ @Override public int pendingMessages() { - return queue.size(); + return queue.size() + suspendedDeliveryQueue.get().size(); } private void queue(final Message message) { @@ -279,7 +281,6 @@ public void add(final Message message) { break; } } - } public Message poll() { @@ -295,8 +296,32 @@ public Message poll() { } } + public void putBack(final Consumer consumer) { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.forEach(consumer); + queue.clear(); + accessible.set(false); + break; + } + } + } + + public void clear() { + while(true) { + if (accessible.compareAndSet(false, true)) { + queue.clear(); + break; + } + } + } + public boolean isEmpty() { return queue.isEmpty(); } + + public int size() { + return queue.size(); + } } } diff --git a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java index abc2df39..037cca06 100644 --- a/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java +++ b/src/test/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailboxTest.java @@ -113,6 +113,33 @@ public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() { assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); } + @Test + public void testThatSuspendedButNotHandledMessagesAreQueued() { + + final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f); + final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1); + + final TestResults testResults = new TestResults(3); + final CountTakerActor actor = new CountTakerActor(testResults); + + mailbox.suspendExceptFor("paused#", CountTaker.class); + + for (int count = 0; count < 3; ++count) { + final int countParam = count; + final SerializableConsumer consumer = (consumerActor) -> { + // Give longer Delay to messages that come first + delay(20 - (countParam * 10)); + consumerActor.take(countParam); + }; + final LocalMessage message = new LocalMessage(actor, CountTaker.class, consumer, "take(int)"); + mailbox.send(message); + } + + mailbox.resume("paused#"); + + assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts()); + } + @Before @Override public void setUp() throws Exception {