Skip to content

Commit

Permalink
Merge pull request #29 from marstr/unblockSBManagement
Browse files Browse the repository at this point in the history
Adding RPC AMQP Session Reuse.
  • Loading branch information
marstr authored Oct 15, 2018
2 parents 5eca153 + 6423579 commit 1b59633
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 27 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change Log

## `v1.1.0`

- adding the ability to reuse an AMQP session while making RPCs
- bug fixes

## `v1.0.3`
- updating dependencies, adding new 'go-autorest' constraint

Expand Down
60 changes: 33 additions & 27 deletions rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,35 +70,41 @@ func NewLink(conn *amqp.Client, address string) (*Link, error) {
return nil, err
}

authSender, err := authSession.NewSender(
amqp.LinkTargetAddress(address),
)
if err != nil {
return nil, err
}
return NewLinkWithSession(conn, authSession, address)
}

linkID, err := uuid.NewV4()
if err != nil {
return nil, err
}
// NewLinkWithSession will build a new request response link, but will reuse an existing AMQP session
func NewLinkWithSession(conn *amqp.Client, session *amqp.Session, address string) (*Link, error) {

id := linkID.String()
clientAddress := strings.Replace("$", "", address, -1) + replyPostfix + id
authReceiver, err := authSession.NewReceiver(
amqp.LinkSourceAddress(address),
amqp.LinkTargetAddress(clientAddress),
)
if err != nil {
return nil, err
}
authSender, err := session.NewSender(
amqp.LinkTargetAddress(address),
)
if err != nil {
return nil, err
}

linkID, err := uuid.NewV4()
if err != nil {
return nil, err
}

id := linkID.String()
clientAddress := strings.Replace("$", "", address, -1) + replyPostfix + id
authReceiver, err := session.NewReceiver(
amqp.LinkSourceAddress(address),
amqp.LinkTargetAddress(clientAddress),
)
if err != nil {
return nil, err
}

return &Link{
sender: authSender,
receiver: authReceiver,
session: authSession,
clientAddress: clientAddress,
id: id,
}, nil
return &Link{
sender: authSender,
receiver: authReceiver,
session: session,
clientAddress: clientAddress,
id: id,
}, nil
}

// RetryableRPC attempts to retry a request a number of times with delay
Expand Down Expand Up @@ -183,7 +189,7 @@ func (l *Link) RPC(ctx context.Context, msg *amqp.Message) (*Response, error) {
for i := range descriptionCandidates {
if rawDescription, ok := res.ApplicationProperties[descriptionCandidates[i]]; ok {
descriptionFound = true
if description, ok = rawDescription.(string); ok {
if description, ok = rawDescription.(string); ok || rawDescription == nil {
break
} else {
return nil, errors.New("status description was not of expected type string")
Expand Down

0 comments on commit 1b59633

Please sign in to comment.