Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize: bytes.Index and humanize.IBytes in SyncStandaloneReader #909

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ require (
github.com/mcuadros/go-defaults v1.2.0
github.com/rs/zerolog v1.28.0
github.com/spf13/viper v1.18.1
github.com/stretchr/testify v1.8.4
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)

require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
Expand Down
146 changes: 91 additions & 55 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package reader

import (
"RedisShake/internal/client/proto"
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -14,6 +14,8 @@ import (
"strings"
"time"

"RedisShake/internal/client/proto"

"RedisShake/internal/client"
"RedisShake/internal/config"
"RedisShake/internal/entry"
Expand All @@ -38,6 +40,8 @@ type SyncReaderOptions struct {
Sentinel client.SentinelOptions `mapstructure:"sentinel"`
}

const RDB_EOF_MARKER_LEN = 40

type State string

const (
Expand All @@ -48,36 +52,65 @@ const (
kSyncAof State = "syncing aof"
)

type syncStandaloneReader struct {
ctx context.Context
opts *SyncReaderOptions
client *client.Redis
type syncStandaloneReaderStat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`

ch chan *entry.Entry
DbId int
// status
Status State `json:"status"`

stat struct {
Name string `json:"name"`
Address string `json:"address"`
Dir string `json:"dir"`
// rdb info
RdbFileSizeBytes uint64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
RdbReceivedBytes uint64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbSentBytes uint64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan

// status
Status State `json:"status"`
// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes uint64 `json:"aof_received_bytes"` // bytes of AOF received from master
}

// rdb info
RdbFileSizeBytes int64 `json:"rdb_file_size_bytes"` // bytes of the rdb file
func (s syncStandaloneReaderStat) MarshalJSON() ([]byte, error) {
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
rdbFileSizeHuman, rdbReceivedHuman, rdbSentHuman, aofReceivedHuman := "", "", "", ""
if s.RdbFileSizeBytes != 0 {
rdbFileSizeHuman = humanize.IBytes(s.RdbFileSizeBytes)
}
if s.RdbReceivedBytes != 0 {
rdbReceivedHuman = humanize.IBytes(s.RdbReceivedBytes)
}
if s.RdbSentBytes != 0 {
rdbSentHuman = humanize.IBytes(s.RdbSentBytes)
}
if s.AofReceivedBytes != 0 {
aofReceivedHuman = humanize.IBytes(s.AofReceivedBytes)
}

type aliasStat syncStandaloneReaderStat // alias to avoid infinite recursion
return json.Marshal(struct {
aliasStat
RdbFileSizeHuman string `json:"rdb_file_size_human"`
RdbReceivedBytes int64 `json:"rdb_received_bytes"` // bytes of RDB received from master
RdbReceivedHuman string `json:"rdb_received_human"`
RdbSentBytes int64 `json:"rdb_sent_bytes"` // bytes of RDB sent to chan
RdbSentHuman string `json:"rdb_sent_human"`
AofReceivedHuman string `json:"aof_received_human"`
}{
aliasStat: aliasStat(s),
RdbFileSizeHuman: rdbFileSizeHuman,
RdbReceivedHuman: rdbReceivedHuman,
RdbSentHuman: rdbSentHuman,
AofReceivedHuman: aofReceivedHuman,
})
}

// aof info
AofReceivedOffset int64 `json:"aof_received_offset"` // offset of AOF received from master
AofSentOffset int64 `json:"aof_sent_offset"` // offset of AOF sent to chan
AofReceivedBytes int64 `json:"aof_received_bytes"` // bytes of AOF received from master
AofReceivedHuman string `json:"aof_received_human"`
}
type syncStandaloneReader struct {
ctx context.Context
opts *SyncReaderOptions
client *client.Redis

ch chan *entry.Entry
DbId int

stat syncStandaloneReaderStat

// version info
isDiskless bool
Expand Down Expand Up @@ -319,7 +352,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
}
timeStart = time.Now()
log.Debugf("[%s] start receiving RDB. path=[%s]", r.stat.Name, rdbFilePath)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
rdbFileHandle, err := os.OpenFile(rdbFilePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o666)
if err != nil {
log.Panicf(err.Error())
}
Expand All @@ -345,39 +378,46 @@ func (r *syncStandaloneReader) receiveRDBWithDiskless(marker string, wt io.Write
buf := make([]byte, bufSize)

marker = strings.Split(marker, ":")[1]
if len(marker) != 40 {
if len(marker) != RDB_EOF_MARKER_LEN {
log.Panicf("[%s] invalid len of EOF marker. value=[%s]", r.stat.Name, marker)
}
log.Infof("meet EOF begin marker: %s", marker)
bMarker := []byte(marker)
goon := true
for goon {
n, err := r.client.Read(buf[:bufSize])
var lastBytes []byte
for {
if lastBytes != nil { // add previous tail
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
copy(buf, lastBytes)
}

nread, err := r.client.Read(buf[len(lastBytes):])
if err != nil {
log.Panicf(err.Error())
}
buffer := buf[:n]
if bytes.Contains(buffer, bMarker) {
suxb201 marked this conversation as resolved.
Show resolved Hide resolved

bufLen := len(lastBytes) + nread
nwrite := 0
if bufLen >= RDB_EOF_MARKER_LEN && bytes.Equal(buf[bufLen-RDB_EOF_MARKER_LEN:bufLen], bMarker) {
log.Infof("meet EOF end marker.")
// replace it
fi := bytes.Index(buffer, bMarker)
if len(buffer[fi+40:]) > 0 {
suxb201 marked this conversation as resolved.
Show resolved Hide resolved
log.Warnf("data after end marker will be discarded: %s", string(buffer[fi+40:]))
// Write all buf without EOF marker and break
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
buffer = buffer[:fi]

goon = false
break
}

_, err = wt.Write(buffer)
if err != nil {
log.Panicf(err.Error())
if bufLen >= RDB_EOF_MARKER_LEN {
// left RDB_EOF_MARKER_LEN bytes to next round
if nwrite, err = wt.Write(buf[:bufLen-RDB_EOF_MARKER_LEN]); err != nil {
log.Panicf(err.Error())
}
lastBytes = buf[bufLen-RDB_EOF_MARKER_LEN : bufLen] // save last RDB_EOF_MARKER_LEN bytes into lastBytes for next round
} else {
// save all bytes into lastBytes for next round if less than RDB_EOF_MARKER_LEN
lastBytes = buf[:bufLen]
}

r.stat.RdbFileSizeBytes += int64(n)
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(r.stat.RdbFileSizeBytes))
r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbFileSizeBytes += uint64(nwrite)
r.stat.RdbReceivedBytes += uint64(nwrite)
}
}

Expand All @@ -387,8 +427,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}
log.Debugf("[%s] rdb file size: [%v]", r.stat.Name, humanize.IBytes(uint64(length)))
r.stat.RdbFileSizeBytes = length
r.stat.RdbFileSizeHuman = humanize.IBytes(uint64(length))
r.stat.RdbFileSizeBytes = uint64(length)

remainder := length
const bufSize int64 = 32 * 1024 * 1024 // 32MB
Expand All @@ -408,8 +447,7 @@ func (r *syncStandaloneReader) receiveRDBWithoutDiskless(marker string, wt io.Wr
log.Panicf(err.Error())
}

r.stat.RdbReceivedBytes += int64(n)
r.stat.RdbReceivedHuman = humanize.IBytes(uint64(r.stat.RdbReceivedBytes))
r.stat.RdbReceivedBytes += uint64(n)
}
}

Expand All @@ -427,8 +465,7 @@ func (r *syncStandaloneReader) receiveAOF() {
if err != nil {
log.Panicf(err.Error())
}
r.stat.AofReceivedBytes += int64(n)
r.stat.AofReceivedHuman = humanize.IBytes(uint64(r.stat.AofReceivedBytes))
r.stat.AofReceivedBytes += uint64(n)
aofWriter.Write(buf[:n])
r.stat.AofReceivedOffset += int64(n)
}
Expand All @@ -440,8 +477,7 @@ func (r *syncStandaloneReader) sendRDB(rdbFilePath string) {
log.Debugf("[%s] start sending RDB to target", r.stat.Name)
r.stat.Status = kSyncRdb
updateFunc := func(offset int64) {
r.stat.RdbSentBytes = offset
r.stat.RdbSentHuman = humanize.IBytes(uint64(offset))
r.stat.RdbSentBytes = uint64(offset)
}
rdbLoader := rdb.NewLoader(r.stat.Name, updateFunc, rdbFilePath, r.ch)
r.DbId = rdbLoader.ParseRDB(r.ctx)
Expand Down Expand Up @@ -532,16 +568,16 @@ func (r *syncStandaloneReader) Status() interface{} {

// StatusString returns a short, human-readable one-line summary of the
// reader's current phase and progress, suitable for periodic status logs.
func (r *syncStandaloneReader) StatusString() string {
	if r.stat.Status == kSyncRdb {
		return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbSentBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
	}
	if r.stat.Status == kSyncAof {
		// Remaining replication lag in bytes: received offset minus sent offset.
		return fmt.Sprintf("%s, diff=[%v]", r.stat.Status, -r.stat.AofSentOffset+r.stat.AofReceivedOffset)
	}
	if r.stat.Status == kReceiveRdb {
		if r.isDiskless {
			// Diskless transfers have no known total size up front.
			return fmt.Sprintf("%s diskless, size=[%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes))
		}
		return fmt.Sprintf("%s, size=[%s/%s]", r.stat.Status, humanize.IBytes(r.stat.RdbReceivedBytes), humanize.IBytes(r.stat.RdbFileSizeBytes))
	}
	return string(r.stat.Status)
}
Expand Down
78 changes: 78 additions & 0 deletions internal/reader/sync_standalone_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package reader

import (
"context"
"encoding/json"
"testing"

"RedisShake/internal/log"
"github.com/stretchr/testify/require"
)

// Test_syncStandaloneReader_Status verifies that a zero-valued reader's
// Status() marshals to the expected JSON shape, including the *_human
// companion fields produced by syncStandaloneReaderStat.MarshalJSON
// (empty strings for zero counters, not "0 B").
func Test_syncStandaloneReader_Status(t *testing.T) {
	type fields struct {
		ctx  context.Context
		opts *SyncReaderOptions
	}
	tests := []struct {
		name   string
		fields fields
		want   interface{}
	}{
		{
			name: "syncStandaloneReader_Status_Marshal",
			fields: fields{
				ctx: context.Background(),
				opts: &SyncReaderOptions{
					Cluster:       false,
					Address:       "127.0.0.1:6379",
					Username:      "username",
					Password:      "password",
					Tls:           false,
					SyncRdb:       false,
					SyncAof:       false,
					PreferReplica: false,
					TryDiskless:   false,
				},
			},
			want: map[string]interface{}{
				"name":                "",
				"address":             "",
				"dir":                 "",
				"status":              "",
				"rdb_file_size_bytes": 0,
				"rdb_file_size_human": "",
				"rdb_received_bytes":  0,
				"rdb_received_human":  "",
				"rdb_sent_bytes":      0,
				"rdb_sent_human":      "",
				"aof_received_offset": 0,
				"aof_sent_offset":     0,
				"aof_received_bytes":  0,
				"aof_received_human":  "",
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			r := &syncStandaloneReader{
				ctx:  tt.fields.ctx,
				opts: tt.fields.opts,
			}

			want, err := json.Marshal(tt.want)
			if err != nil {
				// Log for visibility, but fail instead of returning early:
				// a silent return would let the test pass vacuously.
				log.Warnf("marshal want failed, err=[%v]", err)
			}
			require.NoError(t, err, "marshal want")

			got, err := json.Marshal(r.Status())
			if err != nil {
				log.Warnf("marshal status failed, err=[%v]", err)
			}
			require.NoError(t, err, "marshal status")

			require.JSONEq(t, string(want), string(got))
		})
	}
}
Loading