From 1a59abf95b9b02c662b384081c8688209e338eef Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 2 Sep 2024 22:08:59 -0300 Subject: [PATCH] Contribution generated and published, but not validated to be picked up --- .../p2p/gossip/sync_committee.ex | 9 ++ .../state_transition/accessors.ex | 22 +++++ .../validator/duties.ex | 4 +- .../validator/utils.ex | 34 ++++---- .../validator/validator.ex | 83 +++++++++++++++---- .../validator/validator_set.ex | 12 +-- lib/utils/bit_list.ex | 10 ++- 7 files changed, 134 insertions(+), 40 deletions(-) diff --git a/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex b/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex index dade74b98..8a91de6c3 100644 --- a/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex +++ b/lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex @@ -62,6 +62,15 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do :ok end + @spec publish_contribution(Types.SignedContributionAndProof.t()) :: :ok + def publish_contribution(%Types.SignedContributionAndProof{} = signed_contribution) do + fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower) + topic = "/eth2/#{fork_context}/sync_committee_contribution_and_proof/ssz_snappy" + {:ok, encoded} = SszEx.encode(signed_contribution, Types.SignedContributionAndProof) + {:ok, message} = :snappyer.compress(encoded) + Libp2pPort.publish(topic, message) + end + @spec collect([non_neg_integer()], Types.SyncCommitteeMessage.t()) :: :ok def collect(subnet_ids, message) do join(subnet_ids) diff --git a/lib/lambda_ethereum_consensus/state_transition/accessors.ex b/lib/lambda_ethereum_consensus/state_transition/accessors.ex index 3d631c80f..a2b041aec 100644 --- a/lib/lambda_ethereum_consensus/state_transition/accessors.ex +++ b/lib/lambda_ethereum_consensus/state_transition/accessors.ex @@ -19,6 +19,28 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do @max_random_byte 2 ** 8 - 1 + @doc """ + Compute the correct sync committee for a given `epoch`. + """ + def get_sync_committee_for_epoch!(%BeaconState{} = state, epoch) do + sync_committee_period = Misc.compute_sync_committee_period(epoch) + current_epoch = get_current_epoch(state) + current_sync_committee_period = Misc.compute_sync_committee_period(current_epoch) + next_sync_committee_period = current_sync_committee_period + 1 + + case sync_committee_period do + ^current_sync_committee_period -> + state.current_sync_committee + + ^next_sync_committee_period -> + state.next_sync_committee + + _ -> + raise ArgumentError, + "Invalid epoch #{epoch}, should be in the current or next sync committee period" + end + end + @doc """ Return the next sync committee, with possible pubkey duplicates. """ diff --git a/lib/lambda_ethereum_consensus/validator/duties.ex b/lib/lambda_ethereum_consensus/validator/duties.ex index 7e3fc3e66..e0ba1ec19 100644 --- a/lib/lambda_ethereum_consensus/validator/duties.ex +++ b/lib/lambda_ethereum_consensus/validator/duties.ex @@ -168,8 +168,8 @@ defmodule LambdaEthereumConsensus.Validator.Duties do validator_privkey ) - domain_sc_selection_proof = Constants.domain_sync_committee_selection_proof() - domain = Accessors.get_domain(beacon_state, domain_sc_selection_proof, epoch) + domain_contribution_and_proof = Constants.domain_contribution_and_proof() + domain = Accessors.get_domain(beacon_state, domain_contribution_and_proof, epoch) if Utils.sync_committee_aggregator?(proof) do aggregation = %{ diff --git a/lib/lambda_ethereum_consensus/validator/utils.ex b/lib/lambda_ethereum_consensus/validator/utils.ex index a4e969913..7595e57bf 100644 --- a/lib/lambda_ethereum_consensus/validator/utils.ex +++ b/lib/lambda_ethereum_consensus/validator/utils.ex @@ -82,24 +82,28 @@ defmodule LambdaEthereumConsensus.Validator.Utils do @spec assigned_to_sync_committee?(BeaconState.t(), Types.epoch(), Types.validator_index()) :: boolean() def assigned_to_sync_committee?(%BeaconState{} = state, epoch, validator_index) do - sync_committee_period = Misc.compute_sync_committee_period(epoch) - current_epoch = Accessors.get_current_epoch(state) - current_sync_committee_period = Misc.compute_sync_committee_period(current_epoch) - next_sync_committee_period = current_sync_committee_period + 1 + target_pubkey = state.validators |> Map.get(validator_index, %{}) |> Map.get(:pubkey) - pubkey = state.validators[validator_index].pubkey - - case sync_committee_period do - ^current_sync_committee_period -> - Enum.member?(state.current_sync_committee.pubkeys, pubkey) + target_pubkey && target_pubkey in Accessors.get_sync_committee_for_epoch!(state, epoch) + end - ^next_sync_committee_period -> - Enum.member?(state.next_sync_committee.pubkeys, pubkey) + @spec participants_per_sync_subcommittee(BeaconState.t(), Types.epoch()) :: + %{non_neg_integer() => [Bls.pubkey()]} + def participants_per_sync_subcommittee(state, epoch) do + sync_committee_subnet_size = + div(ChainSpec.get("SYNC_COMMITTEE_SIZE"), Constants.sync_committee_subnet_count()) - _ -> - raise ArgumentError, - "Invalid epoch #{epoch}, should be in the current or next sync committee period" - end + state + |> Accessors.get_sync_committee_for_epoch!(epoch) + |> Map.get(:pubkeys) + |> Enum.chunk_every(sync_committee_subnet_size) + |> Enum.with_index() + |> Map.new(fn {pubkeys, i} -> + indices_by_pubkeys = + pubkeys |> Enum.with_index() |> Enum.group_by(&elem(&1, 0), &elem(&1, 1)) + + {i, indices_by_pubkeys} + end) end @spec get_sync_committee_selection_proof( diff --git a/lib/lambda_ethereum_consensus/validator/validator.ex b/lib/lambda_ethereum_consensus/validator/validator.ex index 960c5689f..4f8f6cdea 100644 --- a/lib/lambda_ethereum_consensus/validator/validator.ex +++ b/lib/lambda_ethereum_consensus/validator/validator.ex @@ -271,19 +271,40 @@ defmodule LambdaEthereumConsensus.Validator do } end - @spec publish_sync_aggregate(t(), Duties.sync_committee_duty(), Types.slot()) :: :ok - def publish_sync_aggregate(%{index: validator_index, keystore: _keystore}, duty, slot) do + @spec publish_sync_aggregate( + t(), + Duties.sync_committee_duty(), + Types.epoch(), + Types.slot(), + Types.root() + ) :: :ok + def publish_sync_aggregate( + %{index: validator_index, keystore: keystore}, + duty, + epoch, + slot, + head_root + ) do + head_state = BlockStates.get_state_info!(head_root).beacon_state |> go_to_slot(slot) + for subnet_id <- duty.subnet_ids do case Gossip.SyncCommittee.stop_collecting(subnet_id) do {:ok, messages} -> log_md = [slot: slot, messages: messages] log_info(validator_index, "publishing sync committee aggregate", log_md) - # aggregate_messages(messages, subnet_id) - # |> append_proof(duty.selection_proof, validator_index) - # |> append_signature(duty.signing_domain, keystore) - # |> Gossip.SyncCommittee.publish_aggregate() - # |> log_info_result(validator_index, "published sync committee aggregate", log_md) + aggregation_bits = + sync_committee_aggregation_bits(head_state, subnet_id, epoch, messages) + + aggregation_duty = + Enum.find(duty.aggregation[slot], &(&1.subcommittee_index == subnet_id)) + + messages + |> sync_committee_contribution(subnet_id, aggregation_bits) + |> append_sync_proof(aggregation_duty.selection_proof, validator_index) + |> append_sync_signature(aggregation_duty.signing_domain, keystore) + |> Gossip.SyncCommittee.publish_contribution() + |> log_info_result(validator_index, "published sync committee aggregate", log_md) {:error, reason} -> log_error(validator_index, "stop collecting sync committee messages", reason) @@ -294,14 +315,46 @@ defmodule LambdaEthereumConsensus.Validator do :ok end - # defp aggregate_messages(messages, subnet_id) do - # %Types.SyncCommitteeContribution{ - # slot: List.first(messages).slot, - # beacon_block_root: List.first(messages).beacon_block_root, - # subcommittee_index: subnet_id, - # signature: Bls.aggregate(List.map(messages, & &1.signature)) - # } - # end + defp sync_committee_contribution(messages, subnet_id, aggregation_bits) do + {:ok, signature} = Bls.aggregate(Enum.map(messages, & &1.signature)) + + %Types.SyncCommitteeContribution{ + slot: List.first(messages).slot, + beacon_block_root: List.first(messages).beacon_block_root, + subcommittee_index: subnet_id, + aggregation_bits: aggregation_bits, + signature: signature + } + end + + defp append_sync_proof(contribution, proof, validator_index) do + %Types.ContributionAndProof{ + aggregator_index: validator_index, + contribution: contribution, + selection_proof: proof + } + end + + defp append_sync_signature(contribution_and_proof, signing_domain, %{privkey: privkey}) do + signing_root = Misc.compute_signing_root(contribution_and_proof, signing_domain) + {:ok, signature} = Bls.sign(privkey, signing_root) + %Types.SignedContributionAndProof{message: contribution_and_proof, signature: signature} + end + + defp sync_committee_aggregation_bits(state, subnet_id, epoch, messages) do + indexes_in_subcommittee = + state + |> Utils.participants_per_sync_subcommittee(epoch) + |> Map.get(subnet_id) + |> Map.new(fn {pubkey, indexes} -> {fetch_validator_index(state, pubkey), indexes} end) + + aggregation_bits = indexes_in_subcommittee |> Enum.count() |> BitList.zero() + + for %{validator_index: validator_index} <- messages, reduce: aggregation_bits do + acc -> + BitList.set(acc, indexes_in_subcommittee |> Map.get(validator_index)) + end + end ################################ # Payload building and proposing diff --git a/lib/lambda_ethereum_consensus/validator/validator_set.ex b/lib/lambda_ethereum_consensus/validator/validator_set.ex index 6d7942337..10b7cb6ee 100644 --- a/lib/lambda_ethereum_consensus/validator/validator_set.ex +++ b/lib/lambda_ethereum_consensus/validator/validator_set.ex @@ -109,10 +109,10 @@ defmodule LambdaEthereumConsensus.ValidatorSet do |> maybe_sync_committee_broadcasts(slot, head_root) end - defp process_tick(set, epoch, {slot, :last_third}) do + defp process_tick(%{head_root: head_root} = set, epoch, {slot, :last_third}) do set |> maybe_publish_attestation_aggregates(epoch, slot) - |> maybe_publish_sync_aggregates(epoch, slot) + |> maybe_publish_sync_aggregates(epoch, slot, head_root) end ############################## @@ -199,14 +199,14 @@ defmodule LambdaEthereumConsensus.ValidatorSet do end end - defp maybe_publish_sync_aggregates(set, epoch, slot) do + defp maybe_publish_sync_aggregates(set, epoch, slot, head_root) do case Duties.current_sync_aggregators(set.duties, epoch, slot) do [] -> set aggregator_duties -> aggregator_duties - |> Enum.map(&publish_sync_aggregate(&1, slot, set.validators)) + |> Enum.map(&publish_sync_aggregate(&1, epoch, slot, head_root, set.validators)) |> update_duties(set, epoch, :sync_committees, slot) end end @@ -219,10 +219,10 @@ defmodule LambdaEthereumConsensus.ValidatorSet do Duties.sync_committee_broadcasted(duty, slot) end - defp publish_sync_aggregate(duty, slot, validators) do + defp publish_sync_aggregate(duty, epoch, slot, head_root, validators) do validators |> Map.get(duty.validator_index) - |> Validator.publish_sync_aggregate(duty, slot) + |> Validator.publish_sync_aggregate(duty, epoch, slot, head_root) Duties.sync_committee_aggregated(duty, slot) end diff --git a/lib/utils/bit_list.ex b/lib/utils/bit_list.ex index fdb5f0786..7000d3271 100644 --- a/lib/utils/bit_list.ex +++ b/lib/utils/bit_list.ex @@ -67,9 +67,15 @@ defmodule LambdaEthereumConsensus.Utils.BitList do def set?(bit_list, index), do: BitField.set?(bit_list, index) @doc """ - Sets a bit (turns it to 1). - Equivalent to bit_list[index] = 1. + Set a bit or list of bits (turns them to 1). + Equivalent to bit_list[index] = 1. If indexes is a list, + it will do it for every index in the list. """ + @spec set(t, [non_neg_integer]) :: t + def set(bit_list, indexes) when is_list(indexes) do + Enum.reduce(indexes, bit_list, fn index, acc -> set(acc, index) end) + end + @spec set(t, non_neg_integer) :: t def set(bit_list, index), do: BitField.set(bit_list, index)