diff --git a/server_proxy/src/pxmhub.c b/server_proxy/src/pxmhub.c index af4742dc..4f3c40c3 100644 --- a/server_proxy/src/pxmhub.c +++ b/server_proxy/src/pxmhub.c @@ -3318,8 +3318,17 @@ static int processPartMetadata(ism_mhub_t * mhub, mhub_broker_list_t * brokers, mhub->id, topicname, partid, partrc, leader); if (topic) { mhub_part_t * part = topic->partitions+partid; + + //Check the leader is a valid broker + int numLeaderMatches = 0; + for (i = 0; i < brokercnt; i++){ + if (brokers[i].nodeid == leader){ + numLeaderMatches += 1; + } + } + pthread_mutex_lock(&part->lock); - if (partrc == 0 && leader < brokercnt) { + if (partrc == 0 && numLeaderMatches == 1) { part->valid = 1; if(part->leader != leader){ //If leader changed, need to disconnect transport. @@ -3358,9 +3367,9 @@ static int processPartMetadata(ism_mhub_t * mhub, mhub_broker_list_t * brokers, ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubCreateData, info, 1000000); } } else { - if (part->valid < 2) { - LOG(WARN, Server, 975, "%s%-s%-s%u%d", "MessageHub partition metadata error: Org={0} ID={1} Topic={2} Part={3} RC={4}", + LOG(WARN, Server, 975, "%s%-s%-s%u%d", "MessageHub partition metadata error: Org={0} ID={1} Topic={2} Part={3} RC={4}", mhub->tenant->name, mhub->id, topic->name, partid, partrc); + if (part->valid < 2) { part->valid = 2; /* Topic not valid */ } } @@ -3720,6 +3729,8 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu brokers[i].nodeid = ism_kafka_getInt4(buf); brokers[i].broker_len = ism_kafka_getString(buf, (char * *)&brokers[i].broker); brokers[i].port = (uint16_t) ism_kafka_getInt4(buf); + TRACE(8, "MessageHub broker metadata: arrayindex=%d nodeid=%d broker=%s port=%d\n", + i, brokers[i].nodeid, brokers[i].broker, brokers[i].port); } int topic_count = ism_kafka_getInt4(buf); for (i=0; rc==0 && i