Skip to content

Commit

Permalink
Increase logging to investigate hanging tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Blair committed Feb 25, 2021
1 parent 689dae3 commit 1488db5
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 18 deletions.
57 changes: 42 additions & 15 deletions lib/racecar/concurrent_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,37 @@ def initialize(processor:, config:, logger:, instrumenter:)
end

# Starts `config.max_concurrency` workers and then blocks until it is time to exit.
#
# For every worker returned by `run_worker`, the pid is appended to `@process_ids`
# and the pipe reader is collected into `@readers`. Once all workers are started,
# signal handlers are installed and control passes to `wait_for_exit`.
def run
  logger.info "=> Running with max concurrency of #{config.max_concurrency}"

  @readers = config.max_concurrency.times.map do
    pid, reader = run_worker

    @process_ids << pid
    logger.info "=> Forked new Racecar consumer with process id #{pid}"

    # The block's value becomes the element of @readers for this worker.
    reader
  end

  install_signal_handlers

  wait_for_exit
end

# Sends TERM to every pid in `@process_ids` and waits for each one to exit.
#
# Only the first call does any work; later calls return immediately because
# `@shutdown_initiated` is already set. After all children have been reaped,
# raises `exception` if one has been set.
def stop
  return if @shutdown_initiated

  @shutdown_initiated = true

  logger.info "=> Terminating child processes"

  Process.kill("TERM", *@process_ids)

  # Reap each child exactly once: a second waitpid on an already-reaped pid
  # raises Errno::ECHILD.
  @process_ids.each do |pid|
    logger.info "Waiting for child with pid #{pid}"
    Process.waitpid(pid)
  end

  logger.info "All children shutdown, exiting"

  raise exception if exception
end
Expand All @@ -54,7 +59,12 @@ def install_signal_handlers
# Print the consumer config to STDERR on USR1.
trap("USR1") { $stderr.puts config.inspect }

SHUTDOWN_SIGNALS.each { |signal| trap(signal) { stop } }
SHUTDOWN_SIGNALS.each do |signal|
trap(signal) do
$stderr.puts "Received signal in parent: #{signal}, stopping"
stop
end
end
end

def run_worker
Expand All @@ -66,8 +76,12 @@ def run_worker

Racecar::Runner.new(processor, config: config, logger: logger, instrumenter: instrumenter).run
rescue Exception => e
logger.info "Caught error from #{Process.pid}: #{e.class}: #{e.message}"
# Allow the parent process to re-raise the exception after shutdown
child_writer.write(Marshal.dump(e))
logger.info "Wrote to pipe from child #{Process.pid}"
ensure
child_writer.close
end
end

Expand All @@ -76,7 +90,7 @@ def run_worker
[pid, parent_reader]
end

# Blocks until one of the pipes in `@readers` becomes readable, records any
# exception received over it, and then calls `stop`.
#
# What is read from the first ready pipe decides the outcome:
# - an empty string means the write end was closed without sending anything,
#   so there is no exception to record;
# - otherwise the bytes are a `Marshal.dump`ed exception, which is stored via
#   `exception=`.
#
# Errors raised while selecting, reading or unmarshalling are logged and do not
# prevent `stop` from running.
def wait_for_exit
  begin
    # IO.select returns an array of arrays, the first of which contains the readers.
    # Looks something like [[#<IO:fd 10>], [], []]
    readable_io = IO.select(@readers)
    logger.info "Ready to read: #{readable_io}"
    ready_readers = readable_io.first

    logger.info "Readers closed?: #{ready_readers.map(&:closed?)}"

    first_read = ready_readers.first.read

    if first_read.empty?
      logger.info "First read: #{first_read}"
    else
      # NOTE(review): Marshal.load is only safe because the bytes are expected to come
      # from our own child process — confirm nothing else can write to this pipe.
      # Load once and reuse the result for both logging and assignment.
      child_exception = Marshal.load(first_read)
      logger.info "Marshalled: #{child_exception}"
      self.exception = child_exception
    end
  rescue StandardError => e
    # StandardError rather than Exception, so SystemExit and friends still propagate.
    logger.error "Eek, error: #{e.class}: #{e.message}"
  end

  stop
end
Expand Down
10 changes: 7 additions & 3 deletions lib/racecar/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ def run
consumer.close
end
end
logger.info "Runner with pid #{Process.pid} shut down"
end

def stop
Expand Down Expand Up @@ -153,9 +154,12 @@ def delivery_callback

def install_signal_handlers
# Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
trap("QUIT") { stop }
trap("INT") { stop }
trap("TERM") { stop }
["QUIT", "INT", "TERM"].each do |signal|
trap(signal) do
$stderr.puts "Received signal in child: #{signal}, stopping"
stop
end
end

# Print the consumer config to STDERR on USR1.
trap("USR1") { $stderr.puts config.inspect }
Expand Down

0 comments on commit 1488db5

Please sign in to comment.