Skip to content
This repository has been archived by the owner on Dec 2, 2021. It is now read-only.

CaRT-788 : Added new API to allow for reset of fabric interfaces on PSM2 when Eager message problems happen due to client reboots. #344

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
277 changes: 274 additions & 3 deletions src/cart/crt_hg.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,146 @@

#include "crt_internal.h"

/*
 * Test-only state (compiled when HASH_TEST is defined): number of ids
 * currently stored in hg_addr_table, i.e. the index of the next free slot.
 */
#ifdef HASH_TEST
int hg_addr_table_index=0;
#endif

struct ha_entry {
hg_class_t *hg_class;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hg_class is associated with crt_context. As such we should keep the addr table per context and not store hg_class for each entry. In your current implementation (even if client id was unique), you would run into issues, since if client sends to server on address rank=0 tag=0 and on rank=0 tag=1, you will end up with 2 ha_entries for the same client id.

Storing this hash table per each context will save space.

Whenever hg_class is needed it can be derived from context in question from crt_ctx->cc_hg_ctx.chc.chc_hgcla

hg_addr_t hg_addr;
};

/**
 * One record of the hg address hash table: maps a 64-bit key to a
 * struct ha_entry value.
 */
struct crt_ha_mapping {
/* link embedded in the record; crt_ha_link2ptr() converts it back */
d_list_t ha_link;
/* 64-bit key the record is hashed and compared on */
uint64_t ha_key;
/* value stored for the key */
struct ha_entry ha_value;

/* reference count, modified under ha_mutex */
uint32_t ha_ref;
/* set to 1 by crt_ha_mapping_init(); asserted before use */
uint32_t ha_initialized;

/* protects ha_ref */
pthread_mutex_t ha_mutex;
};

/**
 * Convert a hash-table link back into the record that embeds it.
 *
 * \param[in] rlink	address of the ha_link member of a record, not NULL
 *
 * \return		the enclosing struct crt_ha_mapping
 */
struct crt_ha_mapping *
crt_ha_link2ptr(d_list_t *rlink)
{
	struct crt_ha_mapping *mapping;

	D_ASSERT(rlink != NULL);
	mapping = container_of(rlink, struct crt_ha_mapping, ha_link);

	return mapping;
}

/**
 * Allocate and initialize a hash-table record for \a key.
 *
 * The record is returned with a zero reference count, an initialized list
 * link and mutex, and ha_initialized set.
 *
 * \param[in] key	key to store in the record
 * \param[in] value	value to store in the record (copied)
 *
 * \return		the new record, or NULL if the allocation or the
 *			mutex initialization failed
 */
static struct crt_ha_mapping *
crt_ha_mapping_init(uint64_t key, struct ha_entry value)
{
	struct crt_ha_mapping	*mapping;
	int			 rc;

	D_ALLOC_PTR(mapping);
	if (mapping == NULL) {
		D_ERROR("Failed to allocate ha item\n");
		return NULL;
	}

	D_INIT_LIST_HEAD(&mapping->ha_link);
	mapping->ha_ref = 0;
	mapping->ha_initialized = 1;

	rc = D_MUTEX_INIT(&mapping->ha_mutex, NULL);
	if (rc != 0) {
		/* nothing else to undo: only the allocation succeeded */
		D_FREE_PTR(mapping);
		return NULL;
	}

	mapping->ha_key = key;
	mapping->ha_value = value;

	return mapping;
}

/**
 * Hash-table callback: expose the key stored in a record.
 *
 * \param[in]  hhtab	hash table (unused)
 * \param[in]  rlink	link of the record
 * \param[out] key_pp	set to the address of the record's ha_key
 *
 * \return		size of the key in bytes
 */
static int
ha_op_key_get(struct d_hash_table *hhtab, d_list_t *rlink, void **key_pp)
{
	struct crt_ha_mapping *mapping;

	mapping = crt_ha_link2ptr(rlink);
	*key_pp = (void *)&mapping->ha_key;

	return sizeof(mapping->ha_key);
}

/**
 * Hash-table callback: hash a 64-bit key.
 *
 * The hash is the key reduced modulo 2^CRT_HG_ADDR_CACHE_LOOKUP_BITS.
 *
 * \param[in] hhtab	hash table (unused)
 * \param[in] key	pointer to a uint64_t key
 * \param[in] ksize	key size, must equal sizeof(uint64_t)
 *
 * \return		hash value of the key
 */
static uint32_t
ha_op_key_hash(struct d_hash_table *hhtab, const void *key, unsigned int ksize)
{
	const uint64_t	*key64;
	uint64_t	 reduced;

	D_ASSERT(ksize == sizeof(uint64_t));

	key64 = (const uint64_t *)key;
	reduced = *key64 % (1U << CRT_HG_ADDR_CACHE_LOOKUP_BITS);

	return (unsigned int)reduced;
}

/**
 * Hash-table callback: check whether a record matches a key.
 *
 * \param[in] hhtab	hash table (unused)
 * \param[in] rlink	link of the record to test
 * \param[in] key	pointer to a uint64_t key
 * \param[in] ksize	key size, must equal sizeof(uint64_t)
 *
 * \return		true if the record's ha_key equals *key
 */
static bool
ha_op_key_cmp(struct d_hash_table *hhtab, d_list_t *rlink,
	      const void *key, unsigned int ksize)
{
	struct crt_ha_mapping	*ha = crt_ha_link2ptr(rlink);
	/* read through a const pointer: the key is only compared, and this
	 * matches how ha_op_key_hash() accesses it
	 */
	const uint64_t		*wanted = key;

	D_ASSERT(ksize == sizeof(uint64_t));

	return ha->ha_key == *wanted;
}

/**
 * Hash-table callback: take one reference on a record.
 *
 * The counter is incremented while holding the record's ha_mutex.
 *
 * \param[in] hhtab	hash table (unused)
 * \param[in] halink	link of the record
 */
static void
ha_op_rec_addref(struct d_hash_table *hhtab, d_list_t *halink)
{
	struct crt_ha_mapping *mapping;

	mapping = crt_ha_link2ptr(halink);
	D_ASSERT(mapping->ha_initialized);

	D_MUTEX_LOCK(&mapping->ha_mutex);
	mapping->ha_ref += 1;
	D_MUTEX_UNLOCK(&mapping->ha_mutex);
}

/**
 * Hash-table callback: drop one reference on a record.
 *
 * The counter is decremented while holding the record's ha_mutex.
 *
 * \param[in] hhtab	hash table (unused)
 * \param[in] halink	link of the record
 *
 * \return		true if the reference count reached zero
 */
static bool
ha_op_rec_decref(struct d_hash_table *hhtab, d_list_t *halink)
{
	struct crt_ha_mapping	*mapping = crt_ha_link2ptr(halink);
	uint32_t		 remaining;

	D_ASSERT(mapping->ha_initialized);

	D_MUTEX_LOCK(&mapping->ha_mutex);
	mapping->ha_ref--;
	remaining = mapping->ha_ref;
	D_MUTEX_UNLOCK(&mapping->ha_mutex);

	if (remaining == 0)
		return true;

	return false;
}

/**
 * Release a record created by crt_ha_mapping_init().
 *
 * The record must be initialized and hold no references; its mutex is
 * destroyed before the memory is freed.
 *
 * \param[in] ha	record to destroy, not NULL
 */
static void
crt_ha_destroy(struct crt_ha_mapping *ha)
{
	/* preconditions: valid, unreferenced, initialized record */
	D_ASSERT(ha != NULL);
	D_ASSERT(ha->ha_ref == 0);
	D_ASSERT(ha->ha_initialized == 1);

	D_MUTEX_DESTROY(&ha->ha_mutex);

	D_FREE(ha);
}

/**
 * Hash-table callback: free the record that owns \a halink.
 *
 * \param[in] hhtab	hash table (unused)
 * \param[in] halink	link of the record to free
 */
static void
ha_op_rec_free(struct d_hash_table *hhtab, d_list_t *halink)
{
	struct crt_ha_mapping *mapping = crt_ha_link2ptr(halink);

	crt_ha_destroy(mapping);
}



/*
 * Callback table for the hg address hash table; passed to
 * d_hash_table_create_inplace() when ha_hash_table is created in
 * crt_hg_init().
 */
static d_hash_table_ops_t ha_mapping_ops = {
.hop_key_get = ha_op_key_get,
.hop_key_hash = ha_op_key_hash,
.hop_key_cmp = ha_op_key_cmp,
.hop_rec_addref = ha_op_rec_addref,
.hop_rec_decref = ha_op_rec_decref,
.hop_rec_free = ha_op_rec_free,
};

/*
 * Hash table of struct crt_ha_mapping records keyed by client id. Created
 * in crt_hg_init() with D_HASH_FT_NOLOCK and destroyed in crt_hg_fini().
 */
struct d_hash_table ha_hash_table;

/*
* na_dict table should be in the same order of enum crt_na_type, the last one
* is terminator with NULL nad_str.
Expand Down Expand Up @@ -327,6 +467,85 @@ crt_hg_addr_lookup(struct crt_hg_context *hg_ctx, const char *name,
return rc;
}

int
crt_hg_remove_addr (hg_class_t *hg_class, hg_addr_t hg_addr)
{
hg_return_t ret = HG_SUCCESS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't reuse HG return codes for our own return codes.


D_DEBUG(DB_TRACE, "removing addr from class \n" );
ret = HG_Addr_set_remove(hg_class, hg_addr);
if (ret != HG_SUCCESS) {
D_ERROR("HG_Addr_set_remove) failed, hg_ret %d.\n", ret);
D_GOTO(out, ret = -DER_HG);
}

D_DEBUG(DB_TRACE, "freeing addr from class\n");
ret = HG_Addr_free(hg_class, hg_addr);
if (ret != HG_SUCCESS) {
D_ERROR("HG_Addr_free() failed, hg_ret %d.\n", ret);
D_GOTO(out, ret = -DER_HG);
}

out:
return ret;
}
int
crt_hg_remove_client_id (uint64_t client_id)
{
hg_return_t ret = HG_SUCCESS;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there need to be checks here:

  1. if (!is_service()) return -- only supported on clients
  2. only for psm2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be int; we shouldn't reuse hg_return_t for our error codes.

d_list_t *halink;
struct crt_ha_mapping *ha;
struct ha_entry ha_value;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The section from here on should be protected by a lock. It's possible that a client with the same client_id connects while you are trying to destroy its entry, which can lead to race conditions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

halink = d_hash_rec_find(&ha_hash_table,
(void *)&client_id, sizeof(client_id));
if (!halink) {
D_ERROR("client=%" PRIu64 " not part of the hash table\n", client_id);
D_GOTO(out, ret = -DER_HG);
}

ha = crt_ha_link2ptr(halink);
ha_value = ha->ha_value;

d_hash_rec_decref(&ha_hash_table, halink);

ret = crt_hg_remove_addr (ha_value.hg_class, ha_value.hg_addr);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) space prohibited between function name and open parenthesis '('


if (ret != HG_SUCCESS) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible

D_ERROR("crt_remove_addr failed, hg_ret %d.\n", ret);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible

D_GOTO(out, ret = -DER_HG);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible



d_hash_rec_delete(&ha_hash_table,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible

&client_id, sizeof(client_id));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) trailing whitespace

D_DEBUG(DB_TRACE, "client id %" PRIu64 "removed from hash table\n",
client_id);
D_DEBUG(DB_TRACE, "removed hg class %" PRIu64 "\n",(long unsigned int) ha_value.hg_class );
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) line over 80 characters
(style) code indent should use tabs where possible
(style) type 'long unsigned int' should be specified in [[un]signed] [short|int|long|long long] order
(style) space required after that ',' (ctx:VxV)
(style) space prohibited before that close parenthesis ')'

D_DEBUG(DB_TRACE, "removed hg addr %" PRIu64 "\n",(long unsigned int) ha_value.hg_addr );
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) line over 80 characters
(style) code indent should use tabs where possible
(style) type 'long unsigned int' should be specified in [[un]signed] [short|int|long|long long] order
(style) space required after that ',' (ctx:VxV)
(style) space prohibited before that close parenthesis ')'


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) trailing whitespace

out:
return ret;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible

}
int
crt_hg_remove_all_client_ids (void)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) space prohibited between function name and open parenthesis '('

{
hg_return_t ret = HG_SUCCESS;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs


#ifdef HASH_TEST
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it conditional? when server shuts down it needs to destroy all tables, which would execute similar code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's some test code. I needed to keep a copy of the client ids in the test application and was using it for that purpose. Yes, this may carry over into the shutdown case we discussed.

while (hg_addr_table_index > 0) {
ret = crt_hg_remove_client_id (hg_addr_table[--hg_addr_table_index]);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) line over 80 characters
(style) space prohibited between function name and open parenthesis '('

if (ret != HG_SUCCESS) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

D_ERROR("crt_remove_addr_all failed, hg_ret %d.\n", ret);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) line over 80 characters
(style) code indent should use tabs where possible
(style) please, no space before tabs

D_GOTO(out, ret = -DER_HG);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

}
#endif
out:
return ret;
}

int
crt_hg_addr_free(struct crt_hg_context *hg_ctx, hg_addr_t addr)
{
Expand Down Expand Up @@ -578,7 +797,15 @@ crt_hg_init(crt_phy_addr_t *addr, bool server)

D_DEBUG(DB_NET, "in crt_hg_init, listen address: %s.\n", *addr);
crt_gdata.cg_hg = hg_gdata;

/*Create the hg_addr hash table*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this section should also be only done if psm2

rc = d_hash_table_create_inplace(D_HASH_FT_NOLOCK,
CRT_LOOKUP_CACHE_BITS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be CRT_HG_ADDR_CACHE_LOOKUP_BITS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

NULL, &ha_mapping_ops,
&ha_hash_table);
if (rc != 0) {
D_ERROR("d_hash_table_create failed, rc: %d\n", rc);
D_GOTO(out, rc);
}
out:
if (info_string)
D_FREE(info_string);
Expand Down Expand Up @@ -618,7 +845,7 @@ crt_hg_fini()
D_WARN("Could not finalize NA class, na_ret: %d.\n", na_ret);
rc = -DER_HG;
}

d_hash_table_destroy_inplace(&ha_hash_table, true);
D_FREE(crt_gdata.cg_hg);
return rc;
}
Expand Down Expand Up @@ -824,7 +1051,9 @@ crt_rpc_handler_common(hg_handle_t hg_hdl)
bool is_coll_req = false;
int rc = 0;
struct crt_rpc_priv rpc_tmp = {0};

d_list_t *halink;
struct crt_ha_mapping *ha;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) please, no space before tabs

struct ha_entry ha_entry;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

hg_info = HG_Get_info(hg_hdl);
if (hg_info == NULL) {
D_ERROR("HG_Get_info failed.\n");
Expand Down Expand Up @@ -858,6 +1087,48 @@ crt_rpc_handler_common(hg_handle_t hg_hdl)
D_ASSERT(proc != NULL);
opc = rpc_tmp.crp_req_hdr.cch_opc;

if (crt_gdata.cg_na_plugin == CRT_NA_OFI_PSM2) {
/** Search for the received hg_addr in the ha hash table. If
* not found, then insert.
*/
halink = d_hash_rec_find(&ha_hash_table,
(void *)&rpc_tmp.crp_req_hdr.cch_clid,
sizeof(rpc_tmp.crp_req_hdr.cch_clid));
if (halink != NULL) {
D_DEBUG(DB_TRACE, "client id %" PRIu64 "already in hash table\n",
rpc_tmp.crp_req_hdr.cch_clid);

d_hash_rec_decref(&ha_hash_table, halink);
}
else {
ha_entry.hg_class = hg_info->hg_class;
HG_Addr_dup(hg_info->hg_class, hg_info->addr, &ha_entry.hg_addr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this line may not work correctly after the mercury nameserver patch. In mercury the receiver side does not have the client's fi_addr. I haven't found the specific code where mercury constructs hg_info before calling this crt_rpc_handler_common(). I haven't found a way to let the receiver side retrieve the client's fi_addr on the fi/mercury level. We probably can do it on the cart level.

ha = crt_ha_mapping_init(rpc_tmp.crp_req_hdr.cch_clid, ha_entry);
if (!ha) {
D_ERROR("Failed to allocate entry\n");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

D_GOTO(out, hg_ret = HG_SUCCESS);
}
#ifdef HASH_TEST
hg_addr_table[hg_addr_table_index++] =
rpc_tmp.crp_req_hdr.cch_clid;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible


#endif
rc = d_hash_rec_insert(&ha_hash_table,
(void *)&rpc_tmp.crp_req_hdr.cch_clid,
sizeof(rpc_tmp.crp_req_hdr.cch_clid),
&ha->ha_link, true);
if (rc != 0) {
D_ERROR("Failed to add entry; rc=%d\n", rc);
crt_ha_destroy(ha);
D_GOTO(out, hg_ret = HG_SUCCESS);
}
D_DEBUG(DB_TRACE, "client id %" PRIu64 "inserted in hash table\n",
rpc_tmp.crp_req_hdr.cch_clid);
D_DEBUG(DB_TRACE, "inserted hg addr %" PRIu64 "\n", (long unsigned int)ha_entry.hg_addr );
D_DEBUG(DB_TRACE, "inserted hg class %" PRIu64 "\n",(long unsigned int) ha_entry.hg_class );

}
}
/**
* Set the opcode in the temp RPC so that it can be correctly logged.
*/
Expand Down
9 changes: 8 additions & 1 deletion src/cart/crt_hg.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,19 @@
/** MAX number of HG handles in pool */
#define CRT_HG_POOL_MAX_NUM (512)
/** number of prepost HG handles when enable pool */
#define CRT_HG_POOL_PREPOST_NUM (16)
#define CRT_HG_POOL_PREPOST_NUM (16)
#define CRT_HG_ADDR_CACHE_LOOKUP_BITS (5)
#define HASH_TEST 1

#ifdef HASH_TEST
uint64_t hg_addr_table[1000];
#endif
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) trailing whitespace


struct crt_rpc_priv;
struct crt_common_hdr;
struct crt_corpc_hdr;


/** type of NA plugin */
enum crt_na_type {
CRT_NA_SM = 0,
Expand Down
6 changes: 4 additions & 2 deletions src/cart/crt_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static int data_init(crt_init_options_t *opt)
crt_gdata.cg_addr = NULL;
crt_gdata.cg_na_plugin = CRT_NA_OFI_SOCKETS;
crt_gdata.cg_share_na = false;

crt_gdata.cg_clid = 0;
timeout = 0;

if (opt && opt->cio_crt_timeout != 0)
Expand Down Expand Up @@ -164,7 +164,9 @@ static int data_init(crt_init_options_t *opt)
else
d_fault_inject_disable();
}

if (opt && opt->cio_clid != 0)
crt_gdata.cg_clid = opt->cio_clid;
/*TBD Do we use an ENV variable if above not defined? */
gdata_init_flag = 1;
exit:
return rc;
Expand Down
2 changes: 2 additions & 0 deletions src/cart/crt_internal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ struct crt_gdata {

/* protects crt_gdata */
pthread_rwlock_t cg_rwlock;
/* Job Id associated with this instance. */
uint64_t cg_clid;
};

extern struct crt_gdata crt_gdata;
Expand Down
3 changes: 2 additions & 1 deletion src/cart/crt_rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1139,7 +1139,8 @@ crt_req_send(crt_rpc_t *req, crt_cb_t complete_cb, void *arg)
* function. Referenced dropped at end of this function.
*/
RPC_ADDREF(rpc_priv);

/* Insert the local job id in the rpc request. */
rpc_priv->crp_req_hdr.cch_clid = crt_gdata.cg_clid;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible
(style) please, no space before tabs

if (req->cr_ctx == NULL) {
D_ERROR("invalid parameter (NULL req->cr_ctx).\n");
D_GOTO(out, rc = -DER_INVAL);
Expand Down
2 changes: 2 additions & 0 deletions src/cart/crt_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ struct crt_common_hdr {
uint32_t cch_xid;
/* used in crp_reply_hdr to propagate rpc failure back to sender */
uint32_t cch_rc;
/* client id */
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible

uint64_t cch_clid;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(style) code indent should use tabs where possible

};

typedef enum {
Expand Down
Loading