Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: switch to monotonic clocks for ID generation #1249

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions internal/pqueue/pqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package pqueue

import (
"container/heap"
"time"
)

type Item struct {
Value interface{}
Priority int64
Priority time.Time
Index int
}

Expand All @@ -23,7 +24,7 @@ func (pq PriorityQueue) Len() int {
}

func (pq PriorityQueue) Less(i, j int) bool {
return pq[i].Priority < pq[j].Priority
return pq[i].Priority.Before(pq[j].Priority)
}

func (pq PriorityQueue) Swap(i, j int) {
Expand Down Expand Up @@ -60,14 +61,14 @@ func (pq *PriorityQueue) Pop() interface{} {
return item
}

func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
func (pq *PriorityQueue) PeekAndShift(max time.Time) (*Item, time.Duration) {
if pq.Len() == 0 {
return nil, 0
}

item := (*pq)[0]
if item.Priority > max {
return nil, item.Priority - max
if item.Priority.After(max) {
return nil, item.Priority.Sub(max)
}
heap.Remove(pq, 0)

Expand Down
13 changes: 7 additions & 6 deletions internal/pqueue/pqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"runtime"
"sort"
"testing"
"time"
)

func equal(t *testing.T, act, exp interface{}) {
Expand All @@ -24,7 +25,7 @@ func TestPriorityQueue(t *testing.T) {
pq := New(c)

for i := 0; i < c+1; i++ {
heap.Push(&pq, &Item{Value: i, Priority: int64(i)})
heap.Push(&pq, &Item{Value: i, Priority: time.Unix(int64(i), 0)})
}
equal(t, pq.Len(), c+1)
equal(t, cap(pq), c*2)
Expand All @@ -44,16 +45,16 @@ func TestUnsortedInsert(t *testing.T) {
for i := 0; i < c; i++ {
v := rand.Int()
ints = append(ints, v)
heap.Push(&pq, &Item{Value: i, Priority: int64(v)})
heap.Push(&pq, &Item{Value: i, Priority: time.Unix(int64(v), 0)})
}
equal(t, pq.Len(), c)
equal(t, cap(pq), c)

sort.Ints(ints)

for i := 0; i < c; i++ {
item, _ := pq.PeekAndShift(int64(ints[len(ints)-1]))
equal(t, item.Priority, int64(ints[i]))
item, _ := pq.PeekAndShift(time.Unix(int64(ints[len(ints)-1]), 0))
equal(t, item.Priority, time.Unix(int64(ints[i]), 0))
}
}

Expand All @@ -63,7 +64,7 @@ func TestRemove(t *testing.T) {

for i := 0; i < c; i++ {
v := rand.Int()
heap.Push(&pq, &Item{Value: "test", Priority: int64(v)})
heap.Push(&pq, &Item{Value: "test", Priority: time.Unix(int64(v), 0)})
}

for i := 0; i < 10; i++ {
Expand All @@ -73,7 +74,7 @@ func TestRemove(t *testing.T) {
lastPriority := heap.Pop(&pq).(*Item).Priority
for i := 0; i < (c - 10 - 1); i++ {
item := heap.Pop(&pq)
equal(t, lastPriority < item.(*Item).Priority, true)
equal(t, lastPriority.Before(item.(*Item).Priority), true)
lastPriority = item.(*Item).Priority
}
}
4 changes: 2 additions & 2 deletions internal/quantile/quantile.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ func (q *Quantile) Result() *Result {
return &result
}

func (q *Quantile) Insert(msgStartTime int64) {
func (q *Quantile) Insert(msgStartTime time.Time) {
q.Lock()

now := time.Now()
for q.IsDataStale(now) {
q.moveWindow()
}

q.currentStream.Insert(float64(now.UnixNano() - msgStartTime))
q.currentStream.Insert(float64(now.Sub(msgStartTime).Nanoseconds()))
q.Unlock()
}

Expand Down
10 changes: 5 additions & 5 deletions nsqd/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (c *Channel) TouchMessage(clientID int64, id MessageID, clientMsgTimeout ti
newTimeout = msg.deliveryTS.Add(c.ctx.nsqd.getOpts().MaxMsgTimeout)
}

msg.pri = newTimeout.UnixNano()
msg.pri = newTimeout
err = c.pushInFlightMessage(msg)
if err != nil {
return err
Expand Down Expand Up @@ -431,7 +431,7 @@ func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim
now := time.Now()
msg.clientID = clientID
msg.deliveryTS = now
msg.pri = now.Add(timeout).UnixNano()
msg.pri = now.Add(timeout)
err := c.pushInFlightMessage(msg)
if err != nil {
return err
Expand All @@ -441,7 +441,7 @@ func (c *Channel) StartInFlightTimeout(msg *Message, clientID int64, timeout tim
}

func (c *Channel) StartDeferredTimeout(msg *Message, timeout time.Duration) error {
absTs := time.Now().Add(timeout).UnixNano()
absTs := time.Now().Add(timeout)
item := &pqueue.Item{Value: msg, Priority: absTs}
err := c.pushDeferredMessage(item)
if err != nil {
Expand Down Expand Up @@ -531,7 +531,7 @@ func (c *Channel) addToDeferredPQ(item *pqueue.Item) {
c.deferredMutex.Unlock()
}

func (c *Channel) processDeferredQueue(t int64) bool {
func (c *Channel) processDeferredQueue(t time.Time) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

Expand Down Expand Up @@ -562,7 +562,7 @@ exit:
return dirty
}

func (c *Channel) processInFlightQueue(t int64) bool {
func (c *Channel) processInFlightQueue(t time.Time) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

Expand Down
44 changes: 6 additions & 38 deletions nsqd/guid.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package nsqd
import (
"encoding/hex"
"errors"
"math/rand"
"sync"
"time"
)
Expand All @@ -36,55 +37,22 @@ type guid int64
type guidFactory struct {
sync.Mutex

nodeID int64
sequence int64
lastTimestamp int64
lastID guid
nodeID int64
randng *rand.Rand
}

func NewGUIDFactory(nodeID int64) *guidFactory {
return &guidFactory{
nodeID: nodeID,
randng: rand.New(rand.NewSource(time.Now().UnixNano() ^ nodeID)),
}
}

func (f *guidFactory) NewGUID() (guid, error) {
f.Lock()

// divide by 1048576, giving pseudo-milliseconds
ts := time.Now().UnixNano() >> 20

if ts < f.lastTimestamp {
f.Unlock()
return 0, ErrTimeBackwards
}

if f.lastTimestamp == ts {
f.sequence = (f.sequence + 1) & sequenceMask
if f.sequence == 0 {
f.Unlock()
return 0, ErrSequenceExpired
}
} else {
f.sequence = 0
}

f.lastTimestamp = ts

id := guid(((ts - twepoch) << timestampShift) |
(f.nodeID << nodeIDShift) |
f.sequence)

if id <= f.lastID {
f.Unlock()
return 0, ErrIDBackwards
}

f.lastID = id

id := f.randng.Int63()
f.Unlock()

return id, nil
return guid(id), nil
}

func (g guid) Hex() MessageID {
Expand Down
14 changes: 8 additions & 6 deletions nsqd/in_flight_pqueue.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package nsqd

import "time"

type inFlightPqueue []*Message

func newInFlightPqueue(capacity int) inFlightPqueue {
Expand Down Expand Up @@ -55,14 +57,14 @@ func (pq *inFlightPqueue) Remove(i int) *Message {
return x
}

func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
func (pq *inFlightPqueue) PeekAndShift(max time.Time) (*Message, time.Duration) {
if len(*pq) == 0 {
return nil, 0
}

x := (*pq)[0]
if x.pri > max {
return nil, x.pri - max
if x.pri.After(max) {
return nil, x.pri.Sub(max)
}
pq.Pop()

Expand All @@ -72,7 +74,7 @@ func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
func (pq *inFlightPqueue) up(j int) {
for {
i := (j - 1) / 2 // parent
if i == j || (*pq)[j].pri >= (*pq)[i].pri {
if i == j || (*pq)[j].pri.After((*pq)[i].pri) {
break
}
pq.Swap(i, j)
Expand All @@ -87,10 +89,10 @@ func (pq *inFlightPqueue) down(i, n int) {
break
}
j := j1 // left child
if j2 := j1 + 1; j2 < n && (*pq)[j1].pri >= (*pq)[j2].pri {
if j2 := j1 + 1; j2 < n && (*pq)[j1].pri.After((*pq)[j2].pri) {
j = j2 // = 2*i + 2 // right child
}
if (*pq)[j].pri >= (*pq)[i].pri {
if (*pq)[j].pri.After((*pq)[i].pri) {
break
}
pq.Swap(i, j)
Expand Down
15 changes: 8 additions & 7 deletions nsqd/in_flight_pqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"sort"
"testing"
"time"

"github.com/nsqio/nsq/internal/test"
)
Expand All @@ -14,7 +15,7 @@ func TestPriorityQueue(t *testing.T) {
pq := newInFlightPqueue(c)

for i := 0; i < c+1; i++ {
pq.Push(&Message{clientID: int64(i), pri: int64(i)})
pq.Push(&Message{clientID: int64(i), pri: time.Unix(int64(i), 0)})
}
test.Equal(t, c+1, len(pq))
test.Equal(t, c*2, cap(pq))
Expand All @@ -34,16 +35,16 @@ func TestUnsortedInsert(t *testing.T) {
for i := 0; i < c; i++ {
v := rand.Int()
ints = append(ints, v)
pq.Push(&Message{pri: int64(v)})
pq.Push(&Message{pri: time.Unix(int64(v), 0)})
}
test.Equal(t, c, len(pq))
test.Equal(t, c, cap(pq))

sort.Ints(ints)

for i := 0; i < c; i++ {
msg, _ := pq.PeekAndShift(int64(ints[len(ints)-1]))
test.Equal(t, int64(ints[i]), msg.pri)
msg, _ := pq.PeekAndShift(time.Unix(int64(ints[len(ints)-1]), 0))
test.Equal(t, int64(ints[i]), msg.pri.Unix())
}
}

Expand All @@ -53,8 +54,8 @@ func TestRemove(t *testing.T) {

msgs := make(map[MessageID]*Message)
for i := 0; i < c; i++ {
m := &Message{pri: int64(rand.Intn(100000000))}
copy(m.ID[:], fmt.Sprintf("%016d", m.pri))
m := &Message{pri: time.Unix(int64(rand.Intn(100000000)), 0)}
copy(m.ID[:], fmt.Sprintf("%016d", m.pri.Unix()))
msgs[m.ID] = m
pq.Push(m)
}
Expand All @@ -75,7 +76,7 @@ func TestRemove(t *testing.T) {
lastPriority := pq.Pop().pri
for i := 0; i < (c - 10 - 1); i++ {
msg := pq.Pop()
test.Equal(t, true, lastPriority <= msg.pri)
test.Equal(t, true, lastPriority.Before(msg.pri))
lastPriority = msg.pri
}
}
10 changes: 5 additions & 5 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ type MessageID [MsgIDLength]byte
type Message struct {
ID MessageID
Body []byte
Timestamp int64
Timestamp time.Time
Attempts uint16

// for in-flight handling
deliveryTS time.Time
clientID int64
pri int64
pri time.Time
index int
deferred time.Duration
}
Expand All @@ -33,15 +33,15 @@ func NewMessage(id MessageID, body []byte) *Message {
return &Message{
ID: id,
Body: body,
Timestamp: time.Now().UnixNano(),
Timestamp: time.Now(),
}
}

func (m *Message) WriteTo(w io.Writer) (int64, error) {
var buf [10]byte
var total int64

binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp))
binary.BigEndian.PutUint64(buf[:8], uint64(m.Timestamp.UnixNano()))
binary.BigEndian.PutUint16(buf[8:10], uint16(m.Attempts))

n, err := w.Write(buf[:])
Expand Down Expand Up @@ -82,7 +82,7 @@ func decodeMessage(b []byte) (*Message, error) {
return nil, fmt.Errorf("invalid message buffer size (%d)", len(b))
}

msg.Timestamp = int64(binary.BigEndian.Uint64(b[:8]))
msg.Timestamp = time.Unix(0, int64(binary.BigEndian.Uint64(b[:8])))
msg.Attempts = binary.BigEndian.Uint16(b[8:10])
copy(msg.ID[:], b[10:10+MsgIDLength])
msg.Body = b[10+MsgIDLength:]
Expand Down
2 changes: 1 addition & 1 deletion nsqd/nsqd.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,7 @@ func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, close
for {
select {
case c := <-workCh:
now := time.Now().UnixNano()
now := time.Now()
dirty := false
if c.processInFlightQueue(now) {
dirty = true
Expand Down
Loading