-
Hi — note that I'm very new to Kafka and the Strimzi Operator, so sorry if this seems obvious. I'm wondering how permissions work here. I would expect my KafkaConnect to connect successfully using the following KafkaConnector spec:
spec:
autoRestart:
enabled: true
class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
config:
camel.component.aws2-s3.accessKey: ABC
camel.component.aws2-s3.autoCreateBucket: false
camel.component.aws2-s3.destinationBucket: swift
camel.component.aws2-s3.destinationBucketPrefix: raw/
camel.component.aws2-s3.ignoreBody: true
camel.component.aws2-s3.includeFolders: true
camel.component.aws2-s3.region: custom
camel.component.aws2-s3.secretKey: xxxx
camel.source.camelMessageHeaderKey: CamelAwsS3Key
camel.source.endpoint.delay: 5000
camel.source.endpoint.deleteAfterRead: true
camel.source.endpoint.maxConnections: 1
camel.source.endpoint.moveAfterRead: true
camel.source.endpoint.overrideEndpoint: "true"
camel.source.endpoint.prefix: incoming
camel.source.endpoint.trustAllCertificates: true
camel.source.endpoint.uriEndpointOverride: https://minio
camel.source.maxPollDuration: "1000"
camel.source.path.bucketNameOrArn: swift
key.converter: org.apache.kafka.connect.storage.StringConverter
topics: my-topic
pause: false
tasksMax: 2 In the KafkaConnector status details I can see the following error message: connectorStatus:
connector:
state: RUNNING
worker_id: kafka-connect-1.kafka-connect.kafka.svc:8083
name: s3-sourceconnector
tasks:
- id: 0
state: FAILED
trace: org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception
from producer send callback\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:312)\n\tat
org.apache.kafka.connect.runtime.WorkerSourceTask.prepareToSendRecord(WorkerSourceTask.java:126)\n\tat
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:395)\n\tat
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:354)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:244)\n\tat
org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:72)\n\tat
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)\n\tat
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat
java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.kafka.common.errors.TopicAuthorizationException:
Not authorized to access topics: [my-topic]\n
worker_id: kafka-connect-1.kafka-connect.kafka.svc:8083 I can also see the same error in the KafkaConnect logs. Here are my resources:
❯ kubectl get kafkaConnect kafka -o yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
annotations:
helm.sh/hook-weight: "10"
strimzi.io/use-connector-resources: "true"
labels:
app: kafka
name: kafka
spec:
authentication:
certificateAndKey:
certificate: user.crt
key: user.key
secretName: kafka-connect-user
type: tls
bootstrapServers: kafka-kafka-bootstrap:9093
config:
config.storage.replication.factor: -1
config.storage.topic: kafka-connect-cluster-configs
group.id: kafka-connect-cluster
internal.key.converter: org.apache.kafka.connect.json.JsonConverter
internal.value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
offset.storage.replication.factor: -1
offset.storage.topic: kafka-connect-cluster-offsets
status.storage.replication.factor: -1
status.storage.topic: kafka-connect-cluster-status
value.converter.schemas.enable: false
image: strimzi-kafka-connect:0.0.5
livenessProbe:
initialDelaySeconds: 120
timeoutSeconds: 10
logging:
loggers:
connect.root.logger.level: INFO
log4j.logger.io.confluent.connect.jdbc: DEBUG
log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask: INFO
log4j.logger.org.apache.kafka.connect.runtime.WorkerSourceTask: DEBUG
type: inline
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: metrics-config.yml
name: connect-metrics
readinessProbe:
initialDelaySeconds: 120
timeoutSeconds: 10
replicas: 2
resources:
limits:
cpu: 1
memory: 4Gi
requests:
cpu: 200m
memory: 512Mi
template:
pod:
securityContext:
fsGroup: 3000
runAsGroup: 2000
runAsUser: 10001
tls:
trustedCertificates:
- certificate: ca.crt
secretName: kafka-cluster-ca-cert
version: 3.5.1
status:
conditions:
- lastTransitionTime: "2024-08-22T16:03:58.446156640Z"
status: "True"
type: Ready
connectorPlugins:
- class: com.mongodb.kafka.connect.MongoSinkConnector
type: sink
version: 1.10.0
- class: com.snowflake.kafka.connector.SnowflakeSinkConnector
type: sink
version: 1.5.5
- class: io.confluent.connect.jdbc.JdbcSinkConnector
type: sink
version: 10.2.2
- class: io.confluent.connect.s3.S3SinkConnector
type: sink
version: 10.5.4
- class: org.apache.camel.kafkaconnector.CamelSinkConnector
type: sink
version: 0.11.5
- class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SinkConnector
type: sink
version: 0.11.5
- class: com.mongodb.kafka.connect.MongoSourceConnector
type: source
version: 1.10.0
- class: io.confluent.connect.jdbc.JdbcSourceConnector
type: source
version: 10.2.2
- class: io.confluent.connect.storage.tools.SchemaSourceConnector
type: source
version: 3.3.2
- class: org.apache.camel.kafkaconnector.CamelSourceConnector
type: source
version: 0.11.5
- class: org.apache.camel.kafkaconnector.aws2s3.CamelAws2s3SourceConnector
type: source
version: 0.11.5
- class: org.apache.kafka.connect.mirror.MirrorCheckpointConnector
type: source
version: 3.3.2
- class: org.apache.kafka.connect.mirror.MirrorHeartbeatConnector
type: source
version: 3.3.2
- class: org.apache.kafka.connect.mirror.MirrorSourceConnector
type: source
version: 3.3.2
labelSelector: strimzi.io/cluster=kafka,strimzi.io/name=kafka-connect,strimzi.io/kind=KafkaConnect
observedGeneration: 1
replicas: 2
url: http://kafka-connect-api.kafka.svc:8083 ❯ kubectl get kafka kafka -o yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
annotations:
helm.sh/hook-weight: "0"
name: kafka
spec:
clusterCa:
generateCertificateAuthority: false
entityOperator:
template:
pod:
securityContext:
fsGroup: 3000
runAsGroup: 2000
runAsUser: 10001
topicOperator:
reconciliationIntervalSeconds: 90
resources:
limits:
cpu: "2"
memory: 2Gi
requests:
cpu: 200m
memory: 512Mi
userOperator:
reconciliationIntervalSeconds: 120
kafka:
authorization:
superUsers:
- kafka-super-user
type: simple
config:
auto.create.topics.enable: true
auto.leader.rebalance.enable: true
default.replication.factor: 2
group.initial.rebalance.delay.ms: 0
inter.broker.protocol.version: 3.3
log.cleaner.enable: true
log.cleaner.threads: 6
log.retention.bytes: 1073741824
log.retention.check.interval.ms: 300000
log.retention.hours: 48
log.segment.bytes: 1073741824
min.insync.replicas: 2
num.io.threads: 8
num.network.threads: 12
num.partitions: 1
num.recovery.threads.per.data.dir: 1
num.replica.fetchers: 3
offsets.topic.replication.factor: 3
socket.receive.buffer.bytes: 102400
socket.request.max.bytes: 104857600
socket.send.buffer.bytes: 102400
transaction.state.log.min.isr: 2
transaction.state.log.replication.factor: 3
zookeeper.connection.timeout.ms: 6000
listeners:
- configuration:
brokers:
- advertisedHost: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9095
broker: 0
- advertisedHost: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9095
broker: 1
- advertisedHost: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9095
broker: 2
name: plain
port: 9095
tls: false
type: internal
- authentication:
type: scram-sha-512
configuration:
brokers:
- advertisedHost: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9096
broker: 0
- advertisedHost: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9096
broker: 1
- advertisedHost: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9096
broker: 2
name: plainscram
port: 9096
tls: false
type: internal
- authentication:
sasl: true
type: scram-sha-512
configuration:
brokers:
- advertisedHost: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9092
broker: 0
- advertisedHost: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9092
broker: 1
- advertisedHost: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9092
broker: 2
name: saslscram
port: 9092
tls: false
type: internal
- authentication:
type: tls
configuration:
brokers:
- advertisedHost: kafka-kafka-0.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9093
broker: 0
- advertisedHost: kafka-kafka-1.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9093
broker: 1
- advertisedHost: kafka-kafka-2.kafka-kafka-brokers.kafka.svc.cluster.local
advertisedPort: 9093
broker: 2
name: tls
port: 9093
tls: true
type: internal
- configuration:
brokers:
- advertisedHost: b0-kafka.example.com
advertisedPort: 10000
broker: 0
- advertisedHost: b1-kafka.example.com
advertisedPort: 10000
broker: 1
- advertisedHost: b2-kafka.example.com
advertisedPort: 10000
broker: 2
name: ingress
port: 9094
tls: false
type: internal
livenessProbe:
initialDelaySeconds: 120
timeoutSeconds: 10
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: kafka-metrics-config.yml
name: kafka-metrics
readinessProbe:
initialDelaySeconds: 120
timeoutSeconds: 10
replicas: 3
resources:
limits:
cpu: "4"
memory: 16Gi
requests:
cpu: "2"
memory: 8Gi
storage:
class: default
deleteClaim: false
size: 10Gi
type: persistent-claim
template:
pod:
securityContext:
fsGroup: 3000
runAsGroup: 2000
runAsUser: 10001
topologySpreadConstraints:
- labelSelector:
matchLabels:
app.kubernetes.io/name: kafka
maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
version: 3.5.1
zookeeper:
livenessProbe:
initialDelaySeconds: 120
timeoutSeconds: 10
metricsConfig:
type: jmxPrometheusExporter
valueFrom:
configMapKeyRef:
key: zookeeper-metrics-config.yml
name: kafka-metrics
readinessProbe:
initialDelaySeconds: 120
timeoutSeconds: 10
replicas: 3
resources:
limits:
cpu: 1
memory: 2Gi
storage:
class: default
deleteClaim: false
size: 5Gi
type: persistent-claim
template:
pod:
securityContext:
fsGroup: 3000
runAsGroup: 2000
runAsUser: 10001
topologySpreadConstraints:
- labelSelector:
matchLabels:
app.kubernetes.io/name: zookeeper
maxSkew: 1
topologyKey: kubernetes.io/hostname
whenUnsatisfiable: DoNotSchedule
status:
clusterId: cvYGeX0xSbEp24YSeEyEmv
conditions:
- lastTransitionTime: "2024-08-23T12:46:28.606690911Z"
message: inter.broker.protocol.version does not match the Kafka cluster version,
which suggests that an upgrade is incomplete.
reason: KafkaInterBrokerProtocolVersion
status: "True"
type: Warning
- lastTransitionTime: "2024-08-23T12:54:52.835128258Z"
status: "True"
type: Ready
- lastTransitionTime: "2024-08-23T12:46:25.113572130Z"
message: 'Resource Kafka(kafka/kafka) contains
object at path spec.kafka.listeners.auth with an unknown property: sasl'
reason: UnknownFields
status: "True"
type: Warning
kafkaVersion: 3.5.1
listeners:
- addresses:
- host: kafka-kafka-bootstrap.kafka.svc
port: 9095
bootstrapServers: kafka-kafka-bootstrap.kafka.svc:9095
name: plain
type: plain
- addresses:
- host: kafka-kafka-bootstrap.kafka.svc
port: 9096
bootstrapServers: kafka-kafka-bootstrap.kafka.svc:9096
name: plainscram
type: plainscram
- addresses:
- host: kafka-kafka-bootstrap.kafka.svc
port: 9092
bootstrapServers: kafka-kafka-bootstrap.kafka.svc:9092
name: saslscram
type: saslscram
- addresses:
- host: kafka-kafka-bootstrap.kafka.svc
port: 9093
bootstrapServers: kafka-kafka-bootstrap.kafka.svc:9093
certificates:
- |
-----BEGIN CERTIFICATE-----
MIIFLTCCAxWgAwIBAgIUC5X/8zlMVnx/IFxdwfQQ0+BlIhYwDQYJKoZIhvcNAQEN
...
-----END CERTIFICATE-----
name: tls
type: tls
- addresses:
- host: kafka-kafka-bootstrap.kafka.svc
port: 9094
bootstrapServers: kafka-kafka-bootstrap.kafka.svc:9094
name: ingress
type: ingress
observedGeneration: 2
operatorLastSuccessfulVersion: 0.37.0
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 5 replies
-
You are missing some ACL permissions. You need to look at the user that your Kafka Connect cluster authenticates with, check its ACLs, and fix them. How to do that depends on how you manage the user: it might be as simple as updating the KafkaUser resource, or it might be something else.
Beta Was this translation helpful? Give feedback.
You are missing some ACL permissions. So you need to look at the user you use in the Kafka Connect cluster and check its ACLs and fix them. How to do that depends on how you manage the user. It might be just fixing the KafkaUser resource or it might be something else.