From 09a2ff3801b445f7cbe4d39cbcb7823636521237 Mon Sep 17 00:00:00 2001 From: John Behm Date: Fri, 8 Mar 2024 22:27:36 +0100 Subject: [PATCH] update publisher & test (utlities) --- pool/publisher.go | 8 +- pool/publisher_test.go | 277 ++++++++++++++++------------------------- pool/session_test.go | 14 ++- pool/utils_test.go | 22 ++-- 4 files changed, 134 insertions(+), 187 deletions(-) diff --git a/pool/publisher.go b/pool/publisher.go index 9020878..42d0edd 100644 --- a/pool/publisher.go +++ b/pool/publisher.go @@ -82,10 +82,14 @@ func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey str case errors.Is(err, ErrDeliveryTagMismatch): return err case errors.Is(err, ErrFlowControl): - p.warn(exchange, routingKey, err, "publish failed, retrying") return err default: - p.warn(exchange, routingKey, err, "publish failed, retrying") + if recoverable(err) { + p.warn(exchange, routingKey, err, "publish failed due to recoverable error, retrying") + // retry + } else { + return err + } } } } diff --git a/pool/publisher_test.go b/pool/publisher_test.go index 4a2f58d..07b38f3 100644 --- a/pool/publisher_test.go +++ b/pool/publisher_test.go @@ -3,8 +3,6 @@ package pool_test import ( "context" "fmt" - "os" - "os/signal" "sync" "testing" "time" @@ -15,18 +13,34 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPublisher(t *testing.T) { - ctx := context.TODO() - connections := 1 - sessions := 10 // publisher sessions + consumer sessions - p, err := pool.New( +func TestSinglePublisher(t *testing.T) { + t.Parallel() + + var ( + proxyName, connectURL, _ = testutils.NextConnectURL() + ctx = context.TODO() + nextConnName = testutils.ConnectionNameGenerator() + numMsgs = 20 + ) + + healthyConnCB, hcbAssert := AssertConnectionReconnectAttempts(t, 0) + defer hcbAssert() + hs, hsclose := NewSession( + t, ctx, testutils.HealthyConnectURL, - connections, - sessions, - pool.WithName("TestPublisher"), - pool.WithConfirms(true), + nextConnName(), + pool.ConnectionWithRecoverCallback(healthyConnCB), + ) + defer hsclose() + + p, err := pool.New( + ctx, + connectURL, + 1, + 1, pool.WithLogger(logging.NewTestLogger(t)), + pool.WithConfirms(true), ) if err != nil { assert.NoError(t, err) @@ -34,198 +48,117 @@ func TestPublisher(t *testing.T) { } defer p.Close() - var wg sync.WaitGroup - - channels := sessions / 2 // one sessions for consumer and one for publisher - wg.Add(channels) - for id := 0; id < channels; id++ { - go func(id int64) { - defer wg.Done() - - s, err := p.GetSession(ctx) - if err != nil { - assert.NoError(t, err) - return - } - defer func() { - p.ReturnSession(s, nil) - }() + var ( + nextExchangeName = testutils.ExchangeNameGenerator(hs.Name()) + nextQueueName = testutils.QueueNameGenerator(hs.Name()) + exchangeName = nextExchangeName() + queueName = nextQueueName() + ) + cleanup := DeclareExchangeQueue(t, ctx, hs, exchangeName, queueName) + defer cleanup() - queueName := fmt.Sprintf("TestPublisher-Queue-%d", id) - _, err = s.QueueDeclare(ctx, queueName) - if err != nil { - assert.NoError(t, err) - return - } - defer func() { - i, err := s.QueueDelete(ctx, queueName) - assert.NoError(t, err) - assert.Equal(t, 0, i) - }() + var ( + nextConsumerName = testutils.ConsumerNameGenerator(queueName) + publisherMsgGen = testutils.MessageGenerator(queueName) + consumerMsgGen = testutils.MessageGenerator(queueName) + wg sync.WaitGroup + ) - exchangeName := fmt.Sprintf("TestPublisher-Exchange-%d", id) - err = s.ExchangeDeclare(ctx, exchangeName, "topic") - if err != nil { - assert.NoError(t, err) - return - } - defer func() { - err := s.ExchangeDelete(ctx, exchangeName) - assert.NoError(t, err) - }() + pub := pool.NewPublisher(p) + defer pub.Close() - err = s.QueueBind(ctx, queueName, "#", exchangeName) - if err != nil { - assert.NoError(t, err) - return - } + // TODO: currently this test allows duplication of messages + ConsumeAsyncN(t, ctx, &wg, hs, queueName, nextConsumerName(), consumerMsgGen, numMsgs, true) + + for i := 0; i < numMsgs; i++ { + msg := publisherMsgGen() + err = func(msg string) error { + disconnected, reconnected := DisconnectWithStartedStopped( + t, + proxyName, + 0, + testutils.Jitter(time.Millisecond, 20*time.Millisecond), + testutils.Jitter(100*time.Millisecond, 150*time.Millisecond), + ) defer func() { - err := s.QueueUnbind(ctx, queueName, "#", exchangeName, nil) - assert.NoError(t, err) + disconnected() + reconnected() }() - delivery, err := s.Consume( - queueName, - pool.ConsumeOptions{ - ConsumerTag: fmt.Sprintf("Consumer-%s", queueName), - Exclusive: true, - }, - ) - if err != nil { - assert.NoError(t, err) - return - } - - message := fmt.Sprintf("Message-%s", queueName) - - wg.Add(1) - go func(msg string) { - defer wg.Done() - - for val := range delivery { - receivedMsg := string(val.Body) - assert.Equal(t, message, receivedMsg) - } - // this routine must be closed upon session closure - }(message) - - time.Sleep(5 * time.Second) - - pub := pool.NewPublisher(p) - defer pub.Close() - - pub.Publish(ctx, exchangeName, "", pool.Publishing{ + return pub.Publish(ctx, exchangeName, "", pool.Publishing{ Mandatory: true, - ContentType: "application/json", - Body: []byte(message), + ContentType: "text/plain", + Body: []byte(msg), }) - - time.Sleep(5 * time.Second) - - }(int64(id)) + }(msg) + if err != nil { + assert.NoError(t, err, fmt.Sprintf("when publishing message %s", msg)) + return + } } - wg.Wait() } func TestPublishAwaitFlowControl(t *testing.T) { - ctx, cancel := signal.NotifyContext(context.TODO(), os.Interrupt) - defer cancel() + t.Parallel() + + var ( + ctx = context.TODO() + nextConnName = testutils.ConnectionNameGenerator() + ) + + healthyConnCB, hcbAssert := AssertConnectionReconnectAttempts(t, 0) + defer hcbAssert() + hs, hsclose := NewSession( + t, + ctx, + testutils.HealthyConnectURL, + nextConnName(), + pool.ConnectionWithRecoverCallback(healthyConnCB), + ) + defer hsclose() - connections := 1 - sessions := 2 // publisher sessions + consumer sessions p, err := pool.New( ctx, - testutils.BrokenConnectURL, // memory limit or disk limit reached - connections, - sessions, - pool.WithName("TestPublishAwaitFlowControl"), - pool.WithConfirms(true), + testutils.BrokenConnectURL, + 1, + 1, pool.WithLogger(logging.NewTestLogger(t)), + pool.WithConfirms(true), ) if err != nil { assert.NoError(t, err) return } - defer p.Close() - - s, err := p.GetSession(ctx) + defer func() { + p.Close() + }() + var ( + nextExchangeName = testutils.ExchangeNameGenerator(hs.Name()) + nextQueueName = testutils.QueueNameGenerator(hs.Name()) + exchangeName = nextExchangeName() + queueName = nextQueueName() + ) + ts, err := p.GetTransientSession(ctx) if err != nil { assert.NoError(t, err) return } - defer p.ReturnSession(s, nil) + defer func() { + p.ReturnSession(ts, nil) + }() + cleanup := DeclareExchangeQueue(t, ctx, ts, exchangeName, queueName) + defer cleanup() var ( - exchangeName = "TestPublishAwaitFlowControl-Exchange" + publisherMsgGen = testutils.MessageGenerator(queueName) ) - - cleanup := initQueueExchange(t, s, ctx, "TestPublishAwaitFlowControl-Queue", exchangeName) - defer cleanup() - pub := pool.NewPublisher(p) defer pub.Close() - pubGen := PublishingGenerator("TestPublishAwaitFlowControl") - - err = pub.Publish(ctx, exchangeName, "", pubGen()) + err = pub.Publish(ctx, exchangeName, "", pool.Publishing{ + ContentType: "text/plain", + Body: []byte(publisherMsgGen()), + }) assert.ErrorIs(t, err, pool.ErrFlowControl) - -} - -func initQueueExchange(t *testing.T, s *pool.Session, ctx context.Context, queueName, exchangeName string) (cleanup func()) { - _, err := s.QueueDeclare(ctx, queueName) - if err != nil { - assert.NoError(t, err) - return - } - cleanupList := []func(){} - - cleanupQueue := func() { - i, err := s.QueueDelete(ctx, queueName) - assert.NoError(t, err) - assert.Equal(t, 0, i) - } - cleanupList = append(cleanupList, cleanupQueue) - - err = s.ExchangeDeclare(ctx, exchangeName, "topic") - if err != nil { - assert.NoError(t, err) - return - } - cleanupExchange := func() { - err := s.ExchangeDelete(ctx, exchangeName) - assert.NoError(t, err) - } - cleanupList = append(cleanupList, cleanupExchange) - - err = s.QueueBind(ctx, queueName, "#", exchangeName) - if err != nil { - assert.NoError(t, err) - return - } - cleanupBind := func() { - err := s.QueueUnbind(ctx, queueName, "#", exchangeName, nil) - assert.NoError(t, err) - } - cleanupList = append(cleanupList, cleanupBind) - - return func() { - for i := len(cleanupList) - 1; i >= 0; i-- { - cleanupList[i]() - } - } -} - -func PublishingGenerator(MessagePrefix string) func() pool.Publishing { - i := 0 - return func() pool.Publishing { - defer func() { - i++ - }() - return pool.Publishing{ - ContentType: "application/json", - Body: []byte(fmt.Sprintf("%s-%d", MessagePrefix, i)), - } - } } diff --git a/pool/session_test.go b/pool/session_test.go index ecd33d9..526e18b 100644 --- a/pool/session_test.go +++ b/pool/session_test.go @@ -30,6 +30,7 @@ func TestNewSingleSessionPublishAndConsume(t *testing.T) { consumerName = nextConsumerName() consumeMessageGenerator = testutils.MessageGenerator(queueName) publishMessageGenerator = testutils.MessageGenerator(queueName) + numMsgs = 20 ) reconnectCB, deferredAssert := AssertConnectionReconnectAttempts(t, 0) @@ -70,8 +71,8 @@ func TestNewSingleSessionPublishAndConsume(t *testing.T) { cleanup := DeclareExchangeQueue(t, ctx, s, exchangeName, queueName) defer cleanup() - ConsumeAsyncN(t, ctx, &wg, s, queueName, consumerName, consumeMessageGenerator, 20) - PublishAsyncN(t, ctx, &wg, s, exchangeName, publishMessageGenerator, 20) + ConsumeAsyncN(t, ctx, &wg, s, queueName, consumerName, consumeMessageGenerator, numMsgs, false) + PublishAsyncN(t, ctx, &wg, s, exchangeName, publishMessageGenerator, numMsgs) wg.Wait() } @@ -86,6 +87,7 @@ func TestManyNewSessionsPublishAndConsume(t *testing.T) { connName = nextConnName() nextSessionName = testutils.SessionNameGenerator(connName) sessions = 5 + numMsgs = 20 ) reconnectCB, deferredAssert := AssertConnectionReconnectAttempts(t, 0) @@ -135,8 +137,8 @@ func TestManyNewSessionsPublishAndConsume(t *testing.T) { cleanup := DeclareExchangeQueue(t, ctx, s, exchangeName, queueName) defer cleanup() - ConsumeAsyncN(t, ctx, &wg, s, queueName, consumerName, consumeNextMessage, 20) - PublishAsyncN(t, ctx, &wg, s, exchangeName, publishNextMessage, 20) + ConsumeAsyncN(t, ctx, &wg, s, queueName, consumerName, consumeNextMessage, numMsgs, false) + PublishAsyncN(t, ctx, &wg, s, exchangeName, publishNextMessage, numMsgs) } wg.Wait() @@ -662,7 +664,7 @@ func TestNewSessionPublishWithDisconnect(t *testing.T) { disconnected, reconnected = Disconnect(t, proxyName, 5*time.Second) ) - ConsumeAsyncN(t, ctx, &wg, hs, queueName, nextConsumerName(), consumeMsgGen, numMsgs) + ConsumeAsyncN(t, ctx, &wg, hs, queueName, nextConsumerName(), consumeMsgGen, numMsgs, false) disconnected() PublishN(t, ctx, s, exchangeName, publishMsgGen, numMsgs) @@ -725,7 +727,7 @@ func TestNewSessionConsumeWithDisconnect(t *testing.T) { PublishAsyncN(t, ctx, &wg, hs, exchangeName, publisherMsgGen, numMsgs) disconnected() - ConsumeN(t, ctx, s, queueName, nextConsumerName(), consumerMsgGen, numMsgs) + ConsumeN(t, ctx, s, queueName, nextConsumerName(), consumerMsgGen, numMsgs, false) reconnected() wg.Wait() diff --git a/pool/utils_test.go b/pool/utils_test.go index 48c62e8..6332b8a 100644 --- a/pool/utils_test.go +++ b/pool/utils_test.go @@ -26,6 +26,7 @@ func ConsumeN( consumerName string, messageGenerator func() string, n int, + allowDuplicates bool, ) { cctx, ccancel := context.WithCancel(ctx) defer ccancel() @@ -35,7 +36,7 @@ func ConsumeN( defer func() { assert.Equal(t, n, msgsReceived, "expected to consume %d messages, got %d", n, msgsReceived) }() -outer: + for { delivery, err := c.Consume( queueName, @@ -56,8 +57,9 @@ outer: case <-cctx.Done(): return case val, ok := <-delivery: + require.True(t, ok, "expected delivery channel to be open of consumer %s in ConsumeN", consumerName) if !ok { - continue outer + return } err := val.Ack(false) if err != nil { @@ -65,10 +67,15 @@ outer: return } - var ( - expectedMsg = messageGenerator() - receivedMsg = string(val.Body) - ) + var receivedMsg = string(val.Body) + if allowDuplicates && receivedMsg == previouslyReceivedMsg { + // TODO: it is possible that messages are duplicated, but this is not a problem + // due to network issues. We should not fail the test in this case. + log.Warnf("received duplicate message: %s", receivedMsg) + continue + } + + var expectedMsg = messageGenerator() assert.Equalf( t, expectedMsg, @@ -101,12 +108,13 @@ func ConsumeAsyncN( consumerName string, messageGenerator func() string, n int, + alllowDuplicates bool, ) { wg.Add(1) go func() { defer wg.Done() - ConsumeN(t, ctx, c, queueName, consumerName, messageGenerator, n) + ConsumeN(t, ctx, c, queueName, consumerName, messageGenerator, n, alllowDuplicates) }() }