From 9372655f01e3f614f066eae04f7a26d4ab3974a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=A9=E9=BE=99?= Date: Wed, 1 Apr 2026 19:45:20 +0800 Subject: [PATCH 1/3] Add BatchSplittingMetricExporter to prevent OTLP gRPC export failures When high-cardinality metrics (consumer_group x topic) produce OTLP export payloads exceeding the gRPC 32MB limit or SLS per-RPC processing limit, all metrics fail to export. This adds a MetricExporter decorator that: - Splits large batches of MetricData objects into smaller sub-batches - Splits single oversized MetricData objects by their internal data points into multiple smaller MetricData objects (supports all 7 MetricDataType) - Configurable via BrokerConfig.metricsExportBatchMaxDataPoints (default 1000) - Fast path with zero overhead when data points are within threshold - Logs failed batch details for debugging --- .../metrics/BatchSplittingMetricExporter.java | 525 ++++++++++++++++ .../broker/metrics/BrokerMetricsManager.java | 7 +- .../BatchSplittingMetricExporterTest.java | 586 ++++++++++++++++++ .../apache/rocketmq/common/BrokerConfig.java | 10 + 4 files changed, 1126 insertions(+), 2 deletions(-) create mode 100644 broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java create mode 100644 broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java new file mode 100644 index 00000000000..ec7cb4a038e --- /dev/null +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.metrics; + +import com.google.common.collect.Lists; +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.metrics.Aggregation; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.DoublePointData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData; +import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData; +import io.opentelemetry.sdk.metrics.data.HistogramData; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.data.SumData; +import io.opentelemetry.sdk.metrics.data.SummaryPointData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData; 
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.IntSupplier; + +/** + * A {@link MetricExporter} decorator that splits large + * metric batches into smaller sub-batches before delegating + * to the underlying exporter. + * + *

This addresses the gRPC 32MB payload size limit when + * exporting OTLP metrics. High-cardinality metrics (e.g., + * consumer lag with consumer_group x topic combinations) + * can produce payloads exceeding this limit, causing all + * metrics to fail to export. + * + *

Splitting is based on the total number of data points + * (not the number of MetricData objects), because a single + * MetricData can contain thousands of data points. When the + * total data point count is within the configured threshold, + * the batch is passed through directly (fast path). + * + *

When a single MetricData contains more data points + * than the batch limit, its internal points are split into + * multiple smaller MetricData objects, each preserving the + * original resource, scope, name, description, unit, and + * type metadata. + */ +public final class BatchSplittingMetricExporter + implements MetricExporter { + + /** Logger. */ + private static final Logger LOGGER = + LoggerFactory.getLogger( + LoggerName.BROKER_LOGGER_NAME); + + /** The underlying exporter to delegate to. */ + private final MetricExporter delegate; + + /** Supplies the max data points per batch at runtime. */ + private final IntSupplier maxBatchSizeSupplier; + + /** + * Creates a new BatchSplittingMetricExporter. + * + * @param metricExporter the underlying MetricExporter + * @param batchSizeSupplier supplies the max number + * of data points per batch; must return > 0 + */ + public BatchSplittingMetricExporter( + final MetricExporter metricExporter, + final IntSupplier batchSizeSupplier) { + if (metricExporter == null) { + throw new NullPointerException( + "metricExporter must not be null"); + } + if (batchSizeSupplier == null) { + throw new NullPointerException( + "batchSizeSupplier must not be null"); + } + this.delegate = metricExporter; + this.maxBatchSizeSupplier = batchSizeSupplier; + } + + /** {@inheritDoc} */ + @Override + public CompletableResultCode export( + final Collection metrics) { + if (metrics == null || metrics.isEmpty()) { + return delegate.export(metrics); + } + + int maxBatchSize = + maxBatchSizeSupplier.getAsInt(); + + int totalDataPoints = 0; + for (MetricData md : metrics) { + totalDataPoints += + md.getData().getPoints().size(); + } + + if (totalDataPoints <= maxBatchSize) { + return delegate.export(metrics); + } + + List> batches = + splitIntoBatches(metrics, maxBatchSize); + + LOGGER.debug( + "Splitting metrics export: " + + "totalDataPoints={}, " + + "maxBatchSize={}, " + + "batchCount={}", + totalDataPoints, maxBatchSize, + batches.size()); + + 
List results = + new ArrayList<>(batches.size()); + for (int i = 0; i < batches.size(); i++) { + final List batch = + batches.get(i); + final int batchIndex = i; + CompletableResultCode r = + delegate.export(batch); + r.whenComplete(() -> { + if (!r.isSuccess()) { + logFailedBatch(batchIndex, batch); + } + }); + results.add(r); + } + + return CompletableResultCode.ofAll(results); + } + + /** + * Logs details of a failed batch export. + * + * @param batchIndex the index of the failed batch + * @param batch the batch that failed + */ + private static void logFailedBatch( + final int batchIndex, + final List batch) { + StringBuilder names = new StringBuilder(); + for (MetricData md : batch) { + if (names.length() > 0) { + names.append(","); + } + names.append(md.getName()) + .append("(") + .append( + md.getData() + .getPoints().size()) + .append("pts)"); + } + LOGGER.warn( + "Batch {} failed. Metrics: {}", + batchIndex, names); + } + + /** + * Splits metrics into sub-batches by data point count. + * When a single MetricData has more points than the + * batch limit, its points are split into multiple + * smaller MetricData objects. 
+ * + * @param metrics the full metrics collection + * @param maxBatchSize max data points per batch + * @return list of sub-batches + */ + private List> splitIntoBatches( + final Collection metrics, + final int maxBatchSize) { + List> batches = + new ArrayList<>(); + List currentBatch = + new ArrayList<>(); + int currentPoints = 0; + + for (MetricData md : metrics) { + int pts = + md.getData().getPoints().size(); + + if (pts > maxBatchSize) { + // Flush current batch first + if (!currentBatch.isEmpty()) { + batches.add(currentBatch); + currentBatch = new ArrayList<>(); + currentPoints = 0; + } + // Split the large MetricData + List subMetrics = + splitMetricData( + md, maxBatchSize); + for (MetricData sub : subMetrics) { + int subPts = sub.getData() + .getPoints().size(); + if (currentPoints > 0 + && currentPoints + subPts + > maxBatchSize) { + batches.add(currentBatch); + currentBatch = + new ArrayList<>(); + currentPoints = 0; + } + currentBatch.add(sub); + currentPoints += subPts; + } + continue; + } + + if (currentPoints > 0 + && currentPoints + pts + > maxBatchSize) { + batches.add(currentBatch); + currentBatch = new ArrayList<>(); + currentPoints = 0; + } + + currentBatch.add(md); + currentPoints += pts; + } + + if (!currentBatch.isEmpty()) { + batches.add(currentBatch); + } + + return batches; + } + + /** + * Splits a single MetricData into multiple MetricData + * objects, each containing at most maxChunkSize points. + * The original metadata (resource, scope, name, etc.) + * is preserved in each resulting MetricData. + * + *

NOTE: This method and the createXxx helpers below + * use OTel SDK internal APIs (ImmutableMetricData, + * ImmutableGaugeData, etc. from the + * io.opentelemetry.sdk.metrics.internal.data package). + * These are not public API and may change across SDK + * upgrades. When upgrading the OTel SDK version, check + * for breaking changes in these factory methods. + * + * @param md the MetricData to split + * @param maxChunkSize max points per chunk + * @return list of MetricData, each with <= + * maxChunkSize points + */ + @SuppressWarnings("unchecked") + private static List splitMetricData( + final MetricData md, + final int maxChunkSize) { + + List allPoints = + new ArrayList<>( + md.getData().getPoints()); + MetricDataType type = md.getType(); + List> chunks = + Lists.partition(allPoints, maxChunkSize); + + List result = + new ArrayList<>(chunks.size()); + for (List chunk : chunks) { + result.add( + createMetricDataForType( + md, type, chunk)); + } + return result; + } + + /** + * Creates a new MetricData of the given type with the + * specified subset of data points. Preserves the + * original metadata from the source MetricData. 
+ * + * @param src the original MetricData + * @param type the MetricDataType + * @param pts the subset of points + * @return a new MetricData with the given points + */ + @SuppressWarnings("unchecked") + private static MetricData createMetricDataForType( + final MetricData src, + final MetricDataType type, + final List pts) { + + switch (type) { + case LONG_GAUGE: + return ImmutableMetricData + .createLongGauge( + src.getResource(), + src + .getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableGaugeData.create( + (Collection) + (Collection) pts)); + case DOUBLE_GAUGE: + return ImmutableMetricData + .createDoubleGauge( + src.getResource(), + src + .getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableGaugeData.create( + (Collection) + (Collection) pts)); + case LONG_SUM: + return createLongSum(src, pts); + case DOUBLE_SUM: + return createDoubleSum(src, pts); + case HISTOGRAM: + return createHistogram(src, pts); + case EXPONENTIAL_HISTOGRAM: + return createExpHistogram( + src, pts); + case SUMMARY: + return createSummary(src, pts); + default: + throw new IllegalArgumentException( + "Unsupported MetricDataType: " + + type); + } + } + + /** + * Creates a LONG_SUM MetricData with a subset of + * points. + * + * @param src the original MetricData + * @param pts the subset of points + * @return new MetricData + */ + @SuppressWarnings("unchecked") + private static MetricData createLongSum( + final MetricData src, + final List pts) { + SumData sumData = + src.getLongSumData(); + return ImmutableMetricData.createLongSum( + src.getResource(), + src.getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableSumData.create( + sumData.isMonotonic(), + sumData + .getAggregationTemporality(), + (Collection) + (Collection) pts)); + } + + /** + * Creates a DOUBLE_SUM MetricData with a subset of + * points. 
+ * + * @param src the original MetricData + * @param pts the subset of points + * @return new MetricData + */ + @SuppressWarnings("unchecked") + private static MetricData createDoubleSum( + final MetricData src, + final List pts) { + SumData sumData = + src.getDoubleSumData(); + return ImmutableMetricData.createDoubleSum( + src.getResource(), + src.getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableSumData.create( + sumData.isMonotonic(), + sumData + .getAggregationTemporality(), + (Collection) + (Collection) pts)); + } + + /** + * Creates a HISTOGRAM MetricData with a subset of + * points. + * + * @param src the original MetricData + * @param pts the subset of points + * @return new MetricData + */ + @SuppressWarnings("unchecked") + private static MetricData createHistogram( + final MetricData src, + final List pts) { + HistogramData histData = + src.getHistogramData(); + return ImmutableMetricData + .createDoubleHistogram( + src.getResource(), + src + .getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableHistogramData.create( + histData + .getAggregationTemporality(), + (Collection) + (Collection) pts)); + } + + /** + * Creates an EXPONENTIAL_HISTOGRAM MetricData with a + * subset of points. 
+ * + * @param src the original MetricData + * @param pts the subset of points + * @return new MetricData + */ + @SuppressWarnings("unchecked") + private static MetricData createExpHistogram( + final MetricData src, + final List pts) { + ExponentialHistogramData ehData = + src.getExponentialHistogramData(); + return ImmutableMetricData + .createExponentialHistogram( + src.getResource(), + src + .getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableExponentialHistogramData + .create( + ehData + .getAggregationTemporality(), + (Collection< + ExponentialHistogramPointData>) + (Collection) pts)); + } + + /** + * Creates a SUMMARY MetricData with a subset of points. + * + * @param src the original MetricData + * @param pts the subset of points + * @return new MetricData + */ + @SuppressWarnings("unchecked") + private static MetricData createSummary( + final MetricData src, + final List pts) { + return ImmutableMetricData + .createDoubleSummary( + src.getResource(), + src + .getInstrumentationScopeInfo(), + src.getName(), + src.getDescription(), + src.getUnit(), + ImmutableSummaryData.create( + (Collection) + (Collection) pts)); + } + + /** {@inheritDoc} */ + @Override + public AggregationTemporality + getAggregationTemporality( + final InstrumentType instrumentType) { + return delegate.getAggregationTemporality( + instrumentType); + } + + /** {@inheritDoc} */ + @Override + public Aggregation getDefaultAggregation( + final InstrumentType instrumentType) { + return delegate + .getDefaultAggregation(instrumentType); + } + + /** {@inheritDoc} */ + @Override + public CompletableResultCode flush() { + return delegate.flush(); + } + + /** {@inheritDoc} */ + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } +} diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java index 
2f8a07020c5..60e86d46909 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java @@ -123,7 +123,7 @@ public class BrokerMetricsManager { private final ConsumerLagCalculator consumerLagCalculator; private final LiteConsumerLagCalculator liteConsumerLagCalculator; private final Map labelMap = new HashMap<>(); - private OtlpGrpcMetricExporter metricExporter; + private MetricExporter metricExporter; private PeriodicMetricReader periodicMetricReader; private PrometheusHttpServer prometheusHttpServer; private MetricExporter loggingMetricExporter; @@ -358,6 +358,7 @@ private void init() { } OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder() .setEndpoint(endpoint) + .setCompression("gzip") .setTimeout(brokerConfig.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS) .setAggregationTemporalitySelector(type -> { if (brokerConfig.isMetricsInDelta() && @@ -382,7 +383,9 @@ private void init() { headerMap.forEach(metricExporterBuilder::addHeader); } - metricExporter = metricExporterBuilder.build(); + OtlpGrpcMetricExporter otlpExporter = metricExporterBuilder.build(); + metricExporter = new BatchSplittingMetricExporter(otlpExporter, + brokerConfig::getMetricsExportBatchMaxDataPoints); periodicMetricReader = PeriodicMetricReader.builder(metricExporter) .setInterval(brokerConfig.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java new file mode 100644 index 00000000000..8d518667383 --- /dev/null +++ b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java @@ -0,0 +1,586 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.broker.metrics; + +import io.opentelemetry.sdk.common.CompletableResultCode; +import io.opentelemetry.sdk.common.InstrumentationScopeInfo; +import io.opentelemetry.sdk.metrics.InstrumentType; +import io.opentelemetry.sdk.metrics.data.AggregationTemporality; +import io.opentelemetry.sdk.metrics.data.Data; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.MetricDataType; +import io.opentelemetry.sdk.metrics.data.PointData; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData; +import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData; +import io.opentelemetry.sdk.resources.Resource; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class BatchSplittingMetricExporterTest { + + private MetricExporter delegate; + private static final int MAX_DATA_POINTS = 10; + + @Before + public void setUp() { + delegate = mock(MetricExporter.class); + when(delegate.export(anyCollection())) + .thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.flush()) + .thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.shutdown()) + .thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.getAggregationTemporality( + any(InstrumentType.class))) + .thenReturn( + AggregationTemporality.CUMULATIVE); + } + + @Test + public void testConstructorRejectsNullDelegate() { + assertThatThrownBy(() -> + new BatchSplittingMetricExporter( + null, () -> 100)) + .isInstanceOf(NullPointerException.class); + } + + @Test + public void testConstructorRejectsNullSupplier() { + assertThatThrownBy(() -> + new BatchSplittingMetricExporter( + delegate, null)) + .isInstanceOf(NullPointerException.class); + } + + @Test + public void testExportEmptyCollection() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + CompletableResultCode result = + exporter.export(Collections.emptyList()); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, times(1)) + .export(Collections.emptyList()); + } + + @Test + public void testFastPathWhenBelowThreshold() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + List metrics = Arrays.asList( + createMockMetricData("metric1", 3), + createMockMetricData("metric2", 3), + createMockMetricData("metric3", 3) + ); + + CompletableResultCode result 
= + exporter.export(metrics); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, times(1)).export(metrics); + } + + @Test + public void testFastPathWhenExactlyAtThreshold() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + List metrics = Arrays.asList( + createMockMetricData("metric1", 5), + createMockMetricData("metric2", 5) + ); + + CompletableResultCode result = + exporter.export(metrics); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, times(1)).export(metrics); + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitWhenAboveThreshold() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + MetricData m1 = createMockMetricData("m1", 5); + MetricData m2 = createMockMetricData("m2", 5); + MetricData m3 = createMockMetricData("m3", 5); + List metrics = + Arrays.asList(m1, m2, m3); + + CompletableResultCode result = + exporter.export(metrics); + + assertThat(result).isNotNull(); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(2)) + .export(captor.capture()); + + List> batches = + captor.getAllValues(); + assertThat(batches).hasSize(2); + assertThat(batches.get(0)) + .containsExactly(m1, m2); + assertThat(batches.get(1)) + .containsExactly(m3); + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitSingleLargeMetricData() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + // A single MetricData with 25 points. + // maxBatchSize=10, so it should be split + // into 3 sub-MetricData: 10, 10, 5 points. + // Each goes into its own batch. 
+ MetricData largeMetric = + createRealLongGaugeMetricData( + "large_metric", 25); + List metrics = + Collections.singletonList(largeMetric); + + CompletableResultCode result = + exporter.export(metrics); + + assertThat(result).isNotNull(); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(3)) + .export(captor.capture()); + + List> batches = + captor.getAllValues(); + assertThat(batches).hasSize(3); + + // Verify each batch has correct point count + assertThat(totalPoints(batches.get(0))) + .isEqualTo(10); + assertThat(totalPoints(batches.get(1))) + .isEqualTo(10); + assertThat(totalPoints(batches.get(2))) + .isEqualTo(5); + + // Verify metadata preserved + for (Collection batch : batches) { + for (MetricData md : batch) { + assertThat(md.getName()) + .isEqualTo("large_metric"); + assertThat(md.getType()) + .isEqualTo( + MetricDataType.LONG_GAUGE); + } + } + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitSingleLargeMetricExactMultiple() { + // 20 points / maxBatchSize 10 = exactly 2 batches + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + MetricData largeMetric = + createRealLongGaugeMetricData( + "exact_metric", 20); + List metrics = + Collections.singletonList(largeMetric); + + exporter.export(metrics); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(2)) + .export(captor.capture()); + + List> batches = + captor.getAllValues(); + assertThat(totalPoints(batches.get(0))) + .isEqualTo(10); + assertThat(totalPoints(batches.get(1))) + .isEqualTo(10); + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitLargeMetricMixedWithSmall() { + // maxBatchSize = 10 + // m1: 3 pts (small), large: 25 pts, m3: 4 pts + // Expected: + // batch1: [m1] (3 pts) - flushed before large + // large split into: sub1(10), sub2(10), sub3(5) + // batch2: [sub1] (10 pts) + // batch3: [sub2] 
(10 pts) + // batch4: [sub3, m3] (5+4=9 pts) + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + MetricData m1 = + createMockMetricData("m1", 3); + MetricData large = + createRealLongGaugeMetricData( + "large", 25); + MetricData m3 = + createMockMetricData("m3", 4); + + exporter.export(Arrays.asList(m1, large, m3)); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(4)) + .export(captor.capture()); + + List> batches = + captor.getAllValues(); + assertThat(batches).hasSize(4); + + // batch 1: m1 (3 pts) + assertThat(totalPoints(batches.get(0))) + .isEqualTo(3); + // batch 2: sub1 of large (10 pts) + assertThat(totalPoints(batches.get(1))) + .isEqualTo(10); + // batch 3: sub2 of large (10 pts) + assertThat(totalPoints(batches.get(2))) + .isEqualTo(10); + // batch 4: sub3 of large (5) + m3 (4) = 9 + assertThat(totalPoints(batches.get(3))) + .isEqualTo(9); + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitMultipleBatches() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 5); + + MetricData m1 = + createMockMetricData("m1", 3); + MetricData m2 = + createMockMetricData("m2", 3); + MetricData m3 = + createMockMetricData("m3", 3); + MetricData m4 = + createMockMetricData("m4", 3); + MetricData m5 = + createMockMetricData("m5", 3); + List metrics = + Arrays.asList(m1, m2, m3, m4, m5); + + exporter.export(metrics); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(5)) + .export(captor.capture()); + + List> batches = + captor.getAllValues(); + assertThat(batches.get(0)) + .containsExactly(m1); + assertThat(batches.get(1)) + .containsExactly(m2); + assertThat(batches.get(2)) + .containsExactly(m3); + assertThat(batches.get(3)) + .containsExactly(m4); + assertThat(batches.get(4)) + .containsExactly(m5); + } + + @Test + @SuppressWarnings("unchecked") 
+ public void testSplitMixedSizeMetricData() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 10); + + MetricData m1 = + createMockMetricData("m1", 2); + MetricData m2 = + createMockMetricData("m2", 7); + MetricData m3 = + createMockMetricData("m3", 3); + MetricData m4 = + createMockMetricData("m4", 8); + MetricData m5 = + createMockMetricData("m5", 1); + List metrics = + Arrays.asList(m1, m2, m3, m4, m5); + + exporter.export(metrics); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(3)) + .export(captor.capture()); + + List> batches = + captor.getAllValues(); + assertThat(batches.get(0)) + .containsExactly(m1, m2); + assertThat(batches.get(1)) + .containsExactly(m3); + assertThat(batches.get(2)) + .containsExactly(m4, m5); + } + + @Test + public void testDelegateFailureIsPropagated() { + when(delegate.export(anyCollection())) + .thenReturn( + CompletableResultCode.ofFailure()); + + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + + List metrics = + Collections.singletonList( + createMockMetricData("metric1", 5)); + CompletableResultCode result = + exporter.export(metrics); + + assertThat(result.isSuccess()).isFalse(); + } + + @Test + public void testFlushDelegatesToDelegate() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + CompletableResultCode result = + exporter.flush(); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, times(1)).flush(); + } + + @Test + public void testShutdownDelegatesToDelegate() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + CompletableResultCode result = + exporter.shutdown(); + + assertThat(result.isSuccess()).isTrue(); + verify(delegate, times(1)).shutdown(); + } + + @Test + public void testGetAggregationTemporality() { + 
when(delegate.getAggregationTemporality( + InstrumentType.COUNTER)) + .thenReturn(AggregationTemporality.DELTA); + + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + AggregationTemporality result = + exporter.getAggregationTemporality( + InstrumentType.COUNTER); + + assertThat(result) + .isEqualTo(AggregationTemporality.DELTA); + verify(delegate, times(1)) + .getAggregationTemporality( + InstrumentType.COUNTER); + } + + @Test + public void testExportNullCollection() { + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> MAX_DATA_POINTS); + exporter.export(null); + verify(delegate, times(1)).export(null); + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitPreservesAllPoints() { + // Verify no points are lost during split. + // 83 points with maxBatch=10 -> 9 batches + // (8*10 + 1*3) + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 10); + + MetricData large = + createRealLongGaugeMetricData( + "big_metric", 83); + + exporter.export( + Collections.singletonList(large)); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + // 83 / 10 = 8 full + 1 partial = 9 batches + verify(delegate, times(9)) + .export(captor.capture()); + + int totalPts = 0; + for (Collection batch + : captor.getAllValues()) { + for (MetricData md : batch) { + totalPts += + md.getData() + .getPoints().size(); + } + } + assertThat(totalPts).isEqualTo(83); + } + + @Test + @SuppressWarnings("unchecked") + public void testSplitPointsContentPreserved() { + // Verify actual point data values are preserved + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 3); + + MetricData metric = + createRealLongGaugeMetricData( + "val_metric", 5); + + exporter.export( + Collections.singletonList(metric)); + + ArgumentCaptor> captor = + ArgumentCaptor.forClass(Collection.class); + 
verify(delegate, times(2)) + .export(captor.capture()); + + // Collect all point values from batches + List allValues = new ArrayList<>(); + for (Collection batch + : captor.getAllValues()) { + for (MetricData md : batch) { + for (PointData pt + : md.getData().getPoints()) { + LongPointData lp = + (LongPointData) pt; + allValues.add(lp.getValue()); + } + } + } + + // Values should be 0, 1, 2, 3, 4 + assertThat(allValues) + .containsExactly(0L, 1L, 2L, 3L, 4L); + } + + /** + * Creates a mock MetricData with the specified + * number of data points. + */ + @SuppressWarnings("unchecked") + private MetricData createMockMetricData( + final String name, + final int numPoints) { + List points = new ArrayList<>(); + for (int i = 0; i < numPoints; i++) { + points.add(mock(PointData.class)); + } + + Data data = mock(Data.class); + doReturn(points).when(data).getPoints(); + + MetricData metricData = mock(MetricData.class); + when(metricData.getName()).thenReturn(name); + doReturn(data).when(metricData).getData(); + + return metricData; + } + + /** + * Creates a real LONG_GAUGE MetricData using the + * OTel SDK ImmutableMetricData, suitable for testing + * the split logic which needs to reconstruct + * MetricData objects. + */ + private MetricData createRealLongGaugeMetricData( + final String name, + final int numPoints) { + List points = + new ArrayList<>(); + for (int i = 0; i < numPoints; i++) { + points.add( + ImmutableLongPointData.create( + 0L, 1L, + io.opentelemetry.api.common + .Attributes.empty(), + (long) i)); + } + return ImmutableMetricData.createLongGauge( + Resource.getDefault(), + InstrumentationScopeInfo.empty(), + name, + "test description", + "1", + ImmutableGaugeData.create(points)); + } + + /** + * Calculates total data points in a batch. 
+ */ + private int totalPoints( + final Collection<MetricData> batch) { + int total = 0; + for (MetricData md : batch) { + total += + md.getData().getPoints().size(); + } + return total; + } +} diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 7271c12b187..76bd356a54a 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -403,6 +403,8 @@ public class BrokerConfig extends BrokerIdentity { // Label pairs in CSV. Each label follows pattern of Key:Value. eg: instance_id:xxx,uid:xxx private String metricsLabel = ""; + private int metricsExportBatchMaxDataPoints = 1000; + private boolean metricsInDelta = false; private boolean enableRemotingMetrics = true; @@ -1865,6 +1867,14 @@ public void setMetricsInDelta(boolean metricsInDelta) { this.metricsInDelta = metricsInDelta; } + public int getMetricsExportBatchMaxDataPoints() { + return metricsExportBatchMaxDataPoints; + } + + public void setMetricsExportBatchMaxDataPoints(int metricsExportBatchMaxDataPoints) { + this.metricsExportBatchMaxDataPoints = metricsExportBatchMaxDataPoints; + } + public int getMetricsPromExporterPort() { return metricsPromExporterPort; } From 253595e8a1f8b0fdb2fea68cc6af26b8be724339 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=A9=E9=BE=99?= Date: Thu, 2 Apr 2026 19:43:53 +0800 Subject: [PATCH 2/3] fix(metrics): snapshot MetricData points before export to prevent AIOOBE The OTel SDK's NumberDataPointMarshaler.createRepeated allocates an array based on points.size() then iterates. If callback threads concurrently add data points between size() and iteration, an ArrayIndexOutOfBoundsException occurs. This adds a defensive snapshot of all data point collections at the start of export(), ensuring the delegate exporter always receives immutable point collections.
--- .../metrics/BatchSplittingMetricExporter.java | 56 ++++++++++++++++++- 1 file changed, 53 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java index ec7cb4a038e..cd5e8b20a83 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java @@ -115,21 +115,26 @@ public CompletableResultCode export( return delegate.export(metrics); } + // Snapshot to avoid concurrent-modification AIOOBE + // in OTel SDK marshaling (see NumberDataPointMarshaler) + List<MetricData> snapshotted = + snapshotAllMetrics(metrics); + int maxBatchSize = maxBatchSizeSupplier.getAsInt(); int totalDataPoints = 0; - for (MetricData md : metrics) { + for (MetricData md : snapshotted) { totalDataPoints += md.getData().getPoints().size(); } if (totalDataPoints <= maxBatchSize) { - return delegate.export(metrics); + return delegate.export(snapshotted); } List<List<MetricData>> batches = - splitIntoBatches(metrics, maxBatchSize); + splitIntoBatches(snapshotted, maxBatchSize); LOGGER.debug( "Splitting metrics export: " @@ -184,6 +189,51 @@ private static void logFailedBatch( batchIndex, names); } + /** + * Creates defensive snapshots of all MetricData by + * copying their data point collections into new + * ArrayLists. This prevents + * {@link ArrayIndexOutOfBoundsException} in the OTel + * SDK marshaling code when callback threads + * concurrently modify point collections during export.
+ * + * @param metrics the original metrics collection + * @return list of snapshotted MetricData + */ + private static List<MetricData> snapshotAllMetrics( + final Collection<MetricData> metrics) { + List<MetricData> result = + new ArrayList<>(metrics.size()); + for (MetricData md : metrics) { + try { + result.add(snapshotMetricData(md)); + } catch (Exception e) { + LOGGER.warn( + "Failed to snapshot MetricData:" + + " {}, using original", + md.getName(), e); + result.add(md); + } + } + return result; + } + + /** + * Creates a snapshot of a single MetricData by copying + * its points into a new ArrayList and reconstructing + * the MetricData with immutable internal data. + * + * @param md the original MetricData + * @return a new MetricData with snapshotted points + */ + private static MetricData snapshotMetricData( + final MetricData md) { + List<? extends PointData> points = + new ArrayList<>(md.getData().getPoints()); + return createMetricDataForType( + md, md.getType(), points); + } + + /** + * Splits metrics into sub-batches by data point count.
* When a single MetricData has more points than the From 21215a89df39997bfe7f0cdce4587e1323d8fd19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B5=A9=E9=BE=99?= Date: Thu, 2 Apr 2026 19:49:50 +0800 Subject: [PATCH 3/3] test(metrics): add unit tests for snapshot defensive copy - testSnapshotCreatesNewMetricData: verify delegate receives snapshotted MetricData, not the original reference - testSnapshotFallsBackToOriginal: verify catch block falls back to original when snapshot fails (e.g., mock without type) - testSnapshotPointsAreIndependentCopy: verify the snapshotted points collection is a separate instance from the original --- .../BatchSplittingMetricExporterTest.java | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java index 8d518667383..0b2278beda6 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java @@ -520,6 +520,96 @@ public void testSplitPointsContentPreserved() { .containsExactly(0L, 1L, 2L, 3L, 4L); } + @Test + @SuppressWarnings("unchecked") + public void testSnapshotCreatesNewMetricData() { + // On fast path, delegate should receive + // snapshotted MetricData, not the original. 
+ BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 100); + + MetricData original = + createRealLongGaugeMetricData("test", 5); + + exporter.export( + Collections.singletonList(original)); + + ArgumentCaptor<Collection<MetricData>> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(1)) + .export(captor.capture()); + + MetricData exported = + captor.getValue().iterator().next(); + assertThat(exported).isNotSameAs(original); + assertThat(exported.getName()) + .isEqualTo("test"); + assertThat(exported.getType()) + .isEqualTo(MetricDataType.LONG_GAUGE); + assertThat(exported.getData().getPoints()) + .hasSize(5); + } + + @Test + @SuppressWarnings("unchecked") + public void testSnapshotFallsBackToOriginal() { + // Mock MetricData has no type set, so snapshot + // will fail. Should fall back to original object. + BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 100); + + MetricData mockMd = + createMockMetricData("mock", 3); + + exporter.export( + Collections.singletonList(mockMd)); + + ArgumentCaptor<Collection<MetricData>> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(1)) + .export(captor.capture()); + + MetricData exported = + captor.getValue().iterator().next(); + assertThat(exported).isSameAs(mockMd); + } + + @Test + @SuppressWarnings("unchecked") + public void testSnapshotPointsAreIndependentCopy() { + // Verify snapshot points collection is a separate + // copy from the original, preventing concurrent + // modification issues.
+ BatchSplittingMetricExporter exporter = + new BatchSplittingMetricExporter( + delegate, () -> 100); + + MetricData original = + createRealLongGaugeMetricData("test", 5); + Collection<? extends PointData> originalPoints = + original.getData().getPoints(); + + exporter.export( + Collections.singletonList(original)); + + ArgumentCaptor<Collection<MetricData>> captor = + ArgumentCaptor.forClass(Collection.class); + verify(delegate, times(1)) + .export(captor.capture()); + + MetricData exported = + captor.getValue().iterator().next(); + Collection<? extends PointData> exportedPoints = + exported.getData().getPoints(); + + assertThat(exportedPoints) + .isNotSameAs(originalPoints); + assertThat(exportedPoints) + .hasSize(originalPoints.size()); + } + /** * Creates a mock MetricData with the specified * number of data points.