diff --git a/source/vibe/core/stream.d b/source/vibe/core/stream.d index de35e99e..2d243d09 100644 --- a/source/vibe/core/stream.d +++ b/source/vibe/core/stream.d @@ -38,16 +38,27 @@ public import eventcore.driver : IOMode; ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStream sink, ulong nbytes, PipeMode mode = PipeMode.sequential) @blocking @trusted if (isOutputStream!OutputStream && isInputStream!InputStream) +{ + PipeConfig cfg; + cfg.mode = mode; + return pipe!(InputStream, OutputStream)(source, sink, nbytes, cfg); +} +/// ditto +ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStream sink, + ulong nbytes, PipeConfig config) @blocking @trusted + if (isOutputStream!OutputStream && isInputStream!InputStream) { import vibe.internal.allocator : theAllocator, makeArray, dispose; import vibe.core.core : runTask; import vibe.core.sync : LocalManualEvent, createManualEvent; import vibe.core.task : InterruptException; - final switch (mode) { + assert(config.initialBufferSize > 0, "PipeConfig.initialBufferSize must be non-zero"); + + final switch (config.mode) { case PipeMode.sequential: { - scope buffer = cast(ubyte[]) theAllocator.allocate(64*1024); + scope buffer = cast(ubyte[]) theAllocator.allocate(config.initialBufferSize); scope (exit) theAllocator.dispose(buffer); ulong ret = 0; @@ -83,6 +94,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre InputStream source; OutputStream sink; ulong nbytes; + size_t initialBufferSize; ubyte[][bufcount] buffers; size_t[bufcount] bufferFill; // buffer index that is being read/written @@ -94,7 +106,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre void readLoop() { // gradually increased depending on read speed - size_t rbsize = 64*1024; + size_t rbsize = min(this.initialBufferSize, bufsize); while (true) { ulong remaining = nbytes == ulong.max ? source.leastSize : nbytes; @@ -109,7 +121,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre auto tm = MonoTime.currTime; source.read(buffers[bi][0 .. chunk]); if (rbsize < bufsize && MonoTime.currTime - tm < 100.msecs) - rbsize *= 2; + rbsize = min(rbsize * 2, bufsize); if (nbytes != ulong.max) nbytes -= chunk; bytesWritten += chunk; bufferFill[bi] = chunk; @@ -140,6 +152,7 @@ ulong pipe(InputStream, OutputStream)(scope InputStream source, scope OutputStre scope (exit) theAllocator.dispose(buffer); ConcurrentPipeState state; + state.initialBufferSize = config.initialBufferSize; foreach (i; 0 .. bufcount) state.buffers[i] = buffer[i*($/bufcount) .. (i+1)*($/bufcount)]; swap(state.source, source); @@ -198,6 +211,11 @@ enum PipeMode { concurrent } +struct PipeConfig { + PipeMode mode; + size_t initialBufferSize = 64 * 1024; +} + /** Marks a function as blocking.