From dae26a49afef916e503476b5dc94b1c2e898b0d6 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 7 May 2026 12:35:33 -0700 Subject: [PATCH 1/2] [fix][broker] PIP-468: segment-aware admin endpoints for cursor lifecycle MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit createSubscription / deleteSubscription on a scalable topic was silently broken: ScalableTopicController.createSubscriptionOnSegment built a "persistent://t/n/parent" URL (no descriptor) and handed it to the generic topics().createSubscriptionAsync admin API. That URL addresses the parent scalable topic, which has no managed-ledger backing, so the admin call hit "Subscription Busy" / NotFound and the .exceptionally block swallowed it. Per-segment cursors were never pre-created. Lazy creation by the V5 client on first consumer subscribe masked the bug in the happy path; admin "createSubscription" had no observable effect beyond writing the SubscriptionMetadata to the metadata store. PR 25709 attempted to fix this by appending the descriptor to the persistent URL ("persistent://t/n/parent/0000-ffff-0"), but TopicName's non-scalable-domain parser uses splitLimit=4 and rejects 4-segment paths as legacy V1 — making the admin call hard-fail synchronously before any HTTP request goes out, so the user-facing breakage is now loud instead of silent. Cleaned-up form here uses segment-specific admin endpoints all the way down, no persistent-URL conversion. New REST endpoints (Segments.java, super-user only, segment-domain only): - PUT /segments/{t}/{n}/{topic}/{descriptor}/subscription/{sub} - DELETE /segments/{t}/{n}/{topic}/{descriptor}/subscription/{sub} New admin client methods (ScalableTopics admin API): - createSegmentSubscriptionAsync(segmentTopic, subscription) - deleteSegmentSubscriptionAsync(segmentTopic, subscription) Both take a segment:// URI directly; ScalableTopicsImpl builds the REST path from TopicName parts (same pattern as the existing getSegmentSubscriptionBacklogAsync / seekSegmentSubscriptionAsync / clearSegmentSubscriptionBacklogAsync helpers). Updated ScalableTopicController.createSubscriptionOnSegment / deleteSubscriptionOnSegment to call the new admin methods, removing the broken toSegmentUnderlyingPersistentName helper. Updated ScalableTopics.deleteSegmentTopics (the whole-scalable-topic teardown path) to use admin.scalableTopics().deleteSegmentAsync( segmentTopicName, force) — same segment-aware admin endpoint already used for split-segment teardown — instead of trying to address the segment via a fabricated persistent:// URL. V5ScalableSubscriptionAdminTest (new) is a behavioral round trip: create scalable topic, admin.scalableTopics().createSubscription, produce 30 messages with no consumer attached, then subscribe and assert every message is received. Fails on master, passes here. ScalableTopicControllerTest updated to mock the new admin methods and verify the controller invokes them. --- .../broker/admin/v2/ScalableTopics.java | 24 ++--- .../pulsar/broker/admin/v2/Segments.java | 101 ++++++++++++++++++ .../scalable/ScalableTopicController.java | 26 ++--- .../scalable/ScalableTopicControllerTest.java | 18 ++-- .../v5/V5ScalableSubscriptionAdminTest.java | 91 ++++++++++++++++ .../pulsar/client/admin/ScalableTopics.java | 26 +++++ .../admin/internal/ScalableTopicsImpl.java | 22 ++++ 7 files changed, 263 insertions(+), 45 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java index 32468c4889259..3cdece40851f6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/ScalableTopics.java @@ -57,7 +57,6 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; import org.apache.pulsar.common.policies.data.TopicOperation; -import org.apache.pulsar.common.scalable.SegmentInfo; import org.apache.pulsar.common.scalable.SegmentTopicName; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -675,8 +674,9 @@ private static String deserializeLeaderBrokerId(byte[] bytes) { } /** - * Best-effort delete underlying persistent topics for all segments. - * Uses the internal admin client which handles cross-broker routing. + * Best-effort delete the underlying topic for every segment in the DAG. Uses the + * segment-aware admin endpoint, which routes to the segment-owning broker via the + * standard bundle-ownership lookup. */ private CompletableFuture deleteSegmentTopics(TopicName parentTopic, ScalableTopicMetadata metadata, @@ -685,10 +685,11 @@ private CompletableFuture deleteSegmentTopics(TopicName parentTopic, var admin = pulsar().getAdminClient(); CompletableFuture[] futures = metadata.getSegments().values().stream() .map(seg -> { - String name = segmentPersistentName(parentTopic, seg); - return admin.topics().deleteAsync(name, force) + String segmentTopicName = SegmentTopicName.fromParent( + parentTopic, seg.hashRange(), seg.segmentId()).toString(); + return admin.scalableTopics().deleteSegmentAsync(segmentTopicName, force) .exceptionally(ex -> { - log.warn().attr("segment", name).exceptionMessage(ex) + log.warn().attr("segment", segmentTopicName).exceptionMessage(ex) .log("Failed to delete segment topic"); return null; }); @@ -701,15 +702,4 @@ private CompletableFuture deleteSegmentTopics(TopicName parentTopic, return CompletableFuture.completedFuture(null); } } - - /** - * Convert a segment:// topic name to persistent:// for the underlying managed ledger topic. - */ - private String segmentPersistentName(TopicName parentTopic, SegmentInfo segment) { - TopicName segTopic = SegmentTopicName.fromParent( - parentTopic, segment.hashRange(), segment.segmentId()); - return "persistent://" + segTopic.getTenant() + "/" - + segTopic.getNamespacePortion() + "/" - + segTopic.getLocalName(); - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java index 507119518b07d..98d09613d7d48 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Segments.java @@ -175,6 +175,107 @@ public void terminateSegment( }); } + @PUT + @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}") + @ApiOperation(value = "Create a subscription cursor on the segment topic at the earliest" + + " position. Super-user only.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Subscription cursor created (or already existed)"), + @ApiResponse(code = 401, message = "This operation requires super-user access"), + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void createSubscription( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify the parent topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true) + @PathParam("descriptor") String descriptor, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Whether leader broker redirected this call to this broker.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(tenant, namespace); + TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor); + + validateSuperUserAccessAsync() + .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative)) + .thenCompose(__ -> pulsar().getBrokerService().getOrCreateTopic(segmentTopic.toString())) + .thenCompose(topic -> topic.createSubscription(subscription, + CommandSubscribe.InitialPosition.Earliest, false, null)) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .log("Created subscription on segment topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .exception(ex).log("Failed to create subscription on segment"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}") + @ApiOperation(value = "Delete a subscription cursor on the segment topic. Super-user only.") + @ApiResponses(value = { + @ApiResponse(code = 204, message = "Subscription cursor deleted (or never existed)"), + @ApiResponse(code = 401, message = "This operation requires super-user access"), + @ApiResponse(code = 403, message = "This operation requires super-user access"), + @ApiResponse(code = 500, message = "Internal server error")}) + public void deleteSubscription( + @Suspended final AsyncResponse asyncResponse, + @ApiParam(value = "Specify the tenant", required = true) + @PathParam("tenant") String tenant, + @ApiParam(value = "Specify the namespace", required = true) + @PathParam("namespace") String namespace, + @ApiParam(value = "Specify the parent topic name", required = true) + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Segment descriptor (e.g. 0000-7fff-1)", required = true) + @PathParam("descriptor") String descriptor, + @ApiParam(value = "Subscription name", required = true) + @PathParam("subscription") String subscription, + @ApiParam(value = "Whether leader broker redirected this call to this broker.") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateNamespaceName(tenant, namespace); + TopicName segmentTopic = segmentTopicName(tenant, namespace, encodedTopic, descriptor); + + validateSuperUserAccessAsync() + .thenCompose(__ -> validateTopicOwnershipAsync(segmentTopic, authoritative)) + .thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(segmentTopic.toString())) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + // Topic not loaded → no cursor to delete. Idempotent success. + return CompletableFuture.completedFuture(null); + } + var sub = optTopic.get().getSubscription(subscription); + if (sub == null) { + // Subscription doesn't exist on this segment — idempotent success. + return CompletableFuture.completedFuture(null); + } + return sub.delete(); + }) + .thenAccept(__ -> { + log.info().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .log("Deleted subscription on segment topic"); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + log.error().attr("clientAppId", clientAppId()).attr("segment", segmentTopic) + .attr("subscription", subscription) + .exception(ex).log("Failed to delete subscription on segment"); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); + } + @GET @Path("/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}/backlog") @ApiOperation(value = "Number of unconsumed entries in the segment topic for the " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java index 3bd562a48e100..c596a2e073a66 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java @@ -711,11 +711,11 @@ private CompletableFuture clearSubscriptionBacklogOnSegment(SegmentInfo se } private CompletableFuture createSubscriptionOnSegment(SegmentInfo segment, String subscription) { - String persistentName = toSegmentUnderlyingPersistentName(segment); + String segmentTopicName = toSegmentPersistentName(segment); try { return brokerService.getPulsar().getAdminClient() - .topics().createSubscriptionAsync(persistentName, subscription, - org.apache.pulsar.client.api.MessageId.earliest) + .scalableTopics() + .createSegmentSubscriptionAsync(segmentTopicName, subscription) .exceptionally(ex -> { Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException.ConflictException) { @@ -730,17 +730,18 @@ private CompletableFuture createSubscriptionOnSegment(SegmentInfo segment, } private CompletableFuture deleteSubscriptionOnSegment(SegmentInfo segment, String subscription) { - String persistentName = toSegmentUnderlyingPersistentName(segment); + String segmentTopicName = toSegmentPersistentName(segment); try { return brokerService.getPulsar().getAdminClient() - .topics().deleteSubscriptionAsync(persistentName, subscription, true) + .scalableTopics() + .deleteSegmentSubscriptionAsync(segmentTopicName, subscription) .exceptionally(ex -> { Throwable cause = org.apache.pulsar.common.util.FutureUtil.unwrapCompletionException(ex); if (cause instanceof org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException) { return null; } log.warn().attr("subscription", subscription) - .attr("segment", persistentName).exceptionMessage(cause) + .attr("segment", segmentTopicName).exceptionMessage(cause) .log("Failed to delete subscription from segment"); return null; }); @@ -1096,19 +1097,6 @@ private String toSegmentPersistentName(SegmentInfo segment) { return segmentTopicName.toString(); } - /** - * Return the {@code persistent://} form of a segment's underlying managed-ledger topic, - * suitable for the standard {@link org.apache.pulsar.client.admin.Topics} admin API. - * The segment-owning broker is discovered by the admin client's normal bundle routing. - */ - private String toSegmentUnderlyingPersistentName(SegmentInfo segment) { - TopicName segmentTopicName = SegmentTopicName.fromParent( - topicName, segment.hashRange(), segment.segmentId()); - return "persistent://" + segmentTopicName.getTenant() + "/" - + segmentTopicName.getNamespacePortion() + "/" - + segmentTopicName.getLocalName(); - } - private CompletableFuture terminateSegmentTopic(String segmentTopicName) { try { return brokerService.getPulsar().getAdminClient() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 8c75823ebef0d..738f9eb516a66 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -117,14 +117,14 @@ public void setUp() throws Exception { // Default: all admin ops succeed. when(topics.getSubscriptionsAsync(anyString())) .thenReturn(CompletableFuture.completedFuture(java.util.List.of())); - when(topics.createSubscriptionAsync(anyString(), anyString(), any(MessageId.class))) - .thenReturn(CompletableFuture.completedFuture(null)); - when(topics.deleteSubscriptionAsync(anyString(), anyString(), anyBoolean())) - .thenReturn(CompletableFuture.completedFuture(null)); when(scalableTopics.createSegmentAsync(anyString(), any())) .thenReturn(CompletableFuture.completedFuture(null)); when(scalableTopics.terminateSegmentAsync(anyString())) .thenReturn(CompletableFuture.completedFuture(null)); + when(scalableTopics.createSegmentSubscriptionAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); + when(scalableTopics.deleteSegmentSubscriptionAsync(anyString(), anyString())) + .thenReturn(CompletableFuture.completedFuture(null)); controller = newController(topicName); } @@ -288,9 +288,9 @@ public void testCreateSubscriptionStream() throws Exception { resources.getSubscriptionAsync(topicName, "sub-stream").get(); assertTrue(persisted.isPresent()); assertEquals(persisted.get().type(), SubscriptionType.STREAM); - // Propagated to every active segment via admin.topics().createSubscriptionAsync(). - verify(topics, org.mockito.Mockito.times(INITIAL_SEGMENTS)) - .createSubscriptionAsync(anyString(), anyString(), any(MessageId.class)); + // Propagated to every active segment via the segment-subscription admin endpoint. + verify(scalableTopics, org.mockito.Mockito.times(INITIAL_SEGMENTS)) + .createSegmentSubscriptionAsync(anyString(), anyString()); } @Test @@ -321,8 +321,8 @@ public void testDeleteSubscription() throws Exception { controller.deleteSubscription("sub-a").get(); assertFalse(resources.getSubscriptionAsync(topicName, "sub-a").get().isPresent()); // Propagated a delete to every segment (all segments incl. any sealed ones). - verify(topics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS)) - .deleteSubscriptionAsync(anyString(), anyString(), anyBoolean()); + verify(scalableTopics, org.mockito.Mockito.atLeast(INITIAL_SEGMENTS)) + .deleteSegmentSubscriptionAsync(anyString(), anyString()); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java new file mode 100644 index 0000000000000..d55495999d5ea --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ScalableSubscriptionAdminTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import java.time.Duration; +import java.util.HashSet; +import java.util.Set; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.common.policies.data.ScalableSubscriptionType; +import org.testng.annotations.Test; + +/** + * Coverage for {@code admin.scalableTopics().createSubscription(...)}: the admin + * API must materialize a cursor on every active segment so a consumer that + * subscribes after messages are produced still receives those + * messages — the whole point of pre-creating the subscription. + * + *

The behavioural assertion (a late consumer sees pre-subscription messages) + * is the user-facing guarantee, and any regression in + * {@code ScalableTopicController.createSubscriptionOnSegment} — which converts + * each {@code SegmentInfo} to the underlying {@code persistent://} topic and + * pre-creates the cursor through the standard topic admin API — would surface + * here as messages going missing. + */ +public class V5ScalableSubscriptionAdminTest extends V5ClientBaseTest { + + @Test + public void testPreCreatedSubscriptionRetainsPreProductionMessages() throws Exception { + String topic = newScalableTopic(3); + String subscription = "pre-created-sub"; + + // Pre-create the subscription on the scalable topic. This must materialize a + // cursor on every active segment so that subsequent produces are retained + // until the consumer drains them. + admin.scalableTopics().createSubscription(topic, subscription, + ScalableSubscriptionType.QUEUE); + + // Produce *before* any consumer subscribes. + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(topic) + .create(); + int n = 30; + Set sent = new HashSet<>(); + for (int i = 0; i < n; i++) { + String v = "msg-" + i; + producer.newMessage().key("k-" + i).value(v).send(); + sent.add(v); + } + + // Subscribe with the SAME subscription name. If createSubscription truly + // pre-created cursors on every segment, the consumer must receive every + // message produced above. If it didn't, the consumer attaches at "latest" + // by default and drops the entire backlog. + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Set received = new HashSet<>(); + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < n && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + } + assertEquals(received, sent, + "pre-created subscription must retain every message produced before consumer subscribed"); + } +} diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java index 10da169d14e08..f9fb01444a6ba 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/ScalableTopics.java @@ -313,6 +313,32 @@ CompletableFuture seekSubscriptionAsync(String topic, String subscription, */ CompletableFuture deleteSegmentAsync(String segmentTopic, boolean force); + /** + * Create a subscription cursor on the given segment topic at the earliest position. + * The call routes to the broker that owns the segment. + * + *

Used internally by {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController + * ScalableTopicController} to fan a new scalable-topic subscription out across every + * active segment so a future consumer doesn't drop the backlog. + * + * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor}) + * @param subscription Subscription name + */ + CompletableFuture createSegmentSubscriptionAsync(String segmentTopic, String subscription); + + /** + * Delete a subscription cursor on the given segment topic. The call routes to the broker + * that owns the segment. + * + *

Used internally by {@link org.apache.pulsar.broker.service.scalable.ScalableTopicController + * ScalableTopicController} when a scalable-topic subscription is deleted, so no orphan + * cursors remain on any segment in the DAG. + * + * @param segmentTopic Full segment topic name ({@code segment://tenant/namespace/topic/descriptor}) + * @param subscription Subscription name + */ + CompletableFuture deleteSegmentSubscriptionAsync(String segmentTopic, String subscription); + /** * Returns the number of unconsumed entries in the given subscription's cursor on the * segment topic — i.e. the per-subscription backlog. The call routes to the broker diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java index 5a040123d2ed5..57d26c7011acd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/ScalableTopicsImpl.java @@ -290,6 +290,28 @@ public CompletableFuture deleteSegmentAsync(String segmentTopic, boolean f return asyncDeleteRequest(path); } + @Override + public CompletableFuture createSegmentSubscriptionAsync(String segmentTopic, + String subscription) { + TopicName tn = TopicName.get(segmentTopic); + WebTarget path = adminSegments + .path(tn.getTenant()).path(tn.getNamespacePortion()) + .path(tn.getLocalName()).path(tn.getSegmentDescriptor()) + .path("subscription").path(subscription); + return asyncPutRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + + @Override + public CompletableFuture deleteSegmentSubscriptionAsync(String segmentTopic, + String subscription) { + TopicName tn = TopicName.get(segmentTopic); + WebTarget path = adminSegments + .path(tn.getTenant()).path(tn.getNamespacePortion()) + .path(tn.getLocalName()).path(tn.getSegmentDescriptor()) + .path("subscription").path(subscription); + return asyncDeleteRequest(path); + } + @Override public CompletableFuture getSegmentSubscriptionBacklogAsync(String segmentTopic, String subscription) { From 21c5a0c6c8292cbc987ef33dfe9e979cc10e2636 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 7 May 2026 12:55:45 -0700 Subject: [PATCH 2/2] Remove unused MessageId import flagged by CI checkstyle --- .../broker/service/scalable/ScalableTopicControllerTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java index 738f9eb516a66..4a0fc93a9e8c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/scalable/ScalableTopicControllerTest.java @@ -47,7 +47,6 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.ScalableTopics; import org.apache.pulsar.client.admin.Topics; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ScalableTopicStats; import org.apache.pulsar.metadata.api.MetadataStoreConfig;