diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java index 3eaec37dd2..3632ed5e12 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/ReceiverEndPoints.java @@ -24,7 +24,8 @@ import java.io.IOException; import java.nio.channels.SelectionKey; import java.util.Arrays; -import java.util.function.Consumer; +import java.util.Iterator; +import java.util.Set; import java.util.function.LongConsumer; import java.util.stream.Stream; @@ -47,24 +48,6 @@ class ReceiverEndPoints extends TransportPoller // An endpoint that has read data out of the TCP layer but has been back-pressured when attempting to write // the data into the Aeron stream. private ReceiverEndPoint backpressuredEndPoint = null; - private int bytesReceived; - private final Consumer selectorPoller = (selectionKey) -> - { - if (null == backpressuredEndPoint) - { - final ReceiverEndPoint endPoint = (ReceiverEndPoint)selectionKey.attachment(); - final int polledBytes = endPoint.poll(); - if (polledBytes < 0) - { - backpressuredEndPoint = endPoint; - bytesReceived -= polledBytes; - } - else - { - bytesReceived += polledBytes; - } - } - }; ReceiverEndPoints(final ErrorHandler errorHandler) { @@ -235,7 +218,7 @@ int pollEndPoints() private int pollNormalEndPoints(final int numRequiredPollingEndPoints) throws IOException { - bytesReceived = 0; + int bytesReceived = 0; final ReceiverEndPoint[] endPoints = this.endPoints; final int numEndPoints = endPoints.length; final int threshold = ARTIO_ITERATION_THRESHOLD - numRequiredPollingEndPoints; @@ -245,7 +228,26 @@ private int pollNormalEndPoints(final int numRequiredPollingEndPoints) throws IO } else { - selector.selectNow(selectorPoller); + selector.selectNow(); + final Set selectedKeys = selector.selectedKeys(); + for (final Iterator iterator = selectedKeys.iterator(); iterator.hasNext();) + { + final SelectionKey key = iterator.next(); + if (key != null) + { + final ReceiverEndPoint endPoint = (ReceiverEndPoint)key.attachment(); + final int polledBytes = endPoint.poll(); + if (polledBytes < 0) + { + backpressuredEndPoint = endPoint; + bytesReceived -= polledBytes; + break; + } + + bytesReceived += polledBytes; + iterator.remove(); + } + } } return bytesReceived; }