Skip to content

Commit

Permalink
fix: duplicate and old attestations aggregates (#1303)
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o authored Sep 26, 2024
1 parent 6156307 commit 86f4df2
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
40 changes: 21 additions & 19 deletions lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
get_operation(:voluntary_exit, count)
end

@spec get_attestations(non_neg_integer()) :: list(Types.Attestation.t())
def get_attestations(count) do
get_operation(:attestation, count)
@spec get_attestations(non_neg_integer(), Types.slot()) :: list(Types.Attestation.t())
def get_attestations(count, slot) do
slot = slot || fetch_slot!()
get_operation(:attestation, count, &ignore?(&1, slot))
end

@spec get_sync_committee_contributions() :: list(Types.SignedContributionAndProof.t())
Expand Down Expand Up @@ -141,16 +142,22 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
defp get_operation(operation, count) when operation in @operations do
# NOTE: we don't remove these from the db, since after a block is built
# :new_block will be called, and already added messages will be removed
operation
|> fetch_operation!()
|> cap_operations(count)
end

slot = fetch_slot!()

operations = fetch_operation!(operation)

if count == :all,
do: operations |> Enum.reject(&ignore?(&1, slot)),
else: operations |> Stream.reject(&ignore?(&1, slot)) |> Enum.take(count)
defp get_operation(operation, count, filter) when operation in @operations do
operation
|> fetch_operation!()
|> Stream.reject(filter)
|> cap_operations(count)
end

defp cap_operations(%Stream{} = operations, :all), do: Enum.to_list(operations)
defp cap_operations(operations, :all), do: operations
defp cap_operations(operations, count), do: Enum.take(operations, count)

@impl true
def handle_gossip_message(store, topic, msg_id, message) do
handle_gossip_message(topic, msg_id, message)
Expand All @@ -166,14 +173,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
{:ok,
%Types.SignedAggregateAndProof{message: %Types.AggregateAndProof{aggregate: aggregate}}} <-
Ssz.from_ssz(uncompressed, Types.SignedAggregateAndProof) do
votes = BitField.count(aggregate.aggregation_bits)
slot = aggregate.data.slot
root = aggregate.data.beacon_block_root |> Base.encode16()

Logger.debug(
"[Gossip] Aggregate decoded. Total attestations: #{votes}",
slot: slot,
root: root
"[Gossip] Aggregate decoded. Total attestations: #{BitField.count(aggregate.aggregation_bits)}",
slot: aggregate.data.slot,
root: aggregate.data.beacon_block_root
)

# We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment.
Expand Down Expand Up @@ -271,11 +274,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do
defp ignore?(%Types.Attestation{}, nil), do: false

defp ignore?(%Types.Attestation{data: data}, slot) do
# Right now this preset is 1, so we add every attestation ASAP, but it could be changed in the future
data.slot + ChainSpec.get("MIN_ATTESTATION_INCLUSION_DELAY") > slot
end

defp ignore?(_, _), do: false

defp update_operation(operation, f) when is_function(f) do
fetch_operation!(operation)
|> f.()
Expand Down
22 changes: 17 additions & 5 deletions lib/lambda_ethereum_consensus/validator/block_builder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
{:ok, eth1_vote} <- fetch_eth1_data(request.slot, mid_state),
{:ok, block_request} <-
request
|> Map.merge(fetch_operations_for_block())
|> Map.merge(fetch_operations_for_block(request.slot))
|> Map.put_new_lazy(:deposits, fn -> fetch_deposits(mid_state, eth1_vote) end)
|> Map.put(:blob_kzg_commitments, blobs_bundle.commitments)
|> BuildBlockRequest.validate(pre_state),
Expand Down Expand Up @@ -84,7 +84,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
graffiti: block_request.graffiti_message,
proposer_slashings: block_request.proposer_slashings,
attester_slashings: block_request.attester_slashings,
attestations: block_request.attestations,
attestations: select_best_aggregates(block_request.attestations),
deposits: block_request.deposits,
voluntary_exits: block_request.voluntary_exits,
bls_to_execution_changes: block_request.bls_to_execution_changes,
Expand Down Expand Up @@ -152,21 +152,22 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
end
end

@spec fetch_operations_for_block() :: %{
@spec fetch_operations_for_block(Types.slot()) :: %{
proposer_slashings: [Types.ProposerSlashing.t()],
attester_slashings: [Types.AttesterSlashing.t()],
attestations: [Types.Attestation.t()],
sync_committee_contributions: [Types.SyncCommitteeContribution.t()],
voluntary_exits: [Types.VoluntaryExit.t()],
bls_to_execution_changes: [Types.SignedBLSToExecutionChange.t()]
}
defp fetch_operations_for_block() do
defp fetch_operations_for_block(slot) do
%{
proposer_slashings:
ChainSpec.get("MAX_PROPOSER_SLASHINGS") |> OperationsCollector.get_proposer_slashings(),
attester_slashings:
ChainSpec.get("MAX_ATTESTER_SLASHINGS") |> OperationsCollector.get_attester_slashings(),
attestations: ChainSpec.get("MAX_ATTESTATIONS") |> OperationsCollector.get_attestations(),
attestations:
ChainSpec.get("MAX_ATTESTATIONS") |> OperationsCollector.get_attestations(slot),
sync_committee_contributions: OperationsCollector.get_sync_committee_contributions(),
voluntary_exits:
ChainSpec.get("MAX_VOLUNTARY_EXITS") |> OperationsCollector.get_voluntary_exits(),
Expand Down Expand Up @@ -209,6 +210,17 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do
signature
end

defp select_best_aggregates(attestations) do
attestations
|> Enum.group_by(& &1.data.index)
|> Enum.map(fn {_, attestations} ->
Enum.max_by(
attestations,
&(&1.aggregation_bits |> BitVector.count())
)
end)
end

defp get_sync_aggregate(contributions, slot, parent_root) do
# We group by the contributions by subcommittee index, get only the ones related to the previous slot
# and pick the one with the most amount of set bits in the aggregation bits.
Expand Down

0 comments on commit 86f4df2

Please sign in to comment.