[feat] PIP-468: segment-aware admin endpoints for cursor lifecycle#25717
Merged
merlimat merged 3 commits intoapache:masterfrom May 8, 2026
Merged
[feat] PIP-468: segment-aware admin endpoints for cursor lifecycle#25717merlimat merged 3 commits intoapache:masterfrom
merlimat merged 3 commits intoapache:masterfrom
Conversation
…ycle
createSubscription / deleteSubscription on a scalable topic was silently
broken: ScalableTopicController.createSubscriptionOnSegment built a
"persistent://t/n/parent" URL (no descriptor) and handed it to the
generic topics().createSubscriptionAsync admin API. That URL addresses
the parent scalable topic, which has no managed-ledger backing, so the
admin call hit "Subscription Busy" / NotFound and the .exceptionally
block swallowed it. Per-segment cursors were never pre-created. Lazy
creation by the V5 client on first consumer subscribe masked the bug
in the happy path; admin "createSubscription" had no observable effect
beyond writing the SubscriptionMetadata to the metadata store.
PR 25709 attempted to fix this by appending the descriptor to the
persistent URL ("persistent://t/n/parent/0000-ffff-0"), but TopicName's
non-scalable-domain parser uses splitLimit=4 and rejects 4-segment
paths as legacy V1 — making the admin call hard-fail synchronously
before any HTTP request goes out, so the user-facing breakage is now
loud instead of silent. Cleaned-up form here uses segment-specific
admin endpoints all the way down, no persistent-URL conversion.
New REST endpoints (Segments.java, super-user only, segment-domain only):
- PUT /segments/{t}/{n}/{topic}/{descriptor}/subscription/{sub}
- DELETE /segments/{t}/{n}/{topic}/{descriptor}/subscription/{sub}
New admin client methods (ScalableTopics admin API):
- createSegmentSubscriptionAsync(segmentTopic, subscription)
- deleteSegmentSubscriptionAsync(segmentTopic, subscription)
Both take a segment:// URI directly; ScalableTopicsImpl builds the
REST path from TopicName parts (same pattern as the existing
getSegmentSubscriptionBacklogAsync / seekSegmentSubscriptionAsync /
clearSegmentSubscriptionBacklogAsync helpers).
Updated ScalableTopicController.createSubscriptionOnSegment /
deleteSubscriptionOnSegment to call the new admin methods, removing
the broken toSegmentUnderlyingPersistentName helper.
Updated ScalableTopics.deleteSegmentTopics (the whole-scalable-topic
teardown path) to use admin.scalableTopics().deleteSegmentAsync(
segmentTopicName, force) — same segment-aware admin endpoint already
used for split-segment teardown — instead of trying to address the
segment via a fabricated persistent:// URL.
V5ScalableSubscriptionAdminTest (new) is a behavioral round trip:
create scalable topic, admin.scalableTopics().createSubscription,
produce 30 messages with no consumer attached, then subscribe and
assert every message is received. Fails on master, passes here.
ScalableTopicControllerTest updated to mock the new admin methods
and verify the controller invokes them.
11 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Supersedes #25709, which had the right intent but produced an invalid
persistent://t/n/parent/{descriptor}URL thatTopicNameparses as a legacy V1 topic name and rejects synchronously — making the admin call hard-fail before any HTTP request goes out.createSubscription/deleteSubscriptionon a scalable topic was silently broken before either change.ScalableTopicController.createSubscriptionOnSegmentbuilt apersistent://t/n/parentURL (no descriptor) and handed it to the generictopics().createSubscriptionAsyncadmin API. That URL addresses the parent scalable topic, which has no managed-ledger backing, so the admin call hitSubscription Busy/NotFoundand the.exceptionallyblock swallowed it. Per-segment cursors were never pre-created. Lazy cursor creation by the V5 client on first consumer subscribe masked the bug in the happy path; admincreateSubscriptionhad no observable effect beyond writing theSubscriptionMetadatato the metadata store. The same shape of bug existed inScalableTopics.deleteSegmentTopics(whole-scalable-topic teardown).The clean fix is segment-specific admin endpoints, all the way down — no
persistent://conversion.New REST endpoints (
Segments.java, super-user only, segment-domain only)PUT /segments/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}— create a cursor at earliest positionDELETE /segments/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{subscription}— delete the cursorSame
validateSuperUserAccessAsync→validateTopicOwnershipAsync→getOrCreateTopic(orgetTopicIfExistsfor delete) pattern as the existing/segments/.../subscription/{sub}/backlog,/seek,/skip-all.New admin client methods (
ScalableTopics)createSegmentSubscriptionAsync(segmentTopic, subscription)deleteSegmentSubscriptionAsync(segmentTopic, subscription)Both take a
segment://...URI directly.ScalableTopicsImplbuilds the REST path fromTopicNameparts — same pattern as the existinggetSegmentSubscriptionBacklogAsync/seekSegmentSubscriptionAsync/clearSegmentSubscriptionBacklogAsynchelpers.Caller updates
ScalableTopicController.createSubscriptionOnSegment/deleteSubscriptionOnSegmentnow callscalableTopics().createSegmentSubscriptionAsync/deleteSegmentSubscriptionAsync. Removed the brokentoSegmentUnderlyingPersistentNamehelper.ScalableTopics.deleteSegmentTopics(whole-scalable-topic teardown) now usesadmin.scalableTopics().deleteSegmentAsync(segmentTopicName, force)— the existing segment-aware admin endpoint already used for split-segment teardown — instead of fabricating apersistent://URL.Test plan
V5ScalableSubscriptionAdminTest(new, 1 test) — behavioral round trip: create scalable topic →admin.scalableTopics().createSubscription→ produce 30 messages with no consumer attached → subscribe and assert all 30 are received. Fails on master, passes here.ScalableTopicControllerTest31/31 — updated mocks:topics.createSubscriptionAsync/deleteSubscriptionAsync→scalableTopics.createSegmentSubscriptionAsync/deleteSegmentSubscriptionAsync. Verifies the controller calls the new admin path.ScalableTopicServiceTest16/16 — unchanged, no regressions.pulsar-client-admin-api,pulsar-client-admin-original,pulsar-brokercheckstyle clean.