From a593551fb4bd574c5ba88af68040f8033b70c056 Mon Sep 17 00:00:00 2001
From: Andy Xie <andy.xning@gmail.com>
Date: Fri, 26 Jul 2019 13:22:40 +0800
Subject: [PATCH] refactor nsqd storage engine

---
 go.mod               |   5 +-
 go.sum               |  30 +++-
 nsqd/channel.go      |   2 +-
 nsqd/http.go         |   2 +-
 nsqd/message.go      | 110 ++++++++++--
 nsqd/message_test.go | 416 +++++++++++++++++++++++++++++++++++++++++++
 nsqd/protocol_v2.go  |  14 +-
 nsqd/topic.go        |  13 +-
 8 files changed, 564 insertions(+), 28 deletions(-)
 create mode 100644 nsqd/message_test.go

diff --git a/go.mod b/go.mod
index f4b7c15c5..fb20c92ff 100644
--- a/go.mod
+++ b/go.mod
@@ -10,10 +10,13 @@ require (
 	github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db
 	github.com/judwhite/go-svc v1.0.0
 	github.com/julienschmidt/httprouter v1.2.0
+	github.com/kr/pretty v0.1.0 // indirect
 	github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6
 	github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839
 	github.com/nsqio/go-nsq v1.0.7
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/stretchr/testify v1.2.2 // indirect
-	golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 // indirect
+	github.com/vmihailenco/msgpack v4.0.4+incompatible
+	google.golang.org/appengine v1.6.1 // indirect
+	gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
 )
diff --git a/go.sum b/go.sum
index 63c62ab65..f49f11068 100644
--- a/go.sum
+++ b/go.sum
@@ -10,12 +10,19 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDf
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
+github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/judwhite/go-svc v1.0.0 h1:W447kYhZsqC14hkfNG8XLy9wbYibeMW75g5DtAIpFGw=
 github.com/judwhite/go-svc v1.0.0/go.mod h1:EeMSAFO3mLgEQfcvnZ50JDG0O1uQlagpAbMS6talrXE=
 github.com/julienschmidt/httprouter v1.2.0 h1:TDTW5Yz1mjftljbcKqRcrYhd4XeOoI98t+9HbQbYf7g=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6 h1:frRvTmIp7QT1RPaphBvr6zvEHfvdOX7jMO7rvicCH9Q=
 github.com/mreiferson/go-options v0.0.0-20190302015348-0c63f026bcd6/go.mod h1:zHtCks/HQvOt8ATyfwVe3JJq2PPuImzXINPRTC03+9w=
 github.com/nsqio/go-diskqueue v0.0.0-20180306152900-74cfbc9de839 h1:nZ0z0haJRzCXAWH9Jl+BUnfD2n2MCSbGRSl8VBX+zR0=
@@ -26,5 +33,24 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6 h1:IcgEB62HYgAhX0Nd/QrVgZlxlcyxbGQHElLUhW2X4Fo=
-golang.org/x/sys v0.0.0-20181221143128-b4a75ba826a6/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
+github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ=
+golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aWCh3PIquJB2fYlv9wcs=
+golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
+google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
diff --git a/nsqd/channel.go b/nsqd/channel.go
index ad9c66ce9..1eb66d1a2 100644
--- a/nsqd/channel.go
+++ b/nsqd/channel.go
@@ -106,7 +106,7 @@ func NewChannel(topicName string, channelName string, ctx *context,
 			ctx.nsqd.getOpts().DataPath,
 			ctx.nsqd.getOpts().MaxBytesPerFile,
 			int32(minValidMsgLength),
-			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
+			int32(ctx.nsqd.getOpts().MaxMsgSize)+maxValidMsgLength,
 			ctx.nsqd.getOpts().SyncEvery,
 			ctx.nsqd.getOpts().SyncTimeout,
 			dqLogf,
diff --git a/nsqd/http.go b/nsqd/http.go
index 9914b093f..0c92d73e8 100644
--- a/nsqd/http.go
+++ b/nsqd/http.go
@@ -223,7 +223,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
 	}
 
 	msg := NewMessage(topic.GenerateID(), body)
-	msg.deferred = deferred
+	msg.AbsTs = time.Now().Add(deferred).UnixNano()
 	err = topic.PutMessage(msg)
 	if err != nil {
 		return nil, http_api.Err{503, "EXITING"}
diff --git a/nsqd/message.go b/nsqd/message.go
index 77ee4c79d..c1649b099 100644
--- a/nsqd/message.go
+++ b/nsqd/message.go
@@ -3,30 +3,49 @@ package nsqd
 import (
 	"bytes"
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"io"
+	"math"
 	"time"
+
+	"github.com/vmihailenco/msgpack"
+)
+
+var (
+	// First 4 bytes picked from hex representation of a day before epoch timestamp which should never exist in normal timestamp.
+	// python3 -c 'import struct; import datetime; print(struct.pack(">Q", int((datetime.datetime(1990, 1, 1).timestamp() - 60*60*24) * 10**9)))'
+	msgMagic = []byte{0x08, 0xc1, 0xe4, 0xa0}
+
+	metaKey               = []byte("meta")
+	bodyKey               = []byte("body")
+	metaLengthPlaceholder = []byte("01")       // 2 bytes
+	bodyLengthPlaceholder = []byte("01234567") // 8 bytes. Because `MaxMsgSize` is in int64 type.
+
+	// No const or reference directly are mainly used for unit test.
+	maxMetaLen uint16 = math.MaxUint16
 )
 
 const (
 	MsgIDLength       = 16
-	minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
+	minValidMsgLength = 4 + 4 + 2 + 4 + 8                  // msgMagic + metaKey + metaLen + bodyKey + bodyLen
+	maxValidMsgLength = minValidMsgLength + math.MaxUint16 // minValidMsgLength + maxMetaLength
 )
 
 type MessageID [MsgIDLength]byte
 
 type Message struct {
-	ID        MessageID
-	Body      []byte
-	Timestamp int64
-	Attempts  uint16
+	ID        MessageID `msgpack:"message_id"`
+	Body      []byte    `msgpack:"-"`
+	Timestamp int64     `msgpack:"timestamp"`
+	Attempts  uint16    `msgpack:"attempts"`
+	AbsTs     int64     `msgpack:"abs_ts"`
 
 	// for in-flight handling
 	deliveryTS time.Time
 	clientID   int64
 	pri        int64
 	index      int
-	deferred   time.Duration
 }
 
 func NewMessage(id MessageID, body []byte) *Message {
@@ -38,19 +57,45 @@ func NewMessage(id MessageID, body []byte) *Message {
 }
 
 func (m *Message) WriteTo(w io.Writer) (int64, error) {
-	var buf [10]byte
 	var total int64
 
-	binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
-	binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
+	// magic bytes
+	n, err := w.Write(msgMagic)
+	total += int64(n)
+	if err != nil {
+		return total, err
+	}
+
+	// meta bytes
+	meta, err := msgpack.Marshal(m)
+	if err != nil {
+		return total, err
+	}
+
+	if len(meta) > int(maxMetaLen) {
+		return total, errors.New("marshaled meta data length exceeds max meta length")
+	}
+
+	var metaPrefix = append(metaKey, metaLengthPlaceholder...)
+	binary.BigEndian.PutUint16(metaPrefix[4:4+len(metaLengthPlaceholder)], uint16(len(meta)))
+
+	n, err = w.Write(metaPrefix[:])
+	total += int64(n)
+	if err != nil {
+		return total, err
+	}
 
-	n, err := w.Write(buf[:])
+	n, err = w.Write(meta)
 	total += int64(n)
 	if err != nil {
 		return total, err
 	}
 
-	n, err = w.Write(m.ID[:])
+	// msg body
+	bodyPrefix := append(bodyKey, bodyLengthPlaceholder...)
+	binary.BigEndian.PutUint64(bodyPrefix[4:4+len(bodyLengthPlaceholder)], uint64(len(m.Body)))
+
+	n, err = w.Write(bodyPrefix[:])
 	total += int64(n)
 	if err != nil {
 		return total, err
@@ -67,6 +112,8 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
 
 // decodeMessage deserializes data (as []byte) and creates a new Message
 // message format:
+//
+// Old message format:
 // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
 // |       (int64)        ||    ||      (hex string encoded in ASCII)           || (binary)
 // |       8-byte         ||    ||                 16-byte                      || N-byte
@@ -75,18 +122,45 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
 //                        (uint16)
 //                         2-byte
 //                        attempts
+//
+// New message format:
+// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
+// | ([]byte)||      (metaKey+metaLen+meta)      ||         (bodyKey+bodyLen+body)
+// |  4-byte ||           (4+2+N)-byte           ||              (4+8+N)-byte
+// ------------------------------------------------------------------------------------------...
+// message magic          message meta                           message body
+//
 func decodeMessage(b []byte) (*Message, error) {
 	var msg Message
 
-	if len(b) < minValidMsgLength {
-		return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
+	prefixBytes := b[:len(msgMagic)]
+	if bytes.Equal(prefixBytes, msgMagic) {
+		// New message format
+		metaStartIndex := len(msgMagic)
+		if !bytes.Equal(b[metaStartIndex:metaStartIndex+len(metaKey)], metaKey) {
+			return nil, fmt.Errorf("bad msg format. \"meta\" key should be after msg magic")
+		}
+
+		metaSize := binary.BigEndian.Uint16(b[metaStartIndex+len(metaKey) : metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)])
+		err := msgpack.Unmarshal(b[metaStartIndex+len(metaKey)+len(metaLengthPlaceholder):metaStartIndex+len(metaKey)+len(metaLengthPlaceholder)+int(metaSize)], &msg)
+		if err != nil {
+			return nil, err
+		}
+
+		bodyStartIndex := metaStartIndex + len(bodyKey) + len(metaLengthPlaceholder) + int(metaSize)
+		if !bytes.Equal(b[bodyStartIndex:bodyStartIndex+len(bodyKey)], bodyKey) {
+			return nil, fmt.Errorf("bad msg format. \"body\" key should be after meta content")
+		}
+		bodySize := binary.BigEndian.Uint64(b[bodyStartIndex+len(bodyKey) : bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder)])
+		msg.Body = b[bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder) : uint64(bodyStartIndex+len(bodyKey)+len(bodyLengthPlaceholder))+bodySize]
+	} else {
+		// Old message format
+		msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
+		msg.Attempts = binary.BigEndian.Uint16(b[8:10])
+		copy(msg.ID[:], b[10:10+MsgIDLength])
+		msg.Body = b[10+MsgIDLength:]
 	}
 
-	msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
-	msg.Attempts = binary.BigEndian.Uint16(b[8:10])
-	copy(msg.ID[:], b[10:10+MsgIDLength])
-	msg.Body = b[10+MsgIDLength:]
-
 	return &msg, nil
 }
 
diff --git a/nsqd/message_test.go b/nsqd/message_test.go
new file mode 100644
index 000000000..f1a45759a
--- /dev/null
+++ b/nsqd/message_test.go
@@ -0,0 +1,416 @@
+package nsqd
+
+import (
+	"bytes"
+	"encoding/binary"
+	"encoding/json"
+	"math"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/vmihailenco/msgpack"
+)
+
+type messageWithAbsTsDelete struct {
+	ID        MessageID `msgpack:"message_id"`
+	Body      []byte    `msgpack:"-"`
+	Timestamp int64     `msgpack:"timestamp"`
+	Attempts  uint16    `msgpack:"attempts"`
+
+	// for in-flight handling
+	deliveryTS time.Time
+	clientID   int64
+	pri        int64
+	index      int
+}
+
+func TestMessageWithAbsTsDeleteAfterMarshaling(t *testing.T) {
+	now := time.Now()
+
+	cases := []struct {
+		Desc string
+		Msg  *Message
+	}{
+		{
+			Desc: "nonzero AbsTs",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+				Attempts:  1,
+				AbsTs:     now.Add(time.Second).UnixNano(),
+			},
+		},
+		{
+			Desc: "zero AbsTs",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+				Attempts:  1,
+			},
+		},
+	}
+
+	for _, ut := range cases {
+		meta, err := msgpack.Marshal(ut.Msg)
+		if err != nil {
+			t.Fatalf("Desc: %q. Msgpack marshal error: %v", ut.Desc, err)
+		}
+
+		var unMarshaledMsgWithAbsTsDelete messageWithAbsTsDelete
+		err = msgpack.Unmarshal(meta, &unMarshaledMsgWithAbsTsDelete)
+		if err != nil {
+			t.Fatalf("Desc: %q. Msgpack unmarshal error: %v", ut.Desc, err)
+		}
+
+		originalMsg := *ut.Msg
+		originalMsg.Body = nil
+		if !(reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.ID, originalMsg.ID) &&
+			reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.Body, originalMsg.Body) &&
+			reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.Timestamp, originalMsg.Timestamp) &&
+			reflect.DeepEqual(unMarshaledMsgWithAbsTsDelete.Attempts, originalMsg.Attempts)) {
+			t.Fatalf("Desc: %q. MarshaledMessage unmarshal does not equal to original msg. Unmarshaled msg: %#v, orignal msg: %#v",
+				ut.Desc, unMarshaledMsgWithAbsTsDelete, originalMsg)
+		}
+	}
+}
+
+type messageWithAddedFieldAdd struct {
+	ID         MessageID `msgpack:"message_id"`
+	Body       []byte    `msgpack:"-"`
+	Timestamp  int64     `msgpack:"timestamp"`
+	Attempts   uint16    `msgpack:"attempts"`
+	AbsTs      int64     `msgpack:"abs_ts"`
+	AddedField []byte    `msgpack:"added_field"`
+
+	// for in-flight handling
+	deliveryTS time.Time
+	clientID   int64
+	pri        int64
+	index      int
+}
+
+func TestMessageWithAddedFieldAddMarshaling(t *testing.T) {
+	now := time.Now()
+
+	cases := []struct {
+		Desc string
+		Msg  *Message
+	}{
+		{
+			Desc: "no AddedField",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+				Attempts:  1,
+				AbsTs:     now.Add(time.Second).UnixNano(),
+			},
+		},
+	}
+
+	for _, ut := range cases {
+		meta, err := msgpack.Marshal(ut.Msg)
+		if err != nil {
+			t.Fatalf("Desc: %q. Msgpack marshal error: %v", ut.Desc, err)
+		}
+
+		var unMarshaledMsgWithAddedFieldAdd messageWithAddedFieldAdd
+		err = msgpack.Unmarshal(meta, &unMarshaledMsgWithAddedFieldAdd)
+		if err != nil {
+			t.Fatalf("Desc: %q. Msgpack unmarshal error: %v", ut.Desc, err)
+		}
+
+		originalMsg := *ut.Msg
+		originalMsg.Body = nil
+		if !(reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.ID, originalMsg.ID) &&
+			reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.Body, originalMsg.Body) &&
+			reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.Timestamp, originalMsg.Timestamp) &&
+			reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.Attempts, originalMsg.Attempts) &&
+			reflect.DeepEqual(unMarshaledMsgWithAddedFieldAdd.AbsTs, originalMsg.AbsTs) &&
+			unMarshaledMsgWithAddedFieldAdd.AddedField == nil) {
+			t.Fatalf("Desc: %q. MarshaledMessage unmarshal does not equal to original msg. Unmarshaled msg: %#v, orignal msg: %#v",
+				ut.Desc, unMarshaledMsgWithAddedFieldAdd, originalMsg)
+		}
+	}
+}
+
+func TestMessageMarshalAndUnmarshal(t *testing.T) {
+	now := time.Now()
+
+	cases := []struct {
+		Desc string
+		Msg  *Message
+	}{
+		{
+			Desc: "zero Attempts",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+				AbsTs:     now.Add(time.Second).UnixNano(),
+			},
+		},
+		{
+			Desc: "zero AbsTs, normal msg",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+				Attempts:  1,
+			},
+		},
+		{
+			Desc: "zero AbsTs and zero Attempts",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+			},
+		},
+		{
+			Desc: "defer msg",
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Body:      []byte("abc"),
+				Timestamp: now.UnixNano(),
+				Attempts:  1,
+				AbsTs:     now.Add(time.Second).UnixNano(),
+			},
+		},
+	}
+
+	for _, ut := range cases {
+		meta, err := msgpack.Marshal(ut.Msg)
+		if err != nil {
+			t.Fatalf("Desc: %q. Msgpack marshal error: %v", ut.Desc, err)
+		}
+
+		var unMarshaledMsg Message
+		err = msgpack.Unmarshal(meta, &unMarshaledMsg)
+		if err != nil {
+			t.Fatalf("Desc: %q. Msgpack unmarshal error: %v", ut.Desc, err)
+		}
+
+		originalMsg := *ut.Msg
+		originalMsg.Body = nil
+		if !reflect.DeepEqual(unMarshaledMsg, originalMsg) {
+			t.Fatalf("Desc: %q. MarshaledMessage unmarshal does not equal to original msg. Unmarshaled msg: %#v, orignal msg: %#v",
+				ut.Desc, unMarshaledMsg, originalMsg)
+		}
+	}
+}
+
+func BenchmarkMessageJsonMarshalAndUnmarshal(b *testing.B) {
+	msg := Message{
+		ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+		Timestamp: time.Now().UnixNano(),
+		Attempts:  1,
+		AbsTs:     time.Now().Add(time.Second).UnixNano(),
+	}
+
+	content, err := json.Marshal(msg)
+	if err != nil {
+		b.Fatalf("Json marshal error: %v", err)
+	}
+
+	b.SetBytes(int64(len(content)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		content, err := json.Marshal(msg)
+		if err != nil {
+			b.Fatalf("Json marshal error: %v", err)
+		}
+
+		var tmp Message
+		err = json.Unmarshal(content, &tmp)
+		if err != nil {
+			b.Fatalf("Json unmarshal error: %v", err)
+		}
+	}
+}
+
+func BenchmarkMessageMsgPackMarshalAndUnmarshal(b *testing.B) {
+	msg := Message{
+		ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+		Timestamp: time.Now().UnixNano(),
+		Attempts:  1,
+		AbsTs:     time.Now().Add(time.Second).UnixNano(),
+	}
+
+	content, err := msgpack.Marshal(&msg)
+	if err != nil {
+		b.Fatalf("Msgpack marshal error: %v", err)
+	}
+
+	b.SetBytes(int64(len(content)))
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		content, err := msgpack.Marshal(&msg)
+		if err != nil {
+			b.Fatalf("Msgpack marshal error: %v", err)
+		}
+
+		var tmp Message
+		err = msgpack.Unmarshal(content, &tmp)
+		if err != nil {
+			b.Fatalf("Msgpack unmarshal error: %v", err)
+		}
+	}
+}
+
+func MarshalMessageInNewFormat(t *testing.T, m *Message) []byte {
+	var ret []byte
+	ret = append(ret, msgMagic...)
+	ret = append(ret, metaKey...)
+
+	meta, err := msgpack.Marshal(&m)
+	if err != nil {
+		t.Fatalf("")
+	}
+	var metaLen [2]byte
+	binary.BigEndian.PutUint16(metaLen[:], uint16(len(meta)))
+	ret = append(ret, metaLen[:]...)
+	ret = append(ret, meta...)
+
+	ret = append(ret, bodyKey...)
+	var bodyLen [8]byte
+	binary.BigEndian.PutUint64(bodyLen[:], uint64(len(m.Body)))
+	ret = append(ret, bodyLen[:]...)
+	ret = append(ret, m.Body...)
+
+	return ret
+}
+
+func MarshalMessageInOldFormat(t *testing.T, m *Message) []byte {
+	var bf bytes.Buffer
+
+	var buf [10]byte
+	var total int64
+
+	binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
+	binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))
+
+	n, err := bf.Write(buf[:])
+	total += int64(n)
+	if err != nil {
+		t.Fatalf("Write old format message error: %v", err)
+	}
+
+	n, err = bf.Write(m.ID[:])
+	total += int64(n)
+	if err != nil {
+		t.Fatalf("Write old format message error: %v", err)
+	}
+
+	n, err = bf.Write(m.Body)
+	total += int64(n)
+	if err != nil {
+		t.Fatalf("Write old format message error: %v", err)
+	}
+
+	return bf.Bytes()
+}
+
+func TestMessage_WriteTo(t *testing.T) {
+	now := time.Now()
+
+	cases := []struct {
+		Desc       string
+		MaxMetaLen uint16
+		Msg        *Message
+		WantedErr  bool
+	}{
+		{
+			Desc:       "normal message",
+			MaxMetaLen: math.MaxUint16,
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Timestamp: time.Now().UnixNano(),
+				Attempts:  1,
+				AbsTs:     now.Add(time.Second).UnixNano(),
+				Body:      []byte("abc"),
+			},
+			WantedErr: false,
+		},
+		{
+			Desc:       "empty body message",
+			MaxMetaLen: math.MaxUint16,
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Timestamp: time.Now().UnixNano(),
+				Attempts:  1,
+				AbsTs:     now.Add(time.Second).UnixNano(),
+				Body:      []byte(""),
+			},
+			WantedErr: false,
+		},
+		{
+			Desc:       "Exceed max meta length",
+			MaxMetaLen: 1,
+			Msg: &Message{
+				ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+				Timestamp: time.Now().UnixNano(),
+				Attempts:  1,
+				AbsTs:     now.Add(time.Second).UnixNano(),
+				Body:      []byte("abc"),
+			},
+			WantedErr: true,
+		},
+	}
+
+	for _, ut := range cases {
+		oldMaxMetaLen := maxMetaLen
+
+		maxMetaLen = ut.MaxMetaLen
+
+		bf := &bytes.Buffer{}
+		writtenLen, err := ut.Msg.WriteTo(bf)
+		if (err != nil && !ut.WantedErr) || (err == nil && ut.WantedErr) {
+			t.Fatalf("Desc: %q. WriteTo for message got error: %v, wanted error: %v", ut.Desc, err, ut.WantedErr)
+		}
+
+		wanted := MarshalMessageInNewFormat(t, ut.Msg)
+		if writtenLen != int64(len(wanted)) && bytes.EqualFold(bf.Bytes(), wanted) {
+			t.Fatalf("Desc: %q. WriteTo output does not match wanted bytes. Got: %v, wanted: %v", ut.Desc, bf.Bytes(), wanted)
+		}
+
+		maxMetaLen = oldMaxMetaLen
+	}
+}
+
+func TestDecodeMessage(t *testing.T) {
+	msg := &Message{
+		ID:        MessageID{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
+		Timestamp: time.Now().UnixNano(),
+		Attempts:  1,
+		Body:      []byte("abc"),
+	}
+
+	cases := []struct {
+		Desc             string
+		MarshaledMessage []byte
+	}{
+		{
+			Desc:             "old message format",
+			MarshaledMessage: MarshalMessageInOldFormat(t, msg),
+		},
+		{
+			Desc:             "new message format",
+			MarshaledMessage: MarshalMessageInNewFormat(t, msg),
+		},
+	}
+
+	for _, ut := range cases {
+		gotMsg, err := decodeMessage(ut.MarshaledMessage)
+		if err != nil {
+			t.Fatalf("Desc: %q. Decode message error: %v", ut.Desc, err)
+		}
+
+		if !reflect.DeepEqual(gotMsg, msg) {
+			t.Fatalf("Desc: %q. Decoded message does not equal to wanted message. Got: %#v, wanted: %#v", ut.Desc, *gotMsg, *msg)
+		}
+	}
+}
diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go
index 31aeeacba..1d6b83fb3 100644
--- a/nsqd/protocol_v2.go
+++ b/nsqd/protocol_v2.go
@@ -309,6 +309,18 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
 				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
 				continue
 			}
+
+			var deferred time.Duration
+			nowInNano := time.Now().UnixNano()
+			if nowInNano-msg.AbsTs < 0 {
+				deferred = time.Duration(msg.AbsTs - nowInNano)
+			}
+
+			if deferred != 0 {
+				subChannel.PutMessageDeferred(msg, deferred)
+				continue
+			}
+
 			msg.Attempts++
 
 			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
@@ -916,7 +928,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
 
 	topic := p.ctx.nsqd.GetTopic(topicName)
 	msg := NewMessage(topic.GenerateID(), messageBody)
-	msg.deferred = timeoutDuration
+	msg.AbsTs = time.Now().Add(timeoutDuration).UnixNano()
 	err = topic.PutMessage(msg)
 	if err != nil {
 		return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
diff --git a/nsqd/topic.go b/nsqd/topic.go
index c9884fe22..efea0b430 100644
--- a/nsqd/topic.go
+++ b/nsqd/topic.go
@@ -71,7 +71,7 @@ func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topi
 			ctx.nsqd.getOpts().DataPath,
 			ctx.nsqd.getOpts().MaxBytesPerFile,
 			int32(minValidMsgLength),
-			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
+			int32(ctx.nsqd.getOpts().MaxMsgSize)+maxValidMsgLength,
 			ctx.nsqd.getOpts().SyncEvery,
 			ctx.nsqd.getOpts().SyncTimeout,
 			dqLogf,
@@ -307,6 +307,12 @@ func (t *Topic) messagePump() {
 			goto exit
 		}
 
+		var deferred time.Duration
+		nowInNano := time.Now().UnixNano()
+		if nowInNano-msg.AbsTs < 0 {
+			deferred = time.Duration(msg.AbsTs - nowInNano)
+		}
+
 		for i, channel := range chans {
 			chanMsg := msg
 			// copy the message because each channel
@@ -316,10 +322,9 @@ func (t *Topic) messagePump() {
 			if i > 0 {
 				chanMsg = NewMessage(msg.ID, msg.Body)
 				chanMsg.Timestamp = msg.Timestamp
-				chanMsg.deferred = msg.deferred
 			}
-			if chanMsg.deferred != 0 {
-				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
+			if deferred != 0 {
+				channel.PutMessageDeferred(chanMsg, deferred)
 				continue
 			}
 			err := channel.PutMessage(chanMsg)