diff --git a/src/cluster/index.js b/src/cluster/index.js index af0a919a6..a0dea79dc 100644 --- a/src/cluster/index.js +++ b/src/cluster/index.js @@ -108,11 +108,29 @@ module.exports = class Cluster { }) this[PRIVATE.REFRESH_METADATA] = sharedPromiseTo(async () => { - return await this.brokerPool.refreshMetadata(Array.from(this.targetTopics)) + try { + return await this.brokerPool.refreshMetadata(Array.from(this.targetTopics)) + } catch (e) { + if (e.type === 'UNKNOWN_TOPIC_OR_PARTITION' && e.unknownTopics) { + await this.removeMultipleTargetTopics(e.unknownTopics) + return await this.brokerPool.refreshMetadata(Array.from(this.targetTopics)) + } else { + throw e + } + } }) this[PRIVATE.REFRESH_METADATA_IF_NECESSARY] = sharedPromiseTo(async () => { - return await this.brokerPool.refreshMetadataIfNecessary(Array.from(this.targetTopics)) + try { + return await this.brokerPool.refreshMetadataIfNecessary(Array.from(this.targetTopics)) + } catch (e) { + if (e.type === 'UNKNOWN_TOPIC_OR_PARTITION' && e.unknownTopics) { + await this.removeMultipleTargetTopics(e.unknownTopics) + return await this.brokerPool.refreshMetadataIfNecessary(Array.from(this.targetTopics)) + } else { + throw e + } + } }) this[PRIVATE.FIND_CONTROLLER_BROKER] = sharedPromiseTo(async () => { @@ -245,6 +263,32 @@ module.exports = class Cluster { } } + /** + * @public + * @param {string[]} topics + * @return {Promise} + */ + async removeMultipleTargetTopics(topics) { + /* this may be called from addMultipleTargetTopics which already acquired the lock */ + const isLockNeeded = !this.mutatingTargetTopics.isLocked() + + if (isLockNeeded) { + await this.mutatingTargetTopics.acquire() + } + + try { + for (const topic of topics) { + if (this.targetTopics.has(topic)) { + this.targetTopics.delete(topic) + } + } + } finally { + if (isLockNeeded) { + await this.mutatingTargetTopics.release() + } + } + } + /** @type {() => string[]} */ getNodeIds() { return this.brokerPool.getNodeIds() diff --git a/src/protocol/requests/metadata/v0/response.js b/src/protocol/requests/metadata/v0/response.js index 3b2fb7650..40027114e 100644 --- a/src/protocol/requests/metadata/v0/response.js +++ b/src/protocol/requests/metadata/v0/response.js @@ -1,5 +1,5 @@ const Decoder = require('../../../decoder') -const { failure, createErrorFromCode } = require('../../../error') +const { createErrorFromCode, errorCodes, failure } = require('../../../error') /** * Metadata Response (Version: 0) => [brokers] [topic_metadata] @@ -18,6 +18,10 @@ const { failure, createErrorFromCode } = require('../../../error') * isr => INT32 */ +const unknownTopicOrPartitionErrorCode = errorCodes.find( + v => v.type === 'UNKNOWN_TOPIC_OR_PARTITION' +).code + const broker = decoder => ({ nodeId: decoder.readInt32(), host: decoder.readString(), @@ -52,7 +56,21 @@ const parse = async data => { const topicsWithErrors = data.topicMetadata.filter(topic => failure(topic.topicErrorCode)) if (topicsWithErrors.length > 0) { const { topicErrorCode } = topicsWithErrors[0] - throw createErrorFromCode(topicErrorCode) + const error = createErrorFromCode(topicErrorCode) + const unknownTopics = topicsWithErrors + .filter(topic => topic.topicErrorCode === unknownTopicOrPartitionErrorCode) + .map(topic => topic.topic) + if (unknownTopics.length > 0) { + /** + * Add an additional "unknownTopics" property when the Kafka metadata + * request finds an unknown topic. There are multiple versions of the + * metadata request but all of them use the v0 parse method. I didn't + * create a new error type for this since this error won't be handled + * in the same way for other Kafka requests. + */ + error.unknownTopics = unknownTopics + } + throw error } const errors = data.topicMetadata.flatMap(topic => { diff --git a/src/utils/lock.js b/src/utils/lock.js index 07de4bcc5..a61135774 100644 --- a/src/utils/lock.js +++ b/src/utils/lock.js @@ -60,4 +60,8 @@ module.exports = class Lock { return waitingLock() } } + + isLocked() { + return this[PRIVATE.LOCKED] + } } diff --git a/src/utils/lock.spec.js b/src/utils/lock.spec.js index 882d11362..ee16948cb 100644 --- a/src/utils/lock.spec.js +++ b/src/utils/lock.spec.js @@ -16,12 +16,13 @@ describe('Utils > Lock', () => { await lock.release() } } + const graceMilliseconds = 5 await Promise.all([callResource(), callResource(), callResource()]) const calls = resource.mock.calls.flat() expect(calls.length).toEqual(3) - expect(calls[1] - calls[0]).toBeGreaterThanOrEqual(50) - expect(calls[2] - calls[1]).toBeGreaterThanOrEqual(50) + expect(calls[1] - calls[0]).toBeGreaterThanOrEqual(50 - graceMilliseconds) + expect(calls[2] - calls[1]).toBeGreaterThanOrEqual(50 - graceMilliseconds) }) it('throws an error if the lock cannot be acquired within a period', async () => {