diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java
new file mode 100644
index 00000000000..cd5e8b20a83
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporter.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.DoublePointData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.data.ExponentialHistogramPointData;
+import io.opentelemetry.sdk.metrics.data.HistogramData;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.data.SumData;
+import io.opentelemetry.sdk.metrics.data.SummaryPointData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableExponentialHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableHistogramData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableSummaryData;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.IntSupplier;
+
+/**
+ * A {@link MetricExporter} decorator that splits large metric batches into
+ * smaller sub-batches before delegating to the underlying exporter.
+ *
+ * <p>This addresses the gRPC 32MB payload size limit when exporting OTLP
+ * metrics. High-cardinality metrics (e.g., consumer lag with consumer_group x
+ * topic combinations) can produce payloads exceeding this limit, causing all
+ * metrics to fail to export.
+ *
+ * <p>Splitting is based on the total number of data points (not the number of
+ * MetricData objects), because a single MetricData can contain thousands of
+ * data points. When the total data point count is within the configured
+ * threshold, the batch is exported in a single call (fast path).
+ *
+ * <p>When a single MetricData contains more data points than the batch limit,
+ * its internal points are split into multiple smaller MetricData objects, each
+ * preserving the original resource, scope, name, description, unit, and type
+ * metadata.
+ *
+ * <p>A non-positive batch limit disables splitting, and a MetricData that
+ * cannot be rebuilt is exported unsplit: a bad setting or an unexpected metric
+ * type must never make the whole export throw.
+ */
+public final class BatchSplittingMetricExporter implements MetricExporter {
+
+    /** Logger. */
+    private static final Logger LOGGER =
+        LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    /** The underlying exporter to delegate to. */
+    private final MetricExporter delegate;
+
+    /** Supplies the max data points per batch at runtime. */
+    private final IntSupplier maxBatchSizeSupplier;
+
+    /**
+     * Creates a new BatchSplittingMetricExporter.
+     *
+     * @param metricExporter the underlying MetricExporter
+     * @param batchSizeSupplier supplies the max number of data points per
+     *     batch; a value &lt;= 0 disables splitting
+     * @throws NullPointerException if either argument is null
+     */
+    public BatchSplittingMetricExporter(
+        final MetricExporter metricExporter,
+        final IntSupplier batchSizeSupplier) {
+        this.delegate = Objects.requireNonNull(
+            metricExporter, "metricExporter must not be null");
+        this.maxBatchSizeSupplier = Objects.requireNonNull(
+            batchSizeSupplier, "batchSizeSupplier must not be null");
+    }
+
+    /**
+     * Exports the metrics, split into sub-batches when they hold more data
+     * points than the configured limit.
+     *
+     * @param metrics the metrics to export; null or empty is handed to the
+     *     delegate untouched
+     * @return the delegate's result, or the combined result of all sub-batches
+     */
+    @Override
+    public CompletableResultCode export(
+        final Collection<MetricData> metrics) {
+        if (metrics == null || metrics.isEmpty()) {
+            return delegate.export(metrics);
+        }
+
+        // Snapshot to avoid concurrent-modification AIOOBE
+        // in OTel SDK marshaling (see NumberDataPointMarshaler)
+        List<MetricData> snapshotted = snapshotAllMetrics(metrics);
+
+        int maxBatchSize = maxBatchSizeSupplier.getAsInt();
+        if (maxBatchSize <= 0) {
+            // Lists.partition rejects a non-positive size. A bad config value
+            // must not become an exception that drops every metric.
+            LOGGER.warn(
+                "Invalid maxBatchSize={}, exporting without splitting",
+                maxBatchSize);
+            return delegate.export(snapshotted);
+        }
+
+        // long, so that summing many int sizes cannot wrap around
+        long totalDataPoints = 0;
+        for (MetricData md : snapshotted) {
+            totalDataPoints += pointCount(md);
+        }
+
+        if (totalDataPoints <= maxBatchSize) {
+            return delegate.export(snapshotted);
+        }
+
+        List<List<MetricData>> batches =
+            splitIntoBatches(snapshotted, maxBatchSize);
+
+        LOGGER.debug(
+            "Splitting metrics export: "
+                + "totalDataPoints={}, "
+                + "maxBatchSize={}, "
+                + "batchCount={}",
+            totalDataPoints, maxBatchSize, batches.size());
+
+        List<CompletableResultCode> results =
+            new ArrayList<>(batches.size());
+        for (int i = 0; i < batches.size(); i++) {
+            final List<MetricData> batch = batches.get(i);
+            final int batchIndex = i;
+            final CompletableResultCode r = delegate.export(batch);
+            r.whenComplete(() -> {
+                if (!r.isSuccess()) {
+                    logFailedBatch(batchIndex, batch);
+                }
+            });
+            results.add(r);
+        }
+
+        return CompletableResultCode.ofAll(results);
+    }
+
+    /**
+     * Returns the number of data points held by a MetricData.
+     *
+     * @param md the MetricData to measure
+     * @return its data point count
+     */
+    private static int pointCount(final MetricData md) {
+        return md.getData().getPoints().size();
+    }
+
+    /**
+     * Logs details of a failed batch export.
+     *
+     * @param batchIndex the index of the failed batch
+     * @param batch the batch that failed
+     */
+    private static void logFailedBatch(
+        final int batchIndex, final List<MetricData> batch) {
+        StringBuilder names = new StringBuilder();
+        for (MetricData md : batch) {
+            if (names.length() > 0) {
+                names.append(",");
+            }
+            names.append(md.getName())
+                .append("(")
+                .append(pointCount(md))
+                .append("pts)");
+        }
+        LOGGER.warn("Batch {} failed. Metrics: {}", batchIndex, names);
+    }
+
+    /**
+     * Creates defensive snapshots of all MetricData by copying their data
+     * point collections into new ArrayLists. This prevents
+     * {@link ArrayIndexOutOfBoundsException} in the OTel SDK marshaling code
+     * when callback threads concurrently modify point collections during
+     * export. A MetricData that cannot be rebuilt is kept as-is.
+     *
+     * @param metrics the original metrics collection
+     * @return list of snapshotted MetricData
+     */
+    private static List<MetricData> snapshotAllMetrics(
+        final Collection<MetricData> metrics) {
+        List<MetricData> result = new ArrayList<>(metrics.size());
+        for (MetricData md : metrics) {
+            try {
+                result.add(snapshotMetricData(md));
+            } catch (Exception e) {
+                LOGGER.warn(
+                    "Failed to snapshot MetricData:"
+                        + " {}, using original",
+                    md.getName(), e);
+                result.add(md);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Creates a snapshot of a single MetricData by copying its points into a
+     * new ArrayList and reconstructing the MetricData around that copy.
+     *
+     * @param md the original MetricData
+     * @return a new MetricData with snapshotted points
+     */
+    private static MetricData snapshotMetricData(final MetricData md) {
+        List<PointData> points = new ArrayList<>(md.getData().getPoints());
+        return createMetricDataForType(md, md.getType(), points);
+    }
+
+    /**
+     * Splits metrics into sub-batches by data point count. When a single
+     * MetricData has more points than the batch limit, the batch being built
+     * is flushed first and the MetricData's points are split into multiple
+     * smaller MetricData objects.
+     *
+     * @param metrics the full metrics collection
+     * @param maxBatchSize max data points per batch; must be &gt; 0
+     * @return list of sub-batches
+     */
+    private static List<List<MetricData>> splitIntoBatches(
+        final Collection<MetricData> metrics, final int maxBatchSize) {
+        List<List<MetricData>> batches = new ArrayList<>();
+        List<MetricData> currentBatch = new ArrayList<>();
+        long currentPoints = 0;
+
+        for (MetricData md : metrics) {
+            List<MetricData> pieces;
+            if (pointCount(md) > maxBatchSize) {
+                // Flush current batch first
+                if (!currentBatch.isEmpty()) {
+                    batches.add(currentBatch);
+                    currentBatch = new ArrayList<>();
+                    currentPoints = 0;
+                }
+                pieces = splitOrKeep(md, maxBatchSize);
+            } else {
+                pieces = Collections.singletonList(md);
+            }
+
+            for (MetricData piece : pieces) {
+                int pts = pointCount(piece);
+                if (currentPoints > 0
+                    && currentPoints + pts > maxBatchSize) {
+                    batches.add(currentBatch);
+                    currentBatch = new ArrayList<>();
+                    currentPoints = 0;
+                }
+                currentBatch.add(piece);
+                currentPoints += pts;
+            }
+        }
+
+        if (!currentBatch.isEmpty()) {
+            batches.add(currentBatch);
+        }
+
+        return batches;
+    }
+
+    /**
+     * Splits an oversized MetricData, falling back to the unsplit original
+     * when it cannot be rebuilt (for example an unsupported type), in the
+     * same way the snapshot step falls back.
+     *
+     * @param md the MetricData to split
+     * @param maxChunkSize max points per chunk
+     * @return the chunks, or a single-element list holding {@code md}
+     */
+    private static List<MetricData> splitOrKeep(
+        final MetricData md, final int maxChunkSize) {
+        try {
+            return splitMetricData(md, maxChunkSize);
+        } catch (RuntimeException e) {
+            LOGGER.warn(
+                "Failed to split MetricData: {}, exporting it unsplit",
+                md.getName(), e);
+            return Collections.singletonList(md);
+        }
+    }
+
+    /**
+     * Splits a single MetricData into multiple MetricData objects, each
+     * containing at most maxChunkSize points. The original metadata
+     * (resource, scope, name, etc.) is preserved in each resulting
+     * MetricData.
+     *
+     * <p>NOTE: This method and the createXxx helpers below use OTel SDK
+     * internal APIs (ImmutableMetricData, ImmutableGaugeData, etc. from the
+     * io.opentelemetry.sdk.metrics.internal.data package). These are not
+     * public API and may change across SDK upgrades. When upgrading the OTel
+     * SDK version, check for breaking changes in these factory methods.
+     *
+     * @param md the MetricData to split
+     * @param maxChunkSize max points per chunk; must be &gt; 0
+     * @return list of MetricData, each with &lt;= maxChunkSize points
+     */
+    private static List<MetricData> splitMetricData(
+        final MetricData md, final int maxChunkSize) {
+        List<PointData> allPoints =
+            new ArrayList<>(md.getData().getPoints());
+        MetricDataType type = md.getType();
+        List<List<PointData>> chunks =
+            Lists.partition(allPoints, maxChunkSize);
+
+        List<MetricData> result = new ArrayList<>(chunks.size());
+        for (List<PointData> chunk : chunks) {
+            result.add(createMetricDataForType(md, type, chunk));
+        }
+        return result;
+    }
+
+    /**
+     * Views a list of points as a collection of the concrete point type.
+     *
+     * @param pts the points to view
+     * @param <T> the point type implied by the MetricDataType
+     * @return the same list, typed as a collection of T
+     */
+    @SuppressWarnings("unchecked")
+    private static <T extends PointData> Collection<T> castPoints(
+        final List<PointData> pts) {
+        // Safe as long as pts came from a MetricData of the matching type.
+        return (Collection<T>) (Collection<?>) pts;
+    }
+
+    /**
+     * Creates a new MetricData of the given type with the specified subset
+     * of data points. Preserves the original metadata from the source
+     * MetricData.
+     *
+     * @param src the original MetricData
+     * @param type the MetricDataType
+     * @param pts the subset of points
+     * @return a new MetricData with the given points
+     * @throws IllegalArgumentException if the type is not supported
+     */
+    private static MetricData createMetricDataForType(
+        final MetricData src,
+        final MetricDataType type,
+        final List<PointData> pts) {
+        switch (type) {
+            case LONG_GAUGE:
+                return ImmutableMetricData.createLongGauge(
+                    src.getResource(),
+                    src.getInstrumentationScopeInfo(),
+                    src.getName(),
+                    src.getDescription(),
+                    src.getUnit(),
+                    ImmutableGaugeData.create(
+                        BatchSplittingMetricExporter
+                            .<LongPointData>castPoints(pts)));
+            case DOUBLE_GAUGE:
+                return ImmutableMetricData.createDoubleGauge(
+                    src.getResource(),
+                    src.getInstrumentationScopeInfo(),
+                    src.getName(),
+                    src.getDescription(),
+                    src.getUnit(),
+                    ImmutableGaugeData.create(
+                        BatchSplittingMetricExporter
+                            .<DoublePointData>castPoints(pts)));
+            case LONG_SUM:
+                return createLongSum(src, pts);
+            case DOUBLE_SUM:
+                return createDoubleSum(src, pts);
+            case HISTOGRAM:
+                return createHistogram(src, pts);
+            case EXPONENTIAL_HISTOGRAM:
+                return createExpHistogram(src, pts);
+            case SUMMARY:
+                return createSummary(src, pts);
+            default:
+                throw new IllegalArgumentException(
+                    "Unsupported MetricDataType: " + type);
+        }
+    }
+
+    /**
+     * Creates a LONG_SUM MetricData with a subset of points.
+     *
+     * @param src the original MetricData
+     * @param pts the subset of points
+     * @return new MetricData
+     */
+    private static MetricData createLongSum(
+        final MetricData src, final List<PointData> pts) {
+        SumData<LongPointData> sumData = src.getLongSumData();
+        return ImmutableMetricData.createLongSum(
+            src.getResource(),
+            src.getInstrumentationScopeInfo(),
+            src.getName(),
+            src.getDescription(),
+            src.getUnit(),
+            ImmutableSumData.create(
+                sumData.isMonotonic(),
+                sumData.getAggregationTemporality(),
+                BatchSplittingMetricExporter
+                    .<LongPointData>castPoints(pts)));
+    }
+
+    /**
+     * Creates a DOUBLE_SUM MetricData with a subset of points.
+     *
+     * @param src the original MetricData
+     * @param pts the subset of points
+     * @return new MetricData
+     */
+    private static MetricData createDoubleSum(
+        final MetricData src, final List<PointData> pts) {
+        SumData<DoublePointData> sumData = src.getDoubleSumData();
+        return ImmutableMetricData.createDoubleSum(
+            src.getResource(),
+            src.getInstrumentationScopeInfo(),
+            src.getName(),
+            src.getDescription(),
+            src.getUnit(),
+            ImmutableSumData.create(
+                sumData.isMonotonic(),
+                sumData.getAggregationTemporality(),
+                BatchSplittingMetricExporter
+                    .<DoublePointData>castPoints(pts)));
+    }
+
+    /**
+     * Creates a HISTOGRAM MetricData with a subset of points.
+     *
+     * @param src the original MetricData
+     * @param pts the subset of points
+     * @return new MetricData
+     */
+    private static MetricData createHistogram(
+        final MetricData src, final List<PointData> pts) {
+        HistogramData histData = src.getHistogramData();
+        return ImmutableMetricData.createDoubleHistogram(
+            src.getResource(),
+            src.getInstrumentationScopeInfo(),
+            src.getName(),
+            src.getDescription(),
+            src.getUnit(),
+            ImmutableHistogramData.create(
+                histData.getAggregationTemporality(),
+                BatchSplittingMetricExporter
+                    .<HistogramPointData>castPoints(pts)));
+    }
+
+    /**
+     * Creates an EXPONENTIAL_HISTOGRAM MetricData with a subset of points.
+     *
+     * @param src the original MetricData
+     * @param pts the subset of points
+     * @return new MetricData
+     */
+    private static MetricData createExpHistogram(
+        final MetricData src, final List<PointData> pts) {
+        ExponentialHistogramData ehData =
+            src.getExponentialHistogramData();
+        return ImmutableMetricData.createExponentialHistogram(
+            src.getResource(),
+            src.getInstrumentationScopeInfo(),
+            src.getName(),
+            src.getDescription(),
+            src.getUnit(),
+            ImmutableExponentialHistogramData.create(
+                ehData.getAggregationTemporality(),
+                BatchSplittingMetricExporter
+                    .<ExponentialHistogramPointData>castPoints(pts)));
+    }
+
+    /**
+     * Creates a SUMMARY MetricData with a subset of points.
+     *
+     * @param src the original MetricData
+     * @param pts the subset of points
+     * @return new MetricData
+     */
+    private static MetricData createSummary(
+        final MetricData src, final List<PointData> pts) {
+        return ImmutableMetricData.createDoubleSummary(
+            src.getResource(),
+            src.getInstrumentationScopeInfo(),
+            src.getName(),
+            src.getDescription(),
+            src.getUnit(),
+            ImmutableSummaryData.create(
+                BatchSplittingMetricExporter
+                    .<SummaryPointData>castPoints(pts)));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public AggregationTemporality getAggregationTemporality(
+        final InstrumentType instrumentType) {
+        return delegate.getAggregationTemporality(instrumentType);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Aggregation getDefaultAggregation(
+        final InstrumentType instrumentType) {
+        return delegate.getDefaultAggregation(instrumentType);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableResultCode flush() {
+        return delegate.flush();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableResultCode shutdown() {
+        return delegate.shutdown();
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 
2f8a07020c5..60e86d46909 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -123,7 +123,7 @@ public class BrokerMetricsManager {
     private final ConsumerLagCalculator consumerLagCalculator;
     private final LiteConsumerLagCalculator liteConsumerLagCalculator;
     private final Map labelMap = new HashMap<>();
-    private OtlpGrpcMetricExporter metricExporter;
+    private MetricExporter metricExporter;
     private PeriodicMetricReader periodicMetricReader;
     private PrometheusHttpServer prometheusHttpServer;
     private MetricExporter loggingMetricExporter;
@@ -358,6 +358,7 @@ private void init() {
             }
             OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
                 .setEndpoint(endpoint)
+                .setCompression("gzip")
                 .setTimeout(brokerConfig.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
                 .setAggregationTemporalitySelector(type -> {
                     if (brokerConfig.isMetricsInDelta() &&
@@ -382,7 +383,9 @@ private void init() {
                 headerMap.forEach(metricExporterBuilder::addHeader);
             }
 
-            metricExporter = metricExporterBuilder.build();
+            OtlpGrpcMetricExporter otlpExporter = metricExporterBuilder.build();
+            metricExporter = new BatchSplittingMetricExporter(otlpExporter,
+                brokerConfig::getMetricsExportBatchMaxDataPoints);
 
             periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
                 .setInterval(brokerConfig.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java
new file mode 100644
index 00000000000..0b2278beda6
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/metrics/BatchSplittingMetricExporterTest.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.data.Data;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.MetricDataType;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.metrics.export.MetricExporter;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableGaugeData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
+import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
+import io.opentelemetry.sdk.resources.Resource;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link BatchSplittingMetricExporter}.
+ */
+public class BatchSplittingMetricExporterTest {
+
+    private static final int MAX_DATA_POINTS = 10;
+
+    private MetricExporter delegate;
+
+    @Before
+    public void setUp() {
+        delegate = mock(MetricExporter.class);
+        when(delegate.export(anyCollection()))
+            .thenReturn(CompletableResultCode.ofSuccess());
+        when(delegate.flush())
+            .thenReturn(CompletableResultCode.ofSuccess());
+        when(delegate.shutdown())
+            .thenReturn(CompletableResultCode.ofSuccess());
+        when(delegate.getAggregationTemporality(any(InstrumentType.class)))
+            .thenReturn(AggregationTemporality.CUMULATIVE);
+    }
+
+    @Test
+    public void testConstructorRejectsNullDelegate() {
+        assertThatThrownBy(() -> new BatchSplittingMetricExporter(null, () -> 100))
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    public void testConstructorRejectsNullSupplier() {
+        assertThatThrownBy(() -> new BatchSplittingMetricExporter(delegate, null))
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    public void testExportEmptyCollection() {
+        CompletableResultCode result =
+            newExporter(MAX_DATA_POINTS).export(Collections.emptyList());
+
+        assertThat(result.isSuccess()).isTrue();
+        verify(delegate, times(1)).export(Collections.emptyList());
+    }
+
+    @Test
+    public void testFastPathWhenBelowThreshold() {
+        List<MetricData> metrics = Arrays.asList(
+            createMockMetricData("metric1", 3),
+            createMockMetricData("metric2", 3),
+            createMockMetricData("metric3", 3));
+
+        CompletableResultCode result =
+            newExporter(MAX_DATA_POINTS).export(metrics);
+
+        assertThat(result.isSuccess()).isTrue();
+        verify(delegate, times(1)).export(metrics);
+    }
+
+    @Test
+    public void testFastPathWhenExactlyAtThreshold() {
+        List<MetricData> metrics = Arrays.asList(
+            createMockMetricData("metric1", 5),
+            createMockMetricData("metric2", 5));
+
+        CompletableResultCode result =
+            newExporter(MAX_DATA_POINTS).export(metrics);
+
+        assertThat(result.isSuccess()).isTrue();
+        verify(delegate, times(1)).export(metrics);
+    }
+
+    @Test
+    public void testSplitWhenAboveThreshold() {
+        MetricData m1 = createMockMetricData("m1", 5);
+        MetricData m2 = createMockMetricData("m2", 5);
+        MetricData m3 = createMockMetricData("m3", 5);
+
+        CompletableResultCode result =
+            newExporter(MAX_DATA_POINTS).export(Arrays.asList(m1, m2, m3));
+
+        assertThat(result).isNotNull();
+
+        List<Collection<MetricData>> batches = captureExports(2);
+        assertThat(batches).hasSize(2);
+        assertThat(batches.get(0)).containsExactly(m1, m2);
+        assertThat(batches.get(1)).containsExactly(m3);
+    }
+
+    @Test
+    public void testSplitSingleLargeMetricData() {
+        // A single MetricData with 25 points. maxBatchSize=10, so it should
+        // be split into 3 sub-MetricData: 10, 10, 5 points. Each goes into
+        // its own batch.
+        MetricData largeMetric =
+            createRealLongGaugeMetricData("large_metric", 25);
+
+        CompletableResultCode result = newExporter(MAX_DATA_POINTS)
+            .export(Collections.singletonList(largeMetric));
+
+        assertThat(result).isNotNull();
+
+        List<Collection<MetricData>> batches = captureExports(3);
+        assertThat(batches).hasSize(3);
+
+        // Verify each batch has correct point count
+        assertThat(totalPoints(batches.get(0))).isEqualTo(10);
+        assertThat(totalPoints(batches.get(1))).isEqualTo(10);
+        assertThat(totalPoints(batches.get(2))).isEqualTo(5);
+
+        // Verify metadata preserved
+        for (Collection<MetricData> batch : batches) {
+            for (MetricData md : batch) {
+                assertThat(md.getName()).isEqualTo("large_metric");
+                assertThat(md.getType())
+                    .isEqualTo(MetricDataType.LONG_GAUGE);
+            }
+        }
+    }
+
+    @Test
+    public void testSplitSingleLargeMetricExactMultiple() {
+        // 20 points / maxBatchSize 10 = exactly 2 batches
+        MetricData largeMetric =
+            createRealLongGaugeMetricData("exact_metric", 20);
+
+        newExporter(MAX_DATA_POINTS)
+            .export(Collections.singletonList(largeMetric));
+
+        List<Collection<MetricData>> batches = captureExports(2);
+        assertThat(totalPoints(batches.get(0))).isEqualTo(10);
+        assertThat(totalPoints(batches.get(1))).isEqualTo(10);
+    }
+
+    @Test
+    public void testSplitLargeMetricMixedWithSmall() {
+        // maxBatchSize = 10
+        // m1: 3 pts (small), large: 25 pts, m3: 4 pts
+        // Expected:
+        //   batch1: [m1] (3 pts) - flushed before large
+        //   large split into: sub1(10), sub2(10), sub3(5)
+        //   batch2: [sub1] (10 pts)
+        //   batch3: [sub2] (10 pts)
+        //   batch4: [sub3, m3] (5+4=9 pts)
+        MetricData m1 = createMockMetricData("m1", 3);
+        MetricData large = createRealLongGaugeMetricData("large", 25);
+        MetricData m3 = createMockMetricData("m3", 4);
+
+        newExporter(MAX_DATA_POINTS).export(Arrays.asList(m1, large, m3));
+
+        List<Collection<MetricData>> batches = captureExports(4);
+        assertThat(batches).hasSize(4);
+
+        // batch 1: m1 (3 pts)
+        assertThat(totalPoints(batches.get(0))).isEqualTo(3);
+        // batch 2: sub1 of large (10 pts)
+        assertThat(totalPoints(batches.get(1))).isEqualTo(10);
+        // batch 3: sub2 of large (10 pts)
+        assertThat(totalPoints(batches.get(2))).isEqualTo(10);
+        // batch 4: sub3 of large (5) + m3 (4) = 9
+        assertThat(totalPoints(batches.get(3))).isEqualTo(9);
+    }
+
+    @Test
+    public void testSplitMultipleBatches() {
+        MetricData m1 = createMockMetricData("m1", 3);
+        MetricData m2 = createMockMetricData("m2", 3);
+        MetricData m3 = createMockMetricData("m3", 3);
+        MetricData m4 = createMockMetricData("m4", 3);
+        MetricData m5 = createMockMetricData("m5", 3);
+
+        newExporter(5).export(Arrays.asList(m1, m2, m3, m4, m5));
+
+        List<Collection<MetricData>> batches = captureExports(5);
+        assertThat(batches.get(0)).containsExactly(m1);
+        assertThat(batches.get(1)).containsExactly(m2);
+        assertThat(batches.get(2)).containsExactly(m3);
+        assertThat(batches.get(3)).containsExactly(m4);
+        assertThat(batches.get(4)).containsExactly(m5);
+    }
+
+    @Test
+    public void testSplitMixedSizeMetricData() {
+        MetricData m1 = createMockMetricData("m1", 2);
+        MetricData m2 = createMockMetricData("m2", 7);
+        MetricData m3 = createMockMetricData("m3", 3);
+        MetricData m4 = createMockMetricData("m4", 8);
+        MetricData m5 = createMockMetricData("m5", 1);
+
+        newExporter(10).export(Arrays.asList(m1, m2, m3, m4, m5));
+
+        List<Collection<MetricData>> batches = captureExports(3);
+        assertThat(batches.get(0)).containsExactly(m1, m2);
+        assertThat(batches.get(1)).containsExactly(m3);
+        assertThat(batches.get(2)).containsExactly(m4, m5);
+    }
+
+    @Test
+    public void testDelegateFailureIsPropagated() {
+        when(delegate.export(anyCollection()))
+            .thenReturn(CompletableResultCode.ofFailure());
+
+        CompletableResultCode result = newExporter(MAX_DATA_POINTS)
+            .export(Collections.singletonList(
+                createMockMetricData("metric1", 5)));
+
+        assertThat(result.isSuccess()).isFalse();
+    }
+
+    @Test
+    public void testFlushDelegatesToDelegate() {
+        CompletableResultCode result = newExporter(MAX_DATA_POINTS).flush();
+
+        assertThat(result.isSuccess()).isTrue();
+        verify(delegate, times(1)).flush();
+    }
+
+    @Test
+    public void testShutdownDelegatesToDelegate() {
+        CompletableResultCode result =
+            newExporter(MAX_DATA_POINTS).shutdown();
+
+        assertThat(result.isSuccess()).isTrue();
+        verify(delegate, times(1)).shutdown();
+    }
+
+    @Test
+    public void testGetAggregationTemporality() {
+        when(delegate.getAggregationTemporality(InstrumentType.COUNTER))
+            .thenReturn(AggregationTemporality.DELTA);
+
+        AggregationTemporality result = newExporter(MAX_DATA_POINTS)
+            .getAggregationTemporality(InstrumentType.COUNTER);
+
+        assertThat(result).isEqualTo(AggregationTemporality.DELTA);
+        verify(delegate, times(1))
+            .getAggregationTemporality(InstrumentType.COUNTER);
+    }
+
+    @Test
+    public void testExportNullCollection() {
+        newExporter(MAX_DATA_POINTS).export(null);
+        verify(delegate, times(1)).export(null);
+    }
+
+    @Test
+    public void testSplitPreservesAllPoints() {
+        // Verify no points are lost during split.
+        // 83 points with maxBatch=10 -> 9 batches (8*10 + 1*3)
+        MetricData large = createRealLongGaugeMetricData("big_metric", 83);
+
+        newExporter(10).export(Collections.singletonList(large));
+
+        // 83 / 10 = 8 full + 1 partial = 9 batches
+        int totalPts = 0;
+        for (Collection<MetricData> batch : captureExports(9)) {
+            totalPts += totalPoints(batch);
+        }
+        assertThat(totalPts).isEqualTo(83);
+    }
+
+    @Test
+    public void testSplitPointsContentPreserved() {
+        // Verify actual point data values are preserved
+        MetricData metric = createRealLongGaugeMetricData("val_metric", 5);
+
+        newExporter(3).export(Collections.singletonList(metric));
+
+        // Collect all point values from batches
+        List<Long> allValues = new ArrayList<>();
+        for (Collection<MetricData> batch : captureExports(2)) {
+            for (MetricData md : batch) {
+                for (PointData pt : md.getData().getPoints()) {
+                    allValues.add(((LongPointData) pt).getValue());
+                }
+            }
+        }
+
+        // Values should be 0, 1, 2, 3, 4
+        assertThat(allValues).containsExactly(0L, 1L, 2L, 3L, 4L);
+    }
+
+    @Test
+    public void testSnapshotCreatesNewMetricData() {
+        // On fast path, delegate should receive snapshotted MetricData,
+        // not the original.
+        MetricData original = createRealLongGaugeMetricData("test", 5);
+
+        newExporter(100).export(Collections.singletonList(original));
+
+        MetricData exported = captureExports(1).get(0).iterator().next();
+        assertThat(exported).isNotSameAs(original);
+        assertThat(exported.getName()).isEqualTo("test");
+        assertThat(exported.getType()).isEqualTo(MetricDataType.LONG_GAUGE);
+        assertThat(exported.getData().getPoints()).hasSize(5);
+    }
+
+    @Test
+    public void testSnapshotFallsBackToOriginal() {
+        // Mock MetricData has no type set, so snapshot will fail.
+        // Should fall back to original object.
+        MetricData mockMd = createMockMetricData("mock", 3);
+
+        newExporter(100).export(Collections.singletonList(mockMd));
+
+        MetricData exported = captureExports(1).get(0).iterator().next();
+        assertThat(exported).isSameAs(mockMd);
+    }
+
+    @Test
+    public void testSnapshotPointsAreIndependentCopy() {
+        // Verify snapshot points collection is a separate copy from the
+        // original, preventing concurrent modification issues.
+        MetricData original = createRealLongGaugeMetricData("test", 5);
+        Collection<? extends PointData> originalPoints =
+            original.getData().getPoints();
+
+        newExporter(100).export(Collections.singletonList(original));
+
+        MetricData exported = captureExports(1).get(0).iterator().next();
+        Collection<? extends PointData> exportedPoints =
+            exported.getData().getPoints();
+
+        assertThat(exportedPoints).isNotSameAs(originalPoints);
+        assertThat(exportedPoints).hasSize(originalPoints.size());
+    }
+
+    /**
+     * Creates an exporter under test that wraps the mocked delegate.
+     */
+    private BatchSplittingMetricExporter newExporter(final int maxDataPoints) {
+        return new BatchSplittingMetricExporter(delegate, () -> maxDataPoints);
+    }
+
+    /**
+     * Verifies the delegate received the expected number of export calls
+     * and returns the batches it was given, in call order.
+     */
+    @SuppressWarnings("unchecked")
+    private List<Collection<MetricData>> captureExports(
+        final int expectedCalls) {
+        ArgumentCaptor<Collection<MetricData>> captor =
+            ArgumentCaptor.forClass(Collection.class);
+        verify(delegate, times(expectedCalls)).export(captor.capture());
+        return captor.getAllValues();
+    }
+
+    /**
+     * Creates a mock MetricData with the specified number of data points.
+     */
+    @SuppressWarnings("unchecked")
+    private MetricData createMockMetricData(
+        final String name, final int numPoints) {
+        List<PointData> points = new ArrayList<>();
+        for (int i = 0; i < numPoints; i++) {
+            points.add(mock(PointData.class));
+        }
+
+        Data<PointData> data = mock(Data.class);
+        doReturn(points).when(data).getPoints();
+
+        MetricData metricData = mock(MetricData.class);
+        when(metricData.getName()).thenReturn(name);
+        doReturn(data).when(metricData).getData();
+
+        return metricData;
+    }
+
+    /**
+     * Creates a real LONG_GAUGE MetricData using the OTel SDK
+     * ImmutableMetricData, suitable for testing the split logic which
+     * needs to reconstruct MetricData objects.
+     */
+    private MetricData createRealLongGaugeMetricData(
+        final String name, final int numPoints) {
+        List<LongPointData> points = new ArrayList<>();
+        for (int i = 0; i < numPoints; i++) {
+            points.add(ImmutableLongPointData.create(
+                0L, 1L, Attributes.empty(), (long) i));
+        }
+        return ImmutableMetricData.createLongGauge(
+            Resource.getDefault(),
+            InstrumentationScopeInfo.empty(),
+            name,
+            "test description",
+            "1",
+            ImmutableGaugeData.create(points));
+    }
+
+    /**
+     * Calculates total data points in a batch.
+     */
+    private int totalPoints(final Collection<MetricData> batch) {
+        int total = 0;
+        for (MetricData md : batch) {
+            total += md.getData().getPoints().size();
+        }
+        return total;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 7271c12b187..76bd356a54a 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -403,6 +403,8 @@ public class BrokerConfig extends BrokerIdentity {
     // Label pairs in CSV. Each label follows pattern of Key:Value.
eg: instance_id:xxx,uid:xxx private String metricsLabel = ""; + private int metricsExportBatchMaxDataPoints = 1000; + private boolean metricsInDelta = false; private boolean enableRemotingMetrics = true; @@ -1865,6 +1867,14 @@ public void setMetricsInDelta(boolean metricsInDelta) { this.metricsInDelta = metricsInDelta; } + public int getMetricsExportBatchMaxDataPoints() { + return metricsExportBatchMaxDataPoints; + } + + public void setMetricsExportBatchMaxDataPoints(int metricsExportBatchMaxDataPoints) { + this.metricsExportBatchMaxDataPoints = metricsExportBatchMaxDataPoints; + } + public int getMetricsPromExporterPort() { return metricsPromExporterPort; }