
Concurrency testing using a forking approach #222

Merged: 14 commits into zendesk:master on Mar 29, 2021

Conversation

@Alexander-Blair (Contributor) commented on Feb 23, 2021

The issue: #188

There was some discussion of a previous implementation approach in #221; this PR is an initial draft to continue that conversation.

Adds a ConcurrentRunner, which uses forking to allow concurrent processing:

  • Adds the code in a way that does not affect existing APIs (you never know what your users will decide to depend on)
  • Includes quite a big overhaul of the existing integration test so that we have a fresh topic for each spec, and we also clean up the topics at the end.

Outstanding question(s):

  • Should there be a hard limit on max concurrency?
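
For readers skimming the PR, here is a minimal sketch of the forking approach being described, not the PR's actual ConcurrentRunner: the parent forks one process per worker, keeps the read end of a pipe for each child (it hits EOF when that child exits), and waits on those readers with IO.select. The worker count and the sleep standing in for consumer work are placeholders.

worker_count = 4 # placeholder; the PR makes this configurable

children = worker_count.times.map do
  reader, writer = IO.pipe

  pid = fork do
    reader.close
    # Each child would run its own consumer here (e.g. a Racecar runner);
    # a short sleep stands in for that work so the example terminates.
    sleep(rand(1..3))
  end

  writer.close # the parent keeps only the reader; it hits EOF when the child exits
  { pid: pid, reader: reader }
end

# Block until at least one child exits and its end of the pipe is closed.
IO.select(children.map { |child| child[:reader] })

# Ask the remaining children to shut down, then reap them all.
children.each do |child|
  Process.kill("TERM", child[:pid])
rescue Errno::ESRCH
  # this child has already exited
end
Process.waitall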

@dasch (Contributor) commented on Feb 23, 2021

Try rebasing on master to get the tests working again. Also, would you like to extract the improvements to the test suite to a separate PR and get that merged first?

@Alexander-Blair (Author) replied:

Very sensible idea! 👍 #225

@Alexander-Blair force-pushed the concurrency-testing-forking branch 3 times, most recently from cd5feab to 6f44b73 (February 24, 2021 09:44)
@dasch (Contributor) left a comment:

Getting close! Can you also rebase on master to see if that fixes CI?

# which case there is nothing more to do.
readable_io = IO.select(readers)

first_read = (readable_io.first & readers).first.read
@dasch (Contributor) commented on this line:

Can you add a comment to this line? It's not obvious what's being done.
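
As an aside for readers of this thread, here is one possible annotation of that line, based only on IO.select's documented return value; it is not the comment that eventually landed:

# IO.select(readers) blocks until at least one IO is ready, returning a
# three-element array of [readable, writable, errored] IO lists. Since only
# readers were passed, readable_io.first is the subset of pipe readers that
# are ready, i.e. whose worker has exited (or written something).
# Intersecting with `readers` defensively restricts that subset to the readers
# this runner created, and .first.read drains the first one, returning whatever
# the exiting worker wrote ("" if it simply exited).
readable_io = IO.select(readers)

first_read = (readable_io.first & readers).first.read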

@@ -153,6 +153,9 @@ class Config < KingKonf::Config
desc "Whether to boot Rails when starting the consumer"
boolean :without_rails, default: false

desc "Maximum number of threads to run the application with. Each will spawn its own consumer"
integer :max_concurrency, default: 1
@dasch (Contributor) commented:

This should be changed to processes. Also, I think we should call this parallel rather than concurrent now that we're not using threads. Alternatively, we could perhaps look at what the configs are called for Unicorn and Resque?

@dasch (Contributor) followed up:

Perhaps "workers" is better. Also – it's not really a max, is it? It's the actual number of workers that will be spun up.

@Alexander-Blair (Author) replied:

Yeah, true. What needs to be clear, though, is that the number of workers * running instances should be less than or equal to the number of partitions in the Kafka topics. Especially if people are running locally and are disappointed when throughput doesn't improve on a single-partition topic 😄

@Alexander-Blair force-pushed the concurrency-testing-forking branch 12 times, most recently from 1488db5 to d21e3ea (February 25, 2021 13:34)
@dasch (Contributor) commented on Feb 26, 2021

One of the workers somehow receives a shutdown signal - in this case,
we do the same as above, initiating termination and waiting for all of
the workers to shut down

Is there a case to be made for restarting that worker instead, or does that expand the state space too much?

ready_readers.each(&:close)

# Recursively wait for the remaining readers
wait_for_exit(remaining_readers - ready_readers)
@dasch (Contributor) commented on these lines:

This risks blowing the stack if we go too deep. Probably iterate instead, e.g. until remaining_readers.empty? ... end and update a variable with the remaining readers.
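
A minimal sketch of the iterative form being suggested, reusing the names from the snippet above; this is illustrative rather than the code that was merged:

def wait_for_exit(readers)
  remaining_readers = readers.dup

  until remaining_readers.empty?
    # Block until at least one reader is ready, meaning its worker has exited.
    ready_readers, = IO.select(remaining_readers)

    ready_readers.each(&:close)
    remaining_readers -= ready_readers
  end
end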

@Alexander-Blair (Author) replied:

Indeed, this wasn't needed in the end 👍

@Alexander-Blair force-pushed the concurrency-testing-forking branch 3 times, most recently from ed7dd10 to 5d0a3d6 (February 27, 2021 11:43)
@Alexander-Blair (Author) commented:

Is there a case to be made for restarting that worker instead, or does that expand the state space too much?

I'd maybe like to have this running as an experimental feature, and see whether 'quiet' deaths of workers are a common occurrence. From what I can see, exceptions (aside from on startup) are rescued within the Racecar::Runner itself and baked into the retry capabilities. Perhaps if it turns out the consumers can die on their own (and it turns out to be somewhat common), we can explore restart capabilities.

@Alexander-Blair force-pushed the concurrency-testing-forking branch 3 times, most recently from 0c921d7 to d90b989 (March 3, 2021 11:07)
@dasch (Contributor) left a comment:

Looks good, ready for beta testing at least 👍

Can you fully document in the README and perhaps the code as well, and also add an entry to the changelog?

@dasch (Contributor) left a comment:

Darn close! Can you make the change to the code and also add an entry in the changelog?

start_from_beginning: true,
max_bytes_per_partition: 1048576,
additional_config: {},
parallel_workers: nil
@dasch (Contributor) commented on these lines:

I don't think it makes sense to make this configuration part of the subscribes_to method – consumer classes can subscribe to multiple topics, but the parallelism is tied to the consumer, not the topic. The class level accessor should suffice; class MyConsumer < ...; self.parallel_workers = 5; end.
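
Spelled out, the suggestion looks roughly like this (illustrative: MyConsumer and the topic names are placeholders; parallel_workers is the accessor name under discussion in this thread):

class MyConsumer < Racecar::Consumer
  subscribes_to "some-topic", "another-topic"

  # Parallelism belongs to the consumer class rather than to any single topic
  # subscription, so it is set as a class-level attribute.
  self.parallel_workers = 5

  def process(message)
    # handle the message
  end
end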

@dasch (Contributor) left a comment:

Looks great! Thanks a bunch for putting in all the work!

I can see that the branch needs to be rebased, but after that I'll happily merge :D

Alex Blair added 12 commits on March 29, 2021 at 13:16. From the commit messages:

  • When running the specs, failing to close the producer before forking was causing a consistent segfault when running via Docker (though not on a MacBook!). Upon further investigation, it turned out that:
    - A producer was created in test 1 (without spinning up parallel workers). The producer was not closed, and remained alive in the main RSpec process.
    - A test was then run which did include parallelism, therefore forking the main RSpec process one or more times. At the end of the test, the main process and the forked processes all tried to call the producer's finalizer, resulting in bad times. (See the sketch after these notes.)
  • Each worker will register as its own member of the Kafka consumer group, and act independently.
  • The number of parallel workers is specified per consumer, as this makes more sense than a global configuration option.
  • When running separate instances of the same app, the workers often end up with the same process id, so it's not particularly informative on its own.
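
Picking up the first commit note: one way to guarantee that a producer created by an earlier, non-forking example is never inherited by later forked workers is to close it in the example that created it. A sketch under assumptions (RSpec, and the rdkafka producer API that Racecar 2.x builds on; the describe block, topic, and broker address are placeholders, not the PR's actual specs):

RSpec.describe "producing without parallel workers" do
  let(:producer) do
    Rdkafka::Config.new("bootstrap.servers" => "localhost:9092").producer
  end

  # Close the producer in the example that created it, so later examples that
  # fork workers never inherit a live producer whose finalizer every forked
  # process would otherwise try to run at exit.
  after { producer.close }

  it "delivers a message" do
    producer.produce(topic: "test-topic", payload: "hello").wait
  end
end
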
@dasch merged commit 1c34161 into zendesk:master on Mar 29, 2021
@Alexander-Blair deleted the concurrency-testing-forking branch on March 29, 2021 13:55