Adding batching to re-queuing for timestamp (resque#767)
* adding code to add requeue in batches

* cop

* fixed rubocop errors

* added test fix

* added rubocop fix

* added versions to matrix

* Revert "added test fix"

This reverts commit d0b0f2e.

* removed unrelated changes

* more unrelated changes

* one last unrelated change

* cleaned up comments and code

* rubocop line length
brennen-stripe authored Mar 14, 2023
1 parent edda557 commit 121e342
Showing 5 changed files with 148 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .rubocop_todo.yml
@@ -39,7 +39,7 @@ Metrics/MethodLength:
 # Offense count: 2
 # Configuration parameters: CountComments.
 Metrics/ModuleLength:
-  Max: 331
+  Max: 364
 
 # Offense count: 1
 Style/CaseEquality:
74 changes: 69 additions & 5 deletions lib/resque/scheduler.rb
@@ -206,16 +206,80 @@ def enqueue_next_item(timestamp)
 
       # Enqueues all delayed jobs for a timestamp
       def enqueue_delayed_items_for_timestamp(timestamp)
-        item = nil
+        count = 0
+        batch_size = delayed_requeue_batch_size
+        actual_batch_size = nil
+
+        log "Processing delayed items for timestamp #{timestamp}, in batches of #{batch_size}"
+
         loop do
           handle_shutdown do
             # Continually check that it is still the master
-            item = enqueue_next_item(timestamp) if am_master
+            if am_master
+              actual_batch_size = enqueue_items_in_batch_for_timestamp(timestamp,
+                                                                       batch_size)
+            end
           end
-          # continue processing until there are no more ready items in this
-          # timestamp
-          break if item.nil?
+
+          count += actual_batch_size
+          log "queued #{count} jobs" if actual_batch_size != -1
+
+          # continue processing until there are no more items in this
+          # timestamp. If we don't have a full batch, this is the last one.
+          # This also breaks us in the event of a redis transaction failure
+          # i.e. enqueue_items_in_batch_for_timestamp returned -1
+          break if actual_batch_size < batch_size
        end
+
+        log "finished queueing #{count} total jobs for timestamp #{timestamp}" if count != -1
      end
 
+      def timestamp_key(timestamp)
+        "delayed:#{timestamp.to_i}"
+      end
+
+      def enqueue_items_in_batch_for_timestamp(timestamp, batch_size)
+        timestamp_bucket_key = timestamp_key(timestamp)
+
+        encoded_jobs_to_requeue = Resque.redis.lrange(timestamp_bucket_key, 0, batch_size - 1)
+
+        # Watch is used to ensure that the timestamp bucket we are operating on
+        # is not altered by any other clients between the watch call and when we call exec
+        # (to execute the multi block). We should error catch on the redis.exec return value
+        # as that will indicate if the entire transaction was aborted or not. Though we should
+        # be safe as our ltrim is inside the multi block and therefore also would have been
+        # aborted. So nothing would have been queued, but also nothing lost from the bucket.
+        watch_result = Resque.redis.watch(timestamp_bucket_key) do
+          Resque.redis.multi do
+            encoded_jobs_to_requeue.each do |encoded_job|
+              Resque.redis.srem("timestamps:#{encoded_job}", timestamp_bucket_key)
+
+              decoded_job = Resque.decode(encoded_job)
+              enqueue(decoded_job)
+            end
+
+            Resque.redis.ltrim(timestamp_bucket_key, batch_size, -1)
+          end
+        end
+
+        # Did the multi block successfully remove from this timestamp and enqueue the jobs?
+        success = !watch_result.nil?
+
+        # If this was the last batch in this timestamp bucket, clean up
+        if success && encoded_jobs_to_requeue.count < batch_size
+          Resque.clean_up_timestamp(timestamp_bucket_key, timestamp)
+        end
+
+        unless success
+          # Our batched transaction failed in Redis due to the timestamp_bucket_key value
+          # being modified while we built our multi block. We return -1 to ensure we break
+          # out of the loop iterating on this timestamp so it can be re-processed via the
+          # loop in handle_delayed_items.
+          return -1
+        end
+
+        # will return 0 if none were left to batch
+        encoded_jobs_to_requeue.count
+      end
+
       def enqueue(config)
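The heart of the change above is a read-then-trim pattern against the timestamp bucket: LRANGE a batch of encoded jobs, enqueue them inside a WATCH/MULTI transaction, and LTRIM the bucket only if no other client touched the key before EXEC. Below is a minimal, self-contained sketch of that same pattern; it is not part of this diff, the move_batch helper and the jobs:pending / jobs:ready key names are invented for illustration, and it assumes the redis-rb client used elsewhere in resque.

require 'redis'

# Move up to batch_size items from the head of `source` to the tail of
# `dest`. Returns the number moved, or -1 if another client modified
# `source` between WATCH and EXEC (the whole transaction is then aborted,
# so nothing is copied and nothing is trimmed).
def move_batch(redis, source, dest, batch_size)
  batch = redis.lrange(source, 0, batch_size - 1)
  return 0 if batch.empty?

  result = redis.watch(source) do
    redis.multi do |tx|
      batch.each { |item| tx.rpush(dest, item) }
      tx.ltrim(source, batch_size, -1) # drop exactly what was copied
    end
  end

  # redis-rb surfaces an aborted EXEC as a nil return from multi
  result.nil? ? -1 : batch.size
end

redis = Redis.new
loop do
  moved = move_batch(redis, 'jobs:pending', 'jobs:ready', 100)
  break if moved < 100 # a short batch means the list is drained; -1 means retry later
end

Keeping the LTRIM inside the MULTI is what makes the retry cheap: an aborted transaction leaves the bucket untouched, so the caller can simply come back around, exactly as enqueue_delayed_items_for_timestamp does when it sees -1.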
6 changes: 6 additions & 0 deletions lib/resque/scheduler/configuration.rb
@@ -65,6 +65,12 @@ def app_name
         @app_name ||= environment['APP_NAME']
       end
 
+      def delayed_requeue_batch_size
+        @delayed_requeue_batch_size ||= \
+          ENV['DELAYED_REQUEUE_BATCH_SIZE'].to_i if environment['DELAYED_REQUEUE_BATCH_SIZE']
+        @delayed_requeue_batch_size ||= 100
+      end
+
       # Amount of time in seconds to sleep between polls of the delayed
       # queue. Defaults to 5
       attr_writer :poll_sleep_amount
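Per the new hook above, the requeue batch size defaults to 100 and is read from the environment once and then memoized. A short usage sketch follows; the value 500 is arbitrary, and reading the setting back through Resque::Scheduler assumes the Configuration module is mixed into the scheduler singleton, as it is elsewhere in this codebase.

# In the shell that launches the scheduler:
#   DELAYED_REQUEUE_BATCH_SIZE=500 rake resque:scheduler

# Or from Ruby, before the scheduler boots (the value is memoized on
# first access, so set it before delayed_requeue_batch_size is called):
ENV['DELAYED_REQUEUE_BATCH_SIZE'] = '500'

require 'resque-scheduler'
Resque::Scheduler.delayed_requeue_batch_size # => 500 in this sketch; 100 when the variable is unset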
32 changes: 16 additions & 16 deletions lib/resque/scheduler/delaying_extensions.rb
@@ -297,6 +297,22 @@ def get_last_enqueued_at(job_name)
         redis.hget('delayed:last_enqueued_at', job_name)
       end
 
+      def clean_up_timestamp(key, timestamp)
+        # Use a watch here to ensure nobody adds jobs to this delayed
+        # queue while we're removing it.
+        redis.watch(key) do
+          if redis.llen(key).to_i == 0
+            # If the list is empty, remove it.
+            redis.multi do |transaction|
+              transaction.del(key)
+              transaction.zrem(:delayed_queue_schedule, timestamp.to_i)
+            end
+          else
+            redis.redis.unwatch
+          end
+        end
+      end
+
       private
 
       def job_to_hash(klass, args)
@@ -328,22 +344,6 @@ def remove_delayed_job(encoded_job)
         replies.each_slice(2).map(&:first).inject(:+)
       end
 
-      def clean_up_timestamp(key, timestamp)
-        # Use a watch here to ensure nobody adds jobs to this delayed
-        # queue while we're removing it.
-        redis.watch(key) do
-          if redis.llen(key).to_i == 0
-            # If the list is empty, remove it.
-            redis.multi do |transaction|
-              transaction.del(key)
-              transaction.zrem(:delayed_queue_schedule, timestamp.to_i)
-            end
-          else
-            redis.redis.unwatch
-          end
-        end
-      end
-
       def search_first_delayed_timestamp_in_range(start_at, stop_at)
         start_at = start_at.nil? ? '-inf' : start_at.to_i
         stop_at = stop_at.nil? ? '+inf' : stop_at.to_i
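The body of clean_up_timestamp is unchanged; hoisting it above the private keyword is what lets the scheduler invoke it as Resque.clean_up_timestamp from enqueue_items_in_batch_for_timestamp. A quick illustration of the now-public call, using the delayed:<epoch> key convention from this diff:

t = Time.now + 60
key = "delayed:#{t.to_i}"

# Deletes the bucket list and removes the timestamp from the
# delayed_queue_schedule sorted set, but only when the list is already
# empty -- the llen guard (and the watch around it) makes this a no-op
# while jobs remain.
Resque.clean_up_timestamp(key, t)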
56 changes: 56 additions & 0 deletions test/delayed_queue_test.rb
@@ -375,6 +375,62 @@
     assert_equal(1, Resque.delayed_timestamp_peek(t, 0, 3).length)
   end
 
+  test 'enqueue_delayed_items_for_timestamp enqueues jobs in 2 batches' do
+    t = Time.now + 60
+
+    # create 120 jobs
+    120.times { Resque.enqueue_at(t, SomeIvarJob) }
+    assert_equal(120, Resque.delayed_timestamp_size(t))
+
+    Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
+    assert_equal(0, Resque.delayed_timestamp_size(t))
+
+    # assert that the active queue is now 120
+    assert_equal(120, Resque.size(Resque.queue_from_class(SomeIvarJob)))
+  end
+
+  test 'enqueue_delayed_items_for_timestamp enqueues jobs in one batch for the timestamp' do
+    t = Time.now + 60
+
+    # create 90 jobs
+    90.times { Resque.enqueue_at(t, SomeIvarJob) }
+    assert_equal(90, Resque.delayed_timestamp_size(t))
+
+    Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
+    assert_equal(0, Resque.delayed_timestamp_size(t))
+
+    # assert that the active queue is now 90
+    assert_equal(90, Resque.size(Resque.queue_from_class(SomeIvarJob)))
+  end
+
+  # test to make sure the timestamp is cleaned up
+
+  test 'enqueue_delayed_items_for_timestamp handles a watch failure' do
+    t = Time.now + 60
+
+    # create 100 jobs
+    100.times { Resque.enqueue_at(t, SomeIvarJob) }
+    assert_equal(100, Resque.delayed_timestamp_size(t))
+
+    Resque.redis.stubs(:watch).returns(nil)
+
+    Resque.expects(:clean_up_timestamp).never
+
+    Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
+  end
+
+  test 'enqueue_delayed_items_for_timestamp cleans up a timestamp' do
+    t = Time.now + 60
+
+    # create 100 jobs
+    100.times { Resque.enqueue_at(t, SomeIvarJob) }
+    assert_equal(100, Resque.delayed_timestamp_size(t))
+
+    Resque.expects(:clean_up_timestamp).once
+
+    Resque::Scheduler.enqueue_delayed_items_for_timestamp(t)
+  end
+
   test 'enqueue_delayed_items_for_timestamp creates jobs ' \
        'and empties the delayed queue' do
     t = Time.now + 60
