From 0ae8ba9483fdf040d1d16593492ca971b7c5be6a Mon Sep 17 00:00:00 2001 From: defooli Date: Thu, 10 Mar 2022 13:18:31 +0800 Subject: [PATCH 1/3] Support push client and server --- .gitignore | 1 + _examples/PushServer/client/client.go | 27 ++++ _examples/PushServer/config.conf | 23 ++++ _examples/PushServer/debugtool/dumpstack.go | 21 +++ _examples/PushServer/main.go | 36 +++++ _examples/PushServer/makefile | 10 ++ .../PushServer/scripts/makefile.tars.gomod | 123 ++++++++++++++++++ _examples/PushServer/start.sh | 4 + tars/adapter.go | 29 ++++- tars/model/Servant.go | 1 + tars/protocol/push/client.go | 35 +++++ tars/protocol/push/server.go | 82 ++++++++++++ tars/servant.go | 14 ++ tars/transport/tcphandler.go | 2 +- tars/transport/udphandler.go | 1 + tars/util/current/tarscurrent.go | 24 ++++ 16 files changed, 429 insertions(+), 4 deletions(-) create mode 100644 _examples/PushServer/client/client.go create mode 100755 _examples/PushServer/config.conf create mode 100644 _examples/PushServer/debugtool/dumpstack.go create mode 100755 _examples/PushServer/main.go create mode 100755 _examples/PushServer/makefile create mode 100755 _examples/PushServer/scripts/makefile.tars.gomod create mode 100755 _examples/PushServer/start.sh create mode 100644 tars/protocol/push/client.go create mode 100644 tars/protocol/push/server.go diff --git a/.gitignore b/.gitignore index 248b43d2..614f289d 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ *.tar.gz *.tgz tars-protocol +*go.sum diff --git a/_examples/PushServer/client/client.go b/_examples/PushServer/client/client.go new file mode 100644 index 00000000..4054479c --- /dev/null +++ b/_examples/PushServer/client/client.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "time" + + "github.com/TarsCloud/TarsGo/tars" + "github.com/TarsCloud/TarsGo/tars/protocol/push" +) + +func callback(data []byte) { + fmt.Println("recv message:", string(data)) +} + +func main() { + comm := tars.NewCommunicator() + obj := "TestApp.PushServer.MessageObj@tcp -h 127.0.0.1 -p 10015 -t 60000" + client := push.NewClient(callback) + comm.StringToProxy(obj, client) + data, err := client.Connect([]byte("hello")) + if err != nil { + panic(err) + } + fmt.Println("connect ok", string(data)) + // Wait for receving message + time.Sleep(time.Second * 10) +} diff --git a/_examples/PushServer/config.conf b/_examples/PushServer/config.conf new file mode 100755 index 00000000..b6ed407e --- /dev/null +++ b/_examples/PushServer/config.conf @@ -0,0 +1,23 @@ + + + + app=TestApp + server=PushServer + local=tcp -h 127.0.0.1 -p 10014 -t 30000 + logpath=/tmp + + allow + endpoint=tcp -h 127.0.0.1 -p 10015 -t 60000 + handlegroup=TestApp.PushServer.MessageObjAdapter + maxconns=200000 + protocol=not_tars + queuecap=10000 + queuetimeout=60000 + servant=TestApp.PushServer.MessageObj + shmcap=0 + shmkey=0 + threads=1 + + + + diff --git a/_examples/PushServer/debugtool/dumpstack.go b/_examples/PushServer/debugtool/dumpstack.go new file mode 100644 index 00000000..7b807009 --- /dev/null +++ b/_examples/PushServer/debugtool/dumpstack.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" + + "github.com/TarsCloud/TarsGo/tars" + "github.com/TarsCloud/TarsGo/tars/protocol/res/adminf" +) + +func main() { + comm := tars.NewCommunicator() + obj := "TestApp.PushServer.MessageObj@tcp -h 127.0.0.1 -p 10014 -t 60000" + app := new(adminf.AdminF) + comm.StringToProxy(obj, app) + ret, err := app.Notify("tars.dumpstack") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(ret) +} diff --git a/_examples/PushServer/main.go b/_examples/PushServer/main.go new file mode 100755 index 00000000..db28571b --- /dev/null +++ b/_examples/PushServer/main.go @@ -0,0 +1,36 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/TarsCloud/TarsGo/tars" + "github.com/TarsCloud/TarsGo/tars/protocol/push" + "github.com/TarsCloud/TarsGo/tars/util/current" +) + +type pushImp struct{} + +// OnConnect ... +func (p *pushImp) OnConnect(ctx context.Context, req []byte) []byte { + ip, _ := current.GetClientIPFromContext(ctx) + port, _ := current.GetClientPortFromContext(ctx) + fmt.Println("on connect:", ip, port) + go func() { + for i := 0; i < 3; i++ { + time.Sleep(time.Millisecond * 100) + if err := push.Send(ctx, []byte("msg"+fmt.Sprint(i))); err != nil { + fmt.Println("send error", err) + } + } + }() + return req +} + +func main() { + cfg := tars.GetServerConfig() + proto := push.NewServer(&pushImp{}) + tars.AddServantWithProtocol(proto, cfg.App+"."+cfg.Server+".MessageObj") + tars.Run() +} diff --git a/_examples/PushServer/makefile b/_examples/PushServer/makefile new file mode 100755 index 00000000..742e183d --- /dev/null +++ b/_examples/PushServer/makefile @@ -0,0 +1,10 @@ +APP := TestApp +TARGET := PushServer +MFLAGS := +DFLAGS := +CONFIG := client +STRIP_FLAG:= N +J2GO_FLAG:= + +libpath=${subst :, ,$(GOPATH)} +$(eval -include scripts/makefile.tars.gomod) diff --git a/_examples/PushServer/scripts/makefile.tars.gomod b/_examples/PushServer/scripts/makefile.tars.gomod new file mode 100755 index 00000000..2991ab48 --- /dev/null +++ b/_examples/PushServer/scripts/makefile.tars.gomod @@ -0,0 +1,123 @@ +#------------------------------------------------------------------------------- +#fix cgo compile error +export LC_ALL = en_US.UTF-8 +export LANG = en_US.UTF-8 +#------------------------------------------------------------------------------- + +GOPATH ?= $(shell go env GOPATH) +GOROOT ?= $(shell go env GOROOT) +GO = ${GOROOT}/bin/go + +#------------------------------------------------------------------------------- +libpath=${subst :, ,$(GOPATH)} +TARS2GO := $(firstword $(subst :, , $(GOPATH)))/bin/tars2go +GOMODULENAME:= $(shell head -n1 go.mod | awk '{print $$2}') + +ifeq (,$(findstring -outdir,$(J2GO_FLAG))) + J2GO_FLAG += -outdir=tars-protocol +endif + +ifeq (,$(findstring -module,$(J2GO_FLAG))) + J2GO_FLAG += -module=${GOMODULENAME} +endif + +PB2GO := $(firstword $(subst :, , $(GOPATH)))/bin/protoc + +#------------------------------------------------------------------------------- + +TARS_SRC := $(wildcard *.tars) +PRO_SRC += $(wildcard *.proto) +GO_SRC := $(wildcard *.go) + +#---------------------------------------------------------------------------------- + +copyfile = if test -z "$(APP)" || test -z "$(TARGET)"; then \ + echo "['APP' or 'TARGET' option is empty.]"; exit 1; \ + else \ + if test ! -d $(2); then \ + echo "[No such dir:$(2), now we create it.]";\ + mkdir -p $(2);\ + fi; \ + echo "[Copy file $(1) -> $(2)]"; \ + cp -v $(1) $(2); \ + fi; + +ALL: $(TARGET) +#---------------------------------------------------------------------------------- +$(TARGET): TARSBUILD $(GO_SRC) + $(GO) mod tidy + $(GO) build $(GO_BUILD_FLAG) -o $@ + +#---------------------------------------------------------------------------------- +ifneq ($(strip $(TARS_SRC)),) +TARSBUILD: $(TARS_SRC) + @echo "install $(TARS2GO)..." + #go get github.com/TarsCloud/TarsGo/tars/tools/tars2go && go install github.com/TarsCloud/TarsGo/tars/tools/tars2go + @echo -e "\e[33;1m$(TARS2GO)\e[0m \e[36;1m ${TARS_SRC} \e[0m..." + $(TARS2GO) $(J2GO_FLAG) $(TARS_SRC) +else +TARSBUILD: $(TARS_SRC) + @echo "no tars file" +endif + +ifneq ($(PRO_SRC),) +PROBUILD: $(PRO_SRC) + @echo -e "\e[33;1mprotoc\e[0m \e[36;1m ${PRO_SRC} \e[0m..." + @echo $(PB2GO) ${PB2GO_FLAG} $(addprefix --proto_path=, $(sort $(dir $(PRO_SRC)))) $(PRO_SRC) + $(foreach file,$(PRO_SRC),$(eval echo $(PB2GO) ${PB2GO_FLAG} --proto_path=$(dir $file) $file)) + for file in $(sort $(PRO_SRC));\ + do \ + dirname=$$(dirname $$file);\ + $(PB2GO) ${PB2GO_FLAG} --go_out=plugins=tarsrpc:$$dirname --proto_path=$$dirname $$file;\ + done +else +PROBUILD: $(PRO_SRC) + @echo "no proto file" +endif + +#---------------------------------------------------------------------------------- +tar: $(TARGET) $(CONFIG) + @if [ -d $(TARGET)_tmp_dir ]; then \ + echo "dir has exist:$(TARGET)_tmp_dir, abort."; \ + exit 1; \ + else \ + mkdir $(TARGET)_tmp_dir $(TARGET)_tmp_dir/$(TARGET);\ + cp -rf $(TARGET) $(CONFIG) $(TARGET)_tmp_dir/$(TARGET)/; \ + cd $(TARGET)_tmp_dir; tar --exclude=".svn" --exclude="_svn" -czvf $(TARGET).tgz $(TARGET)/; cd ..; \ + if [ -f "$(TARGET).tgz" ]; then \ + mv -vf $(TARGET).tgz $(TARGET).`date +%Y%m%d%H%M%S`.tgz; \ + fi; \ + mv $(TARGET)_tmp_dir/$(TARGET).tgz ./; \ + rm -rf $(TARGET)_tmp_dir; \ + echo "tar cvfz $(TARGET).tgz ..."; \ + fi + + +HELP += $(HELP_TAR) + +ifneq ($(TARS_SRC),) + +SERVER_NAME := $(TARGET) + +endif +#---------------------------------------------------------------------------------- + +clean: + rm -vf $(DEPEND_TARS_OBJ) $(INVOKE_DEPEND_TARS_OBJ) $(LOCAL_OBJ) $(TARGET) $(TARGETS) $(DEP_FILE) ${CLEANFILE} .*.d.tmp gmon.out + rm -vf *$(TARGET)*.tgz + +cleanall: + rm -vf $(DEPEND_TARS_H) $(DEPEND_TARS_CPP) $(DEPEND_TARS_OBJ) $(LOCAL_OBJ) $(HCE_H) $(HCE_CPP) $(TARGET) $(TARGETS) $(DEP_FILE) ${CLEANFILE} *.o .*.d.tmp .*.d gmon.out + rm -vf *$(TARGET)*.tgz + +HELP += $(HELP_CLEAN) +HELP += $(HELP_CLEANALL) + +HELP_CLEAN = "\n\e[1;33mclean\e[0m:\t\t[remove $(LOCAL_OBJ) $(TARGET)]" +HELP_CLEANALL = "\n\e[1;33mcleanall\e[0m:\t[clean & rm .*.d]" +HELP_TAR = "\n\e[1;33mtar\e[0m:\t\t[will do 'tar $(TARGET).tgz $(RELEASE_FILE)']" + +help: + @echo -e $(HELP)"\n" + +#------------------------------------------------------------------------------- diff --git a/_examples/PushServer/start.sh b/_examples/PushServer/start.sh new file mode 100755 index 00000000..faccc14a --- /dev/null +++ b/_examples/PushServer/start.sh @@ -0,0 +1,4 @@ +#!/bin/bash +set -ex +make +./PushServer --config=config.conf diff --git a/tars/adapter.go b/tars/adapter.go index 26bbc2dd..c1657f3f 100755 --- a/tars/adapter.go +++ b/tars/adapter.go @@ -13,6 +13,7 @@ import ( "github.com/TarsCloud/TarsGo/tars/transport" "github.com/TarsCloud/TarsGo/tars/util/endpoint" "github.com/TarsCloud/TarsGo/tars/util/rtimer" + "github.com/TarsCloud/TarsGo/tars/util/tools" ) var reconnectMsg = "_reconnect_" @@ -34,6 +35,8 @@ type AdapterProxy struct { lastBlockTime int64 lastCheckTime int64 lastKeepAliveTime int64 + pushCallback func([]byte) + onceKeepAlive sync.Once closed bool } @@ -91,7 +94,7 @@ func (c *AdapterProxy) Recv(pkg []byte) { return } if packet.IRequestId == 0 { - go c.onPush(packet) + c.onPush(packet) return } if packet.CPacketType == basef.TARSONEWAY { @@ -210,8 +213,27 @@ func (c *AdapterProxy) onPush(pkg *requestf.ResponsePacket) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*ClientIdleTimeout) defer cancel() oldClient.GraceClose(ctx) // grace shutdown + return + } + // Support push msg + if c.pushCallback == nil { + return + } + data := tools.Int8ToByte(pkg.SBuffer) + c.pushCallback(data) +} + +func (c *AdapterProxy) autoKeepAlive() { + interval := c.comm.Client.ClientIdleTimeout / 2 + if interval == 0 { + interval = time.Minute + } + for range time.NewTicker(interval).C { + if c.closed { + return + } + c.doKeepAlive() } - // TODO: support push msg } func (c *AdapterProxy) doKeepAlive() { @@ -229,10 +251,11 @@ func (c *AdapterProxy) doKeepAlive() { } c.lastKeepAliveTime = now + reqId := c.obj.genRequestID() req := requestf.RequestPacket{ IVersion: c.obj.version, CPacketType: int8(basef.TARSONEWAY), - IRequestId: c.obj.genRequestID(), + IRequestId: reqId, SServantName: c.obj.name, SFuncName: "tars_ping", ITimeout: int32(c.obj.timeout), diff --git a/tars/model/Servant.go b/tars/model/Servant.go index f72b3c17..5d6eedc0 100755 --- a/tars/model/Servant.go +++ b/tars/model/Servant.go @@ -17,6 +17,7 @@ type Servant interface { TarsSetTimeout(t int) TarsSetProtocol(Protocol) Name() string + SetPushCallback(callback func([]byte)) } type Protocol interface { diff --git a/tars/protocol/push/client.go b/tars/protocol/push/client.go new file mode 100644 index 00000000..00069d38 --- /dev/null +++ b/tars/protocol/push/client.go @@ -0,0 +1,35 @@ +package push + +import ( + "context" + + "github.com/TarsCloud/TarsGo/tars/model" + "github.com/TarsCloud/TarsGo/tars/protocol/res/requestf" + "github.com/TarsCloud/TarsGo/tars/util/tools" +) + +// Client is the pushing client +type Client struct { + servant model.Servant + callback func(data []byte) +} + +// SetServant implements client servant +func (c *Client) SetServant(s model.Servant) { + s.SetPushCallback(c.callback) + c.servant = s +} + +// NewClient returns the client for pushing message +func NewClient(callback func(data []byte)) *Client { + return &Client{callback: callback} +} + +// Connect starts to connect to pushing server +func (c *Client) Connect(req []byte) ([]byte, error) { + rsp := &requestf.ResponsePacket{} + if err := c.servant.Tars_invoke(context.Background(), 0, "push", req, nil, nil, rsp); err != nil { + return nil, err + } + return tools.Int8ToByte(rsp.SBuffer), nil +} diff --git a/tars/protocol/push/server.go b/tars/protocol/push/server.go new file mode 100644 index 00000000..b805ccb3 --- /dev/null +++ b/tars/protocol/push/server.go @@ -0,0 +1,82 @@ +package push + +import ( + "bytes" + "context" + "encoding/binary" + "fmt" + "net" + + "github.com/TarsCloud/TarsGo/tars" + "github.com/TarsCloud/TarsGo/tars/protocol/codec" + "github.com/TarsCloud/TarsGo/tars/protocol/res/requestf" + "github.com/TarsCloud/TarsGo/tars/transport" + "github.com/TarsCloud/TarsGo/tars/util/current" + "github.com/TarsCloud/TarsGo/tars/util/tools" +) + +// PushServer defines the pushing server +type PushServer interface { + OnConnect(ctx context.Context, req []byte) []byte +} + +type serverProtocol struct { + tars.TarsProtocol + s PushServer +} + +// Send push message to client +func Send(ctx context.Context, data []byte) error { + conn, udpAddr, ok := current.GetRawConn(ctx) + if !ok { + return fmt.Errorf("connection not found") + } + rsp := &requestf.ResponsePacket{ + SBuffer: tools.ByteToInt8(data), + } + rspData := response2Bytes(rsp) + var err error + if udpAddr != nil { + udpConn, _ := conn.(*net.UDPConn) + _, err = udpConn.WriteToUDP(rspData, udpAddr) + } else { + _, err = conn.Write(rspData) + } + return err +} + +// NewServer return a server for pushing message +func NewServer(s PushServer) transport.ServerProtocol { + return &serverProtocol{TarsProtocol: tars.TarsProtocol{}, s: s} +} + +// Invoke process request and send response +func (s *serverProtocol) Invoke(ctx context.Context, reqBytes []byte) []byte { + req := &requestf.RequestPacket{} + rsp := &requestf.ResponsePacket{} + is := codec.NewReader(reqBytes[4:]) + if err := req.ReadFrom(is); err != nil { + rsp.IRet = 1 + rsp.SResultDesc = "decode request package error" + } else { + rsp.IRequestId = req.IRequestId + rsp.CPacketType = req.CPacketType + if req.SFuncName != "tars_ping" { + rspData := s.s.OnConnect(ctx, tools.Int8ToByte(req.SBuffer)) + rsp.SBuffer = tools.ByteToInt8(rspData) + } + } + return response2Bytes(rsp) +} + +func response2Bytes(rsp *requestf.ResponsePacket) []byte { + os := codec.NewBuffer() + rsp.WriteTo(os) + bs := os.ToBytes() + sbuf := bytes.NewBuffer(nil) + sbuf.Write(make([]byte, 4)) + sbuf.Write(bs) + len := sbuf.Len() + binary.BigEndian.PutUint32(sbuf.Bytes(), uint32(len)) + return sbuf.Bytes() +} diff --git a/tars/servant.go b/tars/servant.go index e14a5382..f2cd7ded 100755 --- a/tars/servant.go +++ b/tars/servant.go @@ -37,6 +37,8 @@ type ServantProxy struct { version int16 proto model.Protocol queueLen int32 + + pushCallback func([]byte) } // NewServantProxy creates and initializes a servant proxy @@ -99,6 +101,11 @@ func (s *ServantProxy) genRequestID() int32 { } } +// SetPushCallback set callback function for pushing +func (s *ServantProxy) SetPushCallback(callback func([]byte)) { + s.pushCallback = callback +} + // TarsInvoke is used for client invoking server. func (s *ServantProxy) TarsInvoke(ctx context.Context, cType byte, sFuncName string, @@ -217,6 +224,13 @@ func (s *ServantProxy) doInvoke(ctx context.Context, msg *Message, timeout time. current.SetServerPortWithContext(ctx, fmt.Sprintf("%v", ep.Port)) msg.Adp = adp adp.obj = s + + if s.pushCallback != nil { + // auto keep alive for push client + go adp.onceKeepAlive.Do(adp.autoKeepAlive) + adp.pushCallback = s.pushCallback + } + atomic.AddInt32(&s.queueLen, 1) readCh := make(chan *requestf.ResponsePacket) adp.resp.Store(msg.Req.IRequestId, readCh) diff --git a/tars/transport/tcphandler.go b/tars/transport/tcphandler.go index ca4ad5f1..47ecd73e 100755 --- a/tars/transport/tcphandler.go +++ b/tars/transport/tcphandler.go @@ -69,7 +69,7 @@ func (h *tcpHandler) getConnContext(connSt *connInfo) context.Context { current.SetClientIPWithContext(ctx, ipPort[0]) current.SetClientPortWithContext(ctx, ipPort[1]) current.SetRecvPkgTsFromContext(ctx, time.Now().UnixNano()/1e6) - + current.SetRawConnWithContext(ctx, connSt.conn, nil) return ctx } diff --git a/tars/transport/udphandler.go b/tars/transport/udphandler.go index b4077277..892745c1 100755 --- a/tars/transport/udphandler.go +++ b/tars/transport/udphandler.go @@ -64,6 +64,7 @@ func (h *udpHandler) Handle() error { current.SetClientIPWithContext(ctx, udpAddr.IP.String()) current.SetClientPortWithContext(ctx, strconv.Itoa(udpAddr.Port)) current.SetRecvPkgTsFromContext(ctx, time.Now().UnixNano()/1e6) + current.SetRawConnWithContext(ctx, h.conn, udpAddr) atomic.AddInt32(&h.ts.numInvoke, 1) rsp := h.ts.invoke(ctx, pkg) // no need to check package diff --git a/tars/util/current/tarscurrent.go b/tars/util/current/tarscurrent.go index 02bb42f8..63337785 100644 --- a/tars/util/current/tarscurrent.go +++ b/tars/util/current/tarscurrent.go @@ -2,6 +2,8 @@ package current import ( "context" + "net" + "github.com/TarsCloud/TarsGo/tars/util/trace" ) @@ -23,6 +25,9 @@ type Current struct { needDyeing bool dyeingUser string traceData *trace.TraceData + + rawConn net.Conn + udpAddr *net.UDPAddr } // NewCurrent return a Current point. @@ -309,3 +314,22 @@ func SetTraceData(ctx context.Context, traceData *trace.TraceData) bool { } return ok } + +// GetRawConn get the raw tcp/udp connection from the context. +func GetRawConn(ctx context.Context) (net.Conn, *net.UDPAddr, bool) { + tc, ok := currentFromContext(ctx) + if ok { + return tc.rawConn, tc.udpAddr, true + } + return nil, nil, false +} + +// SetUDPConnWithContext set tcp/udp connection to the tars current. +func SetRawConnWithContext(ctx context.Context, conn net.Conn, udpAddr *net.UDPAddr) bool { + tc, ok := currentFromContext(ctx) + if ok { + tc.rawConn = conn + tc.udpAddr = udpAddr + } + return ok +} From 64f54ebca1b296529252d026e484fa2c7ec1cffe Mon Sep 17 00:00:00 2001 From: defooli Date: Thu, 17 Mar 2022 10:53:58 +0800 Subject: [PATCH 2/3] upgrade api --- tars/protocol/push/client.go | 2 +- tars/protocol/push/server.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tars/protocol/push/client.go b/tars/protocol/push/client.go index 00069d38..7b13dc67 100644 --- a/tars/protocol/push/client.go +++ b/tars/protocol/push/client.go @@ -28,7 +28,7 @@ func NewClient(callback func(data []byte)) *Client { // Connect starts to connect to pushing server func (c *Client) Connect(req []byte) ([]byte, error) { rsp := &requestf.ResponsePacket{} - if err := c.servant.Tars_invoke(context.Background(), 0, "push", req, nil, nil, rsp); err != nil { + if err := c.servant.TarsInvoke(context.Background(), 0, "push", req, nil, nil, rsp); err != nil { return nil, err } return tools.Int8ToByte(rsp.SBuffer), nil diff --git a/tars/protocol/push/server.go b/tars/protocol/push/server.go index b805ccb3..3b9c5423 100644 --- a/tars/protocol/push/server.go +++ b/tars/protocol/push/server.go @@ -21,7 +21,7 @@ type PushServer interface { } type serverProtocol struct { - tars.TarsProtocol + tars.Protocol s PushServer } @@ -47,7 +47,7 @@ func Send(ctx context.Context, data []byte) error { // NewServer return a server for pushing message func NewServer(s PushServer) transport.ServerProtocol { - return &serverProtocol{TarsProtocol: tars.TarsProtocol{}, s: s} + return &serverProtocol{Protocol: tars.Protocol{}, s: s} } // Invoke process request and send response From 6686ffa15418a12a005378547c42e19840e03705 Mon Sep 17 00:00:00 2001 From: Defool Date: Tue, 22 Mar 2022 11:54:59 +0800 Subject: [PATCH 3/3] Add on close callback --- _examples/PushServer/.gitignore | 1 + _examples/PushServer/main.go | 7 +++++++ tars/protocol/push/server.go | 5 +++++ 3 files changed, 13 insertions(+) create mode 100644 _examples/PushServer/.gitignore diff --git a/_examples/PushServer/.gitignore b/_examples/PushServer/.gitignore new file mode 100644 index 00000000..35fbfa95 --- /dev/null +++ b/_examples/PushServer/.gitignore @@ -0,0 +1 @@ +PushServer diff --git a/_examples/PushServer/main.go b/_examples/PushServer/main.go index db28571b..df7adb86 100755 --- a/_examples/PushServer/main.go +++ b/_examples/PushServer/main.go @@ -28,6 +28,13 @@ func (p *pushImp) OnConnect(ctx context.Context, req []byte) []byte { return req } +// OnClose ... +func (p *pushImp) OnClose(ctx context.Context) { + ip, _ := current.GetClientIPFromContext(ctx) + port, _ := current.GetClientPortFromContext(ctx) + fmt.Println("on close:", ip, port) +} + func main() { cfg := tars.GetServerConfig() proto := push.NewServer(&pushImp{}) diff --git a/tars/protocol/push/server.go b/tars/protocol/push/server.go index 3b9c5423..ccadb2fc 100644 --- a/tars/protocol/push/server.go +++ b/tars/protocol/push/server.go @@ -18,6 +18,7 @@ import ( // PushServer defines the pushing server type PushServer interface { OnConnect(ctx context.Context, req []byte) []byte + OnClose(ctx context.Context) } type serverProtocol struct { @@ -50,6 +51,10 @@ func NewServer(s PushServer) transport.ServerProtocol { return &serverProtocol{Protocol: tars.Protocol{}, s: s} } +func (s *serverProtocol) DoClose(ctx context.Context) { + s.s.OnClose(ctx) +} + // Invoke process request and send response func (s *serverProtocol) Invoke(ctx context.Context, reqBytes []byte) []byte { req := &requestf.RequestPacket{}