Skip to content

Commit

Permalink
Merge pull request #2038 from ssvlabs/stage
Browse files Browse the repository at this point in the history
stage -> main
  • Loading branch information
moshe-blox authored Feb 19, 2025
2 parents e12abf7 + 83e58c0 commit 9aeac90
Show file tree
Hide file tree
Showing 28 changed files with 819 additions and 264 deletions.
91 changes: 83 additions & 8 deletions beacon/goclient/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/electra"
"github.com/attestantio/go-eth2-client/spec/phase0"
ssz "github.com/ferranbt/fastssz"
"go.uber.org/zap"
Expand All @@ -29,10 +30,13 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd
return nil, DataVersionNil, fmt.Errorf("validator is not an aggregator")
}

attData, _, err := gc.GetAttestationData(slot, committeeIndex)
attData, _, err := gc.GetAttestationData(slot)
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to get attestation data: %w", err)
}
if gc.DataVersion(gc.network.EstimatedEpochAtSlot(attData.Slot)) < spec.DataVersionElectra {
attData.Index = committeeIndex
}

// Get aggregate attestation data.
root, err := attData.HashTreeRoot()
Expand All @@ -44,6 +48,7 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd
aggDataResp, err := gc.multiClient.AggregateAttestation(gc.ctx, &api.AggregateAttestationOpts{
Slot: slot,
AttestationDataRoot: root,
CommitteeIndex: committeeIndex,
})
recordRequestDuration(gc.ctx, "AggregateAttestation", gc.multiClient.Address(), http.MethodGet, time.Since(aggDataReqStart), err)
if err != nil {
Expand All @@ -69,22 +74,92 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd
var selectionProof phase0.BLSSignature
copy(selectionProof[:], slotSig)

return &phase0.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data,
SelectionProof: selectionProof,
}, spec.DataVersionPhase0, nil
switch aggDataResp.Data.Version {
case spec.DataVersionElectra:
if aggDataResp.Data.Electra == nil {
gc.log.Error(clNilResponseForkDataErrMsg,
zap.String("api", "AggregateAttestation"),
)
return nil, DataVersionNil, fmt.Errorf("aggregate attestation electra data is nil")
}
return &electra.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data.Electra,
SelectionProof: selectionProof,
}, aggDataResp.Data.Version, nil
case spec.DataVersionDeneb:
if aggDataResp.Data.Deneb == nil {
gc.log.Error(clNilResponseForkDataErrMsg,
zap.String("api", "AggregateAttestation"),
)
return nil, DataVersionNil, fmt.Errorf("aggregate attestation deneb data is nil")
}
return &phase0.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data.Deneb,
SelectionProof: selectionProof,
}, aggDataResp.Data.Version, nil
case spec.DataVersionCapella:
if aggDataResp.Data.Capella == nil {
gc.log.Error(clNilResponseForkDataErrMsg,
zap.String("api", "AggregateAttestation"),
)
return nil, DataVersionNil, fmt.Errorf("aggregate attestation capella data is nil")
}
return &phase0.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data.Capella,
SelectionProof: selectionProof,
}, aggDataResp.Data.Version, nil
case spec.DataVersionBellatrix:
if aggDataResp.Data.Bellatrix == nil {
gc.log.Error(clNilResponseForkDataErrMsg,
zap.String("api", "AggregateAttestation"),
)
return nil, DataVersionNil, fmt.Errorf("aggregate attestation bellatrix data is nil")
}
return &phase0.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data.Bellatrix,
SelectionProof: selectionProof,
}, aggDataResp.Data.Version, nil
case spec.DataVersionAltair:
if aggDataResp.Data.Altair == nil {
gc.log.Error(clNilResponseForkDataErrMsg,
zap.String("api", "AggregateAttestation"),
)
return nil, DataVersionNil, fmt.Errorf("aggregate attestation altair data is nil")
}
return &phase0.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data.Altair,
SelectionProof: selectionProof,
}, aggDataResp.Data.Version, nil
default:
if aggDataResp.Data.Phase0 == nil {
gc.log.Error(clNilResponseForkDataErrMsg,
zap.String("api", "AggregateAttestation"),
)
return nil, DataVersionNil, fmt.Errorf("aggregate attestation phase0 data is nil")
}
return &phase0.AggregateAndProof{
AggregatorIndex: index,
Aggregate: aggDataResp.Data.Phase0,
SelectionProof: selectionProof,
}, aggDataResp.Data.Version, nil
}
}

// SubmitSignedAggregateSelectionProof broadcasts a signed aggregator msg
func (gc *GoClient) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggregateAndProof) error {
func (gc *GoClient) SubmitSignedAggregateSelectionProof(msg *spec.VersionedSignedAggregateAndProof) error {
clientAddress := gc.multiClient.Address()
logger := gc.log.With(
zap.String("api", "SubmitAggregateAttestations"),
zap.String("client_addr", clientAddress))

start := time.Now()
err := gc.multiClient.SubmitAggregateAttestations(gc.ctx, []*phase0.SignedAggregateAndProof{msg})

err := gc.multiClient.SubmitAggregateAttestations(gc.ctx, &api.SubmitAggregateAttestationsOpts{SignedAggregateAndProofs: []*spec.VersionedSignedAggregateAndProof{msg}})
recordRequestDuration(gc.ctx, "SubmitAggregateAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err)
if err != nil {
logger.Error(clResponseErrMsg, zap.Error(err))
Expand Down
50 changes: 13 additions & 37 deletions beacon/goclient/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,26 @@ func (gc *GoClient) AttesterDuties(ctx context.Context, epoch phase0.Epoch, vali
return resp.Data, nil
}

// GetAttestationData returns attestation data for a given slot and committeeIndex.
// GetAttestationData returns attestation data for a given slot.
// Multiple calls for the same slot are joined into a single request, after which
// the result is cached for a short duration, deep copied and returned
// with the provided committeeIndex set.
func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.CommitteeIndex) (
func (gc *GoClient) GetAttestationData(slot phase0.Slot) (
*phase0.AttestationData,
spec.DataVersion,
error,
) {
// Check cache.
cachedResult := gc.attestationDataCache.Get(slot)
if cachedResult != nil {
data, err := withCommitteeIndex(cachedResult.Value(), committeeIndex)
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to set committee index: %w", err)
}
return data, spec.DataVersionPhase0, nil
}

// Have to make beacon node request and cache the result.
result, err, _ := gc.attestationReqInflight.Do(slot, func() (*phase0.AttestationData, error) {
// Check cache.
cachedResult := gc.attestationDataCache.Get(slot)
if cachedResult != nil {
return cachedResult.Value(), nil
}

attDataReqStart := time.Now()
resp, err := gc.multiClient.AttestationData(gc.ctx, &api.AttestationDataOpts{
Slot: slot,
Slot: slot,
CommitteeIndex: 0,
})

recordRequestDuration(gc.ctx, "AttestationData", gc.multiClient.Address(), http.MethodGet, time.Since(attDataReqStart), err)
Expand Down Expand Up @@ -97,38 +93,18 @@ func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.C
return nil, DataVersionNil, err
}

data, err := withCommitteeIndex(result, committeeIndex)
if err != nil {
return nil, DataVersionNil, fmt.Errorf("failed to set committee index: %w", err)
}
return data, spec.DataVersionPhase0, nil
}

// withCommitteeIndex returns a deep copy of the attestation data with the provided committee index set.
func withCommitteeIndex(data *phase0.AttestationData, committeeIndex phase0.CommitteeIndex) (*phase0.AttestationData, error) {
// Marshal & unmarshal to make a deep copy. This is safer because it won't break silently if
// a new field is added to AttestationData.
ssz, err := data.MarshalSSZ()
if err != nil {
return nil, fmt.Errorf("failed to marshal attestation data: %w", err)
}
var cpy phase0.AttestationData
if err := cpy.UnmarshalSSZ(ssz); err != nil {
return nil, fmt.Errorf("failed to unmarshal attestation data: %w", err)
}
cpy.Index = committeeIndex
return &cpy, nil
return result, spec.DataVersionPhase0, nil
}

// SubmitAttestations implements Beacon interface
func (gc *GoClient) SubmitAttestations(attestations []*phase0.Attestation) error {
func (gc *GoClient) SubmitAttestations(attestations []*spec.VersionedAttestation) error {
clientAddress := gc.multiClient.Address()
logger := gc.log.With(
zap.String("api", "SubmitAttestations"),
zap.String("client_addr", clientAddress))

start := time.Now()
err := gc.multiClient.SubmitAttestations(gc.ctx, attestations)
err := gc.multiClient.SubmitAttestations(gc.ctx, &api.SubmitAttestationsOpts{Attestations: attestations})
recordRequestDuration(gc.ctx, "SubmitAttestations", clientAddress, http.MethodPost, time.Since(start), err)
if err != nil {
logger.Error(clResponseErrMsg, zap.Error(err))
Expand Down
28 changes: 12 additions & 16 deletions beacon/goclient/attest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ func TestGoClient_GetAttestationData(t *testing.T) {
"SYNC_COMMITTEE_SUBNET_COUNT": "4",
"TARGET_AGGREGATORS_PER_COMMITTEE": "16",
"TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE": "16",
"INTERVALS_PER_SLOT": "3"
"INTERVALS_PER_SLOT": "3",
"ALTAIR_FORK_EPOCH": "74240",
"BELLATRIX_FORK_EPOCH": "144896",
"CAPELLA_FORK_EPOCH": "194048",
"DENEB_FORK_EPOCH": "269568",
"ELECTRA_FORK_EPOCH": "18446744073709551615"
}
}`),
"/eth/v1/beacon/genesis": []byte(`{
Expand Down Expand Up @@ -152,8 +157,6 @@ func TestGoClient_GetAttestationData(t *testing.T) {
t.Run("requests must be cached (per slot)", func(t *testing.T) {
slot1 := phase0.Slot(12345678)
slot2 := phase0.Slot(12345679)
committeeIndex1 := phase0.CommitteeIndex(1)
committeeIndex2 := phase0.CommitteeIndex(2)

server, serverGotRequests := newMockServer()

Expand All @@ -177,23 +180,21 @@ func TestGoClient_GetAttestationData(t *testing.T) {
require.NoError(t, err)

// First request with slot1.
gotResult1a, gotVersion, err := client.GetAttestationData(slot1, committeeIndex1)
gotResult1a, gotVersion, err := client.GetAttestationData(slot1)
require.NoError(t, err)
require.Equal(t, spec.DataVersionPhase0, gotVersion)
require.Equal(t, slot1, gotResult1a.Slot)
require.Equal(t, committeeIndex1, gotResult1a.Index)
require.NotEmpty(t, gotResult1a.BeaconBlockRoot)
require.NotEmpty(t, gotResult1a.Source.Epoch)
require.NotEmpty(t, gotResult1a.Source.Root)
require.NotEmpty(t, gotResult1a.Target.Epoch)
require.NotEmpty(t, gotResult1a.Target.Root)

// Second request with slot1, result should have been cached.
gotResult1b, gotVersion, err := client.GetAttestationData(slot1, committeeIndex1)
gotResult1b, gotVersion, err := client.GetAttestationData(slot1)
require.NoError(t, err)
require.Equal(t, spec.DataVersionPhase0, gotVersion)
require.Equal(t, slot1, gotResult1b.Slot)
require.Equal(t, committeeIndex1, gotResult1b.Index)
require.NotEmpty(t, gotResult1b.BeaconBlockRoot)
require.NotEmpty(t, gotResult1b.Source.Epoch)
require.NotEmpty(t, gotResult1b.Source.Root)
Expand All @@ -207,23 +208,21 @@ func TestGoClient_GetAttestationData(t *testing.T) {
require.Equal(t, gotResult1b.Target.Root, gotResult1a.Target.Root)

// Third request with slot2.
gotResult2a, gotVersion, err := client.GetAttestationData(slot2, committeeIndex2)
gotResult2a, gotVersion, err := client.GetAttestationData(slot2)
require.NoError(t, err)
require.Equal(t, spec.DataVersionPhase0, gotVersion)
require.Equal(t, slot2, gotResult2a.Slot)
require.Equal(t, committeeIndex2, gotResult2a.Index)
require.NotEmpty(t, gotResult2a.BeaconBlockRoot)
require.NotEmpty(t, gotResult2a.Source.Epoch)
require.NotEmpty(t, gotResult2a.Source.Root)
require.NotEmpty(t, gotResult2a.Target.Epoch)
require.NotEmpty(t, gotResult2a.Target.Root)

// Fourth request with slot2, result should have been cached.
gotResult2b, gotVersion, err := client.GetAttestationData(slot2, committeeIndex2)
gotResult2b, gotVersion, err := client.GetAttestationData(slot2)
require.NoError(t, err)
require.Equal(t, spec.DataVersionPhase0, gotVersion)
require.Equal(t, slot2, gotResult2b.Slot)
require.Equal(t, committeeIndex2, gotResult2b.Index)
require.NotEmpty(t, gotResult2b.BeaconBlockRoot)
require.NotEmpty(t, gotResult2b.Source.Epoch)
require.NotEmpty(t, gotResult2b.Source.Root)
Expand All @@ -237,11 +236,10 @@ func TestGoClient_GetAttestationData(t *testing.T) {
require.Equal(t, gotResult2b.Target.Root, gotResult2a.Target.Root)

// Second request with slot1, result STILL should be cached.
gotResult1c, gotVersion, err := client.GetAttestationData(slot1, committeeIndex1)
gotResult1c, gotVersion, err := client.GetAttestationData(slot1)
require.NoError(t, err)
require.Equal(t, spec.DataVersionPhase0, gotVersion)
require.Equal(t, slot1, gotResult1c.Slot)
require.Equal(t, committeeIndex1, gotResult1c.Index)
require.NotEmpty(t, gotResult1c.BeaconBlockRoot)
require.NotEmpty(t, gotResult1c.Source.Epoch)
require.NotEmpty(t, gotResult1c.Source.Root)
Expand Down Expand Up @@ -297,13 +295,11 @@ func TestGoClient_GetAttestationData(t *testing.T) {
p := pool.New()
for i := 0; i < 1000; i++ {
slot := phase0.Slot(slotStartPos + i%slotsTotalCnt)
committeeIndex := phase0.CommitteeIndex(i % 64)
p.Go(func() {
gotResult, gotVersion, err := client.GetAttestationData(slot, committeeIndex)
gotResult, gotVersion, err := client.GetAttestationData(slot)
require.NoError(t, err)
require.Equal(t, spec.DataVersionPhase0, gotVersion)
require.Equal(t, slot, gotResult.Slot)
require.Equal(t, committeeIndex, gotResult.Index)

prevResult, ok := gotResults.GetOrSet(slot, gotResult)
if ok {
Expand Down
Loading

0 comments on commit 9aeac90

Please sign in to comment.