From 55eb2834e2bd826f7a937a94b776e2741e6632bd Mon Sep 17 00:00:00 2001 From: Daan De Meyer Date: Sat, 4 Jan 2020 16:51:11 +0100 Subject: [PATCH] reproc(++): Support passing timeouts to I/O functions/methods. A sofisticated process cleanup mechanism isn't exactly useful if users are blocked on reading/writing to the child process standard streams. By adding timeouts, users can choose to terminate a program if it takes too long to produce any output or accept input. --- CHANGELOG.md | 11 +++ reproc/examples/drain.c | 2 +- reproc/examples/git-status.c | 2 +- reproc/include/reproc/reproc.h | 37 ++++++++-- reproc/src/pipe.h | 6 +- reproc/src/posix/pipe.c | 45 ++++++++---- reproc/src/posix/process.c | 8 +- reproc/src/process.h | 2 +- reproc/src/reproc.c | 36 +++++---- reproc/src/windows/pipe.c | 106 ++++++++++++++++----------- reproc/src/windows/process.c | 5 +- reproc/test/argv.c | 2 +- reproc/test/environment.c | 2 +- reproc/test/io.c | 33 ++++++++- reproc/test/overflow.c | 2 +- reproc/test/working-directory.c | 2 +- reprocxx/include/reproc++/reproc.hpp | 33 ++++++--- reprocxx/src/reproc.cpp | 15 ++-- 18 files changed, 230 insertions(+), 119 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9729a5927..5a5e08323 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -200,6 +200,12 @@ flexibility. To use a single sink for both output streams, simply pass the same sink to both the `out` and `err` arguments of `reproc_drain`. +- Support passing timeouts to `reproc_read`, `reproc_write` and `reproc_drain`. + + Pass `REPROC_INFINITE` as the timeout to retain the old behaviour. + +- Use `int` to represent timeout values. + ### reproc++ - Remove `process::parse`, `process::exit_status` and `process::running`. @@ -244,6 +250,11 @@ - Modify all included sinks to support the new `process::drain` behaviour. +- Support passing timeouts to `process::read`, `process::write` and + `process::drain`. + + The methods default to waiting indefinitely which matches their old behaviour. + ### CMake - Drop required CMake version to CMake 3.12. diff --git a/reproc/examples/drain.c b/reproc/examples/drain.c index 7cef0f288..520fb58b4 100644 --- a/reproc/examples/drain.c +++ b/reproc/examples/drain.c @@ -38,7 +38,7 @@ int main(void) // more than one parameter, simply store the parameters in a struct and pass // the address of the struct as the `context` parameter. reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink); + r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); if (r < 0) { goto cleanup; } diff --git a/reproc/examples/git-status.c b/reproc/examples/git-status.c index 16ff88268..a52d4e6e8 100644 --- a/reproc/examples/git-status.c +++ b/reproc/examples/git-status.c @@ -56,7 +56,7 @@ int main(void) // stdout and stderr output in the same string, we pass `NULL` since we // don't need to know which stream was read from. uint8_t buffer[4096]; - r = reproc_read(process, NULL, buffer, sizeof(buffer)); + r = reproc_read(process, NULL, buffer, sizeof(buffer), REPROC_INFINITE); if (r < 0) { break; } diff --git a/reproc/include/reproc/reproc.h b/reproc/include/reproc/reproc.h index faef7b1c1..3ff01085f 100644 --- a/reproc/include/reproc/reproc.h +++ b/reproc/include/reproc/reproc.h @@ -33,7 +33,7 @@ REPROC_EXPORT extern const int REPROC_SIGKILL; REPROC_EXPORT extern const int REPROC_SIGTERM; /*! Tells a function that takes a timeout value to wait indefinitely. */ -REPROC_EXPORT extern const unsigned int REPROC_INFINITE; +REPROC_EXPORT extern const int REPROC_INFINITE; /*! Stream identifiers used to indicate which stream to act on. */ typedef enum { @@ -69,7 +69,7 @@ typedef enum { typedef struct reproc_stop_action { REPROC_STOP action; - unsigned int timeout; + int timeout; } reproc_stop_action; typedef struct reproc_stop_actions { @@ -170,16 +170,22 @@ stderr stream and returns the amount of bytes read. If `stream` is not `NULL`, it is used to store the stream that was read from (`REPROC_STREAM_OUT` or `REPROC_STREAM_ERR`). +If no output stream is closed or read from within the given timeout, this +function returns `REPROC_ETIMEDOUT`. If one of the output streams is closed, +`timeout` is reset before waiting again for the other stream. + If both streams are closed by the child process or weren't opened with `REPROC_REDIRECT_PIPE`, this function returns `REPROC_EPIPE`. Actionable errors: - `REPROC_EPIPE` +- `REPROC_ETIMEDOUT` */ REPROC_EXPORT int reproc_read(reproc_t *process, REPROC_STREAM *stream, uint8_t *buffer, - size_t size); + size_t size, + int timeout); /*! Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of @@ -194,18 +200,30 @@ discarded. `stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all output from the previous call to `reproc_drain` one by one. +Each call to `reproc_read` is passed the given timeout. If a call to +`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`. + Note that his function returns 0 instead of `REPROC_EPIPE` when both output streams of the child process are closed. For examples of sinks, see `sink.h`. + +Actionable errors: +- `REPROC_ETIMEDOUT` */ -REPROC_EXPORT int -reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err); +REPROC_EXPORT int reproc_drain(reproc_t *process, + reproc_sink *out, + reproc_sink *err, + int timeout); /*! Writes `size` bytes from `buffer` to the standard input (stdin) of the child process. +If no data can be written within the given timeout, this function returns +`REPROC_ETIMEDOUT`. After writing some data, `timeout` is reset before trying to +write again. + (POSIX) By default, writing to a closed stdin pipe terminates the parent process with the `SIGPIPE` signal. `reproc_write` will only return `REPROC_EPIPE` if this signal is ignored by the parent process. @@ -216,9 +234,12 @@ the redirect options passed to `reproc_start`. Actionable errors: - `REPROC_EPIPE` +- `REPROC_ETIMEDOUT` */ -REPROC_EXPORT int -reproc_write(reproc_t *process, const uint8_t *buffer, size_t size); +REPROC_EXPORT int reproc_write(reproc_t *process, + const uint8_t *buffer, + size_t size, + int timeout); /*! Closes the child process standard stream indicated by `stream`. @@ -241,7 +262,7 @@ wait indefinitely for the child process to exit. Actionable errors: - `REPROC_ETIMEDOUT` */ -REPROC_EXPORT int reproc_wait(reproc_t *process, unsigned int timeout); +REPROC_EXPORT int reproc_wait(reproc_t *process, int timeout); /*! Sends the `SIGTERM` signal (POSIX) or the `CTRL-BREAK` signal (Windows) to the diff --git a/reproc/src/pipe.h b/reproc/src/pipe.h index b2aa5a9ee..378be7809 100644 --- a/reproc/src/pipe.h +++ b/reproc/src/pipe.h @@ -24,8 +24,10 @@ int pipe_read(handle pipe, uint8_t *buffer, size_t size); // Writes up to `size` bytes from `buffer` to the pipe indicated by `pipe` and // returns the amount of bytes written. -int pipe_write(handle pipe, const uint8_t *buffer, size_t size); +int pipe_write(handle pipe, const uint8_t *buffer, size_t size, int timeout); // Stores the value of the first of `out` and `ready` that will have data // available to read in `ready`. -int pipe_wait(handle out, handle err, handle *ready); +// +// Returns `REPROC_EPIPE` if both `out` and `err` are invalid handles. +int pipe_wait(handle out, handle err, handle *ready, int timeout); diff --git a/reproc/src/posix/pipe.c b/reproc/src/posix/pipe.c index e43bf8bd2..cf499a742 100644 --- a/reproc/src/posix/pipe.c +++ b/reproc/src/posix/pipe.c @@ -84,34 +84,51 @@ int pipe_read(int pipe, uint8_t *buffer, size_t size) return error_unify_or_else((int) r, (int) r); } -int pipe_write(int pipe, const uint8_t *buffer, size_t size) +int pipe_write(int pipe, const uint8_t *buffer, size_t size, int timeout) { assert(pipe != HANDLE_INVALID); assert(buffer); - ssize_t r = write(pipe, buffer, size); - assert(r <= INT_MAX); + struct pollfd pollfd = { .fd = pipe, .events = POLLOUT }; + int r = -1; - return error_unify_or_else((int) r, (int) r); -} + r = poll(&pollfd, 1, timeout); + if (r <= 0) { + r = r == 0 ? -ETIMEDOUT : r; + return error_unify(r); + } -static const int POLL_INFINITE = -1; + r = (int) write(pipe, buffer, size); + + return error_unify_or_else(r, r); +} -int pipe_wait(int out, int err, int *ready) +int pipe_wait(int out, int err, int *ready, int timeout) { - assert(out != HANDLE_INVALID); - assert(err != HANDLE_INVALID); assert(ready); - struct pollfd pollfds[2] = { { .fd = out, .events = POLLIN }, - { .fd = err, .events = POLLIN } }; + struct pollfd pollfds[2] = { { 0 }, { 0 } }; + nfds_t num_pollfds = 0; - int r = poll(pollfds, ARRAY_SIZE(pollfds), POLL_INFINITE); - if (r < 0) { + if (out != HANDLE_INVALID) { + pollfds[num_pollfds++] = (struct pollfd){ .fd = out, .events = POLLIN }; + } + + if (err != HANDLE_INVALID) { + pollfds[num_pollfds++] = (struct pollfd){ .fd = err, .events = POLLIN }; + } + + if (num_pollfds == 0) { + return -EPIPE; + } + + int r = poll(pollfds, num_pollfds, timeout); + if (r <= 0) { + r = r == 0 ? -ETIMEDOUT : r; return error_unify(r); } - for (size_t i = 0; i < ARRAY_SIZE(pollfds); i++) { + for (size_t i = 0; i < num_pollfds; i++) { struct pollfd pollfd = pollfds[i]; if (pollfd.revents > 0) { diff --git a/reproc/src/posix/process.c b/reproc/src/posix/process.c index 0658c69c0..57978eb9b 100644 --- a/reproc/src/posix/process.c +++ b/reproc/src/posix/process.c @@ -14,10 +14,6 @@ #include #include -// Including the entire reproc.h header is overkill so we import only the -// constant we need. -extern const unsigned int REPROC_INFINITE; - static int signal_mask(int how, const sigset_t *newmask, sigset_t *oldmask) { int r = -1; @@ -350,14 +346,14 @@ static int parse_status(int status) return WIFEXITED(status) ? WEXITSTATUS(status) : WTERMSIG(status) + UINT8_MAX; } -int process_wait(pid_t process, unsigned int timeout) +int process_wait(pid_t process, int timeout) { assert(process != HANDLE_INVALID); int status = 0; int r = -1; - if (timeout == 0 || timeout == REPROC_INFINITE) { + if (timeout <= 0) { r = waitpid(process, &status, timeout == 0 ? WNOHANG : 0); if (r >= 0) { diff --git a/reproc/src/process.h b/reproc/src/process.h index 4848e2546..f7bef5889 100644 --- a/reproc/src/process.h +++ b/reproc/src/process.h @@ -32,7 +32,7 @@ int process_create(handle *process, // // If `timeout` is `REPROC_INFINITE`, this function waits indefinitely for a // process to exit. -int process_wait(handle process, unsigned int timeout); +int process_wait(handle process, int timeout); // Sends the `SIGTERM` (POSIX) or `CTRL-BREAK` (Windows) signal to the process // indicated by `process`. diff --git a/reproc/src/reproc.c b/reproc/src/reproc.c index 8ff19befb..0a91ba13e 100644 --- a/reproc/src/reproc.c +++ b/reproc/src/reproc.c @@ -22,7 +22,7 @@ enum { STATUS_NOT_STARTED = -2, STATUS_IN_PROGRESS = -1 }; const int REPROC_SIGKILL = UINT8_MAX + 9; const int REPROC_SIGTERM = UINT8_MAX + 15; -const unsigned int REPROC_INFINITE = 0xFFFFFFFF; // == `INFINITE` on Windows +const int REPROC_INFINITE = -1; static int redirect(handle *parent, handle *child, @@ -152,7 +152,8 @@ int reproc_start(reproc_t *process, int reproc_read(reproc_t *process, REPROC_STREAM *stream, uint8_t *buffer, - size_t size) + size_t size, + int timeout) { assert_return(process, REPROC_EINVAL); assert_return(buffer, REPROC_EINVAL); @@ -163,18 +164,9 @@ int reproc_read(reproc_t *process, // Get the first pipe that will have data available to be read. handle ready = HANDLE_INVALID; - if (process->stdio.out == HANDLE_INVALID && - process->stdio.err == HANDLE_INVALID) { - return REPROC_EPIPE; - } else if (process->stdio.out == HANDLE_INVALID) { - ready = process->stdio.err; - } else if (process->stdio.err == HANDLE_INVALID) { - ready = process->stdio.out; - } else { - r = pipe_wait(process->stdio.out, process->stdio.err, &ready); - if (r < 0) { - return r; - } + r = pipe_wait(process->stdio.out, process->stdio.err, &ready, timeout); + if (r < 0) { + return r; } r = pipe_read(ready, buffer, size); @@ -203,7 +195,10 @@ int reproc_read(reproc_t *process, return r; // bytes read } -int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err) +int reproc_drain(reproc_t *process, + reproc_sink *out, + reproc_sink *err, + int timeout) { assert_return(process, REPROC_EINVAL); @@ -223,7 +218,7 @@ int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err) while (true) { REPROC_STREAM stream = { 0 }; - r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer)); + r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer), timeout); if (r < 0) { break; } @@ -241,7 +236,10 @@ int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err) return r == REPROC_EPIPE ? 0 : r; } -int reproc_write(reproc_t *process, const uint8_t *buffer, size_t size) +int reproc_write(reproc_t *process, + const uint8_t *buffer, + size_t size, + int timeout) { assert_return(process, REPROC_EINVAL); assert_return(buffer, REPROC_EINVAL); @@ -253,7 +251,7 @@ int reproc_write(reproc_t *process, const uint8_t *buffer, size_t size) int r = -1; do { - r = pipe_write(process->stdio.in, buffer, size); + r = pipe_write(process->stdio.in, buffer, size, timeout); if (r == REPROC_EPIPE) { process->stdio.in = handle_destroy(process->stdio.in); @@ -293,7 +291,7 @@ int reproc_close(reproc_t *process, REPROC_STREAM stream) return 0; } -int reproc_wait(reproc_t *process, unsigned int timeout) +int reproc_wait(reproc_t *process, int timeout) { assert_return(process, REPROC_EINVAL); assert_return(process->status != STATUS_NOT_STARTED, REPROC_EINVAL); diff --git a/reproc/src/windows/pipe.c b/reproc/src/windows/pipe.c index 99df488dc..46026fc40 100644 --- a/reproc/src/windows/pipe.c +++ b/reproc/src/windows/pipe.c @@ -93,30 +93,22 @@ int pipe_read(HANDLE pipe, uint8_t *buffer, size_t size) int r = 0; OVERLAPPED overlapped = { 0 }; - overlapped.hEvent = CreateEvent(&HANDLE_DO_NOT_INHERIT, true, false, NULL); - if (overlapped.hEvent == NULL) { - goto cleanup; - } - r = ReadFile(pipe, buffer, (DWORD) size, NULL, &overlapped); if (r == 0 && GetLastError() != ERROR_IO_PENDING) { - goto cleanup; + return error_unify(r); } - r = GetOverlappedResult(pipe, &overlapped, &bytes_read, true); + r = GetOverlappedResultEx(pipe, &overlapped, &bytes_read, INFINITE, false); if (r == 0) { - goto cleanup; + return error_unify(r); } assert(bytes_read <= INT_MAX); -cleanup: - handle_destroy(overlapped.hEvent); - return error_unify_or_else(r, (int) bytes_read); } -int pipe_write(HANDLE pipe, const uint8_t *buffer, size_t size) +int pipe_write(HANDLE pipe, const uint8_t *buffer, size_t size, int timeout) { assert(pipe && pipe != HANDLE_INVALID); assert(buffer); @@ -126,38 +118,58 @@ int pipe_write(HANDLE pipe, const uint8_t *buffer, size_t size) int r = 0; OVERLAPPED overlapped = { 0 }; - overlapped.hEvent = CreateEvent(&HANDLE_DO_NOT_INHERIT, true, false, NULL); - if (overlapped.hEvent == NULL) { - goto cleanup; - } - r = WriteFile(pipe, buffer, (DWORD) size, NULL, &overlapped); if (r == 0 && GetLastError() != ERROR_IO_PENDING) { - goto cleanup; + return error_unify(r); + } + + r = GetOverlappedResultEx(pipe, &overlapped, &bytes_written, + timeout < 0 ? INFINITE : (DWORD) timeout, false); + if (r == 0 && GetLastError() != WAIT_TIMEOUT) { + return error_unify(r); } - r = GetOverlappedResult(pipe, &overlapped, &bytes_written, true); if (r == 0) { - goto cleanup; + // The timeout expired. Cancel the ongoing write. + r = CancelIo(pipe); + assert_unused(r != 0); + + // Check if the write was actually cancelled. + r = GetOverlappedResult(pipe, &overlapped, &bytes_written, true); + + // The write might have completed before it was cancelled so only error if + // the operation was actually aborted. + if (r == 0 && GetLastError() == ERROR_OPERATION_ABORTED) { + r = -WAIT_TIMEOUT; + } } assert(bytes_written <= INT_MAX); -cleanup: - handle_destroy(overlapped.hEvent); - return error_unify_or_else(r, (int) bytes_written); } -int pipe_wait(HANDLE out, HANDLE err, HANDLE *ready) +int pipe_wait(HANDLE out, HANDLE err, HANDLE *ready, int timeout) { - assert(out && out != HANDLE_INVALID); - assert(err && err != HANDLE_INVALID); assert(ready); - HANDLE pipes[2] = { out, err }; - OVERLAPPED overlapped[2] = { { 0 }, { 0 } }; - HANDLE events[2] = { HANDLE_INVALID, HANDLE_INVALID }; + HANDLE pipes[2] = { HANDLE_INVALID, HANDLE_INVALID }; + DWORD num_pipes = 0; + + if (out != HANDLE_INVALID) { + pipes[num_pipes++] = out; + } + + if (err != HANDLE_INVALID) { + pipes[num_pipes++] = err; + } + + if (num_pipes == 0) { + return -ERROR_BROKEN_PIPE; + } + + OVERLAPPED overlapped[ARRAY_SIZE(pipes)] = { { 0 }, { 0 } }; + HANDLE events[ARRAY_SIZE(pipes)] = { HANDLE_INVALID, HANDLE_INVALID }; int r = 0; // We emulate POSIX `poll` by issuing overlapped zero-sized reads and waiting @@ -165,7 +177,7 @@ int pipe_wait(HANDLE out, HANDLE err, HANDLE *ready) // multiprocessing module `wait` implementation: // https://github.com/python/cpython/blob/10ecbadb799ddf3393d1fc80119a3db14724d381/Lib/multiprocessing/connection.py#L826 - for (size_t i = 0; i < ARRAY_SIZE(overlapped); i++) { + for (size_t i = 0; i < num_pipes; i++) { overlapped[i].hEvent = CreateEvent(&HANDLE_DO_NOT_INHERIT, true, false, NULL); if (overlapped[i].hEvent == NULL) { @@ -189,11 +201,16 @@ int pipe_wait(HANDLE out, HANDLE err, HANDLE *ready) events[i] = overlapped[i].hEvent; } - DWORD result = WaitForMultipleObjects(ARRAY_SIZE(events), events, false, - INFINITE); - // We don't expect `WAIT_ABANDONED_0` or `WAIT_TIMEOUT` to be valid here. - assert(result < WAIT_ABANDONED_0); - assert(result != WAIT_TIMEOUT); + DWORD result = WaitForMultipleObjects(num_pipes, events, false, + timeout < 0 ? INFINITE + : (DWORD) timeout); + // We don't expect `WAIT_ABANDONED_0` to be valid here. + assert(result != WAIT_ABANDONED_0); + + if (result == WAIT_TIMEOUT) { + r = -WAIT_TIMEOUT; + goto cleanup; + } if (result == WAIT_FAILED) { r = 0; @@ -204,14 +221,17 @@ int pipe_wait(HANDLE out, HANDLE err, HANDLE *ready) *ready = pipes[result]; r = 1; -cleanup:; - PROTECT_SYSTEM_ERROR; - - for (size_t i = 0; i < ARRAY_SIZE(overlapped); i++) { - +cleanup: + for (size_t i = 0; i < num_pipes; i++) { // Cancel any remaining zero-sized reads that we queued if they have not yet // completed. + if (events[i] == HANDLE_INVALID) { + continue; + } + + PROTECT_SYSTEM_ERROR; + r = CancelIo(pipes[i]); assert_unused(r != 0 || GetLastError() == ERROR_NOT_FOUND); @@ -226,11 +246,11 @@ cleanup:; r = GetOverlappedResult(pipes[i], &overlapped[i], &bytes_transferred, true); assert_unused(r != 0 || (GetLastError() == ERROR_OPERATION_ABORTED || GetLastError() == ERROR_BROKEN_PIPE)); - } - UNPROTECT_SYSTEM_ERROR; + UNPROTECT_SYSTEM_ERROR; + } - for (size_t i = 0; i < ARRAY_SIZE(overlapped); i++) { + for (size_t i = 0; i < num_pipes; i++) { handle_destroy(overlapped[i].hEvent); } diff --git a/reproc/src/windows/process.c b/reproc/src/windows/process.c index 196542b18..246300bc8 100644 --- a/reproc/src/windows/process.c +++ b/reproc/src/windows/process.c @@ -386,13 +386,14 @@ int process_create(HANDLE *process, return error_unify(r); } -int process_wait(HANDLE process, unsigned int timeout) +int process_wait(HANDLE process, int timeout) { assert(process); int r = 0; - r = (int) WaitForSingleObject(process, timeout); + r = (int) WaitForSingleObject(process, + timeout < 0 ? INFINITE : (DWORD) timeout); if (r == WAIT_TIMEOUT) { return -WAIT_TIMEOUT; } diff --git a/reproc/test/argv.c b/reproc/test/argv.c index f509ea6ab..8c4245f64 100644 --- a/reproc/test/argv.c +++ b/reproc/test/argv.c @@ -19,7 +19,7 @@ int main(void) char *output = NULL; reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink); + r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); assert(r == 0); assert(output != NULL); diff --git a/reproc/test/environment.c b/reproc/test/environment.c index 73de78db8..a93aed975 100644 --- a/reproc/test/environment.c +++ b/reproc/test/environment.c @@ -19,7 +19,7 @@ int main(void) char *output = NULL; reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink); + r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); assert(r == 0); assert(output != NULL); diff --git a/reproc/test/io.c b/reproc/test/io.c index 0f55febb6..77beae421 100644 --- a/reproc/test/io.c +++ b/reproc/test/io.c @@ -16,7 +16,7 @@ static int io(const char *mode, const char *input, const char *expected) r = reproc_start(process, argv, (reproc_options){ 0 }); assert(r == 0); - r = reproc_write(process, (uint8_t *) input, strlen(input)); + r = reproc_write(process, (uint8_t *) input, strlen(input), REPROC_INFINITE); assert(r == 0); r = reproc_close(process, REPROC_STREAM_IN); @@ -24,7 +24,7 @@ static int io(const char *mode, const char *input, const char *expected) char *output = NULL; reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink); + r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); assert(r == 0); assert(output != NULL); @@ -39,6 +39,33 @@ static int io(const char *mode, const char *input, const char *expected) return 0; } +static int timeout(void) +{ + int r = -1; + + reproc_t *process = reproc_new(); + assert(process); + + const char *argv[3] = { RESOURCE_DIRECTORY "/io", "stdout", NULL }; + + r = reproc_start(process, argv, (reproc_options){ 0 }); + assert(r == 0); + + uint8_t buffer = 0; + r = reproc_read(process, NULL, &buffer, sizeof(buffer), 200); + assert(r == REPROC_ETIMEDOUT); + + r = reproc_close(process, REPROC_STREAM_IN); + assert(r == 0); + + r = reproc_read(process, NULL, &buffer, sizeof(buffer), REPROC_INFINITE); + assert(r == REPROC_EPIPE); + + reproc_destroy(process); + + return 0; +} + #define MESSAGE "reproc stands for REdirected PROCess" int main(void) @@ -47,5 +74,7 @@ int main(void) io("stderr", MESSAGE, MESSAGE); io("both", MESSAGE, MESSAGE MESSAGE); + timeout(); + return 0; } diff --git a/reproc/test/overflow.c b/reproc/test/overflow.c index fe2c4d4ea..24b39fb1b 100644 --- a/reproc/test/overflow.c +++ b/reproc/test/overflow.c @@ -18,7 +18,7 @@ int main(void) char *output = NULL; reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink); + r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); assert(r >= 0); assert(output != NULL); diff --git a/reproc/test/working-directory.c b/reproc/test/working-directory.c index 08e57489a..2c8d8227e 100644 --- a/reproc/test/working-directory.c +++ b/reproc/test/working-directory.c @@ -26,7 +26,7 @@ int main(void) char *output = NULL; reproc_sink sink = { reproc_sink_string, &output }; - r = reproc_drain(process, &sink, &sink); + r = reproc_drain(process, &sink, &sink, REPROC_INFINITE); assert(r == 0); replace(output, '\\', '/'); diff --git a/reprocxx/include/reproc++/reproc.hpp b/reprocxx/include/reproc++/reproc.hpp index 65a782cde..94dc118c4 100644 --- a/reprocxx/include/reproc++/reproc.hpp +++ b/reprocxx/include/reproc++/reproc.hpp @@ -33,9 +33,9 @@ REPROCXX_EXPORT extern const int terminate; } -/*! Timeout values are passed as `reproc::milliseconds` instead of `unsigned -int` in reproc++. */ -using milliseconds = std::chrono::duration; +/*! Timeout values are passed as `reproc::milliseconds` instead of `int` in +reproc++. */ +using milliseconds = std::chrono::duration; enum class redirect { pipe, inherit, discard }; @@ -88,13 +88,16 @@ class process { REPROCXX_EXPORT std::error_code start(const arguments &arguments, const options &options = {}) noexcept; - /*! `reproc_read` but returns a tuple of (stream, bytes read, error). */ + /*! `reproc_read` but returns a tuple of (stream, bytes read, error) and + defaults to waiting indefinitely for each read to complete.*/ REPROCXX_EXPORT std::tuple - read(uint8_t *buffer, size_t size) noexcept; + read(uint8_t *buffer, + size_t size, + reproc::milliseconds timeout = reproc::infinite) noexcept; /*! `reproc_drain` but takes a lambda as its argument instead of a function and - context pointer. + context pointer. Defaults to waiting indefinitely for each read to complete. Unlike `reproc_drain`, it is not possible to pass `NULL` sinks to this method. Instead, use `sink::discard` which has the same effect. @@ -106,10 +109,16 @@ class process { ``` */ template - std::error_code drain(Sink &&out, Sink &&err); + std::error_code drain(Sink &&out, + Sink &&err, + reproc::milliseconds timeout = reproc::infinite); - REPROCXX_EXPORT std::error_code write(const uint8_t *buffer, - size_t size) noexcept; + /*! reproc_write` but defaults to waiting indefinitely for each write to + complete. */ + REPROCXX_EXPORT std::error_code + write(const uint8_t *buffer, + size_t size, + reproc::milliseconds timeout = reproc::infinite) noexcept; REPROCXX_EXPORT std::error_code close(stream stream) noexcept; @@ -130,7 +139,8 @@ class process { }; template -std::error_code process::drain(Sink &&out, Sink &&err) +std::error_code +process::drain(Sink &&out, Sink &&err, reproc::milliseconds timeout) { static constexpr uint8_t initial = 0; @@ -147,7 +157,8 @@ std::error_code process::drain(Sink &&out, Sink &&err) while (true) { stream stream = {}; size_t bytes_read = 0; - std::tie(stream, bytes_read, ec) = read(buffer.data(), buffer.size()); + std::tie(stream, bytes_read, ec) = read(buffer.data(), buffer.size(), + timeout); if (ec) { break; } diff --git a/reprocxx/src/reproc.cpp b/reprocxx/src/reproc.cpp index 0749c0a6d..c060e69de 100644 --- a/reprocxx/src/reproc.cpp +++ b/reprocxx/src/reproc.cpp @@ -59,20 +59,25 @@ std::error_code process::start(const arguments &arguments, }; int r = reproc_start(process_.get(), arguments.data(), reproc_options); + return error_code_from(r); } -std::tuple process::read(uint8_t *buffer, - size_t size) noexcept +std::tuple +process::read(uint8_t *buffer, + size_t size, + reproc::milliseconds timeout) noexcept { REPROC_STREAM stream = {}; - int r = reproc_read(process_.get(), &stream, buffer, size); + int r = reproc_read(process_.get(), &stream, buffer, size, timeout.count()); return { static_cast(stream), r, error_code_from(r) }; } -std::error_code process::write(const uint8_t *buffer, size_t size) noexcept +std::error_code process::write(const uint8_t *buffer, + size_t size, + reproc::milliseconds timeout) noexcept { - int r = reproc_write(process_.get(), buffer, size); + int r = reproc_write(process_.get(), buffer, size, timeout.count()); return error_code_from(r); }