I've got a Strimzi Kafka cluster running in OpenShift 4.11. It's configured with TLS encryption and authentication, and ACL authorisation for the topic. I want to create a KafkaConnect with a KafkaUser:

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: connect-user
  labels:
    strimzi.io/cluster: kafka-cluster
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
      # Kafka Connect's internal topics used to store configuration, offsets or status
      - resource:
          type: group
          name: connect-cluster
        operations:
          - Read
      - resource:
          type: topic
          name: connect-cluster-configs
        operations:
          - Read
          - Describe
          - Write
          - Create
      - resource:
          type: topic
          name: connect-cluster-status
        operations:
          - Read
          - Describe
          - Write
          - Create
      - resource:
          type: topic
          name: connect-cluster-offsets
        operations:
          - Read
          - Describe
          - Write
          - Create
      # Additional topics and groups used by connectors
      # Change to match the topics used by your connectors
      - resource:
          type: group
          name: my-group
        operations:
          - Read
      - resource:
          type: topic
          name: my-topic
        operations:
          - Read
          - Describe
          - Write
          - Create
```
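As a quick sanity check before applying the manifest, the ACL list can be compared against what Kafka Connect needs: `Read` on its consumer group, and `Read`/`Describe`/`Write`/`Create` on its three internal topics. A minimal sketch of that check — the `REQUIRED` map and `missing_acls()` helper are illustrative, not Strimzi APIs:

```python
# Sketch: check that a KafkaUser ACL list covers what Kafka Connect needs.
# REQUIRED and missing_acls() are illustrative helpers, not Strimzi APIs.

# The internal-topic ACLs from the KafkaUser spec above, as plain Python data.
acls = [
    {"resource": {"type": "group", "name": "connect-cluster"},
     "operations": ["Read"]},
    {"resource": {"type": "topic", "name": "connect-cluster-configs"},
     "operations": ["Read", "Describe", "Write", "Create"]},
    {"resource": {"type": "topic", "name": "connect-cluster-status"},
     "operations": ["Read", "Describe", "Write", "Create"]},
    {"resource": {"type": "topic", "name": "connect-cluster-offsets"},
     "operations": ["Read", "Describe", "Write", "Create"]},
]

# Operations Kafka Connect needs on its consumer group and internal topics.
REQUIRED = {
    ("group", "connect-cluster"): {"Read"},
    ("topic", "connect-cluster-configs"): {"Read", "Describe", "Write", "Create"},
    ("topic", "connect-cluster-status"): {"Read", "Describe", "Write", "Create"},
    ("topic", "connect-cluster-offsets"): {"Read", "Describe", "Write", "Create"},
}

def missing_acls(acl_list):
    """Return {(type, name): missing operations} for anything not granted."""
    granted = {}
    for acl in acl_list:
        key = (acl["resource"]["type"], acl["resource"]["name"])
        granted.setdefault(key, set()).update(acl["operations"])
    return {key: ops - granted.get(key, set())
            for key, ops in REQUIRED.items() if ops - granted.get(key, set())}

print(missing_acls(acls))  # {} -> every required ACL is present
```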
```yaml
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.3.1
  replicas: 1
  bootstrapServers: kafka-cluster-kafka-bootstrap.kafka-strimzi:9093
  tls:
    trustedCertificates:
      - secretName: kafka-cluster-cluster-ca-cert
        certificate: ca.crt
  authentication:
    type: tls
    certificateAndKey:
      secretName: connect-user
      certificate: user.crt
      key: user.key
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      # This image will last only for 24 hours and might be overwritten by other users.
      # Strimzi will use this tag to push the image, but it will use the digest to pull
      # the container image to make sure it pulls exactly the image we just built, so
      # it should not happen that you pull someone else's container image. However, we
      # recommend changing this to your own container registry or using a different
      # image name for anything other than demo purposes.
      image: docker.io/ctomas65/strimzi-connect-file-example-3.3.1
      pushSecret: my-registry-credentials
    plugins:
      - name: kafka-connect-file
        artifacts:
          - type: maven
            group: org.apache.kafka
            artifact: connect-file
            version: 3.3.1
```

A Build resource is created to build the Kafka Connect image, which is pushed to the repository successfully. Eventually, a Kafka Connect pod is created from that image.

```
$ oc describe kafkaconnect -n kafka-strimzi
Name:         kafka-connect-cluster
Namespace:    kafka-strimzi
Labels:       <none>
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaConnect
Metadata:
  Creation Timestamp:  2023-01-03T11:47:22Z
  Generation:          1
  Managed Fields:
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
      f:spec:
        .:
        f:authentication:
          .:
          f:certificateAndKey:
            .:
            f:certificate:
            f:key:
            f:secretName:
          f:type:
        f:bootstrapServers:
        f:build:
          .:
          f:output:
            .:
            f:image:
            f:pushSecret:
            f:type:
          f:plugins:
        f:config:
          .:
          f:config.storage.replication.factor:
          f:config.storage.topic:
          f:group.id:
          f:offset.storage.replication.factor:
          f:offset.storage.topic:
          f:status.storage.replication.factor:
          f:status.storage.topic:
        f:replicas:
        f:tls:
          .:
          f:trustedCertificates:
        f:version:
    Manager:      kubectl-client-side-apply
    Operation:    Update
    Time:         2023-01-03T11:47:22Z
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:status:
        .:
        f:conditions:
        f:labelSelector:
        f:observedGeneration:
        f:replicas:
        f:url:
    Manager:         strimzi-cluster-operator
    Operation:       Update
    Subresource:     status
    Time:            2023-01-03T11:49:54Z
  Resource Version:  14098229
  UID:               575ddbfe-fb27-431d-b59c-4d3ae969beb4
Spec:
  Authentication:
    Certificate And Key:
      Certificate:  user.crt
      Key:          user.key
      Secret Name:  connect-user
    Type:           tls
  Bootstrap Servers:  kafka-cluster-kafka-bootstrap.kafka-strimzi:9093
  Build:
    Output:
      Image:        docker.io/ctomas65/strimzi-connect-file-example-3.3.1:latest
      Push Secret:  my-registry-credentials
      Type:         docker
    Plugins:
      Artifacts:
        Artifact:  connect-file
        Group:     org.apache.kafka
        Type:      maven
        Version:   3.3.1
      Name:        kafka-connect-file
  Config:
    config.storage.replication.factor:  -1
    config.storage.topic:               connect-cluster-configs
    group.id:                           connect-cluster
    offset.storage.replication.factor:  -1
    offset.storage.topic:               connect-cluster-offsets
    status.storage.replication.factor:  -1
    status.storage.topic:               connect-cluster-status
  Replicas:  1
  Tls:
    Trusted Certificates:
      Certificate:  ca.crt
      Secret Name:  kafka-cluster-cluster-ca-cert
  Version:  3.3.1
Status:
  Conditions:
    Last Transition Time:  2023-01-03T11:49:54.508079Z
    Status:                True
    Type:                  Ready
  Label Selector:          strimzi.io/cluster=kafka-connect-cluster,strimzi.io/name=kafka-connect-cluster-connect,strimzi.io/kind=KafkaConnect
  Observed Generation:     1
  Replicas:                1
  URL:                     http://kafka-connect-cluster-connect-api.kafka-strimzi.svc:8083
Events:                    <none>
```

I will then create the connector instances for the source and sink file stream connectors in the same namespace:
```yaml
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-source-connector
  labels:
    # The strimzi.io/cluster label identifies the KafkaConnect instance
    # in which to create this connector. That KafkaConnect instance
    # must have the strimzi.io/use-connector-resources annotation
    # set to true.
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSourceConnector
  tasksMax: 2
  config:
    file: "/opt/kafka/LICENSE"
    topic: my-topic
---
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: file-sink-connector
  labels:
    # The strimzi.io/cluster label identifies the KafkaConnect instance
    # in which to create this connector. That KafkaConnect instance
    # must have the strimzi.io/use-connector-resources annotation
    # set to true.
    strimzi.io/cluster: kafka-connect-cluster
spec:
  class: org.apache.kafka.connect.file.FileStreamSinkConnector
  tasksMax: 2
  config:
    file: "/tmp/my-file"
    # Sink connectors take a topics list, not topic (the describe output
    # below confirms the applied resource uses topics)
    topics: my-topic
```
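Since a KafkaConnector is only reconciled when its `strimzi.io/cluster` label matches the name of a KafkaConnect that has opted in to connector resources, that wiring is worth checking mechanically. A small illustrative sketch — the `connects_to()` helper is my own, not a Strimzi API, and the dicts are reduced versions of the manifests above:

```python
# Sketch: verify KafkaConnector resources target a KafkaConnect that has
# KafkaConnector resources enabled. connects_to() is an illustrative helper.

kafka_connect = {
    "metadata": {
        "name": "kafka-connect-cluster",
        "annotations": {"strimzi.io/use-connector-resources": "true"},
    },
}

connectors = [
    {"metadata": {"name": "file-source-connector",
                  "labels": {"strimzi.io/cluster": "kafka-connect-cluster"}}},
    {"metadata": {"name": "file-sink-connector",
                  "labels": {"strimzi.io/cluster": "kafka-connect-cluster"}}},
]

def connects_to(connector, connect):
    """True if the connector's strimzi.io/cluster label matches the
    KafkaConnect name and that KafkaConnect opted in to connector resources."""
    meta = connect["metadata"]
    label = connector["metadata"].get("labels", {}).get("strimzi.io/cluster")
    enabled = meta.get("annotations", {}).get(
        "strimzi.io/use-connector-resources") == "true"
    return label == meta["name"] and enabled

print([connects_to(c, kafka_connect) for c in connectors])  # [True, True]
```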
The KafkaConnector resources are created:

```
$ oc get kafkaconnectors --selector strimzi.io/cluster=kafka-connect-cluster -n kafka-strimzi
NAME                    CLUSTER                 CONNECTOR CLASS                                           MAX TASKS   READY
file-sink-connector     kafka-connect-cluster   org.apache.kafka.connect.file.FileStreamSinkConnector     2
file-source-connector   kafka-connect-cluster   org.apache.kafka.connect.file.FileStreamSourceConnector   2
```

Note that the READY column is empty for both connectors.

```
$ oc describe kafkaconnectors -n kafka-strimzi
Name:         file-sink-connector
Namespace:    kafka-strimzi
Labels:       strimzi.io/cluster=kafka-connect-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaConnector
Metadata:
  Creation Timestamp:  2023-01-03T12:38:44Z
  Generation:          1
  Managed Fields:
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
        f:labels:
          .:
          f:strimzi.io/cluster:
      f:spec:
        .:
        f:class:
        f:config:
          .:
          f:file:
          f:topics:
        f:tasksMax:
    Manager:         kubectl-client-side-apply
    Operation:       Update
    Time:            2023-01-03T12:38:44Z
  Resource Version:  14126627
  UID:               4b0ca8f7-5f55-409b-b30a-b2bd0aea8105
Spec:
  Class:  org.apache.kafka.connect.file.FileStreamSinkConnector
  Config:
    File:    /tmp/my-file
    Topics:  my-topic
  Tasks Max:  2
Events:       <none>

Name:         file-source-connector
Namespace:    kafka-strimzi
Labels:       strimzi.io/cluster=kafka-connect-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta2
Kind:         KafkaConnector
Metadata:
  Creation Timestamp:  2023-01-03T12:38:29Z
  Generation:          1
  Managed Fields:
    API Version:  kafka.strimzi.io/v1beta2
    Fields Type:  FieldsV1
    fieldsV1:
      f:metadata:
        f:annotations:
          .:
          f:kubectl.kubernetes.io/last-applied-configuration:
        f:labels:
          .:
          f:strimzi.io/cluster:
      f:spec:
        .:
        f:class:
        f:config:
          .:
          f:file:
          f:topic:
        f:tasksMax:
    Manager:         kubectl-client-side-apply
    Operation:       Update
    Time:            2023-01-03T12:38:29Z
  Resource Version:  14126481
  UID:               b07f3223-0b27-4fcc-905d-0577bcac955f
Spec:
  Class:  org.apache.kafka.connect.file.FileStreamSourceConnector
  Config:
    File:   /opt/kafka/LICENSE
    Topic:  my-topic
  Tasks Max:  2
Events:       <none>
```

It is supposed that the data in the `/opt/kafka/LICENSE` file ends up in the `my-topic` topic. The sink connector is not creating the `/tmp/my-file` file, and the source connector is not working either: I have run a consumer in the `my-group` consumer group and nothing is received. I have also tried restarting the source connector:

```
$ oc annotate KafkaConnector file-source-connector strimzi.io/restart=true -n kafka-strimzi
```

The KafkaConnect logs do not show any activity when the KafkaConnectors are created or updated with the restart annotation.

Can someone tell me how to get the KafkaConnectors to work? Kind regards.

PS: Here is the log file of the KafkaConnect pod: kafka-connect-cluster-connect-5b8b5c79f4-4pkwj-kafka-connect-cluster-connect.log
Did you enable the Connector Operator: https://github.com/strimzi/strimzi-kafka-operator/blob/main/examples/connect/kafka-connect-build.yaml#L5-L9 ... without it, you cannot really use the `KafkaConnector` resources.
Ok, I know what was happening. The annotation to enable the connector operator was uncommented in the KafkaConnect manifest file, but it was badly indented.

Wrong (the annotation line is not indented under `annotations:`, so it is parsed as a plain `metadata` field rather than an annotation):

```yaml
metadata:
  name: kafka-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
  strimzi.io/use-connector-resources: "true"
```

Right:

```yaml
metadata:
  name: kafka-connect-cluster
  annotations:
    # use-connector-resources configures this KafkaConnect
    # to use KafkaConnector resources to avoid
    # needing to call the Connect REST API directly
    strimzi.io/use-connector-resources: "true"
```
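The effect of the mis-indentation can be shown with a short sketch (assuming PyYAML is installed; the snippets are reduced to the relevant keys). With the wrong indentation, `strimzi.io/use-connector-resources` never appears among the annotations, so the cluster operator ignores the KafkaConnector resources:

```python
# Sketch: what each indentation actually parses to.
import yaml  # PyYAML

key = "strimzi.io/use-connector-resources"

# Annotation NOT indented under annotations: it becomes a metadata field,
# and annotations (containing only comments) parses as null.
wrong = yaml.safe_load("""
metadata:
  name: kafka-connect-cluster
  annotations:
    # only comments here
  strimzi.io/use-connector-resources: "true"
""")

# Annotation correctly nested under annotations.
right = yaml.safe_load("""
metadata:
  name: kafka-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
""")

print(key in (wrong["metadata"]["annotations"] or {}))  # False: not an annotation
print(key in wrong["metadata"])                         # True: stray metadata field
print(key in right["metadata"]["annotations"])          # True: operator sees it
```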