Skip to content

Commit

Permalink
feat: use sync instead of psync while Redis version is less than 2.8.…
Browse files Browse the repository at this point in the history
…0. (#874)
  • Loading branch information
lyramilk authored Oct 23, 2024
1 parent 0700f9d commit 0dd0b41
Showing 1 changed file with 86 additions and 0 deletions.
86 changes: 86 additions & 0 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ type syncStandaloneReader struct {
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}

// version info
SupportPSYNC bool
}

func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader {
Expand All @@ -90,10 +93,36 @@ func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reade
r.stat.Status = kHandShake
r.stat.Dir = utils.GetAbsPath(r.stat.Name)
utils.CreateEmptyDir(r.stat.Dir)

r.SupportPSYNC = r.supportPSYNC();
return r
}


func (r *syncStandaloneReader) supportPSYNC() bool {
reply := r.client.DoWithStringReply("info", "server")
for _, line := range strings.Split(reply, "\n") {
if strings.HasPrefix(line, "redis_version:") {
version := strings.Split(line, ":")[1]
parts := strings.Split(version,".");
if len(parts) > 2{
v1,_ := strconv.Atoi(parts[0]);
v2,_ := strconv.Atoi(parts[1]);
if v1 * 1000 + v2 < 2008{
return false
}
}

}
}

return true;
}

func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entry {
if !r.SupportPSYNC{
return r.StartReadWithSync(ctx);
}
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {
Expand All @@ -116,6 +145,29 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr
return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) StartReadWithSync(ctx context.Context) []chan *entry.Entry {
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {
//r.sendReplconfListenPort()
r.sendSync()
rdbFilePath := r.receiveRDB()
startOffset := r.stat.AofReceivedOffset
//go r.sendReplconfAck() // start sent replconf ack
go r.receiveAOF(r.rd)
if r.opts.SyncRdb {
r.sendRDB(rdbFilePath)
}
if r.opts.SyncAof {
r.stat.Status = kSyncAof
r.sendAOF(startOffset)
}
close(r.ch)
}()

return []chan *entry.Entry{r.ch}
}

func (r *syncStandaloneReader) sendReplconfListenPort() {
// use status_port as redis-shake port
argv := []interface{}{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)}
Expand Down Expand Up @@ -166,6 +218,40 @@ func (r *syncStandaloneReader) sendPSync() {
r.stat.AofReceivedOffset = int64(masterOffset)
}

func (r *syncStandaloneReader) sendSync() {
if r.opts.TryDiskless {
argv := []interface{}{"REPLCONF", "CAPA", "EOF"}
reply := r.client.DoWithStringReply(argv...)
if reply != "OK" {
log.Warnf("[%s] send replconf capa eof to redis server failed. reply=[%v]", r.stat.Name, reply)
}
}
// send SYNC
argv := []interface{}{"SYNC"}
if config.Opt.Advanced.AwsPSync != "" {
argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"}
}
r.client.Send(argv...)

// format: \n\n\n+<reply>\r\n
for {
select {
case <-r.ctx.Done():
close(r.ch)
runtime.Goexit() // stop goroutine
default:
}
bytes, err := r.rd.Peek(1)
if err != nil {
log.Panicf(err.Error())
}
if bytes[0] != '\n' {
break
}
r.rd.ReadByte()
}
}

func (r *syncStandaloneReader) receiveRDB() string {
log.Debugf("[%s] source db is doing bgsave.", r.stat.Name)
r.stat.Status = kWaitBgsave
Expand Down

0 comments on commit 0dd0b41

Please sign in to comment.