Skip to content

Commit

Permalink
Rebase based on unstable branch
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Lipeng Zhu <[email protected]>
Co-authored-by: Wangyang Guo <[email protected]>
  • Loading branch information
lipzhu and guowangy committed Jul 15, 2024
1 parent 6a5a11f commit e29bd5e
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 3 deletions.
13 changes: 12 additions & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,17 @@ else
LIBCRYPTO_LIBS=-lcrypto
endif

# only Linux has IO_URING support
ifeq ($(uname_S),Linux)
# Probe for the liburing development header: write a one-line C file that
# includes <liburing.h>, run only the preprocessor ($(CC) -E) on it, and echo
# "yes" if that succeeds. The temporary foo.c is created in the current
# directory and removed afterwards.
HAS_LIBURING := $(shell sh -c 'echo "$(NUMBER_SIGN_CHAR)include <liburing.h>" > foo.c; \
$(CC) -E foo.c > /dev/null 2>&1 && echo yes; \
rm foo.c')
# When the header is found, define HAVE_LIBURING for the C sources and link
# against liburing.
# NOTE(review): io_uring.o is listed in ENGINE_SERVER_OBJ unconditionally and
# io_uring.c includes <liburing.h> without a HAVE_LIBURING guard -- confirm the
# build still works when this probe fails.
ifeq ($(HAS_LIBURING),yes)
FINAL_CFLAGS+= -DHAVE_LIBURING
FINAL_LIBS+= -luring
endif
endif

BUILD_NO:=0
BUILD_YES:=1
BUILD_MODULE:=2
Expand Down Expand Up @@ -401,7 +412,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o io_uring.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
Expand Down
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3073,6 +3073,7 @@ standardConfig static_configs[] = {
createBoolConfig("replica-ignore-disk-write-errors", NULL, MODIFIABLE_CONFIG, server.repl_ignore_disk_write_error, 0, NULL, NULL),
createBoolConfig("extended-redis-compatibility", NULL, MODIFIABLE_CONFIG, server.extended_redis_compat, 0, NULL, updateExtendedRedisCompat),
createBoolConfig("enable-debug-assert", NULL, IMMUTABLE_CONFIG | HIDDEN_CONFIG, server.enable_debug_assert, 0, NULL, NULL),
createBoolConfig("io-uring-enabled", NULL, IMMUTABLE_CONFIG, server.io_uring_enabled, 0, NULL, NULL),

/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),
Expand Down
140 changes: 140 additions & 0 deletions src/io_uring.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#include "io_uring.h"

#include <liburing.h>

/* io_uring instance queue depth: the number of entries requested from
 * io_uring_queue_init_params() when the ring is created. */
#define IO_URING_DEPTH 256

/* Number of write requests that have been prepared on the ring and whose
 * completion has not been reaped yet. Incremented for every prepared SQE and
 * decremented for every CQE consumed by the submit-and-wait barrier. */
static size_t io_uring_queue_len = 0;

void initIOUring(void) {
if (server.io_uring_enabled) {
struct io_uring_params params;
struct io_uring *ring = zmalloc(sizeof(struct io_uring));
memset(&params, 0, sizeof(params));
/* On success, io_uring_queue_init_params(3) returns 0 and ring will
* point to the shared memory containing the io_uring queues.
* On failure -errno is returned. */
int ret = io_uring_queue_init_params(IO_URING_DEPTH, ring, &params);
if (ret != 0) {
/* Warning if user enable the io_uring in config but system doesn't support yet. */
serverLog(LL_WARNING, "System doesn't support io_uring, disable io_uring.");
zfree(ring);
server.io_uring = NULL;
} else {
serverLog(LL_NOTICE, "System support io_uring, enable io_uring.");
server.io_uring = ring;
}
}
}

/* Tell whether the pending output of client 'c' can be written via io_uring.
 *
 * Returns 1 only when io_uring is enabled and initialized, the client is not
 * a replica, its reply list is empty and its static buffer holds data;
 * returns 0 otherwise. */
int canWriteUsingIOUring(client *c) {
    if (!server.io_uring_enabled || !server.io_uring) return 0;

    /* Currently only writes of the static reply buffer go through io_uring. */
    if (getClientType(c) == CLIENT_TYPE_REPLICA) return 0;
    if (listLength(c->reply) != 0) return 0;
    return c->bufpos > 0;
}

/* Queue a non-blocking send of the unsent part of the client's static reply
 * buffer on the io_uring submission queue. The request is only prepared here;
 * it is handed to the kernel when the ring is submitted.
 *
 * Returns C_OK when the request was queued, or C_ERR when no submission queue
 * entry is available. On C_ERR the client is left untouched: it is not marked
 * as having a pending io_uring write, because nothing was queued for it. */
int writeToClientUsingIOUring(client *c) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(server.io_uring);
    if (sqe == NULL) return C_ERR;

    /* Mark the client only once a request really exists for it. */
    c->flag.pending_io_uring_write = 1;
    io_uring_prep_send(sqe, c->conn->fd, c->buf + c->sentlen, c->bufpos - c->sentlen, MSG_DONTWAIT);
    /* The client pointer comes back with the completion so the result can be
     * stored on the right client. */
    io_uring_sqe_set_data(sqe, c);
    io_uring_queue_len++;
    return C_OK;
}

/* Submit requests to the submission queue and wait for completion. */
static inline void ioUringSubmitAndWaitBarrier(void) {
io_uring_submit(server.io_uring);
/* Wait for all submitted queue entries complete. */
while (io_uring_queue_len) {
struct io_uring_cqe *cqe;
if (io_uring_wait_cqe(server.io_uring, &cqe) == 0) {
client *c = io_uring_cqe_get_data(cqe);
c->nwritten = cqe->res;
io_uring_cqe_seen(server.io_uring, cqe);
io_uring_queue_len--;
} else {
serverPanic("Error waiting io_uring completion queue.");
}
}
}

/* Apply the result of a completed io_uring write, stored in c->nwritten, to
 * the client state.
 *
 * Returns C_OK when at least one byte was written and the client counters
 * were updated. Returns C_ERR otherwise; in that case the connection error
 * state is recorded and the client is scheduled for release if its
 * connection is no longer in the connected state. */
int checkPendingIOUringWriteState(client *c) {
    if (c->nwritten > 0) {
        c->sentlen += c->nwritten;
        /* Once the whole static buffer went out, reset bufpos so the
         * remainder of the reply can be processed. */
        if ((int)c->sentlen == c->bufpos) {
            c->bufpos = 0;
            c->sentlen = 0;
        }
        atomic_fetch_add_explicit(&server.stat_net_output_bytes, c->nwritten, memory_order_relaxed);
        c->net_output_bytes += c->nwritten;

        /* Writes to a client flagged as primary are not counted as an
         * interaction: REPLCONF ACK commands are always being sent and merely
         * fill the socket output buffer, so timeout detection relies on
         * received data / pings only. */
        if (!c->flag.primary) c->last_interaction = server.unixtime;

        return C_OK;
    }

    /* Unlike synchronous system calls, which return -1 and set errno,
     * io_uring never touches errno: the negated errno value is delivered
     * directly in the CQE res field. */
    if (c->nwritten != -EAGAIN) {
        c->conn->last_errno = -(c->nwritten);
        /* Only a connection that is currently connected is moved to the
         * error state, so handler callbacks of other states are not
         * disturbed. */
        if (c->nwritten != -EINTR && c->conn->state == CONN_STATE_CONNECTED) {
            c->conn->state = CONN_STATE_ERROR;
        }
    }

    if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
        serverLog(LL_VERBOSE, "Error writing to client: %s", connGetLastError(c->conn));
        freeClientAsync(c);
    }
    return C_ERR;
}

/* Submit all queued io_uring writes, wait for them to complete, then process
 * the result for every client in server.clients_pending_write.
 *
 * Each client is unlinked from the pending-write list and its
 * pending_io_uring_write flag is cleared. Clients whose write failed are
 * skipped after the error handling done by checkPendingIOUringWriteState().
 * Clients flagged close_after_reply are scheduled for release once nothing is
 * left to send. Does nothing when io_uring is disabled, not initialized, or
 * no client is pending. */
void submitAndWaitIOUringComplete(void) {
    if (!server.io_uring_enabled || !server.io_uring) return;
    if (listLength(server.clients_pending_write) == 0) return;

    /* Batch the writes: one submit for all queued requests instead of one
     * write per client. */
    ioUringSubmitAndWaitBarrier();

    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write, &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flag.pending_io_uring_write = 0;
        listUnlinkNode(server.clients_pending_write, ln);

        /* NOTE(review): a -EAGAIN result also takes this path, leaving the
         * client unlinked with unsent data -- confirm the caller re-arms the
         * write for such clients. */
        if (checkPendingIOUringWriteState(c) == C_ERR) continue;

        if (!clientHasPendingReplies(c)) {
            c->sentlen = 0;
            /* Close connection after entire reply has been sent. */
            if (c->flag.close_after_reply) {
                freeClientAsync(c);
                continue;
            }
        }

        /* Update client's memory usage after writing. */
        updateClientMemUsageAndBucket(c);
    }
}

/* Tear down the server-wide io_uring instance and release its memory.
 * Does nothing when io_uring is disabled or was never initialized. */
void freeIOUring(void) {
    if (!server.io_uring_enabled || !server.io_uring) return;

    io_uring_queue_exit(server.io_uring);
    zfree(server.io_uring);
    /* Clear the pointer so later checks see that no ring exists. */
    server.io_uring = NULL;
}
21 changes: 21 additions & 0 deletions src/io_uring.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#ifndef IO_URING_H
#define IO_URING_H

#include "server.h"

/* Initialize io_uring at server startup when io_uring is enabled in the
 * configuration. If the ring cannot be created, a warning is logged and
 * server.io_uring is left NULL. */
void initIOUring(void);

/* Return non-zero if the client's pending output can be written with
 * io_uring: io_uring is enabled and initialized, the client is not a replica,
 * its reply list is empty and its static buffer holds data. */
int canWriteUsingIOUring(client *c);

/* Queue a write of the client's static buffer on the io_uring submission
 * queue. It is always used together with canWriteUsingIOUring(). Returns C_OK
 * when the request was queued and C_ERR when no submission queue entry is
 * available. */
int writeToClientUsingIOUring(client *c);

/* Submit the queued requests, wait for all of them to complete and process
 * the result for every client in server.clients_pending_write. */
void submitAndWaitIOUringComplete(void);

/* Release the io_uring instance, if one was initialized. */
void freeIOUring(void);

#endif /* IO_URING_H */
3 changes: 3 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_threads.h"
#include "io_uring.h"

#include <time.h>
#include <signal.h>
Expand Down Expand Up @@ -2810,6 +2811,7 @@ void initListeners(void) {
/* Run the late initialization steps, in this order: bioInit(),
 * initIOThreads(), initIOUring() and the jemalloc background thread setting.
 * The allocator's used-memory figure is then recorded as the server's initial
 * memory usage, so it is sampled after the preceding steps have run. */
void InitServerLast(void) {
bioInit();
initIOThreads();
/* Sets up the io_uring instance when it is enabled in the configuration. */
initIOUring();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
Expand Down Expand Up @@ -6984,6 +6986,7 @@ int main(int argc, char **argv) {

aeMain(server.el);
aeDeleteEventLoop(server.el);
freeIOUring();
return 0;
}

Expand Down
6 changes: 5 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1180,7 +1180,8 @@ typedef struct ClientFlags {
uint64_t reprocessing_command : 1; /* The client is re-processing the command. */
uint64_t replication_done : 1; /* Indicate that replication has been done on the client */
uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */
uint64_t reserved : 9; /* Reserved for future use */
uint64_t pending_io_uring_write : 1; /* Client has output to send using io_uring. */
uint64_t reserved : 8; /* Reserved for future use */
} ClientFlags;

typedef struct client {
Expand Down Expand Up @@ -2123,6 +2124,9 @@ struct valkeyServer {
sds availability_zone; /* When run in a cloud environment we can configure the availability zone it is running in */
/* Local environment */
char *locale_collate;
/* io_uring */
int io_uring_enabled; /* Enable io_uring */
struct io_uring *io_uring;
};

#define MAX_KEYS_BUFFER 256
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ start_server {tags {"introspection"}} {
socket-mark-id
req-res-logfile
client-default-resp
io-uring-enabled
}

if {!$::tls} {
Expand Down
8 changes: 7 additions & 1 deletion valkey.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2313,4 +2313,10 @@ jemalloc-bg-thread yes
# this is only exposed via the info command for clients to use, but in the future we
# we may also use this when making decisions for replication.
#
# availability-zone "zone-name"
# availability-zone "zone-name"

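# If Valkey is compiled with io_uring support and liburing is installed on the
# system, io_uring can be enabled with this config. The option is read at
# startup only; if the io_uring instance cannot be initialized, Valkey logs a
# warning and runs without io_uring. The io_uring kernel interface was
# introduced in Linux kernel version 5.1.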
#
# io-uring-enabled no

0 comments on commit e29bd5e

Please sign in to comment.