Update flatcontainer test to use flatcontainers in iteration
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jun 12, 2024
1 parent b842b05 commit c29adf6
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions tests/bfs_flatcontainer.rs
@@ -1,12 +1,17 @@
use rand::{Rng, SeedableRng, StdRng};
use std::fmt::Debug;
use timely::container::flatcontainer::{MirrorRegion, ReserveItems};
use timely::order::FlatProductRegion;

use std::sync::{Arc, Mutex};

use timely::Config;
use timely::container::flatcontainer::{Containerized, Push, Region};
use timely::{Config, PartialOrder};

use timely::dataflow::operators::capture::Extract;
use timely::dataflow::operators::Capture;
use timely::dataflow::*;
use timely::order::Product;

use differential_dataflow::input::Input;
use differential_dataflow::Collection;
@@ -20,15 +25,15 @@ type Node = usize;
type Edge = (Node, Node);

#[test]
fn bfs_10_20_1000() {
fn flat_bfs_10_20_1000() {
test_sizes(10, 20, 1000, Config::process(3));
}
#[test]
fn bfs_100_200_10() {
fn flat_bfs_100_200_10() {
test_sizes(100, 200, 10, Config::process(3));
}
#[test]
fn bfs_100_2000_1() {
fn flat_bfs_100_2000_1() {
test_sizes(100, 2000, 1, Config::process(3));
}

@@ -88,40 +93,40 @@ fn bfs_sequential(
) -> Vec<((usize, usize), usize, isize)> {
let mut nodes = 0;
for &(root, _, _) in &root_list {
nodes = ::std::cmp::max(nodes, root + 1);
nodes = std::cmp::max(nodes, root + 1);
}
for &((src, dst), _, _) in &edge_list {
nodes = ::std::cmp::max(nodes, src + 1);
nodes = ::std::cmp::max(nodes, dst + 1);
nodes = std::cmp::max(nodes, src + 1);
nodes = std::cmp::max(nodes, dst + 1);
}

let mut rounds = 0;
for &(_, time, _) in &root_list {
rounds = ::std::cmp::max(rounds, time + 1);
rounds = std::cmp::max(rounds, time + 1);
}
for &(_, time, _) in &edge_list {
rounds = ::std::cmp::max(rounds, time + 1);
rounds = std::cmp::max(rounds, time + 1);
}

let mut counts = vec![0; nodes];
let mut results = Vec::new();

for round in 0..rounds {
let mut roots = ::std::collections::HashMap::new();
let mut roots = std::collections::HashMap::new();
for &(root, time, diff) in &root_list {
if time <= round {
*roots.entry(root).or_insert(0) += diff;
}
}

let mut edges = ::std::collections::HashMap::new();
let mut edges = std::collections::HashMap::new();
for &((src, dst), time, diff) in &edge_list {
if time <= round {
*edges.entry((src, dst)).or_insert(0) += diff;
}
}

let mut dists = vec![usize::max_value(); nodes];
let mut dists = vec![usize::MAX; nodes];
for (&key, &val) in roots.iter() {
if val > 0 {
dists[key] = 0;
@@ -133,7 +138,7 @@ fn bfs_sequential(
changes = false;
for (&(src, dst), &cnt) in edges.iter() {
if cnt > 0 {
if dists[src] != usize::max_value() && dists[dst] > dists[src] + 1 {
if dists[src] != usize::MAX && dists[dst] > dists[src] + 1 {
dists[dst] = dists[src] + 1;
changes = true;
}
@@ -143,7 +148,7 @@

let mut new_counts = vec![0; nodes];
for &value in dists.iter() {
if value != usize::max_value() {
if value != usize::MAX {
new_counts[value] += 1;
}
}
Expand All @@ -169,7 +174,7 @@ fn bfs_differential(
edges_list: Vec<((usize, usize), usize, isize)>,
config: Config,
) -> Vec<((usize, usize), usize, isize)> {
let (send, recv) = ::std::sync::mpsc::channel();
let (send, recv) = std::sync::mpsc::channel();
let send = Arc::new(Mutex::new(send));

timely::execute(config, move |worker| {
@@ -234,7 +239,16 @@ fn bfs<G: Scope>(
roots: &Collection<G, Node>,
) -> Collection<G, (Node, usize)>
where
G::Timestamp: Lattice + Ord,
G::Timestamp: Lattice + Ord + Containerized,
for<'a> G::Timestamp: PartialOrder<<<G::Timestamp as Containerized>::Region as Region>::ReadItem<'a>>,
<G::Timestamp as Containerized>::Region: Region<Owned=G::Timestamp> + Push<G::Timestamp>,
for<'a> <Product<G::Timestamp, u64> as Containerized>::Region: Region<Owned=Product<G::Timestamp, u64>> + Push<<<Product<G::Timestamp, u64> as Containerized>::Region as Region>::ReadItem<'a>>,
<G::Timestamp as Containerized>::Region: Clone + Ord,
for<'a> FlatProductRegion<<G::Timestamp as Containerized>::Region, MirrorRegion<u64>>: Push<&'a Product<G::Timestamp, u64>>,
for<'a> <FlatProductRegion<<G::Timestamp as Containerized>::Region, MirrorRegion<u64>> as Region>::ReadItem<'a>: Copy + Ord + Debug,
Product<G::Timestamp, u64>: for<'a> PartialOrder<<<Product<G::Timestamp, u64> as Containerized>::Region as Region>::ReadItem<'a>>,
for<'a> <<Product<G::Timestamp, u64> as Containerized>::Region as Region>::ReadItem<'a>: PartialOrder<Product<G::Timestamp, u64>>,
for<'a> <Product<G::Timestamp, u64> as Containerized>::Region: ReserveItems<<<Product<G::Timestamp, u64> as Containerized>::Region as Region>::ReadItem<'a>>,
{
// initialize roots as reaching themselves at distance 0
let nodes = roots.map(|x| (x, 0));
@@ -244,8 +258,11 @@ where
let edges = edges.enter(&inner.scope());
let nodes = nodes.enter(&inner.scope());

inner
.join_map(&edges, |_k, l, d| (*d, l + 1))
type Spine<K, V, T, R = isize> = FlatValSpine<K, V, T, R, Vec<((K, V), T, R)>>;
let arranged1 = inner.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
let arranged2 = edges.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
arranged1
.join_core(&arranged2, move |_k, l, d| Some((d, l + 1)))
.concat(&nodes)
.reduce(|_, s, t| t.push((*s[0].0, 1)))
})

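Note on the pattern introduced above: the `bfs` helper now arranges both join inputs into flatcontainer-backed spines and joins the arrangements with `join_core`, instead of calling `join_map` on the collections directly. The sketch below restates that fragment from the diff for readability; it is not standalone and assumes the surrounding iterative scope of the test, where `inner`, `edges`, and `nodes` are the collections built earlier in `bfs`.

// Alias a flatcontainer-backed value spine, as in the diff.
type Spine<K, V, T, R = isize> = FlatValSpine<K, V, T, R, Vec<((K, V), T, R)>>;

// Arrange both join inputs into the flatcontainer spine...
let arranged1 = inner.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();
let arranged2 = edges.arrange::<Spine<Node, Node, Product<G::Timestamp, _>>>();

// ...and join the arrangements directly; the join logic itself is unchanged
// from the previous `join_map` version.
arranged1
    .join_core(&arranged2, move |_k, l, d| Some((d, l + 1)))
    .concat(&nodes)
    .reduce(|_, s, t| t.push((*s[0].0, 1)))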