Skip to content

Commit

Permalink
Adds support for scripting engines as Valkey modules
Browse files Browse the repository at this point in the history
This commit extends the module API to support the addition of different
scripting engines to run user defined functions.

The scripting engine can be implemented as a Valkey module, and can be
dynamically loaded with the `loadmodule` config directive, or with
the `MODULE LOAD` command.

The current module API support, only allows to load scripting engines to
run functions using `FCALL` command.

In a follow up PR, we will move the Lua scripting engine implmentation
into its own module.

Signed-off-by: Ricardo Dias <[email protected]>
  • Loading branch information
rjd15372 committed Nov 12, 2024
1 parent 45d596e commit 331df7f
Show file tree
Hide file tree
Showing 11 changed files with 423 additions and 126 deletions.
2 changes: 1 addition & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -2175,7 +2175,7 @@ static int rewriteFunctions(rio *aof) {
dictIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionLibInfo *li = dictGetVal(entry);
ValkeyModuleScriptingEngineFunctionLibrary *li = dictGetVal(entry);
if (rioWrite(aof, "*3\r\n", 4) == 0) goto werr;
char function_load[] = "$8\r\nFUNCTION\r\n$4\r\nLOAD\r\n";
if (rioWrite(aof, function_load, sizeof(function_load) - 1) == 0) goto werr;
Expand Down
39 changes: 19 additions & 20 deletions src/function_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ typedef struct luaFunctionCtx {
} luaFunctionCtx;

typedef struct loadCtx {
functionLibInfo *li;
ValkeyModuleScriptingEngineFunctionLibrary *li;
monotime start_time;
size_t timeout;
} loadCtx;
Expand Down Expand Up @@ -100,7 +100,7 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) {
*
* Return NULL on compilation error and set the error to the err variable
*/
static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size_t timeout, sds *err) {
static int luaEngineCreate(void *engine_ctx, ValkeyModuleScriptingEngineFunctionLibrary *li, const char *blob, size_t timeout, char **err) {
int ret = C_ERR;
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
Expand All @@ -114,8 +114,8 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size
lua_pop(lua, 1); /* pop the metatable */

/* compile the code */
if (luaL_loadbuffer(lua, blob, sdslen(blob), "@user_function")) {
*err = sdscatprintf(sdsempty(), "Error compiling function: %s", lua_tostring(lua, -1));
if (luaL_loadbuffer(lua, blob, strlen(blob), "@user_function")) {
*err = valkey_asprintf("Error compiling function: %s", lua_tostring(lua, -1));
lua_pop(lua, 1); /* pops the error */
goto done;
}
Expand All @@ -133,7 +133,7 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size
if (lua_pcall(lua, 0, 0, 0)) {
errorInfo err_info = {0};
luaExtractErrorInformation(lua, &err_info);
*err = sdscatprintf(sdsempty(), "Error registering functions: %s", err_info.msg);
*err = valkey_asprintf("Error registering functions: %s", err_info.msg);
lua_pop(lua, 1); /* pops the error */
luaErrorInformationDiscard(&err_info);
goto done;
Expand All @@ -158,7 +158,7 @@ static int luaEngineCreate(void *engine_ctx, functionLibInfo *li, sds blob, size
/*
* Invole the give function with the given keys and args
*/
static void luaEngineCall(scriptRunCtx *run_ctx,
static void luaEngineCall(ValkeyModuleScriptingEngineFunctionCallCtx *func_ctx,
void *engine_ctx,
void *compiled_function,
robj **keys,
Expand All @@ -177,6 +177,7 @@ static void luaEngineCall(scriptRunCtx *run_ctx,

serverAssert(lua_isfunction(lua, -1));

scriptRunCtx *run_ctx = moduleGetScriptRunCtxFromFunctionCtx(func_ctx);
luaCallFunction(run_ctx, lua, keys, nkeys, args, nargs, 0);
lua_pop(lua, 1); /* Pop error handler */
}
Expand Down Expand Up @@ -406,12 +407,13 @@ static int luaRegisterFunction(lua_State *lua) {
return luaError(lua);
}

sds err = NULL;
char *err = NULL;
if (functionLibCreateFunction(register_f_args.name, register_f_args.lua_f_ctx, load_ctx->li, register_f_args.desc,
register_f_args.f_flags, &err) != C_OK) {
serverAssert(err != NULL);
luaRegisterFunctionArgsDispose(lua, &register_f_args);
luaPushError(lua, err);
sdsfree(err);
zfree(err);
return luaError(lua);
}

Expand Down Expand Up @@ -494,16 +496,13 @@ int luaEngineInitEngine(void) {
lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the new global table */
lua_replace(lua_engine_ctx->lua, LUA_GLOBALSINDEX); /* set new global table as the new globals */


engine *lua_engine = zmalloc(sizeof(*lua_engine));
*lua_engine = (engine){
.engine_ctx = lua_engine_ctx,
.create = luaEngineCreate,
.call = luaEngineCall,
.get_used_memory = luaEngineGetUsedMemoy,
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
.get_engine_memory_overhead = luaEngineMemoryOverhead,
.free_function = luaEngineFreeFunction,
};
return functionsRegisterEngine(LUA_ENGINE_NAME, lua_engine);
return functionsRegisterEngine(LUA_ENGINE_NAME,
NULL,
lua_engine_ctx,
luaEngineCreate,
luaEngineCall,
luaEngineGetUsedMemoy,
luaEngineFunctionMemoryOverhead,
luaEngineMemoryOverhead,
luaEngineFreeFunction);
}
Loading

0 comments on commit 331df7f

Please sign in to comment.