Skip to content

Commit

Permalink
keep defer property under nsqd restart
Browse files Browse the repository at this point in the history
  • Loading branch information
andyxning committed Feb 21, 2019
1 parent cbdcd54 commit 43879e2
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 3 deletions.
1 change: 1 addition & 0 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
absTs := time.Now().Add(timeout).UnixNano()
msg.absTs = absTs
item := &pqueue.Item{Value: msg, Priority: absTs}
err := c.pushDeferredMessage(item)
if err != nil {
Expand Down
62 changes: 59 additions & 3 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ const (
minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts
)

var deferMsgMagicFlag = []byte("#DEFER_MSG#")

type MessageID [MsgIDLength]byte

type Message struct {
Expand All @@ -26,7 +28,10 @@ type Message struct {
clientID int64
pri int64
index int
deferred time.Duration

// for defer message handling
deferred time.Duration
absTs int64
}

func NewMessage(id MessageID, body []byte) *Message {
Expand Down Expand Up @@ -65,6 +70,51 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) {
return total, nil
}

func (m *Message) WriteToBackend(w io.Writer) (int64, error) {
var buf [10]byte
var total int64

binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))

n, err := w.Write(buf[:])
total += int64(n)
if err != nil {
return total, err
}

n, err = w.Write(m.ID[:])
total += int64(n)
if err != nil {
return total, err
}

n, err = w.Write(m.Body)
total += int64(n)
if err != nil {
return total, err
}

if m.deferred != 0 {
n, err = w.Write(deferMsgMagicFlag)
total += int64(n)
if err != nil {
return total, err
}

var deferBuf [8]byte
binary.BigEndian.PutUint64(deferBuf[:8], uint64(m.absTs))

n, err := w.Write(deferBuf[:])
total += int64(n)
if err != nil {
return total, err
}
}

return total, nil
}

// decodeMessage deserializes data (as []byte) and creates a new Message
// message format:
// [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]...
Expand All @@ -85,14 +135,20 @@ func decodeMessage(b []byte) (*Message, error) {
msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
copy(msg.ID[:], b[10:10+MsgIDLength])
msg.Body = b[10+MsgIDLength:]

if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) {
msg.absTs = int64(binary.BigEndian.Uint64(b[len(b)-8:]))
msg.Body = b[10+MsgIDLength : len(b)-8-len(deferMsgMagicFlag)]
} else {
msg.Body = b[10+MsgIDLength:]
}

return &msg, nil
}

func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error {
buf.Reset()
_, err := msg.WriteTo(buf)
_, err := msg.WriteToBackend(buf)
if err != nil {
return err
}
Expand Down
11 changes: 11 additions & 0 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,17 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}

nowInNano := time.Now().UnixNano()
if nowInNano-msg.absTs < 0 {
msg.deferred = time.Duration(msg.absTs - nowInNano)
}

if msg.deferred != 0 {
subChannel.PutMessageDeferred(msg, msg.deferred)
continue
}

msg.Attempts++

subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
Expand Down
5 changes: 5 additions & 0 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ func (t *Topic) messagePump() {
t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}

nowInNano := time.Now().UnixNano()
if nowInNano-msg.absTs < 0 {
msg.deferred = time.Duration(msg.absTs - nowInNano)
}
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
Expand Down

0 comments on commit 43879e2

Please sign in to comment.