Skip to content

Commit

Permalink
Allow workers to be configured per consumer
Browse files Browse the repository at this point in the history
The same application may have differing requirements in terms of load,
so it doesn't make that much sense to have a global configuration for
this
  • Loading branch information
Alex Blair committed Mar 17, 2021
1 parent 633114a commit c245b9a
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
2 changes: 1 addition & 1 deletion lib/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def self.instrumenter
def self.run(processor)
runner = Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter)

if config.parallel_workers > 1
if config.parallel_workers && config.parallel_workers > 1
ParallelRunner.new(runner: runner, config: config, logger: logger).run
else
runner.run
Expand Down
4 changes: 2 additions & 2 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,10 @@ class Config < KingKonf::Config
If this is not set to greater than 1, no extra processes will be generated"
integer :parallel_workers, default: 1


# The error handler must be set directly on the object.
attr_reader :error_handler

attr_accessor :subscriptions, :logger
attr_accessor :subscriptions, :logger, :parallel_workers

def statistics_interval_ms
if Rdkafka::Config.statistics_callback
Expand Down Expand Up @@ -225,6 +224,7 @@ def load_consumer_class(consumer_class)
consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase,
].compact.join

self.parallel_workers = consumer_class.parallel_workers
self.subscriptions = consumer_class.subscriptions
self.max_wait_time = consumer_class.max_wait_time || self.max_wait_time
self.pidfile ||= "#{group_id}.pid"
Expand Down
12 changes: 10 additions & 2 deletions lib/racecar/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class Consumer
class << self
attr_accessor :max_wait_time
attr_accessor :group_id
attr_accessor :producer, :consumer
attr_accessor :producer, :consumer, :parallel_workers

def subscriptions
@subscriptions ||= []
Expand All @@ -27,7 +27,15 @@ def subscriptions
# @param additional_config [Hash] Configuration properties for consumer.
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
# @return [nil]
def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {})
def subscribes_to(
*topics,
start_from_beginning: true,
max_bytes_per_partition: 1048576,
additional_config: {},
parallel_workers: nil
)
self.parallel_workers = parallel_workers

topics.each do |topic|
subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config)
end
Expand Down
14 changes: 6 additions & 8 deletions spec/integration/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class NoProcessConsumer < Racecar::Consumer
RSpec.describe "running a Racecar consumer", type: :integration do
context "when an error occurs trying to start the runner" do
context "when there are no subscriptions, and no parallelism" do
before { Racecar.configure { |c| c.parallel_workers = 1 } }
before { NoSubsConsumer.parallel_workers = nil }

it "raises an exception" do
expect do
Expand All @@ -25,7 +25,7 @@ class NoProcessConsumer < Racecar::Consumer
end

context "when there are no subscriptions, and parallelism" do
before { Racecar.configure { |c| c.parallel_workers = 3 } }
before { NoSubsConsumer.parallel_workers = 3 }

it "raises an exception" do
expect do
Expand All @@ -35,7 +35,7 @@ class NoProcessConsumer < Racecar::Consumer
end

context "when there is no process method, and no parallelism" do
before { Racecar.configure { |c| c.parallel_workers = 1 } }
before { NoProcessConsumer.parallel_workers = nil }

it "raises an exception" do
expect do
Expand All @@ -45,7 +45,7 @@ class NoProcessConsumer < Racecar::Consumer
end

context "when there is no process method, and parallelism" do
before { Racecar.configure { |c| c.parallel_workers = 3 } }
before { NoSubsConsumer.parallel_workers = 3 }

it "raises an exception" do
expect do
Expand All @@ -72,11 +72,9 @@ def process(message)
end

before do
Racecar.configure { |c| c.parallel_workers = parallelism }

create_topic(topic: input_topic, partitions: topic_partitions)

consumer_class.subscribes_to(input_topic)
consumer_class.subscribes_to(input_topic, parallel_workers: parallelism)
consumer_class.output_topic = output_topic

publish_messages!(input_topic, input_messages)
Expand All @@ -94,7 +92,7 @@ class EchoConsumer1 < mock_echo_consumer_class

let(:input_messages) { [{ payload: "hello", key: "greetings", partition: nil }] }
let(:topic_partitions) { 1 }
let(:parallelism) { 1 }
let(:parallelism) { nil }

it "can consume and publish a message" do
in_background(cleanup_callback: -> { Process.kill("INT", Process.pid) }) do
Expand Down

0 comments on commit c245b9a

Please sign in to comment.