# PIP-475: Regular-to-Scalable Topic Migration

*Sub-PIP of [PIP-460: Scalable Topics](pip-460.md)*

## Motivation

[PIP-460](pip-460.md) introduces scalable topics (`topic://...`) as a new topic type that supports range splitting and merging without breaking key ordering. For this to be adoptable in real deployments, users with existing partitioned and non-partitioned topics need a migration path that:

1. **Doesn't require recreating their topics from scratch.** Existing topics may hold months of retained data and have many active subscriptions. Re-create + re-publish is not a viable upgrade story.
2. **Lets clients adopt the V5 SDK before any topic is migrated.** Operationally, applications need to be upgraded one at a time over weeks, while the topics they read and write keep working as-is. The V5 SDK has to interoperate with the *old* topic types until the migration moment.
3. **Keeps the migration moment small and surgical.** Once all clients are on the V5 SDK, an admin command flips a topic from regular to scalable in a single atomic step, without copying data or moving cursors.
4. **Cannot be reversed.** Once a topic is scalable, regressing to a regular topic is unsafe (the new layout can have already split, leaving data in segments that don't map back to a fixed partition count). The metadata transition has to be one-way.

PIP-460 lists "Tooling for migrating existing partitioned topics to scalable topics" in its postponed section. This PIP closes that gap.

This PIP also clarifies the V5 SDK's behavior when given a topic name that may or may not be scalable, and tightens the broker so that a v4 client cannot accidentally write to (or auto-create) a regular topic that has already been migrated.

The longer-term direction for Pulsar is for scalable topics to **fully replace** partitioned and non-partitioned topics: the existing topic types stay supported for backward compatibility, but new development on the topic surface targets scalable topics, and migration tooling like this PIP is what lets existing deployments make that transition incrementally instead of all at once.

---

## Background Knowledge

### Topic domains in Pulsar today

A Pulsar topic name encodes its domain in a URI scheme:

- `persistent://t/n/x` — durable topic backed by a managed ledger.
- `non-persistent://t/n/x` — in-memory topic, no durability.
- `topic://t/n/x` — scalable topic introduced by PIP-460. Backed by a DAG of segments; each segment is itself a `segment://...` topic with its own managed ledger.

A short name like `my-topic` is normalized to `persistent://public/default/my-topic` by the v4 client. The V5 SDK keeps the same normalization, then opens a scalable-topic lookup session for the resolved name (see [V5 SDK resolution rule](#v5-sdk-resolution-rule) below).

### Partitioned vs. non-partitioned regular topics

Two distinct shapes:

- **Non-partitioned**: a single `persistent://t/n/x` with one managed ledger.
- **Partitioned**: `persistent://t/n/x` is a logical name; the actual data lives in `persistent://t/n/x-partition-0` … `persistent://t/n/x-partition-(N-1)`, each with its own managed ledger. The v4 client's partitioned producer routes each message to a partition by `signSafeMod(murmurHash3_32(key), N)`.

### V5 segment routing

A scalable topic's hash space is `[0x0000, 0xFFFF]`. Each active segment owns a contiguous sub-range. The V5 producer routes a message with key `K` to the segment whose range contains `murmurHash3_32(K) & 0xFFFF`. Segments split into two halves; halves can later be merged back. The routing function is range-based, not modulo-based.
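To make the contrast concrete, here is a minimal, runnable sketch of the two hash-to-destination mappings. The helper names are illustrative, not actual SDK identifiers; both functions consume the same 32-bit murmur hash of the key.

```java
// Illustrative contrast between v4 mod-N routing and V5 range-based routing.
public final class RoutingContrast {

    /** v4 partitioned routing: signSafeMod(hash, N) picks a partition index. */
    static int v4Partition(int keyHash, int numPartitions) {
        int mod = keyHash % numPartitions;
        return mod < 0 ? mod + numPartitions : mod; // signSafeMod
    }

    /**
     * V5 routing: the low 16 bits give a point in [0x0000, 0xFFFF]; the
     * destination is the active segment whose range contains that point.
     * With N equal-width ranges, that is simply point / (0x10000 / N).
     */
    static int v5RangeIndex(int keyHash, int numRanges) {
        return (keyHash & 0xFFFF) / (0x10000 / numRanges);
    }

    public static void main(String[] args) {
        int hash = 5; // suppose murmurHash3_32(key) == 5
        System.out.println(v4Partition(hash, 4));  // 1 -> partition 1
        System.out.println(v5RangeIndex(hash, 4)); // 0 -> first quarter of the hash space
    }
}
```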
This difference matters for partitioned-topic migration: the migrated layout's initial segments line up 1:1 with the partitions, but their hash-range routing wouldn't agree with v4's mod-N routing for all keys. The fix is described in [Routing across migration](#routing-across-migration) below.

---

## Goals

### In Scope

- A V5 SDK that operates against existing regular topics (partitioned or non-partitioned) without requiring source changes from the application.
- An admin command — `admin.scalableTopics().migrateToScalable(topic)` — that converts an existing regular topic into a scalable topic in a single atomic step, with no data copy and no cursor migration.
- A V5 SDK resolution rule that picks between the scalable code path and a v4-compatible fallback based on the input topic name and broker-side state, with a strict "once scalable, always scalable" guarantee.
- A broker-side guard that prevents new v4 clients from accidentally writing to (or auto-creating) a regular topic whose name has already been claimed by a scalable topic.
- Preservation of per-key ordering across the migration moment, via the same drain-before-assign protocol the controller uses for segment splits: old partitions become sealed parent segments, and the subscription controller drains them before activating consumers on the new children.
- Rejection of `non-persistent://` topics in V5 builders. Non-persistent topics are out of scope for V5 entirely.

### Out of Scope

- **Cross-cluster (geo-replication) migration coordination.** A topic migrated in one cluster while still being replicated to another is a separate problem, deferred until geo-replication on scalable topics lands ([PIP-460](pip-460.md) sub-PIP).
- **Reverse migration (scalable → regular).** Not supported; the metadata transition is one-way.
- **Migration triggered by traffic or load thresholds.** Migration is operator-initiated only.
- **Per-message format conversion.** Messages stay byte-identical on disk before and after migration; only the topic-name surface above the managed ledger changes.

---

## High-Level Design

### V5 SDK resolution rule

When a V5 application calls `client.newProducer(...).topic(input)` (or any consumer builder), the SDK opens a **scalable-topic lookup session** for the topic — the same long-lived, push-based session that PIP-468 defines for `topic://...` discovery. The lookup session is the single source of truth for what the topic looks like and how it routes; there is no separate "probe" call and no client-side TTL cache.

The broker responds to the lookup session based on the input form and the topic's current state:

| Input form | Lookup response |
|---|---|
| `topic://t/n/x` | Real DAG layout (or `NotFound` if the scalable topic doesn't exist — same as today). |
| `persistent://t/n/x` | If scalable metadata exists for the equivalent `topic://t/n/x`: real DAG layout, with the topic identity promoted to `topic://...` for all subsequent operations. Otherwise: a **synthetic layout** that models the regular topic's partitions as **special segments** (see [Special segments](#special-segments) below). |
| `my-topic` (or any short / unscoped form) | Normalized to `persistent://public/default/my-topic`; then the rule above applies. |
| `non-persistent://...` | Rejected at `create()` / `subscribe()` with `UnsupportedOperationException`. V5 does not support non-persistent topics. |
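From the application's point of view, the rule is invisible: the same code works before and after migration. The sketch below assumes the `PulsarClientV5` entry point named in section 1.1 and builder shapes mirroring the v4 API; every detail not quoted elsewhere in this PIP is illustrative, not a finalized surface.

```java
// Behavioural sketch of the resolution rule, not a finalized V5 API.
void resolutionRuleExamples(PulsarClientV5 client) throws Exception {
    // Works whether or not the topic has been migrated: the lookup session
    // resolves the name to a synthetic layout (regular topic) or the real DAG.
    Producer<byte[]> producer = client.newProducer()
            .topic("persistent://t/n/x")
            .create();

    // Short names normalize exactly as in v4, then the same rule applies.
    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic") // resolved as persistent://public/default/my-topic
            .subscriptionName("sub")
            .subscribe();

    // Rejected before any lookup session is opened.
    client.newProducer()
            .topic("non-persistent://t/n/x")
            .create(); // throws UnsupportedOperationException
}
```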
Because the lookup session is push-based, the broker can **update the layout in place** when the topic is migrated. The V5 SDK already handles layout changes — splits and merges go through the same machinery — so a migration is observed as one more layout-change event: synthetic layout → real DAG. The application sees its `Producer` / `Consumer` keep working through the transition; the SDK's existing reconnection logic handles the underlying connection swap internally.

This gives the "once scalable, always scalable" guarantee (requirement 4 in the Motivation) for free: once the lookup session has reported a real DAG, future updates can only refine the DAG via splits / merges; there is no "downgrade to synthetic" path the broker exposes.

### Special segments

A special segment is the lookup-session encoding of "this slice of the keyspace is not yet a real `segment://...` topic — it's the existing `persistent://t/n/x[-partition-K]` topic instead." It carries the same fields as a regular segment plus a marker that points at the underlying `persistent://...` name.

The V5 SDK's per-segment producer and consumer infrastructure already attaches to a topic name; the only difference for a special segment is that the name uses the `persistent://` domain instead of `segment://`. No separate code path, no separate wrapper class hierarchy.

For routing, the layout carries the routing function as data (a sketch of one possible encoding follows this list):

- **Synthetic layout for an N-partitioned regular topic**: N special segments, one per partition; routing is `signSafeMod(murmurHash3_32(key), N)` — exactly v4's partitioned-producer routing. Producers route the same way the v4 SDK would, ensuring per-key ordering during the migration window.
- **Synthetic layout for a non-partitioned regular topic**: 1 special segment covering the full hash range; routing is trivial (no key matters).
- **Real DAG (post-migration or natively scalable)**: range-based routing — the standard scalable-topic semantics. Producers route by hash range; per-key ordering across the migration boundary is preserved by the controller's drain-before-assign protocol (see [Routing across migration](#routing-across-migration) below).
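To pin down what "special segment" and "routing function as data" could look like, here is one hypothetical encoding. PIP-468 owns the real metadata schema; every name and field below is illustrative only.

```java
import java.util.List;

// Illustrative layout model, not the PIP-468 wire schema.
enum RoutingFunction { MOD_N, RANGE_BASED }

record SegmentEntry(
        long segmentId,
        int rangeStart,   // inclusive, in [0x0000, 0xFFFF]
        int rangeEnd,     // inclusive
        String uri,       // "segment://..." normally; "persistent://..." when special
        boolean special   // true -> this slice is still backed by a regular topic
) {}

record Layout(RoutingFunction routing, List<SegmentEntry> segments) {}

// A synthetic layout for a 2-partition regular topic would then read roughly:
//   routing = MOD_N
//   { id=0, range=[0x0000, 0x7FFF], uri="persistent://t/n/x-partition-0", special=true }
//   { id=1, range=[0x8000, 0xFFFF], uri="persistent://t/n/x-partition-1", special=true }
// Under MOD_N the ranges are nominal; routing ignores them.
```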
### Migration protocol (operator's view)

```
Pre-migration (steady state):
  • Topic exists as persistent://t/n/x (with N partitions, possibly N=0)
  • Some clients are still on the v4 SDK; others have been upgraded to V5.
  • V5 clients see a SYNTHETIC layout from the lookup session: N special
    segments (or 1 for non-partitioned) pointing at the existing
    persistent://...-partition-K topics, with mod-N routing.

Step 1 — Operator upgrades all clients to the V5 SDK.
  Step 2 enforces this: the migration command inspects the topic's active
  producer/consumer connections and fails with HTTP 409 if any v4
  connections remain. Old code keeps working unchanged before migration
  because the synthetic layout exposes the existing persistent topics
  through the V5 surface; routing is mod-N, so per-key destinations are
  identical to v4 partitioned-topic routing.

Step 2 — Operator runs:
  pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x

  The broker:
  2a. Validates that no v4 producer/consumer connections are attached to
      the topic. If any are, fails with HTTP 409 and the connection count
      in the error message. (Also fails 409 if scalable metadata already
      exists, and 404 if the topic doesn't exist.)
  2b. Builds ScalableTopicMetadata with:
      • N sealed parent segments (or 1 for non-partitioned), each
        wrapping the existing managed ledger for
        persistent://t/n/x-partition-K (or persistent://t/n/x for
        non-partitioned). The managed ledgers are unchanged; no data copy.
      • N active child segments (or 1) with equal-width contiguous hash
        ranges covering [0x0000, 0xFFFF], using standard range-based
        routing. Each child has all N parents as predecessors in the DAG.
        New writes route to children; subscriptions drain the parents
        before consuming from children — the same drain-before-assign
        protocol the controller already uses for segment splits / merges.
  2c. Atomically writes the metadata, taking the topic from
      "regular" to "scalable" in one CAS on the metadata store.
  2d. Pushes the new layout to every connected lookup session.

Step 3 — Connected V5 clients receive the layout-update push on their
  lookup session. Synthetic layout → real DAG. The SDK's existing
  layout-change handling (used for split / merge) drives any internal
  reconnection. Application-visible behaviour: nothing changes.
```

### Routing across migration

The synthetic layout (pre-migration) routes mod-N so that V5 producers write to the same partitions v4 producers would. The post-migration real DAG routes by hash range — the standard scalable-topic semantics. The two routings are not equivalent: a given key's destination segment can change at the migration moment.

Per-key ordering is preserved by the existing scalable-topic drain-before-assign protocol, the same one the subscription controller already uses for splits and merges:

- Each old partition becomes a sealed parent segment in the new DAG.
- N new active child segments are created alongside, with equal-width contiguous hash ranges; every child has all N parents as predecessors.
- The subscription controller fully drains the parents before assigning their children to a consumer. By the time a consumer first sees a message from a child, every message previously published to those parents (including any with the same key) has already been delivered.

Producers route to children using standard range-based routing immediately after the migration commit; their writes land in the new segments while the parents are draining behind them. No special routing flag is needed: the migration reuses the same machinery that handles splits, with the parent fan-in (each child has N parents instead of 1) being the only structural difference.

For non-partitioned topics (N=1) the same protocol applies trivially: one sealed parent and one active child covering the full hash range.
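The ordering argument reduces to a single invariant. Below is a minimal sketch of the check the subscription controller applies before handing a child segment to a consumer; the interface names are hypothetical (the real controller is broker-side, per PIP-468).

```java
import java.util.List;

// Hypothetical shapes, for illustration only.
interface Segment {
    List<Segment> parents();
    boolean isSealed();
}

interface Subscription {
    boolean hasFullyConsumed(Segment segment);
}

final class DrainBeforeAssign {
    /**
     * A child segment becomes eligible for consumer assignment only once
     * every parent is sealed and fully consumed by this subscription. After
     * a migration every child has all N old partitions as parents, so any
     * message with key K published pre-migration is delivered before the
     * first post-migration message with key K.
     */
    static boolean canAssign(Segment child, Subscription sub) {
        return child.parents().stream()
                .allMatch(p -> p.isSealed() && sub.hasFullyConsumed(p));
    }
}
```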
---

## Detailed Design

### 1. V5 SDK changes

The SDK reuses its existing scalable code path — the lookup session, per-segment producer / consumer infrastructure, layout-change handler — for *every* topic, including regular ones. The only new bits are: how the lookup session is opened from `persistent://` / short-form input, how the SDK interprets a "special segment" entry in the layout, and how it applies the routing function carried by the layout.

#### 1.1 Lookup session opens for any input form

`PulsarClientV5` opens a lookup session for the user's topic regardless of domain. The existing scalable-topic lookup-session machinery is extended so the broker accepts `persistent://...` and short-form names in addition to `topic://...`. The session response carries:

- The promoted topic identity (always `topic://t/n/x` after normalization).
- The current layout — either a real DAG or a synthetic layout (see [Special segments](#special-segments) above).
- The routing function (`mod-N` for the synthetic layout, `range-based` for the real DAG).

`non-persistent://...` inputs are rejected at the V5 builder, with `UnsupportedOperationException`, before any lookup is opened.

#### 1.2 Special-segment handling in the per-segment infrastructure

A regular `Segment` carries a `segment://...` URI. A special segment carries a `persistent://...` URI instead, plus a flag indicating it's special. The SDK's per-segment producer (`PerSegmentProducer`) and per-segment consumer (`PerSegmentConsumer`) attach to whatever URI the segment carries — the v4 producer / consumer beneath them already accepts `persistent://` and `segment://` alike. No separate adapter classes are introduced.

V5-specific features that don't apply to a regular topic fail with an ordinary "this layout doesn't support that" error at the API surface:

- `CheckpointConsumer` requires a sealed-segment history; on a synthetic layout there is none, so subscribe fails with a clear error pointing at the migration command.
- `splitSegment` / `mergeSegments` admin operations on a synthetic layout fail at the broker with the same error class — the operations require real `ScalableTopicMetadata`.
- `topic://...`-domain DLQ targets work transparently for both layouts because the DLQ is its own scalable topic; nothing about it depends on the source topic's layout shape.

#### 1.3 Layout-change handling drives the migration transition

Layout updates pushed by the lookup session already trigger the SDK's per-segment reconcile: segments that disappeared get their per-segment producers / consumers torn down; new segments get fresh ones; segments whose URI changed get rebuilt on the new URI.

A migration is exactly this: the special segment with URI `persistent://t/n/x-partition-K` is replaced in the new layout with a real segment whose URI is `segment://t/n/x/` — and that `segment://...` resolves to the same managed ledger. The reconciler tears down the per-segment v4 producer/consumer attached to the `persistent://` URI and reattaches one to the `segment://` URI; from the application's perspective the SDK's internal reconnect happens (as it does for any layout change), but the `Producer` / `Consumer` reference and the publish / receive flow are unaffected.

There is no `TopicMigratedException` exposed to the application by default. Applications that *want* to observe migrations can subscribe to the lookup session's layout-change events directly via a future hook — out of scope for this PIP.

### 2. Migration command

#### 2.1 Admin REST endpoint

`POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable` — requires `produce` permission on the topic (the same permission needed to write to it in the first place). Migration is irreversible but does not destroy data, so the blast radius is bounded by what a write-permissioned user can already do; super-user is not required.

No request body.

The broker counts v4 producer / consumer connections via the existing topic-stats path and rejects the migration with HTTP 409 if any are present, with the count in the error message.

Response: `204 No Content` on success. `409 Conflict` if scalable metadata already exists or v4 connections are still attached. `404 Not Found` if the topic doesn't exist.

#### 2.2 Admin client

```java
package org.apache.pulsar.client.admin;

import java.util.concurrent.CompletableFuture;

public interface ScalableTopics {
    /** Migrate an existing partitioned or non-partitioned topic to a scalable topic. */
    void migrateToScalable(String topic) throws PulsarAdminException;

    CompletableFuture<Void> migrateToScalableAsync(String topic);
}
```

#### 2.3 CLI

```
pulsar-admin scalable-topics migrate-to-scalable persistent://t/n/x
```

#### 2.4 Broker-side migration steps

Executed by the topic's owning broker, atomically as far as possible:

1. **Lock**: acquire a metadata-store lock on `/topics/t/n/x` to prevent concurrent migrations or competing admin operations.
2. **Precheck**:
   - Topic exists (as either partitioned or non-partitioned).
   - No `ScalableTopicMetadata` already exists at the same path. If it does, fail 409.
   - No v4 producer / consumer connections are attached. If any are, fail 409 with the count in the error message.
3. **Build initial layout**:
   - N sealed parent segments (or 1 for non-partitioned), each wrapping the existing managed ledger for `persistent://t/n/x-partition-K` (or `persistent://t/n/x`).
   - N active child segments (or 1 for non-partitioned) with equal-width contiguous hash ranges covering `[0x0000, 0xFFFF]`. Routing is range-based.
   - DAG edges: every child has all N parents as predecessors. The subscription controller's drain-before-assign protocol (already used for splits / merges) preserves per-key ordering.
   - Set `epoch = 0`, `nextSegmentId = 2N`.
4. **Atomic flip**: write `ScalableTopicMetadata` to `/topics/t/n/x` via metadata-store CAS. This is the commit point — once it succeeds, the topic is scalable.
5. **Push the new layout to every connected lookup session.** V5 clients that were seeing the synthetic layout for this topic transition to the real DAG via the same machinery used for split / merge layout updates. No `TerminateTopic` is needed — the underlying managed ledgers don't change identity; only the layout-level segment names that wrap them do.
6. **Release the lock.**

Failures before step 4 leave the topic untouched; failures after step 4 leave the topic scalable. Step 5 is best-effort per session — late-joining lookups always read the freshest metadata directly.

### 3. Lookup-session guard for v4 clients

The lookup session described above is V5-only; v4 clients use the older `CommandLookupTopic`. For a v4 lookup of `persistent://t/n/x` where `ScalableTopicMetadata` exists for `topic://t/n/x`, the broker returns a `TopicMigrated` redirect (binary protocol) or HTTP 410 Gone (REST). The error carries the new `topic://...` name. v4 clients translate this into a hard failure (they can't speak the scalable protocol); operators see a clear signal that some clients haven't upgraded yet.

This guard is what makes the "once scalable, always scalable" guarantee robust against stale v4 clients or v4-only tooling. V5 clients are guarded by the lookup session itself — once it has reported a real DAG, the broker only ever pushes refinements (split / merge), never a downgrade.

### 4. `migratedFrom` on `ScalableTopicMetadata`

An optional informational field `migratedFrom: TopicMigrationOrigin?` records pre-migration metadata (partition count, original `persistent://` name) for debugging, metrics labelling, and future tooling. It is not consulted on hot paths.

No `legacyModNRouting` flag or other migration-specific routing knob is needed: the post-migration real DAG uses standard range-based routing, and per-key ordering is preserved by the controller's existing drain-before-assign protocol (see [Routing across migration](#routing-across-migration)).
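For concreteness, one possible shape of the metadata with the origin field, reusing the illustrative `SegmentEntry` from the [Special segments](#special-segments) sketch. PIP-468 defines the actual schema; every field below is illustrative.

```java
import java.util.List;
import java.util.Optional;

// Illustrative only; field names mirror this PIP's prose, not a wire format.
record TopicMigrationOrigin(
        String originalTopic, // e.g. "persistent://t/n/x"
        int partitionCount    // N at migration time; 0 for a non-partitioned origin
) {}

record ScalableTopicMetadata(
        long epoch,           // 0 immediately after migration
        long nextSegmentId,   // 2N after migration: N sealed parents + N children
        List<SegmentEntry> segments,
        Optional<TopicMigrationOrigin> migratedFrom // empty for natively scalable topics
) {}
```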
---

## Public-facing changes

### REST API

New endpoint:
- `POST /admin/v2/scalable-topics/{tenant}/{namespace}/{topic}/migrate-to-scalable` — requires `produce` permission on the topic; no request body; 204 on success, 409 if already scalable or v4 connections are still attached, 404 if the topic is missing.

Modified endpoints:
- All v4 lookup endpoints: when scalable metadata exists for the equivalent `topic://...`, return a `TopicMigrated` redirect with the new name.

### Binary protocol

- The existing scalable-topic lookup-session command (PIP-468) is extended to accept `persistent://...` and short-form names in addition to `topic://...`. For non-scalable topics it returns a synthetic layout with special-segment entries.
- New error code `TopicMigrated` — sent by the broker on the v4 lookup command (`CommandLookupTopic`) when the requested `persistent://...` name is shadowed by an existing scalable topic. Carries the new `topic://...` name in the error payload. V5 clients never see this error because they always use the scalable lookup session.

### Configuration

New broker config:
- `enableScalableTopicMigration` (default `true`) — kill switch for the migration command. Operators on regulated infrastructure may want to disable it.

### CLI

- `pulsar-admin scalable-topics migrate-to-scalable <topic>`

### V5 SDK behavior

- V5 builders accept `persistent://...` and short-form names; the SDK opens a scalable-topic lookup session that the broker resolves to either a real DAG or a synthetic layout.
- V5 builders reject `non-persistent://...` with `UnsupportedOperationException`.
- Migration is observed as a layout-change push on the lookup session; no new SDK exception is exposed to the application.

---

## Backward & forward compatibility

### Upgrade

The upgrade story this PIP supports is:

1. Upgrade brokers to the version containing this PIP. Brokers handle both old `persistent://` clients and new `topic://` clients side by side; no behavior change for existing topics.
2. Upgrade applications to the V5 SDK at the operator's pace. No topic changes are required; the V5 SDK operates against regular topics through the synthetic layout.
3. Once all applications on a given topic are V5, run `migrate-to-scalable`. The migration is atomic and one-way.

Old client versions continue to work with the cluster on un-migrated topics.

### Downgrade / Rollback

- A broker downgrade *before* any topic has been migrated is fully supported.
- A broker downgrade *after* a topic has been migrated is **not supported**: older brokers don't understand the scalable-metadata layout. Recovery would require restoring the metadata store from backup. The migration command should be treated as a one-way commit.
- A V5 SDK client can be downgraded back to v4 only if the topic was never migrated. Once migrated, the topic only speaks the scalable protocol.

### Geo-replication

Out of scope; see [Goals: Out of Scope](#out-of-scope).

---

## Alternatives

### Alt 1: Probe-and-wrapper in the SDK (rejected)

An earlier draft of this PIP had the V5 SDK probe via `admin.scalableTopics().getMetadataAsync(topic)` at every builder call, cache the verdict with a TTL, and use a separate v4-wrapper code path for regular topics. Migration would be observed by connected clients as a `TopicTerminatedException` from the v4 wrapper, which the SDK would catch and use as the cue to re-probe and rebuild on the scalable path.

Rejected in favour of the lookup-session approach because:
- Two SDK code paths (scalable + v4 wrapper) would have to be maintained and tested in parallel.
- The lookup session is push-based; the probe approach forces a TTL trade-off (a long TTL is slow to notice a migration; a short TTL generates constant probe load).
- Migration was visible to applications as a `TopicMigratedException` on receive futures, however brief; the lookup-session approach makes it strictly an internal SDK reconnect, with no application-visible signal at all.
- The hash-routing equivalence problem ([Routing across migration](#routing-across-migration)) had to be papered over with a documented constraint; with the lookup session carrying the routing function as data, the synthetic layout's mod-N routing and the real DAG's range-based routing coexist naturally, with per-key ordering preserved by the controller's drain-before-assign protocol.

### Alt 2: Migration via per-message data copy

The migration command would create a new scalable topic alongside the regular one and stream all retained data through it. Producers and consumers would cut over after the copy completes.

Rejected because:
- The metadata-flip approach in this PIP achieves the same result with no data movement.
- A retained topic with months of data could take hours to copy, during which producers and consumers would have no clear cut-over moment.

---

## General Notes

- `migratedFrom` is an optional informational field on `ScalableTopicMetadata` (partition count, original `persistent://` name) for debugging and metrics labelling; not consulted on hot paths.

---

## Links

- [PIP-460: Scalable Topics](pip-460.md) — parent PIP.
- [PIP-468: Scalable Topic Controller](pip-468.md) — sibling sub-PIP; defines the metadata-store schema this PIP extends.
- Mailing List discussion thread: TBD
- Mailing List voting thread: TBD