Skip to content

Commit

Permalink
Extract methods for readability
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Zalas <[email protected]>
  • Loading branch information
jakzal committed Dec 14, 2021
1 parent 780c3cc commit 6223e4b
Showing 1 changed file with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,7 @@ public void resume(final String name) {

@Override
public void send(final Message message) {
if (isSuspendedExceptFor(message)) {
suspendedDeliveryQueue.get().add(message);
} else {
queue.add(message);
}
queue(message);
if (!isDelivering()) {
dispatcher.execute(this);
}
Expand Down Expand Up @@ -82,7 +78,11 @@ private boolean isSuspendedExceptFor(final Message override) {

@Override
public Message receive() {
return queue.poll();
if (!isSuspended()) {
return queue.poll();
} else {
return suspendedDeliveryQueue.get().poll();
}
}

@Override
Expand All @@ -95,24 +95,15 @@ public void run() {
if (delivering.compareAndSet(false, true)) {
final int total = throttlingCount;
for (int count = 0; count < total; ++count) {
if (isSuspended()) {
Message message = suspendedDeliveryQueue.get().poll();
if (message != null) {
message.deliver();
} else {
break;
}
final Message message = receive();
if (message != null) {
message.deliver();
} else {
final Message message = receive();
if (message != null) {
message.deliver();
} else {
break;
}
break;
}
}
delivering.set(false);
if (!queue.isEmpty() || !suspendedDeliveryQueue.get().isEmpty()) {
if (!isQueueEmpty()) {
dispatcher.execute(this);
}
}
Expand All @@ -124,6 +115,18 @@ public int pendingMessages() {
return queue.size();
}

private void queue(final Message message) {
if (isSuspendedExceptFor(message)) {
suspendedDeliveryQueue.get().add(message);
} else {
queue.add(message);
}
}

private boolean isQueueEmpty() {
return queue.isEmpty() && suspendedDeliveryQueue.get().isEmpty();
}

protected ConcurrentQueueMailbox(final Dispatcher dispatcher, final int throttlingCount) {
this.dispatcher = dispatcher;
this.delivering = new AtomicBoolean(false);
Expand Down

0 comments on commit 6223e4b

Please sign in to comment.