From b0b414da55ce7b331325a1aa8ec72f97c7ead533 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Tue, 7 Jan 2025 18:55:31 +0200 Subject: [PATCH] Reduce response chunk arrival timeout (#8962) --- .../rpc/core/Eth2IncomingRequestHandler.java | 10 ++++---- .../rpc/core/Eth2OutgoingRequestHandler.java | 24 +++++++++---------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java index 2a1bce8e6f3..9d3c3fdcb83 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2IncomingRequestHandler.java @@ -33,7 +33,9 @@ public class Eth2IncomingRequestHandler< TRequest extends RpcRequest & SszData, TResponse extends SszData> implements RpcRequestHandler { + private static final Logger LOG = LogManager.getLogger(); + private static final Duration RECEIVE_INCOMING_REQUEST_TIMEOUT = Duration.ofSeconds(10); private final PeerLookup peerLookup; @@ -119,9 +121,8 @@ private void handleRequest( private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) { asyncRunner - .getDelayedFuture(RECEIVE_INCOMING_REQUEST_TIMEOUT) - .thenAccept( - (__) -> { + .runAfterDelay( + () -> { if (!requestHandled.get()) { LOG.debug( "Failed to receive incoming request data within {} sec for protocol {}. Close stream.", @@ -129,7 +130,8 @@ private void ensureRequestReceivedWithinTimeLimit(final RpcStream stream) { protocolId); stream.closeAbruptly().ifExceptionGetsHereRaiseABug(); } - }) + }, + RECEIVE_INCOMING_REQUEST_TIMEOUT) .ifExceptionGetsHereRaiseABug(); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java index 320a5a3f943..be3e39cd95b 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/core/Eth2OutgoingRequestHandler.java @@ -55,7 +55,7 @@ enum State { private static final Logger LOG = LogManager.getLogger(); @VisibleForTesting static final Duration READ_COMPLETE_TIMEOUT = Duration.ofSeconds(10); - @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(30); + @VisibleForTesting static final Duration RESPONSE_CHUNK_ARRIVAL_TIMEOUT = Duration.ofSeconds(10); private final AsyncRunner asyncRunner; private final int maximumResponseChunks; @@ -116,7 +116,7 @@ public void processData(final NodeId nodeId, final RpcStream rpcStream, final By throw new RpcException.ExtraDataAppendedException(" extra data: " + bufToString(data)); } - List maybeResponses = responseDecoder.decodeNextResponses(data); + final List maybeResponses = responseDecoder.decodeNextResponses(data); final int chunksReceived = currentChunkCount.addAndGet(maybeResponses.size()); if (chunksReceived > maximumResponseChunks) { @@ -161,8 +161,8 @@ private String bufToString(final ByteBuf buf) { final int contentSize = Integer.min(buf.readableBytes(), 1024); String bufContent = ""; if (contentSize > 0) { - ByteBuf bufSlice = buf.slice(0, contentSize); - byte[] bytes = new byte[bufSlice.readableBytes()]; + final ByteBuf bufSlice = buf.slice(0, contentSize); + final byte[] bytes = new byte[bufSlice.readableBytes()]; bufSlice.getBytes(0, bytes); bufContent += Bytes.wrap(bytes); if (contentSize < buf.readableBytes()) { @@ -255,9 +255,8 @@ private void ensureNextResponseChunkArrivesInTime( final int previousResponseCount, final AtomicInteger currentResponseCount) { timeoutRunner - .getDelayedFuture(RESPONSE_CHUNK_ARRIVAL_TIMEOUT) - .thenAccept( - (__) -> { + .runAfterDelay( + () -> { if (previousResponseCount == currentResponseCount.get()) { abortRequest( stream, @@ -265,22 +264,23 @@ private void ensureNextResponseChunkArrivesInTime( "Timed out waiting for response chunk " + previousResponseCount, RESPONSE_CHUNK_ARRIVAL_TIMEOUT)); } - }) + }, + RESPONSE_CHUNK_ARRIVAL_TIMEOUT) .ifExceptionGetsHereRaiseABug(); } private void ensureReadCompleteArrivesInTime(final RpcStream stream) { timeoutRunner - .getDelayedFuture(READ_COMPLETE_TIMEOUT) - .thenAccept( - (__) -> { + .runAfterDelay( + () -> { if (!(state.get() == READ_COMPLETE || state.get() == CLOSED)) { abortRequest( stream, new RpcTimeoutException( "Timed out waiting for read channel close", READ_COMPLETE_TIMEOUT)); } - }) + }, + READ_COMPLETE_TIMEOUT) .ifExceptionGetsHereRaiseABug(); }