Skip to content

Commit

Permalink
Enhances job scheduling and memory storage
Browse files Browse the repository at this point in the history
Adds detailed in-memory store implementation for efficient job management and thorough unit testing for operations like enqueue, dequeue, and job failure handling.

Refines job scheduling with modular schedulers for delayed, recurring, and cron-based job execution, improving the scheduling robustness and reducing code complexity.

Updates job configuration and middleware management, optimizing logging and error handling across the job processing lifecycle.
  • Loading branch information
eliasjpr committed Nov 22, 2024
1 parent 72768ab commit 59799dd
Show file tree
Hide file tree
Showing 21 changed files with 460 additions and 236 deletions.
6 changes: 6 additions & 0 deletions playground/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,10 @@ JoobQ.configure do |config|
queue name: "queue:test", workers: 2, job: TestJob, throttle: nil
queue name: "queue:fail", workers: 1, job: FailJob
queue name: "queue:expire", workers: 1, job: ExpireJob

scheduler do
cron(pattern: "*/5 * * * * *") { puts "Every 30 seconds #{Time.local}" }
cron(pattern: "*/5 * * * * *") { }
every(1.minute, TestJob, x: 1)
end
end
1 change: 1 addition & 0 deletions playground/jobs/expire_jobs.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ struct ExpireJob
include JoobQ::Job
@queue = "queue:expire"
@retries = 2
@expires = 5.seconds.from_now.to_unix_ms

def initialize
end
Expand Down
12 changes: 6 additions & 6 deletions playground/jobs/test_job.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ struct TestJob
end

def perform
# random = Random.rand(100)
random = Random.rand(100)

# if random > 92
# raise "Bad"
# else
# sleep Random.rand(5).seconds
# end
if random > 98
raise "Bad"
else
sleep Random.rand(5).seconds
end

x + 1
end
Expand Down
4 changes: 2 additions & 2 deletions playground/load.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ require "./config"

1_000_000.times do |i|
TestJob.enqueue(x: i)
# FailJob.enqueue
# ExpireJob.enqueue
FailJob.enqueue
ExpireJob.enqueue
end

puts "Enqueued 1,000,000 jobs"
153 changes: 153 additions & 0 deletions spec/in_memory_store_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
require "./spec_helper"

# Tests for InMemoryStore
describe JoobQ::InMemoryStore do
store = JoobQ::InMemoryStore.new
job1 = ExampleJob.new(x: 1)
job2 = ExampleJob.new(x: 2)

describe "#initialize" do
it "starts with empty queues and no scheduled jobs" do
store.queue_size("default").should eq(0)
store.fetch_due_jobs(Time.local).should be_empty
end
end

describe "#enqueue" do
it "adds a job to the specified queue" do
store.enqueue(job1)
store.queue_size(job1.queue).should eq(1)
end
end

describe "#dequeue" do
context "when the queue has jobs" do
it "removes and returns the first job" do
store.queues[job1.queue] = [] of JoobQ::Job
store.enqueue(job1)
store.enqueue(job2)

dequeued_job = store.dequeue(job1.queue, ExampleJob)

dequeued_job.should eq(job1)

store.queue_size(job1.queue).should eq(1)
end
end

context "when the queue is empty" do
it "returns nil" do
store.dequeue("empty_queue", ExampleJob).should be_nil
end
end
end

describe "#clear_queue" do
it "removes all jobs from the queue" do
store.enqueue(job1)
store.enqueue(job2)

store.clear_queue(job1.queue)
store.queue_size(job1.queue).should eq(0)
end
end

describe "#delete_job" do
it "removes a specific job from the queue" do
store.enqueue(job1)
store.enqueue(job2)

store.delete_job(job1)
store.queue_size(job1.queue).should eq(1)
store.dequeue(job1.queue, ExampleJob).should eq(job2)
end
end

describe "#move_job_back_to_queue" do
it "requeues a job that was being processed" do
store.enqueue(job1)
store.move_job_back_to_queue(job1.queue)
store.queue_size(job1.queue).should eq(1)
end
end

describe "#mark_as_failed" do
it "stores a failed job with error details" do
error_details = {"error" => "Test Failure", "reason" => "Simulated error"}
store.mark_as_failed(job1, error_details)

failed_jobs = store.failed_jobs
failed_jobs.size.should eq(1)
failed_jobs.first.job.should eq(job1)
failed_jobs.first.error_details.should eq(error_details)
end
end

describe "#mark_as_dead" do
it "stores a dead job with an expiration time" do
expiration_time = (Time.local + 3600.seconds).to_rfc3339
store.mark_as_dead(job1, expiration_time)

dead_jobs = store.dead_jobs
dead_jobs.size.should eq(1)
dead_jobs.first.job.should eq(job1)
dead_jobs.first.expiration_time.should eq(Time.parse_rfc3339(expiration_time))
end
end

describe "#schedule" do
it "stores a job with a future execution time" do
store.schedule(job1, 5000) # 5 seconds delay
scheduled_jobs = store.scheduled_jobs
scheduled_jobs.size.should eq(1)
scheduled_jobs.first.job.should eq(job1)
end
end

describe "#fetch_due_jobs" do
context "when jobs are due" do
it "returns jobs ready for execution" do
store.schedule(job1, 0) # Immediate execution
store.schedule(job2, 10_000) # 10 seconds delay

due_jobs = store.fetch_due_jobs(Time.local)
due_jobs.size.should eq(1)
due_jobs.first.should contain(%("x":1))
end
end

context "when jobs are not yet due" do
it "returns an empty array" do
store.schedule(job1, 10_000) # 10 seconds delay

due_jobs = store.fetch_due_jobs(Time.local)
due_jobs.should be_empty
end
end
end

describe "#queue_size" do
it "returns the correct number of jobs in a queue" do
store.queues[job1.queue] = [] of JoobQ::Job

store.enqueue(job1)
store.enqueue(job2)
store.queue_size(job1.queue).should eq(2)
end
end

describe "#list_jobs" do
it "lists jobs in a queue with pagination" do
5.times do |i|
job = ExampleJob.new(x: i)
store.enqueue(ExampleJob.new(x: i))
end

jobs_page1 = store.list_jobs("example", page_number: 1, page_size: 2)
jobs_page2 = store.list_jobs("example", page_number: 2, page_size: 2)

jobs_page1.size.should eq(2)
jobs_page2.size.should eq(2)
end
end
end
6 changes: 3 additions & 3 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ JoobQ.configure do
queue "failed", 10, FailJob

scheduler do
cron("*/1 * * * *") { }
cron("*/5 20-23 * * *") { }
every 1.second, ExampleJob, x: 1
cron(pattern: "*/30 * * * *") { puts "Every 30 seconds #{Time.local}" }
cron(pattern: "*/5 20-23 * * *") { }
every(1.minute, ExampleJob, x: 1)
end
end
8 changes: 7 additions & 1 deletion src/joobq.cr
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ require "uuid"
require "uuid/json"
require "log"
require "cron_parser"

require "./joobq/store"

require "./joobq/**"

# ### Module `JoobQ`
Expand Down Expand Up @@ -97,7 +99,11 @@ module JoobQ

def self.forge
Log.info { "JoobQ starting..." }
scheduler.run

puts "Scheduler count: #{config.schedulers.size}"
config.schedulers.each do |scheduler|
scheduler.run
end

queues.each do |key, queue|
Log.info { "JoobQ starting #{key} queue..." }
Expand Down
82 changes: 36 additions & 46 deletions src/joobq/configure.cr
Original file line number Diff line number Diff line change
@@ -1,55 +1,28 @@
module JoobQ
alias ThrottlerConfig = Hash(String, NamedTuple(limit: Int32, period: Time::Span))

# This struct is responsible for configuring and managing settings for the `JoobQ` job queue system.
# `Configure` is responsible for managing the settings for the `JoobQ` job queue system.
#
# #### Properties and Getters
# ### Features
#
# - `INSTANCE`: A constant that holds a singleton instance of the `Configure` struct.
# - `redis`: A getter that returns a `Redis::PooledClient` instance. The Redis client is configured using environment
# variables, including host, port, password, pool size, and timeout settings.
# - `queues`: A getter returning a hash mapping queue names to their corresponding `BaseQueue` instances.
# - `stats_enabled`: A property indicating whether statistics tracking is enabled. Defaults to `false`.
# - `default_queue`: A property defining the name of the default queue. Defaults to `"default"`.
# - `retries`: A property indicating the number of retries for a job. Defaults to `3`.
# - `expires`: A property setting the expiration time for a job in seconds. Defaults to `3.days.total_seconds.to_i`.
# - `timeout`: A property indicating the timeout setting. Defaults to `2`.
#
# #### Macro `queue`
#
# - `queue(name, workers, kind)`: A macro for defining queues. It takes a queue name, the number of workers,
# and the kind of jobs the queue will handle. This macro creates and stores a new `JoobQ::Queue` instance in the
# `queues` hash.
#
# #### Method `scheduler`
#
# - `scheduler`: A method that yields the singleton instance of the `Scheduler`. This is useful for accessing the
# scheduler within the scope of the `Configure` struct.
# - Centralizes job queue configurations, Redis connection setup, and queue properties.
# - Provides default settings and allows easy customization through environment variables.
# - Supports defining queues, middlewares, throttling, and scheduling jobs.
#
# ### Usage Example
#
# To utilize the `Configure` struct for setting up queues, you can define them using the `queue` macro:
#
# ```
# JoobQ::Configure.instance.queue "my_queue", 5, MyJob
# JoobQ::Configure.instance.queue "my_queue", 5, MyJob, { limit: 10, period: 1.minute }
# ```
#
# This would create a new queue named "my_queue" with 5 workers that handle `MyJob` type jobs and store it in the
# `queues` hash.
#
# ### Notes
#
# - The `Configure` struct centralizes the configuration settings for the `JoobQ` system, including Redis connection
# parameters and queue properties.
# - It uses environment variables for configuring the Redis client, providing flexibility and ease of configuration
# in different environments.
# - The `queue` macro simplifies the process of setting up different types of queues, making the `JoobQ` system
# adaptable to various job processing requirements.
class Configure
# Loads the logger configuration from the environment variables
# and sets the default log level to `:trace`.
Log.setup_from_env(default_level: :trace)

# Constants
QUEUE_THROTTLE_LIMITS = ThrottlerConfig.new

# Properties and Getters
getter queues = {} of String => BaseQueue
getter time_location : Time::Location = Time::Location.load("America/New_York")

property store : Store = RedisStore.new
property? rest_api_enabled : Bool = false
Expand All @@ -61,34 +34,51 @@ module JoobQ
property failed_ttl : Time::Span = 3.milliseconds
property dead_letter_ttl : Time::Span = 7.days
property job_registry : JobSchemaRegistry = JobSchemaRegistry.new

# Middlewares and Pipeline
property middlewares : Array(Middleware) = [
Middleware::Throttle.new,
Middleware::Retry.new,
Middleware::Timeout.new,
] of Middleware

QUEUE_THROTTLE_LIMITS = ThrottlerConfig.new
getter middleware_pipeline : MiddlewarePipeline do
MiddlewarePipeline.new(middlewares)
end

# Schedulers
property schedulers : Array(Scheduler) = [] of Scheduler

def use
# DSL: Add custom middlewares
def use(& : ->)
yield middlewares
end

getter middleware_pipeline : MiddlewarePipeline do
MiddlewarePipeline.new(middlewares)
# Set the time location globally
def time_location=(tz : String = "America/New_York") : Time::Location
timezone = Time::Location.load(tz)
Time::Location.local = timezone
timezone
end

# Macro: Define a queue
#
# Adds a queue configuration and optionally applies throttling limits.
macro queue(name, workers, job, throttle = nil)
{% begin %}
queues[{{name}}] = JoobQ::Queue({{job.id}}).new({{name}}, {{workers}}, )
queues[{{name}}] = JoobQ::Queue({{job.id}}).new({{name}}, {{workers}})
{% if throttle %}
JoobQ::Configure::QUEUE_THROTTLE_LIMITS[{{name}}] = {{throttle}}
{% end %}
job_registry.register({{job.id}})
{% end %}
end

def scheduler(&)
with Scheduler.instance yield
# Add a scheduler and execute within its context
def scheduler(tz : Time::Location = self.time_location)
scheduler = Scheduler.new(timezone: tz)
@schedulers << scheduler
with scheduler yield
end
end
end
Loading

0 comments on commit 59799dd

Please sign in to comment.