Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.scalable.SegmentInfo;
Comment thread
shibd marked this conversation as resolved.
import org.apache.pulsar.common.scalable.SegmentTopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1331,8 +1331,7 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();

TopicName tn = TopicName.get(MLPendingAckStore
.getTransactionPendingAckStoreSuffix(topic,
Codec.encode(subscriptionName)));
.getTransactionPendingAckStoreSuffix(topic, subscriptionName));
if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
ManagedLedgerConfig managedLedgerConfig = ledger.getConfig();
ManagedLedgerFactory managedLedgerFactory = getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
Expand Down Expand Up @@ -524,6 +525,12 @@ public CompletableFuture<ManagedLedger> getManagedLedger() {

public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
TopicName origin = TopicName.get(originTopicName);
// URL-encode the subscription name so that any '/' characters it contains do not create
// extra path segments when the resulting string is parsed as a topic name. TopicName
// always decodes the local-name component on parse (via Codec.decode) and re-encodes it
// on output (via getEncodedLocalName / getPersistenceNamingEncoding), so encoding here
// produces a valid round-trip with no double-encoding.
String encodedSubName = Codec.encode(subName);
// Segment topics ("segment://tenant/ns/topic/<hexStart>-<hexEnd>-<segmentId>") cannot
// host a derived pending-ack topic in the segment domain — the descriptor parser would
// reject any name with extra dashes appended. Map to a flat persistent topic in the same
Expand All @@ -532,9 +539,9 @@ public static String getTransactionPendingAckStoreSuffix(String originTopicName,
return String.format("persistent://%s/%s/%s-%s-%s%s",
origin.getTenant(), origin.getNamespacePortion(),
origin.getLocalName(), origin.getSegmentDescriptor(),
subName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
encodedSubName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
}
return origin + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
return origin + "-" + encodedSubName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
}

public static String getTransactionPendingAckStoreCursorName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -140,6 +143,33 @@ private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLo
return (MLPendingAckStore) mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get();
}

@Test
public void testPendingAckStoreWithSlashSubscriptionName() throws Exception {
String slashSubName = "tenant/namespace/my-function";
when(persistentSubscriptionMock.getName()).thenReturn(slashSubName);

MLPendingAckStoreProvider provider = new MLPendingAckStoreProvider();

// Should not throw — subscription names containing '/' must be URL-encoded so the
// resulting pending-ack topic name is a valid V2 persistent topic name.
MLPendingAckStore store = (MLPendingAckStore) provider.newPendingAckStore(persistentSubscriptionMock).get();

// Verify the managed ledger persistence path encodes the subscription name correctly.
// Expected: tenant/namespace/persistent/<encodedLocalName>
// where localName = "<topic>-<encodedSubName>__transaction_pending_ack"
String originTopicName = persistentSubscriptionMock.getTopic().getName();
TopicName origin = TopicName.get(originTopicName);
String encodedSubName = Codec.encode(slashSubName);
String expectedLocalName = origin.getLocalName() + "-" + encodedSubName
+ SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
// getPersistenceNamingEncoding() = tenant/namespace/persistent/encodedLocalName
String expectedMlName = origin.getTenant() + "/" + origin.getNamespacePortion()
+ "/persistent/" + Codec.encode(expectedLocalName);
Assert.assertEquals(store.getManagedLedger().get().getName(), expectedMlName);

closePendingAckStoreWithRetry(store);
}

/**
* Overridden cases:
* 1. Batched write and replay with batched feature.
Expand Down
Loading