Skip to content

Commit

Permalink
Added all the path to reach to aggregation, aggregate still WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o committed Aug 29, 2024
1 parent 8fe7938 commit 1c00037
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 2 deletions.
11 changes: 11 additions & 0 deletions lib/lambda_ethereum_consensus/p2p/gossip/sync_committee.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,17 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.SyncCommittee do
:ok
end

@spec stop_collecting(non_neg_integer()) ::
{:ok, list(Types.Attestation.t())} | {:error, String.t()}
def stop_collecting(subnet_id) do
# TODO from Attestation: implement some way to unsubscribe without leaving the topic
# TODO: This handle individual subnet_id while the other ones handle lists.
topic = topic(subnet_id)
Libp2pPort.leave_topic(topic)
Libp2pPort.join_topic(topic)
SyncSubnetInfo.stop_collecting(subnet_id)
end

defp topic(subnet_id) do
# TODO: this doesn't take into account fork digest changes
fork_context = ForkChoice.get_fork_digest() |> Base.encode16(case: :lower)
Expand Down
17 changes: 17 additions & 0 deletions lib/lambda_ethereum_consensus/validator/duties.ex
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
end
end

@spec current_sync_aggregators(duties(), Types.epoch(), Types.slot()) :: sync_committee_duties()
def current_sync_aggregators(duties, epoch, slot) do
for %{aggregation: aggregation} = duty <- sync_committee(duties, epoch),
Map.get(aggregation, slot),
Enum.any?(aggregation[slot], &(not Map.get(&1, :aggregated?))) do
duty
end
end

@spec current_attesters(duties(), Types.epoch(), Types.slot()) :: attester_duties()
def current_attesters(duties, epoch, slot) do
for %{attested?: false} = duty <- attesters(duties, epoch, slot) do
Expand Down Expand Up @@ -307,6 +316,14 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
@spec sync_committee_broadcasted(sync_committee_duty(), Types.slot()) :: sync_committee_duty()
def sync_committee_broadcasted(duty, slot), do: Map.put(duty, :last_slot_broadcasted, slot)

@spec sync_committee_aggregated(sync_committee_duty(), Types.slot()) :: sync_committee_duty()
def sync_committee_aggregated(duty, slot) do
updated_aggreagtion =
Enum.map(duty.aggregation[slot], fn agg -> Map.put(agg, :aggregated?, true) end)

put_in(duty, [:aggregation, slot], updated_aggreagtion)
end

############################
# Helpers

Expand Down
32 changes: 32 additions & 0 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,38 @@ defmodule LambdaEthereumConsensus.Validator do
}
end

@spec publish_sync_aggregate(t(), Duties.sync_committee_duty(), Types.slot()) :: :ok
def publish_sync_aggregate(%{index: validator_index, keystore: _keystore}, duty, slot) do
for subnet_id <- duty.subnet_ids do
case Gossip.SyncCommittee.stop_collecting(subnet_id) do
{:ok, messages} ->
log_md = [slot: slot, messages: messages]
log_info(validator_index, "publishing sync committee aggregate", log_md)

# aggregate_messages(messages, subnet_id)
# |> append_proof(duty.selection_proof, validator_index)
# |> append_signature(duty.signing_domain, keystore)
# |> Gossip.SyncCommittee.publish_aggregate()
# |> log_info_result(validator_index, "published sync committee aggregate", log_md)

{:error, reason} ->
log_error(validator_index, "stop collecting sync committee messages", reason)
:ok
end
end

:ok
end

# defp aggregate_messages(messages, subnet_id) do
# %Types.SyncCommitteeContribution{
# slot: List.first(messages).slot,
# beacon_block_root: List.first(messages).beacon_block_root,
# subcommittee_index: subnet_id,
# signature: Bls.aggregate(List.map(messages, & &1.signature))
# }
# end

################################
# Payload building and proposing

Expand Down
26 changes: 24 additions & 2 deletions lib/lambda_ethereum_consensus/validator/validator_set.ex
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
end

defp process_tick(set, epoch, {slot, :last_third}) do
maybe_publish_aggregates(set, epoch, slot)
set
|> maybe_publish_attestation_aggregates(epoch, slot)
|> maybe_publish_sync_aggregates(epoch, slot)
end

##############################
Expand Down Expand Up @@ -197,6 +199,18 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
end
end

defp maybe_publish_sync_aggregates(set, epoch, slot) do
case Duties.current_sync_aggregators(set.duties, epoch, slot) do
[] ->
set

aggregator_duties ->
aggregator_duties
|> Enum.map(&publish_sync_aggregate(&1, slot, set.validators))
|> update_duties(set, epoch, :sync_committees, slot)
end
end

defp sync_committee_broadcast(duty, slot, head_root, validators) do
validators
|> Map.get(duty.validator_index)
Expand All @@ -205,6 +219,14 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
Duties.sync_committee_broadcasted(duty, slot)
end

defp publish_sync_aggregate(duty, slot, validators) do
validators
|> Map.get(duty.validator_index)
|> Validator.publish_sync_aggregate(duty, slot)

Duties.sync_committee_aggregated(duty, slot)
end

##############################
# Attestation

Expand All @@ -220,7 +242,7 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
end
end

defp maybe_publish_aggregates(set, epoch, slot) do
defp maybe_publish_attestation_aggregates(set, epoch, slot) do
case Duties.current_aggregators(set.duties, epoch, slot) do
[] ->
set
Expand Down

0 comments on commit 1c00037

Please sign in to comment.