diff --git a/Gemfile b/Gemfile index ec2699867..d88ba639e 100644 --- a/Gemfile +++ b/Gemfile @@ -50,9 +50,11 @@ group :test do if ENV['RAILS_VERSION'] == 'edge' gem 'actionmailer', :github => 'rails/rails' gem 'activerecord', :github => 'rails/rails' + gem 'activejob', :github => 'rails/rails' elsif ENV['RAILS_VERSION'] gem 'actionmailer', "~> #{ENV['RAILS_VERSION']}" gem 'activerecord', "~> #{ENV['RAILS_VERSION']}" + if ENV['RAILS_VERSION'] < '5.1' gem 'loofah', '2.3.1' gem 'nokogiri', '< 1.11.0' @@ -81,6 +83,7 @@ group :test do if ENV['RAILS_VERSION'].nil? || ENV['RAILS_VERSION'] >= '6.0.0' gem 'zeitwerk', :require => false end + gem 'concurrent-ruby' end group :rubocop do diff --git a/lib/active_job/queue_adapters/delayed_job_adapter.rb b/lib/active_job/queue_adapters/delayed_job_adapter.rb new file mode 100644 index 000000000..6ff6d2efa --- /dev/null +++ b/lib/active_job/queue_adapters/delayed_job_adapter.rb @@ -0,0 +1,61 @@ +module ActiveJob + module QueueAdapters + # Explicitly remove the implementation existing in older rails'. + remove_const(:DelayedJobAdapter) if defined?(:DelayedJobAdapter) + + # = Delayed Job adapter for Active Job + # + # To use Delayed Job, set the queue_adapter config to +:delayed_job+. + # + # Rails.application.config.active_job.queue_adapter = :delayed_job + class DelayedJobAdapter < ::ActiveJob::QueueAdapters::AbstractAdapter + def initialize(enqueue_after_transaction_commit: false) + @enqueue_after_transaction_commit = enqueue_after_transaction_commit + end + + def enqueue_after_transaction_commit? # :nodoc: + @enqueue_after_transaction_commit + end + + def enqueue(job) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority) + job.provider_job_id = delayed_job.id + delayed_job + end + + def enqueue_at(job, timestamp) + delayed_job = Delayed::Job.enqueue(JobWrapper.new(job.serialize), queue: job.queue_name, priority: job.priority, run_at: Time.at(timestamp)) + job.provider_job_id = delayed_job.id + delayed_job + end + + class JobWrapper + attr_accessor :job_data + + def initialize(job_data) + @job_data = job_data + end + + def display_name + base_name = "#{job_data["job_class"]} [#{job_data["job_id"]}] from DelayedJob(#{job_data["queue_name"]})" + + return base_name unless log_arguments? + + "#{base_name} with arguments: #{job_data["arguments"]}" + end + + def perform + Base.execute(job_data) + end + + private + def log_arguments? + job_data["job_class"].constantize.log_arguments? + rescue NameError + false + end + end + end + end +end + diff --git a/lib/delayed/railtie.rb b/lib/delayed/railtie.rb index a50ca1b4b..c25f7dde7 100644 --- a/lib/delayed/railtie.rb +++ b/lib/delayed/railtie.rb @@ -1,5 +1,6 @@ require 'delayed_job' require 'rails' +require 'active_job/queue_adapters/delayed_job_adapter' module Delayed class Railtie < Rails::Railtie diff --git a/spec/active_job_adapter_spec.rb b/spec/active_job_adapter_spec.rb new file mode 100644 index 000000000..fd4bb94e5 --- /dev/null +++ b/spec/active_job_adapter_spec.rb @@ -0,0 +1,121 @@ +require 'helper' +require 'active_job' +require 'concurrent' + +describe 'a Rails active job backend' do + module JobBuffer + @values = Concurrent::Array.new + + class << self + def clear + @values.clear + end + + def add(value) + @values << value + end + + def values + @values.dup + end + end + end + + class TestJob < ActiveJob::Base + queue_as :integration_tests + + def perform(message) + JobBuffer.add(message) + end + end + + let(:worker) { Delayed::Worker.new(sleep_delay: 0.5, queues: %w[integration_tests]) } + + before do + JobBuffer.clear + Delayed::Job.delete_all + ActiveJob::Base.queue_adapter = :delayed_job + ActiveJob::Base.logger = nil + end + + it "should supply a wrapped class name to DelayedJob" do + TestJob.perform_later + job = Delayed::Job.all.last + expect(job.name).to match(/TestJob \[[0-9a-f-]+\] from DelayedJob\(integration_tests\) with arguments: \[\]/) + end + + it 'enqueus and executes the job' do + start_worker do + TestJob.perform_later('Rails') + sleep 2 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should not run jobs queued on a non-listening queue" do + start_worker do + old_queue = TestJob.queue_name + + begin + TestJob.queue_as :some_other_queue + TestJob.perform_later "Rails" + sleep 2 + expect(JobBuffer.values.empty?).to eq true + ensure + TestJob.queue_name = old_queue + end + end + end + + it 'runs multiple queued jobs' do + start_worker do + ActiveJob.perform_all_later(TestJob.new('Rails'), TestJob.new('World')) + sleep 2 + expect(JobBuffer.values).to eq(['Rails', 'World']) + end + end + + it 'should not run job enqueued in the future' do + start_worker do + TestJob.set(wait: 5.seconds).perform_later('Rails') + sleep 2 + expect(JobBuffer.values.empty?).to eq true + end + end + + it 'should run job enqueued in the future at the specified time' do + start_worker do + TestJob.set(wait: 5.seconds).perform_later('Rails') + sleep 10 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should run job bulk enqueued in the future at the specified time" do + start_worker do + ActiveJob.perform_all_later([TestJob.new("Rails").set(wait: 5.seconds)]) + sleep 10 + expect(JobBuffer.values).to eq(['Rails']) + end + end + + it "should run job with higher priority first" do + start_worker do + wait_until = Time.now + 3.seconds + TestJob.set(wait_until: wait_until, priority: 20).perform_later "1" + TestJob.set(wait_until: wait_until, priority: 10).perform_later "2" + sleep 10 + + expect(JobBuffer.values).to eq(['2', '1']) + end + end + + private + def start_worker(&) + thread = Thread.new { worker.start } + yield + ensure + worker.stop + thread.join + end +end