diff --git a/go.mod b/go.mod index f4b7c15c5..fb20c92ff 100644 --- a/go.mod +++ b/go.mod @@ -10,10 +10,13 @@ require ( github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/judwhite/go-svc v1.0.0 github.com/julienschmidt/httprouter v1.2.0 + github.com/kr/pretty v0.1.0 // indirect github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 github.com/nsqio/go-nsq v1.0.7 github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/testify v1.2.2 // indirect - golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect + github.com/vmihailenco/msgpack v4.0.4+incompatible + google.golang.org/appengine v1.6.1 // indirect + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect ) diff --git a/go.sum b/go.sum index 63c62ab65..f49f11068 100644 --- a/go.sum +++ b/go.sum @@ -10,12 +10,19 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDf github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw= github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE= github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 h1:frRvTmIp7QT1RPaphBvr6zvEHfvdOX7jMO7rvicCH9Q= github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w= github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 h1:nZ0z0haJRzCXAWH9Jl+BUnfD2n2MCSbGRSl8VBX+zR0= @@ -26,5 +33,24 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo= -golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aWCh3PIquJB2fYlv9wcs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= +google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/nsqd/channel.go b/nsqd/channel.go index ad9c66ce9..1eb66d1a2 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -106,7 +106,7 @@ func NewChannel(topicName string, channelName string, ctx *context, ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), - int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, + int32(ctx.nsqd.getOpts().MaxMsgSize)+maxValidMsgLength, ctx.nsqd.getOpts().SyncEvery, ctx.nsqd.getOpts().SyncTimeout, dqLogf, diff --git a/nsqd/http.go b/nsqd/http.go index 9914b093f..0c92d73e8 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -223,7 +223,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout } msg := NewMessage(topic.GenerateID(), body) - msg.deferred = deferred + msg.AbsTs = time.Now().Add(deferred).UnixNano() err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} diff --git a/nsqd/message.go b/nsqd/message.go index 77ee4c79d..5b9719e8d 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -3,30 +3,49 @@ package nsqd import ( "bytes" "encoding/binary" + "errors" "fmt" "io" + "math" "time" + + "github.com/vmihailenco/msgpack" +) + +var ( + // First 4 bytes picked from hex representation of a day before epoch timestamp which should never exist in normal timestamp. + // python3 -c 'import struct; import datetime; print(struct.pack(">Q", int((datetime.datetime(1990, 1, 1).timestamp() - 60*60*24) * 10**9)))' + msgMagic = []byte{0x08, 0xc1, 0xe4, 0xa0} + + metaKey = []byte("meta") + bodyKey = []byte("body") + metaLengthPlaceholder = []byte("01") // 2 bytes + bodyLengthPlaceholder = []byte("01234567") // 8 bytes. Because `MaxMsgSize` is in int64 type. + + // No const or reference directly are mainly used for unit test. + maxMetaLen uint16 = math.MaxUint16 ) const ( MsgIDLength = 16 - minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts + minValidMsgLength = 4 + 4 + 2 + 4 + 8 // msgMagic + metaKey + metaLen + bodyKey + bodyLen + maxValidMsgLength = minValidMsgLength + math.MaxUint16 // minValidMsgLength + maxMetaLength ) type MessageID [MsgIDLength]byte type Message struct { - ID MessageID - Body []byte - Timestamp int64 - Attempts uint16 + ID MessageID `msgpack:"message_id"` + Body []byte `msgpack:"-"` + Timestamp int64 `msgpack:"timestamp"` + Attempts uint16 `msgpack:"attempts"` + AbsTs int64 `msgpack:"abs_ts"` // for in-flight handling deliveryTS time.Time clientID int64 pri int64 index int - deferred time.Duration } func NewMessage(id MessageID, body []byte) *Message { @@ -40,21 +59,70 @@ func NewMessage(id MessageID, body []byte) *Message { func (m *Message) WriteTo(w io.Writer) (int64, error) { var buf [10]byte var total int64 - binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) - n, err := w.Write(buf[:]) total += int64(n) if err != nil { return total, err } - n, err = w.Write(m.ID[:]) total += int64(n) if err != nil { return total, err } + n, err = w.Write(m.Body) + total += int64(n) + if err != nil { + return total, err + } + return total, nil +} + +func (m *Message) WriteToBackend(w io.Writer) (int64, error) { + var total int64 + + // magic bytes + n, err := w.Write(msgMagic) + total += int64(n) + if err != nil { + return total, err + } + + // meta bytes + meta, err := msgpack.Marshal(m) + if err != nil { + return total, err + } + + if len(meta) > int(maxMetaLen) { + return total, errors.New("marshaled meta data length exceeds max meta length") + } + + var metaPrefix = append(metaKey, metaLengthPlaceholder...) + binary.BigEndian.PutUint16(metaPrefix[4:4+len(metaLengthPlaceholder)], uint16(len(meta))) + + n, err = w.Write(metaPrefix[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(meta) + total += int64(n) + if err != nil { + return total, err + } + + // msg body + bodyPrefix := append(bodyKey, bodyLengthPlaceholder...) + binary.BigEndian.PutUint64(bodyPrefix[4:4+len(bodyLengthPlaceholder)], uint64(len(m.Body))) + + n, err = w.Write(bodyPrefix[:]) + total += int64(n) + if err != nil { + return total, err + } n, err = w.Write(m.Body) total += int64(n) @@ -67,6 +135,8 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { // decodeMessage deserializes data (as []byte) and creates a new Message // message format: +// +// Old message format: // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... // | (int64) || || (hex string encoded in ASCII) || (binary) // | 8-byte || || 16-byte || N-byte @@ -75,24 +145,51 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { // (uint16) // 2-byte // attempts +// +// New message format: +// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... +// | ([]byte)|| (metaKey+metaLen+meta) || (bodyKey+bodyLen+body) +// | 4-byte || (4+2+N)-byte || (4+8+N)-byte +// ------------------------------------------------------------------------------------------... +// message magic message meta message body +// func decodeMessage(b []byte) (*Message, error) { var msg Message - if len(b) < minValidMsgLength { - return nil, fmt.Errorf("invalid message buffer size (%d)", len(b)) + prefixBytes := b[:len(msgMagic)] + if bytes.Equal(prefixBytes, msgMagic) { + // New message format + metaStartIndex := len(msgMagic) + if !bytes.Equal(b[metaStartIndex:metaStartIndex+len(metaKey)], metaKey) { + return nil, fmt.Errorf("bad msg format. \"meta\" key should be after msg magic") + } + + metaSize := binary.BigEndian.Uint16(b[metaStartIndex+len(metaKey) : metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)]) + err := msgpack.Unmarshal(b[metaStartIndex+len(metaKey)+len(metaLengthPlaceholder):metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)+int(metaSize)], &msg) + if err != nil { + return nil, err + } + + bodyStartIndex := metaStartIndex + len(bodyKey) + len(metaLengthPlaceholder) + int(metaSize) + if !bytes.Equal(b[bodyStartIndex:bodyStartIndex+len(bodyKey)], bodyKey) { + return nil, fmt.Errorf("bad msg format. \"body\" key should be after meta content") + } + bodySize := binary.BigEndian.Uint64(b[bodyStartIndex+len(bodyKey) : bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder)]) + msg.Body = b[bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder) : uint64(bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder))+bodySize] + } else { + // Old message format + msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) + msg.Attempts = binary.BigEndian.Uint16(b[8:10]) + copy(msg.ID[:], b[10:10+MsgIDLength]) + msg.Body = b[10+MsgIDLength:] } - msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) - msg.Attempts = binary.BigEndian.Uint16(b[8:10]) - copy(msg.ID[:], b[10:10+MsgIDLength]) - msg.Body = b[10+MsgIDLength:] - return &msg, nil } func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error { buf.Reset() - _, err := msg.WriteTo(buf) + _, err := msg.WriteToBackend(buf) if err != nil { return err } diff --git a/nsqd/message_test.go b/nsqd/message_test.go new file mode 100644 index 000000000..dd7083244 --- /dev/null +++ b/nsqd/message_test.go @@ -0,0 +1,397 @@ +package nsqd + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "math" + "reflect" + "testing" + "time" + + "github.com/vmihailenco/msgpack" +) + +type messageWithAbsTsDelete struct { + ID MessageID `msgpack:"message_id"` + Body []byte `msgpack:"-"` + Timestamp int64 `msgpack:"timestamp"` + Attempts uint16 `msgpack:"attempts"` + + // for in-flight handling + deliveryTS time.Time + clientID int64 + pri int64 + index int +} + +func TestMessageWithAbsTsDeleteAfterMarshaling(t *testing.T) { + now := time.Now() + + cases := []struct { + Desc string + Msg *Message + }{ + { + Desc: "nonzero AbsTs", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + Attempts: 1, + AbsTs: now.Add(time.Second).UnixNano(), + }, + }, + { + Desc: "zero AbsTs", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + Attempts: 1, + }, + }, + } + + for _, ut := range cases { + meta, err := msgpack.Marshal(ut.Msg) + if err != nil { + t.Fatalf("Desc: %q. Msgpack marshal error: %v", ut.Desc, err) + } + + var unMarshaledMsgWithAbsTsDelete messageWithAbsTsDelete + err = msgpack.Unmarshal(meta, &unMarshaledMsgWithAbsTsDelete) + if err != nil { + t.Fatalf("Desc: %q. Msgpack unmarshal error: %v", ut.Desc, err) + } + + originalMsg := *ut.Msg + originalMsg.Body = nil + if !(reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.ID, originalMsg.ID) && + reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.Body, originalMsg.Body) && + reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.Timestamp, originalMsg.Timestamp) && + reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.Attempts, originalMsg.Attempts)) { + t.Fatalf("Desc: %q. MarshaledMessage unmarshal does not equal to original msg. Unmarshaled msg: %#v, orignal msg: %#v", + ut.Desc, unMarshaledMsgWithAbsTsDelete, originalMsg) + } + } +} + +type messageWithAddedFieldAdd struct { + ID MessageID `msgpack:"message_id"` + Body []byte `msgpack:"-"` + Timestamp int64 `msgpack:"timestamp"` + Attempts uint16 `msgpack:"attempts"` + AbsTs int64 `msgpack:"abs_ts"` + AddedField []byte `msgpack:"added_field"` + + // for in-flight handling + deliveryTS time.Time + clientID int64 + pri int64 + index int +} + +func TestMessageWithAddedFieldAddMarshaling(t *testing.T) { + now := time.Now() + + cases := []struct { + Desc string + Msg *Message + }{ + { + Desc: "no AddedField", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + Attempts: 1, + AbsTs: now.Add(time.Second).UnixNano(), + }, + }, + } + + for _, ut := range cases { + meta, err := msgpack.Marshal(ut.Msg) + if err != nil { + t.Fatalf("Desc: %q. Msgpack marshal error: %v", ut.Desc, err) + } + + var unMarshaledMsgWithAddedFieldAdd messageWithAddedFieldAdd + err = msgpack.Unmarshal(meta, &unMarshaledMsgWithAddedFieldAdd) + if err != nil { + t.Fatalf("Desc: %q. Msgpack unmarshal error: %v", ut.Desc, err) + } + + originalMsg := *ut.Msg + originalMsg.Body = nil + if !(reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.ID, originalMsg.ID) && + reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.Body, originalMsg.Body) && + reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.Timestamp, originalMsg.Timestamp) && + reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.Attempts, originalMsg.Attempts) && + reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.AbsTs, originalMsg.AbsTs) && + unMarshaledMsgWithAddedFieldAdd.AddedField == nil) { + t.Fatalf("Desc: %q. MarshaledMessage unmarshal does not equal to original msg. Unmarshaled msg: %#v, orignal msg: %#v", + ut.Desc, unMarshaledMsgWithAddedFieldAdd, originalMsg) + } + } +} + +func TestMessageMarshalAndUnmarshal(t *testing.T) { + now := time.Now() + + cases := []struct { + Desc string + Msg *Message + }{ + { + Desc: "zero Attempts", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + AbsTs: now.Add(time.Second).UnixNano(), + }, + }, + { + Desc: "zero AbsTs, normal msg", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + Attempts: 1, + }, + }, + { + Desc: "zero AbsTs and zero Attempts", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + }, + }, + { + Desc: "defer msg", + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Body: []byte("abc"), + Timestamp: now.UnixNano(), + Attempts: 1, + AbsTs: now.Add(time.Second).UnixNano(), + }, + }, + } + + for _, ut := range cases { + meta, err := msgpack.Marshal(ut.Msg) + if err != nil { + t.Fatalf("Desc: %q. Msgpack marshal error: %v", ut.Desc, err) + } + + var unMarshaledMsg Message + err = msgpack.Unmarshal(meta, &unMarshaledMsg) + if err != nil { + t.Fatalf("Desc: %q. Msgpack unmarshal error: %v", ut.Desc, err) + } + + originalMsg := *ut.Msg + originalMsg.Body = nil + if !reflect.DeepEqual(unMarshaledMsg, originalMsg) { + t.Fatalf("Desc: %q. MarshaledMessage unmarshal does not equal to original msg. Unmarshaled msg: %#v, orignal msg: %#v", + ut.Desc, unMarshaledMsg, originalMsg) + } + } +} + +func BenchmarkMessageJsonMarshalAndUnmarshal(b *testing.B) { + msg := Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Timestamp: time.Now().UnixNano(), + Attempts: 1, + AbsTs: time.Now().Add(time.Second).UnixNano(), + } + + content, err := json.Marshal(msg) + if err != nil { + b.Fatalf("Json marshal error: %v", err) + } + + b.SetBytes(int64(len(content))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + content, err := json.Marshal(msg) + if err != nil { + b.Fatalf("Json marshal error: %v", err) + } + + var tmp Message + err = json.Unmarshal(content, &tmp) + if err != nil { + b.Fatalf("Json unmarshal error: %v", err) + } + } +} + +func BenchmarkMessageMsgPackMarshalAndUnmarshal(b *testing.B) { + msg := Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Timestamp: time.Now().UnixNano(), + Attempts: 1, + AbsTs: time.Now().Add(time.Second).UnixNano(), + } + + content, err := msgpack.Marshal(&msg) + if err != nil { + b.Fatalf("Msgpack marshal error: %v", err) + } + + b.SetBytes(int64(len(content))) + b.ResetTimer() + for i := 0; i < b.N; i++ { + content, err := msgpack.Marshal(&msg) + if err != nil { + b.Fatalf("Msgpack marshal error: %v", err) + } + + var tmp Message + err = msgpack.Unmarshal(content, &tmp) + if err != nil { + b.Fatalf("Msgpack unmarshal error: %v", err) + } + } +} + +func MarshalMessageInNewFormat(t *testing.T, m *Message) []byte { + var ret []byte + ret = append(ret, msgMagic...) + ret = append(ret, metaKey...) + + meta, err := msgpack.Marshal(&m) + if err != nil { + t.Fatalf("") + } + var metaLen [2]byte + binary.BigEndian.PutUint16(metaLen[:], uint16(len(meta))) + ret = append(ret, metaLen[:]...) + ret = append(ret, meta...) + + ret = append(ret, bodyKey...) + var bodyLen [8]byte + binary.BigEndian.PutUint64(bodyLen[:], uint64(len(m.Body))) + ret = append(ret, bodyLen[:]...) + ret = append(ret, m.Body...) + + return ret +} + +func MarshalMessageInOldFormat(t *testing.T, m *Message) []byte { + bf := bytes.NewBuffer(nil) + + _, err := m.WriteTo(bf) + if err != nil { + t.Fatalf("Marshal message in old format error: %v", err) + } + + return bf.Bytes() +} + +func TestMessage_WriteToBackend(t *testing.T) { + now := time.Now() + + cases := []struct { + Desc string + MaxMetaLen uint16 + Msg *Message + WantedErr bool + }{ + { + Desc: "normal message", + MaxMetaLen: math.MaxUint16, + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Timestamp: time.Now().UnixNano(), + Attempts: 1, + AbsTs: now.Add(time.Second).UnixNano(), + Body: []byte("abc"), + }, + WantedErr: false, + }, + { + Desc: "empty body message", + MaxMetaLen: math.MaxUint16, + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Timestamp: time.Now().UnixNano(), + Attempts: 1, + AbsTs: now.Add(time.Second).UnixNano(), + Body: []byte(""), + }, + WantedErr: false, + }, + { + Desc: "Exceed max meta length", + MaxMetaLen: 1, + Msg: &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Timestamp: time.Now().UnixNano(), + Attempts: 1, + AbsTs: now.Add(time.Second).UnixNano(), + Body: []byte("abc"), + }, + WantedErr: true, + }, + } + + for _, ut := range cases { + oldMaxMetaLen := maxMetaLen + + maxMetaLen = ut.MaxMetaLen + + bf := &bytes.Buffer{} + writtenLen, err := ut.Msg.WriteToBackend(bf) + if (err != nil && !ut.WantedErr) || (err == nil && ut.WantedErr) { + t.Fatalf("Desc: %q. WriteToBackend for message got error: %v, wanted error: %v", ut.Desc, err, ut.WantedErr) + } + + wanted := MarshalMessageInNewFormat(t, ut.Msg) + if writtenLen != int64(len(wanted)) && bytes.EqualFold(bf.Bytes(), wanted) { + t.Fatalf("Desc: %q. WriteToBackend output does not match wanted bytes. Got: %v, wanted: %v", ut.Desc, bf.Bytes(), wanted) + } + + maxMetaLen = oldMaxMetaLen + } +} + +func TestDecodeMessage(t *testing.T) { + msg := &Message{ + ID: MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, + Timestamp: time.Now().UnixNano(), + Attempts: 1, + Body: []byte("abc"), + } + + cases := []struct { + Desc string + MarshaledMessage []byte + }{ + { + Desc: "old message format", + MarshaledMessage: MarshalMessageInOldFormat(t, msg), + }, + { + Desc: "new message format", + MarshaledMessage: MarshalMessageInNewFormat(t, msg), + }, + } + + for _, ut := range cases { + gotMsg, err := decodeMessage(ut.MarshaledMessage) + if err != nil { + t.Fatalf("Desc: %q. Decode message error: %v", ut.Desc, err) + } + + if !reflect.DeepEqual(gotMsg, msg) { + t.Fatalf("Desc: %q. Decoded message does not equal to wanted message. Got: %#v, wanted: %#v", ut.Desc, *gotMsg, *msg) + } + } +} diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index 31aeeacba..1d6b83fb3 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -309,6 +309,18 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + + var deferred time.Duration + nowInNano := time.Now().UnixNano() + if nowInNano-msg.AbsTs < 0 { + deferred = time.Duration(msg.AbsTs - nowInNano) + } + + if deferred != 0 { + subChannel.PutMessageDeferred(msg, deferred) + continue + } + msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) @@ -916,7 +928,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) { topic := p.ctx.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) - msg.deferred = timeoutDuration + msg.AbsTs = time.Now().Add(timeoutDuration).UnixNano() err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error()) diff --git a/nsqd/topic.go b/nsqd/topic.go index c9884fe22..efea0b430 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -71,7 +71,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), - int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, + int32(ctx.nsqd.getOpts().MaxMsgSize)+maxValidMsgLength, ctx.nsqd.getOpts().SyncEvery, ctx.nsqd.getOpts().SyncTimeout, dqLogf, @@ -307,6 +307,12 @@ func (t *Topic) messagePump() { goto exit } + var deferred time.Duration + nowInNano := time.Now().UnixNano() + if nowInNano-msg.AbsTs < 0 { + deferred = time.Duration(msg.AbsTs - nowInNano) + } + for i, channel := range chans { chanMsg := msg // copy the message because each channel @@ -316,10 +322,9 @@ func (t *Topic) messagePump() { if i > 0 { chanMsg = NewMessage(msg.ID, msg.Body) chanMsg.Timestamp = msg.Timestamp - chanMsg.deferred = msg.deferred } - if chanMsg.deferred != 0 { - channel.PutMessageDeferred(chanMsg, chanMsg.deferred) + if deferred != 0 { + channel.PutMessageDeferred(chanMsg, deferred) continue } err := channel.PutMessage(chanMsg)