Skip to content

Commit

Permalink
Merge 5eb061e into 021579e
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoshinonyaruko authored Oct 15, 2024
2 parents 021579e + 5eb061e commit 7ec8bd7
Show file tree
Hide file tree
Showing 4 changed files with 146 additions and 114 deletions.
118 changes: 4 additions & 114 deletions botgo/websocket/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
Expand All @@ -25,19 +24,7 @@ import (
const DefaultQueueSize = 10000

// 定义全局变量
var global_s int64

// PayloadWithTimestamp 存储带时间戳的 WSPayload
type PayloadWithTimestamp struct {
Payload *dto.WSPayload
Timestamp time.Time
}

var dataMap sync.Map

func init() {
StartCleanupRoutine()
}
var Global_s int64

// Setup 依赖注册
func Setup() {
Expand Down Expand Up @@ -200,33 +187,6 @@ func (c *Client) Session() *dto.Session {
return c.session
}

// func (c *Client) readMessageToQueue() {
// for {
// _, message, err := c.conn.ReadMessage()
// if err != nil {
// log.Errorf("%s read message failed, %v, message %s", c.session, err, string(message))
// close(c.messageQueue)
// c.closeChan <- err
// return
// }
// payload := &dto.WSPayload{}
// if err := json.Unmarshal(message, payload); err != nil {
// log.Errorf("%s json failed, %v", c.session, err)
// continue
// }
// // 更新 global_s 的值
// atomic.StoreInt64(&global_s, payload.S)

// payload.RawMessage = message
// log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))
// // 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
// if c.isHandleBuildIn(payload) {
// continue
// }
// c.messageQueue <- payload
// }
// }

func (c *Client) readMessageToQueue() {
for {
_, message, err := c.conn.ReadMessage()
Expand All @@ -241,64 +201,22 @@ func (c *Client) readMessageToQueue() {
log.Errorf("%s json failed, %v", c.session, err)
continue
}
atomic.StoreInt64(&global_s, payload.S)
// 更新 global_s 的值
atomic.StoreInt64(&Global_s, payload.S)

payload.RawMessage = message
log.Infof("%s receive %s message, %s", c.session, dto.OPMeans(payload.OPCode), string(message))

// 不过滤心跳事件
if payload.OPCode != 11 {
// 计算数据的哈希值
dataHash := calculateDataHash(payload.Data)

// 检查是否已存在相同的 Data
if existingPayload, ok := getDataFromSyncMap(dataHash); ok {
// 如果已存在相同的 Data,则丢弃当前消息
log.Infof("%s discard duplicate message with DataHash: %v", c.session, existingPayload)
continue
}

// 将新的 payload 存入 sync.Map
storeDataToSyncMap(dataHash, payload)
}

// 处理内置的一些事件,如果处理成功,则这个事件不再投递给业务
if c.isHandleBuildIn(payload) {
continue
}

c.messageQueue <- payload
}
}

func getDataFromSyncMap(dataHash string) (*dto.WSPayload, bool) {
value, ok := dataMap.Load(dataHash)
if !ok {
return nil, false
}
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return nil, false
}
return payloadWithTimestamp.Payload, true
}

func storeDataToSyncMap(dataHash string, payload *dto.WSPayload) {
payloadWithTimestamp := &PayloadWithTimestamp{
Payload: payload,
Timestamp: time.Now(),
}
dataMap.Store(dataHash, payloadWithTimestamp)
}

func calculateDataHash(data interface{}) string {
dataBytes, _ := json.Marshal(data)
return string(dataBytes) // 这里直接转换为字符串,可以使用更复杂的算法
}

// 在全局范围通过atomic访问s值与message_id的映射
func GetGlobalS() int64 {
return atomic.LoadInt64(&global_s)
return atomic.LoadInt64(&Global_s)
}

func (c *Client) listenMessageAndHandle() {
Expand Down Expand Up @@ -383,31 +301,3 @@ func (c *Client) readyHandler(payload *dto.WSPayload) {
event.DefaultHandlers.Ready(payload, readyData)
}
}

const cleanupInterval = 5 * time.Minute // 清理间隔时间

func StartCleanupRoutine() {
go func() {
for {
<-time.After(cleanupInterval)
cleanupDataMap()
}
}()
}

func cleanupDataMap() {
now := time.Now()
dataMap.Range(func(key, value interface{}) bool {
payloadWithTimestamp, ok := value.(*PayloadWithTimestamp)
if !ok {
return true
}

// 检查时间戳,清理超过一定时间的数据
if now.Sub(payloadWithTimestamp.Timestamp) > cleanupInterval {
dataMap.Delete(key)
}

return true
})
}
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,19 @@ func main() {
}
}
}

webhookHandler := server.NewWebhookHandler(100)

// 启动消息处理协程
go webhookHandler.ListenAndProcessMessages()

r.GET("/updateport", server.HandleIpupdate)
r.POST("/uploadpic", server.UploadBase64ImageHandler(rateLimiter))
r.POST("/uploadpicv2", server.UploadBase64ImageHandlerV2(rateLimiter, apiV2))
r.POST("/uploadpicv3", server.UploadBase64ImageHandlerV3(rateLimiter, api))
r.POST("/uploadrecord", server.UploadBase64RecordHandler(rateLimiter))
// 使用 CreateHandleValidation,传入 WebhookHandler 实例
r.POST("/webhook", server.CreateHandleValidation(conf.Settings.ClientSecret, webhookHandler))
r.Static("/channel_temp", "./channel_temp")
if config.GetFrpPort() == "0" && !config.GetDisableWebui() {
//webui和它的api
Expand Down
133 changes: 133 additions & 0 deletions server/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package server

import (
"bytes"
"crypto/ed25519"
"encoding/hex"
"encoding/json"
"io"
"log"
"net/http"
"strings"
"sync/atomic"

"github.com/gin-gonic/gin"
"github.com/tencent-connect/botgo/dto"
"github.com/tencent-connect/botgo/event"
"github.com/tencent-connect/botgo/websocket/client"
)

// Payload 定义请求载荷结构
type Payload struct {
D ValidationRequest `json:"d"`
Op int `json:"op"`
}

// ValidationRequest 定义鉴权请求结构
type ValidationRequest struct {
PlainToken string `json:"plain_token"`
EventTs string `json:"event_ts"`
}

// WebhookPayload 定义Webhook消息结构
type WebhookPayload struct {
PlainToken string `json:"plain_token"`
EventTs string `json:"event_ts"`
RawMessage []byte // 保存原始消息内容
}

// WebhookHandler 负责处理 Webhook 的接收和消息处理
type WebhookHandler struct {
messageQueue chan *WebhookPayload
closeChan chan error
}

// NewWebhookHandler 创建新的 WebhookHandler 实例
func NewWebhookHandler(queueSize int) *WebhookHandler {
return &WebhookHandler{
messageQueue: make(chan *WebhookPayload, queueSize),
closeChan: make(chan error),
}
}

// CreateHandleValidation 创建用于签名验证和消息入队的处理函数
func CreateHandleValidation(botSecret string, wh *WebhookHandler) gin.HandlerFunc {
return func(c *gin.Context) {
httpBody, err := io.ReadAll(c.Request.Body)
if err != nil {
log.Println("Failed to read HTTP body:", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to read request body"})
return
}

var payload Payload
if err := json.Unmarshal(httpBody, &payload); err != nil {
log.Println("Failed to parse HTTP payload:", err)
c.JSON(http.StatusBadRequest, gin.H{"error": "Failed to parse payload"})
return
}

// 生成种子并创建私钥
seed := botSecret
for len(seed) < ed25519.SeedSize {
seed = strings.Repeat(seed, 2)
}
seed = seed[:ed25519.SeedSize]
reader := strings.NewReader(seed)
_, privateKey, err := ed25519.GenerateKey(reader)
if err != nil {
log.Println("Failed to generate ed25519 key:", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to generate ed25519 key"})
return
}

// 拼接消息并生成签名
var msg bytes.Buffer
msg.WriteString(payload.D.EventTs)
msg.WriteString(payload.D.PlainToken)
signature := hex.EncodeToString(ed25519.Sign(privateKey, msg.Bytes()))

// 推送验证成功消息到队列
webhookPayload := &WebhookPayload{
PlainToken: payload.D.PlainToken,
EventTs: payload.D.EventTs,
RawMessage: httpBody,
}
wh.messageQueue <- webhookPayload

// 返回签名验证响应
c.JSON(http.StatusOK, gin.H{
"plain_token": payload.D.PlainToken,
"signature": signature,
})
}
}

// listenAndProcessMessages 启动协程处理队列中的消息
func (wh *WebhookHandler) ListenAndProcessMessages() {
for payload := range wh.messageQueue {
go func(p *WebhookPayload) {
log.Printf("Processing Webhook event with token: %s", p.PlainToken)
// 业务逻辑处理的地方
payload := &dto.WSPayload{}
if err := json.Unmarshal(p.RawMessage, payload); err != nil {
log.Printf("%s json failed, %v", p.EventTs, err)
return
}
// 更新 global_s 的值
atomic.StoreInt64(&client.Global_s, payload.S)

payload.RawMessage = p.RawMessage
log.Printf("%s receive %s message, %s", p.EventTs, dto.OPMeans(payload.OPCode), string(p.RawMessage))

// 性能不够 报错也没用 就扬了
go event.ParseAndHandle(payload)
}(payload)
}
log.Println("Message queue is closed")
}

// Close 关闭消息队列
func (wh *WebhookHandler) Close() {
close(wh.messageQueue)
}
1 change: 1 addition & 0 deletions structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type Settings struct {
IdentifyAppids []int64 `yaml:"identify_appids"`
Crt string `yaml:"crt"`
Key string `yaml:"key"`
UseSelfCrt bool `yaml:"use_self_crt"`
//日志类
DeveloperLog bool `yaml:"developer_log"`
LogLevel int `yaml:"log_level"`
Expand Down

0 comments on commit 7ec8bd7

Please sign in to comment.