From 24b5cf2dcaf26652056813c18a064f51fce2b24b Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Sat, 9 May 2026 11:11:55 +0800 Subject: [PATCH] [fix][broker] URL-encode subscription name in pending-ack topic to support '/' in sub names Subscription names containing '/' caused TopicName.get() to throw IllegalArgumentException ("V1 topic names are no longer supported") when opening the transaction pending-ack store, because the extra slashes created four path segments instead of three. Fix: URL-encode the subscription name in MLPendingAckStore.getTransactionPendingAckStoreSuffix() before appending it to the topic name. TopicName always Codec.decode()s the local-name on parse and re-encodes it on output, so there is no double-encoding. Also remove the pre-emptive Codec.encode() call in PersistentTopic.unsubscribe() that was added as a workaround. --- .../broker/admin/v2/ScalableTopics.java | 1 + .../service/persistent/PersistentTopic.java | 3 +- .../pendingack/impl/MLPendingAckStore.java | 11 +++++-- .../impl/MLPendingAckStoreTest.java | 30 +++++++++++++++++++ 4 files changed, 41 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 6b73fdf00d6c0..81a55da7f7079 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -58,6 +58,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; +import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.common.scalable.SegmentTopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bf795e85c3df7..6f0a8cea54abd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1331,8 +1331,7 @@ public CompletableFuture unsubscribe(String subscriptionName) { CompletableFuture unsubscribeFuture = new CompletableFuture<>(); TopicName tn = TopicName.get(MLPendingAckStore - .getTransactionPendingAckStoreSuffix(topic, - Codec.encode(subscriptionName))); + .getTransactionPendingAckStoreSuffix(topic, subscriptionName)); if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) { ManagedLedgerConfig managedLedgerConfig = ledger.getConfig(); ManagedLedgerFactory managedLedgerFactory = getBrokerService() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index e10a93cbc1a6e..c3524e1c2ea0f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; @@ -524,6 +525,12 @@ public CompletableFuture getManagedLedger() { public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) { TopicName origin = TopicName.get(originTopicName); + // URL-encode the subscription name so that any '/' characters it contains do not create + // extra path segments when the resulting string is parsed as a topic name. TopicName + // always decodes the local-name component on parse (via Codec.decode) and re-encodes it + // on output (via getEncodedLocalName / getPersistenceNamingEncoding), so encoding here + // produces a valid round-trip with no double-encoding. + String encodedSubName = Codec.encode(subName); // Segment topics ("segment://tenant/ns/topic/--") cannot // host a derived pending-ack topic in the segment domain — the descriptor parser would // reject any name with extra dashes appended. Map to a flat persistent topic in the same @@ -532,9 +539,9 @@ public static String getTransactionPendingAckStoreSuffix(String originTopicName, return String.format("persistent://%s/%s/%s-%s-%s%s", origin.getTenant(), origin.getNamespacePortion(), origin.getLocalName(), origin.getSegmentDescriptor(), - subName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX); + encodedSubName, SystemTopicNames.PENDING_ACK_STORE_SUFFIX); } - return origin + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX; + return origin + "-" + encodedSubName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX; } public static String getTransactionPendingAckStoreCursorName() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java index b41a9feaefcdc..b20e44d6dc09a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java @@ -45,6 +45,9 @@ import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck; import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.naming.SystemTopicNames; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; import org.awaitility.Awaitility; @@ -140,6 +143,33 @@ private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLo return (MLPendingAckStore) mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get(); } + @Test + public void testPendingAckStoreWithSlashSubscriptionName() throws Exception { + String slashSubName = "tenant/namespace/my-function"; + when(persistentSubscriptionMock.getName()).thenReturn(slashSubName); + + MLPendingAckStoreProvider provider = new MLPendingAckStoreProvider(); + + // Should not throw — subscription names containing '/' must be URL-encoded so the + // resulting pending-ack topic name is a valid V2 persistent topic name. + MLPendingAckStore store = (MLPendingAckStore) provider.newPendingAckStore(persistentSubscriptionMock).get(); + + // Verify the managed ledger persistence path encodes the subscription name correctly. + // Expected: tenant/namespace/persistent/ + // where localName = "-__transaction_pending_ack" + String originTopicName = persistentSubscriptionMock.getTopic().getName(); + TopicName origin = TopicName.get(originTopicName); + String encodedSubName = Codec.encode(slashSubName); + String expectedLocalName = origin.getLocalName() + "-" + encodedSubName + + SystemTopicNames.PENDING_ACK_STORE_SUFFIX; + // getPersistenceNamingEncoding() = tenant/namespace/persistent/encodedLocalName + String expectedMlName = origin.getTenant() + "/" + origin.getNamespacePortion() + + "/persistent/" + Codec.encode(expectedLocalName); + Assert.assertEquals(store.getManagedLedger().get().getName(), expectedMlName); + + closePendingAckStoreWithRetry(store); + } + /** * Overridden cases: * 1. Batched write and replay with batched feature.