From 125394c8c7117619724e40e7ebed26532692c781 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Thu, 14 Nov 2024 18:14:11 +0000 Subject: [PATCH 01/12] Extract the scripting engine code from the functions unit This commit creates a new unit for the scripting engine code by extracting the existing code from the functions unit. We're doing this refactor to prepare the code for runnning the `EVAL` command using different scripting engines. Signed-off-by: Ricardo Dias --- cmake/Modules/SourceFiles.cmake | 3 +- src/Makefile | 4 +- src/engine.c | 281 +++++++++++++++++++++++++++++++ src/engine.h | 72 ++++++++ src/function_lua.c | 9 +- src/functions.c | 290 ++++++++------------------------ src/functions.h | 65 +------ src/module.c | 14 +- src/server.c | 6 + src/valkeymodule.h | 59 ++++++- tests/modules/helloscripting.c | 5 +- 11 files changed, 512 insertions(+), 296 deletions(-) create mode 100644 src/engine.c create mode 100644 src/engine.h diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index 1a754ff846..f0f6954d1f 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -106,7 +106,8 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/connection.c ${CMAKE_SOURCE_DIR}/src/unix.c ${CMAKE_SOURCE_DIR}/src/server.c - ${CMAKE_SOURCE_DIR}/src/logreqres.c) + ${CMAKE_SOURCE_DIR}/src/logreqres.c + ${CMAKE_SOURCE_DIR}/src/engine.c) # valkey-cli set(VALKEY_CLI_SRCS diff --git a/src/Makefile b/src/Makefile index e52f4f08d3..0dc2a54207 100644 --- a/src/Makefile +++ b/src/Makefile @@ -374,7 +374,7 @@ else endef endif -# Determine install/uninstall Redis symlinks for compatibility when +# Determine install/uninstall Redis symlinks for compatibility when # installing/uninstalling Valkey binaries (defaulting to `yes`) USE_REDIS_SYMLINKS?=yes ifeq ($(USE_REDIS_SYMLINKS),yes) @@ -416,7 +416,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o engine.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/engine.c b/src/engine.c new file mode 100644 index 0000000000..46fbdea9ac --- /dev/null +++ b/src/engine.c @@ -0,0 +1,281 @@ +#include "engine.h" +#include "dict.h" +#include "functions.h" +#include "module.h" + +typedef struct engineImpl { + /* Engine specific context */ + engineCtx *ctx; + + /* Callback functions implemented by the scripting engine module */ + engineMethods methods; +} engineImpl; + +typedef struct engine { + sds name; /* Name of the engine */ + ValkeyModule *module; /* the module that implements the scripting engine */ + engineImpl *impl; /* engine callbacks that allows to interact with the engine */ + client *c; /* Client that is used to run commands */ + ValkeyModuleCtx *module_ctx; /* Cache of the module context object */ +} engine; + + +typedef struct engineManger { + dict *engines; /* engines dictionary */ + size_t engine_cache_memory; +} engineManager; + + +static engineManager engineMgr = { + .engines = NULL, + .engine_cache_memory = 0, +}; + +static uint64_t dictStrCaseHash(const void *key) { + return dictGenCaseHashFunction((unsigned char *)key, strlen((char *)key)); +} + +dictType engineDictType = { + dictStrCaseHash, /* hash function */ + NULL, /* key dup */ + dictSdsKeyCaseCompare, /* key compare */ + NULL, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + +/* Initializes the scripting engine manager. + * The engine manager is responsible for managing the several scripting engines + * that are loaded in the server and implemented by Valkey Modules. + * + * Returns C_ERR if some error occurs during the initialization. + */ +int engineManagerInit(void) { + engineMgr.engines = dictCreate(&engineDictType); + return C_OK; +} + +size_t engineManagerGetCacheMemory(void) { + return engineMgr.engine_cache_memory; +} + +size_t engineManagerGetNumEngines(void) { + return dictSize(engineMgr.engines); +} + +size_t engineManagerGetMemoryUsage(void) { + return dictMemUsage(engineMgr.engines) + sizeof(engineMgr); +} + +/* Registers a new scripting engine in the engine manager. + * + * - `engine_name`: the name of the scripting engine. This name will match + * against the engine name specified in the script header using a shebang. + * + * - `ctx`: engine specific context pointer. + * + * - engine_methods - the struct with the scripting engine callback functions + * pointers. + * + * Returns C_ERR in case of an error during registration. + */ +int engineManagerRegisterEngine(const char *engine_name, + ValkeyModule *engine_module, + engineCtx *engine_ctx, + engineMethods *engine_methods) { + sds engine_name_sds = sdsnew(engine_name); + + if (dictFetchValue(engineMgr.engines, engine_name_sds)) { + serverLog(LL_WARNING, "Same engine was registered twice"); + sdsfree(engine_name_sds); + return C_ERR; + } + + engineImpl *ei = zmalloc(sizeof(engineImpl)); + *ei = (engineImpl){ + .ctx = engine_ctx, + .methods = { + .create_functions_library = engine_methods->create_functions_library, + .call_function = engine_methods->call_function, + .free_function = engine_methods->free_function, + .get_function_memory_overhead = engine_methods->get_function_memory_overhead, + .get_memory_info = engine_methods->get_memory_info, + }, + }; + + client *c = createClient(NULL); + c->flag.deny_blocking = 1; + c->flag.script = 1; + c->flag.fake = 1; + + engine *e = zmalloc(sizeof(*ei)); + *e = (engine){ + .name = engine_name_sds, + .module = engine_module, + .impl = ei, + .c = c, + .module_ctx = engine_module ? moduleAllocateContext() : NULL, + }; + + dictAdd(engineMgr.engines, engine_name_sds, e); + + engineMemoryInfo mem_info = engineCallGetMemoryInfo(e); + engineMgr.engine_cache_memory += zmalloc_size(e) + + sdsAllocSize(e->name) + + zmalloc_size(ei) + + mem_info.engine_memory_overhead; + + return C_OK; +} + +/* Removes a scripting engine from the engine manager. + * + * - `engine_name`: name of the engine to remove + */ +int engineManagerUnregisterEngine(const char *engine_name) { + dictEntry *entry = dictUnlink(engineMgr.engines, engine_name); + if (entry == NULL) { + serverLog(LL_WARNING, "There's no engine registered with name %s", engine_name); + return C_ERR; + } + + engine *e = dictGetVal(entry); + + functionsRemoveLibFromEngine(e); + + zfree(e->impl); + sdsfree(e->name); + freeClient(e->c); + if (e->module_ctx) { + serverAssert(e->module != NULL); + zfree(e->module_ctx); + } + zfree(e); + + dictFreeUnlinkedEntry(engineMgr.engines, entry); + + return C_OK; +} + +/* + * Lookups the engine with `engine_name` in the engine manager and returns it if + * it exists. Otherwise returns `NULL`. + */ +engine *engineManagerFind(sds engine_name) { + dictEntry *entry = dictFind(engineMgr.engines, engine_name); + if (entry) { + return dictGetVal(entry); + } + return NULL; +} + +sds engineGetName(engine *engine) { + return engine->name; +} + +client *engineGetClient(engine *engine) { + return engine->c; +} + +ValkeyModule *engineGetModule(engine *engine) { + return engine->module; +} + +/* + * Iterates the list of engines registered in the engine manager and calls the + * callback function with each engine. + * + * The `context` pointer is also passed in each callback call. + */ +void engineManagerForEachEngine(engineIterCallback callback, void *context) { + dictIterator *iter = dictGetIterator(engineMgr.engines); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + engine *e = dictGetVal(entry); + if (!callback(e, context)) { + break; + } + } + dictReleaseIterator(iter); +} + +static void engineSetupModuleCtx(engine *e, client *c) { + if (e->module != NULL) { + serverAssert(e->module_ctx != NULL); + moduleScriptingEngineInitContext(e->module_ctx, e->module, c); + } +} + +static void engineTeardownModuleCtx(engine *e) { + if (e->module != NULL) { + serverAssert(e->module_ctx != NULL); + moduleFreeContext(e->module_ctx); + } +} + +compiledFunction **engineCallCreateFunctionsLibrary(engine *engine, + const char *code, + size_t timeout, + size_t *out_num_compiled_functions, + char **err) { + engineSetupModuleCtx(engine, NULL); + + compiledFunction **functions = engine->impl->methods.create_functions_library( + engine->module_ctx, + engine->impl->ctx, + code, + timeout, + out_num_compiled_functions, + err); + + engineTeardownModuleCtx(engine); + + return functions; +} + +void engineCallFunction(engine *engine, + functionCtx *func_ctx, + client *caller, + void *compiled_function, + robj **keys, + size_t nkeys, + robj **args, + size_t nargs) { + engineSetupModuleCtx(engine, caller); + + engine->impl->methods.call_function( + engine->module_ctx, + engine->impl->ctx, + func_ctx, + compiled_function, + keys, + nkeys, + args, + nargs); + + engineTeardownModuleCtx(engine); +} + +void engineCallFreeFunction(engine *engine, + void *compiled_func) { + engineSetupModuleCtx(engine, NULL); + engine->impl->methods.free_function(engine->module_ctx, + engine->impl->ctx, + compiled_func); + engineTeardownModuleCtx(engine); +} + +size_t engineCallGetFunctionMemoryOverhead(engine *engine, void *compiled_function) { + engineSetupModuleCtx(engine, NULL); + size_t mem = engine->impl->methods.get_function_memory_overhead(engine->module_ctx, compiled_function); + engineTeardownModuleCtx(engine); + return mem; +} + +engineMemoryInfo engineCallGetMemoryInfo(engine *engine) { + engineSetupModuleCtx(engine, NULL); + engineMemoryInfo mem_info = engine->impl->methods.get_memory_info( + engine->module_ctx, engine->impl->ctx); + engineTeardownModuleCtx(engine); + return mem_info; +} diff --git a/src/engine.h b/src/engine.h new file mode 100644 index 0000000000..9ac337b5bf --- /dev/null +++ b/src/engine.h @@ -0,0 +1,72 @@ +#ifndef _ENGINE_H_ +#define _ENGINE_H_ + +#include "server.h" + +// Forward declaration of the engine structure. +typedef struct engine engine; + +/* ValkeyModule type aliases for scripting engine structs and types. */ +typedef ValkeyModuleScriptingEngineCtx engineCtx; +typedef ValkeyModuleScriptingEngineFunctionCtx functionCtx; +typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction; +typedef ValkeyModuleScriptingEngineMemoryInfo engineMemoryInfo; +typedef ValkeyModuleScriptingEngineMethods engineMethods; + +/* + * Callback function used to iterate the list of engines registered in the + * engine manager. + * + * - `engine`: the engine in the current iteration. + * + * - `context`: a generic pointer to a context object. + * + * If the callback function returns 0, then the iteration is stopped + * immediately. + */ +typedef int (*engineIterCallback)(engine *engine, void *context); + +/* + * Engine manager API functions. + */ +int engineManagerInit(void); +size_t engineManagerGetCacheMemory(void); +size_t engineManagerGetNumEngines(void); +size_t engineManagerGetMemoryUsage(void); +int engineManagerRegisterEngine(const char *engine_name, + ValkeyModule *engine_module, + engineCtx *engine_ctx, + engineMethods *engine_methods); +int engineManagerUnregisterEngine(const char *engine_name); +engine *engineManagerFind(sds engine_name); +void engineManagerForEachEngine(engineIterCallback callback, void *context); + +/* + * Engine API functions. + */ +sds engineGetName(engine *engine); +client *engineGetClient(engine *engine); +ValkeyModule *engineGetModule(engine *engine); + +/* + * API to call engine callback functions. + */ +compiledFunction **engineCallCreateFunctionsLibrary( + engine *engine, + const char *code, + size_t timeout, + size_t *out_num_compiled_functions, + char **err); +void engineCallFunction(engine *engine, + functionCtx *func_ctx, + client *caller, + void *compiled_function, + robj **keys, + size_t nkeys, + robj **args, + size_t nargs); +void engineCallFreeFunction(engine *engine, void *compiled_func); +size_t engineCallGetFunctionMemoryOverhead(engine *engine, void *compiled_function); +engineMemoryInfo engineCallGetMemoryInfo(engine *engine); + +#endif /* _ENGINE_H_ */ diff --git a/src/function_lua.c b/src/function_lua.c index b535528906..bafda878b0 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -39,6 +39,7 @@ * Uses script_lua.c to run the Lua code. */ +#include "engine.h" #include "functions.h" #include "script_lua.h" #include @@ -557,8 +558,8 @@ int luaEngineInitEngine(void) { .get_memory_info = luaEngineGetMemoryInfo, }; - return functionsRegisterEngine(LUA_ENGINE_NAME, - NULL, - lua_engine_ctx, - &lua_engine_methods); + return engineManagerRegisterEngine(LUA_ENGINE_NAME, + NULL, + lua_engine_ctx, + &lua_engine_methods); } diff --git a/src/functions.c b/src/functions.c index 0d003f7fac..9750d0a99c 100644 --- a/src/functions.c +++ b/src/functions.c @@ -31,7 +31,6 @@ #include "sds.h" #include "dict.h" #include "adlist.h" -#include "module.h" #define LOAD_TIMEOUT_MS 500 @@ -41,8 +40,6 @@ typedef enum { restorePolicy_Replace } restorePolicy; -static size_t engine_cache_memory = 0; - /* Forward declaration */ static void engineFunctionDispose(void *obj); static void engineStatsDispose(void *obj); @@ -67,15 +64,6 @@ typedef struct functionsLibMetaData { sds code; } functionsLibMetaData; -dictType engineDictType = { - dictSdsCaseHash, /* hash function */ - dictSdsDup, /* key dup */ - dictSdsKeyCaseCompare, /* key compare */ - dictSdsDestructor, /* key destructor */ - NULL, /* val destructor */ - NULL /* allow to expand */ -}; - dictType functionDictType = { dictSdsCaseHash, /* hash function */ dictSdsDup, /* key dup */ @@ -112,34 +100,14 @@ dictType librariesDictType = { NULL /* allow to expand */ }; -/* Dictionary of engines */ -static dict *engines = NULL; - /* Libraries Ctx. */ static functionsLibCtx *curr_functions_lib_ctx = NULL; -static void setupEngineModuleCtx(engineInfo *ei, client *c) { - if (ei->engineModule != NULL) { - serverAssert(ei->module_ctx != NULL); - moduleScriptingEngineInitContext(ei->module_ctx, ei->engineModule, c); - } -} - -static void teardownEngineModuleCtx(engineInfo *ei) { - if (ei->engineModule != NULL) { - serverAssert(ei->module_ctx != NULL); - moduleFreeContext(ei->module_ctx); - } -} - static size_t functionMallocSize(functionInfo *fi) { - setupEngineModuleCtx(fi->li->ei, NULL); - size_t size = zmalloc_size(fi) + - sdsAllocSize(fi->name) + - (fi->desc ? sdsAllocSize(fi->desc) : 0) + - fi->li->ei->engine->get_function_memory_overhead(fi->li->ei->module_ctx, fi->function); - teardownEngineModuleCtx(fi->li->ei); - return size; + return zmalloc_size(fi) + + sdsAllocSize(fi->name) + + (fi->desc ? sdsAllocSize(fi->desc) : 0) + + engineCallGetFunctionMemoryOverhead(fi->li->engine, fi->function); } static size_t libraryMallocSize(functionLibInfo *li) { @@ -161,12 +129,8 @@ static void engineFunctionDispose(void *obj) { if (fi->desc) { sdsfree(fi->desc); } - setupEngineModuleCtx(fi->li->ei, NULL); - engine *engine = fi->li->ei->engine; - engine->free_function(fi->li->ei->module_ctx, - engine->engine_ctx, - fi->function); - teardownEngineModuleCtx(fi->li->ei); + + engineCallFreeFunction(fi->li->engine, fi->function); zfree(fi); } @@ -239,30 +203,30 @@ functionsLibCtx *functionsLibCtxGetCurrent(void) { return curr_functions_lib_ctx; } +static int initializeFunctionsLibEngineStats(engine *engine, void *context) { + functionsLibCtx *lib_ctx = (functionsLibCtx *)context; + functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); + dictAdd(lib_ctx->engines_stats, engineGetName(engine), stats); + return 1; +} + /* Create a new functions ctx */ functionsLibCtx *functionsLibCtxCreate(void) { functionsLibCtx *ret = zmalloc(sizeof(functionsLibCtx)); ret->libraries = dictCreate(&librariesDictType); ret->functions = dictCreate(&functionDictType); ret->engines_stats = dictCreate(&engineStatsDictType); - dictIterator *iter = dictGetIterator(engines); - dictEntry *entry = NULL; - while ((entry = dictNext(iter))) { - engineInfo *ei = dictGetVal(entry); - functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); - dictAdd(ret->engines_stats, ei->name, stats); - } - dictReleaseIterator(iter); + engineManagerForEachEngine(initializeFunctionsLibEngineStats, ret); ret->cache_memory = 0; return ret; } -void functionsAddEngineStats(engineInfo *ei) { +void functionsAddEngineStats(sds engine_name) { serverAssert(curr_functions_lib_ctx != NULL); - dictEntry *entry = dictFind(curr_functions_lib_ctx->engines_stats, ei->name); + dictEntry *entry = dictFind(curr_functions_lib_ctx->engines_stats, engine_name); if (entry == NULL) { functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); - dictAdd(curr_functions_lib_ctx->engines_stats, ei->name, stats); + dictAdd(curr_functions_lib_ctx->engines_stats, engine_name, stats); } } @@ -312,12 +276,12 @@ static int functionLibCreateFunction(robj *name, return C_OK; } -static functionLibInfo *engineLibraryCreate(sds name, engineInfo *ei, sds code) { +static functionLibInfo *engineLibraryCreate(sds name, engine *e, sds code) { functionLibInfo *li = zmalloc(sizeof(*li)); *li = (functionLibInfo){ .name = sdsdup(name), .functions = dictCreate(&libraryFunctionDictType), - .ei = ei, + .engine = e, .code = sdsdup(code), }; return li; @@ -339,7 +303,7 @@ static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) { lib_ctx->cache_memory -= libraryMallocSize(li); /* update stats */ - functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, engineGetName(li->engine)); serverAssert(stats); stats->n_lib--; stats->n_functions -= dictSize(li->functions); @@ -359,7 +323,7 @@ static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo *li) { lib_ctx->cache_memory += libraryMallocSize(li); /* update stats */ - functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, li->ei->name); + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, engineGetName(li->engine)); serverAssert(stats); stats->n_lib++; stats->n_functions += dictSize(li->functions); @@ -446,107 +410,30 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l return ret; } -/* Register an engine, should be called once by the engine on startup and give - * the following: - * - * - engine_name - name of the engine to register - * - * - engine_module - the valkey module that implements this engine - * - * - engine_ctx - the engine ctx that should be used by the server to interact - * with the engine. - * - * - engine_methods - the struct with the scripting engine callback functions - * pointers. - * - */ -int functionsRegisterEngine(const char *engine_name, - ValkeyModule *engine_module, - engineCtx *engine_ctx, - engineMethods *engine_methods) { - sds engine_name_sds = sdsnew(engine_name); - if (dictFetchValue(engines, engine_name_sds)) { - serverLog(LL_WARNING, "Same engine was registered twice"); - sdsfree(engine_name_sds); - return C_ERR; - } - - engine *eng = zmalloc(sizeof(engine)); - *eng = (engine){ - .engine_ctx = engine_ctx, - .create = engine_methods->create_functions_library, - .call = engine_methods->call_function, - .get_function_memory_overhead = engine_methods->get_function_memory_overhead, - .free_function = engine_methods->free_function, - .get_memory_info = engine_methods->get_memory_info, - }; - - client *c = createClient(NULL); - c->flag.deny_blocking = 1; - c->flag.script = 1; - c->flag.fake = 1; - engineInfo *ei = zmalloc(sizeof(*ei)); - *ei = (engineInfo){ - .name = engine_name_sds, - .engineModule = engine_module, - .module_ctx = engine_module ? moduleAllocateContext() : NULL, - .engine = eng, - .c = c, - }; - - dictAdd(engines, engine_name_sds, ei); - - functionsAddEngineStats(ei); - - setupEngineModuleCtx(ei, NULL); - engineMemoryInfo mem_info = eng->get_memory_info(ei->module_ctx, - eng->engine_ctx); - engine_cache_memory += zmalloc_size(ei) + - sdsAllocSize(ei->name) + - zmalloc_size(eng) + - mem_info.engine_memory_overhead; - - teardownEngineModuleCtx(ei); - - return C_OK; +static int replyEngineStats(engine *engine, void *context) { + client *c = (client *)context; + addReplyBulkCString(c, engineGetName(engine)); + addReplyMapLen(c, 2); + functionsLibEngineStats *e_stats = + dictFetchValue(curr_functions_lib_ctx->engines_stats, engineGetName(engine)); + addReplyBulkCString(c, "libraries_count"); + addReplyLongLong(c, e_stats ? e_stats->n_lib : 0); + addReplyBulkCString(c, "functions_count"); + addReplyLongLong(c, e_stats ? e_stats->n_functions : 0); + return 1; } -/* Removes a scripting engine from the server. - * - * - engine_name - name of the engine to remove - */ -int functionsUnregisterEngine(const char *engine_name) { - sds engine_name_sds = sdsnew(engine_name); - dictEntry *entry = dictFind(engines, engine_name_sds); - if (entry == NULL) { - serverLog(LL_WARNING, "There's no engine registered with name %s", engine_name); - sdsfree(engine_name_sds); - return C_ERR; - } - - engineInfo *ei = dictGetVal(entry); - +void functionsRemoveLibFromEngine(engine *engine) { dictIterator *iter = dictGetSafeIterator(curr_functions_lib_ctx->libraries); + dictEntry *entry = NULL; while ((entry = dictNext(iter))) { functionLibInfo *li = dictGetVal(entry); - if (li->ei == ei) { + if (li->engine == engine) { libraryUnlink(curr_functions_lib_ctx, li); engineLibraryFree(li); } } dictReleaseIterator(iter); - - zfree(ei->engine); - sdsfree(ei->name); - freeClient(ei->c); - if (ei->engineModule != NULL) { - serverAssert(ei->module_ctx != NULL); - zfree(ei->module_ctx); - } - zfree(ei); - - sdsfree(engine_name_sds); - return C_OK; } /* @@ -578,20 +465,8 @@ void functionStatsCommand(client *c) { } addReplyBulkCString(c, "engines"); - addReplyMapLen(c, dictSize(engines)); - dictIterator *iter = dictGetIterator(engines); - dictEntry *entry = NULL; - while ((entry = dictNext(iter))) { - engineInfo *ei = dictGetVal(entry); - addReplyBulkCString(c, ei->name); - addReplyMapLen(c, 2); - functionsLibEngineStats *e_stats = dictFetchValue(curr_functions_lib_ctx->engines_stats, ei->name); - addReplyBulkCString(c, "libraries_count"); - addReplyLongLong(c, e_stats->n_lib); - addReplyBulkCString(c, "functions_count"); - addReplyLongLong(c, e_stats->n_functions); - } - dictReleaseIterator(iter); + addReplyMapLen(c, engineManagerGetNumEngines()); + engineManagerForEachEngine(replyEngineStats, c); } static void functionListReplyFlags(client *c, functionInfo *fi) { @@ -667,7 +542,8 @@ void functionListCommand(client *c) { addReplyBulkCString(c, "library_name"); addReplyBulkCBuffer(c, li->name, sdslen(li->name)); addReplyBulkCString(c, "engine"); - addReplyBulkCBuffer(c, li->ei->name, sdslen(li->ei->name)); + sds engine_name = engineGetName(li->engine); + addReplyBulkCBuffer(c, engine_name, sdslen(engine_name)); addReplyBulkCString(c, "functions"); addReplyArrayLen(c, dictSize(li->functions)); @@ -747,7 +623,7 @@ static void fcallCommandGeneric(client *c, int ro) { return; } functionInfo *fi = dictGetVal(de); - engine *engine = fi->li->ei->engine; + engine *engine = fi->li->engine; long long numkeys; /* Get the number of arguments that are keys */ @@ -764,19 +640,16 @@ static void fcallCommandGeneric(client *c, int ro) { } scriptRunCtx run_ctx; - if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) return; - setupEngineModuleCtx(fi->li->ei, run_ctx.original_client); - - engine->call(fi->li->ei->module_ctx, - engine->engine_ctx, - &run_ctx, - fi->function, - c->argv + 3, - numkeys, - c->argv + 3 + numkeys, - c->argc - 3 - numkeys); - - teardownEngineModuleCtx(fi->li->ei); + if (scriptPrepareForRun(&run_ctx, engineGetClient(engine), c, fi->name, fi->f_flags, ro) != C_OK) return; + + engineCallFunction(engine, + &run_ctx, + run_ctx.original_client, + fi->function, + c->argv + 3, + numkeys, + c->argv + 3 + numkeys, + c->argc - 3 - numkeys); scriptResetRun(&run_ctx); } @@ -1076,12 +949,10 @@ void functionFreeLibMetaData(functionsLibMetaData *md) { if (md->engine) sdsfree(md->engine); } -static void freeCompiledFunctions(engineInfo *ei, +static void freeCompiledFunctions(engine *engine, compiledFunction **compiled_functions, size_t num_compiled_functions, size_t free_function_from_idx) { - setupEngineModuleCtx(ei, NULL); - for (size_t i = 0; i < num_compiled_functions; i++) { compiledFunction *func = compiled_functions[i]; decrRefCount(func->name); @@ -1089,16 +960,12 @@ static void freeCompiledFunctions(engineInfo *ei, decrRefCount(func->desc); } if (i >= free_function_from_idx) { - ei->engine->free_function(ei->module_ctx, - ei->engine->engine_ctx, - func->function); + engineCallFreeFunction(engine, func->function); } zfree(func); } zfree(compiled_functions); - - teardownEngineModuleCtx(ei); } /* Compile and save the given library, return the loaded library name on success @@ -1120,12 +987,13 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC goto error; } - engineInfo *ei = dictFetchValue(engines, md.engine); - if (!ei) { + engine *engine = engineManagerFind(md.engine); + if (!engine) { *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); goto error; } - engine *engine = ei->engine; + + functionsAddEngineStats(md.engine); old_li = dictFetchValue(lib_ctx->libraries, md.name); if (old_li && !replace) { @@ -1138,18 +1006,15 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC libraryUnlink(lib_ctx, old_li); } - new_li = engineLibraryCreate(md.name, ei, code); + new_li = engineLibraryCreate(md.name, engine, code); size_t num_compiled_functions = 0; char *compile_error = NULL; - setupEngineModuleCtx(ei, NULL); compiledFunction **compiled_functions = - engine->create(ei->module_ctx, - engine->engine_ctx, - md.code, - timeout, - &num_compiled_functions, - &compile_error); - teardownEngineModuleCtx(ei); + engineCallCreateFunctionsLibrary(engine, + md.code, + timeout, + &num_compiled_functions, + &compile_error); if (compiled_functions == NULL) { serverAssert(num_compiled_functions == 0); serverAssert(compile_error != NULL); @@ -1167,7 +1032,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC func->f_flags, err); if (ret == C_ERR) { - freeCompiledFunctions(ei, + freeCompiledFunctions(engine, compiled_functions, num_compiled_functions, i); @@ -1175,7 +1040,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC } } - freeCompiledFunctions(ei, + freeCompiledFunctions(engine, compiled_functions, num_compiled_functions, num_compiled_functions); @@ -1259,32 +1124,27 @@ void functionLoadCommand(client *c) { addReplyBulkSds(c, library_name); } +static int getEngineUsedMemory(engine *engine, void *context) { + size_t *engines_memory = (size_t *)context; + engineMemoryInfo mem_info = engineCallGetMemoryInfo(engine); + *engines_memory += mem_info.used_memory; + return 1; +} + /* Return memory usage of all the engines combine */ unsigned long functionsMemory(void) { - dictIterator *iter = dictGetIterator(engines); - dictEntry *entry = NULL; size_t engines_memory = 0; - while ((entry = dictNext(iter))) { - engineInfo *ei = dictGetVal(entry); - engine *engine = ei->engine; - setupEngineModuleCtx(ei, NULL); - engineMemoryInfo mem_info = engine->get_memory_info(ei->module_ctx, - engine->engine_ctx); - engines_memory += mem_info.used_memory; - teardownEngineModuleCtx(ei); - } - dictReleaseIterator(iter); - + engineManagerForEachEngine(getEngineUsedMemory, &engines_memory); return engines_memory; } /* Return memory overhead of all the engines combine */ unsigned long functionsMemoryOverhead(void) { - size_t memory_overhead = dictMemUsage(engines); + size_t memory_overhead = engineManagerGetMemoryUsage(); memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); memory_overhead += sizeof(functionsLibCtx); memory_overhead += curr_functions_lib_ctx->cache_memory; - memory_overhead += engine_cache_memory; + memory_overhead += engineManagerGetCacheMemory(); return memory_overhead; } @@ -1309,8 +1169,6 @@ size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx) { /* Initialize engine data structures. * Should be called once on server initialization */ int functionsInit(void) { - engines = dictCreate(&engineDictType); - curr_functions_lib_ctx = functionsLibCtxCreate(); if (luaEngineInitEngine() != C_OK) { diff --git a/src/functions.h b/src/functions.h index 89e39fdc56..2fcfb2272e 100644 --- a/src/functions.h +++ b/src/functions.h @@ -49,65 +49,12 @@ */ #include "server.h" +#include "engine.h" #include "script.h" #include "valkeymodule.h" typedef struct functionLibInfo functionLibInfo; -/* ValkeyModule type aliases for scripting engine structs and types. */ -typedef ValkeyModuleScriptingEngineCtx engineCtx; -typedef ValkeyModuleScriptingEngineFunctionCtx functionCtx; -typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction; -typedef ValkeyModuleScriptingEngineMemoryInfo engineMemoryInfo; -typedef ValkeyModuleScriptingEngineMethods engineMethods; - -typedef struct engine { - /* engine specific context */ - engineCtx *engine_ctx; - - /* Compiles the script code and returns an array of compiled functions - * registered in the script./ - * - * Returns NULL on error and set err to be the error message */ - compiledFunction **(*create)( - ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx, - const char *code, - size_t timeout, - size_t *out_num_compiled_functions, - char **err); - - /* Invoking a function, func_ctx is an opaque object (from engine POV). - * The func_ctx should be used by the engine to interaction with the server, - * such interaction could be running commands, set resp, or set - * replication mode - */ - void (*call)(ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx, - functionCtx *func_ctx, - void *compiled_function, - robj **keys, - size_t nkeys, - robj **args, - size_t nargs); - - /* free the given function */ - void (*free_function)(ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx, - void *compiled_function); - - /* Return memory overhead for a given function, - * such memory is not counted as engine memory but as general - * structs memory that hold different information */ - size_t (*get_function_memory_overhead)(ValkeyModuleCtx *module_ctx, - void *compiled_function); - - /* Get the current used memory by the engine */ - engineMemoryInfo (*get_memory_info)(ValkeyModuleCtx *module_ctx, - engineCtx *engine_ctx); - -} engine; - /* Hold information about an engine. * Used on rdb.c so it must be declared here. */ typedef struct engineInfo { @@ -134,16 +81,10 @@ typedef struct functionInfo { struct functionLibInfo { sds name; /* Library name */ dict *functions; /* Functions dictionary */ - engineInfo *ei; /* Pointer to the function engine */ + engine *engine; /* Pointer to the scripting engine */ sds code; /* Library code */ }; -int functionsRegisterEngine(const char *engine_name, - ValkeyModule *engine_module, - void *engine_ctx, - engineMethods *engine_methods); -int functionsUnregisterEngine(const char *engine_name); - sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibCtx *lib_ctx, size_t timeout); unsigned long functionsMemory(void); unsigned long functionsMemoryOverhead(void); @@ -158,6 +99,8 @@ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx); void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)); void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async); +void functionsRemoveLibFromEngine(engine *engine); + int luaEngineInitEngine(void); int functionsInit(void); diff --git a/src/module.c b/src/module.c index db493dd8bc..1b6c31a401 100644 --- a/src/module.c +++ b/src/module.c @@ -62,7 +62,7 @@ #include "crc16_slottable.h" #include "valkeymodule.h" #include "io_threads.h" -#include "functions.h" +#include "engine.h" #include #include #include @@ -13125,10 +13125,10 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, return VALKEYMODULE_ERR; } - if (functionsRegisterEngine(engine_name, - module_ctx->module, - engine_ctx, - engine_methods) != C_OK) { + if (engineManagerRegisterEngine(engine_name, + module_ctx->module, + engine_ctx, + engine_methods) != C_OK) { return VALKEYMODULE_ERR; } @@ -13144,7 +13144,9 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, */ int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name) { UNUSED(ctx); - functionsUnregisterEngine(engine_name); + if (engineManagerUnregisterEngine(engine_name) != C_OK) { + return VALKEYMODULE_ERR; + } return VALKEYMODULE_OK; } diff --git a/src/server.c b/src/server.c index 3cdec9fa9b..3876eb5cfc 100644 --- a/src/server.c +++ b/src/server.c @@ -42,6 +42,7 @@ #include "fmtargs.h" #include "io_threads.h" #include "sds.h" +#include "engine.h" #include #include @@ -2873,6 +2874,11 @@ void initServer(void) { server.maxmemory_policy = MAXMEMORY_NO_EVICTION; } + if (engineManagerInit() == C_ERR) { + serverPanic("Scripting engine manager initialization failed, check the server logs."); + exit(1); + } + /* Initialize the LUA scripting engine. */ scriptingInit(1); /* Initialize the functions engine based off of LUA initialization. */ diff --git a/src/valkeymodule.h b/src/valkeymodule.h index 1d99d2ff7a..2ee67e8889 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -818,7 +818,12 @@ typedef struct ValkeyModuleScriptingEngineCompiledFunction { } ValkeyModuleScriptingEngineCompiledFunction; /* This struct is used to return the memory information of the scripting - * engine. */ + * engine. + * + * IMPORTANT: If we ever need to add/remove fields from this struct, we need + * to bump the version number defined in the + * `VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION` constant. + */ typedef struct ValkeyModuleScriptingEngineMemoryInfo { /* The memory used by the scripting engine runtime. */ size_t used_memory; @@ -826,6 +831,25 @@ typedef struct ValkeyModuleScriptingEngineMemoryInfo { size_t engine_memory_overhead; } ValkeyModuleScriptingEngineMemoryInfo; +/* The callback function called when `FUNCTION LOAD` command is called to load + * a library of functions. + * This callback function evaluates the source code passed to `FUNCTION LOAD` + * and registers the functions declared in the source code. + * + * - `engine_ctx`: the engine specific context pointer. + * + * - `code`: string pointer to the source code. + * + * - `timeout`: timeout for the library creation (0 for no timeout). + * + * - `out_num_compiled_functions`: out param with the number of objects + * returned by this function. + * + * - `err` - out param with the description of error (if occurred). + * + * Returns an array of compiled function objects, or `NULL` if some error + * occurred. + */ typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc)( ValkeyModuleCtx *module_ctx, ValkeyModuleScriptingEngineCtx *engine_ctx, @@ -834,6 +858,28 @@ typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEng size_t *out_num_compiled_functions, char **err); +/* The callback function called when `FCALL` command is called on a function + * registered in the scripting engine. + * This callback function executes the `compiled_function` code. + * + * - `module_ctx`: the module runtime context. + * + * - `engine_ctx`: the engine specific context pointer. + * + * - `func_ctx`: the context opaque structure that represents the runtime + * context for the function. + * + * - `compiled_function`: pointer to the compiled function registered by the + * engine. + * + * - `keys`: the array of key strings passed in the `FCALL` command. + * + * - `nkeys`: the number of elements present in the `keys` array. + * + * - `args`: the array of string arguments passed in the `FCALL` command. + * + * - `nargs`: the number of elements present in the `args` array. + */ typedef void (*ValkeyModuleScriptingEngineCallFunctionFunc)( ValkeyModuleCtx *module_ctx, ValkeyModuleScriptingEngineCtx *engine_ctx, @@ -844,10 +890,15 @@ typedef void (*ValkeyModuleScriptingEngineCallFunctionFunc)( ValkeyModuleString **args, size_t nargs); + +/* Return memory overhead for a given function, such memory is not counted as + * engine memory but as general structs memory that hold different information + */ typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)( ValkeyModuleCtx *module_ctx, void *compiled_function); +/* Free the given function */ typedef void (*ValkeyModuleScriptingEngineFreeFunctionFunc)( ValkeyModuleCtx *module_ctx, ValkeyModuleScriptingEngineCtx *engine_ctx, @@ -865,13 +916,13 @@ typedef struct ValkeyModuleScriptingEngineMethodsV1 { * ValkeyModuleScriptingEngineCompiledFunc objects. */ ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc create_functions_library; - /* Function callback to free the memory of a registered engine function. */ - ValkeyModuleScriptingEngineFreeFunctionFunc free_function; - /* The callback function called when `FCALL` command is called on a function * registered in this engine. */ ValkeyModuleScriptingEngineCallFunctionFunc call_function; + /* Function callback to free the memory of a registered engine function. */ + ValkeyModuleScriptingEngineFreeFunctionFunc free_function; + /* Function callback to return memory overhead for a given function. */ ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead; diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index c912164bda..c7385bd2a3 100644 --- a/tests/modules/helloscripting.c +++ b/tests/modules/helloscripting.c @@ -269,7 +269,7 @@ static size_t engineFunctionMemoryOverhead(ValkeyModuleCtx *module_ctx, } static void engineFreeFunction(ValkeyModuleCtx *module_ctx, - ValkeyModuleScriptingEngineCtx *engine_ctx, + ValkeyModuleScriptingEngineCtx *engine_ctx, void *compiled_function) { VALKEYMODULE_NOT_USED(module_ctx); VALKEYMODULE_NOT_USED(engine_ctx); @@ -341,7 +341,8 @@ callHelloLangFunction(ValkeyModuleCtx *module_ctx, ValkeyModule_ReplyWithLongLong(module_ctx, result); } -int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, +int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, + ValkeyModuleString **argv, int argc) { VALKEYMODULE_NOT_USED(argv); VALKEYMODULE_NOT_USED(argc); From 12caad75f7289363b7177da5a8eadb043c65c8bb Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 23 Dec 2024 12:44:28 +0000 Subject: [PATCH 02/12] Use robj strings for returning errors in scripting engine function create library callback Instead of using C strings to return the errors that may be happen during the code parsing of a scripting engine, use `robj` strings. This commit also removes the `valkey_asprintf` function, since it's not used in any place anymore. Signed-off-by: Ricardo Dias --- src/engine.c | 8 +++++--- src/engine.h | 11 +++++------ src/function_lua.c | 8 +++++--- src/functions.c | 6 +++--- src/util.c | 20 -------------------- src/util.h | 1 - src/valkeymodule.h | 2 +- tests/modules/helloscripting.c | 20 +++++++++++++------- tests/unit/moduleapi/scriptingengine.tcl | 4 ++++ 9 files changed, 36 insertions(+), 44 deletions(-) diff --git a/src/engine.c b/src/engine.c index 46fbdea9ac..7f71b11971 100644 --- a/src/engine.c +++ b/src/engine.c @@ -217,7 +217,7 @@ compiledFunction **engineCallCreateFunctionsLibrary(engine *engine, const char *code, size_t timeout, size_t *out_num_compiled_functions, - char **err) { + robj **err) { engineSetupModuleCtx(engine, NULL); compiledFunction **functions = engine->impl->methods.create_functions_library( @@ -265,9 +265,11 @@ void engineCallFreeFunction(engine *engine, engineTeardownModuleCtx(engine); } -size_t engineCallGetFunctionMemoryOverhead(engine *engine, void *compiled_function) { +size_t engineCallGetFunctionMemoryOverhead(engine *engine, + void *compiled_function) { engineSetupModuleCtx(engine, NULL); - size_t mem = engine->impl->methods.get_function_memory_overhead(engine->module_ctx, compiled_function); + size_t mem = engine->impl->methods.get_function_memory_overhead( + engine->module_ctx, compiled_function); engineTeardownModuleCtx(engine); return mem; } diff --git a/src/engine.h b/src/engine.h index 9ac337b5bf..892c2e32fe 100644 --- a/src/engine.h +++ b/src/engine.h @@ -51,12 +51,11 @@ ValkeyModule *engineGetModule(engine *engine); /* * API to call engine callback functions. */ -compiledFunction **engineCallCreateFunctionsLibrary( - engine *engine, - const char *code, - size_t timeout, - size_t *out_num_compiled_functions, - char **err); +compiledFunction **engineCallCreateFunctionsLibrary(engine *engine, + const char *code, + size_t timeout, + size_t *out_num_compiled_functions, + robj **err); void engineCallFunction(engine *engine, functionCtx *func_ctx, client *caller, diff --git a/src/function_lua.c b/src/function_lua.c index bafda878b0..6dd51dea88 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -122,7 +122,7 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx, const char *code, size_t timeout, size_t *out_num_compiled_functions, - char **err) { + robj **err) { /* The lua engine is implemented in the core, and not in a Valkey Module */ serverAssert(module_ctx == NULL); @@ -140,7 +140,8 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx, /* compile the code */ if (luaL_loadbuffer(lua, code, strlen(code), "@user_function")) { - *err = valkey_asprintf("Error compiling function: %s", lua_tostring(lua, -1)); + sds error = sdscatfmt(sdsempty(), "Error compiling function: %s", lua_tostring(lua, -1)); + *err = createObject(OBJ_STRING, error); lua_pop(lua, 1); /* pops the error */ goto done; } @@ -158,7 +159,8 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx, if (lua_pcall(lua, 0, 0, 0)) { errorInfo err_info = {0}; luaExtractErrorInformation(lua, &err_info); - *err = valkey_asprintf("Error registering functions: %s", err_info.msg); + sds error = sdscatfmt(sdsempty(), "Error registering functions: %s", err_info.msg); + *err = createObject(OBJ_STRING, error); lua_pop(lua, 1); /* pops the error */ luaErrorInformationDiscard(&err_info); listIter *iter = listGetIterator(load_ctx.functions, AL_START_HEAD); diff --git a/src/functions.c b/src/functions.c index 9750d0a99c..13a03c506e 100644 --- a/src/functions.c +++ b/src/functions.c @@ -1008,7 +1008,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC new_li = engineLibraryCreate(md.name, engine, code); size_t num_compiled_functions = 0; - char *compile_error = NULL; + robj *compile_error = NULL; compiledFunction **compiled_functions = engineCallCreateFunctionsLibrary(engine, md.code, @@ -1018,8 +1018,8 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC if (compiled_functions == NULL) { serverAssert(num_compiled_functions == 0); serverAssert(compile_error != NULL); - *err = sdsnew(compile_error); - zfree(compile_error); + *err = sdsdup(compile_error->ptr); + decrRefCount(compile_error); goto error; } diff --git a/src/util.c b/src/util.c index 6e44392ce1..ea4f7d72d7 100644 --- a/src/util.c +++ b/src/util.c @@ -1381,23 +1381,3 @@ int snprintf_async_signal_safe(char *to, size_t n, const char *fmt, ...) { va_end(args); return result; } - -/* A printf-like function that returns a freshly allocated string. - * - * This function is similar to asprintf function, but it uses zmalloc for - * allocating the string buffer. */ -char *valkey_asprintf(char const *fmt, ...) { - va_list args; - - va_start(args, fmt); - size_t str_len = vsnprintf(NULL, 0, fmt, args) + 1; - va_end(args); - - char *str = zmalloc(str_len); - - va_start(args, fmt); - vsnprintf(str, str_len, fmt, args); - va_end(args); - - return str; -} diff --git a/src/util.h b/src/util.h index 61095ddb65..51eb38f0b4 100644 --- a/src/util.h +++ b/src/util.h @@ -99,6 +99,5 @@ int snprintf_async_signal_safe(char *to, size_t n, const char *fmt, ...); #endif size_t valkey_strlcpy(char *dst, const char *src, size_t dsize); size_t valkey_strlcat(char *dst, const char *src, size_t dsize); -char *valkey_asprintf(char const *fmt, ...); #endif diff --git a/src/valkeymodule.h b/src/valkeymodule.h index 2ee67e8889..c501b373fd 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -856,7 +856,7 @@ typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEng const char *code, size_t timeout, size_t *out_num_compiled_functions, - char **err); + ValkeyModuleString **err); /* The callback function called when `FCALL` command is called on a function * registered in the scripting engine. diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index c7385bd2a3..c78561812a 100644 --- a/tests/modules/helloscripting.c +++ b/tests/modules/helloscripting.c @@ -151,8 +151,9 @@ static void helloLangParseArgs(HelloFunc *func) { /* * Parses an HELLO program source code. */ -static HelloProgram *helloLangParseCode(const char *code, - HelloProgram *program) { +static int helloLangParseCode(const char *code, + HelloProgram *program, + ValkeyModuleString **err) { char *_code = ValkeyModule_Alloc(sizeof(char) * strlen(code) + 1); strcpy(_code, code); @@ -188,7 +189,9 @@ static HelloProgram *helloLangParseCode(const char *code, currentFunc = NULL; break; default: - ValkeyModule_Assert(0); + *err = ValkeyModule_CreateStringPrintf(NULL, "Failed to parse instruction: '%s'", token); + ValkeyModule_Free(_code); + return -1; } token = strtok(NULL, " \n"); @@ -196,7 +199,7 @@ static HelloProgram *helloLangParseCode(const char *code, ValkeyModule_Free(_code); - return program; + return 0; } /* @@ -269,7 +272,7 @@ static size_t engineFunctionMemoryOverhead(ValkeyModuleCtx *module_ctx, } static void engineFreeFunction(ValkeyModuleCtx *module_ctx, - ValkeyModuleScriptingEngineCtx *engine_ctx, + ValkeyModuleScriptingEngineCtx *engine_ctx, void *compiled_function) { VALKEYMODULE_NOT_USED(module_ctx); VALKEYMODULE_NOT_USED(engine_ctx); @@ -284,7 +287,7 @@ static ValkeyModuleScriptingEngineCompiledFunction **createHelloLangEngine(Valke const char *code, size_t timeout, size_t *out_num_compiled_functions, - char **err) { + ValkeyModuleString **err) { VALKEYMODULE_NOT_USED(module_ctx); VALKEYMODULE_NOT_USED(timeout); VALKEYMODULE_NOT_USED(err); @@ -298,7 +301,10 @@ static ValkeyModuleScriptingEngineCompiledFunction **createHelloLangEngine(Valke ctx->program->num_functions = 0; } - ctx->program = helloLangParseCode(code, ctx->program); + int ret = helloLangParseCode(code, ctx->program, err); + if (ret < 0) { + return NULL; + } ValkeyModuleScriptingEngineCompiledFunction **compiled_functions = ValkeyModule_Alloc(sizeof(ValkeyModuleScriptingEngineCompiledFunction *) * ctx->program->num_functions); diff --git a/tests/unit/moduleapi/scriptingengine.tcl b/tests/unit/moduleapi/scriptingengine.tcl index c350633dd8..17f9b035d1 100644 --- a/tests/unit/moduleapi/scriptingengine.tcl +++ b/tests/unit/moduleapi/scriptingengine.tcl @@ -51,6 +51,10 @@ start_server {tags {"modules"}} { assert_error {ERR Function already exists in the library} {r function load "#!hello name=mylib2\nFUNCTION foo\nARGS 0\nRETURN\nFUNCTION foo\nARGS 0\nRETURN"} } + test {Load script with syntax error} { + assert_error {ERR Failed to parse instruction: 'RETRN'} {r function load replace "#!hello name=mylib3\nFUNCTION foo\nARGS 0\nRETRN"} + } + test {Call scripting engine function: calling foo works} { r fcall foo 0 134 } {134} From bc5760b9e3d02181ef40769eacce332271bef831 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 24 Dec 2024 14:33:34 +0000 Subject: [PATCH 03/12] Rename `engine` to `scriptingEngine` Signed-off-by: Ricardo Dias --- cmake/Modules/SourceFiles.cmake | 2 +- src/Makefile | 2 +- src/function_lua.c | 2 +- src/functions.c | 16 +++++----- src/functions.h | 14 ++++----- src/module.c | 2 +- src/{engine.c => scripting_engine.c} | 46 ++++++++++++++-------------- src/{engine.h => scripting_engine.h} | 25 +++++++-------- src/server.c | 2 +- 9 files changed, 56 insertions(+), 55 deletions(-) rename src/{engine.c => scripting_engine.c} (87%) rename src/{engine.h => scripting_engine.h} (71%) diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index f0f6954d1f..bdaa1faad8 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -107,7 +107,7 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/unix.c ${CMAKE_SOURCE_DIR}/src/server.c ${CMAKE_SOURCE_DIR}/src/logreqres.c - ${CMAKE_SOURCE_DIR}/src/engine.c) + ${CMAKE_SOURCE_DIR}/src/scripting_engine.c) # valkey-cli set(VALKEY_CLI_SRCS diff --git a/src/Makefile b/src/Makefile index 0dc2a54207..9e4075660d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -416,7 +416,7 @@ endif ENGINE_NAME=valkey SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX) ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX) -ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o engine.o +ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX) ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX) diff --git a/src/function_lua.c b/src/function_lua.c index 6dd51dea88..d2c2d5bfd1 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -39,7 +39,7 @@ * Uses script_lua.c to run the Lua code. */ -#include "engine.h" +#include "scripting_engine.h" #include "functions.h" #include "script_lua.h" #include diff --git a/src/functions.c b/src/functions.c index 13a03c506e..9a8c996e58 100644 --- a/src/functions.c +++ b/src/functions.c @@ -203,7 +203,7 @@ functionsLibCtx *functionsLibCtxGetCurrent(void) { return curr_functions_lib_ctx; } -static int initializeFunctionsLibEngineStats(engine *engine, void *context) { +static int initializeFunctionsLibEngineStats(scriptingEngine *engine, void *context) { functionsLibCtx *lib_ctx = (functionsLibCtx *)context; functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); dictAdd(lib_ctx->engines_stats, engineGetName(engine), stats); @@ -276,7 +276,7 @@ static int functionLibCreateFunction(robj *name, return C_OK; } -static functionLibInfo *engineLibraryCreate(sds name, engine *e, sds code) { +static functionLibInfo *engineLibraryCreate(sds name, scriptingEngine *e, sds code) { functionLibInfo *li = zmalloc(sizeof(*li)); *li = (functionLibInfo){ .name = sdsdup(name), @@ -410,7 +410,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l return ret; } -static int replyEngineStats(engine *engine, void *context) { +static int replyEngineStats(scriptingEngine *engine, void *context) { client *c = (client *)context; addReplyBulkCString(c, engineGetName(engine)); addReplyMapLen(c, 2); @@ -423,7 +423,7 @@ static int replyEngineStats(engine *engine, void *context) { return 1; } -void functionsRemoveLibFromEngine(engine *engine) { +void functionsRemoveLibFromEngine(scriptingEngine *engine) { dictIterator *iter = dictGetSafeIterator(curr_functions_lib_ctx->libraries); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { @@ -623,7 +623,7 @@ static void fcallCommandGeneric(client *c, int ro) { return; } functionInfo *fi = dictGetVal(de); - engine *engine = fi->li->engine; + scriptingEngine *engine = fi->li->engine; long long numkeys; /* Get the number of arguments that are keys */ @@ -949,7 +949,7 @@ void functionFreeLibMetaData(functionsLibMetaData *md) { if (md->engine) sdsfree(md->engine); } -static void freeCompiledFunctions(engine *engine, +static void freeCompiledFunctions(scriptingEngine *engine, compiledFunction **compiled_functions, size_t num_compiled_functions, size_t free_function_from_idx) { @@ -987,7 +987,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC goto error; } - engine *engine = engineManagerFind(md.engine); + scriptingEngine *engine = engineManagerFind(md.engine); if (!engine) { *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); goto error; @@ -1124,7 +1124,7 @@ void functionLoadCommand(client *c) { addReplyBulkSds(c, library_name); } -static int getEngineUsedMemory(engine *engine, void *context) { +static int getEngineUsedMemory(scriptingEngine *engine, void *context) { size_t *engines_memory = (size_t *)context; engineMemoryInfo mem_info = engineCallGetMemoryInfo(engine); *engines_memory += mem_info.used_memory; diff --git a/src/functions.h b/src/functions.h index 2fcfb2272e..7f6d144365 100644 --- a/src/functions.h +++ b/src/functions.h @@ -49,7 +49,7 @@ */ #include "server.h" -#include "engine.h" +#include "scripting_engine.h" #include "script.h" #include "valkeymodule.h" @@ -61,7 +61,7 @@ typedef struct engineInfo { sds name; /* Name of the engine */ ValkeyModule *engineModule; /* the module that implements the scripting engine */ ValkeyModuleCtx *module_ctx; /* Scripting engine module context */ - engine *engine; /* engine callbacks that allows to interact with the engine */ + scriptingEngine *engine; /* engine callbacks that allows to interact with the engine */ client *c; /* Client that is used to run commands */ } engineInfo; @@ -79,10 +79,10 @@ typedef struct functionInfo { /* Hold information about the specific library. * Used on rdb.c so it must be declared here. */ struct functionLibInfo { - sds name; /* Library name */ - dict *functions; /* Functions dictionary */ - engine *engine; /* Pointer to the scripting engine */ - sds code; /* Library code */ + sds name; /* Library name */ + dict *functions; /* Functions dictionary */ + scriptingEngine *engine; /* Pointer to the scripting engine */ + sds code; /* Library code */ }; sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibCtx *lib_ctx, size_t timeout); @@ -99,7 +99,7 @@ void functionsLibCtxFree(functionsLibCtx *functions_lib_ctx); void functionsLibCtxClear(functionsLibCtx *lib_ctx, void(callback)(dict *)); void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async); -void functionsRemoveLibFromEngine(engine *engine); +void functionsRemoveLibFromEngine(scriptingEngine *engine); int luaEngineInitEngine(void); int functionsInit(void); diff --git a/src/module.c b/src/module.c index 1b6c31a401..7cc64f5369 100644 --- a/src/module.c +++ b/src/module.c @@ -62,7 +62,7 @@ #include "crc16_slottable.h" #include "valkeymodule.h" #include "io_threads.h" -#include "engine.h" +#include "scripting_engine.h" #include #include #include diff --git a/src/engine.c b/src/scripting_engine.c similarity index 87% rename from src/engine.c rename to src/scripting_engine.c index 7f71b11971..8980589153 100644 --- a/src/engine.c +++ b/src/scripting_engine.c @@ -1,23 +1,23 @@ -#include "engine.h" +#include "scripting_engine.h" #include "dict.h" #include "functions.h" #include "module.h" -typedef struct engineImpl { +typedef struct scriptingEngineImpl { /* Engine specific context */ engineCtx *ctx; /* Callback functions implemented by the scripting engine module */ engineMethods methods; -} engineImpl; +} scriptingEngineImpl; -typedef struct engine { +typedef struct scriptingEngine { sds name; /* Name of the engine */ ValkeyModule *module; /* the module that implements the scripting engine */ - engineImpl *impl; /* engine callbacks that allows to interact with the engine */ + scriptingEngineImpl *impl; /* engine callbacks that allows to interact with the engine */ client *c; /* Client that is used to run commands */ ValkeyModuleCtx *module_ctx; /* Cache of the module context object */ -} engine; +} scriptingEngine; typedef struct engineManger { @@ -91,8 +91,8 @@ int engineManagerRegisterEngine(const char *engine_name, return C_ERR; } - engineImpl *ei = zmalloc(sizeof(engineImpl)); - *ei = (engineImpl){ + scriptingEngineImpl *ei = zmalloc(sizeof(scriptingEngineImpl)); + *ei = (scriptingEngineImpl){ .ctx = engine_ctx, .methods = { .create_functions_library = engine_methods->create_functions_library, @@ -108,8 +108,8 @@ int engineManagerRegisterEngine(const char *engine_name, c->flag.script = 1; c->flag.fake = 1; - engine *e = zmalloc(sizeof(*ei)); - *e = (engine){ + scriptingEngine *e = zmalloc(sizeof(*ei)); + *e = (scriptingEngine){ .name = engine_name_sds, .module = engine_module, .impl = ei, @@ -139,7 +139,7 @@ int engineManagerUnregisterEngine(const char *engine_name) { return C_ERR; } - engine *e = dictGetVal(entry); + scriptingEngine *e = dictGetVal(entry); functionsRemoveLibFromEngine(e); @@ -161,7 +161,7 @@ int engineManagerUnregisterEngine(const char *engine_name) { * Lookups the engine with `engine_name` in the engine manager and returns it if * it exists. Otherwise returns `NULL`. */ -engine *engineManagerFind(sds engine_name) { +scriptingEngine *engineManagerFind(sds engine_name) { dictEntry *entry = dictFind(engineMgr.engines, engine_name); if (entry) { return dictGetVal(entry); @@ -169,15 +169,15 @@ engine *engineManagerFind(sds engine_name) { return NULL; } -sds engineGetName(engine *engine) { +sds engineGetName(scriptingEngine *engine) { return engine->name; } -client *engineGetClient(engine *engine) { +client *engineGetClient(scriptingEngine *engine) { return engine->c; } -ValkeyModule *engineGetModule(engine *engine) { +ValkeyModule *engineGetModule(scriptingEngine *engine) { return engine->module; } @@ -191,7 +191,7 @@ void engineManagerForEachEngine(engineIterCallback callback, void *context) { dictIterator *iter = dictGetIterator(engineMgr.engines); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { - engine *e = dictGetVal(entry); + scriptingEngine *e = dictGetVal(entry); if (!callback(e, context)) { break; } @@ -199,21 +199,21 @@ void engineManagerForEachEngine(engineIterCallback callback, void *context) { dictReleaseIterator(iter); } -static void engineSetupModuleCtx(engine *e, client *c) { +static void engineSetupModuleCtx(scriptingEngine *e, client *c) { if (e->module != NULL) { serverAssert(e->module_ctx != NULL); moduleScriptingEngineInitContext(e->module_ctx, e->module, c); } } -static void engineTeardownModuleCtx(engine *e) { +static void engineTeardownModuleCtx(scriptingEngine *e) { if (e->module != NULL) { serverAssert(e->module_ctx != NULL); moduleFreeContext(e->module_ctx); } } -compiledFunction **engineCallCreateFunctionsLibrary(engine *engine, +compiledFunction **engineCallCreateFunctionsLibrary(scriptingEngine *engine, const char *code, size_t timeout, size_t *out_num_compiled_functions, @@ -233,7 +233,7 @@ compiledFunction **engineCallCreateFunctionsLibrary(engine *engine, return functions; } -void engineCallFunction(engine *engine, +void engineCallFunction(scriptingEngine *engine, functionCtx *func_ctx, client *caller, void *compiled_function, @@ -256,7 +256,7 @@ void engineCallFunction(engine *engine, engineTeardownModuleCtx(engine); } -void engineCallFreeFunction(engine *engine, +void engineCallFreeFunction(scriptingEngine *engine, void *compiled_func) { engineSetupModuleCtx(engine, NULL); engine->impl->methods.free_function(engine->module_ctx, @@ -265,7 +265,7 @@ void engineCallFreeFunction(engine *engine, engineTeardownModuleCtx(engine); } -size_t engineCallGetFunctionMemoryOverhead(engine *engine, +size_t engineCallGetFunctionMemoryOverhead(scriptingEngine *engine, void *compiled_function) { engineSetupModuleCtx(engine, NULL); size_t mem = engine->impl->methods.get_function_memory_overhead( @@ -274,7 +274,7 @@ size_t engineCallGetFunctionMemoryOverhead(engine *engine, return mem; } -engineMemoryInfo engineCallGetMemoryInfo(engine *engine) { +engineMemoryInfo engineCallGetMemoryInfo(scriptingEngine *engine) { engineSetupModuleCtx(engine, NULL); engineMemoryInfo mem_info = engine->impl->methods.get_memory_info( engine->module_ctx, engine->impl->ctx); diff --git a/src/engine.h b/src/scripting_engine.h similarity index 71% rename from src/engine.h rename to src/scripting_engine.h index 892c2e32fe..0b875a72e0 100644 --- a/src/engine.h +++ b/src/scripting_engine.h @@ -4,7 +4,7 @@ #include "server.h" // Forward declaration of the engine structure. -typedef struct engine engine; +typedef struct scriptingEngine scriptingEngine; /* ValkeyModule type aliases for scripting engine structs and types. */ typedef ValkeyModuleScriptingEngineCtx engineCtx; @@ -17,14 +17,14 @@ typedef ValkeyModuleScriptingEngineMethods engineMethods; * Callback function used to iterate the list of engines registered in the * engine manager. * - * - `engine`: the engine in the current iteration. + * - `engine`: the scripting engine in the current iteration. * * - `context`: a generic pointer to a context object. * * If the callback function returns 0, then the iteration is stopped * immediately. */ -typedef int (*engineIterCallback)(engine *engine, void *context); +typedef int (*engineIterCallback)(scriptingEngine *engine, void *context); /* * Engine manager API functions. @@ -38,25 +38,25 @@ int engineManagerRegisterEngine(const char *engine_name, engineCtx *engine_ctx, engineMethods *engine_methods); int engineManagerUnregisterEngine(const char *engine_name); -engine *engineManagerFind(sds engine_name); +scriptingEngine *engineManagerFind(sds engine_name); void engineManagerForEachEngine(engineIterCallback callback, void *context); /* * Engine API functions. */ -sds engineGetName(engine *engine); -client *engineGetClient(engine *engine); -ValkeyModule *engineGetModule(engine *engine); +sds engineGetName(scriptingEngine *engine); +client *engineGetClient(scriptingEngine *engine); +ValkeyModule *engineGetModule(scriptingEngine *engine); /* * API to call engine callback functions. */ -compiledFunction **engineCallCreateFunctionsLibrary(engine *engine, +compiledFunction **engineCallCreateFunctionsLibrary(scriptingEngine *engine, const char *code, size_t timeout, size_t *out_num_compiled_functions, robj **err); -void engineCallFunction(engine *engine, +void engineCallFunction(scriptingEngine *engine, functionCtx *func_ctx, client *caller, void *compiled_function, @@ -64,8 +64,9 @@ void engineCallFunction(engine *engine, size_t nkeys, robj **args, size_t nargs); -void engineCallFreeFunction(engine *engine, void *compiled_func); -size_t engineCallGetFunctionMemoryOverhead(engine *engine, void *compiled_function); -engineMemoryInfo engineCallGetMemoryInfo(engine *engine); +void engineCallFreeFunction(scriptingEngine *engine, void *compiled_func); +size_t engineCallGetFunctionMemoryOverhead(scriptingEngine *engine, + void *compiled_function); +engineMemoryInfo engineCallGetMemoryInfo(scriptingEngine *engine); #endif /* _ENGINE_H_ */ diff --git a/src/server.c b/src/server.c index 3876eb5cfc..247ed3cca0 100644 --- a/src/server.c +++ b/src/server.c @@ -42,7 +42,7 @@ #include "fmtargs.h" #include "io_threads.h" #include "sds.h" -#include "engine.h" +#include "scripting_engine.h" #include #include From 8f4e35018eaa5e34360795d44c39136f1e272238 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Thu, 26 Dec 2024 15:12:38 +0000 Subject: [PATCH 04/12] fix spell checker error Signed-off-by: Ricardo Dias --- tests/unit/moduleapi/scriptingengine.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/moduleapi/scriptingengine.tcl b/tests/unit/moduleapi/scriptingengine.tcl index 17f9b035d1..3a37339ea8 100644 --- a/tests/unit/moduleapi/scriptingengine.tcl +++ b/tests/unit/moduleapi/scriptingengine.tcl @@ -52,7 +52,7 @@ start_server {tags {"modules"}} { } test {Load script with syntax error} { - assert_error {ERR Failed to parse instruction: 'RETRN'} {r function load replace "#!hello name=mylib3\nFUNCTION foo\nARGS 0\nRETRN"} + assert_error {ERR Failed to parse instruction: 'SEND'} {r function load replace "#!hello name=mylib3\nFUNCTION foo\nARGS 0\nSEND"} } test {Call scripting engine function: calling foo works} { From f4477c9fe4ba0f1248435c3e22b2fc78702883ec Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Fri, 3 Jan 2025 16:53:43 +0000 Subject: [PATCH 05/12] Addresses reviewers' comments * renames the prefix of scripting engine functions from `engine` to `scriptingEngine` * fixes scripting_engine header file top macro name * reorders source file list in cmake Signed-off-by: Ricardo Dias --- cmake/Modules/SourceFiles.cmake | 4 +- src/function_lua.c | 2 +- src/functions.c | 40 +++++++++---------- src/module.c | 4 +- src/scripting_engine.c | 67 +++++++++++++++---------------- src/scripting_engine.h | 70 +++++++++++++++++---------------- src/server.c | 2 +- 7 files changed, 96 insertions(+), 93 deletions(-) diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index bdaa1faad8..80c3f0c876 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -100,14 +100,14 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/script_lua.c ${CMAKE_SOURCE_DIR}/src/script.c ${CMAKE_SOURCE_DIR}/src/functions.c + ${CMAKE_SOURCE_DIR}/src/scripting_engine.c ${CMAKE_SOURCE_DIR}/src/function_lua.c ${CMAKE_SOURCE_DIR}/src/commands.c ${CMAKE_SOURCE_DIR}/src/strl.c ${CMAKE_SOURCE_DIR}/src/connection.c ${CMAKE_SOURCE_DIR}/src/unix.c ${CMAKE_SOURCE_DIR}/src/server.c - ${CMAKE_SOURCE_DIR}/src/logreqres.c - ${CMAKE_SOURCE_DIR}/src/scripting_engine.c) + ${CMAKE_SOURCE_DIR}/src/logreqres.c) # valkey-cli set(VALKEY_CLI_SRCS diff --git a/src/function_lua.c b/src/function_lua.c index d2c2d5bfd1..4aa0dba845 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -560,7 +560,7 @@ int luaEngineInitEngine(void) { .get_memory_info = luaEngineGetMemoryInfo, }; - return engineManagerRegisterEngine(LUA_ENGINE_NAME, + return scriptingEngineManagerRegister(LUA_ENGINE_NAME, NULL, lua_engine_ctx, &lua_engine_methods); diff --git a/src/functions.c b/src/functions.c index 9a8c996e58..d11f6b8376 100644 --- a/src/functions.c +++ b/src/functions.c @@ -107,7 +107,7 @@ static size_t functionMallocSize(functionInfo *fi) { return zmalloc_size(fi) + sdsAllocSize(fi->name) + (fi->desc ? sdsAllocSize(fi->desc) : 0) + - engineCallGetFunctionMemoryOverhead(fi->li->engine, fi->function); + scriptingEngineCallGetFunctionMemoryOverhead(fi->li->engine, fi->function); } static size_t libraryMallocSize(functionLibInfo *li) { @@ -130,7 +130,7 @@ static void engineFunctionDispose(void *obj) { sdsfree(fi->desc); } - engineCallFreeFunction(fi->li->engine, fi->function); + scriptingEngineCallFreeFunction(fi->li->engine, fi->function); zfree(fi); } @@ -206,7 +206,7 @@ functionsLibCtx *functionsLibCtxGetCurrent(void) { static int initializeFunctionsLibEngineStats(scriptingEngine *engine, void *context) { functionsLibCtx *lib_ctx = (functionsLibCtx *)context; functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); - dictAdd(lib_ctx->engines_stats, engineGetName(engine), stats); + dictAdd(lib_ctx->engines_stats, scriptingEngineGetName(engine), stats); return 1; } @@ -216,7 +216,7 @@ functionsLibCtx *functionsLibCtxCreate(void) { ret->libraries = dictCreate(&librariesDictType); ret->functions = dictCreate(&functionDictType); ret->engines_stats = dictCreate(&engineStatsDictType); - engineManagerForEachEngine(initializeFunctionsLibEngineStats, ret); + scriptingEngineManagerForEachEngine(initializeFunctionsLibEngineStats, ret); ret->cache_memory = 0; return ret; } @@ -303,7 +303,7 @@ static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) { lib_ctx->cache_memory -= libraryMallocSize(li); /* update stats */ - functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, engineGetName(li->engine)); + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, scriptingEngineGetName(li->engine)); serverAssert(stats); stats->n_lib--; stats->n_functions -= dictSize(li->functions); @@ -323,7 +323,7 @@ static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo *li) { lib_ctx->cache_memory += libraryMallocSize(li); /* update stats */ - functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, engineGetName(li->engine)); + functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, scriptingEngineGetName(li->engine)); serverAssert(stats); stats->n_lib++; stats->n_functions += dictSize(li->functions); @@ -412,10 +412,10 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l static int replyEngineStats(scriptingEngine *engine, void *context) { client *c = (client *)context; - addReplyBulkCString(c, engineGetName(engine)); + addReplyBulkCString(c, scriptingEngineGetName(engine)); addReplyMapLen(c, 2); functionsLibEngineStats *e_stats = - dictFetchValue(curr_functions_lib_ctx->engines_stats, engineGetName(engine)); + dictFetchValue(curr_functions_lib_ctx->engines_stats, scriptingEngineGetName(engine)); addReplyBulkCString(c, "libraries_count"); addReplyLongLong(c, e_stats ? e_stats->n_lib : 0); addReplyBulkCString(c, "functions_count"); @@ -465,8 +465,8 @@ void functionStatsCommand(client *c) { } addReplyBulkCString(c, "engines"); - addReplyMapLen(c, engineManagerGetNumEngines()); - engineManagerForEachEngine(replyEngineStats, c); + addReplyMapLen(c, scriptingEngineManagerGetNumEngines()); + scriptingEngineManagerForEachEngine(replyEngineStats, c); } static void functionListReplyFlags(client *c, functionInfo *fi) { @@ -542,7 +542,7 @@ void functionListCommand(client *c) { addReplyBulkCString(c, "library_name"); addReplyBulkCBuffer(c, li->name, sdslen(li->name)); addReplyBulkCString(c, "engine"); - sds engine_name = engineGetName(li->engine); + sds engine_name = scriptingEngineGetName(li->engine); addReplyBulkCBuffer(c, engine_name, sdslen(engine_name)); addReplyBulkCString(c, "functions"); @@ -640,9 +640,9 @@ static void fcallCommandGeneric(client *c, int ro) { } scriptRunCtx run_ctx; - if (scriptPrepareForRun(&run_ctx, engineGetClient(engine), c, fi->name, fi->f_flags, ro) != C_OK) return; + if (scriptPrepareForRun(&run_ctx, scriptingEngineGetClient(engine), c, fi->name, fi->f_flags, ro) != C_OK) return; - engineCallFunction(engine, + scriptingEngineCallFunction(engine, &run_ctx, run_ctx.original_client, fi->function, @@ -960,7 +960,7 @@ static void freeCompiledFunctions(scriptingEngine *engine, decrRefCount(func->desc); } if (i >= free_function_from_idx) { - engineCallFreeFunction(engine, func->function); + scriptingEngineCallFreeFunction(engine, func->function); } zfree(func); } @@ -987,7 +987,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC goto error; } - scriptingEngine *engine = engineManagerFind(md.engine); + scriptingEngine *engine = scriptingEngineManagerFind(md.engine); if (!engine) { *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); goto error; @@ -1010,7 +1010,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC size_t num_compiled_functions = 0; robj *compile_error = NULL; compiledFunction **compiled_functions = - engineCallCreateFunctionsLibrary(engine, + scriptingEngineCallCreateFunctionsLibrary(engine, md.code, timeout, &num_compiled_functions, @@ -1126,7 +1126,7 @@ void functionLoadCommand(client *c) { static int getEngineUsedMemory(scriptingEngine *engine, void *context) { size_t *engines_memory = (size_t *)context; - engineMemoryInfo mem_info = engineCallGetMemoryInfo(engine); + engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine); *engines_memory += mem_info.used_memory; return 1; } @@ -1134,17 +1134,17 @@ static int getEngineUsedMemory(scriptingEngine *engine, void *context) { /* Return memory usage of all the engines combine */ unsigned long functionsMemory(void) { size_t engines_memory = 0; - engineManagerForEachEngine(getEngineUsedMemory, &engines_memory); + scriptingEngineManagerForEachEngine(getEngineUsedMemory, &engines_memory); return engines_memory; } /* Return memory overhead of all the engines combine */ unsigned long functionsMemoryOverhead(void) { - size_t memory_overhead = engineManagerGetMemoryUsage(); + size_t memory_overhead = scriptingEngineManagerGetMemoryUsage(); memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); memory_overhead += sizeof(functionsLibCtx); memory_overhead += curr_functions_lib_ctx->cache_memory; - memory_overhead += engineManagerGetCacheMemory(); + memory_overhead += scriptingEngineManagerGetCacheMemory(); return memory_overhead; } diff --git a/src/module.c b/src/module.c index 7cc64f5369..881f00aebf 100644 --- a/src/module.c +++ b/src/module.c @@ -13125,7 +13125,7 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, return VALKEYMODULE_ERR; } - if (engineManagerRegisterEngine(engine_name, + if (scriptingEngineManagerRegister(engine_name, module_ctx->module, engine_ctx, engine_methods) != C_OK) { @@ -13144,7 +13144,7 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, */ int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name) { UNUSED(ctx); - if (engineManagerUnregisterEngine(engine_name) != C_OK) { + if (scriptingEngineManagerUnregister(engine_name) != C_OK) { return VALKEYMODULE_ERR; } return VALKEYMODULE_OK; diff --git a/src/scripting_engine.c b/src/scripting_engine.c index 8980589153..19255cca5c 100644 --- a/src/scripting_engine.c +++ b/src/scripting_engine.c @@ -50,20 +50,20 @@ dictType engineDictType = { * * Returns C_ERR if some error occurs during the initialization. */ -int engineManagerInit(void) { +int scriptingEngineManagerInit(void) { engineMgr.engines = dictCreate(&engineDictType); return C_OK; } -size_t engineManagerGetCacheMemory(void) { +size_t scriptingEngineManagerGetCacheMemory(void) { return engineMgr.engine_cache_memory; } -size_t engineManagerGetNumEngines(void) { +size_t scriptingEngineManagerGetNumEngines(void) { return dictSize(engineMgr.engines); } -size_t engineManagerGetMemoryUsage(void) { +size_t scriptingEngineManagerGetMemoryUsage(void) { return dictMemUsage(engineMgr.engines) + sizeof(engineMgr); } @@ -79,10 +79,10 @@ size_t engineManagerGetMemoryUsage(void) { * * Returns C_ERR in case of an error during registration. */ -int engineManagerRegisterEngine(const char *engine_name, - ValkeyModule *engine_module, - engineCtx *engine_ctx, - engineMethods *engine_methods) { +int scriptingEngineManagerRegister(const char *engine_name, + ValkeyModule *engine_module, + engineCtx *engine_ctx, + engineMethods *engine_methods) { sds engine_name_sds = sdsnew(engine_name); if (dictFetchValue(engineMgr.engines, engine_name_sds)) { @@ -119,7 +119,7 @@ int engineManagerRegisterEngine(const char *engine_name, dictAdd(engineMgr.engines, engine_name_sds, e); - engineMemoryInfo mem_info = engineCallGetMemoryInfo(e); + engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e); engineMgr.engine_cache_memory += zmalloc_size(e) + sdsAllocSize(e->name) + zmalloc_size(ei) + @@ -132,7 +132,7 @@ int engineManagerRegisterEngine(const char *engine_name, * * - `engine_name`: name of the engine to remove */ -int engineManagerUnregisterEngine(const char *engine_name) { +int scriptingEngineManagerUnregister(const char *engine_name) { dictEntry *entry = dictUnlink(engineMgr.engines, engine_name); if (entry == NULL) { serverLog(LL_WARNING, "There's no engine registered with name %s", engine_name); @@ -161,7 +161,7 @@ int engineManagerUnregisterEngine(const char *engine_name) { * Lookups the engine with `engine_name` in the engine manager and returns it if * it exists. Otherwise returns `NULL`. */ -scriptingEngine *engineManagerFind(sds engine_name) { +scriptingEngine *scriptingEngineManagerFind(sds engine_name) { dictEntry *entry = dictFind(engineMgr.engines, engine_name); if (entry) { return dictGetVal(entry); @@ -169,15 +169,15 @@ scriptingEngine *engineManagerFind(sds engine_name) { return NULL; } -sds engineGetName(scriptingEngine *engine) { +sds scriptingEngineGetName(scriptingEngine *engine) { return engine->name; } -client *engineGetClient(scriptingEngine *engine) { +client *scriptingEngineGetClient(scriptingEngine *engine) { return engine->c; } -ValkeyModule *engineGetModule(scriptingEngine *engine) { +ValkeyModule *scriptingEngineGetModule(scriptingEngine *engine) { return engine->module; } @@ -187,7 +187,8 @@ ValkeyModule *engineGetModule(scriptingEngine *engine) { * * The `context` pointer is also passed in each callback call. */ -void engineManagerForEachEngine(engineIterCallback callback, void *context) { +void scriptingEngineManagerForEachEngine(engineIterCallback callback, + void *context) { dictIterator *iter = dictGetIterator(engineMgr.engines); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { @@ -213,11 +214,11 @@ static void engineTeardownModuleCtx(scriptingEngine *e) { } } -compiledFunction **engineCallCreateFunctionsLibrary(scriptingEngine *engine, - const char *code, - size_t timeout, - size_t *out_num_compiled_functions, - robj **err) { +compiledFunction **scriptingEngineCallCreateFunctionsLibrary(scriptingEngine *engine, + const char *code, + size_t timeout, + size_t *out_num_compiled_functions, + robj **err) { engineSetupModuleCtx(engine, NULL); compiledFunction **functions = engine->impl->methods.create_functions_library( @@ -233,14 +234,14 @@ compiledFunction **engineCallCreateFunctionsLibrary(scriptingEngine *engine, return functions; } -void engineCallFunction(scriptingEngine *engine, - functionCtx *func_ctx, - client *caller, - void *compiled_function, - robj **keys, - size_t nkeys, - robj **args, - size_t nargs) { +void scriptingEngineCallFunction(scriptingEngine *engine, + functionCtx *func_ctx, + client *caller, + void *compiled_function, + robj **keys, + size_t nkeys, + robj **args, + size_t nargs) { engineSetupModuleCtx(engine, caller); engine->impl->methods.call_function( @@ -256,8 +257,8 @@ void engineCallFunction(scriptingEngine *engine, engineTeardownModuleCtx(engine); } -void engineCallFreeFunction(scriptingEngine *engine, - void *compiled_func) { +void scriptingEngineCallFreeFunction(scriptingEngine *engine, + void *compiled_func) { engineSetupModuleCtx(engine, NULL); engine->impl->methods.free_function(engine->module_ctx, engine->impl->ctx, @@ -265,8 +266,8 @@ void engineCallFreeFunction(scriptingEngine *engine, engineTeardownModuleCtx(engine); } -size_t engineCallGetFunctionMemoryOverhead(scriptingEngine *engine, - void *compiled_function) { +size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine, + void *compiled_function) { engineSetupModuleCtx(engine, NULL); size_t mem = engine->impl->methods.get_function_memory_overhead( engine->module_ctx, compiled_function); @@ -274,7 +275,7 @@ size_t engineCallGetFunctionMemoryOverhead(scriptingEngine *engine, return mem; } -engineMemoryInfo engineCallGetMemoryInfo(scriptingEngine *engine) { +engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine) { engineSetupModuleCtx(engine, NULL); engineMemoryInfo mem_info = engine->impl->methods.get_memory_info( engine->module_ctx, engine->impl->ctx); diff --git a/src/scripting_engine.h b/src/scripting_engine.h index 0b875a72e0..d8e19daff6 100644 --- a/src/scripting_engine.h +++ b/src/scripting_engine.h @@ -1,5 +1,5 @@ -#ifndef _ENGINE_H_ -#define _ENGINE_H_ +#ifndef _SCRIPTING_ENGINE_H_ +#define _SCRIPTING_ENGINE_H_ #include "server.h" @@ -29,44 +29,46 @@ typedef int (*engineIterCallback)(scriptingEngine *engine, void *context); /* * Engine manager API functions. */ -int engineManagerInit(void); -size_t engineManagerGetCacheMemory(void); -size_t engineManagerGetNumEngines(void); -size_t engineManagerGetMemoryUsage(void); -int engineManagerRegisterEngine(const char *engine_name, - ValkeyModule *engine_module, - engineCtx *engine_ctx, - engineMethods *engine_methods); -int engineManagerUnregisterEngine(const char *engine_name); -scriptingEngine *engineManagerFind(sds engine_name); -void engineManagerForEachEngine(engineIterCallback callback, void *context); +int scriptingEngineManagerInit(void); +size_t scriptingEngineManagerGetCacheMemory(void); +size_t scriptingEngineManagerGetNumEngines(void); +size_t scriptingEngineManagerGetMemoryUsage(void); +int scriptingEngineManagerRegister(const char *engine_name, + ValkeyModule *engine_module, + engineCtx *engine_ctx, + engineMethods *engine_methods); +int scriptingEngineManagerUnregister(const char *engine_name); +scriptingEngine *scriptingEngineManagerFind(sds engine_name); +void scriptingEngineManagerForEachEngine(engineIterCallback callback, + void *context); /* * Engine API functions. */ -sds engineGetName(scriptingEngine *engine); -client *engineGetClient(scriptingEngine *engine); -ValkeyModule *engineGetModule(scriptingEngine *engine); +sds scriptingEngineGetName(scriptingEngine *engine); +client *scriptingEngineGetClient(scriptingEngine *engine); +ValkeyModule *scriptingEngineGetModule(scriptingEngine *engine); /* * API to call engine callback functions. */ -compiledFunction **engineCallCreateFunctionsLibrary(scriptingEngine *engine, - const char *code, - size_t timeout, - size_t *out_num_compiled_functions, - robj **err); -void engineCallFunction(scriptingEngine *engine, - functionCtx *func_ctx, - client *caller, - void *compiled_function, - robj **keys, - size_t nkeys, - robj **args, - size_t nargs); -void engineCallFreeFunction(scriptingEngine *engine, void *compiled_func); -size_t engineCallGetFunctionMemoryOverhead(scriptingEngine *engine, - void *compiled_function); -engineMemoryInfo engineCallGetMemoryInfo(scriptingEngine *engine); +compiledFunction **scriptingEngineCallCreateFunctionsLibrary(scriptingEngine *engine, + const char *code, + size_t timeout, + size_t *out_num_compiled_functions, + robj **err); +void scriptingEngineCallFunction(scriptingEngine *engine, + functionCtx *func_ctx, + client *caller, + void *compiled_function, + robj **keys, + size_t nkeys, + robj **args, + size_t nargs); +void scriptingEngineCallFreeFunction(scriptingEngine *engine, + void *compiled_func); +size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine, + void *compiled_function); +engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine); -#endif /* _ENGINE_H_ */ +#endif /* _SCRIPTING_ENGINE_H_ */ diff --git a/src/server.c b/src/server.c index 247ed3cca0..6da0cd6870 100644 --- a/src/server.c +++ b/src/server.c @@ -2874,7 +2874,7 @@ void initServer(void) { server.maxmemory_policy = MAXMEMORY_NO_EVICTION; } - if (engineManagerInit() == C_ERR) { + if (scriptingEngineManagerInit() == C_ERR) { serverPanic("Scripting engine manager initialization failed, check the server logs."); exit(1); } From 83f9820be0b2c09f16e651d41ee5a9fa64644945 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Fri, 3 Jan 2025 17:00:19 +0000 Subject: [PATCH 06/12] Fix merge conflicts Signed-off-by: Ricardo Dias --- src/scripting_engine.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/scripting_engine.h b/src/scripting_engine.h index d8e19daff6..1cffe11584 100644 --- a/src/scripting_engine.h +++ b/src/scripting_engine.h @@ -7,6 +7,7 @@ typedef struct scriptingEngine scriptingEngine; /* ValkeyModule type aliases for scripting engine structs and types. */ +typedef struct ValkeyModule ValkeyModule; typedef ValkeyModuleScriptingEngineCtx engineCtx; typedef ValkeyModuleScriptingEngineFunctionCtx functionCtx; typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction; From 212f5de654f23a83524c169225ebc26b80d0f505 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Fri, 3 Jan 2025 17:09:45 +0000 Subject: [PATCH 07/12] Fix clang-format errors Signed-off-by: Ricardo Dias --- src/function_lua.c | 6 +++--- src/functions.c | 22 +++++++++++----------- src/module.c | 6 +++--- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/function_lua.c b/src/function_lua.c index 4aa0dba845..59c16eae54 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -561,7 +561,7 @@ int luaEngineInitEngine(void) { }; return scriptingEngineManagerRegister(LUA_ENGINE_NAME, - NULL, - lua_engine_ctx, - &lua_engine_methods); + NULL, + lua_engine_ctx, + &lua_engine_methods); } diff --git a/src/functions.c b/src/functions.c index d11f6b8376..924b99006a 100644 --- a/src/functions.c +++ b/src/functions.c @@ -643,13 +643,13 @@ static void fcallCommandGeneric(client *c, int ro) { if (scriptPrepareForRun(&run_ctx, scriptingEngineGetClient(engine), c, fi->name, fi->f_flags, ro) != C_OK) return; scriptingEngineCallFunction(engine, - &run_ctx, - run_ctx.original_client, - fi->function, - c->argv + 3, - numkeys, - c->argv + 3 + numkeys, - c->argc - 3 - numkeys); + &run_ctx, + run_ctx.original_client, + fi->function, + c->argv + 3, + numkeys, + c->argv + 3 + numkeys, + c->argc - 3 - numkeys); scriptResetRun(&run_ctx); } @@ -1011,10 +1011,10 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC robj *compile_error = NULL; compiledFunction **compiled_functions = scriptingEngineCallCreateFunctionsLibrary(engine, - md.code, - timeout, - &num_compiled_functions, - &compile_error); + md.code, + timeout, + &num_compiled_functions, + &compile_error); if (compiled_functions == NULL) { serverAssert(num_compiled_functions == 0); serverAssert(compile_error != NULL); diff --git a/src/module.c b/src/module.c index 2fa9f53c8b..61e41e3f28 100644 --- a/src/module.c +++ b/src/module.c @@ -13163,9 +13163,9 @@ int VM_RegisterScriptingEngine(ValkeyModuleCtx *module_ctx, } if (scriptingEngineManagerRegister(engine_name, - module_ctx->module, - engine_ctx, - engine_methods) != C_OK) { + module_ctx->module, + engine_ctx, + engine_methods) != C_OK) { return VALKEYMODULE_ERR; } From 9946f177bd38b0eeb0e5c14a927ceaf52250f749 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 6 Jan 2025 12:26:14 +0000 Subject: [PATCH 08/12] Addresses some reviewers' suggestions * Changed return type of `engineIterCallback` to `void` * removed `exit(1)` call after `serverPanic` Signed-off-by: Ricardo Dias --- src/functions.c | 10 ++++------ src/scripting_engine.c | 4 +--- src/scripting_engine.h | 4 +--- src/server.c | 2 -- 4 files changed, 6 insertions(+), 14 deletions(-) diff --git a/src/functions.c b/src/functions.c index 924b99006a..7f6ac8f751 100644 --- a/src/functions.c +++ b/src/functions.c @@ -203,11 +203,11 @@ functionsLibCtx *functionsLibCtxGetCurrent(void) { return curr_functions_lib_ctx; } -static int initializeFunctionsLibEngineStats(scriptingEngine *engine, void *context) { +static void initializeFunctionsLibEngineStats(scriptingEngine *engine, + void *context) { functionsLibCtx *lib_ctx = (functionsLibCtx *)context; functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); dictAdd(lib_ctx->engines_stats, scriptingEngineGetName(engine), stats); - return 1; } /* Create a new functions ctx */ @@ -410,7 +410,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l return ret; } -static int replyEngineStats(scriptingEngine *engine, void *context) { +static void replyEngineStats(scriptingEngine *engine, void *context) { client *c = (client *)context; addReplyBulkCString(c, scriptingEngineGetName(engine)); addReplyMapLen(c, 2); @@ -420,7 +420,6 @@ static int replyEngineStats(scriptingEngine *engine, void *context) { addReplyLongLong(c, e_stats ? e_stats->n_lib : 0); addReplyBulkCString(c, "functions_count"); addReplyLongLong(c, e_stats ? e_stats->n_functions : 0); - return 1; } void functionsRemoveLibFromEngine(scriptingEngine *engine) { @@ -1124,11 +1123,10 @@ void functionLoadCommand(client *c) { addReplyBulkSds(c, library_name); } -static int getEngineUsedMemory(scriptingEngine *engine, void *context) { +static void getEngineUsedMemory(scriptingEngine *engine, void *context) { size_t *engines_memory = (size_t *)context; engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine); *engines_memory += mem_info.used_memory; - return 1; } /* Return memory usage of all the engines combine */ diff --git a/src/scripting_engine.c b/src/scripting_engine.c index 19255cca5c..f171d5e919 100644 --- a/src/scripting_engine.c +++ b/src/scripting_engine.c @@ -193,9 +193,7 @@ void scriptingEngineManagerForEachEngine(engineIterCallback callback, dictEntry *entry = NULL; while ((entry = dictNext(iter))) { scriptingEngine *e = dictGetVal(entry); - if (!callback(e, context)) { - break; - } + callback(e, context); } dictReleaseIterator(iter); } diff --git a/src/scripting_engine.h b/src/scripting_engine.h index 1cffe11584..837b1d74d0 100644 --- a/src/scripting_engine.h +++ b/src/scripting_engine.h @@ -22,10 +22,8 @@ typedef ValkeyModuleScriptingEngineMethods engineMethods; * * - `context`: a generic pointer to a context object. * - * If the callback function returns 0, then the iteration is stopped - * immediately. */ -typedef int (*engineIterCallback)(scriptingEngine *engine, void *context); +typedef void (*engineIterCallback)(scriptingEngine *engine, void *context); /* * Engine manager API functions. diff --git a/src/server.c b/src/server.c index cefb2f992d..20629f91e3 100644 --- a/src/server.c +++ b/src/server.c @@ -2878,7 +2878,6 @@ void initServer(void) { if (scriptingEngineManagerInit() == C_ERR) { serverPanic("Scripting engine manager initialization failed, check the server logs."); - exit(1); } /* Initialize the LUA scripting engine. */ @@ -2886,7 +2885,6 @@ void initServer(void) { /* Initialize the functions engine based off of LUA initialization. */ if (functionsInit() == C_ERR) { serverPanic("Functions initialization failed, check the server logs."); - exit(1); } slowlogInit(); latencyMonitorInit(); From f76c7fe676d3ddb355cc74b34aac90c1d75b09b9 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 6 Jan 2025 15:55:51 +0000 Subject: [PATCH 09/12] Fix memory leak in dummy `hello` scripting engine Signed-off-by: Ricardo Dias --- src/functions.c | 2 ++ tests/modules/helloscripting.c | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/src/functions.c b/src/functions.c index 7f6ac8f751..6a4b200a6f 100644 --- a/src/functions.c +++ b/src/functions.c @@ -1022,6 +1022,8 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC goto error; } + serverAssert(compile_error == NULL); + for (size_t i = 0; i < num_compiled_functions; i++) { compiledFunction *func = compiled_functions[i]; int ret = functionLibCreateFunction(func->name, diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index c78561812a..f6040dbb93 100644 --- a/tests/modules/helloscripting.c +++ b/tests/modules/helloscripting.c @@ -226,6 +226,7 @@ static uint32_t executeHelloLangFunction(HelloFunc *func, break; } case RETURN: { + ValkeyModule_Assert(sp > 0); uint32_t val = stack[--sp]; ValkeyModule_Assert(sp == 0); return val; @@ -303,6 +304,13 @@ static ValkeyModuleScriptingEngineCompiledFunction **createHelloLangEngine(Valke int ret = helloLangParseCode(code, ctx->program, err); if (ret < 0) { + for (uint32_t i = 0; i < ctx->program->num_functions; i++) { + HelloFunc *func = ctx->program->functions[i]; + ValkeyModule_Free(func->name); + ValkeyModule_Free(func); + ctx->program->functions[i] = NULL; + } + ctx->program->num_functions = 0; return NULL; } From 86461343b0626d70a66d08a3d2bb3c98b2c9ee94 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 13 Jan 2025 13:25:38 +0000 Subject: [PATCH 10/12] Addresses reviewers comments * Inlines `scriptingEngineImpl` struct inside `scriptingEngine` struct * Improves log message when engine is already registered Signed-off-by: Ricardo Dias --- src/scripting_engine.c | 51 +++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 28 deletions(-) diff --git a/src/scripting_engine.c b/src/scripting_engine.c index f171d5e919..bb1f552016 100644 --- a/src/scripting_engine.c +++ b/src/scripting_engine.c @@ -14,7 +14,7 @@ typedef struct scriptingEngineImpl { typedef struct scriptingEngine { sds name; /* Name of the engine */ ValkeyModule *module; /* the module that implements the scripting engine */ - scriptingEngineImpl *impl; /* engine callbacks that allows to interact with the engine */ + scriptingEngineImpl impl; /* engine context and callbacks to interact with the engine */ client *c; /* Client that is used to run commands */ ValkeyModuleCtx *module_ctx; /* Cache of the module context object */ } scriptingEngine; @@ -86,33 +86,30 @@ int scriptingEngineManagerRegister(const char *engine_name, sds engine_name_sds = sdsnew(engine_name); if (dictFetchValue(engineMgr.engines, engine_name_sds)) { - serverLog(LL_WARNING, "Same engine was registered twice"); + serverLog(LL_WARNING, "Scripting engine '%s' is already registered in the server", engine_name_sds); sdsfree(engine_name_sds); return C_ERR; } - scriptingEngineImpl *ei = zmalloc(sizeof(scriptingEngineImpl)); - *ei = (scriptingEngineImpl){ - .ctx = engine_ctx, - .methods = { - .create_functions_library = engine_methods->create_functions_library, - .call_function = engine_methods->call_function, - .free_function = engine_methods->free_function, - .get_function_memory_overhead = engine_methods->get_function_memory_overhead, - .get_memory_info = engine_methods->get_memory_info, - }, - }; - client *c = createClient(NULL); c->flag.deny_blocking = 1; c->flag.script = 1; c->flag.fake = 1; - scriptingEngine *e = zmalloc(sizeof(*ei)); + scriptingEngine *e = zmalloc(sizeof(*e)); *e = (scriptingEngine){ .name = engine_name_sds, .module = engine_module, - .impl = ei, + .impl = { + .ctx = engine_ctx, + .methods = { + .create_functions_library = engine_methods->create_functions_library, + .call_function = engine_methods->call_function, + .free_function = engine_methods->free_function, + .get_function_memory_overhead = engine_methods->get_function_memory_overhead, + .get_memory_info = engine_methods->get_memory_info, + }, + }, .c = c, .module_ctx = engine_module ? moduleAllocateContext() : NULL, }; @@ -122,7 +119,6 @@ int scriptingEngineManagerRegister(const char *engine_name, engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e); engineMgr.engine_cache_memory += zmalloc_size(e) + sdsAllocSize(e->name) + - zmalloc_size(ei) + mem_info.engine_memory_overhead; return C_OK; @@ -143,7 +139,6 @@ int scriptingEngineManagerUnregister(const char *engine_name) { functionsRemoveLibFromEngine(e); - zfree(e->impl); sdsfree(e->name); freeClient(e->c); if (e->module_ctx) { @@ -219,9 +214,9 @@ compiledFunction **scriptingEngineCallCreateFunctionsLibrary(scriptingEngine *en robj **err) { engineSetupModuleCtx(engine, NULL); - compiledFunction **functions = engine->impl->methods.create_functions_library( + compiledFunction **functions = engine->impl.methods.create_functions_library( engine->module_ctx, - engine->impl->ctx, + engine->impl.ctx, code, timeout, out_num_compiled_functions, @@ -242,9 +237,9 @@ void scriptingEngineCallFunction(scriptingEngine *engine, size_t nargs) { engineSetupModuleCtx(engine, caller); - engine->impl->methods.call_function( + engine->impl.methods.call_function( engine->module_ctx, - engine->impl->ctx, + engine->impl.ctx, func_ctx, compiled_function, keys, @@ -258,16 +253,16 @@ void scriptingEngineCallFunction(scriptingEngine *engine, void scriptingEngineCallFreeFunction(scriptingEngine *engine, void *compiled_func) { engineSetupModuleCtx(engine, NULL); - engine->impl->methods.free_function(engine->module_ctx, - engine->impl->ctx, - compiled_func); + engine->impl.methods.free_function(engine->module_ctx, + engine->impl.ctx, + compiled_func); engineTeardownModuleCtx(engine); } size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine, void *compiled_function) { engineSetupModuleCtx(engine, NULL); - size_t mem = engine->impl->methods.get_function_memory_overhead( + size_t mem = engine->impl.methods.get_function_memory_overhead( engine->module_ctx, compiled_function); engineTeardownModuleCtx(engine); return mem; @@ -275,8 +270,8 @@ size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine, engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine) { engineSetupModuleCtx(engine, NULL); - engineMemoryInfo mem_info = engine->impl->methods.get_memory_info( - engine->module_ctx, engine->impl->ctx); + engineMemoryInfo mem_info = engine->impl.methods.get_memory_info( + engine->module_ctx, engine->impl.ctx); engineTeardownModuleCtx(engine); return mem_info; } From 12c36671515f013a29de9db25ab046436f68126b Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 13 Jan 2025 13:42:21 +0000 Subject: [PATCH 11/12] Renames `engineManager::engine_cache_memory` to `engineManager::total_memory_overhead` It also fixes a bug, where we would never decrement the memory overhead of scripting engines that were unregistered. Signed-off-by: Ricardo Dias --- src/functions.c | 2 +- src/scripting_engine.c | 23 +++++++++++++++-------- src/scripting_engine.h | 2 +- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/functions.c b/src/functions.c index 6a4b200a6f..14d8c5296e 100644 --- a/src/functions.c +++ b/src/functions.c @@ -1144,7 +1144,7 @@ unsigned long functionsMemoryOverhead(void) { memory_overhead += dictMemUsage(curr_functions_lib_ctx->functions); memory_overhead += sizeof(functionsLibCtx); memory_overhead += curr_functions_lib_ctx->cache_memory; - memory_overhead += scriptingEngineManagerGetCacheMemory(); + memory_overhead += scriptingEngineManagerGetTotalMemoryOverhead(); return memory_overhead; } diff --git a/src/scripting_engine.c b/src/scripting_engine.c index bb1f552016..9488f5ef93 100644 --- a/src/scripting_engine.c +++ b/src/scripting_engine.c @@ -21,14 +21,14 @@ typedef struct scriptingEngine { typedef struct engineManger { - dict *engines; /* engines dictionary */ - size_t engine_cache_memory; + dict *engines; /* engines dictionary */ + size_t total_memory_overhead; /* the sum of the memory overhead of all registered scripting engines */ } engineManager; static engineManager engineMgr = { .engines = NULL, - .engine_cache_memory = 0, + .total_memory_overhead = 0, }; static uint64_t dictStrCaseHash(const void *key) { @@ -55,8 +55,10 @@ int scriptingEngineManagerInit(void) { return C_OK; } -size_t scriptingEngineManagerGetCacheMemory(void) { - return engineMgr.engine_cache_memory; +/* Returns the amount of memory overhead consumed by all registered scripting + engines. */ +size_t scriptingEngineManagerGetTotalMemoryOverhead(void) { + return engineMgr.total_memory_overhead; } size_t scriptingEngineManagerGetNumEngines(void) { @@ -117,9 +119,9 @@ int scriptingEngineManagerRegister(const char *engine_name, dictAdd(engineMgr.engines, engine_name_sds, e); engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e); - engineMgr.engine_cache_memory += zmalloc_size(e) + - sdsAllocSize(e->name) + - mem_info.engine_memory_overhead; + engineMgr.total_memory_overhead += zmalloc_size(e) + + sdsAllocSize(e->name) + + mem_info.engine_memory_overhead; return C_OK; } @@ -139,6 +141,11 @@ int scriptingEngineManagerUnregister(const char *engine_name) { functionsRemoveLibFromEngine(e); + engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e); + engineMgr.total_memory_overhead -= zmalloc_size(e) + + sdsAllocSize(e->name) + + mem_info.engine_memory_overhead; + sdsfree(e->name); freeClient(e->c); if (e->module_ctx) { diff --git a/src/scripting_engine.h b/src/scripting_engine.h index 837b1d74d0..0ed49e6f88 100644 --- a/src/scripting_engine.h +++ b/src/scripting_engine.h @@ -29,7 +29,7 @@ typedef void (*engineIterCallback)(scriptingEngine *engine, void *context); * Engine manager API functions. */ int scriptingEngineManagerInit(void); -size_t scriptingEngineManagerGetCacheMemory(void); +size_t scriptingEngineManagerGetTotalMemoryOverhead(void); size_t scriptingEngineManagerGetNumEngines(void); size_t scriptingEngineManagerGetMemoryUsage(void); int scriptingEngineManagerRegister(const char *engine_name, From 2447b8cb7e4055f7ce6988060f1a3c7740655f27 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Mon, 13 Jan 2025 14:20:22 +0000 Subject: [PATCH 12/12] Fix sanitizer error in helloscripting.c module Signed-off-by: Ricardo Dias --- tests/modules/helloscripting.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index f6040dbb93..5a34e89f68 100644 --- a/tests/modules/helloscripting.c +++ b/tests/modules/helloscripting.c @@ -72,6 +72,7 @@ typedef struct HelloFunc { char *name; HelloInst instructions[256]; uint32_t num_instructions; + uint32_t index; } HelloFunc; /* @@ -172,6 +173,7 @@ static int helloLangParseCode(const char *code, ValkeyModule_Assert(currentFunc == NULL); currentFunc = ValkeyModule_Alloc(sizeof(HelloFunc)); memset(currentFunc, 0, sizeof(HelloFunc)); + currentFunc->index = program->num_functions; program->functions[program->num_functions++] = currentFunc; helloLangParseFunction(currentFunc); break; @@ -252,8 +254,10 @@ static ValkeyModuleScriptingEngineMemoryInfo engineGetMemoryInfo(ValkeyModuleCtx for (uint32_t i = 0; i < ctx->program->num_functions; i++) { HelloFunc *func = ctx->program->functions[i]; - mem_info.used_memory += ValkeyModule_MallocSize(func); - mem_info.used_memory += ValkeyModule_MallocSize(func->name); + if (func != NULL) { + mem_info.used_memory += ValkeyModule_MallocSize(func); + mem_info.used_memory += ValkeyModule_MallocSize(func->name); + } } } @@ -277,7 +281,9 @@ static void engineFreeFunction(ValkeyModuleCtx *module_ctx, void *compiled_function) { VALKEYMODULE_NOT_USED(module_ctx); VALKEYMODULE_NOT_USED(engine_ctx); + HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx; HelloFunc *func = (HelloFunc *)compiled_function; + ctx->program->functions[func->index] = NULL; ValkeyModule_Free(func->name); func->name = NULL; ValkeyModule_Free(func);