Skip to content

Commit

Permalink
reproc(++): Support passing timeouts to I/O functions/methods.
Browse files Browse the repository at this point in the history
A sofisticated process cleanup mechanism isn't exactly useful if users
are blocked on reading/writing to the child process standard streams. By
adding timeouts, users can choose to terminate a program if it takes too
long to produce any output or accept input.
  • Loading branch information
DaanDeMeyer committed Jan 4, 2020
1 parent 698a166 commit 55eb283
Show file tree
Hide file tree
Showing 18 changed files with 230 additions and 119 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,12 @@
flexibility. To use a single sink for both output streams, simply pass the
same sink to both the `out` and `err` arguments of `reproc_drain`.

- Support passing timeouts to `reproc_read`, `reproc_write` and `reproc_drain`.

Pass `REPROC_INFINITE` as the timeout to retain the old behaviour.

- Use `int` to represent timeout values.

### reproc++

- Remove `process::parse`, `process::exit_status` and `process::running`.
Expand Down Expand Up @@ -244,6 +250,11 @@

- Modify all included sinks to support the new `process::drain` behaviour.

- Support passing timeouts to `process::read`, `process::write` and
`process::drain`.

The methods default to waiting indefinitely which matches their old behaviour.

### CMake

- Drop required CMake version to CMake 3.12.
Expand Down
2 changes: 1 addition & 1 deletion reproc/examples/drain.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ int main(void)
// more than one parameter, simply store the parameters in a struct and pass
// the address of the struct as the `context` parameter.
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
r = reproc_drain(process, &sink, &sink, REPROC_INFINITE);
if (r < 0) {
goto cleanup;
}
Expand Down
2 changes: 1 addition & 1 deletion reproc/examples/git-status.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ int main(void)
// stdout and stderr output in the same string, we pass `NULL` since we
// don't need to know which stream was read from.
uint8_t buffer[4096];
r = reproc_read(process, NULL, buffer, sizeof(buffer));
r = reproc_read(process, NULL, buffer, sizeof(buffer), REPROC_INFINITE);
if (r < 0) {
break;
}
Expand Down
37 changes: 29 additions & 8 deletions reproc/include/reproc/reproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ REPROC_EXPORT extern const int REPROC_SIGKILL;
REPROC_EXPORT extern const int REPROC_SIGTERM;

/*! Tells a function that takes a timeout value to wait indefinitely. */
REPROC_EXPORT extern const unsigned int REPROC_INFINITE;
REPROC_EXPORT extern const int REPROC_INFINITE;

/*! Stream identifiers used to indicate which stream to act on. */
typedef enum {
Expand Down Expand Up @@ -69,7 +69,7 @@ typedef enum {

typedef struct reproc_stop_action {
REPROC_STOP action;
unsigned int timeout;
int timeout;
} reproc_stop_action;

typedef struct reproc_stop_actions {
Expand Down Expand Up @@ -170,16 +170,22 @@ stderr stream and returns the amount of bytes read.
If `stream` is not `NULL`, it is used to store the stream that was
read from (`REPROC_STREAM_OUT` or `REPROC_STREAM_ERR`).
If no output stream is closed or read from within the given timeout, this
function returns `REPROC_ETIMEDOUT`. If one of the output streams is closed,
`timeout` is reset before waiting again for the other stream.
If both streams are closed by the child process or weren't opened with
`REPROC_REDIRECT_PIPE`, this function returns `REPROC_EPIPE`.
Actionable errors:
- `REPROC_EPIPE`
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int reproc_read(reproc_t *process,
REPROC_STREAM *stream,
uint8_t *buffer,
size_t size);
size_t size,
int timeout);

/*!
Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of
Expand All @@ -194,18 +200,30 @@ discarded.
`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all
output from the previous call to `reproc_drain` one by one.
Each call to `reproc_read` is passed the given timeout. If a call to
`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`.
Note that his function returns 0 instead of `REPROC_EPIPE` when both output
streams of the child process are closed.
For examples of sinks, see `sink.h`.
Actionable errors:
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int
reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err);
REPROC_EXPORT int reproc_drain(reproc_t *process,
reproc_sink *out,
reproc_sink *err,
int timeout);

/*!
Writes `size` bytes from `buffer` to the standard input (stdin) of the child
process.
If no data can be written within the given timeout, this function returns
`REPROC_ETIMEDOUT`. After writing some data, `timeout` is reset before trying to
write again.
(POSIX) By default, writing to a closed stdin pipe terminates the parent process
with the `SIGPIPE` signal. `reproc_write` will only return `REPROC_EPIPE` if
this signal is ignored by the parent process.
Expand All @@ -216,9 +234,12 @@ the redirect options passed to `reproc_start`.
Actionable errors:
- `REPROC_EPIPE`
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int
reproc_write(reproc_t *process, const uint8_t *buffer, size_t size);
REPROC_EXPORT int reproc_write(reproc_t *process,
const uint8_t *buffer,
size_t size,
int timeout);

/*!
Closes the child process standard stream indicated by `stream`.
Expand All @@ -241,7 +262,7 @@ wait indefinitely for the child process to exit.
Actionable errors:
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int reproc_wait(reproc_t *process, unsigned int timeout);
REPROC_EXPORT int reproc_wait(reproc_t *process, int timeout);

/*!
Sends the `SIGTERM` signal (POSIX) or the `CTRL-BREAK` signal (Windows) to the
Expand Down
6 changes: 4 additions & 2 deletions reproc/src/pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ int pipe_read(handle pipe, uint8_t *buffer, size_t size);

// Writes up to `size` bytes from `buffer` to the pipe indicated by `pipe` and
// returns the amount of bytes written.
int pipe_write(handle pipe, const uint8_t *buffer, size_t size);
int pipe_write(handle pipe, const uint8_t *buffer, size_t size, int timeout);

// Stores the value of the first of `out` and `ready` that will have data
// available to read in `ready`.
int pipe_wait(handle out, handle err, handle *ready);
//
// Returns `REPROC_EPIPE` if both `out` and `err` are invalid handles.
int pipe_wait(handle out, handle err, handle *ready, int timeout);
45 changes: 31 additions & 14 deletions reproc/src/posix/pipe.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,34 +84,51 @@ int pipe_read(int pipe, uint8_t *buffer, size_t size)
return error_unify_or_else((int) r, (int) r);
}

int pipe_write(int pipe, const uint8_t *buffer, size_t size)
int pipe_write(int pipe, const uint8_t *buffer, size_t size, int timeout)
{
assert(pipe != HANDLE_INVALID);
assert(buffer);

ssize_t r = write(pipe, buffer, size);
assert(r <= INT_MAX);
struct pollfd pollfd = { .fd = pipe, .events = POLLOUT };
int r = -1;

return error_unify_or_else((int) r, (int) r);
}
r = poll(&pollfd, 1, timeout);
if (r <= 0) {
r = r == 0 ? -ETIMEDOUT : r;
return error_unify(r);
}

static const int POLL_INFINITE = -1;
r = (int) write(pipe, buffer, size);

return error_unify_or_else(r, r);
}

int pipe_wait(int out, int err, int *ready)
int pipe_wait(int out, int err, int *ready, int timeout)
{
assert(out != HANDLE_INVALID);
assert(err != HANDLE_INVALID);
assert(ready);

struct pollfd pollfds[2] = { { .fd = out, .events = POLLIN },
{ .fd = err, .events = POLLIN } };
struct pollfd pollfds[2] = { { 0 }, { 0 } };
nfds_t num_pollfds = 0;

int r = poll(pollfds, ARRAY_SIZE(pollfds), POLL_INFINITE);
if (r < 0) {
if (out != HANDLE_INVALID) {
pollfds[num_pollfds++] = (struct pollfd){ .fd = out, .events = POLLIN };
}

if (err != HANDLE_INVALID) {
pollfds[num_pollfds++] = (struct pollfd){ .fd = err, .events = POLLIN };
}

if (num_pollfds == 0) {
return -EPIPE;
}

int r = poll(pollfds, num_pollfds, timeout);
if (r <= 0) {
r = r == 0 ? -ETIMEDOUT : r;
return error_unify(r);
}

for (size_t i = 0; i < ARRAY_SIZE(pollfds); i++) {
for (size_t i = 0; i < num_pollfds; i++) {
struct pollfd pollfd = pollfds[i];

if (pollfd.revents > 0) {
Expand Down
8 changes: 2 additions & 6 deletions reproc/src/posix/process.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
#include <sys/wait.h>
#include <unistd.h>

// Including the entire reproc.h header is overkill so we import only the
// constant we need.
extern const unsigned int REPROC_INFINITE;

static int signal_mask(int how, const sigset_t *newmask, sigset_t *oldmask)
{
int r = -1;
Expand Down Expand Up @@ -350,14 +346,14 @@ static int parse_status(int status)
return WIFEXITED(status) ? WEXITSTATUS(status) : WTERMSIG(status) + UINT8_MAX;
}

int process_wait(pid_t process, unsigned int timeout)
int process_wait(pid_t process, int timeout)
{
assert(process != HANDLE_INVALID);

int status = 0;
int r = -1;

if (timeout == 0 || timeout == REPROC_INFINITE) {
if (timeout <= 0) {
r = waitpid(process, &status, timeout == 0 ? WNOHANG : 0);

if (r >= 0) {
Expand Down
2 changes: 1 addition & 1 deletion reproc/src/process.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int process_create(handle *process,
//
// If `timeout` is `REPROC_INFINITE`, this function waits indefinitely for a
// process to exit.
int process_wait(handle process, unsigned int timeout);
int process_wait(handle process, int timeout);

// Sends the `SIGTERM` (POSIX) or `CTRL-BREAK` (Windows) signal to the process
// indicated by `process`.
Expand Down
36 changes: 17 additions & 19 deletions reproc/src/reproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ enum { STATUS_NOT_STARTED = -2, STATUS_IN_PROGRESS = -1 };
const int REPROC_SIGKILL = UINT8_MAX + 9;
const int REPROC_SIGTERM = UINT8_MAX + 15;

const unsigned int REPROC_INFINITE = 0xFFFFFFFF; // == `INFINITE` on Windows
const int REPROC_INFINITE = -1;

static int redirect(handle *parent,
handle *child,
Expand Down Expand Up @@ -152,7 +152,8 @@ int reproc_start(reproc_t *process,
int reproc_read(reproc_t *process,
REPROC_STREAM *stream,
uint8_t *buffer,
size_t size)
size_t size,
int timeout)
{
assert_return(process, REPROC_EINVAL);
assert_return(buffer, REPROC_EINVAL);
Expand All @@ -163,18 +164,9 @@ int reproc_read(reproc_t *process,
// Get the first pipe that will have data available to be read.
handle ready = HANDLE_INVALID;

if (process->stdio.out == HANDLE_INVALID &&
process->stdio.err == HANDLE_INVALID) {
return REPROC_EPIPE;
} else if (process->stdio.out == HANDLE_INVALID) {
ready = process->stdio.err;
} else if (process->stdio.err == HANDLE_INVALID) {
ready = process->stdio.out;
} else {
r = pipe_wait(process->stdio.out, process->stdio.err, &ready);
if (r < 0) {
return r;
}
r = pipe_wait(process->stdio.out, process->stdio.err, &ready, timeout);
if (r < 0) {
return r;
}

r = pipe_read(ready, buffer, size);
Expand Down Expand Up @@ -203,7 +195,10 @@ int reproc_read(reproc_t *process,
return r; // bytes read
}

int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err)
int reproc_drain(reproc_t *process,
reproc_sink *out,
reproc_sink *err,
int timeout)
{
assert_return(process, REPROC_EINVAL);

Expand All @@ -223,7 +218,7 @@ int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err)

while (true) {
REPROC_STREAM stream = { 0 };
r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer));
r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer), timeout);
if (r < 0) {
break;
}
Expand All @@ -241,7 +236,10 @@ int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err)
return r == REPROC_EPIPE ? 0 : r;
}

int reproc_write(reproc_t *process, const uint8_t *buffer, size_t size)
int reproc_write(reproc_t *process,
const uint8_t *buffer,
size_t size,
int timeout)
{
assert_return(process, REPROC_EINVAL);
assert_return(buffer, REPROC_EINVAL);
Expand All @@ -253,7 +251,7 @@ int reproc_write(reproc_t *process, const uint8_t *buffer, size_t size)
int r = -1;

do {
r = pipe_write(process->stdio.in, buffer, size);
r = pipe_write(process->stdio.in, buffer, size, timeout);

if (r == REPROC_EPIPE) {
process->stdio.in = handle_destroy(process->stdio.in);
Expand Down Expand Up @@ -293,7 +291,7 @@ int reproc_close(reproc_t *process, REPROC_STREAM stream)
return 0;
}

int reproc_wait(reproc_t *process, unsigned int timeout)
int reproc_wait(reproc_t *process, int timeout)
{
assert_return(process, REPROC_EINVAL);
assert_return(process->status != STATUS_NOT_STARTED, REPROC_EINVAL);
Expand Down
Loading

0 comments on commit 55eb283

Please sign in to comment.