Skip to content

Commit

Permalink
Add PipeConfig to allow advanced configuration of pipe() operations.
Browse files Browse the repository at this point in the history
  • Loading branch information
s-ludwig committed Dec 15, 2023
1 parent 06b7e49 commit cb4bfac
Showing 1 changed file with 22 additions and 4 deletions.
26 changes: 22 additions & 4 deletions source/vibe/core/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,27 @@ public import eventcore.driver : IOMode;
ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStream sink,
ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
PipeConfig cfg;
cfg.mode = mode;
return pipe!(InputStream, OutputStream)(source, sink, nbytes, cfg);
}
/// ditto
ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStream sink,
ulong nbytes, PipeConfig config) @blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
import vibe.internal.allocator : theAllocator, makeArray, dispose;
import vibe.core.core : runTask;
import vibe.core.sync : LocalManualEvent, createManualEvent;
import vibe.core.task : InterruptException;

final switch (mode) {
assert(config.initialBufferSize > 0, "PipeConfig.initialBufferSize must be non-zero");

final switch (config.mode) {
case PipeMode.sequential:
{
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
scope buffer = cast(ubyte[]) theAllocator.allocate(config.initialBufferSize);
scope (exit) theAllocator.dispose(buffer);

ulong ret = 0;
Expand Down Expand Up @@ -83,6 +94,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
InputStream source;
OutputStream sink;
ulong nbytes;
size_t initialBufferSize;
ubyte[][bufcount] buffers;
size_t[bufcount] bufferFill;
// buffer index that is being read/written
Expand All @@ -94,7 +106,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
void readLoop()
{
// gradually increased depending on read speed
size_t rbsize = 64*1024;
size_t rbsize = min(this.initialBufferSize, bufsize);

while (true) {
ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
Expand All @@ -109,7 +121,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
auto tm = MonoTime.currTime;
source.read(buffers[bi][0 .. chunk]);
if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs)
rbsize *= 2;
rbsize = min(rbsize * 2, bufsize);
if (nbytes != ulong.max) nbytes -= chunk;
bytesWritten += chunk;
bufferFill[bi] = chunk;
Expand Down Expand Up @@ -140,6 +152,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
scope (exit) theAllocator.dispose(buffer);

ConcurrentPipeState state;
state.initialBufferSize = config.initialBufferSize;
foreach (i; 0 .. bufcount)
state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)];
swap(state.source, source);
Expand Down Expand Up @@ -198,6 +211,11 @@ enum PipeMode {
concurrent
}

struct PipeConfig {
PipeMode mode;
size_t initialBufferSize = 64 * 1024;
}


/** Marks a function as blocking.
Expand Down

0 comments on commit cb4bfac

Please sign in to comment.