Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preferred nodes #139

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions corvus.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,30 @@ syslog 0
#
# * `master`, forward all reading commands to master, the default
# * `read-slave-only`, forward all reading commands to slaves
# * `read-preferred-only` forward all reading commands to a slave or master based on the preferred_node list
# * `both`, forward reading commands to both master and slaves
#
# If new slaves are added to the cluster, `PROXY UPDATESLOTMAP` should be emmited
# to tell corvus to use the newly added slaves.
#
# read-strategy master

#
# When the `read-preferred-only` strategy is configured, the nodes from the `preferred_nodes` list
# will be preferred to any other node for a given slot.
# This is useful if a cluster is distributed geographically. It can be preferable to query the local nodes
# If the cluster shows a preferred node as disconnected, it won't be used but the cluster status will be
# polled until all preferred nodes are available again.
#
# preferred_nodes localhost:8003,localhost:8004

#
# When `read-preferred` read-strategy is used and the cluster shows a preferred node to be disconnected,
# an active polling of the cluster configuration is started to determine as soon as possible when the
# preferred node is back online so that it can be used again.
#
polling_interval 30

# Slowlog
# The following two configs are almost the same with redis.
# Every command whose lantency exceeds `slowlog-log-slower-than` will be considered a slow command,
Expand Down
77 changes: 73 additions & 4 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const char * CONFIG_OPTIONS[] = {
"metric_interval",
"stats",
"read-strategy",
"preferred_nodes",
"polling_interval",
"requirepass",
"client_timeout",
"server_timeout",
Expand All @@ -48,6 +50,9 @@ void config_init()
config.bind = 12345;
config.node = cv_calloc(1, sizeof(struct node_conf));
config.node->refcount = 1;
config.preferred_node = cv_calloc(1, sizeof(struct node_conf));
config.preferred_node->refcount = 1;
config.polling_interval = 10;
config.thread = DEFAULT_THREAD;
config.loglevel = INFO;
config.syslog = 0;
Expand All @@ -56,7 +61,7 @@ void config_init()
config.server_timeout = 0;
config.bufsize = DEFAULT_BUFSIZE;
config.requirepass = NULL;
config.readslave = config.readmasterslave = false;
config.readslave = config.readmasterslave = config.readpreferred = false;
config.slowlog_max_len = 1024;
config.slowlog_log_slower_than = -1;
config.slowlog_statsd_enabled = 0;
Expand All @@ -68,6 +73,7 @@ void config_init()
void config_free()
{
config_node_dec_ref(config.node);
config_node_dec_ref(config.preferred_node);
pthread_mutex_destroy(&lock_conf_node);
pthread_mutex_destroy(&lock_config_rewrite);
}
Expand Down Expand Up @@ -106,6 +112,25 @@ void config_set_node(struct node_conf *node)
config_node_dec_ref(oldnode);
}

struct node_conf *config_get_preferred_node()
{
pthread_mutex_lock(&lock_conf_node);
struct node_conf *node = config.preferred_node;
int refcount = ATOMIC_INC(node->refcount, 1);
pthread_mutex_unlock(&lock_conf_node);
assert(refcount >= 1);
return node;
}

void config_set_preferred_node(struct node_conf *node)
{
pthread_mutex_lock(&lock_conf_node);
struct node_conf *oldnode = config.preferred_node;
config.preferred_node = node;
pthread_mutex_unlock(&lock_conf_node);
config_node_dec_ref(oldnode);
}

void config_node_dec_ref(struct node_conf *node)
{
int refcount = ATOMIC_DEC(node->refcount, 1);
Expand Down Expand Up @@ -134,6 +159,24 @@ void config_node_to_str(char *str, size_t max_len)
config_node_dec_ref(nodes);
}

void config_preferred_node_to_str(char *str, size_t max_len)
{
struct node_conf *nodes = config_get_preferred_node();
char buf[ADDRESS_LEN + 1];
for (size_t i = 0; i != nodes->len; i++) {
struct address *addr = &nodes->addr[i];
size_t len = snprintf(buf, max_len, "%s:%u",
addr->ip, addr->port);
size_t comma_len = i > 0 ? 1 : 0;
if (len + comma_len > max_len) break;
if (comma_len) *str++ = ',';
strcpy(str, buf);
str += len;
max_len -= len + comma_len;
}
config_node_dec_ref(nodes);
}

int parse_int(char *s, int *result)
{
const int max = 100000000;
Expand Down Expand Up @@ -188,11 +231,15 @@ int config_add(char *name, char *value)
config.readslave = true;
}
} else if (strcmp(name, "read-strategy") == 0) {
config.readpreferred = false;
if (strcmp(value, "read-slave-only") == 0) {
config.readmasterslave = false;
config.readslave = true;
} else if (strcmp(value, "both") == 0) {
config.readmasterslave = config.readslave = true;
} else if (strcmp(value, "read-preferred") == 0) {
config.readmasterslave = config.readslave = false;
config.readpreferred = true;
} else {
config.readmasterslave = config.readslave = false;
}
Expand Down Expand Up @@ -224,6 +271,9 @@ int config_add(char *name, char *value)
} else if (strcmp(name, "metric_interval") == 0) {
TRY_PARSE_INT();
config.metric_interval = val > 0 ? val : 10;
} else if (strcmp(name, "polling_interval") == 0) {
TRY_PARSE_INT();
config.polling_interval = val > 0 ? val : 10;
} else if (strcmp(name, "loglevel") == 0) {
if (strcasecmp(value, "debug") == 0) {
ATOMIC_SET(config.loglevel, DEBUG);
Expand All @@ -247,7 +297,7 @@ int config_add(char *name, char *value)
config.requirepass = cv_calloc(strlen(value) + 1, sizeof(char));
memcpy(config.requirepass, value, strlen(value));
}
} else if (strcmp(name, "node") == 0) {
} else if (strcmp(name, "node") == 0 || strcmp(name, "preferred_nodes") == 0) {
// strtok will modify `value` to tokenize it.
// Copy it first in case value is a string literal
char buf[strlen(value) + 1];
Expand All @@ -266,14 +316,18 @@ int config_add(char *name, char *value)
p = strtok(NULL, ",");
}
if (addr_cnt == 0) {
LOG(WARN, "received empty node value in config set");
LOG(WARN, "received empty %s value in config set", name);
return CORVUS_ERR;
}
struct node_conf *newnode = cv_malloc(sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
newnode->refcount = 1;
config_set_node(newnode);
if (strcmp(name, "node") == 0) {
config_set_node(newnode);
} else {
config_set_preferred_node(newnode);
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
TRY_PARSE_INT();
ATOMIC_SET(config.slowlog_log_slower_than, val);
Expand All @@ -297,6 +351,8 @@ int config_get(const char *name, char *value, size_t max_len)
snprintf(value, max_len, "%u", config.bind);
} else if (strcmp(name, "node") == 0) {
config_node_to_str(value, max_len);
} else if (strcmp(name, "preferred_nodes") == 0) {
config_preferred_node_to_str(value, max_len);
} else if (strcmp(name, "thread") == 0) {
snprintf(value, max_len, "%d", config.thread);
} else if (strcmp(name, "loglevel") == 0) {
Expand Down Expand Up @@ -335,6 +391,8 @@ int config_get(const char *name, char *value, size_t max_len)
snprintf(value, max_len, "%d", config.slowlog_max_len);
} else if (strcmp(name, "slowlog-statsd-enabled") == 0) {
strncpy(value, BOOL_STR(config.slowlog_statsd_enabled), max_len);
} else if (strcmp(name, "polling_interval") == 0) {
snprintf(value, max_len, "%d", config.polling_interval);
} else {
return CORVUS_ERR;
}
Expand Down Expand Up @@ -612,3 +670,14 @@ bool config_option_changable(const char *option)
}
return false;
}

bool config_is_preferred_node(struct address *node)
{
struct node_conf *preferred_node = config_get_preferred_node();
for (size_t i = 0; i < preferred_node->len; i++) {
if (socket_cmp(node, &preferred_node->addr[i]) == 0) {
return true;
}
}
return false;
}
5 changes: 5 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct corvus_config {
char cluster[CLUSTER_NAME_SIZE + 1];
uint16_t bind;
struct node_conf *node;
struct node_conf *preferred_node; /* List of nodes that should be set a higher priority */
int thread;
int loglevel;
bool syslog;
Expand All @@ -25,6 +26,8 @@ struct corvus_config {
bool stats;
bool readslave;
bool readmasterslave;
bool readpreferred;
uint16_t polling_interval; /* Intervall to use when polling for cluster configuration */
char *requirepass;
int64_t client_timeout;
int64_t server_timeout;
Expand All @@ -46,5 +49,7 @@ void config_set_node(struct node_conf *node);
void config_node_dec_ref(struct node_conf *node);
int config_add(char *name, char *value);
bool config_option_changable(const char *option);
struct node_conf *config_get_preferred_node();
bool config_is_preferred_node(struct address *node);

#endif /* end of include guard: CONFIG_H */
11 changes: 9 additions & 2 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,21 @@ struct connection *conn_get_server(struct context *ctx, uint16_t slot,
bool readonly = false;

if (slot_get_node_addr(slot, &info)) {
addr = &info.nodes[0];
addr = &info.nodes[0].addr;
if (access != CMD_ACCESS_WRITE && config.readslave && info.index > 1) {
int r = rand_r(&ctx->seed);
if (!config.readmasterslave || r % info.index != 0) {
int i = r % (info.index - 1);
addr = &info.nodes[++i];
addr = &info.nodes[++i].addr;
readonly = true;
}
} else if (access != CMD_ACCESS_WRITE && config.readpreferred) {
/*
* If readpreferred is set and we're not running a 'write' command
* use the first node available that's in the preferred list.
*/
addr = &info.preferred_nodes[0].addr;
readonly = true;
}
if (addr->port > 0) {
return conn_get_server_from_pool(ctx, addr, readonly);
Expand Down
10 changes: 8 additions & 2 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,15 @@ int server_read(struct connection *server)
cmd->asking = 0;
continue;
case CORVUS_READONLY:
LOG(DEBUG, "recv readonly");
if (cmd->reply_type == REP_ERROR) {
LOG(ERROR, "recv invalid readonly reponse");
info->readonly = true;
} else {
LOG(DEBUG, "recv readonly");
}

mbuf_range_clear(cmd->ctx, cmd->rep_buf);
server->info->readonly_sent = false;
info->readonly_sent = false;
continue;
case CORVUS_OK:
STAILQ_REMOVE_HEAD(&info->waiting_queue, waiting_next);
Expand Down
Loading