Skip to content

Commit

Permalink
Refactored Duties functions out of the ValidatorSet
Browse files Browse the repository at this point in the history
  • Loading branch information
rodrigo-o committed Aug 16, 2024
1 parent d9e8d21 commit 14ce12f
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 115 deletions.
64 changes: 52 additions & 12 deletions lib/lambda_ethereum_consensus/validator/duties.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,50 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
@type attester_duties :: %{Types.slot() => [attester_duty()]}
@type proposer_duties :: %{Types.slot() => [proposer_duty()]}

@type kind :: :proposers | :attesters
@type duties :: %{attesters: attester_duties(), proposers: proposer_duties()}

############################
# Accessors

@spec current_proposer(duties(), Types.epoch(), Types.slot()) :: proposer_duty() | nil
def current_proposer(duties, epoch, slot),
do: get_in(duties, [epoch, :proposers, slot])

@spec current_attesters(duties(), Types.epoch(), Types.slot()) :: [attester_duty()]
def current_attesters(duties, epoch, slot) do
for %{attested?: false} = duty <- attesters(duties, epoch, slot) do
duty
end
end

@spec current_aggregators(duties(), Types.epoch(), Types.slot()) :: [attester_duty()]
def current_aggregators(duties, epoch, slot) do
for %{should_aggregate?: true} = duty <- attesters(duties, epoch, slot) do
duty
end
end

defp attesters(duties, epoch, slot), do: get_in(duties, [epoch, :attesters, slot]) || []

############################
# Update functions

@spec update_duties!(duties(), kind(), Types.epoch(), Types.slot(), [
attester_duty() | proposer_duties()
]) :: duties()
def update_duties!(duties, kind, epoch, slot, updated),
do: put_in(duties, [epoch, kind, slot], updated)

@spec attested(attester_duty()) :: attester_duty()
def attested(duty), do: Map.put(duty, :attested?, true)

@spec aggregated(attester_duty()) :: attester_duty()
def aggregated(duty), do: Map.put(duty, :should_aggregate?, false)

############################
# Main functions

@spec compute_proposers_for_epoch(BeaconState.t(), Types.epoch(), ValidatorSet.validators()) ::
proposer_duties()
def compute_proposers_for_epoch(%BeaconState{} = state, epoch, validators) do
Expand Down Expand Up @@ -64,18 +106,16 @@ defmodule LambdaEthereumConsensus.Validator.Duties do
case Accessors.get_beacon_committee(state, slot, committee_index) do
{:ok, committee} ->
for {validator_index, index_in_committee} <- Enum.with_index(committee),
validator = Map.get(validators, validator_index),
duty =
%{
validator_index: validator_index,
index_in_committee: index_in_committee,
committee_length: length(committee),
committee_index: committee_index,
attested?: false
}
|> update_with_aggregation_duty(state, slot, validator.keystore.privkey)
|> update_with_subnet_id(state, epoch, slot) do
duty
validator = Map.get(validators, validator_index) do
%{
validator_index: validator_index,
index_in_committee: index_in_committee,
committee_length: length(committee),
committee_index: committee_index,
attested?: false
}
|> update_with_aggregation_duty(state, slot, validator.keystore.privkey)
|> update_with_subnet_id(state, epoch, slot)
end

{:error, _} ->
Expand Down
23 changes: 20 additions & 3 deletions lib/lambda_ethereum_consensus/validator/validator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,24 @@ defmodule LambdaEthereumConsensus.Validator do
epoch = Misc.compute_epoch_at_slot(head_slot)
beacon = fetch_target_state_and_go_to_slot(epoch, head_slot, head_root)

new(keystore, beacon)
state = %__MODULE__{
index: nil,
keystore: keystore,
payload_builder: nil
}

case fetch_validator_index(beacon, state.keystore.pubkey) do
nil ->
Logger.warning(
"[Validator] Public key #{state.keystore.pubkey} not found in the validator set"
)

state

validator_index ->
log_debug(validator_index, "Setup completed")
%{state | index: validator_index}
end
end

@spec new(Keystore.t(), Types.BeaconState.t()) :: t()
Expand Down Expand Up @@ -115,9 +132,9 @@ defmodule LambdaEthereumConsensus.Validator do
end
end

@spec publish_aggregate(Duties.attester_duty(), Types.slot(), non_neg_integer(), Keystore.t()) ::
@spec publish_aggregate(t(), Duties.attester_duty(), Types.slot()) ::
:ok
def publish_aggregate(duty, slot, validator_index, keystore) do
def publish_aggregate(%{index: validator_index, keystore: keystore}, duty, slot) do
case Gossip.Attestation.stop_collecting(duty.subnet_id) do
{:ok, attestations} ->
log_md = [slot: slot, attestations: attestations]
Expand Down
187 changes: 87 additions & 100 deletions lib/lambda_ethereum_consensus/validator/validator_set.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,43 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
setup_validators(slot, head_root, keystore_dir, keystore_pass_dir)
end

defp setup_validators(_s, _r, keystore_dir, keystore_pass_dir)
when is_nil(keystore_dir) or is_nil(keystore_pass_dir) do
Logger.warning(
"[Validator] No keystore_dir or keystore_pass_dir provided. Validators won't start."
)

%__MODULE__{}
end

defp setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) do
validator_keystores = decode_validator_keystores(keystore_dir, keystore_pass_dir)
epoch = Misc.compute_epoch_at_slot(slot)

validators =
Map.new(validator_keystores, fn keystore ->
validator = Validator.new(keystore, slot, head_root)
{validator.index, validator}
end)

Logger.info("[Validator] Initialized #{Enum.count(validators)} validators")

%__MODULE__{validators: validators}
|> update_state(epoch, slot, head_root)
end

@doc """
Notify all validators of a new head.
"""
@spec notify_head(t(), Types.slot(), Types.root()) :: t()
def notify_head(set, slot, head_root) do
# TODO: Just for testing purposes, remove it later
Logger.info("[Validator] Notifying all Validators with new_head", root: head_root, slot: slot)
Logger.info("[ValidatorSet] New Head", root: head_root, slot: slot)
epoch = Misc.compute_epoch_at_slot(slot)

set
|> update_state(epoch, slot, head_root)
|> attest(epoch, slot, head_root)
|> attests(epoch, slot, head_root)
|> build_next_payload(epoch, slot, head_root)
end

Expand All @@ -59,59 +84,26 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
@spec notify_tick(t(), tuple()) :: t()
def notify_tick(%{head_root: head_root} = set, {slot, third} = slot_data) do
# TODO: Just for testing purposes, remove it later
Logger.info("[Validator] Notifying all Validators with notify_tick: #{inspect(third)}",
root: head_root,
slot: slot
)

Logger.info("[ValidatorSet] Tick #{inspect(third)}", root: head_root, slot: slot)
epoch = Misc.compute_epoch_at_slot(slot)

set
|> update_state(epoch, slot, head_root)
|> process_tick(epoch, slot_data)
end

defp process_tick(%{head_root: head_root} = set, epoch, {slot, :first_third}),
do: propose(set, epoch, slot, head_root)
defp process_tick(%{head_root: head_root} = set, epoch, {slot, :first_third}) do
propose(set, epoch, slot, head_root)
end

defp process_tick(%{head_root: head_root} = set, epoch, {slot, :second_third}) do
set
|> attest(epoch, slot, head_root)
|> attests(epoch, slot, head_root)
|> build_next_payload(epoch, slot, head_root)
end

defp process_tick(set, epoch, {slot, :last_third}),
do: publish_aggregate(set, epoch, slot)

##############################
# Setup

defp setup_validators(_s, _r, keystore_dir, keystore_pass_dir)
when is_nil(keystore_dir) or is_nil(keystore_pass_dir) do
Logger.warning(
"[Validator] No keystore_dir or keystore_pass_dir provided. Validators won't start."
)

%__MODULE__{}
end

defp setup_validators(slot, head_root, keystore_dir, keystore_pass_dir) do
validator_keystores = decode_validator_keystores(keystore_dir, keystore_pass_dir)
epoch = Misc.compute_epoch_at_slot(slot)

# This will be removed later when refactoring Validator new
beacon = Validator.fetch_target_state_and_go_to_slot(epoch, slot, head_root)

validators =
Map.new(validator_keystores, fn keystore ->
validator = Validator.new(keystore, beacon)
{validator.index, validator}
end)

Logger.info("[Validator] Initialized #{Enum.count(validators)} validators")

%__MODULE__{validators: validators}
|> update_state(epoch, slot, head_root)
defp process_tick(set, epoch, {slot, :last_third}) do
publish_aggregates(set, epoch, slot)
end

##############################
Expand Down Expand Up @@ -161,91 +153,86 @@ defmodule LambdaEthereumConsensus.ValidatorSet do
end

##############################
# Attestation and proposal

defp attest(set, epoch, slot, head_root) do
case current_attesters(set, epoch, slot) do
[] ->
set

attesters ->
Enum.map(attesters, fn {validator, duty} ->
Validator.attest(validator, duty, slot, head_root)

# Duty.attested(duty)
%{duty | attested?: true}
end)
|> then(&%{set | duties: put_in(set.duties, [epoch, :attesters, slot], &1)})
end
end

defp publish_aggregate(set, epoch, slot) do
case current_aggregators(set, epoch, slot) do
[] ->
set

aggregators ->
Enum.map(aggregators, fn {validator, duty} ->
Validator.publish_aggregate(duty, slot, validator.index, validator.keystore)

# Duty.aggregated(duty)
%{duty | should_aggregate?: false}
end)
|> then(&%{set | duties: put_in(set.duties, [epoch, :attesters, slot], &1)})
end
end
# Block proposal

defp build_next_payload(%{validators: validators} = set, epoch, slot, head_root) do
set
|> proposer(epoch, slot + 1)
|> case do
# FIXME: At a boundary slot epoch here is incorrect, we need to alway have the next epoch calculated
case Duties.current_proposer(set.duties, epoch, slot + 1) do
nil ->
set

validator_index ->
validators
|> Map.update!(validator_index, &Validator.start_payload_builder(&1, slot + 1, head_root))
|> then(&%{set | validators: &1})
|> update_validators(set)
end
end

defp propose(%{validators: validators} = set, epoch, slot, head_root) do
set
|> proposer(epoch, slot)
|> case do
case Duties.current_proposer(set.duties, epoch, slot) do
nil ->
set

validator_index ->
validators
|> Map.update!(validator_index, &Validator.propose(&1, slot, head_root))
|> then(&%{set | validators: &1})
|> update_validators(set)
end
end

defp update_validators(new_validators, set), do: %{set | validators: new_validators}

##############################
# Helpers
# Attestation

defp current_attesters(set, epoch, slot) do
set
|> attesters(epoch, slot)
|> Enum.flat_map(fn
%{attested?: false} = duty -> [{Map.get(set.validators, duty.validator_index), duty}]
_ -> []
end)
defp attests(set, epoch, slot, head_root) do
case Duties.current_attesters(set.duties, epoch, slot) do
[] ->
set

attester_duties ->
attester_duties
|> Enum.map(&attest(&1, slot, head_root, set.validators))
|> update_duties(set, epoch, :attesters, slot)
end
end

defp current_aggregators(set, epoch, slot) do
set
|> attesters(epoch, slot)
|> Enum.flat_map(fn
%{should_aggregate?: true} = duty -> [{Map.get(set.validators, duty.validator_index), duty}]
_ -> []
end)
defp publish_aggregates(set, epoch, slot) do
case Duties.current_aggregators(set.duties, epoch, slot) do
[] ->
set

aggregator_duties ->
aggregator_duties
|> Enum.map(&publish_aggregate(&1, slot, set.validators))
|> update_duties(set, epoch, :attesters, slot)
end
end

defp attest(duty, slot, head_root, validators) do
validators
|> Map.get(duty.validator_index)
|> Validator.attest(duty, slot, head_root)

Duties.attested(duty)
end

defp proposer(set, epoch, slot), do: get_in(set.duties, [epoch, :proposers, slot])
defp attesters(set, epoch, slot), do: get_in(set.duties, [epoch, :attesters, slot]) || []
defp publish_aggregate(duty, slot, validators) do
validators
|> Map.get(duty.validator_index)
|> Validator.publish_aggregate(duty, slot)

Duties.aggregated(duty)
end

defp update_duties(new_duties, set, epoch, kind, slot) do
set.duties
|> Duties.update_duties!(kind, epoch, slot, new_duties)
|> then(&%{set | duties: &1})
end

##############################
# Key management

@doc """
Get validator keystores from the keystore directory.
Expand Down

0 comments on commit 14ce12f

Please sign in to comment.