Skip to content

Commit

Permalink
refactor and simplify first subscriber tests to run in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
jxsl13 committed Mar 9, 2024
1 parent 460785b commit bed77d4
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 85 deletions.
1 change: 1 addition & 0 deletions pool/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,5 @@ func TestPublishAwaitFlowControl(t *testing.T) {
Body: []byte(publisherMsgGen()),
})
assert.ErrorIs(t, err, pool.ErrFlowControl)
// FIXME: this test gets stuck when the sessions in the session pool are closed.:
}
2 changes: 2 additions & 0 deletions pool/subscriber_handler_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
)

func TestWithMaxBatchSize(t *testing.T) {
t.Parallel()

dummyHandler := func(context.Context, []Delivery) error { return nil }
bh := NewBatchHandler("test", dummyHandler, WithMaxBatchSize(0), WithMaxBatchBytes(0))

Expand Down
246 changes: 161 additions & 85 deletions pool/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,120 +13,196 @@ import (
"github.com/stretchr/testify/assert"
)

func TestSubscriber(t *testing.T) {
func TestNewSingleSubscriber(t *testing.T) {
t.Parallel()

ctx := context.TODO()
sessions := 2 // publisher sessions + consumer sessions
p, err := pool.New(
var (
ctx = context.TODO()
poolName = testutils.FuncName()
)

hp, err := pool.New(
ctx,
testutils.HealthyConnectURL,
1,
sessions,
2,
pool.WithConfirms(true),
pool.WithLogger(logging.NewTestLogger(t)),
pool.WithName(poolName),
)
if err != nil {
assert.NoError(t, err)
return
}
defer p.Close()
defer hp.Close()

var wg sync.WaitGroup
var (
nextExchangeName = testutils.ExchangeNameGenerator(poolName)
nextQueueName = testutils.QueueNameGenerator(poolName)
exchangeName = nextExchangeName()
queueName = nextQueueName()
nextConsumerName = testutils.ConsumerNameGenerator(queueName)
consumerName = nextConsumerName()
publisherMsgGen = testutils.MessageGenerator(queueName)
subscriberMsgGen = testutils.MessageGenerator(queueName)
)

channels := sessions / 2 // one sessions for consumer and one for publisher
wg.Add(channels)
for id := 0; id < channels; id++ {
go func(id int64) {
defer wg.Done()
ts, err := hp.GetTransientSession(ctx)
if err != nil {
assert.NoError(t, err)
return
}
defer hp.ReturnSession(ts, nil)

ts, err := p.GetTransientSession(p.Context())
if err != nil {
assert.NoError(t, err)
return
}
defer p.ReturnSession(ts, nil)
cleanup := DeclareExchangeQueue(t, ctx, ts, exchangeName, queueName)
defer cleanup()

queueName := fmt.Sprintf("TestSubscriber-Queue-%d", id)
_, err = ts.QueueDeclare(ctx, queueName)
if err != nil {
assert.NoError(t, err)
return
}
defer func() {
i, err := ts.QueueDelete(ctx, queueName)
assert.NoError(t, err)
assert.Equal(t, 0, i)
}()
cctx, cancel := context.WithCancel(ctx)

exchangeName := fmt.Sprintf("TestSubscriber-Exchange-%d", id)
err = ts.ExchangeDeclare(ctx, exchangeName, "topic")
if err != nil {
assert.NoError(t, err)
return
}
defer func() {
err := ts.ExchangeDelete(ctx, exchangeName)
assert.NoError(t, err)
}()
sub := pool.NewSubscriber(hp, pool.SubscriberWithContext(cctx))
defer sub.Close()

err = ts.QueueBind(ctx, queueName, "#", exchangeName)
if err != nil {
assert.NoError(t, err)
return
}
defer func() {
err := ts.QueueUnbind(ctx, queueName, "#", exchangeName, nil)
assert.NoError(t, err)
}()
sub.RegisterHandlerFunc(queueName,
func(ctx context.Context, msg pool.Delivery) error {

message := fmt.Sprintf("Message-%s", queueName)
// handler func
receivedMsg := string(msg.Body)
// assert equel to message that is to be sent
assert.Equal(t, subscriberMsgGen(), receivedMsg)

cctx, cancel := context.WithCancel(p.Context())
// close subscriber from within handler
cancel()
return nil
},
pool.ConsumeOptions{
ConsumerTag: consumerName,
Exclusive: true,
},
)
err = sub.Start(ctx)
if err != nil {
assert.NoError(t, err)
return
}
time.Sleep(5 * time.Second)

sub := pool.NewSubscriber(p, pool.SubscriberWithContext(cctx))
defer sub.Close()
pub := pool.NewPublisher(hp)
defer pub.Close()

sub.RegisterHandlerFunc(queueName,
func(ctx context.Context, msg pool.Delivery) error {
err = pub.Publish(ctx, exchangeName, "", pool.Publishing{
Mandatory: true,
ContentType: "application/json",
Body: []byte(publisherMsgGen()),
})
assert.NoError(t, err)

// handler func
receivedMsg := string(msg.Body)
// assert equel to message that is to be sent
assert.Equal(t, message, receivedMsg)
// this should be canceled upon context cancelation from within the
// subscriber handler.
sub.Wait()
}

// close subscriber from within handler
cancel()
return nil
},
pool.ConsumeOptions{
ConsumerTag: fmt.Sprintf("Consumer-%s", queueName),
Exclusive: true,
},
)
err = sub.Start(ctx)
if err != nil {
assert.NoError(t, err)
return
}
time.Sleep(5 * time.Second)
func TestNewSubscriberWithDisconnect(t *testing.T) {
t.Parallel()

pub := pool.NewPublisher(p)
defer pub.Close()
var (
ctx = context.TODO()
poolName = testutils.FuncName()
proxyName, connectURL, _ = testutils.NextConnectURL()
disconnected, reconnected = Disconnect(t, proxyName, 5*time.Second)
)

pub.Publish(ctx, exchangeName, "", pool.Publishing{
Mandatory: true,
ContentType: "application/json",
Body: []byte(message),
})
hp, err := pool.New(
ctx,
testutils.HealthyConnectURL,
1,
1,
pool.WithConfirms(true),
pool.WithLogger(logging.NewTestLogger(t)),
pool.WithName(poolName),
)
if err != nil {
assert.NoError(t, err)
return
}
defer hp.Close()

// this should be canceled upon context cancelation from within the
// subscriber handler.
sub.Wait()
bp, err := pool.New(
ctx,
connectURL,
1,
1,
pool.WithConfirms(true),
pool.WithLogger(logging.NewTestLogger(t)),
pool.WithName(poolName+"-broken"),
)
if err != nil {
assert.NoError(t, err)
return
}
defer bp.Close()

}(int64(id))
var (
nextExchangeName = testutils.ExchangeNameGenerator(poolName)
nextQueueName = testutils.QueueNameGenerator(poolName)
exchangeName = nextExchangeName()
queueName = nextQueueName()
nextConsumerName = testutils.ConsumerNameGenerator(queueName)
consumerName = nextConsumerName()
publisherMsgGen = testutils.MessageGenerator(queueName)
subscriberMsgGen = testutils.MessageGenerator(queueName)
)

ts, err := hp.GetTransientSession(ctx)
if err != nil {
assert.NoError(t, err)
return
}
defer hp.ReturnSession(ts, nil)

wg.Wait()
cleanup := DeclareExchangeQueue(t, ctx, ts, exchangeName, queueName)
defer cleanup()

cctx, cancel := context.WithCancel(ctx)

sub := pool.NewSubscriber(hp, pool.SubscriberWithContext(cctx))
defer sub.Close()

sub.RegisterHandlerFunc(queueName,
func(ctx context.Context, msg pool.Delivery) error {

// handler func
receivedMsg := string(msg.Body)
// assert equel to message that is to be sent
assert.Equal(t, subscriberMsgGen(), receivedMsg)

// close subscriber from within handler
cancel()
return nil
},
pool.ConsumeOptions{
ConsumerTag: consumerName,
Exclusive: true,
},
)
pub := pool.NewPublisher(hp)
defer pub.Close()

err = pub.Publish(ctx, exchangeName, "", pool.Publishing{
Mandatory: true,
ContentType: "application/json",
Body: []byte(publisherMsgGen()),
})
assert.NoError(t, err)

disconnected()
err = sub.Start(ctx)
if err != nil {
assert.NoError(t, err)
return
}

sub.Wait()
reconnected()
}

func TestBatchSubscriber(t *testing.T) {
Expand Down

0 comments on commit bed77d4

Please sign in to comment.