Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

the context update #51

Merged
merged 76 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
76 commits
Select commit Hold shift + click to select a range
64c6da2
[WIP] the context update
jxsl13 Feb 26, 2024
716e690
improve flow control handling, reduce redundant code, immediately ret…
jxsl13 Feb 28, 2024
59cd9a1
update basic tests
jxsl13 Mar 4, 2024
47dc367
add Jitter test utility
jxsl13 Mar 5, 2024
da85365
rename test utility parameter
jxsl13 Mar 5, 2024
c29f640
add logging to test helpers
jxsl13 Mar 5, 2024
e0eeaf9
fix callback bug & refactor session
jxsl13 Mar 5, 2024
0801433
make connection tests that do not use toxiproxy run in parallel
jxsl13 Mar 5, 2024
bee775f
remove context parameter from ReturnConnection and recover (if necess…
jxsl13 Mar 6, 2024
49d5859
derive connection names for pool tests from test function name
jxsl13 Mar 6, 2024
91c0e8b
update connection pool tests
jxsl13 Mar 6, 2024
8bb9559
update session pool tests
jxsl13 Mar 6, 2024
298684f
[WIP] split session test into smaller and easier to understand tests
jxsl13 Mar 6, 2024
af569a0
add healthy rabbitmq connection
jxsl13 Mar 7, 2024
48c1bf1
fix message generator
jxsl13 Mar 7, 2024
c464b33
improve ConsumeAsyncN test utility & add utility for creating test se…
jxsl13 Mar 7, 2024
f547dee
add Disconnect utility
jxsl13 Mar 7, 2024
d3b8abd
add healthy connect url to tests
jxsl13 Mar 7, 2024
88a8e37
improve tests & allow running all tests in parallel.
jxsl13 Mar 7, 2024
26e54cf
update testutils port generator
jxsl13 Mar 8, 2024
0aca705
add close error checks
jxsl13 Mar 8, 2024
b8bdb85
finalize session tests
jxsl13 Mar 8, 2024
f225b8b
cleanup & improve test utilities
jxsl13 Mar 8, 2024
5f3102a
improve error messages
jxsl13 Mar 8, 2024
f2218c8
update .gitignore
jxsl13 Mar 8, 2024
09a2ff3
update publisher & test (utlities)
jxsl13 Mar 8, 2024
2e907da
update dockerfile to only port forward to localhost
jxsl13 Mar 8, 2024
cd930e7
update readme
jxsl13 Mar 8, 2024
85bd49f
add address log field to connection logs
jxsl13 Mar 8, 2024
d3d8a08
add returned channel to session & to AwaitConfirm
jxsl13 Mar 8, 2024
55a7e5b
update proxy utility
jxsl13 Mar 8, 2024
7fdb5fe
update publisher test
jxsl13 Mar 8, 2024
33f5df3
do not defer reconnected function
jxsl13 Mar 8, 2024
460785b
cleanup logging
jxsl13 Mar 8, 2024
bed77d4
refactor and simplify first subscriber tests to run in parallel
jxsl13 Mar 9, 2024
6b44962
update toxiproxy image to 2.7.0
jxsl13 Mar 9, 2024
3d076ee
improve toxiproxy log messages
jxsl13 Mar 12, 2024
dcf1210
export Flush method & flag connections using errors instead of bools
jxsl13 Mar 12, 2024
0399ae8
fix transient id counter
jxsl13 Mar 12, 2024
c01a4fb
remove flaggable helper function
jxsl13 Mar 12, 2024
7bf300b
improve log message
jxsl13 Mar 12, 2024
91f7ae3
use new Flag & exported Flush methods in session pool
jxsl13 Mar 12, 2024
9f46f3a
add more logs to toxiproxy actions
jxsl13 Mar 12, 2024
4427e3c
remove unneded errors channel check when closing session
jxsl13 Mar 12, 2024
35edbd8
add low level tests to confirm deadlock assumptions
jxsl13 Mar 12, 2024
b988ebc
make happy path have the lowest probability to be hit (golang select …
jxsl13 Mar 12, 2024
eb1cf70
update subscriber & publisher tests
jxsl13 Mar 13, 2024
5370c48
update testutils
jxsl13 Mar 13, 2024
96dd887
update amqpx tests
jxsl13 Mar 13, 2024
94ff5b3
fix amqpx, amqpx tests & test utilities
jxsl13 Mar 14, 2024
eb0745a
rename *Size methods to *Capacity and make Size methods represent the…
jxsl13 Mar 14, 2024
9d0f9e9
update readme examples
jxsl13 Mar 14, 2024
f40c747
remove reconnect attempt assertions in tests
jxsl13 Mar 14, 2024
616ee39
update makefile test timeout
jxsl13 Mar 14, 2024
e2e046e
update test utils
jxsl13 Mar 15, 2024
12263ba
add test case for debugging a close deadlock
jxsl13 Mar 15, 2024
47b5648
update generators
jxsl13 Mar 15, 2024
39bd86d
update subscriber tests
jxsl13 Mar 15, 2024
5b8a69c
update session pool tests
jxsl13 Mar 15, 2024
b773bf3
update session comments
jxsl13 Mar 15, 2024
736cc72
fix session & connection no being returned back to the pool in case t…
jxsl13 Mar 15, 2024
1613877
only check if close session close does not deadlock, ignore returned …
jxsl13 Mar 15, 2024
8927bc5
flush connection channels after recovery
jxsl13 Mar 15, 2024
991323c
remove connection channel flush
jxsl13 Mar 15, 2024
6abb8fc
improve ConsumeN test utility
jxsl13 Mar 15, 2024
e757aef
fix nil pointer dereference
jxsl13 Mar 15, 2024
c675cd0
remove signal handling from tests
jxsl13 Mar 15, 2024
df88d09
update low level channel close with disconnect tests
jxsl13 Mar 15, 2024
a5a8c64
make two amwpx sub/sub tests more robust to connection loss
jxsl13 Mar 15, 2024
e8a7a25
update toxiproxy test container
jxsl13 Mar 15, 2024
8017954
increase close timeout
jxsl13 Mar 15, 2024
3d39586
run less tests in parallel
jxsl13 Mar 15, 2024
acda79f
decrease toxiproxy ports
jxsl13 Mar 15, 2024
f5cfb80
remove parallel tests
jxsl13 Mar 15, 2024
626c0cd
allow parallel tests by default but run them sequentially in the CI p…
jxsl13 Mar 15, 2024
554e0a1
test increased parallelism
jxsl13 Mar 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
run: docker-compose up -d

- name: Code Coverage
run: go test -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt ./...
run: go test -timeout 900s -race -count=1 -parallel 2 -covermode=atomic -coverprofile=coverage.txt ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
*.test
*trace*
coverage.txt
DEBUG.md
debug.md
__debug_bin*
*.log
453 changes: 0 additions & 453 deletions DEBUG.md

This file was deleted.

16 changes: 15 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,18 @@ down:
docker-compose down

test:
go test -timeout 900s -v -race -count=1 ./...
go test -timeout 600ss -v -race -count=1 ./... > parallel.test.log

test-sequentially:
go test -timeout 900s -v -race -parallel 1 -count=1 ./... > sequential.test.log

count-tests:
grep -REn 'func Test.+\(.+testing\.T.*\)' . | wc -l

count-disconnect-tests:
grep -REn 'func Test.+WithDisconnect.*\(.+testing\.T.*\)' . | wc -l


pool.TestBatchSubscriberMaxBytes:
go test -timeout 0m30s github.com/jxsl13/amqpx/pool -run ^TestBatchSubscriberMaxBytes$ -v -count=1 -race 2>&1 > debug.test.log
cat test.log | grep 'INFO: session' | sort | uniq -c
84 changes: 57 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background())
defer cancel()

amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
amqpx.RegisterTopologyCreator(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity
t.ExchangeDeclare("example-exchange", "topic") // durable exchange by default
t.QueueDeclare("example-queue") // durable quorum queue by default
t.QueueBind("example-queue", "route.name.v1.event", "example-exchange")
_ = t.ExchangeDeclare(ctx, "example-exchange", "topic") // durable exchange by default
_, _ = t.QueueDeclare(ctx, "example-queue") // durable quorum queue by default
_ = t.QueueBind(ctx, "example-queue", "route.name.v1.event", "example-exchange")
return nil
})
amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
amqpx.RegisterTopologyDeleter(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity
t.QueueDelete("example-queue")
t.ExchangeDelete("example-exchange")
_, _ = t.QueueDelete(ctx, "example-queue")
_ = t.ExchangeDelete(ctx, "example-exchange")
return nil
})

amqpx.RegisterHandler("example-queue", func(msg pool.Delivery) error {
amqpx.RegisterHandler("example-queue", func(ctx context.Context, msg pool.Delivery) error {
fmt.Println("received message:", string(msg.Body))
fmt.Println("canceling context")
cancel()
Expand All @@ -70,12 +70,13 @@ func main() {
})

amqpx.Start(
ctx,
amqpx.NewURL("localhost", 5672, "admin", "password"), // or amqp://username@password:localhost:5672
amqpx.WithLogger(logging.NewNoOpLogger()), // provide a logger that implements the logging.Logger interface
)
defer amqpx.Close()

amqpx.Publish("example-exchange", "route.name.v1.event", pool.Publishing{
_ = amqpx.Publish(ctx, "example-exchange", "route.name.v1.event", pool.Publishing{
ContentType: "application/json",
Body: []byte("my test event"),
})
Expand All @@ -101,8 +102,8 @@ import (
"github.com/jxsl13/amqpx/logging"
)

func ExampleConsumer(cancel func()) amqpx.HandlerFunc {
return func(msg amqpx.Delivery) error {
func SomeConsumer(cancel func()) pool.HandlerFunc {
return func(ctx context.Context, msg pool.Delivery) error {
fmt.Println("received message:", string(msg.Body))
fmt.Println("canceling context")
cancel()
Expand All @@ -116,45 +117,46 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background())
defer cancel()

amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
amqpx.RegisterTopologyCreator(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity

t.ExchangeDeclare("example-exchange", "topic",
_ = t.ExchangeDeclare(ctx, "example-exchange", "topic",
pool.ExchangeDeclareOptions{
Durable: true,
},
)
t.QueueDeclare("example-queue",
_, _ = t.QueueDeclare(ctx, "example-queue",
pool.QueueDeclareOptions{
Durable: true,
Args: pool.QuorumQueue,
},
)
t.QueueBind("example-queue", "route.name.v1.event", "example-exchange")
t.QueueBind(ctx, "example-queue", "route.name.v1.event", "example-exchange")
return nil
})
amqpx.RegisterTopologyDeleter(func(t *amqpx.Topologer) error {
amqpx.RegisterTopologyDeleter(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity
t.QueueDelete("example-queue")
t.ExchangeDelete("example-exchange")
_, _ = t.QueueDelete(ctx, "example-queue")
_ = t.ExchangeDelete(ctx, "example-exchange")
return nil
})

amqpx.RegisterHandler("example-queue",
ExampleConsumer(cancel),
SomeConsumer(cancel),
pool.ConsumeOptions{
ConsumerTag: "example-queue-cunsumer",
Exclusive: true,
},
)

amqpx.Start(
_ = amqpx.Start(
ctx,
amqpx.NewURL("localhost", 5672, "admin", "password"), // or amqp://username@password:localhost:5672
amqpx.WithLogger(logging.NewNoOpLogger()), // provide a logger that implements the logging.Logger interface (logrus adapter is provided)
)
defer amqpx.Close()

amqpx.Publish("example-exchange", "route.name.v1.event", amqpx.Publishing{
_ = amqpx.Publish(ctx, "example-exchange", "route.name.v1.event", pool.Publishing{
ContentType: "application/json",
Body: []byte("my test event"),
})
Expand Down Expand Up @@ -192,20 +194,48 @@ A `Subscriber` must be `Start()`ed in order for it to create consumer goroutines

## Development

Tests can all be run in parallel but the parallel testing is disabled for now because of the GitHub runners starting to behave weirdly when under such a load.
That is why those tests were disabled for the CI pipeline.

Test flags you might want to add:
```shell
-v -race -count=1
go test -v -race -count=1 ./...
```
- see test logs
- detect data races
- do not cache test results

Starting the test environment:
Starting the tests:
```shell
docker-compose up -d
go test -v -race -count=1 ./...
```

Starting the tests:
### Test environment

- Requires docker (and docker compose subcommand)

Starting the test environment:
```shell
go test -v -race -count=1 ./...
```
make environment
#or
docker compose up -d
```

The test environment looks like this:

Web interfaces:
- [rabbitmq management interface: http://127.0.0.1:15672 -> rabbitmq:15672](http://127.0.0.1:15672)
- [out of memory rabbitmq management interface: http://127.0.0.1:25672 -> rabbitmq-broken:15672](http://127.0.0.1:25672)

```
127.0.0.1:5670 -> rabbitmq-broken:5672 # out of memory rabbitmq
127.0.0.1:5671 -> rabbitmq:5672 # healthy rabbitmq connection which is never disconnected


127.0.0.1:5672 -> toxiproxy:5672 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
127.0.0.1:5673 -> toxiproxy:5673 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
127.0.0.1:5674 -> toxiproxy:5674 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
...
127.0.0.1:5771 -> toxiproxy:5771 -> rabbitmq:5672 # connection which is disconnected by toxiproxy

```
Loading
Loading