Skip to content

Commit

Permalink
prefix test topics (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
mensfeld authored Feb 13, 2025
1 parent c000493 commit 7e76cc8
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 94 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 2.8.2 (Unreleased)
- [Feature] Allow for tagging of producer instances similar to how consumers can be tagged.
- [Refactor] Ensure all test topics in the test suite start with "it-" prefix.

## 2.8.1 (2024-12-26)
- [Enhancement] Raise `WaterDrop::ProducerNotTransactionalError` when attempting to use transactions on a non-transactional producer.
Expand Down
36 changes: 20 additions & 16 deletions spec/lib/waterdrop/clients/buffered_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
RSpec.describe_current do
subject(:client) { described_class.new(producer) }

let(:topic_name) { "it-#{SecureRandom}" }

let(:producer) do
WaterDrop::Producer.new do |config|
config.deliver = false
Expand Down Expand Up @@ -85,41 +87,41 @@
context 'when running transaction with production of messages' do
it 'expect to add them to the buffers' do
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
end

expect(client.messages.size).to eq(5)
expect(client.messages_for('test').size).to eq(2)
expect(client.messages_for(topic_name).size).to eq(2)
end
end

context 'when running nested transaction with production of messages' do
it 'expect to add them to the buffers' do
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')

producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
end
end

expect(client.messages.size).to eq(7)
expect(client.messages_for('test').size).to eq(4)
expect(client.messages_for(topic_name).size).to eq(4)
end
end

context 'when running nested transaction with production of messages on abort' do
it 'expect to add them to the buffers' do
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')

producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')

raise WaterDrop::AbortTransaction
end
Expand All @@ -139,7 +141,7 @@

it 'expect not to contain messages from the aborted transaction' do
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')

raise WaterDrop::AbortTransaction
end
Expand Down Expand Up @@ -167,7 +169,7 @@
it 'expect not to contain messages from the aborted transaction' do
expect do
producer.transaction do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')

raise StandardError
end
Expand All @@ -192,7 +194,8 @@
end

context 'when we try to store offset without a transaction' do
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) }
let(:topic) { "it-#{SecureRandom.uuid}" }
let(:message) { OpenStruct.new(topic: topic, partition: 0, offset: 10) }

it 'expect to raise an error' do
expect { producer.transaction_mark_as_consumed(nil, message) }
Expand All @@ -201,8 +204,9 @@
end

context 'when trying to store offset with transaction' do
let(:topic) { "it-#{SecureRandom.uuid}" }
let(:consumer) { OpenStruct.new(consumer_group_metadata_pointer: nil) }
let(:message) { OpenStruct.new(topic: rand.to_s, partition: 0, offset: 10) }
let(:message) { OpenStruct.new(topic: topic, partition: 0, offset: 10) }

it do
expect do
Expand Down
4 changes: 3 additions & 1 deletion spec/lib/waterdrop/config_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
RSpec.describe_current do
subject(:config) { described_class.new }

let(:topic_name) { "it-#{SecureRandom.uuid}" }

describe '#setup' do
context 'when configuration has errors' do
let(:error_class) { ::WaterDrop::Errors::ConfigurationInvalidError }
Expand Down Expand Up @@ -31,7 +33,7 @@

it 'expect not to allow it' do
expect do
producer.produce_sync(topic: 'test', payload: 'test')
producer.produce_sync(topic: topic_name, payload: 'test')
end.to raise_error(Rdkafka::Config::ClientCreationError)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def initialize
producer
end

let(:topic) { "it-#{SecureRandom.uuid}" }

after { producer.close }

context 'when having some default tags present' do
Expand All @@ -61,7 +63,7 @@ def initialize
end

context 'when publishing, default tags should be included' do
before { producer.produce_sync(topic: rand.to_s, payload: rand.to_s) }
before { producer.produce_sync(topic: topic, payload: rand.to_s) }

it 'expect to have proper metrics data in place' do
published_tags = dummy_client.buffer[:increment]['waterdrop.produced_sync'][0][0][:tags]
Expand All @@ -87,7 +89,7 @@ def initialize
context 'when expecting emitted stats DD dispatch' do
# Just to trigger some stats
before do
producer.produce_sync(topic: rand.to_s, payload: rand.to_s)
producer.produce_sync(topic: topic, payload: rand.to_s)

# Give it some time to emit the stats
sleep(1)
Expand Down Expand Up @@ -129,12 +131,12 @@ def initialize

context 'when producing sync' do
before do
producer.produce_sync(topic: rand.to_s, payload: rand.to_s)
producer.produce_sync(topic: topic, payload: rand.to_s)

producer.produce_many_sync(
[
{ topic: rand.to_s, payload: rand.to_s },
{ topic: rand.to_s, payload: rand.to_s }
{ topic: topic, payload: rand.to_s },
{ topic: topic, payload: rand.to_s }
]
)
end
Expand All @@ -146,12 +148,12 @@ def initialize

context 'when producing async' do
before do
producer.produce_async(topic: rand.to_s, payload: rand.to_s)
producer.produce_async(topic: topic, payload: rand.to_s)

producer.produce_many_async(
[
{ topic: rand.to_s, payload: rand.to_s },
{ topic: rand.to_s, payload: rand.to_s }
{ topic: topic, payload: rand.to_s },
{ topic: topic, payload: rand.to_s }
]
)
end
Expand All @@ -163,12 +165,12 @@ def initialize

context 'when buffering' do
before do
producer.buffer(topic: rand.to_s, payload: rand.to_s)
producer.buffer(topic: topic, payload: rand.to_s)

producer.buffer_many(
[
{ topic: rand.to_s, payload: rand.to_s },
{ topic: rand.to_s, payload: rand.to_s }
{ topic: topic, payload: rand.to_s },
{ topic: topic, payload: rand.to_s }
]
)
end
Expand All @@ -180,7 +182,7 @@ def initialize

context 'when flushing sync' do
before do
producer.buffer(topic: rand.to_s, payload: rand.to_s)
producer.buffer(topic: topic, payload: rand.to_s)
producer.flush_sync
end

Expand All @@ -191,7 +193,7 @@ def initialize

context 'when flushing async' do
before do
producer.buffer(topic: rand.to_s, payload: rand.to_s)
producer.buffer(topic: topic, payload: rand.to_s)
producer.flush_async
end

Expand All @@ -202,7 +204,7 @@ def initialize

context 'when message is acknowledged' do
before do
producer.produce_sync(topic: rand.to_s, payload: rand.to_s)
producer.produce_sync(topic: topic, payload: rand.to_s)
# We need to give the async callback a bit of time to kick in
sleep(0.1)
end
Expand All @@ -227,7 +229,7 @@ def initialize
end

before do
producer.produce_async(topic: rand.to_s, payload: rand.to_s)
producer.produce_async(topic: topic, payload: rand.to_s)
sleep(1)
end

Expand Down Expand Up @@ -263,7 +265,7 @@ def initialize

context 'when trying to publish a topic level metric' do
before do
producer.produce_sync(topic: rand.to_s, payload: rand.to_s)
producer.produce_sync(topic: topic, payload: rand.to_s)
sleep(1)
listener.send(:report_metric, metric, statistics)
end
Expand Down
6 changes: 4 additions & 2 deletions spec/lib/waterdrop/producer/sync_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
RSpec.describe_current do
subject(:producer) { build(:producer) }

let(:topic_name) { "it-#{SecureRandom.uuid}" }

after { producer.close }

describe '#produce_sync' do
Expand Down Expand Up @@ -85,9 +87,9 @@
describe '#produce_sync with partition key' do
subject(:delivery) { producer.produce_sync(message) }

let(:message) { build(:valid_message, partition_key: rand.to_s, topic: 'example_topic') }
let(:message) { build(:valid_message, partition_key: rand.to_s, topic: topic_name) }

before { producer.produce_sync(topic: 'example_topic', payload: '1') }
before { producer.produce_sync(topic: topic_name, payload: '1') }

it { expect(delivery).to be_a(Rdkafka::Producer::DeliveryReport) }
end
Expand Down
Loading

0 comments on commit 7e76cc8

Please sign in to comment.