Skip to content

Latest commit

 

History

History
61 lines (57 loc) · 2 KB

concurrent_fork_join.md

File metadata and controls

61 lines (57 loc) · 2 KB

Concurrent Fork Join

fn sequential() {
    let (sender, receiver) = kanal::unbounded_async();
    tokio::spawn(async move {
        for i in 0..count {
            sender.send(i).await.unwrap();
        }
    });
    let count = 100;

    let tasks = FuturesUnordered::new();
    for i in 0..count {
        let recv = receiver.clone();
        tasks.push(async move {
            let x = recv.recv().await.unwrap();
            // simulate computation
            std::thread::sleep(std::time::Duration::from_millis(1));
            println!("{}", x);
        });
    }
    tasks.collect::<Vec<_>>().await;
    // this will run about 105 ms in 32 cpus machine
}

fn parallel(){
    let (sender, receiver) = kanal::unbounded_async();

    tokio::spawn(async move {
        for i in 0..count {
            sender.send(i).await.unwrap();
        }
    });
    
    let mut tasks = Vec::new();
    // 64 = concurrency but limited by logical cpu count (32 in my machine)
    for _ in 0..64 {
        let receiver = receiver.clone();
        // spawn to let work stealing do its job
        let task = tokio::spawn(async move {
            // 4 allow 1 job to buffer 4 items
            // 1 = init size
            ConcurrentForkJoinTask::new(receiver, 4, 1, |x: i32| async move {
                // simulate computation
                std::thread::sleep(std::time::Duration::from_millis(1));
                println!("{}", x);
            }).collect::<Vec<_>>().await;
            ()
        });
        tasks.push(task);
    }
    // this only wait for task in other thread to finish
    futures_util::stream::iter(tasks)
        .for_each_concurrent(None, |it| async move { it.await.unwrap(); })
        .await;
    // this will run about 7.5ms in 32 cpus machine (not yield 32x probably cause by numad pin the process in 1ccd)
}
  • Unlike rayon, ConcurrentForkJoinTask allows async code in the work closure
  • ConcurrentForkJoinTask relies on an MPMC channel to distribute work
  • ConcurrentForkJoinTask works well for workloads that involve heavy computation