From 877b999756774cabf499d2d32fef6a5a9ea22afa Mon Sep 17 00:00:00 2001 From: Logan Attwood Date: Thu, 13 Jun 2024 14:33:28 -0300 Subject: [PATCH 1/3] Streams should check for Session shutdown when waiting for data Stream.Close is called when the Session's keepalive fails. This call wakes up Stream.Read & Stream.write waiters. The wakeup is via a non-blocking send to recvNotifyCh/sendNotifyCh, which are unbuffered channels. In Stream.Read this send can be attempted between the call to s.stateLock.Unlock() and the select statement at the bottom of the method. When this occurs, if deadlines haven't been set on the Stream in question, the call to Stream.Read will block forever. By adding Session.shutdownCh to the channels in the select statement, this coordination gap is addressed. This uses `goto START` instead of `return 0, io.EOF`, as that would potentially leave a last read on the buffer in the Read case. --- stream.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/stream.go b/stream.go index 365b718..2d1d4ee 100644 --- a/stream.go +++ b/stream.go @@ -143,14 +143,15 @@ WAIT: timeout = timer.C } select { + case <-s.session.shutdownCh: case <-s.recvNotifyCh: - if timer != nil { - timer.Stop() - } - goto START case <-timeout: return 0, ErrTimeout } + if timer != nil { + timer.Stop() + } + goto START } // Write is used to write to the stream @@ -225,11 +226,12 @@ WAIT: timeout = time.After(delay) } select { + case <-s.session.shutdownCh: case <-s.sendNotifyCh: - goto START case <-timeout: return 0, ErrTimeout } + goto START } // sendFlags determines any flags that are appropriate From eedcf9596aa75ea72f49642cd103fca8e0d3654b Mon Sep 17 00:00:00 2001 From: Logan Attwood Date: Fri, 14 Jun 2024 11:31:38 -0300 Subject: [PATCH 2/3] Align Write timeout handling with Read timeout handling In hashicorp/yamux#31, the Read path was changed to move away from time.After. This change was not reflected in the Write path, and this commit rectifies that. --- stream.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/stream.go b/stream.go index 2d1d4ee..0a73a38 100644 --- a/stream.go +++ b/stream.go @@ -220,10 +220,12 @@ START: WAIT: var timeout <-chan time.Time + var timer *time.Timer writeDeadline := s.writeDeadline.Load().(time.Time) if !writeDeadline.IsZero() { delay := time.Until(writeDeadline) - timeout = time.After(delay) + timer = time.NewTimer(delay) + timeout = timer.C } select { case <-s.session.shutdownCh: @@ -231,6 +233,9 @@ WAIT: case <-timeout: return 0, ErrTimeout } + if timer != nil { + timer.Stop() + } goto START } From 84b3fc617bb1d13ae6b0b6850f734049e9605060 Mon Sep 17 00:00:00 2001 From: Logan Attwood Date: Fri, 14 Jun 2024 11:35:21 -0300 Subject: [PATCH 3/3] Drain the timer's channel if needed, to prevent memory leakage When stopping a time.Timer, you need to check the return value and drain the channel to prevent a memory leak, if the Timer has fired. --- stream.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 0a73a38..0db2af2 100644 --- a/stream.go +++ b/stream.go @@ -149,7 +149,9 @@ WAIT: return 0, ErrTimeout } if timer != nil { - timer.Stop() + if !timer.Stop() { + <-timeout + } } goto START } @@ -234,7 +236,9 @@ WAIT: return 0, ErrTimeout } if timer != nil { - timer.Stop() + if !timer.Stop() { + <-timeout + } } goto START }