-
Notifications
You must be signed in to change notification settings - Fork 50
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
broker: enable brokers to be added to running instances #5184
Open
garlick
wants to merge
8
commits into
flux-framework:master
Choose a base branch
from
garlick:flub
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
7ee4e69
broker: ensure CURVE certificate has a name
garlick 2e9f9f1
broker: allow instance size > PMI bootstrap size
garlick b6328d9
broker: refactor bootstrap block
garlick 35ac794
broker: add flub bootstrap method
garlick c03df33
broker: add flub RPC methods to overlay
garlick 9ccf30c
testsuite: add coverage for instance size override
garlick 1a59ee7
testsuite: cover flub bootstrap
garlick 0ba34b1
broker: provision dead brokers for flub replacement
garlick File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
/************************************************************\ | ||
* Copyright 2023 Lawrence Livermore National Security, LLC | ||
* (c.f. AUTHORS, NOTICE.LLNS, COPYING) | ||
* | ||
* This file is part of the Flux resource manager framework. | ||
* For details, see https://github.com/flux-framework. | ||
* | ||
* SPDX-License-Identifier: LGPL-3.0 | ||
\************************************************************/ | ||
|
||
/* boot_flub.c - FLUx Boot protocol | ||
* | ||
* Add a broker to an existing Flux instance. | ||
*/ | ||
|
||
#if HAVE_CONFIG_H | ||
#include "config.h" | ||
#endif | ||
#include <errno.h> | ||
#include <jansson.h> | ||
#include <flux/core.h> | ||
|
||
#include "src/common/libidset/idset.h" | ||
#include "src/common/libutil/errprintf.h" | ||
#include "src/common/libutil/errno_safe.h" | ||
#include "src/common/libutil/ipaddr.h" | ||
#include "ccan/str/str.h" | ||
|
||
#include "attr.h" | ||
#include "overlay.h" | ||
#include "topology.h" | ||
|
||
#include "boot_flub.h" | ||
|
||
struct boot_info { | ||
int size; | ||
int rank; | ||
json_t *attrs; | ||
}; | ||
|
||
struct boot_parent { | ||
char *pubkey; | ||
int rank; | ||
const char *uri; | ||
}; | ||
|
||
static int wait_for_group_membership (flux_t *h, | ||
const char *name, | ||
int rank, | ||
flux_error_t *error) | ||
{ | ||
flux_future_t *f; | ||
bool online = false; | ||
|
||
if (!(f = flux_rpc_pack (h, | ||
"groups.get", | ||
0, | ||
FLUX_RPC_STREAMING, | ||
"{s:s}", | ||
"name", "broker.online"))) { | ||
errprintf (error, "broker.online: %s", strerror (errno)); | ||
return -1; | ||
} | ||
do { | ||
const char *members; | ||
struct idset *ids; | ||
|
||
if (flux_rpc_get_unpack (f, "{s:s}", "members", &members) < 0) { | ||
errprintf (error, "broker.online: %s", future_strerror (f, errno)); | ||
break; | ||
} | ||
if ((ids = idset_decode (members)) | ||
&& idset_test (ids, rank)) | ||
online = true; | ||
idset_destroy (ids); | ||
flux_future_reset (f); | ||
} while (!online); | ||
flux_future_destroy (f); | ||
|
||
return online ? 0 : -1; | ||
} | ||
|
||
static int set_attrs (attr_t *attrs, json_t *dict) | ||
{ | ||
const char *key; | ||
json_t *val; | ||
|
||
json_object_foreach (dict, key, val) { | ||
const char *s = json_string_value (val); | ||
if (!s) { | ||
errno = EPROTO; | ||
return -1; | ||
} | ||
if (attr_add (attrs, key, s, ATTR_IMMUTABLE) < 0) | ||
return -1; | ||
} | ||
return 0; | ||
} | ||
|
||
int boot_flub (struct broker *ctx, flux_error_t *error) | ||
{ | ||
const char *uri = NULL; | ||
flux_t *h; | ||
flux_future_t *f = NULL; | ||
flux_future_t *f2 = NULL; | ||
struct topology *topo = NULL; | ||
const char *topo_uri; | ||
struct boot_info info; | ||
struct boot_parent parent; | ||
const char *bind_uri = NULL; | ||
int rc = -1; | ||
|
||
/* Ask a Flux instance to allocate an available rank. | ||
* N.B. the broker unsets FLUX_URI so either it is set on the | ||
* command line via broker.boot-server, or the compiled-in path | ||
* is used (system instance). | ||
*/ | ||
(void)attr_get (ctx->attrs, "broker.boot-server", &uri, NULL); | ||
if (!(h = flux_open_ex (uri, 0, error))) | ||
return -1; | ||
if (!(f = flux_rpc_pack (h, | ||
"overlay.flub-getinfo", | ||
0, | ||
0, | ||
"{}")) | ||
|| flux_rpc_get_unpack (f, | ||
"{s:i s:i s:o}", | ||
"rank", &info.rank, | ||
"size", &info.size, | ||
"attrs", &info.attrs) < 0) { | ||
errprintf (error, "%s", future_strerror (f, errno)); | ||
goto out; | ||
} | ||
/* Set instance attributes obtained from boot server. | ||
*/ | ||
if (set_attrs (ctx->attrs, info.attrs) < 0) { | ||
errprintf (error, "error setting attributes: %s", strerror (errno)); | ||
goto out; | ||
} | ||
/* Create topology. All ranks are assumed to have the same topology. | ||
* The tbon.topo attribute is set in overlay_create() if not provided | ||
* on the command line. | ||
*/ | ||
if (attr_get (ctx->attrs, "tbon.topo", &topo_uri, NULL) < 0) { | ||
errprintf (error, "error fetching tbon.topo attribute"); | ||
goto out; | ||
} | ||
if (!(topo = topology_create (topo_uri, info.size, NULL)) | ||
|| topology_set_rank (topo, info.rank) < 0 | ||
|| overlay_set_topology (ctx->overlay, topo) < 0) { | ||
errprintf (error, "error creating topology: %s", strerror (errno)); | ||
goto out; | ||
} | ||
if ((parent.rank = topology_get_parent (topo)) < 0) { | ||
errprintf (error, | ||
"rank %d has no parent in %s topology", | ||
info.rank, | ||
topo_uri); | ||
goto out; | ||
} | ||
/* Exchange public keys with TBON parent and obtain its URI. | ||
*/ | ||
if (wait_for_group_membership (h, "broker.online", parent.rank, error) < 0) | ||
goto out; | ||
if (!(f2 = flux_rpc_pack (h, | ||
"overlay.flub-kex", | ||
parent.rank, | ||
0, | ||
"{s:s s:s}", | ||
"name", overlay_cert_name (ctx->overlay), | ||
"pubkey", overlay_cert_pubkey (ctx->overlay))) | ||
|| flux_rpc_get_unpack (f2, | ||
"{s:s s:s}", | ||
"pubkey", &parent.pubkey, | ||
"uri", &parent.uri) < 0) { | ||
errprintf (error, "%s", future_strerror (f, errno)); | ||
goto out; | ||
} | ||
/* Inform overlay subsystem of parent info. | ||
*/ | ||
if (overlay_set_parent_uri (ctx->overlay, parent.uri) < 0 | ||
|| overlay_set_parent_pubkey (ctx->overlay, parent.pubkey) < 0) { | ||
errprintf (error, | ||
"error setting up overlay parameters: %s", | ||
strerror (errno)); | ||
goto out; | ||
} | ||
/* If there are children, bind to zmq socket and update tbon.endpoint. | ||
* Since we don't know if OUR children are co-located on the same node, | ||
* always use the tcp transport. | ||
*/ | ||
if (topology_get_child_ranks (topo, NULL, 0) > 0) { | ||
char ipaddr[HOST_NAME_MAX + 1]; | ||
char *wild = NULL; | ||
|
||
overlay_set_ipv6 (ctx->overlay, 1); | ||
if (ipaddr_getprimary (ipaddr, | ||
sizeof (ipaddr), | ||
error->text, | ||
sizeof (error->text)) < 0) | ||
goto out; | ||
if (asprintf (&wild, "tcp://%s:*", ipaddr) < 0 | ||
|| overlay_bind (ctx->overlay, wild) < 0) { | ||
errprintf (error, "error binding to tcp://%s:*", ipaddr); | ||
ERRNO_SAFE_WRAP (free, wild); | ||
goto out; | ||
} | ||
bind_uri = overlay_get_bind_uri (ctx->overlay); | ||
} | ||
if (attr_add (ctx->attrs, "tbon.endpoint", bind_uri, ATTR_IMMUTABLE) < 0) { | ||
errprintf (error, "setattr tbon.endpoint"); | ||
goto out; | ||
} | ||
rc = 0; | ||
out: | ||
topology_decref (topo); | ||
flux_future_destroy (f); | ||
flux_future_destroy (f2); | ||
flux_close (h); | ||
return rc; | ||
} | ||
|
||
// vi:ts=4 sw=4 expandtab |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/************************************************************\ | ||
* Copyright 2023 Lawrence Livermore National Security, LLC | ||
* (c.f. AUTHORS, NOTICE.LLNS, COPYING) | ||
* | ||
* This file is part of the Flux resource manager framework. | ||
* For details, see https://github.com/flux-framework. | ||
* | ||
* SPDX-License-Identifier: LGPL-3.0 | ||
\************************************************************/ | ||
|
||
#ifndef BROKER_BOOT_FLUB_H | ||
#define BROKER_BOOT_FLUB_H | ||
|
||
#include <flux/core.h> | ||
|
||
#include "broker.h" | ||
|
||
int boot_flub (struct broker *ctx, flux_error_t *error); | ||
|
||
#endif /* BROKER_BOOT_FLUB_H */ | ||
|
||
// vi:ts=4 sw=4 expandtab |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason not to call
uri_resolve(3)
here? Then a JOBID or other resolvable URI could be provided directly to thebroker.boot-server
attribute.Although, if the intent is to add a higher level command like
flux expand OPTIONS... JOBID
, perhaps that's unnecessary.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems like this might be helpful. I'm not sure what the next steps are for tooling so may as well add that for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A complication is that
uri_resolve(3)
requires FLUX_URI to be set, and the broker unsets that. I could setenv FLUX_URI to the value of theparent-uri
attribute around the call but since the environment is global...eww? Maybe we should table this one for now.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, yeah, that's a bummer. It might be useful in the long term in case you target the URI of a job that hasn't started yet, but doesn't seem like a big deal for now.
Maybe we need a version of
uri_resolve(3)
that takes auri
argument and (somehow) passes that down toflux uri
if that command is invoked. Meh, probably too soon to worry about it.