From 7e76cc85057649bb7974060fccc573d6e8f1a2d8 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 13 Feb 2025 13:34:12 +0100 Subject: [PATCH] prefix test topics (#562) --- CHANGELOG.md | 1 + spec/lib/waterdrop/clients/buffered_spec.rb | 36 +++---- spec/lib/waterdrop/config_spec.rb | 4 +- .../vendors/datadog/metrics_listener_spec.rb | 34 +++---- spec/lib/waterdrop/producer/sync_spec.rb | 6 +- .../waterdrop/producer/transactions_spec.rb | 93 ++++++++++--------- spec/lib/waterdrop/producer/variant_spec.rb | 17 ++-- spec/lib/waterdrop/producer_spec.rb | 10 +- spec/spec_helper.rb | 1 + spec/support/factories/message.rb | 2 +- 10 files changed, 110 insertions(+), 94 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 043ea865..4368030e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## 2.8.2 (Unreleased) - [Feature] Allow for tagging of producer instances similar to how consumers can be tagged. +- [Refactor] Ensure all test topics in the test suite start with "it-" prefix. ## 2.8.1 (2024-12-26) - [Enhancement] Raise `WaterDrop::ProducerNotTransactionalError` when attempting to use transactions on a non-transactional producer. diff --git a/spec/lib/waterdrop/clients/buffered_spec.rb b/spec/lib/waterdrop/clients/buffered_spec.rb index 82193078..36aacd74 100644 --- a/spec/lib/waterdrop/clients/buffered_spec.rb +++ b/spec/lib/waterdrop/clients/buffered_spec.rb @@ -3,6 +3,8 @@ RSpec.describe_current do subject(:client) { described_class.new(producer) } + let(:topic_name) { "it-#{SecureRandom}" } + let(:producer) do WaterDrop::Producer.new do |config| config.deliver = false @@ -85,41 +87,41 @@ context 'when running transaction with production of messages' do it 'expect to add them to the buffers' do producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') end expect(client.messages.size).to eq(5) - expect(client.messages_for('test').size).to eq(2) + expect(client.messages_for(topic_name).size).to eq(2) end end context 'when running nested transaction with production of messages' do it 'expect to add them to the buffers' do producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') end end expect(client.messages.size).to eq(7) - expect(client.messages_for('test').size).to eq(4) + expect(client.messages_for(topic_name).size).to eq(4) end end context 'when running nested transaction with production of messages on abort' do it 'expect to add them to the buffers' do producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') raise WaterDrop::AbortTransaction end @@ -139,7 +141,7 @@ it 'expect not to contain messages from the aborted transaction' do producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') raise WaterDrop::AbortTransaction end @@ -167,7 +169,7 @@ it 'expect not to contain messages from the aborted transaction' do expect do producer.transaction do - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') raise StandardError end @@ -192,7 +194,8 @@ end context 'when we try to store offset without a transaction' do - let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) } + let(:topic) { "it-#{SecureRandom.uuid}" } + let(:message) { OpenStruct.new(topic: topic, partition: 0, offset: 10) } it 'expect to raise an error' do expect { producer.transaction_mark_as_consumed(nil, message) } @@ -201,8 +204,9 @@ end context 'when trying to store offset with transaction' do + let(:topic) { "it-#{SecureRandom.uuid}" } let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) } - let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) } + let(:message) { OpenStruct.new(topic: topic, partition: 0, offset: 10) } it do expect do diff --git a/spec/lib/waterdrop/config_spec.rb b/spec/lib/waterdrop/config_spec.rb index 9f3e2d3a..59494d26 100644 --- a/spec/lib/waterdrop/config_spec.rb +++ b/spec/lib/waterdrop/config_spec.rb @@ -3,6 +3,8 @@ RSpec.describe_current do subject(:config) { described_class.new } + let(:topic_name) { "it-#{SecureRandom.uuid}" } + describe '#setup' do context 'when configuration has errors' do let(:error_class) { ::WaterDrop::Errors::ConfigurationInvalidError } @@ -31,7 +33,7 @@ it 'expect not to allow it' do expect do - producer.produce_sync(topic: 'test', payload: 'test') + producer.produce_sync(topic: topic_name, payload: 'test') end.to raise_error(Rdkafka::Config::ClientCreationError) end end diff --git a/spec/lib/waterdrop/instrumentation/vendors/datadog/metrics_listener_spec.rb b/spec/lib/waterdrop/instrumentation/vendors/datadog/metrics_listener_spec.rb index 7a40a808..5d873c7f 100644 --- a/spec/lib/waterdrop/instrumentation/vendors/datadog/metrics_listener_spec.rb +++ b/spec/lib/waterdrop/instrumentation/vendors/datadog/metrics_listener_spec.rb @@ -46,6 +46,8 @@ def initialize producer end + let(:topic) { "it-#{SecureRandom.uuid}" } + after { producer.close } context 'when having some default tags present' do @@ -61,7 +63,7 @@ def initialize end context 'when publishing, default tags should be included' do - before { producer.produce_sync(topic: rand.to_s, payload: rand.to_s) } + before { producer.produce_sync(topic: topic, payload: rand.to_s) } it 'expect to have proper metrics data in place' do published_tags = dummy_client.buffer[:increment]['waterdrop.produced_sync'][0][0][:tags] @@ -87,7 +89,7 @@ def initialize context 'when expecting emitted stats DD dispatch' do # Just to trigger some stats before do - producer.produce_sync(topic: rand.to_s, payload: rand.to_s) + producer.produce_sync(topic: topic, payload: rand.to_s) # Give it some time to emit the stats sleep(1) @@ -129,12 +131,12 @@ def initialize context 'when producing sync' do before do - producer.produce_sync(topic: rand.to_s, payload: rand.to_s) + producer.produce_sync(topic: topic, payload: rand.to_s) producer.produce_many_sync( [ - { topic: rand.to_s, payload: rand.to_s }, - { topic: rand.to_s, payload: rand.to_s } + { topic: topic, payload: rand.to_s }, + { topic: topic, payload: rand.to_s } ] ) end @@ -146,12 +148,12 @@ def initialize context 'when producing async' do before do - producer.produce_async(topic: rand.to_s, payload: rand.to_s) + producer.produce_async(topic: topic, payload: rand.to_s) producer.produce_many_async( [ - { topic: rand.to_s, payload: rand.to_s }, - { topic: rand.to_s, payload: rand.to_s } + { topic: topic, payload: rand.to_s }, + { topic: topic, payload: rand.to_s } ] ) end @@ -163,12 +165,12 @@ def initialize context 'when buffering' do before do - producer.buffer(topic: rand.to_s, payload: rand.to_s) + producer.buffer(topic: topic, payload: rand.to_s) producer.buffer_many( [ - { topic: rand.to_s, payload: rand.to_s }, - { topic: rand.to_s, payload: rand.to_s } + { topic: topic, payload: rand.to_s }, + { topic: topic, payload: rand.to_s } ] ) end @@ -180,7 +182,7 @@ def initialize context 'when flushing sync' do before do - producer.buffer(topic: rand.to_s, payload: rand.to_s) + producer.buffer(topic: topic, payload: rand.to_s) producer.flush_sync end @@ -191,7 +193,7 @@ def initialize context 'when flushing async' do before do - producer.buffer(topic: rand.to_s, payload: rand.to_s) + producer.buffer(topic: topic, payload: rand.to_s) producer.flush_async end @@ -202,7 +204,7 @@ def initialize context 'when message is acknowledged' do before do - producer.produce_sync(topic: rand.to_s, payload: rand.to_s) + producer.produce_sync(topic: topic, payload: rand.to_s) # We need to give the async callback a bit of time to kick in sleep(0.1) end @@ -227,7 +229,7 @@ def initialize end before do - producer.produce_async(topic: rand.to_s, payload: rand.to_s) + producer.produce_async(topic: topic, payload: rand.to_s) sleep(1) end @@ -263,7 +265,7 @@ def initialize context 'when trying to publish a topic level metric' do before do - producer.produce_sync(topic: rand.to_s, payload: rand.to_s) + producer.produce_sync(topic: topic, payload: rand.to_s) sleep(1) listener.send(:report_metric, metric, statistics) end diff --git a/spec/lib/waterdrop/producer/sync_spec.rb b/spec/lib/waterdrop/producer/sync_spec.rb index 6f37773c..14cb4e70 100644 --- a/spec/lib/waterdrop/producer/sync_spec.rb +++ b/spec/lib/waterdrop/producer/sync_spec.rb @@ -3,6 +3,8 @@ RSpec.describe_current do subject(:producer) { build(:producer) } + let(:topic_name) { "it-#{SecureRandom.uuid}" } + after { producer.close } describe '#produce_sync' do @@ -85,9 +87,9 @@ describe '#produce_sync with partition key' do subject(:delivery) { producer.produce_sync(message) } - let(:message) { build(:valid_message, partition_key: rand.to_s, topic: 'example_topic') } + let(:message) { build(:valid_message, partition_key: rand.to_s, topic: topic_name) } - before { producer.produce_sync(topic: 'example_topic', payload: '1') } + before { producer.produce_sync(topic: topic_name, payload: '1') } it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryReport) } end diff --git a/spec/lib/waterdrop/producer/transactions_spec.rb b/spec/lib/waterdrop/producer/transactions_spec.rb index 3ba3ed45..eb506db6 100644 --- a/spec/lib/waterdrop/producer/transactions_spec.rb +++ b/spec/lib/waterdrop/producer/transactions_spec.rb @@ -5,6 +5,7 @@ let(:transactional_id) { SecureRandom.uuid } let(:critical_error) { Exception } + let(:topic_name) { "it-#{SecureRandom.uuid}" } after { producer.close } @@ -53,8 +54,8 @@ handlers = [] producer.transaction do - handlers << producer.produce_async(topic: 'example_topic1', payload: '1') - handlers << producer.produce_async(topic: 'example_topic2', payload: '2') + handlers << producer.produce_async(topic: "#{topic_name}1", payload: '1') + handlers << producer.produce_async(topic: "#{topic_name}2", payload: '2') end expect { handlers.map!(&:wait) }.not_to raise_error @@ -64,7 +65,7 @@ result = rand transaction_result = producer.transaction do - producer.produce_async(topic: 'example_topic', payload: '2') + producer.produce_async(topic: topic_name, payload: '2') result end @@ -81,7 +82,7 @@ begin producer.transaction do 20.times do |i| - producer.produce_async(topic: SecureRandom.uuid, payload: i.to_s) + producer.produce_async(topic: topic_name, payload: i.to_s) end end rescue Rdkafka::RdkafkaError => e @@ -105,7 +106,7 @@ expect do producer.transaction do 10.times do |i| - producer.produce_async(topic: SecureRandom.uuid, payload: i.to_s) + producer.produce_async(topic: topic_name, payload: i.to_s) end end end.not_to raise_error @@ -118,7 +119,7 @@ it 'expect to re-raise this error' do expect do producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'na') + producer.produce_async(topic: topic_name, payload: 'na') raise StandardError end @@ -130,7 +131,7 @@ begin producer.transaction do - handler = producer.produce_async(topic: 'example_topic', payload: 'na') + handler = producer.produce_async(topic: topic_name, payload: 'na') raise StandardError end @@ -156,7 +157,7 @@ begin producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'na') + producer.produce_async(topic: topic_name, payload: 'na') raise StandardError end @@ -181,7 +182,7 @@ begin producer.transaction do - result = producer.produce_sync(topic: 'example_topic', payload: 'na') + result = producer.produce_sync(topic: topic_name, payload: 'na') expect(result.partition).to eq(0) expect(result.error).to be_nil @@ -204,7 +205,7 @@ begin producer.transaction do - handler = producer.produce_async(topic: 'example_topic', payload: 'na') + handler = producer.produce_async(topic: topic_name, payload: 'na') raise StandardError end @@ -228,7 +229,7 @@ it 'expect to re-raise this error' do expect do producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'na') + producer.produce_async(topic: topic_name, payload: 'na') raise critical_error end @@ -240,7 +241,7 @@ begin producer.transaction do - handler = producer.produce_async(topic: 'example_topic', payload: 'na') + handler = producer.produce_async(topic: topic_name, payload: 'na') raise critical_error end @@ -258,7 +259,7 @@ it 'expect not to re-raise' do expect do producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'na') + producer.produce_async(topic: topic_name, payload: 'na') raise WaterDrop::AbortTransaction end @@ -269,7 +270,7 @@ handler = nil producer.transaction do - handler = producer.produce_async(topic: 'example_topic', payload: 'na') + handler = producer.produce_async(topic: topic_name, payload: 'na') raise WaterDrop::AbortTransaction end @@ -292,7 +293,7 @@ producer.transaction do sleep(0.1) - producer.produce_async(topic: 'example_topic', payload: 'na') + producer.produce_async(topic: topic_name, payload: 'na') raise(WaterDrop::AbortTransaction) end @@ -313,7 +314,7 @@ result = nil producer.transaction do - result = producer.produce_sync(topic: 'example_topic', payload: 'na') + result = producer.produce_sync(topic: topic_name, payload: 'na') expect(result.partition).to eq(0) expect(result.error).to be_nil @@ -332,7 +333,7 @@ handler = nil producer.transaction do - handler = producer.produce_async(topic: 'example_topic', payload: 'na') + handler = producer.produce_async(topic: topic_name, payload: 'na') raise(WaterDrop::AbortTransaction) end @@ -364,7 +365,7 @@ expect do producer1.transaction do - producer1.produce_async(topic: 'example_topic', payload: '1') + producer1.produce_async(topic: topic_name, payload: '1') end end.to raise_error(Rdkafka::RdkafkaError, /fenced by a newer instance/) end @@ -375,7 +376,7 @@ expect do producer2.transaction do - producer2.produce_async(topic: 'example_topic', payload: '1') + producer2.produce_async(topic: topic_name, payload: '1') end end.not_to raise_error end @@ -426,20 +427,20 @@ context 'when we use transactional producer without transaction' do it 'expect to allow as it will wrap with a transaction' do expect do - producer.produce_sync(topic: 'example_topic', payload: rand.to_s) + producer.produce_sync(topic: topic_name, payload: rand.to_s) end.not_to raise_error end it 'expect to deliver message correctly' do - result = producer.produce_sync(topic: 'example_topic', payload: rand.to_s) - expect(result.topic_name).to eq('example_topic') + result = producer.produce_sync(topic: topic_name, payload: rand.to_s) + expect(result.topic_name).to eq(topic_name) expect(result.error).to be_nil end it 'expect to use the async dispatch though with transaction wrapper' do - handler = producer.produce_async(topic: 'example_topic', payload: rand.to_s) + handler = producer.produce_async(topic: topic_name, payload: rand.to_s) result = handler.wait - expect(result.topic_name).to eq('example_topic') + expect(result.topic_name).to eq(topic_name) expect(result.error).to be_nil end @@ -515,10 +516,10 @@ handlers = [] producer.transaction do - handlers << producer.produce_async(topic: 'example_topic', payload: 'data') + handlers << producer.produce_async(topic: topic_name, payload: 'data') producer.transaction do - handlers << producer.produce_async(topic: 'example_topic', payload: 'data') + handlers << producer.produce_async(topic: topic_name, payload: 'data') end end @@ -527,10 +528,10 @@ it 'expect to have one actual transaction' do producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'data') + producer.produce_async(topic: topic_name, payload: 'data') producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'data') + producer.produce_async(topic: topic_name, payload: 'data') end end @@ -544,10 +545,10 @@ handlers = [] producer.transaction do - handlers << producer.produce_async(topic: 'example_topic', payload: 'data') + handlers << producer.produce_async(topic: topic_name, payload: 'data') producer.transaction do - handlers << producer.produce_async(topic: 'example_topic', payload: 'data') + handlers << producer.produce_async(topic: topic_name, payload: 'data') raise(WaterDrop::AbortTransaction) end end @@ -560,7 +561,7 @@ end context 'when trying to mark as consumed in a transaction' do - let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 100) } + let(:message) { OpenStruct.new(topic: topic_name, partition: 0, offset: 100) } context 'when we try mark as consumed without a transaction' do it 'expect to raise an error' do @@ -616,7 +617,7 @@ end it 'expect to be able to do so and to send a message' do - expect { producer.produce_async(topic: 'test', payload: 'a') } + expect { producer.produce_async(topic: topic_name, payload: 'a') } .not_to raise_error end end @@ -645,7 +646,7 @@ begin producer.transaction do - handler = producer.produce_async(topic: 'example_topic', payload: 'na') + handler = producer.produce_async(topic: topic_name, payload: 'na') break end @@ -664,7 +665,7 @@ end handler = producer.transaction do - producer.produce_async(topic: 'example_topic', payload: 'na') + producer.produce_async(topic: topic_name, payload: 'na') end expect { handler.wait }.not_to raise_error @@ -672,8 +673,6 @@ end context 'when producer gets a critical broker errors with reload on' do - let(:topic) { SecureRandom.uuid } - let(:producer) do WaterDrop::Producer.new do |config| config.max_payload_size = 1_000_000_000_000 @@ -687,7 +686,7 @@ before do admin = Rdkafka::Config.new('bootstrap.servers': 'localhost:9092').admin - admin.create_topic(topic, 1, 1, 'max.message.bytes': 128).wait + admin.create_topic(topic_name, 1, 1, 'max.message.bytes': 128).wait admin.close end @@ -695,41 +694,43 @@ errored = false begin - producer.produce_async(topic: topic, payload: '1' * 512) + producer.produce_async(topic: topic_name, payload: '1' * 512) rescue WaterDrop::Errors::ProduceError errored = true end expect(errored).to be(true) - producer.produce_async(topic: topic, payload: '1') + producer.produce_async(topic: topic_name, payload: '1') end it 'expect to be able to use same producer after the error when sync' do errored = false begin - producer.produce_sync(topic: topic, payload: '1' * 512) + producer.produce_sync(topic: topic_name, payload: '1' * 512) rescue WaterDrop::Errors::ProduceError errored = true end expect(errored).to be(true) - producer.produce_sync(topic: topic, payload: '1') + producer.produce_sync(topic: topic_name, payload: '1') end end context 'when wrapping an early return method with a transaction' do let(:operation) do + t_name = topic_name + Class.new do - def call(producer, handlers) - handlers << producer.produce_async(topic: 'example_topic1', payload: '1') + define_method :call do |producer, handlers| + handlers << producer.produce_async(topic: "#{t_name}1", payload: '1') return unless handlers.empty? # Never to be reached, expected in this spec - handlers << producer.produce_async(topic: 'example_topic1', payload: '1') + handlers << producer.produce_async(topic: "#{t_name}1", payload: '1') end end end @@ -748,12 +749,12 @@ def call(producer, handlers) context 'when wrapping an early break block with a transaction' do let(:operation) do lambda do |producer, handlers| - handlers << producer.produce_async(topic: 'example_topic1', payload: '1') + handlers << producer.produce_async(topic: "#{topic_name}1", payload: '1') return unless handlers.empty? # Never to be reached, expected in this spec - handlers << producer.produce_async(topic: 'example_topic1', payload: '1') + handlers << producer.produce_async(topic: "#{topic_name}1", payload: '1') end end diff --git a/spec/lib/waterdrop/producer/variant_spec.rb b/spec/lib/waterdrop/producer/variant_spec.rb index 95a3bf5b..65b347f0 100644 --- a/spec/lib/waterdrop/producer/variant_spec.rb +++ b/spec/lib/waterdrop/producer/variant_spec.rb @@ -6,6 +6,7 @@ let(:config_error) { WaterDrop::Errors::VariantInvalidError } let(:produce_error) { WaterDrop::Errors::ProduceError } let(:rd_config_error) { Rdkafka::Config::ConfigError } + let(:topic) { "it-#{SecureRandom.uuid}" } after { producer.close } @@ -48,7 +49,7 @@ it 'expect to allow the original one' do variant - expect { producer.produce_sync(topic: SecureRandom.uuid, payload: '') }.not_to raise_error + expect { producer.produce_sync(topic: topic, payload: '') }.not_to raise_error end end @@ -56,7 +57,7 @@ let(:variant) { producer.with(topic_config: { 'message.timeout.ms': 100_000 }) } it 'expect to work' do - expect { variant.produce_sync(topic: SecureRandom.uuid, payload: '') }.not_to raise_error + expect { variant.produce_sync(topic: topic, payload: '') }.not_to raise_error end end @@ -66,7 +67,7 @@ it 'expect to use this timeout it' do expect do - variant.produce_sync(topic: SecureRandom.uuid, payload: '') + variant.produce_sync(topic: topic, payload: '') end.to raise_error(produce_error) end end @@ -78,7 +79,7 @@ it 'expect to use the settings' do expect do - variant.produce_sync(topic: SecureRandom.uuid, payload: '') + variant.produce_sync(topic: topic, payload: '') end.not_to raise_error end @@ -87,7 +88,7 @@ it 'expect not to allow it' do expect do - variant.produce_sync(topic: SecureRandom.uuid, payload: '') + variant.produce_sync(topic: topic, payload: '') end.to raise_error(config_error) end end @@ -100,7 +101,7 @@ it 'expect to use the settings' do expect do - variant.produce_sync(topic: SecureRandom.uuid, payload: '') + variant.produce_sync(topic: topic, payload: '') end.not_to raise_error end @@ -109,7 +110,7 @@ it 'expect not to allow it' do expect do - variant.produce_sync(topic: SecureRandom.uuid, payload: '') + variant.produce_sync(topic: topic, payload: '') end.to raise_error(config_error) end end @@ -122,7 +123,7 @@ it 'expect not to allow it' do expect do - variant.produce_sync(topic: SecureRandom.uuid, payload: '') + variant.produce_sync(topic: topic, payload: '') end.to raise_error(config_error) end end diff --git a/spec/lib/waterdrop/producer_spec.rb b/spec/lib/waterdrop/producer_spec.rb index 4440e59a..8394fa7f 100644 --- a/spec/lib/waterdrop/producer_spec.rb +++ b/spec/lib/waterdrop/producer_spec.rb @@ -3,6 +3,8 @@ RSpec.describe_current do subject(:producer) { described_class.new } + let(:topic_name) { "it-#{SecureRandom.uuid}" } + after { producer.close } describe '#initialize' do @@ -126,13 +128,13 @@ def on_oauthbearer_token_refresh(_); end let(:count) { producer.partition_count(topic) } context 'when topic does not exist' do - let(:topic) { SecureRandom.uuid } + let(:topic) { "it-#{SecureRandom.uuid}" } it { expect { count }.to raise_error(Rdkafka::RdkafkaError, /unknown_topic_or_part/) } end context 'when topic exists' do - let(:topic) { SecureRandom.uuid } + let(:topic) { "it-#{SecureRandom.uuid}" } before { producer.produce_sync(topic: topic, payload: '') } @@ -291,7 +293,7 @@ def on_oauthbearer_token_refresh(_); end end it 'expect the error notifications to publish those errors' do - handler = producer.produce_async(topic: 'na', payload: 'data', label: 'test') + handler = producer.produce_async(topic: topic_name, payload: 'data', label: 'test') producer.purge handler.wait(raise_response_error: false) @@ -478,7 +480,7 @@ def on_oauthbearer_token_refresh(_); end let(:child_process) do fork do - producer.produce_sync(topic: 'test', payload: '1') + producer.produce_sync(topic: topic_name, payload: '1') producer.close end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8057ef38..6bffb0ba 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -2,6 +2,7 @@ require 'factory_bot' require 'ostruct' +require 'securerandom' coverage = !ENV.key?('GITHUB_WORKFLOW') coverage = true if ENV['GITHUB_COVERAGE'] == 'true' diff --git a/spec/support/factories/message.rb b/spec/support/factories/message.rb index c763c9a0..7b863df7 100644 --- a/spec/support/factories/message.rb +++ b/spec/support/factories/message.rb @@ -4,7 +4,7 @@ factory :valid_message, class: 'Hash' do skip_create - topic { rand.to_s } + topic { "it-#{SecureRandom.uuid}" } payload { rand.to_s } partition_key { nil } label { nil }