Skip to content

Commit

Permalink
Enhances metrics and refactors API server logging
Browse files Browse the repository at this point in the history
Introduces a robust metrics collection system with new performance
metrics and overtime series for job queues, enabling detailed insights
on execution and wait times.

Refactors logging in the API server for uniformity and readability,
and revises job and queue configurations for simplification.

Reorders and comments out configuration queues previously defined.
Updates Redis version for compatibility improvements.

Improves statistical calculations, providing a more precise analysis of
queue health, throughput, and error trends.
  • Loading branch information
eliasjpr committed Nov 19, 2024
1 parent 9d87b08 commit 9261351
Show file tree
Hide file tree
Showing 16 changed files with 480 additions and 181 deletions.
1 change: 1 addition & 0 deletions playground/bench.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./config"
Log.info { "Stats Enabled: #{JoobQ.config.stats_enabled?}" }

JoobQ.forge
sleep
8 changes: 4 additions & 4 deletions playground/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ require "../src/joobq"
require "./jobs/*"

JoobQ.configure do |config|
queue name: "queue:test", workers: 5, job: TestJob, throttle: nil
queue "queue:fail", 5, FailJob
queue "queue:expire", 1, ExpireJob

config.rest_api_enabled = true

queue name: "queue:test", workers: 5, job: TestJob, throttle: nil
# queue "queue:fail", 5, FailJob
# queue "queue:expire", 1, ExpireJob
end
2 changes: 1 addition & 1 deletion playground/jobs/test_job.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ struct TestJob
def perform
random = Random.rand(100)

if random > 98
if random > 99
raise "Bad"
end

Expand Down
4 changes: 2 additions & 2 deletions playground/load.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ require "./config"

1_000_000.times do |i|
TestJob.enqueue(x: i)
FailJob.enqueue
ExpireJob.enqueue
# FailJob.enqueue
# ExpireJob.enqueue
end

puts "Enqueued 1,000,000 jobs"
4 changes: 2 additions & 2 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ shards:

pool:
git: https://github.com/ysbaddaden/pool.git
version: 0.3.0
version: 0.2.4

redis:
git: https://github.com/stefanwille/crystal-redis.git
version: 2.8.3
version: 2.9.1

1 change: 0 additions & 1 deletion shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ license: MIT
dependencies:
redis:
github: stefanwille/crystal-redis
version: ~> 2.8.0

cron_parser:
github: kostya/cron_parser
37 changes: 22 additions & 15 deletions src/joobq.cr
Original file line number Diff line number Diff line change
Expand Up @@ -57,45 +57,45 @@ require "./joobq/**"
# - Logging and statistics creation are integral parts of the module, facilitating monitoring and debugging.
# - The module's design allows for flexible configuration and easy management of job queues.
module JoobQ
extend self
CONFIG = Configure.new

def config
Configure.instance
def self.config
CONFIG
end

def configure(&)
with config yield config
def self.configure(&)
with CONFIG yield CONFIG
end

def store
def self.store
config.store
end

def reset
def self.reset
store.reset
end

def statistics
JoobQ::GlobalStats.calculate_stats(queues)
def self.statistics
JoobQ::GlobalStats.instance.calculate_stats
end

def queues
def self.queues
config.queues
end

def add(job)
def self.add(job)
store.enqueue(job)
end

def scheduler
def self.scheduler
Scheduler.instance
end

def [](name : String)
def self.[](name : String)
queues[name]
end

def forge
def self.forge
Log.info { "JoobQ starting..." }
scheduler.run

Expand All @@ -104,10 +104,17 @@ module JoobQ
queue.start
end

spawn do
QueueMetrics.instance.collect_and_store_metrics
loop do
QueueMetrics.instance.collect_and_store_metrics
sleep 5.seconds
end
end

Log.info { "JoobQ initialized and waiting for Jobs..." }

Log.info { "Rest API Enabled: #{config.rest_api_enabled?}" }

if config.rest_api_enabled?
APIServer.start
end
Expand Down
42 changes: 14 additions & 28 deletions src/joobq/api_server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ require "json"

module JoobQ
class APIServer
Log = ::Log.for("API SERVER")

def self.start
new.start
end
Expand All @@ -11,7 +13,7 @@ module JoobQ
server = HTTP::Server.new([APIHandler.new])
address = "0.0.0.0"
port = 8080
puts "Listening on http://#{address}:#{port}"
Log.info &.emit("Listening on http://#{address}:#{port}")
server.bind_tcp(address, port)
server.listen
end
Expand Down Expand Up @@ -158,45 +160,29 @@ module JoobQ
path = context.request.path

case {method: method, path: path}
when {method: "POST", path: "/joobq/jobs"} then enqueue_job(context)
when {method: "GET", path: "/joobq/jobs/registry"} then job_registry(context)
when {method: "GET", path: "/joobq/queue/metrics"} then queue_metrics(context)
when {method: "GET", path: "/joobq/metrics"} then global_metrics(context)
when {method: "GET", path: "/joobq/metrics/series"}then overtime_series(context)
else call_next(context)
when {method: "POST", path: "/joobq/jobs"} then enqueue_job(context)
when {method: "GET", path: "/joobq/jobs/registry"} then job_registry(context)
when {method: "GET", path: "/joobq/queue/metrics"} then queue_metrics(context)
when {method: "GET", path: "/joobq/metrics"} then global_metrics(context)
when {method: "GET", path: "/joobq/metrics/series"} then overtime_series(context)
else call_next(context)
end
end

private def overtime_series(context)
context.response.content_type = "application/json"
context.response.print(::JoobQ.statistics["overtime_series"].to_json)
metrics = GlobalStats.instance.calculate_stats
context.response.print(metrics["overtime_series"].to_json)
end

private def global_metrics(context)
context.response.content_type = "application/json"
context.response.print(::JoobQ.statistics.to_json)
metrics = GlobalStats.instance.calculate_stats
context.response.print(metrics.to_json)
end

private def queue_metrics(context)
metrics = ::JoobQ.queues.map do |_, queue|
{queue.name => {
:total_workers => queue.info[:total_workers],
:status => queue.info[:status],
:metrics => {
:enqueued => queue.info[:enqueued],
:completed => queue.info[:completed],
:retried => queue.info[:retried],
:dead => queue.info[:dead],
:processing => queue.info[:processing],
:running_workers => queue.info[:running_workers],
:jobs_per_second => queue.info[:jobs_per_second],
:errors_per_second => queue.info[:errors_per_second],
:enqueued_per_second => queue.info[:enqueued_per_second],
:jobs_latency => queue.info[:jobs_latency].to_s,
:elapsed_time => queue.info[:elapsed_time].to_s,
},
}}
end
metrics = QueueMetrics.new.all_queue_metrics

context.response.headers["Refresh"] = "5"
context.response.content_type = "application/json"
Expand Down
9 changes: 3 additions & 6 deletions src/joobq/configure.cr
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,22 @@ module JoobQ
# in different environments.
# - The `queue` macro simplifies the process of setting up different types of queues, making the `JoobQ` system
# adaptable to various job processing requirements.
struct Configure
class Configure
# Loads the logger configuration from the environment variables
# and sets the default log level to `:trace`.
Log.setup_from_env(default_level: :trace)

class_getter instance : Configure = new

getter queues = {} of String => BaseQueue

property store : Store = RedisStore.new
property? stats_enabled : Bool = false
property? rest_api_enabled : Bool = false
property? stats_enabled : Bool = true
property default_queue : String = "default"
property retries : Int32 = 3
property expires : Time::Span = 3.days
property timeout : Time::Span = 2.seconds
property failed_ttl : Time::Span = 3.milliseconds
property dead_letter_ttl : Time::Span = 7.days
property job_registry : JobSchemaRegistry = JobSchemaRegistry.new
property? rest_api_enabled : Bool = false

macro queue(name, workers, job, throttle = nil)
{% begin %}
Expand Down
4 changes: 2 additions & 2 deletions src/joobq/fail_handler.cr
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ module JoobQ
# Logic to add the job back to the queue after a delay
queue.store.schedule(job, delay)
queue.retried.add(1)
Log.warn &.emit("Job moved to Retry Queue", job_id: job.jid.to_s)
# Log.warn &.emit("Job moved to Retry Queue", job_id: job.jid.to_s)
end
end
end

module DeadLetterManager
def self.add(job, queue, error = nil)
DeadLetter.add job
Log.error &.emit("Job moved to Dead Letter Queue", error: error)
# Log.error &.emit("Job moved to Dead Letter Queue", error: error)
end
end
end
Loading

0 comments on commit 9261351

Please sign in to comment.