Skip to content

Commit

Permalink
add inclusion list manager (Consensys#9034)
Browse files Browse the repository at this point in the history
  • Loading branch information
mehdi-aouadi authored Jan 24, 2025
1 parent 18a3286 commit 3d1744e
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
import tech.pegasys.teku.statetransition.forkchoice.MergeTransitionBlockValidator;
import tech.pegasys.teku.statetransition.forkchoice.NoopForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListPool;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validation.SignedBlsToExecutionChangeValidator;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorCache;
Expand Down Expand Up @@ -127,7 +127,7 @@ public abstract class AbstractDataBackedRestAPIIntegrationTest {
protected final EventChannels eventChannels = mock(EventChannels.class);
protected final AggregatingAttestationPool attestationPool =
mock(AggregatingAttestationPool.class);
protected final InclusionListPool inclusionListPool = mock(InclusionListPool.class);
protected final InclusionListManager inclusionListManager = mock(InclusionListManager.class);
protected final AttestationManager attestationManager = mock(AttestationManager.class);
protected final OperationPool<AttesterSlashing> attesterSlashingPool = mock(OperationPool.class);
protected final OperationPool<ProposerSlashing> proposerSlashingPool = mock(OperationPool.class);
Expand Down Expand Up @@ -235,7 +235,7 @@ private void setupAndStartRestAPI(final BeaconRestApiConfig config) {
.attestationManager(attestationManager)
.activeValidatorChannel(activeValidatorChannel)
.attestationPool(attestationPool)
.inclusionListPool(inclusionListPool)
.inclusionListPool(inclusionListManager)
.attesterSlashingPool(attesterSlashingPool)
.proposerSlashingPool(proposerSlashingPool)
.voluntaryExitPool(voluntaryExitPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListPool;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
import tech.pegasys.teku.storage.client.CombinedChainDataClient;
Expand Down Expand Up @@ -105,7 +105,7 @@ public static class Builder {
private SyncService syncService;
private ValidatorApiChannel validatorApiChannel;
private AggregatingAttestationPool attestationPool;
private InclusionListPool inclusionListPool;
private InclusionListManager inclusionListManager;
private BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool;
private AttestationManager attestationManager;
private ActiveValidatorChannel activeValidatorChannel;
Expand Down Expand Up @@ -154,8 +154,8 @@ public Builder attestationPool(final AggregatingAttestationPool attestationPool)
return this;
}

public Builder inclusionListPool(final InclusionListPool inclusionListPool) {
this.inclusionListPool = inclusionListPool;
public Builder inclusionListPool(final InclusionListManager inclusionListManager) {
this.inclusionListManager = inclusionListManager;
return this;
}

Expand Down Expand Up @@ -230,7 +230,7 @@ public DataProvider build() {
final NodeDataProvider nodeDataProvider =
new NodeDataProvider(
attestationPool,
inclusionListPool,
inclusionListManager,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import tech.pegasys.teku.statetransition.forkchoice.PreparedProposerInfo;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.forkchoice.RegisteredValidatorInfo;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListPool;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
Expand All @@ -57,7 +57,7 @@
public class NodeDataProvider {
private static final Logger LOG = LogManager.getLogger();
private final AggregatingAttestationPool attestationPool;
private final InclusionListPool inclusionListPool;
private final InclusionListManager inclusionListManager;
private final OperationPool<AttesterSlashing> attesterSlashingPool;
private final OperationPool<ProposerSlashing> proposerSlashingPool;
private final OperationPool<SignedVoluntaryExit> voluntaryExitPool;
Expand All @@ -74,7 +74,7 @@ public class NodeDataProvider {

public NodeDataProvider(
final AggregatingAttestationPool attestationPool,
final InclusionListPool inclusionListPool,
final InclusionListManager inclusionListManager,
final OperationPool<AttesterSlashing> attesterSlashingsPool,
final OperationPool<ProposerSlashing> proposerSlashingPool,
final OperationPool<SignedVoluntaryExit> voluntaryExitPool,
Expand All @@ -89,7 +89,7 @@ public NodeDataProvider(
final RecentChainData recentChainData,
final Spec spec) {
this.attestationPool = attestationPool;
this.inclusionListPool = inclusionListPool;
this.inclusionListManager = inclusionListManager;
this.attesterSlashingPool = attesterSlashingsPool;
this.proposerSlashingPool = proposerSlashingPool;
this.voluntaryExitPool = voluntaryExitPool;
Expand All @@ -111,7 +111,7 @@ public List<Attestation> getAttestations(
}

public List<SignedInclusionList> getInclusionLists(final UInt64 slot) {
return inclusionListPool.getInclusionLists(slot);
return inclusionListManager.getInclusionLists(slot);
}

public ObjectAndMetaData<List<Attestation>> getAttestationsAndMetaData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier;
import tech.pegasys.teku.statetransition.forkchoice.ProposersDataManager;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListPool;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListManager;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.statetransition.validatorcache.ActiveValidatorChannel;
Expand All @@ -56,7 +56,7 @@ public class NodeDataProviderTest {
private final Spec spec = TestSpecFactory.createMinimalCapella();
private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec);
private final AggregatingAttestationPool attestationPool = mock(AggregatingAttestationPool.class);
private final InclusionListPool inclusionListPool = mock(InclusionListPool.class);
private final InclusionListManager inclusionListManager = mock(InclusionListManager.class);
private final BlockBlobSidecarsTrackersPool blockBlobSidecarsTrackersPool =
mock(BlockBlobSidecarsTrackersPool.class);
private final AttestationManager attestationManager = mock(AttestationManager.class);
Expand All @@ -83,7 +83,7 @@ public void setup() {
provider =
new NodeDataProvider(
attestationPool,
inclusionListPool,
inclusionListManager,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down Expand Up @@ -207,7 +207,7 @@ private Spec setUpMockedSpec() {
provider =
new NodeDataProvider(
attestationPool,
inclusionListPool,
inclusionListManager,
attesterSlashingPool,
proposerSlashingPool,
voluntaryExitPool,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Consensys Software Inc., 2025
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.statetransition.inclusionlist;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.operations.SignedInclusionList;
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.statetransition.validation.SignedInclusionListValidator;

public class InclusionListManager implements SlotEventsChannel {

private final SignedInclusionListValidator signedInclusionListValidator;

private final Map<UInt64, List<SignedInclusionList>> validatorIndexToInclusionLists =
new HashMap<>();

public InclusionListManager(final SignedInclusionListValidator signedInclusionListValidator) {
this.signedInclusionListValidator = signedInclusionListValidator;
}

@Override
public synchronized void onSlot(final UInt64 slot) {
validatorIndexToInclusionLists.clear();
}

public void add(final SignedInclusionList signedInclusionList) {
final UInt64 validatorIndex = signedInclusionList.getMessage().getValidatorIndex();
if (validatorIndexToInclusionLists.containsKey(validatorIndex)) {
validatorIndexToInclusionLists.get(validatorIndex).add(signedInclusionList);
} else {
validatorIndexToInclusionLists.put(validatorIndex, List.of(signedInclusionList));
}
}

public SafeFuture<InternalValidationResult> addSignedInclusionList(
final SignedInclusionList signedInclusionList, final Optional<UInt64> arrivalTimestamp) {
final SafeFuture<InternalValidationResult> validationResult =
signedInclusionListValidator.validate(signedInclusionList, validatorIndexToInclusionLists);
return validationResult.thenApply(
result -> {
if (result.isAccept()) {
add(signedInclusionList);
}
return result;
});
}

public List<SignedInclusionList> getInclusionLists(final UInt64 slot) {
return Collections.emptyList();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

package tech.pegasys.teku.statetransition.validation;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand All @@ -34,7 +37,8 @@ public SignedInclusionListValidator(final Spec spec, final RecentChainData recen
}

public SafeFuture<InternalValidationResult> validate(
final SignedInclusionList signedInclusionList) {
final SignedInclusionList signedInclusionList,
final Map<UInt64, List<SignedInclusionList>> validatorIndexToInclusionLists) {

final InclusionList inclusionList = signedInclusionList.getMessage();
final UInt64 slot = inclusionList.getSlot();
Expand Down Expand Up @@ -91,8 +95,15 @@ public SafeFuture<InternalValidationResult> validate(
/*
* [IGNORE] The message is either the first or second valid message received from the validator with index message.validator_index.
*/
// TODO EIP7805 add an inclusion list cache to track how many we've received from each validator
// and enforce this rule
if (validatorIndexToInclusionLists
.getOrDefault(inclusionList.getValidatorIndex(), Collections.emptyList())
.size()
> 2) {
return SafeFuture.completedFuture(
InternalValidationResult.ignore(
"Already received 2 Inclusion Lists from validator with index %d",
inclusionList.getValidatorIndex().intValue()));
}

return recentChainData
.retrieveStateInEffectAtSlot(slot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
import tech.pegasys.teku.statetransition.forkchoice.TickProcessingPerformance;
import tech.pegasys.teku.statetransition.forkchoice.TickProcessor;
import tech.pegasys.teku.statetransition.genesis.GenesisHandler;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListPool;
import tech.pegasys.teku.statetransition.inclusionlist.InclusionListManager;
import tech.pegasys.teku.statetransition.synccommittee.SignedContributionAndProofValidator;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool;
import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool;
Expand All @@ -167,6 +167,7 @@
import tech.pegasys.teku.statetransition.validation.InternalValidationResult;
import tech.pegasys.teku.statetransition.validation.ProposerSlashingValidator;
import tech.pegasys.teku.statetransition.validation.SignedBlsToExecutionChangeValidator;
import tech.pegasys.teku.statetransition.validation.SignedInclusionListValidator;
import tech.pegasys.teku.statetransition.validation.VoluntaryExitValidator;
import tech.pegasys.teku.statetransition.validation.signatures.AggregatingSignatureVerificationService;
import tech.pegasys.teku.statetransition.validation.signatures.SignatureVerificationService;
Expand Down Expand Up @@ -253,7 +254,7 @@ public class BeaconChainController extends Service implements BeaconChainControl
protected volatile Eth2P2PNetwork p2pNetwork;
protected volatile Optional<BeaconRestApi> beaconRestAPI = Optional.empty();
protected volatile AggregatingAttestationPool attestationPool;
protected volatile InclusionListPool inclusionListPool;
protected volatile InclusionListManager inclusionListManager;
protected volatile DepositProvider depositProvider;
protected volatile SyncService syncService;
protected volatile AttestationManager attestationManager;
Expand Down Expand Up @@ -522,7 +523,7 @@ public void initAll() {
initCombinedChainDataClient();
initSignatureVerificationService();
initAttestationPool();
initInclusionListPool();
initInclusionListManager();
initAttesterSlashingPool();
initProposerSlashingPool();
initVoluntaryExitPool();
Expand Down Expand Up @@ -771,7 +772,7 @@ protected void initDataProvider() {
.validatorApiChannel(
eventChannels.getPublisher(ValidatorApiChannel.class, beaconAsyncRunner))
.attestationPool(attestationPool)
.inclusionListPool(inclusionListPool)
.inclusionListPool(inclusionListManager)
.blockBlobSidecarsTrackersPool(blockBlobSidecarsTrackersPool)
.attestationManager(attestationManager)
.isLivenessTrackingEnabled(getLivenessTrackingEnabled(beaconConfig))
Expand Down Expand Up @@ -1147,7 +1148,7 @@ protected void initP2PNetwork() {
.gossipedSignedContributionAndProofProcessor(syncCommitteeContributionPool::addRemote)
.gossipedSyncCommitteeMessageProcessor(syncCommitteeMessagePool::addRemote)
.gossipedSignedBlsToExecutionChangeProcessor(blsToExecutionChangePool::addRemote)
.gossipedSignedInclusionListProcessor(inclusionListPool::addRemote)
.gossipedSignedInclusionListProcessor(inclusionListManager::addSignedInclusionList)
.processedAttestationSubscriptionProvider(
attestationManager::subscribeToAttestationsToSend)
.metricsSystem(metricsSystem)
Expand Down Expand Up @@ -1202,10 +1203,12 @@ public void initAttestationPool() {
attestationPool::onAttestationsIncludedInBlock);
}

protected void initInclusionListPool() {
protected void initInclusionListManager() {
LOG.debug("BeaconChainController.initInclusionListPool()");
inclusionListPool = new InclusionListPool();
eventChannels.subscribe(SlotEventsChannel.class, inclusionListPool);
final SignedInclusionListValidator signedInclusionListValidator =
new SignedInclusionListValidator(spec, recentChainData);
inclusionListManager = new InclusionListManager(signedInclusionListValidator);
eventChannels.subscribe(SlotEventsChannel.class, inclusionListManager);
}

public void initRestAPI() {
Expand Down

0 comments on commit 3d1744e

Please sign in to comment.