Skip to content

Commit

Permalink
[#128]: chore: Rename RabbitMQ to AMQP and fix the wrong span in …
Browse files Browse the repository at this point in the history
…Pause function
  • Loading branch information
rustatian authored Mar 12, 2024
2 parents e18be35 + b562302 commit 9098449
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 42 deletions.
2 changes: 1 addition & 1 deletion amqpjobs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
RequireAndVerifyClientCert ClientAuthType = "require_and_verify_client_cert"
)

// pipeline rabbitmq info
// pipeline amqp info
const (
exchangeKey string = "exchange"
exchangeType string = "exchange_type"
Expand Down
14 changes: 7 additions & 7 deletions amqpjobs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Driver struct {
stopped uint64
}

// FromConfig initializes rabbitmq pipeline
// FromConfig initializes AMQP pipeline
func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logger, cfg Configurer, pipeline jobs.Pipeline, pq jobs.Queue) (*Driver, error) {
const op = errors.Op("new_amqp_consumer")

Expand Down Expand Up @@ -177,7 +177,7 @@ func FromConfig(tracer *sdktrace.TracerProvider, configKey string, log *zap.Logg

// save address
jb.connStr = conf.Addr
err = jb.initRabbitMQ()
err = jb.init()
if err != nil {
return nil, errors.E(op, err)
}
Expand Down Expand Up @@ -307,7 +307,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
// save address
jb.connStr = conf.Addr

err = jb.initRabbitMQ()
err = jb.init()
if err != nil {
return nil, errors.E(op, err)
}
Expand Down Expand Up @@ -341,7 +341,7 @@ func FromPipeline(tracer *sdktrace.TracerProvider, pipeline jobs.Pipeline, log *
}

func (d *Driver) Push(ctx context.Context, job jobs.Message) error {
const op = errors.Op("rabbitmq_push")
const op = errors.Op("amqp_driver_push")
// check if the pipeline registered

ctx, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "amqp_push")
Expand All @@ -367,7 +367,7 @@ func (d *Driver) Push(ctx context.Context, job jobs.Message) error {

func (d *Driver) Run(ctx context.Context, p jobs.Pipeline) error {
start := time.Now().UTC()
const op = errors.Op("rabbit_run")
const op = errors.Op("amqp_driver_run")

_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "amqp_run")
defer span.End()
Expand Down Expand Up @@ -490,7 +490,7 @@ func (d *Driver) Pause(ctx context.Context, p string) error {
start := time.Now().UTC()
pipe := *d.pipeline.Load()

_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "amqp_resume")
_, span := trace.SpanFromContext(ctx).TracerProvider().Tracer(tracerName).Start(ctx, "amqp_pause")
defer span.End()

if pipe.Name() != p {
Expand Down Expand Up @@ -618,7 +618,7 @@ func (d *Driver) Stop(ctx context.Context) error {

// handleItem
func (d *Driver) handleItem(ctx context.Context, msg *Item) error {
const op = errors.Op("rabbitmq_handle_item")
const op = errors.Op("amqp_driver_handle_item")
select {
case pch := <-d.publishChan:
// return the channel back
Expand Down
2 changes: 1 addition & 1 deletion amqpjobs/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (d *Driver) listener(deliv <-chan amqp.Delivery) {
span.End()
}

d.log.Debug("delivery channel was closed, leaving the rabbit listener")
d.log.Debug("delivery channel was closed, leaving the AMQP listener")
// reduce number of listeners
if atomic.LoadUint32(&d.listeners) == 0 {
d.log.Debug("number of listeners", zap.Uint32("listeners", atomic.LoadUint32(&d.listeners)))
Expand Down
4 changes: 2 additions & 2 deletions amqpjobs/rabbit_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"github.com/roadrunner-server/errors"
)

func (d *Driver) initRabbitMQ() error {
const op = errors.Op("jobs_plugin_rmq_init")
func (d *Driver) init() error {
const op = errors.Op("jobs_plugin_amqp_init")
// Channel opens a unique, concurrent server channel to process the bulk of AMQP
// messages. Any error from methods on this receiver will render the receiver
// invalid and a new Channel should be opened.
Expand Down
10 changes: 5 additions & 5 deletions amqpjobs/redial.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type redialMsg struct {
err *amqp.Error
}

// redialer used to redial to the rabbitmq in case of the connection interrupts
// redialer used to redial to the server in case of the connection interrupts
func (d *Driver) redialer() { //nolint:gocognit,gocyclo
go func() {
for {
Expand Down Expand Up @@ -220,7 +220,7 @@ func (d *Driver) redialMergeCh() {
}

func (d *Driver) redial(rm *redialMsg) {
const op = errors.Op("rabbitmq_redial")
const op = errors.Op("amqp_driver_redial")
// trash the broken publishing channel
d.reset()

Expand All @@ -239,12 +239,12 @@ func (d *Driver) redial(rm *redialMsg) {
return errors.E(op, err)
}

d.log.Info("rabbitmq dial was succeed. trying to redeclare queues and subscribers")
d.log.Info("amqp dial was succeed. trying to redeclare queues and subscribers")

// re-init connection
err = d.initRabbitMQ()
err = d.init()
if err != nil {
d.log.Error("rabbitmq dial", zap.Error(err))
d.log.Error("amqp dial", zap.Error(err))
return errors.E(op, err)
}

Expand Down
2 changes: 1 addition & 1 deletion tests/jobs_amqp_durability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestDurabilityAMQP_NoQueue(t *testing.T) {
assert.Equal(t, oLogger.FilterMessageSnippet("amqp connection closed").Len(), 1)
assert.Equal(t, oLogger.FilterMessageSnippet("pipeline connection was closed, redialing").Len(), 1)

assert.Equal(t, oLogger.FilterMessageSnippet("rabbitmq dial was succeed. trying to redeclare queues and subscribers").Len(), 1)
assert.Equal(t, oLogger.FilterMessageSnippet("amqp dial was succeed. trying to redeclare queues and subscribers").Len(), 1)
assert.Equal(t, oLogger.FilterMessageSnippet("queues and subscribers was redeclared successfully").Len(), 1)
assert.Equal(t, oLogger.FilterMessageSnippet("connection was successfully restored").Len(), 1)
assert.Equal(t, oLogger.FilterMessageSnippet("redialer restarted").Len(), 1)
Expand Down
50 changes: 25 additions & 25 deletions tests/jobs_amqp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestAMQPFanoutQueueName(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was processed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPHeaders(t *testing.T) {
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestAMQPHeaders(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPHeadersXRoutingKey(t *testing.T) {
Expand Down Expand Up @@ -316,7 +316,7 @@ func TestAMQPHeadersXRoutingKey(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPDeclareHeaders(t *testing.T) {
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestAMQPDeclareHeaders(t *testing.T) {
assert.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPInitTLS(t *testing.T) {
Expand Down Expand Up @@ -490,7 +490,7 @@ func TestAMQPInitTLS(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was processed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPRemoveAllFromPQ(t *testing.T) {
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestAMQPRemoveAllFromPQ(t *testing.T) {
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 200, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQP20Pipelines(t *testing.T) {
Expand Down Expand Up @@ -686,7 +686,7 @@ func TestAMQP20Pipelines(t *testing.T) {
assert.Equal(t, 20, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 20, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 20, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 20, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
assert.Equal(t, 20, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPBug1792(t *testing.T) {
Expand Down Expand Up @@ -769,7 +769,7 @@ func TestAMQPBug1792(t *testing.T) {
assert.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
assert.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPInit(t *testing.T) {
Expand Down Expand Up @@ -852,7 +852,7 @@ func TestAMQPInit(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPInitV27(t *testing.T) {
Expand Down Expand Up @@ -935,7 +935,7 @@ func TestAMQPInitV27(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPRoutingQueue(t *testing.T) {
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func TestAMQPRoutingQueue(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPInitV27RR27(t *testing.T) {
Expand Down Expand Up @@ -1102,7 +1102,7 @@ func TestAMQPInitV27RR27(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPInitV27RR27Durable(t *testing.T) {
Expand Down Expand Up @@ -1185,7 +1185,7 @@ func TestAMQPInitV27RR27Durable(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPReset(t *testing.T) {
Expand Down Expand Up @@ -1274,7 +1274,7 @@ func TestAMQPReset(t *testing.T) {
require.Equal(t, 4, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 4, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 4, oLogger.FilterMessageSnippet("job was processed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPDeclare(t *testing.T) {
Expand Down Expand Up @@ -1362,7 +1362,7 @@ func TestAMQPDeclare(t *testing.T) {
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPDeclareDurable(t *testing.T) {
Expand Down Expand Up @@ -1450,7 +1450,7 @@ func TestAMQPDeclareDurable(t *testing.T) {
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("job was processed successfully").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPJobsError(t *testing.T) {
Expand Down Expand Up @@ -1539,7 +1539,7 @@ func TestAMQPJobsError(t *testing.T) {
require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was resumed").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 3, oLogger.FilterMessageSnippet("jobs protocol error").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPNoGlobalSection(t *testing.T) {
Expand Down Expand Up @@ -1693,7 +1693,7 @@ func TestAMQPStats(t *testing.T) {
require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was paused").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("pipeline was resumed").Len())
require.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPBadResp(t *testing.T) {
Expand Down Expand Up @@ -1777,7 +1777,7 @@ func TestAMQPBadResp(t *testing.T) {
require.Equal(t, 2, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("job processing was started").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("response handler error").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
require.Equal(t, 2, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

// redialer should be restarted
Expand Down Expand Up @@ -1862,12 +1862,12 @@ func TestAMQPSlow(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()

assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len(), 1)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len(), 1)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet(`number of listeners`).Len(), 1)
assert.Equal(t, oLogger.FilterMessageSnippet("consume channel close").Len(), 0)
assert.GreaterOrEqual(
t,
oLogger.FilterMessageSnippet("rabbitmq dial was succeed. trying to redeclare queues and subscribers").Len(),
oLogger.FilterMessageSnippet("amqp dial was succeed. trying to redeclare queues and subscribers").Len(),
1,
)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("queues and subscribers was redeclared successfully").Len(), 1)
Expand Down Expand Up @@ -1959,12 +1959,12 @@ func TestAMQPSlowAutoAck(t *testing.T) {
stopCh <- struct{}{}
wg.Wait()

assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len(), 1)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len(), 1)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet(`number of listeners`).Len(), 1)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("consume channel close").Len(), 0)
assert.GreaterOrEqual(
t,
oLogger.FilterMessageSnippet("rabbitmq dial was succeed. trying to redeclare queues and subscribers").Len(),
oLogger.FilterMessageSnippet("amqp dial was succeed. trying to redeclare queues and subscribers").Len(),
0,
)
assert.GreaterOrEqual(t, oLogger.FilterMessageSnippet("queues and subscribers was redeclared successfully").Len(), 0)
Expand Down Expand Up @@ -2102,7 +2102,7 @@ func TestAMQPRawPayload(t *testing.T) {
assert.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was started").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())
}

func TestAMQPOTEL(t *testing.T) {
Expand Down Expand Up @@ -2210,7 +2210,7 @@ func TestAMQPOTEL(t *testing.T) {
assert.Equal(t, 1, oLogger.FilterMessageSnippet("pipeline was stopped").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("job processing was started").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the rabbit listener").Len())
assert.Equal(t, 1, oLogger.FilterMessageSnippet("delivery channel was closed, leaving the AMQP listener").Len())

t.Cleanup(func() {
_ = resp.Body.Close()
Expand Down

0 comments on commit 9098449

Please sign in to comment.