diff --git a/playground/bench.cr b/playground/bench.cr index 4daa420..0bfb509 100644 --- a/playground/bench.cr +++ b/playground/bench.cr @@ -1,4 +1,5 @@ require "./config" +Log.info { "Stats Enabled: #{JoobQ.config.stats_enabled?}" } JoobQ.forge sleep diff --git a/playground/config.cr b/playground/config.cr index ac0d24d..e346b59 100644 --- a/playground/config.cr +++ b/playground/config.cr @@ -2,9 +2,9 @@ require "../src/joobq" require "./jobs/*" JoobQ.configure do |config| - queue name: "queue:test", workers: 5, job: TestJob, throttle: nil - queue "queue:fail", 5, FailJob - queue "queue:expire", 1, ExpireJob - config.rest_api_enabled = true + + queue name: "queue:test", workers: 5, job: TestJob, throttle: nil + # queue "queue:fail", 5, FailJob + # queue "queue:expire", 1, ExpireJob end diff --git a/playground/jobs/test_job.cr b/playground/jobs/test_job.cr index 65ae291..e9bf06d 100644 --- a/playground/jobs/test_job.cr +++ b/playground/jobs/test_job.cr @@ -13,7 +13,7 @@ struct TestJob def perform random = Random.rand(100) - if random > 98 + if random > 99 raise "Bad" end diff --git a/playground/load.cr b/playground/load.cr index d236832..8a45244 100644 --- a/playground/load.cr +++ b/playground/load.cr @@ -2,8 +2,8 @@ require "./config" 1_000_000.times do |i| TestJob.enqueue(x: i) - FailJob.enqueue - ExpireJob.enqueue + # FailJob.enqueue + # ExpireJob.enqueue end puts "Enqueued 1,000,000 jobs" diff --git a/shard.lock b/shard.lock index d11d573..3c32c1d 100644 --- a/shard.lock +++ b/shard.lock @@ -6,9 +6,9 @@ shards: pool: git: https://github.com/ysbaddaden/pool.git - version: 0.3.0 + version: 0.2.4 redis: git: https://github.com/stefanwille/crystal-redis.git - version: 2.8.3 + version: 2.9.1 diff --git a/shard.yml b/shard.yml index c3e2dfb..1a5e524 100644 --- a/shard.yml +++ b/shard.yml @@ -15,7 +15,6 @@ license: MIT dependencies: redis: github: stefanwille/crystal-redis - version: ~> 2.8.0 cron_parser: github: kostya/cron_parser diff --git a/src/joobq.cr b/src/joobq.cr index 1d9cb5a..8a7a6ec 100644 --- a/src/joobq.cr +++ b/src/joobq.cr @@ -57,45 +57,45 @@ require "./joobq/**" # - Logging and statistics creation are integral parts of the module, facilitating monitoring and debugging. # - The module's design allows for flexible configuration and easy management of job queues. module JoobQ - extend self + CONFIG = Configure.new - def config - Configure.instance + def self.config + CONFIG end - def configure(&) - with config yield config + def self.configure(&) + with CONFIG yield CONFIG end - def store + def self.store config.store end - def reset + def self.reset store.reset end - def statistics - JoobQ::GlobalStats.calculate_stats(queues) + def self.statistics + JoobQ::GlobalStats.instance.calculate_stats end - def queues + def self.queues config.queues end - def add(job) + def self.add(job) store.enqueue(job) end - def scheduler + def self.scheduler Scheduler.instance end - def [](name : String) + def self.[](name : String) queues[name] end - def forge + def self.forge Log.info { "JoobQ starting..." } scheduler.run @@ -104,10 +104,17 @@ module JoobQ queue.start end + spawn do + QueueMetrics.instance.collect_and_store_metrics + loop do + QueueMetrics.instance.collect_and_store_metrics + sleep 5.seconds + end + end + Log.info { "JoobQ initialized and waiting for Jobs..." } Log.info { "Rest API Enabled: #{config.rest_api_enabled?}" } - if config.rest_api_enabled? APIServer.start end diff --git a/src/joobq/api_server.cr b/src/joobq/api_server.cr index e6cb8b7..8cfb577 100644 --- a/src/joobq/api_server.cr +++ b/src/joobq/api_server.cr @@ -3,6 +3,8 @@ require "json" module JoobQ class APIServer + Log = ::Log.for("API SERVER") + def self.start new.start end @@ -11,7 +13,7 @@ module JoobQ server = HTTP::Server.new([APIHandler.new]) address = "0.0.0.0" port = 8080 - puts "Listening on http://#{address}:#{port}" + Log.info &.emit("Listening on http://#{address}:#{port}") server.bind_tcp(address, port) server.listen end @@ -158,45 +160,29 @@ module JoobQ path = context.request.path case {method: method, path: path} - when {method: "POST", path: "/joobq/jobs"} then enqueue_job(context) - when {method: "GET", path: "/joobq/jobs/registry"} then job_registry(context) - when {method: "GET", path: "/joobq/queue/metrics"} then queue_metrics(context) - when {method: "GET", path: "/joobq/metrics"} then global_metrics(context) - when {method: "GET", path: "/joobq/metrics/series"}then overtime_series(context) - else call_next(context) + when {method: "POST", path: "/joobq/jobs"} then enqueue_job(context) + when {method: "GET", path: "/joobq/jobs/registry"} then job_registry(context) + when {method: "GET", path: "/joobq/queue/metrics"} then queue_metrics(context) + when {method: "GET", path: "/joobq/metrics"} then global_metrics(context) + when {method: "GET", path: "/joobq/metrics/series"} then overtime_series(context) + else call_next(context) end end private def overtime_series(context) context.response.content_type = "application/json" - context.response.print(::JoobQ.statistics["overtime_series"].to_json) + metrics = GlobalStats.instance.calculate_stats + context.response.print(metrics["overtime_series"].to_json) end private def global_metrics(context) context.response.content_type = "application/json" - context.response.print(::JoobQ.statistics.to_json) + metrics = GlobalStats.instance.calculate_stats + context.response.print(metrics.to_json) end private def queue_metrics(context) - metrics = ::JoobQ.queues.map do |_, queue| - {queue.name => { - :total_workers => queue.info[:total_workers], - :status => queue.info[:status], - :metrics => { - :enqueued => queue.info[:enqueued], - :completed => queue.info[:completed], - :retried => queue.info[:retried], - :dead => queue.info[:dead], - :processing => queue.info[:processing], - :running_workers => queue.info[:running_workers], - :jobs_per_second => queue.info[:jobs_per_second], - :errors_per_second => queue.info[:errors_per_second], - :enqueued_per_second => queue.info[:enqueued_per_second], - :jobs_latency => queue.info[:jobs_latency].to_s, - :elapsed_time => queue.info[:elapsed_time].to_s, - }, - }} - end + metrics = QueueMetrics.new.all_queue_metrics context.response.headers["Refresh"] = "5" context.response.content_type = "application/json" diff --git a/src/joobq/configure.cr b/src/joobq/configure.cr index 00c18d8..64f478a 100644 --- a/src/joobq/configure.cr +++ b/src/joobq/configure.cr @@ -43,17 +43,15 @@ module JoobQ # in different environments. # - The `queue` macro simplifies the process of setting up different types of queues, making the `JoobQ` system # adaptable to various job processing requirements. - struct Configure + class Configure # Loads the logger configuration from the environment variables # and sets the default log level to `:trace`. Log.setup_from_env(default_level: :trace) - - class_getter instance : Configure = new - getter queues = {} of String => BaseQueue property store : Store = RedisStore.new - property? stats_enabled : Bool = false + property? rest_api_enabled : Bool = false + property? stats_enabled : Bool = true property default_queue : String = "default" property retries : Int32 = 3 property expires : Time::Span = 3.days @@ -61,7 +59,6 @@ module JoobQ property failed_ttl : Time::Span = 3.milliseconds property dead_letter_ttl : Time::Span = 7.days property job_registry : JobSchemaRegistry = JobSchemaRegistry.new - property? rest_api_enabled : Bool = false macro queue(name, workers, job, throttle = nil) {% begin %} diff --git a/src/joobq/fail_handler.cr b/src/joobq/fail_handler.cr index d1dcc5d..a44d4ba 100644 --- a/src/joobq/fail_handler.cr +++ b/src/joobq/fail_handler.cr @@ -110,7 +110,7 @@ module JoobQ # Logic to add the job back to the queue after a delay queue.store.schedule(job, delay) queue.retried.add(1) - Log.warn &.emit("Job moved to Retry Queue", job_id: job.jid.to_s) + # Log.warn &.emit("Job moved to Retry Queue", job_id: job.jid.to_s) end end end @@ -118,7 +118,7 @@ module JoobQ module DeadLetterManager def self.add(job, queue, error = nil) DeadLetter.add job - Log.error &.emit("Job moved to Dead Letter Queue", error: error) + # Log.error &.emit("Job moved to Dead Letter Queue", error: error) end end end diff --git a/src/joobq/global_stats.cr b/src/joobq/global_stats.cr index 61f80fd..2c50010 100644 --- a/src/joobq/global_stats.cr +++ b/src/joobq/global_stats.cr @@ -1,110 +1,166 @@ module JoobQ - module JoobQ - class GlobalStats - property total_enqueued : Int64 = 0 - property total_completed : Int64 = 0 - property total_retried : Int64 = 0 - property total_dead : Int64 = 0 - property total_processing : Int64 = 0 - property total_workers : Int32 = 0 - property total_running_workers : Int32 = 0 - property jobs_per_second : Float64 = 0.0 - property errors_per_second : Float64 = 0.0 - property enqueued_per_second : Float64 = 0.0 - property jobs_latency : String = "0s" - @start_time = Time.monotonic - - property overtime_series : Array(NamedTuple(name: String, type: String, data: Array(NamedTuple(x: String, y: Float64 | Int64)))) = - [] of NamedTuple(name: String, type: String, data: Array(NamedTuple(x: String, y: Float64 | Int64))) - - def initialize - @overtime_series << { name: "Enqueued", type: "column", data: [] of NamedTuple(x: String, y: Float64 | Int64) } - @overtime_series << { name: "Completed", type: "line", data: [] of NamedTuple(x: String, y: Float64 | Int64) } - end + # Interface for metrics providers + module MetricsProvider + abstract def global_metrics : Hash(String, Int64 | Float64) + end - def self.calculate_stats(queues) - @@stats ||= new - @@stats.not_nil!.calculate_stats(queues) + # Utility module for common statistical calculations + module StatsUtils + def self.percent_of(value : Number, total : Number) : Float64 + if total.to_f == 0 || value.to_f <= 0 + 0.0 + else + percentage = (value.to_f / total.to_f * 100) + percentage = 100.0 if percentage > 100.0 # Optional: Cap at 100% + percentage.round(2) end + end - def calculate_stats(queues) - reset - queues.each do |_, queue| - info = queue.info - @total_enqueued += info[:enqueued] - @total_completed += info[:completed] - @total_retried += info[:retried] - @total_dead += info[:dead] - @total_processing += info[:processing] - @total_workers += info[:total_workers] - @total_running_workers += info[:running_workers] - @jobs_per_second += info[:jobs_per_second] - @errors_per_second += info[:errors_per_second] - @enqueued_per_second += info[:enqueued_per_second] - @jobs_latency += info[:jobs_latency] - end - - stats + def self.format_latency(latency_in_seconds : Float64) : String + if latency_in_seconds >= 1 + "#{latency_in_seconds.round(2)}s" + else + "#{(latency_in_seconds * 1000).round(2)}ms" end + end + end - def reset - @total_enqueued = 0 - @total_completed = 0 - @total_retried = 0 - @total_dead = 0 - @total_processing = 0 - @total_workers = 0 - @total_running_workers = 0 - @jobs_per_second = 0.0 - @errors_per_second = 0.0 - @enqueued_per_second = 0.0 - @jobs_latency = "0s" - end + # Class to handle overtime series data + class OvertimeSeries + alias SeriesData = Array(NamedTuple(x: String, y: Float64 | Int64)) + alias Series = NamedTuple(name: String, type: String, data: SeriesData) - def stats - { - "total_enqueued" => @total_enqueued, - "total_completed" => @total_completed, - "total_retried" => @total_retried, - "total_dead" => @total_dead, - "total_processing" => @total_processing, - "total_workers" => @total_workers, - "total_running_workers" => @total_running_workers, - "jobs_per_second" => @jobs_per_second.round(2), - "errors_per_second" => @errors_per_second.round(2), - "enqueued_per_second" => @enqueued_per_second.round(2), - "percent_pending" => percent_of(@total_enqueued-@total_completed, @total_enqueued), - "percent_processing" => percent_of(@total_processing, @total_enqueued), - "percent_completed" => percent_of(@total_completed, @total_enqueued), - "percent_retried" => percent_of(@total_retried, @total_enqueued), - "percent_dead" => percent_of(@total_dead, @total_enqueued), - "overtime_series" => overtime_series, - } - end + property overtime_series : Array(Series) = [] of Series - def overtime_series - enqueued_series = @overtime_series.first - completed_series = @overtime_series.last + def initialize + @overtime_series << {name: "Enqueued", type: "column", data: SeriesData.new(10)} + @overtime_series << {name: "Completed", type: "line", data: SeriesData.new(10)} + end - current_time = Time.local - elapsed = Time.monotonic - @start_time - enqueued_series[:data] << { x: current_time.to_rfc3339, y: @total_enqueued} - completed_series[:data] << { x: current_time.to_rfc3339, y: @jobs_per_second.round(2)} + def update(enqueued : Float64, jobs_completed_per_second : Float64) + current_time = Time.local.to_rfc3339 + enqueued_series = @overtime_series.first + completed_series = @overtime_series.last - enqueued_series[:data].shift if enqueued_series[:data].size > 10 - completed_series[:data].shift if completed_series[:data].size > 10 + enqueued_series[:data] << {x: current_time, y: enqueued} + completed_series[:data] << {x: current_time, y: jobs_completed_per_second} - @start_time = Time.monotonic - @overtime_series - end + enqueued_series[:data].shift if enqueued_series[:data].size > 10 + completed_series[:data].shift if completed_series[:data].size > 10 + end + end - def per_second(value, elapsed) - elapsed.to_f == 0 ? 0.0 : value.to_f / elapsed.to_f - end + # Class to aggregate global statistics + class GlobalStats + include StatsUtils - def percent_of(value, total) - total.to_f == 0 ? 0.0 : (value.to_f / total.to_f * 100).round(2) - end + def self.instance + @@instance ||= new + end + + # Define properties for all the stats + property total_workers : Int64 = 0 + property current_size : Int64 = 0 + property total_jobs : Int64 = 0 + property completed : Int64 = 0 + property retried : Int64 = 0 + property dead : Int64 = 0 + property processing : Int64 = 0 + property running_workers : Int64 = 0 + property jobs_completed_per_second : Float64 = 0.0 + property errors_per_second : Float64 = 0.0 + property enqueued_per_second : Float64 = 0.0 + property job_wait_time : Float64 = 0.0 + property job_execution_time : Float64 = 0.0 + property worker_utilization : Float64 = 0.0 + property error_rate_trend : Float64 = 0.0 + property failed_job_rate : Float64 = 0.0 + property jobs_per_worker_per_second : Float64 = 0.0 + + property overtime_series : OvertimeSeries + + def initialize(@metrics_provider : MetricsProvider = QueueMetrics.instance) + reset + @overtime_series = OvertimeSeries.new + end + + # Calculate global statistics using a metrics provider + def calculate_stats + reset + global_metrics = @metrics_provider.global_metrics + + @total_workers = global_metrics["total_workers"].to_i64 + @current_size = global_metrics["current_size"].to_i64 + @total_jobs = global_metrics["total_jobs"].to_i64 + @completed = global_metrics["completed"].to_i64 + @retried = global_metrics["retried"].to_i64 + @dead = global_metrics["dead"].to_i64 + @processing = global_metrics["processing"].to_i64 + @running_workers = global_metrics["running_workers"].to_i64 + @jobs_completed_per_second = global_metrics["jobs_completed_per_second"].to_f64 + @errors_per_second = global_metrics["errors_per_second"].to_f64 + @enqueued_per_second = global_metrics["enqueued_per_second"].to_f64 + @job_wait_time = global_metrics["job_wait_time"].to_f64 + @job_execution_time = global_metrics["job_execution_time"].to_f64 + @worker_utilization = global_metrics["worker_utilization"].to_f64 + @error_rate_trend = global_metrics["error_rate_trend"].to_f64 + @failed_job_rate = global_metrics["failed_job_rate"].to_f64 + @jobs_per_worker_per_second = global_metrics["jobs_per_worker_per_second"].to_f64 + + update_overtime_series + stats + end + + private def reset + @total_workers = 0 + @current_size = 0 + @total_jobs = 0 + @completed = 0 + @retried = 0 + @dead = 0 + @processing = 0 + @running_workers = 0 + @jobs_completed_per_second = 0.0 + @errors_per_second = 0.0 + @enqueued_per_second = 0.0 + @job_wait_time = 0.0 + @job_execution_time = 0.0 + @worker_utilization = 0.0 + @error_rate_trend = 0.0 + @failed_job_rate = 0.0 + @jobs_per_worker_per_second = 0.0 + end + + private def update_overtime_series + @overtime_series.update(@enqueued_per_second, @jobs_completed_per_second) + end + + def stats + { + "total_workers" => @total_workers, + "current_size" => @current_size, + "total_jobs" => @total_jobs, + "completed" => @completed, + "retried" => @retried, + "dead" => @dead, + "processing" => @processing, + "running_workers" => @running_workers, + "jobs_completed_per_second" => @jobs_completed_per_second.round(2), + "errors_per_second" => @errors_per_second.round(2), + "enqueued_per_second" => @enqueued_per_second.round(2), + "job_wait_time" => @job_wait_time, + "job_execution_time" => @job_execution_time, + "worker_utilization" => @worker_utilization.round(2), + "error_rate_trend" => @error_rate_trend.round(2), + "failed_job_rate" => @failed_job_rate.round(2), + "jobs_per_worker_per_second" => @jobs_per_worker_per_second.round(2), + "percent_pending" => StatsUtils.percent_of(@total_jobs - @completed, @total_jobs), + "percent_processing" => StatsUtils.percent_of(@processing, @total_jobs), + "percent_completed" => StatsUtils.percent_of(@completed, @total_jobs), + "percent_retried" => StatsUtils.percent_of(@retried, @total_jobs), + "percent_dead" => StatsUtils.percent_of(@dead, @total_jobs), + "overtime_series" => @overtime_series.overtime_series, + } end end end diff --git a/src/joobq/job.cr b/src/joobq/job.cr index 2ffc5db..0bbb48d 100644 --- a/src/joobq/job.cr +++ b/src/joobq/job.cr @@ -133,6 +133,8 @@ module JoobQ property retries : Int32 = JoobQ.config.retries property expires : Int64 = JoobQ.config.expires.from_now.to_unix_ms property status : Status = Status::Enqueued + @[JSON::Field(ignore: true)] + property enqueue_time = Time.monotonic {% for status in Status.constants %} diff --git a/src/joobq/queue.cr b/src/joobq/queue.cr index 4ba5a2b..347f725 100644 --- a/src/joobq/queue.cr +++ b/src/joobq/queue.cr @@ -197,6 +197,77 @@ module JoobQ private getter workers : Array(Worker(T)) = Array(Worker(T)).new private getter workers_mutex = Mutex.new + property total_job_wait_time : Time::Span = Time.monotonic + property total_job_execution_time : Time::Span = Time.monotonic + + getter jobs_completed_per_second : Float64 do + per_second(completed.get).round(2) + end + + getter errors_per_second : Float64 do + per_second(retried.get).round(2) + end + + getter enqueued_per_second : Float64 do + per_second(total_jobs).round(2) + end + + # Calculates the average wait time for jobs in the queue. + # Measure the time that jobs spend waiting in the queue before being picked up by a worker. + getter job_wait_time : Float64 do + return 0.0 if completed.get.zero? + avg_time = (total_job_wait_time / completed.get.to_f) + # convert the time to seconds if it's greater than 1 second + avg_time.total_seconds > 1 ? avg_time.total_seconds.round(2) : avg_time.total_milliseconds.round(2) + end + + # Calculates the average execution time for jobs in the queue. + # Measure the time that jobs take to complete once they are picked up by a worker. + # This metric can help identify bottlenecks in job processing. + # A high execution time may indicate that jobs are taking too long to complete, which can impact the overall + # throughput of the system. + # A low execution time indicates that jobs are being processed quickly, which is desirable for high-performance + # systems. + # The average execution time is calculated by dividing the total execution time by the number of completed jobs. + getter job_execution_time : Float64 do + return 0.0 if completed.get.zero? + avg_time = (total_job_execution_time / completed.get.to_f) + avg_time.total_seconds > 1 ? avg_time.total_seconds.round(2) : avg_time.total_milliseconds.round(2) + end + + getter worker_utilization : Float64 do + elapsed_time = Time.monotonic - @start_time + return 0.0 if elapsed_time.to_f == 0.0 || @total_workers == 0 + + total_worker_time = @total_workers.to_f * elapsed_time.total_seconds + return 0.0 if total_worker_time == 0.0 + + utilization = (@total_job_execution_time.total_seconds / total_worker_time) * 100.0 + utilization = utilization.clamp(0.0, 100.0) + utilization.round(2) + end + + # Calculates the error rate trend for the queue. + getter error_rate_trend : Float64 do + total_attempts = completed.get + retried.get + dead.get + return 0.0 if total_attempts == 0 + (retried.get.to_f / total_attempts.to_f).round(2) + end + + # Calculates the failed job rate for the queue. + getter failed_job_rate : Float64 do + total_processed = completed.get + dead.get + return 0.0 if total_processed == 0 + (dead.get.to_f / total_processed.to_f).round(2) + end + + # Updated throughput_rate + # Calculates the average number of jobs processed per worker per second. + getter jobs_per_worker_per_second : Float64 do + return 0.0 if total_workers.zero? + (jobs_completed_per_second / total_workers.to_f).round(2) + end + def initialize(@name : String, @total_workers : Int32, @throttle_limit : NamedTuple(limit: Int32, period: Time::Span)? = nil) create_workers end @@ -280,27 +351,30 @@ module JoobQ def info { - name: name, - total_workers: total_workers, - status: status, - enqueued: size, - completed: completed.get, - retried: retried.get, - dead: dead.get, - processing: busy.get, - running_workers: running_workers, - jobs_per_second: per_second(completed.get).round(2), - errors_per_second: per_second(retried.get).round(2), - enqueued_per_second: per_second(size).round(2), - jobs_latency: jobs_latency, - elapsed_time: Time.monotonic - start_time, + name: name, + total_workers: total_workers, + status: status, + current_size: size, + total_jobs: total_jobs, + completed: completed.get, + retried: retried.get, + dead: dead.get, + processing: busy.get, + running_workers: running_workers, + jobs_completed_per_second: jobs_completed_per_second, + errors_per_second: errors_per_second, + enqueued_per_second: enqueued_per_second, + job_wait_time: job_wait_time, + job_execution_time: job_execution_time, + worker_utilization: worker_utilization, + error_rate_trend: error_rate_trend, + failed_job_rate: failed_job_rate, + jobs_per_worker_per_second: jobs_per_worker_per_second, } end - private def jobs_latency : String - elapsed_time = Time.monotonic - @start_time - return "0" if status == "Awaiting" || completed.get == 0 - format_time_span(elapsed_time / completed.get) + private def total_jobs + size + completed.get + busy.get + retried.get + dead.get end private def per_second(value) : Float64 @@ -309,13 +383,6 @@ module JoobQ elapsed_time.to_f == 0 ? 0.0 : value.to_f / elapsed_time.to_f end - private def format_time_span(span : Time::Span) : String - hours = span.total_hours.to_i - minutes = (span.total_minutes % 60).to_i - seconds = (span.total_seconds % 60).to_i - "%02d:%02d:%02d" % {hours, minutes, seconds} - end - private def reprocess_busy_jobs! @store.move_job_back_to_queue(name) end diff --git a/src/joobq/queue_metrics.cr b/src/joobq/queue_metrics.cr new file mode 100644 index 0000000..86b1ef6 --- /dev/null +++ b/src/joobq/queue_metrics.cr @@ -0,0 +1,181 @@ +require "redis" +require "json" +require "process" +require "system" + +module JoobQ + # Represents a class to handle queue metrics + class QueueMetrics + include MetricsProvider + + def self.instance : QueueMetrics + @@instance ||= new + end + + @redis : Redis::PooledClient + @instance_id : String + @process_id : Int64 + @queues : Hash(String, JoobQ::BaseQueue) + + # Initialize with a Redis client and unique instance ID + def initialize(instance_id : String = System.hostname) + @redis = RedisStore.new.redis + @instance_id = instance_id + @process_id = Process.pid + @queues = JoobQ.config.queues + end + + # Collect and store queue metrics into Redis + def collect_and_store_metrics + timestamp = Time.utc.to_unix + + @queues.values.each do |queue| + metrics_key = "joobq:metrics:#{@instance_id}:#{queue.name}" + queue_metrics = queue.info.to_h + queue_metrics[:instance_id] = @instance_id + queue_metrics[:process_id] = @process_id + queue_metrics[:last_updated] = timestamp + queue_metrics.delete(:name) + queue_metrics.delete(:status) + + # Retrieve queue info and store in Redis + @redis.hmset metrics_key, queue_metrics + end + end + + # Modify this method to return a Hash instead of an Array + def all_queue_metrics : Hash(String, Hash(String, String)) + all_metrics = Hash(String, Hash(String, String)).new + + @queues.each do |queue_name, _| + all_metrics[queue_name] = queue_metrics(queue_name) + end + + all_metrics + end + + # Retrieve metrics for a specific queue + def queue_metrics(queue_name : String) : Hash(String, String) + metrics_key = "joobq:metrics:#{@instance_id}:#{queue_name}" + @redis.hgetall(metrics_key) + end + + # Aggregate metrics across all queues into a single hash + def global_metrics : Hash(String, Float64 | Int64) + data = metric_store + + @queues.keys.each do |queue_name| + metrics = aggregate_metrics(queue_name) + data.each do |key, _| + data[key] += metrics[key] + end + end + + # Calculate averages for specific metrics + queue_count = @queues.size + if queue_count > 0 + ["jobs_completed_per_second", "errors_per_second", "enqueued_per_second", + "job_wait_time", "job_execution_time", "worker_utilization", + "error_rate_trend", "failed_job_rate", "jobs_per_worker_per_second"].each do |metric| + data[metric] /= queue_count + end + end + + data + end + + # Aggregate metrics across all instances for a particular queue + def aggregate_metrics(queue_name : String) : Hash(String, Int64 | Float64) + keys_pattern = "joobq:metrics:*:#{queue_name}" + all_keys = scan_keys(keys_pattern) + + aggregated_metrics = metric_store + count = 0 + + responses = @redis.pipelined do |pipe| + all_keys.each do |key| + pipe.hgetall(key) + end + end + + responses.each do |metrics| + # Ensure 'metrics' is an Array(Redis::RedisValue) + metrics_array = metrics.as(Array(Redis::RedisValue)) + + # Convert the array of key-value pairs into a hash + metrics_array.each_slice(2) do |key_value| + key = key_value[0].to_s + value = key_value[1].to_s + unless ["instance_id", "process_id", "last_updated"].includes?(key) + aggregated_metrics[key] = value.includes?(".") ? value.to_f64 : value.to_i64 + end + end + + count += 1 + end + + # Calculate average metrics where applicable + if count > 0 + ["jobs_completed_per_second", "errors_per_second", "enqueued_per_second", + "job_wait_time", "job_execution_time", "worker_utilization", + "error_rate_trend", "failed_job_rate", "jobs_per_worker_per_second"].each do |metric| + aggregated_metrics[metric] /= count + end + end + + aggregated_metrics + end + + # Efficiently scan keys matching a pattern using SCAN instead of KEYS + private def scan_keys(pattern : String) : Array(String) + cursor = "0" + keys = [] of String + + loop do + response = @redis.scan(cursor, match: pattern, count: 1000) + # 'response' is an Array(Redis::RedisValue) + # The first element is the cursor, the second is the array of keys + + # Extract and convert the cursor + cursor_value = response[0] + cursor = cursor_value.is_a?(String) ? cursor_value : cursor_value.to_s + + # Extract and convert the results + results_value = response[1] + if results_value.is_a?(Array) + results_value.each do |val| + key = val.is_a?(String) ? val : val.to_s + keys << key + end + end + + break if cursor == "0" + end + + keys + end + + # Initialize the metric store with default Float64 values + private def metric_store : Hash(String, Float64 | Int64) + { + "total_workers" => 0_i64, + "current_size" => 0_i64, + "total_jobs" => 0_i64, + "completed" => 0_i64, + "retried" => 0_i64, + "dead" => 0_i64, + "processing" => 0_i64, + "running_workers" => 0_i64, + "jobs_completed_per_second" => 0.0, + "errors_per_second" => 0.0, + "enqueued_per_second" => 0.0, + "job_wait_time" => 0.0, + "job_execution_time" => 0.0, + "worker_utilization" => 0.0, + "error_rate_trend" => 0.0, + "failed_job_rate" => 0.0, + "jobs_per_worker_per_second" => 0.0, + } of String => Float64 | Int64 + end + end +end diff --git a/src/joobq/redis_store.cr b/src/joobq/redis_store.cr index 3673c54..cec7f70 100644 --- a/src/joobq/redis_store.cr +++ b/src/joobq/redis_store.cr @@ -12,7 +12,7 @@ module JoobQ @port : Int32 = ENV.fetch("REDIS_PORT", "6379").to_i, @password : String? = ENV["REDIS_PASS"]?, @pool_size : Int32 = ENV.fetch("REDIS_POOL_SIZE", "100").to_i, - @pool_timeout : Time::Span = 0.5.seconds) + @pool_timeout : Float64 = 0.5) @redis = Redis::PooledClient.new( host: @host, port: @port, diff --git a/src/joobq/worker.cr b/src/joobq/worker.cr index 292033a..07f1726 100644 --- a/src/joobq/worker.cr +++ b/src/joobq/worker.cr @@ -212,9 +212,12 @@ module JoobQ end private def execute(job : T) + wait_time = Time.monotonic - job.enqueue_time + @queue.total_job_wait_time += wait_time start = Time.monotonic job.perform job.completed! + @queue.total_job_execution_time = Time.monotonic - start @queue.completed.add(1) @queue.store.delete_job job rescue ex : Exception