Skip to content

Commit

Permalink
remove cluster.rpc and cluster.msg
Browse files Browse the repository at this point in the history
add cluster.lua for more flexible rpc
  • Loading branch information
findstr committed Dec 15, 2024
1 parent 032e51c commit 0004084
Show file tree
Hide file tree
Showing 14 changed files with 554 additions and 779 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ LIB_SRC = lualib-core.c \
lualib-debugger.c\

all: \
fmt \
$(TARGET) \
$(LUACLIB_PATH)/core.so \
$(LUACLIB_PATH)/zproto.so \
Expand Down
69 changes: 49 additions & 20 deletions examples/rpc.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
local core = require "core"
local crypto = require "core.crypto"
local rpc = require "core.cluster.rpc"
local cluster = require "core.cluster"
local zproto = require "zproto"

local proto = zproto:parse [[
Expand All @@ -11,42 +11,71 @@ pong 0x2 {
.txt:string 1
}
]]

assert(proto)
local function unmarshal(cmd, buf, size)
local dat, size = proto:unpack(buf, size, true)
local body = proto:decode(cmd, dat, size)
return body
end

local function marshal(cmd, body)
if type(cmd) == "string" then
cmd = proto:tag(cmd)
end
print("marshal", cmd, body)
local dat, size = proto:encode(cmd, body, true)
local buf, size = proto:pack(dat, size, true)
return cmd, buf, size
end

local callret = {
["ping"] = "pong",
[0x01] = "pong",
}

local server = rpc.listen {
addr = "127.0.0.1:9999",
proto = proto,
local server = cluster.new {
marshal = marshal,
unmarshal = unmarshal,
callret = callret,
accept = function(fd, addr)
print("accept", fd, addr)
end,

call = function(msg, cmd, fd)
print("callee", msg.txt, fd)
return msg
end,
close = function(fd, errno)
print("close", fd, errno)
end,
}

server.listen("127.0.0.1:9999")

local client = cluster.new {
marshal = marshal,
unmarshal = unmarshal,
callret = callret,
call = function(msg, cmd, fd)
print("callee", msg.txt, cmd, fd)
return "pong", msg
end
print("callee", msg.txt, fd)
return msg
end,
close = function(fd, errno)
print("close", fd, errno)
end,
}


core.start(function()
for i = 1, 3 do
core.fork(function()
local conn = rpc.connect {
addr = "127.0.0.1:9999",
proto = proto,
timeout = 5000,
close = function(fd, errno)
end,
}
while true do
local fd, err = client.connect("127.0.0.1:9999")
print("connect", fd, err)
for j = 1, 10000 do
local txt = crypto.randomkey(5)
local ack, cmd = conn:call("ping", {txt = txt})
print("caller", conn, txt, ack.txt)
local ack, cmd = client.ping(fd, {txt = txt})
print("caller", fd, cmd, txt, ack.txt)
assert(ack.txt == txt)
assert(cmd == proto:tag("pong"))
assert(cmd == "pong")
core.sleep(1000)
end
end)
Expand Down
12 changes: 6 additions & 6 deletions lualib-src/lualib-core.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,29 +505,29 @@ static int lsendsize(lua_State *L)

static int ltracespan(lua_State *L)
{
silly_trace_span_t span;
span = (silly_trace_span_t)luaL_checkinteger(L, 1);
silly_tracespan_t span;
span = (silly_tracespan_t)luaL_checkinteger(L, 1);
silly_trace_span(span);
return 0;
}

static int ltracenew(lua_State *L)
{
silly_trace_id_t traceid;
silly_traceid_t traceid;
traceid = silly_trace_new();
lua_pushinteger(L, (lua_Integer)traceid);
return 1;
}

static int ltraceset(lua_State *L)
{
silly_trace_id_t traceid;
silly_traceid_t traceid;
lua_State *co = lua_tothread(L, 1);
silly_worker_resume(co);
if lua_isnoneornil (L, 2) {
traceid = TRACE_WORKER_ID;
} else {
traceid = (silly_trace_id_t)luaL_checkinteger(L, 2);
traceid = (silly_traceid_t)luaL_checkinteger(L, 2);
}
traceid = silly_trace_set(traceid);
lua_pushinteger(L, (lua_Integer)traceid);
Expand All @@ -536,7 +536,7 @@ static int ltraceset(lua_State *L)

static int ltraceget(lua_State *L)
{
silly_trace_id_t traceid;
silly_traceid_t traceid;
traceid = silly_trace_get();
lua_pushinteger(L, (lua_Integer)traceid);
return 1;
Expand Down
Loading

0 comments on commit 0004084

Please sign in to comment.