Skip to content

Commit

Permalink
Merge pull request #51 from jxsl13/feat/the-context-update
Browse files Browse the repository at this point in the history
the context update
  • Loading branch information
jxsl13 authored Mar 15, 2024
2 parents 8fb166c + 554e0a1 commit 469f1f9
Show file tree
Hide file tree
Showing 51 changed files with 4,655 additions and 2,501 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
run: docker-compose up -d

- name: Code Coverage
run: go test -timeout 900s -race -count=1 -covermode=atomic -coverprofile=coverage.txt ./...
run: go test -timeout 900s -race -count=1 -parallel 2 -covermode=atomic -coverprofile=coverage.txt ./...

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
*.test
*trace*
coverage.txt
DEBUG.md
debug.md
__debug_bin*
*.log
453 changes: 0 additions & 453 deletions DEBUG.md

This file was deleted.

16 changes: 15 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,18 @@ down:
docker-compose down

test:
go test -timeout 900s -v -race -count=1 ./...
go test -timeout 600ss -v -race -count=1 ./... > parallel.test.log

test-sequentially:
go test -timeout 900s -v -race -parallel 1 -count=1 ./... > sequential.test.log

count-tests:
grep -REn 'func Test.+\(.+testing\.T.*\)' . | wc -l

count-disconnect-tests:
grep -REn 'func Test.+WithDisconnect.*\(.+testing\.T.*\)' . | wc -l


pool.TestBatchSubscriberMaxBytes:
go test -timeout 0m30s github.com/jxsl13/amqpx/pool -run ^TestBatchSubscriberMaxBytes$ -v -count=1 -race 2>&1 > debug.test.log
cat test.log | grep 'INFO: session' | sort | uniq -c
84 changes: 57 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,21 +46,21 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background())
defer cancel()

amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
amqpx.RegisterTopologyCreator(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity
t.ExchangeDeclare("example-exchange", "topic") // durable exchange by default
t.QueueDeclare("example-queue") // durable quorum queue by default
t.QueueBind("example-queue", "route.name.v1.event", "example-exchange")
_ = t.ExchangeDeclare(ctx, "example-exchange", "topic") // durable exchange by default
_, _ = t.QueueDeclare(ctx, "example-queue") // durable quorum queue by default
_ = t.QueueBind(ctx, "example-queue", "route.name.v1.event", "example-exchange")
return nil
})
amqpx.RegisterTopologyDeleter(func(t *pool.Topologer) error {
amqpx.RegisterTopologyDeleter(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity
t.QueueDelete("example-queue")
t.ExchangeDelete("example-exchange")
_, _ = t.QueueDelete(ctx, "example-queue")
_ = t.ExchangeDelete(ctx, "example-exchange")
return nil
})

amqpx.RegisterHandler("example-queue", func(msg pool.Delivery) error {
amqpx.RegisterHandler("example-queue", func(ctx context.Context, msg pool.Delivery) error {
fmt.Println("received message:", string(msg.Body))
fmt.Println("canceling context")
cancel()
Expand All @@ -70,12 +70,13 @@ func main() {
})

amqpx.Start(
ctx,
amqpx.NewURL("localhost", 5672, "admin", "password"), // or amqp://username@password:localhost:5672
amqpx.WithLogger(logging.NewNoOpLogger()), // provide a logger that implements the logging.Logger interface
)
defer amqpx.Close()

amqpx.Publish("example-exchange", "route.name.v1.event", pool.Publishing{
_ = amqpx.Publish(ctx, "example-exchange", "route.name.v1.event", pool.Publishing{
ContentType: "application/json",
Body: []byte("my test event"),
})
Expand All @@ -101,8 +102,8 @@ import (
"github.com/jxsl13/amqpx/logging"
)

func ExampleConsumer(cancel func()) amqpx.HandlerFunc {
return func(msg amqpx.Delivery) error {
func SomeConsumer(cancel func()) pool.HandlerFunc {
return func(ctx context.Context, msg pool.Delivery) error {
fmt.Println("received message:", string(msg.Body))
fmt.Println("canceling context")
cancel()
Expand All @@ -116,45 +117,46 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background())
defer cancel()

amqpx.RegisterTopologyCreator(func(t *pool.Topologer) error {
amqpx.RegisterTopologyCreator(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity

t.ExchangeDeclare("example-exchange", "topic",
_ = t.ExchangeDeclare(ctx, "example-exchange", "topic",
pool.ExchangeDeclareOptions{
Durable: true,
},
)
t.QueueDeclare("example-queue",
_, _ = t.QueueDeclare(ctx, "example-queue",
pool.QueueDeclareOptions{
Durable: true,
Args: pool.QuorumQueue,
},
)
t.QueueBind("example-queue", "route.name.v1.event", "example-exchange")
t.QueueBind(ctx, "example-queue", "route.name.v1.event", "example-exchange")
return nil
})
amqpx.RegisterTopologyDeleter(func(t *amqpx.Topologer) error {
amqpx.RegisterTopologyDeleter(func(ctx context.Context, t *pool.Topologer) error {
// error handling omitted for brevity
t.QueueDelete("example-queue")
t.ExchangeDelete("example-exchange")
_, _ = t.QueueDelete(ctx, "example-queue")
_ = t.ExchangeDelete(ctx, "example-exchange")
return nil
})

amqpx.RegisterHandler("example-queue",
ExampleConsumer(cancel),
SomeConsumer(cancel),
pool.ConsumeOptions{
ConsumerTag: "example-queue-cunsumer",
Exclusive: true,
},
)

amqpx.Start(
_ = amqpx.Start(
ctx,
amqpx.NewURL("localhost", 5672, "admin", "password"), // or amqp://username@password:localhost:5672
amqpx.WithLogger(logging.NewNoOpLogger()), // provide a logger that implements the logging.Logger interface (logrus adapter is provided)
)
defer amqpx.Close()

amqpx.Publish("example-exchange", "route.name.v1.event", amqpx.Publishing{
_ = amqpx.Publish(ctx, "example-exchange", "route.name.v1.event", pool.Publishing{
ContentType: "application/json",
Body: []byte("my test event"),
})
Expand Down Expand Up @@ -192,20 +194,48 @@ A `Subscriber` must be `Start()`ed in order for it to create consumer goroutines

## Development

Tests can all be run in parallel but the parallel testing is disabled for now because of the GitHub runners starting to behave weirdly when under such a load.
That is why those tests were disabled for the CI pipeline.

Test flags you might want to add:
```shell
-v -race -count=1
go test -v -race -count=1 ./...
```
- see test logs
- detect data races
- do not cache test results

Starting the test environment:
Starting the tests:
```shell
docker-compose up -d
go test -v -race -count=1 ./...
```

Starting the tests:
### Test environment

- Requires docker (and docker compose subcommand)

Starting the test environment:
```shell
go test -v -race -count=1 ./...
```
make environment
#or
docker compose up -d
```

The test environment looks like this:

Web interfaces:
- [rabbitmq management interface: http://127.0.0.1:15672 -> rabbitmq:15672](http://127.0.0.1:15672)
- [out of memory rabbitmq management interface: http://127.0.0.1:25672 -> rabbitmq-broken:15672](http://127.0.0.1:25672)

```
127.0.0.1:5670 -> rabbitmq-broken:5672 # out of memory rabbitmq
127.0.0.1:5671 -> rabbitmq:5672 # healthy rabbitmq connection which is never disconnected
127.0.0.1:5672 -> toxiproxy:5672 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
127.0.0.1:5673 -> toxiproxy:5673 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
127.0.0.1:5674 -> toxiproxy:5674 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
...
127.0.0.1:5771 -> toxiproxy:5771 -> rabbitmq:5672 # connection which is disconnected by toxiproxy
```
Loading

0 comments on commit 469f1f9

Please sign in to comment.