From 331df7f6b589db78c4f34826d77d413db1b083dc Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Fri, 8 Nov 2024 10:36:43 +0000 Subject: [PATCH 1/6] Adds support for scripting engines as Valkey modules This commit extends the module API to support the addition of different scripting engines to run user-defined functions. The scripting engine can be implemented as a Valkey module, and can be dynamically loaded with the `loadmodule` config directive, or with the `MODULE LOAD` command. The current module API support only allows loading scripting engines that run functions using the `FCALL` command. In a follow-up PR, we will move the Lua scripting engine implementation into its own module. Signed-off-by: Ricardo Dias --- src/aof.c | 2 +- src/function_lua.c | 39 ++++--- src/functions.c | 201 ++++++++++++++++++++++--------------- src/functions.h | 62 ++++++++---- src/module.c | 172 +++++++++++++++++++++++++++++++ src/modules/CMakeLists.txt | 1 + src/rdb.c | 13 +-- src/server.h | 1 + src/strl.c | 21 ++++ src/util.h | 1 + src/valkeymodule.h | 36 +++++++ 11 files changed, 423 insertions(+), 126 deletions(-) diff --git a/src/aof.c b/src/aof.c index e0ca6fbb61..9b035b4d64 100644 --- a/src/aof.c +++ b/src/aof.c @@ -2175,7 +2175,7 @@ static int rewriteFunctions(rio *aof) { dictIterator *iter = dictGetIterator(functions); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { - functionLibInfo *li = dictGetVal(entry); + ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry); if (rioWrite(aof, "*3\r\n", 4) == 0) goto werr; char function_load[] = "$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n"; if (rioWrite(aof, function_load, sizeof(function_load) - 1) == 0) goto werr; diff --git a/src/function_lua.c b/src/function_lua.c index fa9983bf7e..92f5dad804 100644 --- a/src/function_lua.c +++ b/src/function_lua.c @@ -64,7 +64,7 @@ typedef struct luaFunctionCtx { } luaFunctionCtx; typedef struct loadCtx { - functionLibInfo *li; + ValkeyModuleScriptingEngineFunctionLibrary *li; monotime 
start_time; size_t timeout; } loadCtx; @@ -100,7 +100,7 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) { * * Return NULL on compilation error and set the error to the err variable */ -static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size_t timeout, sds *err) { +static int luaEngineCreate(void *engine_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li, const char *blob, size_t timeout, char **err) { int ret = C_ERR; luaEngineCtx *lua_engine_ctx = engine_ctx; lua_State *lua = lua_engine_ctx->lua; @@ -114,8 +114,8 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size lua_pop(lua, 1); /* pop the metatable */ /* compile the code */ - if (luaL_loadbuffer(lua, blob, sdslen(blob), "@user_function")) { - *err = sdscatprintf(sdsempty(), "Error compiling function: %s", lua_tostring(lua, -1)); + if (luaL_loadbuffer(lua, blob, strlen(blob), "@user_function")) { + *err = valkey_asprintf("Error compiling function: %s", lua_tostring(lua, -1)); lua_pop(lua, 1); /* pops the error */ goto done; } @@ -133,7 +133,7 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size if (lua_pcall(lua, 0, 0, 0)) { errorInfo err_info = {0}; luaExtractErrorInformation(lua, &err_info); - *err = sdscatprintf(sdsempty(), "Error registering functions: %s", err_info.msg); + *err = valkey_asprintf("Error registering functions: %s", err_info.msg); lua_pop(lua, 1); /* pops the error */ luaErrorInformationDiscard(&err_info); goto done; @@ -158,7 +158,7 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size /* * Invole the give function with the given keys and args */ -static void luaEngineCall(scriptRunCtx *run_ctx, +static void luaEngineCall(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx, void *engine_ctx, void *compiled_function, robj **keys, @@ -177,6 +177,7 @@ static void luaEngineCall(scriptRunCtx *run_ctx, serverAssert(lua_isfunction(lua, -1)); + scriptRunCtx *run_ctx 
= moduleGetScriptRunCtxFromFunctionCtx(func_ctx); luaCallFunction(run_ctx, lua, keys, nkeys, args, nargs, 0); lua_pop(lua, 1); /* Pop error handler */ } @@ -406,12 +407,13 @@ static int luaRegisterFunction(lua_State *lua) { return luaError(lua); } - sds err = NULL; + char *err = NULL; if (functionLibCreateFunction(register_f_args.name, register_f_args.lua_f_ctx, load_ctx->li, register_f_args.desc, register_f_args.f_flags, &err) != C_OK) { + serverAssert(err != NULL); luaRegisterFunctionArgsDispose(lua, &register_f_args); luaPushError(lua, err); - sdsfree(err); + zfree(err); return luaError(lua); } @@ -494,16 +496,13 @@ int luaEngineInitEngine(void) { lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the new global table */ lua_replace(lua_engine_ctx->lua, LUA_GLOBALSINDEX); /* set new global table as the new globals */ - - engine *lua_engine = zmalloc(sizeof(*lua_engine)); - *lua_engine = (engine){ - .engine_ctx = lua_engine_ctx, - .create = luaEngineCreate, - .call = luaEngineCall, - .get_used_memory = luaEngineGetUsedMemoy, - .get_function_memory_overhead = luaEngineFunctionMemoryOverhead, - .get_engine_memory_overhead = luaEngineMemoryOverhead, - .free_function = luaEngineFreeFunction, - }; - return functionsRegisterEngine(LUA_ENGINE_NAME, lua_engine); + return functionsRegisterEngine(LUA_ENGINE_NAME, + NULL, + lua_engine_ctx, + luaEngineCreate, + luaEngineCall, + luaEngineGetUsedMemoy, + luaEngineFunctionMemoryOverhead, + luaEngineMemoryOverhead, + luaEngineFreeFunction); } diff --git a/src/functions.c b/src/functions.c index e950024bad..3062cad0ec 100644 --- a/src/functions.c +++ b/src/functions.c @@ -122,7 +122,7 @@ static size_t functionMallocSize(functionInfo *fi) { fi->li->ei->engine->get_function_memory_overhead(fi->function); } -static size_t libraryMallocSize(functionLibInfo *li) { +static size_t libraryMallocSize(ValkeyModuleScriptingEngineFunctionLibrary *li) { return zmalloc_size(li) + sdsAllocSize(li->name) + sdsAllocSize(li->code); } @@ 
-148,7 +148,7 @@ static void engineFunctionDispose(dict *d, void *obj) { zfree(fi); } -static void engineLibraryFree(functionLibInfo *li) { +static void engineLibraryFree(ValkeyModuleScriptingEngineFunctionLibrary *li) { if (!li) { return; } @@ -227,6 +227,15 @@ functionsLibCtx *functionsLibCtxCreate(void) { return ret; } +void functionsAddEngineStats(engineInfo *ei) { + serverAssert(curr_functions_lib_ctx != NULL); + dictEntry *entry = dictFind(curr_functions_lib_ctx->engines_stats, ei->name); + if (entry == NULL) { + functionsLibEngineStats *stats = zcalloc(sizeof(*stats)); + dictAdd(curr_functions_lib_ctx->engines_stats, ei->name, stats); + } +} + /* * Creating a function inside the given library. * On success, return C_OK. @@ -236,15 +245,21 @@ functionsLibCtx *functionsLibCtxCreate(void) { * the function will verify that the given name is following the naming format * and return an error if its not. */ -int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds desc, uint64_t f_flags, sds *err) { +int functionLibCreateFunction(sds name, + void *function, + ValkeyModuleScriptingEngineFunctionLibrary *li, + sds desc, + uint64_t f_flags, + char **err) { if (functionsVerifyName(name) != C_OK) { - *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one " - "character long"); + *err = valkey_asprintf("Function names can only contain letters, numbers," + " or underscores(_) and must be at least one " + "character long"); return C_ERR; } if (dictFetchValue(li->functions, name)) { - *err = sdsnew("Function already exists in the library"); + *err = valkey_asprintf("Function already exists in the library"); return C_ERR; } @@ -263,9 +278,9 @@ int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds return C_OK; } -static functionLibInfo *engineLibraryCreate(sds name, engineInfo *ei, sds code) { - functionLibInfo *li = zmalloc(sizeof(*li)); - *li = (functionLibInfo){ +static 
ValkeyModuleScriptingEngineFunctionLibrary *engineLibraryCreate(sds name, engineInfo *ei, sds code) { + ValkeyModuleScriptingEngineFunctionLibrary *li = zmalloc(sizeof(*li)); + *li = (ValkeyModuleScriptingEngineFunctionLibrary){ .name = sdsdup(name), .functions = dictCreate(&libraryFunctionDictType), .ei = ei, @@ -274,7 +289,7 @@ static functionLibInfo *engineLibraryCreate(sds name, engineInfo *ei, sds code) return li; } -static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) { +static void libraryUnlink(functionsLibCtx *lib_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li) { dictIterator *iter = dictGetIterator(li->functions); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { @@ -296,7 +311,7 @@ static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) { stats->n_functions -= dictSize(li->functions); } -static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo *li) { +static void libraryLink(functionsLibCtx *lib_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li) { dictIterator *iter = dictGetIterator(li->functions); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { @@ -332,8 +347,8 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l dictEntry *entry = NULL; iter = dictGetIterator(functions_lib_ctx_src->libraries); while ((entry = dictNext(iter))) { - functionLibInfo *li = dictGetVal(entry); - functionLibInfo *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name); + ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry); + ValkeyModuleScriptingEngineFunctionLibrary *old_li = dictFetchValue(functions_lib_ctx_dst->libraries, li->name); if (old_li) { if (!replace) { /* library already exists, failed the restore. */ @@ -367,7 +382,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l /* No collision, it is safe to link all the new libraries. 
*/ iter = dictGetIterator(functions_lib_ctx_src->libraries); while ((entry = dictNext(iter))) { - functionLibInfo *li = dictGetVal(entry); + ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry); libraryLink(functions_lib_ctx_dst, li); dictSetVal(functions_lib_ctx_src->libraries, entry, NULL); } @@ -387,7 +402,7 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l /* Link back all libraries on tmp_l_ctx */ while (listLength(old_libraries_list) > 0) { listNode *head = listFirst(old_libraries_list); - functionLibInfo *li = listNodeValue(head); + ValkeyModuleScriptingEngineFunctionLibrary *li = listNodeValue(head); listNodeValue(head) = NULL; libraryLink(functions_lib_ctx_dst, li); listDelNode(old_libraries_list, head); @@ -400,8 +415,22 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l /* Register an engine, should be called once by the engine on startup and give the following: * * - engine_name - name of the engine to register - * - engine_ctx - the engine ctx that should be used by the server to interact with the engine */ -int functionsRegisterEngine(const char *engine_name, engine *engine) { + * - engine_module - the valkey module that implements this engine + * - engine_ctx - the engine ctx that should be used by the server to interact with the engine + * - create_func - the callback function that is called when a new script is loaded, which adds + * new functions to the engine function library. + * - call_func - the callback function that is called when a function is called e.g., using the + * 'FCALL' command. 
+ */ +int functionsRegisterEngine(const char *engine_name, + ValkeyModule *engine_module, + void *engine_ctx, + ValkeyModuleScriptingEngineCreateFunc create_func, + ValkeyModuleScriptingEngineFunctionCallFunc call_func, + ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory_func, + ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead_func, + ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead_func, + ValkeyModuleScriptingEngineFreeFunctionFunc free_function_func) { sds engine_name_sds = sdsnew(engine_name); if (dictFetchValue(engines, engine_name_sds)) { serverLog(LL_WARNING, "Same engine was registered twice"); @@ -409,6 +438,17 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) { return C_ERR; } + engine *engine = zmalloc(sizeof(struct engine)); + *engine = (struct engine){ + .engine_ctx = engine_ctx, + .create = create_func, + .call = call_func, + .get_used_memory = get_used_memory_func, + .get_function_memory_overhead = get_function_memory_overhead_func, + .get_engine_memory_overhead = get_engine_memory_overhead_func, + .free_function = free_function_func, + }; + client *c = createClient(NULL); c->flag.deny_blocking = 1; c->flag.script = 1; @@ -416,18 +456,55 @@ int functionsRegisterEngine(const char *engine_name, engine *engine) { engineInfo *ei = zmalloc(sizeof(*ei)); *ei = (engineInfo){ .name = engine_name_sds, + .engineModule = engine_module, .engine = engine, .c = c, }; dictAdd(engines, engine_name_sds, ei); + functionsAddEngineStats(ei); + engine_cache_memory += zmalloc_size(ei) + sdsAllocSize(ei->name) + zmalloc_size(engine) + engine->get_engine_memory_overhead(engine->engine_ctx); return C_OK; } +/* Removes a scripting engine from the server. 
+ * + * - engine_name - name of the engine to remove + */ +int functionsUnregisterEngine(const char *engine_name) { + sds engine_name_sds = sdsnew(engine_name); + dictEntry *entry = dictFind(engines, engine_name_sds); + if (entry == NULL) { + serverLog(LL_WARNING, "There's no engine registered with name %s", engine_name); + sdsfree(engine_name_sds); + return C_ERR; + } + + engineInfo *ei = dictGetVal(entry); + + dictIterator *iter = dictGetSafeIterator(curr_functions_lib_ctx->libraries); + while ((entry = dictNext(iter))) { + ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry); + if (li->ei == ei) { + libraryUnlink(curr_functions_lib_ctx, li); + engineLibraryFree(li); + } + } + dictReleaseIterator(iter); + + zfree(ei->engine); + sdsfree(ei->name); + freeClient(ei->c); + zfree(ei); + + sdsfree(engine_name_sds); + return C_OK; +} + /* * FUNCTION STATS */ @@ -535,7 +612,7 @@ void functionListCommand(client *c) { dictIterator *iter = dictGetIterator(curr_functions_lib_ctx->libraries); dictEntry *entry = NULL; while ((entry = dictNext(iter))) { - functionLibInfo *li = dictGetVal(entry); + ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry); if (library_name) { if (!stringmatchlen(library_name, sdslen(library_name), li->name, sdslen(li->name), 1)) { continue; @@ -584,7 +661,7 @@ void functionListCommand(client *c) { */ void functionDeleteCommand(client *c) { robj *function_name = c->argv[2]; - functionLibInfo *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr); + ValkeyModuleScriptingEngineFunctionLibrary *li = dictFetchValue(curr_functions_lib_ctx->libraries, function_name->ptr); if (!li) { addReplyError(c, "Library not found"); return; @@ -614,55 +691,18 @@ uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) { return scriptFlagsToCmdFlags(cmd_flags, script_flags); } -static void fcallCommandGeneric(client *c, int ro) { - /* Functions need to be fed to monitors before the commands they execute. 
*/ - replicationFeedMonitors(c, server.monitors, c->db->id, c->argv, c->argc); - - robj *function_name = c->argv[1]; - dictEntry *de = c->cur_script; - if (!de) de = dictFind(curr_functions_lib_ctx->functions, function_name->ptr); - if (!de) { - addReplyError(c, "Function not found"); - return; - } - functionInfo *fi = dictGetVal(de); - engine *engine = fi->li->ei->engine; - - long long numkeys; - /* Get the number of arguments that are keys */ - if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { - addReplyError(c, "Bad number of keys provided"); - return; - } - if (numkeys > (c->argc - 3)) { - addReplyError(c, "Number of keys can't be greater than number of args"); - return; - } else if (numkeys < 0) { - addReplyError(c, "Number of keys can't be negative"); - return; - } - - scriptRunCtx run_ctx; - - if (scriptPrepareForRun(&run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) return; - - engine->call(&run_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, c->argv + 3 + numkeys, - c->argc - 3 - numkeys); - scriptResetRun(&run_ctx); -} - /* * FCALL nkeys */ void fcallCommand(client *c) { - fcallCommandGeneric(c, 0); + fcallCommandGeneric(curr_functions_lib_ctx->functions, c, 0); } /* * FCALL_RO nkeys */ void fcallroCommand(client *c) { - fcallCommandGeneric(c, 1); + fcallCommandGeneric(curr_functions_lib_ctx->functions, c, 1); } /* @@ -882,16 +922,16 @@ static int functionsVerifyName(sds name) { return C_OK; } -int functionExtractLibMetaData(sds payload, functionsLibMetaData *md, sds *err) { +int functionExtractLibMetaData(sds payload, functionsLibMetaData *md, char **err) { sds name = NULL; sds engine = NULL; if (strncmp(payload, "#!", 2) != 0) { - *err = sdsnew("Missing library metadata"); + *err = valkey_asprintf("Missing library metadata"); return C_ERR; } char *shebang_end = strchr(payload, '\n'); if (shebang_end == NULL) { - *err = sdsnew("Invalid library metadata"); + *err = valkey_asprintf("Invalid library metadata"); return 
C_ERR; } size_t shebang_len = shebang_end - payload; @@ -900,7 +940,7 @@ int functionExtractLibMetaData(sds payload, functionsLibMetaData *md, sds *err) sds *parts = sdssplitargs(shebang, &numparts); sdsfree(shebang); if (!parts || numparts == 0) { - *err = sdsnew("Invalid library metadata"); + *err = valkey_asprintf("Invalid library metadata"); sdsfreesplitres(parts, numparts); return C_ERR; } @@ -910,19 +950,19 @@ int functionExtractLibMetaData(sds payload, functionsLibMetaData *md, sds *err) sds part = parts[i]; if (strncasecmp(part, "name=", 5) == 0) { if (name) { - *err = sdscatfmt(sdsempty(), "Invalid metadata value, name argument was given multiple times"); + *err = valkey_asprintf("Invalid metadata value, name argument was given multiple times"); goto error; } name = sdsdup(part); sdsrange(name, 5, -1); continue; } - *err = sdscatfmt(sdsempty(), "Invalid metadata value given: %s", part); + *err = valkey_asprintf("Invalid metadata value given: %s", part); goto error; } if (!name) { - *err = sdsnew("Library name was not given"); + *err = valkey_asprintf("Library name was not given"); goto error; } @@ -949,25 +989,27 @@ void functionFreeLibMetaData(functionsLibMetaData *md) { /* Compile and save the given library, return the loaded library name on success * and NULL on failure. 
In case on failure the err out param is set with relevant error message */ -sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibCtx *lib_ctx, size_t timeout) { +sds functionsCreateWithLibraryCtx(sds code, int replace, char **err, functionsLibCtx *lib_ctx, size_t timeout) { dictIterator *iter = NULL; dictEntry *entry = NULL; - functionLibInfo *new_li = NULL; - functionLibInfo *old_li = NULL; + ValkeyModuleScriptingEngineFunctionLibrary *old_li = NULL; functionsLibMetaData md = {0}; + ValkeyModuleScriptingEngineFunctionLibrary *new_li = NULL; + if (functionExtractLibMetaData(code, &md, err) != C_OK) { return NULL; } if (functionsVerifyName(md.name)) { - *err = sdsnew("Library names can only contain letters, numbers, or underscores(_) and must be at least one " - "character long"); + *err = valkey_asprintf("Library names can only contain letters, numbers," + " or underscores(_) and must be at least one " + "character long"); goto error; } engineInfo *ei = dictFetchValue(engines, md.engine); if (!ei) { - *err = sdscatfmt(sdsempty(), "Engine '%S' not found", md.engine); + *err = valkey_asprintf("Engine '%s' not found", md.engine); goto error; } engine *engine = ei->engine; @@ -975,7 +1017,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC old_li = dictFetchValue(lib_ctx->libraries, md.name); if (old_li && !replace) { old_li = NULL; - *err = sdscatfmt(sdsempty(), "Library '%S' already exists", md.name); + *err = valkey_asprintf("Library '%s' already exists", md.name); goto error; } @@ -989,7 +1031,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC } if (dictSize(new_li->functions) == 0) { - *err = sdsnew("No functions registered"); + *err = valkey_asprintf("No functions registered"); goto error; } @@ -999,7 +1041,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC functionInfo *fi = dictGetVal(entry); if (dictFetchValue(lib_ctx->functions, 
fi->name)) { /* functions name collision, abort. */ - *err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); + *err = valkey_asprintf("Function %s already exists", fi->name); goto error; } } @@ -1050,14 +1092,16 @@ void functionLoadCommand(client *c) { } robj *code = c->argv[argc_pos]; - sds err = NULL; + char *err = NULL; sds library_name = NULL; size_t timeout = LOAD_TIMEOUT_MS; if (mustObeyClient(c)) { timeout = 0; } if (!(library_name = functionsCreateWithLibraryCtx(code->ptr, replace, &err, curr_functions_lib_ctx, timeout))) { - addReplyErrorSds(c, err); + serverAssert(err != NULL); + addReplyError(c, err); + zfree(err); return; } /* Indicate that the command changed the data so it will be replicated and @@ -1114,12 +1158,11 @@ size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx) { int functionsInit(void) { engines = dictCreate(&engineDictType); + curr_functions_lib_ctx = functionsLibCtxCreate(); + if (luaEngineInitEngine() != C_OK) { return C_ERR; } - /* Must be initialized after engines initialization */ - curr_functions_lib_ctx = functionsLibCtxCreate(); - return C_OK; } diff --git a/src/functions.h b/src/functions.h index da196cf197..1c19a3d680 100644 --- a/src/functions.h +++ b/src/functions.h @@ -52,7 +52,7 @@ #include "script.h" #include "valkeymodule.h" -typedef struct functionLibInfo functionLibInfo; +typedef struct ValkeyModuleScriptingEngineFunctionLibrary ValkeyModuleScriptingEngineFunctionLibrary; typedef struct engine { /* engine specific context */ @@ -65,19 +65,23 @@ typedef struct engine { * timeout - timeout for the library creation (0 for no timeout) * err - description of error (if occurred) * returns C_ERR on error and set err to be the error message */ - int (*create)(void *engine_ctx, functionLibInfo *li, sds code, size_t timeout, sds *err); - - /* Invoking a function, r_ctx is an opaque object (from engine POV). 
- * The r_ctx should be used by the engine to interaction with the server, + int (*create)(void *engine_ctx, + ValkeyModuleScriptingEngineFunctionLibrary *li, + const char *code, + size_t timeout, + char **err); + + /* Invoking a function, func_ctx is an opaque object (from engine POV). + * The func_ctx should be used by the engine to interaction with the server, * such interaction could be running commands, set resp, or set * replication mode */ - void (*call)(scriptRunCtx *r_ctx, + void (*call)(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx, void *engine_ctx, void *compiled_function, - robj **keys, + ValkeyModuleString **keys, size_t nkeys, - robj **args, + ValkeyModuleString **args, size_t nargs); /* get current used memory by the engine */ @@ -98,32 +102,43 @@ typedef struct engine { /* Hold information about an engine. * Used on rdb.c so it must be declared here. */ typedef struct engineInfo { - sds name; /* Name of the engine */ - engine *engine; /* engine callbacks that allows to interact with the engine */ - client *c; /* Client that is used to run commands */ + sds name; /* Name of the engine */ + ValkeyModule *engineModule; /* the module that implements the scripting engine */ + engine *engine; /* engine callbacks that allows to interact with the engine */ + client *c; /* Client that is used to run commands */ } engineInfo; /* Hold information about the specific function. * Used on rdb.c so it must be declared here. */ typedef struct functionInfo { - sds name; /* Function name */ - void *function; /* Opaque object that set by the function's engine and allow it - to run the function, usually it's the function compiled code. 
*/ - functionLibInfo *li; /* Pointer to the library created the function */ - sds desc; /* Function description */ - uint64_t f_flags; /* Function flags */ + sds name; /* Function name */ + void *function; /* Opaque object that set by the function's engine and allow it + to run the function, usually it's the function compiled code. */ + ValkeyModuleScriptingEngineFunctionLibrary *li; /* Pointer to the library created the function */ + sds desc; /* Function description */ + uint64_t f_flags; /* Function flags */ } functionInfo; /* Hold information about the specific library. * Used on rdb.c so it must be declared here. */ -struct functionLibInfo { +struct ValkeyModuleScriptingEngineFunctionLibrary { sds name; /* Library name */ dict *functions; /* Functions dictionary */ engineInfo *ei; /* Pointer to the function engine */ sds code; /* Library code */ }; -int functionsRegisterEngine(const char *engine_name, engine *engine_ctx); +int functionsRegisterEngine(const char *engine_name, + ValkeyModule *engine_module, + void *engine_ctx, + ValkeyModuleScriptingEngineCreateFunc create_func, + ValkeyModuleScriptingEngineFunctionCallFunc call_func, + ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory_func, + ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead_func, + ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead_func, + ValkeyModuleScriptingEngineFreeFunctionFunc free_function_func); +int functionsUnregisterEngine(const char *engine_name); + sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibCtx *lib_ctx, size_t timeout); unsigned long functionsMemory(void); unsigned long functionsMemoryOverhead(void); @@ -138,7 +153,14 @@ void functionsLibCtxFree(functionsLibCtx *lib_ctx); void functionsLibCtxClear(functionsLibCtx *lib_ctx); void functionsLibCtxSwapWithCurrent(functionsLibCtx *lib_ctx); -int functionLibCreateFunction(sds name, void *function, functionLibInfo *li, sds 
desc, uint64_t f_flags, sds *err); +void fcallCommandGeneric(dict *functions, client *c, int ro); + +int functionLibCreateFunction(sds name, + void *function, + ValkeyModuleScriptingEngineFunctionLibrary *li, + sds desc, + uint64_t f_flags, + char **err); int luaEngineInitEngine(void); int functionsInit(void); diff --git a/src/module.c b/src/module.c index 2884239200..2aaeebc9df 100644 --- a/src/module.c +++ b/src/module.c @@ -67,6 +67,7 @@ #include #include #include +#include "functions.h" /* -------------------------------------------------------------------------- * Private data structures used by the modules system. Those are data @@ -173,6 +174,11 @@ struct ValkeyModuleCtx { }; typedef struct ValkeyModuleCtx ValkeyModuleCtx; +struct ValkeyModuleScriptingEngineFunctionCallCtx { + ValkeyModuleCtx module_ctx; + scriptRunCtx run_ctx; +}; + #define VALKEYMODULE_CTX_NONE (0) #define VALKEYMODULE_CTX_AUTO_MEMORY (1 << 0) #define VALKEYMODULE_CTX_KEYS_POS_REQUEST (1 << 1) @@ -13032,6 +13038,168 @@ int VM_RdbSave(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { return VALKEYMODULE_OK; } +/* Registers a new scripting engine in the server. + * + * - `engine_name`: the name of the scripting engine. This name will match + * against the engine name specified in the script header + * using a shebang. + * + * - `engine_ctx`: engine specific context pointer. + * + * - `create_func`: Library create function callback. When a new script is + * loaded, this callback will be called with the script code. + * `VM_RegisterScriptingEngineFunction` function should be + * called inside this callback to register the library + * functions declared in the script code. + * + * - `call_func`: the callback function called when `FCALL` command is called + * on a function registered in this engine. + * + * - `get_used_memory_func`: function callback to get current used memory by the + * engine. 
+ * + * - `get_function_memory_overhead_func`: function callback to return memory + * overhead for a given function. + * + * - `get_engine_memory_overhead_func`: function callback to return memory + * overhead of the engine. + * + * - `free_function_func`: function callback to free the memory of a registered + * engine function. + */ +int VM_RegisterScriptingEngine(ValkeyModuleCtx *ctx, + const char *engine_name, + void *engine_ctx, + ValkeyModuleScriptingEngineCreateFunc create_func, + ValkeyModuleScriptingEngineFunctionCallFunc call_func, + ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory_func, + ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead_func, + ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead_func, + ValkeyModuleScriptingEngineFreeFunctionFunc free_function_func) { + serverLog(LL_DEBUG, "Registering a new scripting engine: %s", engine_name); + + if (functionsRegisterEngine(engine_name, + ctx->module, + engine_ctx, + create_func, + call_func, + get_used_memory_func, + get_function_memory_overhead_func, + get_engine_memory_overhead_func, + free_function_func) != C_OK) { + return VALKEYMODULE_ERR; + } + + return VALKEYMODULE_OK; +} + +/* Removes the scripting engine from the server. + * + * `engine_name` is the name of the scripting engine. + * + */ +int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name) { + UNUSED(ctx); + functionsUnregisterEngine(engine_name); + return VALKEYMODULE_OK; +} + + +/* Registers a new scripting function in the engine function library. + * + * This function should only be called in the context of the scripting engine + * creation callback function. + * + * - `name`: the name of the function. + * + * - `function`: the generic pointer to the function being registered. + * + * - `li`: the pointer to the opaque structure that holds the functions + * registered in the library that is currently being created. 
+ * + * - `desc`: an optional description for the function. + * + * - `flags`: optional flags of the function. See `SCRIPT_FLAG_*` constants. + * + * - `err`: the pointer that will hold the error message in case of an error. + */ +int VM_RegisterScriptingEngineFunction(const char *name, + void *function, + ValkeyModuleScriptingEngineFunctionLibrary *li, + const char *desc, + uint64_t f_flags, + char **err) { + sds fname = sdsnew(name); + sds fdesc = sdsnew(desc); + if (functionLibCreateFunction(fname, function, li, fdesc, f_flags, err) != C_OK) { + sdsfree(fname); + sdsfree(fdesc); + return VALKEYMODULE_ERR; + } + + return VALKEYMODULE_OK; +} + +/* Implements the scripting engine function call logic. + * + */ +void fcallCommandGeneric(dict *functions, client *c, int ro) { + /* Functions need to be fed to monitors before the commands they execute. */ + replicationFeedMonitors(c, server.monitors, c->db->id, c->argv, c->argc); + + robj *function_name = c->argv[1]; + dictEntry *de = c->cur_script; + if (!de) de = dictFind(functions, function_name->ptr); + if (!de) { + addReplyError(c, "Function not found"); + return; + } + functionInfo *fi = dictGetVal(de); + engine *engine = fi->li->ei->engine; + + long long numkeys; + /* Get the number of arguments that are keys */ + if (getLongLongFromObject(c->argv[2], &numkeys) != C_OK) { + addReplyError(c, "Bad number of keys provided"); + return; + } + if (numkeys > (c->argc - 3)) { + addReplyError(c, "Number of keys can't be greater than number of args"); + return; + } else if (numkeys < 0) { + addReplyError(c, "Number of keys can't be negative"); + return; + } + + struct ValkeyModuleScriptingEngineFunctionCallCtx func_ctx; + + if (scriptPrepareForRun(&func_ctx.run_ctx, fi->li->ei->c, c, fi->name, fi->f_flags, ro) != C_OK) return; + + if (fi->li->ei->engineModule != NULL) { + moduleCreateContext(&func_ctx.module_ctx, fi->li->ei->engineModule, VALKEYMODULE_CTX_NONE); + func_ctx.module_ctx.client = 
func_ctx.run_ctx.original_client; + } + + engine->call(&func_ctx, engine->engine_ctx, fi->function, c->argv + 3, numkeys, c->argv + 3 + numkeys, + c->argc - 3 - numkeys); + scriptResetRun(&func_ctx.run_ctx); + + if (fi->li->ei->engineModule != NULL) { + moduleFreeContext(&func_ctx.module_ctx); + } +} + +/* Allows to get the module context pointer from the function call context pointer. + * + */ +ValkeyModuleCtx *VM_GetModuleCtxFromFunctionCallCtx(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx) { + return &func_ctx->module_ctx; +} + +scriptRunCtx *moduleGetScriptRunCtxFromFunctionCtx(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx) { + return &func_ctx->run_ctx; +} + /* MODULE command. * * MODULE LIST @@ -13901,4 +14069,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(RdbStreamFree); REGISTER_API(RdbLoad); REGISTER_API(RdbSave); + REGISTER_API(RegisterScriptingEngine); + REGISTER_API(UnregisterScriptingEngine); + REGISTER_API(RegisterScriptingEngineFunction); + REGISTER_API(GetModuleCtxFromFunctionCallCtx); } diff --git a/src/modules/CMakeLists.txt b/src/modules/CMakeLists.txt index 958796232f..8181cf93a0 100644 --- a/src/modules/CMakeLists.txt +++ b/src/modules/CMakeLists.txt @@ -7,6 +7,7 @@ list(APPEND MODULES_LIST "hellohook") list(APPEND MODULES_LIST "hellotimer") list(APPEND MODULES_LIST "hellotype") list(APPEND MODULES_LIST "helloworld") +list(APPEND MODULES_LIST "helloscripting") foreach (MODULE_NAME ${MODULES_LIST}) message(STATUS "Building module: ${MODULE_NAME}") diff --git a/src/rdb.c b/src/rdb.c index 1c200e54f5..82b9c6defd 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1303,7 +1303,7 @@ ssize_t rdbSaveFunctions(rio *rdb) { while ((entry = dictNext(iter))) { if ((ret = rdbSaveType(rdb, RDB_OPCODE_FUNCTION2)) < 0) goto werr; written += ret; - functionLibInfo *li = dictGetVal(entry); + ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry); if ((ret = rdbSaveRawString(rdb, (unsigned char *)li->code, sdslen(li->code))) < 0) goto werr; 
written += ret; } @@ -2945,11 +2945,11 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { * structure with out performing the actual functions loading. */ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, sds *err) { UNUSED(ver); - sds error = NULL; + char *error = NULL; sds final_payload = NULL; int res = C_ERR; if (!(final_payload = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - error = sdsnew("Failed loading library payload"); + error = valkey_asprintf("Failed loading library payload"); goto done; } @@ -2958,7 +2958,7 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s if (!(library_name = functionsCreateWithLibraryCtx(final_payload, rdbflags & RDBFLAGS_ALLOW_DUP, &error, lib_ctx, 0))) { if (!error) { - error = sdsnew("Failed creating the library"); + error = valkey_asprintf("Failed creating the library"); } goto done; } @@ -2971,10 +2971,11 @@ int rdbFunctionLoad(rio *rdb, int ver, functionsLibCtx *lib_ctx, int rdbflags, s if (final_payload) sdsfree(final_payload); if (error) { if (err) { - *err = error; + *err = sdsnew(error); + zfree(error); } else { serverLog(LL_WARNING, "Failed creating function, %s", error); - sdsfree(error); + zfree(error); } } return res; diff --git a/src/server.h b/src/server.h index 5cf56e9c86..0e6a237f07 100644 --- a/src/server.h +++ b/src/server.h @@ -2716,6 +2716,7 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en void moduleDefragGlobals(void); void *moduleGetHandleByName(char *modulename); int moduleIsModuleCommand(void *module_handle, struct serverCommand *cmd); +struct scriptRunCtx *moduleGetScriptRunCtxFromFunctionCtx(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx); /* Utils */ long long ustime(void); diff --git a/src/strl.c b/src/strl.c index 0453e0608f..feab779bc3 100644 --- a/src/strl.c +++ b/src/strl.c @@ -14,6 +14,9 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
*/ #include +#include +#include +#include "zmalloc.h" /* * Copy string src to buffer dst of size dsize. At most dsize-1 @@ -81,6 +84,24 @@ valkey_strlcat(char *dst, const char *src, size_t dsize) } +/* + * This function is similar to asprintf function, but it uses zmalloc for + * allocating the string buffer. + * + * IMPORTANT: don't forget to free the buffer when no longer needed. + */ +char *valkey_asprintf(char const *fmt, ...) { + va_list args; + va_start(args, fmt); + size_t str_len = vsnprintf(NULL, 0, fmt, args) + 1; + va_end(args); + char* str = zmalloc(str_len); + va_start(args, fmt); + vsnprintf(str, str_len, fmt, args); + va_end(args); + + return str; +} diff --git a/src/util.h b/src/util.h index 51eb38f0b4..61095ddb65 100644 --- a/src/util.h +++ b/src/util.h @@ -99,5 +99,6 @@ int snprintf_async_signal_safe(char *to, size_t n, const char *fmt, ...); #endif size_t valkey_strlcpy(char *dst, const char *src, size_t dsize); size_t valkey_strlcat(char *dst, const char *src, size_t dsize); +char *valkey_asprintf(char const *fmt, ...); #endif diff --git a/src/valkeymodule.h b/src/valkeymodule.h index c2cdb2f0e7..49889c5f70 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -794,6 +794,16 @@ typedef void (*ValkeyModuleInfoFunc)(ValkeyModuleInfoCtx *ctx, int for_crash_rep typedef void (*ValkeyModuleDefragFunc)(ValkeyModuleDefragCtx *ctx); typedef void (*ValkeyModuleUserChangedFunc)(uint64_t client_id, void *privdata); +/* Type definitions for implementing scripting engines modules. 
*/ +typedef struct ValkeyModuleScriptingEngineFunctionLibrary ValkeyModuleScriptingEngineFunctionLibrary; +typedef struct ValkeyModuleScriptingEngineFunctionCallCtx ValkeyModuleScriptingEngineFunctionCallCtx; +typedef int (*ValkeyModuleScriptingEngineCreateFunc)(void *engine_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li, const char *code, size_t timeout, char **err); +typedef void (*ValkeyModuleScriptingEngineFunctionCallFunc)(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx, void *engine_ctx, void *compiled_function, ValkeyModuleString **keys, size_t nkeys, ValkeyModuleString **args, size_t nargs); +typedef size_t (*ValkeyModuleScriptingEngineGetUsedMemoryFunc)(void *engine_ctx); +typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)(void *compiled_function); +typedef size_t (*ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc)(void *engine_ctx); +typedef void (*ValkeyModuleScriptingEngineFreeFunctionFunc)(void *engine_ctx, void *compiled_function); + /* ------------------------- End of common defines ------------------------ */ /* ----------- The rest of the defines are only for modules ----------------- */ @@ -1649,6 +1659,28 @@ VALKEYMODULE_API int (*ValkeyModule_RdbSave)(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) VALKEYMODULE_ATTR; +VALKEYMODULE_API int (*ValkeyModule_RegisterScriptingEngine)(ValkeyModuleCtx *ctx, + const char *engine_name, + void *engine_ctx, + ValkeyModuleScriptingEngineCreateFunc create_func, + ValkeyModuleScriptingEngineFunctionCallFunc call_func, + ValkeyModuleScriptingEngineGetUsedMemoryFunc get_used_memory_func, + ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead_func, + ValkeyModuleScriptingEngineGetEngineMemoryOverheadFunc get_engine_memory_overhead_func, + ValkeyModuleScriptingEngineFreeFunctionFunc free_function_func) VALKEYMODULE_ATTR; + +VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx *ctx, + const char 
*engine_name) VALKEYMODULE_ATTR; + +VALKEYMODULE_API int (*ValkeyModule_RegisterScriptingEngineFunction)(const char *name, + void *function, + ValkeyModuleScriptingEngineFunctionLibrary *li, + const char *desc, + uint64_t f_flags, + char **err) VALKEYMODULE_ATTR; + +VALKEYMODULE_API ValkeyModuleCtx *(*ValkeyModule_GetModuleCtxFromFunctionCallCtx)(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx); + #define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX) /* This is included inline inside each Valkey module. */ @@ -2015,6 +2047,10 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in VALKEYMODULE_GET_API(RdbStreamFree); VALKEYMODULE_GET_API(RdbLoad); VALKEYMODULE_GET_API(RdbSave); + VALKEYMODULE_GET_API(RegisterScriptingEngine); + VALKEYMODULE_GET_API(UnregisterScriptingEngine); + VALKEYMODULE_GET_API(RegisterScriptingEngineFunction); + VALKEYMODULE_GET_API(GetModuleCtxFromFunctionCallCtx); if (ValkeyModule_IsModuleNameBusy && ValkeyModule_IsModuleNameBusy(name)) return VALKEYMODULE_ERR; ValkeyModule_SetModuleAttribs(ctx, name, ver, apiver); From 9c618a38a52f82cc86e56462816cd3f40d3b3b32 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Fri, 8 Nov 2024 14:50:48 +0000 Subject: [PATCH 2/6] Adds new module API test to test the scripting engine module API This commit adds a module with a very simple stack based scripting language implementation to test the new module API that allows to implement new scripting engines as modules. 
Signed-off-by: Ricardo Dias --- tests/modules/Makefile | 3 +- tests/modules/helloscripting.c | 276 +++++++++++++++++++++++ tests/unit/functions.tcl | 4 +- tests/unit/moduleapi/scriptingengine.tcl | 82 +++++++ 4 files changed, 362 insertions(+), 3 deletions(-) create mode 100644 tests/modules/helloscripting.c create mode 100644 tests/unit/moduleapi/scriptingengine.tcl diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 1690b9b627..3ece6f9603 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -64,7 +64,8 @@ TEST_MODULES = \ moduleauthtwo.so \ rdbloadsave.so \ crash.so \ - cluster.so + cluster.so \ + helloscripting.so .PHONY: all diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c new file mode 100644 index 0000000000..7e3c9a6c32 --- /dev/null +++ b/tests/modules/helloscripting.c @@ -0,0 +1,276 @@ +#include "valkeymodule.h" + +#include +#include +#include + + +typedef enum HelloInstKind { + FUNCTION = 0, + CONSTI, + ARGS, + RETURN, + _END, +} HelloInstKind; + +const char *HelloInstKindStr[] = { + "FUNCTION", + "CONSTI", + "ARGS", + "RETURN", +}; + +typedef struct HelloInst { + HelloInstKind kind; + union { + uint32_t integer; + const char *string; + } param; +} HelloInst; + +typedef struct HelloFunc { + char *name; + HelloInst instructions[256]; + uint32_t num_instructions; +} HelloFunc; + +typedef struct HelloProgram { + HelloFunc *functions[16]; + uint32_t num_functions; +} HelloProgram; + +typedef struct HelloLangCtx { + HelloProgram *program; +} HelloLangCtx; + + +static HelloLangCtx *hello_ctx = NULL; + + +static HelloInstKind helloLangParseInstruction(const char *token) { + for (HelloInstKind i = 0; i < _END; i++) { + if (strcmp(HelloInstKindStr[i], token) == 0) { + return i; + } + } + return _END; +} + +static void helloLangParseFunction(HelloFunc *func) { + char *token = strtok(NULL, " \n"); + ValkeyModule_Assert(token != NULL); + func->name = ValkeyModule_Alloc(sizeof(char) * strlen(token) + 1); + 
strcpy(func->name, token); +} + +static uint32_t str2int(const char *str) { + char *end; + errno = 0; + uint32_t val = (uint32_t)strtoul(str, &end, 10); + ValkeyModule_Assert(errno == 0); + return val; +} + +static void helloLangParseIntegerParam(HelloFunc *func) { + char *token = strtok(NULL, " \n"); + func->instructions[func->num_instructions].param.integer = str2int(token); +} + +static void helloLangParseConstI(HelloFunc *func) { + helloLangParseIntegerParam(func); + func->num_instructions++; +} + +static void helloLangParseArgs(HelloFunc *func) { + helloLangParseIntegerParam(func); + func->num_instructions++; +} + +static HelloProgram *helloLangParseCode(const char *code, HelloProgram *program) { + char *_code = ValkeyModule_Alloc(sizeof(char) * strlen(code) + 1); + strcpy(_code, code); + + HelloFunc *currentFunc = NULL; + + char *token = strtok(_code, " \n"); + while (token != NULL) { + HelloInstKind kind = helloLangParseInstruction(token); + + if (currentFunc != NULL) { + currentFunc->instructions[currentFunc->num_instructions].kind = kind; + } + + switch (kind) { + case FUNCTION: + ValkeyModule_Assert(currentFunc == NULL); + currentFunc = ValkeyModule_Alloc(sizeof(HelloFunc)); + program->functions[program->num_functions++] = currentFunc; + helloLangParseFunction(currentFunc); + break; + case CONSTI: + ValkeyModule_Assert(currentFunc != NULL); + helloLangParseConstI(currentFunc); + break; + case ARGS: + ValkeyModule_Assert(currentFunc != NULL); + helloLangParseArgs(currentFunc); + break; + case RETURN: + ValkeyModule_Assert(currentFunc != NULL); + currentFunc->num_instructions++; + currentFunc = NULL; + break; + case _END: + ValkeyModule_Assert(0); + } + + token = strtok(NULL, " \n"); + } + + ValkeyModule_Free(_code); + + return program; +} + +static uint32_t executeHelloLangFunction(HelloFunc *func, ValkeyModuleString **args, int nargs) { + uint32_t stack[64]; + int sp = 0; + + for (uint32_t pc = 0; pc < func->num_instructions; pc++) { + HelloInst instr = 
func->instructions[pc]; + switch (instr.kind) { + case CONSTI: ValkeyModule_Assert(sp < 64); /* stack[] has 64 slots */ + stack[sp++] = instr.param.integer; + break; + case ARGS: ValkeyModule_Assert(sp < 64); /* a statement must follow the label before the declarations */ + uint32_t idx = instr.param.integer; + ValkeyModule_Assert(idx < (uint32_t)nargs); + size_t len; + const char *argStr = ValkeyModule_StringPtrLen(args[idx], &len); + uint32_t arg = str2int(argStr); + stack[sp++] = arg; + break; + case RETURN: ValkeyModule_Assert(sp > 0); /* RETURN needs a value on the stack */ + uint32_t val = stack[--sp]; + ValkeyModule_Assert(sp == 0); + return val; + case FUNCTION: + case _END: + ValkeyModule_Assert(0); + } + } + + ValkeyModule_Assert(0); + return 0; +} + +static size_t engineGetUsedMemoy(void *engine_ctx) { + VALKEYMODULE_NOT_USED(engine_ctx); + return 0; +} + +static size_t engineMemoryOverhead(void *engine_ctx) { + HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx; + size_t overhead = ValkeyModule_MallocSize(engine_ctx); + if (ctx->program != NULL) { + overhead += ValkeyModule_MallocSize(ctx->program); + } + return overhead; +} + +static size_t engineFunctionMemoryOverhead(void *compiled_function) { + HelloFunc *func = (HelloFunc *)compiled_function; + return ValkeyModule_MallocSize(func->name); +} + +static void engineFreeFunction(void *engine_ctx, void *compiled_function) { + VALKEYMODULE_NOT_USED(engine_ctx); + HelloFunc *func = (HelloFunc *)compiled_function; + ValkeyModule_Free(func->name); + func->name = NULL; + ValkeyModule_Free(func); +} + +static int createHelloLangEngine(void *engine_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li, const char *code, size_t timeout, char **err) { + VALKEYMODULE_NOT_USED(timeout); + + HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx; + + if (ctx->program == NULL) { + ctx->program = ValkeyModule_Alloc(sizeof(HelloProgram)); + memset(ctx->program, 0, sizeof(HelloProgram)); + } else { + ctx->program->num_functions = 0; + } + + ctx->program = helloLangParseCode(code, ctx->program); + + for (uint32_t i = 0; i < ctx->program->num_functions; i++) { + HelloFunc *func = ctx->program->functions[i]; + int ret =
ValkeyModule_RegisterScriptingEngineFunction(func->name, func, li, NULL, 0, err); + if (ret != 0) { + // We need to cleanup all parsed functions that were not registered. + for (uint32_t j=i; j < ctx->program->num_functions; j++) { + engineFreeFunction(NULL, ctx->program->functions[j]); + } + return ret; + } + } + + return 0; +} + +static void callHelloLangFunction(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx, + void *engine_ctx, + void *compiled_function, + ValkeyModuleString **keys, + size_t nkeys, + ValkeyModuleString **args, + size_t nargs) { + VALKEYMODULE_NOT_USED(engine_ctx); + VALKEYMODULE_NOT_USED(keys); + VALKEYMODULE_NOT_USED(nkeys); + + ValkeyModuleCtx *ctx = ValkeyModule_GetModuleCtxFromFunctionCallCtx(func_ctx); + + HelloFunc *func = (HelloFunc *)compiled_function; + uint32_t result = executeHelloLangFunction(func, args, nargs); + + ValkeyModule_ReplyWithLongLong(ctx, result); +} + +int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + VALKEYMODULE_NOT_USED(argv); + VALKEYMODULE_NOT_USED(argc); + + if (ValkeyModule_Init(ctx, "helloengine", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR; + + hello_ctx = ValkeyModule_Alloc(sizeof(HelloLangCtx)); + hello_ctx->program = NULL; + + ValkeyModule_RegisterScriptingEngine(ctx, + "HELLO", + hello_ctx, + createHelloLangEngine, + callHelloLangFunction, + engineGetUsedMemoy, + engineFunctionMemoryOverhead, + engineMemoryOverhead, + engineFreeFunction); + + return VALKEYMODULE_OK; +} + +int ValkeyModule_OnUnload(ValkeyModuleCtx *ctx) { + if (ValkeyModule_UnregisterScriptingEngine(ctx, "HELLO") != VALKEYMODULE_OK) { + ValkeyModule_Log(ctx, "error", "Failed to unregister engine"); + return VALKEYMODULE_ERR; + } + + ValkeyModule_Free(hello_ctx->program); + hello_ctx->program = NULL; + ValkeyModule_Free(hello_ctx); + hello_ctx = NULL; + + return VALKEYMODULE_OK; +} diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl index 
7ddd36dd7d..1636baaf6d 100644 --- a/tests/unit/functions.tcl +++ b/tests/unit/functions.tcl @@ -604,7 +604,7 @@ start_server {tags {"scripting"}} { } } e set _ $e - } {*Library names can only contain letters, numbers, or underscores(_) and must be at least one character long*} + } {*Function names can only contain letters, numbers, or underscores(_) and must be at least one character long*} test {LIBRARIES - test registration with empty name} { catch { @@ -613,7 +613,7 @@ start_server {tags {"scripting"}} { } } e set _ $e - } {*Library names can only contain letters, numbers, or underscores(_) and must be at least one character long*} + } {*Function names can only contain letters, numbers, or underscores(_) and must be at least one character long*} test {LIBRARIES - math.random from function load} { catch { diff --git a/tests/unit/moduleapi/scriptingengine.tcl b/tests/unit/moduleapi/scriptingengine.tcl new file mode 100644 index 0000000000..b9ff08fd92 --- /dev/null +++ b/tests/unit/moduleapi/scriptingengine.tcl @@ -0,0 +1,82 @@ +set testmodule [file normalize tests/modules/helloscripting.so] + +set HELLO_PROGRAM "#!hello name=mylib\nFUNCTION foo\nARGS 0\nRETURN\nFUNCTION bar\nCONSTI 432\nRETURN" + +start_server {tags {"modules"}} { + r module load $testmodule + + r function load $HELLO_PROGRAM + + test {Load script with invalid library name} { + assert_error {ERR Library names can only contain letters, numbers, or underscores(_) and must be at least one character long} {r function load "#!hello name=my-lib\nFUNCTION foo\nARGS 0\nRETURN"} + } + + test {Load script with existing library} { + assert_error {ERR Library 'mylib' already exists} {r function load $HELLO_PROGRAM} + } + + test {Load script with invalid engine} { + assert_error {ERR Engine 'wasm' not found} {r function load "#!wasm name=mylib2\nFUNCTION foo\nARGS 0\nRETURN"} + } + + test {Load script with no functions} { + assert_error {ERR No functions registered} {r function load "#!hello name=mylib2\n"} + 
} + + test {Load script with duplicate function} { + assert_error {ERR Function foo already exists} {r function load "#!hello name=mylib2\nFUNCTION foo\nARGS 0\nRETURN"} + } + + test {Load script with no metadata header} { + assert_error {ERR Missing library metadata} {r function load "FUNCTION foo\nARGS 0\nRETURN"} + } + + test {Load script with header without lib name} { + assert_error {ERR Library name was not given} {r function load "#!hello \n"} + } + + test {Load script with header with unknown param} { + assert_error {ERR Invalid metadata value given: nme=mylib} {r function load "#!hello nme=mylib\n"} + } + + test {Load script with header with lib name passed twice} { + assert_error {ERR Invalid metadata value, name argument was given multiple times} {r function load "#!hello name=mylib2 name=mylib3\n"} + } + + test {Load script with invalid function name} { + assert_error {ERR Function names can only contain letters, numbers, or underscores(_) and must be at least one character long} {r function load "#!hello name=mylib2\nFUNCTION foo-bar\nARGS 0\nRETURN"} + } + + test {Load script with duplicate function} { + assert_error {ERR Function already exists in the library} {r function load "#!hello name=mylib2\nFUNCTION foo\nARGS 0\nRETURN\nFUNCTION foo\nARGS 0\nRETURN"} + } + + test {Call scripting engine function: calling foo works} { + r fcall foo 0 134 + } {134} + + test {Call scripting engine function: calling bar works} { + r fcall bar 0 + } {432} + + test {Replace function library and call functions} { + set result [r function load replace "#!hello name=mylib\nFUNCTION foo\nARGS 0\nRETURN\nFUNCTION bar\nCONSTI 500\nRETURN"] + assert_equal $result "mylib" + + set result [r fcall foo 0 132] + assert_equal $result 132 + + set result [r fcall bar 0] + assert_equal $result 500 + } + + test {List scripting engine functions} { + r function load replace "#!hello name=mylib\nFUNCTION foobar\nARGS 0\nRETURN" + r function list + } {{library_name mylib engine HELLO 
functions {{name foobar description {} flags {}}}}} + + test {Unload scripting engine module} { + set result [r module unload helloengine] + assert_equal $result "OK" + } +} From 695089173ca7104b9dbd13d7650b8abd8f265cee Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 12 Nov 2024 16:06:42 +0000 Subject: [PATCH 3/6] Adds comments to helloscripting.c structs and functions Signed-off-by: Ricardo Dias --- tests/modules/helloscripting.c | 140 +++++++++++++++++++++++++-------- 1 file changed, 109 insertions(+), 31 deletions(-) diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index 7e3c9a6c32..778a023f1d 100644 --- a/tests/modules/helloscripting.c +++ b/tests/modules/helloscripting.c @@ -1,18 +1,47 @@ #include "valkeymodule.h" -#include #include #include +#include - +/* + * This module implements a very simple stack based scripting language. + * It's purpose is only to test the valkey module API to implement scripting + * engines. + * + * The language is called HELLO, and a program in this language is formed by + * a list of function definitions. + * The language only supports 32-bit integer, and it only allows to return an + * integer constant, or return the value passed as the first argument to the + * function. + * + * Example of a program: + * + * ``` + * FUNCTION foo + * ARGS 0 # pushes the value in the first argument to the top of the + * stack RETURN # returns the current value on the top of the stack + * + * FUNCTION bar + * CONSTI 432 # pushes the value 432 to the top of the stack + * RETURN # returns the current value on the top of the stack + * ``` + */ + +/* + * List of instructions of the HELLO language. + */ typedef enum HelloInstKind { FUNCTION = 0, CONSTI, ARGS, RETURN, - _END, + _NUM_INSTRUCTIONS, // Not a real instruction. } HelloInstKind; +/* + * String representations of the instructions above. 
+ */ const char *HelloInstKindStr[] = { "FUNCTION", "CONSTI", @@ -20,6 +49,10 @@ const char *HelloInstKindStr[] = { "RETURN", }; +/* + * Struct that represents an instance of an instruction. + * Instructions may have at most one parameter. + */ typedef struct HelloInst { HelloInstKind kind; union { @@ -28,17 +61,28 @@ typedef struct HelloInst { } param; } HelloInst; +/* + * Struct that represents an instance of a function. + * A function is just a list of instruction instances. + */ typedef struct HelloFunc { char *name; HelloInst instructions[256]; uint32_t num_instructions; } HelloFunc; +/* + * Struct that represents an instance of an HELLO program. + * A program is just a list of function instances. + */ typedef struct HelloProgram { HelloFunc *functions[16]; uint32_t num_functions; } HelloProgram; +/* + * Struct that represents the runtime context of an HELLO program. + */ typedef struct HelloLangCtx { HelloProgram *program; } HelloLangCtx; @@ -47,15 +91,29 @@ typedef struct HelloLangCtx { static HelloLangCtx *hello_ctx = NULL; +static uint32_t str2int(const char *str) { + char *end; + errno = 0; + uint32_t val = (uint32_t)strtoul(str, &end, 10); + ValkeyModule_Assert(errno == 0); + return val; +} + +/* + * Parses the kind of instruction that the current token points to. + */ static HelloInstKind helloLangParseInstruction(const char *token) { - for (HelloInstKind i = 0; i < _END; i++) { + for (HelloInstKind i = 0; i < _NUM_INSTRUCTIONS; i++) { if (strcmp(HelloInstKindStr[i], token) == 0) { return i; } } - return _END; + return _NUM_INSTRUCTIONS; } +/* + * Parses the function param. 
+ */ static void helloLangParseFunction(HelloFunc *func) { char *token = strtok(NULL, " \n"); ValkeyModule_Assert(token != NULL); @@ -63,30 +121,35 @@ static void helloLangParseFunction(HelloFunc *func) { strcpy(func->name, token); } -static uint32_t str2int(const char *str) { - char *end; - errno = 0; - uint32_t val = (uint32_t)strtoul(str, &end, 10); - ValkeyModule_Assert(errno == 0); - return val; -} - +/* + * Parses an integer parameter. + */ static void helloLangParseIntegerParam(HelloFunc *func) { char *token = strtok(NULL, " \n"); func->instructions[func->num_instructions].param.integer = str2int(token); } +/* + * Parses the CONSTI instruction parameter. + */ static void helloLangParseConstI(HelloFunc *func) { helloLangParseIntegerParam(func); func->num_instructions++; } +/* + * Parses the ARGS instruction parameter. + */ static void helloLangParseArgs(HelloFunc *func) { helloLangParseIntegerParam(func); func->num_instructions++; } -static HelloProgram *helloLangParseCode(const char *code, HelloProgram *program) { +/* + * Parses an HELLO program source code. + */ +static HelloProgram *helloLangParseCode(const char *code, + HelloProgram *program) { char *_code = ValkeyModule_Alloc(sizeof(char) * strlen(code) + 1); strcpy(_code, code); @@ -120,7 +183,7 @@ static HelloProgram *helloLangParseCode(const char *code, HelloProgram *program) currentFunc->num_instructions++; currentFunc = NULL; break; - case _END: + default: ValkeyModule_Assert(0); } @@ -132,7 +195,11 @@ static HelloProgram *helloLangParseCode(const char *code, HelloProgram *program) return program; } -static uint32_t executeHelloLangFunction(HelloFunc *func, ValkeyModuleString **args, int nargs) { +/* + * Executes an HELLO function. 
+ */ +static uint32_t executeHelloLangFunction(HelloFunc *func, + ValkeyModuleString **args, int nargs) { uint32_t stack[64]; int sp = 0; @@ -155,7 +222,7 @@ static uint32_t executeHelloLangFunction(HelloFunc *func, ValkeyModuleString **a ValkeyModule_Assert(sp == 0); return val; case FUNCTION: - case _END: + default: ValkeyModule_Assert(0); } } @@ -165,8 +232,15 @@ static uint32_t executeHelloLangFunction(HelloFunc *func, ValkeyModuleString **a } static size_t engineGetUsedMemoy(void *engine_ctx) { - VALKEYMODULE_NOT_USED(engine_ctx); - return 0; + HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx; + size_t memory = ValkeyModule_MallocSize(ctx); + if (ctx->program == NULL) return memory; /* program is NULL until the first library is loaded */ + memory += ValkeyModule_MallocSize(ctx->program); + for (uint32_t i = 0; i < ctx->program->num_functions; i++) { + HelloFunc *func = ctx->program->functions[i]; + memory += ValkeyModule_MallocSize(func) + ValkeyModule_MallocSize(func->name); + } + return memory; } static size_t engineMemoryOverhead(void *engine_ctx) { @@ -191,7 +265,9 @@ static void engineFreeFunction(void *engine_ctx, void *compiled_function) { ValkeyModule_Free(func); } -static int createHelloLangEngine(void *engine_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li, const char *code, size_t timeout, char **err) { +static int createHelloLangEngine(void *engine_ctx, + ValkeyModuleScriptingEngineFunctionLibrary *li, + const char *code, size_t timeout, char **err) { VALKEYMODULE_NOT_USED(timeout); HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx; @@ -207,10 +283,11 @@ static int createHelloLangEngine(void *engine_ctx, ValkeyModuleScriptingEngineFu for (uint32_t i = 0; i < ctx->program->num_functions; i++) { HelloFunc *func = ctx->program->functions[i]; - int ret = ValkeyModule_RegisterScriptingEngineFunction(func->name, func, li, NULL, 0, err); + int ret = ValkeyModule_RegisterScriptingEngineFunction(func->name, func, li, + NULL, 0, err); if (ret != 0) { // We need to cleanup all parsed functions that were not registered.
- for (uint32_t j=i; j < ctx->program->num_functions; j++) { + for (uint32_t j = i; j < ctx->program->num_functions; j++) { engineFreeFunction(NULL, ctx->program->functions[j]); } return ret; @@ -220,13 +297,11 @@ static int createHelloLangEngine(void *engine_ctx, ValkeyModuleScriptingEngineFu return 0; } -static void callHelloLangFunction(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx, - void *engine_ctx, - void *compiled_function, - ValkeyModuleString **keys, - size_t nkeys, - ValkeyModuleString **args, - size_t nargs) { +static void +callHelloLangFunction(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx, + void *engine_ctx, void *compiled_function, + ValkeyModuleString **keys, size_t nkeys, + ValkeyModuleString **args, size_t nargs) { VALKEYMODULE_NOT_USED(engine_ctx); VALKEYMODULE_NOT_USED(keys); VALKEYMODULE_NOT_USED(nkeys); @@ -239,11 +314,14 @@ static void callHelloLangFunction(ValkeyModuleScriptingEngineFunctionCallCtx *fu ValkeyModule_ReplyWithLongLong(ctx, result); } -int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { +int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, + int argc) { VALKEYMODULE_NOT_USED(argv); VALKEYMODULE_NOT_USED(argc); - if (ValkeyModule_Init(ctx, "helloengine", 1, VALKEYMODULE_APIVER_1) == VALKEYMODULE_ERR) return VALKEYMODULE_ERR; + if (ValkeyModule_Init(ctx, "helloengine", 1, VALKEYMODULE_APIVER_1) == + VALKEYMODULE_ERR) + return VALKEYMODULE_ERR; hello_ctx = ValkeyModule_Alloc(sizeof(HelloLangCtx)); hello_ctx->program = NULL; From 202f8de289f2fee0f193040b169be511c89ba93e Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 12 Nov 2024 17:56:29 +0000 Subject: [PATCH 4/6] Fixes test failure Signed-off-by: Ricardo Dias --- tests/modules/helloscripting.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index 778a023f1d..46b77c0308 100644 --- a/tests/modules/helloscripting.c +++ 
b/tests/modules/helloscripting.c @@ -167,6 +167,7 @@ static HelloProgram *helloLangParseCode(const char *code, case FUNCTION: ValkeyModule_Assert(currentFunc == NULL); currentFunc = ValkeyModule_Alloc(sizeof(HelloFunc)); + memset(currentFunc, 0, sizeof(HelloFunc)); program->functions[program->num_functions++] = currentFunc; helloLangParseFunction(currentFunc); break; @@ -289,6 +290,8 @@ static int createHelloLangEngine(void *engine_ctx, // We need to cleanup all parsed functions that were not registered. for (uint32_t j = i; j < ctx->program->num_functions; j++) { engineFreeFunction(NULL, ctx->program->functions[j]); + ctx->program->functions[j] = NULL; } + ctx->program->num_functions = i; /* shrink only after the loop so every unregistered function is freed */ return ret; } From 1a05ac107275899e9d2f3f412104e44589aec2d9 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 12 Nov 2024 17:59:57 +0000 Subject: [PATCH 5/6] Improve hello lang general comment in helloscripting.c Signed-off-by: Ricardo Dias --- tests/modules/helloscripting.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/modules/helloscripting.c b/tests/modules/helloscripting.c index 46b77c0308..ed615bf88f 100644 --- a/tests/modules/helloscripting.c +++ b/tests/modules/helloscripting.c @@ -18,13 +18,16 @@ * Example of a program: * * ``` - * FUNCTION foo + * FUNCTION foo # declaration of function 'foo' * ARGS 0 # pushes the value in the first argument to the top of the - * stack RETURN # returns the current value on the top of the stack + * # stack + * RETURN # returns the current value on the top of the stack and marks + * # the end of the function declaration * - * FUNCTION bar + * FUNCTION bar # declaration of function 'bar' * CONSTI 432 # pushes the value 432 to the top of the stack - * RETURN # returns the current value on the top of the stack + * RETURN # returns the current value on the top of the stack and marks + * # the end of the function declaration.
* ``` */ From ace2f4b1f25799e8224a63e2bb3cc9b9eb871340 Mon Sep 17 00:00:00 2001 From: Ricardo Dias Date: Tue, 12 Nov 2024 18:13:07 +0000 Subject: [PATCH 6/6] Added a few more tests to test the commands FUNCTION [FLUSH|DUMP|RESTORE|DELETE] Signed-off-by: Ricardo Dias --- tests/unit/moduleapi/scriptingengine.tcl | 44 ++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/unit/moduleapi/scriptingengine.tcl b/tests/unit/moduleapi/scriptingengine.tcl index b9ff08fd92..c350633dd8 100644 --- a/tests/unit/moduleapi/scriptingengine.tcl +++ b/tests/unit/moduleapi/scriptingengine.tcl @@ -75,6 +75,50 @@ start_server {tags {"modules"}} { r function list } {{library_name mylib engine HELLO functions {{name foobar description {} flags {}}}}} + test {Load a second library and call a function} { + r function load "#!hello name=mylib2\nFUNCTION getarg\nARGS 0\nRETURN" + set result [r fcall getarg 0 456] + assert_equal $result 456 + } + + test {Delete all libraries and functions} { + set result [r function flush] + assert_equal $result {OK} + r function list + } {} + + test {Test the deletion of a single library} { + r function load $HELLO_PROGRAM + r function load "#!hello name=mylib2\nFUNCTION getarg\nARGS 0\nRETURN" + + set result [r function delete mylib] + assert_equal $result {OK} + + set result [r fcall getarg 0 446] + assert_equal $result 446 + } + + test {Test dump and restore function library} { + r function load $HELLO_PROGRAM + + set result [r fcall bar 0] + assert_equal $result 432 + + set dump [r function dump] + + set result [r function flush] + assert_equal $result {OK} + + set result [r function restore $dump] + assert_equal $result {OK} + + set result [r fcall getarg 0 436] + assert_equal $result 436 + + set result [r fcall bar 0] + assert_equal $result 432 + } + test {Unload scripting engine module} { set result [r module unload helloengine] assert_equal $result "OK"