Skip to content

Commit

Permalink
optimize(client): impl. syncx on resultStore
Browse files Browse the repository at this point in the history
  • Loading branch information
fumiama committed May 7, 2024
1 parent 5ad8b28 commit c3326bd
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 45 deletions.
30 changes: 17 additions & 13 deletions client/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type QQClient struct {
appInfo *info.AppInfo
deviceInfo *info.DeviceInfo
sig *info.SigInfo
signProvider func(string, int, []byte) map[string]string
signProvider func(string, uint32, []byte) map[string]string

pushStore chan *wtlogin.SSOPacket

Expand Down Expand Up @@ -88,7 +88,7 @@ func (c *QQClient) SendOidbPacketAndWait(pkt *oidb.OidbPacket) (*wtlogin.SSOPack
}

func (c *QQClient) SendUniPacket(cmd string, buf []byte) error {
seq := c.getSeq()
seq := c.getAndIncreaseSequence()
var sign map[string]string
if c.signProvider != nil {
sign = c.signProvider(cmd, seq, buf)
Expand All @@ -98,27 +98,27 @@ func (c *QQClient) SendUniPacket(cmd string, buf []byte) error {
}

func (c *QQClient) SendUniPacketAndAwait(cmd string, buf []byte) (*wtlogin.SSOPacket, error) {
seq := c.getSeq()
seq := c.getAndIncreaseSequence()
var sign map[string]string
if c.signProvider != nil {
sign = c.signProvider(cmd, seq, buf)
}
packet := wtlogin.BuildUniPacket(int(c.Uin), seq, cmd, sign, c.appInfo, c.deviceInfo, c.sig, buf)
return c.SendAndWait(packet, seq, 5)
return c.SendAndWait(packet, int(seq), 5)
}

func (c *QQClient) Send(data []byte) error {
return c.tcp.Write(data)
}

func (c *QQClient) SendAndWait(data []byte, seq, timeout int) (*wtlogin.SSOPacket, error) {
resultStore.AddSeq(seq)
func (c *QQClient) SendAndWait(data []byte, seq int, timeout int) (*wtlogin.SSOPacket, error) {
fetcher.AddSeq(seq)
err := c.tcp.Write(data)
if err != nil {
// 出错了要删掉
resultStore.DeleteSeq(seq)
fetcher.DeleteSeq(seq)
}
return resultStore.Fecth(seq, timeout)
return fetcher.Fecth(seq, timeout)
}

func (c *QQClient) SSOHeartbeat(calcLatency bool) int64 {
Expand Down Expand Up @@ -178,17 +178,17 @@ func (c *QQClient) OnMessage(msgLen int) {

if packet.Seq > 0 { // uni rsp
networkLogger.Debugf("%d(%d) -> %s, extra: %s", packet.Seq, packet.RetCode, packet.Cmd, packet.Extra)
if packet.RetCode != 0 && resultStore.ContainSeq(packet.Seq) {
if packet.RetCode != 0 && fetcher.ContainSeq(packet.Seq) {
networkLogger.Errorf("error ssopacket retcode: %d, extra: %s", packet.RetCode, packet.Extra)
return
} else if packet.RetCode != 0 {
networkLogger.Errorf("Unexpected error on sso layer: %d: %s", packet.RetCode, packet.Extra)
return
}
if !resultStore.ContainSeq(packet.Seq) {
if !fetcher.ContainSeq(packet.Seq) {
networkLogger.Warningf("Unknown packet: %s(%d), ignore", packet.Cmd, packet.Seq)
} else {
resultStore.AddResult(packet.Seq, packet)
fetcher.AddResult(packet.Seq, packet)
}
} else { // server pushed
if _, ok := listeners[packet.Cmd]; ok {
Expand Down Expand Up @@ -265,6 +265,10 @@ func (c *QQClient) OnDisconnected() {
c.Online.Store(false)
}

func (c *QQClient) getSeq() int {
return int(atomic.AddUint32(&c.sig.Sequence, 1) % 0x8000)
func (c *QQClient) getAndIncreaseSequence() uint32 {
return atomic.AddUint32(&c.sig.Sequence, 1) % 0x8000
}

func (c *QQClient) getSequence() uint32 {
return atomic.LoadUint32(&c.sig.Sequence) % 0x8000
}
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

var (
resultStore = NewResultStore()
fetcher = newssofetcher()
networkLogger = utils.GetLogger("network")
)

Expand Down
4 changes: 1 addition & 3 deletions client/message.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package client

import (
"sync/atomic"

message2 "github.com/LagrangeDev/LagrangeGo/message"
"github.com/LagrangeDev/LagrangeGo/packets/pb/action"
"github.com/LagrangeDev/LagrangeGo/packets/pb/message"
Expand All @@ -20,7 +18,7 @@ func (c *QQClient) SendRawMessage(route *message.RoutingHead, body *message.Mess
DivSeq: proto.Some(uint32(0)),
},
Body: body,
Seq: proto.Some(atomic.LoadUint32(&c.sig.Sequence)),
Seq: proto.Some(c.getSequence()),
Rand: proto.Some(crypto.RandU32()),
}
// grp_id not null
Expand Down
43 changes: 20 additions & 23 deletions client/resultStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,57 @@ package client

import (
"errors"
"sync"
"time"

"github.com/RomiChan/syncx"

"github.com/LagrangeDev/LagrangeGo/packets/wtlogin"
"github.com/LagrangeDev/LagrangeGo/utils"
)

//nolint:unused
var resultLogger = utils.GetLogger("resultstore")
// var resultLogger = utils.GetLogger("resultstore")

// ResultStore 灵感来源于ddl的onebot适配器
type ResultStore struct {
result sync.Map
}
// ssofetcher 灵感来源于ddl的onebot适配器
type ssofetcher syncx.Map[uint32, chan *wtlogin.SSOPacket]

func NewResultStore() *ResultStore {
return &ResultStore{}
func newssofetcher() *ssofetcher {
return &ssofetcher{}
}

// ContainSeq 判断这个seq是否存在
func (s *ResultStore) ContainSeq(seq int) bool {
_, ok := s.result.Load(seq)
func (s *ssofetcher) ContainSeq(seq int) bool {
_, ok := (*syncx.Map[uint32, chan *wtlogin.SSOPacket])(s).Load(uint32(seq))
return ok
}

// AddSeq 发消息的时候调用,把seq加到map里面
func (s *ResultStore) AddSeq(seq int) {
func (s *ssofetcher) AddSeq(seq int) {
resultChan := make(chan *wtlogin.SSOPacket, 1)
s.result.Store(seq, resultChan)
(*syncx.Map[uint32, chan *wtlogin.SSOPacket])(s).Store(uint32(seq), resultChan)
}

// DeleteSeq 删除seq
func (s *ResultStore) DeleteSeq(seq int) {
s.result.Delete(seq)
func (s *ssofetcher) DeleteSeq(seq int) {
(*syncx.Map[uint32, chan *wtlogin.SSOPacket])(s).Delete(uint32(seq))
}

// AddResult 收到消息的时候调用,返回此seq是否存在,如果存在则存储数据
func (s *ResultStore) AddResult(seq int, data *wtlogin.SSOPacket) bool {
if resultChan, ok := s.result.Load(seq); ok {
resultChan.(chan *wtlogin.SSOPacket) <- data
func (s *ssofetcher) AddResult(seq int, data *wtlogin.SSOPacket) bool {
if resultChan, ok := (*syncx.Map[uint32, chan *wtlogin.SSOPacket])(s).Load(uint32(seq)); ok {
resultChan <- data
return true
}
return false
}

// Fecth 等待获取数据直到超时,这里找不到对应的seq会直接返回错误,务必在发包之前调用 AddSeq,如果发包出错可以 DeleteSeq
func (s *ResultStore) Fecth(seq, timeout int) (*wtlogin.SSOPacket, error) {
if resultChan, ok := s.result.Load(seq); ok {
func (s *ssofetcher) Fecth(seq, timeout int) (*wtlogin.SSOPacket, error) {
if resultChan, ok := (*syncx.Map[uint32, chan *wtlogin.SSOPacket])(s).Load(uint32(seq)); ok {
// 确保读取完删除这个结果
defer s.result.Delete(seq)
defer (*syncx.Map[uint32, chan *wtlogin.SSOPacket])(s).Delete(uint32(seq))
select {
case <-time.After(time.Duration(timeout) * time.Second):
return nil, errors.New("fetch timeout")
case result := <-(resultChan.(chan *wtlogin.SSOPacket)):
case result := <-resultChan:
return result, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ go 1.20

require (
github.com/RomiChan/protobuf v0.1.1-0.20230204044148-2ed269a2e54d
github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7
github.com/fumiama/gofastTEA v0.0.10
github.com/fumiama/imgsz v0.0.4
github.com/mattn/go-colorable v0.1.13
github.com/sirupsen/logrus v1.9.3
)

require (
github.com/RomiChan/syncx v0.0.0-20240418144900-b7402ffdebc7 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/stretchr/testify v1.8.0 // indirect
golang.org/x/image v0.16.0 // indirect
Expand Down
2 changes: 1 addition & 1 deletion packets/wtlogin/oicq.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func BuildLoginPacket(uin uint32, cmd string, appinfo *info.AppInfo, body []byte
return frame
}

func BuildUniPacket(uin, seq int, cmd string, sign map[string]string,
func BuildUniPacket(uin int, seq uint32, cmd string, sign map[string]string,
appInfo *info.AppInfo, deviceInfo *info.DeviceInfo, sigInfo *info.SigInfo, body []byte) []byte {

trace := generateTrace()
Expand Down
6 changes: 3 additions & 3 deletions utils/sign.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,19 @@ func containSignPKG(cmd string) bool {
return ok
}

func SignProvider(rawUrl string) func(string, int, []byte) map[string]string {
func SignProvider(rawUrl string) func(string, uint32, []byte) map[string]string {
if rawUrl == "" {
return nil
}
return func(cmd string, seq int, buf []byte) map[string]string {
return func(cmd string, seq uint32, buf []byte) map[string]string {
if !containSignPKG(cmd) {
return nil
}
startTime := time.Now().UnixMilli()
resp := signResponse{}
err := httpGet(rawUrl, map[string]string{
"cmd": cmd,
"seq": strconv.Itoa(seq),
"seq": strconv.Itoa(int(seq)),
"src": fmt.Sprintf("%x", buf),
}, time.Duration(5)*time.Second, &resp)
if err != nil {
Expand Down

0 comments on commit c3326bd

Please sign in to comment.