diff --git a/botgo/websocket/client/client.go b/botgo/websocket/client/client.go index e8fd77d7..1dc8bb6c 100644 --- a/botgo/websocket/client/client.go +++ b/botgo/websocket/client/client.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "os/signal" - "sync" "sync/atomic" "syscall" "time" @@ -25,19 +24,7 @@ import ( const DefaultQueueSize = 10000 // 定义全局变量 -var global_s int64 - -// PayloadWithTimestamp 存储带时间戳的 WSPayload -type PayloadWithTimestamp struct { - Payload *dto.WSPayload - Timestamp time.Time -} - -var dataMap sync.Map - -func init() { - StartCleanupRoutine() -} +var Global_s int64 // Setup 依赖注册 func Setup() { @@ -200,33 +187,6 @@ func (c *Client) Session() *dto.Session { return c.session } -// func (c *Client) readMessageToQueue() { -// for { -// _, message, err := c.conn.ReadMessage() -// if err != nil { -// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message)) -// close(c.messageQueue) -// c.closeChan <- err -// return -// } -// payload := &dto.WSPayload{} -// if err := json.Unmarshal(message, payload); err != nil { -// log.Errorf("%s json failed, %v", c.session, err) -// continue -// } -// // 更新 global_s 的值 -// atomic.StoreInt64(&global_s, payload.S) - -// payload.RawMessage = message -// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message)) -// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务 -// if c.isHandleBuildIn(payload) { -// continue -// } -// c.messageQueue <- payload -// } -// } - func (c *Client) readMessageToQueue() { for { _, message, err := c.conn.ReadMessage() @@ -241,64 +201,22 @@ func (c *Client) readMessageToQueue() { log.Errorf("%s json failed, %v", c.session, err) continue } - atomic.StoreInt64(&global_s, payload.S) + // 更新 global_s 的值 + atomic.StoreInt64(&Global_s, payload.S) payload.RawMessage = message log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message)) - - // 不过滤心跳事件 - if payload.OPCode != 11 { - // 计算数据的哈希值 - dataHash := calculateDataHash(payload.Data) - - // 检查是否已存在相同的 Data - if existingPayload, ok := getDataFromSyncMap(dataHash); ok { - // 如果已存在相同的 Data,则丢弃当前消息 - log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload) - continue - } - - // 将新的 payload 存入 sync.Map - storeDataToSyncMap(dataHash, payload) - } - // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务 if c.isHandleBuildIn(payload) { continue } - c.messageQueue <- payload } } -func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) { - value, ok := dataMap.Load(dataHash) - if !ok { - return nil, false - } - payloadWithTimestamp, ok := value.(*PayloadWithTimestamp) - if !ok { - return nil, false - } - return payloadWithTimestamp.Payload, true -} - -func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) { - payloadWithTimestamp := &PayloadWithTimestamp{ - Payload: payload, - Timestamp: time.Now(), - } - dataMap.Store(dataHash, payloadWithTimestamp) -} - -func calculateDataHash(data interface{}) string { - dataBytes, _ := json.Marshal(data) - return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法 -} - // 在全局范围通过atomic访问s值与message_id的映射 func GetGlobalS() int64 { - return atomic.LoadInt64(&global_s) + return atomic.LoadInt64(&Global_s) } func (c *Client) listenMessageAndHandle() { @@ -383,31 +301,3 @@ func (c *Client) readyHandler(payload *dto.WSPayload) { event.DefaultHandlers.Ready(payload, readyData) } } - -const cleanupInterval = 5 * time.Minute // 清理间隔时间 - -func StartCleanupRoutine() { - go func() { - for { - <-time.After(cleanupInterval) - cleanupDataMap() - } - }() -} - -func cleanupDataMap() { - now := time.Now() - dataMap.Range(func(key, value interface{}) bool { - payloadWithTimestamp, ok := value.(*PayloadWithTimestamp) - if !ok { - return true - } - - // 检查时间戳,清理超过一定时间的数据 - if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval { - dataMap.Delete(key) - } - - return true - }) -} diff --git a/main.go b/main.go index 54b7ed4c..9dcd3d83 100644 --- a/main.go +++ b/main.go @@ -446,11 +446,19 @@ func main() { } } } + + webhookHandler := server.NewWebhookHandler(100) + + // 启动消息处理协程 + go webhookHandler.ListenAndProcessMessages() + r.GET("/updateport", server.HandleIpupdate) r.POST("/uploadpic", server.UploadBase64ImageHandler(rateLimiter)) r.POST("/uploadpicv2", server.UploadBase64ImageHandlerV2(rateLimiter, apiV2)) r.POST("/uploadpicv3", server.UploadBase64ImageHandlerV3(rateLimiter, api)) r.POST("/uploadrecord", server.UploadBase64RecordHandler(rateLimiter)) + // 使用 CreateHandleValidation,传入 WebhookHandler 实例 + r.POST("/webhook", server.CreateHandleValidation(conf.Settings.ClientSecret, webhookHandler)) r.Static("/channel_temp", "./channel_temp") if config.GetFrpPort() == "0" && !config.GetDisableWebui() { //webui和它的api diff --git a/server/webhook.go b/server/webhook.go new file mode 100644 index 00000000..2cbe24b1 --- /dev/null +++ b/server/webhook.go @@ -0,0 +1,133 @@ +package server + +import ( + "bytes" + "crypto/ed25519" + "encoding/hex" + "encoding/json" + "io" + "log" + "net/http" + "strings" + "sync/atomic" + + "github.com/gin-gonic/gin" + "github.com/tencent-connect/botgo/dto" + "github.com/tencent-connect/botgo/event" + "github.com/tencent-connect/botgo/websocket/client" +) + +// Payload 定义请求载荷结构 +type Payload struct { + D ValidationRequest `json:"d"` + Op int `json:"op"` +} + +// ValidationRequest 定义鉴权请求结构 +type ValidationRequest struct { + PlainToken string `json:"plain_token"` + EventTs string `json:"event_ts"` +} + +// WebhookPayload 定义Webhook消息结构 +type WebhookPayload struct { + PlainToken string `json:"plain_token"` + EventTs string `json:"event_ts"` + RawMessage []byte // 保存原始消息内容 +} + +// WebhookHandler 负责处理 Webhook 的接收和消息处理 +type WebhookHandler struct { + messageQueue chan *WebhookPayload + closeChan chan error +} + +// NewWebhookHandler 创建新的 WebhookHandler 实例 +func NewWebhookHandler(queueSize int) *WebhookHandler { + return &WebhookHandler{ + messageQueue: make(chan *WebhookPayload, queueSize), + closeChan: make(chan error), + } +} + +// CreateHandleValidation 创建用于签名验证和消息入队的处理函数 +func CreateHandleValidation(botSecret string, wh *WebhookHandler) gin.HandlerFunc { + return func(c *gin.Context) { + httpBody, err := io.ReadAll(c.Request.Body) + if err != nil { + log.Println("Failed to read HTTP body:", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to read request body"}) + return + } + + var payload Payload + if err := json.Unmarshal(httpBody, &payload); err != nil { + log.Println("Failed to parse HTTP payload:", err) + c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse payload"}) + return + } + + // 生成种子并创建私钥 + seed := botSecret + for len(seed) < ed25519.SeedSize { + seed = strings.Repeat(seed, 2) + } + seed = seed[:ed25519.SeedSize] + reader := strings.NewReader(seed) + _, privateKey, err := ed25519.GenerateKey(reader) + if err != nil { + log.Println("Failed to generate ed25519 key:", err) + c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to generate ed25519 key"}) + return + } + + // 拼接消息并生成签名 + var msg bytes.Buffer + msg.WriteString(payload.D.EventTs) + msg.WriteString(payload.D.PlainToken) + signature := hex.EncodeToString(ed25519.Sign(privateKey, msg.Bytes())) + + // 推送验证成功消息到队列 + webhookPayload := &WebhookPayload{ + PlainToken: payload.D.PlainToken, + EventTs: payload.D.EventTs, + RawMessage: httpBody, + } + wh.messageQueue <- webhookPayload + + // 返回签名验证响应 + c.JSON(http.StatusOK, gin.H{ + "plain_token": payload.D.PlainToken, + "signature": signature, + }) + } +} + +// listenAndProcessMessages 启动协程处理队列中的消息 +func (wh *WebhookHandler) ListenAndProcessMessages() { + for payload := range wh.messageQueue { + go func(p *WebhookPayload) { + log.Printf("Processing Webhook event with token: %s", p.PlainToken) + // 业务逻辑处理的地方 + payload := &dto.WSPayload{} + if err := json.Unmarshal(p.RawMessage, payload); err != nil { + log.Printf("%s json failed, %v", p.EventTs, err) + return + } + // 更新 global_s 的值 + atomic.StoreInt64(&client.Global_s, payload.S) + + payload.RawMessage = p.RawMessage + log.Printf("%s receive %s message, %s", p.EventTs, dto.OPMeans(payload.OPCode), string(p.RawMessage)) + + // 性能不够 报错也没用 就扬了 + go event.ParseAndHandle(payload) + }(payload) + } + log.Println("Message queue is closed") +} + +// Close 关闭消息队列 +func (wh *WebhookHandler) Close() { + close(wh.messageQueue) +} diff --git a/structs/structs.go b/structs/structs.go index 1bea9414..58624200 100644 --- a/structs/structs.go +++ b/structs/structs.go @@ -69,6 +69,7 @@ type Settings struct { IdentifyAppids []int64 `yaml:"identify_appids"` Crt string `yaml:"crt"` Key string `yaml:"key"` + UseSelfCrt bool `yaml:"use_self_crt"` //日志类 DeveloperLog bool `yaml:"developer_log"` LogLevel int `yaml:"log_level"`