Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka internal docker connection #2490

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
320e54d
fixed kafka internal docker connection
catinapoke Apr 19, 2024
9b6537d
added new option for kafka listeners
catinapoke May 10, 2024
15f4ca3
Merge remote-tracking branch 'original/main'
catinapoke May 10, 2024
c7552f4
fixed kafka docs with new options
catinapoke May 13, 2024
8904938
Update docs/modules/kafka.md
catinapoke May 14, 2024
7b38c28
Apply suggestions from code review for docs
catinapoke May 14, 2024
98793d7
added new test, fixed docs from code review
catinapoke May 16, 2024
62a68b5
Merge branch 'main' of https://github.com/catinapoke/testcontainers-go
catinapoke May 16, 2024
a7c4a9f
Merge remote-tracking branch 'Original/main'
catinapoke May 16, 2024
bfa9191
renamed docker folder to testdata and fixed test name
catinapoke May 16, 2024
f1b0d48
Merge remote-tracking branch 'Original/main'
catinapoke May 23, 2024
128d31a
added test for input listeners validation and refactor from code review
catinapoke May 23, 2024
ead068a
added unit test for trimValidateListeners
catinapoke May 28, 2024
eb71110
Merge remote-tracking branch 'Original/main'
catinapoke May 28, 2024
a4d91e2
Merge branch 'main' into main
mdelapenya Jun 7, 2024
dd8fcec
Merge remote-tracking branch 'upstream/main'
catinapoke Aug 25, 2024
5c147d4
go mod update
catinapoke Aug 25, 2024
b31e1af
todo for PR
catinapoke Aug 25, 2024
9f23a4d
updated copyStarterScript for upstream
catinapoke Aug 25, 2024
e63a604
fix: typo
mdelapenya Sep 2, 2024
f9cb155
fix: use Run method
mdelapenya Sep 2, 2024
8394529
chore: use PLAINTEXT and BROKER
mdelapenya Sep 2, 2024
31950f4
fix: handle error in tests
mdelapenya Sep 2, 2024
3aa2cbf
chore: make lint
mdelapenya Sep 2, 2024
d61004f
chore: use non deprecated APIs
mdelapenya Sep 2, 2024
df53491
chore: handle errors in testss
mdelapenya Sep 2, 2024
bf02852
fix: close sarama client
mdelapenya Sep 2, 2024
152ef0c
fix: validation in test
mdelapenya Sep 2, 2024
efa0f7d
chore: refactor test
mdelapenya Sep 2, 2024
d669bfb
chore: add test using kcat
mdelapenya Sep 2, 2024
4c44b97
Merge branch 'main' into catinapoke/main
mdelapenya Sep 9, 2024
ba701e1
Merge pull request #1 from mdelapenya/catinapoke/main
catinapoke Sep 17, 2024
b51be12
fixed basic kafka test but network one is broken
catinapoke Sep 17, 2024
c8c47f4
not working test with kcat
catinapoke Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,54 @@ The environment variables that are already set by default are:

{% include "../features/common_functional_options.md" %}


#### ClusterId

You can set up cluster id by using `WithClusterID` option.

```
KafkaContainer, err = kafka.RunContainer(ctx,
kafka.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.6.1"))
```

#### Listeners

If you need to connect new listeners, you can use `WithListener(listeners []KafkaListener)`.
This option controls next env parameters:
catinapoke marked this conversation as resolved.
Show resolved Hide resolved
- `KAFKA_LISTENERS`
- `KAFKA_REST_BOOTSTRAP_SERVERS`
- `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP`
- `KAFKA_INTER_BROKER_LISTENER_NAME`
- `KAFKA_ADVERTISED_LISTENERS`

Example:
```
KafkaContainer, err = kafka.RunContainer(ctx,
kafka.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.6.1"),
network.WithNetwork([]string{"kafka"}, Network),
kafka.WithListener([]kafka.KafkaListener{
{
Name: "INTERNAL",
Ip: "kafka",
Port: "9092",
},
}),
)
```
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

Here we created network for our container and added kafka to it, so they can communicate. Then we marked port 9092 for our internal usage.
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

First listener in slice will be written in `KAFKA_INTER_BROKER_LISTENER_NAME`
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

Every listener's name will be converted in upper case. Every name and port should be unique and will be checked in validation step.
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

If you are not using this option or list is empty, there will be 2 default listeners with next addresses
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

External - Host():MappedPort()
Internal - Host():9092

### Container Methods

The Kafka container exposes the following methods:
Expand All @@ -73,4 +121,4 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
<!--/codeinclude-->
<!--/codeinclude-->
10 changes: 5 additions & 5 deletions modules/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka
go 1.21

require (
github.com/IBM/sarama v1.42.1
github.com/IBM/sarama v1.43.2
github.com/docker/go-connections v0.5.0
github.com/testcontainers/testcontainers-go v0.31.0
golang.org/x/mod v0.16.0
Expand All @@ -22,7 +22,7 @@ require (
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v25.0.5+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -41,7 +41,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
Expand All @@ -51,7 +51,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
Expand All @@ -66,7 +66,7 @@ require (
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731190214-cbb8c96f2d6d // indirect
Expand Down
10 changes: 10 additions & 0 deletions modules/kafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOEl
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw=
github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
Expand Down Expand Up @@ -33,6 +35,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
Expand Down Expand Up @@ -88,6 +92,8 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand All @@ -108,6 +114,8 @@ github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQ
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -183,6 +191,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
157 changes: 125 additions & 32 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// starterScript {
starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
export KAFKA_ADVERTISED_LISTENERS=%s
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
Expand All @@ -34,6 +34,13 @@ echo '' > /etc/confluent/docker/ensure
type KafkaContainer struct {
testcontainers.Container
ClusterID string
Listeners KafkaListener
}

type KafkaListener struct {
Name string
Ip string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about host?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can u write an example? Cuz I think you can write it in IP

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you can use ip too. But, to be more generic my suggestion is to rename Ip to Host

Port string
}

// RunContainer creates an instance of the Kafka container type
Expand All @@ -43,10 +50,10 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
ExposedPorts: []string{string(publicPort)},
Env: map[string]string{
// envVars {
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_LISTENERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why it should change?

Copy link
Author

@catinapoke catinapoke Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More obvious way to show the purpose. It's also the way many people write listeners in articles

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd not change this, as it could break existing client code using for reasons these env vars and the values

"KAFKA_BROKER_ID": "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1",
Expand All @@ -62,29 +69,61 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
Entrypoint: []string{"sh"},
// this CMD will wait for the starter script to be copied into the container and then execute it
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript},
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

settings := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(&settings)
}
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

trimListeners(settings.Listeners)
if err := validateListeners(settings.Listeners); err != nil {
return nil, fmt.Errorf("listeners validation: %w", err)
}

// apply envs for listeners
envChange := editEnvsForListeners(settings.Listeners)
for key, item := range envChange {
genericContainerReq.Env[key] = item
}

genericContainerReq.ContainerRequest.LifecycleHooks =
[]testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// 1. copy the starter script into the container
func(ctx context.Context, c testcontainers.Container) error {
host, err := c.Host(ctx)
if err != nil {
return err
if len(settings.Listeners) == 0 {
defaultInternal, err := internalListener(ctx, c)
if err != nil {
return fmt.Errorf("can't create default internal listener: %w", err)
}
settings.Listeners = append(settings.Listeners, defaultInternal)
}

inspect, err := c.Inspect(ctx)
defaultExternal, err := externalListener(ctx, c)
if err != nil {
return err
return fmt.Errorf("can't create default external listener: %w", err)
}

hostname := inspect.Config.Hostname
settings.Listeners = append(settings.Listeners, defaultExternal)

port, err := c.MappedPort(ctx, publicPort)
if err != nil {
return err
var advertised []string
for _, item := range settings.Listeners {
advertised = append(advertised, fmt.Sprintf("%s://%s:%s", item.Name, item.Ip, item.Port))
}

scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname)
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ","))

return c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755)
},
Expand All @@ -94,19 +133,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
},
},
},
},
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

for _, opt := range opts {
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

err := validateKRaftVersion(genericContainerReq.Image)
if err != nil {
Expand All @@ -125,12 +152,78 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return &KafkaContainer{Container: container, ClusterID: clusterID}, nil
}

func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Env["CLUSTER_ID"] = clusterID
func trimListeners(listeners []KafkaListener) {
catinapoke marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < len(listeners); i++ {
listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " "))
listeners[i].Ip = strings.Trim(listeners[i].Ip, " ")
listeners[i].Port = strings.Trim(listeners[i].Port, " ")
}
}

return nil
func validateListeners(listeners []KafkaListener) error {
catinapoke marked this conversation as resolved.
Show resolved Hide resolved
var ports map[string]bool = make(map[string]bool, len(listeners)+2)
var names map[string]bool = make(map[string]bool, len(listeners)+2)

// check for default listeners
ports["9094"] = true
ports["9093"] = true

// check for default listeners
names["CONTROLLER"] = true
names["EXTERNAL"] = true

for _, item := range listeners {
if names[item.Name] {
return fmt.Errorf("duplicate of listener name: %s", item.Name)
}
names[item.Name] = true

if ports[item.Port] {
return fmt.Errorf("duplicate of listener port: %s", item.Port)
}
ports[item.Port] = true
}

return nil
}

func editEnvsForListeners(listeners []KafkaListener) map[string]string {
catinapoke marked this conversation as resolved.
Show resolved Hide resolved
if len(listeners) == 0 {
// no change
return map[string]string{}
}

envs := map[string]string{
"KAFKA_LISTENERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093",
"KAFKA_REST_BOOTSTRAP_SERVERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafka local provides a rest-proxy service but I don't see any test for it. It is also a good opportunity to add a test.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will take a look

"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT, EXTERNAL:PLAINTEXT",
}

// expect first listener has common network between kafka instances
envs["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name

// expect small number of listeners, so joins is okay
for _, item := range listeners {
envs["KAFKA_LISTENERS"] = strings.Join(
[]string{
envs["KAFKA_LISTENERS"],
fmt.Sprintf("%s://0.0.0.0:%s", item.Name, item.Port),
},
",",
)

envs["KAFKA_REST_BOOTSTRAP_SERVERS"] = envs["KAFKA_LISTENERS"]

envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join(
[]string{
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"],
item.Name + ":" + "PLAINTEXT",
},
",",
)
}

return envs
}

// Brokers retrieves the broker connection strings from Kafka with only one entry,
Expand Down
11 changes: 4 additions & 7 deletions modules/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,12 @@ func TestKafka_invalidVersion(t *testing.T) {
}

// assertAdvertisedListeners checks that the advertised listeners are set correctly:
// - The BROKER:// protocol is using the hostname of the Kafka container
// - The INTERNAL:// protocol is using the hostname of the Kafka container
func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) {
inspect, err := container.Inspect(context.Background())
hostname, err := container.Host(context.Background())
if err != nil {
t.Fatal(err)
}

hostname := inspect.Config.Hostname

code, r, err := container.Exec(context.Background(), []string{"cat", "/usr/sbin/testcontainers_start.sh"})
if err != nil {
t.Fatal(err)
Expand All @@ -121,7 +118,7 @@ func assertAdvertisedListeners(t *testing.T, container testcontainers.Container)
t.Fatal(err)
}

if !strings.Contains(string(bs), "BROKER://"+hostname+":9092") {
t.Fatalf("expected advertised listeners to contain %s, got %s", "BROKER://"+hostname+":9092", string(bs))
if !strings.Contains(string(bs), "INTERNAL://"+hostname+":9092") {
t.Fatalf("expected advertised listeners to contain %s, got %s", "INTERNAL://"+hostname+":9092", string(bs))
}
}
Loading