From 1488db50f6147fc8009dce807a45f05b80908a6c Mon Sep 17 00:00:00 2001 From: Alex Blair Date: Thu, 25 Feb 2021 11:02:33 +0000 Subject: [PATCH] Increase logging to investigate hanging tests --- lib/racecar/concurrent_runner.rb | 57 +++++++++++++++++++++++--------- lib/racecar/runner.rb | 10 ++++-- 2 files changed, 49 insertions(+), 18 deletions(-) diff --git a/lib/racecar/concurrent_runner.rb b/lib/racecar/concurrent_runner.rb index c3db659d..479493ff 100644 --- a/lib/racecar/concurrent_runner.rb +++ b/lib/racecar/concurrent_runner.rb @@ -15,20 +15,20 @@ def initialize(processor:, config:, logger:, instrumenter:) end def run - $stderr.puts "=> Running with max concurrency of #{config.max_concurrency}" + logger.info "=> Running with max concurrency of #{config.max_concurrency}" - readers = config.max_concurrency.times.map do + @readers = config.max_concurrency.times.map do pid, reader = run_worker @process_ids << pid - $stderr.puts "=> Forked new Racecar consumer with process id #{pid}" + logger.info "=> Forked new Racecar consumer with process id #{pid}" reader end install_signal_handlers - wait_for_exit(readers) + wait_for_exit end def stop @@ -36,11 +36,16 @@ def stop @shutdown_initiated = true - $stderr.puts "=> Terminating child processes" + logger.info "=> Terminating child processes" Process.kill("TERM", *@process_ids) - @process_ids.each { |pid| Process.waitpid(pid) } + @process_ids.each do |pid| + logger.info "Waiting for child with pid #{pid}" + Process.waitpid(pid) + end + + logger.info "All children shutdown, exiting" raise exception if exception end @@ -54,7 +59,12 @@ def install_signal_handlers # Print the consumer config to STDERR on USR1. trap("USR1") { $stderr.puts config.inspect } - SHUTDOWN_SIGNALS.each { |signal| trap(signal) { stop } } + SHUTDOWN_SIGNALS.each do |signal| + trap(signal) do + $stderr.puts "Received signal in parent: #{signal}, stopping" + stop + end + end end def run_worker @@ -66,8 +76,12 @@ def run_worker Racecar::Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter).run rescue Exception => e + logger.info "Caught error from #{Process.pid}: #{e.class}: #{e.message}" # Allow the parent process to re-raise the exception after shutdown child_writer.write(Marshal.dump(e)) + logger.info "Wrote to pipe from child #{Process.pid}" + ensure + child_writer.close end end @@ -76,7 +90,7 @@ def run_worker [pid, parent_reader] end - def wait_for_exit(readers) + def wait_for_exit # The call to IO.select blocks until one of our readers is ready for reading, which # could be for one of two reasons: # @@ -86,14 +100,27 @@ def wait_for_exit(readers) # - A graceful shutdown was already initiated, and the pipe writer has been closed, in # which case there is nothing more to do. # - readable_io = IO.select(readers) - # The return value is an array of arrays, the first of which contains the readers. Looks - # something like [[#], [], []] - ready_readers = readable_io.first - - first_read = ready_readers.first.read + begin + readable_io = IO.select(@readers) + # The return value is an array of arrays, the first of which contains the readers. Looks + # something like [[#], [], []] + logger.info "Ready to read: #{readable_io}" + ready_readers = readable_io.first + + logger.info "Readers closed?: #{readable_io.first.map(&:closed?)}" + + first_read = ready_readers.first.read + + if first_read.empty? + logger.info "First read: #{first_read}" + else + logger.info "Marshalled: #{Marshal.load(first_read)}" + self.exception = Marshal.load(first_read) + end + rescue Exception => e + logger.info "Eek, error: #{e.class}: #{e.message}" + end - self.exception = Marshal.load(first_read) unless first_read.empty? stop end end diff --git a/lib/racecar/runner.rb b/lib/racecar/runner.rb index 306385a6..3f1ce092 100644 --- a/lib/racecar/runner.rb +++ b/lib/racecar/runner.rb @@ -90,6 +90,7 @@ def run consumer.close end end + logger.info "Runner with pid #{Process.pid} shut down" end def stop @@ -153,9 +154,12 @@ def delivery_callback def install_signal_handlers # Stop the consumer on SIGINT, SIGQUIT or SIGTERM. - trap("QUIT") { stop } - trap("INT") { stop } - trap("TERM") { stop } + ["QUIT", "INT", "TERM"].each do |signal| + trap(signal) do + $stderr.puts "Received signal in child: #{signal}, stopping" + stop + end + end # Print the consumer config to STDERR on USR1. trap("USR1") { $stderr.puts config.inspect }