Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(otel): add opentelemety utility functions #272

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
22 changes: 15 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg
/*
PublishWithContext sends a Publishing from the client to an exchange on the server.

NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured.
NOTE: Context termination is not honoured.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're now using the context for span propagation in-process.


When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
Expand Down Expand Up @@ -1523,8 +1523,9 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
*/
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
return ch.Publish(exchange, key, mandatory, immediate, msg)
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
return err
}

/*
Expand Down Expand Up @@ -1583,11 +1584,18 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.

NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed
to this function is not honoured.
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context
variant. The termination of the context passed to this function is not
honoured.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
_, msg, errFn := spanForPublication(ctx, msg, exchange, key, immediate)
dc, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
if err != nil {
errFn(err)
Comment on lines +1594 to +1595
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errFn needs to be called regardless of there is an err or not, to properly ends the Span.

Suggested change
if err != nil {
errFn(err)
errFn(err)
if err != nil {

Maybe also rename errFn to endFn to make the intention clearer.

return nil, err
}
return dc, nil
}

/*
Expand Down
231 changes: 144 additions & 87 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package amqp091

import (
"errors"
"time"
"context"
"errors"
"fmt"
"time"

"go.opentelemetry.io/otel/trace"
)

var errDeliveryNotInitialized = errors.New("delivery not initialized")
Expand All @@ -17,88 +21,106 @@ var errDeliveryNotInitialized = errors.New("delivery not initialized")
//
// Applications can provide mock implementations in tests of Delivery handlers.
type Acknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple, requeue bool) error
Reject(tag uint64, requeue bool) error
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple, requeue bool) error
Reject(tag uint64, requeue bool) error
}

// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
Acknowledger Acknowledger // the channel from which this delivery arrived

Headers Table // Application or header exchange table

// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation identifier
ReplyTo string // application use - address to reply to (ex: RPC)
Expiration string // implementation use - message expiration spec
MessageId string // application use - message identifier
Timestamp time.Time // application use - message timestamp
Type string // application use - message type name
UserId string // application use - creating user - should be authenticated user
AppId string // application use - creating application id

// Valid only with Channel.Consume
ConsumerTag string

// Valid only with Channel.Get
MessageCount uint32

DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key

Body []byte
Acknowledger Acknowledger // the channel from which this delivery arrived

Headers Table // Application or header exchange table

// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation identifier
ReplyTo string // application use - address to reply to (ex: RPC)
Expiration string // implementation use - message expiration spec
MessageId string // application use - message identifier
Timestamp time.Time // application use - message timestamp
Type string // application use - message type name
UserId string // application use - creating user - should be authenticated user
AppId string // application use - creating application id

// Valid only with Channel.Consume
ConsumerTag string

// Valid only with Channel.Get
MessageCount uint32

DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key

Body []byte
}

// Span returns context and a span that for the delivery
// the resulting span is linked to the publication that created it, if it has
// the appropraite headers set. See [context-propagation] for more details
//
// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/
func (d Delivery) Span(
ctx context.Context,
options ...trace.SpanStartOption,
) (context.Context, trace.Span) {
return spanForDelivery(ctx, &d, options...)
}

// Link returns a link for the delivery. The link points to the publication, if
// the appropriate headers are set.
func (d Delivery) Link(ctx context.Context) trace.Link {
return extractLinkFromDelivery(ctx, &d)
}

func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
props, body := msg.getContent()

delivery := Delivery{
Acknowledger: channel,

Headers: props.Headers,
ContentType: props.ContentType,
ContentEncoding: props.ContentEncoding,
DeliveryMode: props.DeliveryMode,
Priority: props.Priority,
CorrelationId: props.CorrelationId,
ReplyTo: props.ReplyTo,
Expiration: props.Expiration,
MessageId: props.MessageId,
Timestamp: props.Timestamp,
Type: props.Type,
UserId: props.UserId,
AppId: props.AppId,

Body: body,
}

// Properties for the delivery types
switch m := msg.(type) {
case *basicDeliver:
delivery.ConsumerTag = m.ConsumerTag
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey

case *basicGetOk:
delivery.MessageCount = m.MessageCount
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey
}

return &delivery
props, body := msg.getContent()

delivery := Delivery{
Acknowledger: channel,

Headers: props.Headers,
ContentType: props.ContentType,
ContentEncoding: props.ContentEncoding,
DeliveryMode: props.DeliveryMode,
Priority: props.Priority,
CorrelationId: props.CorrelationId,
ReplyTo: props.ReplyTo,
Expiration: props.Expiration,
MessageId: props.MessageId,
Timestamp: props.Timestamp,
Type: props.Type,
UserId: props.UserId,
AppId: props.AppId,

Body: body,
}

// Properties for the delivery types
switch m := msg.(type) {
case *basicDeliver:
delivery.ConsumerTag = m.ConsumerTag
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey

case *basicGetOk:
delivery.MessageCount = m.MessageCount
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey
}

return &delivery
}

/*
Expand All @@ -121,10 +143,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Ack(multiple bool) error {
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Ack(d.DeliveryTag, multiple)
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Ack(d.DeliveryTag, multiple)
}

/*
Expand All @@ -141,10 +163,10 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Reject(requeue bool) error {
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Reject(d.DeliveryTag, requeue)
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Reject(d.DeliveryTag, requeue)
}

/*
Expand All @@ -166,8 +188,43 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Nack(multiple, requeue bool) error {
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
if d.Acknowledger == nil {
return errDeliveryNotInitialized
}
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
}

type DeliveryResponse uint8

const (
Ack DeliveryResponse = iota
Reject
Nack
)

func (r DeliveryResponse) Name() string {
switch r {
case Ack:
return "ack"
case Nack:
return "nack"
case Reject:
return "reject"
default:
return "unknown"
}
}

func (d Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error {
defer settleDelivery(ctx, &d, response, multiple, requeue)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my understanding of messaging semconv, (“Settle” spans SHOULD be created for every manually or automatically triggered settlement operation. A single “Settle” span can account for a single message or for multiple messages (in case messages are passed for settling as batches). For each message it accounts for, the “Settle” span MAY link to the creation context of the message.)
the settle Span should start before calling Acknowledger.Ack() etc., and end right after Acknowledger.Ack() have returned.

switch response {
case Ack:
return d.Ack(multiple)
case Nack:
return d.Nack(multiple, requeue)
case Reject:
return d.Reject(requeue)
default:
return fmt.Errorf("unknown operation %s", response.Name())
}
}
16 changes: 14 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
module github.com/rabbitmq/amqp091-go

go 1.20
go 1.21

require go.uber.org/goleak v1.3.0
toolchain go1.22.0

require (
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
)
19 changes: 18 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading
Loading