diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 73d484c61c..4c0fbec68e 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -117,6 +117,7 @@ int auxTcpPortPresent(clusterNode *n); int auxTlsPortSetter(clusterNode *n, void *value, size_t length); sds auxTlsPortGetter(clusterNode *n, sds s); int auxTlsPortPresent(clusterNode *n); +static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen); static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); void freeClusterLink(clusterLink *link); int verifyClusterNodeId(const char *name, int length); @@ -149,6 +150,10 @@ static inline int defaultClientPort(void) { (server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot)) #define RCVBUF_INIT_LEN 1024 +#define RCVBUF_MIN_READ_LEN 14 +static_assert(offsetof(clusterMsg, type) + sizeof(uint16_t) == RCVBUF_MIN_READ_LEN, + "Incorrect length to read to identify type"); + #define RCVBUF_MAX_PREALLOC (1 << 20) /* 1MB */ /* Fixed timeout value for cluster operations (milliseconds) */ @@ -187,6 +192,54 @@ dictType clusterSdsToListType = { NULL /* allow to expand */ }; +typedef struct { + enum { ITER_DICT, ITER_LIST } type; + union { + dictIterator di; + listIter li; + }; +} ClusterNodeIterator; + +static void clusterNodeIterInitAllNodes(ClusterNodeIterator *iter) { + iter->type = ITER_DICT; + dictInitSafeIterator(&iter->di, server.cluster->nodes); +} + +static void clusterNodeIterInitMyShard(ClusterNodeIterator *iter) { + list *nodes = clusterGetNodesInMyShard(server.cluster->myself); + serverAssert(nodes != NULL); + iter->type = ITER_LIST; + listRewind(nodes, &iter->li); +} + +static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) { + switch (iter->type) { + case ITER_DICT: { + /* Get the next entry in the dictionary */ + dictEntry *de = dictNext(&iter->di); + /* Return the value associated with the entry, or NULL if no more entries */ + return de ? 
dictGetVal(de) : NULL; + } + case ITER_LIST: { + /* Get the next node in the list */ + listNode *ln = listNext(&iter->li); + /* Return the value associated with the node, or NULL if no more nodes */ + return ln ? listNodeValue(ln) : NULL; + } + /* This line is unreachable but added to avoid compiler warnings */ + default: { + serverPanic("bad type"); + return NULL; + } + } +} + +static void clusterNodeIterReset(ClusterNodeIterator *iter) { + if (iter->type == ITER_DICT) { + dictResetIterator(&iter->di); + } +} + /* Aux fields were introduced in Redis OSS 7.2 to support the persistence * of various important node properties, such as shard id, in nodes.conf. * Aux fields take an explicit format of name=value pairs and have no @@ -371,7 +424,10 @@ int auxTlsPortPresent(clusterNode *n) { typedef struct { size_t totlen; /* Total length of this block including the message */ int refcount; /* Number of cluster link send msg queues containing the message */ - clusterMsg msg; + union { + clusterMsg msg; + clusterMsgLight msg_light; + }; } clusterMsgSendBlock; /* ----------------------------------------------------------------------------- @@ -896,6 +952,7 @@ void clusterUpdateMyselfFlags(void) { int nofailover = server.cluster_replica_no_failover ? 
CLUSTER_NODE_NOFAILOVER : 0; myself->flags &= ~CLUSTER_NODE_NOFAILOVER; myself->flags |= nofailover; + myself->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED; if (myself->flags != oldflags) { clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE); } @@ -1211,12 +1268,16 @@ void clusterReset(int hard) { * CLUSTER communication link * -------------------------------------------------------------------------- */ static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen) { - uint32_t blocklen = msglen + sizeof(clusterMsgSendBlock) - sizeof(clusterMsg); + uint32_t blocklen = msglen + offsetof(clusterMsgSendBlock, msg); clusterMsgSendBlock *msgblock = zcalloc(blocklen); msgblock->refcount = 1; msgblock->totlen = blocklen; server.stat_cluster_links_memory += blocklen; - clusterBuildMessageHdr(&msgblock->msg, type, msglen); + if (IS_LIGHT_MESSAGE(type)) { + clusterBuildMessageHdrLight(&msgblock->msg_light, type, msglen); + } else { + clusterBuildMessageHdr(&msgblock->msg, type, msglen); + } return msgblock; } @@ -2817,10 +2878,53 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) { return sender; } +static void clusterProcessPublishPacket(clusterMsgDataPublish *publish_data, uint16_t type) { + robj *channel, *message; + uint32_t channel_len, message_len; + + /* Don't bother creating useless objects if there are no + * Pub/Sub subscribers. 
*/ + if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) || + (type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) { + channel_len = ntohl(publish_data->channel_len); + message_len = ntohl(publish_data->message_len); + channel = createStringObject((char *)publish_data->bulk_data, channel_len); + message = createStringObject((char *)publish_data->bulk_data + channel_len, message_len); + pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); + decrRefCount(channel); + decrRefCount(message); + } +} + +static void clusterProcessLightPacket(clusterLink *link, uint16_t type) { + clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf; + + if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { + clusterProcessPublishPacket(&hdr->data.publish.msg, type); + } +} + +static inline int messageTypeSupportsLightHdr(uint16_t type) { + switch (type) { + case CLUSTERMSG_TYPE_PUBLISH: return 1; + case CLUSTERMSG_TYPE_PUBLISHSHARD: return 1; + } + return 0; +} + + int clusterIsValidPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg *)link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); - uint16_t type = ntohs(hdr->type); + int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type)); + uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK; + + if (is_light && !messageTypeSupportsLightHdr(type)) { + serverLog(LL_NOTICE, + "Packet of type '%s' (%u) does not support light cluster header. 
Marking packet as invalid.", + clusterGetMessageTypeString(type), type); + return 0; + } if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++; @@ -2876,9 +2980,18 @@ int clusterIsValidPacket(clusterLink *link) { explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); explen += sizeof(clusterMsgDataFail); } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { - explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); - explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) + - ntohl(hdr->data.publish.msg.message_len); + clusterMsgDataPublish *publish_data; + if (is_light) { + clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf; + publish_data = &hdr_light->data.publish.msg; + explen = sizeof(clusterMsgLight); + } else { + publish_data = &hdr->data.publish.msg; + explen = sizeof(clusterMsg); + } + explen -= sizeof(union clusterMsgData); + explen += + sizeof(clusterMsgDataPublish) - 8 + ntohl(publish_data->channel_len) + ntohl(publish_data->message_len); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK || type == CLUSTERMSG_TYPE_MFSTART) { explen = sizeof(clusterMsg) - sizeof(union clusterMsgData); @@ -2925,8 +3038,24 @@ int clusterProcessPacket(clusterLink *link) { } clusterMsg *hdr = (clusterMsg *)link->rcvbuf; - uint16_t type = ntohs(hdr->type); mstime_t now = mstime(); + int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type)); + uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK; + + if (is_light) { + if (!link->node || nodeInHandshake(link->node)) { + freeClusterLink(link); + serverLog( + LL_NOTICE, + "Closing link for node that sent a lightweight message of type %hu as its first message on the link", + type); + return 0; + } + clusterNode *sender = link->node; + sender->data_received = now; + clusterProcessLightPacket(link, type); + return 1; + } uint16_t flags = ntohs(hdr->flags); uint64_t 
sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0; @@ -2939,6 +3068,15 @@ int clusterProcessPacket(clusterLink *link) { sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED; } + /* Checks if the node supports light message hdr */ + if (sender) { + if (flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) { + sender->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED; + } else { + sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_SUPPORTED; + } + } + /* Update the last time we saw any data from this node. We * use this in order to avoid detecting a timeout from a node that * is just sending a lot of data in the cluster bus, for instance @@ -3285,22 +3423,7 @@ int clusterProcessPacket(clusterLink *link) { } } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) { if (!sender) return 1; /* We don't know that node. */ - - robj *channel, *message; - uint32_t channel_len, message_len; - - /* Don't bother creating useless objects if there are no - * Pub/Sub subscribers. */ - if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) || - (type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) { - channel_len = ntohl(hdr->data.publish.msg.channel_len); - message_len = ntohl(hdr->data.publish.msg.message_len); - channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len); - message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len); - pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD); - decrRefCount(channel); - decrRefCount(message); - } + clusterProcessPublishPacket(&hdr->data.publish.msg, type); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. 
*/ clusterSendFailoverAuthIfNeeded(sender, hdr); @@ -3465,6 +3588,16 @@ void clusterLinkConnectHandler(connection *conn) { serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport); } +/* Performs sanity check on the message signature and length depending on the type. */ +static inline int isClusterMsgSignatureAndLengthValid(clusterMsg *hdr) { + if (memcmp(hdr->sig, "RCmb", 4) != 0) return 0; + uint16_t type = ntohs(hdr->type); + uint32_t totlen = ntohl(hdr->totlen); + uint32_t minlen = IS_LIGHT_MESSAGE(type) ? CLUSTERMSG_LIGHT_MIN_LEN : CLUSTERMSG_MIN_LEN; + if (totlen < minlen) return 0; + return 1; +} + /* Read data. Try to read the first field of the header first to check the * full length of the packet. When a whole packet is in memory this function * will call the function to process the packet. And so forth. */ @@ -3477,17 +3610,17 @@ void clusterReadHandler(connection *conn) { while (1) { /* Read as long as there is data to read. */ rcvbuflen = link->rcvbuf_len; - if (rcvbuflen < 8) { - /* First, obtain the first 8 bytes to get the full message - * length. */ - readlen = 8 - rcvbuflen; + if (rcvbuflen < RCVBUF_MIN_READ_LEN) { + /* First, obtain the first 14 bytes to get the full message + * length and type. */ + readlen = RCVBUF_MIN_READ_LEN - rcvbuflen; } else { /* Finally read the full message. */ hdr = (clusterMsg *)link->rcvbuf; - if (rcvbuflen == 8) { + if (rcvbuflen == RCVBUF_MIN_READ_LEN) { /* Perform some sanity check on the message signature * and length. */ - if (memcmp(hdr->sig, "RCmb", 4) != 0 || ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) { + if (!isClusterMsgSignatureAndLengthValid(hdr)) { char ip[NET_IP_STR_LEN]; int port; if (connAddrPeerName(conn, ip, sizeof(ip), &port) == -1) { @@ -3534,7 +3667,7 @@ void clusterReadHandler(connection *conn) { } /* Total length obtained? Process this packet. 
*/ - if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) { + if (rcvbuflen >= RCVBUF_MIN_READ_LEN && rcvbuflen == ntohl(hdr->totlen)) { if (clusterProcessPacket(link)) { if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) { size_t prev_rcvbuf_alloc = link->rcvbuf_alloc; @@ -3594,6 +3727,18 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { dictReleaseIterator(di); } +static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen) { + hdr->ver = htons(CLUSTER_PROTO_VER); + hdr->sig[0] = 'R'; + hdr->sig[1] = 'C'; + hdr->sig[2] = 'm'; + hdr->sig[3] = 'b'; + hdr->type = htons(type); + hdr->notused1 = 0; + hdr->notused2 = 0; + hdr->totlen = htonl(msglen); +} + /* Build the message header. hdr must point to a buffer at least * sizeof(clusterMsg) in bytes. */ static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { @@ -3861,23 +4006,38 @@ void clusterBroadcastPong(int target) { * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false * positive in this context. */ VALKEY_NO_SANITIZE("bounds") -clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) { +clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, int is_light, int is_sharded) { uint32_t channel_len, message_len; + uint16_t type = is_sharded ? 
CLUSTERMSG_TYPE_PUBLISHSHARD : CLUSTERMSG_TYPE_PUBLISH; channel = getDecodedObject(channel); message = getDecodedObject(message); channel_len = sdslen(channel->ptr); message_len = sdslen(message->ptr); + size_t msglen; - size_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData); + if (is_light) { + /* We set the MSB for messages that need to be sent using the light header */ + type |= CLUSTERMSG_LIGHT; + msglen = sizeof(clusterMsgLight); + } else { + msglen = sizeof(clusterMsg); + } + msglen -= sizeof(union clusterMsgData); msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len; clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen); - - clusterMsg *hdr = &msgblock->msg; - hdr->data.publish.msg.channel_len = htonl(channel_len); - hdr->data.publish.msg.message_len = htonl(message_len); - memcpy(hdr->data.publish.msg.bulk_data, channel->ptr, sdslen(channel->ptr)); - memcpy(hdr->data.publish.msg.bulk_data + sdslen(channel->ptr), message->ptr, sdslen(message->ptr)); + clusterMsgDataPublish *hdr_data_msg; + if (is_light) { + clusterMsgLight *hdr_light = &msgblock->msg_light; + hdr_data_msg = &hdr_light->data.publish.msg; + } else { + clusterMsg *hdr = &msgblock->msg; + hdr_data_msg = &hdr->data.publish.msg; + } + hdr_data_msg->channel_len = htonl(channel_len); + hdr_data_msg->message_len = htonl(message_len); + memcpy(hdr_data_msg->bulk_data, channel->ptr, sdslen(channel->ptr)); + memcpy(hdr_data_msg->bulk_data + sdslen(channel->ptr), message->ptr, sdslen(message->ptr)); decrRefCount(channel); decrRefCount(message); @@ -3979,27 +4139,32 @@ int clusterSendModuleMessageToTarget(const char *target, * Publish this message across the slot (primary/replica). 
* -------------------------------------------------------------------------- */ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { - clusterMsgSendBlock *msgblock; - - if (!sharded) { - msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH); - clusterBroadcastMessage(msgblock); - clusterMsgSendBlockDecrRefCount(msgblock); - return; + clusterMsgSendBlock *msgblock, *msgblock_light; + msgblock_light = clusterCreatePublishMsgBlock(channel, message, 1, sharded); + /* We will only create msgblock with normal hdr if there are any nodes that do not support light hdr */ + msgblock = NULL; + ClusterNodeIterator iter; + if (sharded) { + clusterNodeIterInitMyShard(&iter); + } else { + clusterNodeIterInitAllNodes(&iter); } - listIter li; - listNode *ln; - list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself); - serverAssert(nodes_for_slot != NULL); - listRewind(nodes_for_slot, &li); - msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); - while ((ln = listNext(&li))) { - clusterNode *node = listNodeValue(ln); + clusterNode *node; + while ((node = clusterNodeIterNext(&iter)) != NULL) { if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue; - clusterSendMessage(node->link, msgblock); + if (nodeSupportsLightMsgHdr(node)) { + clusterSendMessage(node->link, msgblock_light); + } else { + if (msgblock == NULL) { + msgblock = clusterCreatePublishMsgBlock(channel, message, 0, sharded); + } + clusterSendMessage(node->link, msgblock); + } } - clusterMsgSendBlockDecrRefCount(msgblock); + clusterNodeIterReset(&iter); + if (msgblock != NULL) clusterMsgSendBlockDecrRefCount(msgblock); + clusterMsgSendBlockDecrRefCount(msgblock_light); } /* ----------------------------------------------------------------------------- diff --git a/src/cluster_legacy.h b/src/cluster_legacy.h index 09146e5dc7..0e8c0f5ff8 100644 --- a/src/cluster_legacy.h +++ b/src/cluster_legacy.h @@ 
-50,8 +50,9 @@ typedef struct clusterLink { #define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */ #define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */ #define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */ -#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* replica will not try to failover. */ +#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */ #define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */ +#define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */ #define CLUSTER_NODE_NULL_NAME \ "\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \ "\000\000\000\000\000\000\000\000\000\000\000\000" @@ -64,6 +65,7 @@ typedef struct clusterLink { #define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL) #define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER) #define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED) +#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) /* This structure represent elements of node->fail_reports. */ typedef struct clusterNodeFailReport { @@ -92,6 +94,13 @@ typedef struct clusterNodeFailReport { #define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */ #define CLUSTERMSG_TYPE_COUNT 11 /* Total number of message types. */ +#define CLUSTERMSG_LIGHT 0x8000 /* Modifier bit for message types that support light header */ + +#define CLUSTERMSG_MODIFIER_MASK (CLUSTERMSG_LIGHT) /* Modifier mask for header types. (if we add more in the future) */ + +/* We check for the modifier bit to determine if the message is sent using light header.*/ +#define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT) + /* Initially we don't know our "name", but we'll find it once we connect * to the first node, using the getsockname() function. 
Then we'll use this * address for all the next messages. */ @@ -289,6 +298,26 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset"); primary is up. */ #define CLUSTERMSG_FLAG0_EXT_DATA (1 << 2) /* Message contains extension data */ +typedef struct { + char sig[4]; /* Signature "RCmb" (Cluster message bus). */ + uint32_t totlen; /* Total length of this message */ + uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */ + uint16_t notused1; + uint16_t type; /* Message type */ + uint16_t notused2; + union clusterMsgData data; +} clusterMsgLight; + +static_assert(offsetof(clusterMsgLight, sig) == offsetof(clusterMsg, sig), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, notused1) == offsetof(clusterMsg, port), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, notused2) == offsetof(clusterMsg, count), "unexpected field offset"); +static_assert(offsetof(clusterMsgLight, data) == 16, "unexpected field offset"); + +#define CLUSTERMSG_LIGHT_MIN_LEN (sizeof(clusterMsgLight) - sizeof(union clusterMsgData)) + struct _clusterNode { mstime_t ctime; /* Node object creation time. */ char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */