diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6121e72 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "rust-spp" +version = "0.1.0" +authors = ["Ricardo Pieper "] +edition = "2018" + +[dependencies] +rand = "0.6.5" +lazy_static = "1.3.0" +raster = "0.2.0" +clap = "2.33.0" +num_cpus = "1.0" +rayon = "1.0.3" +time = "0.1.42" +tokio = "0.1.19" +futures = "0.1" +tokio-core = "0.1.17" +parking_lot = "*" +[dev-dependencies] +criterion = "0.2" + +[[bench]] +name = "mandelbrot_rustspp" +harness = false + +[[bench]] +name = "mandelbrot_pixbypix" +harness = false diff --git a/README.md b/README.md index 9d5a774..02a1538 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,42 @@ -# rust-ssp -Structured Stream Parallelism for Rust +# Rust SSP # + +Structured Stream Parallelism for Rust + +Define a pipeline with N steps. Pipelines can be normal pipelines or "farm pipelines" (in which some steps are parallel). +You can also define pipelines with mutable state. + + fn pipelined() { + let pipeline = pipeline![ + pipeline, + parallel!(LoadImage, 40), + parallel!(ApplyMoreSaturation, 2), + parallel!(ApplyEmboss, 2), + parallel!(ApplyGamma, 2), + parallel!(ApplySharpen, 2), + parallel!(ApplyGrayscale, 2), + parallel!(SaveImageAndGetResult, 40), + sequential!(PrintResult)]; + + let dir_entries = std::fs::read_dir("/Users/user/Desktop/imagens"); + + for entry in dir_entries.unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + + if path.extension().is_none() { continue; } + + println!("Posting {:?}", path.to_str().unwrap()); + + pipeline.post(path).unwrap(); + + } + + pipeline.end_and_wait(); + + println!("Finished."); + } + + +# How to Cite our Work + +Ricardo Pieper, Dalvan Griebler, and Luiz Gustavo Fernandes. 2019. **Structured Stream Parallelism for Rust.** In Proceedings of the XXIII Brazilian Symposium on Programming Languages (SBLP 2019). ACM, New York, NY, USA, 54-61. DOI: https://doi.org/10.1145/3355378.3355384 \ No newline at end of file diff --git a/benches/mandelbrot_pixbypix.rs b/benches/mandelbrot_pixbypix.rs new file mode 100644 index 0000000..96af988 --- /dev/null +++ b/benches/mandelbrot_pixbypix.rs @@ -0,0 +1,216 @@ + +#[macro_use] +extern crate criterion; + +use criterion::Criterion; +use criterion::ParameterizedBenchmark; +use criterion::Benchmark; +use rayon::prelude::*; +use rayon::ThreadPoolBuilder; +use rust_spp::*; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use raster::Color; + +struct SquareImage { + size: i32, + cur_x: i32, + cur_y: i32 +} + +impl Iterator for SquareImage { + type Item = (i32, i32); + fn next(&mut self) -> Option<(i32, i32)> { + let ret = (self.cur_x, self.cur_y); + + self.cur_y = self.cur_y + 1; + + if (self.cur_y == self.size) { + self.cur_x = self.cur_x + 1; + self.cur_y = 0; + } + if (self.cur_x == self.size) { + return None; + } + else { + return Some(ret); + } + } +} + +struct Parameters { + init_a: f64, + init_b: f64, + step: f64 +} + +fn calculate_pixel(x: i32, y: i32, step: f64, init_a: f64, init_b: f64) -> i32 { + let iterations = 10000; + + let im = init_b + (step * (y as f64)); + let mut a = init_a + (step * (x as f64)); + let cr = a; + + let mut b = im; + let mut k = 0; + + for i in 0.. 
iterations { + let a2 = a * a; + let b2 = b * b; + if (a2 + b2) > 4.0 { break; } + b = 2.0 * a * b + im; + a = a2 - b2 + cr; + k = i; + } + return k; +} + +struct CalculatePixelIterations { + params: Parameters +} +impl InOut<(i32, i32), (i32, i32, i32)> for CalculatePixelIterations { + fn process(&mut self, position: (i32, i32)) -> (i32, i32, i32) { + match position { + (x, y) => { + (x, y, calculate_pixel(x, y, self.params.step, self.params.init_a, self.params.init_b)) + } + } + } +} + +struct Renderer { +} +impl In<(i32, i32, i32), (usize, usize, u8)> for Renderer { + fn process(&mut self, data: (i32, i32, i32), _order: u64) -> (usize, usize, u8) { + let iterations = 10000; + match data { + (x, y, k) => { + (x as usize, y as usize, (255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8) + } + } + } +} + + +fn mandelbrot_rustspp(size: usize, threads: i32) { + { + let mut pipeline = pipeline![ + parallel!(CalculatePixelIterations { + params: Parameters { + init_a: -2.125, + init_b: -1.5, + step: 3.0 / (size as f64), + } + }, threads), + sequential!(Renderer { })]; + + for i in 0 .. size { + for j in 0 .. size { + pipeline.post((i as i32, j as i32)).unwrap(); + } + } + pipeline.end_and_wait(); + } +} + +fn mandelbrot_rayon(size: usize, thread_pool: Rc) { + //x: i32, y: i32, step: f64, init_a: f64, init_b: f64 + let buf: Arc>>> = Arc::new(Mutex::new(vec![vec![0u8; size]; size])); + + let pixels = SquareImage { size: size as i32, cur_x:0, cur_y:0 }; + + let as_vec: Vec<(i32, i32)> = pixels.into_iter().collect(); + + let params = Parameters { + init_a: -2.125, + init_b: -1.5, + step: 3.0 / (size as f64), + }; + + thread_pool.install(|| { + as_vec.into_par_iter() + .map(|(x, y)| (x, y, calculate_pixel(x, y, params.step, params.init_a, params.init_b))) + .for_each(|(x,y,k)| { + let mut buf = buf.lock().unwrap(); + let iterations = 10000; + buf[x as usize][y as usize] = (255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8; + }); + }); + +} + +fn get_pixel_value(iterations: i32, k: i32) -> u8 { + (255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8 +} + +type PixelPositionAndValue = (i32,i32,u8); + +fn mandelbrot_rayon_collect(size: usize, thread_pool: Rc) { + //x: i32, y: i32, step: f64, init_a: f64, init_b: f64 + + let pixels = SquareImage { size: size as i32, cur_x:0, cur_y:0 }; + + let as_vec: Vec<(i32, i32)> = pixels.into_iter().collect(); + + let params = Parameters { + init_a: -2.125, + init_b: -1.5, + step: 3.0 / (size as f64), + }; + + thread_pool.install(|| { + let mut buf: Vec> = vec![vec![0u8; size]; size]; + let mut all_pixels: Vec = vec![(0,0,0); size]; + + as_vec.into_par_iter() + .map(|(x, y)| (x, y, + get_pixel_value(10000, calculate_pixel(x, y, params.step, params.init_a, params.init_b)))) + .collect_into_vec(&mut all_pixels); + + for (x, y, pixel) in all_pixels { + buf[x as usize][y as usize] = pixel; + } + }); + +} + +fn criterion_benchmark(c: &mut Criterion) { + + let threads_to_run = 4 ..= (num_cpus::get() as i32) * 2; + println!("threads: {:?}", threads_to_run); + c.bench("mandelbrot pixel by pixel comparison", + ParameterizedBenchmark::new( + "rustsupp mandelbrot 1000x1000", + |b, &threads| { b.iter(|| mandelbrot_rustspp(1000, threads)); }, + threads_to_run) + .with_function("rayon mandelbrot 1000x1000", + |b, &threads| { + let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap()); + b.iter(|| mandelbrot_rayon(1000, pool.clone())); }) + .sample_size(10)); + +} + + +fn rayon_benchmark(c: 
&mut Criterion) { + + let threads_to_run = 1 ..= (num_cpus::get() as i32); + println!("threads: {:?}", threads_to_run); + c.bench("mandelbrot rayon pixel by pixel", + ParameterizedBenchmark::new( + "rayon foreach 1000x1000", + |b, &threads| { + let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap()); + b.iter(|| mandelbrot_rayon(1000, pool.clone())); + },threads_to_run) + .with_function("rayon collect 1000x1000", + |b, &threads| { + let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap()); + b.iter(|| mandelbrot_rayon_collect(1000, pool.clone())); }) + .sample_size(10)); + +} + +criterion_group!(benches, criterion_benchmark); +//criterion_group!(benches, rayon_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/benches/mandelbrot_rustspp.rs b/benches/mandelbrot_rustspp.rs new file mode 100644 index 0000000..8d989ce --- /dev/null +++ b/benches/mandelbrot_rustspp.rs @@ -0,0 +1,221 @@ + +#[macro_use] +extern crate criterion; + +use criterion::Criterion; +use criterion::ParameterizedBenchmark; +use criterion::Benchmark; +use rayon::prelude::*; +use rayon::ThreadPoolBuilder; +use rust_spp::*; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use raster::Color; +use futures::Map; +use tokio::*; +use tokio_core::reactor::Core; +use tokio::prelude::*; +use std::thread; +use std::time::*; +use futures::sync::*; + +use futures::{stream, Stream, Future}; +use futures::future::lazy; +struct ImageLine { + line_index: usize, + line_buffer: Vec +} + + +fn render_line(size: usize, line: usize) -> ImageLine { + + let init_a = -2.125 as f64; + let init_b = -1.5 as f64; + let range = 3.0 as f64; + let step = range / (size as f64); + + let mut m: Vec = vec![0; size]; + + let i = line; + + let im = init_b + (step * (i as f64)); + let iterations = 10000; + + for j in 0 .. size { + + let mut a = init_a + step * j as f64; + let cr = a; + + let mut b = im; + let mut k = 0; + + for ii in 0.. iterations { + let a2 = a * a; + let b2 = b * b; + if (a2 + b2) > 4.0 {break;} + b = 2.0 * a * b + im; + a = a2 - b2 + cr; + k = ii; + } + m[j] = (255 as f64 - (((k as f64) * 255 as f64 / (iterations as f64)))) as u8; + } + return ImageLine { + line_index: line as usize, + line_buffer: m + }; +} + +struct ComputeLine { + size: usize +} +impl ComputeLine { + fn new(size: usize) -> ComputeLine { + ComputeLine {size: size} + } +} +impl InOut for ComputeLine { + fn process(&mut self, image_line: usize) -> ImageLine { + render_line(self.size, image_line) + } +} + + +struct RenderLine { +} +impl In for RenderLine { + fn process(&mut self, image_line: ImageLine, _order: u64) -> ImageLine { + image_line + } +} + +fn mandelbrot_sequential(size: usize) -> Vec { + (0 .. size).into_iter() + .map(|image_line| render_line(size, image_line)) + .collect() +} + +fn mandelbrot_rustspp(size: usize, threads: usize) { + + let pipeline = pipeline![ + parallel!(move |line_index| render_line(size, line_index), threads as i32), + collect!()]; + + for i in 0 .. size { + pipeline.post(i as usize).unwrap(); + } + let rendered_image = pipeline.collect(); + for item in rendered_image { + println!("Collected line {:?}", item.line_index) + } +} + + + +fn mandelbrot_rustspp_ordered(size: usize, threads: usize) { + + let mut pipeline = pipeline![ + parallel!(ComputeLine::new(size), threads as i32), + collect_ordered!()]; + + for i in 0 .. 
size { + pipeline.post(i as usize).unwrap(); + } + let lines = pipeline.collect(); +} + + +fn mandelbrot_tokio(size: usize, threads: usize) { + + let mandelbrot_stream = stream::iter_ok(0..size) + .map(move |index| { + let (sender, receiver) = oneshot::channel::(); + tokio::spawn(lazy(move || { + let result = render_line(size, index ); + sender.send(result).ok(); + Ok(()) + })); + receiver + }) + .buffered(threads) + .for_each(|_rendered_line| { Ok(())}) + .map_err(|e| println!("listener error = {:?}", e)); + + tokio::run(mandelbrot_stream); +} + + +fn mandelbrot_rayon(size: usize, thread_pool: Rc) -> Vec{ + let mut b = vec![]; + thread_pool.install(|| { + (0 .. size).into_par_iter() + .map(|image_line| { render_line(size, image_line) }) + .collect_into_vec(&mut b); + }); + return b; +} + + +fn mandelbrot_tokio_unordered(size: usize, threads: usize) { + + let line_stream = stream::iter_ok(0..size); + + let mandelbrot_stream = line_stream + .map(move |index| { + let (sender, receiver) = oneshot::channel::(); + tokio::spawn(lazy(move ||{ + let result = render_line(size, index); + sender.send(result).ok(); + Ok(()) + })); + receiver + }) + .buffer_unordered(threads) + .for_each(|_rendered_line| { Ok(())}) + .map_err(|e| println!("listener error = {:?}", e)); + + tokio::run(mandelbrot_stream); + +} + +fn mandelbrot_benches(c: &mut Criterion) { + + c.bench("mandelbrot comparison", + ParameterizedBenchmark::new( + "mandelbrot rustspp ordered 1000x1000", + |b, &threads| { b.iter(|| mandelbrot_rustspp_ordered(1000, threads)); }, + 4 ..= num_cpus::get() * 2) + .sample_size(10)); + + /*c.bench("mandelbrot comparison", + ParameterizedBenchmark::new( + "mandelbrot rustspp unordered 1000x1000", + |b, &threads| { b.iter(|| mandelbrot_rustspp(1000, threads)); }, + 1 ..= num_cpus::get() * 2) + .sample_size(10)); + + c.bench("mandelbrot comparison", + ParameterizedBenchmark::new( + "mandelbrot rayon 1000x1000", + |b, &threads| { + let pool = Rc::new(ThreadPoolBuilder::new().num_threads(threads as usize).build().unwrap()); + b.iter(|| mandelbrot_rayon(1000, pool.clone())); }, + 1 ..= num_cpus::get() * 2) + .sample_size(10)); + + c.bench("mandelbrot comparison", + ParameterizedBenchmark::new( + "mandelbrot tokio ordered 1000x1000", + |b, &threads| { b.iter(|| mandelbrot_tokio(1000, threads)); }, + 1 ..= num_cpus::get() * 2) + .sample_size(10)); + + c.bench("mandelbrot comparison", + ParameterizedBenchmark::new( + "mandelbrot tokio unordered 1000x1000", + |b, &threads| { b.iter(|| mandelbrot_tokio_unordered(1000, threads)); }, + 1 ..= num_cpus::get() * 2) + .sample_size(10));*/ +} +//criterion_group!(benches, criterion_benchmark); +criterion_group!(benches, mandelbrot_benches); +criterion_main!(benches); \ No newline at end of file diff --git a/scripts/imageproc_perfdata.csv b/scripts/imageproc_perfdata.csv new file mode 100644 index 0000000..2aae63d --- /dev/null +++ b/scripts/imageproc_perfdata.csv @@ -0,0 +1,14 @@ +threads_per_filter,Tokio,Rust-SPP,stddev,Speedup Tokio,Speedup Rust-SPP,Throughput Tokio,Throughput Rust-SPP +0,261.978,261.978,2.390788573,1,1,1.210025269,1.210025269 +1,262.338,90.648,0.2669644171,0.9986277245,2.890058247,1.208364781,3.497043509 +2,67.342,50.64,0.3481379037,3.890261649,5.173341232,4.7073149,6.259873618 +3,44.612,37.32,0.2006240265,5.87236618,7.01977492,7.105711468,8.494105038 +4,36.254,31.134,0.217784297,7.226181939,8.414530738,8.743862746,10.18179482 +5,31.54,28.232,0.3372239612,8.306214331,9.279470105,10.05072923,11.22839331 
+6,29.232,26.58,0.4470458589,8.962027915,9.856207675,10.84428024,11.92626035 +7,26.786,25.692,0.4742573141,9.780407676,10.19687062,11.83454043,12.33847112 +8,25.85,24.994,0.1836572895,10.13454545,10.48163559,12.26305609,12.68304393 +9,24.848,24.766,0.2900517195,10.54322279,10.57813131,12.757566,12.79980619 +10,24.26,24.934,0.3350074626,10.7987634,10.50685811,13.06677659,12.71356381 +11,24.022,23.734,0.3692289263,10.90575306,11.03808882,13.19623678,13.35636639 +12,23.862,24.122,0.3183865575,10.97887855,10.86054224,13.28472048,13.14153055 \ No newline at end of file diff --git a/scripts/imageproc_speedup.pdf b/scripts/imageproc_speedup.pdf new file mode 100644 index 0000000..970d29f Binary files /dev/null and b/scripts/imageproc_speedup.pdf differ diff --git a/scripts/imageproc_throughput.pdf b/scripts/imageproc_throughput.pdf new file mode 100644 index 0000000..8975a36 Binary files /dev/null and b/scripts/imageproc_throughput.pdf differ diff --git a/scripts/mandelbrot_comparison.pdf b/scripts/mandelbrot_comparison.pdf new file mode 100644 index 0000000..c0ea565 Binary files /dev/null and b/scripts/mandelbrot_comparison.pdf differ diff --git a/scripts/mandelbrot_perfdata.csv b/scripts/mandelbrot_perfdata.csv new file mode 100644 index 0000000..9d8579f --- /dev/null +++ b/scripts/mandelbrot_perfdata.csv @@ -0,0 +1,25 @@ +Level of Parallelism,Rust-SPP unordered,Rust-SPP ordered,Rayon,Rayon pixel by pixel,Tokio ordered,Tokio unordered,ideal,Speedup Rust-SPP,Speedup Rust-SPP Ordered,Speedup Rayon,Speedup Tokio Ordered,Speedup Tokio Unordered +1,6837,6834,6836,6847,6926,6966,6837,1,1,1,1,1 +2,3428,3440,3433,3423,3583,3530,3418.5,1.99445741,1.9875,1.991261288,1.908735696,1.937393768 +3,2352,2337,2899,2377,2414,2418,2279,2.906887755,2.925545571,2.358054502,2.833057167,2.828370554 +4,1857,1822,1822,1853,1894,1854,1709.25,3.68174475,3.752469813,3.751920966,3.610876452,3.688781014 +5,1519,1515,1490,1476,1559,1520,1367.4,4.500987492,4.512871287,4.587919463,4.386786402,4.499342105 +6,1307,1312,1389,1304,1335,1299,1139.5,5.231063504,5.211128049,4.921526278,5.122846442,5.264819092 +7,1136,1143,1236,1141,1162,1135,976.7142857,6.018485915,5.981627297,5.530744337,5.885542169,6.025550661 +8,1011,1012,1019,1001,1027,1007,854.625,6.762611276,6.755928854,6.708537782,6.659201558,6.791459782 +9,907,908,907,906,920,905,759.6666667,7.538037486,7.529735683,7.53693495,7.433695652,7.556906077 +10,824,824,831,826,836,823,683.7,8.297330097,8.297330097,8.226233454,8.18062201,8.309842041 +11,754,754,796,767,766,756,621.5454545,9.067639257,9.067639257,8.587939698,8.928198433,9.046296296 +12,695,695,761,716,704,697,569.75,9.837410072,9.837410072,8.982917214,9.714488636,9.81205165 +13,644,646,697,658,655,646,525.9230769,10.61645963,10.58359133,9.807747489,10.44122137,10.58668731 +14,602,602,655,623,613,601,488.3571429,11.35714286,11.35714286,10.43664122,11.15660685,11.37936772 +15,564,564,632,575,577,563,455.8,12.12234043,12.12234043,10.8164557,11.85268631,12.14742451 +16,531,530,546,535,544,529,427.3125,12.87570621,12.9,12.52014652,12.57169118,12.92816635 +17,500,501,509,500,514,499,402.1764706,13.674,13.64670659,13.4302554,13.30544747,13.70541082 +18,474,474,480,474,485,472,379.8333333,14.42405063,14.42405063,14.24166667,14.10103093,14.48940678 +19,450,450,456,452,466,448,359.8421053,15.19333333,15.19333333,14.99122807,14.67596567,15.265625 +20,428,427,433,430,441,426,341.85,15.97429907,16.0117096,15.78752887,15.50793651,16.05399061 
+21,410,408,418,410,419,406,325.5714286,16.67560976,16.75735294,16.35406699,16.3221957,16.84482759 +22,392,393,407,390,397,387,310.7727273,17.44132653,17.39694656,16.7960688,17.22670025,17.67183463 +23,376,376,390,375,384,370,297.2608696,18.18351064,18.18351064,17.52820513,17.80989583,18.48378378 +24,364,363,383,359,370,355,284.875,18.78296703,18.83471074,17.84856397,18.48378378,19.26478873 \ No newline at end of file diff --git a/scripts/mandelbrot_speedup.pdf b/scripts/mandelbrot_speedup.pdf new file mode 100644 index 0000000..e082251 Binary files /dev/null and b/scripts/mandelbrot_speedup.pdf differ diff --git a/scripts/perfplotter.py b/scripts/perfplotter.py new file mode 100644 index 0000000..70e0004 --- /dev/null +++ b/scripts/perfplotter.py @@ -0,0 +1,184 @@ +import csv +import os +import matplotlib +import matplotlib.pyplot as plt +from matplotlib.font_manager import FontProperties +from os.path import isfile, join +import itertools +import math + +matplotlib.rcParams.update({'font.size': 10}) + +csvreader_imageproc = csv.DictReader(open('imageproc_perfdata.csv')) +csvreader_mandelbrot = csv.DictReader(open('mandelbrot_perfdata.csv')) + +def graph(xy, title, ylabel, xlabel, legend, file_path): + print("Creating graph") + + fig, axs = plt.subplots(1,1) + plt.title(title) + + x, y = xy + line = axs.plot( + x, y, legend + ) + + axs.set_ylabel(ylabel) + axs.set_xlabel(xlabel) + + labels = [if0thenseq(int(item)) for item in axs.get_xticks()] + + axs.set_xticklabels(labels) + + plt.savefig(file_path+'.pdf', bbox_inches="tight", format="pdf", dpi=200) + +def get_xy(x_col, y_col, reader): + x = [] + y = [] + + for row in reader: + x.append(float(row[x_col])) + y.append(float(row[y_col])) + return (x, y) + +def if0thenseq(val, seq): + if val == 0: + return seq + else: + return val + + +def graph_multi(lines, title, ylabel, xlabel,file_path, seq_label="Sequential"): + print("Creating graph") + + fig, axs = plt.subplots(1,1) + + ticks = list(filter(lambda x: x % 2 == 0, lines[0][0])) + + for (x,y,legend, marker, linestyle) in lines: + line = axs.plot( + x, y, + marker = marker, + label = legend, + linestyle = linestyle, + linewidth = 1, + markersize = 3 + ) + + axs.legend(loc='upper center', ncol = 3, bbox_to_anchor=(0.5, 1.25)) + + axs.set_ylabel(ylabel) + axs.set_xlabel(xlabel) + + plt.yticks(ticks) + plt.xticks(ticks) + + plt.grid() + + labels = [if0thenseq(int(item), seq_label) for item in axs.get_xticks()] + + axs.set_xticklabels(labels) + axs.set_aspect(0.5) + plt.savefig(file_path+'.pdf', bbox_inches="tight", format="pdf", dpi=300) + + +def graph_separated(lines, title, ylabel,file_path, seq_label="Sequential"): + print("Creating graph") + + fig, axs = plt.subplots(2,1) + + ticks = list(filter(lambda x: x % 2 == 0, lines[0][0])) + + curplot = 1 + for (x,y,legend, marker, linestyle, xlabel) in lines: + axs = plt.subplot(2,1,curplot) + line = axs.plot( + x, y, + marker = marker, + label = legend, + linestyle = linestyle, + linewidth = 1, + markersize = 3 + ) + + axs.legend(loc='upper center', ncol = 3, bbox_to_anchor=(0.5, 1.3)) + + axs.set_ylabel(ylabel) + axs.set_xlabel(xlabel) + + plt.yticks(ticks) + plt.xticks(ticks) + + plt.grid() + + labels = [if0thenseq(int(item), seq_label) for item in axs.get_xticks()] + + axs.set_xticklabels(labels) + axs.set_aspect(0.5) + + curplot = curplot + 1 + plt.tight_layout() + plt.savefig(file_path+'.pdf', bbox_inches="tight", format="pdf", dpi=300) + + + +def plot_mandelbrot_comparison(): + x_rustspp, y_rustspp = get_xy("Level of Parallelism", 
"Speedup Rust-SPP", csv.DictReader(open('mandelbrot_perfdata.csv'))) + x_rustspp_ordered, y_rustspp_ordered = get_xy("Level of Parallelism", "Speedup Rust-SPP Ordered", csv.DictReader(open('mandelbrot_perfdata.csv'))) + x_rayon, y_rayon = get_xy("Level of Parallelism", "Speedup Rayon", csv.DictReader(open('mandelbrot_perfdata.csv'))) + x_tokio_ordered, y_tokio_ordered = get_xy("Level of Parallelism", "Speedup Tokio Ordered", csv.DictReader(open('mandelbrot_perfdata.csv'))) + x_tokio_unordered, y_tokio_unordered = get_xy("Level of Parallelism", "Speedup Tokio Unordered", csv.DictReader(open('mandelbrot_perfdata.csv'))) + x_ideal, y_ideal = (x_rayon, x_rayon) + + lines = [(x_rustspp, y_rustspp, "SSP-Rust Unordered", 's', '--'), + (x_rustspp_ordered, y_rustspp_ordered, "SSP-Rust Ordered", '3', '-.'), + (x_rayon, y_rayon, "Rayon", '4', '--'), + (x_tokio_ordered, y_tokio_ordered, "Tokio Ordered", '+', ':'), + (x_tokio_unordered, y_tokio_unordered, "Tokio Unordered", 'x', ':'), + (x_ideal, y_ideal, "Ideal", '2', '-')] + # print(lines) + graph_multi(lines, + "Mandelbrot 1000x1000 Speedup", + "Speedup", + "Parallelism Level", + "mandelbrot_comparison") + + +def plot_imageproc_speedup(): + x, y_rustspp = get_xy("threads_per_filter", "Speedup Rust-SPP", csv.DictReader(open('imageproc_perfdata.csv'))) + x, y_tokio = get_xy("threads_per_filter", "Speedup Tokio", csv.DictReader(open('imageproc_perfdata.csv'))) + x, y_tokio_normalized = get_xy("threads_per_filter", "Speedup Tokio", csv.DictReader(open('imageproc_perfdata.csv'))) + + x = x[0:13] + print(len(x)) + y_rustspp = y_rustspp[0:13] + print(len(y_rustspp)) + y_tokio = y_tokio[0:13] + print(len(y_tokio)) + y_tokio_normalized = y_tokio_normalized[1:13] + y_tokio_normalized.append(y_tokio_normalized[-1]) + print(len(y_tokio_normalized)) + + graph_multi([ + (x,y_rustspp, "SSP-Rust", "x", ":"), + (x,y_tokio, "Tokio", "x", "--"), + (x,y_tokio_normalized, "Tokio Normalized", "o", ":")], + "Image processing speedup", + "Speedup", + "Parallelism Level", + "imageproc_speedup", seq_label="Seq") + +def plot_imageproc_throughput(): + x, y_rustspp = get_xy("threads_per_filter", "Throughput Rust-SPP", csv.DictReader(open('imageproc_perfdata.csv'))) + x, y_tokio = get_xy("threads_per_filter", "Throughput Tokio", csv.DictReader(open('imageproc_perfdata.csv'))) + + graph_separated([ + (x,y_rustspp, "SSP-Rust", "x", "-", "Threads per stage"), + (x,y_tokio, "Tokio", "x", "-", "Buffer size per stage")], + "Image processing throughput", + "Images/sec", + "imageproc_throughput", seq_label="Seq") + + +plot_mandelbrot_comparison() +plot_imageproc_throughput() diff --git a/src/.DS_Store b/src/.DS_Store new file mode 100644 index 0000000..5008ddf Binary files /dev/null and b/src/.DS_Store differ diff --git a/src/blocks/blocks.rs b/src/blocks/blocks.rs new file mode 100644 index 0000000..275180c --- /dev/null +++ b/src/blocks/blocks.rs @@ -0,0 +1,43 @@ +use crate::work_storage::{WorkItem, TimestampedWorkItem}; + + +//Base trait for all blocks in the pipeline +//Used by the internals. 
Should be able to deal with
+//timestamped items and also perform some automatic timestamping on its own
+pub trait PipelineBlock<TInput, TCollected> {
+    fn process(&self, input: WorkItem<TInput>);
+    fn process_timestamped(&self, input: TimestampedWorkItem<TInput>);
+    fn collect(self: Box<Self>) -> Vec<TCollected>;
+}
+
+#[derive(Clone, Copy)]
+pub enum OrderingMode {
+    Unordered,
+    Ordered,
+}
+
+pub enum BlockMode {
+    Sequential(OrderingMode),
+    Parallel(i32)
+}
+
+
+
+pub struct MonitorLoop {
+    loop_function: Box<dyn FnOnce() -> () + Send>
+}
+
+impl MonitorLoop {
+
+    pub fn new<F>(function: F) -> MonitorLoop
+        where F: FnOnce() -> (), F: Send + 'static {
+        MonitorLoop {
+            loop_function: Box::new(function)
+        }
+    }
+
+    pub fn run(self) {
+        (self.loop_function)()
+    }
+
+}
\ No newline at end of file
diff --git a/src/blocks/in_block.rs b/src/blocks/in_block.rs
new file mode 100644
index 0000000..e05bee1
--- /dev/null
+++ b/src/blocks/in_block.rs
@@ -0,0 +1,179 @@
+use crate::*;
+use crate::blocks::*;
+use work_storage::{WorkItem, TimestampedWorkItem};
+use std::sync::Arc;
+use std::sync::atomic::{Ordering, AtomicUsize};
+use std::thread::JoinHandle;
+use std::thread;
+use work_storage::{BlockingQueue, BlockingOrderedSet};
+use parking_lot::{Mutex};
+
+//Public API: An output node, receives values and causes side effects
+pub trait In<TInput, TCollected> {
+    fn process(&mut self, input: TInput, order: u64) -> TCollected;
+}
+
+
+impl<TInput, TCollected, F> In<TInput, TCollected> for F where F: FnMut(TInput) -> TCollected {
+    fn process(&mut self, input: TInput, _order: u64) -> TCollected {
+        (*self)(input)
+    }
+}
+
+//Internals: InBlock processing queue for blocks in the pipeline
+pub struct InBlock<TInput: 'static, TCollected: 'static> {
+    work_queue: Arc<BlockingQueue<TInput>>,
+    ordered_work: Arc<BlockingOrderedSet<TInput>>,
+    collected_items: Arc<Mutex<Vec<TCollected>>>,
+    handler: Box<dyn Fn() -> Box<dyn In<TInput, TCollected>>>,
+    ordering: OrderingMode,
+    counter: AtomicUsize
+}
+
+// Internals: This is a thread-local object for in blocks
+struct InBlockInfo<TInput: 'static, TCollected: 'static> {
+    handler: Box<dyn In<TInput, TCollected>>
+}
+
+
+impl<TInput: 'static, TCollected: 'static> PipelineBlock<TInput, TCollected> for InBlock<TInput, TCollected> {
+
+    //used by the public API
+    fn process(&self, input: WorkItem<TInput>) {
+        match self.ordering {
+            //For the unordered case, just enqueue it
+            OrderingMode::Unordered => {
+                (*self.work_queue).enqueue(input);
+            },
+            //For the ordered case: All InBlocks are single threaded
+            //so we keep a count.
//Store under an atomic counter
+            //in case we implement a multithreaded outblock
+            OrderingMode::Ordered => {
+                let c = self.counter.load(Ordering::SeqCst);
+                (*self.ordered_work).enqueue(TimestampedWorkItem(input, c as u64));
+                self.counter.store(c + 1, Ordering::SeqCst);
+            }
+        };
+        ()
+    }
+
+    //Used internally
+    fn process_timestamped(&self, input: TimestampedWorkItem<TInput>) {
+        match self.ordering {
+            OrderingMode::Unordered => match input {
+                TimestampedWorkItem(work_item, _) => {
+                    (*self.work_queue).enqueue(work_item);
+                }
+            },
+            OrderingMode::Ordered => (*self.ordered_work).enqueue(input)
+        };
+    }
+
+    fn collect(self: Box<Self>) -> Vec<TCollected> {
+        match Arc::try_unwrap(self.collected_items) {
+            Ok(result) => result.into_inner(),
+            Err(_) => {
+                panic!("Could not unwrap Arc in call to collect");
+            }
+        }
+    }
+}
+
+
+
+impl<TInput: 'static, TCollected: 'static> InBlock<TInput, TCollected>
+where
+    TInput: Send,
+    TInput: Sync,
+    TCollected: Send,
+    TCollected: Sync,
+{
+    pub fn monitor_posts(&mut self) -> MonitorLoop {
+        match self.ordering {
+            OrderingMode::Ordered => self.monitor_ordered(),
+            OrderingMode::Unordered => self.monitor_unordered()
+        }
+
+    }
+
+    fn monitor_unordered(&mut self) -> MonitorLoop {
+        let queue = self.work_queue.clone();
+
+        let mut info = InBlockInfo {
+            handler: (self.handler)()
+        };
+
+        let arc_collected = self.collected_items.clone();
+
+        MonitorLoop::new(move || {
+            let mut collected_list = arc_collected.lock();
+            loop {
+                let item = queue.wait_and_dequeue();
+                match item {
+                    TimestampedWorkItem(WorkItem::Value(val), order) => {
+                        let collected: TCollected = info.handler.process(val, order);
+                        (*collected_list).push(collected);
+                    },
+                    TimestampedWorkItem(WorkItem::Dropped, order) => {
+                        ()
+                    }
+                    TimestampedWorkItem(WorkItem::Stop, _) => {
+                        break;
+                    }
+                };
+            }
+        })
+    }
+
+    pub fn monitor_ordered(&mut self) -> MonitorLoop {
+        let storage = self.ordered_work.clone();
+
+        let mut info = InBlockInfo {
+            handler: (self.handler)()
+        };
+        let arc_collected = self.collected_items.clone();
+
+        MonitorLoop::new(move || {
+            let mut next_item = 0;
+            let mut collected_list = arc_collected.lock();
+            loop {
+                let item = storage.wait_and_remove(next_item);
+                match item {
+                    TimestampedWorkItem(WorkItem::Value(val), order) => {
+                        debug_assert!(order == next_item);
+                        next_item += 1;
+                        let collected: TCollected = info.handler.process(val, order);
+                        (*collected_list).push(collected);
+                    }
+                    TimestampedWorkItem(WorkItem::Dropped, order) => {
+                        next_item += 1;
+                    }
+                    TimestampedWorkItem(WorkItem::Stop, _) => {
+                        break;
+                    }
+                };
+            }
+        })
+    }
+
+}
+
+
+impl<TInput: 'static, TCollected: 'static> InBlock<TInput, TCollected> {
+    pub fn new(behavior: BlockMode, factory: Box<dyn Fn() -> Box<dyn In<TInput, TCollected>>>) -> InBlock<TInput, TCollected> {
+        match behavior {
+            BlockMode::Parallel(_) => unimplemented!("parallel inblocks not implemented"),
+            BlockMode::Sequential(ordering) => InBlock {
+                work_queue: BlockingQueue::new(),
+                handler: factory,
+                ordering: ordering,
+                ordered_work: BlockingOrderedSet::new(),
+                counter: AtomicUsize::new(0),
+                collected_items: Arc::new(Mutex::new(vec![]))
+            },
+        }
+    }
+}
+
+unsafe impl<TInput: 'static, TCollected: 'static> Send for InBlockInfo<TInput, TCollected> {}
+unsafe impl<TInput: 'static, TCollected: 'static> Sync for InBlockInfo<TInput, TCollected> {}
diff --git a/src/blocks/inout_block.rs b/src/blocks/inout_block.rs
new file mode 100644
index 0000000..2c23338
--- /dev/null
+++ b/src/blocks/inout_block.rs
@@ -0,0 +1,172 @@
+use crate::blocks::*;
+use crate::work_storage::*;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::thread::JoinHandle;
+use std::thread;
+
+// Public API: An Input-Output node; transforms some value into another
+pub trait InOut<TInput, TOutput> {
+    fn process(&mut self, input: TInput) -> Option<TOutput>;
+}
+
+
+impl<TInput, TOutput, F>
InOut<TInput, TOutput> for F where F: FnMut(TInput) -> Option<TOutput> {
+    fn process(&mut self, input: TInput) -> Option<TOutput> {
+        (*self)(input)
+    }
+}
+
+
+// Internals: This is a thread-local object for inout blocks
+struct InOutBlockInfo<TInput: 'static, TOutput: 'static, TCollected: 'static> {
+    next_step: Arc<Box<dyn PipelineBlock<TOutput, TCollected>>>,
+    transformer: Box<dyn InOut<TInput, TOutput>>
+}
+
+//Internals: Processing queue for inout blocks in the pipeline
+pub struct InOutBlock<TInput: 'static, TOutput: 'static, TCollected: 'static> {
+    work_queue: Arc<BlockingQueue<TInput>>,
+    next_step: Arc<Box<dyn PipelineBlock<TOutput, TCollected>>>,
+    transformer_factory: Box<dyn Fn() -> Box<dyn InOut<TInput, TOutput>>>,
+    replicas: i32,
+}
+
+impl<TInput: 'static, TOutput: 'static, TCollected: 'static> InOutBlock<TInput, TOutput, TCollected> {
+    pub fn send_stop(&self) {
+        (*self.work_queue).enqueue(WorkItem::Stop);
+    }
+}
+
+impl<TInput: 'static, TOutput: 'static, TCollected: 'static> PipelineBlock<TInput, TCollected>
+for InOutBlock<TInput, TOutput, TCollected>
+where
+    TInput: Send,
+    TInput: Sync,
+{
+    //used by the public API. Always unordered
+    fn process(&self, input: WorkItem<TInput>) {
+        (*self.work_queue).enqueue(input);
+    }
+
+    //Used internally
+    fn process_timestamped(&self, input: TimestampedWorkItem<TInput>) {
+        (*self.work_queue).enqueue_timestamped(input)
+    }
+
+    fn collect(self: Box<Self>) -> Vec<TCollected> {
+        match Arc::try_unwrap(self.next_step) {
+            Ok(result) => result.collect(),
+            Err(_) => {
+                panic!("Could not unwrap Arc in call to collect");
+            }
+        }
+    }
+
+}
+
+impl<TInput: 'static, TOutput: 'static, TCollected: 'static> InOutBlock<TInput, TOutput, TCollected>
+where
+    TInput: Send,
+    TInput: Sync,
+{
+    pub fn new(
+        next_step: Box<dyn PipelineBlock<TOutput, TCollected>>,
+        transformer: BlockMode,
+        transformer_factory: Box<dyn Fn() -> Box<dyn InOut<TInput, TOutput>>>
+    ) -> InOutBlock<TInput, TOutput, TCollected> {
+        match transformer {
+            BlockMode::Parallel(replicas) => {
+                InOutBlock::new_block(next_step, transformer_factory, replicas)
+            }
+            BlockMode::Sequential(_) => InOutBlock::new_block(next_step, transformer_factory, 1),
+        }
+    }
+
+    pub fn new_block(
+        next_step: Box<dyn PipelineBlock<TOutput, TCollected>>,
+        transformer: Box<dyn Fn() -> Box<dyn InOut<TInput, TOutput>>>,
+        replicas: i32,
+    ) -> InOutBlock<TInput, TOutput, TCollected> {
+        InOutBlock {
+            work_queue: BlockingQueue::new(),
+            next_step: Arc::new(next_step),
+            transformer_factory: transformer,
+            replicas: replicas,
+        }
+    }
+
+
+    pub fn monitor_posts(&mut self) -> Vec<MonitorLoop> {
+        let mut monitors: Vec<MonitorLoop> = vec![];
+        let alive_threads = Arc::new(AtomicUsize::new(self.replicas as usize));
+
+        for _ in 0..self.replicas {
+            let queue = self.work_queue.clone();
+            let alive_threads = alive_threads.clone();
+
+            let mut info = InOutBlockInfo {
+                next_step: self.next_step.clone(),
+                transformer: (self.transformer_factory)(),
+            };
+
+            let monitor_loop = MonitorLoop::new(move || {
+
+                loop {
+                    let dequeued = queue.wait_and_dequeue();
+
+                    match dequeued {
+                        TimestampedWorkItem(WorkItem::Value(val), order) => {
+                            let output = info.transformer.process(val);
+
+                            if let Some(val) = output {
+                                info.next_step.process_timestamped(TimestampedWorkItem(
+                                    WorkItem::Value(val),
+                                    order,
+                                ));
+                            } else {
+                                info.next_step.process_timestamped(TimestampedWorkItem(
+                                    WorkItem::Dropped,
+                                    order,
+                                ));
+                            }
+                        },
+                        TimestampedWorkItem(WorkItem::Dropped, order) => {
+                            info.next_step.process_timestamped(TimestampedWorkItem(
+                                WorkItem::Dropped,
+                                order,
+                            ));
+                        },
+                        TimestampedWorkItem(WorkItem::Stop, order) => {
+                            let mut threads = alive_threads.load(Ordering::SeqCst);
+
+                            threads -= 1;
+
+                            if threads == 0 {
+                                info.next_step.process_timestamped(TimestampedWorkItem(
+                                    WorkItem::Stop,
+                                    order,
+                                ));
+                            }
+
+                            alive_threads.store(threads, Ordering::SeqCst);
+
+                            //reenqueue the same item
+                            queue.enqueue_timestamped(dequeued);
+
+                            break;
+                        }
+                    }
+                }
+            });
+            monitors.push(monitor_loop);
+        }
+
+        return monitors;
+    }
+
+}
+
+/* Assume a MapBlock can be passed to threads, and assume we'll implement parallelism correctly */
+unsafe impl<TInput: 'static, TOutput: 'static, TCollected: 'static> Send for InOutBlockInfo<TInput, TOutput, TCollected> {}
+unsafe impl<TInput: 'static, TOutput: 'static, TCollected: 'static> Sync for InOutBlockInfo<TInput, TOutput, TCollected> {}
diff --git a/src/blocks/mod.rs b/src/blocks/mod.rs
new file mode 100644
index 0000000..672127b
--- /dev/null
+++ b/src/blocks/mod.rs
@@ -0,0 +1,8 @@
+
+pub mod
blocks; +pub mod in_block; +pub mod inout_block; + +pub use blocks::{BlockMode, OrderingMode, PipelineBlock, MonitorLoop}; +pub use in_block::{In, InBlock}; +pub use inout_block::{InOut, InOutBlock}; \ No newline at end of file diff --git a/src/image_processing.rs b/src/image_processing.rs new file mode 100644 index 0000000..8f8ae69 --- /dev/null +++ b/src/image_processing.rs @@ -0,0 +1,458 @@ +//#![feature(trace_macros)] +//trace_macros!(true); + +use raster::filter; +use rayon::prelude::*; +use rayon::ThreadPoolBuilder; +use rust_spp::*; +use std::path::PathBuf; +use std::sync::{Arc, Mutex}; + +struct ImageToProcess { + path: PathBuf, + image: raster::Image, +} + +struct LoadImage; +impl InOut for LoadImage { + fn process(&mut self, input: PathBuf) -> Option { + Some(ImageToProcess { + image: raster::open(input.to_str().unwrap()).unwrap(), + path: input, + }) + } +} + +struct ApplyGrayscale; +impl InOut for ApplyGrayscale { + fn process(&mut self, input: ImageToProcess) -> Option { + let mut input = input; + filter::grayscale(&mut input.image).unwrap(); + Some(input) + } +} + +struct ApplySharpen; +impl InOut for ApplySharpen { + fn process(&mut self, input: ImageToProcess) -> Option { + let mut input = input; + filter::sharpen(&mut input.image).unwrap(); + Some(input) + } +} + +struct ApplyGamma; +impl InOut for ApplyGamma { + fn process(&mut self, input: ImageToProcess) -> Option { + let mut input = input; + filter::gamma(&mut input.image, 2.0).unwrap(); + Some(input) + } +} + +struct ApplyEmboss; +impl InOut for ApplyEmboss { + fn process(&mut self, input: ImageToProcess) -> Option { + let mut input = input; + filter::emboss(&mut input.image).unwrap(); + Some(input) + } +} + +struct ApplyMoreSaturation; +impl InOut for ApplyMoreSaturation { + fn process(&mut self, input: ImageToProcess) -> Option { + let mut input = input; + filter::saturation(&mut input.image, 0.2).unwrap(); + Some(input) + } +} + +struct ResizeTo500pxWidth; +impl InOut for ResizeTo500pxWidth { + fn process(&mut self, input: ImageToProcess) -> Option { + let mut input = input; + raster::transform::resize_exact_width(&mut input.image, 500).unwrap(); + Some(input) + } +} + +struct SaveImageAndGetResult; +impl InOut for SaveImageAndGetResult { + fn process(&mut self, input: ImageToProcess) -> Option { + let result_dir = "../processed_images"; + + let result = result_dir.to_owned() + + "/" + + input.path.file_stem().unwrap().to_str().unwrap() + + "_processed." 
+ + input.path.extension().unwrap().to_str().unwrap(); + + raster::save(&input.image, &result).unwrap(); + + return Some(result.to_string()); + } +} + +struct PrintResult; +impl In for PrintResult { + fn process(&mut self, input: String, order: u64) { + // println!("Finished image {:?} {:?}", order, input) + } +} + +pub fn process_images(threads: i32) { + let mut pipeline = pipeline![ + parallel!( + |input: PathBuf| { + Some(ImageToProcess { + image: raster::open(input.to_str().unwrap()).unwrap(), + path: input, + }) + }, + 50 + ), + parallel!(ApplyMoreSaturation, threads), + parallel!(ApplyEmboss, threads), + parallel!(ApplyGamma, threads), + parallel!(ApplySharpen, threads), + parallel!(ApplyGrayscale, threads), + parallel!(ResizeTo500pxWidth, threads), + parallel!(SaveImageAndGetResult, 50), + sequential!(|input: String| {}) + ]; + + let dir_entries = std::fs::read_dir("../images"); + + for entry in dir_entries.unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + + if path.extension().is_none() { + continue; + } + + pipeline.post(path).unwrap(); + } + + pipeline.end_and_wait(); +} + +fn load_all_images() -> Vec { + + let pipeline = pipeline![ + parallel!(LoadImage, 50), + sequential!(|image: ImageToProcess| {image}) + ]; + + let dir_entries = std::fs::read_dir("../images"); + + for entry in dir_entries.unwrap() { + let entry = entry.unwrap(); + let path = entry.path(); + + if path.extension().is_none() { + continue; + } + + pipeline.post(path).unwrap(); + } + + let collected = pipeline.collect(); + println!("All {:?} images loaded", collected.len()); + collected +} + +struct DummySave; +impl In for DummySave { + fn process(&mut self, input: ImageToProcess, order: u64) { + // println!("Finished image {:?} {:?}", order, input) + } +} + +macro_rules! time { + ($e:expr, $clock_name:expr) => { + // let start = time::precise_time_s(); + $e; + // let end = time::precise_time_s(); + // println!("{:?} took {:?}", $clock_name, end - start); + }; +} + +pub fn process_sequential() -> f64 { + let all_images = load_all_images(); + // println!("Sequential"); + let result_dir = "../processed_images"; + + let start = time::precise_time_s(); + + // println!("dir entreis {:?}", dir_entries); + for image_to_process in all_images.into_iter() { + let path = image_to_process.path; + let mut image = image_to_process.image; + + //let result = result_dir.to_owned() + "/"+ + // path.file_stem().unwrap().to_str().unwrap() + + // "_processed." 
+ + // path.extension().unwrap().to_str().unwrap(); + + // println!("Processing {:?}", path); + time!(filter::saturation(&mut image, 0.2).unwrap(), "saturation"); + time!(filter::emboss(&mut image).unwrap(), "emboss"); + time!(filter::gamma(&mut image, 2.0).unwrap(), "gamma"); + time!(filter::sharpen(&mut image).unwrap(), "sharpen"); + time!(filter::grayscale(&mut image).unwrap(), "grayscale"); + time!( + raster::transform::resize_exact_width(&mut image, 500).unwrap(), + "resize(500px)" + ); + + // time!(raster::save(&image, &result).unwrap(), "save"); + + // println!(""); + // println!(""); + } + + let end = time::precise_time_s(); + return end - start; +} + +fn emboss(mut input: ImageToProcess) -> ImageToProcess { + filter::emboss(&mut input.image).unwrap(); + input +} + +pub fn process_images_no_IO(threads: i32) -> f64 { + let all_images = load_all_images(); + + + let mut pipeline = pipeline![ + parallel!(|mut image_to_process: ImageToProcess| { + filter::saturation(&mut image_to_process.image, 0.2).unwrap(); + Some(image_to_process) + }, threads), + parallel!(|mut input: ImageToProcess| { + filter::emboss(&mut input.image).unwrap(); + Some(input) + }, threads), + parallel!(|mut input: ImageToProcess| { + filter::gamma(&mut input.image, 2.0).unwrap(); + Some(input) + }, threads), + parallel!(|mut input: ImageToProcess| { + filter::sharpen(&mut input.image).unwrap(); + Some(input) + }, threads), + parallel!(|mut input: ImageToProcess| { + filter::grayscale(&mut input.image).unwrap(); + Some(input) + }, threads), + parallel!(|mut input: ImageToProcess| { + raster::transform::resize_exact_width(&mut input.image, 500).unwrap(); + Some(input) + }, threads), + sequential!(|_|{}) + ]; + + let start = time::precise_time_s(); + + for entry in all_images { + pipeline.post(entry).unwrap(); + } + + pipeline.end_and_wait(); + + let end = time::precise_time_s(); + return end - start; +} + +use futures::future::lazy; +use futures::sync::*; +use futures::{stream, Future, Stream}; +use tokio::prelude::*; +use tokio::*; +use tokio_core::reactor::Core; + +macro_rules! 
spawn_return { + ($block:expr) => {{ + let (sender, receiver) = oneshot::channel::<_>(); + tokio::spawn(lazy(move || { + let result = $block; + sender.send(result).ok(); + Ok(()) + })); + receiver + }}; +} + +pub fn process_images_tokio(threads: i32) -> f64 { + let threads = threads as usize; + + let all_images = load_all_images(); + + let start = time::precise_time_s(); + + let image_stream = stream::iter_ok(all_images); + + let processing_pipeline = image_stream + .map(move |mut image_to_process: ImageToProcess| { + spawn_return!({ + filter::saturation(&mut image_to_process.image, 0.2).unwrap(); + image_to_process + }) + }) + .buffer_unordered(threads) + .map(move |mut img: ImageToProcess| { + spawn_return!({ + filter::emboss(&mut img.image).unwrap(); + img + }) + }) + .buffer_unordered(threads) + .map(move |mut img: ImageToProcess| { + spawn_return!({ + filter::gamma(&mut img.image, 2.0).unwrap(); + img + }) + }) + .buffer_unordered(threads) + .map(move |mut img: ImageToProcess| { + spawn_return!({ + filter::sharpen(&mut img.image).unwrap(); + img + }) + }) + .buffer_unordered(threads) + .map(move |mut img: ImageToProcess| { + spawn_return!({ + filter::grayscale(&mut img.image).unwrap(); + img + }) + }) + .buffer_unordered(threads) + .map(move |mut img: ImageToProcess| { + spawn_return!({ + raster::transform::resize_exact_width(&mut img.image, 500).unwrap(); + img + }) + }) + .buffer_unordered(threads) + .for_each(|_rendered_line| Ok(())) + .map_err(|e| println!("listener error = {:?}", e)); + + tokio::run(processing_pipeline); + + let end = time::precise_time_s(); + return end - start; +} + +pub fn process_images_tokio_unbuffered() -> f64 { + let all_images = load_all_images(); + + let start = time::precise_time_s(); + + let image_stream = stream::iter_ok(all_images); + + let processing_pipeline = image_stream + .map(move |mut image_to_process: ImageToProcess| { + filter::saturation(&mut image_to_process.image, 0.2).unwrap(); + image_to_process + }) + .map(move |mut img: ImageToProcess| { + filter::emboss(&mut img.image).unwrap(); + img + }) + .map(move |mut img: ImageToProcess| { + filter::gamma(&mut img.image, 2.0).unwrap(); + img + }) + .map(move |mut img: ImageToProcess| { + filter::sharpen(&mut img.image).unwrap(); + img + }) + .map(move |mut img: ImageToProcess| { + filter::grayscale(&mut img.image).unwrap(); + img + }) + .map(move |mut img: ImageToProcess| { + raster::transform::resize_exact_width(&mut img.image, 500).unwrap(); + img + }) + .for_each(|_rendered_line| Ok(())); + + tokio::run(processing_pipeline); + + let end = time::precise_time_s(); + return end - start; +} + +pub fn process_images_rayon(threads: i32) -> f64 { + let all_images = load_all_images(); + + let thread_pool = ThreadPoolBuilder::new() + .num_threads(threads as usize) + .build() + .unwrap(); + + let mut b = vec![]; + + let start = time::precise_time_s(); + + thread_pool.install(|| { + all_images + .into_par_iter() + .map(|mut img| { + filter::saturation(&mut img.image, 0.2).unwrap(); + img + }) + .map(|mut img| { + filter::emboss(&mut img.image).unwrap(); + img + }) + .map(|mut img| { + filter::gamma(&mut img.image, 2.0).unwrap(); + img + }) + .map(|mut img| { + filter::sharpen(&mut img.image).unwrap(); + img + }) + .map(|mut img| { + filter::grayscale(&mut img.image).unwrap(); + img + }) + .map(|mut img| { + raster::transform::resize_exact_width(&mut img.image, 500).unwrap(); + img + }) + .collect_into_vec(&mut b); + }); + + let end = time::precise_time_s(); + return end - start; +} + +/* +pub fn 
resize_images(parallelism_level: i32) -> f64 { + + let all_images_paths = load_images_from_dir(); + + let mut pipeline = Pipeline::new(); + + let steps = pipeline![ + Pipeline::new(), + parallel!(LoadImage, parallelism_level), + parallel!(Resize { width: 500 }, parallelism_level), + parallel!(SaveToDisk, parallelism_level), + sequential!(CollectResult)]; + + for entry in all_images{ + steps.post(entry); + } + + steps.end(); + + pipeline.wait(); +} + +*/ diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..d7c8c82 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,9 @@ +pub mod blocks; +pub mod work_storage; +#[macro_use] +pub mod spp; + + +pub use spp::*; +pub use blocks::*; +pub use work_storage::*; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..2ca75aa --- /dev/null +++ b/src/main.rs @@ -0,0 +1,71 @@ +use clap::{Arg, App}; + +mod image_processing; + +use rust_spp::*; +use rayon::prelude::*; +use rayon::ThreadPoolBuilder; + + +fn leak_test() { + let pipeline = pipeline![ + parallel!(|item: i32| { + Some(item * 2) + }, 20), + parallel!(|item: i32| { + if item % 5 == 0 { + None + } else { + Some(item * 2) + } + }, 20), + sequential!(|item: i32| { + println!("number: {:?}", item); + }) + ]; + + for i in 1..1000 { + pipeline.post(i).unwrap() + } +} + + +fn main() { + + leak_test(); + + + +/* + let matches = App::new("Rust-SPP tests") + .version("1") + .author("Ricardo Pieper") + .arg(Arg::with_name("threads") + .short("t") + .long("threads") + .help("sets the number of threads, lines in parallel being calculated") + .takes_value(true)) + .arg(Arg::with_name("executions") + .short("e") + .long("executions") + .help("How many times you want to execute") + .takes_value(true)) + .get_matches(); + + let threads = matches.value_of("threads").unwrap().parse::().unwrap(); + let executions = matches.value_of("executions").unwrap().parse::().unwrap(); + + for thread in 1 ..=threads { + println!("Executing with {:?} threads", thread); + for execution in 0..executions { + let time =/* if thread == 1 { + println!("Unbuffered"); + image_processing::process_images_tokio_unbuffered() + } else { + image_processing::process_images_tokio(thread) + };*/ image_processing::process_images_no_IO(thread); + println!("\tExecution {:?} took {:?}", execution, time); + } + } +*/ +} diff --git a/src/minimal_example.rs b/src/minimal_example.rs new file mode 100644 index 0000000..91d1e2b --- /dev/null +++ b/src/minimal_example.rs @@ -0,0 +1,94 @@ +//#![feature(trace_macros)] +//trace_macros!(true); + +#[macro_use] +extern crate lazy_static; + +mod blocks; +mod blocking_queue; +mod blocking_ordered_set; +mod out_block; +mod inout_block; +mod work_item; +mod spp; + +use blocking_queue::BlockingQueue; +use blocking_ordered_set::BlockingOrderedSet; +use rand::prelude::*; +use rand::Rng; +use std::cell::{Cell}; +use std::rc::Rc; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::{AtomicUsize}; +use std::thread; +use std::time::Duration; +use inout_block::*; +use out_block::*; +use blocks::*; +use std::fs; +use spp::Pipeline; + + +struct MultiplyBy2 { + rng: ThreadRng, +} +impl InOut for MultiplyBy2 { + fn process(&mut self, input: i32) -> i32 { + //thread::sleep(Duration::from_millis(self.rng.gen_range(1, 100))); + input + } +} + +struct Finalize { + counter: Arc> +} + +impl Out for Finalize { + fn process(&mut self, input: i32, order: u64) { + *self.counter.lock().unwrap() += 1; + } +} + +lazy_static! 
{
+    static ref counter : Arc<Mutex<i32>> = Arc::new(Mutex::new(0));
+}
+
+fn main() {
+
+    let mut pipeline = Pipeline::new();
+
+    let steps = pipeline![
+        pipeline,
+        parallel!(
+            MultiplyBy2 {
+                rng: rand::thread_rng()
+            },
+            8
+        ),
+        parallel!(
+            MultiplyBy2 {
+                rng: rand::thread_rng()
+            },
+            8
+        ),
+        parallel!(
+            MultiplyBy2 {
+                rng: rand::thread_rng()
+            },
+            8
+        ),
+        sequential!(Finalize { counter: counter.clone() })
+    ];
+
+
+    for i in 0..1000000 {
+        steps.post(i);
+    }
+
+    steps.end();
+
+    pipeline.wait();
+
+    println!("Result: {:?}", *counter);
+
+}
diff --git a/src/spp.rs b/src/spp.rs
new file mode 100644
index 0000000..f5b5871
--- /dev/null
+++ b/src/spp.rs
@@ -0,0 +1,202 @@
+
+use std::thread;
+use std::thread::JoinHandle;
+use crate::blocks::*;
+use crate::work_storage::WorkItem;
+
+pub struct Pipeline<TInput: 'static, TOutput: 'static, TCollected: 'static> {
+    signaled_end: bool,
+    initial_block: Option<InOutBlock<TInput, TOutput, TCollected>>,
+    monitors: Vec<MonitorLoop>,
+    threads: Vec<JoinHandle<()>>
+}
+
+impl<TInput: 'static, TOutput: 'static, TCollected: 'static> Pipeline<TInput, TOutput, TCollected>
+where
+    TInput: Send,
+    TInput: Sync {
+
+    pub fn new(
+        initial_block: InOutBlock<TInput, TOutput, TCollected>,
+        monitors: Vec<MonitorLoop>
+    ) -> Pipeline<TInput, TOutput, TCollected> {
+        Pipeline {
+            initial_block: Some(initial_block),
+            monitors: monitors,
+            threads: vec![],
+            signaled_end: false
+        }
+    }
+
+    fn end(&mut self) {
+        self.signaled_end = true;
+        match &self.initial_block {
+            Some(block) => block.send_stop(),
+            None => {}
+        }
+    }
+
+    pub fn end_and_wait(&mut self) {
+        self.end();
+        let all_threads = std::mem::replace(&mut self.threads, vec![]);
+        for thread in all_threads {
+            thread.join().unwrap();
+        }
+    }
+
+    pub fn post(&self, item: TInput) -> Result<(), ItemPostError> {
+        if self.signaled_end {
+            return Err(ItemPostError::StreamEnded);
+        }
+        match &self.initial_block {
+            Some(block) => {
+                block.process(WorkItem::Value(item));
+                Ok(())
+            }
+            None => Err(ItemPostError::UnknownError)
+        }
+
+    }
+
+    pub fn collect(mut self) -> Vec<TCollected> {
+        self.end_and_wait();
+
+        let current_block = std::mem::replace(&mut self.initial_block, None);
+        match current_block {
+            Some(block) => {
+                Box::new(block).collect()
+            }
+            None => vec![]
+        }
+    }
+
+    pub fn start(&mut self) {
+        let monitors = std::mem::replace(&mut self.monitors, vec![]);
+
+        for monitor in monitors {
+            self.threads.push(thread::spawn(move || {
+                monitor.run();
+            }))
+        }
+    }
+}
+
+impl<TInput: 'static, TOutput: 'static, TCollected: 'static> Drop for Pipeline<TInput, TOutput, TCollected> {
+    fn drop(&mut self) {
+
+        let block = std::mem::replace(&mut self.initial_block, None);
+
+        if !self.signaled_end {
+            block.unwrap().send_stop();
+        }
+
+        let all_threads = std::mem::replace(&mut self.threads, vec![]);
+        for thread in all_threads {
+            thread.join().unwrap();
+        }
+    }
+}
+
+
+#[derive(Debug)]
+pub enum ItemPostError {
+    StreamEnded,
+    UnknownError
+}
+
+#[macro_export]
+macro_rules! pipeline_propagate {
+    ($threads:expr, $s1:expr) => {
+        {
+            let (mode, factory) = $s1;
+            let mut block = InBlock::new(mode, factory);
+            $threads.push(block.monitor_posts());
+            block
+        }
+    };
+
+    ($threads:expr, $s1:expr $(, $tail:expr)*) => {
+        {
+            let (mode, factory) = $s1;
+            let mut block = InOutBlock::new(
+                Box::new(pipeline_propagate!($threads, $($tail),*)),
+                mode, factory);
+            $threads.extend(block.monitor_posts());
+            block
+        }
+    };
+}
+
+
+#[macro_export]
+macro_rules! pipeline {
+    ($s1:expr $(, $tail:expr)*) => {
+        {
+            let mut monitors = Vec::<MonitorLoop>::new();
+            let (mode, factory) = $s1;
+            let mut block = InOutBlock::new(
+                Box::new(pipeline_propagate!(monitors, $($tail),*)),
+                mode, factory);
+            monitors.extend(block.monitor_posts());
+
+            let mut pipeline = Pipeline::new(block, monitors);
+            pipeline.start();
+            pipeline
+        }
+    };
+}
+
+
+#[macro_export]
+macro_rules!
parallel {
+    ($block:expr, $threads:expr) => {
+        {
+            let mode = BlockMode::Parallel($threads);
+            let factory: Box<dyn Fn() -> Box<dyn InOut<_, _>>> = Box::new(move || Box::new($block));
+            (mode, factory)
+        }
+    };
+}
+
+
+#[macro_export]
+macro_rules! sequential {
+    ($block:expr) => {
+        {
+            let mode = BlockMode::Sequential(OrderingMode::Unordered);
+            let factory: Box<dyn Fn() -> Box<dyn In<_, _>>> = Box::new(move || Box::new($block));
+            (mode, factory)
+        }
+    };
+}
+
+#[macro_export]
+macro_rules! sequential_ordered {
+    ($block:expr) => {
+        {
+            let mode = BlockMode::Sequential(OrderingMode::Ordered);
+            let factory: Box<dyn Fn() -> Box<dyn In<_, _>>> = Box::new(move || Box::new($block));
+            (mode, factory)
+        }
+    };
+}
+
+
+#[macro_export]
+macro_rules! collect {
+    () => {
+        {
+            sequential!(move |item: _| {item})
+        }
+    };
+}
+
+
+#[macro_export]
+macro_rules! collect_ordered {
+    () => {
+        {
+            sequential_ordered!(move |item: _| {item})
+        }
+    };
+}
\ No newline at end of file
diff --git a/src/work_storage/blocking_ordered_set.rs b/src/work_storage/blocking_ordered_set.rs
new file mode 100644
index 0000000..ebf2e31
--- /dev/null
+++ b/src/work_storage/blocking_ordered_set.rs
@@ -0,0 +1,42 @@
+use crate::work_storage::*;
+use std::collections::BTreeMap;
+use std::sync::{Arc};
+use parking_lot::{Mutex, Condvar};
+
+pub struct BlockingOrderedSet<TInput> {
+    storage: Mutex<BTreeMap<u64, TimestampedWorkItem<TInput>>>,
+    new_item_notifier: Condvar,
+}
+
+impl<TInput> BlockingOrderedSet<TInput> {
+    pub fn new() -> Arc<BlockingOrderedSet<TInput>> {
+        Arc::new(BlockingOrderedSet {
+            storage: Mutex::new(BTreeMap::<u64, TimestampedWorkItem<TInput>>::new()),
+            new_item_notifier: Condvar::new(),
+        })
+    }
+
+    pub fn enqueue(&self, item: TimestampedWorkItem<TInput>) {
+        let mut queue = self.storage.lock();
+        match item {
+            TimestampedWorkItem(_, order) => queue.insert(order, item)
+        };
+        self.new_item_notifier.notify_one();
+    }
+
+    pub fn wait_and_remove(&self, item: u64) -> TimestampedWorkItem<TInput> {
+        let mut storage = self.storage.lock();
+        while (*storage).is_empty() || !(*storage).contains_key(&item) {
+            self.new_item_notifier.wait(&mut storage);
+        }
+        let removed_item = storage.remove(&item);
+
+        match removed_item {
+            Some(value) => return value,
+            None => { panic!("Condition variable waited until item was found, but removal failed") }
+        }
+    }
+}
+
+unsafe impl<TInput> Send for BlockingOrderedSet<TInput> {}
+unsafe impl<TInput> Sync for BlockingOrderedSet<TInput> {}
diff --git a/src/work_storage/blocking_queue.rs b/src/work_storage/blocking_queue.rs
new file mode 100644
index 0000000..cdeaa18
--- /dev/null
+++ b/src/work_storage/blocking_queue.rs
@@ -0,0 +1,66 @@
+
+use std::collections::VecDeque;
+use std::sync::{Arc};
+use parking_lot::{Mutex, Condvar};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use crate::work_storage::*;
+
+
+/*
+ * Thread-safe queue for storing work items. Each enqueued item gets a timestamp
+ * tag.
+ */
+pub struct BlockingQueue<TInput> {
+    queue: (Mutex<VecDeque<TimestampedWorkItem<TInput>>>, Condvar),
+    number_of_inserts: AtomicUsize
+}
+
+impl<TInput> BlockingQueue<TInput> {
+
+    pub fn new() -> Arc<BlockingQueue<TInput>> {
+        Arc::new(BlockingQueue {
+            queue: (Mutex::new(VecDeque::<TimestampedWorkItem<TInput>>::new()),
+                    Condvar::new()),
+            number_of_inserts: AtomicUsize::new(0)
+        })
+    }
+
+    pub fn enqueue(&self, item: WorkItem<TInput>) -> u64 {
+        let (mutex, cvar) = &self.queue;
+        let mut queue = mutex.lock();
+        let current = self.number_of_inserts.load(Ordering::SeqCst);
+
+        queue.push_back(
+            TimestampedWorkItem(item, current as u64));
+
+        self.number_of_inserts.store(current + 1, Ordering::SeqCst);
+
+        cvar.notify_one();
+        return current as u64;
+    }
+
+    pub fn enqueue_timestamped(&self, item: TimestampedWorkItem<TInput>) {
+        let (mutex, cvar) = &self.queue;
+        mutex.lock().push_back(item);
+        cvar.notify_one();
+    }
+
+    pub fn wait_and_dequeue(&self) -> TimestampedWorkItem<TInput> {
+        let &(ref mutex, ref cvar) = &self.queue;
+        let mut queue = mutex.lock();
+        while queue.is_empty() {
+            cvar.wait(&mut queue);
+        }
+
+        debug_assert!(queue.is_empty() == false);
+
+        let popped = queue.pop_front();
+
+        debug_assert!(popped.is_some());
+
+        popped.unwrap()
+    }
+}
+
+unsafe impl<TInput> Send for BlockingQueue<TInput> {}
+unsafe impl<TInput> Sync for BlockingQueue<TInput> {}
\ No newline at end of file
diff --git a/src/work_storage/mod.rs b/src/work_storage/mod.rs
new file mode 100644
index 0000000..fb4537b
--- /dev/null
+++ b/src/work_storage/mod.rs
@@ -0,0 +1,8 @@
+
+pub mod blocking_queue;
+pub mod blocking_ordered_set;
+pub mod work_item;
+
+pub use blocking_queue::BlockingQueue;
+pub use blocking_ordered_set::BlockingOrderedSet;
+pub use work_item::{WorkItem, TimestampedWorkItem};
\ No newline at end of file
diff --git a/src/work_storage/work_item.rs b/src/work_storage/work_item.rs
new file mode 100644
index 0000000..e9f64f9
--- /dev/null
+++ b/src/work_storage/work_item.rs
@@ -0,0 +1,7 @@
+pub enum WorkItem<T> {
+    Value(T),
+    Dropped,
+    Stop
+}
+
+pub struct TimestampedWorkItem<T>(pub WorkItem<T>, pub u64);
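
For reference, a minimal, self-contained sketch of how the macros introduced in src/spp.rs (pipeline!, parallel!, sequential!) are meant to be used, modeled on the leak_test example in src/main.rs; the closure stages, thread counts, and input range here are illustrative only and are not part of the patch:

    // Illustrative usage sketch, not part of the patch above.
    use rust_spp::*;

    fn main() {
        // Three-stage pipeline: a parallel map stage (4 worker threads),
        // a parallel stage that drops multiples of 5 by returning None,
        // and a sequential sink that prints whatever reaches the end.
        let mut pipeline = pipeline![
            parallel!(|item: i32| Some(item * 2), 4),
            parallel!(|item: i32| if item % 5 == 0 { None } else { Some(item) }, 4),
            sequential!(|item: i32| println!("got {}", item))
        ];

        // Feed work into the first stage; post() returns an error once
        // the stream has been signaled as ended.
        for i in 0..100 {
            pipeline.post(i).unwrap();
        }

        // Signal end-of-stream and join all worker threads.
        pipeline.end_and_wait();
    }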