diff --git a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java index 74cf50f3..aeb47c6b 100644 --- a/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java +++ b/src/main/java/io/vlingo/xoom/actors/plugin/mailbox/concurrentqueue/ConcurrentQueueMailbox.java @@ -50,11 +50,7 @@ public void resume(final String name) { @Override public void send(final Message message) { - if (isSuspendedExceptFor(message)) { - suspendedDeliveryQueue.get().add(message); - } else { - queue.add(message); - } + queue(message); if (!isDelivering()) { dispatcher.execute(this); } @@ -82,7 +78,11 @@ private boolean isSuspendedExceptFor(final Message override) { @Override public Message receive() { - return queue.poll(); + if (!isSuspended()) { + return queue.poll(); + } else { + return suspendedDeliveryQueue.get().poll(); + } } @Override @@ -95,24 +95,15 @@ public void run() { if (delivering.compareAndSet(false, true)) { final int total = throttlingCount; for (int count = 0; count < total; ++count) { - if (isSuspended()) { - Message message = suspendedDeliveryQueue.get().poll(); - if (message != null) { - message.deliver(); - } else { - break; - } + final Message message = receive(); + if (message != null) { + message.deliver(); } else { - final Message message = receive(); - if (message != null) { - message.deliver(); - } else { - break; - } + break; } } delivering.set(false); - if (!queue.isEmpty() || !suspendedDeliveryQueue.get().isEmpty()) { + if (!isQueueEmpty()) { dispatcher.execute(this); } } @@ -124,6 +115,18 @@ public int pendingMessages() { return queue.size(); } + private void queue(final Message message) { + if (isSuspendedExceptFor(message)) { + suspendedDeliveryQueue.get().add(message); + } else { + queue.add(message); + } + } + + private boolean isQueueEmpty() { + return queue.isEmpty() && suspendedDeliveryQueue.get().isEmpty(); + } + protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttlingCount) { this.dispatcher = dispatcher; this.delivering = new AtomicBoolean(false);