Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add token into stream v2 #47

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open

Conversation

cs8425
Copy link

@cs8425 cs8425 commented Mar 9, 2019

接續 #19
重新提交PR
TestSlowReadBlocking()這個test可以復現 #18

net.Pipe()測試過程中有踩到類似 #37 的問題
發送cmdACKcmdFUL會卡住(已透過另外開個控制包專用channel解決)
有空再試試可否透過test復現

另外改變了keepalive的實作方式
最明顯的改變是KeepAliveInterval可以大於KeepAliveTimeout
送一個包會等KeepAliveTimeout的長度再去確認無回應才斷線
如果回應在timeout之前收到
則會重設timeout跟下次發包的時機
換句話說
ping的時間點不再是固定的KeepAliveInterval
而是KeepAliveInterval + RTT

recvLoop()的部份
收到不明的cmd ID不再強制斷線
可以直接於SMUX這層加入雜訊且不干擾stream token的估測
或者提高之後引入新ID的相容性

@cs8425 cs8425 changed the title Move token to stream v2 add token into stream v2 Mar 9, 2019
@codecov-io
Copy link

codecov-io commented Mar 9, 2019

Codecov Report

Merging #47 into master will increase coverage by 2.33%.
The diff coverage is 91.62%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master      #47      +/-   ##
==========================================
+ Coverage    87.5%   89.83%   +2.33%     
==========================================
  Files           4        4              
  Lines         392      531     +139     
==========================================
+ Hits          343      477     +134     
- Misses         44       49       +5     
  Partials        5        5
Impacted Files Coverage Δ
frame.go 76.92% <ø> (ø) ⬆️
mux.go 100% <100%> (ø) ⬆️
session.go 88.04% <88.13%> (-1.12%) ⬇️
stream.go 91.38% <96.05%> (+7.82%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f4f6ca3...9d5a405. Read the comment docs.

@jannson
Copy link
Contributor

jannson commented Mar 9, 2019

给力给力!

@jannson
Copy link
Contributor

jannson commented Mar 9, 2019

@jannson
Copy link
Contributor

jannson commented Mar 12, 2019

没办法啊,比如说服务器端与客户端这样的业务,同时存在新旧的客户端。
服务器端更新的时候,它就得一个库支持新旧客户端,免得就无法达到兼容的目的了,只能强制所有用户更新。
客户端与客户端的连接,可以用版本号控制,不同版本禁止相互连接。但服务器应该是要同时支持新旧的,通过参数控制。

@jannson
Copy link
Contributor

jannson commented Mar 12, 2019

继续之前的讨论,你说要 “保證每個stream cmdSYN >> cmdPSH >> cmdFIN的封包順序”, cmdSYN 优先第一,这个现在代码就能保证了吧?因为只有 cmdSYN 之后,才能 OpenStream 或者 AcceptStream,后面只要保证先 Write 的 cmdPSH 比后 Write 的 cmdFIN 快就行了吧?
如果是这样,则简单实现很多,只要 cmdFIN 与 cmdPSH 共用一个 channel writes,同时在 Write cmdFIN 的时候,也要 writeLock 一下, 这样有调度系统去保证他们获得锁的先后顺序就行了。

思路不变,但补充:

  1. AcceptStream 肯定是在 cmdSYN 成功之后的,不变
  2. OpenStream 是可能有先 cmdPSH 再 cmdSYN 的,思路与 cmdFIN 一样,在发送 cmdSYN cmdPSH cmdFIN 都要加 writeLock,由系统调度的先后来决定他们的顺序,先函数调用肯定先得到锁。

@cs8425
Copy link
Author

cs8425 commented Mar 12, 2019

目前改進的有:

  1. stream token, 完全不相容舊版
  2. keepalive, 有三種模式:
    1. 任一邊為舊版, 全降級為舊版(有缺陷)
    2. 新版但是要把KeepAliveInterval + KeepAliveTimeout設定成比舊版KeepAliveTimeout還小才能相容
    3. 雙邊都新版
  3. 併發Write的問題, 可相容
  4. (暫)保證cmdSYN >> cmdPSH >> cmdFIN的封包順序, 可相容

就算伺服端更新了新版
為了舊客戶端相容
等於沒有更新版本
那還不如強制所有舊版客戶端更新
反過來的意義比較大
客戶端可以用新版的程式連上還未更新的伺服端繼續使用服務

我認為2.2是比較好的選擇
(至少確定新版這邊不會主動斷開, 舊版那側無解)
但是需要使用者重新確認自己的設定
而不是像現在這樣為了相容舊版
必須回到有缺陷的實作

至於1完全不相容是因為舊版使用比較激進的錯誤處理方式(強制斷線)
如果把那段移除是可以相容的(會發cmdEMP cmdFUL cmdACK但是無作用)
但是改這個一樣要更新
那一樣不如一次到位....

@jannson
Copy link
Contributor

jannson commented Mar 12, 2019

除非是新特性实在跟旧客户端冲突,免得兼容的意义还是挺大的。毕竟强制所有用户更新客户端,从产品或业务的角度讲,伤害很大。我想的是如何更好的较缓和的升级服务器端。
当然也可以用你说的方案,先让大部分用户更新客户端,但不启用客户端的新特性,同时服务器端是旧的。等用户更新得差不多了,再在服务器端启用新特性,同时强制小部分用户升级。

兼容这个问题可以先放着,等解决了所有问题咱们再回头来看,看能达到什么兼容程度。

“保證cmdSYN >> cmdPSH >> cmdFIN的封包順序” 这个问题现在解决了吗?
我说的方案是否可行?如果系统的锁的调度也无法保证顺序,那么也可以实现一个保证顺序的 cmdSYN cmdPSH cmdFIN 优先级的锁来解决。

@cs8425
Copy link
Author

cs8425 commented Mar 12, 2019

“保證cmdSYN >> cmdPSH >> cmdFIN的封包順序”這個應該是解決了
後來仔細想想
OpenStream()肯定要等到writeFrame(newFrame(cmdSYN, sid))成功後才返回*Stream
之後才會有Write()操作
所以這邊沒問題

Write()Close()併發呼叫的時候
是先close還是先write本身就無法保證
但是我們至少能保證一次Write()的資料不會被Close()送的cmdFIN截掉
理論上來說應該是符合正常使用情境

@jannson
Copy link
Contributor

jannson commented Mar 12, 2019

恩,看起来很简结,给力给力。

@jannson
Copy link
Contributor

jannson commented Mar 13, 2019

感觉用 go test -race 还是会死锁,不知道问题在哪里。代码看起来都很 OK 的。

@cs8425
Copy link
Author

cs8425 commented Mar 13, 2019

-race我測是有過的啊
(TestParallel() TestParallel2()那邊RAM吃到7~8G)
你能確定是卡在哪個項目嗎?
卡住的時候不要急著關
連上 http://localhost:6060/debug/pprof/goroutine?debug=2
可以把goroutine dump出來分析問題

另外go v1.9.x的net.Pipe()應該有問題
TestWriteStreamRacePipe()一定不會過
猜測是v1.9.x過了support期限所以net.Pipe()的patch沒有backport
既然v1.9.x已停止維護
應該要把它移除了

@jannson
Copy link
Contributor

jannson commented Mar 13, 2019

感谢提供思路,我测试是超过 10 分钟没输出结果,就自动 killed 的了。
一会我再跑一次看看。

@jannson
Copy link
Contributor

jannson commented Mar 13, 2019

版本是: go1.11 linux/amd64
系统是: centos (家里的 ubuntu 也没测试过)

--- FAIL: TestSlowReadBlocking (3.57s)
session_test.go:1017: fast write stream start...
session_test.go:1081: normal stream start...
session_test.go:1126: [normal]r 5 0 rtt 5.817711ms stream.bucket 0 stream.guessNeeded 0
session_test.go:1126: [normal]r 5 1 rtt 2.559955ms stream.bucket 0 stream.guessNeeded 18
session_test.go:1126: [normal]r 5 2 rtt 678.696µs stream.bucket 0 stream.guessNeeded 12
session_test.go:1126: [normal]r 5 3 rtt 3.64238ms stream.bucket 0 stream.guessNeeded 0
.......................................................................
.......................................................................
.......................................................................
.......................................................................
session_test.go:1126: [normal]r 5 46 rtt 2.722445ms stream.bucket 0 stream.guessNeeded 14
session_test.go:1126: [normal]r 5 47 rtt 2.687178ms stream.bucket 0 stream.guessNeeded 14
session_test.go:1126: [normal]r 5 48 rtt 1.015599ms stream.bucket 0 stream.guessNeeded 28
session_test.go:1126: [normal]r 5 49 rtt 645.687µs stream.bucket 0 stream.guessNeeded 28
session_test.go:1126: [normal]r 5 50 rtt 1.085514ms stream.bucket 0 stream.guessNeeded 14
session_test.go:1126: [normal]r 5 51 rtt 830.415µs stream.bucket 0 stream.guessNeeded 21
session_test.go:1126: [normal]r 5 52 rtt 1.100771ms stream.bucket 0 stream.guessNeeded 0
session_test.go:1126: [normal]r 5 53 rtt 677.102µs stream.bucket 0 stream.guessNeeded 14
session_test.go:1126: [normal]r 5 54 rtt 156.806743ms stream.bucket 0 stream.guessNeeded 2653
session_test.go:1116: 5 55 i/o timeout session.bucket 0 stream.bucket 0 stream.empflag 1 stream.fulflag 0
session_test.go:1560: goroutine 6389 [running]:
runtime/pprof.writeGoroutineStacks(0xa7a460, 0xc0001264d0, 0xd0, 0xd0)
/home/janson/projects/cloud/go1.11/src/runtime/pprof/pprof.go:678 +0xb5
runtime/pprof.writeGoroutine(0xa7a460, 0xc0001264d0, 0x2, 0x435458, 0xd0)
/home/janson/projects/cloud/go1.11/src/runtime/pprof/pprof.go:667 +0x52
runtime/pprof.(*Profile).WriteTo(0xd92200, 0xa7a460, 0xc0001264d0, 0x2, 0xc000124e18, 0xc000379c38)
/home/janson/projects/cloud/go1.11/src/runtime/pprof/pprof.go:328 +0x546
github.com/jannson/smux.dumpGoroutine(0xc000124e00)
/home/janson/workspace-go/src/github.com/jannson/smux/session_test.go:1559 +0xd8
github.com/jannson/smux.testSlowReadBlocking.func3(0xc00445e4d0, 0xc000137880, 0xc000124e00, 0xc000137878, 0xc004c3a460)
/home/janson/workspace-go/src/github.com/jannson/smux/session_test.go:1120 +0xa4f
created by github.com/jannson/smux.testSlowReadBlocking
/home/janson/workspace-go/src/github.com/jannson/smux/session_test.go:1073 +0x2ce

    goroutine 1 [chan receive]:
    testing.(*T).Run(0xc000124100, 0x9f8781, 0x14, 0xa0d150, 0xc0000ebc01)
            /home/janson/projects/cloud/go1.11/src/testing/testing.go:879 +0x689
    testing.runTests.func1(0xc000124100)
            /home/janson/projects/cloud/go1.11/src/testing/testing.go:1119 +0xa9
    testing.tRunner(0xc000124100, 0xc0000ebd88)
            /home/janson/projects/cloud/go1.11/src/testing/testing.go:827 +0x163
    testing.runTests(0xc00009e3e0, 0xd990e0, 0x2f, 0x2f, 0xc0000ebe78)
            /home/janson/projects/cloud/go1.11/src/testing/testing.go:1117 +0x4ef
    testing.(*M).Run(0xc00013e100, 0x0)
            /home/janson/projects/cloud/go1.11/src/testing/testing.go:1034 +0x2ef
    main.main()
            _testmain.go:150 +0x222

    goroutine 20 [IO wait, 3 minutes]:
    internal/poll.runtime_pollWait(0x7f273c683f00, 0x72, 0xc000146b18)
            /home/janson/projects/cloud/go1.11/src/runtime/netpoll.go:173 +0x66
    internal/poll.(*pollDesc).wait(0xc000152298, 0x72, 0x0, 0x0, 0xa7b660)
            /home/janson/projects/cloud/go1.11/src/internal/poll/fd_poll_runtime.go:85 +0xe4
    internal/poll.(*pollDesc).waitRead(0xc000152298, 0xffffffffffffff00, 0x0, 0x0)
            /home/janson/projects/cloud/go1.11/src/internal/poll/fd_poll_runtime.go:90 +0x4b
    internal/poll.(*FD).Accept(0xc000152280, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
            /home/janson/projects/cloud/go1.11/src/internal/poll/fd_unix.go:384 +0x27f
    net.(*netFD).accept(0xc000152280, 0x50, 0x50, 0x46551f)

总结下,卡住的地方有:
session.go#225
session.go#408
session.go#362
session.go#390
session.go#135
stream.go#259

@cs8425
Copy link
Author

cs8425 commented Mar 13, 2019

--- FAIL: TestSlowReadBlocking (3.57s)
session_test.go:1017: fast write stream start...
session_test.go:1081: normal stream start...
session_test.go:1126: [normal]r 5 0 rtt 5.817711ms stream.bucket 0 stream.guessNeeded 0
session_test.go:1126: [normal]r 5 1 rtt 2.559955ms stream.bucket 0 stream.guessNeeded 18
.......................................................................
.......................................................................
.......................................................................
.......................................................................
session_test.go:1126: [normal]r 5 53 rtt 677.102µs stream.bucket 0 stream.guessNeeded 14
session_test.go:1126: [normal]r 5 54 rtt 156.806743ms stream.bucket 0 stream.guessNeeded 2653
session_test.go:1116: 5 55 i/o timeout session.bucket 0 stream.bucket 0 stream.empflag 1 stream.fulflag 0

session.bucket 0 << session的buffer用完了導致全部卡死
stream.sendPause()已發送但是對面的stream.pauseWrite()還未執行
stream.Write()把buffer寫爆造成的
通常發生在高RTT或是發送方的recvLoop()執行比較慢/無法同時執行的情況
(-race好像會影響調度, 至少我測試的情況下buffer都要開比較大)
MaxReceiveBuffer加大即可解決


我好像又發現原版的一個race問題
recvLoop() stream.pushBytes(f.data)之後session.bucket剛好爆掉
進入<-s.bucketNotify卡住
而同時stream.Read()又剛好把stream.buffer讀空卡在case <-s.chReadEvent:
由於recvLoop()卡住
<-s.chReadEvent不可能被觸發
直接等到timeout

此問題容易發生在session.bucket很小
幾個frame就會用光
而且一次Read()/Write()的量大於單個frame大小

@jannson
Copy link
Contributor

jannson commented Mar 13, 2019

关于这一点:"我好像又發現原版的一個race問題"——好像没问题。
recvLoop stream.pushBytes 之后,bucket 空了,但也给 stream.chReadEvent 事件了,所以 stream.Read 总能读一次,并且 returnTokens,从而让 <-s.bucketNotify 得以释放。

我把
MaxReceiveBuffer: 16 * 1024 * 1024,
MaxStreamBuffer: 4 * 1024 * 1024,
设置得很大了,但返回这样:
*** Test killed with quit: ran too long (10m0s).
是不是表示测试通过了?因为没有 stream 超时了。

@cs8425
Copy link
Author

cs8425 commented Mar 13, 2019

关于这一点:"我好像又發現原版的一個race問題"——好像没问题。
recvLoop stream.pushBytes 之后,bucket 空了,但也给 stream.chReadEvent 事件了,所以 stream.Read 总能读一次,并且 returnTokens,从而让 <-s.bucketNotify 得以释放。

不對
我這邊有個test可以複現了
雖然要多跑幾次才會碰到
4個Read()都卡在<-s.chReadEvent
但是recvLoop()卡在<-s.bucketNotify直接deadlock
造成Read() timeout
我這邊測試一次大約5秒
總共大概20~60秒左右就會複現


應該是不算...
是跑太慢沒跑完
我記得有參數能調整test的時間長度


補個卡住時候的輸出:
少的read()應該是在dump的時候因為timeout return了

================ echo server
        goroutine 236 [select]:
        _/home/cs8425/code/smux.(*Session).AcceptStream(0xc0000964d0, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/session.go:135 +0x187
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func1(0xc000018340, 0x888180, 0xc00000e038)
        	/home/cs8425/code/smux/session_test.go:1383 +0xaa
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite
        	/home/cs8425/code/smux/session_test.go:1380 +0x113

        goroutine 246 [select]:
        _/home/cs8425/code/smux.(*Session).keepalive.func1(0x0, 0xc0000e9400)
        	/home/cs8425/code/smux/session.go:331 +0x1b7
        _/home/cs8425/code/smux.(*Session).keepalive(0xc0000964d0)
        	/home/cs8425/code/smux/session.go:346 +0xa9
        created by _/home/cs8425/code/smux.newSession
        	/home/cs8425/code/smux/session.go:88 +0x261

        goroutine 245 [IO wait]:
        internal/poll.runtime_pollWait(0x7f1414bedf00, 0x77, 0xc000182c78)
        	/opt/go/src/runtime/netpoll.go:173 +0x66
        internal/poll.(*pollDesc).wait(0xc00012c598, 0x77, 0xffffffffffffff00, 0x883d40, 0xae5618)
        	/opt/go/src/internal/poll/fd_poll_runtime.go:85 +0x9a
        internal/poll.(*pollDesc).waitWrite(0xc00012c598, 0xc000906200, 0x1ff, 0xfdff)
        	/opt/go/src/internal/poll/fd_poll_runtime.go:94 +0x3d
        internal/poll.(*FD).Write(0xc00012c580, 0xc000906000, 0x408, 0x10008, 0x0, 0x0, 0x0)
        	/opt/go/src/internal/poll/fd_unix.go:276 +0x254
        net.(*netFD).Write(0xc00012c580, 0xc000906000, 0x408, 0x10008, 0xc00049f9e0, 0xc000182fa0, 0xc0001363c0)
        	/opt/go/src/net/fd_unix.go:220 +0x4f
        net.(*conn).Write(0xc00000e038, 0xc000906000, 0x408, 0x10008, 0x0, 0x0, 0x0)
        	/opt/go/src/net/net.go:189 +0x68
        _/home/cs8425/code/smux.(*Session).sendLoop.func1(0x300000201, 0xc0002b8000, 0x400, 0x100000, 0xc0008c7b00)
        	/home/cs8425/code/smux/session.go:388 +0x15a
        _/home/cs8425/code/smux.(*Session).sendLoop(0xc0000964d0)
        	/home/cs8425/code/smux/session.go:416 +0x103
        created by _/home/cs8425/code/smux.newSession
        	/home/cs8425/code/smux/session.go:87 +0x23f

        goroutine 244 [chan receive]: `<-s.bucketNotify`
        _/home/cs8425/code/smux.(*Session).recvLoop(0xc0000964d0)
        	/home/cs8425/code/smux/session.go:248 +0xa0
        created by _/home/cs8425/code/smux.newSession
        	/home/cs8425/code/smux/session.go:86 +0x21d
================

================ echo read/write
        goroutine 252 [select]: `<-s.chReadEvent`
        _/home/cs8425/code/smux.(*Stream).Read(0xc0000e8000, 0xc0001b8000, 0x100000, 0x100000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:100 +0x24b
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func1.1(0x885f60, 0xc0000e8000)
        	/home/cs8425/code/smux/session_test.go:1388 +0xeb
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite.func1
        	/home/cs8425/code/smux/session_test.go:1384 +0x9c

        goroutine 254 [select]: `<-s.chReadEvent`
        _/home/cs8425/code/smux.(*Stream).Read(0xc0000e83c0, 0xc00070e000, 0x100000, 0x100000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:100 +0x24b
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func1.1(0x885f60, 0xc0000e83c0)
        	/home/cs8425/code/smux/session_test.go:1388 +0xeb
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite.func1
        	/home/cs8425/code/smux/session_test.go:1384 +0x9c
        
        goroutine 251 [select]:
        _/home/cs8425/code/smux.(*Stream).Write(0xc0000e8b40, 0xc0002b8000, 0x400, 0x100000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:153 +0x48b
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func1.1(0x885f60, 0xc0000e8b40)
        	/home/cs8425/code/smux/session_test.go:1392 +0xb9
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite.func1
        	/home/cs8425/code/smux/session_test.go:1384 +0x9c
        
        goroutine 253 [select]:
        _/home/cs8425/code/smux.(*Stream).Write(0xc0000e8280, 0xc000506000, 0x800, 0x100000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:145 +0x3a0
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func1.1(0x885f60, 0xc0000e8280)
        	/home/cs8425/code/smux/session_test.go:1392 +0xb9
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite.func1
        	/home/cs8425/code/smux/session_test.go:1384 +0x9c
================

================ test session
        goroutine 239 [select]:
        _/home/cs8425/code/smux.(*Session).keepalive.func1(0x0, 0xc000032700)
        	/home/cs8425/code/smux/session.go:331 +0x1b7
        _/home/cs8425/code/smux.(*Session).keepalive(0xc000096420)
        	/home/cs8425/code/smux/session.go:346 +0xa9
        created by _/home/cs8425/code/smux.newSession
        	/home/cs8425/code/smux/session.go:88 +0x261

        goroutine 238 [IO wait]:
        internal/poll.runtime_pollWait(0x7f1414bede30, 0x77, 0xc000038478)
        	/opt/go/src/runtime/netpoll.go:173 +0x66
        internal/poll.(*pollDesc).wait(0xc00012c518, 0x77, 0xffffffffffffff00, 0x883d40, 0xae5618)
        	/opt/go/src/internal/poll/fd_poll_runtime.go:85 +0x9a
        internal/poll.(*pollDesc).waitWrite(0xc00012c518, 0xc000868200, 0x16f, 0xfd6f)
        	/opt/go/src/internal/poll/fd_poll_runtime.go:94 +0x3d
        internal/poll.(*FD).Write(0xc00012c500, 0xc000868000, 0x408, 0x10008, 0x0, 0x0, 0x0)
        	/opt/go/src/internal/poll/fd_unix.go:276 +0x254
        net.(*netFD).Write(0xc00012c500, 0xc000868000, 0x408, 0x10008, 0xc00049f8c0, 0xc0000387a0, 0xc0001363c0)
        	/opt/go/src/net/fd_unix.go:220 +0x4f
        net.(*conn).Write(0xc00000e050, 0xc000868000, 0x408, 0x10008, 0x0, 0x0, 0x0)
        	/opt/go/src/net/net.go:189 +0x68
        _/home/cs8425/code/smux.(*Session).sendLoop.func1(0x500000201, 0xc00087cc00, 0x400, 0x1400, 0xc00066c420)
        	/home/cs8425/code/smux/session.go:388 +0x15a
        _/home/cs8425/code/smux.(*Session).sendLoop(0xc000096420)
        	/home/cs8425/code/smux/session.go:416 +0x103
        created by _/home/cs8425/code/smux.newSession
        	/home/cs8425/code/smux/session.go:87 +0x23f

        goroutine 237 [chan receive]: `<-s.bucketNotify`
        _/home/cs8425/code/smux.(*Session).recvLoop(0xc000096420)
        	/home/cs8425/code/smux/session.go:248 +0xa0
        created by _/home/cs8425/code/smux.newSession
        	/home/cs8425/code/smux/session.go:86 +0x21d

    session_test.go:1405: ================
    session_test.go:1406: session.bucket 0 session.streams.len 4
    session_test.go:1408: id: 3, addr: 0xc0000e8500, bucket: 0, empflag: 1, fulflag: 0
    session_test.go:1408: id: 5, addr: 0xc0000e8640, bucket: 0, empflag: 1, fulflag: 0
    session_test.go:1408: id: 7, addr: 0xc0000e8780, bucket: 0, empflag: 1, fulflag: 0
    session_test.go:1408: id: 9, addr: 0xc0000e88c0, bucket: 0, empflag: 1, fulflag: 0
    session_test.go:1411: ================

================ test write
        goroutine 241 [select]: sid = 7
        _/home/cs8425/code/smux.(*Stream).Write(0xc0000e8780, 0xc000880000, 0x2000, 0x2000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:145 +0x3a0
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func3(0xc000096420)
        	/home/cs8425/code/smux/session_test.go:1458 +0x2d2
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite
        	/home/cs8425/code/smux/session_test.go:1478 +0x23a
        
        goroutine 240 [select]: sid = 5
        _/home/cs8425/code/smux.(*Stream).Write(0xc0000e8640, 0xc00087c000, 0x2000, 0x2000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:153 +0x48b
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func3(0xc000096420)
        	/home/cs8425/code/smux/session_test.go:1458 +0x2d2
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite
        	/home/cs8425/code/smux/session_test.go:1478 +0x23a
        
        goroutine 243 [select]: sid = 3
        _/home/cs8425/code/smux.(*Stream).Write(0xc0000e8500, 0xc00087a000, 0x2000, 0x2000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:145 +0x3a0
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func3(0xc000096420)
        	/home/cs8425/code/smux/session_test.go:1458 +0x2d2
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite
        	/home/cs8425/code/smux/session_test.go:1478 +0x23a

        goroutine 242 [select]: sid = 9
        _/home/cs8425/code/smux.(*Stream).Write(0xc0000e88c0, 0xc000884000, 0x2000, 0x2000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:145 +0x3a0
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func3(0xc000096420)
        	/home/cs8425/code/smux/session_test.go:1458 +0x2d2
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite
        	/home/cs8425/code/smux/session_test.go:1478 +0x23a
================

================ test read
        goroutine 247 [select]: sid = 3,  `<-s.chReadEvent`
        _/home/cs8425/code/smux.(*Stream).Read(0xc0000e8500, 0xc000855f78, 0x2000, 0x2000, 0x0, 0x0, 0x0)
        	/home/cs8425/code/smux/stream.go:100 +0x24b
        _/home/cs8425/code/smux.testSmallBufferReadWrite.func3.2(0xc000488ce0, 0xc000488cb0, 0xc0000e8500, 0xc000104500, 0x835d48, 0xc000096420)
        	/home/cs8425/code/smux/session_test.go:1441 +0x114
        created by _/home/cs8425/code/smux.testSmallBufferReadWrite.func3
        	/home/cs8425/code/smux/session_test.go:1436 +0x1f4

@jannson
Copy link
Contributor

jannson commented Mar 14, 2019

bucketNotify 毕竟是为 1 的 channel,只要发送信号,recvLoop 肯定能接收一次的。 session_test.go:1406: session.bucket 0 session.streams.len 4 这一行看,是不是 session.bucket 为 0 所以卡在 <-s.bucketNotify,这样是合理的。

内心想法是这样:
session.bucket == 0,而对方链路发过来的数据,卡在了 s.readFrame 底层的 net.Conn
因为 session.bucket == 0,肯定有一个 stream 在读数据且能读到数据,会让 session.bucket 值变大,并且肯定会让 s.bucketNotify 通知一次
此时 session.bucket 就会大于 0 并获得往下执行的信号

反过来如果现实 session.bucket 被卡住且为 0,那么就只能是所有的 stream 都没有去读,让 bucket 数据耗空了。

stream.chReadEvent 也是 size 为 1 的 channel,也就是说上一次 session.bucket 为 0 之前,肯定往一个 stream 发送了一次 chReadEvent 信号,且这个 stream 没有再去读数据,才造成 session.bucket 为 0

这个 stream 没有去读数据的可能,一种是本来 cpu 很慢超时了,一种是被 Close 掉了。如果是 close 掉则也会在 session.sessionClosed 先增加 session.bucket 的

总结哈,算起来整个代码逻辑应该还是对的。如果真出现了这种情况,是不是 token 在哪个环节原子操作没处理好,丢失了 token,一点一点丢到最后空了?

@cs8425
Copy link
Author

cs8425 commented Mar 14, 2019

我是覺得蠻奇怪的
讀取跟寫入速度相同
還會發生Read() timeout的情況
如果無視timeout繼續讀的話倒是能繼續跑完測試
設定5秒結束
正常沒問題的會在5.0x秒結束
發生問題的時候會花5.3~5.5秒才結束測試

我有試著把stream.buffer.Len()給dump出來
結果發生timeout的那個stream的buffer是有資料的
目前推測是pushBytes()之後不知道為何在<-s.chReadEventRead()沒有被通知到
到底是那個部份造成的還要再研究研究

@jannson
Copy link
Contributor

jannson commented Mar 15, 2019

TestSmallBufferReadWrite 我测试一直都测试通过!
同时为何:https://github.com/cs8425/smux/blob/move-token-to-stream2/session_test.go#L1460

if strings.Contains(err.Error(), "i/o timeout") {
continue
}
这里要 continue ? 我注释掉这三行会跑出错。

@WinstonPrivacy
Copy link

I am trying to follow along using Google Translate but am not sure that it's working. I am wondering if this discussion is pertinent to an issue that we're encountering.

We are using static connections between peers. We set the KeepAliveInterval to 5 seconds and the KeepAliveTimeout to 16 seconds. Smux sessions are frequently dropping, despite good network conditions. We've traced the cause down to ping timeouts... could this be related?

@cs8425
Copy link
Author

cs8425 commented Mar 21, 2019

@jannson
抱歉 最近比較忙一點
原本是想說用delay跟固定大小的資料
來達成讀/寫的速度控制
後來發現這種寫法好像不夠精準
無法反應實際情況
我還要再想想該怎寫....

@WinstonPrivacy
Yes, this branch fix:

  1. fix buggy keepalive implement, compatible with old version in some condition
  2. race between parallel Stream.Write(), compatible with old version
  3. implement stream token, fix for fast-write-slow-read stream cause whole session freeze, NOT compatible with old version

When you set KeepAliveInterval to 5 seconds and the KeepAliveTimeout to 16 seconds.
The old keepalive implement only check if there have any frame at 16 * N seconds,
and the ping frame will send at 5 * N seconds.
So, on the 80 seconds, will send a ping frame,
and check if there are any frame receive on the same time.
At this situation,
if you session is idle, only ping frame sending, and RTT big enough,
then smux will drop the session.
This is what you met.
I push a new branch that have new keepalive implement and fix for parallel Stream.Write().
https://github.com/cs8425/smux/tree/fix-keepalive
You can take a try, will need to update both server and client side.

@WinstonPrivacy
Copy link

WinstonPrivacy commented Mar 21, 2019 via email

@cs8425
Copy link
Author

cs8425 commented Mar 21, 2019

@WinstonPrivacy
Because mix using new and old implement,
the old side still have chance to drop the session actively.
This issue can not be fully fix without changing any code on old version.
The old code is too strictly, we can't fix/add features without breaking change.

'fast-write-slow-read stream cause whole session freeze' issue not only on KCPTun, but also any program that use smux.
I encounter this issue on my tool which underlying connection is tcp, and submit a PR long time age.
The owner said that is breaking change, even the feature can turn off without breaking compatible,
the owner still not merge....

@xtaci
Copy link
Owner

xtaci commented Mar 22, 2019

breaking changes WILL NOT be merged, but discussion on how to to prevent streams from starving is ENCOURAGED, a compatible change by turning on new feature is also acceptable.

@cs8425
Copy link
Author

cs8425 commented Mar 22, 2019

@xtaci
This PR is a compatible change by switching new feature on/off, also my old one did.

We have:

  1. race between parallel Stream.Write(), compatible with old version
  2. fix buggy keepalive implement, if need compatible with old version, won't fully fix issue
  3. implement stream token, NOT compatible with old version, can be turn off for compatible. But if we still have session token limit and DO NOT want any breaking changes, this issue will NEVER FIX.

@jannson
Copy link
Contributor

jannson commented Mar 22, 2019

@cs8425 忙就暂停点时间,好事多磨。最近一直在使用你的分支体验,明显感觉是比 master 分支体验好,没有突然卡顿的现象。
在不使用单元测试的现实情况,我是能重现 master 分支的 "fast-write-slow-read" 问题的,所以个人体验比较明显。
但 master 分支代码足够好,别人很难体验得到。我有一个 20K 的在线用户的公有服务使用了 smux,没有用户反馈这样的问题。

@xtaci
Copy link
Owner

xtaci commented Mar 22, 2019

the essence of this problem is we need a per-stream flow control

@cs8425
Copy link
Author

cs8425 commented Mar 22, 2019

我打算先fork出一份kcptun
把smux套上這PR個的修復
順便再加點小功能(socks5)
看有沒有人要當白老鼠試驗試驗吧@@
已fork+更新完畢: https://github.com/cs8425/kcptunB/releases

@xtaci

the essence of this problem is we need a per-stream flow control

But the question is that we can't do it without add new packet ID.
And any new packet ID will cause old version drop the session....
Or we must add a layer inside each stream.
Each Write() make a new data frame (type, length, data), and insert control frame to pause/resume/limit Write() call.
This will be very inefficient.

@jannson
Copy link
Contributor

jannson commented Apr 30, 2019

@cs8425 今天有一个场景确实需要 CloseWrite/CloseRead 的功能,跟你说一声。之前我不知道在哪个地方回复说不需要。打脸了。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants