diff --git a/src/networking.c b/src/networking.c index 121931a111..7054ffc126 100644 --- a/src/networking.c +++ b/src/networking.c @@ -177,6 +177,7 @@ client *createClient(connection *conn) { c->repl_last_partial_write = 0; c->slave_listening_port = 0; c->slave_addr = NULL; + c->replica_version = 0; c->slave_capa = SLAVE_CAPA_NONE; c->slave_req = SLAVE_REQ_NONE; c->reply = listCreate(); diff --git a/src/replication.c b/src/replication.c index e2612e75b5..069d60a678 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1122,7 +1122,10 @@ void syncCommand(client *c) { * - rdb-filter-only * Define "include" filters for the RDB snapshot. Currently we only support * a single include filter: "functions". Passing an empty string "" will - * result in an empty RDB. */ + * result in an empty RDB. + * + * - version + * The replica reports its version. */ void replconfCommand(client *c) { int j; @@ -1225,6 +1228,15 @@ void replconfCommand(client *c) { } } sdsfreesplitres(filters, filter_count); + } else if (!strcasecmp(c->argv[j]->ptr, "version")) { + /* REPLCONF VERSION x.y.z */ + int version = version2num(c->argv[j + 1]->ptr); + if (version >= 0) { + c->replica_version = version; + } else { + addReplyErrorFormat(c, "Unrecognized version format: %s", (char *)c->argv[j + 1]->ptr); + return; + } } else { addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr); return; @@ -2623,6 +2635,10 @@ void syncWithMaster(connection *conn) { err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", NULL); if (err) goto write_error; + /* Inform the primary of our (replica) version. */ + err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL); + if (err) goto write_error; + server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; return; } @@ -2696,6 +2712,22 @@ void syncWithMaster(connection *conn) { } sdsfree(err); err = NULL; + server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY; + } + + /* Receive VERSION reply. */ + if (server.repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) { + err = receiveSynchronousResponse(conn); + if (err == NULL) goto no_response_error; + /* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */ + if (err[0] == '-') { + serverLog(LL_NOTICE, + "(Non critical) Primary does not understand " + "REPLCONF VERSION: %s", + err); + } + sdsfree(err); + err = NULL; server.repl_state = REPL_STATE_SEND_PSYNC; } diff --git a/src/server.h b/src/server.h index 70beb54f43..7011be3033 100644 --- a/src/server.h +++ b/src/server.h @@ -468,14 +468,15 @@ typedef enum { REPL_STATE_CONNECT, /* Must connect to master */ REPL_STATE_CONNECTING, /* Connecting to master */ /* --- Handshake states, must be ordered --- */ - REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ - REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */ - REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ - REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ - REPL_STATE_SEND_PSYNC, /* Send PSYNC */ - REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ + REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ + REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */ + REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ + REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_SEND_PSYNC, /* Send PSYNC */ + REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ /* --- End of handshake states --- */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */ REPL_STATE_CONNECTED, /* Connected to master */ @@ -1258,8 +1259,9 @@ typedef struct client { char replid[CONFIG_RUN_ID_SIZE + 1]; /* Master replication ID (if master). */ int slave_listening_port; /* As configured with: REPLCONF listening-port */ char *slave_addr; /* Optionally given by REPLCONF ip-address */ - int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ - int slave_req; /* Slave requirements: SLAVE_REQ_* */ + int replica_version; /* Version on the form 0xMMmmpp. */ + short slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */ + short slave_req; /* Slave requirements: SLAVE_REQ_* */ multiState mstate; /* MULTI/EXEC state */ blockingState bstate; /* blocking state */ long long woff; /* Last write global replication offset. */ diff --git a/src/unit/test_files.h b/src/unit/test_files.h index 6af8f4c380..7da3d26473 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -29,6 +29,7 @@ int test_string2l(int argc, char **argv, int flags); int test_ll2string(int argc, char **argv, int flags); int test_ld2string(int argc, char **argv, int flags); int test_fixedpoint_d2string(int argc, char **argv, int flags); +int test_version2num(int argc, char **argv, int flags); int test_reclaimFilePageCache(int argc, char **argv, int flags); int test_ziplistCreateIntList(int argc, char **argv, int flags); int test_ziplistPop(int argc, char **argv, int flags); @@ -77,7 +78,7 @@ unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEnco unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict}, {NULL, NULL}}; unitTest __test_sds_c[] = {{"test_sds", test_sds}, {NULL, NULL}}; unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}}; -unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}}; +unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}}; unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}}; unitTest __test_zmalloc_c[] = {{"test_zmallocInitialUsedMemory", test_zmallocInitialUsedMemory}, {"test_zmallocAllocReallocCallocAndFree", test_zmallocAllocReallocCallocAndFree}, {"test_zmallocAllocZeroByteAndFree", test_zmallocAllocZeroByteAndFree}, {NULL, NULL}}; diff --git a/src/unit/test_util.c b/src/unit/test_util.c index 30f70c8350..ade3d60901 100644 --- a/src/unit/test_util.c +++ b/src/unit/test_util.c @@ -253,6 +253,21 @@ int test_fixedpoint_d2string(int argc, char **argv, int flags) { return 0; } +int test_version2num(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + TEST_ASSERT(version2num("7.2.5") == 0x070205); + TEST_ASSERT(version2num("255.255.255") == 0xffffff); + TEST_ASSERT(version2num("7.2.256") == -1); + TEST_ASSERT(version2num("7.2") == -1); + TEST_ASSERT(version2num("7.2.1.0") == -1); + TEST_ASSERT(version2num("1.-2.-3") == -1); + TEST_ASSERT(version2num("1.2.3-rc4") == -1); + TEST_ASSERT(version2num("") == -1); + return 0; +} + #if defined(__linux__) /* Since fadvise and mincore is only supported in specific platforms like * Linux, we only verify the fadvise mechanism works in Linux */ diff --git a/src/util.c b/src/util.c index c3e494f526..8f5e03614c 100644 --- a/src/util.c +++ b/src/util.c @@ -872,6 +872,29 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) { return 0; } +/* Parses a version string on the form "major.minor.patch" and returns an + * integer on the form 0xMMmmpp. Returns -1 on parse error. */ +int version2num(const char *version) { + int v = 0, part = 0, numdots = 0; + const char *p = version; + do { + if (*p >= '0' && *p <= '9') { + part = part * 10 + (unsigned)(*p - '0'); + if (part > 255) return -1; + } else if (*p == '.') { + if (++numdots > 2) return -1; + v = (v << 8) | part; + part = 0; + } else { + return -1; + } + p++; + } while (*p); + if (numdots != 2) return -1; + v = (v << 8) | part; + return v; +} + /* Get random bytes, attempts to get an initial seed from /dev/urandom and * the uses a one way hash function in counter mode to generate a random * stream. However if /dev/urandom is not available, a weaker seed is used. diff --git a/src/util.h b/src/util.h index 3d19c9c04c..d675f4c6cd 100644 --- a/src/util.h +++ b/src/util.h @@ -78,6 +78,7 @@ int d2string(char *buf, size_t len, double value); int fixedpoint_d2string(char *dst, size_t dstlen, double dvalue, int fractional_digits); int ld2string(char *buf, size_t len, long double value, ld2string_mode mode); int double2ll(double d, long long *out); +int version2num(const char *version); int yesnotoi(char *s); sds getAbsolutePath(char *filename); long getTimeZone(void);