From 8870af0e0cc27ddad2eb2f8ae1f827764e5e0d02 Mon Sep 17 00:00:00 2001 From: myan Date: Tue, 10 Sep 2024 10:08:13 +0000 Subject: [PATCH] add err group Signed-off-by: myan --- test/integration/mqtt_paho/concurrent_test.go | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/test/integration/mqtt_paho/concurrent_test.go b/test/integration/mqtt_paho/concurrent_test.go index 967c3ff2f..0d5fbcccd 100644 --- a/test/integration/mqtt_paho/concurrent_test.go +++ b/test/integration/mqtt_paho/concurrent_test.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" cloudevents "github.com/cloudevents/sdk-go/v2" cecontext "github.com/cloudevents/sdk-go/v2/context" @@ -30,24 +31,24 @@ func TestConcurrentSendingEvent(t *testing.T) { senderNum := 10 // 10 gorutine to sending the events eventNum := 1000 // each gorutine sender publishs 1,000 events + var g errgroup.Group + // start a receiver c, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs()) require.NoError(t, err) - go func() { - // sending 10,000 events concurrenly and verify all of them can be recieved + g.Go(func() error { + // verify all of events can be recieved count := senderNum * eventNum var mu sync.Mutex - err = c.StartReceiver(ctx, func(event cloudevents.Event) { + return c.StartReceiver(ctx, func(event cloudevents.Event) { mu.Lock() defer mu.Unlock() count-- - // all the events has been received if count == 0 { readyCh <- true } }) - require.NoError(t, err) - }() + }) // wait for 5 seconds to ensure the receiver starts safely time.Sleep(5 * time.Second) @@ -62,17 +63,22 @@ func TestConcurrentSendingEvent(t *testing.T) { require.NoError(t, err) for i := 0; i < senderNum; i++ { - go func() { + g.Go(func() error { for j := 0; j < eventNum; j++ { result := client.Send( cecontext.WithTopic(ctx, topicName), evt, ) - require.NoError(t, result) + if result != nil { + return result + } } - }() + return nil + }) } + require.NoError(t, g.Wait()) + // wait until all the events are received for { select {