Skip to content

Commit

Permalink
Put any left suspended delivery messages back to the regular queue on…
Browse files Browse the repository at this point in the history
… resume

Signed-off-by: Jakub Zalas <[email protected]>
  • Loading branch information
jakzal committed Jan 10, 2022
1 parent 6223e4b commit 7a6b1c0
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 56 deletions.
51 changes: 0 additions & 51 deletions src/main/java/io/vlingo/xoom/actors/ResumingMailbox.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,26 @@
import io.vlingo.xoom.actors.Dispatcher;
import io.vlingo.xoom.actors.Mailbox;
import io.vlingo.xoom.actors.Message;
import io.vlingo.xoom.actors.ResumingMailbox;

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class ConcurrentQueueMailbox implements Mailbox, Runnable {
private AtomicBoolean delivering;
private final Dispatcher dispatcher;
private AtomicReference<SuspendedDeliveryOverrides> suspendedDeliveryOverrides;
private AtomicReference<SuspendedDeliveryQueue> suspendedDeliveryQueue;
private final AtomicReference<SuspendedDeliveryOverrides> suspendedDeliveryOverrides;
private final AtomicReference<SuspendedDeliveryQueue> suspendedDeliveryQueue;
private final Queue<Message> queue;
private final byte throttlingCount;

@Override
public void close() {
queue.clear();
suspendedDeliveryQueue.get().clear();
}

@Override
Expand All @@ -44,6 +45,7 @@ public int concurrencyCapacity() {
@Override
public void resume(final String name) {
if (suspendedDeliveryOverrides.get().pop(name)) {
suspendedDeliveryQueue.get().putBack(this::queue);
dispatcher.execute(this);
}
}
Expand Down Expand Up @@ -112,7 +114,7 @@ public void run() {
/* @see io.vlingo.xoom.actors.Mailbox#pendingMessages() */
@Override
public int pendingMessages() {
return queue.size();
return queue.size() + suspendedDeliveryQueue.get().size();
}

private void queue(final Message message) {
Expand Down Expand Up @@ -279,7 +281,6 @@ public void add(final Message message) {
break;
}
}

}

public Message poll() {
Expand All @@ -295,8 +296,32 @@ public Message poll() {
}
}

public void putBack(final Consumer<Message> consumer) {
while(true) {
if (accessible.compareAndSet(false, true)) {
queue.forEach(consumer);
queue.clear();
accessible.set(false);
break;
}
}
}

public void clear() {
while(true) {
if (accessible.compareAndSet(false, true)) {
queue.clear();
break;
}
}
}

public boolean isEmpty() {
return queue.isEmpty();
}

public int size() {
return queue.size();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,33 @@ public void testThatSuspendedOverrideMessagesAreDeliveredInOrderTheyArrived() {
assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
}

@Test
public void testThatSuspendedButNotHandledMessagesAreQueued() {

final Dispatcher dispatcher = new ExecutorDispatcher(2, 0, 1.0f);
final Mailbox mailbox = new ConcurrentQueueMailbox(dispatcher, 1);

final TestResults testResults = new TestResults(3);
final CountTakerActor actor = new CountTakerActor(testResults);

mailbox.suspendExceptFor("paused#", CountTaker.class);

for (int count = 0; count < 3; ++count) {
final int countParam = count;
final SerializableConsumer<CountTaker> consumer = (consumerActor) -> {
// Give longer Delay to messages that come first
delay(20 - (countParam * 10));
consumerActor.take(countParam);
};
final LocalMessage<CountTaker> message = new LocalMessage<CountTaker>(actor, CountTaker.class, consumer, "take(int)");
mailbox.send(message);
}

mailbox.resume("paused#");

assertEquals(Arrays.asList(0, 1, 2), actor.testResults.getCounts());
}

@Before
@Override
public void setUp() throws Exception {
Expand Down

0 comments on commit 7a6b1c0

Please sign in to comment.