From 25771c5281a136bda009435b91e69285eac0bc29 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Thu, 12 Dec 2024 18:59:20 +0000 Subject: [PATCH 01/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution.go | 117 ++++++++++++++++-- 1 file changed, 107 insertions(+), 10 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index e9d01812db..1fc803d4d5 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -2,6 +2,7 @@ package statementdistribution import ( "context" + "fmt" "github.com/ChainSafe/gossamer/internal/log" @@ -14,21 +15,41 @@ var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { } -func (s StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { +// MuxedMessage represents a combined message with its source +type MuxedMessage struct { + Source string + Message any +} + +func (s StatementDistribution) Run( + ctx context.Context, + overseerToSubSystem <-chan any, + v1RequesterChannel <-chan any, + v1CommChannel <-chan any, + v2CommChannel <-chan any, + receiverRespCh <-chan any, + retryReqCh <-chan any, +) { + muxedChannel := FanIn( + ctx, + overseerToSubSystem, + v1RequesterChannel, + v1CommChannel, + v2CommChannel, + receiverRespCh, + retryReqCh, + ) + for { select { - case msg, ok := <-overseerToSubSystem: - if !ok { - return - } - err := s.processMessage(msg) + case muxedMsg := <-muxedChannel: + err := s.processMuxedMessage(muxedMsg) if err != nil { - logger.Errorf("processing overseer message: %w", err) + logger.Errorf("error processing muxed message: %w", err) } case <-ctx.Done(): - if err := ctx.Err(); err != nil { - logger.Errorf("ctx error: %v\n", err) - } + logger.Infof("shutting down: %v", ctx.Err()) + return } } } @@ -54,6 +75,33 @@ func (s StatementDistribution) processMessage(msg any) error { return nil } +func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error { + switch muxedMsg.Source { + case "SubsystemMsg": + // Use processMessage for messages from overseerToSubSystem + return s.processMessage(muxedMsg.Message) + case "V1Requester": + // Handle legacy V1Requester messages + return nil + case "V1Responder": + // Handle legacy V1Responder messages + return nil + case "V2Responder": + // Handle V2Responder messages + return nil + case "Receive_Response": + // Handle response messages + return nil + case "Retry_Request": + // Do nothing for retry requests + logger.Infof("received retry request, no action taken") + return nil + default: + logger.Warnf("unknown message source: %s", muxedMsg.Source) + return fmt.Errorf("unknown message source: %s", muxedMsg.Source) + } +} + func (s StatementDistribution) Name() parachaintypes.SubSystemName { return parachaintypes.StatementDistribution } @@ -69,3 +117,52 @@ func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes } func (s StatementDistribution) Stop() {} + +func FanIn( + ctx context.Context, + overseerChannel <-chan any, + v1RequesterChannel <-chan any, + v1CommChannel <-chan any, + v2CommChannel <-chan any, + receiverRespCh <-chan any, + retryReqCh <-chan any, +) <-chan MuxedMessage { + output := make(chan MuxedMessage) + + go func() { + defer close(output) + for { + select { + // On each case verify if the channel is open before send the message + case <-ctx.Done(): + return + case msg, ok := <-overseerChannel: + if ok { + output <- MuxedMessage{Source: "SubsystemMsg", Message: msg} + } + case msg, ok := <-v1RequesterChannel: + if ok { + output <- MuxedMessage{Source: "V1Requester", Message: msg} + } + case msg, ok := <-v1CommChannel: + if ok { + output <- MuxedMessage{Source: "V1Responder", Message: msg} + } + case msg, ok := <-v2CommChannel: + if ok { + output <- MuxedMessage{Source: "V2Responder", Message: msg} + } + case msg, ok := <-receiverRespCh: + if ok { + output <- MuxedMessage{Source: "Receive_Response", Message: msg} + } + case msg, ok := <-retryReqCh: + if ok { + output <- MuxedMessage{Source: "Retry_Request", Message: msg} + } + } + } + }() + + return output +} From dabff423a6614fde41e0ccabe489a8302b8a1a55 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Tue, 24 Dec 2024 09:54:53 +0000 Subject: [PATCH 02/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution.go | 113 ++++++------------ 1 file changed, 36 insertions(+), 77 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 1fc803d4d5..53032d01d1 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -3,6 +3,7 @@ package statementdistribution import ( "context" "fmt" + "time" "github.com/ChainSafe/gossamer/internal/log" @@ -24,29 +25,44 @@ type MuxedMessage struct { func (s StatementDistribution) Run( ctx context.Context, overseerToSubSystem <-chan any, - v1RequesterChannel <-chan any, - v1CommChannel <-chan any, v2CommChannel <-chan any, receiverRespCh <-chan any, retryReqCh <-chan any, ) { - muxedChannel := FanIn( - ctx, - overseerToSubSystem, - v1RequesterChannel, - v1CommChannel, - v2CommChannel, - receiverRespCh, - retryReqCh, - ) + // Timer for reputation aggregator trigger + ticker := time.NewTicker(1 * time.Minute) // Adjust the duration as needed + defer ticker.Stop() for { select { - case muxedMsg := <-muxedChannel: - err := s.processMuxedMessage(muxedMsg) - if err != nil { - logger.Errorf("error processing muxed message: %w", err) + case msg, ok := <-overseerToSubSystem: + if ok { + err := s.processMessage(msg) + if err != nil { + logger.Errorf("error processing overseer message: %v", err) + } + } + case msg, ok := <-v2CommChannel: + if ok { + err := s.processMuxedMessage(MuxedMessage{Source: "V2Responder", Message: msg}) + if err != nil { + logger.Errorf("error processing V2 responder message: %v", err) + } + } + case msg, ok := <-receiverRespCh: + if ok { + err := s.processMuxedMessage(MuxedMessage{Source: "Receive_Response", Message: msg}) + if err != nil { + logger.Errorf("error processing receiver response message: %v", err) + } + } + case _, ok := <-retryReqCh: + if ok { + logger.Infof("received retry request, no action taken") } + case <-ticker.C: + // Trigger reputation aggregator logic + s.triggerReputationAggregator() case <-ctx.Done(): logger.Infof("shutting down: %v", ctx.Err()) return @@ -55,37 +71,25 @@ func (s StatementDistribution) Run( } func (s StatementDistribution) processMessage(msg any) error { - switch msg := msg.(type) { case statementedistributionmessages.Backed: // TODO #4171 case statementedistributionmessages.Share: // TODO #4170 - // case statementedistributionmessages.NetworkBridgeUpdate - // TODO #4172 this above case would need to wait until network bridge receiver side is merged case parachaintypes.ActiveLeavesUpdateSignal: return s.ProcessActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: return s.ProcessBlockFinalizedSignal(msg) - default: return parachaintypes.ErrUnknownOverseerMessage } - return nil } func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error { switch muxedMsg.Source { case "SubsystemMsg": - // Use processMessage for messages from overseerToSubSystem return s.processMessage(muxedMsg.Message) - case "V1Requester": - // Handle legacy V1Requester messages - return nil - case "V1Responder": - // Handle legacy V1Responder messages - return nil case "V2Responder": // Handle V2Responder messages return nil @@ -93,7 +97,6 @@ func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error // Handle response messages return nil case "Retry_Request": - // Do nothing for retry requests logger.Infof("received retry request, no action taken") return nil default: @@ -116,53 +119,9 @@ func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes return nil } -func (s StatementDistribution) Stop() {} - -func FanIn( - ctx context.Context, - overseerChannel <-chan any, - v1RequesterChannel <-chan any, - v1CommChannel <-chan any, - v2CommChannel <-chan any, - receiverRespCh <-chan any, - retryReqCh <-chan any, -) <-chan MuxedMessage { - output := make(chan MuxedMessage) - - go func() { - defer close(output) - for { - select { - // On each case verify if the channel is open before send the message - case <-ctx.Done(): - return - case msg, ok := <-overseerChannel: - if ok { - output <- MuxedMessage{Source: "SubsystemMsg", Message: msg} - } - case msg, ok := <-v1RequesterChannel: - if ok { - output <- MuxedMessage{Source: "V1Requester", Message: msg} - } - case msg, ok := <-v1CommChannel: - if ok { - output <- MuxedMessage{Source: "V1Responder", Message: msg} - } - case msg, ok := <-v2CommChannel: - if ok { - output <- MuxedMessage{Source: "V2Responder", Message: msg} - } - case msg, ok := <-receiverRespCh: - if ok { - output <- MuxedMessage{Source: "Receive_Response", Message: msg} - } - case msg, ok := <-retryReqCh: - if ok { - output <- MuxedMessage{Source: "Retry_Request", Message: msg} - } - } - } - }() - - return output +func (s StatementDistribution) triggerReputationAggregator() { + // Implement the logic to send reputation changes + logger.Infof("triggering reputation aggregator logic") } + +func (s StatementDistribution) Stop() {} From 1bdca5029d8f6f3852e66aef532e3473bef39109 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Wed, 8 Jan 2025 19:36:57 +0000 Subject: [PATCH 03/14] feat(prospective-parachains) implements fanin operation --- .../statement_distribution.go | 88 +++++-------------- dot/parachain/util/util.go | 4 +- 2 files changed, 26 insertions(+), 66 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 53032d01d1..ba953165b2 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -2,27 +2,23 @@ package statementdistribution import ( "context" - "fmt" "time" "github.com/ChainSafe/gossamer/internal/log" statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" + parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" ) var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { + reputationAggregator parachainutil.ReputationAggregator + SubSystemToOverseer chan<- any } -// MuxedMessage represents a combined message with its source -type MuxedMessage struct { - Source string - Message any -} - -func (s StatementDistribution) Run( +func (s *StatementDistribution) Run( ctx context.Context, overseerToSubSystem <-chan any, v2CommChannel <-chan any, @@ -30,39 +26,25 @@ func (s StatementDistribution) Run( retryReqCh <-chan any, ) { // Timer for reputation aggregator trigger - ticker := time.NewTicker(1 * time.Minute) // Adjust the duration as needed - defer ticker.Stop() + reputationDelay := time.NewTicker(parachainutil.ReputationChangeInterval) // Adjust the duration as needed + defer reputationDelay.Stop() for { select { - case msg, ok := <-overseerToSubSystem: - if ok { - err := s.processMessage(msg) - if err != nil { - logger.Errorf("error processing overseer message: %v", err) - } - } - case msg, ok := <-v2CommChannel: - if ok { - err := s.processMuxedMessage(MuxedMessage{Source: "V2Responder", Message: msg}) - if err != nil { - logger.Errorf("error processing V2 responder message: %v", err) - } + case msg := <-overseerToSubSystem: + err := s.processMessage(msg) + if err != nil { + logger.Errorf("error processing overseer message: %v", err) } - case msg, ok := <-receiverRespCh: - if ok { - err := s.processMuxedMessage(MuxedMessage{Source: "Receive_Response", Message: msg}) - if err != nil { - logger.Errorf("error processing receiver response message: %v", err) - } - } - case _, ok := <-retryReqCh: - if ok { - logger.Infof("received retry request, no action taken") - } - case <-ticker.C: + case _ = <-v2CommChannel: + panic("Not Implemented") + case _ = <-receiverRespCh: + panic("Not Implemented") + case _ = <-retryReqCh: + logger.Infof("received retry request, no action taken") + case <-reputationDelay.C: // Trigger reputation aggregator logic - s.triggerReputationAggregator() + s.reputationAggregator.Send(s.SubSystemToOverseer) case <-ctx.Done(): logger.Infof("shutting down: %v", ctx.Err()) return @@ -70,7 +52,7 @@ func (s StatementDistribution) Run( } } -func (s StatementDistribution) processMessage(msg any) error { +func (s *StatementDistribution) processMessage(msg any) error { switch msg := msg.(type) { case statementedistributionmessages.Backed: // TODO #4171 @@ -86,42 +68,18 @@ func (s StatementDistribution) processMessage(msg any) error { return nil } -func (s StatementDistribution) processMuxedMessage(muxedMsg MuxedMessage) error { - switch muxedMsg.Source { - case "SubsystemMsg": - return s.processMessage(muxedMsg.Message) - case "V2Responder": - // Handle V2Responder messages - return nil - case "Receive_Response": - // Handle response messages - return nil - case "Retry_Request": - logger.Infof("received retry request, no action taken") - return nil - default: - logger.Warnf("unknown message source: %s", muxedMsg.Source) - return fmt.Errorf("unknown message source: %s", muxedMsg.Source) - } -} - -func (s StatementDistribution) Name() parachaintypes.SubSystemName { +func (s *StatementDistribution) Name() parachaintypes.SubSystemName { return parachaintypes.StatementDistribution } -func (s StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { +func (s *StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { // TODO #4173 return nil } -func (s StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { +func (s *StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { // nothing to do here return nil } -func (s StatementDistribution) triggerReputationAggregator() { - // Implement the logic to send reputation changes - logger.Infof("triggering reputation aggregator logic") -} - -func (s StatementDistribution) Stop() {} +func (s *StatementDistribution) Stop() {} diff --git a/dot/parachain/util/util.go b/dot/parachain/util/util.go index 82afe41a3f..706f55d73f 100644 --- a/dot/parachain/util/util.go +++ b/dot/parachain/util/util.go @@ -64,6 +64,8 @@ type UnifiedReputationChange struct { Reason string } +const ReputationChangeInterval = 30 * time.Second + // CostOrBenefit returns the cost or benefit of the reputation change. func (u UnifiedReputationChange) CostOrBenefit() int32 { switch u.Type { @@ -106,7 +108,7 @@ func NewReputationAggregator(sendImmediatelyIf func(rep UnifiedReputationChange) } // Send sends the accumulated reputation changes in a batch and clears the state. -func (r *ReputationAggregator) Send(overseerCh chan<- NetworkBridgeTxMessage) { +func (r *ReputationAggregator) Send(overseerCh chan<- any) { r.mu.Lock() defer r.mu.Unlock() From a67abbaa60fd40c046ef990f5f481d06ebaad0c8 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Wed, 8 Jan 2025 19:46:27 +0000 Subject: [PATCH 04/14] feat(prospective-parachains) implements fanin operation --- .../statement-distribution/statement_distribution.go | 12 ++++++------ dot/parachain/util/util.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index ba953165b2..d4747efff2 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -15,7 +15,7 @@ var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { reputationAggregator parachainutil.ReputationAggregator - SubSystemToOverseer chan<- any + SubSystemToOverseer chan<- parachainutil.NetworkBridgeTxMessage } func (s *StatementDistribution) Run( @@ -36,11 +36,11 @@ func (s *StatementDistribution) Run( if err != nil { logger.Errorf("error processing overseer message: %v", err) } - case _ = <-v2CommChannel: - panic("Not Implemented") - case _ = <-receiverRespCh: - panic("Not Implemented") - case _ = <-retryReqCh: + // case _ = <-v2CommChannel: + // panic("Not Implemented") + // case _ = <-receiverRespCh: + // panic("Not Implemented") + // case _ = <-retryReqCh: logger.Infof("received retry request, no action taken") case <-reputationDelay.C: // Trigger reputation aggregator logic diff --git a/dot/parachain/util/util.go b/dot/parachain/util/util.go index 706f55d73f..dec86af337 100644 --- a/dot/parachain/util/util.go +++ b/dot/parachain/util/util.go @@ -108,7 +108,7 @@ func NewReputationAggregator(sendImmediatelyIf func(rep UnifiedReputationChange) } // Send sends the accumulated reputation changes in a batch and clears the state. -func (r *ReputationAggregator) Send(overseerCh chan<- any) { +func (r *ReputationAggregator) Send(overseerCh chan<- NetworkBridgeTxMessage) { r.mu.Lock() defer r.mu.Unlock() From 4de9b323e262c17b646d3d2e489058e7b02f9d95 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Thu, 9 Jan 2025 08:52:10 +0000 Subject: [PATCH 05/14] feat(prospective-parachains) implements fanin operation --- .../statement_distribution_test.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 dot/parachain/statement-distribution/statement_distribution_test.go diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go new file mode 100644 index 0000000000..048cd73609 --- /dev/null +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -0,0 +1,28 @@ +package statementdistribution + +import ( + "context" + + parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" +) + +func CreateStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) { + mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool { + return rep.Type == parachainutil.Malicious + }) + + subSystemToOverseer := make(chan parachainutil.NetworkBridgeTxMessage, 10) + + return &StatementDistribution{ + reputationAggregator: *mockAggregator, + SubSystemToOverseer: subSystemToOverseer, + }, subSystemToOverseer +} + +func CreateChannels() (chan any, chan any, chan any, chan any) { + return make(chan any, 10), make(chan any, 10), make(chan any, 10), make(chan any, 10) +} + +func CreateContext() (context.Context, context.CancelFunc) { + return context.WithCancel(context.Background()) +} From efbb2c45897a0474a84d6944e86f0bcd203dea23 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Thu, 9 Jan 2025 10:27:13 +0000 Subject: [PATCH 06/14] feat(prospective-parachains) implements fanin operation --- dot/parachain/statement-distribution/statement_distribution.go | 2 +- .../statement-distribution/statement_distribution_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index d4747efff2..2fd187815a 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -14,7 +14,7 @@ import ( var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { - reputationAggregator parachainutil.ReputationAggregator + reputationAggregator *parachainutil.ReputationAggregator SubSystemToOverseer chan<- parachainutil.NetworkBridgeTxMessage } diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 048cd73609..93b1a37ede 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -14,7 +14,7 @@ func CreateStatementDistribution() (*StatementDistribution, chan parachainutil.N subSystemToOverseer := make(chan parachainutil.NetworkBridgeTxMessage, 10) return &StatementDistribution{ - reputationAggregator: *mockAggregator, + reputationAggregator: mockAggregator, SubSystemToOverseer: subSystemToOverseer, }, subSystemToOverseer } From 959624f840fcdb3f942903128faae3f5618dd6d8 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 15:04:35 +0000 Subject: [PATCH 07/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement-distribution/statement_distribution.go | 8 ++++---- .../statement_distribution_test.go | 10 ++-------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 2fd187815a..6acbbeb34f 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -59,9 +59,9 @@ func (s *StatementDistribution) processMessage(msg any) error { case statementedistributionmessages.Share: // TODO #4170 case parachaintypes.ActiveLeavesUpdateSignal: - return s.ProcessActiveLeavesUpdateSignal(msg) + return s.processActiveLeavesUpdateSignal(msg) case parachaintypes.BlockFinalizedSignal: - return s.ProcessBlockFinalizedSignal(msg) + return s.processBlockFinalizedSignal(msg) default: return parachaintypes.ErrUnknownOverseerMessage } @@ -72,12 +72,12 @@ func (s *StatementDistribution) Name() parachaintypes.SubSystemName { return parachaintypes.StatementDistribution } -func (s *StatementDistribution) ProcessActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { +func (s *StatementDistribution) processActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { // TODO #4173 return nil } -func (s *StatementDistribution) ProcessBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { +func (s *StatementDistribution) processBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { // nothing to do here return nil } diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 93b1a37ede..e6507b8e26 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -1,12 +1,10 @@ package statementdistribution import ( - "context" - parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" ) -func CreateStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) { +func createStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) { mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool { return rep.Type == parachainutil.Malicious }) @@ -19,10 +17,6 @@ func CreateStatementDistribution() (*StatementDistribution, chan parachainutil.N }, subSystemToOverseer } -func CreateChannels() (chan any, chan any, chan any, chan any) { +func createChannels() (chan any, chan any, chan any, chan any) { return make(chan any, 10), make(chan any, 10), make(chan any, 10), make(chan any, 10) } - -func CreateContext() (context.Context, context.CancelFunc) { - return context.WithCancel(context.Background()) -} From 22477cdb8f27f6bde7d4bbcbfd29e0115fdd9fd3 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 15:19:28 +0000 Subject: [PATCH 08/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution_test.go | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index e6507b8e26..0924d58f85 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -1,22 +1,22 @@ package statementdistribution -import ( - parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" -) +// import ( +// parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" +// ) -func createStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) { - mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool { - return rep.Type == parachainutil.Malicious - }) +// func createStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) { +// mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool { +// return rep.Type == parachainutil.Malicious +// }) - subSystemToOverseer := make(chan parachainutil.NetworkBridgeTxMessage, 10) +// subSystemToOverseer := make(chan parachainutil.NetworkBridgeTxMessage, 10) - return &StatementDistribution{ - reputationAggregator: mockAggregator, - SubSystemToOverseer: subSystemToOverseer, - }, subSystemToOverseer -} +// return &StatementDistribution{ +// reputationAggregator: mockAggregator, +// SubSystemToOverseer: subSystemToOverseer, +// }, subSystemToOverseer +// } -func createChannels() (chan any, chan any, chan any, chan any) { - return make(chan any, 10), make(chan any, 10), make(chan any, 10), make(chan any, 10) -} +// func createChannels() (chan any, chan any, chan any, chan any) { +// return make(chan any, 10), make(chan any, 10), make(chan any, 10), make(chan any, 10) +// } From 9d17d2e38b3252fb1f9ea108726893412b7cdac5 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 15:40:03 +0000 Subject: [PATCH 09/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution.go | 101 ++++++++---------- 1 file changed, 42 insertions(+), 59 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 6acbbeb34f..83c27f5973 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -2,84 +2,67 @@ package statementdistribution import ( "context" + "fmt" "time" - "github.com/ChainSafe/gossamer/internal/log" - - statementedistributionmessages "github.com/ChainSafe/gossamer/dot/parachain/statement-distribution/messages" - parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types" parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" + "github.com/ChainSafe/gossamer/internal/log" ) var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { - reputationAggregator *parachainutil.ReputationAggregator - SubSystemToOverseer chan<- parachainutil.NetworkBridgeTxMessage + SubSystemToOverseer chan<- parachainutil.NetworkBridgeTxMessage +} + +type MuxedMessage interface { + isMuxedMessage() +} + +type overseerMessage struct { + inner any +} + +func (*overseerMessage) isMuxedMessage() {} + +type responderMessage struct { + inner any // should be replaced with AttestedCandidateRequest type } -func (s *StatementDistribution) Run( - ctx context.Context, - overseerToSubSystem <-chan any, - v2CommChannel <-chan any, - receiverRespCh <-chan any, - retryReqCh <-chan any, -) { +func (*responderMessage) isMuxedMessage() {} + +// Run just receives the ctx and a channel from the overseer to subsystem +func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { + // Inside the method Run, we spawn a goroutine to handle network incoming requests + // TODO: https://github.com/ChainSafe/gossamer/issues/4285 + responderCh := make(chan any, 1) + go s.taskResponder(responderCh) + // Timer for reputation aggregator trigger reputationDelay := time.NewTicker(parachainutil.ReputationChangeInterval) // Adjust the duration as needed defer reputationDelay.Stop() for { - select { - case msg := <-overseerToSubSystem: - err := s.processMessage(msg) - if err != nil { - logger.Errorf("error processing overseer message: %v", err) - } - // case _ = <-v2CommChannel: - // panic("Not Implemented") - // case _ = <-receiverRespCh: - // panic("Not Implemented") - // case _ = <-retryReqCh: - logger.Infof("received retry request, no action taken") - case <-reputationDelay.C: - // Trigger reputation aggregator logic - s.reputationAggregator.Send(s.SubSystemToOverseer) - case <-ctx.Done(): - logger.Infof("shutting down: %v", ctx.Err()) - return - } - } -} + message := s.awaitMessageFrom(overseerToSubSystem, responderCh) -func (s *StatementDistribution) processMessage(msg any) error { - switch msg := msg.(type) { - case statementedistributionmessages.Backed: - // TODO #4171 - case statementedistributionmessages.Share: - // TODO #4170 - case parachaintypes.ActiveLeavesUpdateSignal: - return s.processActiveLeavesUpdateSignal(msg) - case parachaintypes.BlockFinalizedSignal: - return s.processBlockFinalizedSignal(msg) - default: - return parachaintypes.ErrUnknownOverseerMessage + switch innerMessage := message.(type) { + // Handle each muxed message type + default: + logger.Warn("Unhandled message type: " + fmt.Sprintf("%v", innerMessage)) + } } - return nil -} - -func (s *StatementDistribution) Name() parachaintypes.SubSystemName { - return parachaintypes.StatementDistribution } -func (s *StatementDistribution) processActiveLeavesUpdateSignal(signal parachaintypes.ActiveLeavesUpdateSignal) error { - // TODO #4173 - return nil +func (s *StatementDistribution) taskResponder(responderCh chan any) { + // TODO: Implement taskResponder logic } -func (s *StatementDistribution) processBlockFinalizedSignal(signal parachaintypes.BlockFinalizedSignal) error { - // nothing to do here - return nil +// awaitMessageFrom waits for messages from either the overseerToSubSystem or responderCh +func (s *StatementDistribution) awaitMessageFrom(overseerToSubSystem <-chan any, responderCh chan any) MuxedMessage { + select { + case msg := <-overseerToSubSystem: + return &overseerMessage{inner: msg} + case msg := <-responderCh: + return &responderMessage{inner: msg} + } } - -func (s *StatementDistribution) Stop() {} From 4e19d34de5fb9ed224e596cd8ca3597eae8b65b4 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 15:52:14 +0000 Subject: [PATCH 10/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement-distribution/statement_distribution.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 83c27f5973..8e2e34480a 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -36,7 +36,7 @@ func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-c // Inside the method Run, we spawn a goroutine to handle network incoming requests // TODO: https://github.com/ChainSafe/gossamer/issues/4285 responderCh := make(chan any, 1) - go s.taskResponder(responderCh) + go taskResponder(responderCh) // Timer for reputation aggregator trigger reputationDelay := time.NewTicker(parachainutil.ReputationChangeInterval) // Adjust the duration as needed @@ -53,9 +53,7 @@ func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-c } } -func (s *StatementDistribution) taskResponder(responderCh chan any) { - // TODO: Implement taskResponder logic -} +func taskResponder(responderCh chan any) {} // awaitMessageFrom waits for messages from either the overseerToSubSystem or responderCh func (s *StatementDistribution) awaitMessageFrom(overseerToSubSystem <-chan any, responderCh chan any) MuxedMessage { From 64677117a8a8ef70134d8864e317f0e0162c6307 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 15:56:46 +0000 Subject: [PATCH 11/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 8e2e34480a..162a4225aa 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -31,6 +31,10 @@ type responderMessage struct { func (*responderMessage) isMuxedMessage() {} +type reputationChangeMessage struct{} + +func (*reputationChangeMessage) isMuxedMessage() {} + // Run just receives the ctx and a channel from the overseer to subsystem func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-chan any) { // Inside the method Run, we spawn a goroutine to handle network incoming requests @@ -43,10 +47,11 @@ func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-c defer reputationDelay.Stop() for { - message := s.awaitMessageFrom(overseerToSubSystem, responderCh) + message := s.awaitMessageFrom(overseerToSubSystem, responderCh, reputationDelay.C) switch innerMessage := message.(type) { - // Handle each muxed message type + case *reputationChangeMessage: + logger.Info("Reputation change triggered.") default: logger.Warn("Unhandled message type: " + fmt.Sprintf("%v", innerMessage)) } @@ -55,12 +60,18 @@ func (s *StatementDistribution) Run(ctx context.Context, overseerToSubSystem <-c func taskResponder(responderCh chan any) {} -// awaitMessageFrom waits for messages from either the overseerToSubSystem or responderCh -func (s *StatementDistribution) awaitMessageFrom(overseerToSubSystem <-chan any, responderCh chan any) MuxedMessage { +// awaitMessageFrom waits for messages from either the overseerToSubSystem, responderCh, or reputationDelay +func (s *StatementDistribution) awaitMessageFrom( + overseerToSubSystem <-chan any, + responderCh chan any, + reputationDelay <-chan time.Time, +) MuxedMessage { select { case msg := <-overseerToSubSystem: return &overseerMessage{inner: msg} case msg := <-responderCh: return &responderMessage{inner: msg} + case <-reputationDelay: + return &reputationChangeMessage{} } } From 1ab86923fa8f70526f0d8885be5600ea76da2a87 Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 15:58:54 +0000 Subject: [PATCH 12/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- dot/parachain/statement-distribution/statement_distribution.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/parachain/statement-distribution/statement_distribution.go b/dot/parachain/statement-distribution/statement_distribution.go index 162a4225aa..7cde0429ba 100644 --- a/dot/parachain/statement-distribution/statement_distribution.go +++ b/dot/parachain/statement-distribution/statement_distribution.go @@ -12,7 +12,7 @@ import ( var logger = log.NewFromGlobal(log.AddContext("pkg", "statement-distribution")) type StatementDistribution struct { - SubSystemToOverseer chan<- parachainutil.NetworkBridgeTxMessage + SubSystemToOverseer chan<- any } type MuxedMessage interface { From 71e3b8486e881d0a498385a8c21e1f8ae1d5041b Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 16:01:00 +0000 Subject: [PATCH 13/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement-distribution/statement_distribution_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 0924d58f85..49e4ca2e46 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -4,19 +4,15 @@ package statementdistribution // parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" // ) -// func createStatementDistribution() (*StatementDistribution, chan parachainutil.NetworkBridgeTxMessage) { +// func createStatementDistribution() (*StatementDistribution, chan any) { // mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool { // return rep.Type == parachainutil.Malicious // }) -// subSystemToOverseer := make(chan parachainutil.NetworkBridgeTxMessage, 10) +// subSystemToOverseer := make(chan any, 10) // return &StatementDistribution{ // reputationAggregator: mockAggregator, // SubSystemToOverseer: subSystemToOverseer, // }, subSystemToOverseer // } - -// func createChannels() (chan any, chan any, chan any, chan any) { -// return make(chan any, 10), make(chan any, 10), make(chan any, 10), make(chan any, 10) -// } From 35547c4204547f4c530166b2429893431523fe2c Mon Sep 17 00:00:00 2001 From: DanielDDHM Date: Fri, 10 Jan 2025 16:50:49 +0000 Subject: [PATCH 14/14] feat(statement-distribution):Implement FanIn approach to statement distribution --- .../statement_distribution_test.go | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/dot/parachain/statement-distribution/statement_distribution_test.go b/dot/parachain/statement-distribution/statement_distribution_test.go index 49e4ca2e46..682de9cf51 100644 --- a/dot/parachain/statement-distribution/statement_distribution_test.go +++ b/dot/parachain/statement-distribution/statement_distribution_test.go @@ -1,18 +1,11 @@ package statementdistribution -// import ( -// parachainutil "github.com/ChainSafe/gossamer/dot/parachain/util" -// ) +// //nolint +func createStatementDistribution() (*StatementDistribution, chan any) { -// func createStatementDistribution() (*StatementDistribution, chan any) { -// mockAggregator := parachainutil.NewReputationAggregator(func(rep parachainutil.UnifiedReputationChange) bool { -// return rep.Type == parachainutil.Malicious -// }) + subSystemToOverseer := make(chan any, 10) -// subSystemToOverseer := make(chan any, 10) - -// return &StatementDistribution{ -// reputationAggregator: mockAggregator, -// SubSystemToOverseer: subSystemToOverseer, -// }, subSystemToOverseer -// } + return &StatementDistribution{ + SubSystemToOverseer: subSystemToOverseer, + }, subSystemToOverseer +}