Skip to content

Commit

Permalink
fix: race that causes LocalParticipant to show up as RemoteParticipant (
Browse files Browse the repository at this point in the history
#619)

* fix: race that causes LocalParticipant to show up as RemoteParticipant

RTCEngine.JoinContext starts SignalClient's dispatch loop. If there's an
update to the local participant (in the form of ParticipantUpdate) before
PeerConnection is established, then we would incorrectly categorize the
LocalParticipant as a RemoteParticipant.

* actually call it

* fix staticcheck
  • Loading branch information
davidzhao authored Feb 25, 2025
1 parent 1477da5 commit cf06c23
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 23 deletions.
5 changes: 5 additions & 0 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type RTCEngine struct {
OnResuming func()
OnResumed func()
OnTranscription func(*livekit.Transcription)
OnSignalClientConnected func(*livekit.JoinResponse)

// callbacks to get data
CbGetLocalParticipantSID func() string
Expand Down Expand Up @@ -160,6 +161,10 @@ func (e *RTCEngine) JoinContext(ctx context.Context, url string, token string, p
return nil, err
}

if e.OnSignalClientConnected != nil {
e.OnSignalClientConnected(res)
}

e.client.Start()

// send offer
Expand Down
9 changes: 7 additions & 2 deletions regionurlprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,19 @@ func (r *regionURLProvider) RefreshRegionSettings(cloudHostname, token string) e
return errors.New("refreshRegionSettings failed to decode region settings: " + err.Error())
}

r.mutex.Lock()
r.hostnameSettingsCache[cloudHostname] = &hostnameSettingsCacheItem{
item := &hostnameSettingsCacheItem{
regionSettings: regions,
updatedAt: time.Now(),
regionURLAttempts: map[string]int{},
}
r.mutex.Lock()
r.hostnameSettingsCache[cloudHostname] = item
r.mutex.Unlock()

if len(item.regionSettings.Regions) == 0 {
logger.Warnw("no regions returned", nil, "cloudHostname", cloudHostname)
}

return nil
}

Expand Down
46 changes: 25 additions & 21 deletions room.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func NewRoom(callback *RoomCallback) *Room {
r.LocalParticipant = newLocalParticipant(engine, r.callback)

// callbacks from engine
engine.OnSignalClientConnected = r.handleSignalClientConnected
engine.OnMediaTrack = r.handleMediaTrack
engine.OnDisconnected = r.handleDisconnect
engine.OnParticipantUpdate = r.handleParticipantUpdate
Expand Down Expand Up @@ -328,6 +329,7 @@ func (r *Room) JoinWithToken(url, token string, opts ...ConnectOption) error {
d := time.Duration(1<<min(tries, 6)) * time.Second // max 64 seconds
logger.Errorw("failed to join room", err,
"retrying in", d,
"url", bestURL,
)
time.Sleep(d)
continue
Expand All @@ -338,32 +340,12 @@ func (r *Room) JoinWithToken(url, token string, opts ...ConnectOption) error {

if joinRes == nil {
var err error
joinRes, err = r.engine.JoinContext(ctx, url, token, params)
_, err = r.engine.JoinContext(ctx, url, token, params)
if err != nil {
return err
}
}

r.lock.Lock()
r.name = joinRes.Room.Name
r.metadata = joinRes.Room.Metadata
r.serverInfo = joinRes.ServerInfo
r.connectionState = ConnectionStateConnected
r.sifTrailer = make([]byte, len(joinRes.SifTrailer))
copy(r.sifTrailer, joinRes.SifTrailer)
r.lock.Unlock()

r.setSid(joinRes.Room.Sid, false)

r.LocalParticipant.updateInfo(joinRes.Participant)
r.LocalParticipant.updateSubscriptionPermission()

for _, pi := range joinRes.OtherParticipants {
rp := r.addRemoteParticipant(pi, true)
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
r.runParticipantDefers(livekit.ParticipantID(pi.Sid), rp)
}

return nil
}

Expand Down Expand Up @@ -552,6 +534,28 @@ func (r *Room) handleMediaTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPR
r.runParticipantDefers(livekit.ParticipantID(participantID), rp)
}

func (r *Room) handleSignalClientConnected(joinRes *livekit.JoinResponse) {
r.lock.Lock()
r.name = joinRes.Room.Name
r.metadata = joinRes.Room.Metadata
r.serverInfo = joinRes.ServerInfo
r.connectionState = ConnectionStateConnected
r.sifTrailer = make([]byte, len(joinRes.SifTrailer))
copy(r.sifTrailer, joinRes.SifTrailer)
r.lock.Unlock()

r.setSid(joinRes.Room.Sid, false)

r.LocalParticipant.updateInfo(joinRes.Participant)
r.LocalParticipant.updateSubscriptionPermission()

for _, pi := range joinRes.OtherParticipants {
r.addRemoteParticipant(pi, true)
r.clearParticipantDefers(livekit.ParticipantID(pi.Sid), pi)
// no need to run participant defers here, since we are connected for the first time
}
}

func (r *Room) handleDisconnect(reason DisconnectionReason) {
r.callback.OnDisconnected()
r.callback.OnDisconnectedWithReason(reason)
Expand Down

0 comments on commit cf06c23

Please sign in to comment.