From b7dc4bfc84d85edf4144f66d4b1b71f6833b0dce Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Mon, 24 Feb 2025 15:44:08 +0000 Subject: [PATCH] Add draft implementation for ExecutionPayloadEnvelopesByRange --- .../chains/ThrottlingSyncSource.java | 36 ++ ...PayloadEnvelopesByRangeRequestMessage.java | 74 ++++ .../eth2/peers/DefaultEth2Peer.java | 49 +++ .../networking/eth2/peers/SyncSource.java | 4 + .../rpc/beaconchain/BeaconChainMethodIds.java | 12 +- .../rpc/beaconchain/BeaconChainMethods.java | 62 ++++ ...PayloadEnvelopesByRangeMessageHandler.java | 328 ++++++++++++++++++ .../eth2/peers/RespondingEth2Peer.java | 9 + .../networking/eth2/peers/StubSyncSource.java | 10 + 9 files changed, 575 insertions(+), 9 deletions(-) create mode 100644 ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/ExecutionPayloadEnvelopesByRangeRequestMessage.java create mode 100644 networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/ExecutionPayloadEnvelopesByRangeMessageHandler.java diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java index 9961be15863..f9e04cf6022 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/chains/ThrottlingSyncSource.java @@ -30,6 +30,7 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope; public class ThrottlingSyncSource implements SyncSource { private static final Logger LOG = LogManager.getLogger(); @@ -43,6 +44,7 @@ public class ThrottlingSyncSource implements SyncSource { private final RateTracker blocksRateTracker; private final Optional maybeMaxBlobsPerBlock; private final RateTracker blobSidecarsRateTracker; + private final RateTracker executionPayloadEnvelopesRateTracker; public ThrottlingSyncSource( final AsyncRunner asyncRunner, @@ -61,6 +63,8 @@ public ThrottlingSyncSource( maxBlobSidecarsPerMinute -> RateTracker.create(maxBlobSidecarsPerMinute, TIMEOUT_SECONDS, timeProvider)) .orElse(RateTracker.NOOP); + this.executionPayloadEnvelopesRateTracker = + RateTracker.create(maxBlocksPerMinute, TIMEOUT_SECONDS, timeProvider); } @Override @@ -125,6 +129,38 @@ public SafeFuture requestBlobSidecarsByRange( }); } + @Override + public SafeFuture requestExecutionPayloadEnvelopesByRange( + final UInt64 startSlot, + final UInt64 count, + final RpcResponseListener listener) { + return executionPayloadEnvelopesRateTracker + .approveObjectsRequest(count.longValue()) + .map( + requestApproval -> { + LOG.debug("Sending request for {} execution payload envelopes", count); + final RpcResponseListenerWithCount listenerWithCount = + new RpcResponseListenerWithCount<>(listener); + return delegate + .requestExecutionPayloadEnvelopesByRange(startSlot, count, listenerWithCount) + .alwaysRun( + () -> + // adjust for slots with empty execution payloads + executionPayloadEnvelopesRateTracker.adjustObjectsRequest( + requestApproval, listenerWithCount.count.get())); + }) + .orElseGet( + () -> { + LOG.debug( + "Rate limiting request for {} execution payload envelopes. Retry in {} seconds", + count, + PEER_REQUEST_DELAY.toSeconds()); + return asyncRunner.runAfterDelay( + () -> requestExecutionPayloadEnvelopesByRange(startSlot, count, listener), + PEER_REQUEST_DELAY); + }); + } + @Override public SafeFuture disconnectCleanly(final DisconnectReason reason) { return delegate.disconnectCleanly(reason); diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/ExecutionPayloadEnvelopesByRangeRequestMessage.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/ExecutionPayloadEnvelopesByRangeRequestMessage.java new file mode 100644 index 00000000000..7babcb34f1a --- /dev/null +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/networking/libp2p/rpc/ExecutionPayloadEnvelopesByRangeRequestMessage.java @@ -0,0 +1,74 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc; + +import tech.pegasys.teku.infrastructure.ssz.containers.Container2; +import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2; +import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64; +import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas; +import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; + +public final class ExecutionPayloadEnvelopesByRangeRequestMessage + extends Container2 + implements RpcRequest { + + public static class ExecutionPayloadEnvelopesByRangeRequestMessageSchema + extends ContainerSchema2< + ExecutionPayloadEnvelopesByRangeRequestMessage, SszUInt64, SszUInt64> { + + public ExecutionPayloadEnvelopesByRangeRequestMessageSchema() { + super( + "ExecutionPayloadEnvelopesByRangeRequestMessage", + namedSchema("start_slot", SszPrimitiveSchemas.UINT64_SCHEMA), + namedSchema("count", SszPrimitiveSchemas.UINT64_SCHEMA)); + } + + @Override + public ExecutionPayloadEnvelopesByRangeRequestMessage createFromBackingNode( + final TreeNode node) { + return new ExecutionPayloadEnvelopesByRangeRequestMessage(this, node); + } + } + + public static final ExecutionPayloadEnvelopesByRangeRequestMessageSchema SSZ_SCHEMA = + new ExecutionPayloadEnvelopesByRangeRequestMessageSchema(); + + private ExecutionPayloadEnvelopesByRangeRequestMessage( + final ExecutionPayloadEnvelopesByRangeRequestMessageSchema type, final TreeNode backingNode) { + super(type, backingNode); + } + + public ExecutionPayloadEnvelopesByRangeRequestMessage( + final UInt64 startSlot, final UInt64 count) { + super(SSZ_SCHEMA, SszUInt64.of(startSlot), SszUInt64.of(count)); + } + + public UInt64 getStartSlot() { + return getField0().get(); + } + + public UInt64 getCount() { + return getField1().get(); + } + + public UInt64 getMaxSlot() { + return getStartSlot().plus(getCount().minusMinZero(1)); + } + + @Override + public int getMaximumResponseChunks() { + return getCount().intValue(); + } +} diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java index 94644b5c8da..86250559351 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/DefaultEth2Peer.java @@ -52,6 +52,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.config.SpecConfigDeneb; +import tech.pegasys.teku.spec.config.SpecConfigEip7732; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope; @@ -62,6 +63,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage.ExecutionPayloadEnvelopesByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.GoodbyeMessage; @@ -99,6 +101,7 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer { blobSidecarsByRootRequestMessageSchema; private final Supplier executionPayloadEnvelopesByRootRequestMessageSchema; + private final Supplier firstSlotSupportingExecutionPayloadEnvelopesByRange; DefaultEth2Peer( final Spec spec, @@ -143,6 +146,12 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer { SchemaDefinitionsEip7732.required( spec.forMilestone(SpecMilestone.EIP7732).getSchemaDefinitions()) .getExecutionPayloadEnvelopesByRootRequestMessageSchema()); + this.firstSlotSupportingExecutionPayloadEnvelopesByRange = + Suppliers.memoize( + () -> { + final UInt64 eip7732ForkEpoch = getSpecConfigEip7732().getEip7732ForkEpoch(); + return spec.computeStartSlotAtEpoch(eip7732ForkEpoch); + }); } @Override @@ -407,6 +416,42 @@ public SafeFuture requestBlobSidecarsByRange( .orElse(failWithUnsupportedMethodException("BlobSidecarsByRange")); } + @Override + public SafeFuture requestExecutionPayloadEnvelopesByRange( + final UInt64 startSlot, + final UInt64 count, + final RpcResponseListener listener) { + return rpcMethods + .executionPayloadEnvelopesByRange() + .map( + method -> { + final UInt64 firstSupportedSlot = + firstSlotSupportingExecutionPayloadEnvelopesByRange.get(); + final ExecutionPayloadEnvelopesByRangeRequestMessage request; + + if (startSlot.isLessThan(firstSupportedSlot)) { + LOG.debug( + "Requesting execution payload envelopes from slot {} instead of slot {} because the request is spanning the EIP-7732 fork transition", + firstSupportedSlot, + startSlot); + final UInt64 updatedCount = + count.minusMinZero(firstSupportedSlot.minusMinZero(startSlot)); + if (updatedCount.isZero()) { + return SafeFuture.COMPLETE; + } + request = + new ExecutionPayloadEnvelopesByRangeRequestMessage( + firstSupportedSlot, updatedCount); + } else { + request = new ExecutionPayloadEnvelopesByRangeRequestMessage(startSlot, count); + } + // EIP-7732 TODO: add a listener wrapper for additional verification (similar to + // BlocksByRange) + return requestStream(method, request, listener); + }) + .orElse(failWithUnsupportedMethodException("ExecutionPayloadEnvelopesByRange")); + } + private int calculateMaxBlobsPerBlock(final UInt64 endSlot) { return SpecConfigDeneb.required(spec.atSlot(endSlot).getConfig()).getMaxBlobsPerBlock(); } @@ -554,6 +599,10 @@ private SpecConfigDeneb getSpecConfigDeneb() { return SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig()); } + private SpecConfigEip7732 getSpecConfigEip7732() { + return SpecConfigEip7732.required(spec.forMilestone(SpecMilestone.EIP7732).getConfig()); + } + private SafeFuture failWithUnsupportedMethodException(final String method) { return SafeFuture.failedFuture( new UnsupportedOperationException(method + " method is not supported")); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java index 76fd491174d..d35e3d57052 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/peers/SyncSource.java @@ -20,6 +20,7 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope; /** * Represents an external source of blocks (and blob sidecars post Deneb) to sync. Typically, a @@ -32,6 +33,9 @@ SafeFuture requestBlocksByRange( SafeFuture requestBlobSidecarsByRange( UInt64 startSlot, UInt64 count, RpcResponseListener listener); + SafeFuture requestExecutionPayloadEnvelopesByRange( + UInt64 startSlot, UInt64 count, RpcResponseListener listener); + void adjustReputation(final ReputationAdjustment adjustment); SafeFuture disconnectCleanly(DisconnectReason reason); diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java index 2b05f2a451c..24b1d7186e3 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethodIds.java @@ -29,6 +29,9 @@ public class BeaconChainMethodIds { static final String EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT = "/eth2/beacon_chain/req/execution_payload_envelopes_by_root"; + static final String EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE = + "/eth2/beacon_chain/req/execution_payload_envelopes_by_range"; + static final String GET_METADATA = "/eth2/beacon_chain/req/metadata"; static final String PING = "/eth2/beacon_chain/req/ping"; @@ -55,11 +58,6 @@ public static String getBlobSidecarsByRangeMethodId( return getMethodId(BLOB_SIDECARS_BY_RANGE, version, encoding); } - public static String getExecutionPayloadEnvelopeByRootMethodId( - final int version, final RpcEncoding encoding) { - return getMethodId(EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT, version, encoding); - } - public static String getStatusMethodId(final int version, final RpcEncoding encoding) { return getMethodId(STATUS, version, encoding); } @@ -72,10 +70,6 @@ public static int extractBeaconBlocksByRangeVersion(final String methodId) { return extractVersion(methodId, BEACON_BLOCKS_BY_RANGE); } - public static int extractExecutionPayloadEnvelopeByRootVersion(final String methodId) { - return extractVersion(methodId, EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT); - } - public static int extractGetMetadataVersion(final String methodId) { return extractVersion(methodId, GET_METADATA); } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java index 21b8d272540..adfacae6a9d 100644 --- a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/BeaconChainMethods.java @@ -27,6 +27,7 @@ import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRootMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRootMessageHandler; +import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.ExecutionPayloadEnvelopesByRangeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.ExecutionPayloadEnvelopesByRootMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.GoodbyeMessageHandler; import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.MetadataMessageHandler; @@ -55,6 +56,7 @@ import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage.EmptyMessageSchema; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRangeRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage.ExecutionPayloadEnvelopesByRootRequestMessageSchema; import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.GoodbyeMessage; @@ -82,6 +84,10 @@ public class BeaconChainMethods { Eth2RpcMethod< ExecutionPayloadEnvelopesByRootRequestMessage, SignedExecutionPayloadEnvelope>> executionPayloadEnvelopesByRoot; + private final Optional< + Eth2RpcMethod< + ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>> + executionPayloadEnvelopesByRange; private final Eth2RpcMethod getMetadata; private final Eth2RpcMethod ping; @@ -100,6 +106,10 @@ private BeaconChainMethods( Eth2RpcMethod< ExecutionPayloadEnvelopesByRootRequestMessage, SignedExecutionPayloadEnvelope>> executionPayloadEnvelopesByRoot, + final Optional< + Eth2RpcMethod< + ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>> + executionPayloadEnvelopesByRange, final Eth2RpcMethod getMetadata, final Eth2RpcMethod ping) { this.status = status; @@ -109,6 +119,7 @@ private BeaconChainMethods( this.blobSidecarsByRoot = blobSidecarsByRoot; this.blobSidecarsByRange = blobSidecarsByRange; this.executionPayloadEnvelopesByRoot = executionPayloadEnvelopesByRoot; + this.executionPayloadEnvelopesByRange = executionPayloadEnvelopesByRange; this.getMetadata = getMetadata; this.ping = ping; this.allMethods = @@ -160,6 +171,14 @@ public static BeaconChainMethods create( recentChainData), createExecutionPayloadEnvelopesByRoot( spec, metricsSystem, asyncRunner, peerLookup, rpcEncoding, recentChainData), + createExecutionPayloadEnvelopesByRange( + spec, + metricsSystem, + asyncRunner, + peerLookup, + rpcEncoding, + recentChainData, + combinedChainDataClient), createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding), createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding)); } @@ -389,6 +408,42 @@ private static Eth2RpcMethod createGoodBye( peerLookup)); } + private static Optional< + Eth2RpcMethod< + ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>> + createExecutionPayloadEnvelopesByRange( + final Spec spec, + final MetricsSystem metricsSystem, + final AsyncRunner asyncRunner, + final PeerLookup peerLookup, + final RpcEncoding rpcEncoding, + final RecentChainData recentChainData, + final CombinedChainDataClient combinedChainDataClient) { + if (!spec.isMilestoneSupported(SpecMilestone.EIP7732)) { + return Optional.empty(); + } + + final RpcContextCodec forkDigestContextCodec = + RpcContextCodec.forkDigest( + spec, recentChainData, ForkDigestPayloadContext.EXECUTION_PAYLOAD_ENVELOPE); + + final ExecutionPayloadEnvelopesByRangeMessageHandler executionPayloadEnvelopesByRangeHandler = + new ExecutionPayloadEnvelopesByRangeMessageHandler( + spec, metricsSystem, combinedChainDataClient); + + return Optional.of( + new SingleProtocolEth2RpcMethod<>( + asyncRunner, + BeaconChainMethodIds.EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE, + 1, + rpcEncoding, + ExecutionPayloadEnvelopesByRangeRequestMessage.SSZ_SCHEMA, + true, + forkDigestContextCodec, + executionPayloadEnvelopesByRangeHandler, + peerLookup)); + } + private static Eth2RpcMethod createMetadata( final Spec spec, final AsyncRunner asyncRunner, @@ -505,6 +560,13 @@ public Eth2RpcMethod beaco return executionPayloadEnvelopesByRoot; } + public Optional< + Eth2RpcMethod< + ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>> + executionPayloadEnvelopesByRange() { + return executionPayloadEnvelopesByRange; + } + public Eth2RpcMethod getMetadata() { return getMetadata; } diff --git a/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/ExecutionPayloadEnvelopesByRangeMessageHandler.java b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/ExecutionPayloadEnvelopesByRangeMessageHandler.java new file mode 100644 index 00000000000..e3247c49c8e --- /dev/null +++ b/networking/eth2/src/main/java/tech/pegasys/teku/networking/eth2/rpc/beaconchain/methods/ExecutionPayloadEnvelopesByRangeMessageHandler.java @@ -0,0 +1,328 @@ +/* + * Copyright Consensys Software Inc., 2022 + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods; + +import static tech.pegasys.teku.infrastructure.async.SafeFuture.completedFuture; +import static tech.pegasys.teku.infrastructure.unsigned.UInt64.ZERO; +import static tech.pegasys.teku.networking.eth2.rpc.core.RpcResponseStatus.INVALID_REQUEST_CODE; + +import com.google.common.base.Throwables; +import java.nio.channels.ClosedChannelException; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.metrics.Counter; +import org.hyperledger.besu.plugin.services.metrics.LabelledMetric; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.networking.eth2.peers.Eth2Peer; +import tech.pegasys.teku.networking.eth2.peers.RequestApproval; +import tech.pegasys.teku.networking.eth2.rpc.core.PeerRequiredLocalMessageHandler; +import tech.pegasys.teku.networking.eth2.rpc.core.ResponseCallback; +import tech.pegasys.teku.networking.eth2.rpc.core.RpcException; +import tech.pegasys.teku.networking.p2p.rpc.StreamClosedException; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecMilestone; +import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary; +import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope; +import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRangeRequestMessage; +import tech.pegasys.teku.storage.client.CombinedChainDataClient; + +public class ExecutionPayloadEnvelopesByRangeMessageHandler + extends PeerRequiredLocalMessageHandler< + ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope> { + private static final Logger LOG = LogManager.getLogger(); + + private final Spec spec; + private final CombinedChainDataClient combinedChainDataClient; + private final LabelledMetric requestCounter; + private final Counter totalExecutionPayloadEnvelopesRequestedCounter; + + public ExecutionPayloadEnvelopesByRangeMessageHandler( + final Spec spec, + final MetricsSystem metricsSystem, + final CombinedChainDataClient combinedChainDataClient) { + this.spec = spec; + this.combinedChainDataClient = combinedChainDataClient; + requestCounter = + metricsSystem.createLabelledCounter( + TekuMetricCategory.NETWORK, + "rpc_execution_payload_envelopes_by_range_requests_total", + "Total number of execution payload envelopes by range requests received", + "status"); + totalExecutionPayloadEnvelopesRequestedCounter = + metricsSystem.createCounter( + TekuMetricCategory.NETWORK, + "rpc_execution_payload_envelopes_by_range_requested_blocks_total", + "Total number of execution payload envelopes requested in accepted execution payload envelopes by range requests from peers"); + } + + @Override + public Optional validateRequest( + final String protocolId, final ExecutionPayloadEnvelopesByRangeRequestMessage request) { + final SpecMilestone latestMilestoneRequested; + try { + latestMilestoneRequested = + spec.getForkSchedule().getSpecMilestoneAtSlot(request.getMaxSlot()); + } catch (final ArithmeticException __) { + return Optional.of( + new RpcException(INVALID_REQUEST_CODE, "Requested slot is too far in the future")); + } + + final int maxRequestExecutionPayloadEnvelopes = + spec.forMilestone(latestMilestoneRequested).miscHelpers().getMaxRequestBlocks().intValue(); + + int requestedCount; + try { + requestedCount = request.getCount().intValue(); + } catch (final ArithmeticException __) { + // handle overflows + requestedCount = -1; + } + + if (requestedCount == -1 || requestedCount > maxRequestExecutionPayloadEnvelopes) { + requestCounter.labels("count_too_big").inc(); + return Optional.of( + new RpcException( + INVALID_REQUEST_CODE, + "Only a maximum of " + + maxRequestExecutionPayloadEnvelopes + + " execution payload envelopes can be requested per request")); + } + + return Optional.empty(); + } + + @Override + public void onIncomingMessage( + final String protocolId, + final Eth2Peer peer, + final ExecutionPayloadEnvelopesByRangeRequestMessage message, + final ResponseCallback callback) { + LOG.trace( + "Peer {} requested {} ExecutionPayloadEnvelopes starting at slot {}", + peer.getId(), + message.getCount(), + message.getStartSlot()); + + final Optional executionPayloadsRequestApproval = + peer.approveExecutionPayloadEnvelopesRequest(callback, message.getCount().longValue()); + + if (!peer.approveRequest() || executionPayloadsRequestApproval.isEmpty()) { + requestCounter.labels("rate_limited").inc(); + return; + } + + requestCounter.labels("ok").inc(); + totalExecutionPayloadEnvelopesRequestedCounter.inc(message.getCount().longValue()); + sendMatchingExecutionPayloadEnvelopes(message, callback) + .finish( + requestState -> { + if (requestState.sentExecutionPayloads.get() != message.getCount().longValue()) { + peer.adjustExecutionPayloadEnvelopesRequest( + executionPayloadsRequestApproval.get(), + requestState.sentExecutionPayloads.get()); + } + callback.completeSuccessfully(); + }, + error -> { + peer.adjustExecutionPayloadEnvelopesRequest( + executionPayloadsRequestApproval.get(), 0); + final Throwable rootCause = Throwables.getRootCause(error); + if (rootCause instanceof RpcException) { + LOG.trace( + "Rejecting execution payload envelopes by range request", + error); // Keep full context + callback.completeWithErrorResponse((RpcException) rootCause); + } else { + if (rootCause instanceof StreamClosedException + || rootCause instanceof ClosedChannelException) { + LOG.trace( + "Stream closed while sending requested execution payload envelopes", error); + } else { + LOG.error( + "Failed to process execution payload envelopes by range request", error); + } + callback.completeWithUnexpectedError(error); + } + }); + } + + private SafeFuture sendMatchingExecutionPayloadEnvelopes( + final ExecutionPayloadEnvelopesByRangeRequestMessage message, + final ResponseCallback callback) { + final UInt64 startSlot = message.getStartSlot(); + final UInt64 count = message.getCount(); + + return combinedChainDataClient + .getEarliestAvailableBlockSlot() + .thenCompose( + earliestSlot -> { + if (earliestSlot.map(s -> s.isGreaterThan(startSlot)).orElse(true)) { + // We're missing the first block so execution payload is missing as well so return + // an error + return SafeFuture.failedFuture( + new RpcException.ResourceUnavailableException( + "Requested historical blocks are currently unavailable")); + } + final UInt64 headBlockSlot = + combinedChainDataClient + .getChainHead() + .map(MinimalBeaconBlockSummary::getSlot) + .orElse(ZERO); + final NavigableMap hotRoots; + if (combinedChainDataClient.isFinalized(message.getMaxSlot())) { + // All execution payloads are finalized so skip scanning the protoarray + hotRoots = new TreeMap<>(); + } else { + hotRoots = combinedChainDataClient.getAncestorRoots(startSlot, UInt64.ONE, count); + } + // Don't send anything past the last slot found in protoarray to ensure execution + // payloads are consistent + // If we didn't find any execution payloads in protoarray, every execution payload in + // the range must be finalized so we don't need to worry about inconsistent execution + // payloads + final UInt64 headSlot = hotRoots.isEmpty() ? headBlockSlot : hotRoots.lastKey(); + final RequestState initialState = + new RequestState(startSlot, count, headSlot, hotRoots, callback); + if (initialState.isComplete()) { + return SafeFuture.completedFuture(initialState); + } + return sendNextExecutionPayload(initialState); + }); + } + + private SafeFuture sendNextExecutionPayload(final RequestState requestState) { + SafeFuture executionPayloadFuture = processNextExecutionPayload(requestState); + // Avoid risk of StackOverflowException by iterating when the execution payload future is + // already complete. Using thenCompose on the completed future would execute immediately and + // recurse back into + // this method to send the next execution payload. When not already complete, thenCompose is + // executed on a separate thread so doesn't recurse on the same stack. + while (executionPayloadFuture.isDone() && !executionPayloadFuture.isCompletedExceptionally()) { + if (executionPayloadFuture.join()) { + return completedFuture(requestState); + } + executionPayloadFuture = processNextExecutionPayload(requestState); + } + return executionPayloadFuture.thenCompose( + complete -> + complete ? completedFuture(requestState) : sendNextExecutionPayload(requestState)); + } + + private SafeFuture processNextExecutionPayload(final RequestState requestState) { + // Ensure execution payloads are loaded off of the event thread + return requestState + .loadNextExecutionPayload() + .thenCompose( + block -> { + requestState.decrementRemainingExecutionPayloads(); + return handleLoadedExecutionPayload(requestState, block); + }); + } + + /** Sends the execution payload and returns true if the request is now complete. */ + private SafeFuture handleLoadedExecutionPayload( + final RequestState requestState, + final Optional executionPayload) { + return executionPayload + .map(requestState::sendExecutionPayload) + .orElse(SafeFuture.COMPLETE) + .thenApply( + __ -> { + if (requestState.isComplete()) { + return true; + } else { + requestState.incrementCurrentSlot(); + return false; + } + }); + } + + private class RequestState { + + private final UInt64 headSlot; + private final ResponseCallback callback; + private final NavigableMap knownBlockRoots; + private UInt64 currentSlot; + private UInt64 remainingExecutionPayloads; + + private final AtomicInteger sentExecutionPayloads = new AtomicInteger(0); + + RequestState( + final UInt64 startSlot, + final UInt64 count, + final UInt64 headSlot, + final NavigableMap knownBlockRoots, + final ResponseCallback callback) { + this.currentSlot = startSlot; + this.knownBlockRoots = knownBlockRoots; + this.remainingExecutionPayloads = count; + this.headSlot = headSlot; + this.callback = callback; + } + + private boolean needsMoreBlocks() { + return !remainingExecutionPayloads.equals(ZERO); + } + + private boolean hasReachedHeadSlot() { + return currentSlot.compareTo(headSlot) >= 0; + } + + boolean isComplete() { + return !needsMoreBlocks() || hasReachedHeadSlot(); + } + + SafeFuture sendExecutionPayload(final SignedExecutionPayloadEnvelope executionPayload) { + return callback.respond(executionPayload).thenRun(sentExecutionPayloads::incrementAndGet); + } + + void decrementRemainingExecutionPayloads() { + remainingExecutionPayloads = remainingExecutionPayloads.minusMinZero(1); + } + + void incrementCurrentSlot() { + currentSlot = currentSlot.increment(); + } + + SafeFuture> loadNextExecutionPayload() { + final UInt64 slot = this.currentSlot; + final Bytes32 knownBlockRoot = knownBlockRoots.get(slot); + if (knownBlockRoot != null) { + // Known root so lookup by root + return combinedChainDataClient + .getExecutionPayloadByBlockRoot(knownBlockRoot) + .thenApply( + maybeExecutionPayload -> + maybeExecutionPayload.filter( + executionPayload -> executionPayload.getMessage().getSlot().equals(slot))); + } else if ((!knownBlockRoots.isEmpty() && slot.compareTo(knownBlockRoots.firstKey()) >= 0) + || slot.compareTo(headSlot) > 0) { + // Unknown root but not finalized means this is an empty slot + // Could also be because the first execution payload requested is above our head slot + return SafeFuture.completedFuture(Optional.empty()); + } else { + // EIP-7732 TODO: Must be a finalized execution payload so lookup by slot + return SafeFuture.completedFuture(Optional.empty()); + } + } + } +} diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java index af022416b28..0d2dc629511 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/RespondingEth2Peer.java @@ -229,6 +229,15 @@ public SafeFuture requestBlobSidecarsByRange( return createPendingBlobSidecarRequest(handler); } + // EIP-7732 TODO: implement (test) + @Override + public SafeFuture requestExecutionPayloadEnvelopesByRange( + final UInt64 startSlot, + final UInt64 count, + final RpcResponseListener listener) { + throw new UnsupportedOperationException(); + } + @Override public SafeFuture requestBlocksByRoot( final List blockRoots, final RpcResponseListener listener) { diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java index 9442fba9f23..70bb1a4247a 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java @@ -28,6 +28,7 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope; public class StubSyncSource implements SyncSource { @@ -81,6 +82,15 @@ public SafeFuture requestBlobSidecarsByRange( return request; } + // EIP-7732 TODO: implement (test) + @Override + public SafeFuture requestExecutionPayloadEnvelopesByRange( + final UInt64 startSlot, + final UInt64 count, + final RpcResponseListener listener) { + throw new UnsupportedOperationException(); + } + @Override public SafeFuture disconnectCleanly(final DisconnectReason reason) { return SafeFuture.COMPLETE;