KafkaProducer.java
@@ -1135,18 +1135,25 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback call
// take into account broker load, the amount of data produced to each partition, etc.).
int partition = partition(record, serializedKey, serializedValue, cluster);

setReadOnly(record.headers());
Header[] headers = record.headers().toArray();

int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
compression.type(), serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();

// Append the record to the accumulator. Note, that the actual partition may be
// calculated there and can be accessed via appendCallbacks.topicPartition.
RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
final RecordAccumulator.RecordAppendResult result;
// Records carrying pre-serialized header bytes bypass Header materialization entirely.
byte[] rawHeaders = record.rawSerializedHeaders();
if (rawHeaders != null) {
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
compression.type(), serializedKey, serializedValue, rawHeaders);
ensureValidRecordSize(serializedSize);
result = accumulator.appendWithRawHeaders(record.topic(), partition, timestamp, serializedKey,
serializedValue, rawHeaders, appendCallbacks, remainingWaitMs, nowMs, cluster);
} else {
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
compression.type(), serializedKey, serializedValue, headers);
ensureValidRecordSize(serializedSize);
result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
serializedValue, headers, appendCallbacks, remainingWaitMs, nowMs, cluster);
}
assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

// Add the partition to the transaction (if in progress) after it has been successfully
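A minimal caller-side sketch of the new fast path (illustrative only; the helper method is invented, and the six-argument constructor assumed here is the one added to ProducerRecord below):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class RawHeaderForwardExample {
    // Hypothetical helper: forwards key/value bytes together with header bytes that
    // are already in wire format, so doSend() takes the appendWithRawHeaders branch.
    static void forward(Producer<byte[], byte[]> producer, String topic,
                        byte[] key, byte[] value, byte[] rawHeaders) {
        // partition and timestamp left null; rawHeaders selects the byte[] overload
        ProducerRecord<byte[], byte[]> record =
                new ProducerRecord<>(topic, null, null, key, value, rawHeaders);
        producer.send(record);
    }
}
```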
ProducerRecord.java
@@ -20,6 +20,7 @@
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

import java.util.Arrays;
import java.util.Objects;

/**
@@ -54,6 +55,7 @@ public class ProducerRecord<K, V> {
private final K key;
private final V value;
private final Long timestamp;
private final byte[] rawSerializedHeaders;

/**
* Creates a record with a specified timestamp to be sent to a specified topic and partition
@@ -81,6 +83,35 @@ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
this.rawSerializedHeaders = null;
}

/**
* Creates a record carrying pre-serialized header bytes. When this constructor is used, the
* producer writes {@code rawSerializedHeaders} directly into the record batch without
* materializing or re-serializing individual header objects. The {@code rawSerializedHeaders} must
* use the standard Kafka header wire format: {@code [count(varint)][header1][header2]...},
* or be empty (length 0) for zero headers.
*
* <p>This is intended for internal use (e.g., Kafka Streams changelog writing) where headers
* are already available in serialized form and the deserialization round-trip should be avoided.
* Note that for records created this way, {@link #headers()} returns an empty {@link Headers}
* object; the raw bytes are accessible only via {@link #rawSerializedHeaders()}.
*/
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, byte[] rawSerializedHeaders) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders();
this.rawSerializedHeaders = rawSerializedHeaders;
}

/**
@@ -94,7 +125,7 @@ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, null);
this(topic, partition, timestamp, key, value, (Iterable<Header>) null);
}

/**
@@ -107,7 +138,7 @@ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V
* @param headers The headers that will be included in the record
*/
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, null, key, value, headers);
this(topic, partition, (Long) null, key, value, headers);
}

/**
@@ -119,7 +150,7 @@ public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<
* @param value The record contents
*/
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, null, key, value, null);
this(topic, partition, (Long) null, key, value, (Iterable<Header>) null);
}

/**
@@ -130,7 +161,7 @@ public ProducerRecord(String topic, Integer partition, K key, V value) {
* @param value The record contents
*/
public ProducerRecord(String topic, K key, V value) {
this(topic, null, null, key, value, null);
this(topic, null, (Long) null, key, value, (Iterable<Header>) null);
}

/**
@@ -140,7 +171,7 @@ public ProducerRecord(String topic, K key, V value) {
* @param value The record contents
*/
public ProducerRecord(String topic, V value) {
this(topic, null, null, null, value, null);
this(topic, null, (Long) null, null, value, (Iterable<Header>) null);
}

/**
@@ -185,6 +216,14 @@ public Integer partition() {
return partition;
}

/**
* @return Pre-serialized header bytes in Kafka wire format, or null if headers were
* provided as {@link Headers} objects via the standard constructors.
*/
public byte[] rawSerializedHeaders() {
return rawSerializedHeaders;
}

@Override
public String toString() {
String headers = this.headers == null ? "null" : this.headers.toString();
@@ -209,7 +248,8 @@ else if (!(o instanceof ProducerRecord))
Objects.equals(topic, that.topic) &&
Objects.equals(headers, that.headers) &&
Objects.equals(value, that.value) &&
Objects.equals(timestamp, that.timestamp);
Objects.equals(timestamp, that.timestamp) &&
Arrays.equals(rawSerializedHeaders, that.rawSerializedHeaders);
}

@Override
@@ -220,6 +260,7 @@ public int hashCode() {
result = 31 * result + (key != null ? key.hashCode() : 0);
result = 31 * result + (value != null ? value.hashCode() : 0);
result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
result = 31 * result + Arrays.hashCode(rawSerializedHeaders);
return result;
}
}
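The constructor javadoc above specifies the expected wire format. A sketch of producing such bytes from existing Header objects, assuming the per-header layout used by DefaultRecord in the v2 record format (varint key length, UTF-8 key bytes, varint value length with -1 for null, then value bytes):

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.utils.ByteUtils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class HeaderBytesExample {
    // Encodes [count(varint)] followed by each header as
    // [key len(varint)][key utf8][value len(varint, -1 for null)][value bytes].
    static byte[] serializeHeaders(Header[] headers) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        ByteUtils.writeVarint(headers.length, out);
        for (Header header : headers) {
            byte[] key = header.key().getBytes(StandardCharsets.UTF_8);
            ByteUtils.writeVarint(key.length, out);
            out.write(key);
            byte[] value = header.value();
            ByteUtils.writeVarint(value == null ? -1 : value.length, out);
            if (value != null)
                out.write(value);
        }
        return bytes.toByteArray();
    }
}
```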
ProducerBatch.java
@@ -165,6 +165,25 @@ public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value,
}
}

/**
 * Append a record whose headers are already in serialized form to the current record set,
 * and return a future for the record metadata, or null if there is no room in the batch.
 */
public FutureRecordMetadata tryAppendRawHeaders(long timestamp, byte[] key, byte[] value, byte[] rawSerializedHeaders, Callback callback, long now) {
if (!recordsBuilder.hasRoomFor(timestamp, key, value, rawSerializedHeaders)) {
return null;
} else {
this.recordsBuilder.append(timestamp, key, value, rawSerializedHeaders);
this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
recordsBuilder.compression().type(), key, value, rawSerializedHeaders));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp,
key == null ? -1 : key.length,
value == null ? -1 : value.length,
Time.SYSTEM);
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
}

/**
* This method is only used by {@link #split(int)} when splitting a large batch to smaller ones.
* @return true if the record has been successfully appended, false otherwise.
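A hypothetical test-style sketch for the new method (not part of this diff); it assumes the MemoryRecordsBuilder raw-header overloads this PR adds elsewhere, and would need to live in the producer internals package for access to ProducerBatch:

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;

import java.nio.ByteBuffer;

public class RawHeaderAppendSketch {
    // Appends one record through the raw-headers path. An empty array stands in
    // for "zero headers", matching the null-handling in appendWithRawHeaders.
    static FutureRecordMetadata appendOne(byte[] rawHeaders) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(10240),
                RecordBatch.CURRENT_MAGIC_VALUE, Compression.NONE, TimestampType.CREATE_TIME, 0L);
        ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 0), builder, 0L);
        // Returns null once the underlying builder runs out of room.
        return batch.tryAppendRawHeaders(0L, "key".getBytes(), "value".getBytes(),
                rawHeaders, null, 0L);
    }
}
```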
RecordAccumulator.java
@@ -401,6 +401,74 @@ private RecordAppendResult appendNewBatch(String topic,
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
}

/**
 * Variant of {@code append} for records carrying pre-serialized header bytes: the raw
 * bytes are handed to the batch as-is, so no Header objects are materialized.
 */
public RecordAppendResult appendWithRawHeaders(String topic,
int partition,
long timestamp,
byte[] key,
byte[] value,
byte[] rawSerializedHeaders,
AppendCallbacks callbacks,
long maxTimeToBlock,
long nowMs,
Cluster cluster) throws InterruptedException {
TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(createBuiltInPartitioner(logContext, k, batchSize)));

appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
// A null array is treated as zero headers.
if (rawSerializedHeaders == null) rawSerializedHeaders = new byte[0];
try {
while (true) {
final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
final int effectivePartition;
if (partition == RecordMetadata.UNKNOWN_PARTITION) {
partitionInfo = topicInfo.builtInPartitioner.peekCurrentPartitionInfo(cluster);
effectivePartition = partitionInfo.partition();
} else {
partitionInfo = null;
effectivePartition = partition;
}

setPartition(callbacks, effectivePartition);

Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());
synchronized (dq) {
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;

RecordAppendResult appendResult = tryAppendRawHeaders(timestamp, key, value, rawSerializedHeaders, callbacks, dq, nowMs);
if (appendResult != null) {
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}

if (buffer == null) {
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
RecordBatch.CURRENT_MAGIC_VALUE, compression.type(), key, value, rawSerializedHeaders));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, effectivePartition, maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
nowMs = time.milliseconds();
}

synchronized (dq) {
if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
continue;

RecordAppendResult appendResult = appendNewBatchRawHeaders(topic, effectivePartition, dq, timestamp, key, value, rawSerializedHeaders, callbacks, buffer, nowMs);
if (appendResult.newBatchCreated)
buffer = null;
boolean enableSwitch = allBatchesFull(dq);
topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
return appendResult;
}
}
} finally {
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}

private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer) {
return MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compression, TimestampType.CREATE_TIME, 0L);
}
@@ -440,6 +508,52 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, H
return null;
}

private RecordAppendResult tryAppendRawHeaders(long timestamp, byte[] key, byte[] value, byte[] rawSerializedHeaders,
Callback callback, Deque<ProducerBatch> deque, long nowMs) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
ProducerBatch last = deque.peekLast();
if (last != null) {
int initialBytes = last.estimatedSizeInBytes();
FutureRecordMetadata future = last.tryAppendRawHeaders(timestamp, key, value, rawSerializedHeaders, callback, nowMs);
if (future == null) {
last.closeForRecordAppends();
} else {
int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, appendedBytes);
}
}
return null;
}

private RecordAppendResult appendNewBatchRawHeaders(String topic,
int partition,
Deque<ProducerBatch> dq,
long timestamp,
byte[] key,
byte[] value,
byte[] rawSerializedHeaders,
AppendCallbacks callbacks,
ByteBuffer buffer,
long nowMs) {
assert partition != RecordMetadata.UNKNOWN_PARTITION;

RecordAppendResult appendResult = tryAppendRawHeaders(timestamp, key, value, rawSerializedHeaders, callbacks, dq, nowMs);
if (appendResult != null) {
return appendResult;
}

MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer);
ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppendRawHeaders(timestamp, key, value, rawSerializedHeaders,
callbacks, nowMs));

dq.addLast(batch);
incomplete.add(batch);

return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, batch.estimatedSizeInBytes());
}

private boolean isMuted(TopicPartition tp) {
return muted.contains(tp);
}
AbstractRecords.java
@@ -143,6 +143,18 @@ else if (compressionType != CompressionType.NONE)
return Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value);
}

/**
 * Get an upper bound estimate on the batch size needed to host a record whose headers
 * are already serialized. Only supported for the v2 record format and above.
 */
public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, byte[] key, byte[] value, byte[] rawSerializedHeaders) {
return estimateSizeInBytesUpperBound(magic, compressionType, Utils.wrapNullable(key), Utils.wrapNullable(value), rawSerializedHeaders);
}

public static int estimateSizeInBytesUpperBound(byte magic, CompressionType compressionType, ByteBuffer key,
ByteBuffer value, byte[] rawSerializedHeaders) {
if (magic >= RecordBatch.MAGIC_VALUE_V2)
return DefaultRecordBatch.estimateBatchSizeUpperBound(key, value, rawSerializedHeaders);
else
throw new IllegalArgumentException("Raw serialized headers are only supported for magic >= V2");
}

/**
* Return the size of the record batch header.
*
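The DefaultRecordBatch overload invoked above is not shown in this diff. A plausible shape, purely hypothetical and modeled on the existing Header[] variant, where the raw header bytes contribute their exact length rather than an estimate:

```java
import org.apache.kafka.common.record.DefaultRecord;
import org.apache.kafka.common.record.DefaultRecordBatch;

import java.nio.ByteBuffer;

public class BatchSizeSketch {
    // Hypothetical sketch only: the real overload is added elsewhere in this PR.
    static int estimateBatchSizeUpperBound(ByteBuffer key, ByteBuffer value, byte[] rawSerializedHeaders) {
        // Batch overhead plus a per-record upper bound; raw header bytes are
        // copied verbatim into the record, so their contribution is exact.
        return DefaultRecordBatch.RECORD_BATCH_OVERHEAD
                + DefaultRecord.MAX_RECORD_OVERHEAD
                + (key == null ? 0 : key.remaining())
                + (value == null ? 0 : value.remaining())
                + rawSerializedHeaders.length;
    }
}
```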