Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka internal docker connection #2490

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
320e54d
fixed kafka internal docker connection
catinapoke Apr 19, 2024
9b6537d
added new option for kafka listeners
catinapoke May 10, 2024
15f4ca3
Merge remote-tracking branch 'original/main'
catinapoke May 10, 2024
c7552f4
fixed kafka docs with new options
catinapoke May 13, 2024
8904938
Update docs/modules/kafka.md
catinapoke May 14, 2024
7b38c28
Apply suggestions from code review for docs
catinapoke May 14, 2024
98793d7
added new test, fixed docs from code review
catinapoke May 16, 2024
62a68b5
Merge branch 'main' of https://github.com/catinapoke/testcontainers-go
catinapoke May 16, 2024
a7c4a9f
Merge remote-tracking branch 'Original/main'
catinapoke May 16, 2024
bfa9191
renamed docker folder to testdata and fixed test name
catinapoke May 16, 2024
f1b0d48
Merge remote-tracking branch 'Original/main'
catinapoke May 23, 2024
128d31a
added test for input listeners validation and refactor from code review
catinapoke May 23, 2024
ead068a
added unit test for trimValidateListeners
catinapoke May 28, 2024
eb71110
Merge remote-tracking branch 'Original/main'
catinapoke May 28, 2024
a4d91e2
Merge branch 'main' into main
mdelapenya Jun 7, 2024
dd8fcec
Merge remote-tracking branch 'upstream/main'
catinapoke Aug 25, 2024
5c147d4
go mod update
catinapoke Aug 25, 2024
b31e1af
todo for PR
catinapoke Aug 25, 2024
9f23a4d
updated copyStarterScript for upstream
catinapoke Aug 25, 2024
e63a604
fix: typo
mdelapenya Sep 2, 2024
f9cb155
fix: use Run method
mdelapenya Sep 2, 2024
8394529
chore: use PLAINTEXT and BROKER
mdelapenya Sep 2, 2024
31950f4
fix: handle error in tests
mdelapenya Sep 2, 2024
3aa2cbf
chore: make lint
mdelapenya Sep 2, 2024
d61004f
chore: use non deprecated APIs
mdelapenya Sep 2, 2024
df53491
chore: handle errors in testss
mdelapenya Sep 2, 2024
bf02852
fix: close sarama client
mdelapenya Sep 2, 2024
152ef0c
fix: validation in test
mdelapenya Sep 2, 2024
efa0f7d
chore: refactor test
mdelapenya Sep 2, 2024
d669bfb
chore: add test using kcat
mdelapenya Sep 2, 2024
4c44b97
Merge branch 'main' into catinapoke/main
mdelapenya Sep 9, 2024
ba701e1
Merge pull request #1 from mdelapenya/catinapoke/main
catinapoke Sep 17, 2024
b51be12
fixed basic kafka test but network one is broken
catinapoke Sep 17, 2024
c8c47f4
not working test with kcat
catinapoke Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,44 @@ The environment variables that are already set by default are:

{% include "../features/common_functional_options.md" %}


#### ClusterId

You can set up cluster id by using `WithClusterID` option.

```
KafkaContainer, err = kafka.RunContainer(ctx,
kafka.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.6.1"))
```

#### Listeners

If you need to connect new listeners, you can use `WithListener(listeners []KafkaListener)`.
This option controls the following environment variables for the Kafka container:
- `KAFKA_LISTENERS`
- `KAFKA_REST_BOOTSTRAP_SERVERS`
- `KAFKA_LISTENER_SECURITY_PROTOCOL_MAP`
- `KAFKA_INTER_BROKER_LISTENER_NAME`
- `KAFKA_ADVERTISED_LISTENERS`

Example:

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:kafkaWithListener
<!--/codeinclude-->

In the above code, we created a network for our container and attached kafka to it, so they can communicate. Then we marked port 9092 for our internal usage.

The first listener in the slice will be written in the env parameter `KAFKA_INTER_BROKER_LISTENER_NAME`

Every listener's name will be converted in upper case. Every name and port should be unique and will be checked in a validation step.

If you are not using this option or the listeners list is empty, there will be 2 default listeners with the following addresses and ports:

External - Host():MappedPort()
Internal - Host():9092

### Container Methods

The Kafka container exposes the following methods:
Expand All @@ -81,4 +119,4 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers
<!--/codeinclude-->
<!--/codeinclude-->
9 changes: 4 additions & 5 deletions modules/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/testcontainers/testcontainers-go/modules/kafka
go 1.22

require (
github.com/IBM/sarama v1.42.1
github.com/IBM/sarama v1.43.2
github.com/docker/go-connections v0.5.0
github.com/testcontainers/testcontainers-go v0.33.0
golang.org/x/mod v0.16.0
Expand All @@ -22,7 +22,7 @@ require (
github.com/distribution/reference v0.6.0 // indirect
github.com/docker/docker v27.1.1+incompatible // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand All @@ -40,7 +40,7 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/moby/docker-image-spec v1.3.1 // indirect
Expand All @@ -51,7 +51,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
Expand All @@ -67,7 +67,6 @@ require (
go.opentelemetry.io/otel/trace v1.24.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect
Expand Down
16 changes: 8 additions & 8 deletions modules/kafka/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/IBM/sarama v1.42.1 h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=
github.com/IBM/sarama v1.42.1/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
github.com/IBM/sarama v1.43.2 h1:HABeEqRUh32z8yzY2hGB/j8mHSzC/HA9zlEjqFNCzSw=
github.com/IBM/sarama v1.43.2/go.mod h1:Kyo4WkF24Z+1nz7xeVUFWIuKVV8RS3wM8mkvPKMdXFQ=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand All @@ -31,8 +31,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/eapache/go-resiliency v1.4.0 h1:3OK9bWpPk5q6pbFAaYSEwD9CLUSHG8bnZuqX2yMt3B0=
github.com/eapache/go-resiliency v1.4.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-resiliency v1.6.0 h1:CqGDTLtpwuWKn6Nj3uNUdflaq+/kIPsg0gfNzHton30=
github.com/eapache/go-resiliency v1.6.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
Expand Down Expand Up @@ -84,8 +84,8 @@ github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZ
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand All @@ -106,8 +106,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug=
github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
160 changes: 126 additions & 34 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ const (
// starterScript {
starterScriptContent = `#!/bin/bash
source /etc/confluent/docker/bash-config
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092
export KAFKA_ADVERTISED_LISTENERS=%s
echo Starting Kafka KRaft mode
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure
Expand All @@ -34,6 +34,13 @@ echo '' > /etc/confluent/docker/ensure
type KafkaContainer struct {
testcontainers.Container
ClusterID string
Listeners KafkaListener
}

type KafkaListener struct {
Name string
Host string
Port string
}

// Deprecated: use Run instead
Expand All @@ -49,10 +56,10 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
ExposedPorts: []string{string(publicPort)},
Env: map[string]string{
// envVars {
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_LISTENERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why it should change?

Copy link
Author

@catinapoke catinapoke Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More obvious way to show the purpose. It's also the way many people write listeners in articles

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd not change this, as it could break existing client code using for reasons these env vars and the values

"KAFKA_BROKER_ID": "1",
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1",
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1",
Expand All @@ -68,15 +75,43 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
Entrypoint: []string{"sh"},
// this CMD will wait for the starter script to be copied into the container and then execute it
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript},
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

settings := defaultOptions()
for _, opt := range opts {
if apply, ok := opt.(Option); ok {
apply(&settings)
}
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

if err := trimValidateListeners(settings.Listeners); err != nil {
return nil, fmt.Errorf("listeners validation: %w", err)
}

// apply envs for listeners
envChange := editEnvsForListeners(settings.Listeners)
for key, item := range envChange {
genericContainerReq.Env[key] = item
}

genericContainerReq.ContainerRequest.LifecycleHooks =
[]testcontainers.ContainerLifecycleHooks{
{
PostStarts: []testcontainers.ContainerHook{
// Use a single hook to copy the starter script and wait for
// the Kafka server to be ready. This prevents the wait running
// if the starter script fails to copy.
func(ctx context.Context, c testcontainers.Container) error {
// 1. copy the starter script into the container
if err := copyStarterScript(ctx, c); err != nil {
if err := copyStarterScript(ctx, c, &settings); err != nil {
return fmt.Errorf("copy starter script: %w", err)
}

Expand All @@ -85,19 +120,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
},
},
},
},
}

genericContainerReq := testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

for _, opt := range opts {
if err := opt.Customize(&genericContainerReq); err != nil {
return nil, err
}
}

err := validateKRaftVersion(genericContainerReq.Image)
if err != nil {
Expand All @@ -116,32 +139,70 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return &KafkaContainer{Container: container, ClusterID: clusterID}, nil
}

func trimValidateListeners(listeners []KafkaListener) error {
// Trim
for i := 0; i < len(listeners); i++ {
listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " "))
listeners[i].Host = strings.Trim(listeners[i].Host, " ")
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
listeners[i].Port = strings.Trim(listeners[i].Port, " ")
}

// Validate
var ports map[string]bool = make(map[string]bool, len(listeners)+2)
var names map[string]bool = make(map[string]bool, len(listeners)+2)

// check for default listeners
ports["9094"] = true
ports["9093"] = true

// check for default listeners
names["CONTROLLER"] = true
names["EXTERNAL"] = true

for _, item := range listeners {
if names[item.Name] {
return fmt.Errorf("duplicate of listener name: %s", item.Name)
}
names[item.Name] = true

if ports[item.Port] {
return fmt.Errorf("duplicate of listener port: %s", item.Port)
}
ports[item.Port] = true
}

return nil
}

// copyStarterScript copies the starter script into the container.
func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
func copyStarterScript(ctx context.Context, c testcontainers.Container, settings *options) error {
if err := wait.ForListeningPort(publicPort).
SkipInternalCheck().
WaitUntilReady(ctx, c); err != nil {
return fmt.Errorf("wait for exposed port: %w", err)
}

host, err := c.Host(ctx)
if err != nil {
return fmt.Errorf("host: %w", err)
if len(settings.Listeners) == 0 {
defaultInternal, err := internalListener(ctx, c)
if err != nil {
return fmt.Errorf("can't create default internal listener: %w", err)
}
settings.Listeners = append(settings.Listeners, defaultInternal)
}

inspect, err := c.Inspect(ctx)
defaultExternal, err := externalListener(ctx, c)
if err != nil {
return fmt.Errorf("inspect: %w", err)
return fmt.Errorf("can't create default external listener: %w", err)
}

hostname := inspect.Config.Hostname
settings.Listeners = append(settings.Listeners, defaultExternal)

port, err := c.MappedPort(ctx, publicPort)
if err != nil {
return fmt.Errorf("mapped port: %w", err)
var advertised []string
for _, item := range settings.Listeners {
advertised = append(advertised, fmt.Sprintf("%s://%s:%s", item.Name, item.Host, item.Port))
}

scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname)
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ","))

if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil {
return fmt.Errorf("copy to container: %w", err)
Expand All @@ -150,12 +211,43 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error {
return nil
}

func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption {
return func(req *testcontainers.GenericContainerRequest) error {
req.Env["CLUSTER_ID"] = clusterID
func editEnvsForListeners(listeners []KafkaListener) map[string]string {
catinapoke marked this conversation as resolved.
Show resolved Hide resolved
if len(listeners) == 0 {
// no change
return map[string]string{}
}

return nil
envs := map[string]string{
"KAFKA_LISTENERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093",
"KAFKA_REST_BOOTSTRAP_SERVERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kafka local provides a rest-proxy service but I don't see any test for it. It is also a good opportunity to add a test.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will take a look

"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT, EXTERNAL:PLAINTEXT",
}

// expect first listener has common network between kafka instances
envs["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name

// expect small number of listeners, so joins is okay
for _, item := range listeners {
envs["KAFKA_LISTENERS"] = strings.Join(
[]string{
envs["KAFKA_LISTENERS"],
fmt.Sprintf("%s://0.0.0.0:%s", item.Name, item.Port),
},
",",
)

envs["KAFKA_REST_BOOTSTRAP_SERVERS"] = envs["KAFKA_LISTENERS"]

envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join(
[]string{
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"],
item.Name + ":" + "PLAINTEXT",
},
",",
)
}

return envs
}

// Brokers retrieves the broker connection strings from Kafka with only one entry,
Expand Down
Loading