From 43a9ddddd9490eb2f07dabf453b8e6eeb8890e0b Mon Sep 17 00:00:00 2001 From: HungLV Date: Fri, 19 Jul 2024 14:51:01 +0700 Subject: [PATCH 1/3] feat: kafka config --- packages/indexer/docker-compose.yaml | 74 ++++++++++++++++++- .../src/common/kafka-stream-producer.ts | 12 +-- packages/indexer/src/jobs/cdc/index.ts | 4 +- 3 files changed, 82 insertions(+), 8 deletions(-) diff --git a/packages/indexer/docker-compose.yaml b/packages/indexer/docker-compose.yaml index 5aae70638f..5f8f85559f 100644 --- a/packages/indexer/docker-compose.yaml +++ b/packages/indexer/docker-compose.yaml @@ -2,11 +2,13 @@ version: "3" services: postgres: - image: postgres:13.2 + image: debezium/postgres:14 command: postgres -N 2000 environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=password + - POSTGRES_DB=postgres + container_name: postgres networks: - local ports: @@ -62,6 +64,76 @@ services: networks: - local + zookeeper: + image: confluentinc/cp-zookeeper:7.4.4 + healthcheck: + test: echo srvr | nc zookeeper 2181 || exit 1 + retries: 20 + interval: 10s + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - 2181:2181 + container_name: zookeeper + networks: + - local + + kafka: + image: confluentinc/cp-kafka:7.4.4 + depends_on: + zookeeper: + condition: service_healthy + ports: + - 9092:9092 + - 29092:29092 + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_ADVERTISED_HOST_NAME: kafka + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + container_name: kafka + networks: + - local + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + depends_on: + - kafka + ports: + - 8081:8080 + environment: + KAFKA_CLUSTERS_0_NAME: reservoir-indexer-local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + networks: + - local + + kafka-connect: + container_name: kafka-connect + image: quay.io/debezium/connect:2.5 + depends_on: + - kafka + - postgres + - zookeeper + ports: + - 8083:8083 + links: + - kafka + - zookeeper + environment: + GROUP_ID: 1 + CONFIG_STORAGE_TOPIC: my_connect_configs + OFFSET_STORAGE_TOPIC: my_connect_offsets + STATUS_STORAGE_TOPIC: my_connect_statuses + BOOTSTRAP_SERVERS: kafka:9092 + ADVERTISED_HOST_NAME: kafka + networks: + - local + networks: local: driver: bridge diff --git a/packages/indexer/src/common/kafka-stream-producer.ts b/packages/indexer/src/common/kafka-stream-producer.ts index 6eec23d111..a502730651 100644 --- a/packages/indexer/src/common/kafka-stream-producer.ts +++ b/packages/indexer/src/common/kafka-stream-producer.ts @@ -5,12 +5,12 @@ import { config } from "@/config/index"; const kafka = new Kafka({ clientId: config.kafkaStreamClientId, brokers: config.kafkaStreamBrokers, - ssl: { - rejectUnauthorized: false, - ca: config.kafkaStreamCertificateCa, - key: config.kafkaStreamCertificateKey, - cert: config.kafkaStreamCertificateCert, - }, + // ssl: { + // rejectUnauthorized: false, + // ca: config.kafkaStreamCertificateCa, + // key: config.kafkaStreamCertificateKey, + // cert: config.kafkaStreamCertificateCert, + // }, logLevel: logLevel.ERROR, }); diff --git a/packages/indexer/src/jobs/cdc/index.ts b/packages/indexer/src/jobs/cdc/index.ts index 45f0d8bb72..beed2e4ca8 100644 --- a/packages/indexer/src/jobs/cdc/index.ts +++ b/packages/indexer/src/jobs/cdc/index.ts @@ -15,6 +15,8 @@ export const consumer = kafka.consumer({ groupId: config.kafkaConsumerGroupId, maxBytesPerPartition: config.kafkaMaxBytesPerPartition || 1048576, // (default is 1MB) allowAutoTopicCreation: false, + sessionTimeout: 60000, + heartbeatInterval: 3000, }); export async function startKafkaProducer(): Promise { @@ -76,7 +78,7 @@ export async function startKafkaConsumer(): Promise { return; } - const event = JSON.parse(message.value!.toString()); + const event = JSON.parse(message.value!.toString()).payload; if (batch.topic.endsWith("-dead-letter")) { logger.info( From e229b114c336a4ff4c8919141fe922ea2135e342 Mon Sep 17 00:00:00 2001 From: HungLV Date: Mon, 22 Jul 2024 17:17:12 +0700 Subject: [PATCH 2/3] fix: kraft mode --- packages/indexer/docker-compose.yaml | 34 ++++++++-------------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/packages/indexer/docker-compose.yaml b/packages/indexer/docker-compose.yaml index 5f8f85559f..1302fe9aaf 100644 --- a/packages/indexer/docker-compose.yaml +++ b/packages/indexer/docker-compose.yaml @@ -64,37 +64,23 @@ services: networks: - local - zookeeper: - image: confluentinc/cp-zookeeper:7.4.4 - healthcheck: - test: echo srvr | nc zookeeper 2181 || exit 1 - retries: 20 - interval: 10s - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - ports: - - 2181:2181 - container_name: zookeeper - networks: - - local - kafka: image: confluentinc/cp-kafka:7.4.4 - depends_on: - zookeeper: - condition: service_healthy ports: - 9092:9092 - 29092:29092 environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_PROCESS_ROLES: 'broker,controller' KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 - KAFKA_ADVERTISED_HOST_NAME: kafka - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT://:9092,PLAINTEXT_HOST://:29092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_ADVERTISED_HOST_NAME: kafka container_name: kafka networks: - local @@ -118,12 +104,10 @@ services: depends_on: - kafka - postgres - - zookeeper ports: - 8083:8083 links: - kafka - - zookeeper environment: GROUP_ID: 1 CONFIG_STORAGE_TOPIC: my_connect_configs From ac01a32169abd067dd09e0a9a5ceeac50e4dff41 Mon Sep 17 00:00:00 2001 From: HungLV Date: Tue, 23 Jul 2024 10:07:13 +0700 Subject: [PATCH 3/3] fix: sasl kafka config --- packages/indexer/.env.example | 5 ++++- packages/indexer/src/common/kafka-stream-producer.ts | 12 ++++++------ packages/indexer/src/config/index.ts | 3 ++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/packages/indexer/.env.example b/packages/indexer/.env.example index 820439c896..1149f75e30 100644 --- a/packages/indexer/.env.example +++ b/packages/indexer/.env.example @@ -119,12 +119,15 @@ DISABLE_REALTIME_METADATA_REFRESH=0 # For kafka DO_KAFKA_WORK=0 -KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY=0 +KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY=1 KAFKA_CONSUMER_GROUP_ID= KAFKA_BROKERS= KAFKA_CLIENT_ID= KAFKA_MAX_BYTES_PER_PARTITION=1024 +KAFKA_SASL_USERNAME=xxx +KAFKA_SASL_PASSWORD=xxx + # For testing order websocket triggers DO_OLD_ORDER_WEBSOCKET_WORK=0 diff --git a/packages/indexer/src/common/kafka-stream-producer.ts b/packages/indexer/src/common/kafka-stream-producer.ts index a502730651..1bf33046c8 100644 --- a/packages/indexer/src/common/kafka-stream-producer.ts +++ b/packages/indexer/src/common/kafka-stream-producer.ts @@ -5,12 +5,12 @@ import { config } from "@/config/index"; const kafka = new Kafka({ clientId: config.kafkaStreamClientId, brokers: config.kafkaStreamBrokers, - // ssl: { - // rejectUnauthorized: false, - // ca: config.kafkaStreamCertificateCa, - // key: config.kafkaStreamCertificateKey, - // cert: config.kafkaStreamCertificateCert, - // }, + ssl: false, + sasl: { + mechanism: "scram-sha-256", + username: config.kafkaStreamUsername, + password: config.kafkaStreamPassword, + }, logLevel: logLevel.ERROR, }); diff --git a/packages/indexer/src/config/index.ts b/packages/indexer/src/config/index.ts index 4e9bb8c391..98af4a5c48 100644 --- a/packages/indexer/src/config/index.ts +++ b/packages/indexer/src/config/index.ts @@ -74,7 +74,8 @@ export const config = { kafkaStreamCertificateCa: String(process.env.KAFKA_STREAM_CERTIFICATE_CA), kafkaStreamCertificateKey: String(process.env.KAFKA_STREAM_CERTIFICATE_KEY), kafkaStreamCertificateCert: String(process.env.KAFKA_STREAM_CERTIFICATE_CERT), - + kafkaStreamUsername: String(process.env.KAFKA_SASL_USERNAME), + kafkaStreamPassword: String(process.env.KAFKA_SASL_PASSWORD), maxTokenSetSize: 100000, awsAccessKeyId: String(process.env.AWS_ACCESS_KEY_ID || process.env.FC_AWS_ACCESS_KEY_ID),