Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PipeConfig to allow advanced configuration of pipe() operations #371

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions source/vibe/core/stream.d
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,27 @@ public import eventcore.driver : IOMode;
ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStream sink,
ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
PipeConfig cfg;
cfg.mode = mode;
return pipe!(InputStream, OutputStream)(source, sink, nbytes, cfg);
}
/// ditto
ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStream sink,
ulong nbytes, PipeConfig config) @blocking @trusted
if (isOutputStream!OutputStream && isInputStream!InputStream)
{
import vibe.internal.allocator : theAllocator, makeArray, dispose;
import vibe.core.core : runTask;
import vibe.core.sync : LocalManualEvent, createManualEvent;
import vibe.core.task : InterruptException;

final switch (mode) {
assert(config.initialBufferSize > 0, "PipeConfig.initialBufferSize must be non-zero");

final switch (config.mode) {
case PipeMode.sequential:
{
scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024);
scope buffer = cast(ubyte[]) theAllocator.allocate(config.initialBufferSize);
scope (exit) theAllocator.dispose(buffer);

ulong ret = 0;
Expand Down Expand Up @@ -83,6 +94,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
InputStream source;
OutputStream sink;
ulong nbytes;
size_t initialBufferSize;
ubyte[][bufcount] buffers;
size_t[bufcount] bufferFill;
// buffer index that is being read/written
Expand All @@ -94,7 +106,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
void readLoop()
{
// gradually increased depending on read speed
size_t rbsize = 64*1024;
size_t rbsize = min(this.initialBufferSize, bufsize);

while (true) {
ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes;
Expand All @@ -109,7 +121,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
auto tm = MonoTime.currTime;
source.read(buffers[bi][0 .. chunk]);
if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs)
rbsize *= 2;
rbsize = min(rbsize * 2, bufsize);
if (nbytes != ulong.max) nbytes -= chunk;
bytesWritten += chunk;
bufferFill[bi] = chunk;
Expand Down Expand Up @@ -140,6 +152,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre
scope (exit) theAllocator.dispose(buffer);

ConcurrentPipeState state;
state.initialBufferSize = config.initialBufferSize;
foreach (i; 0 .. bufcount)
state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)];
swap(state.source, source);
Expand Down Expand Up @@ -198,6 +211,11 @@ enum PipeMode {
concurrent
}

struct PipeConfig {
PipeMode mode;
size_t initialBufferSize = 64 * 1024;
}


/** Marks a function as blocking.

Expand Down