Skip to content

Commit b986992

Browse files
committed
[promotel] Implement Prometheus to OTel metrics conversion and export based on OTel collector components
1 parent 42c3764 commit b986992

18 files changed

Lines changed: 2408 additions & 129 deletions

go.mod

Lines changed: 191 additions & 38 deletions
Large diffs are not rendered by default.

go.sum

Lines changed: 904 additions & 91 deletions
Large diffs are not rendered by default.

pkg/promotel/Makefile

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
help: ## Print this help text
2+
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-40s\033[0m %s\n", $$1, $$2}'
3+
4+
.PHONY: \
5+
tidy \
6+
fmt \
7+
lint \
8+
test \
9+
run-example
10+
11+
tidy: fmt ## run go mod tidy
12+
go mod tidy
13+
14+
fmt: ## run go fmt
15+
go fmt ./...
16+
17+
lint: ## run golangci-lint
18+
golangci-lint run ./...
19+
20+
test: ## run unit tests
21+
cd otel/collector-gateway && go test -v ./beholder_kafka/... ./tokenauthextension/...
22+
23+
build: tidy ## build the demo
24+
go build ./cmd/example.go
25+
26+
run-example: ## run the example
27+
go run ./cmd/example.go

pkg/promotel/README.md

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# Package Overview
2+
The package provides components for performing Prometheus to OTel metrics conversion.
3+
4+
Main components: MetricsReceiver, MetricsExporter
5+
6+
## Receiver
7+
- Wraps [prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver)
8+
- Fetches prometheus metrics data via `prometheus.Gatherer` (same process memory, no HTTP calls)
9+
- Uses custom implementation of `prometheus.scraper` (from here https://github.com/pkcll/prometheus/pull/1) to shortcut HTTP request calls and fetch data from `prometheus.Gatherer`
10+
- Converts Prometheus metrics into OTel format using [prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver)
11+
- Passes OTel metrics data to downstream OTel [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)
12+
13+
## Exporter
14+
- Wraps [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)
15+
- Receives metric data from the receiver
16+
- Export OTel metrics data to otel collector endpoint via [otlpexporter](go.opentelemetry.io/collector/exporter/otlpexporter)
17+
18+
## OTel collector prometheusreceiver
19+
20+
[prometheusreceiver](github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver).
21+
is a component of otel-collector which collects metrics from Prometheus endpoints. It scrapes the metrics at regular intervals and converts them into a format that can be processed by the rest of the collector pipeline.
22+
23+
`promotel` is a wrapper around `prometheusreceiver` which provides a simple API to start and stop the receiver and process the metrics data.
24+
25+
`promotel` uses `prometheusreceiver` factory to create an instance of the receiver via `factory.CreateMetrics` with provided configuration. It also provides a callback function which is called every time new metrics data is received. The metrics data is a `pmetric.Metrics` object which contains the metrics data received from the Prometheus endpoint.
26+
27+
`promotel/inernal` contains implementations for `consumer.Metrics`, `component.Host`, `receiver.Settings`, `component.TelemetrySettings` which are dependencies required for `factory.CreateMetrics`.
28+
29+
`metrics.Consumer` is an interface which is used to process the metrics data. The `prometheusreceiver` calls `Consumer.ConsumeMetrics` function every time new metrics data is received.
30+
31+
`prometheusreceiver` has Start and Shutdown methods.
32+
33+
`github.com/pkcll/prometheus v0.54.1-promotel` fork overrides the `prometheus` package to provide a way to scrape metrics directly from `prometheus.DefaultGatherer` without making HTTP requests to the Prometheus endpoint. This is useful when the Prometheus endpoint is not accessible from the collector.
34+
35+
Example configuration:
36+
37+
38+
```yaml
39+
receivers:
40+
prometheus:
41+
config:
42+
scrape_configs:
43+
- job_name: 'example'
44+
static_configs:
45+
- targets: ['localhost:9090']
46+
47+
```
48+
49+
## OTel collector otlpexporter
50+
51+
[otlpexporter](https://github.com/open-telemetry/opentelemetry-collector/tree/main/exporter/otlpexporter) is a component of the OpenTelemetry Collector that exports telemetry data (metrics, logs, and traces) using the OpenTelemetry Protocol (OTLP). It supports both gRPC and HTTP transport protocols.
52+
53+
Example configuration:
54+
55+
```yaml
56+
exporters:
57+
otlp:
58+
endpoint: "localhost:4317"
59+
tls:
60+
insecure: true
61+
retry_on_failure:
62+
enabled: true
63+
initial_interval: 5s
64+
max_interval: 30s
65+
max_elapsed_time: 300s
66+
sending_queue:
67+
enabled: true
68+
queue_size: 5000
69+
```
70+
71+
### `promotel` usage example:
72+
73+
```go
74+
import (
75+
"context"
76+
"fmt"
77+
"time"
78+
79+
"go.opentelemetry.io/collector/pdata/pmetric"
80+
81+
"github.com/smartcontractkit/chainlink-common/pkg/promotel"
82+
)
83+
84+
func main() {
85+
exporterConfig, _ := promotel.NewDefaultExporterConfig()
86+
exporter, _ := promotel.NewMetricExporter(exporterConfig, logger)
87+
receiverConfig, _ := promotel.NewDefaultReceiverConfig()
88+
// Fetches metrics data directly from DefaultGatherer without making HTTP requests to 127.0.0.1:8888
89+
receiver, _ := promotel.NewMetricReceiver(receiverConfig, prometheus.DefaultGatherer, exporter.Consumer().ConsumeMetrics, logger)
90+
fmt.Println("Starting promotel pipeline")
91+
exporter.Start(context.Background())
92+
receiver.Start(context.Background())
93+
defer receiver.Close()
94+
defer exporter.Close()
95+
time.Sleep(1 * time.Minute)
96+
}
97+
```
98+
99+
### Debug Metric Receiver
100+
101+
`DebugMetricReceiver` is an implementation of `metrics.Consumer` which prints formatted metrics data to stdout. It is useful for testing purposes.
102+
103+
### `Debug Metric Receiver` usage example:
104+
105+
```go
106+
...
107+
// Debug metric receiver prints fetched metrics to stdout
108+
receiver, err := promotel.NewDebugMetricReceiver(config, prometheus.DefaultGatherer, logger)
109+
// Start metric receiver
110+
receiver.Start(context.Background())
111+
...
112+
```
113+
114+
Output example
115+
116+
```
117+
NumberDataPoints #0
118+
StartTimestamp: 1970-01-01 00:00:00 +0000 UTC
119+
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
120+
Value: 44.000000
121+
Metric #18
122+
Descriptor:
123+
-> Name: otelcol_exporter_sent_metric_points
124+
-> Description: Number of metric points successfully sent to destination.
125+
-> Unit:
126+
-> DataType: Sum
127+
-> IsMonotonic: true
128+
-> AggregationTemporality: Cumulative
129+
NumberDataPoints #0
130+
Data point attributes:
131+
-> exporter: Str(debug)
132+
-> service_version: Str(0.108.1)
133+
StartTimestamp: 2025-01-02 18:38:05.905 +0000 UTC
134+
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
135+
Value: 137.000000
136+
NumberDataPoints #1
137+
Data point attributes:
138+
-> exporter: Str(otlphttp)
139+
-> service_version: Str(0.108.1)
140+
StartTimestamp: 2025-01-02 18:38:05.905 +0000 UTC
141+
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
142+
Value: 137.000000
143+
Metric #19
144+
Descriptor:
145+
-> Name: otelcol_process_cpu_seconds
146+
-> Description: Total CPU user and system time in seconds
147+
-> Unit:
148+
-> DataType: Sum
149+
-> IsMonotonic: true
150+
-> AggregationTemporality: Cumulative
151+
NumberDataPoints #0
152+
Data point attributes:
153+
-> service_version: Str(0.108.1)
154+
StartTimestamp: 2025-01-02 18:38:05.905 +0000 UTC
155+
Timestamp: 2025-01-02 18:38:28.905 +0000 UTC
156+
Value: 0.930000
157+
```

pkg/promotel/cmd/example.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
9+
"time"
10+
11+
"go.opentelemetry.io/collector/consumer"
12+
"go.opentelemetry.io/collector/pdata/pmetric"
13+
"go.uber.org/zap"
14+
15+
"github.com/prometheus/client_golang/prometheus"
16+
"github.com/prometheus/client_golang/prometheus/promauto"
17+
dto "github.com/prometheus/client_model/go"
18+
19+
"github.com/smartcontractkit/chainlink-common/pkg/promotel"
20+
)
21+
22+
const testCounterMetricName = "test_counter_metric"
23+
24+
func reportMetrics(reg prometheus.Registerer, logger *zap.Logger) {
25+
testCounter := promauto.With(reg).NewCounter(prometheus.CounterOpts{
26+
Name: testCounterMetricName,
27+
ConstLabels: prometheus.Labels{
28+
"app": "promotel-demo",
29+
},
30+
})
31+
for {
32+
testCounter.Inc()
33+
m := &dto.Metric{}
34+
_ = testCounter.Write(m)
35+
logger.Info("Reported Prometheus metric ", zap.Any("name", testCounterMetricName), zap.Any("value", m.GetCounter().GetValue()))
36+
time.Sleep(1 * time.Second)
37+
}
38+
}
39+
40+
func gatherMetricsDirectly(reg prometheus.Gatherer, logger *zap.Logger) {
41+
for {
42+
mf, err := reg.Gather()
43+
if err != nil {
44+
fmt.Printf("Error gathering metrics: %v\n", err)
45+
}
46+
for _, metricFamily := range mf {
47+
if *metricFamily.Name == testCounterMetricName {
48+
for _, metric := range metricFamily.Metric {
49+
logger.Info("Received Prometheus metric ", zap.Any("name", testCounterMetricName), zap.Any("value", metric.Counter.GetValue()))
50+
}
51+
}
52+
}
53+
time.Sleep(1 * time.Second)
54+
}
55+
}
56+
57+
func startExporter(ctx context.Context, logger *zap.Logger) promotel.MetricExporter {
58+
expConfig, err := promotel.NewExporterConfig(map[string]any{
59+
"endpoint": "localhost:4317",
60+
"tls": map[string]any{
61+
"insecure": true,
62+
},
63+
})
64+
if err != nil {
65+
logger.Fatal("Failed to create exporter config", zap.Error(err))
66+
}
67+
// Sends metrics data in OTLP format to otel-collector endpoint
68+
exporter, err := promotel.NewMetricExporter(expConfig, logger)
69+
if err != nil {
70+
logger.Fatal("Failed to create metric exporter", zap.Error(err))
71+
}
72+
err = exporter.Start(ctx)
73+
if err != nil {
74+
logger.Fatal("Failed to start exporter", zap.Error(err))
75+
}
76+
return exporter
77+
}
78+
79+
func startMetricReceiver(reg prometheus.Gatherer, logger *zap.Logger, next consumer.ConsumeMetricsFunc) promotel.Runnable {
80+
logger.Info("Starting promotel metric receiver")
81+
config, err := promotel.NewDefaultReceiverConfig()
82+
if err != nil {
83+
logger.Fatal("Failed to create config", zap.Error(err))
84+
}
85+
86+
// Gather metrics via promotel
87+
// MetricReceiver fetches metrics from pormetheus.Gatherer, then converts it to OTel format and writes formatted metrics to stdout
88+
receiver, err := promotel.NewMetricReceiver(config, reg, next, logger)
89+
90+
if err != nil {
91+
logger.Fatal("Failed to create debug metric receiver", zap.Error(err))
92+
}
93+
// Starts the promotel
94+
if err := receiver.Start(context.Background()); err != nil {
95+
logger.Fatal("Failed to start metric receiver", zap.Error(err))
96+
}
97+
return receiver
98+
}
99+
100+
func main() {
101+
logger, _ := zap.NewDevelopment()
102+
103+
go reportMetrics(prometheus.DefaultRegisterer, logger)
104+
// Gather metrics directly from DefaultGatherer to verify that the metrics are being reported
105+
go gatherMetricsDirectly(prometheus.DefaultGatherer, logger)
106+
107+
exporter := startExporter(context.Background(), logger)
108+
// Fetches metrics from in memory prometheus.Gatherer and converts to OTel format
109+
receiver := startMetricReceiver(prometheus.DefaultGatherer, logger, func(ctx context.Context, md pmetric.Metrics) error {
110+
// Logs the converted OTel metric
111+
logOtelMetric(md, testCounterMetricName, logger)
112+
// Exports the converted OTel metric
113+
return exporter.Consumer().ConsumeMetrics(ctx, md)
114+
})
115+
116+
// Wait for a signal to exit
117+
signalChan := make(chan os.Signal, 1)
118+
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
119+
120+
// Block until a signal is received
121+
<-signalChan
122+
logger.Info("Exiting promotel")
123+
// Gracefully shuts down promotel
124+
if err := receiver.Close(); err != nil {
125+
logger.Fatal("Failed to close scraper", zap.Error(err))
126+
}
127+
if err := exporter.Close(); err != nil {
128+
logger.Fatal("Failed to close exporter", zap.Error(err))
129+
}
130+
}
131+
132+
func logOtelMetric(md pmetric.Metrics, name string, logger *zap.Logger) {
133+
rms := md.ResourceMetrics()
134+
for i := 0; i < rms.Len(); i++ {
135+
rm := rms.At(i)
136+
ilms := rm.ScopeMetrics()
137+
for j := 0; j < ilms.Len(); j++ {
138+
ilm := ilms.At(j)
139+
metrics := ilm.Metrics()
140+
for k := 0; k < metrics.Len(); k++ {
141+
metric := metrics.At(k)
142+
if metric.Name() == name {
143+
logger.Info("Exporting OTel metric ", zap.Any("name", metric.Name()), zap.Any("value", metric.Sum().DataPoints().At(0).DoubleValue()))
144+
}
145+
}
146+
}
147+
}
148+
}

0 commit comments

Comments
 (0)