From 4abe3bd1000ab2d796bf594e5bfa64a875e72cfd Mon Sep 17 00:00:00 2001 From: ruihongzhou Date: Mon, 27 Apr 2026 20:13:14 +0800 Subject: [PATCH 1/3] [fix][client] Fix OpenTelemetryProducerInterceptor not executing due to eligible check --- .../OpenTelemetryTracingIntegrationTest.java | 48 +++++++++---------- .../OpenTelemetryProducerInterceptor.java | 4 +- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java index fcc0bd1776eb5..8a33c695f0bd7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -128,33 +128,30 @@ public void testBasicProducerConsumerTracing() throws Exception { // Force flush tracer provider flushSpans(); - // Verify spans - at least one span should be created + // Verify spans - expected 2 spans to be created List spans = spanExporter.getFinishedSpanItems(); - assertTrue(spans.size() > 0, "Expected at least one span, got: " + spans.size()); + assertEquals(spans.size(), 2, "Expected 2 spans, got: " + spans.size()); - // Verify producer span if present - spans.stream() + // Verify producer span + SpanData producerSpan = spans.stream() .filter(s -> s.getKind() == SpanKind.PRODUCER) .findFirst() - .ifPresent(producerSpan -> { - assertEquals(producerSpan.getName(), "send " + topic); - assertEquals(producerSpan.getAttributes().get( - io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); - }); + .orElseThrow(); + assertEquals(producerSpan.getName(), "send " + topic); + assertEquals(producerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); - // Verify consumer span if present - spans.stream() + // Verify consumer span + SpanData consumerSpan = spans.stream() .filter(s -> s.getKind() == SpanKind.CONSUMER) .findFirst() - .ifPresent(consumerSpan -> { - assertEquals(consumerSpan.getName(), "process " + topic); - assertEquals(consumerSpan.getAttributes().get( - io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); - assertEquals(consumerSpan.getAttributes().get( - io.opentelemetry.api.common.AttributeKey.stringKey( - "messaging.pulsar.acknowledgment.type")), - "acknowledge"); - }); + .orElseThrow(); + assertEquals(consumerSpan.getName(), "process " + topic); + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.system")), "pulsar"); + assertEquals(consumerSpan.getAttributes().get( + io.opentelemetry.api.common.AttributeKey.stringKey("messaging.pulsar.acknowledgment.type")), + "acknowledge"); } @Test @@ -720,14 +717,16 @@ public void testBatchMessagesTracing() throws Exception { .subscriptionName("test-sub") .subscribe(); + int numMessages = 5; + // Send batch of messages - for (int i = 0; i < 5; i++) { + for (int i = 0; i < numMessages; i++) { producer.sendAsync("message-" + i); } producer.flush(); // Receive and acknowledge all messages - for (int i = 0; i < 5; i++) { + for (int i = 0; i < numMessages; i++) { Message msg = consumer.receive(5, TimeUnit.SECONDS); assertNotNull(msg); consumer.acknowledge(msg); @@ -741,9 +740,10 @@ public void testBatchMessagesTracing() throws Exception { flushSpans(); // Verify spans for batched messages - // Note: Tracing behavior may vary for batched messages depending on when spans are created List spans = spanExporter.getFinishedSpanItems(); - assertTrue(spans.size() > 0, "Expected at least some spans for batched messages"); + int exceptedNumSpans = numMessages * 2; + assertEquals(exceptedNumSpans, spans.size(), + "Expected " + exceptedNumSpans + " spans for batched messages, got " + spans.size()); // Verify that spans have correct attributes spans.stream() diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java index aa3fea9615582..9d49a77562a4a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/tracing/OpenTelemetryProducerInterceptor.java @@ -77,7 +77,7 @@ public void close() { @Override public boolean eligible(Message message) { - return tracer != null && propagator != null; + return true; } @Override @@ -85,7 +85,7 @@ public Message beforeSend(Producer producer, Message message) { // Initialize tracer from producer on first call initializeIfNeeded(producer); - if (!eligible(message)) { + if (tracer == null || propagator == null) { return message; } From 6e76b9fae5105af94dc533fa258db6fddd7fffb7 Mon Sep 17 00:00:00 2001 From: ruihongzhou Date: Thu, 7 May 2026 17:46:55 +0800 Subject: [PATCH 2/3] fix typo --- .../broker/service/OpenTelemetryTracingIntegrationTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java index 8a33c695f0bd7..9ef3f4b514bd0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -741,9 +741,9 @@ public void testBatchMessagesTracing() throws Exception { // Verify spans for batched messages List spans = spanExporter.getFinishedSpanItems(); - int exceptedNumSpans = numMessages * 2; - assertEquals(exceptedNumSpans, spans.size(), - "Expected " + exceptedNumSpans + " spans for batched messages, got " + spans.size()); + int expectedNumSpans = numMessages * 2; + assertEquals(expectedNumSpans, spans.size(), + "Expected " + expectedNumSpans + " spans for batched messages, got " + spans.size()); // Verify that spans have correct attributes spans.stream() From 88d6e7fd0e93e06f150c24fd6127b7378505a561 Mon Sep 17 00:00:00 2001 From: ruihongzhou Date: Thu, 7 May 2026 17:51:50 +0800 Subject: [PATCH 3/3] fix --- .../broker/service/OpenTelemetryTracingIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java index 9ef3f4b514bd0..1007f7747882f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OpenTelemetryTracingIntegrationTest.java @@ -742,7 +742,7 @@ public void testBatchMessagesTracing() throws Exception { // Verify spans for batched messages List spans = spanExporter.getFinishedSpanItems(); int expectedNumSpans = numMessages * 2; - assertEquals(expectedNumSpans, spans.size(), + assertEquals(spans.size(), expectedNumSpans, "Expected " + expectedNumSpans + " spans for batched messages, got " + spans.size()); // Verify that spans have correct attributes