diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index bf83b977a4..9a9d485bd7 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -974,6 +974,7 @@ void clusterInit(void) { server.cluster->failover_auth_epoch = 0; server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE; server.cluster->lastVoteEpoch = 0; + server.cluster->is_light_hdr_supported = 1; /* Initialize stats */ for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) { @@ -3607,11 +3608,12 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { uint16_t type = ntohs(msgblock->msg.type); int is_msg_type_publish = (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD || type == CLUSTERMSG_TYPE_PUBLISH_LIGHT || type == CLUSTERMSG_TYPE_PUBLISHSHARD_LIGHT); + int nodes_not_supporting_light_header = 0; di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); - + if (!nodeSupportsLightMsgHdr(node)) nodes_not_supporting_light_header++; if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; if (!is_msg_type_publish) { clusterSendMessage(node->link, msgblock); @@ -3620,6 +3622,12 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { } } dictReleaseIterator(di); + + if (nodes_not_supporting_light_header) { + server.cluster->is_light_hdr_supported = 0; + } else { + server.cluster->is_light_hdr_supported = 1; + } } /* Build the message header. hdr must point to a buffer at least @@ -4056,10 +4064,12 @@ void clusterPropagatePublish(robj *channel, robj **messages, int count, int shar if (!sharded) { clusterBroadcastMessage(msgblock_light); clusterMsgSendBlockDecrRefCount(msgblock_light); - for (i = 0; i < count; i++) { - msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISH); - clusterBroadcastMessage(msgblock); - clusterMsgSendBlockDecrRefCount(msgblock); + if (!server.cluster->is_light_hdr_supported) { + for (i = 0; i < count; i++) { + msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISH); + clusterBroadcastMessage(msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); + } } return; } @@ -4070,28 +4080,39 @@ void clusterPropagatePublish(robj *channel, robj **messages, int count, int shar list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself); serverAssert(nodes_for_slot != NULL); listRewind(nodes_for_slot, &li); + int nodes_not_supporting_light_header = 0; while ((ln = listNext(&li))) { node = listNodeValue(ln); if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - if (nodeSupportsLightMsgHdr(node)) + if (nodeSupportsLightMsgHdr(node)) { clusterSendMessage(node->link, msgblock_light); - else + } else { + nodes_not_supporting_light_header++; continue; + } } clusterMsgSendBlockDecrRefCount(msgblock_light); - for (i = 0; i < count; i++) { - listRewind(nodes_for_slot, &li); - msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISHSHARD); - while ((ln = listNext(&li))) { - node = listNodeValue(ln); - if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - if (nodeSupportsLightMsgHdr(node)) - continue; - else - clusterSendMessage(node->link, msgblock); + if (nodes_not_supporting_light_header) { + server.cluster->is_light_hdr_supported = 0; + } else { + server.cluster->is_light_hdr_supported = 1; + } + + if (!server.cluster->is_light_hdr_supported) { + for (i = 0; i < count; i++) { + listRewind(nodes_for_slot, &li); + msgblock = clusterCreatePublishMsgBlock(channel, messages[i], CLUSTERMSG_TYPE_PUBLISHSHARD); + while ((ln = listNext(&li))) { + node = listNodeValue(ln); + if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; + if (nodeSupportsLightMsgHdr(node)) + continue; + else + clusterSendMessage(node->link, msgblock); + } + clusterMsgSendBlockDecrRefCount(msgblock); } - clusterMsgSendBlockDecrRefCount(msgblock); } } diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 002c02e6d9..e962cc94f2 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ -365,6 +365,7 @@ struct clusterState { clusterNode *migrating_slots_to[CLUSTER_SLOTS]; clusterNode *importing_slots_from[CLUSTER_SLOTS]; clusterNode *slots[CLUSTER_SLOTS]; + int is_light_hdr_supported; /* The following fields are used to take the replica state on elections. */ mstime_t failover_auth_time; /* Time of previous or next election. */ int failover_auth_count; /* Number of votes received so far. */