fetchTopicOffsetsByTimestamp API implemented (#206)
* added fetchTopicOffsetsByTimestamp

* Requested changes

* Requested changes

* requested changes

* Changelog changes
PratRanj07 authored Dec 17, 2024
1 parent 3d8c258 commit 65451d9
Showing 7 changed files with 411 additions and 0 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
# confluent-kafka-javascript v1.1.0

v1.1.0 is a feature release. It is supported for all usage.

## Enhancements

1. Add support for an Admin API to fetch topic offsets by timestamp (#206).


# confluent-kafka-javascript v1.0.0

v1.0.0 is a feature release. It is supported for all usage.
Expand Down
2 changes: 2 additions & 0 deletions MIGRATION.md
@@ -339,6 +339,8 @@ The admin-client only has support for a limited subset of methods, with more to
and `includeAuthorizedOperations` options. Fetching for all topics is not advisable.
* The `fetchTopicOffsets` method is supported with additional `timeout`
and `isolationLevel` options.
* The `fetchTopicOffsetsByTimestamp` method is supported with additional `timeout`
and `isolationLevel` options, as sketched below.
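
A minimal usage sketch (illustrative, not part of the diff; it assumes a connected admin client `admin` and a topic named 'my-topic', and the option values are arbitrary):

const { IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;

// Earliest offsets whose record timestamps are >= one hour ago, per partition.
const offsets = await admin.fetchTopicOffsetsByTimestamp(
  'my-topic',
  Date.now() - 60 * 60 * 1000,
  { timeout: 5000, isolationLevel: IsolationLevel.READ_COMMITTED }
);
// => e.g. [{ partition: 0, offset: '42' }]
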
### Using the Schema Registry
90 changes: 90 additions & 0 deletions examples/kafkajs/admin/fetch-topic-offsets-by-timestamp.js
@@ -0,0 +1,90 @@
const { Kafka, IsolationLevel } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

async function fetchOffsetsByTimestamp() {
// Parse command-line arguments
const args = parseArgs({
allowPositionals: true,
options: {
'bootstrap-servers': {
type: 'string',
short: 'b',
default: 'localhost:9092',
},
'timeout': {
type: 'string',
short: 't',
default: '5000',
},
'isolation-level': {
type: 'string',
short: 'i',
default: '0', // Default to '0' for read_uncommitted
},
'timestamp': {
type: 'string',
short: 's',
},
},
});

const {
'bootstrap-servers': bootstrapServers,
timeout,
'isolation-level': isolationLevel,
timestamp,
} = args.values;

const [topic] = args.positionals;

if (!topic) {
console.error('Topic name is required');
process.exit(1);
}

// Determine the isolation level
let isolationLevelValue;
if (isolationLevel === '0') {
isolationLevelValue = IsolationLevel.READ_UNCOMMITTED;
} else if (isolationLevel === '1') {
isolationLevelValue = IsolationLevel.READ_COMMITTED;
} else {
console.error('Invalid isolation level. Use 0 for READ_UNCOMMITTED or 1 for READ_COMMITTED.');
process.exit(1);
}

// Parse the timestamp if provided
const timestampValue = timestamp ? Number(timestamp) : undefined;

const kafka = new Kafka({
kafkaJS: {
brokers: [bootstrapServers],
},
});

const admin = kafka.admin();
await admin.connect();

try {
// Prepare options
const options = {
isolationLevel: isolationLevelValue,
timeout: Number(timeout),
};

// Fetch offsets by timestamp for the specified topic
// (an undefined timestamp falls back to the latest offsets)
const offsets = await admin.fetchTopicOffsetsByTimestamp(
topic,
timestampValue, // undefined when no timestamp was provided
options
);

console.log(`Offsets for topic "${topic}" with timestamp ${timestampValue ?? 'not provided'}:`, JSON.stringify(offsets, null, 2));
} catch (err) {
console.error('Error fetching topic offsets by timestamp:', err);
} finally {
await admin.disconnect();
}
}

fetchOffsetsByTimestamp();
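
The example can be invoked as follows (values illustrative; the topic name is positional and the timestamp is in epoch milliseconds):

node fetch-topic-offsets-by-timestamp.js -b localhost:9092 -t 5000 -i 1 -s 1700000000000 my-topic
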
1 change: 1 addition & 0 deletions lib/admin.js
@@ -67,6 +67,7 @@ const IsolationLevel = {
* Either a timestamp can be used, or else one of the special pre-defined values
* (EARLIEST, LATEST, MAX_TIMESTAMP) can be used while passing an OffsetSpec to listOffsets.
* @param {number} timestamp - The timestamp to list offsets at.
* @memberof RdKafka
* @constructor
*/
function OffsetSpec(timestamp) {
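
For illustration (a sketch, not in the diff; it assumes the EARLIEST/LATEST/MAX_TIMESTAMP statics exist on OffsetSpec as the comment above describes):

// An OffsetSpec at a concrete timestamp (epoch milliseconds)...
const tenSecondsAgo = new OffsetSpec(Date.now() - 10000);
// ...or one of the special pre-defined specs.
const earliest = OffsetSpec.EARLIEST;
const latest = OffsetSpec.LATEST;
const maxTimestamp = OffsetSpec.MAX_TIMESTAMP;
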
125 changes: 125 additions & 0 deletions lib/kafkajs/_admin.js
@@ -872,6 +872,131 @@ class Admin {
});
});
}

/**
* List offsets for the topic partition(s) by timestamp.
*
* @param {string} topic - The topic to fetch offsets for.
* @param {number?} timestamp - The timestamp to fetch offsets for, in milliseconds since the epoch.
* @param {object?} options
* @param {number?} options.timeout - The request timeout in milliseconds.
* May be unset (default: 5000)
* @param {KafkaJS.IsolationLevel?} options.isolationLevel - The isolation level for reading the offsets.
* (default: READ_UNCOMMITTED)
*
* The returned topic partitions contain the earliest offset whose timestamp is greater than or equal to
* the given timestamp. If there is no such offset, or if the timestamp is unset, the latest offset is returned instead.
*
* @returns {Promise<Array<{partition: number, offset: string}>>}
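*
* @example
* // Sketch with illustrative values: offsets at or after one minute ago.
* const offsets = await admin.fetchTopicOffsetsByTimestamp('my-topic', Date.now() - 60000);
* // => e.g. [{ partition: 0, offset: '1042' }]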
*/
async fetchTopicOffsetsByTimestamp(topic, timestamp, options = {}) {
if (this.#state !== AdminState.CONNECTED) {
throw new error.KafkaJSError("Admin client is not connected.", { code: error.ErrorCodes.ERR__STATE });
}

if (!Object.hasOwn(options, 'timeout')) {
options.timeout = 5000;
}

let topicData;
let startTime, endTime, timeTaken;

try {
// Measure time taken for fetchTopicMetadata
startTime = hrtime.bigint();
topicData = await this.fetchTopicMetadata({ topics: [topic], timeout: options.timeout });
endTime = hrtime.bigint();
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds

// Adjust timeout for the next request
options.timeout -= timeTaken;
if (options.timeout <= 0) {
throw new error.KafkaJSError("Timeout exceeded while fetching topic metadata.", { code: error.ErrorCodes.ERR__TIMED_OUT });
}
} catch (err) {
throw createKafkaJsErrorFromLibRdKafkaError(err);
}

const partitionIds = topicData.flatMap(topicMetadata =>
topicMetadata.partitions.map(partition => partition.partitionId)
);
let topicPartitionOffset = [];
if (typeof timestamp === 'undefined') {
topicPartitionOffset = partitionIds.map(partitionId => ({
topic,
partition: partitionId,
offset: OffsetSpec.LATEST
}));
} else {
topicPartitionOffset = partitionIds.map(partitionId => ({
topic,
partition: partitionId,
offset: new OffsetSpec(timestamp)
}));
}

const topicPartitionOffsetsLatest = partitionIds.map(partitionId => ({
topic,
partition: partitionId,
offset: OffsetSpec.LATEST
}));

try {
// Measure time taken for listOffsets (by timestamp)
startTime = hrtime.bigint();
const offsetsByTimestamp = await this.#listOffsets(topicPartitionOffset, options);
endTime = hrtime.bigint();
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds

// Adjust timeout for the next request
options.timeout -= timeTaken;
if (options.timeout <= 0) {
throw new error.KafkaJSError("Timeout exceeded while fetching offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
}

if (typeof timestamp === 'undefined') {
// Return result from offsetsByTimestamp if timestamp is undefined
return offsetsByTimestamp.map(offset => ({
partition: offset.partition,
offset: offset.offset.toString(),
}));
} else {
// Measure time taken for listOffsets(latest)
startTime = hrtime.bigint();
const latestOffsets = await this.#listOffsets(topicPartitionOffsetsLatest, options);
endTime = hrtime.bigint();
timeTaken = Number(endTime - startTime) / 1e6; // Convert nanoseconds to milliseconds

// Adjust timeout for the next request
options.timeout -= timeTaken;
if (options.timeout <= 0) {
throw new error.KafkaJSError("Timeout exceeded while fetching latest offsets.", { code: error.ErrorCodes.ERR__TIMED_OUT });
}

const combinedResults = partitionIds.map(partitionId => {
const latest = latestOffsets.find(offset => offset.partition === partitionId);
const timestampOffset = offsetsByTimestamp.find(offset => offset.partition === partitionId);

if (timestampOffset.offset === -1) {
return {
partition: partitionId,
offset: latest.offset.toString(),
};
} else {
return {
partition: partitionId,
offset: timestampOffset.offset.toString(),
};
}
});

return combinedResults;
}
} catch (err) {
throw createKafkaJsErrorFromLibRdKafkaError(err);
}
}
}

module.exports = {
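
A note on the design above: the method spends a single timeout budget across three sequential requests (topic metadata, offsets by timestamp, latest offsets), deducting each call's elapsed time from the remainder. A hypothetical helper capturing that pattern, for illustration only (withBudget is not part of the library):

const { hrtime } = require('node:process');

// Hypothetical helper: run an async step, charge its elapsed time against
// the shared budget in options.timeout, and fail once the budget is spent.
async function withBudget(options, step) {
  const start = hrtime.bigint();
  const result = await step();
  options.timeout -= Number(hrtime.bigint() - start) / 1e6; // ns -> ms
  if (options.timeout <= 0) {
    throw new Error('Timeout exceeded');
  }
  return result;
}
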
