diff --git a/.circleci/config.yml b/.circleci/config.yml index a8031e54..bf951eef 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -20,6 +20,7 @@ default_steps: &default_steps - run: | mvn clean install + - run: cd dogstatsd-http-serializer && mvn clean install jobs: openjdk7: diff --git a/dogstatsd-http-serializer/.gitignore b/dogstatsd-http-serializer/.gitignore new file mode 100644 index 00000000..b83d2226 --- /dev/null +++ b/dogstatsd-http-serializer/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/dogstatsd-http-serializer/pom.xml b/dogstatsd-http-serializer/pom.xml new file mode 100644 index 00000000..1e90218b --- /dev/null +++ b/dogstatsd-http-serializer/pom.xml @@ -0,0 +1,100 @@ + + 4.0.0 + + com.datadoghq + dogstatsd-http-serializer + jar + dogstatsd-http-serializer + 1.0.0-SNAPSHOT + HTTP serializer for DogStatsD metrics. + https://github.com/DataDog/java-dogstatsd-client + + + UTF-8 + + + + + The MIT License (MIT) + http://opensource.org/licenses/MIT + repo + + + + + https://github.com/DataDog/java-dogstatsd-client + scm:git:git@github.com:DataDog/java-dogstatsd-client.git + scm:git:git@github.com:Datadog/java-dogstatsd-client.git + + + + + datadog + Datadog developers + dev@datadoghq.com + + + + + + junit + junit + 4.13.1 + test + + + + + + spotless + + [17.0,) + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.45.0 + + + + 1.28.0 + + + + + + + + check + + + + + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.7 + 1.7 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.19 + + + + diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Buffer.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Buffer.java new file mode 100644 index 00000000..dbc6f08b --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Buffer.java @@ -0,0 +1,52 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.nio.BufferOverflowException; + +abstract class Buffer { + protected int size; + + // more must be >= 0. + // if this returns, size + more <= INT_MAX && size + more <= data.length + protected final void reserve(int more) { + // size + more > capacity, but without integer overflow + if (size > capacity() - more) { + grow(more); + } + } + + protected final void grow(int more) { + if (size > Integer.MAX_VALUE - more) { + throw new BufferOverflowException(); + } + int newSize = size + more; + int cap = capacity(); + if (cap < Integer.MAX_VALUE / 2 && newSize < cap * 2) { + newSize = cap * 2; + } + realloc(newSize); + } + + int length() { + return size; + } + + void clear() { + size = 0; + } + + /** Return true if buf is null or empty */ + static boolean isEmpty(Buffer buf) { + return buf == null || buf.size == 0; + } + + abstract int capacity(); + + protected abstract void realloc(int newSize); +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ByteBuffer.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ByteBuffer.java new file mode 100644 index 00000000..339dcba1 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ByteBuffer.java @@ -0,0 +1,105 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.util.Arrays; + +class ByteBuffer extends Buffer { + byte[] data; + + ByteBuffer() { + data = new byte[64]; + } + + ByteBuffer(int cap) { + data = new byte[cap]; + } + + void put(byte v) { + reserve(1); + data[size++] = v; + } + + void put(byte[] vs, int length) { + reserve(length); + System.arraycopy(vs, 0, data, size, length); + size += length; + } + + void put(byte[] vs) { + put(vs, vs.length); + } + + void put(ByteBuffer buf) { + put(buf.data, buf.size); + } + + void putFixed32(int v) { + put((byte) v); + put((byte) (v >>> 8)); + put((byte) (v >>> 16)); + put((byte) (v >>> 24)); + } + + void putFixed64(long v) { + put((byte) v); + put((byte) (v >>> 8)); + put((byte) (v >>> 16)); + put((byte) (v >>> 24)); + put((byte) (v >>> 32)); + put((byte) (v >>> 40)); + put((byte) (v >>> 48)); + put((byte) (v >>> 56)); + } + + void putFloat32(float v) { + putFixed32(Float.floatToRawIntBits(v)); + } + + void putFloat64(double v) { + putFixed64(Double.doubleToRawLongBits(v)); + } + + void putUint64(long v) { + do { + put((byte) (v & 127 | (v > 127 ? 128 : 0))); + v >>>= 7; + } while (v != 0); + } + + void putSint64(long v) { + putUint64((v >> 63) ^ (v << 1)); + } + + void putBytesFieldHeader(int id, int len) { + putUint64(ProtoUtil.bytesFieldHeader(id)); + putUint64(len); + } + + void putBytesField(int id, ByteBuffer data) { + putBytesFieldHeader(id, data.length()); + put(data); + } + + @Override + int capacity() { + return data.length; + } + + @Override + protected void realloc(int newSize) { + data = Arrays.copyOf(data, newSize); + } + + byte[] toArray() { + if (size == data.length) { + return data; + } + return Arrays.copyOf(data, size); + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Column.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Column.java new file mode 100644 index 00000000..549ca762 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Column.java @@ -0,0 +1,43 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +enum Column { + dictNameStr(1), + dictTagStr(2), + dictTagsets(3), + dictResourceStr(4), + dictResourceLen(5), + dictResourceType(6), + dictResourceName(7), + dictSourceTypeName(8), + dictOriginInfo(9), + types(10), + nameRefs(11), + tagsetRefs(12), + resourcesRefs(13), + intervals(14), + numPoints(15), + timestamps(16), + valsSint64(17), + valsFloat32(18), + valsFloat64(19), + sketchNumBins(20), + sketchBinKeys(21), + sketchBinCnts(22), + sourceTypeNameRefs(23), + originInfoRefs(24); + + static final int MAX = 24; + + final int id; + + Column(int id) { + this.id = id; + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ColumnarBuffer.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ColumnarBuffer.java new file mode 100644 index 00000000..8abe9dcf --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ColumnarBuffer.java @@ -0,0 +1,96 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.nio.BufferOverflowException; + +class ColumnarBuffer { + final ByteBuffer[] buffers = new ByteBuffer[Column.MAX + 1]; + + ByteBuffer column(Column dst) { + int idx = dst.id; + if (buffers[idx] == null) { + buffers[idx] = new ByteBuffer(); + } + return buffers[idx]; + } + + void clear() { + for (ByteBuffer b : buffers) { + if (!Buffer.isEmpty(b)) { + b.clear(); + } + } + } + + void clear(Column dst) { + column(dst).clear(); + } + + void put(ColumnarBuffer other) { + for (int i = 0; i < buffers.length; i++) { + if (Buffer.isEmpty(other.buffers[i])) { + continue; + } + if (buffers[i] == null) { + buffers[i] = new ByteBuffer(); + } + buffers[i].put(other.buffers[i]); + } + } + + void putBytes(Column dst, byte[] val) { + putUint64(dst, val.length); + column(dst).put(val); + } + + void putString(Column dst, String val) { + putBytes(dst, val.getBytes(UTF_8)); + } + + void putUint64(Column dst, long val) { + column(dst).putUint64(val); + } + + void putSint64(Column dst, long val) { + column(dst).putSint64(val); + } + + void putFloat32(Column dst, float val) { + column(dst).putFloat32(val); + } + + void putFloat64(Column dst, double val) { + column(dst).putFloat64(val); + } + + int length() { + int n = 0; + for (int i = 0; i < buffers.length; i++) { + if (Buffer.isEmpty(buffers[i])) { + continue; + } + int len = ProtoUtil.fieldLen(i, buffers[i].length()); + if (len > Integer.MAX_VALUE - n) { + throw new BufferOverflowException(); + } + n += len; + } + return n; + } + + void renderProtobufTo(ByteBuffer p) { + for (int i = 0; i < buffers.length; i++) { + if (!Buffer.isEmpty(buffers[i])) { + p.putBytesField(i, buffers[i]); + } + } + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/DeltaEncoder.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/DeltaEncoder.java new file mode 100644 index 00000000..1668b401 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/DeltaEncoder.java @@ -0,0 +1,22 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +class DeltaEncoder { + private long prev = 0; + + long encode(long v) { + long r = v - prev; + prev = v; + return r; + } + + void clear() { + prev = 0; + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/DoubleBuffer.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/DoubleBuffer.java new file mode 100644 index 00000000..73d70caf --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/DoubleBuffer.java @@ -0,0 +1,33 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.util.Arrays; + +class DoubleBuffer extends Buffer { + double[] data = new double[16]; + + void put(double v) { + reserve(1); + data[size++] = v; + } + + double get(int i) { + return data[i]; + } + + @Override + int capacity() { + return data.length; + } + + @Override + protected void realloc(int newSize) { + data = Arrays.copyOf(data, newSize); + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Interner.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Interner.java new file mode 100644 index 00000000..bef16438 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Interner.java @@ -0,0 +1,49 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.util.HashMap; + +class Interner { + interface Encoder { + void encode(T val); + } + + private HashMap inner = new HashMap<>(); + private long lastId = 0; + private final Encoder encoder; + private final T empty; + + Interner(T empty, Encoder encoder) { + this.empty = empty; + this.encoder = encoder; + clear(); + } + + long intern(T val) { + if (val == null || empty.equals(val)) { + return 0; + } + + Long id = inner.get(val); + if (id != null) { + return id; + } + + encoder.encode(val); + + lastId++; + inner.put(val, lastId); + return lastId; + } + + void clear() { + inner.clear(); + lastId = 0; + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/LongBuffer.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/LongBuffer.java new file mode 100644 index 00000000..6cd81dc0 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/LongBuffer.java @@ -0,0 +1,37 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.util.Arrays; + +class LongBuffer extends Buffer { + long[] data = new long[16]; + + void put(long v) { + reserve(1); + data[size++] = v; + } + + long get(int i) { + return data[i]; + } + + @Override + int capacity() { + return data.length; + } + + @Override + protected void realloc(int newSize) { + data = Arrays.copyOf(data, newSize); + } + + void sort() { + Arrays.sort(data, 0, size); + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Metric.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Metric.java new file mode 100644 index 00000000..913ec5b9 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Metric.java @@ -0,0 +1,107 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.util.List; + +abstract class Metric> { + final PayloadBuilder pb; + + final long type; + final String name; + List tags = null; + List resources = null; + int interval = 0; + Origin origin = Origin.dogstatsd; + + Metric(PayloadBuilder pb, int type, String name) { + this.pb = pb; + this.type = type; + this.name = name; + } + + /** + * Set tags for this metric. + * + * @param tags List of tags to apply to this metric, or null for no tags. + * @return This. + */ + public T setTags(List tags) { + this.tags = tags; + return self(); + } + + /** + * Set resources for this metric. + * + * @param resources List of even length, containing zero or more (type, name) pairs, or null for + * no resources. + * @return This. + */ + public T setResources(List resources) { + if (resources != null && resources.size() % 2 != 0) { + throw new IllegalArgumentException("resources must contain even number of elements"); + } + this.resources = resources; + return self(); + } + + /** + * Set aggregation interval setting for this metric. + * + * @param interval Aggregation interval in seconds. + * @return This. + */ + public T setInterval(int interval) { + this.interval = interval; + return self(); + } + + abstract T self(); + + abstract void encodeValues(ValueType valueType); + + void encodeIndependentFields() { + ColumnarBuffer r = pb.currentRecord(); + ValueType valueType = PointKind.of(pb.values).toValueType(); + r.putUint64(Column.types, type | valueType.flag()); + r.putUint64(Column.intervals, interval); + r.putSint64(Column.sourceTypeNameRefs, 0); + encodeValues(valueType); + } + + void clearDependentFields() { + ColumnarBuffer r = pb.currentRecord(); + r.clear(Column.dictNameStr); + r.clear(Column.nameRefs); + r.clear(Column.dictTagStr); + r.clear(Column.dictTagsets); + r.clear(Column.tagsetRefs); + r.clear(Column.dictResourceStr); + r.clear(Column.dictResourceLen); + r.clear(Column.dictResourceType); + r.clear(Column.dictResourceName); + r.clear(Column.resourcesRefs); + r.clear(Column.timestamps); + r.clear(Column.dictOriginInfo); + r.clear(Column.originInfoRefs); + } + + void encodeDependentFields() { + pb.encodeName(name); + pb.encodeTags(tags); + pb.encodeResources(resources); + pb.encodeTimestamps(); + pb.encodeOrigin(origin); + } + + /** Finish this timeseries and add it to the payload. */ + public void close() { + pb.endMetric(); + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Origin.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Origin.java new file mode 100644 index 00000000..810c935a --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/Origin.java @@ -0,0 +1,23 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +enum Origin { + undefined(0, 0, 0), + dogstatsd(10, 10, 0); + + int product; + int category; + int service; + + Origin(int product, int category, int service) { + this.product = product; + this.category = category; + this.service = service; + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilder.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilder.java new file mode 100644 index 00000000..60c9ae8f --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilder.java @@ -0,0 +1,292 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import java.nio.BufferOverflowException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** Build metrics payloads in a format accepted by the agent or the intake. */ +public class PayloadBuilder { + private static final int DEFAULT_MAX_PAYLOAD_SIZE = 256 * 1024; + private static final int METRIC_DATA_FIELD_ID = 3; + + final int maxPayloadSize; + + final ColumnarBuffer payload = new ColumnarBuffer(); + final ColumnarBuffer record = new ColumnarBuffer(); + final LongBuffer timestamps = new LongBuffer(); + final DoubleBuffer values = new DoubleBuffer(); + final LongBuffer counts = new LongBuffer(); + + final PayloadConsumer consumer; + + final Interner nameStrInterner = + new Interner( + "", + new Interner.Encoder() { + @Override + public void encode(String s) { + record.putString(Column.dictNameStr, s); + } + }); + + final Interner tagStrInterner = + new Interner( + "", + new Interner.Encoder() { + @Override + public void encode(String s) { + record.putString(Column.dictTagStr, s); + } + }); + + final Interner> tagsInterner = + new Interner>( + Collections.emptyList(), + new Interner.Encoder>() { + String[] strBuf = new String[0]; + LongBuffer idBuf = new LongBuffer(); + + @Override + public void encode(List val) { + int size = val.size(); + strBuf = val.toArray(strBuf); + Arrays.sort(strBuf, 0, size); + + idBuf.clear(); + for (int i = 0; i < size; i++) { + idBuf.put(tagStrInterner.intern(strBuf[i])); + } + idBuf.sort(); + Arrays.fill(strBuf, null); + + DeltaEncoder enc = new DeltaEncoder(); + + record.putSint64(Column.dictTagsets, size); + for (int i = 0; i < size; i++) { + record.putSint64(Column.dictTagsets, enc.encode(idBuf.get(i))); + } + } + }); + + final Interner resourceStrInterner = + new Interner( + "", + new Interner.Encoder() { + @Override + public void encode(String s) { + record.putString(Column.dictResourceStr, s); + } + }); + + final Interner> resourcesInterner = + new Interner>( + Collections.emptyList(), + new Interner.Encoder>() { + @Override + public void encode(List val) { + int size = val.size() / 2; + DeltaEncoder dt = new DeltaEncoder(); + DeltaEncoder dn = new DeltaEncoder(); + record.putUint64(Column.dictResourceLen, size); + for (int i = 0; i < size; i++) { + record.putSint64( + Column.dictResourceType, + dt.encode(resourceStrInterner.intern(val.get(2 * i)))); + record.putSint64( + Column.dictResourceName, + dn.encode(resourceStrInterner.intern(val.get(2 * i + 1)))); + } + } + }); + + final Interner originInfoInterner = + new Interner( + Origin.undefined, + new Interner.Encoder() { + @Override + public void encode(Origin o) { + record.putUint64(Column.dictOriginInfo, o.product); + record.putUint64(Column.dictOriginInfo, o.category); + record.putUint64(Column.dictOriginInfo, o.service); + } + }); + + final DeltaEncoder nameRefsDelta = new DeltaEncoder(); + final DeltaEncoder tagsetRefsDelta = new DeltaEncoder(); + final DeltaEncoder resourcesRefsDelta = new DeltaEncoder(); + final DeltaEncoder originInfoRefsDelta = new DeltaEncoder(); + final DeltaEncoder timestampsDelta = new DeltaEncoder(); + + Metric metricInProgress; + + /** + * Create new PayloadBuilder. + * + * @param consumer Is given payloads one by one as they are finished. + */ + public PayloadBuilder(PayloadConsumer consumer) { + this.consumer = consumer; + this.maxPayloadSize = DEFAULT_MAX_PAYLOAD_SIZE; + } + + /** + * Begin encoding new count metric. + * + *

Only one metric can be encoded at a time. + * + * @param name Name of the metric. + * @return Builder instance. + */ + public ScalarMetric count(String name) { + ScalarMetric m = new ScalarMetric(this, 1, name); + beginMetric(m); + return m; + } + + /** + * Begin encoding new rate metric. + * + *

Only one metric can be encoded at a time. + * + * @param name Name of the metric. + * @return New builder instance. + */ + public ScalarMetric rate(String name) { + ScalarMetric m = new ScalarMetric(this, 2, name); + beginMetric(m); + return m; + } + + /** + * Begin encoding new gauge metric. + * + *

Only one metric can be encoded at a time. + * + * @param name Name of the metric. + * @return New builder instance. + */ + public ScalarMetric gauge(String name) { + ScalarMetric m = new ScalarMetric(this, 3, name); + beginMetric(m); + return m; + } + + /** + * Begin encoding new sketch metric. + * + *

Only one metric can be encoded at a time. + * + * @param name Name of the metric. + * @return New builder instance. + */ + public SketchMetric sketch(String name) { + SketchMetric m = new SketchMetric(this, 4, name); + beginMetric(m); + return m; + } + + void beginMetric(Metric m) { + endMetric(); + metricInProgress = m; + } + + void endMetric() { + Metric m = metricInProgress; + if (m == null) { + return; + } + + try { + m.encodeIndependentFields(); + m.encodeDependentFields(); + + if (record.length() > maxPayloadSize) { + throw new BufferOverflowException(); + } + + if (payload.length() + record.length() > maxPayloadSize) { + flushPayload(); + // Flush clears interners and delta-encoders, so we need to re-encode some + // columns. + m.clearDependentFields(); + m.encodeDependentFields(); + } + payload.put(record); + } finally { + record.clear(); + timestamps.clear(); + values.clear(); + counts.clear(); + metricInProgress = null; + } + } + + ColumnarBuffer currentRecord() { + return record; + } + + void encodeName(String name) { + long id = nameStrInterner.intern(name); + record.putSint64(Column.nameRefs, nameRefsDelta.encode(id)); + } + + void encodeTimestamps() { + for (int i = 0; i < timestamps.length(); i++) { + record.putSint64(Column.timestamps, timestampsDelta.encode(timestamps.get(i))); + } + } + + void encodeTags(List tags) { + long id = tagsInterner.intern(tags); + record.putSint64(Column.tagsetRefs, tagsetRefsDelta.encode(id)); + } + + void encodeResources(List resources) { + long id = resourcesInterner.intern(resources); + record.putSint64(Column.resourcesRefs, resourcesRefsDelta.encode(id)); + } + + void encodeOrigin(Origin origin) { + long id = originInfoInterner.intern(origin); + record.putSint64(Column.originInfoRefs, originInfoRefsDelta.encode(id)); + } + + void flushPayload() { + int dataLen = payload.length(); + if (dataLen > 0) { + ByteBuffer p = new ByteBuffer(ProtoUtil.fieldLen(METRIC_DATA_FIELD_ID, dataLen)); + p.putBytesFieldHeader(METRIC_DATA_FIELD_ID, dataLen); + payload.renderProtobufTo(p); + consumer.handle(p.toArray()); + } + + payload.clear(); + + nameStrInterner.clear(); + tagStrInterner.clear(); + tagsInterner.clear(); + resourceStrInterner.clear(); + resourcesInterner.clear(); + originInfoInterner.clear(); + + nameRefsDelta.clear(); + tagsetRefsDelta.clear(); + resourcesRefsDelta.clear(); + originInfoRefsDelta.clear(); + timestampsDelta.clear(); + } + + /** Finish any pending data. */ + public void close() { + endMetric(); + flushPayload(); + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PayloadConsumer.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PayloadConsumer.java new file mode 100644 index 00000000..594afb03 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PayloadConsumer.java @@ -0,0 +1,18 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +/** Consumes payloads from the PayloadBuilder. */ +public interface PayloadConsumer { + /** + * Called when payload builder finishes another payload. + * + * @param payload Completed payload. + */ + void handle(byte[] payload); +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PointKind.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PointKind.java new file mode 100644 index 00000000..277f4117 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/PointKind.java @@ -0,0 +1,81 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +enum PointKind { + zero(0), + int24(1), + float32(2), + int48(3), + float64(4); + + private final int rank; + + PointKind(int rank) { + this.rank = rank; + } + + static PointKind of(double v) { + if (v == 0) { + return PointKind.zero; + } + + // Integers in this range can still fit into float32 column if needed. + final long maxInt24 = 1L << 24; + final long minInt24 = -1L << 24; + // Integers in this range encode to 7 byte varints or less. + final long maxInt48 = 1L << 48; + final long minInt48 = -1L << 48; + final long i = (long) v; + if ((double) i == v) { + if (i >= minInt24 && i <= maxInt24) { + return PointKind.int24; + } + if (i >= minInt48 && i <= maxInt48) { + return PointKind.int48; + } + } + final float f = (float) v; + if ((double) f == v) { + return PointKind.float32; + } + return PointKind.float64; + } + + static PointKind of(DoubleBuffer values) { + PointKind kind = PointKind.zero; + for (int i = 0; i < values.length(); i++) { + kind = kind.union(of(values.get(i))); + } + return kind; + } + + PointKind union(PointKind o) { + if ((this == int48 && o == float32) || (this == float32 && o == int48)) { + return PointKind.float64; + } + if (o.rank > rank) { + return o; + } + return this; + } + + ValueType toValueType() { + switch (this) { + case zero: + return ValueType.zero; + case int24: + case int48: + return ValueType.sint64; + case float32: + return ValueType.float32; + default: + return ValueType.float64; + } + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ProtoUtil.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ProtoUtil.java new file mode 100644 index 00000000..7928c7a2 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ProtoUtil.java @@ -0,0 +1,29 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +class ProtoUtil { + static final int ID_SHIFT = 3; + static final int TYPE_BYTES = 2; + + static int varintLen(long v) { + if (v == 0) { + return 1; + } + int n = 64 - Long.numberOfLeadingZeros(v); + return (n + 6) / 7; + } + + static int fieldLen(int id, int len) { + return varintLen(id << ID_SHIFT) + varintLen(len) + len; + } + + static long bytesFieldHeader(int id) { + return (id << ID_SHIFT) | TYPE_BYTES; + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ScalarMetric.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ScalarMetric.java new file mode 100644 index 00000000..2433b0ca --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ScalarMetric.java @@ -0,0 +1,60 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +/** Builder for scalar timeseries. */ +public class ScalarMetric extends Metric { + ScalarMetric(PayloadBuilder pb, int type, String name) { + super(pb, type, name); + } + + @Override + protected ScalarMetric self() { + return this; + } + + /** + * Add new data point to the timeseries. + * + * @param timestamp Timestamp of the point in seconds since Unix epoch. + * @param value Metric value at timestamp. + * @return This. + */ + public ScalarMetric addPoint(long timestamp, double value) { + pb.timestamps.put(timestamp); + pb.values.put(value); + return this; + } + + @Override + void encodeValues(ValueType valueType) { + ColumnarBuffer r = pb.currentRecord(); + + r.putUint64(Column.numPoints, pb.values.length()); + + switch (valueType) { + case zero: + break; + case sint64: + for (int i = 0; i < pb.values.length(); i++) { + r.putSint64(Column.valsSint64, (long) pb.values.get(i)); + } + break; + case float32: + for (int i = 0; i < pb.values.length(); i++) { + r.putFloat32(Column.valsFloat32, (float) pb.values.get(i)); + } + break; + case float64: + for (int i = 0; i < pb.values.length(); i++) { + r.putFloat64(Column.valsFloat64, pb.values.get(i)); + } + break; + } + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java new file mode 100644 index 00000000..e418ae60 --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/SketchMetric.java @@ -0,0 +1,116 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +/** Builder for sketch timeseries. */ +public class SketchMetric extends Metric { + private static final int VALUES_PER_SKETCH_POINT = 3; + + SketchMetric(PayloadBuilder pb, int type, String name) { + super(pb, type, name); + } + + @Override + protected SketchMetric self() { + return this; + } + + /** + * Add a new timeseries point. + * + * @param timestamp Timestamp of the point in seconds since Unix epoch. + * @param sum Total sum of all observed values. + * @param min Minimum observed value. + * @param max Maximum observed value. + * @param cnt Number of observed values. + * @param binKeys Array of keys for each bin in the sketch. + * @param binCnts Array of number of observations for each bin. + * @return This. + */ + public SketchMetric addPoint( + long timestamp, + double sum, + double min, + double max, + long cnt, + int[] binKeys, + int[] binCnts) { + + if (binKeys.length != binCnts.length) { + throw new IllegalArgumentException("binKeys and binCnts must have the same length"); + } + + pb.timestamps.put(timestamp); + pb.values.put(sum); + pb.values.put(min); + pb.values.put(max); + pb.counts.put(cnt); + + ColumnarBuffer r = pb.currentRecord(); + DeltaEncoder dk = new DeltaEncoder(); + + r.putUint64(Column.sketchNumBins, binKeys.length); + for (int i = 0; i < binKeys.length; i++) { + r.putSint64(Column.sketchBinKeys, dk.encode(binKeys[i])); + r.putUint64(Column.sketchBinCnts, binCnts[i]); + } + + return this; + } + + @Override + void encodeValues(ValueType valueType) { + ColumnarBuffer r = pb.currentRecord(); + + r.putUint64(Column.numPoints, pb.counts.length()); + + switch (valueType) { + case zero: + for (int i = 0; i < pb.counts.length(); i++) { + r.putSint64(Column.valsSint64, pb.counts.get(i)); + } + break; + case sint64: + for (int i = 0; i < pb.counts.length(); i++) { + r.putSint64( + Column.valsSint64, (long) pb.values.get(VALUES_PER_SKETCH_POINT * i)); + r.putSint64( + Column.valsSint64, + (long) pb.values.get(VALUES_PER_SKETCH_POINT * i + 1)); + r.putSint64( + Column.valsSint64, + (long) pb.values.get(VALUES_PER_SKETCH_POINT * i + 2)); + r.putSint64(Column.valsSint64, pb.counts.get(i)); + } + break; + case float32: + for (int i = 0; i < pb.counts.length(); i++) { + r.putFloat32( + Column.valsFloat32, (float) pb.values.get(VALUES_PER_SKETCH_POINT * i)); + r.putFloat32( + Column.valsFloat32, + (float) pb.values.get(VALUES_PER_SKETCH_POINT * i + 1)); + r.putFloat32( + Column.valsFloat32, + (float) pb.values.get(VALUES_PER_SKETCH_POINT * i + 2)); + r.putSint64(Column.valsSint64, pb.counts.get(i)); + } + break; + case float64: + for (int i = 0; i < pb.counts.length(); i++) { + r.putFloat64(Column.valsFloat64, pb.values.get(VALUES_PER_SKETCH_POINT * i)); + r.putFloat64( + Column.valsFloat64, pb.values.get(VALUES_PER_SKETCH_POINT * i + 1)); + r.putFloat64( + Column.valsFloat64, pb.values.get(VALUES_PER_SKETCH_POINT * i + 2)); + r.putSint64(Column.valsSint64, pb.counts.get(i)); + } + break; + } + } +} diff --git a/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ValueType.java b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ValueType.java new file mode 100644 index 00000000..cfe2ba9d --- /dev/null +++ b/dogstatsd-http-serializer/src/main/java/com/datadoghq/dogstatsd/http/serializer/ValueType.java @@ -0,0 +1,25 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +enum ValueType { + zero(0), + sint64(0x10), + float32(0x20), + float64(0x30); + + private final int flag; + + ValueType(int flag) { + this.flag = flag; + } + + int flag() { + return flag; + } +} diff --git a/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java b/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java new file mode 100644 index 00000000..997c58d3 --- /dev/null +++ b/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/PayloadBuilderTest.java @@ -0,0 +1,308 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import org.junit.Test; + +public class PayloadBuilderTest { + @Test + public void simple() { + final ArrayList payloads = new ArrayList<>(); + PayloadBuilder b = + new PayloadBuilder( + new PayloadConsumer() { + @Override + public void handle(byte[] p) { + payloads.add(p); + } + }); + + b.count("unused") + .setTags(Arrays.asList(new String[] {"foo", "bar"})) + .addPoint(100, 1) + .close(); + b.flushPayload(); + + b.count("abc") + .setTags(Arrays.asList(new String[] {"foo", "bar"})) + .setResources(Arrays.asList(new String[] {"host", ""})) + .addPoint(100, 1) + .addPoint(110, 2) + .close(); + + b.gauge("defgh").addPoint(100, 0).close(); + + b.sketch("ijk") + .setTags(Arrays.asList(new String[] {"foo", "baz"})) + .addPoint(100, 4.75, 1.25, 1.75, 3, new int[] {1351, 1373}, new int[] {1, 2}) + .addPoint(110, 6.5, 2.25, 2.75, 5, new int[] {1389, 1402}, new int[] {2, 3}) + .close(); + + b.rate("lm").setInterval(10).addPoint(100, 3.14).close(); + + b.close(); + + assertEquals(2, payloads.size()); + byte[] p = payloads.get(1); + + TestUtil.assertPayload( + p, + new int[] { + // MetricData + (3 << 3) | 2, + 188, + 1, + // dictNameStr + (1 << 3) | 2, + 17, + 3, + 97, + 98, + 99, // abc + 5, + 100, + 101, + 102, + 103, + 104, // defgh + 3, + 105, + 106, + 107, // ijk + 2, + 108, + 109, // lm + // dictTagsStr + (2 << 3) | 2, + 12, + 3, + 98, + 97, + 114, // bar + 3, + 102, + 111, + 111, // foo + 3, + 98, + 97, + 122, // baz + // dictTagsets + (3 << 3) | 2, + 6, + 4, + 2, + 2, + 4, + 4, + 2, + // dictResourcesStr + (4 << 3) | 2, + 5, + 4, + 104, + 111, + 115, + 116, // host + // dictResourcesLen + (5 << 3) | 2, + 1, + 1, + // dictResourcesType + (6 << 3) | 2, + 1, + 2, + // dictResourcesName + (7 << 3) | 2, + 1, + 0, + // dictSourceTypeName is empty + // dictOrigin + (9 << 3) | 2, + 3, + 10, + 10, + 0, + // types + (10 << 3) | 2, + 4, + 0x11, + 0x03, + 0x24, + 0x32, + // names + (11 << 3) | 2, + 4, + 2, + 2, + 2, + 2, + // tags + (12 << 3) | 2, + 4, + 2, + 1, + 4, + 3, + // resources + (13 << 3) | 2, + 4, + 2, + 1, + 0, + 0, + // intervals + (14 << 3) | 2, + 4, + 0, + 0, + 0, + 10, + // numPoints + (15 << 3) | 2, + 4, + 2, + 1, + 2, + 1, + // timestamps + (16 << 3) | 2, + 1, + 7, + 200, + 1, + 20, + 19, + 0, + 20, + 19, + // valsSint64 + (17 << 3) | 2, + 1, + 4, + 2, + 4, + 6, + 10, + // valsFloat32, list(pack(' payloads = new ArrayList<>(); + PayloadBuilder b = + new PayloadBuilder( + new PayloadConsumer() { + @Override + public void handle(byte[] p) { + payloads.add(p); + } + }); + + for (int i = 0; i < 1000; i++) { + ScalarMetric g = b.gauge(String.format("custom.metric.%d", i)); + g.setTags(Arrays.asList(new String[] {String.format("foobar.%d", i)})); + for (int j = 0; j < 100; j++) { + g.addPoint(100 + j * 10, (double) j); + } + g.close(); + } + b.close(); + + assertEquals(2, payloads.size()); + + for (byte[] p : payloads) { + assertTrue(p.length <= b.maxPayloadSize); + } + } +} diff --git a/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/PointKindTest.java b/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/PointKindTest.java new file mode 100644 index 00000000..920cfd34 --- /dev/null +++ b/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/PointKindTest.java @@ -0,0 +1,34 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class PointKindTest { + @Test + public void simple() { + assertEquals(ValueType.zero, typeOf(0, -0)); + assertEquals(ValueType.sint64, typeOf(-10, 0, 10)); + assertEquals(ValueType.sint64, typeOf(-1 << 32, 0, 1 << 32)); + assertEquals(ValueType.float32, typeOf(-10, -0.5, 0, 1.25, 10)); + assertEquals(ValueType.float64, typeOf(3.14159, 0)); + // Large integer values should not be represented as float32 to avoid truncation. + assertEquals(ValueType.float64, typeOf(-1L << 30, 1.5)); + assertEquals(ValueType.float64, typeOf(1.5, -1L << 30)); + } + + static ValueType typeOf(double... values) { + DoubleBuffer buf = new DoubleBuffer(); + for (double v : values) { + buf.put(v); + } + return PointKind.of(buf).toValueType(); + } +} diff --git a/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/TestUtil.java b/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/TestUtil.java new file mode 100644 index 00000000..0de6793f --- /dev/null +++ b/dogstatsd-http-serializer/src/test/java/com/datadoghq/dogstatsd/http/serializer/TestUtil.java @@ -0,0 +1,144 @@ +/* Unless explicitly stated otherwise all files in this repository are + * licensed under the Apache 2.0 License. + * + * This product includes software developed at Datadog + * (https://www.datadoghq.com/) Copyright 2026 Datadog, Inc. + */ + +package com.datadoghq.dogstatsd.http.serializer; + +import static org.junit.Assert.fail; + +import java.util.Formatter; + +class TestUtil { + static void hexdump(String prefix, Formatter out, byte[] p, int base, int len) { + for (int i = 0; i < len; i++) { + if (i % 16 == 0) { + out.format("%s", prefix); + } + if (base + i >= p.length) { + out.format("unexpected end of field\n"); + return; + } + out.format(" %02x", p[base + i]); + if ((i + 1) % 16 == 0 || i == len - 1) { + for (int j = i % 16; j < 16; j++) { + out.format(" "); + } + out.format(" "); + for (int j = i - i % 16; j <= i; j++) { + if (p[base + j] >= 32 && p[base + j] < 127) { + out.format("%c", p[base + j]); + } else { + out.format("."); + } + } + out.format("\n"); + } + } + } + + static class Varint { + int len; + int val; + + void read(byte[] p, int base) { + len = 0; + val = 0; + for (int sw = 0; base + len < p.length; sw += 7) { + int b = p[base + len++]; + if (b < 0) b += 256; + val |= (b & 127) << sw; + if ((b & 128) == 0) { + break; + } + } + } + } + + static final String indent[] = new String[] {"", " "}; + + static void protodump(Formatter out, byte[] p, int base, int len, int depth) { + Varint var = new Varint(); + + for (int idx = 0; idx < len; ) { + int start = idx; + var.read(p, base + idx); + int id = var.val >> 3; + int ty = var.val & 3; + if (var.len == 0) { + out.format("unexpected end of message\n"); + return; + } + idx += var.len; + String prefix = String.format("%s(%2d) ", indent[depth], id); + if (ty != ProtoUtil.TYPE_BYTES) { + out.format("%sunexpected data type %x at offset %d\n", prefix, ty, base + idx); + hexdump(prefix, out, p, base + start, idx - start); + return; + } + + var.read(p, base + idx); + idx += var.len; + int flen = var.val; + if (var.len == 0) { + out.format("unexpected end of message\n"); + return; + } + if (depth == 0) { + hexdump(prefix, out, p, base + start, idx - start); + protodump(out, p, base + idx, flen, depth + 1); + } else { + hexdump(prefix, out, p, base + start, flen + idx - start); + } + idx += flen; + } + } + + static String protodump(byte[] p) { + Formatter out = new Formatter(); + protodump(out, p, 0, p.length, 0); + return out.out().toString(); + } + + static String protodump(int[] p) { + byte[] pb = new byte[p.length]; + for (int i = 0; i < p.length; i++) pb[i] = (byte) p[i]; + return protodump(pb); + } + + static void formatTwoCols(Formatter out, String hl, String hr, String dl, String dr) { + final String fmt = "%-80s%3s%-80s\n"; + out.format(fmt, hl, "", hr); + String[] linesl = dl.split("\n"); + String[] linesr = dr.split("\n"); + int i = 0; + for (; i < linesl.length && i < linesr.length; i++) { + String l = linesl[i]; + String r = linesr[i]; + out.format(fmt, l, l.equals(r) ? "" : "≠ ", r); + } + for (; i < linesl.length; i++) { + out.format(fmt, linesl[i], "", ""); + } + for (; i < linesr.length; i++) { + out.format(fmt, "", "", linesr[i]); + } + } + + // expected is int[] to be able to write unsigned byte values wtihout conversion. + static void assertPayload(byte[] got, int[] expected) { + boolean same = got.length == expected.length; + for (int i = 0; same && i < got.length; i++) { + same &= got[i] == (byte) expected[i]; + } + if (!same) { + Formatter out = new Formatter(); + out.format("payloads do not match:\n"); + formatTwoCols(out, "GOT", "EXPECTED", protodump(got), protodump(expected)); + out.format("(field id in parens, fields printed including header and length)"); + fail(out.out().toString()); + } + } +}