diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index 1c085d89..515909cb 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -1062,6 +1062,30 @@ class Consumer { // return m ?? null; } + /** + * Store offsets for the given topic partitions. + * + * Stored offsets will be committed automatically at a later point if autoCommit is enabled. + * Otherwise, they will be committed when commitOffsets is called without arguments. + * + * enable.auto.offset.store must be set to false to use this API. + * @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions + */ + storeOffsets(topicPartitions) { + if (this.#state !== ConsumerState.CONNECTED) { + throw new error.KafkaJSError('Store can only be called while connected.', { code: error.ErrorCodes.ERR__STATE }); + } + + if (!this.#userManagedStores) { + throw new error.KafkaJSError( + 'Store can only be called when enable.auto.offset.store is explicitly set to false.', { code: error.ErrorCodes.ERR__INVALID_ARG }); + } + + const topicPartitionsRdKafka = topicPartitions.map( + topicPartitionOffsetToRdKafka); + this.#internalClient.offsetsStore(topicPartitionsRdKafka); + } + async #commitOffsetsUntilNoStateErr(offsetsToCommit) { let err = { code: error.ErrorCodes.ERR_NO_ERROR }; do { diff --git a/test/promisified/consumer/consumerCacheTests.spec.js b/test/promisified/consumer/consumerCacheTests.spec.js index 56033c60..a8791079 100644 --- a/test/promisified/consumer/consumerCacheTests.spec.js +++ b/test/promisified/consumer/consumerCacheTests.spec.js @@ -1,6 +1,5 @@ jest.setTimeout(30000) -const { is } = require('bluebird'); const { secureRandom, createTopic, diff --git a/test/promisified/consumer/store.spec.js b/test/promisified/consumer/store.spec.js new file mode 100644 index 00000000..3ee056d9 --- /dev/null +++ b/test/promisified/consumer/store.spec.js @@ -0,0 +1,154 @@ +jest.setTimeout(30000) + +const { + secureRandom, + createTopic, + waitFor, + 
createProducer, + createConsumer, + sleep, +} = require('../testhelpers'); +const { ErrorCodes } = require('../../../lib').KafkaJS; + +describe.each([[false], [true]])('Consumer store', (isAutoCommit) => { + let topicName, groupId, producer, consumer; + + beforeEach(async () => { + topicName = `test-topic-${secureRandom()}` + groupId = `consumer-group-id-${secureRandom()}` + + await createTopic({ topic: topicName, partitions: 3 }) + + producer = createProducer({}); + + consumer = createConsumer({ + groupId, + maxWaitTimeInMs: 100, + fromBeginning: true, + autoCommit: isAutoCommit, + autoCommitInterval: 500, + }, { + 'enable.auto.offset.store': false, + }); + }); + + afterEach(async () => { + consumer && (await consumer.disconnect()) + producer && (await producer.disconnect()) + }); + + it('should not work if enable.auto.offset.store = true', async () => { + let assignment = []; + consumer = createConsumer({ + groupId, + maxWaitTimeInMs: 100, + fromBeginning: true, + }, { + /* Set to true manually - the default value with kafkaJS block is false. */ + 'enable.auto.offset.store': true, + 'rebalance_cb': function (err, asg) { + if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) { + assignment = asg; + } + } + }); + + await consumer.connect(); + await consumer.subscribe({ topic: topicName }) + await consumer.run({ + eachMessage: async () => { + } + }); + await waitFor(() => assignment.length > 0, () => null, 1000); + expect( + () => consumer.storeOffsets([{ topic: topicName, partition: 0, offset: '10' }]) + ).toThrow(/Store can only be called when enable.auto.offset.store is explicitly set to false/); + }); + + it('should not work if enable.auto.offset.store is unset', async () => { + let assignment = []; + consumer = createConsumer({ + groupId, + maxWaitTimeInMs: 100, + fromBeginning: true, + }, { + /* Left unset deliberately - the default value within a kafkaJS block is false. 
*/ + 'rebalance_cb': function (err, asg) { + if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) { + assignment = asg; + } + } + }); + + await consumer.connect(); + await consumer.subscribe({ topic: topicName }) + await consumer.run({ + eachMessage: async () => { + } + }); + await waitFor(() => assignment.length > 0, () => null, 1000); + expect( + () => consumer.storeOffsets([{ topic: topicName, partition: 0, offset: '10' }]) + ).toThrow(/Store can only be called when enable.auto.offset.store is explicitly set to false/); + }); + + it('should commit stored offsets', async () => { + /* Evenly distribute 30 messages across 3 partitions */ + let i = 0; + const messages = Array(3 * 10) + .fill() + .map(() => { + const value = secureRandom() + return { value: `value-${value}`, partition: (i++) % 3 } + }) + + await producer.connect(); + await producer.send({ topic: topicName, messages }) + await producer.flush(); + + let msgCount = 0; + await consumer.connect(); + await consumer.subscribe({ topic: topicName }) + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + msgCount++; + const offset = (Number(message.offset) + 1).toString(); + expect(() => consumer.storeOffsets([{ topic, partition, offset }])).not.toThrow(); + } + }); + await waitFor(() => msgCount >= 30, () => null, { delay: 100 }); + expect(msgCount).toEqual(30); + + if (!isAutoCommit) + await expect(consumer.commitOffsets()).resolves.toBeUndefined(); + else + await sleep(1000); /* Wait for auto-commit */ + + await consumer.disconnect(); + + /* Send 30 more messages */ + await producer.send({ topic: topicName, messages }) + await producer.flush(); + + + consumer = createConsumer({ + groupId, + maxWaitTimeInMs: 100, + fromBeginning: true, + }); + + msgCount = 0; + await consumer.connect(); + await consumer.subscribe({ topic: topicName }) + await consumer.run({ + eachMessage: async ({ message }) => { + msgCount++; + } + }) + /* Only the extra 30 messages should come to us */ + await 
waitFor(() => msgCount >= 30, () => null, { delay: 100 }); + await sleep(1000); + expect(msgCount).toEqual(30); + }); + +}); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 75a7390a..2feec790 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -136,6 +136,7 @@ type Sender = { export type Producer = Sender & { connect(): Promise disconnect(): Promise + flush(args?: {timeout?: number}): Promise } export interface RetryOptions { @@ -460,6 +461,7 @@ export type Consumer = { subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise stop(): Promise run(config?: ConsumerRunConfig): Promise + storeOffsets(topicPartitions: Array): void commitOffsets(topicPartitions: Array): Promise seek(topicPartitionOffset: TopicPartitionOffset): Promise describeGroup(): Promise