diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex index 02ec5834f..77b4dea68 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/operations_collector.ex @@ -69,9 +69,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do get_operation(:voluntary_exit, count) end - @spec get_attestations(non_neg_integer()) :: list(Types.Attestation.t()) - def get_attestations(count) do - get_operation(:attestation, count) + @spec get_attestations(non_neg_integer(), Types.slot() | nil) :: list(Types.Attestation.t()) + def get_attestations(count, slot) do + slot = slot || fetch_slot!() + get_operation(:attestation, count, &ignore?(&1, slot)) end @spec get_sync_committee_contributions() :: list(Types.SignedContributionAndProof.t()) @@ -141,16 +142,22 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do defp get_operation(operation, count) when operation in @operations do # NOTE: we don't remove these from the db, since after a block is built # :new_block will be called, and already added messages will be removed + operation + |> fetch_operation!() + |> cap_operations(count) + end - slot = fetch_slot!() - - operations = fetch_operation!(operation) - - if count == :all, - do: operations |> Enum.reject(&ignore?(&1, slot)), - else: operations |> Stream.reject(&ignore?(&1, slot)) |> Enum.take(count) + defp get_operation(operation, count, filter) when operation in @operations do + operation + |> fetch_operation!() + |> Stream.reject(filter) + |> cap_operations(count) end + defp cap_operations(%Stream{} = operations, :all), do: Enum.to_list(operations) + defp cap_operations(operations, :all), do: operations + defp cap_operations(operations, count), do: Enum.take(operations, count) + @impl true def handle_gossip_message(store, topic, msg_id, message) do handle_gossip_message(topic, msg_id, message) 
@@ -166,14 +173,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do {:ok, %Types.SignedAggregateAndProof{message: %Types.AggregateAndProof{aggregate: aggregate}}} <- Ssz.from_ssz(uncompressed, Types.SignedAggregateAndProof) do - votes = BitField.count(aggregate.aggregation_bits) - slot = aggregate.data.slot - root = aggregate.data.beacon_block_root |> Base.encode16() - Logger.debug( - "[Gossip] Aggregate decoded. Total attestations: #{votes}", - slot: slot, - root: root + "[Gossip] Aggregate decoded. Total attestations: #{BitField.count(aggregate.aggregation_bits)}", + slot: aggregate.data.slot, + root: aggregate.data.beacon_block_root ) # We are getting ~500 attestations in half a second. This is overwhelming the store GenServer at the moment. @@ -271,11 +274,10 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.OperationsCollector do defp ignore?(%Types.Attestation{}, nil), do: false defp ignore?(%Types.Attestation{data: data}, slot) do + # Right now this preset is 1, so we add every attestation ASAP, but it could be changed in the future data.slot + ChainSpec.get("MIN_ATTESTATION_INCLUSION_DELAY") > slot end - defp ignore?(_, _), do: false - defp update_operation(operation, f) when is_function(f) do fetch_operation!(operation) |> f.() diff --git a/lib/lambda_ethereum_consensus/validator/block_builder.ex b/lib/lambda_ethereum_consensus/validator/block_builder.ex index fc1d5edc7..aa6aa07d4 100644 --- a/lib/lambda_ethereum_consensus/validator/block_builder.ex +++ b/lib/lambda_ethereum_consensus/validator/block_builder.ex @@ -42,7 +42,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do {:ok, eth1_vote} <- fetch_eth1_data(request.slot, mid_state), {:ok, block_request} <- request - |> Map.merge(fetch_operations_for_block()) + |> Map.merge(fetch_operations_for_block(request.slot)) |> Map.put_new_lazy(:deposits, fn -> fetch_deposits(mid_state, eth1_vote) end) |> Map.put(:blob_kzg_commitments, blobs_bundle.commitments) |> 
BuildBlockRequest.validate(pre_state), @@ -84,7 +84,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do graffiti: block_request.graffiti_message, proposer_slashings: block_request.proposer_slashings, attester_slashings: block_request.attester_slashings, - attestations: block_request.attestations, + attestations: select_best_aggregates(block_request.attestations), deposits: block_request.deposits, voluntary_exits: block_request.voluntary_exits, bls_to_execution_changes: block_request.bls_to_execution_changes, @@ -152,7 +152,7 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do end end - @spec fetch_operations_for_block() :: %{ + @spec fetch_operations_for_block(Types.slot()) :: %{ proposer_slashings: [Types.ProposerSlashing.t()], attester_slashings: [Types.AttesterSlashing.t()], attestations: [Types.Attestation.t()], @@ -160,13 +160,14 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do voluntary_exits: [Types.VoluntaryExit.t()], bls_to_execution_changes: [Types.SignedBLSToExecutionChange.t()] } - defp fetch_operations_for_block() do + defp fetch_operations_for_block(slot) do %{ proposer_slashings: ChainSpec.get("MAX_PROPOSER_SLASHINGS") |> OperationsCollector.get_proposer_slashings(), attester_slashings: ChainSpec.get("MAX_ATTESTER_SLASHINGS") |> OperationsCollector.get_attester_slashings(), - attestations: ChainSpec.get("MAX_ATTESTATIONS") |> OperationsCollector.get_attestations(), + attestations: + ChainSpec.get("MAX_ATTESTATIONS") |> OperationsCollector.get_attestations(slot), sync_committee_contributions: OperationsCollector.get_sync_committee_contributions(), voluntary_exits: ChainSpec.get("MAX_VOLUNTARY_EXITS") |> OperationsCollector.get_voluntary_exits(), @@ -209,6 +210,17 @@ defmodule LambdaEthereumConsensus.Validator.BlockBuilder do signature end + defp select_best_aggregates(attestations) do + attestations + |> Enum.group_by(& &1.data) + |> Enum.map(fn {_, attestations} -> + Enum.max_by( + attestations, + 
&(&1.aggregation_bits |> BitVector.count()) + ) + end) + end + defp get_sync_aggregate(contributions, slot, parent_root) do # We group by the contributions by subcommittee index, get only the ones related to the previous slot # and pick the one with the most amount of set bits in the aggregation bits.