Skip to content

Commit

Permalink
Refactor map_dist
Browse files Browse the repository at this point in the history
Signed-off-by: Li Wei <[email protected]>
Required-githooks: true
  • Loading branch information
liw committed Feb 7, 2024
1 parent 5fabdb3 commit fbf2dd1
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 51 deletions.
4 changes: 2 additions & 2 deletions src/include/daos_srv/pool.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -340,7 +340,7 @@ int ds_pool_iv_srv_hdl_fetch(struct ds_pool *pool, uuid_t *pool_hdl_uuid,
uuid_t *cont_hdl_uuid);

int ds_pool_svc_term_get(uuid_t uuid, uint64_t *term);
int ds_pool_svc_global_map_version_get(uuid_t uuid, uint32_t *global_ver);
int ds_pool_svc_query_map_dist(uuid_t uuid, uint32_t *version, bool *idle);

int
ds_pool_child_map_refresh_sync(struct ds_pool_child *dpc);
Expand Down
14 changes: 8 additions & 6 deletions src/include/daos_srv/rsvc.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2019-2022 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -75,10 +75,10 @@ struct ds_rsvc_class {
void (*sc_drain)(struct ds_rsvc *svc);

/**
* Distribute the system/pool map in the system/pool. This callback is
* optional.
* Distribute the system/pool map in the system/pool and return its
* version. This callback is optional.
*/
int (*sc_map_dist)(struct ds_rsvc *svc);
int (*sc_map_dist)(struct ds_rsvc *svc, uint32_t *version);
};

void ds_rsvc_class_register(enum ds_rsvc_class_id id,
Expand All @@ -105,7 +105,6 @@ struct ds_rsvc {
char *s_db_path;
uuid_t s_db_uuid;
int s_ref;
uint32_t s_gen;
ABT_mutex s_mutex; /* for the following members */
bool s_stop;
bool s_destroy; /* when putting last ref */
Expand All @@ -114,7 +113,9 @@ struct ds_rsvc {
ABT_cond s_state_cv;
int s_leader_ref; /* on leader state */
ABT_cond s_leader_ref_cv;
bool s_map_dist; /* has a map dist request? */
bool s_map_dist; /* has a queued map dist request? */
bool s_map_dist_inp; /* has a in-progress map dist request? */
uint32_t s_map_dist_ver; /* highest map version distributed */
ABT_cond s_map_dist_cv;
ABT_thread s_map_distd;
bool s_map_distd_stop;
Expand Down Expand Up @@ -175,6 +176,7 @@ int ds_rsvc_list_attr(struct ds_rsvc *svc, struct rdb_tx *tx, rdb_path_t *path,
size_t ds_rsvc_get_md_cap(void);

void ds_rsvc_request_map_dist(struct ds_rsvc *svc);
void ds_rsvc_query_map_dist(struct ds_rsvc *svc, uint32_t *version, bool *idle);
void ds_rsvc_wait_map_dist(struct ds_rsvc *svc);

#endif /* DAOS_SRV_RSVC_H */
8 changes: 5 additions & 3 deletions src/mgmt/srv_system.c
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ map_update_bcast(crt_context_t ctx, struct mgmt_svc *svc, uint32_t map_version,
}

static int
mgmt_svc_map_dist_cb(struct ds_rsvc *rsvc)
mgmt_svc_map_dist_cb(struct ds_rsvc *rsvc, uint32_t *version)
{
struct mgmt_svc *svc = mgmt_svc_obj(rsvc);
struct dss_module_info *info = dss_get_module_info();
Expand All @@ -283,10 +283,12 @@ mgmt_svc_map_dist_cb(struct ds_rsvc *rsvc)

rc = map_update_bcast(info->dmi_ctx, svc, map_version,
n_map_servers, map_servers);

free_server_list(map_servers, n_map_servers);
if (rc != 0)
return rc;

return rc;
*version = map_version;
return 0;
}

static struct ds_rsvc_class mgmt_svc_rsvc_class = {
Expand Down
10 changes: 5 additions & 5 deletions src/pool/srv_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ struct pool_svc {
bool ps_force_notify; /* MS of PS membership */
struct pool_svc_sched ps_reconf_sched;
struct pool_svc_sched ps_rfcheck_sched; /* Check all containers RF for the pool */
uint32_t ps_global_map_version; /* global pool map version on all targets */
uint32_t ps_ops_enabled; /* cached ds_pool_prop_svc_ops_enabled */
uint32_t ps_ops_max; /* cached ds_pool_prop_svc_ops_max */
uint32_t ps_ops_age; /* cached ds_pool_prop_svc_ops_age */
Expand Down Expand Up @@ -2028,7 +2027,7 @@ pool_svc_drain_cb(struct ds_rsvc *rsvc)
}

static int
pool_svc_map_dist_cb(struct ds_rsvc *rsvc)
pool_svc_map_dist_cb(struct ds_rsvc *rsvc, uint32_t *version)
{
struct pool_svc *svc = pool_svc_obj(rsvc);
struct rdb_tx tx;
Expand All @@ -2055,7 +2054,8 @@ pool_svc_map_dist_cb(struct ds_rsvc *rsvc)
map_version);
D_GOTO(out, rc);
}
svc->ps_global_map_version = max(svc->ps_global_map_version, map_version);

*version = map_version;
out:
if (map_buf != NULL)
D_FREE(map_buf);
Expand Down Expand Up @@ -8183,7 +8183,7 @@ ds_pool_iv_ns_update(struct ds_pool *pool, unsigned int master_rank,
}

int
ds_pool_svc_global_map_version_get(uuid_t uuid, uint32_t *version)
ds_pool_svc_query_map_dist(uuid_t uuid, uint32_t *version, bool *idle)
{
struct pool_svc *svc;
int rc;
Expand All @@ -8192,7 +8192,7 @@ ds_pool_svc_global_map_version_get(uuid_t uuid, uint32_t *version)
if (rc != 0)
return rc;

*version = svc->ps_global_map_version;
ds_rsvc_query_map_dist(&svc->ps_rsvc, version, idle);

pool_svc_put_leader(svc);
return 0;
Expand Down
15 changes: 7 additions & 8 deletions src/rebuild/srv.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* (C) Copyright 2016-2023 Intel Corporation.
* (C) Copyright 2016-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -1443,7 +1443,7 @@ rebuild_task_ult(void *arg)
{
struct rebuild_task *task = arg;
struct ds_pool *pool;
uint32_t global_ver = 0;
uint32_t map_dist_ver = 0;
struct rebuild_global_pool_tracker *rgt = NULL;
d_rank_t myrank;
uint64_t cur_ts = 0;
Expand All @@ -1468,26 +1468,25 @@ rebuild_task_ult(void *arg)
/* Check if the leader pool map has been synced to all other targets
* to avoid -DER_GRP error.
*/
rc = ds_pool_svc_global_map_version_get(task->dst_pool_uuid, &global_ver);
rc = ds_pool_svc_query_map_dist(task->dst_pool_uuid, &map_dist_ver, NULL);
if (rc) {
D_ERROR("Get pool service version failed: "DF_RC"\n",
DP_RC(rc));
DL_ERROR(rc, DF_UUID ": failed to get pool map distribution version",
DP_UUID(task->dst_pool_uuid));
D_GOTO(out_pool, rc);
}

D_DEBUG(DB_REBUILD, "global_ver %u map ver %u\n", global_ver,
D_DEBUG(DB_REBUILD, "map_dist_ver %u map ver %u\n", map_dist_ver,
task->dst_map_ver);

if (pool->sp_stopping)
D_GOTO(out_pool, rc = -DER_SHUTDOWN);

if (pool->sp_map_version <= global_ver)
if (pool->sp_map_version <= map_dist_ver)
break;

dss_sleep(1000);
}


rc = crt_group_rank(pool->sp_group, &myrank);
D_ASSERT(rc == 0);
rc = rebuild_notify_ras_start(&task->dst_pool_uuid, task->dst_map_ver,
Expand Down
83 changes: 56 additions & 27 deletions src/rsvc/srv.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* (C) Copyright 2019-2023 Intel Corporation.
* (C) Copyright 2019-2024 Intel Corporation.
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
Expand Down Expand Up @@ -406,8 +406,9 @@ init_map_distd(struct ds_rsvc *svc)
int rc;

D_ASSERT(svc->s_map_distd == ABT_THREAD_NULL);
svc->s_gen = 0;
svc->s_map_dist = false;
svc->s_map_dist_inp = false;
svc->s_map_dist_ver = 0;
svc->s_map_distd_stop = false;

ds_rsvc_get(svc);
Expand Down Expand Up @@ -608,45 +609,52 @@ static void
map_distd(void *arg)
{
struct ds_rsvc *svc = arg;
uint32_t gen = 0;
int rc;
bool stop;

D_DEBUG(DB_MD, "%s: start\n", svc->s_name);
ABT_mutex_lock(svc->s_mutex);
for (;;) {
ABT_mutex_lock(svc->s_mutex);
uint32_t version;
int rc;

for (;;) {
stop = svc->s_map_distd_stop;
if (stop)
break;
if (svc->s_map_distd_stop)
goto break_out;
if (svc->s_map_dist) {
gen = svc->s_gen;
/* Dequeue the request and start serving it. */
svc->s_map_dist = false;
svc->s_map_dist_inp = true;
break;
}
sched_cond_wait(svc->s_map_dist_cv, svc->s_mutex);
}
ABT_mutex_unlock(svc->s_mutex);
if (stop)
break;
rc = rsvc_class(svc->s_class)->sc_map_dist(svc);

rc = rsvc_class(svc->s_class)->sc_map_dist(svc, &version);
if (rc != 0) {
/*
* Try again, but back off a little bit to limit the
* retry rate.
*/
dss_sleep(3000 /* ms */);
}

ABT_mutex_lock(svc->s_mutex);
/* Stop serving the request. */
svc->s_map_dist_inp = false;
if (rc == 0) {
if (version > svc->s_map_dist_ver)
svc->s_map_dist_ver = version;
ABT_cond_broadcast(svc->s_map_dist_cv);
} else {
ABT_mutex_lock(svc->s_mutex);
if (gen == svc->s_gen) {
svc->s_map_dist = false;
ABT_cond_broadcast(svc->s_map_dist_cv);
}
ABT_mutex_unlock(svc->s_mutex);
/* Enqueue the request again. */
svc->s_map_dist = true;
}
}
break_out:
ABT_mutex_unlock(svc->s_mutex);
put_leader(svc);
ds_rsvc_put(svc);
D_DEBUG(DB_MD, "%s: stop\n", svc->s_name);
ds_rsvc_put(svc);
}

/**
Expand All @@ -658,27 +666,48 @@ map_distd(void *arg)
void
ds_rsvc_request_map_dist(struct ds_rsvc *svc)
{
svc->s_gen++;
svc->s_map_dist = true;
ABT_cond_broadcast(svc->s_map_dist_cv);
D_DEBUG(DB_MD, "%s: requested map distribution\n", svc->s_name);
}

/**
* Query the map distribution state.
*
* \param[in] svc replicated service
* \param[out] version if not NULL, highest map version distributed
* successfully
* \param[out] idle if not NULL, whether map distribution is idle (i.e., no
* in-progress or pending request)
*/
void
ds_rsvc_wait_map_dist(struct ds_rsvc *svc)
ds_rsvc_query_map_dist(struct ds_rsvc *svc, uint32_t *version, bool *idle)
{
D_DEBUG(DB_MD, "%s: waiting map dist %u\n", svc->s_name, svc->s_gen);
if (version != NULL)
*version = svc->s_map_dist_ver;
if (idle != NULL)
*idle = !svc->s_map_dist_inp && !svc->s_map_dist;
}

/**
* Wait until map distribution is idle or stopping.
*
* \param[in] svc replicated service
*/
void
ds_rsvc_wait_map_dist(struct ds_rsvc *svc)
{
D_DEBUG(DB_MD, "%s: begin", svc->s_name);
ABT_mutex_lock(svc->s_mutex);
for (;;) {
if (svc->s_map_distd_stop || !svc->s_map_dist)
if (svc->s_map_distd_stop)
break;
if (!svc->s_map_dist && !svc->s_map_dist_inp)
break;

sched_cond_wait(svc->s_map_dist_cv, svc->s_mutex);
}
ABT_mutex_unlock(svc->s_mutex);

D_DEBUG(DB_MD, "%s: map dist done %u\n", svc->s_name, svc->s_gen);
D_DEBUG(DB_MD, "%s: end", svc->s_name);
}

static char *
Expand Down

0 comments on commit fbf2dd1

Please sign in to comment.