Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow generating test topics and sending non-empty messages #490

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pubsub/tests/bench_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type BenchmarkPubSubConstructor func(n int) (message.Publisher, message.Subscrib
// BenchSubscriber runs benchmark on a message Subscriber.
func BenchSubscriber(b *testing.B, pubSubConstructor BenchmarkPubSubConstructor) {
pub, sub := pubSubConstructor(b.N)
topicName := testTopicName(NewTestID())
topicName := testTopicName(TestContext{TestID: NewTestID()})

messages, err := sub.Subscribe(context.Background(), topicName)
if err != nil {
Expand Down
61 changes: 34 additions & 27 deletions pubsub/tests/test_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ type Features struct {
// NewSubscriberReceivesOldMessages should be set to true if messages are persisted even
// if they are already consumed (for example, like in Kafka).
NewSubscriberReceivesOldMessages bool

// GenerateTopicFunc overrides standard topic name generation.
GenerateTopicFunc func(tctx TestContext) string
}

// RunOnlyFastTests returns true if -short flag was provided -race was not provided.
Expand Down Expand Up @@ -218,7 +221,7 @@ func TestPublishSubscribe(
) {
pub, sub := pubSubConstructor(t)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -267,7 +270,7 @@ func TestConcurrentSubscribe(
pub, initSub := pubSubConstructor(t)
defer closePubSub(t, pub, initSub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

messagesCount := 5000
subscribersCount := 50
Expand Down Expand Up @@ -322,7 +325,7 @@ func TestConcurrentSubscribeMultipleTopics(
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))
messagesToPublish = append(messagesToPublish, msg)
}

Expand All @@ -332,7 +335,7 @@ func TestConcurrentSubscribeMultipleTopics(
receivedMessagesCh := make(chan message.Messages, topicsCount)

for i := 0; i < topicsCount; i++ {
topicName := testTopicName(tCtx.TestID) + fmt.Sprintf("-%d", i)
topicName := testTopicName(tCtx) + fmt.Sprintf("-%d", i)

var messagesToPublishForTopic []*message.Message
for _, msg := range messagesToPublish {
Expand Down Expand Up @@ -397,7 +400,7 @@ func TestPublishSubscribeInOrder(
pub, initSub := pubSubConstructor(t)
defer closePubSub(t, pub, initSub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := initSub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -467,7 +470,7 @@ func TestResendOnError(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -520,7 +523,7 @@ func TestNoAck(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand All @@ -530,7 +533,7 @@ func TestNoAck(
id := watermill.NewUUID()
log.Printf("sending %s", id)

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))

err := publishWithRetry(pub, topicName, msg)
require.NoError(t, err)
Expand Down Expand Up @@ -607,7 +610,7 @@ func TestContinueAfterSubscribeClose(
pub, sub := createPubSub(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -688,7 +691,7 @@ func TestConcurrentClose(
t.Skip("ExactlyOnceDelivery test is not supported yet")
}

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
createPub, createSub := createPubSub(t)
if subscribeInitializer, ok := createSub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
Expand Down Expand Up @@ -738,7 +741,7 @@ func TestContinueAfterErrors(
pub, sub := createPubSub(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -804,7 +807,7 @@ func TestConsumerGroups(
publisherPub, publisherSub := pubSubConstructor(t, "test_"+watermill.NewUUID())
defer closePubSub(t, publisherPub, publisherSub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := publisherSub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand All @@ -828,7 +831,7 @@ func TestPublisherClose(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -856,8 +859,8 @@ func TestTopic(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topic1 := testTopicName(tCtx.TestID) + "-1"
topic2 := testTopicName(tCtx.TestID) + "-2"
topic1 := testTopicName(tCtx) + "-1"
topic2 := testTopicName(tCtx) + "-2"

if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topic1))
Expand All @@ -866,8 +869,8 @@ func TestTopic(
require.NoError(t, subscribeInitializer.SubscribeInitialize(topic2))
}

topic1Msg := message.NewMessage(watermill.NewUUID(), nil)
topic2Msg := message.NewMessage(watermill.NewUUID(), nil)
topic1Msg := message.NewMessage(watermill.NewUUID(), []byte("x"))
topic2Msg := message.NewMessage(watermill.NewUUID(), []byte("x"))

require.NoError(t, publishWithRetry(pub, topic1, topic1Msg))
require.NoError(t, publishWithRetry(pub, topic2, topic2Msg))
Expand Down Expand Up @@ -904,12 +907,12 @@ func TestMessageCtx(
pub, sub := pubSubConstructor(t)
defer closePubSub(t, pub, sub)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}

msg := message.NewMessage(watermill.NewUUID(), nil)
msg := message.NewMessage(watermill.NewUUID(), []byte("x"))

// ensuring that context is not propagated via pub/sub
ctx, ctxCancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -986,7 +989,7 @@ func TestSubscribeCtx(
ctxWithCancel, cancel := context.WithCancel(context.Background())
ctxWithCancel = context.WithValue(ctxWithCancel, contextKey("foo"), "bar")

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -1037,7 +1040,7 @@ func TestReconnect(

pub, sub := pubSubConstructor(t)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -1078,7 +1081,7 @@ func TestReconnect(
go func() {
for range publishMessage {
id := watermill.NewUUID()
msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))

for {
fmt.Println("publishing message")
Expand Down Expand Up @@ -1123,7 +1126,7 @@ func TestNewSubscriberReceivesOldMessages(

pub, sub := pubSubConstructor(t)

topicName := testTopicName(tCtx.TestID)
topicName := testTopicName(tCtx)
if subscribeInitializer, ok := sub.(message.SubscribeInitializer); ok {
require.NoError(t, subscribeInitializer.SubscribeInitialize(topicName))
}
Expand Down Expand Up @@ -1219,8 +1222,12 @@ func assertConsumerGroupReceivedMessages(
AssertAllMessagesReceived(t, expectedMessages, receivedMessages)
}

func testTopicName(testID TestID) string {
return "topic-" + string(testID)
func testTopicName(tctx TestContext) string {
if tctx.Features.GenerateTopicFunc != nil {
return tctx.Features.GenerateTopicFunc(tctx)
}

return "topic-" + string(tctx.TestID)
}

func closePubSub(t *testing.T, pub message.Publisher, sub message.Subscriber) {
Expand Down Expand Up @@ -1254,7 +1261,7 @@ func PublishSimpleMessages(t *testing.T, messagesCount int, publisher message.Pu
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))
messagesToPublish = append(messagesToPublish, msg)

err := publishWithRetry(publisher, topicName, msg)
Expand Down Expand Up @@ -1286,7 +1293,7 @@ func AddSimpleMessagesParallel(t *testing.T, messagesCount int, publisher messag
for i := 0; i < messagesCount; i++ {
id := watermill.NewUUID()

msg := message.NewMessage(id, nil)
msg := message.NewMessage(id, []byte("x"))
messagesToPublish = append(messagesToPublish, msg)

publishMsg <- msg
Expand Down
Loading