From c245b9a69d479e098a1df3ed88a569346a12cc0a Mon Sep 17 00:00:00 2001 From: Alex Blair Date: Fri, 5 Mar 2021 18:49:16 +0000 Subject: [PATCH] Allow workers to be configured per consumer The same application may have differing requirements in terms of load, so it doesn't make that much sense to have a global configuration for this --- lib/racecar.rb | 2 +- lib/racecar/config.rb | 4 ++-- lib/racecar/consumer.rb | 12 ++++++++++-- spec/integration/consumer_spec.rb | 14 ++++++-------- 4 files changed, 19 insertions(+), 13 deletions(-) diff --git a/lib/racecar.rb b/lib/racecar.rb index 7e663df9..f9291a46 100644 --- a/lib/racecar.rb +++ b/lib/racecar.rb @@ -54,7 +54,7 @@ def self.instrumenter def self.run(processor) runner = Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter) - if config.parallel_workers > 1 + if config.parallel_workers && config.parallel_workers > 1 ParallelRunner.new(runner: runner, config: config, logger: logger).run else runner.run diff --git a/lib/racecar/config.rb b/lib/racecar/config.rb index f78e0804..0c644eee 100644 --- a/lib/racecar/config.rb +++ b/lib/racecar/config.rb @@ -168,11 +168,10 @@ class Config < KingKonf::Config If this is not set to greater than 1, no extra processes will be generated" integer :parallel_workers, default: 1 - # The error handler must be set directly on the object. attr_reader :error_handler - attr_accessor :subscriptions, :logger + attr_accessor :subscriptions, :logger, :parallel_workers def statistics_interval_ms if Rdkafka::Config.statistics_callback @@ -225,6 +224,7 @@ def load_consumer_class(consumer_class) consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase, ].compact.join + self.parallel_workers = consumer_class.parallel_workers self.subscriptions = consumer_class.subscriptions self.max_wait_time = consumer_class.max_wait_time || self.max_wait_time self.pidfile ||= "#{group_id}.pid" diff --git a/lib/racecar/consumer.rb b/lib/racecar/consumer.rb index 11e0cacb..187fe2c9 100644 --- a/lib/racecar/consumer.rb +++ b/lib/racecar/consumer.rb @@ -9,7 +9,7 @@ class Consumer class << self attr_accessor :max_wait_time attr_accessor :group_id - attr_accessor :producer, :consumer + attr_accessor :producer, :consumer, :parallel_workers def subscriptions @subscriptions ||= [] @@ -27,7 +27,15 @@ def subscriptions # @param additional_config [Hash] Configuration properties for consumer. # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md # @return [nil] - def subscribes_to(*topics, start_from_beginning: true, max_bytes_per_partition: 1048576, additional_config: {}) + def subscribes_to( + *topics, + start_from_beginning: true, + max_bytes_per_partition: 1048576, + additional_config: {}, + parallel_workers: nil + ) + self.parallel_workers = parallel_workers + topics.each do |topic| subscriptions << Subscription.new(topic, start_from_beginning, max_bytes_per_partition, additional_config) end diff --git a/spec/integration/consumer_spec.rb b/spec/integration/consumer_spec.rb index 8c638b37..27fd78ea 100644 --- a/spec/integration/consumer_spec.rb +++ b/spec/integration/consumer_spec.rb @@ -15,7 +15,7 @@ class NoProcessConsumer < Racecar::Consumer RSpec.describe "running a Racecar consumer", type: :integration do context "when an error occurs trying to start the runner" do context "when there are no subscriptions, and no parallelism" do - before { Racecar.configure { |c| c.parallel_workers = 1 } } + before { NoSubsConsumer.parallel_workers = nil } it "raises an exception" do expect do @@ -25,7 +25,7 @@ class NoProcessConsumer < Racecar::Consumer end context "when there are no subscriptions, and parallelism" do - before { Racecar.configure { |c| c.parallel_workers = 3 } } + before { NoSubsConsumer.parallel_workers = 3 } it "raises an exception" do expect do @@ -35,7 +35,7 @@ class NoProcessConsumer < Racecar::Consumer end context "when there is no process method, and no parallelism" do - before { Racecar.configure { |c| c.parallel_workers = 1 } } + before { NoProcessConsumer.parallel_workers = nil } it "raises an exception" do expect do @@ -45,7 +45,7 @@ class NoProcessConsumer < Racecar::Consumer end context "when there is no process method, and parallelism" do - before { Racecar.configure { |c| c.parallel_workers = 3 } } + before { NoSubsConsumer.parallel_workers = 3 } it "raises an exception" do expect do @@ -72,11 +72,9 @@ def process(message) end before do - Racecar.configure { |c| c.parallel_workers = parallelism } - create_topic(topic: input_topic, partitions: topic_partitions) - consumer_class.subscribes_to(input_topic) + consumer_class.subscribes_to(input_topic, parallel_workers: parallelism) consumer_class.output_topic = output_topic publish_messages!(input_topic, input_messages) @@ -94,7 +92,7 @@ class EchoConsumer1 < mock_echo_consumer_class let(:input_messages) { [{ payload: "hello", key: "greetings", partition: nil }] } let(:topic_partitions) { 1 } - let(:parallelism) { 1 } + let(:parallelism) { nil } it "can consume and publish a message" do in_background(cleanup_callback: -> { Process.kill("INT", Process.pid) }) do