From dcf2a5063e54991b2ec3e106d1622dd623f0d6e9 Mon Sep 17 00:00:00 2001 From: Andy Xie Date: Thu, 21 Feb 2019 12:24:23 +0800 Subject: [PATCH] keep defer property under nsqd restart --- nsqd/channel.go | 1 + nsqd/message.go | 62 ++++++++++++++++++++++++++++++++++++++++++--- nsqd/protocol_v2.go | 11 ++++++++ nsqd/topic.go | 5 ++++ 4 files changed, 76 insertions(+), 3 deletions(-) diff --git a/nsqd/channel.go b/nsqd/channel.go index 95a7adb21..1dc918e71 100644 --- a/nsqd/channel.go +++ b/nsqd/channel.go @@ -431,6 +431,7 @@ func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error { absTs := time.Now().Add(timeout).UnixNano() + msg.absTs = absTs item := &pqueue.Item{Value: msg, Priority: absTs} err := c.pushDeferredMessage(item) if err != nil { diff --git a/nsqd/message.go b/nsqd/message.go index 77ee4c79d..a68aaf9e1 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -13,6 +13,8 @@ const ( minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts ) +var deferMsgMagicFlag = []byte("#DEFER_MSG#") + type MessageID [MsgIDLength]byte type Message struct { @@ -26,7 +28,10 @@ type Message struct { clientID int64 pri int64 index int - deferred time.Duration + + // for defer message handling + deferred time.Duration + absTs int64 } func NewMessage(id MessageID, body []byte) *Message { @@ -65,6 +70,51 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { return total, nil } +func (m *Message) WriteToBackend(w io.Writer) (int64, error) { + var buf [10]byte + var total int64 + + binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) + binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) + + n, err := w.Write(buf[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.ID[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.Body) + total += int64(n) + if err != nil { + return total, err + } + + if m.deferred != 0 { + n, err = w.Write(deferMsgMagicFlag) + total += int64(n) + if err != nil { + return total, err + } + + var deferBuf [8]byte + binary.BigEndian.PutUint64(deferBuf[:8], uint64(m.absTs)) + + n, err := w.Write(deferBuf[:]) + total += int64(n) + if err != nil { + return total, err + } + } + + return total, nil +} + // decodeMessage deserializes data (as []byte) and creates a new Message // message format: // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... @@ -85,14 +135,20 @@ func decodeMessage(b []byte) (*Message, error) { msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) copy(msg.ID[:], b[10:10+MsgIDLength]) - msg.Body = b[10+MsgIDLength:] + + if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) { + msg.absTs = int64(binary.BigEndian.Uint64(b[len(b)-8:])) + msg.Body = b[10+MsgIDLength : len(b)-8-len(deferMsgMagicFlag)] + } else { + msg.Body = b[10+MsgIDLength:] + } return &msg, nil } func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error { buf.Reset() - _, err := msg.WriteTo(buf) + _, err := msg.WriteToBackend(buf) if err != nil { return err } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c2e7d7b42..b0e02eaa8 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -309,6 +309,17 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + + nowInNano := time.Now().UnixNano() + if nowInNano-msg.absTs > 0 { + msg.deferred = time.Duration(nowInNano-msg.absTs) / time.Millisecond + } + + if msg.deferred != 0 { + subChannel.PutMessageDeferred(msg, msg.deferred) + continue + } + msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) diff --git a/nsqd/topic.go b/nsqd/topic.go index e41be2b0c..8c5160b24 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -279,6 +279,11 @@ func (t *Topic) messagePump() { t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + + nowInNano := time.Now().UnixNano() + if nowInNano-msg.absTs > 0 { + msg.deferred = time.Duration(nowInNano-msg.absTs) / time.Millisecond + } case <-t.channelUpdateChan: chans = chans[:0] t.RLock()