From 65451d9058442d12c24d391684fb2cd1a3753dd1 Mon Sep 17 00:00:00 2001
From: Pratyush Ranjan <156985928+PratRanj07@users.noreply.github.com>
Date: Tue, 17 Dec 2024 11:28:46 +0530
Subject: [PATCH] fetchTopicOffsetsByTimestamp API implemented (#206)

* added fetchTopicOffsetsByTimestamp

* Requested changes

* Requested changes

* requested changes

* Changelog changes
---
 CHANGELOG.md                                       |   9 +
 MIGRATION.md                                       |   2 +
 .../admin/fetch-topic-offsets-by-timestamp.js      |  90 +++++++++
 lib/admin.js                                       |   1 +
 lib/kafkajs/_admin.js                              | 125 ++++++++++++
 .../fetch_topic_offsets_by_timestamp.spec.js       | 178 ++++++++++++++++++
 types/kafkajs.d.ts                                 |   6 +
 7 files changed, 411 insertions(+)
 create mode 100644 examples/kafkajs/admin/fetch-topic-offsets-by-timestamp.js
 create mode 100644 test/promisified/admin/fetch_topic_offsets_by_timestamp.spec.js

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5a9d2c1a..15a1fb25 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,12 @@
+# confluent-kafka-javascript v1.1.0
+
+v1.1.0 is a feature release. It is supported for all usage.
+
+## Enhancements
+
+1. Add support for an Admin API to fetch topic offsets by timestamp (#206).
+
+
 # confluent-kafka-javascript v1.0.0
 
 v1.0.0 is a feature release. It is supported for all usage.
diff --git a/MIGRATION.md b/MIGRATION.md
index 100cf980..676e60a8 100644
--- a/MIGRATION.md
+++ b/MIGRATION.md
@@ -339,6 +339,8 @@ The admin-client only has support for a limited subset of methods, with more to
     and `includeAuthorizedOperations` options. Fetching for all topics is not advisable.
   * The `fetchTopicOffsets` method is supported with additional `timeout` and
     `isolationLevel` options.
+  * The `fetchTopicOffsetsByTimestamp` method is supported with additional `timeout`
+    and `isolationLevel` options.
 
 ### Using the Schema Registry
diff --git a/examples/kafkajs/admin/fetch-topic-offsets-by-timestamp.js b/examples/kafkajs/admin/fetch-topic-offsets-by-timestamp.js
new file mode 100644
index 00000000..0fe55579
--- /dev/null
+++ b/examples/kafkajs/admin/fetch-topic-offsets-by-timestamp.js
@@ -0,0 +1,90 @@
+const { Kafka, IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
+const { parseArgs } = require('node:util');
+
+async function fetchOffsetsByTimestamp() {
+  // Parse command-line arguments
+  const args = parseArgs({
+    allowPositionals: true,
+    options: {
+      'bootstrap-servers': {
+        type: 'string',
+        short: 'b',
+        default: 'localhost:9092',
+      },
+      'timeout': {
+        type: 'string',
+        short: 't',
+        default: '5000',
+      },
+      'isolation-level': {
+        type: 'string',
+        short: 'i',
+        default: '0', // Default to '0' for read_uncommitted
+      },
+      'timestamp': {
+        type: 'string',
+        short: 's',
+      },
+    },
+  });
+
+  const {
+    'bootstrap-servers': bootstrapServers,
+    timeout,
+    'isolation-level': isolationLevel,
+    timestamp,
+  } = args.values;
+
+  const [topic] = args.positionals;
+
+  if (!topic) {
+    console.error('Topic name is required');
+    process.exit(1);
+  }
+
+  // Determine the isolation level
+  let isolationLevelValue;
+  if (isolationLevel === '0') {
+    isolationLevelValue = IsolationLevel.READ_UNCOMMITTED;
+  } else if (isolationLevel === '1') {
+    isolationLevelValue = IsolationLevel.READ_COMMITTED;
+  } else {
+    console.error('Invalid isolation level. Use 0 for READ_UNCOMMITTED or 1 for READ_COMMITTED.');
+    process.exit(1);
+  }
+
+  // Parse the timestamp if provided
+  const timestampValue = timestamp ? Number(timestamp) : undefined;
+
+  const kafka = new Kafka({
+    kafkaJS: {
+      brokers: [bootstrapServers],
+    },
+  });
+
+  const admin = kafka.admin();
+  await admin.connect();
+
+  try {
+    // Prepare options
+    const options = {
+      isolationLevel: isolationLevelValue,
+      timeout: Number(timeout),
+    };
+
+    // Fetch offsets by timestamp for the specified topic
+    const offsets = await admin.fetchTopicOffsetsByTimestamp(
+      topic,
+      timestampValue, // undefined when no timestamp was provided; the latest offsets are returned
+      options
+    );
+
+    console.log(`Offsets for topic "${topic}" with timestamp ${timestampValue ?? 'not provided'}:`, JSON.stringify(offsets, null, 2));
+  } catch (err) {
+    console.error('Error fetching topic offsets by timestamp:', err);
+  } finally {
+    await admin.disconnect();
+  }
+}
+
+fetchOffsetsByTimestamp();
diff --git a/lib/admin.js b/lib/admin.js
index 19c5024f..bde01f72 100644
--- a/lib/admin.js
+++ b/lib/admin.js
@@ -67,6 +67,7 @@ const IsolationLevel = {
  * Either a timestamp can be used, or else, one of the special, pre-defined values
  * (EARLIEST, LATEST, MAX_TIMESTAMP) can be used while passing an OffsetSpec to listOffsets.
  * @param {number} timestamp - The timestamp to list offsets at.
+ * @memberof RdKafka
 * @constructor
 */
 function OffsetSpec(timestamp) {
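The `OffsetSpec` distinction documented above is exactly what the new admin method leans on: a concrete timestamp is wrapped in `new OffsetSpec(timestamp)`, while an absent timestamp falls back to the pre-defined `OffsetSpec.LATEST`. A minimal sketch of that per-partition selection, with the require path, topic, partition ids, and timestamp as illustrative assumptions rather than values from the patch:

```javascript
// Sketch: building per-partition offset requests the way fetchTopicOffsetsByTimestamp does.
// The path './lib/admin' and the sample values below are assumptions for illustration.
const { OffsetSpec } = require('./lib/admin');

const topic = 'my-topic';        // hypothetical topic
const partitionIds = [0, 1, 2];  // hypothetical partition ids
const timestamp = Date.now();    // or leave undefined to request the latest offsets

const topicPartitionOffsets = partitionIds.map(partitionId => ({
  topic,
  partition: partitionId,
  offset: typeof timestamp === 'undefined'
    ? OffsetSpec.LATEST           // no timestamp: list the latest offset
    : new OffsetSpec(timestamp),  // earliest offset with message timestamp >= `timestamp`
}));
```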
diff --git a/lib/kafkajs/_admin.js b/lib/kafkajs/_admin.js
index 5637b990..9011ed5f 100644
--- a/lib/kafkajs/_admin.js
+++ b/lib/kafkajs/_admin.js
@@ -872,6 +872,131 @@ class Admin {
       });
     });
   }
+
+  /**
+   * List offsets for the topic partition(s) by timestamp.
+   *
+   * @param {string} topic - The topic to fetch offsets for.
+   * @param {number?} timestamp - The timestamp to fetch offsets for.
+   * @param {object?} options
+   * @param {number?} options.timeout - The request timeout in milliseconds.
+   *                                    May be unset (default: 5000)
+   * @param {KafkaJS.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.
+   *                                                           (default: READ_UNCOMMITTED)
+   *
+   * The returned topic partitions contain the earliest offset whose timestamp is greater than or equal to
+   * the given timestamp. If there is no such offset, or if the timestamp is unset, the latest offset is returned instead.
+   *
+   * @returns {Promise<Array<{partition: number, offset: string}>>}
+   */
+  async fetchTopicOffsetsByTimestamp(topic, timestamp, options = {}) {
+    if (this.#state !== AdminState.CONNECTED) {
+      throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
+    }
+
+    if (!Object.hasOwn(options, 'timeout')) {
+      options.timeout = 5000;
+    }
+
+    let topicData;
+    let startTime, endTime, timeTaken;
+
+    try {
+      // Measure time taken for fetchTopicMetadata
+      startTime = hrtime.bigint();
+      topicData = await this.fetchTopicMetadata({ topics: [topic], timeout: options.timeout });
+      endTime = hrtime.bigint();
+      timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
+
+      // Adjust timeout for the next request
+      options.timeout -= timeTaken;
+      if (options.timeout <= 0) {
+        throw new error.KafkaJSError("Timeout exceeded while fetching topic metadata.", { code: error.ErrorCodes.ERR__TIMED_OUT });
+      }
+    } catch (err) {
+      throw createKafkaJsErrorFromLibRdKafkaError(err);
+    }
+
+    const partitionIds = topicData.flatMap(topicMetadata =>
+      topicMetadata.partitions.map(partition => partition.partitionId)
+    );
+    let topicPartitionOffset = [];
+    if (typeof timestamp === 'undefined') {
+      topicPartitionOffset = partitionIds.map(partitionId => ({
+        topic,
+        partition: partitionId,
+        offset: OffsetSpec.LATEST
+      }));
+    } else {
+      topicPartitionOffset = partitionIds.map(partitionId => ({
+        topic,
+        partition: partitionId,
+        offset: new OffsetSpec(timestamp)
+      }));
+    }
+
+    const topicPartitionOffsetsLatest = partitionIds.map(partitionId => ({
+      topic,
+      partition: partitionId,
+      offset: OffsetSpec.LATEST
+    }));
+
+    try {
+      // Measure time taken for listOffsets (by timestamp)
+      startTime = hrtime.bigint();
+      const offsetsByTimestamp = await this.#listOffsets(topicPartitionOffset, options);
+      endTime = hrtime.bigint();
+      timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
+
+      // Adjust timeout for the next request
+      options.timeout -= timeTaken;
+      if (options.timeout <= 0) {
+        throw new error.KafkaJSError("Timeout exceeded while fetching offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
+      }
+
+      if (typeof timestamp === 'undefined') {
+        // Return result from offsetsByTimestamp if timestamp is undefined
+        return offsetsByTimestamp.map(offset => ({
+          partition: offset.partition,
+          offset: offset.offset.toString(),
+        }));
+      } else {
+        // Measure time taken for listOffsets (latest)
+        startTime = hrtime.bigint();
+        const latestOffsets = await this.#listOffsets(topicPartitionOffsetsLatest, options);
+        endTime = hrtime.bigint();
+        timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds
+
+        // Adjust timeout for the next request
+        options.timeout -= timeTaken;
+        if (options.timeout <= 0) {
+          throw new error.KafkaJSError("Timeout exceeded while fetching latest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
+        }
+
+        const combinedResults = partitionIds.map(partitionId => {
+          const latest = latestOffsets.find(offset => offset.partition === partitionId);
+          const timestampOffset = offsetsByTimestamp.find(offset => offset.partition === partitionId);
+
+          // An offset of -1 means no message at or after the timestamp; fall back to the latest offset
+          if (timestampOffset.offset === -1) {
+            return {
+              partition: partitionId,
+              offset: latest.offset.toString(),
+            };
+          } else {
+            return {
+              partition: partitionId,
+              offset: timestampOffset.offset.toString(),
+            };
+          }
+        });
+
+        return combinedResults;
+      }
+    } catch (err) {
+      throw createKafkaJsErrorFromLibRdKafkaError(err);
+    }
+  }
 }
 
 module.exports = {
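To make the fallback in the combination step concrete before the tests below: when a partition has no message at or after the requested timestamp, the offset comes back as -1 and the method substitutes that partition's latest offset. A self-contained sketch with invented sample data for two partitions:

```javascript
// Hypothetical listOffsets results: partition 1 has no message at or after the
// requested timestamp, so its offset is reported as -1.
const offsetsByTimestamp = [
  { partition: 0, offset: 1 },
  { partition: 1, offset: -1 },
];
const latestOffsets = [
  { partition: 0, offset: 3 },
  { partition: 1, offset: 3 },
];

const combined = [0, 1].map(partition => {
  const byTs = offsetsByTimestamp.find(o => o.partition === partition);
  const latest = latestOffsets.find(o => o.partition === partition);
  // Fall back to the latest offset when no message matched the timestamp.
  const offset = byTs.offset === -1 ? latest.offset : byTs.offset;
  return { partition, offset: offset.toString() };
});

console.log(combined); // [ { partition: 0, offset: '1' }, { partition: 1, offset: '3' } ]
```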
diff --git a/test/promisified/admin/fetch_topic_offsets_by_timestamp.spec.js b/test/promisified/admin/fetch_topic_offsets_by_timestamp.spec.js
new file mode 100644
index 00000000..cf0ede8e
--- /dev/null
+++ b/test/promisified/admin/fetch_topic_offsets_by_timestamp.spec.js
@@ -0,0 +1,178 @@
+jest.setTimeout(30000);
+
+const { ErrorCodes } = require("../../../lib").KafkaJS;
+const {
+  secureRandom,
+  createTopic,
+  createProducer,
+  createAdmin,
+} = require("../testhelpers");
+
+describe("fetchTopicOffsetsByTimestamp function", () => {
+  let topicName, admin, producer;
+
+  beforeEach(async () => {
+    admin = createAdmin({});
+    producer = createProducer({
+      clientId: "test-producer-id",
+    });
+
+    await admin.connect();
+    await producer.connect();
+
+    topicName = `test-topic-${secureRandom()}`;
+  });
+
+  afterEach(async () => {
+    await admin.deleteTopics({
+      topics: [topicName],
+    });
+    await admin.disconnect();
+    producer && (await producer.disconnect());
+  });
+
+  it("should timeout when fetching topic offsets by timestamp", async () => {
+    await createTopic({ topic: topicName, partitions: 1 });
+
+    await expect(
+      admin.fetchTopicOffsetsByTimestamp(topicName, Date.now(), { timeout: 0 })
+    ).rejects.toHaveProperty("code", ErrorCodes.ERR__TIMED_OUT);
+  });
+
+  it("should fetch offsets for specific timestamps (t1, t2, t3)", async () => {
+    await createTopic({ topic: topicName, partitions: 1 });
+
+    // Messages with specific timestamps
+    const now = 10000000;
+    const t1 = now + 100;
+    const t2 = now + 250;
+    const t3 = now + 400;
+
+    const messages = [
+      { value: 'message1', timestamp: t1.toString() },
+      { value: 'message2', timestamp: t2.toString() },
+      { value: 'message3', timestamp: t3.toString() },
+    ];
+
+    await producer.send({ topic: topicName, messages });
+
+    const offsetsAtSpecificTimestamp1 = await admin.fetchTopicOffsetsByTimestamp(topicName, now + 50);
+    expect(offsetsAtSpecificTimestamp1).toEqual([
+      {
+        partition: 0,
+        offset: "0",
+      },
+    ]);
+
+    const offsetsAtSpecificTimestamp2 = await admin.fetchTopicOffsetsByTimestamp(topicName, now + 250);
+    expect(offsetsAtSpecificTimestamp2).toEqual([
+      {
+        partition: 0,
+        offset: "1",
+      },
+    ]);
+
+    const offsetsAtSpecificTimestamp3 = await admin.fetchTopicOffsetsByTimestamp(topicName, now + 500);
+    expect(offsetsAtSpecificTimestamp3).toEqual([
+      {
+        partition: 0,
+        offset: "3",
+      },
+    ]);
+  });
+
+  it("should return result for a topic with a single partition and no timestamp", async () => {
+    await createTopic({ topic: topicName, partitions: 1 });
+
+    // Send some messages to reach specific offsets
+    const messages = Array.from({ length: 5 }, (_, i) => ({
+      value: `message${i}`,
+    }));
+    await producer.send({ topic: topicName, messages: messages });
+
+    // Fetch offsets without providing timestamp
+    const offsets = await admin.fetchTopicOffsetsByTimestamp(topicName);
+
+    expect(offsets).toEqual([
+      {
+        partition: 0,
+        offset: "5", // With no timestamp, the latest (end) offset is returned: 5 after producing 5 messages
+      },
+    ]);
+  });
+
+  it("should fetch offsets for specific timestamps for a topic with multiple partitions", async () => {
+    await createTopic({ topic: topicName, partitions: 2 });
+
+    // Messages with specific timestamps for each partition
+    const now = 10000000;
+    const t1 = now + 100;
+    const t2 = now + 250;
+    const t3 = now + 400;
+
+    const messagesPartition0 = [
+      { value: "message0-partition0-t1", timestamp: t1.toString(), partition: 0 },
+      { value: "message1-partition0-t2", timestamp: t2.toString(), partition: 0 },
+      { value: "message2-partition0-t3", timestamp: t3.toString(), partition: 0 },
"message2-partition0-t3", timestamp: t3.toString(), partition: 0 }, + ]; + + const messagesPartition1 = [ + { value: "message0-partition1-t1", timestamp: t1.toString(), partition: 1 }, + { value: "message1-partition1-t2", timestamp: t2.toString(), partition: 1 }, + { value: "message2-partition1-t3", timestamp: t3.toString(), partition: 1 }, + ]; + + await producer.send({ topic: topicName, messages: messagesPartition0 }); + await producer.send({ topic: topicName, messages: messagesPartition1 }); + + const offsetsBeforeT1 = await admin.fetchTopicOffsetsByTimestamp(topicName, now + 50); + expect(offsetsBeforeT1).toEqual([ + { partition: 0, offset: "0" }, // Offset before any message in partition 0 + { partition: 1, offset: "0" }, // Offset before any message in partition 1 + ]); + + const offsetsBetweenT1AndT2 = await admin.fetchTopicOffsetsByTimestamp(topicName, now + 250); + expect(offsetsBetweenT1AndT2).toEqual([ + { partition: 0, offset: "1" }, + { partition: 1, offset: "1" }, + ]); + + // Fetch latest offsets + const offsetsAfterT3 = await admin.fetchTopicOffsetsByTimestamp(topicName, now + 500); + expect(offsetsAfterT3).toEqual([ + { partition: 0, offset: "3" }, // Latest offset in partition 0 + { partition: 1, offset: "3" }, // Latest offset in partition 1 + ]); + }); + + + it("should return result for a topic with multiple partitions and no timestamp", async () => { + await createTopic({ topic: topicName, partitions: 2 }); + + const messagesPartition0 = Array.from({ length: 5 }, (_, i) => ({ + value: `message${i}`, + partition: 0, + })); + const messagesPartition1 = Array.from({ length: 5 }, (_, i) => ({ + value: `message${i}`, + partition: 1, + })); + + await producer.send({ topic: topicName, messages: messagesPartition0 }); + await producer.send({ topic: topicName, messages: messagesPartition1 }); + + // Fetch offsets without providing timestamp + const offsets = await admin.fetchTopicOffsetsByTimestamp(topicName); + + expect(offsets).toEqual([ + { + partition: 0, + offset: "5", // As per the test case, no timestamp should return the last committed offset '5' + }, + { + partition: 1, + offset: "5", // As per the test case, no timestamp should return the last committed offset '5' + }, + ]); + }); +}); diff --git a/types/kafkajs.d.ts b/types/kafkajs.d.ts index 1aaf2817..58d03a5b 100644 --- a/types/kafkajs.d.ts +++ b/types/kafkajs.d.ts @@ -430,6 +430,12 @@ export type Admin = { timeout?: number, isolationLevel: IsolationLevel }): Promise> + fetchTopicOffsetsByTimestamp(topic: string, + timestamp?: number, + options?: { + timeout?: number, + isolationLevel: IsolationLevel + }): Promise> }