-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreading_metrics.py
More file actions
193 lines (153 loc) · 6.79 KB
/
threading_metrics.py
File metadata and controls
193 lines (153 loc) · 6.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# -*- coding: utf-8 -*-
# threading_metrics.py
"""
Prometheus metrics for token-managed execution.
Provides counters, gauges, and histograms for task lifecycle events,
queue behavior, worker state, and convergence-related pattern changes.
These metrics support both external monitoring and internal runtime
adaptation based on directly observed system behavior.
"""
import threading
from typing import Optional
from prometheus_client import Counter, Gauge, Histogram, CollectorRegistry
class ThreadingMetrics:
    """Central Prometheus metrics registry for the execution system.

    Exposes runtime metrics for monitoring and provides the convergence
    layer with directly observed queue and worker-state signals.
    """

    def __init__(self, registry: Optional[CollectorRegistry] = None):
        """Initialize metric families in the provided or a new registry.

        Args:
            registry: Existing ``CollectorRegistry`` to register metrics in.
                When omitted, a private registry is created so multiple
                instances do not collide on duplicate metric names.
        """
        self.registry = registry or CollectorRegistry()

        # Task lifecycle counters
        self.tasks_submitted = Counter(
            'threading_tasks_submitted_total',
            'Total tasks submitted',
            ['operation_type'],
            registry=self.registry
        )
        self.tasks_completed = Counter(
            'threading_tasks_completed_total',
            'Total tasks completed successfully',
            ['operation_type', 'core_id'],
            registry=self.registry
        )
        self.tasks_failed = Counter(
            'threading_tasks_failed_total',
            'Total tasks that failed',
            ['operation_type', 'core_id'],
            registry=self.registry
        )

        # Task duration histogram
        # Buckets: 1ms, 10ms, 50ms, 100ms, 500ms, 1s, 2s, 5s, 10s
        self.task_duration = Histogram(
            'threading_task_duration_seconds',
            'Task execution time in seconds',
            ['operation_type', 'core_id'],
            buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0),
            registry=self.registry
        )

        # Queue wait time histogram
        self.queue_wait_time = Histogram(
            'threading_queue_wait_seconds',
            'Time task spent waiting in queue',
            ['core_id'],
            buckets=(0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0),
            registry=self.registry
        )

        # Current queue depth per core
        self.queue_depth = Gauge(
            'threading_queue_depth',
            'Current number of tasks in queue',
            ['core_id'],
            registry=self.registry
        )

        # Workers state
        self.workers_busy = Gauge(
            'threading_workers_busy',
            'Number of workers currently executing tasks',
            ['core_id'],
            registry=self.registry
        )
        self.workers_idle = Gauge(
            'threading_workers_idle',
            'Number of workers waiting for tasks',
            ['core_id'],
            registry=self.registry
        )

        # Current worker pattern (2=HEAVY, 3=MEDIUM, 4=LIGHT)
        self.worker_pattern = Gauge(
            'threading_worker_pattern',
            'Current worker pattern for core (2=HEAVY, 3=MEDIUM, 4=LIGHT)',
            ['core_id'],
            registry=self.registry
        )

        # Worker utilization percentage
        self.worker_utilization = Gauge(
            'threading_worker_utilization_percent',
            'Percentage of workers busy on this core',
            ['core_id'],
            registry=self.registry
        )

        # Convergence events
        self.convergence_changes = Counter(
            'threading_convergence_pattern_changes_total',
            'Total number of pattern changes due to convergence',
            ['core_id', 'from_pattern', 'to_pattern'],
            registry=self.registry
        )

        # NOTE(review): prometheus_client metric operations (inc/set/observe)
        # are internally thread-safe, and this lock is never acquired by any
        # method below. It is kept only for backward compatibility with any
        # external code that might reference the attribute.
        self._lock = threading.Lock()

    @staticmethod
    def _core(core_id: int) -> str:
        """Return the string label value Prometheus requires for a core id."""
        return str(core_id)

    def record_task_submission(self, operation_type: str):
        """Increment the submitted-task counter for an operation type."""
        self.tasks_submitted.labels(operation_type=operation_type).inc()

    def record_task_completion(self, operation_type: str, core_id: int, duration: float):
        """Record successful completion and observe execution duration.

        Args:
            operation_type: Label identifying the kind of task.
            core_id: Core the task executed on (stringified for the label).
            duration: Task execution time in seconds.
        """
        core = self._core(core_id)
        self.tasks_completed.labels(operation_type=operation_type, core_id=core).inc()
        self.task_duration.labels(operation_type=operation_type, core_id=core).observe(duration)

    def record_task_failure(self, operation_type: str, core_id: int):
        """Increment the failed-task counter for an operation/core pair."""
        self.tasks_failed.labels(operation_type=operation_type, core_id=self._core(core_id)).inc()

    def record_queue_wait(self, core_id: int, wait_time: float):
        """Observe queue wait time (seconds) for a core."""
        self.queue_wait_time.labels(core_id=self._core(core_id)).observe(wait_time)

    def update_queue_depth(self, core_id: int, depth: int):
        """Set the current queue depth gauge for a core."""
        self.queue_depth.labels(core_id=self._core(core_id)).set(depth)

    def update_worker_state(self, core_id: int, busy_count: int, idle_count: int):
        """Update busy/idle worker gauges and derived utilization for a core.

        Utilization is only updated when at least one worker exists; with
        zero workers the previous utilization value is left untouched.
        """
        core = self._core(core_id)
        self.workers_busy.labels(core_id=core).set(busy_count)
        self.workers_idle.labels(core_id=core).set(idle_count)
        total = busy_count + idle_count
        if total > 0:
            utilization = (busy_count / total) * 100
            self.worker_utilization.labels(core_id=core).set(utilization)

    def update_pattern(self, core_id: int, pattern_value: int):
        """Set the active worker-pattern gauge for a core."""
        self.worker_pattern.labels(core_id=self._core(core_id)).set(pattern_value)

    def record_convergence_change(self, core_id: int, from_pattern: int, to_pattern: int):
        """Increment the convergence-triggered pattern-change counter."""
        self.convergence_changes.labels(
            core_id=self._core(core_id),
            from_pattern=str(from_pattern),
            to_pattern=str(to_pattern)
        ).inc()

    def get_registry(self) -> CollectorRegistry:
        """Return the Prometheus registry used by this metrics instance."""
        return self.registry
# Global singleton instance, created lazily by get_metrics().
_global_metrics: Optional[ThreadingMetrics] = None
# Guards lazy creation/replacement of the singleton across threads.
_global_lock = threading.Lock()
def get_metrics() -> ThreadingMetrics:
    """Return the process-global metrics instance, creating it if needed.

    Uses double-checked locking: the common already-created path avoids
    acquiring the lock, and the re-check inside the lock prevents two
    threads from both constructing an instance.

    Returns:
        The process-global ThreadingMetrics instance (never None — the
        previous ``ThreadingMetrics | None`` annotation was incorrect).
    """
    global _global_metrics
    if _global_metrics is None:
        with _global_lock:
            # Re-check under the lock: another thread may have won the race.
            if _global_metrics is None:
                _global_metrics = ThreadingMetrics()
    return _global_metrics
def reset_metrics():
    """Replace the process-global metrics instance, primarily for tests.

    Returns:
        The freshly created ThreadingMetrics instance now installed as
        the global singleton.
    """
    global _global_metrics
    with _global_lock:
        fresh = ThreadingMetrics()
        _global_metrics = fresh
    return fresh