Client async connector (#787)
- switch the terafoundation connector to use the async createClient call instead
of getConnection and the sync create call, and update all references in the code
to use the async method
jsnoble authored May 22, 2024
1 parent 884b344 commit 162125e
Showing 24 changed files with 134 additions and 127 deletions.
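
For orientation, here is a minimal sketch of the call-site change the diffs below apply: client creation moves from the synchronous context.foundation.getConnection(config) to the awaited context.apis.foundation.createClient(config). The FoundationContext interface and createKafkaProducer helper are illustrative only and not part of the commit.

import { ConnectionConfig, Logger } from '@terascope/job-components';
import type * as kafka from 'node-rdkafka';

// Minimal structural view of the pieces of the Teraslice context used below;
// the real context type comes from @terascope/job-components.
interface FoundationContext {
    apis: {
        foundation: {
            createClient(config: ConnectionConfig): Promise<{ client: any; logger: Logger }>;
        };
    };
}

async function createKafkaProducer(
    context: FoundationContext,
    config: ConnectionConfig
): Promise<kafka.Producer> {
    // Before this commit the client came from a sync call:
    //   const connection = context.foundation.getConnection(config);
    // After this commit the client is created asynchronously:
    const connection = await context.apis.foundation.createClient(config);
    return connection.client as kafka.Producer;
}
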
2 changes: 1 addition & 1 deletion asset/asset.json
@@ -1,5 +1,5 @@
{
"name": "kafka",
"description": "Kafka reader and writer support.",
"version": "4.1.0"
"version": "4.2.0"
}
2 changes: 1 addition & 1 deletion asset/package.json
@@ -1,7 +1,7 @@
{
"name": "kafka-assets",
"displayName": "Asset",
"version": "4.1.0",
"version": "4.2.0",
"private": true,
"description": "Teraslice asset for kafka operations",
"license": "MIT",
1 change: 0 additions & 1 deletion asset/src/_kafka_clients/consumer-client.ts
@@ -1,5 +1,4 @@
import type * as kafka from 'node-rdkafka';
import { } from 'terafoundation_kafka_connector';
import {
pDelay, toHumanTime, EncodingConfig,
isBoolean, isNotNil
26 changes: 9 additions & 17 deletions asset/src/kafka_dead_letter/api.ts
@@ -1,6 +1,4 @@
import {
WorkerContext,
ExecutionConfig,
ConnectionConfig,
OperationAPI,
DeadLetterAPIFn,
@@ -12,19 +10,16 @@ import { KafkaDeadLetterConfig } from './interfaces';
import { ProducerClient, ProduceMessage } from '../_kafka_clients';

export default class KafkaDeadLetter extends OperationAPI<KafkaDeadLetterConfig> {
producer: ProducerClient;
collector: Collector<ProduceMessage>;

constructor(
context: WorkerContext,
apiConfig: KafkaDeadLetterConfig,
executionConfig: ExecutionConfig
) {
super(context, apiConfig, executionConfig);
producer!: ProducerClient;
collector!: Collector<ProduceMessage>;

async initialize(): Promise<void> {
await super.initialize();
const logger = this.logger.child({ module: 'kafka-producer' });

this.producer = new ProducerClient(this.createClient(), {
const client = await this.createClient();

this.producer = new ProducerClient(client, {
logger,
topic: this.apiConfig.topic,
bufferSize: this.apiConfig.max_buffer_size,
@@ -34,10 +29,7 @@
size: this.apiConfig.size,
wait: this.apiConfig.wait,
});
}

async initialize(): Promise<void> {
await super.initialize();
await this.producer.connect();
}

@@ -110,9 +102,9 @@ export default class KafkaDeadLetter extends OperationAPI<KafkaDeadLetterConfig>
return config as ConnectionConfig;
}

private createClient(): kafka.Producer {
private async createClient(): Promise<kafka.Producer> {
const config = this.clientConfig();
const connection = this.context.foundation.getConnection(config);
const connection = await this.context.apis.foundation.createClient(config);
return connection.client;
}
}
4 changes: 3 additions & 1 deletion asset/src/kafka_reader_api/api.ts
@@ -78,7 +78,9 @@ export default class KafkaReaderApi extends APIFactory<APIConsumer, KafkaReaderA
const validConfig = this.validateConfig(newConfig);
const clientConfig = this.clientConfig(validConfig);

const kafkaClient = this.context.foundation.getConnection(clientConfig).client;
const { client: kafkaClient } = await this.context.apis.foundation.createClient(
clientConfig
);
const tryFn = this.tryRecord.bind(this);
const client = new APIConsumer(kafkaClient, {
...validConfig,
5 changes: 2 additions & 3 deletions asset/src/kafka_sender_api/api.ts
@@ -9,7 +9,6 @@ import {
isNumber,
isBoolean
} from '@terascope/job-components';
import * as kafka from 'node-rdkafka';
import { KafkaSenderConfig } from '../kafka_sender/interfaces';
import KafkaRouteSender from './sender';
import { KafkaSenderAPIConfig } from './interfaces';
@@ -82,9 +81,9 @@ export default class KafkaSenderApi extends APIFactory<KafkaRouteSender, KafkaSe
const validConfig = this.validateConfig(newConfig);
const clientConfig = this.clientConfig(validConfig);

const kafkaClient = this.context.foundation.getConnection(
const { client: kafkaClient } = await this.context.apis.foundation.createClient(
clientConfig
).client as kafka.Producer;
);

validConfig.tryFn = this.tryRecord.bind(this);

3 changes: 2 additions & 1 deletion jest.config.js
@@ -22,5 +22,6 @@ module.exports = {
},
ignoreDirectories: ['dist'],
availableExtensions: ['.js', '.ts']
}
},
testTimeout: 60 * 1000
};
10 changes: 5 additions & 5 deletions package.json
@@ -1,7 +1,7 @@
{
"name": "kafka-asset-bundle",
"displayName": "Kafka Asset Bundle",
"version": "4.1.0",
"version": "4.2.0",
"private": true,
"description": "A bundle of Kafka operations and processors for Teraslice",
"repository": "[email protected]:terascope/kafka-assets.git",
@@ -24,9 +24,9 @@
"publish:changed": "./scripts/publish.sh",
"setup": "yarn && yarn build --force",
"test": "KAFKA_VERSION=3.2 ts-scripts test asset --",
"test:all": "ts-scripts test",
"test:debug": "ts-scripts test --debug asset --",
"test:watch": "ts-scripts test --watch asset --"
"test:all": "KAFKA_VERSION=3.2 ts-scripts test",
"test:debug": "KAFKA_VERSION=3.2 ts-scripts test --debug asset --",
"test:watch": "KAFKA_VERSION=3.2 ts-scripts test --watch asset --"
},
"dependencies": {
"node-gyp": "9.4.1"
@@ -42,7 +42,7 @@
"eslint": "^8.57.0",
"jest": "^29.6.2",
"jest-extended": "^4.0.2",
"terafoundation_kafka_connector": "^0.12.0",
"terafoundation_kafka_connector": "^0.13.0",
"teraslice-test-harness": "^0.30.0",
"ts-jest": "^29.1.2",
"typescript": "~5.2.2",
3 changes: 2 additions & 1 deletion packages/terafoundation_kafka_connector/jest.config.js
@@ -16,5 +16,6 @@ module.exports = {
diagnostics: true,
pretty: true,
}
}
},
testTimeout: 60 * 1000
};
6 changes: 3 additions & 3 deletions packages/terafoundation_kafka_connector/package.json
@@ -1,7 +1,7 @@
{
"name": "terafoundation_kafka_connector",
"displayName": "Terafoundation Kafka Connector",
"version": "0.12.0",
"version": "0.13.0",
"description": "Terafoundation connector for Kafka producer and consumer clients.",
"homepage": "https://github.com/terascope/kafka-assets",
"repository": "[email protected]:terascope/kafka-assets.git",
@@ -17,8 +17,8 @@
"build:watch": "yarn build --watch",
"prepare": "yarn build",
"test": "KAFKA_VERSION=3.2 ts-scripts test . --",
"test:debug": "ts-scripts test --debug . --",
"test:watch": "ts-scripts test --watch . --"
"test:debug": "KAFKA_VERSION=3.2 ts-scripts test --debug . --",
"test:watch": "KAFKA_VERSION=3.2 ts-scripts test --watch . --"
},
"dependencies": {
"node-rdkafka": "^3.0.1"
34 changes: 21 additions & 13 deletions packages/terafoundation_kafka_connector/src/index.ts
@@ -25,22 +25,22 @@ import {
* rdkafka settings: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
*/

class KafakConnector {
create(
class KafkaConnector {
async createClient(
config: KafkaConnectorConfig,
logger: Logger,
settings: KafkaConsumerSettings
): KafkaConsumerResult;
create(
): Promise<KafkaConsumerResult>;
async createClient(
config: KafkaConnectorConfig,
logger: Logger,
settings: KafkaProducerSettings
): KafkaProducerResult;
create(
): Promise<KafkaProducerResult>;
async createClient(
config: KafkaConnectorConfig,
logger: Logger,
settings: KafkaConsumerSettings|KafkaProducerSettings
): KafkaConsumerResult|KafkaProducerResult {
): Promise<KafkaConsumerResult|KafkaProducerResult> {
const clientType = getClientType(settings.options && settings.options.type);

if (isConsumerSettings(settings)) {
@@ -53,9 +53,10 @@ class KafakConnector {
logger.info(`Creating a Kafka consumer for group: ${group}`);
const client = new KafkaConsumer(clientOptions, topicOptions);

this._autoconnect(client, logger, settings.autoconnect);
await this._autoconnect(client, logger, settings.autoconnect);
return {
client,
logger
};
}

@@ -69,27 +70,32 @@
const client = new Producer(clientOptions, topicOptions);
client.setPollInterval(pollInterval);

this._autoconnect(client, logger, settings.autoconnect);
await this._autoconnect(client, logger, settings.autoconnect);
return {
client,
logger
};
}

throw new Error(`Unsupport client type of ${clientType}`);
throw new Error(`Unsupported client type of ${clientType}`);
}

create() {
throw new Error('kafka does not support the deprecated "create" method');
}

config_schema() {
return schema;
}

private _autoconnect(client: Producer|KafkaConsumer, logger: Logger, autoconnect = true) {
private async _autoconnect(client: Producer|KafkaConsumer, logger: Logger, autoconnect = true) {
if (!autoconnect) return;

// Default to autoconnecting but can be disabled.
client.connect({}, (err) => {
if (err) {
logger.error('Error connecting to Kafka', err);
throw err;
throw (err);
} else {
logger.info('Kafka connection initialized.');
}
@@ -172,4 +178,6 @@ function isProducerSettings(settings: any): settings is KafkaProducerSettings {
return getClientType(settings.options.type) === 'producer';
}

export = new KafakConnector();
const connector = new KafkaConnector();

export default connector;
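
Based on the overload signatures above, downstream code could call the connector's new async createClient directly along the following lines. This is a sketch only: the broker address and the settings object are illustrative values, not taken from the commit.

import { Logger, debugLogger } from '@terascope/job-components';
import connector from 'terafoundation_kafka_connector';

// Example usage of the async connector API; the producer overload is selected
// because settings.options.type is 'producer', and the result now carries
// both the client and the logger.
async function createExampleProducer() {
    const logger: Logger = debugLogger('kafka-connector-example');

    const { client } = await connector.createClient(
        { brokers: ['localhost:9092'] },   // example broker list
        logger,
        { options: { type: 'producer' } }  // autoconnect defaults to true
    );

    return client;
}
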
3 changes: 3 additions & 0 deletions packages/terafoundation_kafka_connector/src/interfaces.ts
@@ -1,3 +1,4 @@
import { Logger } from '@terascope/job-components';
import { KafkaConsumer, Producer } from 'node-rdkafka';

export interface KafkaConnectorConfig {
@@ -50,8 +51,10 @@ export type ClientType = 'producer'|'consumer';

export interface KafkaConsumerResult {
client: KafkaConsumer;
logger: Logger
}

export interface KafkaProducerResult {
client: Producer;
logger: Logger
}