Skip to content

Commit

Permalink
reproc(++): Take separate sinks for each output stream.
Browse files Browse the repository at this point in the history
This is much more flexible than always passing both output streams to
the same sink.
  • Loading branch information
DaanDeMeyer committed Jan 3, 2020
1 parent 1e75de2 commit 698a166
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 82 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@

- Make `reproc_strerror` thread-safe.

- Make `reproc_drain` take a separate sink for each output stream. Sinks are now
passed via the `reproc_sink` type.

Using separate sinks for both output streams allows for a lot more
flexibility. To use a single sink for both output streams, simply pass the
same sink to both the `out` and `err` arguments of `reproc_drain`.

### reproc++

- Remove `process::parse`, `process::exit_status` and `process::running`.
Expand Down Expand Up @@ -231,6 +238,12 @@
- Add `sink::thread_safe::string` which is a thread-safe version of
`sink::string`.

- Make `process::drain` take a separate sink for each output stream.

Same reasoning as `reproc_drain`.

- Modify all included sinks to support the new `process::drain` behaviour.

### CMake

- Drop required CMake version to CMake 3.12.
Expand Down
3 changes: 2 additions & 1 deletion reproc/examples/drain.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ int main(void)
// `reproc_drain` along with `reproc_sink_string`. If a sink function needs
// more than one parameter, simply store the parameters in a struct and pass
// the address of the struct as the `context` parameter.
r = reproc_drain(process, reproc_sink_string, &output);
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
if (r < 0) {
goto cleanup;
}
Expand Down
32 changes: 22 additions & 10 deletions reproc/include/reproc/reproc.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ typedef struct reproc_options {
reproc_stop_actions stop_actions;
} reproc_options;

/*! Used by `reproc_drain` to provide data to the caller. Each time data is
read, `function` is called with `context`. See `reproc_drain` and the `drain`
example for more information .*/
typedef struct reproc_sink {
bool (*function)(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context);
void *context;
} reproc_sink;

/*! Allocate a new `reproc_t` instance on the heap. */
REPROC_EXPORT reproc_t *reproc_new(void);

Expand Down Expand Up @@ -171,24 +182,25 @@ REPROC_EXPORT int reproc_read(reproc_t *process,
size_t size);

/*!
Calls `reproc_read` on `stream` until `reproc_read` returns an error or `sink`
returns false. `sink` receives the output after each read, along with `context`.
Calls `reproc_read` on `stream` until `reproc_read` returns an error or one of
the sinks returns false. The `out` and `err` sinks receive the output from
stdout and stderr respectively. The same sink may be passed to both `out` and
`err`.
If `out` or `err` are `NULL`, all output on the corresponding stream is
discarded.
`reproc_drain` always starts by calling `sink` once with an empty buffer and
`stream` set to `REPROC_STREAM_IN` to give the sink the chance to process all
`reproc_drain` always starts by calling both sinks once with an empty buffer and
`stream` set to `REPROC_STREAM_IN` to give each sink the chance to process all
output from the previous call to `reproc_drain` one by one.
Note that his function returns 0 instead of `REPROC_EPIPE` when both output
streams of the child process are closed.
For examples of sinks, see `sink.h`.
*/
REPROC_EXPORT int reproc_drain(reproc_t *process,
bool (*sink)(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context),
void *context);
REPROC_EXPORT int
reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err);

/*!
Writes `size` bytes from `buffer` to the standard input (stdin) of the child
Expand Down
22 changes: 11 additions & 11 deletions reproc/src/reproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,18 @@ int reproc_read(reproc_t *process,
return r; // bytes read
}

int reproc_drain(reproc_t *process,
bool (*sink)(REPROC_STREAM stream,
const uint8_t *buffer,
size_t size,
void *context),
void *context)
int reproc_drain(reproc_t *process, reproc_sink *out, reproc_sink *err)
{
assert_return(process, REPROC_EINVAL);
assert_return(sink, REPROC_EINVAL);

const uint8_t initial = 0;

// A single call to `read` might contain multiple messages. By always calling
// `sink` once with no data before reading, we give it the chance to process
// all previous output one by one before reading from the child process again.
if (!sink(REPROC_STREAM_IN, (uint8_t[]){ 0 }, 0, context)) {
// both sinks once with no data before reading, we give them the chance to
// process all previous output one by one before reading from the child
// process again.
if ((out && !out->function(REPROC_STREAM_IN, &initial, 0, out->context)) ||
(err && !err->function(REPROC_STREAM_IN, &initial, 0, err->context))) {
return 0;
}

Expand All @@ -232,8 +230,10 @@ int reproc_drain(reproc_t *process,

size_t bytes_read = (size_t) r;

reproc_sink *sink = stream == REPROC_STREAM_OUT ? out : err;

// `sink` returns false to tell us to stop reading.
if (!sink(stream, buffer, bytes_read, context)) {
if (sink && !sink->function(stream, buffer, bytes_read, sink->context)) {
break;
}
}
Expand Down
3 changes: 2 additions & 1 deletion reproc/test/argv.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ int main(void)
assert(r == 0);

char *output = NULL;
r = reproc_drain(process, reproc_sink_string, &output);
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
assert(r == 0);
assert(output != NULL);

Expand Down
3 changes: 2 additions & 1 deletion reproc/test/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ int main(void)
assert(r == 0);

char *output = NULL;
r = reproc_drain(process, reproc_sink_string, &output);
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
assert(r == 0);
assert(output != NULL);

Expand Down
3 changes: 2 additions & 1 deletion reproc/test/io.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ static int io(const char *mode, const char *input, const char *expected)
assert(r == 0);

char *output = NULL;
r = reproc_drain(process, reproc_sink_string, &output);
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
assert(r == 0);
assert(output != NULL);

Expand Down
3 changes: 2 additions & 1 deletion reproc/test/overflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ int main(void)
assert(r >= 0);

char *output = NULL;
r = reproc_drain(process, reproc_sink_string, &output);
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
assert(r >= 0);
assert(output != NULL);

Expand Down
3 changes: 2 additions & 1 deletion reproc/test/working-directory.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ int main(void)
assert(r == 0);

char *output = NULL;
r = reproc_drain(process, reproc_sink_string, &output);
reproc_sink sink = { reproc_sink_string, &output };
r = reproc_drain(process, &sink, &sink);
assert(r == 0);

replace(output, '\\', '/');
Expand Down
8 changes: 4 additions & 4 deletions reprocxx/examples/background.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ int main(int argc, char *argv[])
auto drain_async = std::async(std::launch::async, [&process, &output,
&mutex]() {
// `sink::thread_safe::string` locks a given mutex before appending to the
// given string(s), allowing working with the string(s) across multiple
// threads if the mutex is locked in the other threads as well.
reproc::sink::thread_safe::string sink(output, output, mutex);
return process.drain(sink);
// given string, allowing working with the string across multiple threads if
// the mutex is locked in the other threads as well.
reproc::sink::thread_safe::string sink(output, mutex);
return process.drain(sink, sink);
});

// Show new output every 2 seconds.
Expand Down
11 changes: 6 additions & 5 deletions reprocxx/examples/cmake-help.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ int main()

// `process::drain` reads from the stdout and stderr streams of the child
// process until both are closed or an error occurs. Providing it with a
// string sink makes it store all output in the string(s) passed to the string
// sink. Passing the same string to both the `out` and `err` arguments of
// `sink::string` causes the stdout and stderr output to get stored in the
// same string.
// string sink for a specific stream makes it store all output of that stream
// in the string passed to the string sink. Passing the same sink to both the
// `out` and `err` arguments of `process::drain` causes the stdout and stderr
// output to get stored in the same string.
std::string output;
ec = process.drain(reproc::sink::string(output, output));
reproc::sink::string sink(output);
ec = process.drain(sink, sink);
if (ec) {
return fail(ec);
}
Expand Down
17 changes: 10 additions & 7 deletions reprocxx/include/reproc++/reproc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,17 @@ class process {
`reproc_drain` but takes a lambda as its argument instead of a function and
context pointer.
Unlike `reproc_drain`, it is not possible to pass `NULL` sinks to this method.
Instead, use `sink::discard` which has the same effect.
`sink` expects the following signature:
```c++
bool sink(stream stream, const uint8_t *buffer, size_t size);
```
*/
template <typename Sink>
std::error_code drain(Sink &&sink);
std::error_code drain(Sink &&out, Sink &&err);

REPROCXX_EXPORT std::error_code write(const uint8_t *buffer,
size_t size) noexcept;
Expand All @@ -127,16 +130,14 @@ class process {
};

template <typename Sink>
std::error_code process::drain(Sink &&sink)
std::error_code process::drain(Sink &&out, Sink &&err)
{
// We can't use compound literals in C++ to pass the initial value to `sink`
// so we use a constexpr value instead.
static constexpr uint8_t initial = 0;

// A single call to `read` might contain multiple messages. By always calling
// `sink` once with no data before reading, we give it the chance to process
// all previous output before reading from the child process again.
if (!sink(stream::in, &initial, 0)) {
// both sinks once with no data before reading, we give them the chance to
// process all previous output before reading from the child process again.
if (!out(stream::in, &initial, 0) || !err(stream::in, &initial, 0)) {
return {};
}

Expand All @@ -151,6 +152,8 @@ std::error_code process::drain(Sink &&sink)
break;
}

auto &sink = stream == stream::out ? out : err;

// `sink` returns false to tell us to stop reading.
if (!sink(stream, buffer.data(), bytes_read)) {
break;
Expand Down
52 changes: 13 additions & 39 deletions reprocxx/include/reproc++/sink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,59 +10,33 @@
namespace reproc {
namespace sink {

/*! Reads all output into `out`. */
/*! Reads all output into `string`. */
class string {
std::string &out_;
std::string &err_;
std::string &string_;

public:
explicit string(std::string &out, std::string &err) noexcept
: out_(out), err_(err)
{}
explicit string(std::string &string) noexcept : string_(string) {}

bool operator()(stream stream, const uint8_t *buffer, size_t size)
{
switch (stream) {
case stream::out:
out_.append(reinterpret_cast<const char *>(buffer), size);
break;
case stream::err:
err_.append(reinterpret_cast<const char *>(buffer), size);
break;
case stream::in:
break;
}

(void) stream;
string_.append(reinterpret_cast<const char *>(buffer), size);
return true;
}
};

/*! Forwards all output to `out`. */
/*! Forwards all output to `ostream`. */
class ostream {
std::ostream &out_;
std::ostream &err_;
std::ostream &ostream_;

public:
explicit ostream(std::ostream &out, std::ostream &err) noexcept

: out_(out), err_(err)
{}
explicit ostream(std::ostream &ostream) noexcept : ostream_(ostream) {}

bool operator()(stream stream, const uint8_t *buffer, size_t size)
{
switch (stream) {
case stream::out:
out_.write(reinterpret_cast<const char *>(buffer),
static_cast<std::streamsize>(size));
break;
case stream::err:
err_.write(reinterpret_cast<const char *>(buffer),
(void) stream;
ostream_.write(reinterpret_cast<const char *>(buffer),
static_cast<std::streamsize>(size));
break;
case stream::in:
break;
}

return true;
}
};
Expand All @@ -82,14 +56,14 @@ class discard {

namespace thread_safe {

/*! `sink::string` but locks the given mutex before appending to the string. */
/*! `sink::string` but locks the given mutex before invoking the sink. */
class string {
sink::string sink_;
std::mutex &mutex_;

public:
string(std::string &out, std::string &err, std::mutex &mutex) noexcept
: sink_(out, err), mutex_(mutex)
string(std::string &string, std::mutex &mutex) noexcept
: sink_(string), mutex_(mutex)
{}

bool operator()(stream stream, const uint8_t *buffer, size_t size)
Expand Down

0 comments on commit 698a166

Please sign in to comment.