diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 3649da70c2ab9..8a58b5bed7a16 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -208,8 +208,13 @@ public boolean isSuccessorTo(Producer other) { public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize, boolean isChunked, boolean isMarker, Position position) { - if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) { - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position); + MessagePublishContext messagePublishContext = + MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), + batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); + if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, position, messagePublishContext)) { + publishMessageToTopic(headersAndPayload, messagePublishContext); + } else { + messagePublishContext.recycle(); } } @@ -223,14 +228,18 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS }); return; } - if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) { - publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, - isMarker, position); + MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, + highestSequenceId, headersAndPayload.readableBytes(), batchSize, + isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); + if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, position, messagePublishContext)) { + publishMessageToTopic(headersAndPayload, messagePublishContext); + } else { + messagePublishContext.recycle(); } } - public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, int batchSize, - Position position) { + public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, + Position position, PublishContext publishContext) { if (!isShadowTopic && position != null) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, @@ -266,10 +275,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he } if (topic.isEncryptionRequired()) { - - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); + MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload); int encryptionKeysCount = msgMetadata.getEncryptionKeysCount(); // Check whether the message is encrypted or not if (encryptionKeysCount < 1) { @@ -283,7 +289,7 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he } } - startPublishOperation((int) batchSize, headersAndPayload.readableBytes()); + startPublishOperation((int) publishContext.getNumberOfMessages(), headersAndPayload.readableBytes()); return true; } @@ -292,26 +298,9 @@ private boolean isSupportsReplDedupByLidAndEid() { return cnx.isClientSupportsReplDedupByLidAndEid() && topic.isPersistent(); } - private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, int batchSize, boolean isChunked, - boolean isMarker, Position position) { - MessagePublishContext messagePublishContext = - MessagePublishContext.get(this, sequenceId, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); - if (brokerInterceptor != null) { - brokerInterceptor - .onMessagePublish(this, headersAndPayload, messagePublishContext); - } - topic.publishMessage(headersAndPayload, messagePublishContext); - } - - private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, - int batchSize, boolean isChunked, boolean isMarker, Position position) { - MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, - highestSequenceId, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker, position, isSupportsReplDedupByLidAndEid()); + private void publishMessageToTopic(ByteBuf headersAndPayload, MessagePublishContext messagePublishContext) { if (brokerInterceptor != null) { - brokerInterceptor - .onMessagePublish(this, headersAndPayload, messagePublishContext); + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); } topic.publishMessage(headersAndPayload, messagePublishContext); } @@ -412,6 +401,7 @@ private static final class MessagePublishContext implements PublishContext, Runn private long originalHighestSequenceId; private long entryTimestamp; + private MessageMetadata messageMetadata; @Override public long getLedgerId() { @@ -467,6 +457,15 @@ public long getHighestSequenceId() { return highestSequenceId; } + @Override + public MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) { + if (messageMetadata == null) { + messageMetadata = new MessageMetadata(); + Commands.peekMessageMetadata(headersAndPayload, messageMetadata); + } + return messageMetadata; + } + @Override public void setOriginalProducerName(String originalProducerName) { this.originalProducerName = originalProducerName; @@ -629,11 +628,14 @@ static MessagePublishContext get(Producer producer, long sequenceId, int msgSize callback.chunked = chunked; callback.originalProducerName = null; callback.originalSequenceId = -1L; + callback.highestSequenceId = -1L; + callback.originalHighestSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? -1 : position.getEntryId(); + callback.messageMetadata = null; if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -651,12 +653,14 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long callback.batchSize = batchSize; callback.originalProducerName = null; callback.originalSequenceId = -1L; + callback.originalHighestSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; callback.supportsReplDedupByLidAndEid = supportsReplDedupByLidAndEid; callback.ledgerId = position == null ? -1 : position.getLedgerId(); callback.entryId = position == null ? -1 : position.getEntryId(); + callback.messageMetadata = null; if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -703,6 +707,7 @@ public void recycle() { startTimeNs = -1L; chunked = false; isMarker = false; + messageMetadata = null; if (propertyMap != null) { propertyMap.clear(); } @@ -868,16 +873,16 @@ public void checkEncryption() { public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, ByteBuf headersAndPayload, int batchSize, boolean isChunked, boolean isMarker) { - if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null)) { - return; - } MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null, cnx.isClientSupportsReplDedupByLidAndEid()); + if (!checkAndStartPublish(producerId, sequenceId, headersAndPayload, null, messagePublishContext)) { + messagePublishContext.recycle(); + return; + } if (brokerInterceptor != null) { - brokerInterceptor - .onMessagePublish(this, headersAndPayload, messagePublishContext); + brokerInterceptor.onMessagePublish(this, headersAndPayload, messagePublishContext); } topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 569cbab1e07d9..9fd56b1593f9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2193,19 +2193,20 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { } private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) { - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); - log.debug() - .attr("producerId", send.getProducerId()) - .attr("sendSequenceId", send.getSequenceId()) - .attr("producerName", msgMetadata.getProducerName()) - .attr("metadataSequenceId", msgMetadata.getSequenceId()) - .attr("readableBytes", headersAndPayload.readableBytes()) - .attr("partitionKey", msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null) - .attr("orderingKey", msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null) - .attr("uncompressedSize", msgMetadata.getUncompressedSize()) - .log("Received send message request"); + log.debug(event -> { + headersAndPayload.markReaderIndex(); + MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); + headersAndPayload.resetReaderIndex(); + event.attr("producerId", send.getProducerId()) + .attr("sendSequenceId", send.getSequenceId()) + .attr("producerName", msgMetadata.getProducerName()) + .attr("metadataSequenceId", msgMetadata.getSequenceId()) + .attr("readableBytes", headersAndPayload.readableBytes()) + .attr("partitionKey", msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null) + .attr("orderingKey", msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null) + .attr("uncompressedSize", msgMetadata.getUncompressedSize()) + .log("Received send message request"); + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index fd3ffb1a34c76..87fdd9f024abc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -35,6 +35,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; +import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.EntryFilters; @@ -42,6 +43,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.protocol.schema.SchemaVersion; import org.apache.pulsar.common.util.FutureUtil; @@ -85,6 +87,12 @@ default long getOriginalSequenceId() { default void setMetadataFromEntryData(ByteBuf entryData) { } + default MessageMetadata getMessageMetadata(ByteBuf headersAndPayload) { + MessageMetadata messageMetadata = new MessageMetadata(); + Commands.peekMessageMetadata(headersAndPayload, messageMetadata); + return messageMetadata; + } + default long getHighestSequenceId() { return -1L; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 2db01ccc94624..405e1f2c71c6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -304,9 +304,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade public MessageDupStatus isDuplicateReplV1(PublishContext publishContext, ByteBuf headersAndPayload) { // Message is coming from replication, we need to use the original producer name and sequence id // for the purpose of deduplication and not rely on the "replicator" name. - int readerIndex = headersAndPayload.readerIndex(); - MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.readerIndex(readerIndex); + MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload); String producerName = md.getProducerName(); long sequenceId = md.getSequenceId(); @@ -333,9 +331,7 @@ private void setContextPropsIfRepl(PublishContext publishContext, ByteBuf header if (Producer.isRemoteOrShadow(publishContext.getProducerName(), replicatorPrefix)) { // Message is coming from replication, we need to use the replication's producer name, source cluster's // ledger id and entry id for the purpose of deduplication. - int readerIndex = headersAndPayload.readerIndex(); - MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.readerIndex(readerIndex); + MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload); List kvPairList = md.getPropertiesList(); for (KeyValue kvPair : kvPairList) { @@ -448,9 +444,7 @@ public MessageDupStatus isDuplicateNormal(PublishContext publishContext, ByteBuf long chunkID = -1; long totalChunk = -1; if (publishContext.isChunked()) { - int readerIndex = headersAndPayload.readerIndex(); - MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.readerIndex(readerIndex); + MessageMetadata md = publishContext.getMessageMetadata(headersAndPayload); chunkID = md.getChunkId(); totalChunk = md.getNumChunksFromMsg(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 4d57da30cb2c4..dcd34e375a201 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -651,7 +651,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont decrementPendingWriteOpsAndCheck(); return; } - if (isExceedMaximumDeliveryDelay(headersAndPayload)) { + if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) { publishContext.completed( new NotAllowedException( String.format("Exceeds max allowed delivery delay of %s milliseconds", @@ -4737,7 +4737,7 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon decrementPendingWriteOpsAndCheck(); return; } - if (isExceedMaximumDeliveryDelay(headersAndPayload)) { + if (isExceedMaximumDeliveryDelay(headersAndPayload, publishContext)) { publishContext.completed( new NotAllowedException( String.format("Exceeds max allowed delivery delay of %s milliseconds", @@ -4978,13 +4978,11 @@ public Optional getShadowSourceTopic() { return Optional.ofNullable(shadowSourceTopic); } - protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) { + protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload, PublishContext publishContext) { if (isDelayedDeliveryEnabled()) { long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis(); if (maxDeliveryDelayInMs > 0) { - headersAndPayload.markReaderIndex(); - MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); - headersAndPayload.resetReaderIndex(); + MessageMetadata msgMetadata = publishContext.getMessageMetadata(headersAndPayload); return msgMetadata.hasDeliverAtTime() && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs; }