From 50997a4424a7fab87181f86f0a1d9443a723aa3b Mon Sep 17 00:00:00 2001 From: Daan De Meyer Date: Sat, 4 Jan 2020 18:01:27 +0100 Subject: [PATCH] reproc(++): Rework drain and sinks. `reproc_drain` and `process::drain` have been moved sink.h and sink.hpp respectively. `process::drain` is now `reproc::drain` and takes a process as its first argument. reproc's built-in sinks have been changed to return sinks. The functions themselves are now hidden in sink.c --- CHANGELOG.md | 27 ++++--- reproc/CMakeLists.txt | 1 + reproc/examples/drain.c | 21 +++-- reproc/include/reproc/reproc.h | 40 ---------- reproc/include/reproc/sink.h | 113 ++++++++++----------------- reproc/src/reproc.c | 41 ---------- reproc/src/sink.c | 106 +++++++++++++++++++++++++ reproc/test/argv.c | 7 +- reproc/test/environment.c | 7 +- reproc/test/io.c | 7 +- reproc/test/overflow.c | 6 +- reproc/test/working-directory.c | 7 +- reprocxx/examples/background.cpp | 2 +- reprocxx/examples/cmake-help.cpp | 16 ++-- reprocxx/include/reproc++/reproc.hpp | 54 ------------- reprocxx/include/reproc++/sink.hpp | 49 ++++++++++++ 16 files changed, 255 insertions(+), 249 deletions(-) create mode 100644 reproc/src/sink.c diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a5e08323..71597251c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -176,11 +176,6 @@ Naming the enum after the function it is passed to (`reproc_stop`) is simpler than using a different name. -- Inline the `reproc_sink_string` and `reproc_sink_discard` implementations in - the sink.h header. - - This avoids issues with allocating across module (DLL) boundaries on Windows. - - Rewrite tests in C using CTest and `assert` and remove doctest. Doctest is a great library but we don't really lose anything major by @@ -193,6 +188,8 @@ - Make `reproc_strerror` thread-safe. +- Move `reproc_drain` to sink.h. + - Make `reproc_drain` take a separate sink for each output stream. Sinks are now passed via the `reproc_sink` type. @@ -200,6 +197,14 @@ flexibility. To use a single sink for both output streams, simply pass the same sink to both the `out` and `err` arguments of `reproc_drain`. +- Turn `reproc_sink_string` and `reproc_sink_discard` into functions that return + sinks and hide the actual functions in sink.c. + +- Add `reproc_free` to sink.h which must be used to free memory allocated by + `reproc_sink_string`. + + This avoids issues with allocating across module (DLL) boundaries on Windows. + - Support passing timeouts to `reproc_read`, `reproc_write` and `reproc_drain`. Pass `REPROC_INFINITE` as the timeout to retain the old behaviour. @@ -244,16 +249,20 @@ - Add `sink::thread_safe::string` which is a thread-safe version of `sink::string`. -- Make `process::drain` take a separate sink for each output stream. +- Move `process::drain` out of the `process` class and move it to sink.hpp. + + `process.drain(...)` becomes `reproc::drain(process, ...)`. + +- Make `reproc::drain` take a separate sink for each output stream. Same reasoning as `reproc_drain`. -- Modify all included sinks to support the new `process::drain` behaviour. +- Modify all included sinks to support the new `reproc::drain` behaviour. - Support passing timeouts to `process::read`, `process::write` and - `process::drain`. + `reproc::drain`. - The methods default to waiting indefinitely which matches their old behaviour. + They still default to waiting indefinitely which matches their old behaviour. ### CMake diff --git a/reproc/CMakeLists.txt b/reproc/CMakeLists.txt index afd98463c..1471c4fbb 100644 --- a/reproc/CMakeLists.txt +++ b/reproc/CMakeLists.txt @@ -18,6 +18,7 @@ target_sources(reproc PRIVATE src/${REPROC_PLATFORM}/process.c src/${REPROC_PLATFORM}/redirect.c src/reproc.c + src/sink.c ) if(REPROC_TEST) diff --git a/reproc/examples/drain.c b/reproc/examples/drain.c index 520fb58b4..3b079f539 100644 --- a/reproc/examples/drain.c +++ b/reproc/examples/drain.c @@ -32,13 +32,14 @@ int main(void) goto cleanup; } - // A sink function receives a single context parameter. `reproc_sink_string` - // requires a `char **` with its value set to `NULL` to be passed to - // `reproc_drain` along with `reproc_sink_string`. If a sink function needs - // more than one parameter, simply store the parameters in a struct and pass - // the address of the struct as the `context` parameter. - reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); + // `reproc_drain` reads from a child process and passes the output to the + // given sinks. A sink consists of a function pointer and a context pointer + // which is always passed to the function. reproc provides several built-in + // sinks such as `reproc_sink_string` which stores all provided output in the + // given string. Passing the same sink to both output streams makes sure the + // output from both streams is combined into a single string. + reproc_sink sink = reproc_sink_string(&output); + r = reproc_drain(process, sink, sink, REPROC_INFINITE); if (r < 0) { goto cleanup; } @@ -56,10 +57,8 @@ int main(void) } cleanup: - // `output` always points to valid memory or is set to `NULL` by - // `reproc_sink_string` so it's always safe to call `free` on it. - free(output); - + // Memory allocated by `reproc_sink_string` must be freed with `reproc_free`. + reproc_free(output); reproc_destroy(process); if (r < 0) { diff --git a/reproc/include/reproc/reproc.h b/reproc/include/reproc/reproc.h index 3ff01085f..5249ca17d 100644 --- a/reproc/include/reproc/reproc.h +++ b/reproc/include/reproc/reproc.h @@ -120,17 +120,6 @@ typedef struct reproc_options { reproc_stop_actions stop_actions; } reproc_options; -/*! Used by `reproc_drain` to provide data to the caller. Each time data is -read, `function` is called with `context`. See `reproc_drain` and the `drain` -example for more information .*/ -typedef struct reproc_sink { - bool (*function)(REPROC_STREAM stream, - const uint8_t *buffer, - size_t size, - void *context); - void *context; -} reproc_sink; - /*! Allocate a new `reproc_t` instance on the heap. */ REPROC_EXPORT reproc_t *reproc_new(void); @@ -187,35 +176,6 @@ REPROC_EXPORT int reproc_read(reproc_t *process, size_t size, int timeout); -/*! -Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of -the sinks returns false. The `out` and `err` sinks receive the output from -stdout and stderr respectively. The same sink may be passed to both `out` and -`err`. - -If `out` or `err` are `NULL`, all output on the corresponding stream is -discarded. - -`reproc_drain` always starts by calling both sinks once with an empty buffer and -`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all -output from the previous call to `reproc_drain` one by one. - -Each call to `reproc_read` is passed the given timeout. If a call to -`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`. - -Note that his function returns 0 instead of `REPROC_EPIPE` when both output -streams of the child process are closed. - -For examples of sinks, see `sink.h`. - -Actionable errors: -- `REPROC_ETIMEDOUT` -*/ -REPROC_EXPORT int reproc_drain(reproc_t *process, - reproc_sink *out, - reproc_sink *err, - int timeout); - /*! Writes `size` bytes from `buffer` to the standard input (stdin) of the child process. diff --git a/reproc/include/reproc/sink.h b/reproc/include/reproc/sink.h index 6d61104c5..a746aa580 100644 --- a/reproc/include/reproc/sink.h +++ b/reproc/include/reproc/sink.h @@ -1,30 +1,56 @@ -/*! Sink functions that can be passed to `reproc_drain`. */ - #pragma once #include -#include -#include -#include -#include - #ifdef __cplusplus extern "C" { #endif +/*! Used by `reproc_drain` to provide data to the caller. Each time data is +read, `function` is called with `context` .*/ +typedef struct reproc_sink { + bool (*function)(REPROC_STREAM stream, + const uint8_t *buffer, + size_t size, + void *context); + void *context; +} reproc_sink; + /*! -Stores the output (both stdout and stderr) of a process in a single contiguous C -string. +Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of +the sinks returns false. The `out` and `err` sinks receive the output from +stdout and stderr respectively. The same sink may be passed to both `out` and +`err`. + +`reproc_drain` always starts by calling both sinks once with an empty buffer and +`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all +output from the previous call to `reproc_drain` one by one. + +Each call to `reproc_read` is passed the given timeout. If a call to +`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`. + +Note that his function returns 0 instead of `REPROC_EPIPE` when both output +streams of the child process are closed. + +For examples of sinks, see `sink.h`. + +Actionable errors: +- `REPROC_ETIMEDOUT` +*/ +REPROC_EXPORT int +reproc_drain(reproc_t *process, reproc_sink out, reproc_sink err, int timeout); + +/*! +Stores the output (both stdout and stderr) of a process in `output`. Expects a `char **` with its value set to `NULL` as its initial context. (Re)Allocates memory as necessary to store the output and assigns it as the value of the given context. If allocating more memory fails, the already allocated memory is freed and the value of the given context is set to `NULL`. -After calling `reproc_drain` with `reproc_sink_string`, the value of `context` +After calling `reproc_drain` with `reproc_sink_string`, the value of `output` will either point to valid memory or will be set to `NULL`. This means it is -always safe to call `free` on `context`'s value after `reproc_drain` finishes. +always safe to call `free` on `output`'s value after `reproc_drain` finishes. Because the context this function expects does not store the output size, `strlen` is called each time data is read to calculate the current size of the @@ -37,65 +63,12 @@ their output because `strlen` is used to calculate the current output size. The `drain` example shows how to use `reproc_sink_string`. ``` */ -static bool reproc_sink_string(REPROC_STREAM stream, - const uint8_t *buffer, - size_t size, - void *context); +REPROC_EXPORT reproc_sink reproc_sink_string(char **output); /*! Discards the output of a process. */ -static bool reproc_sink_discard(REPROC_STREAM stream, - const uint8_t *buffer, - size_t size, - void *context); - -// We inline the sink implementations to avoid allocation/de-allocation issues -// over module (DLL) boundaries. By inlining the implementations, memory is -// always allocated and freed in the user's executable/DLL. - -#pragma clang diagnostic push -#pragma clang diagnostic ignored "-Wunused-function" - -static inline bool reproc_sink_string(REPROC_STREAM stream, - const uint8_t *buffer, - size_t size, - void *context) -{ - (void) stream; - - char **string = (char **) context; - size_t string_size = *string == NULL ? 0 : strlen(*string); - - char *realloc_result = (char *) realloc(*string, string_size + size + 1); - if (realloc_result == NULL) { - free(*string); - *string = NULL; - return false; - } else { - *string = realloc_result; - } - - memcpy(*string + string_size, buffer, size); - - (*string)[string_size + size] = '\0'; - - return true; -} - -static inline bool reproc_sink_discard(REPROC_STREAM stream, - const uint8_t *buffer, - size_t size, - void *context) -{ - (void) stream; - (void) buffer; - (void) size; - (void) context; - - return true; -} - -#pragma clang diagnostic pop +REPROC_EXPORT reproc_sink reproc_sink_discard(void); -#ifdef __cplusplus -} -#endif +/*! Calls `free` on `ptr` and returns `NULL`. Use this function to free memory +allocated by `reproc_sink_string`. This avoids issues with allocating across +module (DLL) boundaries on Windows. */ +REPROC_EXPORT void *reproc_free(void *ptr); diff --git a/reproc/src/reproc.c b/reproc/src/reproc.c index 0a91ba13e..4e102297b 100644 --- a/reproc/src/reproc.c +++ b/reproc/src/reproc.c @@ -195,47 +195,6 @@ int reproc_read(reproc_t *process, return r; // bytes read } -int reproc_drain(reproc_t *process, - reproc_sink *out, - reproc_sink *err, - int timeout) -{ - assert_return(process, REPROC_EINVAL); - - const uint8_t initial = 0; - - // A single call to `read` might contain multiple messages. By always calling - // both sinks once with no data before reading, we give them the chance to - // process all previous output one by one before reading from the child - // process again. - if ((out && !out->function(REPROC_STREAM_IN, &initial, 0, out->context)) || - (err && !err->function(REPROC_STREAM_IN, &initial, 0, err->context))) { - return 0; - } - - uint8_t buffer[4096]; - int r = -1; - - while (true) { - REPROC_STREAM stream = { 0 }; - r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer), timeout); - if (r < 0) { - break; - } - - size_t bytes_read = (size_t) r; - - reproc_sink *sink = stream == REPROC_STREAM_OUT ? out : err; - - // `sink` returns false to tell us to stop reading. - if (sink && !sink->function(stream, buffer, bytes_read, sink->context)) { - break; - } - } - - return r == REPROC_EPIPE ? 0 : r; -} - int reproc_write(reproc_t *process, const uint8_t *buffer, size_t size, diff --git a/reproc/src/sink.c b/reproc/src/sink.c new file mode 100644 index 000000000..7e415d239 --- /dev/null +++ b/reproc/src/sink.c @@ -0,0 +1,106 @@ +#include + +#include "error.h" +#include "macro.h" + +#include +#include + +int reproc_drain(reproc_t *process, + reproc_sink out, + reproc_sink err, + int timeout) +{ + assert_return(process, REPROC_EINVAL); + assert_return(out.function, REPROC_EINVAL); + assert_return(err.function, REPROC_EINVAL); + + const uint8_t initial = 0; + + // A single call to `read` might contain multiple messages. By always calling + // both sinks once with no data before reading, we give them the chance to + // process all previous output one by one before reading from the child + // process again. + if (!out.function(REPROC_STREAM_IN, &initial, 0, out.context) || + !err.function(REPROC_STREAM_IN, &initial, 0, err.context)) { + return 0; + } + + uint8_t buffer[4096]; + int r = -1; + + while (true) { + REPROC_STREAM stream = { 0 }; + r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer), timeout); + if (r < 0) { + break; + } + + size_t bytes_read = (size_t) r; + + reproc_sink sink = stream == REPROC_STREAM_OUT ? out : err; + + // `sink` returns false to tell us to stop reading. + if (!sink.function(stream, buffer, bytes_read, sink.context)) { + break; + } + } + + return r == REPROC_EPIPE ? 0 : r; +} + +static bool sink_string(REPROC_STREAM stream, + const uint8_t *buffer, + size_t size, + void *context) +{ + (void) stream; + + char **string = (char **) context; + size_t string_size = *string == NULL ? 0 : strlen(*string); + + char *realloc_result = (char *) realloc(*string, string_size + size + 1); + if (realloc_result == NULL) { + free(*string); + *string = NULL; + return false; + } else { + *string = realloc_result; + } + + memcpy(*string + string_size, buffer, size); + + (*string)[string_size + size] = '\0'; + + return true; +} + +reproc_sink reproc_sink_string(char **output) +{ + return (reproc_sink){ sink_string, output }; +} + +static bool sink_discard(REPROC_STREAM stream, + const uint8_t *buffer, + size_t size, + void *context) +{ + (void) stream; + (void) buffer; + (void) size; + (void) context; + + return true; +} + +reproc_sink reproc_sink_discard(void) +{ + return (reproc_sink){ sink_discard, NULL }; +} + +void *reproc_free(void *ptr) +{ + assert_return(ptr, NULL); + free(ptr); + return NULL; +} diff --git a/reproc/test/argv.c b/reproc/test/argv.c index 8c4245f64..c888bae00 100644 --- a/reproc/test/argv.c +++ b/reproc/test/argv.c @@ -3,6 +3,7 @@ #undef NDEBUG #include +#include int main(void) { @@ -18,8 +19,8 @@ int main(void) assert(r == 0); char *output = NULL; - reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); + reproc_sink sink = reproc_sink_string(&output); + r = reproc_drain(process, sink, sink, REPROC_INFINITE); assert(r == 0); assert(output != NULL); @@ -40,7 +41,7 @@ int main(void) assert(*current == '\0'); reproc_destroy(process); - free(output); + reproc_free(output); return 0; } diff --git a/reproc/test/environment.c b/reproc/test/environment.c index a93aed975..12cdf10ed 100644 --- a/reproc/test/environment.c +++ b/reproc/test/environment.c @@ -3,6 +3,7 @@ #undef NDEBUG #include +#include int main(void) { @@ -18,8 +19,8 @@ int main(void) assert(r == 0); char *output = NULL; - reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); + reproc_sink sink = reproc_sink_string(&output); + r = reproc_drain(process, sink, sink, REPROC_INFINITE); assert(r == 0); assert(output != NULL); @@ -40,7 +41,7 @@ int main(void) assert(*current == '\0'); reproc_destroy(process); - free(output); + reproc_free(output); return 0; } diff --git a/reproc/test/io.c b/reproc/test/io.c index 77beae421..71845a6d1 100644 --- a/reproc/test/io.c +++ b/reproc/test/io.c @@ -3,6 +3,7 @@ #undef NDEBUG #include +#include static int io(const char *mode, const char *input, const char *expected) { @@ -23,8 +24,8 @@ static int io(const char *mode, const char *input, const char *expected) assert(r == 0); char *output = NULL; - reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); + reproc_sink sink = reproc_sink_string(&output); + r = reproc_drain(process, sink, sink, REPROC_INFINITE); assert(r == 0); assert(output != NULL); @@ -34,7 +35,7 @@ static int io(const char *mode, const char *input, const char *expected) assert(r == 0); reproc_destroy(process); - free(output); + reproc_free(output); return 0; } diff --git a/reproc/test/overflow.c b/reproc/test/overflow.c index 24b39fb1b..f24bacb01 100644 --- a/reproc/test/overflow.c +++ b/reproc/test/overflow.c @@ -17,8 +17,8 @@ int main(void) assert(r >= 0); char *output = NULL; - reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); + reproc_sink sink = reproc_sink_string(&output); + r = reproc_drain(process, sink, sink, REPROC_INFINITE); assert(r >= 0); assert(output != NULL); @@ -26,7 +26,7 @@ int main(void) assert(r == 0); reproc_destroy(process); - free(output); + reproc_free(output); return 0; } diff --git a/reproc/test/working-directory.c b/reproc/test/working-directory.c index 2c8d8227e..f5aaf11f7 100644 --- a/reproc/test/working-directory.c +++ b/reproc/test/working-directory.c @@ -3,6 +3,7 @@ #undef NDEBUG #include +#include static void replace(char *string, char old, char new) { @@ -25,8 +26,8 @@ int main(void) assert(r == 0); char *output = NULL; - reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); + reproc_sink sink = reproc_sink_string(&output); + r = reproc_drain(process, sink, sink, REPROC_INFINITE); assert(r == 0); replace(output, '\\', '/'); @@ -36,7 +37,7 @@ int main(void) assert(r == 0); reproc_destroy(process); - free(output); + reproc_free(output); return 0; } diff --git a/reprocxx/examples/background.cpp b/reprocxx/examples/background.cpp index b920f8a19..617888aff 100644 --- a/reprocxx/examples/background.cpp +++ b/reprocxx/examples/background.cpp @@ -58,7 +58,7 @@ int main(int argc, char *argv[]) // given string, allowing working with the string across multiple threads if // the mutex is locked in the other threads as well. reproc::sink::thread_safe::string sink(output, mutex); - return process.drain(sink, sink); + return reproc::drain(process, sink, sink); }); // Show new output every 2 seconds. diff --git a/reprocxx/examples/cmake-help.cpp b/reprocxx/examples/cmake-help.cpp index 11dc2c994..9f0018523 100644 --- a/reprocxx/examples/cmake-help.cpp +++ b/reprocxx/examples/cmake-help.cpp @@ -36,15 +36,15 @@ int main() return fail(ec); } - // `process::drain` reads from the stdout and stderr streams of the child - // process until both are closed or an error occurs. Providing it with a - // string sink for a specific stream makes it store all output of that stream - // in the string passed to the string sink. Passing the same sink to both the - // `out` and `err` arguments of `process::drain` causes the stdout and stderr - // output to get stored in the same string. + // `reproc::drain` reads from the stdout and stderr streams of `process` until + // both are closed or an error occurs. Providing it with a string sink for a + // specific stream makes it store all output of that stream in the string + // passed to the string sink. Passing the same sink to both the `out` and + // `err` arguments of `reproc::drain` causes the stdout and stderr output to + // get stored in the same string. std::string output; reproc::sink::string sink(output); - ec = process.drain(sink, sink); + ec = reproc::drain(process, sink, sink); if (ec) { return fail(ec); } @@ -53,7 +53,7 @@ int main() // It's easy to define your own sinks as well. Take a look at `sink.hpp` in // the repository to see how `sink::string` and other sinks are implemented. - // The documentation of `process::drain` also provides more information on the + // The documentation of `reproc::drain` also provides more information on the // requirements a sink should fulfill. // By default, The `process` destructor waits indefinitely for the child diff --git a/reprocxx/include/reproc++/reproc.hpp b/reprocxx/include/reproc++/reproc.hpp index 94dc118c4..2de6e5736 100644 --- a/reprocxx/include/reproc++/reproc.hpp +++ b/reprocxx/include/reproc++/reproc.hpp @@ -95,24 +95,6 @@ class process { size_t size, reproc::milliseconds timeout = reproc::infinite) noexcept; - /*! - `reproc_drain` but takes a lambda as its argument instead of a function and - context pointer. Defaults to waiting indefinitely for each read to complete. - - Unlike `reproc_drain`, it is not possible to pass `NULL` sinks to this method. - Instead, use `sink::discard` which has the same effect. - - `sink` expects the following signature: - - ```c++ - bool sink(stream stream, const uint8_t *buffer, size_t size); - ``` - */ - template - std::error_code drain(Sink &&out, - Sink &&err, - reproc::milliseconds timeout = reproc::infinite); - /*! reproc_write` but defaults to waiting indefinitely for each write to complete. */ REPROCXX_EXPORT std::error_code @@ -138,40 +120,4 @@ class process { std::unique_ptr process_; }; -template -std::error_code -process::drain(Sink &&out, Sink &&err, reproc::milliseconds timeout) -{ - static constexpr uint8_t initial = 0; - - // A single call to `read` might contain multiple messages. By always calling - // both sinks once with no data before reading, we give them the chance to - // process all previous output before reading from the child process again. - if (!out(stream::in, &initial, 0) || !err(stream::in, &initial, 0)) { - return {}; - } - - std::array buffer = {}; - std::error_code ec; - - while (true) { - stream stream = {}; - size_t bytes_read = 0; - std::tie(stream, bytes_read, ec) = read(buffer.data(), buffer.size(), - timeout); - if (ec) { - break; - } - - auto &sink = stream == stream::out ? out : err; - - // `sink` returns false to tell us to stop reading. - if (!sink(stream, buffer.data(), bytes_read)) { - break; - } - } - - return ec == error::broken_pipe ? std::error_code() : ec; -} - } diff --git a/reprocxx/include/reproc++/sink.hpp b/reprocxx/include/reproc++/sink.hpp index 509126735..5b2859ec4 100644 --- a/reprocxx/include/reproc++/sink.hpp +++ b/reprocxx/include/reproc++/sink.hpp @@ -8,6 +8,55 @@ #include namespace reproc { + +/*! +`reproc_drain` but takes lambdas as sinks and defaults to waiting indefinitely +for each read to complete. + +`out` and `err` expect the following signature: + +```c++ +bool sink(stream stream, const uint8_t *buffer, size_t size); +``` +*/ +template +std::error_code drain(process &process, + Out &&out, + Err &&err, + reproc::milliseconds timeout = reproc::infinite) +{ + static constexpr uint8_t initial = 0; + + // A single call to `read` might contain multiple messages. By always calling + // both sinks once with no data before reading, we give them the chance to + // process all previous output before reading from the child process again. + if (!out(stream::in, &initial, 0) || !err(stream::in, &initial, 0)) { + return {}; + } + + std::array buffer = {}; + std::error_code ec; + + while (true) { + stream stream = {}; + size_t bytes_read = 0; + std::tie(stream, bytes_read, ec) = process.read(buffer.data(), + buffer.size(), timeout); + if (ec) { + break; + } + + auto &sink = stream == stream::out ? out : err; + + // `sink` returns false to tell us to stop reading. + if (!sink(stream, buffer.data(), bytes_read)) { + break; + } + } + + return ec == error::broken_pipe ? std::error_code() : ec; +} + namespace sink { /*! Reads all output into `string`. */