Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test out processing different partitions concurrently #221

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
PATH
remote: .
specs:
racecar (2.1.1)
racecar (2.2.0)
concurrent-ruby (~> 1.0)
king_konf (~> 1.0.0)
rdkafka (~> 0.8.0)

Expand All @@ -18,7 +19,7 @@ GEM
concurrent-ruby (1.1.7)
diff-lcs (1.4.4)
dogstatsd-ruby (4.8.2)
ffi (1.13.1)
ffi (1.14.2)
i18n (1.8.5)
concurrent-ruby (~> 1.0)
king_konf (1.0.0)
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ services:
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_DELETE_TOPIC_ENABLE: 'true'
10 changes: 10 additions & 0 deletions lib/racecar.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require "racecar/consumer"
require "racecar/consumer_set"
require "racecar/runner"
require "racecar/concurrent_runner"
require "racecar/config"
require "racecar/version"
require "ensure_hash_compact"
Expand Down Expand Up @@ -53,4 +54,13 @@ def self.instrumenter
def self.run(processor)
Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter).run
end

def self.run_concurrent(consumer_class)
ConcurrentRunner.new(
consumer_class: consumer_class,
config: config,
logger: logger,
instrumenter: instrumenter
).run
end
end
10 changes: 7 additions & 3 deletions lib/racecar/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ def run
$stderr.puts "=> Ctrl-C to shutdown consumer"
end

processor = consumer_class.new

Racecar.run(processor)
if config.max_concurrency > 1
Racecar.run_concurrent(consumer_class)
else
processor = consumer_class.new
Racecar.run(processor)
end
nil
end

private
Expand Down
74 changes: 74 additions & 0 deletions lib/racecar/concurrent_runner.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# frozen_string_literal: true

require "concurrent-ruby"
require "racecar/runner"
require "racecar/signals"

module Racecar
class ConcurrentRunner
def initialize(consumer_class:, config:, logger:, instrumenter:)
@consumer_class = consumer_class
@config = config
@logger = logger
@instrumenter = instrumenter
end

def run
puts "=> Running with max concurrency of #{config.max_concurrency}"

config.max_concurrency.times do
runner = Racecar::Runner.new(
consumer_class.new,
config: config,
logger: logger,
instrumenter: instrumenter,
disable_signal_handlers: true
)
consumers << runner

schedule(runner)
end

trap("USR1") { $stderr.puts config.inspect }

Signals.setup_shutdown_handlers
self.signals_ready = true
Signals.wait_for_shutdown { stop }
end

def stop
consumers.each(&:stop)
pool.shutdown
pool.wait_for_termination
logger.info "All consumers shut down"
raise exception if exception
end

private

attr_accessor :signals_ready, :exception
attr_reader :consumer_class, :config, :logger, :instrumenter

def schedule(runner)
pool.post do
until signals_ready; end
begin
runner.run
rescue Exception => e
# Store exception to be reraised after graceful shutdown
self.exception = e
# Ensure that any uncaught exceptions cause a crash
Process.kill("INT", 0)
end
end
end

def consumers
@consumers ||= []
end

def pool
@pool ||= Concurrent::FixedThreadPool.new(config.max_concurrency)
end
end
end
3 changes: 3 additions & 0 deletions lib/racecar/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ class Config < KingKonf::Config
desc "Whether to boot Rails when starting the consumer"
boolean :without_rails, default: false

desc "Maximum number of threads to run the application with. Each will spawn its own consumer"
integer :max_concurrency, default: 1

# The error handler must be set directly on the object.
attr_reader :error_handler

Expand Down
7 changes: 4 additions & 3 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ module Racecar
class Runner
attr_reader :processor, :config, :logger

def initialize(processor, config:, logger:, instrumenter: NullInstrumenter)
def initialize(processor, config:, logger:, instrumenter: NullInstrumenter, disable_signal_handlers: false)
@processor, @config, @logger = processor, config, logger
@instrumenter = instrumenter
@disable_signal_handlers = disable_signal_handlers
@stop_requested = false
Rdkafka::Config.logger = logger

Expand Down Expand Up @@ -44,7 +45,7 @@ def setup_pauses
end

def run
install_signal_handlers
install_signal_handlers unless disable_signal_handlers
@stop_requested = false

# Configure the consumer with a producer so it can produce messages and
Expand Down Expand Up @@ -93,7 +94,7 @@ def stop

private

attr_reader :pauses
attr_reader :pauses, :disable_signal_handlers

def process_method
@process_method ||= begin
Expand Down
33 changes: 33 additions & 0 deletions lib/racecar/signals.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true

module Racecar
module Signals
module_function

SHUTDOWN_SIGNALS = ["INT", "QUIT", "TERM"]

def setup_shutdown_handlers
# We cannot run the graceful shutdown from a trap context, given that each
# consumer is running in a thread. Instead, we're setting up a pipe so
# that the shutdown will be initiated outside of that trap context.
@shutdown_reader, writer = IO.pipe

SHUTDOWN_SIGNALS.each do |signal|
trap(signal) { writer.puts signal }
end
end

def wait_for_shutdown(&callback)
# The below will block until we receive one of the signals we are listening for.
# Any subsequent repeats of those signals are written, but ignored, since shutdown
# is already initiated. If we execute `reader.gets.chomp`, this would give us access
# to the signal received, in case we ever need to respond based off the signal
#
# This could also be extended if we want multiple signals to trigger an ungraceful
# shutdown
IO.select([@shutdown_reader])

callback.call
end
end
end
1 change: 1 addition & 0 deletions racecar.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Gem::Specification.new do |spec|

spec.add_runtime_dependency "king_konf", "~> 1.0.0"
spec.add_runtime_dependency "rdkafka", "~> 0.8.0"
spec.add_runtime_dependency "concurrent-ruby", '~> 1.0'

spec.add_development_dependency "bundler", [">= 1.13", "< 3"]
spec.add_development_dependency "pry"
Expand Down
Loading