Skip to content

Commit

Permalink
Add draft implementation for ExecutionPayloadEnvelopesByRange
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Feb 24, 2025
1 parent 778cde7 commit d3fe866
Show file tree
Hide file tree
Showing 9 changed files with 575 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope;

public class ThrottlingSyncSource implements SyncSource {
private static final Logger LOG = LogManager.getLogger();
Expand All @@ -43,6 +44,7 @@ public class ThrottlingSyncSource implements SyncSource {
private final RateTracker blocksRateTracker;
private final Optional<Integer> maybeMaxBlobsPerBlock;
private final RateTracker blobSidecarsRateTracker;
private final RateTracker executionPayloadEnvelopesRateTracker;

public ThrottlingSyncSource(
final AsyncRunner asyncRunner,
Expand All @@ -61,6 +63,8 @@ public ThrottlingSyncSource(
maxBlobSidecarsPerMinute ->
RateTracker.create(maxBlobSidecarsPerMinute, TIMEOUT_SECONDS, timeProvider))
.orElse(RateTracker.NOOP);
this.executionPayloadEnvelopesRateTracker =
RateTracker.create(maxBlocksPerMinute, TIMEOUT_SECONDS, timeProvider);
}

@Override
Expand Down Expand Up @@ -125,6 +129,38 @@ public SafeFuture<Void> requestBlobSidecarsByRange(
});
}

@Override
public SafeFuture<Void> requestExecutionPayloadEnvelopesByRange(
final UInt64 startSlot,
final UInt64 count,
final RpcResponseListener<SignedExecutionPayloadEnvelope> listener) {
return executionPayloadEnvelopesRateTracker
.approveObjectsRequest(count.longValue())
.map(
requestApproval -> {
LOG.debug("Sending request for {} execution payload envelopes", count);
final RpcResponseListenerWithCount<SignedExecutionPayloadEnvelope> listenerWithCount =
new RpcResponseListenerWithCount<>(listener);
return delegate
.requestExecutionPayloadEnvelopesByRange(startSlot, count, listenerWithCount)
.alwaysRun(
() ->
// adjust for slots with empty execution payloads
executionPayloadEnvelopesRateTracker.adjustObjectsRequest(
requestApproval, listenerWithCount.count.get()));
})
.orElseGet(
() -> {
LOG.debug(
"Rate limiting request for {} execution payload envelopes. Retry in {} seconds",
count,
PEER_REQUEST_DELAY.toSeconds());
return asyncRunner.runAfterDelay(
() -> requestExecutionPayloadEnvelopesByRange(startSlot, count, listener),
PEER_REQUEST_DELAY);
});
}

@Override
public SafeFuture<Void> disconnectCleanly(final DisconnectReason reason) {
return delegate.disconnectCleanly(reason);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc;

import tech.pegasys.teku.infrastructure.ssz.containers.Container2;
import tech.pegasys.teku.infrastructure.ssz.containers.ContainerSchema2;
import tech.pegasys.teku.infrastructure.ssz.primitive.SszUInt64;
import tech.pegasys.teku.infrastructure.ssz.schema.SszPrimitiveSchemas;
import tech.pegasys.teku.infrastructure.ssz.tree.TreeNode;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public final class ExecutionPayloadEnvelopesByRangeRequestMessage
extends Container2<ExecutionPayloadEnvelopesByRangeRequestMessage, SszUInt64, SszUInt64>
implements RpcRequest {

public static class ExecutionPayloadEnvelopesByRangeRequestMessageSchema
extends ContainerSchema2<
ExecutionPayloadEnvelopesByRangeRequestMessage, SszUInt64, SszUInt64> {

public ExecutionPayloadEnvelopesByRangeRequestMessageSchema() {
super(
"ExecutionPayloadEnvelopesByRangeRequestMessage",
namedSchema("start_slot", SszPrimitiveSchemas.UINT64_SCHEMA),
namedSchema("count", SszPrimitiveSchemas.UINT64_SCHEMA));
}

@Override
public ExecutionPayloadEnvelopesByRangeRequestMessage createFromBackingNode(
final TreeNode node) {
return new ExecutionPayloadEnvelopesByRangeRequestMessage(this, node);
}
}

public static final ExecutionPayloadEnvelopesByRangeRequestMessageSchema SSZ_SCHEMA =
new ExecutionPayloadEnvelopesByRangeRequestMessageSchema();

private ExecutionPayloadEnvelopesByRangeRequestMessage(
final ExecutionPayloadEnvelopesByRangeRequestMessageSchema type, final TreeNode backingNode) {
super(type, backingNode);
}

public ExecutionPayloadEnvelopesByRangeRequestMessage(
final UInt64 startSlot, final UInt64 count) {
super(SSZ_SCHEMA, SszUInt64.of(startSlot), SszUInt64.of(count));
}

public UInt64 getStartSlot() {
return getField0().get();
}

public UInt64 getCount() {
return getField1().get();
}

public UInt64 getMaxSlot() {
return getStartSlot().plus(getCount().minusMinZero(1));
}

@Override
public int getMaximumResponseChunks() {
return getCount().intValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.config.SpecConfigDeneb;
import tech.pegasys.teku.spec.config.SpecConfigEip7732;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope;
Expand All @@ -62,6 +63,7 @@
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage.ExecutionPayloadEnvelopesByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.GoodbyeMessage;
Expand Down Expand Up @@ -99,6 +101,7 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
blobSidecarsByRootRequestMessageSchema;
private final Supplier<ExecutionPayloadEnvelopesByRootRequestMessageSchema>
executionPayloadEnvelopesByRootRequestMessageSchema;
private final Supplier<UInt64> firstSlotSupportingExecutionPayloadEnvelopesByRange;

DefaultEth2Peer(
final Spec spec,
Expand Down Expand Up @@ -143,6 +146,12 @@ class DefaultEth2Peer extends DelegatingPeer implements Eth2Peer {
SchemaDefinitionsEip7732.required(
spec.forMilestone(SpecMilestone.EIP7732).getSchemaDefinitions())
.getExecutionPayloadEnvelopesByRootRequestMessageSchema());
this.firstSlotSupportingExecutionPayloadEnvelopesByRange =
Suppliers.memoize(
() -> {
final UInt64 eip7732ForkEpoch = getSpecConfigEip7732().getEip7732ForkEpoch();
return spec.computeStartSlotAtEpoch(eip7732ForkEpoch);
});
}

@Override
Expand Down Expand Up @@ -407,6 +416,42 @@ public SafeFuture<Void> requestBlobSidecarsByRange(
.orElse(failWithUnsupportedMethodException("BlobSidecarsByRange"));
}

@Override
public SafeFuture<Void> requestExecutionPayloadEnvelopesByRange(
final UInt64 startSlot,
final UInt64 count,
final RpcResponseListener<SignedExecutionPayloadEnvelope> listener) {
return rpcMethods
.executionPayloadEnvelopesByRange()
.map(
method -> {
final UInt64 firstSupportedSlot =
firstSlotSupportingExecutionPayloadEnvelopesByRange.get();
final ExecutionPayloadEnvelopesByRangeRequestMessage request;

if (startSlot.isLessThan(firstSupportedSlot)) {
LOG.debug(
"Requesting execution payload envelopes from slot {} instead of slot {} because the request is spanning the EIP-7732 fork transition",
firstSupportedSlot,
startSlot);
final UInt64 updatedCount =
count.minusMinZero(firstSupportedSlot.minusMinZero(startSlot));
if (updatedCount.isZero()) {
return SafeFuture.COMPLETE;
}
request =
new ExecutionPayloadEnvelopesByRangeRequestMessage(
firstSupportedSlot, updatedCount);
} else {
request = new ExecutionPayloadEnvelopesByRangeRequestMessage(startSlot, count);
}
// EIP-7732 TODO: add a listener wrapper for additional verification (similar to
// BlocksByRange)
return requestStream(method, request, listener);
})
.orElse(failWithUnsupportedMethodException("ExecutionPayloadEnvelopesByRange"));
}

private int calculateMaxBlobsPerBlock(final UInt64 endSlot) {
return SpecConfigDeneb.required(spec.atSlot(endSlot).getConfig()).getMaxBlobsPerBlock();
}
Expand Down Expand Up @@ -554,6 +599,10 @@ private SpecConfigDeneb getSpecConfigDeneb() {
return SpecConfigDeneb.required(spec.forMilestone(SpecMilestone.DENEB).getConfig());
}

private SpecConfigEip7732 getSpecConfigEip7732() {
return SpecConfigEip7732.required(spec.forMilestone(SpecMilestone.EIP7732).getConfig());
}

private <T> SafeFuture<T> failWithUnsupportedMethodException(final String method) {
return SafeFuture.failedFuture(
new UnsupportedOperationException(method + " method is not supported"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.SignedExecutionPayloadEnvelope;

/**
* Represents an external source of blocks (and blob sidecars post Deneb) to sync. Typically, a
Expand All @@ -32,6 +33,9 @@ SafeFuture<Void> requestBlocksByRange(
SafeFuture<Void> requestBlobSidecarsByRange(
UInt64 startSlot, UInt64 count, RpcResponseListener<BlobSidecar> listener);

SafeFuture<Void> requestExecutionPayloadEnvelopesByRange(
UInt64 startSlot, UInt64 count, RpcResponseListener<SignedExecutionPayloadEnvelope> listener);

void adjustReputation(final ReputationAdjustment adjustment);

SafeFuture<Void> disconnectCleanly(DisconnectReason reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public class BeaconChainMethodIds {
static final String EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT =
"/eth2/beacon_chain/req/execution_payload_envelopes_by_root";

static final String EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE =
"/eth2/beacon_chain/req/execution_payload_envelopes_by_range";

static final String GET_METADATA = "/eth2/beacon_chain/req/metadata";
static final String PING = "/eth2/beacon_chain/req/ping";

Expand All @@ -55,11 +58,6 @@ public static String getBlobSidecarsByRangeMethodId(
return getMethodId(BLOB_SIDECARS_BY_RANGE, version, encoding);
}

public static String getExecutionPayloadEnvelopeByRootMethodId(
final int version, final RpcEncoding encoding) {
return getMethodId(EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT, version, encoding);
}

public static String getStatusMethodId(final int version, final RpcEncoding encoding) {
return getMethodId(STATUS, version, encoding);
}
Expand All @@ -72,10 +70,6 @@ public static int extractBeaconBlocksByRangeVersion(final String methodId) {
return extractVersion(methodId, BEACON_BLOCKS_BY_RANGE);
}

public static int extractExecutionPayloadEnvelopeByRootVersion(final String methodId) {
return extractVersion(methodId, EXECUTION_PAYLOAD_ENVELOPES_BY_ROOT);
}

public static int extractGetMetadataVersion(final String methodId) {
return extractVersion(methodId, GET_METADATA);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BeaconBlocksByRootMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRangeMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.BlobSidecarsByRootMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.ExecutionPayloadEnvelopesByRangeMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.ExecutionPayloadEnvelopesByRootMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.GoodbyeMessageHandler;
import tech.pegasys.teku.networking.eth2.rpc.beaconchain.methods.MetadataMessageHandler;
Expand Down Expand Up @@ -55,6 +56,7 @@
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.BlobSidecarsByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.EmptyMessage.EmptyMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRangeRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.ExecutionPayloadEnvelopesByRootRequestMessage.ExecutionPayloadEnvelopesByRootRequestMessageSchema;
import tech.pegasys.teku.spec.datastructures.networking.libp2p.rpc.GoodbyeMessage;
Expand Down Expand Up @@ -82,6 +84,10 @@ public class BeaconChainMethods {
Eth2RpcMethod<
ExecutionPayloadEnvelopesByRootRequestMessage, SignedExecutionPayloadEnvelope>>
executionPayloadEnvelopesByRoot;
private final Optional<
Eth2RpcMethod<
ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>>
executionPayloadEnvelopesByRange;
private final Eth2RpcMethod<EmptyMessage, MetadataMessage> getMetadata;
private final Eth2RpcMethod<PingMessage, PingMessage> ping;

Expand All @@ -100,6 +106,10 @@ private BeaconChainMethods(
Eth2RpcMethod<
ExecutionPayloadEnvelopesByRootRequestMessage, SignedExecutionPayloadEnvelope>>
executionPayloadEnvelopesByRoot,
final Optional<
Eth2RpcMethod<
ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>>
executionPayloadEnvelopesByRange,
final Eth2RpcMethod<EmptyMessage, MetadataMessage> getMetadata,
final Eth2RpcMethod<PingMessage, PingMessage> ping) {
this.status = status;
Expand All @@ -109,6 +119,7 @@ private BeaconChainMethods(
this.blobSidecarsByRoot = blobSidecarsByRoot;
this.blobSidecarsByRange = blobSidecarsByRange;
this.executionPayloadEnvelopesByRoot = executionPayloadEnvelopesByRoot;
this.executionPayloadEnvelopesByRange = executionPayloadEnvelopesByRange;
this.getMetadata = getMetadata;
this.ping = ping;
this.allMethods =
Expand Down Expand Up @@ -160,6 +171,14 @@ public static BeaconChainMethods create(
recentChainData),
createExecutionPayloadEnvelopesByRoot(
spec, metricsSystem, asyncRunner, peerLookup, rpcEncoding, recentChainData),
createExecutionPayloadEnvelopesByRange(
spec,
metricsSystem,
asyncRunner,
peerLookup,
rpcEncoding,
recentChainData,
combinedChainDataClient),
createMetadata(spec, asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding),
createPing(asyncRunner, metadataMessagesFactory, peerLookup, rpcEncoding));
}
Expand Down Expand Up @@ -389,6 +408,42 @@ private static Eth2RpcMethod<GoodbyeMessage, GoodbyeMessage> createGoodBye(
peerLookup));
}

private static Optional<
Eth2RpcMethod<
ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>>
createExecutionPayloadEnvelopesByRange(
final Spec spec,
final MetricsSystem metricsSystem,
final AsyncRunner asyncRunner,
final PeerLookup peerLookup,
final RpcEncoding rpcEncoding,
final RecentChainData recentChainData,
final CombinedChainDataClient combinedChainDataClient) {
if (!spec.isMilestoneSupported(SpecMilestone.EIP7732)) {
return Optional.empty();
}

final RpcContextCodec<Bytes4, SignedExecutionPayloadEnvelope> forkDigestContextCodec =
RpcContextCodec.forkDigest(
spec, recentChainData, ForkDigestPayloadContext.EXECUTION_PAYLOAD_ENVELOPE);

final ExecutionPayloadEnvelopesByRangeMessageHandler executionPayloadEnvelopesByRangeHandler =
new ExecutionPayloadEnvelopesByRangeMessageHandler(
spec, metricsSystem, combinedChainDataClient);

return Optional.of(
new SingleProtocolEth2RpcMethod<>(
asyncRunner,
BeaconChainMethodIds.EXECUTION_PAYLOAD_ENVELOPES_BY_RANGE,
1,
rpcEncoding,
ExecutionPayloadEnvelopesByRangeRequestMessage.SSZ_SCHEMA,
true,
forkDigestContextCodec,
executionPayloadEnvelopesByRangeHandler,
peerLookup));
}

private static Eth2RpcMethod<EmptyMessage, MetadataMessage> createMetadata(
final Spec spec,
final AsyncRunner asyncRunner,
Expand Down Expand Up @@ -505,6 +560,13 @@ public Eth2RpcMethod<BeaconBlocksByRangeRequestMessage, SignedBeaconBlock> beaco
return executionPayloadEnvelopesByRoot;
}

public Optional<
Eth2RpcMethod<
ExecutionPayloadEnvelopesByRangeRequestMessage, SignedExecutionPayloadEnvelope>>
executionPayloadEnvelopesByRange() {
return executionPayloadEnvelopesByRange;
}

public Eth2RpcMethod<EmptyMessage, MetadataMessage> getMetadata() {
return getMetadata;
}
Expand Down
Loading

0 comments on commit d3fe866

Please sign in to comment.