Skip to content

Commit

Permalink
add call with headers
Browse files Browse the repository at this point in the history
  • Loading branch information
Jackie Li committed Jun 14, 2019
1 parent b8072fc commit 28711b5
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 9 deletions.
35 changes: 29 additions & 6 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ func (p *Process) WaitFor(d time.Duration) error {

// RegisterFunc registers the function using it's reflection name
func (p Process) RegisterFunc(function interface{}) (funcName string, err error) {
if reflect.TypeOf(function).Kind() != reflect.Func {
return "", errors.New("f is not a function")
funcName, err = fn(function)
if err != nil {
return "", err
}
funcName = runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name()
registered := p.server.IsTaskRegistered(funcName)
if !registered {
err = p.server.RegisterTask(funcName, function)
Expand All @@ -116,22 +116,38 @@ func (p Process) Register(funcName string, function interface{}) error {
return nil
}

// Invoke registers the func with it's reflect name, and sends the task
// Invoke calls the func by its reflect name
func (p Process) Invoke(f interface{}, args []tasks.Arg) (jobID string, err error) {
funcName, err := p.RegisterFunc(f)
return p.InvokeWithHeaders(f, args, nil)
}

// InvokeWithHeaders calls the function by its reflect name with headers
func (p Process) InvokeWithHeaders(f interface{}, args []tasks.Arg, headers tasks.Headers) (jobID string, err error) {
funcName, err := fn(f)
if err != nil {
return "", err
}
return p.Call(funcName, args)
return p.CallWithHeaders(funcName, args, headers)
}

// Call calls a registered function, the arguments needs to be in the machinery []Arg format
func (p Process) Call(funcName string, args []tasks.Arg) (jobID string, err error) {
return p.CallWithHeaders(funcName, args, nil)
}

// CallWithHeaders calls a register function with metadata
func (p Process) CallWithHeaders(funcName string, args []tasks.Arg, headers tasks.Headers) (jobID string, err error) {
if !p.server.IsTaskRegistered(funcName) {
return "", errors.Errorf("function %s is not registered", funcName)
}

sig, err := tasks.NewSignature(funcName, args)
if err != nil {
return "", errors.Wrap(err, "process call")
}

sig.Headers = headers

r, err := p.server.SendTask(sig)
if err != nil {
return "", errors.Wrapf(err, "call func %s", funcName)
Expand Down Expand Up @@ -584,3 +600,10 @@ func interruptSubject(jobID string) string {
func headerSubject(jobID string) string {
return "headers_" + jobID
}

func fn(function interface{}) (string, error) {
if reflect.TypeOf(function).Kind() != reflect.Func {
return "", errors.New("f is not a function")
}
return runtime.FuncForPC(reflect.ValueOf(function).Pointer()).Name(), nil
}
44 changes: 44 additions & 0 deletions api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,13 +330,57 @@ func TestGracefulWait(t *testing.T) {
assert.EqualError(t, err, "Worker quit gracefully")
}

func TestInvokeUnregistered(t *testing.T) {
p, err := New("redis://localhost:6379")
require.NoError(t, err)

_, err = p.Invoke(task, []tasks.Arg{
{
Type: "string",
Value: "test invoke",
},
})
require.Error(t, err)
}

func TestCallWithHeaders(t *testing.T) {
p, err := New("redis://localhost:6379")
require.NoError(t, err)
task := func(ctx context.Context, msg string) (string, error) {
sig := tasks.SignatureFromContext(ctx)
assert.EqualValues(t, "bar", sig.Headers["foo"])
return "received " + msg, nil
}

p.RegisterFunc(task)

jobID, err := p.InvokeWithHeaders(
task,
[]tasks.Arg{
{
Type: "string",
Value: "test invoke",
},
},
map[string]interface{}{
"foo": "bar",
},
)

r := p.GetResult(jobID)
_, err = r.Get(1 * time.Millisecond)
require.NoError(t, err)
}

func TestInvoke(t *testing.T) {
p, err := New("redis://localhost:6379")
require.NoError(t, err)
task := func(msg string) (string, error) {
return "received " + msg, nil
}

p.RegisterFunc(task)

jobID, err := p.Invoke(task, []tasks.Arg{
{
Type: "string",
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/jackielii/process
go 1.12

require (
github.com/RichardKnop/machinery v1.6.2
github.com/RichardKnop/machinery v1.6.5
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.0
github.com/pkg/errors v0.8.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ git.apache.org/thrift.git v0.0.0-20181218151757-9b75e4fe745a/go.mod h1:fPE2ZNJGy
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/RichardKnop/logging v0.0.0-20181101035820-b1d5d44c82d6 h1:Vgjpn7q8aQnye8nVJUboZbPd8DFLjYafgjJN2nO73xc=
github.com/RichardKnop/logging v0.0.0-20181101035820-b1d5d44c82d6/go.mod h1:rJJ84PyA/Wlmw1hO+xTzV2wsSUon6J5ktg0g8BF2PuU=
github.com/RichardKnop/machinery v1.6.2 h1:Mn53hDOPj9RF6Lt6HU8DVZCt5PTM4OoZydh3LO4L/ao=
github.com/RichardKnop/machinery v1.6.2/go.mod h1:+QjVq/Z0aWiTc1O0lq34oK9PY6NzYjxVNlLlgTaqoJE=
github.com/RichardKnop/machinery v1.6.5 h1:naU8+o/B1bdQeugr8MLXzoE3qCbeonBGlwOB0b2aL2Y=
github.com/RichardKnop/machinery v1.6.5/go.mod h1:+QjVq/Z0aWiTc1O0lq34oK9PY6NzYjxVNlLlgTaqoJE=
github.com/RichardKnop/redsync v1.2.0 h1:gK35hR3zZkQigHKm8wOGb9MpJ9BsrW6MzxezwjTcHP0=
github.com/RichardKnop/redsync v1.2.0/go.mod h1:9b8nBGAX3bE2uCfJGSnsDvF23mKyHTZzmvmj5FH3Tp0=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
Expand Down

0 comments on commit 28711b5

Please sign in to comment.