
I've read this many times and still don't get it: isn't there a bug here? #87

Open
sgolang opened this issue Sep 22, 2021 · 15 comments

Comments

@sgolang

sgolang commented Sep 22, 2021

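I've been thinking about this for a long time and still can't figure it out; I'd be grateful for some pointers.
I don't believe the program has a bug here. It's driving me crazy.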

// session.go
// shaper shapes the sending sequence among streams
func (s *Session) shaperLoop() {
	var reqs shaperHeap
	var next writeRequest
	var chWrite chan writeRequest

	for {
		if len(reqs) > 0 {
			chWrite = s.writes
			next = heap.Pop(&reqs).(writeRequest) // take one item from the queue, ready to send
		} else {
			chWrite = nil
		}

		select {
		case <-s.die:
			return
		case r := <-s.shaper: // new data arrived: push the item just popped back into the queue; but doesn't pushing it back scramble the order?
			if chWrite != nil { // next is valid, reshape
				heap.Push(&reqs, next)
			}
			heap.Push(&reqs, r)
		case chWrite <- next:
		}
	}
}

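My understanding: suppose there is one kind of data with prio = 1, a1 a2 a3 a4 a5, and another with prio = 2, b1 b2 b3 b4 b5. All that needs to be guaranteed is the send order among data with the same prio; between the two kinds, a may be sent before b. Doesn't taking an item out and putting it back, as above, scramble the order of the a data?
I wrote a small program to test this. The program and its output: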

package main

import (
	"container/heap"
	"fmt"
	"math/rand"
	"time"
)

type writeRequest struct {
	prio int
	data string
}

type shaperHeap []writeRequest

func (h shaperHeap) Len() int            { return len(h) }
func (h shaperHeap) Less(i, j int) bool  { return h[i].prio < h[j].prio }
func (h shaperHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *shaperHeap) Push(x interface{}) { *h = append(*h, x.(writeRequest)) }

func (h *shaperHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

type Session struct {
	die    chan struct{}
	shaper chan writeRequest // a shaper for writing
	writes chan writeRequest
}

func NewSession() *Session {
	s := &Session{}
	s.writes = make(chan writeRequest)
	s.shaper = make(chan writeRequest)
	s.die = make(chan struct{})
	return s
}

func (s *Session) Consume() {

	for {
		time.Sleep(300 * time.Millisecond)
		select {
		case <-s.die:
			return
		case wr := <-s.writes:
			fmt.Printf("[-] %d %s\n", wr.prio, wr.data)

		}
	}
}

func (s *Session) Write(wr writeRequest) {

	select {
	case s.shaper <- wr:
		fmt.Printf("[+] %d %s\n", wr.prio, wr.data)
	case <-s.die:
		return
	}

}

// shaper shapes the sending sequence among streams
func (s *Session) shaperLoop() {
	var reqs shaperHeap
	var next writeRequest
	var chWrite chan writeRequest

	for {
		if len(reqs) > 0 {
			chWrite = s.writes
			next = heap.Pop(&reqs).(writeRequest)
		} else {
			chWrite = nil
		}

		select {
		case <-s.die:
			return
		case r := <-s.shaper:
			if chWrite != nil { // next is valid, reshape
				heap.Push(&reqs, next)
			}
			heap.Push(&reqs, r)
		case chWrite <- next:
		}
	}
}

func main() {
	sess := NewSession()
	go sess.shaperLoop()

	go sess.Consume()

	rand.Seed(time.Now().UnixNano())

	go func() {

		a1 := writeRequest{1, "a1"}
		a2 := writeRequest{1, "a2"}
		a3 := writeRequest{1, "a3"}
		a4 := writeRequest{1, "a4"}
		a5 := writeRequest{1, "a5"}

		b1 := writeRequest{2, "b1"}
		b2 := writeRequest{2, "b2"}
		b3 := writeRequest{2, "b3"}
		b4 := writeRequest{2, "b4"}
		b5 := writeRequest{2, "b5"}

		sa := []writeRequest{a1, a2, a3, a4, a5}
		sb := []writeRequest{b1, b2, b3, b4, b5}
		_ = sa
		_ = sb
		for {
			x := rand.Intn(2)
			// fmt.Println(x)
			if len(sa) == 0 && len(sb) == 0 {
				break
			}
			if x == 0 {
				if len(sa) > 0 {
					aa := sa[0]
					sa = sa[1:]
					sess.Write(aa)
				}
			} else {
				if len(sb) > 0 {
					bb := sb[0]
					sb = sb[1:]
					sess.Write(bb)
				}

			}
		}

	}()

	time.Sleep(30 * time.Second)
}

The output:

[+] 1 a1
[+] 2 b1
[+] 2 b2
[+] 1 a2
[+] 1 a3
[+] 2 b3
[+] 2 b4
[+] 1 a4
[+] 1 a5
[+] 2 b5
[-] 1 a2
[-] 1 a5
[-] 1 a4
[-] 1 a3
[-] 1 a1
[-] 2 b2
[-] 2 b3
[-] 2 b5
[-] 2 b4
[-] 2 b1
@xtaci
Owner

xtaci commented Sep 22, 2021

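Each stream is independent, so reordering across streams does not matter; in fact, that is the purpose of the shaper. Within a single stream, there may indeed be a reordering problem caused by the prio variable overflowing.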

@sgolang
Author

sgolang commented Sep 22, 2021

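Looking at the code again: inside each stream, numWritten is used as the prio in the writeRequest struct, so however the requests are rearranged within a stream, the send order is preserved. This issue can be closed.

Thanks for your reply; I've learned a lot from your code!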

func (s *Stream) writeV2(b []byte) (n int, err error) {
...
			for len(bts) > 0 {
				sz := len(bts)
				if sz > s.frameSize {
					sz = s.frameSize
				}
				frame.data = bts[:sz]
				bts = bts[sz:]
				n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
				atomic.AddUint32(&s.numWritten, uint32(sz))
				sent += n
				if err != nil {
					return sent, err
				}
			}
		}

@xtaci
Owner

xtaci commented Sep 22, 2021

Hmm, this prio is quite likely to overflow, so it still needs fixing.

@sgolang
Author

sgolang commented Sep 22, 2021

How is this overflow handled? Each stream breaks once it has sent 1 << 32 bytes.

@xtaci
Owner

xtaci commented Sep 22, 2021

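Because stream.Write is itself blocking, this can only cause reordering when a write lands right around 1<<32, so an unsigned (wraparound) comparison is all that is needed.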
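A minimal sketch of the wraparound ("unsigned") comparison described here, using the same arithmetic as _itimediff in shaper.go: subtract as uint32, then read the difference as int32. The helper name before is hypothetical. It orders counters correctly across the 1<<32 boundary as long as the two values are less than 1<<31 apart, whereas a plain < comparison does not.

```go
package main

import "fmt"

// before reports whether counter a was issued before counter b, treating
// both as 32-bit values that may have wrapped past 1<<32. Same arithmetic
// as _itimediff(later, earlier) = int32(later - earlier).
func before(a, b uint32) bool {
	return int32(b-a) > 0
}

func main() {
	p1 := uint32(1<<32 - 65536) // 4294901760
	p2 := uint32(1<<32 - 32768) // 4294934528
	p3 := p2 + 32768 + 1024     // wraps around to 1024

	fmt.Println(p3)             // 1024
	fmt.Println(p1 < p3)        // false: a plain comparison is fooled by the wrap
	fmt.Println(before(p1, p2)) // true
	fmt.Println(before(p2, p3)) // true: the wrapped value still sorts later
	fmt.Println(before(p3, p1)) // false
}
```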

@sgolang
Author

sgolang commented Sep 23, 2021

I think the comparison here is wrong, isn't it?

// shaper.go
package smux

func _itimediff(later, earlier uint32) int32 {
	return (int32)(later - earlier)
}

For example, suppose a stream is used to transfer a large file.

In this stream, three writeRequests are queued, with:
prio = (1 << 32) - 32768*2
prio = (1 << 32) - 32768
prio = (1 << 32) + 1024

waiting to be sent, and the last one has overflowed. Can this comparison still guarantee the order?

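Is using the number of bytes sent as prio really reasonable?
Suppose the following two streams both exist for a long time:
Stream B has existed for a long time and is used to send large files, so its prio grows quickly.
Stream A is newly created and only sends a little data each time, so its prio grows slowly.

The shaperHeap holds many requests like these.
When reshaping, won't stream B never get a chance to send?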

@xtaci
Owner

xtaci commented Sep 23, 2021

> I think the comparison here is wrong, isn't it?
> ...
> In this stream, three writeRequests are queued, with:
> prio = (1 << 32) - 32768*2
> prio = (1 << 32) - 32768
> prio = (1 << 32) + 1024
> waiting to be sent, and the last one has overflowed. Can this comparison still guarantee the order?

It holds; you can run go test -v -run Shaper.

> Is using the number of bytes sent as prio really reasonable?
> ...
> When reshaping, won't stream B never get a chance to send?

With a 32-bit value this is not a big problem; it wraps around quickly.

@sgolang
Author

sgolang commented Sep 24, 2021

package smux

import (
	"container/heap"
	"testing"
)

func TestShaper(t *testing.T) {

	w1 := writeRequest{prio: 10}
	w2 := writeRequest{prio: 2048}
	w3 := writeRequest{prio: (1 << 32) - 32768*2}
	w4 := writeRequest{prio: (1 << 32) - 32768}

	prioOverflow := uint32((1 << 32) - 32768)
	prioOverflow += 32768
	prioOverflow += 1024

	w5 := writeRequest{prio: prioOverflow}

	var reqs shaperHeap
	heap.Push(&reqs, w5)
	heap.Push(&reqs, w4)
	heap.Push(&reqs, w3)
	heap.Push(&reqs, w2)
	heap.Push(&reqs, w1)

	var lastPrio = reqs[0].prio
	for len(reqs) > 0 {
		w := heap.Pop(&reqs).(writeRequest)
		if int32(w.prio-lastPrio) < 0 {
			t.Fatal("incorrect shaper priority")
		}

		t.Log("prio:", w.prio)
		lastPrio = w.prio
	}
}

go test -run Shaper -v
=== RUN TestShaper
shaper_test.go:43: prio: 4294901760
shaper_test.go:43: prio: 4294934528
shaper_test.go:43: prio: 10
shaper_test.go:43: prio: 1024
shaper_test.go:43: prio: 2048
--- PASS: TestShaper (0.00s)
PASS
ok _/Users/jason/code/smux 1.744s

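So you mean it's enough to guarantee the correct order for data near the overflow point, right?
Thanks for your reply. I'm an amateur programmer, and reading code is a skill too; please forgive my slowness and for taking up your time.
From the first version I learned a lot, such as packet framing and reassembly and stream multiplexing, and I used them in my own tools; now I finally almost understand this version too.
Thank you for sharing so generously and for the guidance!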

@sgolang
Author

sgolang commented Sep 24, 2021

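I have two questions I'd like you to answer:

  1. stream.Write is blocking, and func (s *Session) writeFrameInternal() is blocking too. So for any one stream, the shaperHeap []writeRequest queue holds at most one cmdPSH frame (cmdNOP and cmdUPD frames may be there at the same time). Is my understanding correct?
  2. Why does writeRequest.result in this struct use a buffered channel?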
// internal writeFrame version to support deadline used in keepalive
func (s *Session) writeFrameInternal(f Frame, deadline <-chan time.Time, prio uint32) (int, error) {
	req := writeRequest{
		prio:   prio,
		frame:  f,
		result: make(chan writeResult, 1), // why a buffered channel? Would a plain make(chan writeResult) here affect speed?
	}

@xtaci
Owner

xtaci commented Sep 24, 2021

> I have two questions I'd like you to answer:
>
>   1. stream.Write is blocking, and func (s *Session) writeFrameInternal() is blocking too. So for any one stream, the shaperHeap []writeRequest queue holds at most one cmdPSH frame (cmdNOP and cmdUPD frames may be there at the same time). Is my understanding correct?

    There can be more than one in the queue; "blocking" only means the function is not guaranteed to return immediately.

>   2. Why does writeRequest.result in this struct use a buffered channel?
>      result: make(chan writeResult, 1), // why a buffered channel? Would a plain make(chan writeResult) here affect speed?

There is only ever one result; the buffer is there to avoid blocking.
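A small sketch of why a one-slot buffer matters, assuming (as the function's own comment, "to support deadline", suggests) that the caller may stop waiting once its deadline fires. Whoever later produces the result can still hand it off and move on, instead of blocking on a receiver that no longer exists. The names wait, deliver, and trial are hypothetical, and the 100ms timeout in deliver exists only so the demo terminates.

```go
package main

import (
	"fmt"
	"time"
)

// writeResult carries the two fields a write result needs (hypothetical here).
type writeResult struct {
	n   int
	err error
}

// wait models the caller side (simplified): it returns as soon as either
// the result arrives or the deadline fires.
func wait(result <-chan writeResult, deadline <-chan time.Time) string {
	select {
	case r := <-result:
		return fmt.Sprintf("wrote %d bytes", r.n)
	case <-deadline:
		return "timeout"
	}
}

// deliver models the side that produces the result and reports whether
// the hand-off completed.
func deliver(result chan<- writeResult) bool {
	select {
	case result <- writeResult{n: 42}:
		return true
	case <-time.After(100 * time.Millisecond):
		return false // no receiver: an unbuffered send would wait forever
	}
}

func trial(buffered bool) (string, bool) {
	var result chan writeResult
	if buffered {
		result = make(chan writeResult, 1)
	} else {
		result = make(chan writeResult)
	}
	expired := make(chan time.Time)
	close(expired) // the caller's deadline has already passed

	outcome := wait(result, expired) // caller gives up before any result exists
	return outcome, deliver(result)  // the result is produced afterwards
}

func main() {
	fmt.Println(trial(true))  // timeout true
	fmt.Println(trial(false)) // timeout false
}
```

With the buffer, the producer never depends on the caller still listening, which is what "avoid blocking" buys here; speed is not the main concern.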

@sgolang
Author

sgolang commented Sep 24, 2021

Even if the prio of every cmdPSH writeRequest is set to 1, the send order is still preserved.

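> stream.Write is blocking, and Session.writeFrameInternal() is blocking too. So for any one stream, the shaperHeap []writeRequest queue holds at most one cmdPSH frame (cmdNOP and cmdUPD frames may be there at the same time). Is my understanding correct?
> There can be more than one in the queue; "blocking" only means the function is not guaranteed to return immediately.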

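I ran a test: for data within one stream (same sid), with no concurrent calls to Write (it shouldn't be called concurrently, right?), there is in fact only one cmdPSH frame in the shaperHeap queue at any moment. Even if the prio of every cmdPSH writeRequest is set to 1, the send order is still preserved and nothing goes wrong.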

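Regarding your point that there can be more than one in the queue:

  1. Across multiple streams, the queue certainly holds more than one.
  2. Within the same stream, the cases with more than one involve the other frames, such as cmdNOP and cmdUPD.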

@sgolang
Author

sgolang commented Sep 25, 2021

After making the comparison function in shaper.go random and deleting shaper_test.go, the tests still pass:

//shaper.go
package smux

import "math/rand"

func _itimediff(later, earlier uint32) int32 {
	x := rand.Intn(2)
	return int32(x)

	// return (int32)(later - earlier)
}

jason@MacBook-Air ~/g/s/g/x/smux (master)> go test -v
=== RUN TestAllocGet
--- PASS: TestAllocGet (0.00s)
=== RUN TestAllocPut
--- PASS: TestAllocPut (0.00s)
=== RUN TestAllocPutThenGet
--- PASS: TestAllocPutThenGet (0.00s)
=== RUN TestConfig
mux_test.go:23: keep-alive interval must be positive
mux_test.go:32: keep-alive timeout must be larger than keep-alive interval
mux_test.go:40: max frame size must be positive
mux_test.go:48: max frame size must not be larger than 65535
mux_test.go:56: max receive buffer must be positive
mux_test.go:64: max stream buffer must be positive
mux_test.go:73: max stream buffer must not be larger than max receive buffer
--- PASS: TestConfig (0.00s)
=== RUN TestEcho
--- PASS: TestEcho (0.01s)
=== RUN TestWriteTo
--- PASS: TestWriteTo (0.03s)
=== RUN TestWriteToV2
--- PASS: TestWriteToV2 (0.02s)
=== RUN TestGetDieCh
--- PASS: TestGetDieCh (0.00s)
=== RUN TestSpeed
session_test.go:315: 127.0.0.1:56096 127.0.0.1:56095
session_test.go:336: time for 16MB rtt 59.936929ms
--- PASS: TestSpeed (0.06s)
=== RUN TestParallel
session_test.go:374: created 501 streams
--- PASS: TestParallel (1.49s)
=== RUN TestParallelV2
session_test.go:408: created 605 streams
--- PASS: TestParallelV2 (1.48s)
=== RUN TestCloseThenOpen
--- PASS: TestCloseThenOpen (0.00s)
=== RUN TestSessionDoubleClose
--- PASS: TestSessionDoubleClose (0.00s)
=== RUN TestStreamDoubleClose
--- PASS: TestStreamDoubleClose (0.00s)
=== RUN TestConcurrentClose
--- PASS: TestConcurrentClose (0.00s)
=== RUN TestTinyReadBuffer
--- PASS: TestTinyReadBuffer (0.01s)
=== RUN TestIsClose
--- PASS: TestIsClose (0.00s)
=== RUN TestKeepAliveTimeout
--- PASS: TestKeepAliveTimeout (3.00s)
=== RUN TestKeepAliveBlockWriteTimeout
--- PASS: TestKeepAliveBlockWriteTimeout (3.00s)
=== RUN TestServerEcho
--- PASS: TestServerEcho (0.02s)
=== RUN TestSendWithoutRecv
--- PASS: TestSendWithoutRecv (0.00s)
=== RUN TestWriteAfterClose
--- PASS: TestWriteAfterClose (0.00s)
=== RUN TestReadStreamAfterSessionClose
session_test.go:703: EOF
--- PASS: TestReadStreamAfterSessionClose (0.00s)
=== RUN TestWriteStreamAfterConnectionClose
--- PASS: TestWriteStreamAfterConnectionClose (0.00s)
=== RUN TestNumStreamAfterClose
--- PASS: TestNumStreamAfterClose (0.00s)
=== RUN TestRandomFrame
--- PASS: TestRandomFrame (0.01s)
=== RUN TestWriteFrameInternal
--- PASS: TestWriteFrameInternal (1.01s)
=== RUN TestReadDeadline
--- PASS: TestReadDeadline (0.00s)
=== RUN TestWriteDeadline
--- PASS: TestWriteDeadline (0.00s)
PASS
ok github.com/xtaci/smux 13.257s

@xtaci
Owner

xtaci commented Sep 25, 2021

The values are never negative, so of course it passes.

@xtaci
Owner

xtaci commented Sep 25, 2021

Converting an unsigned number to a signed one can make it negative: for example, the uint32 value 1<<32 - 1 cast to int32 becomes -1.
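A tiny illustration of this cast: converting a uint32 to int32 keeps the bit pattern and reinterprets it as two's complement, so values at or above 1<<31 come out negative. The helper signedDiff is hypothetical and simply repeats the _itimediff arithmetic.

```go
package main

import "fmt"

// signedDiff repeats the _itimediff arithmetic: wraparound subtraction in
// uint32, then reinterpretation of the bits as int32.
func signedDiff(later, earlier uint32) int32 {
	return int32(later - earlier)
}

func main() {
	var u uint32 = 1<<32 - 1 // 4294967295, the largest uint32
	fmt.Println(int32(u))    // -1: same bits read as two's complement

	fmt.Println(signedDiff(10, 5)) // 5
	fmt.Println(signedDiff(5, 10)) // -5: 5-10 wraps to 4294967291, read as -5
}
```

A comparator that only ever returns 0 or 1 never reaches this negative branch.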

@sgolang
Author

sgolang commented Sep 25, 2021

> The values are never negative, so of course it passes.

//shaper.go
package smux

import "math/rand"

func _itimediff(later, earlier uint32) int32 {
	x := rand.Intn(2)
	return int32(x) // returns 0 or 1 here

	// return (int32)(later - earlier)
}
// shaper.go: the comparison here is > 0
func (h shaperHeap) Less(i, j int) bool  { return _itimediff(h[j].prio, h[i].prio) > 0 }

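Because your test function TestWriteToV2 checks whether the sent and received data match, passing it shows that the data order is correct. The reason is that, for any one stream, the shaperHeap []writeRequest queue holds at most one cmdPSH frame.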

func TestWriteToV2(t *testing.T) {
......

       
	if bytes.Compare(sndbuf, rcvbuf.Bytes()) != 0 {
		t.Fatal("mismatched echo bytes")
	}
}

My point is that passing s.numWritten as the prio argument of writeFrameInternal doesn't actually have any effect.

func (s *Stream) writeV2(b []byte) (n int, err error) {
...
			for len(bts) > 0 {
				sz := len(bts)
				if sz > s.frameSize {
					sz = s.frameSize
				}
				frame.data = bts[:sz]
				bts = bts[sz:]
				n, err := s.sess.writeFrameInternal(frame, deadline, uint64(atomic.LoadUint32(&s.numWritten)))
				atomic.AddUint32(&s.numWritten, uint32(sz))
				sent += n
				if err != nil {
					return sent, err
				}
			}
		}

Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants