Skip to content

Commit

Permalink
update publisher & test (utlities)
Browse files Browse the repository at this point in the history
  • Loading branch information
jxsl13 committed Mar 8, 2024
1 parent f2218c8 commit 09a2ff3
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 187 deletions.
8 changes: 6 additions & 2 deletions pool/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,14 @@ func (p *Publisher) Publish(ctx context.Context, exchange string, routingKey str
case errors.Is(err, ErrDeliveryTagMismatch):
return err
case errors.Is(err, ErrFlowControl):
p.warn(exchange, routingKey, err, "publish failed, retrying")
return err
default:
p.warn(exchange, routingKey, err, "publish failed, retrying")
if recoverable(err) {
p.warn(exchange, routingKey, err, "publish failed due to recoverable error, retrying")
// retry
} else {
return err
}
}
}
}
Expand Down
277 changes: 105 additions & 172 deletions pool/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package pool_test
import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"testing"
"time"
Expand All @@ -15,217 +13,152 @@ import (
"github.com/stretchr/testify/assert"
)

func TestPublisher(t *testing.T) {
ctx := context.TODO()
connections := 1
sessions := 10 // publisher sessions + consumer sessions
p, err := pool.New(
func TestSinglePublisher(t *testing.T) {
t.Parallel()

var (
proxyName, connectURL, _ = testutils.NextConnectURL()
ctx = context.TODO()
nextConnName = testutils.ConnectionNameGenerator()
numMsgs = 20
)

healthyConnCB, hcbAssert := AssertConnectionReconnectAttempts(t, 0)
defer hcbAssert()
hs, hsclose := NewSession(
t,
ctx,
testutils.HealthyConnectURL,
connections,
sessions,
pool.WithName("TestPublisher"),
pool.WithConfirms(true),
nextConnName(),
pool.ConnectionWithRecoverCallback(healthyConnCB),
)
defer hsclose()

p, err := pool.New(
ctx,
connectURL,
1,
1,
pool.WithLogger(logging.NewTestLogger(t)),
pool.WithConfirms(true),
)
if err != nil {
assert.NoError(t, err)
return
}
defer p.Close()

var wg sync.WaitGroup

channels := sessions / 2 // one sessions for consumer and one for publisher
wg.Add(channels)
for id := 0; id < channels; id++ {
go func(id int64) {
defer wg.Done()

s, err := p.GetSession(ctx)
if err != nil {
assert.NoError(t, err)
return
}
defer func() {
p.ReturnSession(s, nil)
}()
var (
nextExchangeName = testutils.ExchangeNameGenerator(hs.Name())
nextQueueName = testutils.QueueNameGenerator(hs.Name())
exchangeName = nextExchangeName()
queueName = nextQueueName()
)
cleanup := DeclareExchangeQueue(t, ctx, hs, exchangeName, queueName)
defer cleanup()

queueName := fmt.Sprintf("TestPublisher-Queue-%d", id)
_, err = s.QueueDeclare(ctx, queueName)
if err != nil {
assert.NoError(t, err)
return
}
defer func() {
i, err := s.QueueDelete(ctx, queueName)
assert.NoError(t, err)
assert.Equal(t, 0, i)
}()
var (
nextConsumerName = testutils.ConsumerNameGenerator(queueName)
publisherMsgGen = testutils.MessageGenerator(queueName)
consumerMsgGen = testutils.MessageGenerator(queueName)
wg sync.WaitGroup
)

exchangeName := fmt.Sprintf("TestPublisher-Exchange-%d", id)
err = s.ExchangeDeclare(ctx, exchangeName, "topic")
if err != nil {
assert.NoError(t, err)
return
}
defer func() {
err := s.ExchangeDelete(ctx, exchangeName)
assert.NoError(t, err)
}()
pub := pool.NewPublisher(p)
defer pub.Close()

err = s.QueueBind(ctx, queueName, "#", exchangeName)
if err != nil {
assert.NoError(t, err)
return
}
// TODO: currently this test allows duplication of messages
ConsumeAsyncN(t, ctx, &wg, hs, queueName, nextConsumerName(), consumerMsgGen, numMsgs, true)

for i := 0; i < numMsgs; i++ {
msg := publisherMsgGen()
err = func(msg string) error {
disconnected, reconnected := DisconnectWithStartedStopped(
t,
proxyName,
0,
testutils.Jitter(time.Millisecond, 20*time.Millisecond),
testutils.Jitter(100*time.Millisecond, 150*time.Millisecond),
)
defer func() {
err := s.QueueUnbind(ctx, queueName, "#", exchangeName, nil)
assert.NoError(t, err)
disconnected()
reconnected()
}()

delivery, err := s.Consume(
queueName,
pool.ConsumeOptions{
ConsumerTag: fmt.Sprintf("Consumer-%s", queueName),
Exclusive: true,
},
)
if err != nil {
assert.NoError(t, err)
return
}

message := fmt.Sprintf("Message-%s", queueName)

wg.Add(1)
go func(msg string) {
defer wg.Done()

for val := range delivery {
receivedMsg := string(val.Body)
assert.Equal(t, message, receivedMsg)
}
// this routine must be closed upon session closure
}(message)

time.Sleep(5 * time.Second)

pub := pool.NewPublisher(p)
defer pub.Close()

pub.Publish(ctx, exchangeName, "", pool.Publishing{
return pub.Publish(ctx, exchangeName, "", pool.Publishing{
Mandatory: true,
ContentType: "application/json",
Body: []byte(message),
ContentType: "text/plain",
Body: []byte(msg),
})

time.Sleep(5 * time.Second)

}(int64(id))
}(msg)
if err != nil {
assert.NoError(t, err, fmt.Sprintf("when publishing message %s", msg))
return
}
}

wg.Wait()
}

func TestPublishAwaitFlowControl(t *testing.T) {
ctx, cancel := signal.NotifyContext(context.TODO(), os.Interrupt)
defer cancel()
t.Parallel()

var (
ctx = context.TODO()
nextConnName = testutils.ConnectionNameGenerator()
)

healthyConnCB, hcbAssert := AssertConnectionReconnectAttempts(t, 0)
defer hcbAssert()
hs, hsclose := NewSession(
t,
ctx,
testutils.HealthyConnectURL,
nextConnName(),
pool.ConnectionWithRecoverCallback(healthyConnCB),
)
defer hsclose()

connections := 1
sessions := 2 // publisher sessions + consumer sessions
p, err := pool.New(
ctx,
testutils.BrokenConnectURL, // memory limit or disk limit reached
connections,
sessions,
pool.WithName("TestPublishAwaitFlowControl"),
pool.WithConfirms(true),
testutils.BrokenConnectURL,
1,
1,
pool.WithLogger(logging.NewTestLogger(t)),
pool.WithConfirms(true),
)
if err != nil {
assert.NoError(t, err)
return
}
defer p.Close()

s, err := p.GetSession(ctx)
defer func() {
p.Close()
}()
var (
nextExchangeName = testutils.ExchangeNameGenerator(hs.Name())
nextQueueName = testutils.QueueNameGenerator(hs.Name())
exchangeName = nextExchangeName()
queueName = nextQueueName()
)
ts, err := p.GetTransientSession(ctx)
if err != nil {
assert.NoError(t, err)
return
}
defer p.ReturnSession(s, nil)
defer func() {
p.ReturnSession(ts, nil)
}()
cleanup := DeclareExchangeQueue(t, ctx, ts, exchangeName, queueName)
defer cleanup()

var (
exchangeName = "TestPublishAwaitFlowControl-Exchange"
publisherMsgGen = testutils.MessageGenerator(queueName)
)

cleanup := initQueueExchange(t, s, ctx, "TestPublishAwaitFlowControl-Queue", exchangeName)
defer cleanup()

pub := pool.NewPublisher(p)
defer pub.Close()

pubGen := PublishingGenerator("TestPublishAwaitFlowControl")

err = pub.Publish(ctx, exchangeName, "", pubGen())
err = pub.Publish(ctx, exchangeName, "", pool.Publishing{
ContentType: "text/plain",
Body: []byte(publisherMsgGen()),
})
assert.ErrorIs(t, err, pool.ErrFlowControl)

}

func initQueueExchange(t *testing.T, s *pool.Session, ctx context.Context, queueName, exchangeName string) (cleanup func()) {
_, err := s.QueueDeclare(ctx, queueName)
if err != nil {
assert.NoError(t, err)
return
}
cleanupList := []func(){}

cleanupQueue := func() {
i, err := s.QueueDelete(ctx, queueName)
assert.NoError(t, err)
assert.Equal(t, 0, i)
}
cleanupList = append(cleanupList, cleanupQueue)

err = s.ExchangeDeclare(ctx, exchangeName, "topic")
if err != nil {
assert.NoError(t, err)
return
}
cleanupExchange := func() {
err := s.ExchangeDelete(ctx, exchangeName)
assert.NoError(t, err)
}
cleanupList = append(cleanupList, cleanupExchange)

err = s.QueueBind(ctx, queueName, "#", exchangeName)
if err != nil {
assert.NoError(t, err)
return
}
cleanupBind := func() {
err := s.QueueUnbind(ctx, queueName, "#", exchangeName, nil)
assert.NoError(t, err)
}
cleanupList = append(cleanupList, cleanupBind)

return func() {
for i := len(cleanupList) - 1; i >= 0; i-- {
cleanupList[i]()
}
}
}

func PublishingGenerator(MessagePrefix string) func() pool.Publishing {
i := 0
return func() pool.Publishing {
defer func() {
i++
}()
return pool.Publishing{
ContentType: "application/json",
Body: []byte(fmt.Sprintf("%s-%d", MessagePrefix, i)),
}
}
}
Loading

0 comments on commit 09a2ff3

Please sign in to comment.