Skip to content

Commit

Permalink
Sentinel fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Jan 6, 2025
1 parent a961435 commit 918c911
Showing 1 changed file with 64 additions and 53 deletions.
117 changes: 64 additions & 53 deletions src/redisx-sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,71 +33,23 @@ static int rTryConnectSentinel(Redis *redis, int serverIndex) {
char desc[80];
int status;

redisxDisconnect(redis);

sprintf(desc, "sentinel server %d", serverIndex);
prop_error(fn, rSetServerAsync(redis, desc, server.host, server.port));

status = rConnectClient(redis, REDISX_INTERACTIVE_CHANNEL);
if(status != X_SUCCESS) return status; // No error propagation. It's OK if server is down...

// Move server to the top of the list, so next time we try this one first...
memmove(&s->servers[1], s->servers, serverIndex * sizeof(RedisServer));
s->servers[0] = server;

return X_SUCCESS;
}


/// \cond PRIVATE

/**
* Obtains the current Sentinel master from the given participating node.
*
* @param redis A Redis server instance
* @return X_SUCCESS (0) if successful, or else an error code <0.
*/
int rDiscoverSentinel(Redis *redis) {
static const char *fn = "rConnectSentinel";

RedisPrivate *p = (RedisPrivate *) redis->priv;
const RedisSentinel *s = p->sentinel;
int i, savedTimeout = p->timeoutMillis;

p->timeoutMillis = s->timeoutMillis > 0 ? s->timeoutMillis : REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS;

for(i = 0; i < s->nServers; i++) if(rTryConnectSentinel(redis, i) == X_SUCCESS) {
RESP *reply;
int status;

// Get the name of the master...
reply = redisxRequest(redis, "SENTINEL", "get-master-addr-by-name", s->serviceName, NULL, &status);
rCloseClientAsync(redis->interactive);

if(status) continue;

if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 2) == X_SUCCESS) {
RESP **component = (RESP **) reply->value;
int port = (int) strtol((char *) component[1]->value, NULL, 10);

status = rSetServerAsync(redis, "sentinel master", (char *) component[0]->value, port);

redisxDestroyRESP(reply);
p->timeoutMillis = savedTimeout;

prop_error(fn, status);

// TODO update sentinel server list...

return X_SUCCESS;
}
}

p->timeoutMillis = savedTimeout;
return x_error(X_NO_SERVICE, ENOTCONN, fn, "no Sentinel server available");
}

/**
* Verifies that a given Redis instance is in the master role. User's should always communicate
* to the master, not to replicas.
* Verifies that a given Redis instance is in the master role. It is assumed that we have a live
* interactive connection to the server.
*
* @param redis A redis server instance
* @return X_SUCCESS (0) if the server is confirmed to be the master, or else an error
Expand Down Expand Up @@ -139,11 +91,70 @@ int rConfirmMasterRole(Redis *redis) {
redisxDestroyRESP(reply);

if(status) return x_error(X_FAILURE, EAGAIN, fn, "Replica is not master");

return X_SUCCESS;
}

/// \endcond
/**
* Obtains the current Sentinel master from the given participating node.
*
* @param redis A Redis server instance
* @return X_SUCCESS (0) if successful, or else an error code &lt;0.
*/
int rDiscoverSentinel(Redis *redis) {
static const char *fn = "rConnectSentinel";

RedisPrivate *p = (RedisPrivate *) redis->priv;
const RedisSentinel *s = p->sentinel;
int i, savedTimeout = p->timeoutMillis;

p->timeoutMillis = s->timeoutMillis > 0 ? s->timeoutMillis : REDISX_DEFAULT_SENTINEL_TIMEOUT_MILLIS;

for(i = 0; i < s->nServers; i++) if(rTryConnectSentinel(redis, i) == X_SUCCESS) {
RedisServer server = s->servers[i]; // A local copy of the server data
RESP *reply;
int status;

if(i > 0) {
// Move to the top of the list, so next time we try this one first...
memmove(&s->servers[1], s->servers, i * sizeof(RedisServer));
s->servers[0] = server;
}

// Check if this is master...
if(rConfirmMasterRole(redis)) goto success; // @suppress("Goto statement used")

// Get the name of the master...
reply = redisxRequest(redis, "SENTINEL", "get-master-addr-by-name", s->serviceName, NULL, &status);
if(status) continue;

if(redisxCheckDestroyRESP(reply, RESP_ARRAY, 2) == X_SUCCESS) {
RESP **component = (RESP **) reply->value;
int port = (int) strtol((char *) component[1]->value, NULL, 10);

status = rSetServerAsync(redis, "sentinel master", (char *) component[0]->value, port);

redisxDestroyRESP(reply);
prop_error(fn, status);

// TODO update sentinel server list?...

goto success; // @suppress("Goto statement used")
}
}

p->timeoutMillis = savedTimeout;
return x_error(X_NO_SERVICE, ENOTCONN, fn, "no Sentinel server available");

// --------------------------------------------------------------------------------------------------
success:

p->timeoutMillis = savedTimeout;
return X_SUCCESS;

}

/// \endcond

/**
* Validates a Sentinel configuration.
Expand Down

0 comments on commit 918c911

Please sign in to comment.