Skip to content

Commit

Permalink
content: flush before checkpointing
Browse files Browse the repository at this point in the history
Problem: Before checkpointing, users need to remember to call
content.flush, to ensure data has been flushed to the backing store.
It is easy to forget this.

Within the content module, call content.flush before checkpointing.

Fixes #6242
  • Loading branch information
chu11 committed Dec 18, 2024
1 parent ef6a66b commit 3402b5b
Showing 1 changed file with 80 additions and 5 deletions.
85 changes: 80 additions & 5 deletions src/modules/content/checkpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,83 @@ static int checkpoint_put_forward (struct content_checkpoint *checkpoint,
return -1;
}

static void checkpoint_content_flush_continuation (flux_future_t *f, void *arg)
{
struct content_checkpoint *checkpoint = arg;
const flux_msg_t *msg = flux_future_aux_get (f, "msg");
const char *key;
json_t *value;
int flags = 0;
const char *errstr = NULL;

assert (msg);

if (flux_rpc_get (f, NULL) < 0) {
errstr = "error flushing content";
goto error;
}

if (flux_request_unpack (msg,
NULL,
"{s:s s:o s?i}",
"key", &key,
"value", &value,
"flags", &flags) < 0)
goto error;

if (checkpoint_put_forward (checkpoint,
msg,
key,
value,
&errstr) < 0)
goto error;

flux_future_destroy (f);
return;

error:
if (flux_respond_error (checkpoint->h, msg, errno, errstr) < 0)
flux_log_error (checkpoint->h,
"error responding to checkpoint-put request");
flux_future_destroy (f);
}

static int checkpoint_content_flush (struct content_checkpoint *checkpoint,
const flux_msg_t *msg,
const char **errstr)
{
const char *topic = "content.flush";
uint32_t rank;
flux_future_t *f = NULL;

if (flux_get_rank (checkpoint->h, &rank) < 0) {
(*errstr) = "error retrieving rank";
return -1;
}

if (!(f = flux_rpc (checkpoint->h, topic, NULL, rank, 0))
|| flux_future_then (f,
-1,
checkpoint_content_flush_continuation,
checkpoint) < 0)
goto error;

if (flux_future_aux_set (f,
"msg",
(void *)flux_msg_incref (msg),
(flux_free_f)flux_msg_decref) < 0) {
flux_msg_decref (msg);
goto error;
}

return 0;

error:
(*errstr) = "error starting content.flush RPC";
flux_future_destroy (f);
return -1;
}

void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
const flux_msg_t *msg, void *arg)
{
Expand All @@ -216,11 +293,9 @@ void content_checkpoint_put_request (flux_t *h, flux_msg_handler_t *mh,
"value", &value) < 0)
goto error;

if (checkpoint_put_forward (checkpoint,
msg,
key,
value,
&errstr) < 0)
if (checkpoint_content_flush (checkpoint,
msg,
&errstr) < 0)
goto error;

return;
Expand Down

0 comments on commit 3402b5b

Please sign in to comment.