Skip to content

Commit

Permalink
Merge pull request #343 from TarsCloud/pushing
Browse files Browse the repository at this point in the history
Support push client and server
  • Loading branch information
lbbniu authored Mar 23, 2022
2 parents a53ab89 + 6686ffa commit 02257a2
Show file tree
Hide file tree
Showing 17 changed files with 442 additions and 4 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
*.tar.gz
*.tgz
tars-protocol
*go.sum
1 change: 1 addition & 0 deletions _examples/PushServer/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
PushServer
27 changes: 27 additions & 0 deletions _examples/PushServer/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"fmt"
"time"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/protocol/push"
)

func callback(data []byte) {
fmt.Println("recv message:", string(data))
}

func main() {
comm := tars.NewCommunicator()
obj := "TestApp.PushServer.MessageObj@tcp -h 127.0.0.1 -p 10015 -t 60000"
client := push.NewClient(callback)
comm.StringToProxy(obj, client)
data, err := client.Connect([]byte("hello"))
if err != nil {
panic(err)
}
fmt.Println("connect ok", string(data))
// Wait for receving message
time.Sleep(time.Second * 10)
}
23 changes: 23 additions & 0 deletions _examples/PushServer/config.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<tars>
<application>
<server>
app=TestApp
server=PushServer
local=tcp -h 127.0.0.1 -p 10014 -t 30000
logpath=/tmp
<TestApp.PushServer.MessageObjAdapter>
allow
endpoint=tcp -h 127.0.0.1 -p 10015 -t 60000
handlegroup=TestApp.PushServer.MessageObjAdapter
maxconns=200000
protocol=not_tars
queuecap=10000
queuetimeout=60000
servant=TestApp.PushServer.MessageObj
shmcap=0
shmkey=0
threads=1
</TestApp.PushServer.MessageObjAdapter>
</server>
</application>
</tars>
21 changes: 21 additions & 0 deletions _examples/PushServer/debugtool/dumpstack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"fmt"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/protocol/res/adminf"
)

func main() {
comm := tars.NewCommunicator()
obj := "TestApp.PushServer.MessageObj@tcp -h 127.0.0.1 -p 10014 -t 60000"
app := new(adminf.AdminF)
comm.StringToProxy(obj, app)
ret, err := app.Notify("tars.dumpstack")
if err != nil {
fmt.Println(err)
return
}
fmt.Println(ret)
}
43 changes: 43 additions & 0 deletions _examples/PushServer/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"fmt"
"time"

"github.com/TarsCloud/TarsGo/tars"
"github.com/TarsCloud/TarsGo/tars/protocol/push"
"github.com/TarsCloud/TarsGo/tars/util/current"
)

type pushImp struct{}

// OnConnect ...
func (p *pushImp) OnConnect(ctx context.Context, req []byte) []byte {
ip, _ := current.GetClientIPFromContext(ctx)
port, _ := current.GetClientPortFromContext(ctx)
fmt.Println("on connect:", ip, port)
go func() {
for i := 0; i < 3; i++ {
time.Sleep(time.Millisecond * 100)
if err := push.Send(ctx, []byte("msg"+fmt.Sprint(i))); err != nil {
fmt.Println("send error", err)
}
}
}()
return req
}

// OnClose ...
func (p *pushImp) OnClose(ctx context.Context) {
ip, _ := current.GetClientIPFromContext(ctx)
port, _ := current.GetClientPortFromContext(ctx)
fmt.Println("on close:", ip, port)
}

func main() {
cfg := tars.GetServerConfig()
proto := push.NewServer(&pushImp{})
tars.AddServantWithProtocol(proto, cfg.App+"."+cfg.Server+".MessageObj")
tars.Run()
}
10 changes: 10 additions & 0 deletions _examples/PushServer/makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
APP := TestApp
TARGET := PushServer
MFLAGS :=
DFLAGS :=
CONFIG := client
STRIP_FLAG:= N
J2GO_FLAG:=

libpath=${subst :, ,$(GOPATH)}
$(eval -include scripts/makefile.tars.gomod)
123 changes: 123 additions & 0 deletions _examples/PushServer/scripts/makefile.tars.gomod
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#-------------------------------------------------------------------------------
#fix cgo compile error
export LC_ALL = en_US.UTF-8
export LANG = en_US.UTF-8
#-------------------------------------------------------------------------------

GOPATH ?= $(shell go env GOPATH)
GOROOT ?= $(shell go env GOROOT)
GO = ${GOROOT}/bin/go

#-------------------------------------------------------------------------------
libpath=${subst :, ,$(GOPATH)}
TARS2GO := $(firstword $(subst :, , $(GOPATH)))/bin/tars2go
GOMODULENAME:= $(shell head -n1 go.mod | awk '{print $$2}')

ifeq (,$(findstring -outdir,$(J2GO_FLAG)))
J2GO_FLAG += -outdir=tars-protocol
endif

ifeq (,$(findstring -module,$(J2GO_FLAG)))
J2GO_FLAG += -module=${GOMODULENAME}
endif

PB2GO := $(firstword $(subst :, , $(GOPATH)))/bin/protoc

#-------------------------------------------------------------------------------

TARS_SRC := $(wildcard *.tars)
PRO_SRC += $(wildcard *.proto)
GO_SRC := $(wildcard *.go)

#----------------------------------------------------------------------------------

copyfile = if test -z "$(APP)" || test -z "$(TARGET)"; then \
echo "['APP' or 'TARGET' option is empty.]"; exit 1; \
else \
if test ! -d $(2); then \
echo "[No such dir:$(2), now we create it.]";\
mkdir -p $(2);\
fi; \
echo "[Copy file $(1) -> $(2)]"; \
cp -v $(1) $(2); \
fi;

ALL: $(TARGET)
#----------------------------------------------------------------------------------
$(TARGET): TARSBUILD $(GO_SRC)
$(GO) mod tidy
$(GO) build $(GO_BUILD_FLAG) -o $@

#----------------------------------------------------------------------------------
ifneq ($(strip $(TARS_SRC)),)
TARSBUILD: $(TARS_SRC)
@echo "install $(TARS2GO)..."
#go get github.com/TarsCloud/TarsGo/tars/tools/tars2go && go install github.com/TarsCloud/TarsGo/tars/tools/tars2go
@echo -e "\e[33;1m$(TARS2GO)\e[0m \e[36;1m ${TARS_SRC} \e[0m..."
$(TARS2GO) $(J2GO_FLAG) $(TARS_SRC)
else
TARSBUILD: $(TARS_SRC)
@echo "no tars file"
endif

ifneq ($(PRO_SRC),)
PROBUILD: $(PRO_SRC)
@echo -e "\e[33;1mprotoc\e[0m \e[36;1m ${PRO_SRC} \e[0m..."
@echo $(PB2GO) ${PB2GO_FLAG} $(addprefix --proto_path=, $(sort $(dir $(PRO_SRC)))) $(PRO_SRC)
$(foreach file,$(PRO_SRC),$(eval echo $(PB2GO) ${PB2GO_FLAG} --proto_path=$(dir $file) $file))
for file in $(sort $(PRO_SRC));\
do \
dirname=$$(dirname $$file);\
$(PB2GO) ${PB2GO_FLAG} --go_out=plugins=tarsrpc:$$dirname --proto_path=$$dirname $$file;\
done
else
PROBUILD: $(PRO_SRC)
@echo "no proto file"
endif

#----------------------------------------------------------------------------------
tar: $(TARGET) $(CONFIG)
@if [ -d $(TARGET)_tmp_dir ]; then \
echo "dir has exist:$(TARGET)_tmp_dir, abort."; \
exit 1; \
else \
mkdir $(TARGET)_tmp_dir $(TARGET)_tmp_dir/$(TARGET);\
cp -rf $(TARGET) $(CONFIG) $(TARGET)_tmp_dir/$(TARGET)/; \
cd $(TARGET)_tmp_dir; tar --exclude=".svn" --exclude="_svn" -czvf $(TARGET).tgz $(TARGET)/; cd ..; \
if [ -f "$(TARGET).tgz" ]; then \
mv -vf $(TARGET).tgz $(TARGET).`date +%Y%m%d%H%M%S`.tgz; \
fi; \
mv $(TARGET)_tmp_dir/$(TARGET).tgz ./; \
rm -rf $(TARGET)_tmp_dir; \
echo "tar cvfz $(TARGET).tgz ..."; \
fi


HELP += $(HELP_TAR)

ifneq ($(TARS_SRC),)

SERVER_NAME := $(TARGET)

endif
#----------------------------------------------------------------------------------

clean:
rm -vf $(DEPEND_TARS_OBJ) $(INVOKE_DEPEND_TARS_OBJ) $(LOCAL_OBJ) $(TARGET) $(TARGETS) $(DEP_FILE) ${CLEANFILE} .*.d.tmp gmon.out
rm -vf *$(TARGET)*.tgz

cleanall:
rm -vf $(DEPEND_TARS_H) $(DEPEND_TARS_CPP) $(DEPEND_TARS_OBJ) $(LOCAL_OBJ) $(HCE_H) $(HCE_CPP) $(TARGET) $(TARGETS) $(DEP_FILE) ${CLEANFILE} *.o .*.d.tmp .*.d gmon.out
rm -vf *$(TARGET)*.tgz

HELP += $(HELP_CLEAN)
HELP += $(HELP_CLEANALL)

HELP_CLEAN = "\n\e[1;33mclean\e[0m:\t\t[remove $(LOCAL_OBJ) $(TARGET)]"
HELP_CLEANALL = "\n\e[1;33mcleanall\e[0m:\t[clean & rm .*.d]"
HELP_TAR = "\n\e[1;33mtar\e[0m:\t\t[will do 'tar $(TARGET).tgz $(RELEASE_FILE)']"

help:
@echo -e $(HELP)"\n"

#-------------------------------------------------------------------------------
4 changes: 4 additions & 0 deletions _examples/PushServer/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
set -ex
make
./PushServer --config=config.conf
29 changes: 26 additions & 3 deletions tars/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/TarsCloud/TarsGo/tars/transport"
"github.com/TarsCloud/TarsGo/tars/util/endpoint"
"github.com/TarsCloud/TarsGo/tars/util/rtimer"
"github.com/TarsCloud/TarsGo/tars/util/tools"
)

var reconnectMsg = "_reconnect_"
Expand All @@ -34,6 +35,8 @@ type AdapterProxy struct {
lastBlockTime int64
lastCheckTime int64
lastKeepAliveTime int64
pushCallback func([]byte)
onceKeepAlive sync.Once

closed bool
}
Expand Down Expand Up @@ -91,7 +94,7 @@ func (c *AdapterProxy) Recv(pkg []byte) {
return
}
if packet.IRequestId == 0 {
go c.onPush(packet)
c.onPush(packet)
return
}
if packet.CPacketType == basef.TARSONEWAY {
Expand Down Expand Up @@ -210,8 +213,27 @@ func (c *AdapterProxy) onPush(pkg *requestf.ResponsePacket) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*ClientIdleTimeout)
defer cancel()
oldClient.GraceClose(ctx) // grace shutdown
return
}
// Support push msg
if c.pushCallback == nil {
return
}
data := tools.Int8ToByte(pkg.SBuffer)
c.pushCallback(data)
}

func (c *AdapterProxy) autoKeepAlive() {
interval := c.comm.Client.ClientIdleTimeout / 2
if interval == 0 {
interval = time.Minute
}
for range time.NewTicker(interval).C {
if c.closed {
return
}
c.doKeepAlive()
}
// TODO: support push msg
}

func (c *AdapterProxy) doKeepAlive() {
Expand All @@ -229,10 +251,11 @@ func (c *AdapterProxy) doKeepAlive() {
}
c.lastKeepAliveTime = now

reqId := c.obj.genRequestID()
req := requestf.RequestPacket{
IVersion: c.obj.version,
CPacketType: int8(basef.TARSONEWAY),
IRequestId: c.obj.genRequestID(),
IRequestId: reqId,
SServantName: c.obj.name,
SFuncName: "tars_ping",
ITimeout: int32(c.obj.timeout),
Expand Down
1 change: 1 addition & 0 deletions tars/model/Servant.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Servant interface {
TarsSetTimeout(t int)
TarsSetProtocol(Protocol)
Name() string
SetPushCallback(callback func([]byte))
}

type Protocol interface {
Expand Down
35 changes: 35 additions & 0 deletions tars/protocol/push/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package push

import (
"context"

"github.com/TarsCloud/TarsGo/tars/model"
"github.com/TarsCloud/TarsGo/tars/protocol/res/requestf"
"github.com/TarsCloud/TarsGo/tars/util/tools"
)

// Client is the pushing client
type Client struct {
servant model.Servant
callback func(data []byte)
}

// SetServant implements client servant
func (c *Client) SetServant(s model.Servant) {
s.SetPushCallback(c.callback)
c.servant = s
}

// NewClient returns the client for pushing message
func NewClient(callback func(data []byte)) *Client {
return &Client{callback: callback}
}

// Connect starts to connect to pushing server
func (c *Client) Connect(req []byte) ([]byte, error) {
rsp := &requestf.ResponsePacket{}
if err := c.servant.TarsInvoke(context.Background(), 0, "push", req, nil, nil, rsp); err != nil {
return nil, err
}
return tools.Int8ToByte(rsp.SBuffer), nil
}
Loading

0 comments on commit 02257a2

Please sign in to comment.