From 09c24ec40a1069d0f1ed2369ab1ad84957e8d28e Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Mon, 27 Jan 2020 16:36:40 +0000 Subject: [PATCH 1/9] CaRT-788 : Added new API to allow for reset of fabric interfaces on PSM2 when Eager message problems happen due to client reboots. Signed-off-by: Vikram Chhabra --- src/cart/crt_hg.c | 277 +++++++++++++++++++++++++++++++++- src/cart/crt_hg.h | 9 +- src/cart/crt_init.c | 6 +- src/cart/crt_internal_types.h | 2 + src/cart/crt_rpc.c | 3 +- src/cart/crt_rpc.h | 2 + src/crt_launch/crt_launch.c | 15 +- src/include/cart/api.h | 15 ++ src/include/cart/types.h | 2 + 9 files changed, 318 insertions(+), 13 deletions(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index cca2a32f8..56daa416f 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -42,6 +42,146 @@ #include "crt_internal.h" +#ifdef HASH_TEST +int hg_addr_table_index=0; +#endif + +struct ha_entry { + hg_class_t *hg_class; + hg_addr_t hg_addr; +}; + +struct crt_ha_mapping { + d_list_t ha_link; + uint64_t ha_key; + struct ha_entry ha_value; + + uint32_t ha_ref; + uint32_t ha_initialized; + + pthread_mutex_t ha_mutex; +}; + +struct crt_ha_mapping * +crt_ha_link2ptr(d_list_t *rlink) +{ + D_ASSERT(rlink != NULL); + return container_of(rlink, struct crt_ha_mapping, ha_link); +} + +static struct crt_ha_mapping * +crt_ha_mapping_init(uint64_t key, struct ha_entry value) +{ + struct crt_ha_mapping *ha; + int rc; + + D_ALLOC_PTR(ha); + if (!ha) { + D_ERROR("Failed to allocate ha item\n"); + D_GOTO(out, ha); + } + + D_INIT_LIST_HEAD(&ha->ha_link); + ha->ha_ref = 0; + ha->ha_initialized = 1; + + rc = D_MUTEX_INIT(&ha->ha_mutex, NULL); + if (rc != 0) { + D_FREE_PTR(ha); + D_GOTO(out, ha = NULL); + } + + ha->ha_key = key; + ha->ha_value = value; + +out: + return ha; +} + +static int +ha_op_key_get(struct d_hash_table *hhtab, d_list_t *rlink, void **key_pp) +{ + struct crt_ha_mapping *ha = crt_ha_link2ptr(rlink); + + *key_pp = (void *)&ha->ha_key; + return 
sizeof(ha->ha_key); +} + +static uint32_t +ha_op_key_hash(struct d_hash_table *hhtab, const void *key, unsigned int ksize) +{ + D_ASSERT(ksize == sizeof(uint64_t)); + + return (unsigned int)(*(const uint64_t *)key % + (1U << CRT_HG_ADDR_CACHE_LOOKUP_BITS)); +} + +static bool +ha_op_key_cmp(struct d_hash_table *hhtab, d_list_t *rlink, + const void *key, unsigned int ksize) +{ + struct crt_ha_mapping *ha = crt_ha_link2ptr(rlink); + + D_ASSERT(ksize == sizeof(uint64_t)); + + return ha->ha_key == *(uint64_t *)key; +} + +static void +ha_op_rec_addref(struct d_hash_table *hhtab, d_list_t *halink) +{ + struct crt_ha_mapping *ha = crt_ha_link2ptr(halink); + + D_ASSERT(ha->ha_initialized); + D_MUTEX_LOCK(&ha->ha_mutex); + ha->ha_ref++; + D_MUTEX_UNLOCK(&ha->ha_mutex); +} + +static bool +ha_op_rec_decref(struct d_hash_table *hhtab, d_list_t *halink) +{ + uint32_t ref; + struct crt_ha_mapping *ha = crt_ha_link2ptr(halink); + + D_ASSERT(ha->ha_initialized); + D_MUTEX_LOCK(&ha->ha_mutex); + ref = --ha->ha_ref; + D_MUTEX_UNLOCK(&ha->ha_mutex); + + return ref == 0; +} + +static void +crt_ha_destroy(struct crt_ha_mapping *ha) +{ + D_ASSERT(ha != NULL); + D_ASSERT(ha->ha_ref == 0); + D_ASSERT(ha->ha_initialized == 1); + + D_MUTEX_DESTROY(&ha->ha_mutex); + D_FREE(ha); +} + +static void +ha_op_rec_free(struct d_hash_table *hhtab, d_list_t *halink) +{ + crt_ha_destroy(crt_ha_link2ptr(halink)); +} + + + +static d_hash_table_ops_t ha_mapping_ops = { + .hop_key_get = ha_op_key_get, + .hop_key_hash = ha_op_key_hash, + .hop_key_cmp = ha_op_key_cmp, + .hop_rec_addref = ha_op_rec_addref, + .hop_rec_decref = ha_op_rec_decref, + .hop_rec_free = ha_op_rec_free, +}; + +struct d_hash_table ha_hash_table; + /* * na_dict table should be in the same order of enum crt_na_type, the last one * is terminator with NULL nad_str. 
@@ -327,6 +467,85 @@ crt_hg_addr_lookup(struct crt_hg_context *hg_ctx, const char *name, return rc; } +int +crt_hg_remove_addr (hg_class_t *hg_class, hg_addr_t hg_addr) +{ + hg_return_t ret = HG_SUCCESS; + + D_DEBUG(DB_TRACE, "removing addr from class \n" ); + ret = HG_Addr_set_remove(hg_class, hg_addr); + if (ret != HG_SUCCESS) { + D_ERROR("HG_Addr_set_remove) failed, hg_ret %d.\n", ret); + D_GOTO(out, ret = -DER_HG); + } + + D_DEBUG(DB_TRACE, "freeing addr from class\n"); + ret = HG_Addr_free(hg_class, hg_addr); + if (ret != HG_SUCCESS) { + D_ERROR("HG_Addr_free() failed, hg_ret %d.\n", ret); + D_GOTO(out, ret = -DER_HG); + } + +out: + return ret; +} +int +crt_hg_remove_client_id (uint64_t client_id) +{ + hg_return_t ret = HG_SUCCESS; + d_list_t *halink; + struct crt_ha_mapping *ha; + struct ha_entry ha_value; + + halink = d_hash_rec_find(&ha_hash_table, + (void *)&client_id, sizeof(client_id)); + if (!halink) { + D_ERROR("client=%" PRIu64 " not part of the hash table\n", client_id); + D_GOTO(out, ret = -DER_HG); + } + + ha = crt_ha_link2ptr(halink); + ha_value = ha->ha_value; + + d_hash_rec_decref(&ha_hash_table, halink); + + ret = crt_hg_remove_addr (ha_value.hg_class, ha_value.hg_addr); + + if (ret != HG_SUCCESS) { + D_ERROR("crt_remove_addr failed, hg_ret %d.\n", ret); + D_GOTO(out, ret = -DER_HG); + } + + + d_hash_rec_delete(&ha_hash_table, + &client_id, sizeof(client_id)); + + D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n", + client_id); + D_DEBUG(DB_TRACE, "removed hg class %" PRIu64 "\n",(long unsigned int) ha_value.hg_class ); + D_DEBUG(DB_TRACE, "removed hg addr %" PRIu64 "\n",(long unsigned int) ha_value.hg_addr ); + +out: + return ret; +} +int +crt_hg_remove_all_client_ids (void) +{ + hg_return_t ret = HG_SUCCESS; + +#ifdef HASH_TEST + while (hg_addr_table_index > 0) { + ret = crt_hg_remove_client_id (hg_addr_table[--hg_addr_table_index]); + if (ret != HG_SUCCESS) { + D_ERROR("crt_remove_addr_all failed, hg_ret %d.\n", ret); + 
D_GOTO(out, ret = -DER_HG); + } + } +#endif +out: + return ret; +} + int crt_hg_addr_free(struct crt_hg_context *hg_ctx, hg_addr_t addr) { @@ -578,7 +797,15 @@ crt_hg_init(crt_phy_addr_t *addr, bool server) D_DEBUG(DB_NET, "in crt_hg_init, listen address: %s.\n", *addr); crt_gdata.cg_hg = hg_gdata; - + /*Create the hg_addr hash table*/ + rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK, + CRT_LOOKUP_CACHE_BITS, + NULL, &ha_mapping_ops, + &ha_hash_table); + if (rc != 0) { + D_ERROR("d_hash_table_create failed, rc: %d\n", rc); + D_GOTO(out, rc); + } out: if (info_string) D_FREE(info_string); @@ -618,7 +845,7 @@ crt_hg_fini() D_WARN("Could not finalize NA class, na_ret: %d.\n", na_ret); rc = -DER_HG; } - + d_hash_table_destroy_inplace(&ha_hash_table, true); D_FREE(crt_gdata.cg_hg); return rc; } @@ -824,7 +1051,9 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) bool is_coll_req = false; int rc = 0; struct crt_rpc_priv rpc_tmp = {0}; - + d_list_t *halink; + struct crt_ha_mapping *ha; + struct ha_entry ha_entry; hg_info = HG_Get_info(hg_hdl); if (hg_info == NULL) { D_ERROR("HG_Get_info failed.\n"); @@ -858,6 +1087,48 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) D_ASSERT(proc != NULL); opc = rpc_tmp.crp_req_hdr.cch_opc; + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) { + /** Search for the recieved hg_addr in the ha hash table. If + * not found, then insert. 
+ */ + halink = d_hash_rec_find(&ha_hash_table, + (void *)&rpc_tmp.crp_req_hdr.cch_clid, + sizeof(rpc_tmp.crp_req_hdr.cch_clid)); + if (halink != NULL) { + D_DEBUG(DB_TRACE, "client id %" PRIu64 "already in hash table\n", + rpc_tmp.crp_req_hdr.cch_clid); + + d_hash_rec_decref(&ha_hash_table, halink); + } + else { + ha_entry.hg_class = hg_info->hg_class; + HG_Addr_dup(hg_info->hg_class, hg_info->addr, &ha_entry.hg_addr); + ha = crt_ha_mapping_init(rpc_tmp.crp_req_hdr.cch_clid, ha_entry); + if (!ha) { + D_ERROR("Failed to allocate entry\n"); + D_GOTO(out, hg_ret = HG_SUCCESS); + } +#ifdef HASH_TEST + hg_addr_table[hg_addr_table_index++] = + rpc_tmp.crp_req_hdr.cch_clid; + +#endif + rc = d_hash_rec_insert(&ha_hash_table, + (void *)&rpc_tmp.crp_req_hdr.cch_clid, + sizeof(rpc_tmp.crp_req_hdr.cch_clid), + &ha->ha_link, true); + if (rc != 0) { + D_ERROR("Failed to add entry; rc=%d\n", rc); + crt_ha_destroy(ha); + D_GOTO(out, hg_ret = HG_SUCCESS); + } + D_DEBUG(DB_TRACE, "client id %" PRIu64 "inserted in hash table\n", + rpc_tmp.crp_req_hdr.cch_clid); + D_DEBUG(DB_TRACE, "inserted hg addr %" PRIu64 "\n", (long unsigned int)ha_entry.hg_addr ); + D_DEBUG(DB_TRACE, "inserted hg class %" PRIu64 "\n",(long unsigned int) ha_entry.hg_class ); + + } + } /** * Set the opcode in the temp RPC so that it can be correctly logged. 
*/ diff --git a/src/cart/crt_hg.h b/src/cart/crt_hg.h index 591ae7ef5..a54802a20 100644 --- a/src/cart/crt_hg.h +++ b/src/cart/crt_hg.h @@ -58,12 +58,19 @@ /** MAX number of HG handles in pool */ #define CRT_HG_POOL_MAX_NUM (512) /** number of prepost HG handles when enable pool */ -#define CRT_HG_POOL_PREPOST_NUM (16) +#define CRT_HG_POOL_PREPOST_NUM (16) +#define CRT_HG_ADDR_CACHE_LOOKUP_BITS (5) +#define HASH_TEST 1 + +#ifdef HASH_TEST +uint64_t hg_addr_table[1000]; +#endif struct crt_rpc_priv; struct crt_common_hdr; struct crt_corpc_hdr; + /** type of NA plugin */ enum crt_na_type { CRT_NA_SM = 0, diff --git a/src/cart/crt_init.c b/src/cart/crt_init.c index b9f2b5d3c..d6685916a 100644 --- a/src/cart/crt_init.c +++ b/src/cart/crt_init.c @@ -97,7 +97,7 @@ static int data_init(crt_init_options_t *opt) crt_gdata.cg_addr = NULL; crt_gdata.cg_na_plugin = CRT_NA_OFI_SOCKETS; crt_gdata.cg_share_na = false; - + crt_gdata.cg_clid = 0; timeout = 0; if (opt && opt->cio_crt_timeout != 0) @@ -164,7 +164,9 @@ static int data_init(crt_init_options_t *opt) else d_fault_inject_disable(); } - + if (opt && opt->cio_clid != 0) + crt_gdata.cg_clid = opt->cio_clid; + /*TBD Do we use an ENV variable if above not defined? */ gdata_init_flag = 1; exit: return rc; diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index 6d50696ed..3fd62b982 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -96,6 +96,8 @@ struct crt_gdata { /* protects crt_gdata */ pthread_rwlock_t cg_rwlock; + /* Job Id associated with this instance. */ + uint64_t cg_clid; }; extern struct crt_gdata crt_gdata; diff --git a/src/cart/crt_rpc.c b/src/cart/crt_rpc.c index b5db3544f..73dd9976e 100644 --- a/src/cart/crt_rpc.c +++ b/src/cart/crt_rpc.c @@ -1147,7 +1147,8 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg) * function. Referenced dropped at end of this function. */ RPC_ADDREF(rpc_priv); - + /* Insert the local job id in the rpc request. 
*/ + rpc_priv->crp_req_hdr.cch_clid = crt_gdata.cg_clid; if (req->cr_ctx == NULL) { D_ERROR("invalid parameter (NULL req->cr_ctx).\n"); D_GOTO(out, rc = -DER_INVAL); diff --git a/src/cart/crt_rpc.h b/src/cart/crt_rpc.h index 61941e5f6..2cbcf3bf4 100644 --- a/src/cart/crt_rpc.h +++ b/src/cart/crt_rpc.h @@ -101,6 +101,8 @@ struct crt_common_hdr { uint32_t cch_xid; /* used in crp_reply_hdr to propagate rpc failure back to sender */ uint32_t cch_rc; + /* client id */ + uint64_t cch_clid; }; typedef enum { diff --git a/src/crt_launch/crt_launch.c b/src/crt_launch/crt_launch.c index 5d9a26d0d..6237259e4 100644 --- a/src/crt_launch/crt_launch.c +++ b/src/crt_launch/crt_launch.c @@ -148,13 +148,16 @@ parse_args(int argc, char **argv) static int get_self_uri(struct host *h) { - char *uri; - crt_context_t ctx; - char *p; - int len; - int rc; + char *uri; + crt_context_t ctx; + char *p; + int len; + int rc; + crt_init_options_t opt = {0}; - rc = crt_init(0, CRT_FLAG_BIT_SERVER); + opt.cio_clid = h->my_rank; + + rc = crt_init_opt(0, CRT_FLAG_BIT_SERVER, &opt); if (rc != 0) { D_ERROR("crt_init() failed; rc=%d\n", rc); D_GOTO(out, rc); diff --git a/src/include/cart/api.h b/src/include/cart/api.h index ec211ca45..3942d9eb5 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -1972,6 +1972,21 @@ int crt_group_secondary_modify(crt_group_t *grp, d_rank_list_t *sec_ranks, d_rank_list_t *prim_ranks, crt_group_mod_op_t op, uint32_t version); +/** + * Remove a client id from the Server hash table of Hg and Fabric Interface + * addresses. + * + * param[in] client_id System wide unique client id + * + * return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_hg_remove_client_id (uint64_t client_id); + +/* Test Function. 
*/ +int crt_hg_remove_all_client_ids (void); + + #define crt_proc__Bool crt_proc_bool #define crt_proc_d_rank_t crt_proc_uint32_t #define crt_proc_int crt_proc_int32_t diff --git a/src/include/cart/types.h b/src/include/cart/types.h index 2b29c4842..c0c599353 100644 --- a/src/include/cart/types.h +++ b/src/include/cart/types.h @@ -92,6 +92,8 @@ typedef struct crt_init_options { /** Used with cio_use_credits to set credit limit */ int cio_ep_credits; + /** job id used to uniquely identify the user. **/ + uint64_t cio_clid; } crt_init_options_t; typedef int crt_status_t; From be0983a92d06a852a035420fc4aa5c231e67b4b6 Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Thu, 6 Feb 2020 19:08:11 +0000 Subject: [PATCH 2/9] CART-788: New API for PSM2 provider to fix eager message issues. Signed-off-by: Vikram Chhabra --- src/cart/crt_group.c | 3 + src/cart/crt_hg.c | 608 +++++++++++++++++++++++----------- src/cart/crt_hg.h | 7 +- src/cart/crt_internal_types.h | 6 + src/cart/crt_rpc.c | 6 +- src/cart/crt_rpc.h | 4 +- src/include/cart/api.h | 6 +- 7 files changed, 432 insertions(+), 208 deletions(-) diff --git a/src/cart/crt_group.c b/src/cart/crt_group.c index fe50b946e..000279d23 100644 --- a/src/cart/crt_group.c +++ b/src/cart/crt_group.c @@ -2709,6 +2709,9 @@ crt_group_rank_remove_internal(struct crt_grp_priv *grp_priv, d_rank_t rank) d_hash_rec_delete(&grp_priv->gp_uri_lookup_cache, &rank, sizeof(d_rank_t)); + /*If PSM2 then remove the rank from the hash table. 
*/ + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) + crt_hg_remove_hash_rank(rank); } else { d_rank_t prim_rank; diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 56daa416f..ec415730d 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -42,145 +42,145 @@ #include "crt_internal.h" -#ifdef HASH_TEST -int hg_addr_table_index=0; -#endif - struct ha_entry { - hg_class_t *hg_class; - hg_addr_t hg_addr; + hg_class_t *ha_class; + hg_addr_t ha_addr; +}; + +struct crt_ha_list_entry { + struct ha_entry chl_entry; + d_list_t chl_list_link; }; struct crt_ha_mapping { - d_list_t ha_link; - uint64_t ha_key; - struct ha_entry ha_value; + d_list_t chm_link; + uint64_t chm_key; - uint32_t ha_ref; - uint32_t ha_initialized; + d_list_t chm_list; + uint32_t chm_ref; + uint32_t chm_initialized; - pthread_mutex_t ha_mutex; + pthread_mutex_t chm_mutex; + pthread_rwlock_t chm_rwlock; /* to protect the list */ }; struct crt_ha_mapping * crt_ha_link2ptr(d_list_t *rlink) { - D_ASSERT(rlink != NULL); - return container_of(rlink, struct crt_ha_mapping, ha_link); + D_ASSERT(rlink != NULL); + return container_of(rlink, struct crt_ha_mapping, chm_link); } static struct crt_ha_mapping * crt_ha_mapping_init(uint64_t key, struct ha_entry value) { - struct crt_ha_mapping *ha; - int rc; - - D_ALLOC_PTR(ha); - if (!ha) { - D_ERROR("Failed to allocate ha item\n"); - D_GOTO(out, ha); - } + struct crt_ha_mapping *ha; + int rc; - D_INIT_LIST_HEAD(&ha->ha_link); - ha->ha_ref = 0; - ha->ha_initialized = 1; + D_ALLOC_PTR(ha); + if (!ha) { + D_ERROR("Failed to allocate ha item\n"); + D_GOTO(out, ha); + } - rc = D_MUTEX_INIT(&ha->ha_mutex, NULL); - if (rc != 0) { - D_FREE_PTR(ha); - D_GOTO(out, ha = NULL); - } + D_INIT_LIST_HEAD(&ha->chm_link); + D_INIT_LIST_HEAD(&ha->chm_list); + ha->chm_ref = 0; + ha->chm_initialized = 1; - ha->ha_key = key; - ha->ha_value = value; + rc = D_MUTEX_INIT(&ha->chm_mutex, NULL); + if (rc != 0) { + D_FREE_PTR(ha); + D_GOTO(out, ha = NULL); + } + ha->chm_key = key; out: 
- return ha; + return ha; } static int ha_op_key_get(struct d_hash_table *hhtab, d_list_t *rlink, void **key_pp) { - struct crt_ha_mapping *ha = crt_ha_link2ptr(rlink); + struct crt_ha_mapping *ha = crt_ha_link2ptr(rlink); - *key_pp = (void *)&ha->ha_key; - return sizeof(ha->ha_key); + *key_pp = (void *)&ha->chm_key; + return sizeof(ha->chm_key); } static uint32_t ha_op_key_hash(struct d_hash_table *hhtab, const void *key, unsigned int ksize) { - D_ASSERT(ksize == sizeof(uint64_t)); + D_ASSERT(ksize == sizeof(uint64_t)); - return (unsigned int)(*(const uint64_t *)key % - (1U << CRT_HG_ADDR_CACHE_LOOKUP_BITS)); + return (unsigned int)(*(const uint64_t *)key % + (1U << CRT_HG_ADDR_CACHE_LOOKUP_BITS)); } static bool ha_op_key_cmp(struct d_hash_table *hhtab, d_list_t *rlink, - const void *key, unsigned int ksize) + const void *key, unsigned int ksize) { - struct crt_ha_mapping *ha = crt_ha_link2ptr(rlink); + struct crt_ha_mapping *ha = crt_ha_link2ptr(rlink); - D_ASSERT(ksize == sizeof(uint64_t)); + D_ASSERT(ksize == sizeof(uint64_t)); - return ha->ha_key == *(uint64_t *)key; + return ha->chm_key == *(uint64_t *)key; } static void ha_op_rec_addref(struct d_hash_table *hhtab, d_list_t *halink) { - struct crt_ha_mapping *ha = crt_ha_link2ptr(halink); + struct crt_ha_mapping *ha = crt_ha_link2ptr(halink); - D_ASSERT(ha->ha_initialized); - D_MUTEX_LOCK(&ha->ha_mutex); - ha->ha_ref++; - D_MUTEX_UNLOCK(&ha->ha_mutex); + D_ASSERT(ha->chm_initialized); + D_MUTEX_LOCK(&ha->chm_mutex); + ha->chm_ref++; + D_MUTEX_UNLOCK(&ha->chm_mutex); } static bool ha_op_rec_decref(struct d_hash_table *hhtab, d_list_t *halink) { - uint32_t ref; - struct crt_ha_mapping *ha = crt_ha_link2ptr(halink); + uint32_t ref; + struct crt_ha_mapping *ha = crt_ha_link2ptr(halink); - D_ASSERT(ha->ha_initialized); - D_MUTEX_LOCK(&ha->ha_mutex); - ref = --ha->ha_ref; - D_MUTEX_UNLOCK(&ha->ha_mutex); + D_ASSERT(ha->chm_initialized); + D_MUTEX_LOCK(&ha->chm_mutex); + ref = --ha->chm_ref; + 
D_MUTEX_UNLOCK(&ha->chm_mutex); - return ref == 0; + return ref == 0; } static void crt_ha_destroy(struct crt_ha_mapping *ha) { - D_ASSERT(ha != NULL); - D_ASSERT(ha->ha_ref == 0); - D_ASSERT(ha->ha_initialized == 1); + D_ASSERT(ha != NULL); + D_ASSERT(ha->chm_ref == 0); + D_ASSERT(ha->chm_initialized == 1); - D_MUTEX_DESTROY(&ha->ha_mutex); - D_FREE(ha); + D_MUTEX_DESTROY(&ha->chm_mutex); + D_FREE(ha); } static void ha_op_rec_free(struct d_hash_table *hhtab, d_list_t *halink) { - crt_ha_destroy(crt_ha_link2ptr(halink)); + crt_ha_destroy(crt_ha_link2ptr(halink)); } static d_hash_table_ops_t ha_mapping_ops = { - .hop_key_get = ha_op_key_get, - .hop_key_hash = ha_op_key_hash, - .hop_key_cmp = ha_op_key_cmp, - .hop_rec_addref = ha_op_rec_addref, - .hop_rec_decref = ha_op_rec_decref, - .hop_rec_free = ha_op_rec_free, + .hop_key_get = ha_op_key_get, + .hop_key_hash = ha_op_key_hash, + .hop_key_cmp = ha_op_key_cmp, + .hop_rec_addref = ha_op_rec_addref, + .hop_rec_decref = ha_op_rec_decref, + .hop_rec_free = ha_op_rec_free, }; -struct d_hash_table ha_hash_table; /* * na_dict table should be in the same order of enum crt_na_type, the last one @@ -468,82 +468,82 @@ crt_hg_addr_lookup(struct crt_hg_context *hg_ctx, const char *name, } int -crt_hg_remove_addr (hg_class_t *hg_class, hg_addr_t hg_addr) +crt_hg_remove_addr(hg_class_t *hg_class, hg_addr_t hg_addr) { - hg_return_t ret = HG_SUCCESS; + hg_return_t ret = HG_SUCCESS; - D_DEBUG(DB_TRACE, "removing addr from class \n" ); - ret = HG_Addr_set_remove(hg_class, hg_addr); - if (ret != HG_SUCCESS) { - D_ERROR("HG_Addr_set_remove) failed, hg_ret %d.\n", ret); - D_GOTO(out, ret = -DER_HG); - } - - D_DEBUG(DB_TRACE, "freeing addr from class\n"); - ret = HG_Addr_free(hg_class, hg_addr); - if (ret != HG_SUCCESS) { - D_ERROR("HG_Addr_free() failed, hg_ret %d.\n", ret); - D_GOTO(out, ret = -DER_HG); - } + D_DEBUG(DB_TRACE, "removing hg addr %" PRIu64 "\n", + (long unsigned int)hg_addr ); + D_DEBUG(DB_TRACE, "removing hg class %" 
PRIu64 "\n", + (long unsigned int) hg_class ); + ret = HG_Addr_set_remove(hg_class, hg_addr); + if (ret != HG_SUCCESS) { + D_ERROR("HG_Addr_set_remove) failed, hg_ret %d.\n", ret); + D_GOTO(out, ret = -DER_HG); + } + ret = HG_Addr_free(hg_class, hg_addr); + if (ret != HG_SUCCESS) { + D_ERROR("HG_Addr_free() failed, hg_ret %d.\n", ret); + D_GOTO(out, ret = -DER_HG); + } out: - return ret; + return ret; } + int -crt_hg_remove_client_id (uint64_t client_id) +crt_hg_remove_client_id(uint64_t client_id) { - hg_return_t ret = HG_SUCCESS; - d_list_t *halink; - struct crt_ha_mapping *ha; - struct ha_entry ha_value; - - halink = d_hash_rec_find(&ha_hash_table, - (void *)&client_id, sizeof(client_id)); - if (!halink) { - D_ERROR("client=%" PRIu64 " not part of the hash table\n", client_id); - D_GOTO(out, ret = -DER_HG); - } + int ret = 0; + d_list_t *halink; + struct crt_ha_mapping *ha; + struct ha_entry ha_value; + struct crt_ha_list_entry *entry; + struct crt_context *crt_ctx; - ha = crt_ha_link2ptr(halink); - ha_value = ha->ha_value; + if (crt_gdata.cg_na_plugin != CRT_NA_OFI_PSM2) + D_GOTO(out, ret); - d_hash_rec_decref(&ha_hash_table, halink); + if (!crt_is_service()) + D_GOTO(out, ret); - ret = crt_hg_remove_addr (ha_value.hg_class, ha_value.hg_addr); + D_RWLOCK_RDLOCK(&crt_gdata.cg_rwlock); - if (ret != HG_SUCCESS) { - D_ERROR("crt_remove_addr failed, hg_ret %d.\n", ret); - D_GOTO(out, ret = -DER_HG); - } + d_list_for_each_entry(crt_ctx, &crt_gdata.cg_ctx_list, cc_link) { + //Can this lock create a deadlock? TBD + D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_hash_table_rwlock); + halink = d_hash_rec_find(&crt_ctx->cc_ha_hash_table, + (void *)&client_id, sizeof(client_id)); + if (!halink) { + D_ERROR("client=%" PRIu64 " not part of the hash table\n", + client_id); + } + ha = crt_ha_link2ptr(halink); + /* Remove all entries from list. 
*/ + while ((entry = d_list_pop_entry(&ha->chm_list, + struct crt_ha_list_entry, chl_list_link)) != NULL) { + ha_value = entry->chl_entry; + D_FREE(entry); - d_hash_rec_delete(&ha_hash_table, - &client_id, sizeof(client_id)); - - D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n", - client_id); - D_DEBUG(DB_TRACE, "removed hg class %" PRIu64 "\n",(long unsigned int) ha_value.hg_class ); - D_DEBUG(DB_TRACE, "removed hg addr %" PRIu64 "\n",(long unsigned int) ha_value.hg_addr ); - -out: - return ret; -} -int -crt_hg_remove_all_client_ids (void) -{ - hg_return_t ret = HG_SUCCESS; + ret = crt_hg_remove_addr(ha_value.ha_class, ha_value.ha_addr); + + if (ret != HG_SUCCESS) { + D_ERROR("crt_remove_addr failed, hg_ret %d.\n", ret); + } + } + + d_hash_rec_decref(&crt_ctx->cc_ha_hash_table, halink); -#ifdef HASH_TEST - while (hg_addr_table_index > 0) { - ret = crt_hg_remove_client_id (hg_addr_table[--hg_addr_table_index]); - if (ret != HG_SUCCESS) { - D_ERROR("crt_remove_addr_all failed, hg_ret %d.\n", ret); - D_GOTO(out, ret = -DER_HG); - } + d_hash_rec_delete(&crt_ctx->cc_ha_hash_table, &client_id, + sizeof(client_id)); + D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n", + client_id); + D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_hash_table_rwlock); } -#endif + D_RWLOCK_UNLOCK(&crt_gdata.cg_rwlock); out: - return ret; + return ret; } int @@ -797,15 +797,7 @@ crt_hg_init(crt_phy_addr_t *addr, bool server) D_DEBUG(DB_NET, "in crt_hg_init, listen address: %s.\n", *addr); crt_gdata.cg_hg = hg_gdata; - /*Create the hg_addr hash table*/ - rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK, - CRT_LOOKUP_CACHE_BITS, - NULL, &ha_mapping_ops, - &ha_hash_table); - if (rc != 0) { - D_ERROR("d_hash_table_create failed, rc: %d\n", rc); - D_GOTO(out, rc); - } + out: if (info_string) D_FREE(info_string); @@ -816,6 +808,131 @@ crt_hg_init(crt_phy_addr_t *addr, bool server) return rc; } +int +crt_dump_ha_list(d_list_t *halist) +{ + struct crt_ha_list_entry *entry; + 
struct crt_ha_list_entry *temp; + struct ha_entry ha_value; + int rc = 0; + + d_list_for_each_entry_safe(entry, temp, halist, + chl_list_link) { + ha_value = entry->chl_entry; + + D_DEBUG(DB_TRACE, "hg addr %" PRIu64 "\n", + (uint64_t)ha_value.ha_addr ); + D_DEBUG(DB_TRACE, "hg class %" PRIu64 "\n\n", + (uint64_t) ha_value.ha_class ); + } + return rc; +} + + +int +crt_hg_ha_list_empty(d_list_t *halink, void *arg) +{ + struct crt_ha_mapping *ha; + struct crt_ha_list_entry *entry; + struct crt_ha_list_entry *temp; + struct ha_entry ha_value; + int rc = 0; + + D_ASSERT(arg == NULL); + ha = crt_ha_link2ptr(halink); + + if (ha->chm_initialized != 1) + D_GOTO(out,rc); + + D_DEBUG(DB_TRACE, "client id %" PRIu64 " removing from hash table\n", + ha->chm_key); + + d_list_for_each_entry_safe(entry, temp, &ha->chm_list, + chl_list_link) { + ha_value = entry->chl_entry; + rc = crt_hg_remove_addr (ha_value.ha_class, ha_value.ha_addr); + + if (rc != HG_SUCCESS) { + D_ERROR("crt_remove_addr failed, hg_ret %d.\n", rc); + } + D_FREE(entry); + } +out: + return rc; +} + +int +crt_hg_remove_hash_rank(d_rank_t rank) +{ + d_list_t *halink; + struct crt_ha_mapping *ha; + struct ha_entry ha_value; + struct crt_ha_list_entry *entry; + uint64_t client_id; + int rc = 0; + + struct crt_context *crt_ctx; + +// if (!crt_is_service()) +// D_GOTO(out, rc); + + if (crt_gdata.cg_na_plugin != CRT_NA_OFI_PSM2) + D_GOTO(out, rc); + D_RWLOCK_RDLOCK(&crt_gdata.cg_rwlock); + + client_id = (uint64_t)rank; + d_list_for_each_entry(crt_ctx, &crt_gdata.cg_ctx_list, cc_link) { + //Can this lock create a deadlock? TBD + D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); + halink = d_hash_rec_find(&crt_ctx->cc_ha_server_hash_table, + (void *)&client_id, sizeof(client_id)); + if (!halink) { + D_ERROR("client=%" PRIu64 "not in server hash table\n", + client_id); + D_GOTO(out, rc = -DER_NONEXIST); + } + + ha = crt_ha_link2ptr(halink); + + /* Remove all entries from list. 
*/ + while ((entry = d_list_pop_entry(&ha->chm_list, + struct crt_ha_list_entry, chl_list_link)) != NULL) { + ha_value = entry->chl_entry; + D_FREE(entry); + rc = crt_hg_remove_addr (ha_value.ha_class, ha_value.ha_addr); + if (rc != HG_SUCCESS) { + D_ERROR("crt_remove_addr failed, hg_ret %d.\n", rc); + rc = -DER_NONEXIST; + } + } + + d_hash_rec_decref(&crt_ctx->cc_ha_server_hash_table, halink); + d_hash_rec_delete(&crt_ctx->cc_ha_server_hash_table, + &client_id, sizeof(client_id)); + D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); + } + D_RWLOCK_RDLOCK(&crt_gdata.cg_rwlock); + D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n", + client_id); +out: + return rc; +} + +int +crt_hg_ha_table_empty(struct d_hash_table *ha_table) +{ + int rc = 0; + + rc = d_hash_table_traverse(ha_table, + crt_hg_ha_list_empty, NULL); + if (rc != 0) + D_ERROR("d_hash_table_traverse failed, rc: %d.\n", rc); + + d_hash_table_destroy_inplace(ha_table, true); + + return rc; +} + /* be called only in crt_finalize */ int crt_hg_fini() @@ -845,7 +962,7 @@ crt_hg_fini() D_WARN("Could not finalize NA class, na_ret: %d.\n", na_ret); rc = -DER_HG; } - d_hash_table_destroy_inplace(&ha_hash_table, true); + D_FREE(crt_gdata.cg_hg); return rc; } @@ -867,6 +984,27 @@ crt_hg_ctx_init(struct crt_hg_context *hg_ctx, int idx) D_DEBUG(DB_NET, "crt_gdata.cg_share_na %d, crt_is_service() %d\n", crt_gdata.cg_share_na, crt_is_service()); + + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) { + /*Create the hg_addr hash tables*/ + rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK, + CRT_HG_ADDR_CACHE_LOOKUP_BITS, NULL, + &ha_mapping_ops, &crt_ctx->cc_ha_hash_table); + if (rc != 0) { + D_ERROR("d_hash_table_create failed, rc: %d\n", rc); + D_GOTO(out, rc); + } + + rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK, + CRT_HG_ADDR_CACHE_LOOKUP_BITS, NULL, + &ha_mapping_ops, &crt_ctx->cc_ha_server_hash_table); + if (rc != 0) { + D_ERROR("d_hash_table_create server failed, rc: %d\n", + rc); + 
D_GOTO(out, rc); + } + } + if (idx == 0 || crt_gdata.cg_share_na == true) { hg_context = HG_Context_create_id(crt_gdata.cg_hg->chg_hgcla, idx); @@ -981,15 +1119,32 @@ crt_hg_ctx_init(struct crt_hg_context *hg_ctx, int idx) int crt_hg_ctx_fini(struct crt_hg_context *hg_ctx) { - hg_context_t *hg_context; - hg_return_t hg_ret = HG_SUCCESS; - na_return_t na_ret; - int rc = 0; + hg_context_t *hg_context; + hg_return_t hg_ret = HG_SUCCESS; + na_return_t na_ret; + int rc = 0; + struct crt_context *crt_ctx; D_ASSERT(hg_ctx != NULL); hg_context = hg_ctx->chc_hgctx; D_ASSERT(hg_context != NULL); + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2){ + crt_ctx = container_of(hg_ctx, struct crt_context, cc_hg_ctx); + D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_hash_table_rwlock); + rc = crt_hg_ha_table_empty(&crt_ctx->cc_ha_hash_table); + if (rc != 0) + D_ERROR("d_hash_table_empty failed, rc: %d.\n", rc); + D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_hash_table_rwlock); + + + D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); + rc = crt_hg_ha_table_empty(&crt_ctx->cc_ha_server_hash_table); + if (rc != 0) + D_ERROR("d_hash_table_empty server failed, rc: %d.\n", rc); + D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); + } + crt_hg_pool_fini(hg_ctx); hg_ret = HG_Context_destroy(hg_context); @@ -1039,21 +1194,29 @@ crt_hg_context_lookup(hg_context_t *hg_ctx) int crt_rpc_handler_common(hg_handle_t hg_hdl) { - struct crt_context *crt_ctx; - struct crt_hg_context *hg_ctx; - const struct hg_info *hg_info; - struct crt_rpc_priv *rpc_priv; - crt_rpc_t *rpc_pub; - crt_opcode_t opc; - crt_proc_t proc = NULL; - struct crt_opc_info *opc_info = NULL; - hg_return_t hg_ret = HG_SUCCESS; - bool is_coll_req = false; - int rc = 0; - struct crt_rpc_priv rpc_tmp = {0}; - d_list_t *halink; - struct crt_ha_mapping *ha; - struct ha_entry ha_entry; + struct crt_context *crt_ctx; + struct crt_hg_context *hg_ctx; + const struct hg_info *hg_info; + struct crt_rpc_priv *rpc_priv; + crt_rpc_t *rpc_pub; + crt_opcode_t 
opc; + crt_proc_t proc = NULL; + struct crt_opc_info *opc_info = NULL; + hg_return_t hg_ret = HG_SUCCESS; + bool is_coll_req = false; + int rc = 0; + struct crt_rpc_priv rpc_tmp = {0}; + crt_rpc_t *rpc_tmp_pub; + d_list_t *halink; + struct crt_ha_mapping *ha; + struct ha_entry ha_entry; + bool found = false; + struct crt_ha_list_entry *ha_list_entry; + d_rank_t hdr_src_rank; + uint64_t client_id; + struct d_hash_table *ha_hash_table_ptr; + pthread_rwlock_t *ha_hash_table_rwlock_ptr; + hg_info = HG_Get_info(hg_hdl); if (hg_info == NULL) { D_ERROR("HG_Get_info failed.\n"); @@ -1088,46 +1251,103 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) opc = rpc_tmp.crp_req_hdr.cch_opc; if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) { + + /* Determine whether this came from cleint or server in order to + * determine which hash table to use + */ + rpc_tmp_pub = &rpc_tmp.crp_pub; + + rc = crt_req_src_rank_get(rpc_tmp_pub, &hdr_src_rank); + if (rc != 0) { + D_ERROR("failed to retrieve rpc src rank, rc: %d.\n", rc); + D_GOTO(out, hg_ret = -DER_NONEXIST); + } + /** Search for the recieved hg_addr in the ha hash table. If - * not found, then insert. - */ - halink = d_hash_rec_find(&ha_hash_table, - (void *)&rpc_tmp.crp_req_hdr.cch_clid, - sizeof(rpc_tmp.crp_req_hdr.cch_clid)); - if (halink != NULL) { - D_DEBUG(DB_TRACE, "client id %" PRIu64 "already in hash table\n", - rpc_tmp.crp_req_hdr.cch_clid); - - d_hash_rec_decref(&ha_hash_table, halink); - } + * not found, then insert. 
+ */ + + if (hdr_src_rank == CRT_NO_RANK) { + ha_hash_table_rwlock_ptr = &crt_ctx->cc_ha_hash_table_rwlock; + ha_hash_table_ptr = &crt_ctx->cc_ha_hash_table; + client_id = rpc_tmp.crp_req_hdr.cch_clid; + } else { - ha_entry.hg_class = hg_info->hg_class; - HG_Addr_dup(hg_info->hg_class, hg_info->addr, &ha_entry.hg_addr); - ha = crt_ha_mapping_init(rpc_tmp.crp_req_hdr.cch_clid, ha_entry); - if (!ha) { - D_ERROR("Failed to allocate entry\n"); - D_GOTO(out, hg_ret = HG_SUCCESS); - } -#ifdef HASH_TEST - hg_addr_table[hg_addr_table_index++] = - rpc_tmp.crp_req_hdr.cch_clid; - -#endif - rc = d_hash_rec_insert(&ha_hash_table, - (void *)&rpc_tmp.crp_req_hdr.cch_clid, - sizeof(rpc_tmp.crp_req_hdr.cch_clid), - &ha->ha_link, true); - if (rc != 0) { - D_ERROR("Failed to add entry; rc=%d\n", rc); - crt_ha_destroy(ha); - D_GOTO(out, hg_ret = HG_SUCCESS); - } - D_DEBUG(DB_TRACE, "client id %" PRIu64 "inserted in hash table\n", - rpc_tmp.crp_req_hdr.cch_clid); - D_DEBUG(DB_TRACE, "inserted hg addr %" PRIu64 "\n", (long unsigned int)ha_entry.hg_addr ); - D_DEBUG(DB_TRACE, "inserted hg class %" PRIu64 "\n",(long unsigned int) ha_entry.hg_class ); - + ha_hash_table_rwlock_ptr = &crt_ctx->cc_ha_server_hash_table_rwlock; + ha_hash_table_ptr = &crt_ctx->cc_ha_server_hash_table; + client_id = (uint64_t)hdr_src_rank; + } + D_RWLOCK_WRLOCK(ha_hash_table_rwlock_ptr); + halink = d_hash_rec_find(ha_hash_table_ptr, (void *)&client_id, + sizeof(client_id)); + if (halink != NULL) { + /*Traverse the list and add new entry to tail if not found. */ + ha = crt_ha_link2ptr(halink); + d_list_for_each_entry(ha_list_entry, &ha->chm_list, + chl_list_link) { + if ((ha_list_entry->chl_entry.ha_class == hg_info->hg_class) + && (HG_Addr_cmp(hg_info->hg_class, + ha_list_entry->chl_entry.ha_addr, + hg_info->addr))) { + found = true; + break; + } + } + + if (!found) { + /* Add new entry to tail. 
*/ + D_ALLOC_PTR(ha_list_entry); + if (ha_list_entry == NULL) { + D_ERROR("Failed to allocate entry for hash table list\n"); + d_hash_rec_decref(ha_hash_table_ptr, halink); + D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); + D_GOTO(out, hg_ret = -DER_NOMEM); + } + + D_INIT_LIST_HEAD(&ha_list_entry->chl_list_link); + ha_list_entry->chl_entry.ha_class = hg_info->hg_class; + HG_Addr_dup(hg_info->hg_class, hg_info->addr, + &ha_list_entry->chl_entry.ha_addr); + + d_list_add_tail(&ha_list_entry->chl_list_link, &ha->chm_list); + crt_dump_ha_list(&ha->chm_list); + } + d_hash_rec_decref(ha_hash_table_ptr, halink); + } + else { + ha = crt_ha_mapping_init(client_id, ha_entry); + if (!ha) { + D_ERROR("Failed to allocate entry\n"); + D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); + D_GOTO(out, hg_ret = -DER_NOMEM); + } + + D_ALLOC_PTR(ha_list_entry); + if (ha_list_entry == NULL) { + D_ERROR("Failed to allocate entry for hash table list\n"); + D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); + D_GOTO(out, hg_ret = -DER_NOMEM); + } + + D_INIT_LIST_HEAD(&ha_list_entry->chl_list_link); + ha_list_entry->chl_entry.ha_class = hg_info->hg_class; + HG_Addr_dup(hg_info->hg_class, hg_info->addr, + &ha_list_entry->chl_entry.ha_addr); + + d_list_add_tail(&ha_list_entry->chl_list_link, &ha->chm_list); + crt_dump_ha_list(&ha->chm_list); + rc = d_hash_rec_insert(ha_hash_table_ptr, + (void *)&client_id, sizeof(client_id), + &ha->chm_link, true); + if (rc != 0) { + D_ERROR("Failed to add entry; rc=%d\n", rc); + crt_ha_destroy(ha); //TBD is this needed + //D_RWLOCK_UNLOCK(&ha->chm_rwlock); + D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); + D_GOTO(out, hg_ret = -DER_NONEXIST); + } } + D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); } /** * Set the opcode in the temp RPC so that it can be correctly logged. 
diff --git a/src/cart/crt_hg.h b/src/cart/crt_hg.h index a54802a20..5529f6bdc 100644 --- a/src/cart/crt_hg.h +++ b/src/cart/crt_hg.h @@ -60,11 +60,6 @@ /** number of prepost HG handles when enable pool */ #define CRT_HG_POOL_PREPOST_NUM (16) #define CRT_HG_ADDR_CACHE_LOOKUP_BITS (5) -#define HASH_TEST 1 - -#ifdef HASH_TEST -uint64_t hg_addr_table[1000]; -#endif struct crt_rpc_priv; struct crt_common_hdr; @@ -159,6 +154,8 @@ int crt_hg_addr_free(struct crt_hg_context *hg_ctx, hg_addr_t addr); int crt_hg_get_addr(hg_class_t *hg_class, char *addr_str, size_t *str_size); int crt_rpc_handler_common(hg_handle_t hg_hdl); +int crt_hg_remove_hash_rank(d_rank_t rank); + /* crt_hg_proc.c */ int crt_hg_unpack_header(hg_handle_t hg_hdl, struct crt_rpc_priv *rpc_priv, diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index 3fd62b982..a238c5636 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -167,6 +167,12 @@ struct crt_context { crt_rpc_task_t cc_rpc_cb; /* rpc callback */ /* in-flight endpoint tracking hash table */ struct d_hash_table cc_epi_table; + /* hg addr hash tables */ + struct d_hash_table cc_ha_hash_table; + struct d_hash_table cc_ha_server_hash_table; + /* Locks associated with above hg addr tables. */ + pthread_rwlock_t cc_ha_hash_table_rwlock; + pthread_rwlock_t cc_ha_server_hash_table_rwlock; /* binheap for inflight RPC timeout tracking */ struct d_binheap cc_bh_timeout; /* mutex to protect cc_epi_table and timeout binheap */ diff --git a/src/cart/crt_rpc.c b/src/cart/crt_rpc.c index d4704501f..1d6fcc6ec 100644 --- a/src/cart/crt_rpc.c +++ b/src/cart/crt_rpc.c @@ -1139,8 +1139,10 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg) * function. Referenced dropped at end of this function. */ RPC_ADDREF(rpc_priv); - /* Insert the local job id in the rpc request. */ - rpc_priv->crp_req_hdr.cch_clid = crt_gdata.cg_clid; + /* Insert the local job id in the rpc request for PSM2. 
*/ + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) + rpc_priv->crp_req_hdr.cch_clid = crt_gdata.cg_clid; + if (req->cr_ctx == NULL) { D_ERROR("invalid parameter (NULL req->cr_ctx).\n"); D_GOTO(out, rc = -DER_INVAL); diff --git a/src/cart/crt_rpc.h b/src/cart/crt_rpc.h index fd840a80f..7c6298838 100644 --- a/src/cart/crt_rpc.h +++ b/src/cart/crt_rpc.h @@ -101,8 +101,8 @@ struct crt_common_hdr { uint32_t cch_xid; /* used in crp_reply_hdr to propagate rpc failure back to sender */ uint32_t cch_rc; - /* client id */ - uint64_t cch_clid; + /* client id */ + uint64_t cch_clid; }; typedef enum { diff --git a/src/include/cart/api.h b/src/include/cart/api.h index da94bc11f..a1a29b32a 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -1955,11 +1955,7 @@ int crt_group_secondary_modify(crt_group_t *grp, d_rank_list_t *sec_ranks, * return DER_SUCCESS on success, negative value on * failure. */ -int crt_hg_remove_client_id (uint64_t client_id); - -/* Test Function. */ -int crt_hg_remove_all_client_ids (void); - +int crt_hg_remove_client_id(uint64_t client_id); #define crt_proc__Bool crt_proc_bool #define crt_proc_d_rank_t crt_proc_uint32_t From ccd051b02a904c4f13ae642579a771f333df248c Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Thu, 6 Feb 2020 19:58:40 +0000 Subject: [PATCH 3/9] Fixed whitespace issues Signed-off-by: Vikram Chhabra --- src/cart/crt_hg.c | 151 +++++++++++++++++++--------------- src/cart/crt_internal_types.h | 24 +++--- src/crt_launch/crt_launch.c | 4 +- 3 files changed, 97 insertions(+), 82 deletions(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index ec415730d..43f0705dc 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -53,15 +53,15 @@ struct crt_ha_list_entry { }; struct crt_ha_mapping { - d_list_t chm_link; - uint64_t chm_key; + d_list_t chm_link; + uint64_t chm_key; - d_list_t chm_list; - uint32_t chm_ref; - uint32_t chm_initialized; + d_list_t chm_list; + uint32_t chm_ref; + uint32_t chm_initialized; - 
pthread_mutex_t chm_mutex; - pthread_rwlock_t chm_rwlock; /* to protect the list */ + pthread_mutex_t chm_mutex; + pthread_rwlock_t chm_rwlock; /* to protect the list */ }; struct crt_ha_mapping * @@ -473,9 +473,9 @@ crt_hg_remove_addr(hg_class_t *hg_class, hg_addr_t hg_addr) hg_return_t ret = HG_SUCCESS; D_DEBUG(DB_TRACE, "removing hg addr %" PRIu64 "\n", - (long unsigned int)hg_addr ); + (uint64_t)hg_addr); D_DEBUG(DB_TRACE, "removing hg class %" PRIu64 "\n", - (long unsigned int) hg_class ); + (uint64_t)hg_class); ret = HG_Addr_set_remove(hg_class, hg_addr); if (ret != HG_SUCCESS) { D_ERROR("HG_Addr_set_remove) failed, hg_ret %d.\n", ret); @@ -510,26 +510,29 @@ crt_hg_remove_client_id(uint64_t client_id) D_RWLOCK_RDLOCK(&crt_gdata.cg_rwlock); d_list_for_each_entry(crt_ctx, &crt_gdata.cg_ctx_list, cc_link) { - //Can this lock create a deadlock? TBD + /*Can this lock create a deadlock? TBD */ D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_hash_table_rwlock); halink = d_hash_rec_find(&crt_ctx->cc_ha_hash_table, (void *)&client_id, sizeof(client_id)); if (!halink) { - D_ERROR("client=%" PRIu64 " not part of the hash table\n", + D_ERROR("client=%" PRIu64 " not in the hash table\n", client_id); } ha = crt_ha_link2ptr(halink); /* Remove all entries from list. 
*/ while ((entry = d_list_pop_entry(&ha->chm_list, - struct crt_ha_list_entry, chl_list_link)) != NULL) { + struct crt_ha_list_entry, chl_list_link)) + != NULL) { ha_value = entry->chl_entry; D_FREE(entry); - ret = crt_hg_remove_addr(ha_value.ha_class, ha_value.ha_addr); + ret = crt_hg_remove_addr(ha_value.ha_class, + ha_value.ha_addr); if (ret != HG_SUCCESS) { - D_ERROR("crt_remove_addr failed, hg_ret %d.\n", ret); + D_ERROR("crt_remove_addr err, hg_ret %d.\n", + ret); } } @@ -537,7 +540,7 @@ crt_hg_remove_client_id(uint64_t client_id) d_hash_rec_delete(&crt_ctx->cc_ha_hash_table, &client_id, sizeof(client_id)); - D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n", + D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash\n", client_id); D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_hash_table_rwlock); } @@ -821,10 +824,10 @@ crt_dump_ha_list(d_list_t *halist) ha_value = entry->chl_entry; D_DEBUG(DB_TRACE, "hg addr %" PRIu64 "\n", - (uint64_t)ha_value.ha_addr ); + (uint64_t)ha_value.ha_addr); D_DEBUG(DB_TRACE, "hg class %" PRIu64 "\n\n", - (uint64_t) ha_value.ha_class ); - } + (uint64_t) ha_value.ha_class); + } return rc; } @@ -842,7 +845,7 @@ crt_hg_ha_list_empty(d_list_t *halink, void *arg) ha = crt_ha_link2ptr(halink); if (ha->chm_initialized != 1) - D_GOTO(out,rc); + D_GOTO(out, rc); D_DEBUG(DB_TRACE, "client id %" PRIu64 " removing from hash table\n", ha->chm_key); @@ -850,7 +853,7 @@ crt_hg_ha_list_empty(d_list_t *halink, void *arg) d_list_for_each_entry_safe(entry, temp, &ha->chm_list, chl_list_link) { ha_value = entry->chl_entry; - rc = crt_hg_remove_addr (ha_value.ha_class, ha_value.ha_addr); + rc = crt_hg_remove_addr(ha_value.ha_class, ha_value.ha_addr); if (rc != HG_SUCCESS) { D_ERROR("crt_remove_addr failed, hg_ret %d.\n", rc); @@ -865,24 +868,23 @@ int crt_hg_remove_hash_rank(d_rank_t rank) { d_list_t *halink; - struct crt_ha_mapping *ha; - struct ha_entry ha_value; + struct crt_ha_mapping *ha; + struct ha_entry ha_value; struct crt_ha_list_entry 
*entry; uint64_t client_id; - int rc = 0; - + int rc = 0; struct crt_context *crt_ctx; -// if (!crt_is_service()) -// D_GOTO(out, rc); - +/* if (!crt_is_service()) +* D_GOTO(out, rc); +*/ if (crt_gdata.cg_na_plugin != CRT_NA_OFI_PSM2) D_GOTO(out, rc); D_RWLOCK_RDLOCK(&crt_gdata.cg_rwlock); client_id = (uint64_t)rank; d_list_for_each_entry(crt_ctx, &crt_gdata.cg_ctx_list, cc_link) { - //Can this lock create a deadlock? TBD + /*Can this lock create a deadlock? TBD*/ D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); halink = d_hash_rec_find(&crt_ctx->cc_ha_server_hash_table, (void *)&client_id, sizeof(client_id)); @@ -896,12 +898,15 @@ crt_hg_remove_hash_rank(d_rank_t rank) /* Remove all entries from list. */ while ((entry = d_list_pop_entry(&ha->chm_list, - struct crt_ha_list_entry, chl_list_link)) != NULL) { + struct crt_ha_list_entry, chl_list_link)) + != NULL) { ha_value = entry->chl_entry; D_FREE(entry); - rc = crt_hg_remove_addr (ha_value.ha_class, ha_value.ha_addr); + rc = crt_hg_remove_addr (ha_value.ha_class, + ha_value.ha_addr); if (rc != HG_SUCCESS) { - D_ERROR("crt_remove_addr failed, hg_ret %d.\n", rc); + D_ERROR("crt_remove_addr failed, hg_ret %d.\n", + rc); rc = -DER_NONEXIST; } } @@ -1129,7 +1134,7 @@ crt_hg_ctx_fini(struct crt_hg_context *hg_ctx) hg_context = hg_ctx->chc_hgctx; D_ASSERT(hg_context != NULL); - if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2){ + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) { crt_ctx = container_of(hg_ctx, struct crt_context, cc_hg_ctx); D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_hash_table_rwlock); rc = crt_hg_ha_table_empty(&crt_ctx->cc_ha_hash_table); @@ -1199,23 +1204,23 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) const struct hg_info *hg_info; struct crt_rpc_priv *rpc_priv; crt_rpc_t *rpc_pub; - crt_opcode_t opc; - crt_proc_t proc = NULL; + crt_opcode_t opc; + crt_proc_t proc = NULL; struct crt_opc_info *opc_info = NULL; - hg_return_t hg_ret = HG_SUCCESS; - bool is_coll_req = false; - int rc = 0; - struct 
crt_rpc_priv rpc_tmp = {0}; + hg_return_t hg_ret = HG_SUCCESS; + bool is_coll_req = false; + int rc = 0; + struct crt_rpc_priv rpc_tmp = {0}; crt_rpc_t *rpc_tmp_pub; d_list_t *halink; - struct crt_ha_mapping *ha; - struct ha_entry ha_entry; - bool found = false; - struct crt_ha_list_entry *ha_list_entry; + struct crt_ha_mapping *ha; + struct ha_entry ha_entry; + bool found = false; + struct crt_ha_list_entry *ha_list_entry; d_rank_t hdr_src_rank; uint64_t client_id; - struct d_hash_table *ha_hash_table_ptr; - pthread_rwlock_t *ha_hash_table_rwlock_ptr; + struct d_hash_table *ha_hash_table_ptr; + pthread_rwlock_t *ha_hash_table_rwlock_ptr; hg_info = HG_Get_info(hg_hdl); if (hg_info == NULL) { @@ -1252,14 +1257,15 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) { - /* Determine whether this came from cleint or server in order to + /* Determine whether this came from cleint or server in order to * determine which hash table to use */ rpc_tmp_pub = &rpc_tmp.crp_pub; rc = crt_req_src_rank_get(rpc_tmp_pub, &hdr_src_rank); if (rc != 0) { - D_ERROR("failed to retrieve rpc src rank, rc: %d.\n", rc); + D_ERROR("failed to retrieve rpc src rank, rc: %d.\n", + rc); D_GOTO(out, hg_ret = -DER_NONEXIST); } @@ -1268,26 +1274,30 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) */ if (hdr_src_rank == CRT_NO_RANK) { - ha_hash_table_rwlock_ptr = &crt_ctx->cc_ha_hash_table_rwlock; + ha_hash_table_rwlock_ptr = + &crt_ctx->cc_ha_hash_table_rwlock; ha_hash_table_ptr = &crt_ctx->cc_ha_hash_table; client_id = rpc_tmp.crp_req_hdr.cch_clid; - } - else { - ha_hash_table_rwlock_ptr = &crt_ctx->cc_ha_server_hash_table_rwlock; + } else { + ha_hash_table_rwlock_ptr = + &crt_ctx->cc_ha_server_hash_table_rwlock; ha_hash_table_ptr = &crt_ctx->cc_ha_server_hash_table; client_id = (uint64_t)hdr_src_rank; } D_RWLOCK_WRLOCK(ha_hash_table_rwlock_ptr); - halink = d_hash_rec_find(ha_hash_table_ptr, (void *)&client_id, + halink = d_hash_rec_find(ha_hash_table_ptr, 
(void *)&client_id, sizeof(client_id)); if (halink != NULL) { - /*Traverse the list and add new entry to tail if not found. */ + /*Traverse the list and add new entry to tail + *if not found. + */ ha = crt_ha_link2ptr(halink); d_list_for_each_entry(ha_list_entry, &ha->chm_list, chl_list_link) { - if ((ha_list_entry->chl_entry.ha_class == hg_info->hg_class) - && (HG_Addr_cmp(hg_info->hg_class, - ha_list_entry->chl_entry.ha_addr, + if ((ha_list_entry->chl_entry.ha_class == + hg_info->hg_class) + && (HG_Addr_cmp(hg_info->hg_class, + ha_list_entry->chl_entry.ha_addr, hg_info->addr))) { found = true; break; @@ -1298,23 +1308,28 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) /* Add new entry to tail. */ D_ALLOC_PTR(ha_list_entry); if (ha_list_entry == NULL) { - D_ERROR("Failed to allocate entry for hash table list\n"); - d_hash_rec_decref(ha_hash_table_ptr, halink); - D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); + D_ERROR("Failed to allocate entry\n"); + d_hash_rec_decref(ha_hash_table_ptr, + halink); + D_RWLOCK_UNLOCK + (ha_hash_table_rwlock_ptr); D_GOTO(out, hg_ret = -DER_NOMEM); } - D_INIT_LIST_HEAD(&ha_list_entry->chl_list_link); - ha_list_entry->chl_entry.ha_class = hg_info->hg_class; - HG_Addr_dup(hg_info->hg_class, hg_info->addr, + D_INIT_LIST_HEAD + (&ha_list_entry->chl_list_link); + ha_list_entry->chl_entry.ha_class = + hg_info->hg_class; + HG_Addr_dup(hg_info->hg_class, + hg_info->addr, &ha_list_entry->chl_entry.ha_addr); - d_list_add_tail(&ha_list_entry->chl_list_link, &ha->chm_list); + d_list_add_tail(&ha_list_entry->chl_list_link, + &ha->chm_list); crt_dump_ha_list(&ha->chm_list); } d_hash_rec_decref(ha_hash_table_ptr, halink); - } - else { + } else { ha = crt_ha_mapping_init(client_id, ha_entry); if (!ha) { D_ERROR("Failed to allocate entry\n"); @@ -1324,7 +1339,7 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) D_ALLOC_PTR(ha_list_entry); if (ha_list_entry == NULL) { - D_ERROR("Failed to allocate entry for hash table list\n"); + D_ERROR("Failed to allocate 
entry\n"); D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); D_GOTO(out, hg_ret = -DER_NOMEM); } @@ -1334,15 +1349,15 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) HG_Addr_dup(hg_info->hg_class, hg_info->addr, &ha_list_entry->chl_entry.ha_addr); - d_list_add_tail(&ha_list_entry->chl_list_link, &ha->chm_list); + d_list_add_tail(&ha_list_entry->chl_list_link, + &ha->chm_list); crt_dump_ha_list(&ha->chm_list); rc = d_hash_rec_insert(ha_hash_table_ptr, (void *)&client_id, sizeof(client_id), &ha->chm_link, true); if (rc != 0) { D_ERROR("Failed to add entry; rc=%d\n", rc); - crt_ha_destroy(ha); //TBD is this needed - //D_RWLOCK_UNLOCK(&ha->chm_rwlock); + crt_ha_destroy(ha); /*TBD is this needed*/ D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); D_GOTO(out, hg_ret = -DER_NONEXIST); } diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index a238c5636..0c481d0b3 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -160,25 +160,25 @@ extern struct crt_plugin_gdata crt_plugin_gdata; /* crt_context */ struct crt_context { - d_list_t cc_link; /* link to gdata.cg_ctx_list */ - int cc_idx; /* context index */ - struct crt_hg_context cc_hg_ctx; /* HG context */ + d_list_t cc_link; /* link to gdata.cg_ctx_list */ + int cc_idx; /* context index */ + struct crt_hg_context cc_hg_ctx; /* HG context */ void *cc_rpc_cb_arg; - crt_rpc_task_t cc_rpc_cb; /* rpc callback */ + crt_rpc_task_t cc_rpc_cb; /* rpc callback */ /* in-flight endpoint tracking hash table */ - struct d_hash_table cc_epi_table; + struct d_hash_table cc_epi_table; /* hg addr hash tables */ - struct d_hash_table cc_ha_hash_table; - struct d_hash_table cc_ha_server_hash_table; + struct d_hash_table cc_ha_hash_table; + struct d_hash_table cc_ha_server_hash_table; /* Locks associated with above hg addr tables. 
*/ - pthread_rwlock_t cc_ha_hash_table_rwlock; - pthread_rwlock_t cc_ha_server_hash_table_rwlock; + pthread_rwlock_t cc_ha_hash_table_rwlock; + pthread_rwlock_t cc_ha_server_hash_table_rwlock; /* binheap for inflight RPC timeout tracking */ - struct d_binheap cc_bh_timeout; + struct d_binheap cc_bh_timeout; /* mutex to protect cc_epi_table and timeout binheap */ - pthread_mutex_t cc_mutex; + pthread_mutex_t cc_mutex; /* timeout per-context */ - uint32_t cc_timeout_sec; + uint32_t cc_timeout_sec; }; /* in-flight RPC req list, be tracked per endpoint for every crt_context */ diff --git a/src/crt_launch/crt_launch.c b/src/crt_launch/crt_launch.c index 6237259e4..e1f1ae1ca 100644 --- a/src/crt_launch/crt_launch.c +++ b/src/crt_launch/crt_launch.c @@ -153,9 +153,9 @@ get_self_uri(struct host *h) char *p; int len; int rc; - crt_init_options_t opt = {0}; + crt_init_options_t opt = {0}; - opt.cio_clid = h->my_rank; + opt.cio_clid = h->my_rank; rc = crt_init_opt(0, CRT_FLAG_BIT_SERVER, &opt); if (rc != 0) { From 9ef2daf75c42b74c57b776187e239fbc0138fc92 Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Thu, 6 Feb 2020 20:30:09 +0000 Subject: [PATCH 4/9] Checkpatch errors Signed-off-by: Vikram Chhabra --- src/cart/crt_group.c | 2 +- src/cart/crt_hg.c | 19 ++++++++++--------- src/cart/crt_internal_types.h | 6 +++--- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/src/cart/crt_group.c b/src/cart/crt_group.c index 000279d23..ca94b9df6 100644 --- a/src/cart/crt_group.c +++ b/src/cart/crt_group.c @@ -2710,7 +2710,7 @@ crt_group_rank_remove_internal(struct crt_grp_priv *grp_priv, d_rank_t rank) d_hash_rec_delete(&grp_priv->gp_uri_lookup_cache, &rank, sizeof(d_rank_t)); /*If PSM2 then remove the rank from the hash table. 
*/ - if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) crt_hg_remove_hash_rank(rank); } else { d_rank_t prim_rank; diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 43f0705dc..b62b3663e 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -497,7 +497,7 @@ crt_hg_remove_client_id(uint64_t client_id) int ret = 0; d_list_t *halink; struct crt_ha_mapping *ha; - struct ha_entry ha_value; + struct ha_entry ha_value; struct crt_ha_list_entry *entry; struct crt_context *crt_ctx; @@ -858,8 +858,8 @@ crt_hg_ha_list_empty(d_list_t *halink, void *arg) if (rc != HG_SUCCESS) { D_ERROR("crt_remove_addr failed, hg_ret %d.\n", rc); } - D_FREE(entry); - } + D_FREE(entry); + } out: return rc; } @@ -889,7 +889,7 @@ crt_hg_remove_hash_rank(d_rank_t rank) halink = d_hash_rec_find(&crt_ctx->cc_ha_server_hash_table, (void *)&client_id, sizeof(client_id)); if (!halink) { - D_ERROR("client=%" PRIu64 "not in server hash table\n", + D_ERROR("client=%" PRIu64 "not in server hash table\n", client_id); D_GOTO(out, rc = -DER_NONEXIST); } @@ -908,7 +908,7 @@ crt_hg_remove_hash_rank(d_rank_t rank) D_ERROR("crt_remove_addr failed, hg_ret %d.\n", rc); rc = -DER_NONEXIST; - } + } } d_hash_rec_decref(&crt_ctx->cc_ha_server_hash_table, halink); @@ -992,7 +992,7 @@ crt_hg_ctx_init(struct crt_hg_context *hg_ctx, int idx) if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) { /*Create the hg_addr hash tables*/ - rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK, + rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK, CRT_HG_ADDR_CACHE_LOOKUP_BITS, NULL, &ha_mapping_ops, &crt_ctx->cc_ha_hash_table); if (rc != 0) { @@ -1146,7 +1146,8 @@ crt_hg_ctx_fini(struct crt_hg_context *hg_ctx) D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); rc = crt_hg_ha_table_empty(&crt_ctx->cc_ha_server_hash_table); if (rc != 0) - D_ERROR("d_hash_table_empty server failed, rc: %d.\n", rc); + D_ERROR("d_hash_table_empty server failed, rc: %d.\n", + rc); 
D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); } @@ -1289,8 +1290,8 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) sizeof(client_id)); if (halink != NULL) { /*Traverse the list and add new entry to tail - *if not found. - */ + *if not found. + */ ha = crt_ha_link2ptr(halink); d_list_for_each_entry(ha_list_entry, &ha->chm_list, chl_list_link) { diff --git a/src/cart/crt_internal_types.h b/src/cart/crt_internal_types.h index 0c481d0b3..56c42b1e5 100644 --- a/src/cart/crt_internal_types.h +++ b/src/cart/crt_internal_types.h @@ -171,12 +171,12 @@ struct crt_context { struct d_hash_table cc_ha_hash_table; struct d_hash_table cc_ha_server_hash_table; /* Locks associated with above hg addr tables. */ - pthread_rwlock_t cc_ha_hash_table_rwlock; - pthread_rwlock_t cc_ha_server_hash_table_rwlock; + pthread_rwlock_t cc_ha_hash_table_rwlock; + pthread_rwlock_t cc_ha_server_hash_table_rwlock; /* binheap for inflight RPC timeout tracking */ struct d_binheap cc_bh_timeout; /* mutex to protect cc_epi_table and timeout binheap */ - pthread_mutex_t cc_mutex; + pthread_mutex_t cc_mutex; /* timeout per-context */ uint32_t cc_timeout_sec; }; From 8dfc691f0c68542b1adb1e8ab46d736afa5497be Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Thu, 6 Feb 2020 20:48:03 +0000 Subject: [PATCH 5/9] checkpatch failures Signed-off-by: Vikram Chhabra --- src/cart/crt_hg.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index b62b3663e..aff1a1bf1 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -902,7 +902,7 @@ crt_hg_remove_hash_rank(d_rank_t rank) != NULL) { ha_value = entry->chl_entry; D_FREE(entry); - rc = crt_hg_remove_addr (ha_value.ha_class, + rc = crt_hg_remove_addr(ha_value.ha_class, ha_value.ha_addr); if (rc != HG_SUCCESS) { D_ERROR("crt_remove_addr failed, hg_ret %d.\n", @@ -1290,7 +1290,7 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) sizeof(client_id)); if (halink != NULL) { /*Traverse the list and add new 
entry to tail - *if not found. + *if not found. */ ha = crt_ha_link2ptr(halink); d_list_for_each_entry(ha_list_entry, &ha->chm_list, From 700d7ceb472d2f44df66789cf8c9203b4d62c6b1 Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Wed, 19 Feb 2020 17:43:29 +0000 Subject: [PATCH 6/9] CART-788: Fixes for PSM2 Signed-off-by: Vikram Chhabra --- scons_local | 2 +- src/cart/crt_hg.c | 30 ++++++++++++++++++------------ src/include/cart/api.h | 32 +++++++++++++++++++++++--------- 3 files changed, 42 insertions(+), 22 deletions(-) diff --git a/scons_local b/scons_local index bbb87d033..3bcd63602 160000 --- a/scons_local +++ b/scons_local @@ -1 +1 @@ -Subproject commit bbb87d033b7137a4096960b0779e21243a1aa58c +Subproject commit 3bcd636021aacc00ba41a415d1101daa670f5015 diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index aff1a1bf1..6889ed58b 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2016-2018 Intel Corporation +/* Copyright (C) 2016-2020 Intel Corporation * All rights reserved. 
* * Redistribution and use in source and binary forms, with or without @@ -472,13 +472,11 @@ crt_hg_remove_addr(hg_class_t *hg_class, hg_addr_t hg_addr) { hg_return_t ret = HG_SUCCESS; - D_DEBUG(DB_TRACE, "removing hg addr %" PRIu64 "\n", - (uint64_t)hg_addr); - D_DEBUG(DB_TRACE, "removing hg class %" PRIu64 "\n", - (uint64_t)hg_class); + D_DEBUG(DB_TRACE, "removing hg addr %" PRIu64," class %" PRIu64 "\n", + (uint64_t)hg_addr, (uint64_t)hg_class); ret = HG_Addr_set_remove(hg_class, hg_addr); if (ret != HG_SUCCESS) { - D_ERROR("HG_Addr_set_remove) failed, hg_ret %d.\n", ret); + D_ERROR("HG_Addr_set_remove failed, hg_ret %d.\n", ret); D_GOTO(out, ret = -DER_HG); } ret = HG_Addr_free(hg_class, hg_addr); @@ -492,7 +490,7 @@ crt_hg_remove_addr(hg_class_t *hg_class, hg_addr_t hg_addr) } int -crt_hg_remove_client_id(uint64_t client_id) +crt_cleanup_client_id(uint64_t client_id) { int ret = 0; d_list_t *halink; @@ -517,6 +515,7 @@ crt_hg_remove_client_id(uint64_t client_id) if (!halink) { D_ERROR("client=%" PRIu64 " not in the hash table\n", client_id); + continue; } ha = crt_ha_link2ptr(halink); @@ -676,7 +675,10 @@ crt_get_info_string(char **string) } else { /* OFI_PORT is only for context 0 to use */ port = crt_na_ofi_conf.noc_port; - crt_na_ofi_conf.noc_port = -1; + if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) + crt_na_ofi_conf.noc_port++; + else + crt_na_ofi_conf.noc_port = -1; D_ASPRINTF(*string, "%s://%s/%s:%d", plugin_str, crt_na_ofi_conf.noc_domain, @@ -798,7 +800,11 @@ crt_hg_init(crt_phy_addr_t *addr, bool server) } } - D_DEBUG(DB_NET, "in crt_hg_init, listen address: %s.\n", *addr); + if (server) + D_DEBUG(DB_NET, "listening address: %s.\n", *addr); + else + D_DEBUG(DB_NET, "passive address: %s.\n", *addr); + crt_gdata.cg_hg = hg_gdata; out: @@ -832,6 +838,7 @@ crt_dump_ha_list(d_list_t *halist) } +/* This function will be used to clean out server hash tables. 
*/ int crt_hg_ha_list_empty(d_list_t *halink, void *arg) { @@ -884,14 +891,13 @@ crt_hg_remove_hash_rank(d_rank_t rank) client_id = (uint64_t)rank; d_list_for_each_entry(crt_ctx, &crt_gdata.cg_ctx_list, cc_link) { - /*Can this lock create a deadlock? TBD*/ D_RWLOCK_WRLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); halink = d_hash_rec_find(&crt_ctx->cc_ha_server_hash_table, (void *)&client_id, sizeof(client_id)); if (!halink) { D_ERROR("client=%" PRIu64 "not in server hash table\n", client_id); - D_GOTO(out, rc = -DER_NONEXIST); + continue; } ha = crt_ha_link2ptr(halink); @@ -916,7 +922,7 @@ crt_hg_remove_hash_rank(d_rank_t rank) &client_id, sizeof(client_id)); D_RWLOCK_UNLOCK(&crt_ctx->cc_ha_server_hash_table_rwlock); } - D_RWLOCK_RDLOCK(&crt_gdata.cg_rwlock); + D_RWLOCK_UNLOCK(&crt_gdata.cg_rwlock); D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n", client_id); out: diff --git a/src/include/cart/api.h b/src/include/cart/api.h index a1a29b32a..9ca7c6ef1 100644 --- a/src/include/cart/api.h +++ b/src/include/cart/api.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2016-2019 Intel Corporation +/* Copyright (C) 2016-2020 Intel Corporation * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -715,14 +715,14 @@ crt_rpc_register(crt_opcode_t opc, uint32_t flags, struct crt_req_format *drf); * see DAOS. * * \param[in] ctx The cart context. - * \param[in] rpc RPC received. + * \param[in] rpc_hdlr_arg The argument of rpc_hdlr. * \param[in] rpc_hdlr Real RPC handler. - * \param[in] arg The argument for the RPC handler. + * \param[in] arg Extra argument for the callback. * * \return 0 for success, negative value if failed. * */ -typedef int (*crt_rpc_task_t) (crt_context_t *ctx, crt_rpc_t *rpc, +typedef int (*crt_rpc_task_t) (crt_context_t *ctx, void *rpc_hdlr_arg, void (*rpc_hdlr)(void *), void *arg); /** * Register RPC process callback for all RPCs this context received. 
@@ -1947,15 +1947,29 @@ int crt_group_secondary_modify(crt_group_t *grp, d_rank_list_t *sec_ranks, uint32_t version); /** - * Remove a client id from the Server hash table of Hg and Fabric Interface - * addresses. + * Cleanup the client ids on server, when there is an abrupt client exit. + * Currently only used on PSM2. * - * param[in] client_id System wide unique client id + * \param[in] client_id System wide unique client id * - * return DER_SUCCESS on success, negative value on + * \return DER_SUCCESS on success, negative value on * failure. */ -int crt_hg_remove_client_id(uint64_t client_id); +int crt_cleanup_client_id(uint64_t client_id); + +/** + * Initialize swim on the specified context index. + * + * \param[in] crt_ctx_idx Context index to initialize swim on + * + * \return DER_SUCCESS on success, negative value on + * failure. + */ +int crt_swim_init(int crt_ctx_idx); + +/** Finalize swim. + */ +void crt_swim_fini(void); #define crt_proc__Bool crt_proc_bool #define crt_proc_d_rank_t crt_proc_uint32_t From 902a83bfe9a58f99973437006e689a9fa7dba7ce Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Wed, 19 Feb 2020 18:16:50 +0000 Subject: [PATCH 7/9] CaRT788: Getting mercury patch Signed-off-by: Vikram Chhabra --- src/cart/crt_hg.c | 2 +- utils/build.config | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 6889ed58b..cada2f978 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -472,7 +472,7 @@ crt_hg_remove_addr(hg_class_t *hg_class, hg_addr_t hg_addr) { hg_return_t ret = HG_SUCCESS; - D_DEBUG(DB_TRACE, "removing hg addr %" PRIu64," class %" PRIu64 "\n", + D_DEBUG(DB_TRACE, "removing hg addr %"PRIx64 " class %"PRIx64 "\n", (uint64_t)hg_addr, (uint64_t)hg_class); ret = HG_Addr_set_remove(hg_class, hg_addr); if (ret != HG_SUCCESS) { diff --git a/utils/build.config b/utils/build.config index d5fe6b5c6..f53e49b23 100644 --- a/utils/build.config +++ b/utils/build.config @@ -4,5 +4,5 
@@ component=cart [commit_versions] OFI = 955f3a07dd011fb1dbfa6b6c772ada03d5af320e OPENPA = v1.0.4 -MERCURY = c2c262813811c3ede28ee32fdebbffd417a7cb80 +MERCURY = 951d32a4a07ab4b6fae9655dfbbfdbf4f02838fd PSM2 = PSM2_11.2.78 From 8dac3aa46b3568311f043a541134ada7c4cef8a0 Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Wed, 19 Feb 2020 21:08:27 +0000 Subject: [PATCH 8/9] Fixed build error. Signed-off-by: Vikram Chhabra --- src/cart/crt_hg.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index cada2f978..46d076f4c 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -72,7 +72,7 @@ crt_ha_link2ptr(d_list_t *rlink) } static struct crt_ha_mapping * -crt_ha_mapping_init(uint64_t key, struct ha_entry value) +crt_ha_mapping_init(uint64_t key) { struct crt_ha_mapping *ha; int rc; @@ -1221,7 +1221,6 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) crt_rpc_t *rpc_tmp_pub; d_list_t *halink; struct crt_ha_mapping *ha; - struct ha_entry ha_entry; bool found = false; struct crt_ha_list_entry *ha_list_entry; d_rank_t hdr_src_rank; @@ -1337,7 +1336,7 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) } d_hash_rec_decref(ha_hash_table_ptr, halink); } else { - ha = crt_ha_mapping_init(client_id, ha_entry); + ha = crt_ha_mapping_init(client_id); if (!ha) { D_ERROR("Failed to allocate entry\n"); D_RWLOCK_UNLOCK(ha_hash_table_rwlock_ptr); From 707d365d2c60dfa9dfca8560707c1cdbc41bf1da Mon Sep 17 00:00:00 2001 From: Vikram Chhabra Date: Wed, 19 Feb 2020 22:27:16 +0000 Subject: [PATCH 9/9] Fixed CentOS build error Signed-off-by: Vikram Chhabra --- src/cart/crt_hg.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cart/crt_hg.c b/src/cart/crt_hg.c index 46d076f4c..63e1c7a6f 100644 --- a/src/cart/crt_hg.c +++ b/src/cart/crt_hg.c @@ -1302,7 +1302,7 @@ crt_rpc_handler_common(hg_handle_t hg_hdl) chl_list_link) { if ((ha_list_entry->chl_entry.ha_class == hg_info->hg_class) - && (HG_Addr_cmp(hg_info->hg_class, + && 
((bool)HG_Addr_cmp(hg_info->hg_class, ha_list_entry->chl_entry.ha_addr, hg_info->addr))) { found = true;