Skip to content

Commit

Permalink
Separate type for unaggregated network attestations (#14659)
Browse files Browse the repository at this point in the history
* definitions and gossip

* validator

* broadcast

* broadcast the correct att depending on version

* small updates

* don't check bits after Electra

* nitpick

* tests

* changelog <3

* review

* more review

* review yet again

* try a different design

* fix gossip issues

* cleanup

* tests

* reduce cognitive complexity

* Preston's review

* move changelog entry to unreleased section

* fix pending atts pool issues

* reviews

* Potuz's comments

* test fixes
  • Loading branch information
rkapka authored Jan 10, 2025
1 parent e99df5e commit c19224a
Show file tree
Hide file tree
Showing 50 changed files with 1,793 additions and 930 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- DB optimization for saving light client bootstraps (save unique sync committees only).
- Trace IDONTWANT Messages in Pubsub.
- Add Fulu fork boilerplate.
- Separate type for unaggregated network attestations. [PR](https://github.com/prysmaticlabs/prysm/pull/14659)

### Changed

Expand Down
26 changes: 26 additions & 0 deletions api/server/structs/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,32 @@ func (a *AttestationElectra) ToConsensus() (*eth.AttestationElectra, error) {
}, nil
}

func (a *SingleAttestation) ToConsensus() (*eth.SingleAttestation, error) {
ci, err := strconv.ParseUint(a.CommitteeIndex, 10, 64)
if err != nil {
return nil, server.NewDecodeError(err, "CommitteeIndex")
}
ai, err := strconv.ParseUint(a.AttesterIndex, 10, 64)
if err != nil {
return nil, server.NewDecodeError(err, "AttesterIndex")
}
data, err := a.Data.ToConsensus()
if err != nil {
return nil, server.NewDecodeError(err, "Data")
}
sig, err := bytesutil.DecodeHexWithLength(a.Signature, fieldparams.BLSSignatureLength)
if err != nil {
return nil, server.NewDecodeError(err, "Signature")
}

return &eth.SingleAttestation{
CommitteeId: primitives.CommitteeIndex(ci),
AttesterIndex: primitives.ValidatorIndex(ai),
Data: data,
Signature: sig,
}, nil
}

func AttElectraFromConsensus(a *eth.AttestationElectra) *AttestationElectra {
return &AttestationElectra{
AggregationBits: hexutil.Encode(a.AggregationBits),
Expand Down
7 changes: 7 additions & 0 deletions api/server/structs/other.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type AttestationElectra struct {
CommitteeBits string `json:"committee_bits"`
}

type SingleAttestation struct {
CommitteeIndex string `json:"committee_index"`
AttesterIndex string `json:"attester_index"`
Data *AttestationData `json:"data"`
Signature string `json:"signature"`
}

type AttestationData struct {
Slot string `json:"slot"`
CommitteeIndex string `json:"index"`
Expand Down
3 changes: 3 additions & 0 deletions beacon-chain/core/helpers/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func ValidateNilAttestation(attestation ethpb.Att) error {
if attestation.GetData().Target == nil {
return errors.New("attestation's target can't be nil")
}
if attestation.IsSingle() {
return nil
}
if attestation.GetAggregationBits() == nil {
return errors.New("attestation's bitfield can't be nil")
}
Expand Down
10 changes: 10 additions & 0 deletions beacon-chain/core/helpers/attestation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,16 @@ func TestValidateNilAttestation(t *testing.T) {
},
errString: "",
},
{
name: "single attestation",
attestation: &ethpb.SingleAttestation{
Data: &ethpb.AttestationData{
Target: &ethpb.Checkpoint{},
Source: &ethpb.Checkpoint{},
},
},
errString: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions beacon-chain/core/helpers/beacon_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ func TestVerifyBitfieldLength_OK(t *testing.T) {
assert.NoError(t, helpers.VerifyBitfieldLength(bf, committeeSize), "Bitfield is not validated when it was supposed to be")
}

func TestVerifyBitfieldLength_Incorrect(t *testing.T) {
helpers.ClearCache()

bf := bitfield.NewBitlist(1)
require.ErrorContains(t, "wanted participants bitfield length 2, got: 1", helpers.VerifyBitfieldLength(bf, 2))
}

func TestCommitteeAssignments_CannotRetrieveFutureEpoch(t *testing.T) {
helpers.ClearCache()

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GossipTopicMappings(topic string, epoch primitives.Epoch) proto.Message {
return gossipMessage(topic)
case AttestationSubnetTopicFormat:
if epoch >= params.BeaconConfig().ElectraForkEpoch {
return &ethpb.AttestationElectra{}
return &ethpb.SingleAttestation{}
}
return gossipMessage(topic)
case AttesterSlashingSubnetTopicFormat:
Expand Down Expand Up @@ -109,7 +109,7 @@ func init() {

// Specially handle Electra objects.
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedBeaconBlockElectra{})] = BlockSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.AttestationElectra{})] = AttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SingleAttestation{})] = AttestationSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.AttesterSlashingElectra{})] = AttesterSlashingSubnetTopicFormat
GossipTypeMapping[reflect.TypeOf(&ethpb.SignedAggregateAttestationAndProofElectra{})] = AggregateAndProofSubnetTopicFormat

Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/gossip_topic_mappings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestGossipTopicMappings_CorrectType(t *testing.T) {
_, ok = pMessage.(*ethpb.SignedBeaconBlockElectra)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttestationSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.AttestationElectra)
_, ok = pMessage.(*ethpb.SingleAttestation)
assert.Equal(t, true, ok)
pMessage = GossipTopicMappings(AttesterSlashingSubnetTopicFormat, electraForkEpoch)
_, ok = pMessage.(*ethpb.AttesterSlashingElectra)
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/types/object_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func InitializeDataMaps() {
return &ethpb.Attestation{}, nil
},
bytesutil.ToBytes4(params.BeaconConfig().ElectraForkVersion): func() (ethpb.Att, error) {
return &ethpb.AttestationElectra{}, nil
return &ethpb.SingleAttestation{}, nil
},
bytesutil.ToBytes4(params.BeaconConfig().FuluForkVersion): func() (ethpb.Att, error) {
return &ethpb.AttestationElectra{}, nil
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ func (s *Service) beaconEndpoints(
FinalizationFetcher: s.cfg.FinalizationFetcher,
ForkchoiceFetcher: s.cfg.ForkchoiceFetcher,
CoreService: coreService,
AttestationStateFetcher: s.cfg.AttestationReceiver,
}

const namespace = "beacon"
Expand Down
50 changes: 25 additions & 25 deletions beacon-chain/rpc/eth/beacon/handlers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,8 +285,11 @@ func (s *Server) SubmitAttestationsV2(w http.ResponseWriter, r *http.Request) {
}
}

func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMessage) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) {
var sourceAttestations []*structs.AttestationElectra
func (s *Server) handleAttestationsElectra(
ctx context.Context,
data json.RawMessage,
) (attFailures []*server.IndexedVerificationFailure, failedBroadcasts []string, err error) {
var sourceAttestations []*structs.SingleAttestation

if err = json.Unmarshal(data, &sourceAttestations); err != nil {
return nil, nil, errors.Wrap(err, "failed to unmarshal attestation")
Expand All @@ -296,7 +299,7 @@ func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMes
return nil, nil, errors.New("no data submitted")
}

var validAttestations []*eth.AttestationElectra
var validAttestations []*eth.SingleAttestation
for i, sourceAtt := range sourceAttestations {
att, err := sourceAtt.ToConsensus()
if err != nil {
Expand All @@ -316,31 +319,32 @@ func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMes
validAttestations = append(validAttestations, att)
}

for i, att := range validAttestations {
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
// Note we can't send for aggregated att because we don't have selection proof.
if !att.IsAggregated() {
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
for i, singleAtt := range validAttestations {
targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get target state for attestation")
}
committee, err := corehelpers.BeaconCommitteeFromState(ctx, targetState, singleAtt.Data.Slot, singleAtt.CommitteeId)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get committee for attestation")
}
att := singleAtt.ToAttestationElectra(committee)

s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})

wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
}
committeeIndex, err := att.GetCommitteeIndex()
if err != nil {
return nil, nil, errors.Wrap(err, "failed to retrieve attestation committee index")
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), committeeIndex, att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.GetCommitteeIndex(), att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, singleAtt); err != nil {
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
continue
Expand All @@ -350,13 +354,9 @@ func (s *Server) handleAttestationsElectra(ctx context.Context, data json.RawMes
if err = s.AttestationCache.Add(att); err != nil {
log.WithError(err).Error("could not save attestation")
}
} else if att.IsAggregated() {
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save aggregated attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save unaggregated attestation")
log.WithError(err).Error("could not save attestation")
}
}
}
Expand Down
31 changes: 18 additions & 13 deletions beacon-chain/rpc/eth/beacon/handlers_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,13 +498,17 @@ func TestSubmitAttestations(t *testing.T) {
c.SlotsPerEpoch = 1
params.OverrideBeaconConfig(c)

_, keys, err := util.DeterministicDepositsAndKeys(1)
_, keys, err := util.DeterministicDepositsAndKeys(2)
require.NoError(t, err)
validators := []*ethpbv1alpha1.Validator{
{
PublicKey: keys[0].PublicKey().Marshal(),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
},
{
PublicKey: keys[1].PublicKey().Marshal(),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
},
}
bs, err := util.NewBeaconState(func(state *ethpbv1alpha1.BeaconState) error {
state.Validators = validators
Expand All @@ -521,9 +525,10 @@ func TestSubmitAttestations(t *testing.T) {

chainService := &blockchainmock.ChainService{State: bs}
s := &Server{
HeadFetcher: chainService,
ChainInfoFetcher: chainService,
OperationNotifier: &blockchainmock.MockOperationNotifier{},
HeadFetcher: chainService,
ChainInfoFetcher: chainService,
OperationNotifier: &blockchainmock.MockOperationNotifier{},
AttestationStateFetcher: chainService,
}
t.Run("V1", func(t *testing.T) {
t.Run("single", func(t *testing.T) {
Expand Down Expand Up @@ -732,7 +737,7 @@ func TestSubmitAttestations(t *testing.T) {
assert.Equal(t, http.StatusOK, writer.Code)
assert.Equal(t, true, broadcaster.BroadcastCalled.Load())
assert.Equal(t, 1, broadcaster.NumAttestations())
assert.Equal(t, "0x03", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetAggregationBits()))
assert.Equal(t, primitives.ValidatorIndex(1), broadcaster.BroadcastAttestations[0].GetAttestingIndex())
assert.Equal(t, "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15", hexutil.Encode(broadcaster.BroadcastAttestations[0].GetSignature()))
assert.Equal(t, primitives.Slot(0), broadcaster.BroadcastAttestations[0].GetData().Slot)
assert.Equal(t, primitives.CommitteeIndex(0), broadcaster.BroadcastAttestations[0].GetData().CommitteeIndex)
Expand Down Expand Up @@ -2344,8 +2349,8 @@ var (
]`
singleAttElectra = `[
{
"aggregation_bits": "0x03",
"committee_bits": "0x0100000000000000",
"committee_index": "0",
"attester_index": "1",
"signature": "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15",
"data": {
"slot": "0",
Expand All @@ -2364,8 +2369,8 @@ var (
]`
multipleAttsElectra = `[
{
"aggregation_bits": "0x03",
"committee_bits": "0x0100000000000000",
"committee_index": "0",
"attester_index": "0",
"signature": "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15",
"data": {
"slot": "0",
Expand All @@ -2382,8 +2387,8 @@ var (
}
},
{
"aggregation_bits": "0x03",
"committee_bits": "0x0100000000000000",
"committee_index": "0",
"attester_index": "1",
"signature": "0x8146f4397bfd8fd057ebbcd6a67327bdc7ed5fb650533edcb6377b650dea0b6da64c14ecd60846d5c0a0cd43893d6972092500f82c9d8a955e2b58c5ed3cbe885d84008ace6bd86ba9e23652f58e2ec207cec494c916063257abf285b9b15b15",
"data": {
"slot": "0",
Expand All @@ -2403,8 +2408,8 @@ var (
// signature is invalid
invalidAttElectra = `[
{
"aggregation_bits": "0x03",
"committee_bits": "0x0100000000000000",
"committee_index": "0",
"attester_index": "0",
"signature": "0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
"data": {
"slot": "0",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/rpc/eth/beacon/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ type Server struct {
BLSChangesPool blstoexec.PoolManager
ForkchoiceFetcher blockchain.ForkchoiceFetcher
CoreService *core.Service
AttestationStateFetcher blockchain.AttestationStateFetcher
}
33 changes: 9 additions & 24 deletions beacon-chain/rpc/eth/validator/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,35 +198,20 @@ func matchingAtts(atts []ethpbalpha.Att, slot primitives.Slot, attDataRoot []byt
continue
}

root, err := att.GetData().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not get attestation data root")
}
if !bytes.Equal(root[:], attDataRoot) {
continue
}

// We ignore the committee index from the request before Electra.
// This is because before Electra the committee index is part of the attestation data,
// meaning that comparing the data root is sufficient.
// Post-Electra the committee index in the data root is always 0, so we need to
// compare the committee index separately.
if postElectra {
if att.Version() >= version.Electra {
ci, err := att.GetCommitteeIndex()
if err != nil {
return nil, err
}
if ci != index {
continue
}
} else {
continue
}
} else {
// If postElectra is false and att.Version >= version.Electra, ignore the attestation.
if att.Version() >= version.Electra {
continue
}
}

root, err := att.GetData().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not get attestation data root")
}
if bytes.Equal(root[:], attDataRoot) {
if (!postElectra && att.Version() < version.Electra) || (postElectra && att.Version() >= version.Electra && att.GetCommitteeIndex() == index) {
result = append(result, att)
}
}
Expand Down
Loading

0 comments on commit c19224a

Please sign in to comment.