diff --git a/nsqd/http.go b/nsqd/http.go index 714b74eef..2388f10ea 100644 --- a/nsqd/http.go +++ b/nsqd/http.go @@ -224,6 +224,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout msg := NewMessage(topic.GenerateID(), body) msg.deferred = deferred + msg.absTs = time.Now().Add(msg.deferred).UnixNano() err = topic.PutMessage(msg) if err != nil { return nil, http_api.Err{503, "EXITING"} diff --git a/nsqd/message.go b/nsqd/message.go index 77ee4c79d..a68aaf9e1 100644 --- a/nsqd/message.go +++ b/nsqd/message.go @@ -13,6 +13,8 @@ const ( minValidMsgLength = MsgIDLength + 8 + 2 // Timestamp + Attempts ) +var deferMsgMagicFlag = []byte("#DEFER_MSG#") + type MessageID [MsgIDLength]byte type Message struct { @@ -26,7 +28,10 @@ type Message struct { clientID int64 pri int64 index int - deferred time.Duration + + // for defer message handling + deferred time.Duration + absTs int64 } func NewMessage(id MessageID, body []byte) *Message { @@ -65,6 +70,51 @@ func (m *Message) WriteTo(w io.Writer) (int64, error) { return total, nil } +func (m *Message) WriteToBackend(w io.Writer) (int64, error) { + var buf [10]byte + var total int64 + + binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp)) + binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts)) + + n, err := w.Write(buf[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.ID[:]) + total += int64(n) + if err != nil { + return total, err + } + + n, err = w.Write(m.Body) + total += int64(n) + if err != nil { + return total, err + } + + if m.deferred != 0 { + n, err = w.Write(deferMsgMagicFlag) + total += int64(n) + if err != nil { + return total, err + } + + var deferBuf [8]byte + binary.BigEndian.PutUint64(deferBuf[:8], uint64(m.absTs)) + + n, err := w.Write(deferBuf[:]) + total += int64(n) + if err != nil { + return total, err + } + } + + return total, nil +} + // decodeMessage deserializes data (as []byte) and creates a new Message // message format: // [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... @@ -85,14 +135,20 @@ func decodeMessage(b []byte) (*Message, error) { msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8])) msg.Attempts = binary.BigEndian.Uint16(b[8:10]) copy(msg.ID[:], b[10:10+MsgIDLength]) - msg.Body = b[10+MsgIDLength:] + + if bytes.Equal(b[len(b)-8-len(deferMsgMagicFlag):len(b)-8], deferMsgMagicFlag) { + msg.absTs = int64(binary.BigEndian.Uint64(b[len(b)-8:])) + msg.Body = b[10+MsgIDLength : len(b)-8-len(deferMsgMagicFlag)] + } else { + msg.Body = b[10+MsgIDLength:] + } return &msg, nil } func writeMessageToBackend(buf *bytes.Buffer, msg *Message, bq BackendQueue) error { buf.Reset() - _, err := msg.WriteTo(buf) + _, err := msg.WriteToBackend(buf) if err != nil { return err } diff --git a/nsqd/protocol_v2.go b/nsqd/protocol_v2.go index c2e7d7b42..c8832cadd 100644 --- a/nsqd/protocol_v2.go +++ b/nsqd/protocol_v2.go @@ -309,6 +309,17 @@ func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + + nowInNano := time.Now().UnixNano() + if nowInNano-msg.absTs < 0 { + msg.deferred = time.Duration(msg.absTs - nowInNano) + } + + if msg.deferred != 0 { + subChannel.PutMessageDeferred(msg, msg.deferred) + continue + } + msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) diff --git a/nsqd/topic.go b/nsqd/topic.go index e41be2b0c..2455f9721 100644 --- a/nsqd/topic.go +++ b/nsqd/topic.go @@ -279,6 +279,11 @@ func (t *Topic) messagePump() { t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } + + nowInNano := time.Now().UnixNano() + if nowInNano-msg.absTs < 0 { + msg.deferred = time.Duration(msg.absTs - nowInNano) + } case <-t.channelUpdateChan: chans = chans[:0] t.RLock()