Skip to content

Commit

Permalink
improved broker queue implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-kenzel committed Nov 24, 2023
1 parent 318237e commit 5498f6a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 63 deletions.
157 changes: 96 additions & 61 deletions src/queues/BQ/broker_queue.art
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ enum bwd_alloc_result {
Failed
}

struct BWDAllocator {
struct BQAllocator {
alloc: fn(i64) -> bwd_alloc_result,
release: fn(Buffer) -> ()
}

fn @createBrokerWorkDistributorQueue_internal[T](queue_size: i32, allocator: BWDAllocator, reference_impl: bool) -> create_queue_result[T] {
fn @createBrokerQueue_internal[T](queue_size: i32, allocator: BQAllocator, bwd: bool, reference_impl: bool) -> create_queue_result[T] {
if queue_size < 0 {
return(create_queue_result[T]::Err("invalid queue size"))
}
Expand Down Expand Up @@ -49,132 +49,161 @@ fn @createBrokerWorkDistributorQueue_internal[T](queue_size: i32, allocator: BWD
let buffer = &mut queue_device_memory(buffer_offset) as &mut addrspace(1) [T];
let queue = &mut queue_device_memory(queue_offset) as &mut addrspace(1) BWD::Queue;

// fn @atomic_head_tail(thread: thread_context) {
// let ht = thread.atomic_load_global_u64(&mut queue.head as &mut addrspace(1) u64, memory_order::relaxed);
// ((ht & 0xFFFFFFFF) as u32, (ht >> 32) as u32)
// }

fn @wait_for_ticket(i: u32, number: u32, thread: thread_context) -> () {
let load_ticket = if reference_impl {
@|| thread.atomic_load_global_u32_coalesced(tickets(i), memory_order::relaxed)
@|| thread.atomic_load_global_u32(&mut tickets(i), memory_order::relaxed)
}
else {
@|| thread.atomic_load_global_u32(tickets(i), memory_order::relaxed)
// @|| thread.atomic_xor_global_u32(&mut tickets(i), 0, memory_order::relaxed)
@|| thread.atomic_load_global_u32(&mut tickets(i), memory_order::relaxed)
};

thread.wait(@|| load_ticket() == number, "BWD waiting for ticket");
thread.wait(@|| load_ticket() == number, "BWD waiting for ticket");
}

fn @ensure_dequeue(thread: thread_context) -> bool {
let load_size = if reference_impl {
@|| queue.size
let mut num = if reference_impl {
thread.atomic_load_global_i32(&mut queue.size, memory_order::relaxed)
}
else {
@|| thread.atomic_load_global_i32(queue.size, memory_order::relaxed)
thread.atomic_xor_global_i32(&mut queue.size, 0, memory_order::relaxed)
};

let mut num = load_size();

let mut ensurance = false;

while !ensurance && num > 0 {
if thread.atomic_sub_global_i32(queue.size, 1, memory_order::relaxed) > 0 {
ensurance = true;
while true {
if num <= 0 {
return(false)
}
else {
num = thread.atomic_add_global_i32(queue.size, 1, memory_order::relaxed) + 1;

if thread.atomic_sub_global_i32(&mut queue.size, 1, memory_order::relaxed) > 0 {
break()
}

num = thread.atomic_add_global_i32(&mut queue.size, 1, memory_order::relaxed) + 1;
}

ensurance
true
}

fn @ensure_enqueue(thread: thread_context) -> bool {
let load_size = if reference_impl {
@|| queue.size
let mut num = if reference_impl {
thread.atomic_load_global_i32(&mut queue.size, memory_order::relaxed)
}
else {
@|| thread.atomic_load_global_i32(queue.size, memory_order::relaxed)
thread.atomic_xor_global_i32(&mut queue.size, 0, memory_order::relaxed)
};

let mut num = load_size();

let mut ensurance = false;

while !ensurance && num < queue_size {
if thread.atomic_add_global_i32(queue.size, 1, memory_order::relaxed) < queue_size {
ensurance = true;
while true {
if num >= queue_size {
return(false)
}
else {
num = thread.atomic_sub_global_i32(queue.size, 1, memory_order::relaxed) - 1;

if thread.atomic_add_global_i32(&mut queue.size, 1, memory_order::relaxed) < queue_size {
break()
}

num = thread.atomic_sub_global_i32(&mut queue.size, 1, memory_order::relaxed) - 1;
}

ensurance
true
}

fn @read_data(sink: fn(T) -> (), thread: thread_context) -> () {
let store_ticket = if reference_impl {
@|p:u32, value:u32| { thread.atomic_store_global_u32_coalesced(tickets(p), value, memory_order::relaxed); }
@|p:u32, value:u32| { thread.atomic_store_global_u32(&mut tickets(p), value, memory_order::release); }
}
else {
@|p:u32, value:u32| { tickets(p) = value; }
// @|p:u32, value:u32| { thread.atomic_exch_global_u32(&mut tickets(p), value, memory_order::release); }
@|p:u32, value:u32| { thread.atomic_store_global_u32(&mut tickets(p), value, memory_order::release); }
};

let pos = thread.atomic_add_global_u32(queue.head, 1, memory_order::relaxed);
let pos = thread.atomic_add_global_u32(&mut queue.head, 1, memory_order::relaxed);
let p = pos % queue_size as u32;

wait_for_ticket(p, 2 * (pos / queue_size as u32) + 1, thread);
thread.memory_barrier(memory_order::acquire);
let val = buffer(p);
thread.memory_barrier(memory_order::acq_rel);
store_ticket(p, 2 * ((pos + queue_size as u32) / queue_size as u32));

sink(val);
}

fn @put_data(source: fn() -> T, thread: thread_context) -> () {
let store_ticket = if reference_impl {
@|p:u32, value:u32| { thread.atomic_store_global_u32_coalesced(tickets(p), value, memory_order::relaxed); }
@|p:u32, value:u32| { thread.atomic_store_global_u32(&mut tickets(p), value, memory_order::release); }
}
else {
@|p:u32, value:u32| { tickets(p) = value; }
// @|p:u32, value:u32| { thread.atomic_exch_global_u32(&mut tickets(p), value, memory_order::release); }
@|p:u32, value:u32| { thread.atomic_store_global_u32(&mut tickets(p), value, memory_order::release); }
};

let pos = thread.atomic_add_global_u32(queue.tail, 1, memory_order::relaxed);
let p = pos % queue_size as u32;
let b = 2 * (pos / queue_size as u32);

let val = source();

wait_for_ticket(p, b, thread);
let pos = thread.atomic_add_global_u32(&mut queue.tail, 1, memory_order::relaxed);
let p = pos % queue_size as u32;
wait_for_ticket(p, 2 * (pos / queue_size as u32), thread);
buffer(p) = val;
thread.memory_barrier(memory_order::release);
store_ticket(p, b + 1);
store_ticket(p, 2 * (pos / queue_size as u32) + 1);
}

create_queue_result[T]::Ok(ProducerConsumerQueue[T] {
push = @|source| @|thread| {
if ensure_enqueue(thread) {
put_data(source, thread);
1
if bwd {
if ensure_enqueue(thread) {
put_data(source, thread);
1
}
else {
0
}
}
else {
0
// while !ensure_enqueue() {
// let (head, tail) = atomic_head_tail(thread);
// if N <= tail - head < N + MaxThreads/2 {
// return(0)
// }
// }

// put_data(source, thread);
// 1
}
},

pop = @|sink| @|thread| {
if ensure_dequeue(thread) {
read_data(sink, thread);
1
if bwd {
if ensure_dequeue(thread) {
read_data(sink, thread);
1
}
else {
0
}
}
else {
0
// while !ensure_dequeue() {
// let (head, tail) = atomic_head_tail(thread);
// if N + MaxThreads/2 <= tail - head - 1 {
// return(0)
// }
// }

// read_data(sink, thread);
// 1
}
},

size = @|thread| {
if reference_impl {
queue.size
thread.atomic_load_global_i32(&mut queue.size, memory_order::relaxed)
}
else {
thread.atomic_load_global_i32(queue.size, memory_order::relaxed)
thread.atomic_load_global_i32(&mut queue.size, memory_order::relaxed)
}
},

Expand All @@ -201,20 +230,26 @@ fn @createBrokerWorkDistributorQueue_internal[T](queue_size: i32, allocator: BWD
})
}

fn @bwd_dynamic_alloc(device: AccDevice) = BWDAllocator {
fn @bwd_dynamic_alloc(device: AccDevice) = BQAllocator {
alloc = @|size: i64| bwd_alloc_result::Ok(device.alloc(size)),
release = @|buffer| release(buffer)
};

fn @createBrokerWorkDistributorQueue[T](device: AccDevice, queue_size: i32) {
createBrokerWorkDistributorQueue_internal[T](queue_size, bwd_dynamic_alloc(device), false)
createBrokerQueue_internal[T](queue_size, bwd_dynamic_alloc(device), true, false)
}

static mut bwd_static_queue_buffer: [u8 * 268435456];
fn @createBrokerWorkDistributorQueueOrig[T](device: AccDevice, queue_size: i32) {
createBrokerQueue_internal[T](queue_size, bwd_dynamic_alloc(device), true, true)
}

fn @bwd_static_alloc(device: AccDevice) = BWDAllocator {
static mut bwd_static_queue_buffer: [u8 * 536870912];

fn @bwd_static_alloc(device: AccDevice) = BQAllocator {
alloc = @|size: i64| {
if size <= 268435456 {
if size <= 536870912 && (device.platform_device & 0xF) != 0 {
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
// cannot access static buffer from non-host device
bwd_alloc_result::Ok(Buffer {
data = &mut bwd_static_queue_buffer as &mut [i8],
size = size,
Expand All @@ -229,9 +264,9 @@ fn @bwd_static_alloc(device: AccDevice) = BWDAllocator {
};

fn @createBrokerWorkDistributorQueueStatic[T](device: AccDevice, queue_size: i32) {
createBrokerWorkDistributorQueue_internal[T](queue_size, bwd_static_alloc(device), false)
createBrokerQueue_internal[T](queue_size, bwd_static_alloc(device), true, false)
}

fn @createBrokerWorkDistributorQueueOrig[T](device: AccDevice, queue_size: i32) {
createBrokerWorkDistributorQueue_internal[T](queue_size, bwd_static_alloc(device), true)
fn @createBrokerWorkDistributorQueueOrigStatic[T](device: AccDevice, queue_size: i32) {
createBrokerQueue_internal[T](queue_size, bwd_static_alloc(device), true, true)
}
6 changes: 4 additions & 2 deletions src/queues/BQ/queue.cmake
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
set(BrokerWorkDistributorQueue_short_name BWD)
set(BrokerWorkDistributorQueueStatic_short_name BWD_static)
set(BrokerWorkDistributorQueueOrig_short_name BWD_orig)
set(BrokerWorkDistributorQueueOrigCUDA_short_name BWD_cuda_orig)
set(BrokerWorkDistributorQueueOrigStatic_short_name BWD_orig_static)
set(BrokerWorkDistributorQueueOrigCUDA_short_name BWD_orig_cuda)
set(BrokerWorkDistributorQueueCUDA_short_name BWD_cuda)
set(BrokerWorkDistributorQueueCUDAIndirect_short_name BWD_cuda_indirect)

set(BrokerWorkDistributorQueue_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue.art)
set(BrokerWorkDistributorQueueStatic_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue.art)
set(BrokerWorkDistributorQueueOrig_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue.art)
set(BrokerWorkDistributorQueueOrigStatic_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue.art)
set(BrokerWorkDistributorQueueOrigCUDA_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue_cuda.art)
set(BrokerWorkDistributorQueueCUDA_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue_cuda.art)
set(BrokerWorkDistributorQueueCUDAIndirect_sources ${CMAKE_CURRENT_LIST_DIR}/broker_queue_cuda_indirect.art)
Expand All @@ -27,7 +29,7 @@ function (BrokerWorkDistributorQueueCUDA_configure_target target patch_includes)
get_target_property(_bin_dir ${target} ANYDSL_BINARY_DIR)
get_target_property(_name ${target} NAME)

set(cuda_src "${_bin_dir}/${_name}")
set(cuda_src "${_bin_dir}/$<CONFIG>/${_name}")

add_custom_command(
OUTPUT ${cuda_src}.ll
Expand Down

0 comments on commit 5498f6a

Please sign in to comment.