Skip to content

Commit

Permalink
[patch] fix bug in processing kafka metadata from MSK (#184)
Browse files Browse the repository at this point in the history
* [patch] Trace out list of brokers in the metadata

* [patch] Cope with broker nodeids higher than broker count

---------

Co-authored-by: Jon Levell <[email protected]>
  • Loading branch information
jonquark and Jon Levell authored Jul 22, 2024
1 parent af2e7a2 commit 5b90141
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions server_proxy/src/pxmhub.c
Original file line number Diff line number Diff line change
Expand Up @@ -3318,8 +3318,17 @@ static int processPartMetadata(ism_mhub_t * mhub, mhub_broker_list_t * brokers,
mhub->id, topicname, partid, partrc, leader);
if (topic) {
mhub_part_t * part = topic->partitions+partid;

//Check the leader is a valid broker
int numLeaderMatches = 0;
for (i = 0; i < brokercnt; i++){
if (brokers[i].nodeid == leader){
numLeaderMatches += 1;
}
}

pthread_mutex_lock(&part->lock);
if (partrc == 0 && leader < brokercnt) {
if (partrc == 0 && numLeaderMatches == 1) {
part->valid = 1;
if(part->leader != leader){
//If leader changed, need to disconnect transport.
Expand Down Expand Up @@ -3358,9 +3367,9 @@ static int processPartMetadata(ism_mhub_t * mhub, mhub_broker_list_t * brokers,
ism_common_setTimerOnce(ISM_TIMER_LOW, (ism_attime_t)mhubCreateData, info, 1000000);
}
} else {
if (part->valid < 2) {
LOG(WARN, Server, 975, "%s%-s%-s%u%d", "MessageHub partition metadata error: Org={0} ID={1} Topic={2} Part={3} RC={4}",
LOG(WARN, Server, 975, "%s%-s%-s%u%d", "MessageHub partition metadata error: Org={0} ID={1} Topic={2} Part={3} RC={4}",
mhub->tenant->name, mhub->id, topic->name, partid, partrc);
if (part->valid < 2) {
part->valid = 2; /* Topic not valid */
}
}
Expand Down Expand Up @@ -3720,6 +3729,8 @@ static int mhubReceiveMetadata(ism_transport_t * transport, char * inbuf, int bu
brokers[i].nodeid = ism_kafka_getInt4(buf);
brokers[i].broker_len = ism_kafka_getString(buf, (char * *)&brokers[i].broker);
brokers[i].port = (uint16_t) ism_kafka_getInt4(buf);
TRACE(8, "MessageHub broker metadata: arrayindex=%d nodeid=%d broker=%s port=%d\n",
i, brokers[i].nodeid, brokers[i].broker, brokers[i].port);
}
int topic_count = ism_kafka_getInt4(buf);
for (i=0; rc==0 && i<topic_count; i++) {
Expand Down

0 comments on commit 5b90141

Please sign in to comment.