Skip to content

Commit

Permalink
Implement REPLCONF VERSION (#554)
Browse files Browse the repository at this point in the history
The replica sends its version when initiating replication, in
pipeline with other REPLCONF commands.

The primary stores it in the client struct. Other fields are made
smaller to avoid making the client struct consume more memory.

Fixes #414.

---------

Signed-off-by: Viktor Söderqvist <[email protected]>
  • Loading branch information
zuiderkwast authored May 27, 2024
1 parent e4ead94 commit 045d475
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 12 deletions.
1 change: 1 addition & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ client *createClient(connection *conn) {
c->repl_last_partial_write = 0;
c->slave_listening_port = 0;
c->slave_addr = NULL;
c->replica_version = 0;
c->slave_capa = SLAVE_CAPA_NONE;
c->slave_req = SLAVE_REQ_NONE;
c->reply = listCreate();
Expand Down
34 changes: 33 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,10 @@ void syncCommand(client *c) {
* - rdb-filter-only <include-filters>
* Define "include" filters for the RDB snapshot. Currently we only support
* a single include filter: "functions". Passing an empty string "" will
* result in an empty RDB. */
* result in an empty RDB.
*
* - version <major.minor.patch>
* The replica reports its version. */
void replconfCommand(client *c) {
int j;

Expand Down Expand Up @@ -1225,6 +1228,15 @@ void replconfCommand(client *c) {
}
}
sdsfreesplitres(filters, filter_count);
} else if (!strcasecmp(c->argv[j]->ptr, "version")) {
/* REPLCONF VERSION x.y.z */
int version = version2num(c->argv[j + 1]->ptr);
if (version >= 0) {
c->replica_version = version;
} else {
addReplyErrorFormat(c, "Unrecognized version format: %s", (char *)c->argv[j + 1]->ptr);
return;
}
} else {
addReplyErrorFormat(c, "Unrecognized REPLCONF option: %s", (char *)c->argv[j]->ptr);
return;
Expand Down Expand Up @@ -2623,6 +2635,10 @@ void syncWithMaster(connection *conn) {
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;

server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
Expand Down Expand Up @@ -2696,6 +2712,22 @@ void syncWithMaster(connection *conn) {
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY;
}

/* Receive VERSION reply. */
if (server.repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF VERSION: %s",
err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}

Expand Down
22 changes: 12 additions & 10 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,15 @@ typedef enum {
REPL_STATE_CONNECT, /* Must connect to master */
REPL_STATE_CONNECTING, /* Connecting to master */
/* --- Handshake states, must be ordered --- */
REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */
REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */
REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */
REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */
REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequence to master */
REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */
REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
REPL_STATE_TRANSFER, /* Receiving .rdb from master */
REPL_STATE_CONNECTED, /* Connected to master */
Expand Down Expand Up @@ -1258,8 +1259,9 @@ typedef struct client {
char replid[CONFIG_RUN_ID_SIZE + 1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
int replica_version; /* Version on the form 0xMMmmpp. */
short slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
short slave_req; /* Slave requirements: SLAVE_REQ_* */
multiState mstate; /* MULTI/EXEC state */
blockingState bstate; /* blocking state */
long long woff; /* Last write global replication offset. */
Expand Down
3 changes: 2 additions & 1 deletion src/unit/test_files.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ int test_string2l(int argc, char **argv, int flags);
int test_ll2string(int argc, char **argv, int flags);
int test_ld2string(int argc, char **argv, int flags);
int test_fixedpoint_d2string(int argc, char **argv, int flags);
int test_version2num(int argc, char **argv, int flags);
int test_reclaimFilePageCache(int argc, char **argv, int flags);
int test_ziplistCreateIntList(int argc, char **argv, int flags);
int test_ziplistPop(int argc, char **argv, int flags);
Expand Down Expand Up @@ -77,7 +78,7 @@ unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEnco
unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreIteratorRemoveAllKeysDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysNoDeleteEmptyDict}, {"test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict", test_kvstoreDictIteratorRemoveAllKeysDeleteEmptyDict}, {NULL, NULL}};
unitTest __test_sds_c[] = {{"test_sds", test_sds}, {NULL, NULL}};
unitTest __test_sha1_c[] = {{"test_sha1", test_sha1}, {NULL, NULL}};
unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}};
unitTest __test_util_c[] = {{"test_string2ll", test_string2ll}, {"test_string2l", test_string2l}, {"test_ll2string", test_ll2string}, {"test_ld2string", test_ld2string}, {"test_fixedpoint_d2string", test_fixedpoint_d2string}, {"test_version2num", test_version2num}, {"test_reclaimFilePageCache", test_reclaimFilePageCache}, {NULL, NULL}};
unitTest __test_ziplist_c[] = {{"test_ziplistCreateIntList", test_ziplistCreateIntList}, {"test_ziplistPop", test_ziplistPop}, {"test_ziplistGetElementAtIndex3", test_ziplistGetElementAtIndex3}, {"test_ziplistGetElementOutOfRange", test_ziplistGetElementOutOfRange}, {"test_ziplistGetLastElement", test_ziplistGetLastElement}, {"test_ziplistGetFirstElement", test_ziplistGetFirstElement}, {"test_ziplistGetElementOutOfRangeReverse", test_ziplistGetElementOutOfRangeReverse}, {"test_ziplistIterateThroughFullList", test_ziplistIterateThroughFullList}, {"test_ziplistIterateThroughListFrom1ToEnd", test_ziplistIterateThroughListFrom1ToEnd}, {"test_ziplistIterateThroughListFrom2ToEnd", test_ziplistIterateThroughListFrom2ToEnd}, {"test_ziplistIterateThroughStartOutOfRange", test_ziplistIterateThroughStartOutOfRange}, {"test_ziplistIterateBackToFront", test_ziplistIterateBackToFront}, {"test_ziplistIterateBackToFrontDeletingAllItems", test_ziplistIterateBackToFrontDeletingAllItems}, {"test_ziplistDeleteInclusiveRange0To0", test_ziplistDeleteInclusiveRange0To0}, {"test_ziplistDeleteInclusiveRange0To1", test_ziplistDeleteInclusiveRange0To1}, {"test_ziplistDeleteInclusiveRange1To2", test_ziplistDeleteInclusiveRange1To2}, {"test_ziplistDeleteWithStartIndexOutOfRange", test_ziplistDeleteWithStartIndexOutOfRange}, {"test_ziplistDeleteWithNumOverflow", test_ziplistDeleteWithNumOverflow}, {"test_ziplistDeleteFooWhileIterating", test_ziplistDeleteFooWhileIterating}, {"test_ziplistReplaceWithSameSize", test_ziplistReplaceWithSameSize}, {"test_ziplistReplaceWithDifferentSize", test_ziplistReplaceWithDifferentSize}, {"test_ziplistRegressionTestForOver255ByteStrings", test_ziplistRegressionTestForOver255ByteStrings}, {"test_ziplistRegressionTestDeleteNextToLastEntries", test_ziplistRegressionTestDeleteNextToLastEntries}, {"test_ziplistCreateLongListAndCheckIndices", test_ziplistCreateLongListAndCheckIndices}, {"test_ziplistCompareStringWithZiplistEntries", test_ziplistCompareStringWithZiplistEntries}, {"test_ziplistMergeTest", test_ziplistMergeTest}, {"test_ziplistStressWithRandomPayloadsOfDifferentEncoding", test_ziplistStressWithRandomPayloadsOfDifferentEncoding}, {"test_ziplistCascadeUpdateEdgeCases", test_ziplistCascadeUpdateEdgeCases}, {"test_ziplistInsertEdgeCase", test_ziplistInsertEdgeCase}, {"test_ziplistStressWithVariableSize", test_ziplistStressWithVariableSize}, {"test_BenchmarkziplistFind", test_BenchmarkziplistFind}, {"test_BenchmarkziplistIndex", test_BenchmarkziplistIndex}, {"test_BenchmarkziplistValidateIntegrity", test_BenchmarkziplistValidateIntegrity}, {"test_BenchmarkziplistCompareWithString", test_BenchmarkziplistCompareWithString}, {"test_BenchmarkziplistCompareWithNumber", test_BenchmarkziplistCompareWithNumber}, {"test_ziplistStress__ziplistCascadeUpdate", test_ziplistStress__ziplistCascadeUpdate}, {NULL, NULL}};
unitTest __test_zmalloc_c[] = {{"test_zmallocInitialUsedMemory", test_zmallocInitialUsedMemory}, {"test_zmallocAllocReallocCallocAndFree", test_zmallocAllocReallocCallocAndFree}, {"test_zmallocAllocZeroByteAndFree", test_zmallocAllocZeroByteAndFree}, {NULL, NULL}};

Expand Down
15 changes: 15 additions & 0 deletions src/unit/test_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,21 @@ int test_fixedpoint_d2string(int argc, char **argv, int flags) {
return 0;
}

int test_version2num(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
TEST_ASSERT(version2num("7.2.5") == 0x070205);
TEST_ASSERT(version2num("255.255.255") == 0xffffff);
TEST_ASSERT(version2num("7.2.256") == -1);
TEST_ASSERT(version2num("7.2") == -1);
TEST_ASSERT(version2num("7.2.1.0") == -1);
TEST_ASSERT(version2num("1.-2.-3") == -1);
TEST_ASSERT(version2num("1.2.3-rc4") == -1);
TEST_ASSERT(version2num("") == -1);
return 0;
}

#if defined(__linux__)
/* Since fadvise and mincore is only supported in specific platforms like
* Linux, we only verify the fadvise mechanism works in Linux */
Expand Down
23 changes: 23 additions & 0 deletions src/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,29 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
return 0;
}

/* Parses a version string on the form "major.minor.patch" and returns an
* integer on the form 0xMMmmpp. Returns -1 on parse error. */
int version2num(const char *version) {
int v = 0, part = 0, numdots = 0;
const char *p = version;
do {
if (*p >= '0' && *p <= '9') {
part = part * 10 + (unsigned)(*p - '0');
if (part > 255) return -1;
} else if (*p == '.') {
if (++numdots > 2) return -1;
v = (v << 8) | part;
part = 0;
} else {
return -1;
}
p++;
} while (*p);
if (numdots != 2) return -1;
v = (v << 8) | part;
return v;
}

/* Get random bytes, attempts to get an initial seed from /dev/urandom and
* the uses a one way hash function in counter mode to generate a random
* stream. However if /dev/urandom is not available, a weaker seed is used.
Expand Down
1 change: 1 addition & 0 deletions src/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ int d2string(char *buf, size_t len, double value);
int fixedpoint_d2string(char *dst, size_t dstlen, double dvalue, int fractional_digits);
int ld2string(char *buf, size_t len, long double value, ld2string_mode mode);
int double2ll(double d, long long *out);
int version2num(const char *version);
int yesnotoi(char *s);
sds getAbsolutePath(char *filename);
long getTimeZone(void);
Expand Down

0 comments on commit 045d475

Please sign in to comment.