Skip to content

Commit

Permalink
Refactors throttling logic and updates dependencies
Browse files Browse the repository at this point in the history
Removes static throttle limits, integrating them into queue instances. Improves throttle handling by using last job timestamps stored per queue. Cleans queue metrics formatting and fixes minor style inconsistencies.

Adds dependencies "any_hash", "backtracer", and "raven" to the shard.lock, updating library setup.

Fixes handling and configuration of throttling logic per queue for enhanced flexibility and maintainability.
  • Loading branch information
eliasjpr committed Nov 25, 2024
1 parent d3dd4d4 commit 5312e86
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 48 deletions.
12 changes: 12 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
version: 2.0
shards:
any_hash:
git: https://github.com/sija/any_hash.cr.git
version: 0.2.5

backtracer:
git: https://github.com/sija/backtracer.cr.git
version: 1.2.2

cron_parser:
git: https://github.com/kostya/cron_parser.git
version: 0.4.0
Expand All @@ -8,6 +16,10 @@ shards:
git: https://github.com/ysbaddaden/pool.git
version: 0.2.4

raven:
git: https://github.com/sija/raven.cr.git
version: 1.9.4

redis:
git: https://github.com/stefanwille/crystal-redis.git
version: 2.9.1
Expand Down
9 changes: 1 addition & 8 deletions src/joobq/configure.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ module JoobQ
# ```
class Configure
Log.setup_from_env(default_level: :trace)

# Constants
QUEUE_THROTTLE_LIMITS = ThrottlerConfig.new

# Properties and Getters
getter queues = {} of String => BaseQueue
getter time_location : Time::Location = Time::Location.load("America/New_York")
Expand Down Expand Up @@ -66,10 +62,7 @@ module JoobQ
# Adds a queue configuration and optionally applies throttling limits.
macro queue(name, workers, job, throttle = nil)
{% begin %}
queues[{{name}}] = JoobQ::Queue({{job.id}}).new({{name}}, {{workers}})
{% if throttle %}
JoobQ::Configure::QUEUE_THROTTLE_LIMITS[{{name}}] = {{throttle}}
{% end %}
queues[{{name}}] = JoobQ::Queue({{job.id}}).new({{name}}, {{workers}}, {{throttle}})
job_registry.register({{job.id}})
{% end %}
end
Expand Down
51 changes: 20 additions & 31 deletions src/joobq/middlewares/throttle_middleware.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,38 @@ module JoobQ
class Throttle
include Middleware

private getter queue_throttle_limits : JoobQ::ThrottlerConfig
private getter throttlers : Hash(String, Throttler) = {} of String => Throttler

def initialize(@queue_throttle_limits = Configure::QUEUE_THROTTLE_LIMITS)
build_throttlers
end
private getter last_job_times : Hash(String, Int64) = {} of String => Int64

def matches?(job : JoobQ::Job, queue : BaseQueue) : Bool
@queue_throttle_limits.has_key?(job.queue)
!queue.throttle_limit.nil?
end

def call(job : JoobQ::Job, queue : BaseQueue, next_middleware : ->) : Nil
throttlers[job.queue].throttle
throttle(queue)
next_middleware.call
end

private def build_throttlers
@queue_throttle_limits.map do |queue, throttle_limit|
@throttlers[queue] = Throttler.new(throttle_limit)
end
end
end
end
private def throttle(queue : BaseQueue)
if throttle = queue.throttle_limit.not_nil!
limit = throttle[:limit]
period = throttle[:period].total_milliseconds

class Throttler
@limit : Int32
@period : Float64
min_interval = period / limit

def initialize(throttle_limit : NamedTuple(limit: Int32, period: Time::Span))
@limit = throttle_limit[:limit]
@period = throttle_limit[:period].total_milliseconds
@min_interval = @period / @limit # milliseconds
@last_job_time = Time.local.to_unix_ms
end
now = Time.local.to_unix_ms
last_job_time = @last_job_times[queue.name]?

def throttle
now = Time.local.to_unix_ms
elapsed = now - @last_job_time
sleep_time = @min_interval - elapsed
if sleep_time > 0
sleep (sleep_time / 1000.0).seconds
if last_job_time
elapsed = now - last_job_time
sleep_time = min_interval - elapsed
if sleep_time > 0
sleep (sleep_time / 1000.0).seconds
end
end

@last_job_times[queue.name] = now
end
end
@last_job_time = Time.local.to_unix_ms
end
end
end
5 changes: 3 additions & 2 deletions src/joobq/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ module JoobQ
getter total_workers : Int32
getter metrics : Metrics
getter worker_manager : WorkerManager(T) { WorkerManager(T).new(total_workers, self, metrics) }
getter throttle_limit : ThrottlerConfig?
getter throttle_limit : NamedTuple(limit: Int32, period: Time::Span)?

def initialize(@name : String, @total_workers : Int32, @throttle_limit : ThrottlerConfig? = nil)
def initialize(@name : String, @total_workers : Int32,
@throttle_limit : NamedTuple(limit: Int32, period: Time::Span)? = nil)
@metrics = Metrics.new
end

Expand Down
11 changes: 4 additions & 7 deletions src/joobq/queue_metrics.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ module JoobQ
class QueueMetrics
include MetricsProvider

NON_METRIC_KEYS = %w[instance_id process_id last_updated started_at throttle_limit name job_type status]
NON_METRIC_KEYS = %w[instance_id process_id last_updated started_at throttle_limit name job_type status]
METRICS_KEY_PATTERN = "joobq:metrics:*"
AVERAGES_METRICS = %w[
AVERAGES_METRICS = %w[
jobs_completed_per_second errors_per_second enqueued_per_second
job_wait_time job_execution_time worker_utilization
error_rate_trend failed_job_rate percent_completed
Expand Down Expand Up @@ -65,28 +65,25 @@ module JoobQ
end
end

def all_queue_metrics : Hash(String, Hash(String, String | Hash(String, Float64 | Int64)))
def all_queue_metrics : Hash(String, Hash(String, String | Hash(String, Float64 | Int64)))
result = Hash(String, Hash(String, String | Hash(String, Float64 | Int64))).new
@queues.keys.each do |queue_name|
result[queue_name] = queue_metrics(queue_name)
end
result
end


def queue_metrics(queue_name : String) : Hash(String, String | Hash(String, Float64 | Int64))
queue = @queues[queue_name]

result = Hash(String, String | Hash(String, Float64 | Int64)).new
result["status"] = queue.status
result["job_type"] = queue.job_type
result["started_at"] = queue.metrics.start_time.from_now.to_s
result["stats"]= aggregate_metrics(queue_name)
result["stats"] = aggregate_metrics(queue_name)
result
end



# Aggregate metrics across all queues into a single hash
def global_metrics : Hash(String, Float64 | Int64)
data = metric_store
Expand Down

0 comments on commit 5312e86

Please sign in to comment.