Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions pip/pip-469.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,15 @@ Broker startup validates both backends:
- `SystemTopicBasedTopicPoliciesService` must be instantiable.
- The configured `topicPoliciesServiceClassName` must be instantiable.

If either backend cannot be instantiated or started, broker startup fails. There is no per-request fallback from one
backend to another.
`LegacyAwareTopicPoliciesService#start` starts only the configured backend. It intentionally does not call
`SystemTopicBasedTopicPoliciesService#start`, because that start path registers a namespace-bundle ownership listener
whose only purpose is to eagerly create a reader on `<namespace>/__change_events` when a namespace bundle is loaded.
Under legacy-aware routing, that eager optimization would be counterproductive because it can create readers for
namespaces that do not have topic policies in `__change_events`. For legacy namespaces, the system-topic reader and
policy cache are initialized lazily by the routed system-topic backend operations.

If either backend cannot be instantiated, or if the configured backend cannot be started, broker startup fails. There is
no per-request fallback from one backend to another.

### Namespace-scoped service routing

Expand All @@ -118,6 +125,10 @@ backend to another.
the system-topic backend when the system topic exists.
- Routing the same operations to the configured backend when the system topic does not exist.

Listener registration is routed through `TopicPoliciesService#registerListenerAsync`. This lets the wrapper resolve the
namespace backend before registering the listener, and the listener is registered only on the selected backend instead
of being registered on both backends.

The system-topic existence check can be cached per namespace in memory, but the routing rule is defined by actual topic
existence rather than by new namespace metadata.

Expand Down Expand Up @@ -159,6 +170,11 @@ managed-ledger metadata updates.

### Listener behavior

`TopicPoliciesService` adds `registerListenerAsync(TopicName, TopicPolicyListener)` for listener registration. The
existing synchronous `registerListener(TopicName, TopicPolicyListener)` method is retained as a deprecated compatibility
hook for existing custom implementations, and the default async method delegates to it. Implementations that need async
routing or initialization, such as `LegacyAwareTopicPoliciesService`, override `registerListenerAsync` directly.

The backend registers watchers on both metadata stores:

- A change on the local path re-reads the local node and notifies listeners with the latest local `TopicPolicies` or
Expand All @@ -173,6 +189,11 @@ append-only replay log; it relies on metadata-store notifications and read-after

### Public API

The `TopicPoliciesService` extension point gains a default
`CompletableFuture<Boolean> registerListenerAsync(TopicName, TopicPolicyListener)` method. Existing implementations
remain compatible because `registerListener(TopicName, TopicPolicyListener)` is retained and used by the default async
implementation.

No new namespace policy field is introduced.

No new namespace admin REST endpoint or Java admin client method is introduced.
Expand Down Expand Up @@ -221,6 +242,10 @@ This upgrade rule is intentionally conservative:
This means some namespaces with an empty but already-created `__change_events` topic may continue using the
system-topic backend. That is acceptable because it avoids missing legacy state.

Existing custom `TopicPoliciesService` implementations that only implement the synchronous `registerListener` method
continue to work through the default `registerListenerAsync` bridge. Implementations can override
`registerListenerAsync` when registration itself needs asynchronous backend resolution or initialization.

## Downgrade / Rollback

Rolling back to a broker version that does not understand legacy-aware routing returns topic-policies backend
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1750,8 +1750,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece

@FieldContext(
category = CATEGORY_SERVER,
doc = "The class name of the topic policies service. The default config only takes affect when the "
+ "systemTopicEnable config is true"
doc = """
The class name of the topic policies service. There are 2 built-in implementations:
1. "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService" (default)
It stores a topic's policies in the `__change_events` topic. If `systemTopicEnabled` is false,
the topic policies will just be disabled
2. "org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService"
It stores a topic's policies in the metadata store. If `systemTopicEnabled` is true and the
topic's namespace has a `__change_events` topic, the policies will still be stored in the
`__change_events` topic for backward compatibility.
"""
)
private String topicPoliciesServiceClassName =
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.HealthChecker;
import org.apache.pulsar.broker.service.LegacyAwareTopicPoliciesService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -2288,8 +2289,16 @@ private TopicPoliciesService initTopicPoliciesService() throws Exception {
return TopicPoliciesService.DISABLED;
}
}
return (TopicPoliciesService) Reflections.createInstance(className,
final var configuredService = (TopicPoliciesService) Reflections.createInstance(className,
Thread.currentThread().getContextClassLoader());
if (!config.isSystemTopicEnabled()) {
log.info()
.attr("className", className)
.log("System topic is disabled, using configured topic policies service without legacy routing");
return configuredService;
}
return new LegacyAwareTopicPoliciesService(this, new SystemTopicBasedTopicPoliciesService(this),
configuredService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ protected boolean isProducersExceeded(boolean isRemote) {

protected void registerTopicPolicyListener() {
brokerService.getPulsar().getTopicPoliciesService()
.registerListener(TopicName.getPartitionedTopicName(topic), this);
.registerListenerAsync(TopicName.getPartitionedTopicName(topic), this);
}

protected void unregisterTopicPolicyListener() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import lombok.CustomLog;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.jspecify.annotations.NonNull;

/**
* Routes topic policy operations to the legacy system-topic backend when a namespace already has
* a topic-policy {@code __change_events} system topic, and otherwise to the configured backend.
*/
@CustomLog
public class LegacyAwareTopicPoliciesService implements TopicPoliciesService {

private final AsyncLoadingCache<NamespaceName, Boolean> isLegacyNamespace;
@VisibleForTesting
final SystemTopicBasedTopicPoliciesService systemTopicService;
private final TopicPoliciesService configuredService;

public LegacyAwareTopicPoliciesService(PulsarService pulsar,
SystemTopicBasedTopicPoliciesService systemTopicService,
TopicPoliciesService configuredService) {
// Generally, we only need to check if the __change_events topic exists once because the __change_events topic
// should only be created by broker before the upgrade, where `SystemTopicBasedTopicPoliciesService` is
// configured as the topic policies service.
this.isLegacyNamespace = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(1))
.buildAsync(new AsyncCacheLoader<>() {
@NonNull
@Override
public CompletableFuture<? extends Boolean> asyncLoad(NamespaceName key,
@NonNull Executor executor) {
return NamespaceEventsSystemTopicFactory.checkSystemTopicExists(key, EventType.TOPIC_POLICY,
pulsar);
}
});
this.systemTopicService = systemTopicService;
this.configuredService = configuredService;
if (configuredService instanceof SystemTopicBasedTopicPoliciesService) {
throw new IllegalArgumentException(
"configuredService should not be an instance of SystemTopicBasedTopicPoliciesService");
}
}

@Override
public void start(PulsarService pulsarService) {
// We should not call `systemTopicService.start()`, which just registers a namespace bundle listener to create
// a reader on `<namespace>/__change_events` when the namespace's bundle is loaded firstly. It's just an
// optimization to create the reader before loading any topic. However, it could create a reader on a namespace
// that does not even have the __change_events topic.
configuredService.start(pulsarService);
Comment thread
BewareMyPower marked this conversation as resolved.
}

@Override
public void close() throws Exception {
try {
configuredService.close();
} finally {
systemTopicService.close();
}
}

@Override
public CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type) {
return resolveService(topicName.getNamespaceObject())
.thenCompose(service -> service.getTopicPoliciesAsync(topicName, type));
}

@Override
public CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy,
boolean skipUpdateWhenTopicPolicyDoesntExist,
Consumer<TopicPolicies> policyUpdater) {
return resolveService(topicName.getNamespaceObject())
.thenCompose(service -> service.updateTopicPoliciesAsync(topicName, isGlobalPolicy,
skipUpdateWhenTopicPolicyDoesntExist, policyUpdater));
}

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
return resolveService(topicName.getNamespaceObject())
.thenCompose(service -> service.deleteTopicPoliciesAsync(topicName));
}

@Override
public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName,
boolean keepGlobalPoliciesAfterDeleting) {
return resolveService(topicName.getNamespaceObject())
.thenCompose(service -> service.deleteTopicPoliciesAsync(topicName,
keepGlobalPoliciesAfterDeleting));
}

@Override
public CompletableFuture<Boolean> registerListenerAsync(TopicName topicName, TopicPolicyListener listener) {
return resolveService(topicName.getNamespaceObject())
.thenCompose(service -> service.registerListenerAsync(topicName, listener));
}

@Override
public boolean registerListener(TopicName topicName, TopicPolicyListener listener) {
throw new RuntimeException("should not be called");
}

@Override
public void unregisterListener(TopicName topicName, TopicPolicyListener listener) {
configuredService.unregisterListener(topicName, listener);
systemTopicService.unregisterListener(topicName, listener);
}

@VisibleForTesting
CompletableFuture<TopicPoliciesService> resolveService(NamespaceName namespace) {
return isLegacyNamespace.get(namespace)
.thenApply(isLegacy -> isLegacy ? systemTopicService : configuredService);
}
}
Loading
Loading