diff --git a/packages/indexer/.env.example b/packages/indexer/.env.example index 820439c896..1149f75e30 100644 --- a/packages/indexer/.env.example +++ b/packages/indexer/.env.example @@ -119,12 +119,15 @@ DISABLE_REALTIME_METADATA_REFRESH=0 # For kafka DO_KAFKA_WORK=0 -KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY=0 +KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY=1 KAFKA_CONSUMER_GROUP_ID= KAFKA_BROKERS= KAFKA_CLIENT_ID= KAFKA_MAX_BYTES_PER_PARTITION=1024 +KAFKA_SASL_USERNAME=xxx +KAFKA_SASL_PASSWORD=xxx + # For testing order websocket triggers DO_OLD_ORDER_WEBSOCKET_WORK=0 diff --git a/packages/indexer/docker-compose.yaml b/packages/indexer/docker-compose.yaml index 5aae70638f..1302fe9aaf 100644 --- a/packages/indexer/docker-compose.yaml +++ b/packages/indexer/docker-compose.yaml @@ -2,11 +2,13 @@ version: "3" services: postgres: - image: postgres:13.2 + image: debezium/postgres:14 command: postgres -N 2000 environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=password + - POSTGRES_DB=postgres + container_name: postgres networks: - local ports: @@ -62,6 +64,60 @@ services: networks: - local + kafka: + image: confluentinc/cp-kafka:7.4.4 + ports: + - 9092:9092 + - 29092:29092 + environment: + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT' + KAFKA_PROCESS_ROLES: 'broker,controller' + KAFKA_BROKER_ID: 1 + KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092' + KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka:29093' + KAFKA_LISTENERS: 'CONTROLLER://:29093,PLAINTEXT://:9092,PLAINTEXT_HOST://:29092' + KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT' + KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER' + CLUSTER_ID: '4L6g3nShT-eMCtK--X86sw' + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_ADVERTISED_HOST_NAME: kafka + container_name: kafka + networks: + - local + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + depends_on: + - kafka + ports: + - 8081:8080 + environment: + KAFKA_CLUSTERS_0_NAME: reservoir-indexer-local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 + networks: + - local + + kafka-connect: + container_name: kafka-connect + image: quay.io/debezium/connect:2.5 + depends_on: + - kafka + - postgres + ports: + - 8083:8083 + links: + - kafka + environment: + GROUP_ID: 1 + CONFIG_STORAGE_TOPIC: my_connect_configs + OFFSET_STORAGE_TOPIC: my_connect_offsets + STATUS_STORAGE_TOPIC: my_connect_statuses + BOOTSTRAP_SERVERS: kafka:9092 + ADVERTISED_HOST_NAME: kafka + networks: + - local + networks: local: driver: bridge diff --git a/packages/indexer/src/common/kafka-stream-producer.ts b/packages/indexer/src/common/kafka-stream-producer.ts index 6eec23d111..1bf33046c8 100644 --- a/packages/indexer/src/common/kafka-stream-producer.ts +++ b/packages/indexer/src/common/kafka-stream-producer.ts @@ -5,11 +5,11 @@ import { config } from "@/config/index"; const kafka = new Kafka({ clientId: config.kafkaStreamClientId, brokers: config.kafkaStreamBrokers, - ssl: { - rejectUnauthorized: false, - ca: config.kafkaStreamCertificateCa, - key: config.kafkaStreamCertificateKey, - cert: config.kafkaStreamCertificateCert, + ssl: false, + sasl: { + mechanism: "scram-sha-256", + username: config.kafkaStreamUsername, + password: config.kafkaStreamPassword, }, logLevel: logLevel.ERROR, }); diff --git a/packages/indexer/src/config/index.ts b/packages/indexer/src/config/index.ts index 4e9bb8c391..98af4a5c48 100644 --- a/packages/indexer/src/config/index.ts +++ b/packages/indexer/src/config/index.ts @@ -74,7 +74,8 @@ export const config = { kafkaStreamCertificateCa: String(process.env.KAFKA_STREAM_CERTIFICATE_CA), kafkaStreamCertificateKey: String(process.env.KAFKA_STREAM_CERTIFICATE_KEY), kafkaStreamCertificateCert: String(process.env.KAFKA_STREAM_CERTIFICATE_CERT), - + kafkaStreamUsername: String(process.env.KAFKA_SASL_USERNAME), + kafkaStreamPassword: String(process.env.KAFKA_SASL_PASSWORD), maxTokenSetSize: 100000, awsAccessKeyId: String(process.env.AWS_ACCESS_KEY_ID || process.env.FC_AWS_ACCESS_KEY_ID), diff --git a/packages/indexer/src/jobs/cdc/index.ts b/packages/indexer/src/jobs/cdc/index.ts index 45f0d8bb72..beed2e4ca8 100644 --- a/packages/indexer/src/jobs/cdc/index.ts +++ b/packages/indexer/src/jobs/cdc/index.ts @@ -15,6 +15,8 @@ export const consumer = kafka.consumer({ groupId: config.kafkaConsumerGroupId, maxBytesPerPartition: config.kafkaMaxBytesPerPartition || 1048576, // (default is 1MB) allowAutoTopicCreation: false, + sessionTimeout: 60000, + heartbeatInterval: 3000, }); export async function startKafkaProducer(): Promise { @@ -76,7 +78,7 @@ export async function startKafkaConsumer(): Promise { return; } - const event = JSON.parse(message.value!.toString()); + const event = JSON.parse(message.value!.toString()).payload; if (batch.topic.endsWith("-dead-letter")) { logger.info(