Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a cap on total queue files #20

Merged
merged 2 commits into from
Feb 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 31 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ consumes a bounded amount of memory. This is done by paging elements to disk at
need. The ambition here is to support mpsc style communication without
allocating unbounded amounts of memory or dropping inputs on the floor.

## Quickstart
## Quickstart

Include the hopper library in your Cargo.toml

`hopper = "0.2"`
`hopper = "0.4"`

and use it in much the same way you'd use stdlib's mpsc:

Expand All @@ -28,28 +28,15 @@ assert_eq!(Some(9), rcv.iter().next());
```

The primary difference here is that you must provide a name for the channel and
a directory where hopper can page items to disk.
a directory where hopper can page items to disk.

## Wait, page items to disk?

Imagine that hopper's internal structure is laid out like a contiguous array:

```
[---------------|----------------|~~~~~~~~~~. . .~~~~~~~~~~~~~~~~]
0 1024 2048
```

Between the indicies of 0 and 1024 hopper stores items in-memory until they are
retrieved. Above index 1024 items are paged out to disk. Items stored between
index 1024 and 2048 are temporarily buffered in memory to allow a single page to
disk once this buffer is full. This scheme fixes the memory burden of the system
at the expense of disk IO.

Hopper is intended to be used in situtations where your system
cannot [load-shed](http://ferd.ca/queues-don-t-fix-overload.html) inputs and
_must_ eventually process them. While hopper does page to disk it will not
preserve writes across restarts, much in the same way as stdlib mpsc.

_must_ eventually process them. Hopper maintains an in-memory buffer of inputs
with disk overflow when the in-memory buffer is full. While hopper does page to
disk it will not preserve writes across restarts, much in the same way as stdlib
mpsc.

## Inside Baseball

Hopper's channel looks very much like a named pipe in Unix. You supply a
Expand All @@ -59,9 +46,9 @@ supplied to the above two functions is used to create a directory under
`data_dir`. This directory gets filled up with monotonically increasing
files in situations where the disk paging is in use. We'll treat this
exclusively from here on.

The on-disk structure look like so:

```text
data-dir/
sink-name0/
Expand All @@ -78,7 +65,7 @@ responsible for _creating_ "queue files". In the above,
`data-dir/sink-name*/*` are queue files. These files are treated as
append-only logs by the Senders. The Receivers trawl through these logs to
read the data serialized there.

### Won't this fill up my disk?

Maybe! Each Sender has a notion of the maximum bytes it may read--which you
Expand All @@ -88,24 +75,26 @@ attempt to mark the queue file as read-only and create a new file. The
Receiver is programmed to read its current queue file until it reaches EOF
and finds the file is read-only, at which point it deletes the file--it is
the only reader--and moves on to the next.

If the Receiver is unable to keep up with the Senders then, oops, your disk will
gradually fill up.

`hopper::channel_with_max_bytes` takes a `max_disk_files` argument, defining the
total number of overflow files that can exist concurrently. If all memory and
disk buffers are full, sends into the queue will fail. The error result is
written such that the caller can recover ownership of the input value. By
default `max_disk_files == usize::max_value()` and so if the Receiver is unable
to keep up with the Senders then, oops, your disk will gradually fill up.

### What kind of filesystem options will I need?

Hopper is intended to work on any wacky old filesystem with any options,
even at high concurrency. As common filesystems do not support interleaving
[small atomic
writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4)
hopper limits itself to one exclusive Sender or one exclusive Receiver at a
time. This potentially limits the concurrency of mpsc but maintains data
integrity. We are open to improvements in this area.

## What kind of performance does hopper have?

Hopper ships with a small-ish benchmark suite. For Postmates' workload, writes
of 10 MB/s are not uncommon and we've pushed that to 80 MB/s on occassion. So!
not super pokey but the actual upper-bounds have not been probed and can
probably be expanded. See end note in "What kind of filesystem options will I
need?" as an example of where hopper can be made better in concurrent workloads.
Hopper is intended to work on any wacky old filesystem with any options, even at
high concurrency. As common filesystems do not support
interleaving
[small atomic writes](https://stackoverflow.com/questions/32851672/is-overwriting-a-small-file-atomic-on-ext4) hopper
limits itself to one exclusive Sender and one exclusive Receiver at a time. This
potentially limits the concurrency of mpsc but maintains data integrity. We are
open to improvements in this area.

## What kind of performance does hopper have?

Hopper ships with benchmarks. We've seen performance only 30% slower than
stdlib's mpsc all the way up to 70x worse, depending on configuration options,
disk IO speeds and the like. We warmly encourage you benchmark on your system.
10 changes: 7 additions & 3 deletions benches/stdlib_comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,13 @@ fn hopper_tst(input: HopperInput) -> () {
let in_memory_bytes = sz * input.in_memory_total;
let max_disk_bytes = sz * input.max_disk_total;
if let Ok(dir) = tempdir::TempDir::new("hopper") {
if let Ok((snd, mut rcv)) =
channel_with_explicit_capacity("tst", dir.path(), in_memory_bytes, max_disk_bytes)
{
if let Ok((snd, mut rcv)) = channel_with_explicit_capacity(
"tst",
dir.path(),
in_memory_bytes,
max_disk_bytes,
usize::max_value(),
) {
let chunk_size = input.ith / input.total_senders;

let mut snd_jh = Vec::new();
Expand Down
Loading