Skip to content

Commit

Permalink
Allow signaling through data channel
Browse files Browse the repository at this point in the history
  • Loading branch information
streamer45 committed Sep 23, 2024
1 parent c9ed0e1 commit 4301ac1
Show file tree
Hide file tree
Showing 11 changed files with 446 additions and 315 deletions.
410 changes: 210 additions & 200 deletions client/api_test.go

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions client/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (

func (c *Client) joinCall() error {
if err := c.SendWS(wsEventJoin, CallJoinMessage{
ChannelID: c.cfg.ChannelID,
JobID: c.cfg.JobID,
AV1Support: c.cfg.EnableAV1,
ChannelID: c.cfg.ChannelID,
JobID: c.cfg.JobID,
AV1Support: c.cfg.EnableAV1,
DCSignaling: c.cfg.EnableDCSignaling,
}, false); err != nil {
return fmt.Errorf("failed to send ws msg: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Config struct {
// EnableAV1 controls whether the client should advertise support
// for receiving the AV1 codec.
EnableAV1 bool
// EnableDCSignaling controls whether the client should use data channels
// for signaling of media tracks.
EnableDCSignaling bool

wsURL string
}
Expand Down
178 changes: 119 additions & 59 deletions client/rtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,62 +86,86 @@ func (c *Client) handleWSEventSignal(evData map[string]any) error {
return fmt.Errorf("invalid SDP data received")
}

c.log.Debug("received sdp offer", slog.Any("sdp", sdp))

if err := c.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("failed to set remote description: %w", err)
}

answer, err := c.pc.CreateAnswer(nil)
if err != nil {
return fmt.Errorf("failed to create answer: %w", err)
}

if err := c.pc.SetLocalDescription(answer); err != nil {
return fmt.Errorf("failed to set local description: %w", err)
}

var sdpData bytes.Buffer
w := zlib.NewWriter(&sdpData)
if err := json.NewEncoder(w).Encode(answer); err != nil {
w.Close()
return fmt.Errorf("failed to encode answer: %w", err)
}
w.Close()
return c.SendWS(wsEventSDP, map[string]any{
"data": sdpData.Bytes(),
}, true)
return c.handleOffer(sdp)
case signalMsgAnswer:
sdp, ok := msg["sdp"].(string)
if !ok {
return fmt.Errorf("invalid SDP data received")
}

c.log.Debug("received sdp answer", slog.Any("sdp", sdp))
return c.handleAnswer(sdp)
default:
return fmt.Errorf("invalid signaling msg type %s", msgType)
}

if err := c.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("failed to set remote description: %w", err)
}
return nil
}

for i := 0; i < len(c.iceCh); i++ {
c.log.Debug("adding queued remote candidate")
if err := c.pc.AddICECandidate(<-c.iceCh); err != nil {
return fmt.Errorf("failed to add remote candidate: %w", err)
}
func (c *Client) handleAnswer(sdp string) error {
c.log.Debug("received sdp answer", slog.Any("sdp", sdp))

if err := c.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("failed to set remote description: %w", err)
}

for i := 0; i < len(c.iceCh); i++ {
c.log.Debug("adding queued remote candidate")
if err := c.pc.AddICECandidate(<-c.iceCh); err != nil {
return fmt.Errorf("failed to add remote candidate: %w", err)
}
default:
return fmt.Errorf("invalid signaling msg type %s", msgType)
}

return nil
}

func (c *Client) handleOffer(sdp string) error {
c.log.Debug("received sdp offer", slog.Any("sdp", sdp))

if err := c.pc.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("failed to set remote description: %w", err)
}

answer, err := c.pc.CreateAnswer(nil)
if err != nil {
return fmt.Errorf("failed to create answer: %w", err)
}

if err := c.pc.SetLocalDescription(answer); err != nil {
return fmt.Errorf("failed to set local description: %w", err)
}

if c.cfg.EnableDCSignaling && c.dc.ReadyState() == webrtc.DataChannelStateOpen {
c.log.Debug("sending answer through dc")
data, err := json.Marshal(answer)
if err != nil {
return fmt.Errorf("failed to encode answer: %w", err)
}
return c.dc.SendText(string(data))
}

if c.cfg.EnableDCSignaling {
c.log.Debug("dc not connected, sending answer through ws")
}

var sdpData bytes.Buffer
w := zlib.NewWriter(&sdpData)
if err := json.NewEncoder(w).Encode(answer); err != nil {
w.Close()
return fmt.Errorf("failed to encode answer: %w", err)
}
w.Close()

return c.SendWS(wsEventSDP, map[string]any{
"data": sdpData.Bytes(),
}, true)
}

func (c *Client) initRTCSession() error {
cfg := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{}, // TODO: consider loading ICE servers from config
Expand Down Expand Up @@ -174,6 +198,12 @@ func (c *Client) initRTCSession() error {
c.pc = pc
c.mut.Unlock()

dc, err := pc.CreateDataChannel("calls-dc", nil)
if err != nil {
return fmt.Errorf("failed to create data channel: %w", err)
}
c.dc = dc

pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
if candidate == nil {
c.log.Debug("local ICE gathering completed")
Expand Down Expand Up @@ -294,28 +324,58 @@ func (c *Client) initRTCSession() error {
return
}

var sdpData bytes.Buffer
w := zlib.NewWriter(&sdpData)
if err := json.NewEncoder(w).Encode(offer); err != nil {
if c.cfg.EnableDCSignaling && c.dc.ReadyState() == webrtc.DataChannelStateOpen {
c.log.Debug("sending offer through dc")
data, err := json.Marshal(offer)
if err != nil {
c.log.Error("failed to encode offer", slog.String("err", err.Error()))
return
}
if err := c.dc.SendText(string(data)); err != nil {
c.log.Error("failed to send on dc", slog.String("err", err.Error()))
}
} else {
if c.cfg.EnableDCSignaling {
c.log.Debug("dc not connected, sending offer through ws")
}

var sdpData bytes.Buffer
w := zlib.NewWriter(&sdpData)
if err := json.NewEncoder(w).Encode(offer); err != nil {
w.Close()
c.log.Error("failed to encode offer", slog.String("err", err.Error()))
return
}
w.Close()
c.log.Error("failed to encode offer", slog.String("err", err.Error()))
return
err = c.SendWS(wsEventSDP, map[string]any{
"data": sdpData.Bytes(),
}, true)
if err != nil {
c.log.Error("failed to send ws msg", slog.String("err", err.Error()))
return
}
}
w.Close()
err = c.SendWS(wsEventSDP, map[string]any{
"data": sdpData.Bytes(),
}, true)
if err != nil {
c.log.Error("failed to send ws msg", slog.String("err", err.Error()))
})

dc.OnMessage(func(msg webrtc.DataChannelMessage) {
var sdp webrtc.SessionDescription
if err := json.Unmarshal(msg.Data, &sdp); err != nil {
c.log.Error("failed to unmarshal sdp", slog.String("err", err.Error()))
return
}
})

dc, err := pc.CreateDataChannel("calls-dc", nil)
if err != nil {
return fmt.Errorf("failed to create data channel: %w", err)
}
c.dc = dc
c.log.Debug("received sdp through DC", slog.String("sdp", sdp.SDP))

if sdp.Type == webrtc.SDPTypeOffer {
if err := c.handleOffer(sdp.SDP); err != nil {
c.log.Error("failed to offer", slog.String("err", err.Error()))
}
} else if sdp.Type == webrtc.SDPTypeAnswer {
if err := c.handleAnswer(sdp.SDP); err != nil {
c.log.Error("failed to answer", slog.String("err", err.Error()))
}
}
})

return nil
}
7 changes: 4 additions & 3 deletions client/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ package client
const pluginID = "com.mattermost.calls"

type CallJoinMessage struct {
ChannelID string `json:"channelID"`
JobID string `json:"jobID"`
AV1Support bool `json:"av1Support"`
ChannelID string `json:"channelID"`
JobID string `json:"jobID"`
AV1Support bool `json:"av1Support"`
DCSignaling bool `json:"dcSignaling"`
}

type CallReconnectMessage struct {
Expand Down
3 changes: 2 additions & 1 deletion service/rtc/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ func (c *call) addSession(cfg SessionConfig, rtcConn *webrtc.PeerConnection, clo
cfg: cfg,
rtcConn: rtcConn,
iceInCh: make(chan []byte, signalChSize*2),
sdpOfferInCh: make(chan webrtc.SessionDescription, signalChSize),
sdpOfferInCh: make(chan offerMessage, signalChSize),
sdpAnswerInCh: make(chan webrtc.SessionDescription, signalChSize),
dcSDPCh: make(chan Message, signalChSize),
closeCh: make(chan struct{}),
closeCb: closeCb,
doneCh: make(chan struct{}),
Expand Down
10 changes: 8 additions & 2 deletions service/rtc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ func (p SessionProps) AV1Support() bool {
return val
}

func (p SessionProps) DCSignaling() bool {
val, _ := p["dcSignaling"].(bool)
return val
}

func (c SessionConfig) IsValid() error {
if c.GroupID == "" {
return fmt.Errorf("invalid GroupID value: should not be empty")
Expand Down Expand Up @@ -123,8 +128,9 @@ func (c *SessionConfig) FromMap(m map[string]any) error {
c.UserID, _ = m["userID"].(string)
c.SessionID, _ = m["sessionID"].(string)
c.Props = SessionProps{
"channelID": m["channelID"],
"av1Support": m["av1Support"],
"channelID": m["channelID"],
"av1Support": m["av1Support"],
"dcSignaling": m["dcSignaling"],
}

return nil
Expand Down
23 changes: 13 additions & 10 deletions service/rtc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,21 +344,23 @@ func TestSessionConfigFromMap(t *testing.T) {
UserID: "userID",
CallID: "callID",
Props: SessionProps{
"channelID": nil,
"av1Support": nil,
"channelID": nil,
"av1Support": nil,
"dcSignaling": nil,
},
}, cfg)
})

t.Run("complete", func(t *testing.T) {
var cfg SessionConfig
err := cfg.FromMap(map[string]any{
"callID": "callID",
"sessionID": "sessionID",
"groupID": "groupID",
"userID": "userID",
"channelID": "channelID",
"av1Support": true,
"callID": "callID",
"sessionID": "sessionID",
"groupID": "groupID",
"userID": "userID",
"channelID": "channelID",
"av1Support": true,
"dcSignaling": true,
})
require.NoError(t, err)
require.NoError(t, cfg.IsValid())
Expand All @@ -368,8 +370,9 @@ func TestSessionConfigFromMap(t *testing.T) {
UserID: "userID",
CallID: "callID",
Props: SessionProps{
"channelID": "channelID",
"av1Support": true,
"channelID": "channelID",
"av1Support": true,
"dcSignaling": true,
},
}, cfg)
})
Expand Down
Loading

0 comments on commit 4301ac1

Please sign in to comment.