From 9942ef2bd0dab43249a0716f9f76fe68711fb3f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=BA=90=E6=96=87=E9=9B=A8?= <41315874+fumiama@users.noreply.github.com> Date: Sun, 11 Aug 2024 21:35:59 +0800 Subject: [PATCH] fix(link): duplicate crc recv (#2) --- gold/head/packet.go | 17 ++++++- gold/link/listen.go | 114 +++++++++++++++++++++++++++++++------------- gold/link/recv.go | 18 ++++--- gold/link/send.go | 12 ++--- 4 files changed, 109 insertions(+), 52 deletions(-) diff --git a/gold/head/packet.go b/gold/head/packet.go index ca4975d..2b89385 100644 --- a/gold/head/packet.go +++ b/gold/head/packet.go @@ -147,17 +147,19 @@ func (p *Packet) Marshal(src net.IP, teatype uint8, additional uint16, datasz ui if hasmore { offset |= 0x2000 } + p.Flags = PacketFlags(offset) return helper.OpenWriterF(func(w *helper.Writer) { w.WriteUInt32(p.idxdatsz) w.WriteUInt16((uint16(p.TTL) << 8) | uint16(p.Proto)) w.WriteUInt16(p.SrcPort) w.WriteUInt16(p.DstPort) - w.WriteUInt16(uint16(PacketFlags(offset))) + w.WriteUInt16(uint16(p.Flags)) w.Write(p.Src.To4()) w.Write(p.Dst.To4()) w.Write(p.Hash[:]) - w.WriteUInt64(CalcCRC64(w.Bytes())) + p.crc64 = CalcCRC64(w.Bytes()) + w.WriteUInt64(p.crc64) w.Write(p.Body()) }) } @@ -213,6 +215,10 @@ func (p *Packet) Put() { PutPacket(p) } +func (p *Packet) CRC64() uint64 { + return p.crc64 +} + // Body returns data func (p *Packet) Body() []byte { return p.data[p.a:p.b] @@ -256,3 +262,10 @@ func (p *Packet) Copy() *Packet { newp.buffered = false return newp } + +func (p *Packet) CopyWithBody() *Packet { + newp := p.Copy() + newp.data = helper.MakeBytes(len(p.data)) + copy(newp.data, p.data) + return newp +} diff --git a/gold/link/listen.go b/gold/link/listen.go index 8fb936b..b3c7351 100644 --- a/gold/link/listen.go +++ b/gold/link/listen.go @@ -21,6 +21,52 @@ import ( const lstnbufgragsz = 65536 +type lstnq struct { + index int + addr p2p.EndPoint + buf []byte +} + +type listenqueue chan lstnq + +func (q listenqueue) listen(m *Me, hasntfinished []sync.Mutex) { + recvtotlcnt := uint64(0) + recvloopcnt := uint16(0) + recvlooptime := time.Now().UnixMilli() + for lstn := range q { + recvtotlcnt += uint64(len(lstn.buf)) + recvloopcnt++ + if recvloopcnt%m.speedloop == 0 { + now := time.Now().UnixMilli() + logrus.Infof("[listen] queue recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime)) + recvtotlcnt = 0 + recvlooptime = now + } + packet := m.wait(lstn.buf[:len(lstn.buf):lstnbufgragsz]) + if packet == nil { + if lstn.index < 0 { + if config.ShowDebugLog { + logrus.Debugln("[listen] queue waiting") + } + helper.PutBytes(lstn.buf) + continue + } + if config.ShowDebugLog { + logrus.Debugln("[listen] queue waiting, unlock index", lstn.index) + } + hasntfinished[lstn.index].Unlock() + continue + } + if lstn.index >= 0 { + go m.dispatch(packet, lstn.addr, lstn.index, hasntfinished[lstn.index].Unlock) + } else { + go m.dispatch(packet, lstn.addr, lstn.index, func() { + helper.PutBytes(lstn.buf) + }) + } + } +} + // 监听本机 endpoint func (m *Me) listen() (conn p2p.Conn, err error) { conn, err = m.ep.Listen() @@ -30,29 +76,40 @@ func (m *Me) listen() (conn p2p.Conn, err error) { m.ep = conn.LocalAddr() logrus.Infoln("[listen] at", m.ep) go func() { - recvtotlcnt := uint64(0) - recvloopcnt := uint16(0) - recvlooptime := time.Now().UnixMilli() - n := runtime.NumCPU() + n := uint(runtime.NumCPU()) if n > 64 { n = 64 // 只用最多 64 核 } logrus.Infoln("[listen] use cpu num:", n) - listenbuff := make([]byte, lstnbufgragsz*n) + listenbuf := make([]byte, lstnbufgragsz*n) hasntfinished := make([]sync.Mutex, n) - for i := 0; err == nil; i++ { + q := make(listenqueue, n) + defer close(q) + go q.listen(m, hasntfinished) + i := uint(0) + for { + usenewbuf := false i %= n for !hasntfinished[i].TryLock() { i++ i %= n - if i == 0 { // looked up a full round - time.Sleep(time.Millisecond * 10) + if i == 0 { // looked up a full round, make a new buf + usenewbuf = true + if config.ShowDebugLog { + logrus.Debugln("[listen] use new buf") + } + break } } - if config.ShowDebugLog { + if config.ShowDebugLog && !usenewbuf { logrus.Debugln("[listen] lock index", i) } - lbf := listenbuff[i*lstnbufgragsz : (i+1)*lstnbufgragsz] + var lbf []byte + if usenewbuf { + lbf = helper.MakeBytes(lstnbufgragsz) + } else { + lbf = listenbuf[i*lstnbufgragsz : (i+1)*lstnbufgragsz] + } n, addr, err := conn.ReadFromPeer(lbf) if m.connections == nil || errors.Is(err, net.ErrClosed) { logrus.Warnln("[listen] quit listening") @@ -65,31 +122,24 @@ func (m *Me) listen() (conn p2p.Conn, err error) { logrus.Errorln("[listen] reconnect udp err:", err) return } - if config.ShowDebugLog { - logrus.Debugln("[listen] unlock index", i) + if !usenewbuf { + if config.ShowDebugLog { + logrus.Debugln("[listen] unlock index", i) + } + hasntfinished[i].Unlock() + i-- } - hasntfinished[i].Unlock() - i-- continue } - recvtotlcnt += uint64(n) - recvloopcnt++ - if recvloopcnt%m.speedloop == 0 { - now := time.Now().UnixMilli() - logrus.Infof("[listen] recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime)) - recvtotlcnt = 0 - recvlooptime = now + lq := lstnq{ + index: -1, + addr: addr, + buf: lbf[:n], } - packet := m.wait(lbf[:n:lstnbufgragsz]) - if packet == nil { - if config.ShowDebugLog { - logrus.Debugln("[listen] waiting, unlock index", i) - } - hasntfinished[i].Unlock() - i-- - continue + if !usenewbuf { + lq.index = int(i) } - go m.dispatch(packet, addr, i, hasntfinished[i].Unlock) + q <- lq } }() return @@ -193,7 +243,7 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish packet.Put() case head.ProtoData: if p.pipe != nil { - p.pipe <- packet + p.pipe <- packet.CopyWithBody() if config.ShowDebugLog { logrus.Debugln("[listen] @", index, "deliver to pipe of", p.peerip) } @@ -204,8 +254,8 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish } else if config.ShowDebugLog { logrus.Debugln("[listen] @", index, "deliver", packet.BodyLen(), "bytes data to nic") } - packet.Put() } + packet.Put() default: logrus.Warnln("[listen] @", index, "recv unknown proto:", packet.Proto) packet.Put() diff --git a/gold/link/recv.go b/gold/link/recv.go index 8f46196..90496b4 100644 --- a/gold/link/recv.go +++ b/gold/link/recv.go @@ -52,17 +52,17 @@ func (m *Me) wait(data []byte) *head.Packet { } return nil } + m.recved.Set(crc, true) if config.ShowDebugLog { - logrus.Debugln("[recv]", len(data), "bytes data with flag", hex.EncodeToString(data[11:12]), hex.EncodeToString(data[10:11])) + logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), len(data), "bytes data with flag", hex.EncodeToString(data[11:12]), hex.EncodeToString(data[10:11])) } if flags.IsSingle() || flags.NoFrag() { h := head.SelectPacket() _, err := h.Unmarshal(data) if err != nil { - logrus.Errorln("[recv] unmarshal err:", err) + logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unmarshal err:", err) return nil } - m.recved.Set(crc, true) return h } @@ -75,35 +75,33 @@ func (m *Me) wait(data []byte) *head.Packet { h := m.recving.Get(hsh) if h != nil { if config.ShowDebugLog { - logrus.Debugln("[recv] get another frag part of", strconv.FormatUint(hsh, 16)) + logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "get another frag part of", strconv.FormatUint(hsh, 16)) } ok, err := h.Unmarshal(data) if err == nil { if ok { m.recving.Delete(hsh) - m.recved.Set(crc, true) if config.ShowDebugLog { - logrus.Debugln("[recv] all parts of", strconv.FormatUint(hsh, 16), "has reached") + logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "all parts of", strconv.FormatUint(hsh, 16), "has reached") } return h } } else { h.Put() - logrus.Errorln("[recv] unmarshal err:", err) + logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unmarshal err:", err) } return nil } if config.ShowDebugLog { - logrus.Debugln("[recv] get new frag part of", strconv.FormatUint(hsh, 16)) + logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "get new frag part of", strconv.FormatUint(hsh, 16)) } h = head.SelectPacket() _, err := h.Unmarshal(data) if err != nil { h.Put() - logrus.Errorln("[recv] unmarshal err:", err) + logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unmarshal err:", err) return nil } m.recving.Set(hsh, h) - m.recved.Set(crc, true) return nil } diff --git a/gold/link/send.go b/gold/link/send.go index 8631112..1bcf61e 100644 --- a/gold/link/send.go +++ b/gold/link/send.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "math/rand" - "time" "github.com/klauspost/compress/zstd" "github.com/sirupsen/logrus" @@ -30,7 +29,8 @@ func (l *Link) WriteAndPut(p *head.Packet, istransfer bool) (n int, err error) { teatype := l.randkeyidx() sndcnt := uint16(l.incgetsndcnt()) var buf [4]byte - _, _ = crand.Read(buf[:]) + _, _ = crand.Read(buf[:2]) + binary.BigEndian.PutUint16(buf[2:4], sndcnt) seq := binary.BigEndian.Uint32(buf[:]) mtu := l.mtu if l.mturandomrange > 0 { @@ -114,11 +114,7 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui return 0, ErrTTL } if l.doublepacket { - cpp := p.Copy() - _ = time.AfterFunc(time.Millisecond*(10+time.Duration(rand.Intn(40))), func() { - defer cpp.Put() - _, _ = l.writeonce(cpp, teatype, additional, datasz, offset, istransfer, hasmore, seq) - }) + _, _ = l.writeonce(p, teatype, additional, datasz, offset, istransfer, hasmore, seq) } return l.writeonce(p, teatype, additional, datasz, offset, istransfer, hasmore, seq) } @@ -147,7 +143,7 @@ func (l *Link) writeonce(p *head.Packet, teatype uint8, additional uint16, datas endl = "." } if config.ShowDebugLog { - logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.conn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset)) + logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.conn.LocalAddr(), "to", peerep, "offset", fmt.Sprintf("%04x", offset), "crc", fmt.Sprintf("%016x", p.CRC64())) logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl) } d = l.me.xorenc(d, seq)