Skip to content

Commit

Permalink
refactor: refactor events stream queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Dec 12, 2024
1 parent f767b40 commit 1a93c67
Showing 1 changed file with 59 additions and 47 deletions.
106 changes: 59 additions & 47 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
type Queue struct {
head unsafe.Pointer // pointer to the head of the queue
tail unsafe.Pointer // pointer to the tail of the queue
len uint64 // length of the queue
len int64 // length of the queue
pool sync.Pool
}

Expand All @@ -51,6 +51,7 @@ func NewQueue() *Queue {
return &Queue{
head: unsafe.Pointer(dummy), // both head and tail point to the dummy node
tail: unsafe.Pointer(dummy),
len: 0,
pool: sync.Pool{
New: func() interface{} {
return &item{}
Expand All @@ -61,25 +62,31 @@ func NewQueue() *Queue {

// Enqueue adds a value to the tail of the queue.
func (q *Queue) Enqueue(v interface{}) {
xitem := q.pool.Get().(*item)
xitem.next = nil
xitem.v = v
// Get a node from the pool
newNode := q.getItem()
newNode.v = v
newNodePtr := unsafe.Pointer(newNode)

for {
last := loadItem(&q.tail) // Load current tail
lastNext := loadItem(&last.next) // Load the next pointer of tail
if last == loadItem(&q.tail) { // Check tail consistency
if lastNext == nil { // Is tail really pointing to the last node?
// Try to link the new item at the end
if casItem(&last.next, nil, xitem) {
// Enqueue successful, now try to move the tail pointer
casItem(&q.tail, last, xitem)
atomic.AddUint64(&q.len, 1)
return
}
} else {
// Tail was pointing to an intermediate node, help move it forward
casItem(&q.tail, last, lastNext)
}
tail := (*item)(atomic.LoadPointer(&q.tail))
next := atomic.LoadPointer(&tail.next)

// Another thread might have already enqueued a node
if next != nil {
// Try to help advance the tail
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), next)
continue
}

// Try to link the new node
if atomic.CompareAndSwapPointer(&tail.next, nil, newNodePtr) {
// Successfully linked, now try to advance tail
atomic.CompareAndSwapPointer(&q.tail, unsafe.Pointer(tail), newNodePtr)

// Increment length atomically
atomic.AddInt64(&q.len, 1)

return
}
}
}
Expand All @@ -88,46 +95,51 @@ func (q *Queue) Enqueue(v interface{}) {
// It returns nil if the queue is empty.
func (q *Queue) Dequeue() interface{} {
for {
head := loadItem(&q.head) // Load the current head
tail := loadItem(&q.tail) // Load the current tail
next := loadItem(&head.next) // Load the next node after head
if head == loadItem(&q.head) { // Check head consistency
if head == tail { // Is the queue empty?
if next == nil { // Confirm that queue is empty
return nil
}
// Tail is lagging behind, move it forward
casItem(&q.tail, tail, next)
} else {
// Get the value before CAS to avoid freeing the node too early
v := next.v
// Try to swing the head to the next node
if casItem(&q.head, head, next) {
atomic.AddUint64(&q.len, ^uint64(0)) // decrement length
q.pool.Put(next)
return v // return the dequeued value
}
}
head := (*item)(atomic.LoadPointer(&q.head))
next := atomic.LoadPointer(&head.next)

// Queue is empty
if next == nil {
return nil
}

nextNode := (*item)(next)

// Try to advance the head
if atomic.CompareAndSwapPointer(&q.head, unsafe.Pointer(head), next) {
// Get the value before potentially releasing the node
value := nextNode.v

// Release the old head node back to the pool
q.releaseItem(head)

// Decrement length atomically
atomic.AddInt64(&q.len, -1)

return value
}
}
}

// Length returns the number of items in the queue.
func (q *Queue) Length() uint64 {
return atomic.LoadUint64(&q.len)
return uint64(atomic.LoadInt64(&q.len))
}

// IsEmpty returns true when the queue is empty
func (q *Queue) IsEmpty() bool {
return atomic.LoadUint64(&q.len) == 0
return atomic.LoadInt64(&q.len) == 0
}

// loadItem atomically loads an item pointer from the given unsafe pointer.
func loadItem(p *unsafe.Pointer) *item {
return (*item)(atomic.LoadPointer(p))
// getItem retrieves a node from the pool or creates a new one
func (q *Queue) getItem() *item {
return q.pool.Get().(*item)
}

// casItem performs an atomic compare-and-swap on an unsafe pointer.
func casItem(p *unsafe.Pointer, old, new *item) bool {
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
// releaseItem returns a node to the pool for reuse
func (q *Queue) releaseItem(i *item) {
// Reset i to prevent memory leaks
i.v = nil
i.next = nil
q.pool.Put(i)
}

0 comments on commit 1a93c67

Please sign in to comment.