Skip to content

Commit

Permalink
优化:
Browse files Browse the repository at this point in the history
1粗暴的codec扩容\
2.eventLoop=1,代替channel来实现单线程
  • Loading branch information
xuning888 committed Aug 18, 2024
1 parent 0920804 commit e732121
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 137 deletions.
75 changes: 5 additions & 70 deletions database/server_standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ import (
"github.com/xuning888/godis-tiny/redis/connection"
"github.com/xuning888/godis-tiny/redis/protocol"
"go.uber.org/zap"
"runtime"
"sync"
"sync/atomic"
"time"
)

var _ database.DBEngine = &Standalone{}

var lock = sync.Mutex{}

// Standalone 单机存储的存储引擎
type Standalone struct {
shutdown *atomic2.Boolean
Expand All @@ -30,15 +31,12 @@ type Standalone struct {
cancel func()
dbSet []*atomic.Value
aof *persistence.Aof
reqQueue chan *database.CmdReq
resQueue chan *database.CmdRes
lg *zap.Logger
}

func (s *Standalone) Cron() {
cmdReq := database.MakeCmdReq(connection.SystemCon, util.ToCmdLine("ttlops"))
s.PushReqEvent(cmdReq)

s.Exec(cmdReq)
// 触发aof重写
s.aofRewrite()
}
Expand Down Expand Up @@ -87,16 +85,12 @@ func (s *Standalone) ForEach(dbIndex int, cb func(key string, entity *database.D
}
}

var multi = runtime.NumCPU()

func MakeStandalone() *Standalone {
server := &Standalone{}
ctx, cancelFunc := context.WithCancel(context.Background())
server.ctx = ctx
server.cancel = cancelFunc
server.shutdown = new(atomic2.Boolean)
server.resQueue = make(chan *database.CmdRes, multi)
server.reqQueue = make(chan *database.CmdReq, multi)
lg, err := logger.CreateLogger(logger.DefaultLevel)
if err != nil {
panic(err)
Expand Down Expand Up @@ -150,38 +144,10 @@ func (s *Standalone) Rewrite() error {
return s.aof.Rewrite()
}

var ErrorsShutdown = errors.New("shutdown")

func (s *Standalone) PushReqEvent(req *database.CmdReq) error {
if s.shutdown.Get() {
return ErrorsShutdown
}
s.reqWait.Add(1)
go func() {
defer s.reqWait.Done()
reqq := req
select {
case <-s.ctx.Done():
s.lg.Sugar().Errorf("push aborted: Standalone is shutting down")
return
case s.reqQueue <- reqq:
return
}
}()
return nil
}

func (s *Standalone) DeliverResEvent() <-chan *database.CmdRes {
return s.resQueue
}

// Exec 这是一个无锁实现, 用于AofLoad
func (s *Standalone) Exec(req *database.CmdReq) *database.CmdRes {
return s.doExec(req)
}

// doExec 调用db执行命令,将结果放到 resQueue 中
func (s *Standalone) doExec(req *database.CmdReq) *database.CmdRes {
lock.Lock()
defer lock.Unlock()
// 执行命令
client := req.GetConn()
index := client.GetIndex()
Expand Down Expand Up @@ -234,8 +200,6 @@ func (s *Standalone) Init() {
if config.Properties.AppendOnly {
s.aof.LoadAof(0)
}
// 开启一个协程来消费req队列
s.startCmdConsumer()
}

func (s *Standalone) bindPersister(aof *persistence.Aof) {
Expand Down Expand Up @@ -266,32 +230,3 @@ func (s *Standalone) Shutdown(cancelCtx context.Context) (err error) {
}
return
}

// startCmdConsumer 开启一个协程消费指令
func (s *Standalone) startCmdConsumer() {
go func() {
for {
select {
case <-s.ctx.Done():
s.lg.Info("receive cmdRequeue closed")
return
case cmdReq, ok := <-s.reqQueue:
if !ok {
s.lg.Info("cmdReqQueue is closed")
return
}
cmdRes := s.doExec(cmdReq)
if cmdRes == nil || cmdRes.GetConn().IsInner() {
continue
}

devl := func(res *database.CmdRes) {
defer s.resWait.Done()
s.resQueue <- res
}
s.resWait.Add(1)
go devl(cmdRes)
}
}
}()
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/xuning888/godis-tiny
go 1.17

require (
github.com/panjf2000/gnet/v2 v2.3.4
github.com/panjf2000/gnet/v2 v2.5.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.21.0
)
Expand All @@ -14,7 +14,7 @@ require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sync v0.5.0 // indirect
golang.org/x/sync v0.6.0 // indirect
golang.org/x/sys v0.17.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
14 changes: 7 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/panjf2000/ants/v2 v2.8.2 h1:D1wfANttg8uXhC9149gRt1PDQ+dLVFjNXkCEycMcvQQ=
github.com/panjf2000/ants/v2 v2.8.2/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/panjf2000/gnet/v2 v2.3.4 h1:+ASHt+Wxr0KIzlk5FsLBbegCc4US7iVCdZ1QbUyw17g=
github.com/panjf2000/gnet/v2 v2.3.4/go.mod h1:0mTLWq4zMEXyQ35BY094dNWYnXfIdDg0mOlmZJflaXE=
github.com/panjf2000/ants/v2 v2.9.0 h1:SztCLkVxBRigbg+vt0S5QvF5vxAbxbKt09/YfAJ0tEo=
github.com/panjf2000/ants/v2 v2.9.0/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/panjf2000/gnet/v2 v2.5.0 h1:nJOJ+SK+MeFN4+6zNgxPRU88BbH7SAMf9wu7nw6mGz4=
github.com/panjf2000/gnet/v2 v2.5.0/go.mod h1:R+X5M5YBpOGMVP/92OJ02P35SbmoHjiL7GnaBhht6GE=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -50,14 +50,14 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
4 changes: 0 additions & 4 deletions interface/database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ type CmdLine = [][]byte
type DBEngine interface {
// Init 做必要的初始化工作
Init()
// PushReqEvent 推送一个命令到dbEngine
PushReqEvent(req *CmdReq) error
// DeliverResEvent 接收res的channel
DeliverResEvent() <-chan *CmdRes
// Exec 同步调用
Exec(req *CmdReq) *CmdRes
// ForEach 遍历指定的db
Expand Down
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func setupConfiguration(configPath string) {
}

func printHelp() {
helpText := `
Usage: ./` + serverName + ` [/path/to/redis.conf] [options] [-]
./` + serverName + ` -h or --help`
helpText := `Usage: ./` + serverName + ` [/path/to/redis.conf] [options] [-]
./` + serverName + ` -h or --help
`
fmt.Print(helpText)
}

Expand Down
5 changes: 5 additions & 0 deletions redis/parser/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func appendReply(reply []byte, replies []redis.Reply, c *Codec) []redis.Reply {
func handleDecodeError(err error, c *Codec) error {
// 如果err是 黏包/半包 就直接返回,否则就把接收到的包丢弃
if errors.Is(err, ErrIncompletePacket) {
if (cap(c.buf)) < c.remainingBulkLength+2 {
newBuffer := make([]byte, 0, len(c.buf)+c.remainingBulkLength+2)
newBuffer = append(newBuffer, c.buf...)
c.buf = newBuffer
}
return err
}
c.Reset()
Expand Down
51 changes: 4 additions & 47 deletions tcp/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
func (r *RedisServer) OnBoot(eng gnet.Engine) (action gnet.Action) {
r.engine = eng
r.dbEngine.Init()
r.listen()
r.lg.Sugar().Info("Ready to accept connections tcp")
return
}
Expand Down Expand Up @@ -70,7 +69,6 @@ func (r *RedisServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
replies, err := codecc.Decode(data)
if err != nil && len(replies) == 0 {
if errors.Is(err, parser.ErrIncompletePacket) {
r.lg.Sugar().Error("ErrIncompletePacket1")
return gnet.None
}
r.lg.Sugar().Errorf("decode falied with error: %v", err)
Expand All @@ -83,14 +81,8 @@ func (r *RedisServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
for _, value := range replies {
cmd, _ := value.(*protocol.MultiBulkReply)
cmdReq := database2.MakeCmdReq(conn, cmd.Args)
err2 := r.dbEngine.PushReqEvent(cmdReq)
if err2 != nil {
err3 := r.quickWrite(c, protocol.MakeStandardErrReply("ERR Server is shutting down").ToBytes())
if err3 != nil {
r.lg.Sugar().Errorf("err3: %v", err3)
}
return gnet.None
}
cmdRes := r.dbEngine.Exec(cmdReq)
r.asyncWrite(c, cmdRes.GetReply().ToBytes())
}
if errors.Is(err, parser.ErrIncompletePacket) {
r.lg.Sugar().Error("ErrIncompletePacket1")
Expand All @@ -106,47 +98,12 @@ func (r *RedisServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
for _, value := range replies {
cmd, _ := value.(*protocol.MultiBulkReply)
cmdReq := database2.MakeCmdReq(conn, cmd.Args)
err2 := r.dbEngine.PushReqEvent(cmdReq)
if err2 != nil {
err3 := r.quickWrite(c, protocol.MakeStandardErrReply("ERR Server is shutting down").ToBytes())
if err3 != nil {
r.lg.Sugar().Errorf("err3: %v", err3)
}
return gnet.None
}
cmdRes := r.dbEngine.Exec(cmdReq)
r.asyncWrite(c, cmdRes.GetReply().ToBytes())
}
return
}

func (r *RedisServer) listen() {
r.lg.Sugar().Info("start listen cmdResQueue")
resEvent := r.dbEngine.DeliverResEvent()
go func() {
for {
select {
case cmdRes, ok := <-resEvent:
if !ok {
r.lg.Sugar().Info("stop listen cmdResQueue")
return
}
conn := cmdRes.GetConn()
if conn.IsInner() {
return
}
go func() {
finalCmdRes := cmdRes
redisConn := finalCmdRes.GetConn()
bytes := finalCmdRes.GetReply().ToBytes()
r.asyncWrite(redisConn.GnetConn(), bytes)
}()
case <-r.stopChan:
r.lg.Sugar().Info("stop listen due to shutdown signal")
return
}
}
}()
}

func (r *RedisServer) asyncWrite(c gnet.Conn, bytes []byte) {
r.writeWg.Add(1)
err := c.AsyncWrite(bytes, r.callback)
Expand Down
7 changes: 3 additions & 4 deletions tcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,13 @@ func (r *RedisServer) Spin() {
go func() {
errCh <- gnet.Run(
r, address,
gnet.WithReadBufferCap(1<<18),
// 启用多核心, 开启后NumEventLoops = coreSize
gnet.WithMulticore(true),
gnet.WithMulticore(false),
gnet.WithNumEventLoop(1), // 关闭多核心, 设置numEventLoop = 1, 用于模拟redis的单线程
// 启用定时任务
gnet.WithTicker(true),
// socket 60不活跃就会被驱逐
gnet.WithTCPKeepAlive(time.Second*time.Duration(defaultTimeout)),
gnet.WithReusePort(true),
gnet.WithReusePort(false),
// 使用最少连接的负载均衡算法为eventLoop分配conn
gnet.WithLoadBalancing(gnet.LeastConnections),
gnet.WithLogger(r.lg.Sugar().Named("tcp-server")),
Expand Down

0 comments on commit e732121

Please sign in to comment.