-
Notifications
You must be signed in to change notification settings - Fork 1
/
ChunkedQueue2.hpp
119 lines (97 loc) · 3.66 KB
/
ChunkedQueue2.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#pragma once
#include <array>
#include <atomic>
#include <cstddef>
#include <functional>
#include <new>
#include <type_traits>
#include <utility>
/// Bounded FIFO queue whose storage is split into a fixed ring of chunks.
///
/// Every chunk is itself a small ring buffer with its own head/tail indices;
/// the chunks are linked into a circle through `chunk::next`.  All storage
/// lives inside the object, so nothing is allocated at run time.
///
/// Concurrency: indices are published with plain atomic load/store pairs
/// (no compare-exchange), so at most ONE thread may call Enqueue and at most
/// ONE thread may call Dequeue at any time.  The two calls may run
/// concurrently with each other.
///
/// Capacity: a chunk keeps one slot unused to tell "full" from "empty", so
/// the queue holds at most `chunk_count * (chunk_size - 1)` elements, which
/// is slightly less than SIZE.
///
/// @tparam T     Element type. Must fit in one chunk and must not require
///               an alignment larger than `align`.
/// @tparam SIZE  Requested number of slots (must be non-zero).
template<typename T, std::size_t SIZE>
struct ChunkedQueue2 {
    using value_type = T;

    /// Alignment used to keep the producer's and the consumer's fields apart.
    static constexpr std::size_t align = 64;
    /// Byte budget for the element storage of one chunk (32 KiB minus the
    /// three `align`-sized header fields of `chunk`).
    static constexpr std::size_t chunk_size_bytes =
        (std::size_t(1) << 15) - (3 * align);

    static_assert(SIZE > 0, "ChunkedQueue2 requires a non-zero SIZE");
    static_assert(sizeof(value_type) <= chunk_size_bytes,
                  "ChunkedQueue2: value_type does not fit into one chunk");
    static_assert(alignof(value_type) <= align,
                  "ChunkedQueue2: value_type is over-aligned for the chunk buffer");

    /// Largest number of slots that fits into the byte budget of one chunk.
    static constexpr std::size_t chunk_max_size =
        chunk_size_bytes / sizeof(value_type);
    /// Number of chunks needed to provide SIZE slots.
    static constexpr std::size_t chunk_count =
        (SIZE + chunk_max_size - 1) / chunk_max_size;
    /// Slots per chunk: SIZE spread evenly over the chunks, rounded up.
    static constexpr std::size_t chunk_size =
        (SIZE / chunk_count) + (((SIZE % chunk_count) == 0) ? 0 : 1);

    /// One ring buffer of `chunk_size` slots plus the link to its successor.
    struct alignas(align) chunk {
        chunk() :
            tail(),
            head(),
            next(nullptr) {}

        /// Index of the next slot to be written; stored only by Enqueue.
        alignas(align) std::atomic_size_t tail;
        /// Index of the next slot to be read; stored only by Dequeue.
        alignas(align) std::atomic_size_t head;
        /// Successor in the circular chunk list (set by the queue constructor).
        alignas(align) chunk* next;
        /// Raw, deliberately uninitialized storage for the elements.
        alignas(align) std::array<std::byte, sizeof(T) * chunk_size> buffer;
    };

    /// Chunk Enqueue is currently writing to.
    alignas(align) std::atomic<chunk*> tail_chunk;
    /// Chunk Dequeue is currently reading from.
    alignas(align) std::atomic<chunk*> head_chunk;
    alignas(align) std::array<chunk, chunk_count> chunks;

    /// Builds an empty queue: links the chunks into a circle and points both
    /// cursors at the first chunk.
    ChunkedQueue2() :
        // Members are initialized in declaration order, i.e. the two cursors
        // come BEFORE `chunks`.  They therefore start as null and are only
        // pointed into `chunks` in the body, once the array is constructed.
        tail_chunk(nullptr),
        head_chunk(nullptr),
        chunks{}
    {
        for (std::size_t i = 0; i < chunk_count; ++i) {
            chunks[i].next = &chunks[(i + 1) % chunk_count];
        }
        tail_chunk.store(chunks.data(), std::memory_order_relaxed);
        head_chunk.store(chunks.data(), std::memory_order_relaxed);
    }

    // The chunks point at each other and the cursors point at the chunks, so
    // a member-wise copy could never be valid (std::atomic forbids it anyway).
    ChunkedQueue2(const ChunkedQueue2&) = delete;
    ChunkedQueue2& operator=(const ChunkedQueue2&) = delete;

    /// Destroys every element that is still queued.
    /// No other thread may be using the queue while it is being destroyed.
    ~ChunkedQueue2() {
        if constexpr (!std::is_trivially_destructible_v<T>) {
            while (Dequeue([](T&&) noexcept {}) != 0) {
            }
        }
    }

    /// Constructs a new element in place at the back of the queue.
    ///
    /// @param args  Forwarded to the constructor of T.
    /// @return 1 if the element was stored, 0 if the queue is full.
    ///
    /// If the constructor of T throws, nothing is published and the queue is
    /// left unchanged.
    template<typename... Args>
    int Enqueue(Args&&... args) {
        chunk* tc = tail_chunk.load(std::memory_order_relaxed);
        std::size_t t = tc->tail.load(std::memory_order_relaxed);
        std::size_t n = next_index(t);

        // Acquire pairs with the release store of `head` in Dequeue, so the
        // previous occupant of a slot is destroyed before it is reused.
        const bool advance = (n == tc->head.load(std::memory_order_acquire));
        if (advance) {
            // Current chunk is full: try to move on to its successor.
            tc = tc->next;
            if (tc == head_chunk.load(std::memory_order_acquire))
                return 0;  // successor is still being read => queue is full
            t = tc->tail.load(std::memory_order_relaxed);
            n = next_index(t);
            // A chunk with no usable slot (chunk_size == 1) must be refused,
            // otherwise the element would be published as tail == head and
            // could never be dequeued.
            if (n == tc->head.load(std::memory_order_acquire))
                return 0;
        }

        ::new (slot_storage(*tc, t)) T(std::forward<Args>(args)...);
        // Publish the element first, and only then the new tail chunk.
        tc->tail.store(n, std::memory_order_release);
        if (advance)
            tail_chunk.store(tc, std::memory_order_release);
        return 1;
    }

    /// Removes the front element, handing it to `f` as an rvalue.
    ///
    /// @param f  Callable invoked as `f(T&&)` with the front element.
    /// @return 1 if an element was consumed, 0 if the queue is empty.
    ///
    /// The element is destroyed and its slot released only after `f` returns.
    /// If `f` throws, the element stays at the front of the queue.
    template<typename Callable>
    int Dequeue(Callable&& f) {
        chunk* hc = head_chunk.load(std::memory_order_relaxed);
        // tail_chunk must be read BEFORE hc->tail.  Read the other way round,
        // the producer could fill `hc` and move on in between; we would then
        // combine a stale "empty" tail with the new tail chunk and skip the
        // elements still waiting in `hc`.
        chunk* const tc = tail_chunk.load(std::memory_order_acquire);
        std::size_t h = hc->head.load(std::memory_order_relaxed);
        std::size_t t = hc->tail.load(std::memory_order_acquire);

        const bool advance = (t == h);
        if (advance) {
            // Head chunk is drained.
            if (hc == tc)
                return 0;  // the producer is still on this chunk => empty
            hc = hc->next;
            h = hc->head.load(std::memory_order_relaxed);
            t = hc->tail.load(std::memory_order_acquire);
            if (t == h)
                return 0;
        }

        T* const elem = slot_object(*hc, h);
        std::invoke(std::forward<Callable>(f), std::move(*elem));
        elem->~T();
        // Release the slot first, and only then give up the previous chunk.
        hc->head.store(next_index(h), std::memory_order_release);
        if (advance)
            head_chunk.store(hc, std::memory_order_release);
        return 1;
    }

private:
    /// Successor of `index` inside a chunk, wrapping at `chunk_size`.
    static constexpr std::size_t next_index(std::size_t index) noexcept {
        return (index + 1 == chunk_size) ? 0 : index + 1;
    }

    /// Address of the raw storage for slot `index` of chunk `c`.
    static void* slot_storage(chunk& c, std::size_t index) noexcept {
        return c.buffer.data() + index * sizeof(T);
    }

    /// Pointer to the live element in slot `index` of chunk `c`.
    static T* slot_object(chunk& c, std::size_t index) noexcept {
        return std::launder(static_cast<T*>(slot_storage(c, index)));
    }
};