Skip to content

Commit

Permalink
reproc(++): Rework drain and sinks.
Browse files Browse the repository at this point in the history
`reproc_drain` and `process::drain` have been moved sink.h and sink.hpp
respectively. `process::drain` is now `reproc::drain` and takes a
process as its first argument. reproc's built-in sinks have been changed
to return sinks. The functions themselves are now hidden in sink.c
  • Loading branch information
DaanDeMeyer committed Jan 4, 2020
1 parent 55eb283 commit 50997a4
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 249 deletions.
27 changes: 18 additions & 9 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,6 @@
Naming the enum after the function it is passed to (`reproc_stop`) is simpler
than using a different name.

- Inline the `reproc_sink_string` and `reproc_sink_discard` implementations in
the sink.h header.

This avoids issues with allocating across module (DLL) boundaries on Windows.

- Rewrite tests in C using CTest and `assert` and remove doctest.

Doctest is a great library but we don't really lose anything major by
Expand All @@ -193,13 +188,23 @@

- Make `reproc_strerror` thread-safe.

- Move `reproc_drain` to sink.h.

- Make `reproc_drain` take a separate sink for each output stream. Sinks are now
passed via the `reproc_sink` type.

Using separate sinks for both output streams allows for a lot more
flexibility. To use a single sink for both output streams, simply pass the
same sink to both the `out` and `err` arguments of `reproc_drain`.

- Turn `reproc_sink_string` and `reproc_sink_discard` into functions that return
sinks and hide the actual functions in sink.c.

- Add `reproc_free` to sink.h which must be used to free memory allocated by
`reproc_sink_string`.

This avoids issues with allocating across module (DLL) boundaries on Windows.

- Support passing timeouts to `reproc_read`, `reproc_write` and `reproc_drain`.

Pass `REPROC_INFINITE` as the timeout to retain the old behaviour.
Expand Down Expand Up @@ -244,16 +249,20 @@
- Add `sink::thread_safe::string` which is a thread-safe version of
`sink::string`.

- Make `process::drain` take a separate sink for each output stream.
- Move `process::drain` out of the `process` class and move it to sink.hpp.

`process.drain(...)` becomes `reproc::drain(process, ...)`.

- Make `reproc::drain` take a separate sink for each output stream.

Same reasoning as `reproc_drain`.

- Modify all included sinks to support the new `process::drain` behaviour.
- Modify all included sinks to support the new `reproc::drain` behaviour.

- Support passing timeouts to `process::read`, `process::write` and
`process::drain`.
`reproc::drain`.

The methods default to waiting indefinitely which matches their old behaviour.
They still default to waiting indefinitely which matches their old behaviour.

### CMake

Expand Down
1 change: 1 addition & 0 deletions reproc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ target_sources(reproc PRIVATE
src/${REPROC_PLATFORM}/process.c
src/${REPROC_PLATFORM}/redirect.c
src/reproc.c
src/sink.c
)

if(REPROC_TEST)
Expand Down
21 changes: 10 additions & 11 deletions reproc/examples/drain.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ int main(void)
goto cleanup;
}

// A sink function receives a single context parameter. `reproc_sink_string`
// requires a `char **` with its value set to `NULL` to be passed to
// `reproc_drain` along with `reproc_sink_string`. If a sink function needs
// more than one parameter, simply store the parameters in a struct and pass
// the address of the struct as the `context` parameter.
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink, REPROC_INFINITE);
// `reproc_drain` reads from a child process and passes the output to the
// given sinks. A sink consists of a function pointer and a context pointer
// which is always passed to the function. reproc provides several built-in
// sinks such as `reproc_sink_string` which stores all provided output in the
// given string. Passing the same sink to both output streams makes sure the
// output from both streams is combined into a single string.
reproc_sink sink = reproc_sink_string(&output);
r = reproc_drain(process, sink, sink, REPROC_INFINITE);
if (r < 0) {
goto cleanup;
}
Expand All @@ -56,10 +57,8 @@ int main(void)
}

cleanup:
// `output` always points to valid memory or is set to `NULL` by
// `reproc_sink_string` so it's always safe to call `free` on it.
free(output);

// Memory allocated by `reproc_sink_string` must be freed with `reproc_free`.
reproc_free(output);
reproc_destroy(process);

if (r < 0) {
Expand Down
40 changes: 0 additions & 40 deletions reproc/include/reproc/reproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,6 @@ typedef struct reproc_options {
reproc_stop_actions stop_actions;
} reproc_options;

/*! Used by `reproc_drain` to provide data to the caller. Each time data is
read, `function` is called with `context`. See `reproc_drain` and the `drain`
example for more information .*/
typedef struct reproc_sink {
bool (*function)(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context);
void *context;
} reproc_sink;

/*! Allocate a new `reproc_t` instance on the heap. */
REPROC_EXPORT reproc_t *reproc_new(void);

Expand Down Expand Up @@ -187,35 +176,6 @@ REPROC_EXPORT int reproc_read(reproc_t *process,
size_t size,
int timeout);

/*!
Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of
the sinks returns false. The `out` and `err` sinks receive the output from
stdout and stderr respectively. The same sink may be passed to both `out` and
`err`.
If `out` or `err` are `NULL`, all output on the corresponding stream is
discarded.
`reproc_drain` always starts by calling both sinks once with an empty buffer and
`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all
output from the previous call to `reproc_drain` one by one.
Each call to `reproc_read` is passed the given timeout. If a call to
`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`.
Note that his function returns 0 instead of `REPROC_EPIPE` when both output
streams of the child process are closed.
For examples of sinks, see `sink.h`.
Actionable errors:
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int reproc_drain(reproc_t *process,
reproc_sink *out,
reproc_sink *err,
int timeout);

/*!
Writes `size` bytes from `buffer` to the standard input (stdin) of the child
process.
Expand Down
113 changes: 43 additions & 70 deletions reproc/include/reproc/sink.h
Original file line number Diff line number Diff line change
@@ -1,30 +1,56 @@
/*! Sink functions that can be passed to `reproc_drain`. */

#pragma once

#include <reproc/reproc.h>

#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#ifdef __cplusplus
extern "C" {
#endif

/*! Used by `reproc_drain` to provide data to the caller. Each time data is
read, `function` is called with `context` .*/
typedef struct reproc_sink {
bool (*function)(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context);
void *context;
} reproc_sink;

/*!
Stores the output (both stdout and stderr) of a process in a single contiguous C
string.
Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of
the sinks returns false. The `out` and `err` sinks receive the output from
stdout and stderr respectively. The same sink may be passed to both `out` and
`err`.
`reproc_drain` always starts by calling both sinks once with an empty buffer and
`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all
output from the previous call to `reproc_drain` one by one.
Each call to `reproc_read` is passed the given timeout. If a call to
`reproc_read` times out, this function returns `REPROC_ETIMEDOUT`.
Note that his function returns 0 instead of `REPROC_EPIPE` when both output
streams of the child process are closed.
For examples of sinks, see `sink.h`.
Actionable errors:
- `REPROC_ETIMEDOUT`
*/
REPROC_EXPORT int
reproc_drain(reproc_t *process, reproc_sink out, reproc_sink err, int timeout);

/*!
Stores the output (both stdout and stderr) of a process in `output`.
Expects a `char **` with its value set to `NULL` as its initial context.
(Re)Allocates memory as necessary to store the output and assigns it as the
value of the given context. If allocating more memory fails, the already
allocated memory is freed and the value of the given context is set to `NULL`.
After calling `reproc_drain` with `reproc_sink_string`, the value of `context`
After calling `reproc_drain` with `reproc_sink_string`, the value of `output`
will either point to valid memory or will be set to `NULL`. This means it is
always safe to call `free` on `context`'s value after `reproc_drain` finishes.
always safe to call `free` on `output`'s value after `reproc_drain` finishes.
Because the context this function expects does not store the output size,
`strlen` is called each time data is read to calculate the current size of the
Expand All @@ -37,65 +63,12 @@ their output because `strlen` is used to calculate the current output size.
The `drain` example shows how to use `reproc_sink_string`.
```
*/
static bool reproc_sink_string(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context);
REPROC_EXPORT reproc_sink reproc_sink_string(char **output);

/*! Discards the output of a process. */
static bool reproc_sink_discard(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context);

// We inline the sink implementations to avoid allocation/de-allocation issues
// over module (DLL) boundaries. By inlining the implementations, memory is
// always allocated and freed in the user's executable/DLL.

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-function"

static inline bool reproc_sink_string(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context)
{
(void) stream;

char **string = (char **) context;
size_t string_size = *string == NULL ? 0 : strlen(*string);

char *realloc_result = (char *) realloc(*string, string_size + size + 1);
if (realloc_result == NULL) {
free(*string);
*string = NULL;
return false;
} else {
*string = realloc_result;
}

memcpy(*string + string_size, buffer, size);

(*string)[string_size + size] = '\0';

return true;
}

static inline bool reproc_sink_discard(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context)
{
(void) stream;
(void) buffer;
(void) size;
(void) context;

return true;
}

#pragma clang diagnostic pop
REPROC_EXPORT reproc_sink reproc_sink_discard(void);

#ifdef __cplusplus
}
#endif
/*! Calls `free` on `ptr` and returns `NULL`. Use this function to free memory
allocated by `reproc_sink_string`. This avoids issues with allocating across
module (DLL) boundaries on Windows. */
REPROC_EXPORT void *reproc_free(void *ptr);
41 changes: 0 additions & 41 deletions reproc/src/reproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,47 +195,6 @@ int reproc_read(reproc_t *process,
return r; // bytes read
}

int reproc_drain(reproc_t *process,
reproc_sink *out,
reproc_sink *err,
int timeout)
{
assert_return(process, REPROC_EINVAL);

const uint8_t initial = 0;

// A single call to `read` might contain multiple messages. By always calling
// both sinks once with no data before reading, we give them the chance to
// process all previous output one by one before reading from the child
// process again.
if ((out && !out->function(REPROC_STREAM_IN, &initial, 0, out->context)) ||
(err && !err->function(REPROC_STREAM_IN, &initial, 0, err->context))) {
return 0;
}

uint8_t buffer[4096];
int r = -1;

while (true) {
REPROC_STREAM stream = { 0 };
r = reproc_read(process, &stream, buffer, ARRAY_SIZE(buffer), timeout);
if (r < 0) {
break;
}

size_t bytes_read = (size_t) r;

reproc_sink *sink = stream == REPROC_STREAM_OUT ? out : err;

// `sink` returns false to tell us to stop reading.
if (sink && !sink->function(stream, buffer, bytes_read, sink->context)) {
break;
}
}

return r == REPROC_EPIPE ? 0 : r;
}

int reproc_write(reproc_t *process,
const uint8_t *buffer,
size_t size,
Expand Down
Loading

0 comments on commit 50997a4

Please sign in to comment.