Skip to content

Commit

Permalink
update iterator consumers to properly trigger task counts so wait_all works correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
rdfriese committed Jul 29, 2024
1 parent b680d55 commit 28cf0f9
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 12 deletions.
9 changes: 8 additions & 1 deletion examples/array_examples/array_consumer_schedules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,22 @@ fn sum_with_schedule(

fn main() {
let world = lamellar::LamellarWorldBuilder::new().build();
println!("world created");
let _my_pe = world.my_pe();
let _num_pes = world.num_pes();
let block_array = AtomicArray::<usize>::new(world.team(), ARRAY_LEN, Distribution::Block);
println!("array created");
block_array.print();
let _ = block_array
.dist_iter_mut()
.enumerate()
.for_each(move |(i, e)| e.store(i))
.for_each(move |(i, e)| {
println!("setting {i} to {i}");
e.store(i)
})
.spawn();
world.wait_all();
println!("Done");
block_array.print();

let thread_cnts: Arc<Mutex<HashMap<ThreadId, usize>>> = Arc::new(Mutex::new(HashMap::new()));
Expand Down
27 changes: 20 additions & 7 deletions src/array/unsafe/iteration/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use core::marker::PhantomData;
use futures_util::Future;
use paste::paste;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;

impl<T> InnerArray for UnsafeArray<T> {
Expand Down Expand Up @@ -48,15 +49,27 @@ macro_rules! consumer_impl {
$($bounds)+
{
let am = $($am)*;
// set req counters so that wait all works
self.data.team.team_counters.add_send_req(1);
self.data.team.world_counters.add_send_req(1);
self.data.task_group.counters.add_send_req(1);

let barrier = self.barrier_handle();
let inner = self.clone();
let reqs_future = Box::pin(async move{match sched {
Schedule::Static => inner.sched_static(am),
Schedule::Dynamic => inner.sched_dynamic(am),
Schedule::Chunk(size) => inner.sched_chunk(am,size),
Schedule::Guided => inner.sched_guided(am),
Schedule::WorkStealing => inner.sched_work_stealing(am),
}});
let reqs_future = Box::pin(async move{
let reqs = match sched {
Schedule::Static => inner.sched_static(am),
Schedule::Dynamic => inner.sched_dynamic(am),
Schedule::Chunk(size) => inner.sched_chunk(am,size),
Schedule::Guided => inner.sched_guided(am),
Schedule::WorkStealing => inner.sched_work_stealing(am),
};
// remove req counters after individual ams have been launched.
inner.data.team.team_counters.outstanding_reqs.fetch_sub(1,Ordering::SeqCst);
inner.data.team.world_counters.outstanding_reqs.fetch_sub(1,Ordering::SeqCst);
inner.data.task_group.counters.outstanding_reqs.fetch_sub(1,Ordering::SeqCst);
reqs
});
$return_type::new(barrier,reqs_future,self)
}

Expand Down
6 changes: 5 additions & 1 deletion src/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ impl Barrier {
// but for the case of Darcs, or any case where the barrier is being called in a worker thread
// we actually want to be able to process other tasks while the barrier is active
pub(crate) fn tasking_barrier(&self) {
// println!("calling tasking barrier");
self.barrier_internal(|| {
self.scheduler.exec_task();
});
Expand All @@ -314,10 +315,13 @@ impl Barrier {
n: self.n,
state: State::RoundInit(self.num_rounds),
};
// println!("in barrier handle");
// self.print_bar();
if self.panic.load(Ordering::SeqCst) == 0 {
if let Some(_) = &self.send_buf {
if let Ok(my_index) = self.arch.team_pe(self.my_pe) {
let barrier_id = self.barrier_cnt.fetch_add(1, Ordering::SeqCst);
// println!("barrier id: {:?}", barrier_id);
handle.barrier_id = barrier_id;
handle.my_index = my_index;
handle.state = State::RoundInit(0);
Expand Down Expand Up @@ -459,7 +463,7 @@ impl BarrierHandle {
- (i as isize * (self.n as isize + 1).pow(round as u32) as isize))
as isize)
.rem_euclid(self.num_pes as isize) as isize;
let recv_pe = self.arch.single_iter(team_recv_pe as usize).next().unwrap();
// let recv_pe = self.arch.single_iter(team_recv_pe as usize).next().unwrap();
if team_recv_pe as usize != self.my_index {
unsafe {
//safe as each pe is only capable of writing to its own index
Expand Down
2 changes: 1 addition & 1 deletion src/darc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use std::sync::Arc;

// //use tracing::*;

use crate::active_messaging::{AMCounters, AmHandle, RemotePtr};
use crate::active_messaging::{AMCounters, RemotePtr};
use crate::barrier::Barrier;
use crate::env_var::config;
use crate::lamellae::{AllocationType, Backend, LamellaeComm, LamellaeRDMA};
Expand Down
3 changes: 1 addition & 2 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,12 @@ impl Scheduler {
while self.panic.load(Ordering::SeqCst) == 0
&& self.num_tasks.load(Ordering::Relaxed) > 3
&& self.num_ams.load(Ordering::Relaxed) > 0
{}
//TODO maybe this should be > 2
{
//the Lamellae Comm Task, Lamellae Alloc Task, Lamellar Error Task
if timer.elapsed().as_secs_f64() > config().deadlock_timeout {
println!(
"shurtdown timeout, tasks remaining: {:?} panic: {:?}",
"shutdown timeout, tasks remaining: {:?} panic: {:?}",
self.num_tasks.load(Ordering::Relaxed),
self.panic.load(Ordering::SeqCst),
);
Expand Down

0 comments on commit 28cf0f9

Please sign in to comment.