Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix kafka internal docker connection #2490

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
320e54d
fixed kafka internal docker connection
catinapoke Apr 19, 2024
9b6537d
added new option for kafka listeners
catinapoke May 10, 2024
15f4ca3
Merge remote-tracking branch 'original/main'
catinapoke May 10, 2024
c7552f4
fixed kafka docs with new options
catinapoke May 13, 2024
8904938
Update docs/modules/kafka.md
catinapoke May 14, 2024
7b38c28
Apply suggestions from code review for docs
catinapoke May 14, 2024
98793d7
added new test, fixed docs from code review
catinapoke May 16, 2024
62a68b5
Merge branch 'main' of https://github.com/catinapoke/testcontainers-go
catinapoke May 16, 2024
a7c4a9f
Merge remote-tracking branch 'Original/main'
catinapoke May 16, 2024
bfa9191
renamed docker folder to testdata and fixed test name
catinapoke May 16, 2024
f1b0d48
Merge remote-tracking branch 'Original/main'
catinapoke May 23, 2024
128d31a
added test for input listeners validation and refactor from code review
catinapoke May 23, 2024
ead068a
added unit test for trimValidateListeners
catinapoke May 28, 2024
eb71110
Merge remote-tracking branch 'Original/main'
catinapoke May 28, 2024
a4d91e2
Merge branch 'main' into main
mdelapenya Jun 7, 2024
dd8fcec
Merge remote-tracking branch 'upstream/main'
catinapoke Aug 25, 2024
5c147d4
go mod update
catinapoke Aug 25, 2024
b31e1af
todo for PR
catinapoke Aug 25, 2024
9f23a4d
updated copyStarterScript for upstream
catinapoke Aug 25, 2024
e63a604
fix: typo
mdelapenya Sep 2, 2024
f9cb155
fix: use Run method
mdelapenya Sep 2, 2024
8394529
chore: use PLAINTEXT and BROKER
mdelapenya Sep 2, 2024
31950f4
fix: handle error in tests
mdelapenya Sep 2, 2024
3aa2cbf
chore: make lint
mdelapenya Sep 2, 2024
d61004f
chore: use non deprecated APIs
mdelapenya Sep 2, 2024
df53491
chore: handle errors in testss
mdelapenya Sep 2, 2024
bf02852
fix: close sarama client
mdelapenya Sep 2, 2024
152ef0c
fix: validation in test
mdelapenya Sep 2, 2024
efa0f7d
chore: refactor test
mdelapenya Sep 2, 2024
d669bfb
chore: add test using kcat
mdelapenya Sep 2, 2024
4c44b97
Merge branch 'main' into catinapoke/main
mdelapenya Sep 9, 2024
ba701e1
Merge pull request #1 from mdelapenya/catinapoke/main
catinapoke Sep 17, 2024
b51be12
fixed basic kafka test but network one is broken
catinapoke Sep 17, 2024
c8c47f4
not working test with kcat
catinapoke Sep 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,14 @@ This option controls next env parameters:
- `KAFKA_ADVERTISED_LISTENERS`

Example:
```
KafkaContainer, err = kafka.RunContainer(ctx,
kafka.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/confluent-local:7.6.1"),
network.WithNetwork([]string{"kafka"}, Network),
kafka.WithListener([]kafka.KafkaListener{
{
Name: "INTERNAL",
Ip: "kafka",
Port: "9092",
},
}),
)
```

<!--codeinclude-->
[Get Kafka brokers](../../modules/kafka/kafka_test.go) inside_block:kafkaWithListener
<!--/codeinclude-->

Here we created network for our container and added kafka to it, so they can communicate. Then we marked port 9092 for our internal usage.
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

First listener in slice will be written in `KAFKA_INTER_BROKER_LISTENER_NAME`
The first listener in the slice will be written in the env parameter `KAFKA_INTER_BROKER_LISTENER_NAME`

Every listener's name will be converted in upper case. Every name and port should be unique and will be checked in validation step.
catinapoke marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
14 changes: 14 additions & 0 deletions modules/kafka/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:bullseye

WORKDIR /app

COPY go.mod .
COPY go.sum .

RUN go mod tidy

COPY . .

RUN go build -o ./pg_test

CMD /app/pg_test
14 changes: 14 additions & 0 deletions modules/kafka/docker/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
module amogus

go 1.21.6

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0
github.com/google/uuid v1.6.0
github.com/pkg/errors v0.9.1
)

require (
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.15.0 // indirect
)
72 changes: 72 additions & 0 deletions modules/kafka/docker/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Microsoft/go-winio v0.5.2 h1:a9IhgEQBCUEk6QCdml9CiJGhAws+YwffDHEMp1VMrpA=
github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY=
github.com/Microsoft/hcsshim v0.9.4 h1:mnUj0ivWy6UzbB1uLFqKR6F+ZyiDc7j4iGgHTpO+5+I=
github.com/Microsoft/hcsshim v0.9.4/go.mod h1:7pLA8lDk46WKDWlVsENo92gC0XFa8rbKfyFRBqxEbCc=
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0 h1:icCHutJouWlQREayFwCc7lxDAhws08td+W3/gdqgZts=
github.com/confluentinc/confluent-kafka-go/v2 v2.3.0/go.mod h1:/VTy8iEpe6mD9pkCH5BhijlUl8ulUXymKv1Qig5Rgb8=
github.com/containerd/cgroups v1.0.4 h1:jN/mbWBEaz+T1pi5OFtnkQ+8qnmEbAr1Oo1FRm5B0dA=
github.com/containerd/cgroups v1.0.4/go.mod h1:nLNQtsF7Sl2HxNebu77i1R0oDlhiTG+kO4JTrUzo6IA=
github.com/containerd/containerd v1.6.8 h1:h4dOFDwzHmqFEP754PgfgTeVXFnLiRc6kiqC7tplDJs=
github.com/containerd/containerd v1.6.8/go.mod h1:By6p5KqPK0/7/CgO/A6t/Gz+CUYUu2zf1hUaaymVXB0=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/distribution v2.8.1+incompatible h1:Q50tZOPR6T/hjNsyc9g8/syEs6bk8XXApsHjKukMl68=
github.com/docker/distribution v2.8.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v20.10.17+incompatible h1:JYCuMrWaVNophQTOrMMoSwudOVEfcegoZZrleKc1xwE=
github.com/docker/docker v20.10.17+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
github.com/moby/sys/mount v0.3.3/go.mod h1:PBaEorSNTLG5t/+4EgukEQVlAvVEc6ZjTySwKdqp5K0=
github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vygl78=
github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 h1:dcztxKSvZ4Id8iPpHERQBbIJfabdt4wUm5qy3wOL2Zc=
github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6/go.mod h1:E2VnQOmVuvZB6UYnnDB0qG5Nq/1tD9acaOpo6xmt0Kw=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 h1:rc3tiVYb5z54aKaDfakKn0dDjIyPpTtszkjuMzyt7ec=
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/runc v1.1.3 h1:vIXrkId+0/J2Ymu2m7VjGvbSlAId9XNRPhn2p4b+d8w=
github.com/opencontainers/runc v1.1.3/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/testcontainers/testcontainers-go v0.14.0 h1:h0D5GaYG9mhOWr2qHdEKDXpkce/VlvaYOCzTRi6UBi8=
github.com/testcontainers/testcontainers-go v0.14.0/go.mod h1:hSRGJ1G8Q5Bw2gXgPulJOLlEBaYJHeBSOkQM5JLG+JQ=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 h1:0BOZf6qNozI3pkN3fJLwNubheHJYHhMh91GRFOWWK08=
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
google.golang.org/grpc v1.54.0/go.mod h1:PUSEXI6iWghWaB6lXM4knEgpJNu2qUcKfDtNci3EC2g=
google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
165 changes: 165 additions & 0 deletions modules/kafka/docker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"log/slog"
"os"
"time"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/google/uuid"
"github.com/pkg/errors"
)

const (
client = "kafka_rw_test"
group = "test_group"
)

func main() {
brokers, _ := os.LookupEnv("KAFKA_BROKERS")
input_topic, _ := os.LookupEnv("KAFKA_TOPIC_IN")
output_topic, _ := os.LookupEnv("KAFKA_TOPIC_OUT")

log.Printf("Got brokers: %v\n", brokers)
log.Printf("Got input topic: %v\n", input_topic)
log.Printf("Got output topic: %v\n", output_topic)

consumer, err := InitNativeKafkaConsumer(client, brokers, group)
if err != nil {
log.Fatal(fmt.Errorf("failed to start kafka consumer: %w", err))
}

defer consumer.Close()

meta, err := consumer.GetMetadata(nil, true, 1000)
log.Printf("Metadata: %#+v, %v", meta, err)

producer, err := InitNativeKafkaProducer(client, brokers, "1", 30000)
if err != nil {
log.Fatal(fmt.Errorf("failed to start kafka producer: %w", err))
}

defer producer.Close()

err = consumer.SubscribeTopics([]string{input_topic}, nil)
if err != nil {
log.Fatal(fmt.Errorf("failed to subscribe to kafka topic: %w", err))
}

StartConsuming(context.TODO(), consumer, producer, output_topic)

fmt.Print("Finished\n")
}

func StartConsuming(ctx context.Context, consumer *kafka.Consumer, producer *kafka.Producer, outTopic string) {
log.Println("start consuming events")

run := true

for run {
msg, err := consumer.ReadMessage(time.Second * 1)
if err != nil {
kErr, ok := err.(kafka.Error)
if ok && kErr.IsTimeout() {
log.Println(fmt.Errorf("read timeout: %w", kErr))
continue
}

log.Println(fmt.Errorf("failed to read message: %w", err))
continue
}

log.Printf("got message: %s\n", string(msg.Value))

outputText := string(msg.Value) + "-from-internal"
output := MakeMsg(outTopic, string(msg.Key), outputText)

err = producer.Produce(&output, nil)

if err != nil {
log.Println(fmt.Errorf("failed to write message: %w", err))
continue
}

log.Printf("written: %s\n", outputText)
}
}

func MakeMsg(topic, key string, message interface{}) kafka.Message {
headers := []kafka.Header{
{
Key: "DateAdd", Value: []byte(time.Now().Format(time.RFC3339Nano)),
},
{
Key: "MessageId", Value: []byte(uuid.NewString()),
},
}

messageJson, _ := json.Marshal(message)

keyJson, _ := json.Marshal(key)

msg := kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: kafka.PartitionAny,
},
Value: messageJson,
Key: keyJson,
Headers: headers,
}

return msg
}

func InitNativeKafkaProducer(
clientID string,
brokers string,
acks string,
bufMaxMsg int,
) (*kafka.Producer, error) {
cfg := kafka.ConfigMap{
"bootstrap.servers": brokers,
"client.id": clientID,
"acks": acks,
"queue.buffering.max.messages": bufMaxMsg,
"go.delivery.reports": false,
}

p, err := kafka.NewProducer(&cfg)
if err != nil {
slog.Error("new producer", err)
return nil, err
}

slog.Info(fmt.Sprintf("kafka producer %s created", clientID))

return p, nil
}

func InitNativeKafkaConsumer(
clientID string,
brokers string,
group string,
) (*kafka.Consumer, error) {
config := kafka.ConfigMap{
"bootstrap.servers": brokers,
"group.id": group,
"client.id": clientID,
"auto.offset.reset": "earliest",
"auto.commit.interval.ms": 3000,
}

c, err := kafka.NewConsumer(&config)
if err != nil {
return nil, errors.Wrapf(err, "create kafka consumer")
}

slog.Info(fmt.Sprintf("kafka consumer %s created", clientID))

return c, nil
}
Loading