initial test setup #42

Merged
merged 5 commits on Jan 4, 2024
2 changes: 1 addition & 1 deletion Makefile
@@ -8,4 +8,4 @@ down:
docker-compose down

test:
go test -v -race -count=1 ./...
go test -timeout 900s -v -race -count=1 ./...
33 changes: 24 additions & 9 deletions logging/testing.go
@@ -5,6 +5,7 @@ import (
"os"
"sort"
"strings"
"sync"
"testing"
"time"
)
@@ -19,6 +20,9 @@ func NewTestLogger(t *testing.T) *TestLogger {
}

func newTestLoggerWithFields(l *TestLogger, fields Fields) *TestLogger {
l.mu.Lock()
defer l.mu.Unlock()

n := &TestLogger{
t: l.t,
fields: make(map[string]any, len(fields)+len(l.fields)),
@@ -38,6 +42,7 @@ func newTestLoggerWithFields(l *TestLogger, fields Fields) *TestLogger {
type TestLogger struct {
t *testing.T
fields map[string]any
mu sync.Mutex
}

func (l *TestLogger) Debugf(format string, args ...any) {
@@ -138,23 +143,33 @@ func (l *TestLogger) fieldsMsg(level, msg string) string {
func (l *TestLogger) logf(prefix, format string, args ...any) {
l.t.Helper()
lpref := strings.ToLower(prefix)
msg := fmt.Sprintf(format, args...)
if strings.Contains(lpref, "panic") || strings.Contains(lpref, "fatal") {
l.t.Fatal(l.fieldsMsg(prefix, msg))
} else {
l.t.Log(l.fieldsMsg(prefix, msg))
arg := fmt.Sprintf(format, args...)

l.mu.Lock()
defer l.mu.Unlock()
msg := l.fieldsMsg(prefix, arg)
if lpref == "panic" || lpref == "fatal" {
l.t.Fatal(msg)
} else {
l.t.Log(msg)
}

}

func (l *TestLogger) log(prefix string, args ...any) {
l.t.Helper()
msg := fmt.Sprint(args...)

lpref := strings.ToLower(prefix)
if strings.Contains(lpref, "panic") || strings.Contains(lpref, "fatal") {
l.t.Fatal(l.fieldsMsg(prefix, msg))
arg := fmt.Sprint(args...)

l.mu.Lock()
defer l.mu.Unlock()
msg := l.fieldsMsg(prefix, arg)
if lpref == "panic" || lpref == "fatal" {
l.mu.Lock()
l.t.Fatal(msg)
l.mu.Unlock()
} else {
l.t.Log(l.fieldsMsg(prefix, msg))
l.t.Log(msg)
}
}
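
The mutex added above matters once a single TestLogger is shared by many goroutines, which is exactly what the new pool test further down does. A minimal sketch of that usage, assuming only what is visible in this diff (logging.NewTestLogger and Debugf); the test name and loop bounds are made up for illustration:

	package logging_test

	import (
		"sync"
		"testing"

		"github.com/jxsl13/amqpx/logging"
	)

	func TestTestLoggerConcurrent(t *testing.T) {
		log := logging.NewTestLogger(t)

		var wg sync.WaitGroup
		for i := 0; i < 100; i++ {
			wg.Add(1)
			go func(id int) {
				defer wg.Done()
				// Debugf formats the message together with the logger's fields
				// map and writes via t.Log; the new mutex serializes that sequence.
				log.Debugf("goroutine %d logging concurrently", id)
			}(i)
		}
		wg.Wait()
	}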
19 changes: 7 additions & 12 deletions pool/helpers_context.go
@@ -117,7 +117,7 @@ func (c *cancelContext) Reset(parentCtx context.Context) error {
}

type stateContext struct {
mu sync.RWMutex
mu sync.Mutex

parentCtx context.Context

@@ -203,12 +203,12 @@ func (sc *stateContext) Resumed() {
sc.resumed.Cancel()
}

func (sc *stateContext) Resuming() doner {
return sc.resuming
func (sc *stateContext) Resuming() context.Context {
return sc.resuming.Context()
}

func (sc *stateContext) Pausing() doner {
return sc.pausing
func (sc *stateContext) Pausing() context.Context {
return sc.pausing.Context()
}

func (sc *stateContext) Pause(ctx context.Context) error {
@@ -333,8 +333,8 @@ func (sc *stateContext) AwaitPaused(ctx context.Context) (err error) {
}

func (sc *stateContext) isClosed() bool {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.closed
}

@@ -354,8 +354,3 @@ func (sc *stateContext) closeUnguarded() {
sc.resumed.Cancel()
sc.closed = true
}

type doner interface {
Done() <-chan struct{}
Context() context.Context
}
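
Returning a plain context.Context here, instead of the removed doner interface, means callers can both select on Done() and hand the same value to any context-aware API. A self-contained sketch of that pattern with hypothetical names (pausing, requestPause and contextAware stand in for sc.Pausing(), Pause() and a context-aware call; none of them are part of this package):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// contextAware is a hypothetical context-accepting call.
	func contextAware(ctx context.Context) error {
		return ctx.Err()
	}

	func main() {
		// pausing plays the role of sc.Pausing(); requestPause plays the role of Pause().
		pausing, requestPause := context.WithCancel(context.Background())

		go func() {
			time.Sleep(50 * time.Millisecond)
			requestPause() // signal that a pause has been requested
		}()

		select {
		case <-pausing.Done(): // same shape as <-sc.Pausing().Done() in the new test
			fmt.Println("pausing requested")
		case <-time.After(time.Second):
			fmt.Println("timed out waiting for the pause signal")
		}

		// The identical value can be passed on directly, which is what lets the
		// subscriber hand h.pausing() to session.ConsumeWithContext below.
		_ = contextAware(pausing)
	}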
184 changes: 184 additions & 0 deletions pool/helpers_context_test.go
@@ -0,0 +1,184 @@
package pool

import (
"context"
"os"
"os/signal"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/jxsl13/amqpx/logging"
"github.com/stretchr/testify/assert"
)

func worker(t *testing.T, ctx context.Context, wg *sync.WaitGroup, sc *stateContext) {
defer wg.Done()

log := logging.NewTestLogger(t)
defer func() {
log.Debug("worker pausing (closing)")
sc.Paused()
log.Debug("worker paused (closing)")
log.Debug("worker closed")
}()
log.Debug("worker started")

for {
select {
case <-ctx.Done():
//log.Debug("worker done")
return
case <-sc.Resuming().Done():
//log.Debug("worker resuming")
sc.Resumed()
go func() {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
sc.Pause(ctx) // always have at least one goroutine that triggers the switch back to the other state after a specific time
}
}()
//log.Debug("worker resumed")
}
select {
case <-ctx.Done():
return
case <-sc.Pausing().Done():
//log.Debug("worker pausing")
sc.Paused()
//log.Debug("worker paused")
go func() {
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
sc.Resume(ctx) // always have at least one goroutine that triggers the switch back to the other state after a specific time
}
}()
}
}
}

func TestStateContextSimpleSynchronized(t *testing.T) {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

sc := newStateContext(ctx)
var wg sync.WaitGroup
wg.Add(1)
go worker(t, ctx, &wg, sc)

// normal execution order
assert.NoError(t, sc.Resume(ctx))
assert.NoError(t, sc.Pause(ctx))

// somewhat random execution order
assert.NoError(t, sc.Pause(ctx)) // if already paused, nobody cares
assert.NoError(t, sc.Resume(ctx))

assert.NoError(t, sc.Resume(ctx)) // should be ignored
assert.NoError(t, sc.Pause(ctx))

cancel()
wg.Wait()
}

func TestStateContextConcurrentTransitions(t *testing.T) {
log := logging.NewTestLogger(t)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

sc := newStateContext(ctx)
var wwg sync.WaitGroup
wwg.Add(1)
go worker(t, ctx, &wwg, sc)

var wg sync.WaitGroup
var (
numGoroutines = 2000
iterations = 10000
)
trigger := make(chan struct{})

var (
pause atomic.Int64
resume atomic.Int64
)
wg.Add(numGoroutines)
for i := 0; i < (numGoroutines / 10 * 8); i++ {
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-trigger:
//log.Infof("routine %d triggered", id)
}
time.Sleep(time.Duration(id/100) * 20 * time.Millisecond)

for i := 0; i < iterations; i++ {
if id%2 == 0 {
assert.NoError(t, sc.Resume(ctx))
// log.Infof("routine %d resumed", id)
resume.Add(1)
} else {
assert.NoError(t, sc.Pause(ctx))
// log.Infof("routine %d paused", id)
pause.Add(1)
}
}
}(i)
}

var (
active atomic.Int64
awaitPaused atomic.Int64
awaitResumed atomic.Int64
)

for i := 0; i < (numGoroutines / 10 * 2); i++ {
go func(id int) {
defer wg.Done()
select {
case <-ctx.Done():
return
case <-trigger:
//log.Infof("routine %d triggered", id)
}
time.Sleep(time.Duration(id/100) * 10 * time.Millisecond)

for i := 0; i < iterations; i++ {
switch id % 3 {
case 0:
_, err := sc.IsActive(ctx)
assert.NoError(t, err)
active.Add(1)
case 1:
assert.NoError(t, sc.AwaitPaused(ctx))
// log.Infof("routine %d await paused", id)
awaitPaused.Add(1)
case 2:
assert.NoError(t, sc.AwaitResumed(ctx))
// log.Infof("routine %d await resumed", id)
awaitResumed.Add(1)
}
}

}(numGoroutines/2 + i)
}

close(trigger)
wg.Wait()

cancel()
wwg.Wait()

log.Debugf("pause: %d", pause.Load())
log.Debugf("resume: %d", resume.Load())
log.Debugf("active: %d", active.Load())
log.Debugf("awaitPaused: %d", awaitPaused.Load())
log.Debugf("awaitResumed: %d", awaitResumed.Load())
}
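
The new suite only exercises the in-memory stateContext and does not appear to need the docker-compose services, so it can be run on its own with something like

	go test -timeout 900s -race -count=1 -run TestStateContext ./pool/

(the package path and flags are taken from the file paths and the Makefile above). The raised 900s timeout in the Makefile presumably exists to give this stress test, with its 2000 goroutines running 10000 iterations each, enough headroom under the race detector.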
6 changes: 3 additions & 3 deletions pool/subscriber.go
@@ -308,7 +308,7 @@ func (s *Subscriber) consume(h *Handler) (err error) {

// got a working session
delivery, err := session.ConsumeWithContext(
h.pausing().Context(),
h.pausing(),
opts.Queue,
opts.ConsumeOptions,
)
@@ -424,7 +424,7 @@ func (s *Subscriber) batchConsume(h *BatchHandler) (err error) {

// got a working session
delivery, err := session.ConsumeWithContext(
h.pausing().Context(),
h.pausing(),
opts.Queue,
opts.ConsumeOptions,
)
@@ -608,7 +608,7 @@ func (s *Subscriber) ackBatchPostHandle(opts BatchHandlerConfig, lastDeliveryTag

type handler interface {
QueueConfig() QueueConfig
pausing() doner
pausing() context.Context
}

func (s *Subscriber) returnSession(h handler, session *Session, err error) {
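
Passing h.pausing() straight into session.ConsumeWithContext ties the lifetime of a consume call to the pause signal: once a pause is requested, the context fires and the delivery stream ends. A self-contained sketch of that effect with hypothetical stand-ins (consumeWithContext and the string deliveries are not the real session API):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// consumeWithContext is a hypothetical stand-in for session.ConsumeWithContext:
	// it emits deliveries until the supplied context fires, then closes the channel.
	func consumeWithContext(ctx context.Context, queue string) <-chan string {
		out := make(chan string)
		go func() {
			defer close(out)
			for i := 0; ; i++ {
				select {
				case <-ctx.Done(): // pause requested: stop delivering
					return
				case <-time.After(50 * time.Millisecond):
					out <- fmt.Sprintf("%s: delivery %d", queue, i)
				}
			}
		}()
		return out
	}

	func main() {
		pausing, pause := context.WithCancel(context.Background())
		deliveries := consumeWithContext(pausing, "example-queue")

		go func() {
			time.Sleep(200 * time.Millisecond)
			pause() // plays the role of Pause(ctx) firing the pausing context
		}()

		for d := range deliveries {
			fmt.Println(d)
		}
		fmt.Println("consume loop ended because the pausing context fired")
	}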
4 changes: 2 additions & 2 deletions pool/subscriber_batch_handler.go
@@ -134,7 +134,7 @@ func (h *BatchHandler) Pause(ctx context.Context) error {
return h.sc.Pause(ctx)
}

func (h *BatchHandler) pausing() doner {
func (h *BatchHandler) pausing() context.Context {
return h.sc.Pausing()
}

@@ -147,7 +147,7 @@ func (h *BatchHandler) Resume(ctx context.Context) error {
return h.sc.Resume(ctx)
}

func (h *BatchHandler) resuming() doner {
func (h *BatchHandler) resuming() context.Context {
return h.sc.Resuming()
}

4 changes: 2 additions & 2 deletions pool/subscriber_handler.go
@@ -110,7 +110,7 @@ func (h *Handler) Pause(ctx context.Context) error {
return h.sc.Pause(ctx)
}

func (h *Handler) pausing() doner {
func (h *Handler) pausing() context.Context {
return h.sc.Pausing()
}

@@ -123,7 +123,7 @@ func (h *Handler) Resume(ctx context.Context) error {
return h.sc.Resume(ctx)
}

func (h *Handler) resuming() doner {
func (h *Handler) resuming() context.Context {
return h.sc.Resuming()
}
