Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka): Fix internal docker connection #2894

Open
wants to merge 59 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
320e54d
fixed kafka internal docker connection
catinapoke Apr 19, 2024
9b6537d
added new option for kafka listeners
catinapoke May 10, 2024
15f4ca3
Merge remote-tracking branch 'original/main'
catinapoke May 10, 2024
c7552f4
fixed kafka docs with new options
catinapoke May 13, 2024
8904938
Update docs/modules/kafka.md
catinapoke May 14, 2024
7b38c28
Apply suggestions from code review for docs
catinapoke May 14, 2024
98793d7
added new test, fixed docs from code review
catinapoke May 16, 2024
62a68b5
Merge branch 'main' of https://github.com/catinapoke/testcontainers-go
catinapoke May 16, 2024
a7c4a9f
Merge remote-tracking branch 'Original/main'
catinapoke May 16, 2024
bfa9191
renamed docker folder to testdata and fixed test name
catinapoke May 16, 2024
f1b0d48
Merge remote-tracking branch 'Original/main'
catinapoke May 23, 2024
128d31a
added test for input listeners validation and refactor from code review
catinapoke May 23, 2024
ead068a
added unit test for trimValidateListeners
catinapoke May 28, 2024
eb71110
Merge remote-tracking branch 'Original/main'
catinapoke May 28, 2024
a4d91e2
Merge branch 'main' into main
mdelapenya Jun 7, 2024
dd8fcec
Merge remote-tracking branch 'upstream/main'
catinapoke Aug 25, 2024
5c147d4
go mod update
catinapoke Aug 25, 2024
b31e1af
todo for PR
catinapoke Aug 25, 2024
9f23a4d
updated copyStarterScript for upstream
catinapoke Aug 25, 2024
e63a604
fix: typo
mdelapenya Sep 2, 2024
f9cb155
fix: use Run method
mdelapenya Sep 2, 2024
8394529
chore: use PLAINTEXT and BROKER
mdelapenya Sep 2, 2024
31950f4
fix: handle error in tests
mdelapenya Sep 2, 2024
3aa2cbf
chore: make lint
mdelapenya Sep 2, 2024
d61004f
chore: use non deprecated APIs
mdelapenya Sep 2, 2024
df53491
chore: handle errors in testss
mdelapenya Sep 2, 2024
bf02852
fix: close sarama client
mdelapenya Sep 2, 2024
152ef0c
fix: validation in test
mdelapenya Sep 2, 2024
efa0f7d
chore: refactor test
mdelapenya Sep 2, 2024
d669bfb
chore: add test using kcat
mdelapenya Sep 2, 2024
4c44b97
Merge branch 'main' into catinapoke/main
mdelapenya Sep 9, 2024
ba701e1
Merge pull request #1 from mdelapenya/catinapoke/main
catinapoke Sep 17, 2024
b51be12
fixed basic kafka test but network one is broken
catinapoke Sep 17, 2024
c8c47f4
not working test with kcat
catinapoke Sep 23, 2024
cc60b86
Merge branch 'main' into catinapoke/main
mdelapenya Nov 21, 2024
84cd919
fix: use PLAINTEXT in test
mdelapenya Nov 21, 2024
8a65a7b
chor: rename container variable
mdelapenya Nov 21, 2024
fdbabd2
chore: refactor unit tests for the new test function pattern
mdelapenya Nov 21, 2024
3429d0b
chore: use kcat container function
mdelapenya Nov 21, 2024
01ba4e9
chore: use require and new CleanupContainer functions
mdelapenya Nov 21, 2024
63ee6cf
chore: use test function pattern even more
mdelapenya Nov 21, 2024
8660157
chore: exclude lint error
mdelapenya Nov 21, 2024
3c1e739
chore: remove unused code
mdelapenya Nov 21, 2024
4f152f2
chore: rename to Listener
mdelapenya Nov 25, 2024
76cf56d
chore: trim listeners in the option
mdelapenya Nov 25, 2024
ec67726
chore: use empty struct to not consume memory
mdelapenya Nov 25, 2024
da0deb8
chore: format errors
mdelapenya Nov 25, 2024
f2aaa60
chore: simplify advertised slice
mdelapenya Nov 25, 2024
f82cfc9
chore: simplify envs for listeners
mdelapenya Nov 25, 2024
2b00774
chore: make WithListeners self-contained
mdelapenya Nov 25, 2024
50f9bcc
fix: proper advertised slice
mdelapenya Nov 25, 2024
7772f4f
chore: simplify test helpers
mdelapenya Nov 25, 2024
45f6660
chore: use require
mdelapenya Nov 25, 2024
aa341f8
chore: extract inline helper to a function
mdelapenya Nov 25, 2024
278ec35
chore: refactor options to support writing to the container request
mdelapenya Nov 25, 2024
fc81847
docs: refinement
mdelapenya Nov 25, 2024
4ab2546
docs: document withClusterID function
mdelapenya Nov 25, 2024
9adbd97
chore: remove unused
mdelapenya Nov 25, 2024
67c3480
WIP
mdelapenya Nov 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ The environment variables that are already set by default are:

{% include "../features/common_functional_options.md" %}


#### ClusterId

You can set up cluster id by using `WithClusterID` option.

```
KafkaContainer, err = kafka.Run(ctx,
"confluentinc/confluent-local:7.6.1",
kafka.WithClusterID("test-cluster"))
```

#### Listeners

If you need to connect new listeners, you can use `WithListener(listeners []KafkaListener)`.
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
This option controls the following environment variables for the Kafka container:
- `KAFKA_LISTENERS`
- `KAFKA_REST_BOOTSTRAP_SERVERS`
- `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP`
- `KAFKA_INTER_BROKER_LISTENER_NAME`
- `KAFKA_ADVERTISED_LISTENERS`

Example:

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:kafkaWithListener
<!--/codeinclude-->

In the above code, we created a network for our container and attached kafka to it, so they can communicate. Then we marked port 9092 for our internal usage.

The first listener in the slice will be written in the env parameter `KAFKA_INTER_BROKER_LISTENER_NAME`

Every listener's name will be converted in upper case. Every name and port should be unique and will be checked in a validation step.

If you are not using this option or the listeners list is empty, there will be 2 default listeners with the following addresses and ports:

External - Host():MappedPort()
Internal - Host():9092

### Container Methods

The Kafka container exposes the following methods:
Expand All @@ -81,4 +119,4 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
<!--/codeinclude-->
<!--/codeinclude-->
11 changes: 5 additions & 6 deletions modules/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka
go 1.22

require (
github.com/IBM/sarama v1.42.1
github.com/IBM/sarama v1.43.3
github.com/docker/go-connections v0.5.0
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.34.0
Expand All @@ -23,7 +23,7 @@ require (
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -41,7 +41,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -53,7 +53,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand All @@ -69,8 +69,7 @@ require (
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
Expand Down
24 changes: 12 additions & 12 deletions modules/kafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA=
github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand All @@ -32,8 +32,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
Expand Down Expand Up @@ -85,8 +85,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -111,8 +111,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -190,14 +190,14 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
159 changes: 125 additions & 34 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
// starterScript {
starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
export KAFKA_ADVERTISED_LISTENERS=%s
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
Expand All @@ -36,6 +36,13 @@ echo '' > /etc/confluent/docker/ensure
type KafkaContainer struct {
testcontainers.Container
ClusterID string
Listeners KafkaListener
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}

type KafkaListener struct {
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
Name string
Host string
Port string
}

// Deprecated: use Run instead
Expand Down Expand Up @@ -70,37 +77,52 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
Entrypoint: []string{"sh"},
// this CMD will wait for the starter script to be copied into the container and then execute it
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript},
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// Use a single hook to copy the starter script and wait for
// the Kafka server to be ready. This prevents the wait running
// if the starter script fails to copy.
func(ctx context.Context, c testcontainers.Container) error {
// 1. copy the starter script into the container
if err := copyStarterScript(ctx, c); err != nil {
return fmt.Errorf("copy starter script: %w", err)
}

// 2. wait for the Kafka server to be ready
return wait.ForLog(".*Transitioning from RECOVERY to RUNNING.*").AsRegexp().WaitUntilReady(ctx, c)
},
},
},
},
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

settings := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(&settings)
}
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

if err := trimValidateListeners(settings.Listeners); err != nil {
return nil, fmt.Errorf("listeners validation: %w", err)
}

// apply envs for listeners
envChange := editEnvsForListeners(settings.Listeners)
for key, item := range envChange {
genericContainerReq.Env[key] = item
}

genericContainerReq.ContainerRequest.LifecycleHooks = []testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// Use a single hook to copy the starter script and wait for
// the Kafka server to be ready. This prevents the wait running
// if the starter script fails to copy.
func(ctx context.Context, c testcontainers.Container) error {
// 1. copy the starter script into the container
if err := copyStarterScript(ctx, c, &settings); err != nil {
return fmt.Errorf("copy starter script: %w", err)
}

// 2. wait for the Kafka server to be ready
return wait.ForLog(".*Transitioning from RECOVERY to RUNNING.*").AsRegexp().WaitUntilReady(ctx, c)
},
},
},
}

err := validateKRaftVersion(genericContainerReq.Image)
if err != nil {
return nil, err
Expand All @@ -121,32 +143,70 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return c, nil
}

func trimValidateListeners(listeners []KafkaListener) error {
// Trim
for i := 0; i < len(listeners); i++ {
listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " "))
listeners[i].Host = strings.Trim(listeners[i].Host, " ")
listeners[i].Port = strings.Trim(listeners[i].Port, " ")
}
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved

// Validate
var ports map[string]bool = make(map[string]bool, len(listeners)+2)
var names map[string]bool = make(map[string]bool, len(listeners)+2)
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved

// check for default listeners
ports["9094"] = true
ports["9093"] = true
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved

// check for default listeners
names["CONTROLLER"] = true
names["PLAINTEXT"] = true

for _, item := range listeners {
if names[item.Name] {
return fmt.Errorf("duplicate of listener name: %s", item.Name)
}
names[item.Name] = true

if ports[item.Port] {
return fmt.Errorf("duplicate of listener port: %s", item.Port)
}
ports[item.Port] = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: use struct, double return to check exists and quote:

Suggested change
if names[item.Name] {
return fmt.Errorf("duplicate of listener name: %s", item.Name)
}
names[item.Name] = true
if ports[item.Port] {
return fmt.Errorf("duplicate of listener port: %s", item.Port)
}
ports[item.Port] = true
if _, exists := names[item.Name]; exists {
return fmt.Errorf("duplicate of listener name: %q", item.Name)
}
names[item.Name] = struct{}{}
if _, exists := ports[item.Port]; exists {
return fmt.Errorf("duplicate of listener port: %s", item.Port)
}
ports[item.Port] = struct{}{}

question: do ports really need to be unique, I would expect it to be the name port combination that needs to be?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a kafka expert, so I'm not sure about that. @eddumelendez @catinapoke what are your thoughts on this?

}

return nil
}

// copyStarterScript copies the starter script into the container.
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
func copyStarterScript(ctx context.Context, c testcontainers.Container, settings *options) error {
if err := wait.ForListeningPort(publicPort).
SkipInternalCheck().
WaitUntilReady(ctx, c); err != nil {
return fmt.Errorf("wait for exposed port: %w", err)
}

host, err := c.Host(ctx)
if err != nil {
return fmt.Errorf("host: %w", err)
if len(settings.Listeners) == 0 {
defaultInternal, err := brokerListener(ctx, c)
if err != nil {
return fmt.Errorf("can't create default internal listener: %w", err)
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}
settings.Listeners = append(settings.Listeners, defaultInternal)
}

inspect, err := c.Inspect(ctx)
defaultExternal, err := plainTextListener(ctx, c)
if err != nil {
return fmt.Errorf("inspect: %w", err)
return fmt.Errorf("can't create default external listener: %w", err)
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}

hostname := inspect.Config.Hostname
settings.Listeners = append(settings.Listeners, defaultExternal)

port, err := c.MappedPort(ctx, publicPort)
if err != nil {
return fmt.Errorf("mapped port: %w", err)
var advertised []string
for _, item := range settings.Listeners {
advertised = append(advertised, fmt.Sprintf("%s://%s:%s", item.Name, item.Host, item.Port))
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}

scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname)
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ","))

if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil {
return fmt.Errorf("copy to container: %w", err)
Expand All @@ -155,12 +215,43 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
return nil
}

func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Env["CLUSTER_ID"] = clusterID
func editEnvsForListeners(listeners []KafkaListener) map[string]string {
if len(listeners) == 0 {
// no change
return map[string]string{}
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
envs := map[string]string{
"KAFKA_LISTENERS": "CONTROLLER://0.0.0.0:9094, PLAINTEXT://0.0.0.0:9093",
"KAFKA_REST_BOOTSTRAP_SERVERS": "CONTROLLER://0.0.0.0:9094, PLAINTEXT://0.0.0.0:9093",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT, PLAINTEXT:PLAINTEXT",
}

// expect first listener has common network between kafka instances
envs["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name

// expect small number of listeners, so joins is okay
for _, item := range listeners {
envs["KAFKA_LISTENERS"] = strings.Join(
[]string{
envs["KAFKA_LISTENERS"],
fmt.Sprintf("%s://0.0.0.0:%s", item.Name, item.Port),
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
},
",",
)

envs["KAFKA_REST_BOOTSTRAP_SERVERS"] = envs["KAFKA_LISTENERS"]

envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join(
[]string{
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"],
item.Name + ":" + "PLAINTEXT",
},
",",
)
}

return envs
}

// Brokers retrieves the broker connection strings from Kafka with only one entry,
Expand Down
Loading
Loading