-
-
Notifications
You must be signed in to change notification settings - Fork 515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix kafka internal docker connection #2490
base: main
Are you sure you want to change the base?
Changes from 3 commits
320e54d
9b6537d
15f4ca3
c7552f4
8904938
7b38c28
98793d7
62a68b5
a7c4a9f
bfa9191
f1b0d48
128d31a
ead068a
eb71110
a4d91e2
dd8fcec
5c147d4
b31e1af
9f23a4d
e63a604
f9cb155
8394529
31950f4
3aa2cbf
d61004f
df53491
bf02852
152ef0c
efa0f7d
d669bfb
4c44b97
ba701e1
b51be12
c8c47f4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ const ( | |
// starterScript { | ||
starterScriptContent = `#!/bin/bash | ||
source /etc/confluent/docker/bash-config | ||
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092 | ||
export KAFKA_ADVERTISED_LISTENERS=%s | ||
echo Starting Kafka KRaft mode | ||
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure | ||
echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure | ||
|
@@ -34,6 +34,13 @@ echo '' > /etc/confluent/docker/ensure | |
type KafkaContainer struct { | ||
testcontainers.Container | ||
ClusterID string | ||
Listeners KafkaListener | ||
} | ||
|
||
type KafkaListener struct { | ||
Name string | ||
Ip string | ||
Port string | ||
} | ||
|
||
// RunContainer creates an instance of the Kafka container type | ||
|
@@ -43,10 +50,10 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
ExposedPorts: []string{string(publicPort)}, | ||
Env: map[string]string{ | ||
// envVars { | ||
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT", | ||
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER", | ||
"KAFKA_LISTENERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_REST_BOOTSTRAP_SERVERS": "EXTERNAL://0.0.0.0:9093,INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", | ||
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT", | ||
"KAFKA_INTER_BROKER_LISTENER_NAME": "INTERNAL", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason why it should change? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More obvious way to show the purpose. It's also the way many people write listeners in articles There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd not change this, as it could break existing client code using for reasons these env vars and the values |
||
"KAFKA_BROKER_ID": "1", | ||
"KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", | ||
"KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS": "1", | ||
|
@@ -62,29 +69,61 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
Entrypoint: []string{"sh"}, | ||
// this CMD will wait for the starter script to be copied into the container and then execute it | ||
Cmd: []string{"-c", "while [ ! -f " + starterScript + " ]; do sleep 0.1; done; bash " + starterScript}, | ||
LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ | ||
} | ||
|
||
genericContainerReq := testcontainers.GenericContainerRequest{ | ||
ContainerRequest: req, | ||
Started: true, | ||
} | ||
|
||
settings := defaultOptions() | ||
for _, opt := range opts { | ||
if apply, ok := opt.(Option); ok { | ||
apply(&settings) | ||
} | ||
if err := opt.Customize(&genericContainerReq); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
trimListeners(settings.Listeners) | ||
if err := validateListeners(settings.Listeners); err != nil { | ||
return nil, fmt.Errorf("listeners validation: %w", err) | ||
} | ||
|
||
// apply envs for listeners | ||
envChange := editEnvsForListeners(settings.Listeners) | ||
for key, item := range envChange { | ||
genericContainerReq.Env[key] = item | ||
} | ||
|
||
genericContainerReq.ContainerRequest.LifecycleHooks = | ||
[]testcontainers.ContainerLifecycleHooks{ | ||
{ | ||
PostStarts: []testcontainers.ContainerHook{ | ||
// 1. copy the starter script into the container | ||
func(ctx context.Context, c testcontainers.Container) error { | ||
host, err := c.Host(ctx) | ||
if err != nil { | ||
return err | ||
if len(settings.Listeners) == 0 { | ||
defaultInternal, err := internalListener(ctx, c) | ||
if err != nil { | ||
return fmt.Errorf("can't create default internal listener: %w", err) | ||
} | ||
settings.Listeners = append(settings.Listeners, defaultInternal) | ||
} | ||
|
||
inspect, err := c.Inspect(ctx) | ||
defaultExternal, err := externalListener(ctx, c) | ||
if err != nil { | ||
return err | ||
return fmt.Errorf("can't create default external listener: %w", err) | ||
} | ||
|
||
hostname := inspect.Config.Hostname | ||
settings.Listeners = append(settings.Listeners, defaultExternal) | ||
|
||
port, err := c.MappedPort(ctx, publicPort) | ||
if err != nil { | ||
return err | ||
var advertised []string | ||
for _, item := range settings.Listeners { | ||
advertised = append(advertised, fmt.Sprintf("%s://%s:%s", item.Name, item.Ip, item.Port)) | ||
} | ||
|
||
scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname) | ||
scriptContent := fmt.Sprintf(starterScriptContent, strings.Join(advertised, ",")) | ||
|
||
return c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755) | ||
}, | ||
|
@@ -94,19 +133,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
}, | ||
}, | ||
}, | ||
}, | ||
} | ||
|
||
genericContainerReq := testcontainers.GenericContainerRequest{ | ||
ContainerRequest: req, | ||
Started: true, | ||
} | ||
|
||
for _, opt := range opts { | ||
if err := opt.Customize(&genericContainerReq); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
err := validateKRaftVersion(genericContainerReq.Image) | ||
if err != nil { | ||
|
@@ -125,12 +152,78 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize | |
return &KafkaContainer{Container: container, ClusterID: clusterID}, nil | ||
} | ||
|
||
func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption { | ||
return func(req *testcontainers.GenericContainerRequest) error { | ||
req.Env["CLUSTER_ID"] = clusterID | ||
func trimListeners(listeners []KafkaListener) { | ||
catinapoke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for i := 0; i < len(listeners); i++ { | ||
listeners[i].Name = strings.ToUpper(strings.Trim(listeners[i].Name, " ")) | ||
listeners[i].Ip = strings.Trim(listeners[i].Ip, " ") | ||
listeners[i].Port = strings.Trim(listeners[i].Port, " ") | ||
} | ||
} | ||
|
||
return nil | ||
func validateListeners(listeners []KafkaListener) error { | ||
catinapoke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var ports map[string]bool = make(map[string]bool, len(listeners)+2) | ||
var names map[string]bool = make(map[string]bool, len(listeners)+2) | ||
|
||
// check for default listeners | ||
ports["9094"] = true | ||
ports["9093"] = true | ||
|
||
// check for default listeners | ||
names["CONTROLLER"] = true | ||
names["EXTERNAL"] = true | ||
|
||
for _, item := range listeners { | ||
if names[item.Name] { | ||
return fmt.Errorf("duplicate of listener name: %s", item.Name) | ||
} | ||
names[item.Name] = true | ||
|
||
if ports[item.Port] { | ||
return fmt.Errorf("duplicate of listener port: %s", item.Port) | ||
} | ||
ports[item.Port] = true | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func editEnvsForListeners(listeners []KafkaListener) map[string]string { | ||
catinapoke marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if len(listeners) == 0 { | ||
// no change | ||
return map[string]string{} | ||
} | ||
|
||
envs := map[string]string{ | ||
"KAFKA_LISTENERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093", | ||
"KAFKA_REST_BOOTSTRAP_SERVERS": "CONTROLLER://0.0.0.0:9094, EXTERNAL://0.0.0.0:9093", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. kafka local provides a rest-proxy service but I don't see any test for it. It is also a good opportunity to add a test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will take a look |
||
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT, EXTERNAL:PLAINTEXT", | ||
} | ||
|
||
// expect first listener has common network between kafka instances | ||
envs["KAFKA_INTER_BROKER_LISTENER_NAME"] = listeners[0].Name | ||
|
||
// expect small number of listeners, so joins is okay | ||
for _, item := range listeners { | ||
envs["KAFKA_LISTENERS"] = strings.Join( | ||
[]string{ | ||
envs["KAFKA_LISTENERS"], | ||
fmt.Sprintf("%s://0.0.0.0:%s", item.Name, item.Port), | ||
}, | ||
",", | ||
) | ||
|
||
envs["KAFKA_REST_BOOTSTRAP_SERVERS"] = envs["KAFKA_LISTENERS"] | ||
|
||
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"] = strings.Join( | ||
[]string{ | ||
envs["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"], | ||
item.Name + ":" + "PLAINTEXT", | ||
}, | ||
",", | ||
) | ||
} | ||
|
||
return envs | ||
} | ||
|
||
// Brokers retrieves the broker connection strings from Kafka with only one entry, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about host?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can u write an example? Cuz I think you can write it in IP
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, you can use ip too. But, to be more generic my suggestion is to rename
Ip
toHost