Skip to content

Commit

Permalink
fix(link): duplicate crc recv (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
fumiama authored Aug 11, 2024
1 parent bd5c009 commit 9942ef2
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 52 deletions.
17 changes: 15 additions & 2 deletions gold/head/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,17 +147,19 @@ func (p *Packet) Marshal(src net.IP, teatype uint8, additional uint16, datasz ui
if hasmore {
offset |= 0x2000
}
p.Flags = PacketFlags(offset)

return helper.OpenWriterF(func(w *helper.Writer) {
w.WriteUInt32(p.idxdatsz)
w.WriteUInt16((uint16(p.TTL) << 8) | uint16(p.Proto))
w.WriteUInt16(p.SrcPort)
w.WriteUInt16(p.DstPort)
w.WriteUInt16(uint16(PacketFlags(offset)))
w.WriteUInt16(uint16(p.Flags))
w.Write(p.Src.To4())
w.Write(p.Dst.To4())
w.Write(p.Hash[:])
w.WriteUInt64(CalcCRC64(w.Bytes()))
p.crc64 = CalcCRC64(w.Bytes())
w.WriteUInt64(p.crc64)
w.Write(p.Body())
})
}
Expand Down Expand Up @@ -213,6 +215,10 @@ func (p *Packet) Put() {
PutPacket(p)
}

func (p *Packet) CRC64() uint64 {
return p.crc64
}

// Body returns data
func (p *Packet) Body() []byte {
return p.data[p.a:p.b]
Expand Down Expand Up @@ -256,3 +262,10 @@ func (p *Packet) Copy() *Packet {
newp.buffered = false
return newp
}

func (p *Packet) CopyWithBody() *Packet {
newp := p.Copy()
newp.data = helper.MakeBytes(len(p.data))
copy(newp.data, p.data)
return newp
}
114 changes: 82 additions & 32 deletions gold/link/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,52 @@ import (

const lstnbufgragsz = 65536

type lstnq struct {
index int
addr p2p.EndPoint
buf []byte
}

type listenqueue chan lstnq

func (q listenqueue) listen(m *Me, hasntfinished []sync.Mutex) {
recvtotlcnt := uint64(0)
recvloopcnt := uint16(0)
recvlooptime := time.Now().UnixMilli()
for lstn := range q {
recvtotlcnt += uint64(len(lstn.buf))
recvloopcnt++
if recvloopcnt%m.speedloop == 0 {
now := time.Now().UnixMilli()
logrus.Infof("[listen] queue recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime))
recvtotlcnt = 0
recvlooptime = now
}
packet := m.wait(lstn.buf[:len(lstn.buf):lstnbufgragsz])
if packet == nil {
if lstn.index < 0 {
if config.ShowDebugLog {
logrus.Debugln("[listen] queue waiting")
}
helper.PutBytes(lstn.buf)
continue
}
if config.ShowDebugLog {
logrus.Debugln("[listen] queue waiting, unlock index", lstn.index)
}
hasntfinished[lstn.index].Unlock()
continue
}
if lstn.index >= 0 {
go m.dispatch(packet, lstn.addr, lstn.index, hasntfinished[lstn.index].Unlock)
} else {
go m.dispatch(packet, lstn.addr, lstn.index, func() {
helper.PutBytes(lstn.buf)
})
}
}
}

// 监听本机 endpoint
func (m *Me) listen() (conn p2p.Conn, err error) {
conn, err = m.ep.Listen()
Expand All @@ -30,29 +76,40 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
m.ep = conn.LocalAddr()
logrus.Infoln("[listen] at", m.ep)
go func() {
recvtotlcnt := uint64(0)
recvloopcnt := uint16(0)
recvlooptime := time.Now().UnixMilli()
n := runtime.NumCPU()
n := uint(runtime.NumCPU())
if n > 64 {
n = 64 // 只用最多 64 核
}
logrus.Infoln("[listen] use cpu num:", n)
listenbuff := make([]byte, lstnbufgragsz*n)
listenbuf := make([]byte, lstnbufgragsz*n)
hasntfinished := make([]sync.Mutex, n)
for i := 0; err == nil; i++ {
q := make(listenqueue, n)
defer close(q)
go q.listen(m, hasntfinished)
i := uint(0)
for {
usenewbuf := false
i %= n
for !hasntfinished[i].TryLock() {
i++
i %= n
if i == 0 { // looked up a full round
time.Sleep(time.Millisecond * 10)
if i == 0 { // looked up a full round, make a new buf
usenewbuf = true
if config.ShowDebugLog {
logrus.Debugln("[listen] use new buf")
}
break
}
}
if config.ShowDebugLog {
if config.ShowDebugLog && !usenewbuf {
logrus.Debugln("[listen] lock index", i)
}
lbf := listenbuff[i*lstnbufgragsz : (i+1)*lstnbufgragsz]
var lbf []byte
if usenewbuf {
lbf = helper.MakeBytes(lstnbufgragsz)
} else {
lbf = listenbuf[i*lstnbufgragsz : (i+1)*lstnbufgragsz]
}
n, addr, err := conn.ReadFromPeer(lbf)
if m.connections == nil || errors.Is(err, net.ErrClosed) {
logrus.Warnln("[listen] quit listening")
Expand All @@ -65,31 +122,24 @@ func (m *Me) listen() (conn p2p.Conn, err error) {
logrus.Errorln("[listen] reconnect udp err:", err)
return
}
if config.ShowDebugLog {
logrus.Debugln("[listen] unlock index", i)
if !usenewbuf {
if config.ShowDebugLog {
logrus.Debugln("[listen] unlock index", i)
}
hasntfinished[i].Unlock()
i--
}
hasntfinished[i].Unlock()
i--
continue
}
recvtotlcnt += uint64(n)
recvloopcnt++
if recvloopcnt%m.speedloop == 0 {
now := time.Now().UnixMilli()
logrus.Infof("[listen] recv avg speed: %.2f KB/s", float64(recvtotlcnt)/float64(now-recvlooptime))
recvtotlcnt = 0
recvlooptime = now
lq := lstnq{
index: -1,
addr: addr,
buf: lbf[:n],
}
packet := m.wait(lbf[:n:lstnbufgragsz])
if packet == nil {
if config.ShowDebugLog {
logrus.Debugln("[listen] waiting, unlock index", i)
}
hasntfinished[i].Unlock()
i--
continue
if !usenewbuf {
lq.index = int(i)
}
go m.dispatch(packet, addr, i, hasntfinished[i].Unlock)
q <- lq
}
}()
return
Expand Down Expand Up @@ -193,7 +243,7 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish
packet.Put()
case head.ProtoData:
if p.pipe != nil {
p.pipe <- packet
p.pipe <- packet.CopyWithBody()
if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "deliver to pipe of", p.peerip)
}
Expand All @@ -204,8 +254,8 @@ func (m *Me) dispatch(packet *head.Packet, addr p2p.EndPoint, index int, finish
} else if config.ShowDebugLog {
logrus.Debugln("[listen] @", index, "deliver", packet.BodyLen(), "bytes data to nic")
}
packet.Put()
}
packet.Put()
default:
logrus.Warnln("[listen] @", index, "recv unknown proto:", packet.Proto)
packet.Put()
Expand Down
18 changes: 8 additions & 10 deletions gold/link/recv.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,17 @@ func (m *Me) wait(data []byte) *head.Packet {
}
return nil
}
m.recved.Set(crc, true)
if config.ShowDebugLog {
logrus.Debugln("[recv]", len(data), "bytes data with flag", hex.EncodeToString(data[11:12]), hex.EncodeToString(data[10:11]))
logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), len(data), "bytes data with flag", hex.EncodeToString(data[11:12]), hex.EncodeToString(data[10:11]))
}
if flags.IsSingle() || flags.NoFrag() {
h := head.SelectPacket()
_, err := h.Unmarshal(data)
if err != nil {
logrus.Errorln("[recv] unmarshal err:", err)
logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unmarshal err:", err)
return nil
}
m.recved.Set(crc, true)
return h
}

Expand All @@ -75,35 +75,33 @@ func (m *Me) wait(data []byte) *head.Packet {
h := m.recving.Get(hsh)
if h != nil {
if config.ShowDebugLog {
logrus.Debugln("[recv] get another frag part of", strconv.FormatUint(hsh, 16))
logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "get another frag part of", strconv.FormatUint(hsh, 16))
}
ok, err := h.Unmarshal(data)
if err == nil {
if ok {
m.recving.Delete(hsh)
m.recved.Set(crc, true)
if config.ShowDebugLog {
logrus.Debugln("[recv] all parts of", strconv.FormatUint(hsh, 16), "has reached")
logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "all parts of", strconv.FormatUint(hsh, 16), "has reached")
}
return h
}
} else {
h.Put()
logrus.Errorln("[recv] unmarshal err:", err)
logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unmarshal err:", err)
}
return nil
}
if config.ShowDebugLog {
logrus.Debugln("[recv] get new frag part of", strconv.FormatUint(hsh, 16))
logrus.Debugln("[recv]", strconv.FormatUint(crc, 16), "get new frag part of", strconv.FormatUint(hsh, 16))
}
h = head.SelectPacket()
_, err := h.Unmarshal(data)
if err != nil {
h.Put()
logrus.Errorln("[recv] unmarshal err:", err)
logrus.Errorln("[recv]", strconv.FormatUint(crc, 16), "unmarshal err:", err)
return nil
}
m.recving.Set(hsh, h)
m.recved.Set(crc, true)
return nil
}
12 changes: 4 additions & 8 deletions gold/link/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"io"
"math/rand"
"time"

"github.com/klauspost/compress/zstd"
"github.com/sirupsen/logrus"
Expand All @@ -30,7 +29,8 @@ func (l *Link) WriteAndPut(p *head.Packet, istransfer bool) (n int, err error) {
teatype := l.randkeyidx()
sndcnt := uint16(l.incgetsndcnt())
var buf [4]byte
_, _ = crand.Read(buf[:])
_, _ = crand.Read(buf[:2])
binary.BigEndian.PutUint16(buf[2:4], sndcnt)
seq := binary.BigEndian.Uint32(buf[:])
mtu := l.mtu
if l.mturandomrange > 0 {
Expand Down Expand Up @@ -114,11 +114,7 @@ func (l *Link) write(p *head.Packet, teatype uint8, additional uint16, datasz ui
return 0, ErrTTL
}
if l.doublepacket {
cpp := p.Copy()
_ = time.AfterFunc(time.Millisecond*(10+time.Duration(rand.Intn(40))), func() {
defer cpp.Put()
_, _ = l.writeonce(cpp, teatype, additional, datasz, offset, istransfer, hasmore, seq)
})
_, _ = l.writeonce(p, teatype, additional, datasz, offset, istransfer, hasmore, seq)
}
return l.writeonce(p, teatype, additional, datasz, offset, istransfer, hasmore, seq)
}
Expand Down Expand Up @@ -147,7 +143,7 @@ func (l *Link) writeonce(p *head.Packet, teatype uint8, additional uint16, datas
endl = "."
}
if config.ShowDebugLog {
logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.conn.LocalAddr(), "to", peerep, "offset:", fmt.Sprintf("%04x", offset))
logrus.Debugln("[send] write", len(d), "bytes data from ep", l.me.conn.LocalAddr(), "to", peerep, "offset", fmt.Sprintf("%04x", offset), "crc", fmt.Sprintf("%016x", p.CRC64()))
logrus.Debugln("[send] data bytes", hex.EncodeToString(d[:bound]), endl)
}
d = l.me.xorenc(d, seq)
Expand Down

0 comments on commit 9942ef2

Please sign in to comment.