Skip to content

Commit

Permalink
add err group
Browse files Browse the repository at this point in the history
Signed-off-by: myan <[email protected]>
  • Loading branch information
yanmxa committed Sep 10, 2024
1 parent 787a693 commit 8870af0
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions test/integration/mqtt_paho/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"

cloudevents "github.com/cloudevents/sdk-go/v2"
cecontext "github.com/cloudevents/sdk-go/v2/context"
Expand All @@ -30,24 +31,24 @@ func TestConcurrentSendingEvent(t *testing.T) {
senderNum := 10 // 10 gorutine to sending the events
eventNum := 1000 // each gorutine sender publishs 1,000 events

var g errgroup.Group

// start a receiver
c, err := cloudevents.NewClient(protocolFactory(ctx, t, topicName), cloudevents.WithUUIDs())
require.NoError(t, err)
go func() {
// sending 10,000 events concurrenly and verify all of them can be recieved
g.Go(func() error {
// verify all of events can be recieved
count := senderNum * eventNum
var mu sync.Mutex
err = c.StartReceiver(ctx, func(event cloudevents.Event) {
return c.StartReceiver(ctx, func(event cloudevents.Event) {
mu.Lock()
defer mu.Unlock()
count--
// all the events has been received
if count == 0 {
readyCh <- true
}
})
require.NoError(t, err)
}()
})
// wait for 5 seconds to ensure the receiver starts safely
time.Sleep(5 * time.Second)

Expand All @@ -62,17 +63,22 @@ func TestConcurrentSendingEvent(t *testing.T) {
require.NoError(t, err)

for i := 0; i < senderNum; i++ {
go func() {
g.Go(func() error {
for j := 0; j < eventNum; j++ {
result := client.Send(
cecontext.WithTopic(ctx, topicName),
evt,
)
require.NoError(t, result)
if result != nil {
return result
}
}
}()
return nil
})
}

require.NoError(t, g.Wait())

// wait until all the events are received
for {
select {
Expand Down

0 comments on commit 8870af0

Please sign in to comment.