Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
delayed (2.0.2)
delayed (2.0.3)
activerecord (>= 6.0)
concurrent-ruby

Expand Down
51 changes: 30 additions & 21 deletions lib/delayed/monitor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Monitor
cattr_accessor :sleep_delay, instance_writer: false, default: 60

def initialize
@jobs = Job.group(priority_case_statement).group(:queue)
@jobs = Job.group(:priority, :queue)
@jobs = @jobs.where(queue: Worker.queues) if Worker.queues.any?
@memo = {}
end
Expand Down Expand Up @@ -68,63 +68,72 @@ def default_tags
}
end

def grouped_count(scope)
Delayed::Job.from(scope.select('priority, queue, COUNT(*) AS count'))
.group(priority_case_statement, :queue).sum(:count)
end

def grouped_min(scope, column)
Delayed::Job.from(scope.select("priority, queue, MIN(#{column}) AS #{column}"))
.group(priority_case_statement, :queue).minimum(column)
end

def count_grouped
if Job.connection.supports_partial_index?
failed_count_grouped.merge(jobs.live.count) { |_, l, f| l + f }
failed_count_grouped.merge(live_count_grouped) { |_, l, f| l + f }
else
jobs.count
grouped_count(jobs)
end
end

def live_count_grouped
grouped_count(jobs.live)
end

def future_count_grouped
jobs.future.count
grouped_count(jobs.future)
end

def locked_count_grouped
@memo[:locked_count_grouped] ||= jobs.claimed.count
@memo[:locked_count_grouped] ||= grouped_count(jobs.claimed)
end

def erroring_count_grouped
jobs.erroring.count
grouped_count(jobs.erroring)
end

def failed_count_grouped
@memo[:failed_count_grouped] ||= jobs.failed.count
@memo[:failed_count_grouped] ||= grouped_count(jobs.failed)
end

def max_lock_age_grouped
oldest_locked_job_grouped.each_with_object({}) do |job, metrics|
metrics[[job.priority.to_i, job.queue]] = Job.db_time_now - job.locked_at
end
oldest_locked_job_grouped.transform_values { |locked_at| Job.db_time_now - locked_at }
end

def max_age_grouped
oldest_workable_job_grouped.each_with_object({}) do |job, metrics|
metrics[[job.priority.to_i, job.queue]] = Job.db_time_now - job.run_at
end
oldest_workable_job_grouped.transform_values { |run_at| Job.db_time_now - run_at }
end

def alert_age_percent_grouped
oldest_workable_job_grouped.each_with_object({}) do |job, metrics|
max_age = Job.db_time_now - job.run_at
metrics[[job.priority.to_i, job.queue]] = [max_age / job.priority.alert_age * 100, 100].min if job.priority.alert_age
oldest_workable_job_grouped.each_with_object({}) do |((priority, queue), run_at), metrics|
max_age = Job.db_time_now - run_at
alert_age = Priority.new(priority).alert_age
metrics[[priority, queue]] = [max_age / alert_age * 100, 100].min if alert_age
end
end

def workable_count_grouped
jobs.claimable.count
grouped_count(jobs.claimable)
end

alias working_count_grouped locked_count_grouped

def oldest_locked_job_grouped
jobs.claimed
.select("#{priority_case_statement} AS priority, queue, MIN(locked_at) AS locked_at")
grouped_min(jobs.claimed, :locked_at)
end

def oldest_workable_job_grouped
@memo[:oldest_workable_job_grouped] ||= jobs.claimable
.select("(#{priority_case_statement}) AS priority, queue, MIN(run_at) AS run_at")
@memo[:oldest_workable_job_grouped] ||= grouped_min(jobs.claimable, :run_at)
end

def priority_case_statement
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Delayed
VERSION = '2.0.2'
VERSION = '2.0.3'
end
Loading