diff --git a/Gemfile.lock b/Gemfile.lock index d5252ea3..c158eee7 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,8 @@ PATH remote: . specs: - racecar (2.1.1) + racecar (2.2.0) + concurrent-ruby (~> 1.0) king_konf (~> 1.0.0) rdkafka (~> 0.8.0) @@ -18,7 +19,7 @@ GEM concurrent-ruby (1.1.7) diff-lcs (1.4.4) dogstatsd-ruby (4.8.2) - ffi (1.13.1) + ffi (1.14.2) i18n (1.8.5) concurrent-ruby (~> 1.0) king_konf (1.0.0) diff --git a/docker-compose.yml b/docker-compose.yml index 7dd73774..1545f78f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -30,3 +30,4 @@ services: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_JMX_PORT: 9101 + KAFKA_DELETE_TOPIC_ENABLE: 'true' diff --git a/lib/racecar.rb b/lib/racecar.rb index ea7bfba2..0b262a3c 100644 --- a/lib/racecar.rb +++ b/lib/racecar.rb @@ -7,6 +7,7 @@ require "racecar/consumer" require "racecar/consumer_set" require "racecar/runner" +require "racecar/concurrent_runner" require "racecar/config" require "racecar/version" require "ensure_hash_compact" @@ -53,4 +54,13 @@ def self.instrumenter def self.run(processor) Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter).run end + + def self.run_concurrent(consumer_class) + ConcurrentRunner.new( + consumer_class: consumer_class, + config: config, + logger: logger, + instrumenter: instrumenter + ).run + end end diff --git a/lib/racecar/cli.rb b/lib/racecar/cli.rb index fe4ac8d7..74911f62 100644 --- a/lib/racecar/cli.rb +++ b/lib/racecar/cli.rb @@ -58,9 +58,13 @@ def run $stderr.puts "=> Ctrl-C to shutdown consumer" end - processor = consumer_class.new - - Racecar.run(processor) + if config.max_concurrency > 1 + Racecar.run_concurrent(consumer_class) + else + processor = consumer_class.new + Racecar.run(processor) + end + nil end private diff --git a/lib/racecar/concurrent_runner.rb b/lib/racecar/concurrent_runner.rb new file mode 100644 index 00000000..9999a0c0 --- /dev/null +++ b/lib/racecar/concurrent_runner.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +require "concurrent-ruby" +require "racecar/runner" +require "racecar/signals" + +module Racecar + class ConcurrentRunner + def initialize(consumer_class:, config:, logger:, instrumenter:) + @consumer_class = consumer_class + @config = config + @logger = logger + @instrumenter = instrumenter + end + + def run + puts "=> Running with max concurrency of #{config.max_concurrency}" + + config.max_concurrency.times do + runner = Racecar::Runner.new( + consumer_class.new, + config: config, + logger: logger, + instrumenter: instrumenter, + disable_signal_handlers: true + ) + consumers << runner + + schedule(runner) + end + + trap("USR1") { $stderr.puts config.inspect } + + Signals.setup_shutdown_handlers + self.signals_ready = true + Signals.wait_for_shutdown { stop } + end + + def stop + consumers.each(&:stop) + pool.shutdown + pool.wait_for_termination + logger.info "All consumers shut down" + raise exception if exception + end + + private + + attr_accessor :signals_ready, :exception + attr_reader :consumer_class, :config, :logger, :instrumenter + + def schedule(runner) + pool.post do + until signals_ready; end + begin + runner.run + rescue Exception => e + # Store exception to be reraised after graceful shutdown + self.exception = e + # Ensure that any uncaught exceptions cause a crash + Process.kill("INT", 0) + end + end + end + + def consumers + @consumers ||= [] + end + + def pool + @pool ||= Concurrent::FixedThreadPool.new(config.max_concurrency) + end + end +end diff --git a/lib/racecar/config.rb b/lib/racecar/config.rb index 17a41d18..cb5f2f60 100644 --- a/lib/racecar/config.rb +++ b/lib/racecar/config.rb @@ -153,6 +153,9 @@ class Config < KingKonf::Config desc "Whether to boot Rails when starting the consumer" boolean :without_rails, default: false + desc "Maximum number of threads to run the application with. Each will spawn its own consumer" + integer :max_concurrency, default: 1 + # The error handler must be set directly on the object. attr_reader :error_handler diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index d918d38a..cdd52373 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -8,9 +8,10 @@ module Racecar class Runner attr_reader :processor, :config, :logger - def initialize(processor, config:, logger:, instrumenter: NullInstrumenter) + def initialize(processor, config:, logger:, instrumenter: NullInstrumenter, disable_signal_handlers: false) @processor, @config, @logger = processor, config, logger @instrumenter = instrumenter + @disable_signal_handlers = disable_signal_handlers @stop_requested = false Rdkafka::Config.logger = logger @@ -44,7 +45,7 @@ def setup_pauses end def run - install_signal_handlers + install_signal_handlers unless disable_signal_handlers @stop_requested = false # Configure the consumer with a producer so it can produce messages and @@ -93,7 +94,7 @@ def stop private - attr_reader :pauses + attr_reader :pauses, :disable_signal_handlers def process_method @process_method ||= begin diff --git a/lib/racecar/signals.rb b/lib/racecar/signals.rb new file mode 100644 index 00000000..6c82a1d9 --- /dev/null +++ b/lib/racecar/signals.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +module Racecar + module Signals + module_function + + SHUTDOWN_SIGNALS = ["INT", "QUIT", "TERM"] + + def setup_shutdown_handlers + # We cannot run the graceful shutdown from a trap context, given that each + # consumer is running in a thread. Instead, we're setting up a pipe so + # that the shutdown will be initiated outside of that trap context. + @shutdown_reader, writer = IO.pipe + + SHUTDOWN_SIGNALS.each do |signal| + trap(signal) { writer.puts signal } + end + end + + def wait_for_shutdown(&callback) + # The below will block until we receive one of the signals we are listening for. + # Any subsequent repeats of those signals are written, but ignored, since shutdown + # is already initiated. If we execute `reader.gets.chomp`, this would give us access + # to the signal received, in case we ever need to respond based off the signal + # + # This could also be extended if we want multiple signals to trigger an ungraceful + # shutdown + IO.select([@shutdown_reader]) + + callback.call + end + end +end diff --git a/racecar.gemspec b/racecar.gemspec index 83b483fa..3399af5d 100644 --- a/racecar.gemspec +++ b/racecar.gemspec @@ -22,6 +22,7 @@ Gem::Specification.new do |spec| spec.add_runtime_dependency "king_konf", "~> 1.0.0" spec.add_runtime_dependency "rdkafka", "~> 0.8.0" + spec.add_runtime_dependency "concurrent-ruby", '~> 1.0' spec.add_development_dependency "bundler", [">= 1.13", "< 3"] spec.add_development_dependency "pry" diff --git a/spec/integration/consumer_spec.rb b/spec/integration/consumer_spec.rb index 17e44d07..2dbd786d 100644 --- a/spec/integration/consumer_spec.rb +++ b/spec/integration/consumer_spec.rb @@ -4,63 +4,190 @@ require "racecar/cli" require "racecar/ctl" -class EchoConsumer < Racecar::Consumer - subscribes_to "input" - - self.group_id = "test-consumer-#{SecureRandom.hex(8)}" +class NoSubsConsumer < Racecar::Consumer + def process(message); end +end - def process(message) - produce message.value, key: message.key, topic: "output" - end +class NoProcessConsumer < Racecar::Consumer + subscribes_to 'some-topic' end -module IntegrationSupport - INCOMING_MESSAGES = [] +RSpec.describe "running a Racecar consumer", type: :integration do + context "when an error occurs trying to start the runner" do + context "when there are no subscriptions, and no concurrency" do + it 'raises an exception' do + expect do + Racecar::Cli.new(["NoSubsConsumer"]).run + end.to raise_error(ArgumentError) + end + end - CONSUMER = Thread.new do - consumer = Rdkafka::Config.new({ - "bootstrap.servers": Racecar.config.brokers.join(","), - "client.id": Racecar.config.client_id, - "group.id": "racecar-tests", - }.merge(Racecar.config.rdkafka_consumer)).consumer + context "when there are no subscriptions, and concurrency" do + before { Racecar.configure { |c| c.max_concurrency = 3 } } - consumer.subscribe("output") + it 'raises an exception' do + expect do + Racecar::Cli.new(["NoSubsConsumer"]).run + end.to raise_error(ArgumentError) + end + end - consumer.each do |message| - puts "Received message #{message}" - INCOMING_MESSAGES << message + context "when there is no process method, and no concurrency" do + it 'raises an exception' do + expect do + Racecar::Cli.new(["NoProcessConsumer"]).run + end.to raise_error(NotImplementedError) + end end - end - def self.incoming_messages - INCOMING_MESSAGES + context "when there is no process method, and concurrency" do + before { Racecar.configure { |c| c.max_concurrency = 3 } } + + it 'raises an exception' do + expect do + Racecar::Cli.new(["NoProcessConsumer"]).run + end.to raise_error(NotImplementedError) + end + end end -end -RSpec.describe "running a Racecar consumer" do - it "works" do - worker = Thread.new do - cli = Racecar::Cli.new(["EchoConsumer"]) - cli.run + context "when the runner starts successfully" do + let(:input_topic) { generate_input_topic_name } + let(:output_topic) { generate_output_topic_name } + let(:mock_echo_consumer_class) do + Class.new(Racecar::Consumer) do + class << self + attr_accessor :output_topic + end + + def process(message) + produce message.value, key: message.key, topic: self.class.output_topic + deliver! + end + end + end + + let(:run_in_background!) do + Thread.new do + Thread.current.abort_on_exception = true + Racecar::Cli.new([consumer_class.name.to_s]).run + end end - ctl = Racecar::Ctl.main %w( - produce -t input -v hello -k greetings - ) + before do + Racecar.configure { |c| c.max_concurrency = concurrency } - message = nil - attempt = 1 + create_topic(topic: input_topic, partitions: topic_partitions) - while message.nil? && attempt <= 10 - puts "Waiting for message..." - sleep 2 ** attempt - message = IntegrationSupport.incoming_messages.last - attempt += 1 + consumer_class.subscribes_to(input_topic) + consumer_class.output_topic = output_topic + + rdkafka_consumer.subscribe(output_topic) + + publish_messages!(input_topic, input_messages) + + run_in_background! end - expect(message).not_to be_nil - expect(message.topic).to eq "output" - expect(message.payload).to eq "hello" - expect(message.key).to eq "greetings" + after { Process.kill('INT', 0) } + + after(:all) { delete_all_test_topics } + + context "for a single threaded consumer" do + let(:consumer_class) do + class EchoConsumer1 < mock_echo_consumer_class + self.group_id = "echo-consumer-1" + end + EchoConsumer1 + end + let(:input_messages) { [{ payload: 'hello', key: 'greetings', partition: nil }] } + let(:topic_partitions) { 1 } + let(:concurrency) { 1 } + + it "works" do + wait_for_messages(topic: input_topic, expected_message_count: 1) + + message = incoming_messages.first + + expect(message).not_to be_nil + expect(message.topic).to eq output_topic + expect(message.payload).to eq "hello" + expect(message.key).to eq "greetings" + end + end + + context "when running concurrent consumers" do + let(:input_messages) do + [ + { payload: 'message-0', partition: 0, key: 'a' }, + { payload: 'message-1', partition: 1, key: 'a' }, + { payload: 'message-2', partition: 2, key: 'a' }, + { payload: 'message-3', partition: 3, key: 'a' }, + { payload: 'message-4', partition: 4, key: 'a' }, + { payload: 'message-5', partition: 5, key: 'a' } + ] + end + + context "when partitions exceed concurrency" do + let(:topic_partitions) { 6 } + let(:concurrency) { 3 } + let(:consumer_class) do + class EchoConsumer2 < mock_echo_consumer_class + self.group_id = "echo-consumer-2" + end + EchoConsumer2 + end + + it "assigns partitions to all concurrent consumers" do + wait_for_assignments( + group_id: "echo-consumer-2", + topic: input_topic, + expected_members_count: concurrency + ) + + wait_for_messages(topic: input_topic, expected_message_count: input_messages.count) + + expect(incoming_messages.map(&:topic).uniq).to eq([output_topic]) + expect(incoming_messages.map(&:payload)) + .to match_array(input_messages.map { |m| m[:payload] }) + end + end + + context "when the concurrency exceeds the number of partitions" do + let(:consumer_class) do + class EchoConsumer3 < mock_echo_consumer_class + self.group_id = "echo-consumer-3" + end + EchoConsumer3 + end + let(:topic_partitions) { 3 } + let(:concurrency) { 5 } + let(:input_messages) do + [ + { payload: 'message-0', partition: 0, key: 'a' }, + { payload: 'message-1', partition: 0, key: 'a' }, + { payload: 'message-2', partition: 1, key: 'a' }, + { payload: 'message-3', partition: 1, key: 'a' }, + { payload: 'message-4', partition: 2, key: 'a' }, + { payload: 'message-5', partition: 2, key: 'a' } + ] + end + + it "assigns all the consumers that it can, up to the total number of partitions" do + wait_for_assignments( + group_id: "echo-consumer-3", + topic: input_topic, + expected_members_count: topic_partitions + ) + + wait_for_messages(topic: input_topic, expected_message_count: input_messages.count) + + expect(incoming_messages.count).to eq(6) + expect(incoming_messages.map(&:topic).uniq).to eq([output_topic]) + expect(incoming_messages.map(&:payload)) + .to match_array(input_messages.map { |m| m[:payload] }) + end + end + end end end diff --git a/spec/signals_spec.rb b/spec/signals_spec.rb new file mode 100644 index 00000000..f2ffbd51 --- /dev/null +++ b/spec/signals_spec.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +require "spec_helper" + +RSpec.describe Racecar::Signals do + describe ".wait_for_shutdown" do + before { described_class.setup_shutdown_handlers } + + let(:callback_mock) { double(:callback, cleanup: nil) } + + it "calls the callback after a signal is received" do + Thread.new do + sleep 0.2 + Process.kill(described_class::SHUTDOWN_SIGNALS.sample, 0) + end + + expect(callback_mock).to receive(:cleanup).once + + described_class.wait_for_shutdown { callback_mock.cleanup } + end + + context "when signal is sent more than once" do + it "only calls the callback once" do + Thread.new do + sleep 0.2 + Process.kill(described_class::SHUTDOWN_SIGNALS.sample, 0) + Process.kill(described_class::SHUTDOWN_SIGNALS.sample, 0) + Process.kill(described_class::SHUTDOWN_SIGNALS.sample, 0) + Process.kill(described_class::SHUTDOWN_SIGNALS.sample, 0) + end + + described_class.wait_for_shutdown { callback_mock.cleanup } + + expect(callback_mock).to have_received(:cleanup).exactly(1).time + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b57b70d3..6d539039 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,8 +4,11 @@ require "racecar" require "timecop" require_relative 'support/mock_env' +require_relative 'support/integration_helper' +Thread.abort_on_exception = true RSpec.configure do |config| config.disable_monkey_patching! config.include MockEnv + config.include IntegrationHelper, type: :integration end diff --git a/spec/support/integration_helper.rb b/spec/support/integration_helper.rb new file mode 100644 index 00000000..ed6b8940 --- /dev/null +++ b/spec/support/integration_helper.rb @@ -0,0 +1,136 @@ +# frozen_string_literal: true + +require "open3" +require "concurrent-ruby" + +module IntegrationHelper + def self.included(klass) + klass.instance_eval do + let!(:polling_thread) do + Thread.new do + Thread.current.abort_on_exception = true + rdkafka_consumer.each do |message| + puts "Received message #{message}" + incoming_messages << message + end + end + end + + let(:rdkafka_consumer) { rdkafka_config.consumer } + let(:rdkafka_producer) { rdkafka_config.producer } + + let(:rdkafka_config) do + Rdkafka::Config.new({ + "bootstrap.servers": kafka_brokers, + "client.id": Racecar.config.client_id, + "group.id": "racecar-tests" + }.merge(Racecar.config.rdkafka_consumer)) + end + + after do + polling_thread.kill + incoming_messages.clear + rdkafka_consumer.unsubscribe + rdkafka_producer.close + end + end + end + + def publish_messages!(topic, messages) + messages.map do |m| + rdkafka_producer.produce( + topic: topic, + key: m.fetch(:key), + payload: m.fetch(:payload), + partition: m.fetch(:partition) + ) + end.each(&:wait) + + puts "Published messages to topic: #{topic}; messages: #{messages}" + end + + def create_topic(topic:, partitions: 1) + puts "Creating topic #{topic}" + msg, process = Open3.capture2e("kafka-topics --bootstrap-server #{kafka_brokers} --create "\ + "--topic #{topic} --partitions #{partitions} --replication-factor 1") + return if process.exitstatus.zero? + + puts "Kafka topic creation exited with status #{process.exitstatus}, message: #{msg}" + end + + def wait_for_assignments(group_id:, topic:, expected_members_count:) + rebalance_attempt = 1 + + until members_count(group_id, topic) == expected_members_count + puts "Waiting for rebalance..." + sleep 2 * rebalance_attempt + raise "Did not rebalance in time" if rebalance_attempt > 5 + rebalance_attempt += 1 + end + end + + def wait_for_messages(topic:, expected_message_count:) + attempt = 1 + + until incoming_messages.count == expected_message_count + puts "Waiting for message..." + sleep 2 * attempt + raise "Did not receive messages in time" if attempt > 5 + attempt += 1 + end + end + + def generate_input_topic_name + "#{input_topic_prefix}-#{SecureRandom.hex(8)}" + end + + def generate_output_topic_name + "#{output_topic_prefix}-#{SecureRandom.hex(8)}" + end + + def delete_all_test_topics + message, process = Open3.capture2e( + "kafka-topics --bootstrap-server localhost:9092 --delete --topic '#{input_topic_prefix}-.*'" + ) + puts "Input topics deletion exited with status #{process.exitstatus}, message: #{message}" + + message, process = Open3.capture2e( + "kafka-topics --bootstrap-server localhost:9092 --delete --topic '#{output_topic_prefix}-.*'" + ) + puts "Output topics deletion exited with status #{process.exitstatus}, message: #{message}" + end + + def incoming_messages + @incoming_messages ||= Concurrent::Array.new + end + + private + + def kafka_brokers + Racecar.config.brokers.join(",") + end + + def input_topic_prefix + "input-test-topic" + end + + def output_topic_prefix + "output-test-topic" + end + + def members_count(group_id, topic) + consumer_group_partitions_and_member_ids(group_id, topic).uniq { |data| data.fetch(:member_id) }.count + end + + def consumer_group_partitions_and_member_ids(group_id, topic) + message, process = Open3.capture2e( + "kafka-consumer-groups --bootstrap-server #{kafka_brokers} --describe --group #{group_id}" + ) + unless process.exitstatus.zero? + raise "Kafka consumer group inspection exited with status #{process.exitstatus}, message: #{message}" + end + + message.scan(/^#{topic}\ +(?\d+)\ +\S+\ +\S+\ +\S+\ +(?\S+)\ /) + .map { |partition, member_id| { partition: partition, member_id: member_id } } + end +end