diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index d2f951b043..90cca83c3a 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -61,6 +61,15 @@ The Kafka container will be started using a custom shell script: [Init script](../../modules/kafka/kafka.go) inside_block:starterScript +That script sets the advertised listeners to the following values: + + +[Advertised Listeners](../../modules/kafka/kafka.go) inside_block:advertisedListeners + + +KafkaContainer provides methods to read the broker addresses for different +connectivity environments. + #### Environment variables The environment variables that are already set by default are: @@ -82,3 +91,33 @@ The `Brokers(ctx)` method returns the Kafka brokers as a string slice, containin [Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:getBrokers + +#### BrokersByHostDockerInternal + +The `BrokersByHostDockerInternal(ctx)` method returns the Kafka brokers as a +string slice, containing the address `host.docker.internal:<port>`. + +This method is useful when you need to run additional containers that need to +connect to Kafka. + + +[Get Kafka brokers by host.docker.internal](../../modules/kafka/examples_test.go) inside_block:getBrokersByHostDockerInternal + + +#### BrokersByContainerName + +The `BrokersByContainerName(ctx)` method returns the Kafka brokers as a string +slice, addressed by the container's name (e.g. `charming_dijkstra:19094`). This +method is useful when you need to run additional containers that need to connect +to Kafka. + +To use this broker address, you should run all the containers inside the same Docker +network. + + +[First start Kafka inside a docker network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kafka + + + +[Then start a second container in the same network](../../modules/kafka/examples_test.go) inside_block:getBrokersByContainerName_Kcat + diff --git a/modules/kafka/examples_test.go b/modules/kafka/examples_test.go index c275924ecc..ee641e9fcb 100644 --- a/modules/kafka/examples_test.go +++ b/modules/kafka/examples_test.go @@ -1,12 +1,20 @@ package kafka_test import ( + "bytes" "context" "fmt" + "io" "log" + "strings" + + "github.com/IBM/sarama" + "github.com/docker/docker/api/types/container" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/kafka" + "github.com/testcontainers/testcontainers-go/network" + "github.com/testcontainers/testcontainers-go/wait" ) func ExampleRun() { @@ -19,15 +27,270 @@ func ExampleRun() { ) defer func() { if err := testcontainers.TerminateContainer(kafkaContainer); err != nil { + log.Printf("failed to terminate container: %v", err) + } + }() + if err != nil { + log.Printf("failed to start container: %s", err) + return + } + // } + + state, err := kafkaContainer.State(ctx) + if err != nil { + log.Printf("failed to get container state: %s", err) + return + } + + fmt.Println(kafkaContainer.ClusterID) + fmt.Println(state.Running) + + // Output: + // test-cluster + // true +} + +func ExampleKafkaContainer_BrokersByHostDockerInternal() { + ctx := context.Background() + + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.5.0", + kafka.WithClusterID("test-cluster"), + ) + if err != nil { + log.Printf("failed to start container: %s", err) + return + } + + // Clean up the container after + defer func() { + if err := kafkaContainer.Terminate(ctx); err != nil { log.Printf("failed to terminate container: %s", err) } }() + + state, err := kafkaContainer.State(ctx) + if err != 
nil { + log.Printf("failed to get container state: %s", err) + return + } + + fmt.Println(kafkaContainer.ClusterID) + fmt.Println(state.Running) + + const topic = "example-topic" + + // Produce a message from the host that will be read by a consumer in another docker container + brokers, err := kafkaContainer.Brokers(ctx) + if err != nil { + log.Print(err) + return + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Print(err) + return + } + + if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("key"), + Value: sarama.StringEncoder("example_message_value"), + }); err != nil { + log.Print(err) + return + } + + // getBrokersByHostDockerInternal { + brokers, err = kafkaContainer.BrokersByHostDockerInternal(ctx) + if err != nil { + log.Print(err) + return + } + + // Run another container that can connect to the kafka container via hostname "host.docker.internal" + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + + // Add host.docker.internal to the consumer container so it can contact the kafka brokers + HostConfigModifier: func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, + }, + Started: true, + }, + ) + if err != nil { + log.Printf("kafkacat error: %v", err) + return + } + + lr, err := kcat.Logs(ctx) + if err != nil { + log.Printf("kafkacat logs error: %v", err) + return + } + + logs, err := io.ReadAll(lr) + if err != nil { + log.Printf("kafkacat logs read error: %v", err) + return + } + + fmt.Println("read message:", string(bytes.TrimSpace(logs))) + // } + + // Output: + // test-cluster + // true + // read message: example_message_value +} + +func ExampleKafkaContainer_BrokersByContainerName() { + ctx := context.Background() + + // getBrokersByContainerName_Kafka { + net, err := network.New(ctx) + if err != nil { + log.Printf("failed to create network: %s", err) + return + } + + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.5.0", + kafka.WithClusterID("test-cluster"), + network.WithNetwork(nil, net), // Run kafka test container in a new docker network + ) if err != nil { log.Printf("failed to start container: %s", err) return } // } + // Clean up the container after + defer func() { + if err := kafkaContainer.Terminate(ctx); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }() + + state, err := kafkaContainer.State(ctx) + if err != nil { + log.Printf("failed to get container state: %s", err) + return + } + + fmt.Println(kafkaContainer.ClusterID) + fmt.Println(state.Running) + + const topic = "example-topic" + + // Produce a message from the host that will be read by a consumer in another docker container + brokers, err := kafkaContainer.Brokers(ctx) + if err != nil { + log.Print(err) + return + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Print(err) + return + } + + if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("key"), + Value: 
sarama.StringEncoder("example_message_value"), + }); err != nil { + log.Print(err) + return + } + + // getBrokersByContainerName_Kcat { + brokers, err = kafkaContainer.BrokersByContainerName(ctx) + if err != nil { + log.Print(err) + return + } + + // Run another container that can connect to the kafka container via the kafka container's name + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer + }, + Started: true, + }, + ) + if err != nil { + log.Printf("kafkacat error: %v", err) + return + } + + lr, err := kcat.Logs(ctx) + if err != nil { + log.Printf("kafkacat logs error: %v", err) + return + } + + logs, err := io.ReadAll(lr) + if err != nil { + log.Printf("kafkacat logs read error: %v", err) + return + } + + fmt.Println("read message:", string(bytes.TrimSpace(logs))) + // } + + // Output: + // test-cluster + // true + // read message: example_message_value +} + +func ExampleKafkaContainer_BrokersByContainerId() { + ctx := context.Background() + + net, err := network.New(ctx) + if err != nil { + log.Printf("failed to create network: %s", err) + return + } + + kafkaContainer, err := kafka.Run(ctx, + "confluentinc/confluent-local:7.5.0", + kafka.WithClusterID("test-cluster"), + network.WithNetwork(nil, net), // Run kafka test container in a new docker network + ) + if err != nil { + log.Printf("failed to start container: %s", err) + return + } + + // Clean up the container after + defer func() { + if err := kafkaContainer.Terminate(ctx); err != nil { + log.Printf("failed to terminate container: %s", err) + } + }() + state, err := kafkaContainer.State(ctx) if err != nil { log.Printf("failed to get container state: %s", err) @@ -37,7 +300,73 @@ func ExampleRun() { fmt.Println(kafkaContainer.ClusterID) fmt.Println(state.Running) + const topic = "example-topic" + + // Produce a message from the host that will be read by a consumer in another docker container + brokers, err := kafkaContainer.Brokers(ctx) + if err != nil { + log.Print(err) + return + } + + config := sarama.NewConfig() + config.Producer.Return.Successes = true + producer, err := sarama.NewSyncProducer(brokers, config) + if err != nil { + log.Print(err) + return + } + + if _, _, err := producer.SendMessage(&sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder("key"), + Value: sarama.StringEncoder("example_message_value"), + }); err != nil { + log.Print(err) + return + } + + brokers, err = kafkaContainer.BrokersByContainerId(ctx) + if err != nil { + log.Print(err) + return + } + + // Run another container that can connect to the kafka container via the kafka container's ID + kcat, err := testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", topic, "-c", "1"}, + WaitingFor: wait.ForExit(), + Networks: []string{net.Name}, // Run kafkacat in the same docker network as the testcontainer + }, + Started: true, + }, + ) + if err != nil { + log.Printf("kafkacat error: %v", err) + return + } + + lr, err := kcat.Logs(ctx) + if err != nil 
{ + log.Printf("kafkacat logs error: %v", err) + return + } + + logs, err := io.ReadAll(lr) + if err != nil { + log.Printf("kafkacat logs read error: %v", err) + return + } + + fmt.Println("read message:", string(bytes.TrimSpace(logs))) + // Output: // test-cluster // true + // read message: example_message_value } diff --git a/modules/kafka/go.mod b/modules/kafka/go.mod index 7d2f6bc196..97cb0f9e9b 100644 --- a/modules/kafka/go.mod +++ b/modules/kafka/go.mod @@ -4,6 +4,7 @@ go 1.22 require ( github.com/IBM/sarama v1.42.1 + github.com/docker/docker v27.1.1+incompatible github.com/docker/go-connections v0.5.0 github.com/stretchr/testify v1.9.0 github.com/testcontainers/testcontainers-go v0.34.0 @@ -21,7 +22,6 @@ require ( github.com/cpuguy83/dockercfg v0.3.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/distribution/reference v0.6.0 // indirect - github.com/docker/docker v27.1.1+incompatible // indirect github.com/docker/go-units v0.5.0 // indirect github.com/eapache/go-resiliency v1.4.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect diff --git a/modules/kafka/kafka.go b/modules/kafka/kafka.go index 73e392e1d2..ea7fb9e859 100644 --- a/modules/kafka/kafka.go +++ b/modules/kafka/kafka.go @@ -15,20 +15,35 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) -const publicPort = nat.Port("9093/tcp") +const ( + // Internal listening port for Broker intercommunication + brokerToBrokerPort = 9092 + // Mapped port for advertised listener of localhost:<port>. + publicLocalhostPort = nat.Port("9093/tcp") + // Internal listening port for Controller + controllerPort = 9094 + // Mapped port for advertised listener of host.docker.internal:<port> + publicDockerHostPort = nat.Port("19093/tcp") + // Internal listening port for advertised listener of <container_name>:19094. This is not mapped to a random host port + networkInternalContainerNamePort = 19094 + // Internal listening port for advertised listener of <container_id>:19095. 
This is not mapped to a random host port + networkInternalContainerIdPort = 19095 +) + const ( starterScript = "/usr/sbin/testcontainers_start.sh" // starterScript { starterScriptContent = `#!/bin/bash source /etc/confluent/docker/bash-config -export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://%s:%d,BROKER://%s:9092 +export KAFKA_ADVERTISED_LISTENERS=%s echo Starting Kafka KRaft mode sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure echo 'kafka-storage format --ignore-formatted -t "$(kafka-storage random-uuid)" -c /etc/kafka/kafka.properties' >> /etc/confluent/docker/configure echo '' > /etc/confluent/docker/ensure /etc/confluent/docker/configure -/etc/confluent/docker/launch` +/etc/confluent/docker/launch +` // } ) @@ -46,14 +61,35 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize // Run creates an instance of the Kafka container type func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) { + listeners := strings.Join([]string{ + "LOCALHOST://0.0.0.0:" + strconv.Itoa(publicLocalhostPort.Int()), + "HOST_DOCKER_INTERNAL://0.0.0.0:" + strconv.Itoa(publicDockerHostPort.Int()), + "CONTAINER_NAME://0.0.0.0:" + strconv.Itoa(networkInternalContainerNamePort), + "CONTAINER_ID://0.0.0.0:" + strconv.Itoa(networkInternalContainerIdPort), + "BROKER://0.0.0.0:" + strconv.Itoa(brokerToBrokerPort), + "CONTROLLER://0.0.0.0:" + strconv.Itoa(controllerPort), + }, ",") + + protoMap := strings.Join([]string{ + "LOCALHOST:PLAINTEXT", + "HOST_DOCKER_INTERNAL:PLAINTEXT", + "CONTAINER_NAME:PLAINTEXT", + "CONTAINER_ID:PLAINTEXT", + "BROKER:PLAINTEXT", + "CONTROLLER:PLAINTEXT", + }, ",") + req := testcontainers.ContainerRequest{ - Image: img, - ExposedPorts: []string{string(publicPort)}, + Image: img, + ExposedPorts: []string{ + string(publicLocalhostPort), + string(publicDockerHostPort), + }, Env: map[string]string{ // envVars { - "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", - "KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", - "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT", + "KAFKA_LISTENERS": listeners, + "KAFKA_REST_BOOTSTRAP_SERVERS": listeners, + "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": protoMap, "KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER", "KAFKA_BROKER_ID": "1", "KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR": "1", @@ -123,7 +159,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom // copyStarterScript copies the starter script into the container. func copyStarterScript(ctx context.Context, c testcontainers.Container) error { - if err := wait.ForListeningPort(publicPort). + if err := wait.ForListeningPort(publicLocalhostPort). SkipInternalCheck(). 
WaitUntilReady(ctx, c); err != nil { return fmt.Errorf("wait for exposed port: %w", err) @@ -141,12 +177,27 @@ func copyStarterScript(ctx context.Context, c testcontainers.Container) error { hostname := inspect.Config.Hostname - port, err := c.MappedPort(ctx, publicPort) + portLh, err := c.MappedPort(ctx, publicLocalhostPort) if err != nil { return fmt.Errorf("mapped port: %w", err) } - scriptContent := fmt.Sprintf(starterScriptContent, host, port.Int(), hostname) + portDh, err := c.MappedPort(ctx, publicDockerHostPort) + if err != nil { + return fmt.Errorf("mapped port: %w", err) + } + + // advertisedListeners { + advertisedListeners := strings.Join([]string{ + fmt.Sprintf("LOCALHOST://%s:%d", host, portLh.Int()), + fmt.Sprintf("HOST_DOCKER_INTERNAL://%s:%d", "host.docker.internal", portDh.Int()), + fmt.Sprintf("CONTAINER_NAME://%s:%d", strings.Trim(inspect.Name, "/"), networkInternalContainerNamePort), + fmt.Sprintf("CONTAINER_ID://%s:%d", hostname, networkInternalContainerIdPort), + fmt.Sprintf("BROKER://%s:%d", hostname, brokerToBrokerPort), + }, ",") + + scriptContent := fmt.Sprintf(starterScriptContent, advertisedListeners) + // } if err := c.CopyToContainer(ctx, []byte(scriptContent), starterScript, 0o755); err != nil { return fmt.Errorf("copy to container: %w", err) @@ -165,18 +216,61 @@ func WithClusterID(clusterID string) testcontainers.CustomizeRequestOption { // Brokers retrieves the broker connection strings from Kafka with only one entry, // defined by the exposed public port. +// +// Example Output: localhost:<port> func (kc *KafkaContainer) Brokers(ctx context.Context) ([]string, error) { host, err := kc.Host(ctx) if err != nil { return nil, err } - port, err := kc.MappedPort(ctx, publicPort) + port, err := kc.MappedPort(ctx, publicLocalhostPort) + if err != nil { + return nil, err + } + + return []string{host + ":" + port.Port()}, nil +} + +// BrokersByHostDockerInternal retrieves the broker connection string suitable when +// running two containers in the default docker network. +// +// Example Output: host.docker.internal:<port> +func (kc *KafkaContainer) BrokersByHostDockerInternal(ctx context.Context) ([]string, error) { + port, err := kc.MappedPort(ctx, publicDockerHostPort) + if err != nil { + return nil, err + } + + host := "host.docker.internal" + return []string{host + ":" + port.Port()}, nil +} + +// BrokersByContainerName retrieves the broker connection string suitable when +// connecting two containers running within the same docker network. +// +// Example Output: zealous_murdock:19094 +func (kc *KafkaContainer) BrokersByContainerName(ctx context.Context) ([]string, error) { + inspect, err := kc.Inspect(ctx) + if err != nil { + return nil, err + } + + host := strings.Trim(inspect.Name, "/") + return []string{host + ":" + strconv.Itoa(networkInternalContainerNamePort)}, nil +} + +// BrokersByContainerId retrieves the broker connection string suitable when +// connecting two containers running within the same docker network. +// +// Example Output: e3c69e4fc625:19095 +func (kc *KafkaContainer) BrokersByContainerId(ctx context.Context) ([]string, error) { + inspect, err := kc.Inspect(ctx) if err != nil { return nil, err } - return []string{fmt.Sprintf("%s:%d", host, port.Int())}, nil + return []string{inspect.Config.Hostname + ":" + strconv.Itoa(networkInternalContainerIdPort)}, nil } // configureControllerQuorumVoters sets the quorum voters for the controller. 
For that, it will diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index af858f849f..21ad06e977 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -1,23 +1,44 @@ package kafka_test import ( + "bytes" "context" + "fmt" + "io" "strings" "testing" "github.com/IBM/sarama" + "github.com/docker/docker/api/types/container" + "github.com/docker/go-connections/nat" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/modules/kafka" + "github.com/testcontainers/testcontainers-go/network" + "github.com/testcontainers/testcontainers-go/wait" ) -func TestKafka(t *testing.T) { - topic := "some-topic" +func TestKafka_invalidVersion(t *testing.T) { + ctx := context.Background() + ctr, err := kafka.Run(ctx, "confluentinc/confluent-local:6.3.3", kafka.WithClusterID("kraftCluster")) + testcontainers.CleanupContainer(t, ctr) + require.Error(t, err) +} + +const ( + testTopic = "some-topic" + testValue = "kafka-message-value" +) + +func TestKafka(t *testing.T) { ctx := context.Background() + net, err := network.New(ctx) + testcontainers.CleanupNetwork(t, net) + require.NoError(t, err) - kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0", kafka.WithClusterID("kraftCluster")) + kafkaContainer, err := kafka.Run(ctx, "confluentinc/confluent-local:7.5.0", kafka.WithClusterID("kraftCluster"), network.WithNetwork(nil, net)) testcontainers.CleanupContainer(t, kafkaContainer) require.NoError(t, err) @@ -37,7 +58,7 @@ func TestKafka(t *testing.T) { consumer, ready, done, cancel := NewTestKafkaConsumer(t) defer cancel() go func() { - if err := client.Consume(context.Background(), []string{topic}, consumer); err != nil { + if err := client.Consume(context.Background(), []string{testTopic}, consumer); err != nil { cancel() } }() @@ -54,38 +75,150 @@ func TestKafka(t *testing.T) { require.NoError(t, err) _, _, err = producer.SendMessage(&sarama.ProducerMessage{ - Topic: topic, + Topic: testTopic, Key: sarama.StringEncoder("key"), - Value: sarama.StringEncoder("value"), + Value: sarama.StringEncoder(testValue), }) require.NoError(t, err) <-done require.Truef(t, strings.EqualFold(string(consumer.message.Key), "key"), "expected key to be %s, got %s", "key", string(consumer.message.Key)) - require.Truef(t, strings.EqualFold(string(consumer.message.Value), "value"), "expected value to be %s, got %s", "value", string(consumer.message.Value)) + require.Truef(t, strings.EqualFold(string(consumer.message.Value), testValue), "expected value to be %s, got %s", testValue, string(consumer.message.Value)) + + t.Run("BrokersByHostDockerInternal", func(t *testing.T) { + brokers, err := kafkaContainer.BrokersByHostDockerInternal(ctx) + require.NoError(t, err) + + kcat, err := runKcatContainer(ctx, brokers, func(hc *container.HostConfig) { + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, nil) + testcontainers.CleanupContainer(t, kcat) + require.NoError(t, err) + + l, err := kcat.Logs(ctx) + require.NoError(t, err) + defer l.Close() + + assertKcatReadMsg(t, l) + }) + t.Run("BrokersByContainerId", func(t *testing.T) { + brokers, err := kafkaContainer.BrokersByContainerId(ctx) + require.NoError(t, err) + + kcat, err := runKcatContainer(ctx, brokers, nil, []string{net.Name}) + testcontainers.CleanupContainer(t, kcat) + require.NoError(t, err) + + l, err := kcat.Logs(ctx) + require.NoError(t, err) + defer l.Close() + + assertKcatReadMsg(t, l) + }) + 
t.Run("BrokersByContainerName", func(t *testing.T) { + brokers, err := kafkaContainer.BrokersByContainerName(ctx) + require.NoError(t, err) + + kcat, err := runKcatContainer(ctx, brokers, nil, []string{net.Name}) + testcontainers.CleanupContainer(t, kcat) + require.NoError(t, err) + + l, err := kcat.Logs(ctx) + require.NoError(t, err) + defer l.Close() + + assertKcatReadMsg(t, l) + }) } -func TestKafka_invalidVersion(t *testing.T) { - ctx := context.Background() +func runKcatContainer(ctx context.Context, brokers []string, hostMod func(*container.HostConfig), networks []string) (testcontainers.Container, error) { + return testcontainers.GenericContainer( + ctx, + testcontainers.GenericContainerRequest{ + ContainerRequest: testcontainers.ContainerRequest{ + Image: "confluentinc/cp-kafkacat", + Entrypoint: []string{"kafkacat"}, + Cmd: []string{"-b", strings.Join(brokers, ","), "-C", "-t", testTopic, "-c", "1"}, + WaitingFor: wait.ForExit(), + HostConfigModifier: hostMod, + Networks: networks, + }, + Started: true, + }, + ) +} - ctr, err := kafka.Run(ctx, "confluentinc/confluent-local:6.3.3", kafka.WithClusterID("kraftCluster")) - testcontainers.CleanupContainer(t, ctr) - require.Error(t, err) +func assertKcatReadMsg(t *testing.T, l io.Reader) { + t.Helper() + lb, err := io.ReadAll(l) + require.NoError(t, err) + + readMsg := string(bytes.TrimSpace(lb)) + require.Truef(t, strings.EqualFold(readMsg, testValue), "expected value to be %s, got %s", testValue, readMsg) } +const ( + // Internal listening port for Broker intercommunication + brokerToBrokerPort = 9092 + // Mapped port for advertised listener of localhost:. + publicLocalhostPort = nat.Port("9093/tcp") + // Mapped port for advertised listener of host.docker.internal: + publicDockerHostPort = nat.Port("19093/tcp") + // Internal listening port for advertised listener of :19094. This is not mapped to a random host port + networkInternalContainerNamePort = 19094 + // Internal listening port for advertised listener of :19095. 
This is not mapped to a random host port + networkInternalContainerIdPort = 19095 +) + // assertAdvertisedListeners checks that the advertised listeners are set correctly: +// - The LOCALHOST:// protocol is using the host of the Kafka container +// - The HOST_DOCKER_INTERNAL:// protocol is using hostname host.docker.internal +// - The CONTAINER_NAME:// protocol is using the container name of the Kafka container +// - The CONTAINER_ID:// protocol is using the container ID of the Kafka container // - The BROKER:// protocol is using the hostname of the Kafka container -func assertAdvertisedListeners(t *testing.T, container testcontainers.Container) { +func assertAdvertisedListeners(t *testing.T, container *kafka.KafkaContainer) { t.Helper() - inspect, err := container.Inspect(context.Background()) + ctx := context.Background() + + inspect, err := container.Inspect(ctx) require.NoError(t, err) - brokerURL := "BROKER://" + inspect.Config.Hostname + ":9092" + portLh, err := container.MappedPort(ctx, publicLocalhostPort) + require.NoError(t, err) - ctx := context.Background() + portDh, err := container.MappedPort(ctx, publicDockerHostPort) + require.NoError(t, err) + + host, err := container.Host(ctx) + require.NoError(t, err) bs := testcontainers.RequireContainerExec(ctx, t, container, []string{"cat", "/usr/sbin/testcontainers_start.sh"}) - require.Containsf(t, bs, brokerURL, "expected advertised listeners to contain %s, got %s", brokerURL, bs) + assert := func(listener string) { + t.Helper() + require.Containsf(t, bs, listener, "expected advertised listeners to contain %s, got %s", listener, bs) + } + + mustBrokers := func(fn func(context.Context) ([]string, error)) string { + t.Helper() + brokers, err := fn(ctx) + require.NoError(t, err) + require.Len(t, brokers, 1) + return brokers[0] + } + + assert(fmt.Sprintf("LOCALHOST://%s:%d", host, portLh.Int())) + assert(fmt.Sprintf("LOCALHOST://%s", mustBrokers(container.Brokers))) //nolint:perfsprint + + assert(fmt.Sprintf("HOST_DOCKER_INTERNAL://host.docker.internal:%d", portDh.Int())) + assert(fmt.Sprintf("HOST_DOCKER_INTERNAL://%s", mustBrokers(container.BrokersByHostDockerInternal))) //nolint:perfsprint + + assert(fmt.Sprintf("CONTAINER_NAME://%s:%d", strings.Trim(inspect.Name, "/"), networkInternalContainerNamePort)) + assert(fmt.Sprintf("CONTAINER_NAME://%s", mustBrokers(container.BrokersByContainerName))) //nolint:perfsprint + + assert(fmt.Sprintf("CONTAINER_ID://%s:%d", inspect.Config.Hostname, networkInternalContainerIdPort)) + assert(fmt.Sprintf("CONTAINER_ID://%s", mustBrokers(container.BrokersByContainerId))) //nolint:perfsprint + + assert(fmt.Sprintf("BROKER://%s:%d", inspect.Config.Hostname, brokerToBrokerPort)) }
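The examples in this diff consume the test message with a kcat container. When the consumer is the application under test rather than kcat, the same network-internal broker address can simply be injected through the application's configuration. Below is a minimal sketch of that wiring; the image name `my-app:latest` and the `KAFKA_BROKERS` environment variable are hypothetical placeholders, and only APIs that already appear in this diff (`network.New`, `kafka.Run`, `network.WithNetwork`, `BrokersByContainerName`, `testcontainers.GenericContainer`) are relied on.

```go
package kafka_test

import (
	"context"
	"strings"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/modules/kafka"
	"github.com/testcontainers/testcontainers-go/network"
)

// runAppWithKafka starts Kafka in a dedicated Docker network and hands the
// network-internal broker address to an application container through an
// environment variable. Cleanup is omitted for brevity.
func runAppWithKafka(ctx context.Context) error {
	net, err := network.New(ctx)
	if err != nil {
		return err
	}

	kafkaContainer, err := kafka.Run(ctx,
		"confluentinc/confluent-local:7.5.0",
		kafka.WithClusterID("test-cluster"),
		network.WithNetwork(nil, net), // same network as the application container
	)
	if err != nil {
		return err
	}

	// The container-name address is resolvable only from containers attached
	// to the same docker network.
	brokers, err := kafkaContainer.BrokersByContainerName(ctx)
	if err != nil {
		return err
	}

	// "my-app:latest" and KAFKA_BROKERS are hypothetical placeholders for the
	// application under test and its configuration key.
	_, err = testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:    "my-app:latest",
			Networks: []string{net.Name},
			Env: map[string]string{
				"KAFKA_BROKERS": strings.Join(brokers, ","),
			},
		},
		Started: true,
	})
	return err
}
```

Compared with `BrokersByHostDockerInternal`, this keeps the traffic on the user-defined network and avoids adding a `host.docker.internal` extra host to the consumer container.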