Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

broker: enable brokers to be added to running instances #5184

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/broker/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ libbroker_la_SOURCES = \
boot_config.c \
boot_pmi.h \
boot_pmi.c \
boot_flub.h \
boot_flub.c \
publisher.h \
publisher.c \
groups.h \
Expand Down
223 changes: 223 additions & 0 deletions src/broker/boot_flub.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

/* boot_flub.c - FLUx Boot protocol
*
* Add a broker to an existing Flux instance.
*/

#if HAVE_CONFIG_H
#include "config.h"
#endif
#include <errno.h>
#include <jansson.h>
#include <flux/core.h>

#include "src/common/libidset/idset.h"
#include "src/common/libutil/errprintf.h"
#include "src/common/libutil/errno_safe.h"
#include "src/common/libutil/ipaddr.h"
#include "ccan/str/str.h"

#include "attr.h"
#include "overlay.h"
#include "topology.h"

#include "boot_flub.h"

struct boot_info {
int size;
int rank;
json_t *attrs;
};

struct boot_parent {
char *pubkey;
int rank;
const char *uri;
};

static int wait_for_group_membership (flux_t *h,
const char *name,
int rank,
flux_error_t *error)
{
flux_future_t *f;
bool online = false;

if (!(f = flux_rpc_pack (h,
"groups.get",
0,
FLUX_RPC_STREAMING,
"{s:s}",
"name", "broker.online"))) {
errprintf (error, "broker.online: %s", strerror (errno));
return -1;

Check warning on line 62 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L61-L62

Added lines #L61 - L62 were not covered by tests
}
do {
const char *members;
struct idset *ids;

if (flux_rpc_get_unpack (f, "{s:s}", "members", &members) < 0) {
errprintf (error, "broker.online: %s", future_strerror (f, errno));
break;

Check warning on line 70 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L69-L70

Added lines #L69 - L70 were not covered by tests
}
if ((ids = idset_decode (members))
&& idset_test (ids, rank))
online = true;
idset_destroy (ids);
flux_future_reset (f);
} while (!online);
flux_future_destroy (f);

return online ? 0 : -1;
}

static int set_attrs (attr_t *attrs, json_t *dict)
{
const char *key;
json_t *val;

json_object_foreach (dict, key, val) {
const char *s = json_string_value (val);
if (!s) {
errno = EPROTO;
return -1;

Check warning on line 92 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L91-L92

Added lines #L91 - L92 were not covered by tests
}
if (attr_add (attrs, key, s, ATTR_IMMUTABLE) < 0)
return -1;
}
return 0;
}

int boot_flub (struct broker *ctx, flux_error_t *error)
{
const char *uri = NULL;
flux_t *h;
flux_future_t *f = NULL;
flux_future_t *f2 = NULL;
struct topology *topo = NULL;
const char *topo_uri;
struct boot_info info;
struct boot_parent parent;
const char *bind_uri = NULL;
int rc = -1;

/* Ask a Flux instance to allocate an available rank.
* N.B. the broker unsets FLUX_URI so either it is set on the
* command line via broker.boot-server, or the compiled-in path
* is used (system instance).
*/
(void)attr_get (ctx->attrs, "broker.boot-server", &uri, NULL);
if (!(h = flux_open_ex (uri, 0, error)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to call uri_resolve(3) here? Then a JOBID or other resolvable URI could be provided directly to the broker.boot-server attribute.

Although, if the intent is to add a higher level command like flux expand OPTIONS... JOBID, perhaps that's unnecessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this might be helpful. I'm not sure what the next steps are for tooling so may as well add that for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A complication is that uri_resolve(3) requires FLUX_URI to be set, and the broker unsets that. I could setenv FLUX_URI to the value of the parent-uri attribute around the call but since the environment is global...eww? Maybe we should table this one for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, yeah, that's a bummer. It might be useful in the long term in case you target the URI of a job that hasn't started yet, but doesn't seem like a big deal for now.

Maybe we need a version of uri_resolve(3) that takes a uri argument and (somehow) passes that down to flux uri if that command is invoked. Meh, probably too soon to worry about it.

return -1;
if (!(f = flux_rpc_pack (h,
"overlay.flub-getinfo",
0,
0,
"{}"))
|| flux_rpc_get_unpack (f,
"{s:i s:i s:o}",
"rank", &info.rank,
"size", &info.size,
"attrs", &info.attrs) < 0) {
errprintf (error, "%s", future_strerror (f, errno));
goto out;
}
/* Set instance attributes obtained from boot server.
*/
if (set_attrs (ctx->attrs, info.attrs) < 0) {
errprintf (error, "error setting attributes: %s", strerror (errno));
goto out;

Check warning on line 138 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L137-L138

Added lines #L137 - L138 were not covered by tests
}
/* Create topology. All ranks are assumed to have the same topology.
* The tbon.topo attribute is set in overlay_create() if not provided
* on the command line.
*/
if (attr_get (ctx->attrs, "tbon.topo", &topo_uri, NULL) < 0) {
errprintf (error, "error fetching tbon.topo attribute");
goto out;

Check warning on line 146 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L145-L146

Added lines #L145 - L146 were not covered by tests
}
if (!(topo = topology_create (topo_uri, info.size, NULL))
|| topology_set_rank (topo, info.rank) < 0
|| overlay_set_topology (ctx->overlay, topo) < 0) {
errprintf (error, "error creating topology: %s", strerror (errno));
goto out;

Check warning on line 152 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L151-L152

Added lines #L151 - L152 were not covered by tests
}
if ((parent.rank = topology_get_parent (topo)) < 0) {
errprintf (error,

Check warning on line 155 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L155

Added line #L155 was not covered by tests
"rank %d has no parent in %s topology",
info.rank,
topo_uri);
goto out;

Check warning on line 159 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L159

Added line #L159 was not covered by tests
}
/* Exchange public keys with TBON parent and obtain its URI.
*/
if (wait_for_group_membership (h, "broker.online", parent.rank, error) < 0)
goto out;

Check warning on line 164 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L164

Added line #L164 was not covered by tests
if (!(f2 = flux_rpc_pack (h,
"overlay.flub-kex",
parent.rank,
0,
"{s:s s:s}",
"name", overlay_cert_name (ctx->overlay),
"pubkey", overlay_cert_pubkey (ctx->overlay)))
|| flux_rpc_get_unpack (f2,
"{s:s s:s}",
"pubkey", &parent.pubkey,
"uri", &parent.uri) < 0) {
errprintf (error, "%s", future_strerror (f, errno));
goto out;

Check warning on line 177 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L176-L177

Added lines #L176 - L177 were not covered by tests
}
/* Inform overlay subsystem of parent info.
*/
if (overlay_set_parent_uri (ctx->overlay, parent.uri) < 0
|| overlay_set_parent_pubkey (ctx->overlay, parent.pubkey) < 0) {
errprintf (error,

Check warning on line 183 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L183

Added line #L183 was not covered by tests
"error setting up overlay parameters: %s",
strerror (errno));
goto out;

Check warning on line 186 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L185-L186

Added lines #L185 - L186 were not covered by tests
}
/* If there are children, bind to zmq socket and update tbon.endpoint.
* Since we don't know if OUR children are co-located on the same node,
* always use the tcp transport.
*/
if (topology_get_child_ranks (topo, NULL, 0) > 0) {
char ipaddr[HOST_NAME_MAX + 1];
char *wild = NULL;

overlay_set_ipv6 (ctx->overlay, 1);
if (ipaddr_getprimary (ipaddr,
sizeof (ipaddr),
error->text,
sizeof (error->text)) < 0)
goto out;

Check warning on line 201 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L201

Added line #L201 was not covered by tests
if (asprintf (&wild, "tcp://%s:*", ipaddr) < 0
|| overlay_bind (ctx->overlay, wild) < 0) {
errprintf (error, "error binding to tcp://%s:*", ipaddr);
ERRNO_SAFE_WRAP (free, wild);
goto out;

Check warning on line 206 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L204-L206

Added lines #L204 - L206 were not covered by tests
}
bind_uri = overlay_get_bind_uri (ctx->overlay);
}
if (attr_add (ctx->attrs, "tbon.endpoint", bind_uri, ATTR_IMMUTABLE) < 0) {
errprintf (error, "setattr tbon.endpoint");
goto out;

Check warning on line 212 in src/broker/boot_flub.c

View check run for this annotation

Codecov / codecov/patch

src/broker/boot_flub.c#L211-L212

Added lines #L211 - L212 were not covered by tests
}
rc = 0;
out:
topology_decref (topo);
flux_future_destroy (f);
flux_future_destroy (f2);
flux_close (h);
return rc;
}

// vi:ts=4 sw=4 expandtab
22 changes: 22 additions & 0 deletions src/broker/boot_flub.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/************************************************************\
* Copyright 2023 Lawrence Livermore National Security, LLC
* (c.f. AUTHORS, NOTICE.LLNS, COPYING)
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* SPDX-License-Identifier: LGPL-3.0
\************************************************************/

#ifndef BROKER_BOOT_FLUB_H
#define BROKER_BOOT_FLUB_H

#include <flux/core.h>

#include "broker.h"

int boot_flub (struct broker *ctx, flux_error_t *error);

#endif /* BROKER_BOOT_FLUB_H */

// vi:ts=4 sw=4 expandtab
Loading
Loading