Skip to content

Commit

Permalink
Rebase based on unstable branch.
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Lipeng Zhu <[email protected]>
Co-authored-by: Wangyang Guo <[email protected]>
  • Loading branch information
lipzhu and guowangy committed Jul 30, 2024
1 parent b4d96ca commit 162c183
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 16 deletions.
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ else
LIBCRYPTO_LIBS=-lcrypto
endif

# only Linux has IO_URING support
#
# Probe for liburing: a one-line C file that includes <liburing.h> is written
# to foo.c in the current directory and run through the preprocessor only
# ($(CC) -E). HAS_LIBURING expands to "yes" when preprocessing succeeds; in
# that case HAVE_LIBURING is defined for the sources and -luring is added to
# the link line. The temporary foo.c is removed afterwards.
# NOTE(review): NUMBER_SIGN_CHAR is presumably defined elsewhere as '#' so the
# shell command can spell "#include" -- confirm.
ifeq ($(uname_S),Linux)
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_LIBURING
FINAL_LIBS+= -luring
endif
endif

BUILD_NO:=0
BUILD_YES:=1
BUILD_MODULE:=2
Expand Down Expand Up @@ -423,7 +434,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3106,6 +3106,7 @@ standardConfig static_configs[] = {
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
91 changes: 91 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include "io_uring.h"

#ifdef HAVE_LIBURING
#include <errno.h>
#include <liburing.h>
#include <string.h>
#include "zmalloc.h"

/* io_uring instance queue depth. */
#define IO_URING_DEPTH 256

static size_t io_uring_write_queue_len = 0;
static struct io_uring *_io_uring;

/* Initialize the process-wide io_uring instance used for client writes.
 *
 * Allocates the ring descriptor and sets up a submission/completion queue
 * pair of IO_URING_DEPTH entries with zeroed (default) parameters.
 *
 * Returns IO_URING_OK on success. On failure the descriptor is released,
 * the global ring pointer is left untouched and IO_URING_ERR is returned. */
int initIOUring(void) {
    struct io_uring_params params;
    struct io_uring *ring = zmalloc(sizeof(*ring));
    memset(&params, 0, sizeof(params));
    /* io_uring_queue_init_params(3) returns 0 on success and -errno on
     * failure. */
    if (io_uring_queue_init_params(IO_URING_DEPTH, ring, &params) < 0) {
        /* The queues were not set up, so only the descriptor itself has to
         * be given back. */
        zfree(ring);
        return IO_URING_ERR;
    }
    _io_uring = ring;
    return IO_URING_OK;
}

/* Queue a non-blocking send of 'len' bytes starting at 'buf' on socket 'fd'.
 *
 * 'c' is attached to the request as user data so that it can be retrieved
 * from the matching completion. This function only fills a submission queue
 * entry and bumps the count of outstanding writes; it does not submit.
 *
 * Returns IO_URING_ERR when no submission queue entry is available,
 * IO_URING_OK otherwise. */
int ioUringPrepWrite(void *c, int fd, const void *buf, size_t len) {
    struct io_uring_sqe *entry = io_uring_get_sqe(_io_uring);
    if (!entry) return IO_URING_ERR;

    io_uring_prep_send(entry, fd, buf, len, MSG_DONTWAIT);
    io_uring_sqe_set_data(entry, c);
    ++io_uring_write_queue_len;
    return IO_URING_OK;
}

/* Submit every queued write request and block until all of them completed.
 *
 * 'cqe_handler', when not NULL, is called once per completion with the user
 * data that was attached in ioUringPrepWrite() and the raw completion result
 * (io_uring reports failures as a negated errno value in that field).
 *
 * Returns IO_URING_OK once no write is outstanding anymore, IO_URING_ERR if
 * the submission fails or if waiting fails for a reason other than being
 * interrupted by a signal. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) {
    /* Hand all the prepared sends to the kernel in a single batch. */
    if (io_uring_submit(_io_uring) < 0) return IO_URING_ERR;

    /* Reap one completion per outstanding request. */
    while (io_uring_write_queue_len) {
        struct io_uring_cqe *cqe;
        int ret = io_uring_wait_cqe(_io_uring, &cqe);
        /* A signal delivered while waiting is not a failure of the ring:
         * the requests are still in flight, so just wait again instead of
         * leaving the barrier with completions pending. */
        if (ret == -EINTR) continue;
        if (ret != 0) return IO_URING_ERR;

        if (cqe_handler) cqe_handler(io_uring_cqe_get_data(cqe), cqe->res);
        io_uring_cqe_seen(_io_uring, cqe);
        io_uring_write_queue_len--;
    }
    return IO_URING_OK;
}

/* Tear down the io_uring instance and release its descriptor.
 *
 * Calling this when the ring was never initialized, or calling it more than
 * once, is a no-op. */
void freeIOUring(void) {
    if (_io_uring == NULL) return;
    io_uring_queue_exit(_io_uring);
    zfree(_io_uring);
    /* Reset the pointer so a later call cannot touch the released ring. */
    _io_uring = NULL;
}
#else
#ifndef UNUSED
#define UNUSED(V) ((void)V)
#endif

/* Stub used when the server is built without liburing.
 *
 * There is no ring to set up, so initialization always fails. Reporting
 * success here would let a server configured with io_uring enabled route
 * client writes to the no-op stubs below. Returns -1, i.e. IO_URING_ERR. */
int initIOUring(void) {
    return -1; /* IO_URING_ERR */
}

/* Stub used when the server is built without liburing: every argument is
 * ignored, nothing is queued and 0 is returned. */
int ioUringPrepWrite(void *c, int fd, const void *buf, size_t len) {
    (void)c;
    (void)fd;
    (void)buf;
    (void)len;
    return 0;
}

/* Stub used when the server is built without liburing: there is nothing to
 * submit or wait for, so the handler is never called and 0 is returned. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_handler) {
    /* The parameter name must match the one given to UNUSED(), otherwise
     * this translation unit does not compile. */
    UNUSED(cqe_handler);
    return 0;
}

/* Stub used when the server is built without liburing: no ring was ever
 * created, so there is nothing to release. */
void freeIOUring(void) {
}
#endif
15 changes: 15 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef IO_URING_H
#define IO_URING_H
#include <stddef.h>

/* Status codes returned by the io_uring helpers declared below. */
#define IO_URING_OK 0
#define IO_URING_ERR -1

/* Completion callback type. The first argument is the opaque pointer that
 * was passed as 'client' to ioUringPrepWrite(); the second is the result
 * field of the completed request. */
typedef void (*io_uring_cqe_handler)(void *, int);

/* NOTE: io_uring.c provides two sets of definitions for these functions: the
 * real ones when HAVE_LIBURING is defined, and stubs otherwise. */

/* Set up the io_uring instance. */
int initIOUring(void);
/* Queue a write of 'len' bytes from 'buf' to 'fd'. 'client' is attached to
 * the request and handed back to the completion handler. */
int ioUringPrepWrite(void *client, int fd, const void *buf, size_t len);
/* Submit the queued writes and wait until all of them completed, calling the
 * handler (when not NULL) for every completion. */
int ioUringWaitWriteBarrier(io_uring_cqe_handler cqe_hanlder);
/* Tear down the io_uring instance. */
void freeIOUring(void);

#endif /* IO_URING_H */
132 changes: 118 additions & 14 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "fmtargs.h"
#include <strings.h>
#include "io_threads.h"
#include "io_uring.h"
#include <sys/socket.h>
#include <sys/uio.h>
#include <math.h>
Expand Down Expand Up @@ -2424,6 +2425,82 @@ int processIOThreadsWriteDone(void) {
return processed;
}

/* Tell whether the pending reply of client 'c' may be written with io_uring.
 *
 * io_uring must be enabled and server.io_threads_num must be 1. On top of
 * that the client must not be a replica, must have an empty reply list and
 * must have data in its static buffer. Returns 1 if so, 0 otherwise. */
static inline int _canWriteUsingIOUring(client *c) {
    if (!server.io_uring_enabled || server.io_threads_num != 1) return 0;

    /* Currently, only static buffer write requests go through io_uring. */
    if (getClientType(c) == CLIENT_TYPE_REPLICA) return 0;
    if (listLength(c->reply) != 0) return 0;
    return c->bufpos > 0;
}

/* Apply the result of a completed io_uring write, stored in c->nwritten, to
 * the state of client 'c'.
 *
 * Returns IO_URING_OK when at least one byte was written and the client
 * counters were updated. Returns IO_URING_ERR otherwise; in that case the
 * client is scheduled for asynchronous release if its connection is not in
 * the connected state. */
static int _checkPendingIOUringWriteState(client *c) {
    if (c->nwritten > 0) {
        c->sentlen += c->nwritten;
        /* The whole static buffer went out: reset it so that the remainder
         * of the reply can continue from a clean state. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
        atomic_fetch_add_explicit(&server.stat_net_output_bytes, c->nwritten, memory_order_relaxed);
        c->net_output_bytes += c->nwritten;

        /* Sending data to a client that represents a primary is not counted
         * as an interaction: REPLCONF ACK commands are always sent to it and
         * merely fill the socket output buffer. Timeout detection relies on
         * received data / pings instead. */
        if (!c->flag.primary) c->last_interaction = server.unixtime;

        return IO_URING_OK;
    }

    /* Unlike synchronous system calls, which return -1 and set errno,
     * io_uring never uses errno: the negated error value is delivered
     * directly in the CQE res field, which is what c->nwritten holds. */
    if (c->nwritten != -EAGAIN) {
        c->conn->last_errno = -(c->nwritten);
        /* Only a connection that is currently connected is moved to the
         * error state, not to mess with handler callbacks of the others. */
        if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED) c->conn->state = CONN_STATE_ERROR;
    }
    if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn));
        freeClientAsync(c);
    }
    return IO_URING_ERR;
}

static void _postIOUringWrite(void) {
listIter li;
listNode *ln;
listRewind(server.clients_pending_write, &li);
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flag.pending_io_uring_write = 0;
listUnlinkNode(server.clients_pending_write, ln);

if (_checkPendingIOUringWriteState(c) == IO_URING_ERR) continue;
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
/* Close connection after entire reply has been sent. */
if (c->flag.close_after_reply) {
freeClientAsync(c);
continue;
}
}
/* Update client's memory usage after writing.*/
updateClientMemUsageAndBucket(c);
}
}

void setClientLastWritten(void *data, int cqeRes) {
client *c = data;
c->nwritten = cqeRes;
}

/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
Expand All @@ -2443,34 +2520,61 @@ int handleClientsWithPendingWrites(void) {
while ((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flag.pending_write = 0;
listUnlinkNode(server.clients_pending_write, ln);

/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flag.protected) continue;
if (c->flag.protected) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* Don't write to clients that are going to be closed anyway. */
if (c->flag.close_asap) continue;
if (c->flag.close_asap) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

if (!clientHasPendingReplies(c)) continue;
if (!clientHasPendingReplies(c)) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) continue;
if (trySendWriteToIOThreads(c) == C_OK) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

/* We can't write to the client while IO operation is in progress. */
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) continue;
if (c->io_write_state != CLIENT_IDLE || c->io_read_state != CLIENT_IDLE) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}

processed++;

/* Try to write buffers to the client socket. */
if (writeToClient(c) == C_ERR) continue;

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
if (_canWriteUsingIOUring(c)) {
c->flag.pending_io_uring_write = 1;
if (ioUringPrepWrite(c, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen) == IO_URING_ERR) {
listUnlinkNode(server.clients_pending_write, ln);
continue;
}
} else {
listUnlinkNode(server.clients_pending_write, ln);
/* Try to write buffers to the client socket. */
if (writeToClient(c) == C_ERR) continue;

/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
installClientWriteHandler(c);
}
}
}

if (server.io_uring_enabled && server.io_threads_num == 1 && listLength(server.clients_pending_write) > 0) {
ioUringWaitWriteBarrier(setClientLastWritten);
_postIOUringWrite();
}
return processed;
}

Expand Down
8 changes: 8 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "io_uring.h"

#include <time.h>
#include <signal.h>
Expand Down Expand Up @@ -2821,6 +2822,12 @@ void initListeners(void) {
/* Late initialization: calls bioInit() and initIOThreads(), sets up io_uring
 * when it is enabled, applies the jemalloc background thread setting and
 * records the initial memory usage. */
void InitServerLast(void) {
    bioInit();
    initIOThreads();
    /* io_uring is only initialized when enabled; a failure to do so is
     * logged and terminates the process. */
    if (server.io_uring_enabled && initIOUring() == IO_URING_ERR) {
        serverLog(LL_WARNING, "Failed to initialize io_uring.");
        exit(1);
    }
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -6968,6 +6975,7 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
if (server.io_uring_enabled) freeIOUring();
return 0;
}

Expand Down
4 changes: 3 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,8 @@ typedef struct ClientFlags {
* By using this flag, we ensure that the RDB client remains intact until the replica
* \ has successfully initiated PSYNC. */
uint64_t repl_rdb_channel : 1; /* Dual channel replication sync: track a connection which is used for rdb snapshot */
uint64_t reserved : 7; /* Reserved for future use */
uint64_t pending_io_uring_write : 1; /* Client has output to send using io_uring. */
uint64_t reserved : 6; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -2219,6 +2220,7 @@ struct valkeyServer {
sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */
/* Local environment */
char *locale_collate;
int io_uring_enabled; /* If io_uring is enabled */
};

#define MAX_KEYS_BUFFER 256
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ start_server {tags {"introspection"}} {
req-res-logfile
client-default-resp
dual-channel-replication-enabled
io-uring-enabled
}

if {!$::tls} {
Expand Down
6 changes: 6 additions & 0 deletions valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2360,3 +2360,9 @@ jemalloc-bg-thread yes
# we may also use this when making decisions for replication.
#
# availability-zone "zone-name"

# If Valkey is compiled with io_uring support and liburing is installed in the
# system, then io_uring can be enabled with this config. The io_uring kernel
# interface was adopted in Linux kernel version 5.1.
#
# io-uring-enabled no

0 comments on commit 162c183

Please sign in to comment.