Skip to content

Commit

Permalink
Add storeOffsets and fix typings
Browse files Browse the repository at this point in the history
  • Loading branch information
milindl committed Apr 25, 2024
1 parent ed7226b commit 938df93
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 1 deletion.
24 changes: 24 additions & 0 deletions lib/kafkajs/_consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,30 @@ class Consumer {
// return m ?? null;
}

/**
* Store offsets for the given topic partitions.
*
* Stored offsets will be commited automatically at a later point if autoCommit is enabled.
* Otherwise, they will be committed when commitOffsets is called without arguments.
*
* enable.auto.offset.store must be set to false to use this API.
* @param {import("../../types/kafkajs").TopicPartitionOffsetAndMetadata[]?} topicPartitions
*/
storeOffsets(topicPartitions) {
if (this.#state !== ConsumerState.CONNECTED) {
throw new error.KafkaJSError('Store can only be called while connected.', { code: error.ErrorCodes.ERR__STATE });
}

if (!this.#userManagedStores) {
throw new error.KafkaJSError(
'Store can only be called when enable.auto.offset.store is explicitly set to false.', { code: error.ErrorCodes.ERR__INVALID_ARG });
}

const topicPartitionsRdKafka = topicPartitions.map(
topicPartitionOffsetToRdKafka);
this.#internalClient.offsetsStore(topicPartitionsRdKafka);
}

async #commitOffsetsUntilNoStateErr(offsetsToCommit) {
let err = { code: error.ErrorCodes.ERR_NO_ERROR };
do {
Expand Down
1 change: 0 additions & 1 deletion test/promisified/consumer/consumerCacheTests.spec.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
jest.setTimeout(30000)

const { is } = require('bluebird');
const {
secureRandom,
createTopic,
Expand Down
154 changes: 154 additions & 0 deletions test/promisified/consumer/store.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
jest.setTimeout(30000)

const {
secureRandom,
createTopic,
waitFor,
createProducer,
createConsumer,
sleep,
} = require('../testhelpers');
const { ErrorCodes } = require('../../../lib').KafkaJS;

describe.each([[false], [true]])('Consumer store', (isAutoCommit) => {
let topicName, groupId, producer, consumer;

beforeEach(async () => {
topicName = `test-topic-${secureRandom()}`
groupId = `consumer-group-id-${secureRandom()}`

await createTopic({ topic: topicName, partitions: 3 })

producer = createProducer({});

consumer = createConsumer({
groupId,
maxWaitTimeInMs: 100,
fromBeginning: true,
autoCommit: isAutoCommit,
autoCommitInterval: 500,
}, {
'enable.auto.offset.store': false,
});
});

afterEach(async () => {
consumer && (await consumer.disconnect())
producer && (await producer.disconnect())
});

it('should not work if enable.auto.offset.store = true', async () => {
let assignment = [];
consumer = createConsumer({
groupId,
maxWaitTimeInMs: 100,
fromBeginning: true,
}, {
/* Set to true manually - the default value with kafkaJS block is false. */
'enable.auto.offset.store': true,
'rebalance_cb': function (err, asg) {
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
assignment = asg;
}
}
});

await consumer.connect();
await consumer.subscribe({ topic: topicName })
await consumer.run({
eachMessage: async () => {
}
});
await waitFor(() => assignment.length > 0, () => null, 1000);
expect(
() => consumer.storeOffsets([{ topic: topicName, partition: 0, offset: '10' }])
).toThrow(/Store can only be called when enable.auto.offset.store is explicitly set to false/);
});

it('should not work if enable.auto.offset.store is unset', async () => {
let assignment = [];
consumer = createConsumer({
groupId,
maxWaitTimeInMs: 100,
fromBeginning: true,
}, {
/* Set to true manually - the default value with kafkaJS block is false. */
'rebalance_cb': function (err, asg) {
if (err.code === ErrorCodes.ERR__ASSIGN_PARTITIONS) {
assignment = asg;
}
}
});

await consumer.connect();
await consumer.subscribe({ topic: topicName })
await consumer.run({
eachMessage: async () => {
}
});
await waitFor(() => assignment.length > 0, () => null, 1000);
expect(
() => consumer.storeOffsets([{ topic: topicName, partition: 0, offset: '10' }])
).toThrow(/Store can only be called when enable.auto.offset.store is explicitly set to false/);
});

it('should commit stored offsets', async () => {
/* Evenly distribute 30 messages across 3 partitions */
let i = 0;
const messages = Array(3 * 10)
.fill()
.map(() => {
const value = secureRandom()
return { value: `value-${value}`, partition: (i++) % 3 }
})

await producer.connect();
await producer.send({ topic: topicName, messages })
await producer.flush();

let msgCount = 0;
await consumer.connect();
await consumer.subscribe({ topic: topicName })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
msgCount++;
const offset = (Number(message.offset) + 1).toString();
expect(() => consumer.storeOffsets([{ topic, partition, offset }])).not.toThrow();
}
});
await waitFor(() => msgCount >= 30, () => null, { delay: 100 });
expect(msgCount).toEqual(30);

if (!isAutoCommit)
await expect(consumer.commitOffsets()).resolves.toBeUndefined();
else
await sleep(1000); /* Wait for auto-commit */

await consumer.disconnect();

/* Send 30 more messages */
await producer.send({ topic: topicName, messages })
await producer.flush();


consumer = createConsumer({
groupId,
maxWaitTimeInMs: 100,
fromBeginning: true,
});

msgCount = 0;
await consumer.connect();
await consumer.subscribe({ topic: topicName })
await consumer.run({
eachMessage: async ({ message }) => {
msgCount++;
}
})
/* Only the extra 30 messages should come to us */
await waitFor(() => msgCount >= 30, () => null, { delay: 100 });
await sleep(1000);
expect(msgCount).toEqual(30);
});

});
2 changes: 2 additions & 0 deletions types/kafkajs.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ type Sender = {
export type Producer = Sender & {
connect(): Promise<void>
disconnect(): Promise<void>
flush(args?: {timeout?: number}): Promise<void>
}

export interface RetryOptions {
Expand Down Expand Up @@ -460,6 +461,7 @@ export type Consumer = {
subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
stop(): Promise<void>
run(config?: ConsumerRunConfig): Promise<void>
storeOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): void
commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): Promise<void>
seek(topicPartitionOffset: TopicPartitionOffset): Promise<void>
describeGroup(): Promise<GroupDescription>
Expand Down

0 comments on commit 938df93

Please sign in to comment.