Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parachain): send requests from subsystems #4475

Open
wants to merge 11 commits into
base: feat/parachain
Choose a base branch
from
2 changes: 1 addition & 1 deletion dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func (s *Service) SendMessage(to peer.ID, msg NotificationsMessage) error {
}

func (s *Service) GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *RequestResponseProtocol {
maxResponseSize uint64) RequestMaker {
haikoschol marked this conversation as resolved.
Show resolved Hide resolved

protocolID := s.host.protocolID + protocol.ID(subprotocol)
return &RequestResponseProtocol{
Expand Down
27 changes: 14 additions & 13 deletions dot/parachain/collator-protocol/mocks_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions dot/parachain/collator-protocol/validator_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ type Network interface {
maxSize uint64,
) error
GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *network.RequestResponseProtocol
maxResponseSize uint64) network.RequestMaker
}

type CollationEvent struct {
Expand All @@ -574,7 +574,7 @@ type CollatorProtocolValidatorSide struct {
SubSystemToOverseer chan<- any
unfetchedCollation chan UnfetchedCollation

collationFetchingReqResProtocol *network.RequestResponseProtocol
collationFetchingReqResProtocol network.RequestMaker

fetchedCollations []parachaintypes.Collation
// track all active collators and their data
Expand Down
2 changes: 1 addition & 1 deletion dot/parachain/network-bridge/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Network interface {
maxSize uint64,
) error
GetRequestResponseProtocol(subprotocol string, requestTimeout time.Duration,
maxResponseSize uint64) *network.RequestResponseProtocol
maxResponseSize uint64) network.RequestMaker
ReportPeer(change peerset.ReputationChange, p peer.ID)
DisconnectPeer(setID int, p peer.ID)
GetNetworkEventsChannel() chan *network.NetworkEventInfo
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
// Copyright 2023 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package parachain
package messages

import (
"fmt"

"github.com/ChainSafe/gossamer/dot/network"

parachaintypes "github.com/ChainSafe/gossamer/dot/parachain/types"
"github.com/ChainSafe/gossamer/pkg/scale"
)
Expand All @@ -24,6 +26,16 @@ func (c ChunkFetchingRequest) Encode() ([]byte, error) {
return scale.Marshal(c)
}

// Protocol returns the sub-protocol ID for this message
func (c ChunkFetchingRequest) Protocol() ReqProtocolName {
return ChunkFetchingV1
}

// Response returns an instance of the response type for this message, for the purpose of decoding into it.
func (c ChunkFetchingRequest) Response() network.ResponseMessage {
return &ChunkFetchingResponse{}
}

type ChunkFetchingResponseValues interface {
ChunkResponse | NoSuchChunk
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package parachain
package messages

import (
"testing"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package messages

import (
"github.com/ChainSafe/gossamer/dot/network"
"github.com/libp2p/go-libp2p/core/peer"
)

type ReqProtocolName uint

const (
ChunkFetchingV1 ReqProtocolName = iota
CollationFetchingV1
PoVFetchingV1
AvailableDataFetchingV1
StatementFetchingV1
DisputeSendingV1
)

func (n ReqProtocolName) String() string {
switch n {
case ChunkFetchingV1:
return "req_chunk/1"
case CollationFetchingV1:
return "req_collation/1"
case PoVFetchingV1:
return "req_pov/1"
case AvailableDataFetchingV1:
return "req_available_data/1"
case StatementFetchingV1:
return "req_statement/1"
case DisputeSendingV1:
return "send_dispute/1"
default:
panic("unknown protocol")
}
}

// ReqProtocolMessage is a network message that can be sent over a request response protocol.
type ReqProtocolMessage interface {
network.Message
// Response returns an instance of the response type for this message, for the purpose of decoding into it.
Response() network.ResponseMessage
Protocol() ReqProtocolName
}

// ReqRespResult is the result of sending a request over a request response protocol. It contains either a response
// message or an error.
type ReqRespResult struct {
Response network.ResponseMessage
Error error
}

// OutgoingRequest contains all data required to send a request over a request response protocol and receive the result.
type OutgoingRequest struct {
Recipient peer.ID // TODO use a type that can contain either a peer ID or an authority ID
Payload ReqProtocolMessage
Result chan ReqRespResult
}

func NewOutgoingRequest(recipient peer.ID, payload ReqProtocolMessage) *OutgoingRequest {
result := make(chan ReqRespResult, 1)

return &OutgoingRequest{
Recipient: recipient,
Payload: payload,
Result: result,
}
}
13 changes: 13 additions & 0 deletions dot/parachain/network-bridge/messages/tx_overseer_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,16 @@ type ConnectToValidators struct {
// authority discovery has Failed to resolve.
Failed chan<- uint
}

type IfDisconnectedBehavior int

const (
TryConnect IfDisconnectedBehavior = iota
ImmediateError // TODO not implemented
)

// SendRequests is a subsystem message for sending requests over a request response protocol.
type SendRequests struct {
Requests []*OutgoingRequest
IfDisconnected IfDisconnectedBehavior
}
35 changes: 34 additions & 1 deletion dot/parachain/network-bridge/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package networkbridge
import (
"context"
"fmt"
"time"

"github.com/ChainSafe/gossamer/dot/network"
networkbridgemessages "github.com/ChainSafe/gossamer/dot/parachain/network-bridge/messages"
Expand Down Expand Up @@ -92,7 +93,9 @@ func (nbs *NetworkBridgeSender) processMessage(msg any) error {
return fmt.Errorf("sending message: %w", err)
}
}
// TODO: add ConnectTOResolvedValidators, SendRequests
case networkbridgemessages.SendRequests:
nbs.sendRequests(msg.Requests, msg.IfDisconnected)
// TODO: add ConnectTOResolvedValidators
case networkbridgemessages.ConnectToValidators:
// TODO
case networkbridgemessages.ReportPeer:
Expand All @@ -104,3 +107,33 @@ func (nbs *NetworkBridgeSender) processMessage(msg any) error {

return nil
}

const requestTimeout = 200 * time.Millisecond // TODO is this reasonable?
haikoschol marked this conversation as resolved.
Show resolved Hide resolved

// PoV is probably the largest message and is currently set at 5MB, but will likely be increased to 10MB in the future.
// see: https://github.com/paritytech/polkadot-sdk/issues/5334
// Maybe message types should have a MaxSize() method instead of using the same value for all messages.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I would say you can attach to the protocol string the max response size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question then becomes what the values for requests other than PoVFetchingV1 should be. Would you be ok with deferring this change to a later PR, given that using smaller values here amounts to a potential performance optimization?

const maxResponseSize uint64 = 5 * 1024 * 1024

func (nbs *NetworkBridgeSender) sendRequests(
requests []*networkbridgemessages.OutgoingRequest,
ifDisconnected networkbridgemessages.IfDisconnectedBehavior, //nolint:unparam
) {
for _, request := range requests {
protoID := request.Payload.Protocol().String()
protocol := nbs.net.GetRequestResponseProtocol(protoID, requestTimeout, maxResponseSize)
response := request.Payload.Response()
result := networkbridgemessages.ReqRespResult{}

// TODO This should probably be done on a goroutine. Unclear how to deal with cancellation/shutdown though.
err := protocol.Do(request.Recipient, request.Payload, response)
if err != nil {
result.Error = err
} else {
result.Response = response
}

request.Result <- result
close(request.Result)
}
}
Loading
Loading