From 5a1980625fcb65cef2ada2c20c06b1e183ea49db Mon Sep 17 00:00:00 2001 From: "Brian L. Troutwine" Date: Fri, 16 Feb 2018 18:06:10 -0800 Subject: [PATCH 1/2] Add a cap on total queue files This commit adds a `max_disk_files` option to `channel_with_explicit_capacity` that allows the caller to set the total amount of on-disk consumption. The caller may choose to loop on a send which results in `hopper::Error::Full` or drop the data and try again later. Hopper will _not_ block sends when there is no space available -- in memory or disk -- to store the item. This work is done in the service of supporting https://github.com/postmates/cernan/issues/411 and https://github.com/postmates/cernan/issues/412. Signed-off-by: Brian L. Troutwine --- README.md | 73 +++++++-------- benches/stdlib_comparison.rs | 10 ++- src/lib.rs | 170 +++++++++++++++++++++++++++++++---- src/receiver.rs | 9 +- src/sender.rs | 55 +++++++----- 5 files changed, 231 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 7ace122..5f01321 100644 --- a/README.md +++ b/README.md @@ -8,11 +8,11 @@ consumes a bounded amount of memory. This is done by paging elements to disk at need. The ambition here is to support mpsc style communication without allocating unbounded amounts of memory or dropping inputs on the floor. -## Quickstart +## Quickstart Include the hopper library in your Cargo.toml -`hopper = "0.2"` +`hopper = "0.4"` and use it in much the same way you'd use stdlib's mpsc: @@ -28,28 +28,15 @@ assert_eq!(Some(9), rcv.iter().next()); ``` The primary difference here is that you must provide a name for the channel and -a directory where hopper can page items to disk. +a directory where hopper can page items to disk. -## Wait, page items to disk? - -Imagine that hopper's internal structure is laid out like a contiguous array: - -``` -[---------------|----------------|~~~~~~~~~~. . 
.~~~~~~~~~~~~~~~~] -0 1024 2048 -``` - -Between the indicies of 0 and 1024 hopper stores items in-memory until they are -retrieved. Above index 1024 items are paged out to disk. Items stored between -index 1024 and 2048 are temporarily buffered in memory to allow a single page to -disk once this buffer is full. This scheme fixes the memory burden of the system -at the expense of disk IO. - Hopper is intended to be used in situtations where your system cannot [load-shed](http://ferd.ca/queues-don-t-fix-overload.html) inputs and -_must_ eventually process them. While hopper does page to disk it will not -preserve writes across restarts, much in the same way as stdlib mpsc. - +_must_ eventually process them. Hopper maintains an in-memory buffer of inputs +with disk overflow when the in-memory buffer is full. While hopper does page to +disk it will not preserve writes across restarts, much in the same way as stdlib +mpsc. + ## Inside Baseball Hopper's channel looks very much like a named pipe in Unix. You supply a @@ -59,9 +46,9 @@ supplied to the above two functions is used to create a directory under `data_dir`. This directory gets filled up with monotonically increasing files in situations where the disk paging is in use. We'll treat this exclusively from here on. - + The on-disk structure look like so: - + ```text data-dir/ sink-name0/ @@ -78,7 +65,7 @@ responsible for _creating_ "queue files". In the above, `data-dir/sink-name*/*` are queue files. These files are treated as append-only logs by the Senders. The Receivers trawl through these logs to read the data serialized there. - + ### Won't this fill up my disk? Maybe! Each Sender has a notion of the maximum bytes it may read--which you @@ -88,24 +75,26 @@ attempt to mark the queue file as read-only and create a new file. 
The Receiver is programmed to read its current queue file until it reaches EOF
and finds the file is read-only, at which point it deletes the file--it is the
only reader--and moves on to the next.
- 
-If the Receiver is unable to keep up with the Senders then, oops, your disk will
-gradually fill up.
+ 
+`hopper::channel_with_explicit_capacity` takes a `max_disk_files` argument, defining the
+total number of overflow files that can exist concurrently. If all memory and
+disk buffers are full sends into the queue will fail but the error result is
+written such that the caller can recover ownership of the input value. By
+default `max_disk_files == usize::max_value()` and so if the Receiver is unable
+to keep up with the Senders then, oops, your disk will gradually fill up.
 
 ### What kind of filesystem options will I need?
 
-Hopper is intended to work on any wacky old filesystem with any options,
-even at high concurrency. As common filesystems do not support interleaving
-[small atomic
- writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4)
-hopper limits itself to one exclusive Sender or one exclusive Receiver at a
-time. This potentially limits the concurrency of mpsc but maintains data
-integrity. We are open to improvements in this area.
-
-## What kind of performance does hopper have?
-
-Hopper ships with a small-ish benchmark suite. For Postmates' workload, writes
-of 10 MB/s are not uncommon and we've pushed that to 80 MB/s on occassion. So!
-not super pokey but the actual upper-bounds have not been probed and can
-probably be expanded. See end note in "What kind of filesystem options will I
-need?" as an example of where hopper can be made better in concurrent workloads.
+Hopper is intended to work on any wacky old filesystem with any options, even at
+high concurrency. 
As common filesystems do not support
+interleaving
+[small atomic writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4) hopper
+limits itself to one exclusive Sender and one exclusive Receiver at a time. This
+potentially limits the concurrency of mpsc but maintains data integrity. We are
+open to improvements in this area.
+
+## What kind of performance does hopper have?
+
+Hopper ships with benchmarks. We've seen performance ranging from only 30%
+slower than stdlib's mpsc to 70x worse, depending on configuration options,
+disk IO speeds and the like. We warmly encourage you to benchmark on your system.
diff --git a/benches/stdlib_comparison.rs b/benches/stdlib_comparison.rs
index 19617a0..e873911 100644
--- a/benches/stdlib_comparison.rs
+++ b/benches/stdlib_comparison.rs
@@ -52,9 +52,13 @@ fn hopper_tst(input: HopperInput) -> () {
     let in_memory_bytes = sz * input.in_memory_total;
     let max_disk_bytes = sz * input.max_disk_total;
     if let Ok(dir) = tempdir::TempDir::new("hopper") {
-        if let Ok((snd, mut rcv)) =
-            channel_with_explicit_capacity("tst", dir.path(), in_memory_bytes, max_disk_bytes)
-        {
+        if let Ok((snd, mut rcv)) = channel_with_explicit_capacity(
+            "tst",
+            dir.path(),
+            in_memory_bytes,
+            max_disk_bytes,
+            usize::max_value(),
+        ) {
             let chunk_size = input.ith / input.total_senders;
 
             let mut snd_jh = Vec::new();
diff --git a/src/lib.rs b/src/lib.rs
index 50826c8..5d198e7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -77,7 +77,8 @@ pub use self::receiver::Receiver;
 pub use self::sender::Sender;
 use serde::Serialize;
 use serde::de::DeserializeOwned;
-use std::{fs, io, mem};
+use std::{fs, io, mem, sync};
+use std::sync::atomic::AtomicUsize;
 use std::path::Path;
 
 /// Defines the errors that hopper will bubble up
@@ -96,6 +97,9 @@ pub enum Error {
     IoError(io::Error),
     /// Could not flush Sender
     NoFlush,
+    /// Could not write element because there is no remaining memory or disk
+    /// space
+    Full,
 }
 
 /// Create a (Sender, 
Reciever) pair in a like fashion to @@ -121,7 +125,7 @@ pub fn channel(name: &str, data_dir: &Path) -> Result<(Sender, Receiver where T: Serialize + DeserializeOwned, { - channel_with_explicit_capacity(name, data_dir, 0x100_000, 0x10_000_000) + channel_with_explicit_capacity(name, data_dir, 0x100_000, 0x10_000_000, usize::max_value()) } /// Create a (Sender, Reciever) pair in a like fashion to @@ -130,13 +134,18 @@ where /// This function creates a Sender and Receiver pair with name `name` whose /// queue files are stored in `data_dir`. The maximum number of bytes that will /// be stored in-memory are `max(max_memory_bytes, size_of(T))` and the maximum -/// size of a queue file will be `max(max_disk_bytes, 1Mb)`. The Sender is -/// clonable. +/// size of a queue file will be `max(max_disk_bytes, 1Mb)`. `max_disk_files` +/// sets the total number of concurrent queue files which are allowed to +/// exist. The total on-disk consumption of hopper will then be +/// `max(max_memory_bytes, size_of(T)) * max_disk_files`. +/// +/// The Sender is clonable. 
pub fn channel_with_explicit_capacity( name: &str, data_dir: &Path, max_memory_bytes: usize, max_disk_bytes: usize, + max_disk_files: usize, ) -> Result<(Sender, Receiver), Error> where T: Serialize + DeserializeOwned, @@ -157,8 +166,15 @@ where if let Err(e) = private::clear_directory(&root) { return Err(Error::IoError(e)); } - let sender = Sender::new(name, &root, max_disk_bytes, q.clone())?; - let receiver = Receiver::new(&root, q)?; + let max_disk_files = sync::Arc::new(AtomicUsize::new(max_disk_files)); + let sender = Sender::new( + name, + &root, + max_disk_bytes, + q.clone(), + sync::Arc::clone(&max_disk_files), + )?; + let receiver = Receiver::new(&root, q, sync::Arc::clone(&max_disk_files))?; Ok((sender, receiver)) } @@ -171,13 +187,103 @@ mod test { use super::channel_with_explicit_capacity; use std::{mem, thread}; - fn round_trip_exp(in_memory_limit: usize, max_bytes: usize, total_elems: usize) -> bool { + #[test] + fn ingress_shedding() { + if let Ok(dir) = tempdir::TempDir::new("hopper") { + if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity::( + "round_trip_order_preserved", // name + dir.path(), // data_dir + 8, // max_memory_bytes + 32, // max_disk_bytes + 2, // max_disk_files + ) { + let total_elems = 5 * 131082; + // Magic constant, depends on compression level and what + // not. May need to do a looser assertion. 
+ let expected_shed_sends = 383981; + let mut shed_sends = 0; + let mut sent_values = Vec::new(); + for i in 0..total_elems { + loop { + match snd.send(i) { + Ok(()) => { + sent_values.push(i); + break; + } + Err((r, err)) => { + assert_eq!(r, i); + match err { + super::Error::Full => { + shed_sends += 1; + break; + } + _ => { + continue; + } + } + } + } + } + } + assert_eq!(shed_sends, expected_shed_sends); + + let mut received_elements = 0; + // clear space for one more element + let mut attempts = 0; + loop { + match rcv.iter().next() { + None => { + attempts += 1; + assert!(attempts < 10_000); + } + Some(res) => { + received_elements += 1; + assert_eq!(res, 0); + break; + } + } + } + // flush any disk writes + loop { + if snd.flush().is_ok() { + break; + } + } + // pull the rest of the elements + let mut attempts = 0; + for i in &sent_values[1..] { + loop { + match rcv.iter().next() { + None => { + attempts += 1; + assert!(attempts < 10_000); + } + Some(res) => { + received_elements += 1; + assert_eq!(*i, res); + break; + } + } + } + } + assert_eq!(received_elements, sent_values.len()); + } + } + } + + fn round_trip_exp( + in_memory_limit: usize, + max_bytes: usize, + max_disk_files: usize, + total_elems: usize, + ) -> bool { if let Ok(dir) = tempdir::TempDir::new("hopper") { if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity( "round_trip_order_preserved", dir.path(), in_memory_limit, max_bytes, + max_disk_files, ) { for i in 0..total_elems { loop { @@ -208,7 +314,6 @@ mod test { } // pull the rest of the elements for i in 1..total_elems { - // the +1 is for the unflushed item let mut attempts = 0; loop { match rcv.iter().next() { @@ -235,7 +340,13 @@ mod test { if (in_memory_limit / sz) == 0 || (max_bytes / sz) == 0 || total_elems == 0 { return TestResult::discard(); } - TestResult::from_bool(round_trip_exp(in_memory_limit, max_bytes, total_elems)) + let max_disk_files = usize::max_value(); + TestResult::from_bool(round_trip_exp( + in_memory_limit, 
+ max_bytes, + max_disk_files, + total_elems, + )) } QuickCheck::new().quickcheck(inner as fn(usize, usize, usize) -> TestResult); } @@ -244,12 +355,17 @@ mod test { total_senders: usize, in_memory_bytes: usize, disk_bytes: usize, + max_disk_files: usize, vals: Vec, ) -> bool { if let Ok(dir) = tempdir::TempDir::new("hopper") { - if let Ok((snd, mut rcv)) = - channel_with_explicit_capacity("tst", dir.path(), in_memory_bytes, disk_bytes) - { + if let Ok((snd, mut rcv)) = channel_with_explicit_capacity( + "tst", + dir.path(), + in_memory_bytes, + disk_bytes, + max_disk_files, + ) { let chunk_size = vals.len() / total_senders; let mut snd_jh = Vec::new(); @@ -327,6 +443,7 @@ mod test { let total_senders = 10; let in_memory_bytes = 50; let disk_bytes = 10; + let max_disk_files = 100; let vals = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]; let mut loops = 0; @@ -335,6 +452,7 @@ mod test { total_senders, in_memory_bytes, disk_bytes, + max_disk_files, vals.clone(), )); loops += 1; @@ -351,6 +469,7 @@ mod test { total_senders: usize, in_memory_bytes: usize, disk_bytes: usize, + max_disk_files: usize, vals: Vec, ) -> TestResult { let sz = mem::size_of::(); @@ -364,21 +483,28 @@ mod test { total_senders, in_memory_bytes, disk_bytes, + max_disk_files, vals, )) } - QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, Vec) -> TestResult); + QuickCheck::new() + .quickcheck(inner as fn(usize, usize, usize, usize, Vec) -> TestResult); } fn single_sender_single_rcv_round_trip_exp( in_memory_bytes: usize, disk_bytes: usize, + max_disk_files: usize, total_vals: usize, ) -> bool { if let Ok(dir) = tempdir::TempDir::new("hopper") { - if let Ok((mut snd, mut rcv)) = - channel_with_explicit_capacity("tst", dir.path(), in_memory_bytes, disk_bytes) - { + if let Ok((mut snd, mut rcv)) = channel_with_explicit_capacity( + "tst", + dir.path(), + in_memory_bytes, + disk_bytes, + max_disk_files, + ) { let builder = thread::Builder::new(); if let Ok(snd_jh) = builder.spawn(move || { for i in 
0..total_vals { @@ -433,7 +559,7 @@ mod test { fn explicit_single_sender_single_rcv_round_trip() { let mut loops = 0; loop { - assert!(single_sender_single_rcv_round_trip_exp(8, 8, 5)); + assert!(single_sender_single_rcv_round_trip_exp(8, 8, 5, 10)); loops += 1; if loops > 2_500 { break; @@ -446,7 +572,12 @@ mod test { fn single_sender_single_rcv_round_trip() { // Similar to the multi sender test except now with a single sender we // can guarantee order. - fn inner(in_memory_bytes: usize, disk_bytes: usize, total_vals: usize) -> TestResult { + fn inner( + in_memory_bytes: usize, + disk_bytes: usize, + max_disk_files: usize, + total_vals: usize, + ) -> TestResult { let sz = mem::size_of::(); if total_vals == 0 || (in_memory_bytes / sz) == 0 || (disk_bytes / sz) == 0 { return TestResult::discard(); @@ -454,10 +585,11 @@ mod test { TestResult::from_bool(single_sender_single_rcv_round_trip_exp( in_memory_bytes, disk_bytes, + max_disk_files, total_vals, )) } - QuickCheck::new().quickcheck(inner as fn(usize, usize, usize) -> TestResult); + QuickCheck::new().quickcheck(inner as fn(usize, usize, usize, usize) -> TestResult); } } diff --git a/src/receiver.rs b/src/receiver.rs index dafdc54..a182c74 100644 --- a/src/receiver.rs +++ b/src/receiver.rs @@ -2,7 +2,8 @@ use bincode::{deserialize_from, Infinite}; use private; use byteorder::{BigEndian, ReadBytesExt}; use serde::de::DeserializeOwned; -use std::fs; +use std::{fs, sync}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom}; use std::iter::IntoIterator; use std::marker::PhantomData; @@ -18,6 +19,7 @@ pub struct Receiver { resource_type: PhantomData, mem_buffer: private::Queue, disk_writes_to_read: usize, + max_disk_files: sync::Arc, } impl Receiver @@ -28,6 +30,7 @@ where pub fn new( data_dir: &Path, mem_buffer: private::Queue, + max_disk_files: sync::Arc, ) -> Result, super::Error> { let setup_mem_buffer = mem_buffer.clone(); // clone is cheeeeeap let guard = 
setup_mem_buffer.lock_front(); @@ -48,6 +51,7 @@ where resource_type: PhantomData, mem_buffer: mem_buffer, disk_writes_to_read: 0, + max_disk_files: max_disk_files, }) } Err(e) => Err(super::Error::IoError(e)), @@ -61,7 +65,7 @@ where // disk read happens and no `T` is returned this is an unrecoverable error. fn read_disk_value(&mut self) -> Result { loop { - match self.fp.read_u64::() { + match self.fp.read_u32::() { Ok(payload_size_in_bytes) => { let mut payload_buf = vec![0; payload_size_in_bytes as usize]; match self.fp.read_exact(&mut payload_buf[..]) { @@ -100,6 +104,7 @@ where Ok(seq_num) => { let old_log = self.root.join(format!("{}", seq_num)); fs::remove_file(old_log).expect("could not remove log"); + self.max_disk_files.fetch_add(1, Ordering::Relaxed); let lg = self.root.join(format!("{}", seq_num.wrapping_add(1))); match fs::OpenOptions::new().read(true).open(&lg) { diff --git a/src/sender.rs b/src/sender.rs index 9dc097a..7dddf31 100644 --- a/src/sender.rs +++ b/src/sender.rs @@ -3,7 +3,8 @@ use byteorder::{BigEndian, WriteBytesExt}; use private; use serde::{Deserialize, Serialize}; use std::fs; -use std::sync::MutexGuard; +use std::sync::{Arc, MutexGuard}; +use std::sync::atomic::{AtomicUsize, Ordering}; use deque::BackGuardInner; use std::io::{BufWriter, Write}; use std::marker::PhantomData; @@ -12,6 +13,8 @@ use flate2::Compression; use flate2::write::DeflateEncoder; use deque; +const PAYLOAD_LEN_BYTES: usize = ::std::mem::size_of::(); + #[derive(Debug)] /// The 'send' side of hopper, similar to `std::sync::mpsc::Sender`. 
pub struct Sender { @@ -20,6 +23,7 @@ pub struct Sender { max_disk_bytes: usize, mem_buffer: private::Queue, resource_type: PhantomData, + disk_files_capacity: Arc, } #[derive(Default, Debug)] @@ -42,6 +46,7 @@ where max_disk_bytes: self.max_disk_bytes, mem_buffer: self.mem_buffer.clone(), resource_type: self.resource_type, + disk_files_capacity: Arc::clone(&self.disk_files_capacity), } } } @@ -56,6 +61,7 @@ where data_dir: &Path, max_disk_bytes: usize, mem_buffer: private::Queue, + max_disk_files: Arc, ) -> Result, super::Error> where S: Into, @@ -79,6 +85,7 @@ where max_disk_bytes: max_disk_bytes, mem_buffer: mem_buffer, resource_type: PhantomData, + disk_files_capacity: max_disk_files, }) } Err(e) => Err(super::Error::IoError(e)), @@ -101,8 +108,7 @@ where // If the individual sender writes enough to go over the max we mark the // file read-only--which will help the receiver to decide it has hit the // end of its log file--and create a new log file. - let bytes_written = - (*guard).inner.bytes_written + payload_len + ::std::mem::size_of::(); + let bytes_written = (*guard).inner.bytes_written + payload_len + PAYLOAD_LEN_BYTES; if (bytes_written > self.max_disk_bytes) || (*guard).inner.sender_fp.is_none() { // Once we've gone over the write limit for our current file or find // that we've gotten behind the current queue file we need to seek @@ -116,17 +122,23 @@ where }); (*guard).inner.sender_seq_num = (*guard).inner.sender_seq_num.wrapping_add(1); (*guard).inner.path = self.root.join(format!("{}", (*guard).inner.sender_seq_num)); - match fs::OpenOptions::new() - .append(true) - .create(true) - .open(&(*guard).inner.path) - { - Ok(fp) => { - (*guard).inner.sender_fp = Some(BufWriter::new(fp)); - (*guard).inner.bytes_written = 0; - } - Err(e) => { - return Err((event, super::Error::IoError(e))); + let disk_files_capacity = self.disk_files_capacity.load(Ordering::Acquire); + if disk_files_capacity == 0 { + return Err((event, super::Error::Full)); + } else { + match 
fs::OpenOptions::new()
+                    .append(true)
+                    .create(true)
+                    .open(&(*guard).inner.path)
+                {
+                    Ok(fp) => {
+                        self.disk_files_capacity.fetch_sub(1, Ordering::Release);
+                        (*guard).inner.sender_fp = Some(BufWriter::new(fp));
+                        (*guard).inner.bytes_written = 0;
+                    }
+                    Err(e) => {
+                        return Err((event, super::Error::IoError(e)));
+                    }
+                }
             }
         }
@@ -134,8 +146,8 @@ where
         assert!((*guard).inner.sender_fp.is_some());
         let mut bytes_written = 0;
         if let Some(ref mut fp) = (*guard).inner.sender_fp {
-            match fp.write_u64::<BigEndian>(payload_len as u64) {
-                Ok(()) => bytes_written += ::std::mem::size_of::<u64>(),
+            match fp.write_u32::<BigEndian>(payload_len as u32) {
+                Ok(()) => bytes_written += PAYLOAD_LEN_BYTES,
                 Err(e) => {
                     return Err((event, super::Error::IoError(e)));
                 }
@@ -190,11 +202,14 @@ where
         Ok(())
     }
 
-    /// send writes data out in chunks, like so:
-    ///
-    ///    u32: payload_size
-    ///    [u8] payload
+    /// Send an event into the queue
     ///
+    /// This function will fail with IO errors if the underlying queue files are
+    /// temporarily exhausted -- say, due to lack of file descriptors -- or with
+    /// Full if there is no more space in the in-memory buffer _or_ on disk, as
+    /// per the `max_disk_files` setting from
+    /// `channel_with_explicit_capacity`. Ownership of the event will be
+    /// returned to the caller on failure.
     pub fn send(&mut self, event: T) -> Result<(), (T, super::Error)> {
         // Welcome. Let me tell you about the time I fell off the toilet, hit my
         // head and when I woke up I saw this! ~passes knapkin drawing of the

From 54d2c75e3a29af4647b6c5ba8a14d3dea01040ea Mon Sep 17 00:00:00 2001
From: "Brian L. Troutwine"
Date: Tue, 20 Feb 2018 10:29:50 -0800
Subject: [PATCH 2/2] Language update

The language in the README was a touch confusing. If it's still confusing
it's no longer confusing in the same way.

Signed-off-by: Brian L. 
Troutwine
---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 5f01321..7f4f32a 100644
--- a/README.md
+++ b/README.md
@@ -78,7 +78,7 @@ the only reader--and moves on to the next.
 `hopper::channel_with_explicit_capacity` takes a `max_disk_files` argument, defining the
 total number of overflow files that can exist concurrently. If all memory and
-disk buffers are full sends into the queue will fail but the error result is
+disk buffers are full, sends into the queue will fail. The error result is
 written such that the caller can recover ownership of the input value. By
 default `max_disk_files == usize::max_value()` and so if the Receiver is unable
 to keep up with the Senders then, oops, your disk will gradually fill up.
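Editor's note: the caller-side contract this series introduces (sends never block; on overflow the value comes back alongside `Error::Full` so the caller may retry or shed it) can be sketched with a toy bounded queue standing in for hopper. `BoundedQueue` and `SendError` below are illustrative stand-ins, not hopper's actual types:

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum SendError {
    /// No space left; mirrors hopper's `Error::Full`.
    Full,
}

/// Toy fixed-capacity queue whose `send` hands ownership of a rejected
/// value back to the caller, mirroring hopper's `Err((T, Error::Full))`.
struct BoundedQueue<T> {
    items: VecDeque<T>,
    capacity: usize,
}

impl<T> BoundedQueue<T> {
    fn new(capacity: usize) -> Self {
        BoundedQueue { items: VecDeque::new(), capacity }
    }

    /// Non-blocking send: on overflow the value is returned to the
    /// caller, who may retry later or drop it.
    fn send(&mut self, item: T) -> Result<(), (T, SendError)> {
        if self.items.len() >= self.capacity {
            return Err((item, SendError::Full));
        }
        self.items.push_back(item);
        Ok(())
    }

    fn recv(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    let mut q = BoundedQueue::new(2);
    let mut shed = 0;
    for i in 0..5 {
        if let Err((val, SendError::Full)) = q.send(i) {
            // Ownership of `val` is recovered; here we simply shed it.
            assert_eq!(val, i);
            shed += 1;
        }
    }
    // Capacity 2 means three of the five sends are shed.
    assert_eq!(shed, 3);
    assert_eq!(q.recv(), Some(0));
}
```

Returning the value in the error tuple is what lets callers retry without cloning every event up front, which is the point of the `Err((T, Error))` shape in `Sender::send`.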
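Editor's note: the first patch also narrows the queue-file record header from a `u64` to a `u32` length prefix (`PAYLOAD_LEN_BYTES`, 4 bytes). The framing itself is simple: each record is a big-endian length followed by the payload bytes. A minimal standard-library sketch of that framing follows; hopper's real writer additionally buffers and deflate-compresses payloads, which this toy omits:

```rust
use std::io::{Cursor, Read, Write};

/// Write one record: a 4-byte big-endian u32 length prefix, then the
/// payload bytes, as the patched sender does.
fn write_record<W: Write>(w: &mut W, payload: &[u8]) -> std::io::Result<()> {
    w.write_all(&(payload.len() as u32).to_be_bytes())?;
    w.write_all(payload)
}

/// Read one record back: length prefix first, then exactly that many
/// payload bytes.
fn read_record<R: Read>(r: &mut R) -> std::io::Result<Vec<u8>> {
    let mut len_buf = [0u8; 4];
    r.read_exact(&mut len_buf)?;
    let len = u32::from_be_bytes(len_buf) as usize;
    let mut payload = vec![0u8; len];
    r.read_exact(&mut payload)?;
    Ok(payload)
}

fn main() -> std::io::Result<()> {
    let mut buf = Vec::new();
    write_record(&mut buf, b"hello")?;
    write_record(&mut buf, b"hopper")?;
    // Each record now costs 4 header bytes instead of the former 8.
    assert_eq!(buf.len(), 4 + 5 + 4 + 6);
    let mut cursor = Cursor::new(buf);
    assert_eq!(read_record(&mut cursor)?, b"hello".to_vec());
    assert_eq!(read_record(&mut cursor)?, b"hopper".to_vec());
    Ok(())
}
```

Halving the header is why the sender's `bytes_written` accounting switches from `size_of::<u64>()` to `PAYLOAD_LEN_BYTES` in the same hunk.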