Skip to content

Commit

Permalink
AO3-6901 Use ActiveJob for asynchronous indexing (#5055)
Browse files Browse the repository at this point in the history
* AO3-6901 Use ActiveJob for asynchronous indexing

* Update tests for AsyncIndexer

* Avoid leaking queue_adapter changes to subsequent tests

* Run enqueued jobs for request specs (works for n+1 specs)

* Fix typo in comment
  • Loading branch information
redsummernight authored Feb 12, 2025
1 parent ec6c90f commit ddc7157
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 71 deletions.
15 changes: 15 additions & 0 deletions app/jobs/async_indexer_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# A job to index the record IDs queued up by AsyncIndexer.
#
# The batch key +name+ has the form "IndexerClass:first_id:timestamp"
# and refers to a Redis set of record IDs awaiting indexing.
class AsyncIndexerJob < ApplicationJob
  REDIS = AsyncIndexer::REDIS

  # Looks up the indexer class encoded in the batch key, indexes every ID
  # stored in the Redis set, processes the result with IndexSweeper, and
  # finally removes the batch key. Does nothing when the set is empty or
  # missing.
  def perform(name)
    indexer_class = name.split(":").first.constantize
    record_ids = REDIS.smembers(name)

    return if record_ids.empty?

    results = indexer_class.new(record_ids).index_documents
    IndexSweeper.new(results, indexer_class).process_batch
    REDIS.del(name)
  end
end
19 changes: 12 additions & 7 deletions app/models/search/async_indexer.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
class AsyncIndexer

REDIS = REDIS_GENERAL

####################
# CLASS METHODS
####################

def self.perform(name)
# TODO: Keep the method so we can still run queued jobs from previous
# versions. However, tests should no longer depend on it.
#
# Remove in a future version, once all old jobs have been retried or
# cleared.
raise "Avoid using AsyncIndexer.perform in tests" if Rails.env.test?

indexer = name.split(":").first.constantize
ids = REDIS.smembers(name)

Expand Down Expand Up @@ -41,10 +47,10 @@ def self.index(klass, ids, priority)
def initialize(indexer, priority)
@indexer = indexer
@priority = case priority.to_s
when 'main'
'high'
when 'background'
'low'
when "main"
"high"
when "background"
"low"
else
priority
end
Expand All @@ -53,11 +59,10 @@ def initialize(indexer, priority)
def enqueue_ids(ids)
name = "#{indexer}:#{ids.first}:#{Time.now.to_i}"
REDIS.sadd(name, ids)
Resque::Job.create(queue, self.class, name)
AsyncIndexerJob.set(queue: queue).perform_later(name)
end

# Name of the queue this indexer's jobs are placed on, derived from the
# normalized priority (e.g. "reindex_high", "reindex_low",
# "reindex_failures").
def queue
  "reindex_#{priority}"
end

end
4 changes: 0 additions & 4 deletions config/environments/production.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@
# Use a different cache store in production.
# config.cache_store = :mem_cache_store

# Use a real queuing backend for Active Job (and separate queues per environment).
# config.active_job.queue_adapter = :resque
# config.active_job.queue_name_prefix = "otwarchive_production"

config.action_mailer.perform_caching = false

# Ignore bad email addresses and do not raise email delivery errors.
Expand Down
4 changes: 2 additions & 2 deletions spec/lib/tasks/notifications.rake_spec.rb
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
require "spec_helper"

describe "rake notifications:send_tos_update" do
include ActiveJob::TestHelper

let(:admin_post) { create(:admin_post) }

context "with one user" do
let!(:user) { create(:user) }

it "enqueues one tos update notifications" do
ActiveJob::Base.queue_adapter = :test
expect(User.all.size).to eq(1)
expect { subject.invoke(admin_post.id) }
.to have_enqueued_mail(TosUpdateMailer, :tos_update_notification).on_queue(:tos_update).with(user, admin_post.id)
Expand All @@ -18,7 +19,6 @@
before { create_list(:user, 10) }

it "enqueues multiple tos update notifications" do
ActiveJob::Base.queue_adapter = :test
expect(User.all.size).to eq(10)
expect { subject.invoke(admin_post.id) }
.to have_enqueued_mail(TosUpdateMailer, :tos_update_notification).on_queue(:tos_update).with(instance_of(User), admin_post.id).exactly(10)
Expand Down
91 changes: 35 additions & 56 deletions spec/models/search/async_indexer_spec.rb
Original file line number Diff line number Diff line change
@@ -1,49 +1,24 @@
require 'spec_helper'
require "spec_helper"

describe AsyncIndexer do
it "should enqueue ids" do
tag = Tag.new
tag.id = 34
include ActiveJob::TestHelper

indexer = AsyncIndexer.new(TagIndexer, :background)
it "enqueues IDs" do
freeze_time
batch_key = "WorkIndexer:34:#{Time.now.to_i}"

expect(AsyncIndexer).to receive(:new).with(TagIndexer, :background).and_return(indexer)
expect(indexer).to receive(:enqueue_ids).with([34]).and_return(true)
expect do
AsyncIndexer.index(Work, [34], :background)
end.to enqueue_job.on_queue("reindex_low").with(batch_key)

AsyncIndexer.index('Tag', [34], :background)
expect(AsyncIndexer::REDIS.smembers(batch_key)).to contain_exactly("34")
end

it "should retry batch errors" do
work = Work.new
work.id = 34
context "when persistent indexing failures occur" do
let(:work_id) { create(:work).id }

indexer = WorkIndexer.new([34])
batch = {
"errors" => true,
"items" => [{
"update" => {
"_id" => 34,
"error" => {
"problem" => "description"
}
}
}]
}

async_indexer = AsyncIndexer.new(WorkIndexer, "failures")

expect(AsyncIndexer::REDIS).to receive(:smembers).and_return([34])
expect(WorkIndexer).to receive(:new).with([34]).and_return(indexer)
expect(indexer).to receive(:index_documents).and_return(batch)
expect(AsyncIndexer).to receive(:new).with(WorkIndexer, "failures").and_return(async_indexer)
expect(async_indexer).to receive(:enqueue_ids).with([34]).and_return(true)

AsyncIndexer.perform("WorkIndexer:34:#{Time.now.to_i}")
end

context "when persistent failures occur" do
before do
# Make elasticsearch always fail.
# Make batch indexing always fail.
allow($elasticsearch).to receive(:bulk) do |options|
{
"errors" => true,
Expand All @@ -62,33 +37,37 @@
end
end

it "should call the BookmarkedWorkIndexer three times with the same ID" do
expect(BookmarkedWorkIndexer).to receive(:new).with(["99999"]).
exactly(3).times.
and_call_original
AsyncIndexer.index(BookmarkedWorkIndexer, [99_999], "main")
end

it "should add the ID to the BookmarkedWorkIndexer's permanent failures" do
AsyncIndexer.index(BookmarkedWorkIndexer, [99_999], "main")

permanent_store = IndexSweeper.permanent_failures(BookmarkedWorkIndexer)
it "tries indexing IDs up to 3 times" do
expect(BookmarkedWorkIndexer).to receive(:new).with([work_id.to_s])
.exactly(3).times
.and_call_original

expect do
AsyncIndexer.index(BookmarkedWorkIndexer, [work_id], :main)
end.to enqueue_job

2.times do
# Batch keys in Redis contain a timestamp. Ensure that each retry has
# a different batch key.
travel(1.second)
expect { perform_enqueued_jobs }
.to enqueue_job.on_queue("reindex_failures")
end

expect(permanent_store).to include(
"99999-work" => { "an error" => "with a message" }
)
# The third failure does not enqueue a retry.
travel(1.second)
expect { perform_enqueued_jobs }
.not_to enqueue_job
end
end

context "when there are no IDs to index" do
before do
allow(AsyncIndexer::REDIS).to receive(:smembers).and_return([])
end

it "doesn't call the indexer" do
expect(AsyncIndexer::REDIS).to receive(:smembers).and_return([])
expect(WorkIndexer).not_to receive(:new)

AsyncIndexer.perform("WorkIndexer:34:#{Time.now.to_i}")
AsyncIndexer.index(WorkIndexer, [34], :main)
perform_enqueued_jobs
end
end
end
2 changes: 1 addition & 1 deletion spec/models/search/index_sweeper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
expect(AsyncIndexer).to receive(:new).with(StatCounterIndexer, "failures").at_least(:once).and_return(indexer)
expect(indexer).to receive(:enqueue_ids).with([work.stat_counter.id]).at_least(:once).and_call_original

expect(sweeper.process_batch).to be(true)
sweeper.process_batch
end

it "doesn't trigger an error if the batch results are empty" do
Expand Down
11 changes: 10 additions & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,17 @@ def clean_the_database

def run_all_indexing_jobs
%w[main background stats].each do |reindex_type|
ScheduledReindexJob.perform reindex_type
ScheduledReindexJob.perform(reindex_type)
end

# In Rails pre-7.2, "config.active_job.queue_adapter" is respected by some
# test cases but not others. In request specs, the queue adapter will be
# overridden to ":test", so we need to call "perform_enqueued_jobs" to
# process jobs.
#
# Refer to https://github.com/rails/rails/pull/48585.
perform_enqueued_jobs if ActiveJob::Base.queue_adapter.instance_of? ActiveJob::QueueAdapters::TestAdapter

Indexer.all.map(&:refresh_index)
end

Expand Down

0 comments on commit ddc7157

Please sign in to comment.