Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boot: replay in subprocess #654

Merged
merged 14 commits into from
Jun 24, 2024
138 changes: 130 additions & 8 deletions pkg/vere/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "db/lmdb.h"
#include "getopt.h"
#include "libgen.h"
#include "pthread.h"
#include "spawn.h"

#include "ca_bundle.h"
#include "pace.h"
Expand Down Expand Up @@ -299,12 +301,13 @@ _main_getopt(c3_i argc, c3_c** argv)
{ "prop-url", required_argument, NULL, 2 },
{ "prop-name", required_argument, NULL, 3 },
//
{ "urth-loom", required_argument, NULL, 5 },
{ "no-demand", no_argument, NULL, 6 },
{ "swap", no_argument, NULL, 7 },
{ "swap-to", required_argument, NULL, 8 },
{ "toss", required_argument, NULL, 9 },
{ "urth-loom", required_argument, NULL, 5 },
{ "no-demand", no_argument, NULL, 6 },
{ "swap", no_argument, NULL, 7 },
{ "swap-to", required_argument, NULL, 8 },
{ "toss", required_argument, NULL, 9 },
{ "behn-allow-blocked", no_argument, NULL, 10 },
{ "serf-bin", required_argument, NULL, 11 },
{ "lmdb-map-size", required_argument, NULL, 12 },
//
{ NULL, 0, NULL, 0 },
Expand Down Expand Up @@ -349,10 +352,15 @@ _main_getopt(c3_i argc, c3_c** argv)
u3_Host.ops_u.beb = c3y;
break;
}
case 11: { // serf-bin
u3_Host.wrk_c = strdup(optarg);
break;
}
case 12: { // lmdb-map-size
if ( 1 != sscanf(optarg, "%" SCNuMAX, &u3_Host.ops_u.siz_i) ) {
return c3n;
}

break;
}
// special args
Expand Down Expand Up @@ -2370,6 +2378,113 @@ _cw_play_impl(c3_d eve_d, c3_d sap_d, c3_o mel_o, c3_o sof_o, c3_o ful_o)
return pay_d;
}

/* _cw_play_fork_heed(): wait for EOF on STDIN or until canceled.
*/
void* _cw_play_fork_heed(void* arg) {
c3_c buf[1];
c3_zs red;

do {
pthread_testcancel();
red = read(STDIN_FILENO, buf, sizeof(buf));
if ( 0 == red ) {
fprintf(stderr, "play: god save the king! committing sudoku...\r\n");
exit(1);
}
} while ( 0 < red );

return NULL;
}

/* _cw_play_fork(): spawn a subprocess for event replay.
*/
static c3_i
_cw_play_fork(c3_d eve_d, c3_d sap_d, c3_o mel_o, c3_o sof_o, c3_o ful_o)
{
// prepare args
//
c3_c eve_c[21], sap_c[21] = { 0 };
if ( 0 > sprintf(eve_c, "%" PRIu64, eve_d) ||
0 > sprintf(sap_c, "%" PRIu64, sap_d) )
{
fprintf(stderr, "play: error parsing args\r\n");
return 1;
}

c3_c *argv[11] = {
u3_Host.wrk_c,
"play",
u3_Host.dir_c,
"--replay-to",
eve_c,
"--snap-at",
sap_c,
};

c3_z i = 7;
if _(mel_o) {
argv[i++] = "--auto-meld";
}
if _(sof_o) {
argv[i++] = "--soft-mugs";
}
if _(ful_o) {
argv[i++] = "--full";
}
argv[i] = NULL;

// prepare a pipe for ipc with the subprocess
//
c3_i pipefd[2];
if ( 0 != pipe(pipefd) ) {
fprintf(stderr, "play: failed to open pipe\r\n");
return 1;
}

// set the child process' stdin to read from the pipe
//
posix_spawn_file_actions_t action;
posix_spawn_file_actions_init(&action);
posix_spawn_file_actions_addclose(&action, pipefd[1]);
posix_spawn_file_actions_adddup2(&action, pipefd[0], STDIN_FILENO);

// spawn a new serf process and call its play subcommand
//
pid_t pid;
if ( 0 != posix_spawn(&pid, u3_Host.wrk_c, &action, 0, argv, 0) ) {
fprintf(stderr, "play: posix_spawn: %d\r\n", errno);
return 1;
}

// close the read end of the pipe in the parent
//
close(pipefd[0]);

// wait for the child to exit
//
c3_i sat_i;
if ( -1 == waitpid(pid, &sat_i, 0) ) {
fprintf(stderr, "play: waitpid: %d\r\n", errno);
return 1;
}

if ( WIFEXITED(sat_i) ) {
c3_i ret_i = WEXITSTATUS(sat_i);
if ( 0 != ret_i ) {
fprintf(stderr, "play: exited with %d\r\n", ret_i);
}
return ret_i;
}
else if ( WIFSIGNALED(sat_i) ) {
fprintf(stderr, "play: terminated by signal %d\r\n", WTERMSIG(sat_i));
return 1;
}
else {
fprintf(stderr, "play: strange termination\r\n");
return 1;
}
}

/* _cw_play(): replay events, but better.
*/
static void
Expand Down Expand Up @@ -2462,9 +2577,14 @@ _cw_play(c3_i argc, c3_c* argv[])
exit(1);
}

pthread_t ted;
pthread_create(&ted, NULL, _cw_play_fork_heed, NULL);

if ( !_cw_play_impl(eve_d, sap_d, mel_o, sof_o, ful_o) ) {
fprintf(stderr, "mars: nothing to do!\r\n");
}

pthread_cancel(ted);
}

/* _cw_prep(): prepare for upgrade
Expand Down Expand Up @@ -3037,8 +3157,6 @@ main(c3_i argc,

_main_self_path();

// XX add argument
//
if ( !u3_Host.wrk_c ) {
u3_Host.wrk_c = bin_c;
}
Expand Down Expand Up @@ -3181,7 +3299,11 @@ main(c3_i argc,
// we need the current snapshot's latest event number to
// validate whether we can execute disk migration
if ( u3_Host.ops_u.nuu == c3n ) {
_cw_play_impl(0, 0, c3n, c3n, c3n);
c3_i sat_i = _cw_play_fork(0, 0, c3n, c3n, c3n);
if ( sat_i ) {
fprintf(stderr, "play: replay failed: %d\r\n", sat_i);
exit(sat_i);
}
signal(SIGTSTP, _stop_exit);
// XX unmap loom, else parts of the snapshot could be left in memory
}
Expand Down
Loading