Skip to content

Commit

Permalink
[Java] Fix nested selector calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
vyazelenko committed Aug 25, 2024
1 parent e3644ab commit 177da9b
Showing 1 changed file with 23 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.Iterator;
import java.util.Set;
import java.util.function.LongConsumer;
import java.util.stream.Stream;

Expand All @@ -47,24 +48,6 @@ class ReceiverEndPoints extends TransportPoller
// An endpoint that has read data out of the TCP layer but has been back-pressured when attempting to write
// the data into the Aeron stream.
private ReceiverEndPoint backpressuredEndPoint = null;
private int bytesReceived;
private final Consumer<SelectionKey> selectorPoller = (selectionKey) ->
{
if (null == backpressuredEndPoint)
{
final ReceiverEndPoint endPoint = (ReceiverEndPoint)selectionKey.attachment();
final int polledBytes = endPoint.poll();
if (polledBytes < 0)
{
backpressuredEndPoint = endPoint;
bytesReceived -= polledBytes;
}
else
{
bytesReceived += polledBytes;
}
}
};

ReceiverEndPoints(final ErrorHandler errorHandler)
{
Expand Down Expand Up @@ -235,7 +218,7 @@ int pollEndPoints()

private int pollNormalEndPoints(final int numRequiredPollingEndPoints) throws IOException
{
bytesReceived = 0;
int bytesReceived = 0;
final ReceiverEndPoint[] endPoints = this.endPoints;
final int numEndPoints = endPoints.length;
final int threshold = ARTIO_ITERATION_THRESHOLD - numRequiredPollingEndPoints;
Expand All @@ -245,7 +228,26 @@ private int pollNormalEndPoints(final int numRequiredPollingEndPoints) throws IO
}
else
{
selector.selectNow(selectorPoller);
selector.selectNow();
final Set<SelectionKey> selectedKeys = selector.selectedKeys();
for (final Iterator<SelectionKey> iterator = selectedKeys.iterator(); iterator.hasNext();)
{
final SelectionKey key = iterator.next();
if (key != null)
{
final ReceiverEndPoint endPoint = (ReceiverEndPoint)key.attachment();
final int polledBytes = endPoint.poll();
if (polledBytes < 0)
{
backpressuredEndPoint = endPoint;
bytesReceived -= polledBytes;
break;
}

bytesReceived += polledBytes;
iterator.remove();
}
}
}
return bytesReceived;
}
Expand Down

0 comments on commit 177da9b

Please sign in to comment.