Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(statement-distribution):Implement FanIn approach to statement distribution #4406

Open
wants to merge 17 commits into
base: feat/parachain
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
25771c5
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Dec 12, 2024
dabff42
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Dec 24, 2024
ca8be50
Merge branch 'feat/parachain' into feature/implement_FanInApproach
DanielDDHM Dec 24, 2024
9cc4039
Merge branch 'feat/parachain' into feature/implement_FanInApproach
DanielDDHM Jan 8, 2025
1bdca50
feat(prospective-parachains) implements fanin operation
DanielDDHM Jan 8, 2025
2d038e6
Merge branch 'feature/implement_FanInApproach' of github.com:DanielDD…
DanielDDHM Jan 8, 2025
a67abba
feat(prospective-parachains) implements fanin operation
DanielDDHM Jan 8, 2025
4de9b32
feat(prospective-parachains) implements fanin operation
DanielDDHM Jan 9, 2025
efbb2c4
feat(prospective-parachains) implements fanin operation
DanielDDHM Jan 9, 2025
959624f
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
22477cd
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
9d17d2e
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
4e19d34
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
6467711
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
1ab8692
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
71e3b84
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
35547c4
feat(statement-distribution):Implement FanIn approach to statement di…
DanielDDHM Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions dot/parachain/statement-distribution/statement_distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,84 @@ package statementdistribution

import (
"context"
"time"

"github.com/ChainSafe/gossamer/internal/log"

statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages"
parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util"
)

var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution"))

type StatementDistribution struct {
reputationAggregator *parachainutil.ReputationAggregator
SubSystemToOverseer chan<- parachainutil.NetworkBridgeTxMessage
}

func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) {
func (s *StatementDistribution) Run(
ctx context.Context,
overseerToSubSystem <-chan any,
v2CommChannel <-chan any,
receiverRespCh <-chan any,
retryReqCh <-chan any,
DanielDDHM marked this conversation as resolved.
Show resolved Hide resolved
) {
// Timer for reputation aggregator trigger
reputationDelay := time.NewTicker(parachainutil.ReputationChangeInterval) // Adjust the duration as needed
defer reputationDelay.Stop()

for {
select {
case msg, ok := <-overseerToSubSystem:
if !ok {
return
}
case msg := <-overseerToSubSystem:
err := s.processMessage(msg)
if err != nil {
logger.Errorf("processing overseer message: %w", err)
logger.Errorf("error processing overseer message: %v", err)
}
// case _ = <-v2CommChannel:
// panic("Not Implemented")
// case _ = <-receiverRespCh:
// panic("Not Implemented")
// case _ = <-retryReqCh:
logger.Infof("received retry request, no action taken")
case <-reputationDelay.C:
// Trigger reputation aggregator logic
s.reputationAggregator.Send(s.SubSystemToOverseer)
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.Errorf("ctx error: %v\n", err)
}
logger.Infof("shutting down: %v", ctx.Err())
return
}
}
}

func (s StatementDistribution) processMessage(msg any) error {
func (s *StatementDistribution) processMessage(msg any) error {
switch msg := msg.(type) {
case statementedistributionmessages.Backed:
// TODO #4171
case statementedistributionmessages.Share:
// TODO #4170
// case statementedistributionmessages.NetworkBridgeUpdate
// TODO #4172 this above case would need to wait until network bridge receiver side is merged
case parachaintypes.ActiveLeavesUpdateSignal:
return s.ProcessActiveLeavesUpdateSignal(msg)
return s.processActiveLeavesUpdateSignal(msg)
case parachaintypes.BlockFinalizedSignal:
return s.ProcessBlockFinalizedSignal(msg)

return s.processBlockFinalizedSignal(msg)
default:
return parachaintypes.ErrUnknownOverseerMessage
}

return nil
}

func (s StatementDistribution) Name() parachaintypes.SubSystemName {
func (s *StatementDistribution) Name() parachaintypes.SubSystemName {
return parachaintypes.StatementDistribution
}

func (s StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error {
func (s *StatementDistribution) processActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error {
// TODO #4173
return nil
}

func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error {
func (s *StatementDistribution) processBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error {
// nothing to do here
return nil
}

func (s StatementDistribution) Stop() {}
func (s *StatementDistribution) Stop() {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package statementdistribution

// import (
// parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util"
// )

// func createStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) {
DanielDDHM marked this conversation as resolved.
Show resolved Hide resolved
// mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool {
// return rep.Type == parachainutil.Malicious
// })

// subSystemToOverseer := make(chan parachainutil.NetworkBridgeTxMessage, 10)

// return &StatementDistribution{
// reputationAggregator: mockAggregator,
// SubSystemToOverseer: subSystemToOverseer,
// }, subSystemToOverseer
// }

// func createChannels() (chan any, chan any, chan any, chan any) {
// return make(chan any, 10), make(chan any, 10), make(chan any, 10), make(chan any, 10)
// }
DanielDDHM marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions dot/parachain/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type UnifiedReputationChange struct {
Reason string
}

const ReputationChangeInterval = 30 * time.Second

// CostOrBenefit returns the cost or benefit of the reputation change.
func (u UnifiedReputationChange) CostOrBenefit() int32 {
switch u.Type {
Expand Down
Loading