Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,13 @@ public boolean isSuccessorTo(Producer other) {

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize,
boolean isChunked, boolean isMarker, Position position) {
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position);
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, position, messagePublishContext)) {
publishMessageToTopic(headersAndPayload, messagePublishContext);
} else {
messagePublishContext.recycle();
}
}

Expand All @@ -223,14 +228,18 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
});
return;
}
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) {
publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked,
isMarker, position);
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, position, messagePublishContext)) {
publishMessageToTopic(headersAndPayload, messagePublishContext);
} else {
messagePublishContext.recycle();
}
}

public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize,
Position position) {
public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload,
Position position, PublishContext publishContext) {
Comment thread
dao-jun marked this conversation as resolved.
if (!isShadowTopic && position != null) {
cnx.execute(() -> {
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
Expand Down Expand Up @@ -266,10 +275,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
}

if (topic.isEncryptionRequired()) {

headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload);
int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
// Check whether the message is encrypted or not
if (encryptionKeysCount < 1) {
Expand All @@ -283,7 +289,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he
}
}

startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
startPublishOperation((int) publishContext.getNumberOfMessages(), headersAndPayload.readableBytes());
return true;
}

Expand All @@ -292,26 +298,9 @@ private boolean isSupportsReplDedupByLidAndEid() {
return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent();
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked,
boolean isMarker, Position position) {
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(),
batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId,
int batchSize, boolean isChunked, boolean isMarker, Position position) {
MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId,
highestSequenceId, headersAndPayload.readableBytes(), batchSize,
isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid());
private void publishMessageToTopic(ByteBuf headersAndPayload, MessagePublishContext messagePublishContext) {
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishMessage(headersAndPayload, messagePublishContext);
}
Expand Down Expand Up @@ -412,6 +401,7 @@ private static final class MessagePublishContext implements PublishContext, Runn
private long originalHighestSequenceId;

private long entryTimestamp;
private MessageMetadata messageMetadata;

@Override
public long getLedgerId() {
Expand Down Expand Up @@ -467,6 +457,15 @@ public long getHighestSequenceId() {
return highestSequenceId;
}

@Override
public MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) {
if (messageMetadata == null) {
messageMetadata = new MessageMetadata();
Commands.peekMessageMetadata(headersAndPayload, messageMetadata);
}
return messageMetadata;
}

@Override
public void setOriginalProducerName(String originalProducerName) {
this.originalProducerName = originalProducerName;
Expand Down Expand Up @@ -629,11 +628,14 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize
callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.highestSequenceId = -1L;
callback.originalHighestSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.isMarker = isMarker;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
callback.messageMetadata = null;
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
Expand All @@ -651,12 +653,14 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.originalHighestSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
callback.isMarker = isMarker;
callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid;
callback.ledgerId = position == null ? -1 : position.getLedgerId();
callback.entryId = position == null ? -1 : position.getEntryId();
callback.messageMetadata = null;
if (callback.propertyMap != null) {
callback.propertyMap.clear();
}
Expand Down Expand Up @@ -703,6 +707,7 @@ public void recycle() {
startTimeNs = -1L;
chunked = false;
isMarker = false;
messageMetadata = null;
if (propertyMap != null) {
propertyMap.clear();
}
Expand Down Expand Up @@ -868,16 +873,16 @@ public void checkEncryption() {

public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, int batchSize, boolean isChunked, boolean isMarker) {
if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) {
return;
}
MessagePublishContext messagePublishContext =
MessagePublishContext.get(this, sequenceId, highSequenceId,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null,
cnx.isClientSupportsReplDedupByLidAndEid());
if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, null, messagePublishContext)) {
messagePublishContext.recycle();
return;
}
if (brokerInterceptor != null) {
brokerInterceptor
.onMessagePublish(this, headersAndPayload, messagePublishContext);
brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext);
}
topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2193,19 +2193,20 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
}

private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
log.debug()
.attr("producerId", send.getProducerId())
.attr("sendSequenceId", send.getSequenceId())
.attr("producerName", msgMetadata.getProducerName())
.attr("metadataSequenceId", msgMetadata.getSequenceId())
.attr("readableBytes", headersAndPayload.readableBytes())
.attr("partitionKey", msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null)
.attr("orderingKey", msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null)
.attr("uncompressedSize", msgMetadata.getUncompressedSize())
.log("Received send message request");
log.debug(event -> {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
event.attr("producerId", send.getProducerId())
.attr("sendSequenceId", send.getSequenceId())
.attr("producerName", msgMetadata.getProducerName())
.attr("metadataSequenceId", msgMetadata.getSequenceId())
.attr("readableBytes", headersAndPayload.readableBytes())
.attr("partitionKey", msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null)
.attr("orderingKey", msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null)
.attr("uncompressedSize", msgMetadata.getUncompressedSize())
.log("Received send message request");
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -85,6 +87,12 @@ default long getOriginalSequenceId() {
default void setMetadataFromEntryData(ByteBuf entryData) {
}

default MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) {
MessageMetadata messageMetadata = new MessageMetadata();
Commands.peekMessageMetadata(headersAndPayload, messageMetadata);
return messageMetadata;
}

default long getHighestSequenceId() {
return -1L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
public MessageDupStatus isDuplicateReplV1(PublishContext publishContext, ByteBuf headersAndPayload) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload);

String producerName = md.getProducerName();
long sequenceId = md.getSequenceId();
Expand All @@ -333,9 +331,7 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header
if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) {
// Message is coming from replication, we need to use the replication's producer name, source cluster's
// ledger id and entry id for the purpose of deduplication.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload);

List<KeyValue> kvPairList = md.getPropertiesList();
for (KeyValue kvPair : kvPairList) {
Expand Down Expand Up @@ -448,9 +444,7 @@ public MessageDupStatus isDuplicateNormal(PublishContext publishContext, ByteBuf
long chunkID = -1;
long totalChunk = -1;
if (publishContext.isChunked()) {
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.readerIndex(readerIndex);
MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload);
chunkID = md.getChunkId();
totalChunk = md.getNumChunksFromMsg();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
Expand Down Expand Up @@ -4737,7 +4737,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumDeliveryDelay(headersAndPayload)) {
if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) {
publishContext.completed(
new NotAllowedException(
String.format("Exceeds max allowed delivery delay of %s milliseconds",
Expand Down Expand Up @@ -4978,13 +4978,11 @@ public Optional<TopicName> getShadowSourceTopic() {
return Optional.ofNullable(shadowSourceTopic);
}

protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload, PublishContext publishContext) {
Comment thread
dao-jun marked this conversation as resolved.
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
if (maxDeliveryDelayInMs > 0) {
headersAndPayload.markReaderIndex();
MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
headersAndPayload.resetReaderIndex();
MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload);
return msgMetadata.hasDeliverAtTime()
&& msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
}
Expand Down
Loading