Skip to content

Commit

Permalink
Refactors middleware and throttling in JoobQ
Browse files Browse the repository at this point in the history
Introduces a middleware pattern to handle job processing, replacing obsolete fail handling logic.
Adds throttling, retry, and timeout middlewares to enhance job lifecycle management.
Renames and refines dead letter logic; removes redundant FailHandler class.
Improves logging by integrating an async logging middleware for job enqueue events.

Enhances flexibility in configuring job pipelines and improves code maintainability.
  • Loading branch information
eliasjpr committed Nov 22, 2024
1 parent b82473e commit 72768ab
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 307 deletions.
22 changes: 21 additions & 1 deletion src/joobq/configure.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
module JoobQ
alias ThrottlerConfig = Hash(String, NamedTuple(limit: Int32, period: Time::Span))

# This struct is responsible for configuring and managing settings for the `JoobQ` job queue system.
#
# #### Properties and Getters
Expand Down Expand Up @@ -59,10 +61,28 @@ module JoobQ
property failed_ttl : Time::Span = 3.milliseconds
property dead_letter_ttl : Time::Span = 7.days
property job_registry : JobSchemaRegistry = JobSchemaRegistry.new
property middlewares : Array(Middleware) = [
Middleware::Throttle.new,
Middleware::Retry.new,
Middleware::Timeout.new,
] of Middleware

QUEUE_THROTTLE_LIMITS = ThrottlerConfig.new

def use
yield middlewares
end

getter middleware_pipeline : MiddlewarePipeline do
MiddlewarePipeline.new(middlewares)
end

macro queue(name, workers, job, throttle = nil)
{% begin %}
queues[{{name}}] = JoobQ::Queue({{job.id}}).new({{name}}, {{workers}}, {{throttle}})
queues[{{name}}] = JoobQ::Queue({{job.id}}).new({{name}}, {{workers}}, )
{% if throttle %}
JoobQ::Configure::QUEUE_THROTTLE_LIMITS[{{name}}] = {{throttle}}
{% end %}
job_registry.register({{job.id}})
{% end %}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ module JoobQ
# The dead letter queue is cleaned up by removing jobs that have been in the
# queue for longer than the dead letter expiration time. The dead letter
# expiration time is configurable and defaults to 7 days.
module DeadLetter
module DeadLetterManager
private class_getter expires : String = ::JoobQ.config.dead_letter_ttl.to_s
private class_getter store : Store = ::JoobQ.config.store

def self.add(job)
store.mark_as_dead(job, expires)
Log.error &.emit("Job Moved to Dead Letter Queue", job_id: job.jid.to_s)
end
end
end
123 changes: 0 additions & 123 deletions src/joobq/fail_handler.cr

This file was deleted.

4 changes: 4 additions & 0 deletions src/joobq/job.cr
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ module JoobQ
property retries : Int32 = JoobQ.config.retries
property expires : Int64 = JoobQ.config.expires.from_now.to_unix_ms
property status : Status = Status::Enqueued

@[JSON::Field(emit_null: true)]
property error : NamedTuple(failed_at: String, message: String | Nil, backtrace: String, cause: String)?

@[JSON::Field(ignore: true)]
property enqueue_time = Time.monotonic

Expand Down
32 changes: 32 additions & 0 deletions src/joobq/middleware.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
module JoobQ
module Middleware
abstract def matches?(job : Job, queue : BaseQueue) : Bool
abstract def call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil
end

class MiddlewarePipeline

@middlewares : Array(Middleware)

def initialize(@middlewares : Array(Middleware) = [] of Middleware)
@middlewares = [] of Middleware
end

def call(job : Job, queue : BaseQueue, &block : ->)
call_next(0, job, queue, &block)
end

private def call_next(index : Int32, job : Job, queue : BaseQueue, &block : ->)
if index < @middlewares.size
middleware = @middlewares[index]
if middleware.matches?(job, queue)
middleware.call(job, queue, -> { call_next(index + 1, job, queue, &block) })
else
call_next(index + 1, job, queue, &block)
end
else
yield
end
end
end
end
22 changes: 22 additions & 0 deletions src/joobq/middlewares/async_logging_middleware.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
module JoobQ
module Middleware
class AsyncLogging
include Middleware

def matches?(job : JoobQ::Job, queue : BaseQueue) : Bool
true
end

def call(job : JoobQ::Job, queue : BaseQueue, next_middleware : ->) : Nil
spawn do
log_to_remote_service(job)
end
next_middleware.call
end

private def log_to_remote_service(job : JoobQ::Job)
Log.info &.emit("Job enqueued", queue: job.queue, job_id: job.jid.to_s)
end
end
end
end
51 changes: 51 additions & 0 deletions src/joobq/middlewares/retry_middleware.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module JoobQ
module Middleware
class Retry
include Middleware

def matches?(job : JoobQ::Job, queue : BaseQueue) : Bool
true # This middleware applies to all jobs
end

def call(job : JoobQ::Job, queue : BaseQueue, next_middleware : ->) : Nil
begin
next_middleware.call
rescue ex : Exception
handle_failure(job, queue, ex)
end
end

private def handle_failure(job : JoobQ::Job, queue : BaseQueue, ex : Exception)
Log.error &.emit("Job Failure", job_id: job.jid.to_s, error: ex.message)
job.failed!
job.retries -= 1

job.error = {
failed_at: Time.local.to_rfc3339,
message: ex.message,
backtrace: ex.inspect_with_backtrace[0..10],
cause: ex.cause.to_s,
}

if job.retries > 0
queue.metrics.increment_retried
job.retrying!
ExponentialBackoff.retry(job, queue)
else
queue.metrics.increment_dead
DeadLetterManager.add(job)
end
end
end
end

class ExponentialBackoff
def self.retry(job, queue)
delay = (2 ** (job.retries)) * 1000 # Delay in ms
# Logic to add the job back to the queue after a delay
queue.store.schedule(job, delay)
# Log.warn &.emit("Job moved to Retry Queue", job_id: job.jid.to_s)
Log.warn &.emit("Retrying Job", job_id: job.jid.to_s, retries_left: job.retries)
end
end
end
51 changes: 51 additions & 0 deletions src/joobq/middlewares/throttle_middleware.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
module JoobQ
module Middleware
class Throttle
include Middleware

private getter queue_throttle_limits : JoobQ::ThrottlerConfig
private getter throttlers : Hash(String, Throttler) = {} of String => Throttler

def initialize(@queue_throttle_limits = Configure::QUEUE_THROTTLE_LIMITS)
build_throttlers
end

def matches?(job : JoobQ::Job, queue : BaseQueue) : Bool
@queue_throttle_limits.has_key?(job.queue)
end

def call(job : JoobQ::Job, queue : BaseQueue, next_middleware : ->) : Nil
throttlers[job.queue].throttle
next_middleware.call
end

private def build_throttlers
@queue_throttle_limits.map do |queue, throttle_limit|
@throttlers[queue] = Throttler.new(throttle_limit)
end
end
end
end

class Throttler
@limit : Int32
@period : Float64

def initialize(throttle_limit : NamedTuple(limit: Int32, period: Time::Span))
@limit = throttle_limit[:limit]
@period = throttle_limit[:period].total_milliseconds
@min_interval = @period / @limit # milliseconds
@last_job_time = Time.utc.to_unix_ms
end

def throttle
now = Time.utc.to_unix_ms
elapsed = now - @last_job_time
sleep_time = @min_interval - elapsed
if sleep_time > 0
sleep (sleep_time / 1000.0).seconds
end
@last_job_time = Time.utc.to_unix_ms
end
end
end
20 changes: 20 additions & 0 deletions src/joobq/middlewares/timeout_middleware.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module JoobQ
module Middleware
class Timeout
include Middleware

def matches?(job : Job, queue : BaseQueue) : Bool
true
end

def call(job : Job, queue : BaseQueue, next_middleware : ->) : Nil
if job.expired?
job.expired!
DeadLetterManager.add(job)
else
next_middleware.call
end
end
end
end
end
4 changes: 2 additions & 2 deletions src/joobq/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ module JoobQ
getter total_workers : Int32
getter metrics : Metrics
getter worker_manager : WorkerManager(T) { WorkerManager(T).new(total_workers, self, metrics) }
getter throttle_limit : NamedTuple(limit: Int32, period: Time::Span)?
getter throttle_limit : ThrottlerConfig?

def initialize(@name : String, @total_workers : Int32, @throttle_limit : NamedTuple(limit: Int32, period: Time::Span)? = nil)
def initialize(@name : String, @total_workers : Int32, @throttle_limit : ThrottlerConfig? = nil)
@metrics = Metrics.new
end

Expand Down
Loading

0 comments on commit 72768ab

Please sign in to comment.