diff --git a/lualib/core.lua b/lualib/core.lua index 76ecd9f6..8c6b0214 100644 --- a/lualib/core.lua +++ b/lualib/core.lua @@ -521,7 +521,7 @@ end, log_info("[sys.core] SILLY_UDP fd:", fd, "closed") end end, -[7] = function(signum) --SILLY_ERROR = 7 + [7] = function(signum) --SILLY_SIGNAL = 7 local fn = signal_dispatch[signum] if fn then local t = task_create(fn) diff --git a/lualib/core/cluster.lua b/lualib/core/cluster.lua index 91533633..878c1bfa 100644 --- a/lualib/core/cluster.lua +++ b/lualib/core/cluster.lua @@ -1,11 +1,11 @@ local core = require "core" -local mutex = require "core.sync.mutex" local dns = require "core.dns" local logger = require "core.logger" local np = require "core.netpacket" local type = type local pairs = pairs local assert = assert +local format = string.format local tcp_connect = core.tcp_connect local tcp_send = core.tcp_send local tcp_close = core.socket_close @@ -28,51 +28,24 @@ local mt = { } local function connect_wrapper(self) - local lock = self.__lock local fdaddr = self.__fdaddr - local connecting = self.__connecting return function(addr) - local fd = fdaddr[addr] - if fd then - return fd, "connected" - end - connecting[addr] = true - local l = lock:lock(addr) - fd = fdaddr[addr] - if fd then - return fd, "connected" - end - while connecting[addr] do - local newaddr = addr - local name, port = addr:match("([^:]+):(%d+)") - if dns.isname(name) then - local ip = dns.lookup(name, dns.A) - if ip then - newaddr = ip .. ":" .. port - else - newaddr = nil - logger.error("[rpc.client] dns lookup fail", name) - end + local newaddr = addr + local name, port = addr:match("([^:]+):(%d+)") + if dns.isname(name) then + local ip = dns.lookup(name, dns.A) + if ip then + newaddr = ip .. ":" .. port + else + return nil, format("dns lookup:%s fail", name) end - if newaddr then - local fd, errno = tcp_connect(newaddr, self.__event) - if fd then - if connecting[addr] then - connecting[addr] = nil - fdaddr[addr] = fd - fdaddr[fd] = addr - return fd, "ok" - else --already close - tcp_close(fd) - return nil, "active closed" - end - else - logger.error("[rpc.client] connect fail", addr, errno) - end - end - core.sleep(1000) - logger.info("[rpc.client] retry connect:", addr) end + local fd, errstr = tcp_connect(newaddr, self.__event) + if not fd then + return fd, errstr + end + fdaddr[fd] = addr + return fd, "ok" end end @@ -82,29 +55,18 @@ local function listen_wrapper(self) if not fd then return fd, errno end - self.__fdaddr[addr] = fd self.__fdaddr[fd] = addr return fd, nil end end local function close_wrapper(self) - return function(addr) - local connecting = self.__connecting - if connecting[addr] then - connecting[addr] = nil - return true, "connecting" - end + return function(fd) local fdaddr = self.__fdaddr - local fd = fdaddr[addr] - if not fd then + if not fdaddr[fd] then return false, "closed" end - if type(addr) == "string" then - addr, fd = fd, addr - end fdaddr[fd] = nil - fdaddr[addr] = nil core.socket_close(fd) return true, "connected" end @@ -125,7 +87,6 @@ local function init_event(self, conf) local EVENT = {} function EVENT.accept(fd, _, addr) fdaddr[fd] = addr - fdaddr[addr] = fd local ok, err = pcall(accept, fd, addr) if not ok then logger.error("[rpc.server] EVENT.accept", err) @@ -135,9 +96,7 @@ local function init_event(self, conf) end function EVENT.close(fd, errno) - local addr = fdaddr[fd] fdaddr[fd] = nil - fdaddr[addr] = nil local ok, err = pcall(close, fd, errno) if not ok then logger.error("[rpc.server] EVENT.close", err) @@ -272,19 +231,17 @@ local M = {} function M.new(conf) ---@class core.cluster local obj = { - __lock = mutex:new(), __ctx = np.create(), __fdaddr = {}, __waitpool = {}, __waitcmd = {}, __waitfor = nil, __event = nil, - __connecting = {}, ---@type async fun(addr:string):number?,string? connect = nil, ---@type fun(addr:string, backlog?:number):number?,string? listen = nil, - ---@type fun(addr:string):boolean,string? + ---@type fun(addr:string|integer):boolean,string? close = nil, ---@type fun(fd:number, cmd:string, obj:any):any?,string? call = nil, diff --git a/test/testrpc.lua b/test/testrpc.lua index e0cb0d1e..c9536a40 100644 --- a/test/testrpc.lua +++ b/test/testrpc.lua @@ -87,7 +87,7 @@ local server = cluster.new { end, } -server.listen(":8989") +local listen_fd = server.listen(":8989") local client_fd local client = cluster.new { timeout = 1000, @@ -182,8 +182,8 @@ end client_part() server_part() -client.close("127.0.0.1:8989") -server.close(":8989") -server.close(accept_addr) +client.close(client_fd) +server.close(listen_fd) +server.close(accept_fd) testaux.asserteq(next(client.__fdaddr), nil, "client fdaddr empty") testaux.asserteq(next(server.__fdaddr), nil, "client fdaddr empty")