From 235fc2c1a9b6e99153a123d97c651787afb77595 Mon Sep 17 00:00:00 2001 From: "Alexandr A. Panko" Date: Fri, 23 Mar 2018 12:47:46 +0300 Subject: [PATCH 1/6] Add ability to exclude queues by flag option --exclude_specified_queues --- README.md | 6 +++ lib/delayed/backend/shared_spec.rb | 59 ++++++++++++++++++++++++++++++ lib/delayed/command.rb | 3 ++ lib/delayed/tasks.rb | 1 + lib/delayed/worker.rb | 47 +++++++++++++----------- spec/delayed/backend/test.rb | 5 ++- spec/delayed/command_spec.rb | 15 ++++++++ 7 files changed, 113 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 4224fa1a4..18d8d00a7 100644 --- a/README.md +++ b/README.md @@ -244,6 +244,9 @@ You can then do the following: RAILS_ENV=production script/delayed_job --queue=tracking start RAILS_ENV=production script/delayed_job --queues=mailers,tasks start + # Option --exclude-specified-queues will do inverse of queues processing by skipping onces from --queue, --queues. + # If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*". + # Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues. # The following command will start 1 worker for the tracking queue, # 2 workers for the mailers and tasks queues, and 2 workers for any jobs: @@ -270,6 +273,9 @@ Work off queues by setting the `QUEUE` or `QUEUES` environment variable. QUEUE=tracking rake jobs:work QUEUES=mailers,tasks rake jobs:work +If EXCLUDE_SPECIFIED_QUEUES set to YES, then queues defined by QUEUE, QUEUES will be skipped instead. +See opton --exclude-specified-queues description for specal case of queue "*" + Restarting delayed_job ====================== diff --git a/lib/delayed/backend/shared_spec.rb b/lib/delayed/backend/shared_spec.rb index 771e0cd9c..d8ba0d4a7 100644 --- a/lib/delayed/backend/shared_spec.rb +++ b/lib/delayed/backend/shared_spec.rb @@ -425,6 +425,65 @@ def create_job(opts = {}) expect(SimpleJob.runs).to eq(3) end end + + context "when asked to exclude specified queues" do + context 'and worker does not have queue set' do + before(:each) do + worker.queues = [] + worker.exclude_specified_queues = true + end + + it 'works off all jobs' do + expect(SimpleJob.runs).to eq(0) + + create_job(:queue => 'one') + create_job(:queue => 'two') + create_job + worker.work_off + + expect(SimpleJob.runs).to eq(3) + end + end + + context 'and worker has one queue set' do + before(:each) do + worker.queues = ['large'] + worker.exclude_specified_queues = true + end + + it 'only works off jobs which are not from selected queues' do + expect(SimpleJob.runs).to eq(0) + + create_job(:queue => 'large') + create_job(:queue => 'small') + create_job(:queue => 'small 2') + worker.work_off + + expect(SimpleJob.runs).to eq(2) + end + end + + context 'and worker has two queue set' do + before(:each) do + worker.queues = %w[large small] + worker.exclude_specified_queues = true + end + + it 'only works off jobs which are not from selected queues' do + expect(SimpleJob.runs).to eq(0) + + create_job(:queue => 'large') + create_job(:queue => 'small') + create_job(:queue => 'medium') + create_job(:queue => 'medium 2') + create_job + + worker.work_off + + expect(SimpleJob.runs).to eq(3) + end + end + end end context 'max_attempts' do diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 281078242..72155c194 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -71,6 +71,9 @@ def initialize(args) # rubocop:disable MethodLength opt.on('--queue=queue', 'Specify which queue DJ must look up for jobs') do |queue| @options[:queues] = queue.split(',') end + opt.on('--exclude-specified-queues', 'Exclude looking up of queues specified by --queue[s]=') do + @options[:exclude_specified_queues] = true + end opt.on('--pool=queue1[,queue2][:worker_count]', 'Specify queues and number of workers for a worker pool') do |pool| parse_worker_pool(pool) end diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 409ba48f8..1826da6bd 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -19,6 +19,7 @@ :min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'], :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), + :exclude_specified_queues => ENV['EXCLUDE_SPECIFIED_QUEUES'].to_s.upcase == 'YES', :quiet => ENV['QUIET'] } diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index 1a352f775..f6e50274f 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -9,20 +9,21 @@ module Delayed class Worker # rubocop:disable ClassLength - DEFAULT_LOG_LEVEL = 'info'.freeze - DEFAULT_SLEEP_DELAY = 5 - DEFAULT_MAX_ATTEMPTS = 25 - DEFAULT_MAX_RUN_TIME = 4.hours - DEFAULT_DEFAULT_PRIORITY = 0 - DEFAULT_DELAY_JOBS = true - DEFAULT_QUEUES = [].freeze - DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze - DEFAULT_READ_AHEAD = 5 + DEFAULT_LOG_LEVEL = 'info'.freeze + DEFAULT_SLEEP_DELAY = 5 + DEFAULT_MAX_ATTEMPTS = 25 + DEFAULT_MAX_RUN_TIME = 4.hours + DEFAULT_DEFAULT_PRIORITY = 0 + DEFAULT_DELAY_JOBS = true + DEFAULT_QUEUES = [].freeze + DEFAULT_EXCLUDE_SPECIFIED_QUEUES = false + DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze + DEFAULT_READ_AHEAD = 5 cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, - :read_ahead, :plugins, :destroy_failed_jobs, :exit_on_complete, - :default_log_level + :exclude_specified_queues, :read_ahead, :plugins, :destroy_failed_jobs, + :exit_on_complete, :default_log_level # Named queue into which jobs are enqueued by default cattr_accessor :default_queue_name @@ -33,16 +34,17 @@ class Worker # rubocop:disable ClassLength attr_accessor :name_prefix def self.reset - self.default_log_level = DEFAULT_LOG_LEVEL - self.sleep_delay = DEFAULT_SLEEP_DELAY - self.max_attempts = DEFAULT_MAX_ATTEMPTS - self.max_run_time = DEFAULT_MAX_RUN_TIME - self.default_priority = DEFAULT_DEFAULT_PRIORITY - self.delay_jobs = DEFAULT_DELAY_JOBS - self.queues = DEFAULT_QUEUES - self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES - self.read_ahead = DEFAULT_READ_AHEAD - @lifecycle = nil + self.default_log_level = DEFAULT_LOG_LEVEL + self.sleep_delay = DEFAULT_SLEEP_DELAY + self.max_attempts = DEFAULT_MAX_ATTEMPTS + self.max_run_time = DEFAULT_MAX_RUN_TIME + self.default_priority = DEFAULT_DEFAULT_PRIORITY + self.delay_jobs = DEFAULT_DELAY_JOBS + self.queues = DEFAULT_QUEUES + self.exclude_specified_queues = DEFAULT_EXCLUDE_SPECIFIED_QUEUES + self.queue_attributes = DEFAULT_QUEUE_ATTRIBUTES + self.read_ahead = DEFAULT_READ_AHEAD + @lifecycle = nil end # Add or remove plugins in this list before the worker is instantiated @@ -131,7 +133,8 @@ def initialize(options = {}) @quiet = options.key?(:quiet) ? options[:quiet] : true @failed_reserve_count = 0 - [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, :exit_on_complete].each do |option| + [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, + :exclude_specified_queues, :exit_on_complete].each do |option| self.class.send("#{option}=", options[option]) if options.key?(option) end diff --git a/spec/delayed/backend/test.rb b/spec/delayed/backend/test.rb index 28031171f..7f15d2c12 100644 --- a/spec/delayed/backend/test.rb +++ b/spec/delayed/backend/test.rb @@ -66,7 +66,10 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti end jobs.select! { |j| j.priority <= Worker.max_priority } if Worker.max_priority jobs.select! { |j| j.priority >= Worker.min_priority } if Worker.min_priority - jobs.select! { |j| Worker.queues.include?(j.queue) } if Worker.queues.any? + jobs.select! { |j| + includes = Worker.queues.include?(j.queue) + Worker.exclude_specified_queues ? !includes : includes + } if Worker.queues.any? jobs.sort_by! { |j| [j.priority, j.run_at] }[0..limit - 1] end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index b57cd6efa..72b2e5438 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -175,5 +175,20 @@ command.daemonize end + + it 'should run with respect of exclude queues' do + command = Delayed::Command.new(['--pool=*:1', '--pool=lage,slow,buggy:2', '--exclude-specified-queues']) + expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => [], :exclude_specified_queues => true}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}] + ].each do |args| + expect(command).to receive(:run_process).with(*args).once + end + + command.daemonize + end end end From 88ad588e6736d472ea1a1d3a6569dbc1380dc2a2 Mon Sep 17 00:00:00 2001 From: Will Novak Date: Mon, 18 Mar 2019 11:15:19 -0400 Subject: [PATCH 2/6] Fix #1019 Rubocop issues --- README.md | 3 ++- lib/delayed/backend/shared_spec.rb | 2 +- lib/delayed/tasks.rb | 2 +- lib/delayed/worker.rb | 4 ++-- spec/delayed/backend/test.rb | 10 ++++++---- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 18d8d00a7..56182a9e7 100644 --- a/README.md +++ b/README.md @@ -247,7 +247,8 @@ You can then do the following: # Option --exclude-specified-queues will do inverse of queues processing by skipping onces from --queue, --queues. # If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*". - # Use the --pool option to specify a worker pool. You can use this option multiple times to start different numbers of workers for different queues. + # Use the --pool option to specify a worker pool. + # You can use this option multiple times to start different numbers of workers for different queues. # The following command will start 1 worker for the tracking queue, # 2 workers for the mailers and tasks queues, and 2 workers for any jobs: RAILS_ENV=production script/delayed_job --pool=tracking --pool=mailers,tasks:2 --pool=*:2 start diff --git a/lib/delayed/backend/shared_spec.rb b/lib/delayed/backend/shared_spec.rb index d8ba0d4a7..f1ed5c3ad 100644 --- a/lib/delayed/backend/shared_spec.rb +++ b/lib/delayed/backend/shared_spec.rb @@ -426,7 +426,7 @@ def create_job(opts = {}) end end - context "when asked to exclude specified queues" do + context 'when asked to exclude specified queues' do context 'and worker does not have queue set' do before(:each) do worker.queues = [] diff --git a/lib/delayed/tasks.rb b/lib/delayed/tasks.rb index 1826da6bd..0e7971099 100644 --- a/lib/delayed/tasks.rb +++ b/lib/delayed/tasks.rb @@ -19,7 +19,7 @@ :min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'], :queues => (ENV['QUEUES'] || ENV['QUEUE'] || '').split(','), - :exclude_specified_queues => ENV['EXCLUDE_SPECIFIED_QUEUES'].to_s.upcase == 'YES', + :exclude_specified_queues => ENV['EXCLUDE_SPECIFIED_QUEUES'].to_s.casecmp('YES').zero?, :quiet => ENV['QUIET'] } diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index f6e50274f..ef3aabb6a 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -133,8 +133,8 @@ def initialize(options = {}) @quiet = options.key?(:quiet) ? options[:quiet] : true @failed_reserve_count = 0 - [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, - :exclude_specified_queues, :exit_on_complete].each do |option| + [:min_priority, :max_priority, :sleep_delay, :read_ahead, :queues, + :exclude_specified_queues, :exit_on_complete].each do |option| self.class.send("#{option}=", options[option]) if options.key?(option) end diff --git a/spec/delayed/backend/test.rb b/spec/delayed/backend/test.rb index 7f15d2c12..e580686a4 100644 --- a/spec/delayed/backend/test.rb +++ b/spec/delayed/backend/test.rb @@ -66,10 +66,12 @@ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_ti end jobs.select! { |j| j.priority <= Worker.max_priority } if Worker.max_priority jobs.select! { |j| j.priority >= Worker.min_priority } if Worker.min_priority - jobs.select! { |j| - includes = Worker.queues.include?(j.queue) - Worker.exclude_specified_queues ? !includes : includes - } if Worker.queues.any? + if Worker.queues.any? + jobs.select! do |j| + includes = Worker.queues.include?(j.queue) + Worker.exclude_specified_queues ? !includes : includes + end + end jobs.sort_by! { |j| [j.priority, j.run_at] }[0..limit - 1] end From d30def5b09a6e2d1291a8c31e9c8b7c4a3c4cf27 Mon Sep 17 00:00:00 2001 From: Will Novak Date: Wed, 3 Mar 2021 16:19:07 -0500 Subject: [PATCH 3/6] Update README.md Co-authored-by: Nicholas Jakobsen --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 56182a9e7..85e8de59a 100644 --- a/README.md +++ b/README.md @@ -275,7 +275,7 @@ Work off queues by setting the `QUEUE` or `QUEUES` environment variable. QUEUES=mailers,tasks rake jobs:work If EXCLUDE_SPECIFIED_QUEUES set to YES, then queues defined by QUEUE, QUEUES will be skipped instead. -See opton --exclude-specified-queues description for specal case of queue "*" +See option --exclude-specified-queues description for special case of queue "*" Restarting delayed_job ====================== From 3e39646b112e4813f2e52ec2f7b11edc853da039 Mon Sep 17 00:00:00 2001 From: Will Novak Date: Wed, 3 Mar 2021 16:44:11 -0500 Subject: [PATCH 4/6] Update README.md Co-authored-by: Nicholas Jakobsen --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 85e8de59a..f41636d4d 100644 --- a/README.md +++ b/README.md @@ -244,7 +244,7 @@ You can then do the following: RAILS_ENV=production script/delayed_job --queue=tracking start RAILS_ENV=production script/delayed_job --queues=mailers,tasks start - # Option --exclude-specified-queues will do inverse of queues processing by skipping onces from --queue, --queues. + # Option --exclude-specified-queues will do inverse of queues processing by skipping ones from --queue, --queues. # If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*". # Use the --pool option to specify a worker pool. From 1301f016964644a5e7cc09a0972868ca6e0b8dce Mon Sep 17 00:00:00 2001 From: Nicholas Jakobsen Date: Wed, 3 Mar 2021 16:27:35 -0800 Subject: [PATCH 5/6] Allow per worker queue exclusions in pools The addition of `exclude_specified_queues` allowed all specified queues to be treated as exclusions, implying that all jobs in those queues not be handled by the workers. However, this only allows application of this behaviour globally. What we want is the ability to create two or more pools, where some pools handle jobs from the given queues and other pools handle jobs that are not from the given queues. To implement this, we check if the `exclude_specified_queues` is not set and treat this as a request for automatic detection of exclusions. By adding a "!" to the beginning of a queue list, only the workers that handle that list of queues will behaves as if the `exclude_specified_queues` flag had been set. --- lib/delayed/command.rb | 15 +++++++++++++++ spec/delayed/command_spec.rb | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/lib/delayed/command.rb b/lib/delayed/command.rb index 72155c194..225966476 100644 --- a/lib/delayed/command.rb +++ b/lib/delayed/command.rb @@ -122,6 +122,7 @@ def setup_pools end def run_process(process_name, options = {}) + options = normalize_worker_options(options) Delayed::Worker.before_fork Daemons.run_proc(process_name, :dir => options[:pid_dir], :dir_mode => :normal, :monitor => @monitor, :ARGV => @args) do |*_args| $0 = File.join(options[:prefix], process_name) if @options[:prefix] @@ -156,6 +157,20 @@ def parse_worker_pool(pool) @worker_pools << [queues, worker_count] end + def normalize_worker_options(options) + options = options.dup + + # If we haven't explictly said that we do or don't want to exclude specified queues, treat a leading '!' as a negation indicator for that list of queues + # Otherwise, the ! is treated as part of the queue name itself + if options[:exclude_specified_queues].nil? && options[:queues].present? + queues = options[:queues].map {|queue| queue.sub(/^!/, '') } # remove leading ! from all queues even though we only expect the first to have one, this makes it easier to look for changes after + options[:exclude_specified_queues] = queues != options[:queues] + options[:queues] = queues + end + + options + end + def root @root ||= rails_root_defined? ? ::Rails.root : DIR_PWD end diff --git a/spec/delayed/command_spec.rb b/spec/delayed/command_spec.rb index 72b2e5438..0c22d8521 100644 --- a/spec/delayed/command_spec.rb +++ b/spec/delayed/command_spec.rb @@ -190,5 +190,20 @@ command.daemonize end + + it 'should set queue exclusion to true if a queue starts with a ! and --exclude_specified_queues has not been specified' do + command = Delayed::Command.new(['--pool=fast:1', '--pool=!lage,slow,buggy:2']) + expect(FileUtils).to receive(:mkdir_p).with('./tmp/pids').once + + [ + ['delayed_job.0', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[fast], :exclude_specified_queues => false}], + ['delayed_job.1', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}], + ['delayed_job.2', {:quiet => true, :pid_dir => './tmp/pids', :log_dir => './log', :queues => %w[lage slow buggy], :exclude_specified_queues => true}] + ].each do |args| + expect(command).to receive(:run_process).with(*args).once + end + + command.daemonize + end end end From bfafebf66312f1867ce8831e002e5a0c019e4f14 Mon Sep 17 00:00:00 2001 From: Nicholas Jakobsen Date: Thu, 26 Aug 2021 09:39:40 -0700 Subject: [PATCH 6/6] Document worker pool name queue exclusion prefix --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index d843b71f7..d2c190231 100644 --- a/README.md +++ b/README.md @@ -250,6 +250,8 @@ You can then do the following: # Option --exclude-specified-queues will do inverse of queues processing by skipping ones from --queue, --queues. # If both --pool=* --exclude-specified-queues given, no exclusions will by applied on "*". + # A worker pool's queue list can be prefixed with a ! which has the same effect as setting + # --exclude-specified-queues but only applies it to that specific worker pool. # Use the --pool option to specify a worker pool. # You can use this option multiple times to start different numbers of workers for different queues.