Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1277,8 +1277,7 @@ public CompletableFuture<Void> unsubscribe(String subscriptionName) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();

TopicName tn = TopicName.get(MLPendingAckStore
.getTransactionPendingAckStoreSuffix(topic,
Codec.encode(subscriptionName)));
.getTransactionPendingAckStoreSuffix(topic, subscriptionName));
if (brokerService.pulsar().getConfiguration().isTransactionCoordinatorEnabled()) {
ManagedLedgerConfig managedLedgerConfig = ledger.getConfig();
ManagedLedgerFactory managedLedgerFactory = getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
Expand Down Expand Up @@ -516,7 +517,13 @@ public CompletableFuture<ManagedLedger> getManagedLedger() {
}

public static String getTransactionPendingAckStoreSuffix(String originTopicName, String subName) {
return TopicName.get(originTopicName) + "-" + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
// URL-encode the subscription name so that any '/' characters it contains do not create
// extra path segments when the resulting string is parsed as a topic name. TopicName
// always decodes the local-name component on parse (via Codec.decode) and re-encodes it
// on output (via getEncodedLocalName / getPersistenceNamingEncoding), so encoding here
// produces a valid round-trip with no double-encoding.
String encodedSubName = Codec.encode(subName);
return TopicName.get(originTopicName) + "-" + encodedSubName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
}

public static String getTransactionPendingAckStoreCursorName() {
Expand Down Expand Up @@ -569,4 +576,4 @@ public ByteBuf serialize(ArrayList<PendingAckMetadataEntry> dataArray) {
return buf;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -140,6 +143,33 @@ private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLo
return (MLPendingAckStore) mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get();
}

@Test
public void testPendingAckStoreWithSlashSubscriptionName() throws Exception {
String slashSubName = "tenant/namespace/my-function";
when(persistentSubscriptionMock.getName()).thenReturn(slashSubName);

MLPendingAckStoreProvider provider = new MLPendingAckStoreProvider();

// Should not throw — subscription names containing '/' must be URL-encoded so the
// resulting pending-ack topic name is a valid V2 persistent topic name.
MLPendingAckStore store = (MLPendingAckStore) provider.newPendingAckStore(persistentSubscriptionMock).get();

// Verify the managed ledger persistence path encodes the subscription name correctly.
// Expected: tenant/namespace/persistent/<encodedLocalName>
// where localName = "<topic>-<encodedSubName>__transaction_pending_ack"
String originTopicName = persistentSubscriptionMock.getTopic().getName();
TopicName origin = TopicName.get(originTopicName);
String encodedSubName = Codec.encode(slashSubName);
String expectedLocalName = origin.getLocalName() + "-" + encodedSubName
+ SystemTopicNames.PENDING_ACK_STORE_SUFFIX;
// getPersistenceNamingEncoding() = tenant/namespace/persistent/encodedLocalName
String expectedMlName = origin.getTenant() + "/" + origin.getNamespacePortion()
+ "/persistent/" + Codec.encode(expectedLocalName);
Assert.assertEquals(store.getManagedLedger().get().getName(), expectedMlName);

closePendingAckStoreWithRetry(store);
}

/**
* Overridden cases:
* 1. Batched write and replay with batched feature.
Expand Down
Loading