Skip to content

Commit

Permalink
optimize(highway): http request
Browse files Browse the repository at this point in the history
远期目标为迁移到纯tcp的, 可并发上传且零拷贝的 https://github.com/Mrs4s/MiraiGo/tree/master/client/internal/highway
  • Loading branch information
fumiama committed May 9, 2024
1 parent c5d739f commit dd74a56
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 211 deletions.
6 changes: 3 additions & 3 deletions client/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
binary2 "github.com/LagrangeDev/LagrangeGo/utils/binary"
)

const Server = "msfwifi.3g.qq.com:8080"
const msfwifiServer = "msfwifi.3g.qq.com:8080"

// NewQQclient 创建一个新的QQClient
func NewQQclient(uin uint32, signUrl string, appInfo *info.AppInfo, deviceInfo *info.DeviceInfo, sig *info.SigInfo) *QQClient {
Expand All @@ -31,7 +31,7 @@ func NewQQclient(uin uint32, signUrl string, appInfo *info.AppInfo, deviceInfo *
// 128应该够用了吧
pushStore: make(chan *wtlogin.SSOPacket, 128),
stopChan: make(chan struct{}),
tcp: NewTCPClient(Server, 5),
tcp: &TCPClient{},
cache: &cache.Cache{},
}
client.Online.Store(false)
Expand Down Expand Up @@ -233,7 +233,7 @@ func (c *QQClient) Loop() error {
}

func (c *QQClient) Connect() error {
err := c.tcp.Connect()
err := c.tcp.Connect(msfwifiServer, 5*time.Second)
if err != nil {
return err
}
Expand Down
191 changes: 81 additions & 110 deletions client/highway.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package client

import (
"bytes"
binary2 "encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"strconv"
Expand All @@ -17,6 +17,11 @@ import (
"github.com/RomiChan/protobuf/proto"
)

const (
uploadBlockSize = 1024 * 1024
httpServiceType uint32 = 1
)

type UpBlock struct {
CommandId int
Uin uint
Expand All @@ -25,108 +30,84 @@ type UpBlock struct {
Offset uint64
Ticket []byte
FileMd5 []byte
Block []byte
Block io.Reader
BlockMd5 []byte
BlockSize uint32
ExtendInfo []byte
Timestamp uint64
}

func (c *QQClient) GetServiceServer() ([]byte, map[uint32][]string) {
func (c *QQClient) EnsureHighwayServers() error {
if c.highwayUri == nil || c.sigSession == nil {
c.highwayUri = make(map[uint32][]string)
packet, err := highway2.BuildHighWayUrlReq(c.sig.Tgt)
if err != nil {
return nil, nil
return err
}
payload, err := c.SendUniPacketAndAwait("HttpConn.0x6ff_501", packet)
if err != nil {
networkLogger.Errorf("Failed to get highway server: %v", err)
return nil, nil
return fmt.Errorf("get highway server: %v", err)
}
resp, err := highway2.ParseHighWayUrlReq(payload.Data)
if err != nil {
networkLogger.Errorf("Failed to parse highway server: %v", err)
return nil, nil
return fmt.Errorf("parse highway server: %v", err)
}
for _, info := range resp.HttpConn.ServerInfos {
servicetype := info.ServiceType
for _, addr := range info.ServerAddrs {
ip := make([]byte, 4)
binary2.LittleEndian.PutUint32(ip, addr.IP)
service := c.highwayUri[servicetype]
service = append(service, fmt.Sprintf("http://%d.%d.%d.%d:%d/cgi-bin/httpconn?htcmd=0x6FF0087&uin=%d", ip[0], ip[1], ip[2], ip[3], addr.Port, c.sig.Uin))
service = append(service, fmt.Sprintf(
"http://%s:%d/cgi-bin/httpconn?htcmd=0x6FF0087&uin=%d",
le32toipstr(addr.IP), addr.Port, c.sig.Uin,
))
c.highwayUri[servicetype] = service
}
}
c.sigSession = resp.HttpConn.SigSession
}
return c.sigSession, c.highwayUri
}

func (c *QQClient) UploadSrcByStreamAsync(commonId int, stream io.ReadSeeker, ticket []byte, md5 []byte, extendInfo []byte) bool {
// Get server URL
_, server := c.GetServiceServer()
if server == nil {
networkLogger.Errorln("Failed to get upload server")
return false
}
success := true
var upBlocks []UpBlock
data, err := io.ReadAll(stream)
if err != nil {
networkLogger.Errorln("Failed to read stream")
return false
if c.highwayUri == nil || c.sigSession == nil {
return errors.New("empty highway servers")
}
return nil
}

fileSize := uint64(len(data))
offset := uint64(0)
_, err = stream.Seek(0, io.SeekStart)
func (c *QQClient) UploadSrcByStream(commonId int, r io.Reader, fileSize uint64, md5 []byte, extendInfo []byte) error {
err := c.EnsureHighwayServers()
if err != nil {
networkLogger.Errorln("Failed to seek stream")
return false
}

for offset < fileSize {
var buffersize uint64
if uint64(1024*1024) > fileSize-offset {
buffersize = fileSize - offset
} else {
buffersize = uint64(1024 * 1024)
return err
}
servers := c.highwayUri[httpServiceType]
server := servers[rand.Intn(len(servers))]
buffer := make([]byte, uploadBlockSize)
for offset := uint64(0); offset < fileSize; offset += uploadBlockSize {
if uploadBlockSize > fileSize-offset {
buffer = buffer[:fileSize-offset]
}
buffer := make([]byte, buffersize)
payload, err := io.ReadFull(stream, buffer)
_, err := io.ReadFull(r, buffer)
if err != nil {
networkLogger.Errorln("Failed to read stream")
return false
return err
}
reqBody := UpBlock{
err = c.SendUpBlock(&UpBlock{
CommandId: commonId,
Uin: uint(c.sig.Uin),
Sequence: uint(c.highwaySequence.Add(1)),
FileSize: fileSize,
Offset: offset,
Ticket: ticket,
Ticket: c.sigSession,
FileMd5: md5,
Block: buffer,
Block: bytes.NewReader(buffer),
BlockMd5: crypto.MD5Digest(buffer),
BlockSize: uint32(len(buffer)),
ExtendInfo: extendInfo,
}
upBlocks = append(upBlocks, reqBody)
offset += uint64(payload)
// 4 is HighwayConcurrent
if len(upBlocks) >= 4 || offset == fileSize {
for _, block := range upBlocks {
success = success && c.SendUpBlockAsync(block, server[1][0])
if !success {
networkLogger.Errorln("Failed to send block")
return false
}
}
upBlocks = nil
}, server)
if err != nil {
return err
}
}
return success
return nil
}

func (c *QQClient) SendUpBlockAsync(block UpBlock, server string) bool {
func (c *QQClient) SendUpBlock(block *UpBlock, server string) error {
head := &highway.DataHighwayHead{
Version: 1,
Uin: proto.Some(strconv.Itoa(int(block.Uin))),
Expand All @@ -137,15 +118,14 @@ func (c *QQClient) SendUpBlockAsync(block UpBlock, server string) bool {
DataFlag: 16,
CommandId: uint32(block.CommandId),
}
md5 := crypto.MD5Digest(block.Block)
segHead := &highway.SegHead{
ServiceId: proto.Some(uint32(0)),
Filesize: block.FileSize,
DataOffset: proto.Some(block.Offset),
DataLength: uint32(len(block.Block)),
DataLength: uint32(block.BlockSize),
RetCode: proto.Some(uint32(0)),
ServiceTicket: block.Ticket,
Md5: md5,
Md5: block.BlockMd5,
FileMd5: block.FileMd5,
CacheAddr: proto.Some(uint32(0)),
CachePort: proto.Some(uint32(0)),
Expand All @@ -162,71 +142,69 @@ func (c *QQClient) SendUpBlockAsync(block UpBlock, server string) bool {
Timestamp: block.Timestamp,
MsgLoginSigHead: loginHead,
}
isEnd := block.Offset+uint64(len(block.Block)) == block.FileSize
packet := binary.NewBuilder(nil)
packet.WriteBytes(block.Block, false)
payload, err := SendPacketAsync(highwayHead, packet, server, isEnd)
isEnd := block.Offset+uint64(block.BlockSize) == block.FileSize
payload, err := sendHighwayPacket(highwayHead, block.Block, block.BlockSize, server, isEnd)
if err != nil {
networkLogger.Errorln("Failed to send packet ", err)
return false
return fmt.Errorf("send highway packet: %v", err)
}
resphead, respbody, err := ParsePacket(payload)
defer payload.Close()
resphead, respbody, err := parseHighwayPacket(payload)
if err != nil {
networkLogger.Errorln("Failed to parse packet ", err)
return false
return fmt.Errorf("parse highway packet: %v", err)
}
networkLogger.Debugf("Highway Block Result: %d | %d | %x | %v",
resphead.ErrorCode, resphead.MsgSegHead.RetCode.Unwrap(), resphead.BytesRspExtendInfo, respbody)
return resphead.ErrorCode == 0
if resphead.ErrorCode != 0 {
return errors.New("highway error code: " + strconv.Itoa(int(resphead.ErrorCode)))
}
return nil
}

func ParsePacket(data []byte) (head *highway.RespDataHighwayHead, body *binary.Reader, err error) {
reader := binary.NewReader(data)
if reader.ReadBytesNoCopy(1)[0] == 0x28 {
headlength := reader.ReadU32()
bodylength := reader.ReadU32()
head = &highway.RespDataHighwayHead{}
headraw := reader.ReadBytesNoCopy(int(int64(headlength)))
err = proto.Unmarshal(headraw, head)
if err != nil {
return nil, nil, err
}
body = binary.NewReader(reader.ReadBytesNoCopy(int(bodylength)))
if reader.ReadBytesNoCopy(1)[0] == 0x29 {
return head, body, nil
}
func parseHighwayPacket(data io.Reader) (head *highway.RespDataHighwayHead, body *binary.Reader, err error) {
reader := binary.ParseReader(data)
if reader.ReadBytesNoCopy(1)[0] != 0x28 {
return nil, nil, errors.New("invalid highway packet")
}
headlength := reader.ReadU32()
_ = reader.ReadU32() // body len
head = &highway.RespDataHighwayHead{}
headraw := reader.ReadBytesNoCopy(int(int64(headlength)))
err = proto.Unmarshal(headraw, head)
if err != nil {
return nil, nil, err
}
return nil, nil, err
if reader.ReadBytesNoCopy(1)[0] != 0x29 {
return nil, nil, errors.New("invalid highway head")
}
return head, reader, nil
}

func SendPacketAsync(packet *highway.ReqDataHighwayHead, buffer *binary.Builder, serverURL string, end bool) ([]byte, error) {
func sendHighwayPacket(packet *highway.ReqDataHighwayHead, buffer io.Reader, bufferSize uint32, serverURL string, end bool) (io.ReadCloser, error) {
marshal, err := proto.Marshal(packet)
if err != nil {
return nil, err
}

println(hex.EncodeToString(marshal))

writer := binary.NewBuilder(nil).
WriteBytes([]byte{0x28}, false).
WriteU32(uint32(len(marshal))).
WriteU32(uint32(buffer.Len())).
WriteBytes(marshal, false).
WriteBytes(buffer.ToBytes(), false).
WriteBytes([]byte{0x29}, false)
WriteU32(bufferSize).
WriteBytes(marshal, false)
_, _ = io.Copy(writer, buffer)
writer.Write([]byte{0x29})

Check failure on line 194 in client/highway.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `writer.Write` is not checked (errcheck)

return SendDataAsync(writer.ToBytes(), serverURL, end)
return postHighwayContent(writer.ToReader(), serverURL, end)
}

func SendDataAsync(packet []byte, serverURL string, end bool) ([]byte, error) {
func postHighwayContent(content io.Reader, serverURL string, end bool) (io.ReadCloser, error) {
// Parse server URL
server, err := url.Parse(serverURL)
if err != nil {
return nil, err
}

// Create request
content := bytes.NewBuffer(packet)
networkLogger.Debugln("post content to highway url:", server)
req, err := http.NewRequest("POST", server.String(), content)
if err != nil {
return nil, err
Expand All @@ -244,12 +222,5 @@ func SendDataAsync(packet []byte, serverURL string, end bool) ([]byte, error) {
if err != nil {
return nil, err
}
defer resp.Body.Close()

// Read response data
data, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return data, nil
return resp.Body, nil
}
24 changes: 6 additions & 18 deletions client/network.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package client

// from https://github.com/Mrs4s/MiraiGo/blob/master/client/internal/network/conn.go

import (
"errors"
"io"
Expand All @@ -12,31 +14,18 @@ var ErrConnectionClosed = errors.New("connection closed")

type TCPClient struct {
lock sync.RWMutex

addr string
conn net.Conn
timeout int

connected bool
}

func NewTCPClient(addr string, timeout int) *TCPClient {
return &TCPClient{
addr: addr,
timeout: timeout,
}
conn net.Conn
}

func (c *TCPClient) Connect() error {
conn, err := net.DialTimeout("tcp", c.addr, time.Duration(c.timeout)*time.Second)
func (c *TCPClient) Connect(addr string, timeout time.Duration) error {
conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
return err
}
networkLogger.Infof("connected to %s", conn.RemoteAddr())
c.lock.Lock()
defer c.lock.Unlock()
c.conn = conn
c.connected = true
return nil
}

Expand Down Expand Up @@ -75,7 +64,6 @@ func (c *TCPClient) Close() {
_ = c.conn.Close()
networkLogger.Error("tcp closed")
c.conn = nil
c.connected = false
}
}

Expand All @@ -86,5 +74,5 @@ func (c *TCPClient) getConn() net.Conn {
}

func (c *TCPClient) IsClosed() bool {
return !c.connected
return c.conn == nil
}
Loading

0 comments on commit dd74a56

Please sign in to comment.