Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
fix: do not track pongs during command execution
Browse files Browse the repository at this point in the history
Long-running commands may cause pongs blocking, thus, dropping clients connections
  • Loading branch information
palkan committed Oct 31, 2024
1 parent b8e98fd commit aa9b745
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
8 changes: 7 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,11 @@ func (n *Node) Instrumenter() metrics.Instrumenter {
// execute the command (if recognized)
func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
s.Log.Debug("incoming message", "data", msg)

s.keepalive()

switch msg.Command {
case "pong":
s.handlePong(msg)
case "subscribe":
_, err = n.Subscribe(s, msg)
case "unsubscribe":
Expand All @@ -179,6 +181,10 @@ func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) {
err = fmt.Errorf("unknown command: %s", msg.Command)
}

// Reset pong timer after the message has been processed
// (so we don't disconnect the client if processing the message took too long)
s.resetPong()

return
}

Expand Down
14 changes: 12 additions & 2 deletions node/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,12 +673,22 @@ func newPingMessage(format string) *common.PingMessage {
return (&common.PingMessage{Type: "ping", Message: ts})
}

func (s *Session) handlePong(msg *common.Message) {
func (s *Session) keepalive() {
s.mu.Lock()
defer s.mu.Unlock()

if s.pongTimer == nil {
return
}

s.pongTimer.Stop()
}

func (s *Session) resetPong() {
s.mu.Lock()
defer s.mu.Unlock()

if s.pongTimer == nil {
s.Log.Debug("unexpected pong received")
return
}

Expand Down

0 comments on commit aa9b745

Please sign in to comment.