Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race condition (no room can be created) #205

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ import (
"github.com/screego/server/ws"
)

type Health struct {
Status string `json:"status"`
Clients int `json:"clients"`
Reason string `json:"reason,omitempty"`
}

type UIConfig struct {
AuthMode string `json:"authMode"`
User string `json:"user"`
Expand Down Expand Up @@ -47,6 +53,19 @@ func Router(conf config.Config, rooms *ws.Rooms, users *auth.Users, version stri
CloseRoomWhenOwnerLeaves: conf.CloseRoomWhenOwnerLeaves,
})
})
router.Methods("GET").Path("/health").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
i, err := rooms.Count()
status := "up"
if err != "" {
status = "down"
w.WriteHeader(500)
}
_ = json.NewEncoder(w).Encode(Health{
Status: status,
Clients: i,
Reason: err,
})
})
if conf.Prometheus {
log.Info().Msg("Prometheus enabled")
router.Methods("GET").Path("/metrics").Handler(basicAuth(promhttp.Handler(), users))
Expand Down
6 changes: 0 additions & 6 deletions ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ type ClientMessage struct {

type ClientInfo struct {
ID xid.ID
RoomID string
Authenticated bool
AuthenticatedUser string
Write chan outgoing.Message
Expand All @@ -60,7 +59,6 @@ func newClient(conn *websocket.Conn, req *http.Request, read chan ClientMessage,
Authenticated: authenticated,
AuthenticatedUser: authenticatedUser,
ID: xid.New(),
RoomID: "",
Addr: ip,
Write: make(chan outgoing.Message, 1),
},
Expand Down Expand Up @@ -158,10 +156,6 @@ func (c *Client) startWriteHandler(pingPeriod time.Duration) {
continue
}

if room, ok := message.(outgoing.Room); ok {
c.info.RoomID = room.ID
}

if err := writeJSON(c.conn, typed); err != nil {
c.printWebSocketError("write", err)
c.CloseOnError(websocket.CloseNormalClosure, "write error"+err.Error())
Expand Down
10 changes: 3 additions & 7 deletions ws/event_clientanswer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ func init() {
type ClientAnswer outgoing.P2PMessage

func (e *ClientAnswer) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

session, ok := room.Sessions[e.SID]
Expand Down
10 changes: 3 additions & 7 deletions ws/event_clientice.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ func init() {
type ClientICE outgoing.P2PMessage

func (e *ClientICE) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

session, ok := room.Sessions[e.SID]
Expand Down
2 changes: 1 addition & 1 deletion ws/event_connected.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ package ws
type Connected struct{}

func (e Connected) Execute(rooms *Rooms, current ClientInfo) error {
rooms.connected[current.ID] = true
rooms.connected[current.ID] = ""
return nil
}
3 changes: 2 additions & 1 deletion ws/event_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type Create struct {
}

func (e *Create) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID != "" {
if rooms.connected[current.ID] != "" {
return fmt.Errorf("cannot join room, you are already in one")
}

Expand Down Expand Up @@ -74,6 +74,7 @@ func (e *Create) Execute(rooms *Rooms, current ClientInfo) error {
},
},
}
rooms.connected[current.ID] = room.ID
rooms.Rooms[e.ID] = room
room.notifyInfoChanged()
usersJoinedTotal.Inc()
Expand Down
9 changes: 5 additions & 4 deletions ws/event_disconnected.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ func (e *Disconnected) Execute(rooms *Rooms, current ClientInfo) error {
}

func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
roomID := rooms.connected[current.ID]
delete(rooms.connected, current.ID)
current.Write <- outgoing.CloseWriter{Code: e.Code, Reason: e.Reason}

if current.RoomID == "" {
if roomID == "" {
return
}

room, ok := rooms.Rooms[current.RoomID]
room, ok := rooms.Rooms[roomID]
if !ok {
// room may already be removed
return
Expand Down Expand Up @@ -63,12 +64,12 @@ func (e *Disconnected) executeNoError(rooms *Rooms, current ClientInfo) {
delete(rooms.connected, member.ID)
member.Write <- outgoing.CloseWriter{Code: websocket.CloseNormalClosure, Reason: CloseOwnerLeft}
}
rooms.closeRoom(current.RoomID)
rooms.closeRoom(roomID)
return
}

if len(room.Users) == 0 {
rooms.closeRoom(current.RoomID)
rooms.closeRoom(roomID)
return
}

Expand Down
10 changes: 10 additions & 0 deletions ws/event_health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ws

type Health struct {
Response chan int
}

func (e *Health) Execute(rooms *Rooms, current ClientInfo) error {
e.Response <- len(rooms.connected)
return nil
}
10 changes: 3 additions & 7 deletions ws/event_hostice.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ func init() {
type HostICE outgoing.P2PMessage

func (e *HostICE) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

session, ok := room.Sessions[e.SID]
Expand Down
10 changes: 3 additions & 7 deletions ws/event_hostoffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,9 @@ func init() {
type HostOffer outgoing.P2PMessage

func (e *HostOffer) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

session, ok := room.Sessions[e.SID]
Expand Down
3 changes: 2 additions & 1 deletion ws/event_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Join struct {
}

func (e *Join) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID != "" {
if rooms.connected[current.ID] != "" {
return fmt.Errorf("cannot join room, you are already in one")
}

Expand All @@ -40,6 +40,7 @@ func (e *Join) Execute(rooms *Rooms, current ClientInfo) error {
Addr: current.Addr,
Write: current.Write,
}
rooms.connected[current.ID] = room.ID
room.notifyInfoChanged()
usersJoinedTotal.Inc()

Expand Down
14 changes: 3 additions & 11 deletions ws/event_name.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package ws

import (
"fmt"
)

func init() {
register("name", func() Event {
return &Name{}
Expand All @@ -15,13 +11,9 @@ type Name struct {
}

func (e *Name) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

room.Users[current.ID].Name = e.UserName
Expand Down
14 changes: 3 additions & 11 deletions ws/event_share.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package ws

import (
"fmt"
)

func init() {
register("share", func() Event {
return &StartShare{}
Expand All @@ -13,13 +9,9 @@ func init() {
type StartShare struct{}

func (e *StartShare) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

room.Users[current.ID].Streaming = true
Expand Down
11 changes: 3 additions & 8 deletions ws/event_stop_share.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ws

import (
"bytes"
"fmt"

"github.com/screego/server/ws/outgoing"
)
Expand All @@ -16,13 +15,9 @@ func init() {
type StopShare struct{}

func (e *StopShare) Execute(rooms *Rooms, current ClientInfo) error {
if current.RoomID == "" {
return fmt.Errorf("not in a room")
}

room, ok := rooms.Rooms[current.RoomID]
if !ok {
return fmt.Errorf("room with id %s does not exist", current.RoomID)
room, err := rooms.CurrentRoom(current)
if err != nil {
return err
}

room.Users[current.ID].Streaming = false
Expand Down
39 changes: 36 additions & 3 deletions ws/rooms.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewRooms(tServer turn.Server, users *auth.Users, conf config.Config) *Rooms
return &Rooms{
Rooms: map[string]*Room{},
Incoming: make(chan ClientMessage),
connected: map[xid.ID]bool{},
connected: map[xid.ID]string{},
turnServer: tServer,
users: users,
config: conf,
Expand Down Expand Up @@ -51,7 +51,23 @@ type Rooms struct {
users *auth.Users
config config.Config
r *rand.Rand
connected map[xid.ID]bool
connected map[xid.ID]string
}

func (r *Rooms) CurrentRoom(info ClientInfo) (*Room, error) {
roomID, ok := r.connected[info.ID]
if !ok {
return nil, fmt.Errorf("not connected")
}
if roomID == "" {
return nil, fmt.Errorf("not in a room")
}
room, ok := r.Rooms[roomID]
if !ok {
return nil, fmt.Errorf("room with id %s does not exist", roomID)
}

return room, nil
}

func (r *Rooms) RandUserName() string {
Expand Down Expand Up @@ -81,7 +97,8 @@ func (r *Rooms) Upgrade(w http.ResponseWriter, req *http.Request) {

func (r *Rooms) Start() {
for msg := range r.Incoming {
if !msg.SkipConnectedCheck && !r.connected[msg.Info.ID] {
_, connected := r.connected[msg.Info.ID]
if !msg.SkipConnectedCheck && !connected {
log.Debug().Interface("event", fmt.Sprintf("%T", msg.Incoming)).Interface("payload", msg.Incoming).Msg("WebSocket Ignore")
continue
}
Expand All @@ -93,6 +110,22 @@ func (r *Rooms) Start() {
}
}

func (r *Rooms) Count() (int, string) {
h := Health{Response: make(chan int, 1)}
select {
case r.Incoming <- ClientMessage{SkipConnectedCheck: true, Incoming: &h}:
case <-time.After(5 * time.Second):
return -1, "main loop didn't accept a message within 5 second"
}
r.Incoming <- ClientMessage{SkipConnectedCheck: true, Incoming: &h}
select {
case count := <-h.Response:
return count, ""
case <-time.After(5 * time.Second):
return -1, "main loop didn't respond to a message within 5 second"
}
}

func (r *Rooms) closeRoom(roomID string) {
room, ok := r.Rooms[roomID]
if !ok {
Expand Down
Loading
Loading