From c143056683f694eba1704acb84139095887515f4 Mon Sep 17 00:00:00 2001 From: pmcclory Date: Tue, 19 May 2026 16:55:25 -0400 Subject: [PATCH] feat: improved leopard saturation metrics --- Gemfile.lock | 2 +- lib/leopard/metrics_server.rb | 61 +++++++++------ lib/leopard/templates/prometheus_metrics.erb | 40 +++++++--- test/lib/nats_api_server.rb | 81 ++++++++++++++------ 4 files changed, 127 insertions(+), 57 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 9f5e00a..f3dad78 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - leopard (0.2.5) + leopard (0.2.7) concurrent-ruby (~> 1.1) dry-configurable (~> 1.3) dry-monads (~> 1.9) diff --git a/lib/leopard/metrics_server.rb b/lib/leopard/metrics_server.rb index 36d97bc..185c19f 100644 --- a/lib/leopard/metrics_server.rb +++ b/lib/leopard/metrics_server.rb @@ -80,45 +80,62 @@ def prometheus_metrics(workers) render_metrics_template(metrics) end - # Aggregates per-subject worker utilization metrics. + # Aggregates per-worker, per-subject saturation metrics and per-worker executor metrics. # # @param workers [Array] Active Leopard worker instances to observe. # # @return [Hash{Symbol => Object}] Metric hashes for the Prometheus template. def collect_prometheus_metrics(workers) - busy = Hash.new(0) - pending = Hash.new(0) - workers.each { |w| accumulate_worker_metrics(w, busy, pending) } - { - busy:, - pending:, - subjects: (busy.keys | pending.keys).sort, - total: workers.size, - } + subject_metrics = [] + executors = [] + + workers.each_with_index do |w, i| + accumulate_worker_metrics(w, i, subject_metrics) + ex = w.instance_variable_get(:@client)&.subscription_executor + executors << { worker: i, executor: ex } if ex + end + + { subject_metrics:, executors: } end - # Adds one worker's endpoint saturation metrics to the aggregate hashes. + # Appends one worker's per-subject slot metrics to the subject_metrics array. # # @param worker [Object] A Leopard worker instance. - # @param busy [Hash{String => Integer}] Subject-to-busy-worker counts. - # @param pending [Hash{String => Integer}] Subject-to-pending-message counts. + # @param worker_index [Integer] Position of this worker in the workers array. + # @param subject_metrics [Array] Accumulator for per-worker-per-subject metric rows. # # @return [void] - def accumulate_worker_metrics(worker, busy, pending) + def accumulate_worker_metrics(worker, worker_index, subject_metrics) service = worker.instance_variable_get(:@service) return unless service - service.endpoints.each do |ep| - # TODO: use ep.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint - sub = ep.instance_variable_get(:@handler) - next unless sub - - subj = ep.subject.to_s - busy[subj] += sub.concurrency_semaphore.available_permits.zero? ? 1 : 0 - pending[subj] += sub.pending_queue&.size.to_i + service.endpoints.each do |endpoint| + row = endpoint_subject_metrics(endpoint, worker_index) + subject_metrics << row if row end end + # Builds a per-worker-per-subject metric row from a single endpoint, or nil if not yet active. + # + # @param endpoint [Object] A NATS service endpoint. + # @param worker_index [Integer] Position of the owning worker in the workers array. + # + # @return [Hash, nil] + def endpoint_subject_metrics(endpoint, worker_index) + # TODO: use endpoint.handler once nats-pure.rb adds attr_reader :handler to NATS::Service::Endpoint + sub = endpoint.instance_variable_get(:@handler) + return unless sub + + concurrency = sub.instance_variable_get(:@processing_concurrency).to_i + { + worker: worker_index, + subject: endpoint.subject.to_s, + busy_slots: concurrency - sub.concurrency_semaphore.available_permits, + capacity_slots: concurrency, + pending: sub.pending_queue&.size.to_i, + } + end + # Renders the metrics ERB template with aggregated metric data. # # @param metrics [Hash{Symbol => Object}] Aggregated metric data for template rendering. diff --git a/lib/leopard/templates/prometheus_metrics.erb b/lib/leopard/templates/prometheus_metrics.erb index ee798a9..3671ae4 100644 --- a/lib/leopard/templates/prometheus_metrics.erb +++ b/lib/leopard/templates/prometheus_metrics.erb @@ -1,17 +1,35 @@ -# HELP leopard_subject_busy_instances Instances currently processing a message on this subject -# TYPE leopard_subject_busy_instances gauge -<% subjects.each do |subject| -%> -leopard_subject_busy_instances{subject="<%= subject %>"} <%= busy[subject] %> +# HELP leopard_subject_busy_slots Thread slots actively processing a message for this subject on this worker +# TYPE leopard_subject_busy_slots gauge +<% subject_metrics.each do |m| -%> +leopard_subject_busy_slots{subject="<%= m[:subject] %>",worker="<%= m[:worker] %>"} <%= m[:busy_slots] %> <% end -%> -# HELP leopard_subject_total_instances Total Leopard instances in this process -# TYPE leopard_subject_total_instances gauge -<% subjects.each do |subject| -%> -leopard_subject_total_instances{subject="<%= subject %>"} <%= total %> +# HELP leopard_subject_capacity_slots Total thread slots allocated for this subject on this worker +# TYPE leopard_subject_capacity_slots gauge +<% subject_metrics.each do |m| -%> +leopard_subject_capacity_slots{subject="<%= m[:subject] %>",worker="<%= m[:worker] %>"} <%= m[:capacity_slots] %> <% end -%> -# HELP leopard_subject_pending_messages Messages pending processing across all instances +# HELP leopard_subject_pending_messages Messages waiting to acquire a processing slot for this subject on this worker # TYPE leopard_subject_pending_messages gauge -<% subjects.each do |subject| -%> -leopard_subject_pending_messages{subject="<%= subject %>"} <%= pending[subject] %> +<% subject_metrics.each do |m| -%> +leopard_subject_pending_messages{subject="<%= m[:subject] %>",worker="<%= m[:worker] %>"} <%= m[:pending] %> +<% end -%> + +# HELP leopard_executor_active_threads Approximate number of active threads in the subscription executor (concurrent-ruby active_count is approximate) +# TYPE leopard_executor_active_threads gauge +<% executors.each do |e| -%> +leopard_executor_active_threads{worker="<%= e[:worker] %>"} <%= e[:executor].active_count %> +<% end -%> + +# HELP leopard_executor_max_threads Maximum threads in the subscription executor; queued_tasks goes positive when sum of capacity_slots across subjects exceeds this value +# TYPE leopard_executor_max_threads gauge +<% executors.each do |e| -%> +leopard_executor_max_threads{worker="<%= e[:worker] %>"} <%= e[:executor].max_length %> +<% end -%> + +# HELP leopard_executor_queued_tasks Tasks holding a semaphore permit but waiting for a free executor thread; nonzero only when the executor pool is fully saturated +# TYPE leopard_executor_queued_tasks gauge +<% executors.each do |e| -%> +leopard_executor_queued_tasks{worker="<%= e[:worker] %>"} <%= e[:executor].queue_length %> <% end -%> diff --git a/test/lib/nats_api_server.rb b/test/lib/nats_api_server.rb index 8328dcf..b737905 100755 --- a/test/lib/nats_api_server.rb +++ b/test/lib/nats_api_server.rb @@ -218,9 +218,19 @@ def callback_set(on_success: ->(*_) {}, on_failure: ->(*_) {}, on_error: ->(*_) end describe 'prometheus metrics' do # rubocop:disable Metrics/BlockLength - let(:available_struct) { Struct.new(:zero?) { def available_permits = self } } + let(:semaphore_struct) { Struct.new(:available_permits) } let(:queue_struct) { Struct.new(:pending_size) { def size = pending_size } } - let(:handler_struct) { Struct.new(:concurrency_semaphore, :pending_queue) } + let(:handler_struct) do + Struct.new(:concurrency_semaphore, :pending_queue, :processing_concurrency) do + def instance_variable_get(name) + return processing_concurrency if name == :@processing_concurrency + + super + end + end + end + let(:executor_struct) { Struct.new(:active_count, :max_length, :queue_length) } + let(:client_struct) { Struct.new(:subscription_executor) } let(:endpoint_struct) do Struct.new(:subject) do def initialize(subject, handler) @@ -231,9 +241,10 @@ def initialize(subject, handler) end let(:service_struct) { Struct.new(:endpoints) } let(:worker_struct) do - Struct.new(:service) do + Struct.new(:service, :client) do def instance_variable_get(name) return service if name == :@service + return client if name == :@client super end @@ -241,32 +252,56 @@ def instance_variable_get(name) end let(:expected_metrics) do <<~METRICS - # HELP leopard_subject_busy_instances Instances currently processing a message on this subject - # TYPE leopard_subject_busy_instances gauge - leopard_subject_busy_instances{subject="alpha"} 1 - leopard_subject_busy_instances{subject="beta"} 0 - - # HELP leopard_subject_total_instances Total Leopard instances in this process - # TYPE leopard_subject_total_instances gauge - leopard_subject_total_instances{subject="alpha"} 2 - leopard_subject_total_instances{subject="beta"} 2 - - # HELP leopard_subject_pending_messages Messages pending processing across all instances + # HELP leopard_subject_busy_slots Thread slots actively processing a message for this subject on this worker + # TYPE leopard_subject_busy_slots gauge + leopard_subject_busy_slots{subject="alpha",worker="0"} 1 + leopard_subject_busy_slots{subject="beta",worker="0"} 0 + leopard_subject_busy_slots{subject="alpha",worker="1"} 2 + + # HELP leopard_subject_capacity_slots Total thread slots allocated for this subject on this worker + # TYPE leopard_subject_capacity_slots gauge + leopard_subject_capacity_slots{subject="alpha",worker="0"} 2 + leopard_subject_capacity_slots{subject="beta",worker="0"} 2 + leopard_subject_capacity_slots{subject="alpha",worker="1"} 2 + + # HELP leopard_subject_pending_messages Messages waiting to acquire a processing slot for this subject on this worker # TYPE leopard_subject_pending_messages gauge - leopard_subject_pending_messages{subject="alpha"} 5 - leopard_subject_pending_messages{subject="beta"} 1 + leopard_subject_pending_messages{subject="alpha",worker="0"} 3 + leopard_subject_pending_messages{subject="beta",worker="0"} 1 + leopard_subject_pending_messages{subject="alpha",worker="1"} 2 + + # HELP leopard_executor_active_threads Approximate number of active threads in the subscription executor (concurrent-ruby active_count is approximate) + # TYPE leopard_executor_active_threads gauge + leopard_executor_active_threads{worker="0"} 5 + leopard_executor_active_threads{worker="1"} 10 + + # HELP leopard_executor_max_threads Maximum threads in the subscription executor; queued_tasks goes positive when sum of capacity_slots across subjects exceeds this value + # TYPE leopard_executor_max_threads gauge + leopard_executor_max_threads{worker="0"} 24 + leopard_executor_max_threads{worker="1"} 24 + + # HELP leopard_executor_queued_tasks Tasks holding a semaphore permit but waiting for a free executor thread; nonzero only when the executor pool is fully saturated + # TYPE leopard_executor_queued_tasks gauge + leopard_executor_queued_tasks{worker="0"} 0 + leopard_executor_queued_tasks{worker="1"} 2 METRICS end it 'renders prometheus metrics from the erb template' do workers = [ - worker_struct.new(service_struct.new([ - endpoint_struct.new('alpha', handler_struct.new(available_struct.new(true), queue_struct.new(3))), - endpoint_struct.new('beta', handler_struct.new(available_struct.new(false), queue_struct.new(1))), - ])), - worker_struct.new(service_struct.new([ - endpoint_struct.new('alpha', handler_struct.new(available_struct.new(false), queue_struct.new(2))), - ])), + worker_struct.new( + service_struct.new([ + endpoint_struct.new('alpha', handler_struct.new(semaphore_struct.new(1), queue_struct.new(3), 2)), + endpoint_struct.new('beta', handler_struct.new(semaphore_struct.new(2), queue_struct.new(1), 2)), + ]), + client_struct.new(executor_struct.new(5, 24, 0)), + ), + worker_struct.new( + service_struct.new([ + endpoint_struct.new('alpha', handler_struct.new(semaphore_struct.new(0), queue_struct.new(2), 2)), + ]), + client_struct.new(executor_struct.new(10, 24, 2)), + ), ] assert_equal expected_metrics, @klass.send(:prometheus_metrics, workers)