diff --git a/pip/pip-469.md b/pip/pip-469.md
index 69036381f0f01..c1976bd104374 100644
--- a/pip/pip-469.md
+++ b/pip/pip-469.md
@@ -105,8 +105,15 @@ Broker startup validates both backends:
- `SystemTopicBasedTopicPoliciesService` must be instantiable.
- The configured `topicPoliciesServiceClassName` must be instantiable.
-If either backend cannot be instantiated or started, broker startup fails. There is no per-request fallback from one
-backend to another.
+`LegacyAwareTopicPoliciesService#start` starts only the configured backend. It intentionally does not call
+`SystemTopicBasedTopicPoliciesService#start`, because that start path registers a namespace-bundle ownership listener
+whose only purpose is to eagerly create a reader on `/__change_events` when a namespace bundle is loaded.
+Under legacy-aware routing, that eager optimization would be counterproductive because it can create readers for
+namespaces that do not have topic policies in `__change_events`. For legacy namespaces, the system-topic reader and
+policy cache are initialized lazily by the routed system-topic backend operations.
+
+If either backend cannot be instantiated, or if the configured backend cannot be started, broker startup fails. There is
+no per-request fallback from one backend to another.
### Namespace-scoped service routing
@@ -118,6 +125,10 @@ backend to another.
the system-topic backend when the system topic exists.
- Routing the same operations to the configured backend when the system topic does not exist.
+Listener registration is routed through `TopicPoliciesService#registerListenerAsync`. This lets the wrapper resolve the
+namespace backend before registering the listener, and the listener is registered only on the selected backend instead
+of being registered on both backends.
+
The system-topic existence check can be cached per namespace in memory, but the routing rule is defined by actual topic
existence rather than by new namespace metadata.
@@ -159,6 +170,11 @@ managed-ledger metadata updates.
### Listener behavior
+`TopicPoliciesService` adds `registerListenerAsync(TopicName, TopicPolicyListener)` for listener registration. The
+existing synchronous `registerListener(TopicName, TopicPolicyListener)` method is retained as a deprecated compatibility
+hook for existing custom implementations, and the default async method delegates to it. Implementations that need async
+routing or initialization, such as `LegacyAwareTopicPoliciesService`, override `registerListenerAsync` directly.
+
The backend registers watchers on both metadata stores:
- A change on the local path re-reads the local node and notifies listeners with the latest local `TopicPolicies` or
@@ -173,6 +189,11 @@ append-only replay log; it relies on metadata-store notifications and read-after
### Public API
+The `TopicPoliciesService` extension point gains a default
+`CompletableFuture registerListenerAsync(TopicName, TopicPolicyListener)` method. Existing implementations
+remain compatible because `registerListener(TopicName, TopicPolicyListener)` is retained and used by the default async
+implementation.
+
No new namespace policy field is introduced.
No new namespace admin REST endpoint or Java admin client method is introduced.
@@ -221,6 +242,10 @@ This upgrade rule is intentionally conservative:
This means some namespaces with an empty but already-created `__change_events` topic may continue using the
system-topic backend. That is acceptable because it avoids missing legacy state.
+Existing custom `TopicPoliciesService` implementations that only implement the synchronous `registerListener` method
+continue to work through the default `registerListenerAsync` bridge. Implementations can override
+`registerListenerAsync` when registration itself needs asynchronous backend resolution or initialization.
+
## Downgrade / Rollback
Rolling back to a broker version that does not understand legacy-aware routing returns topic-policies backend
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index b22f9a4bef827..632df19a302a9 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1750,8 +1750,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
@FieldContext(
category = CATEGORY_SERVER,
- doc = "The class name of the topic policies service. The default config only takes affect when the "
- + "systemTopicEnable config is true"
+ doc = """
+ The class name of the topic policies service. There are 2 built-in implementations:
+ 1. "org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService" (default)
+ It stores a topic's policies in the `__change_events` topic. If `systemTopicEnabled` is false,
+ the topic policies will just be disabled
+ 2. "org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService"
+ It stores a topic's policies in the metadata store. If `systemTopicEnabled` is true and the
+ topic's namespace has a `__change_events` topic, the policies will still be stored in the
+ `__change_events` topic for backward compatibility.
+ """
)
private String topicPoliciesServiceClassName =
"org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService";
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 85bd9f53b1d2c..d6a47b4eb7cd1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -107,6 +107,7 @@
import org.apache.pulsar.broker.rest.Topics;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.HealthChecker;
+import org.apache.pulsar.broker.service.LegacyAwareTopicPoliciesService;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
@@ -2288,8 +2289,16 @@ private TopicPoliciesService initTopicPoliciesService() throws Exception {
return TopicPoliciesService.DISABLED;
}
}
- return (TopicPoliciesService) Reflections.createInstance(className,
+ final var configuredService = (TopicPoliciesService) Reflections.createInstance(className,
Thread.currentThread().getContextClassLoader());
+ if (!config.isSystemTopicEnabled()) {
+ log.info()
+ .attr("className", className)
+ .log("System topic is disabled, using configured topic policies service without legacy routing");
+ return configuredService;
+ }
+ return new LegacyAwareTopicPoliciesService(this, new SystemTopicBasedTopicPoliciesService(this),
+ configuredService);
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 4d29c751f03a9..3ad9eb438104a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -559,7 +559,7 @@ protected boolean isProducersExceeded(boolean isRemote) {
protected void registerTopicPolicyListener() {
brokerService.getPulsar().getTopicPoliciesService()
- .registerListener(TopicName.getPartitionedTopicName(topic), this);
+ .registerListenerAsync(TopicName.getPartitionedTopicName(topic), this);
}
protected void unregisterTopicPolicyListener() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
new file mode 100644
index 0000000000000..20f7b20799128
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesService.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
+import org.apache.pulsar.common.events.EventType;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.jspecify.annotations.NonNull;
+
+/**
+ * Routes topic policy operations to the legacy system-topic backend when a namespace already has
+ * a topic-policy {@code __change_events} system topic, and otherwise to the configured backend.
+ */
+@CustomLog
+public class LegacyAwareTopicPoliciesService implements TopicPoliciesService {
+
+ private final AsyncLoadingCache isLegacyNamespace;
+ @VisibleForTesting
+ final SystemTopicBasedTopicPoliciesService systemTopicService;
+ private final TopicPoliciesService configuredService;
+
+ public LegacyAwareTopicPoliciesService(PulsarService pulsar,
+ SystemTopicBasedTopicPoliciesService systemTopicService,
+ TopicPoliciesService configuredService) {
+ // Generally, we only need to check if the __change_events topic exists once because the __change_events topic
+ // should only be created by broker before the upgrade, where `SystemTopicBasedTopicPoliciesService` is
+ // configured as the topic policies service.
+ this.isLegacyNamespace = Caffeine.newBuilder().expireAfterWrite(Duration.ofHours(1))
+ .buildAsync(new AsyncCacheLoader<>() {
+ @NonNull
+ @Override
+ public CompletableFuture extends Boolean> asyncLoad(NamespaceName key,
+ @NonNull Executor executor) {
+ return NamespaceEventsSystemTopicFactory.checkSystemTopicExists(key, EventType.TOPIC_POLICY,
+ pulsar);
+ }
+ });
+ this.systemTopicService = systemTopicService;
+ this.configuredService = configuredService;
+ if (configuredService instanceof SystemTopicBasedTopicPoliciesService) {
+ throw new IllegalArgumentException(
+ "configuredService should not be an instance of SystemTopicBasedTopicPoliciesService");
+ }
+ }
+
+ @Override
+ public void start(PulsarService pulsarService) {
+ // We should not call `systemTopicService.start()`, which just registers a namespace bundle listener to create
+ // a reader on `/__change_events` when the namespace's bundle is loaded firstly. It's just an
+ // optimization to create the reader before loading any topic. However, it could create a reader on a namespace
+ // that does not even have the __change_events topic.
+ configuredService.start(pulsarService);
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ configuredService.close();
+ } finally {
+ systemTopicService.close();
+ }
+ }
+
+ @Override
+ public CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service -> service.getTopicPoliciesAsync(topicName, type));
+ }
+
+ @Override
+ public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy,
+ boolean skipUpdateWhenTopicPolicyDoesntExist,
+ Consumer policyUpdater) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service -> service.updateTopicPoliciesAsync(topicName, isGlobalPolicy,
+ skipUpdateWhenTopicPolicyDoesntExist, policyUpdater));
+ }
+
+ @Override
+ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service -> service.deleteTopicPoliciesAsync(topicName));
+ }
+
+ @Override
+ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName,
+ boolean keepGlobalPoliciesAfterDeleting) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service -> service.deleteTopicPoliciesAsync(topicName,
+ keepGlobalPoliciesAfterDeleting));
+ }
+
+ @Override
+ public CompletableFuture registerListenerAsync(TopicName topicName, TopicPolicyListener listener) {
+ return resolveService(topicName.getNamespaceObject())
+ .thenCompose(service -> service.registerListenerAsync(topicName, listener));
+ }
+
+ @Override
+ public boolean registerListener(TopicName topicName, TopicPolicyListener listener) {
+ throw new RuntimeException("should not be called");
+ }
+
+ @Override
+ public void unregisterListener(TopicName topicName, TopicPolicyListener listener) {
+ configuredService.unregisterListener(topicName, listener);
+ systemTopicService.unregisterListener(topicName, listener);
+ }
+
+ @VisibleForTesting
+ CompletableFuture resolveService(NamespaceName namespace) {
+ return isLegacyNamespace.get(namespace)
+ .thenApply(isLegacy -> isLegacy ? systemTopicService : configuredService);
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
new file mode 100644
index 0000000000000..10fec22292851
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MetadataStoreTopicPoliciesService.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import lombok.CustomLog;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.namespace.NamespaceService;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.metadata.api.MetadataCache;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
+import org.apache.pulsar.metadata.api.Notification;
+import org.apache.pulsar.metadata.api.NotificationType;
+import org.jspecify.annotations.Nullable;
+
+/**
+ * Topic policies service backed by Pulsar metadata stores.
+ */
+@CustomLog
+public class MetadataStoreTopicPoliciesService implements TopicPoliciesService {
+
+ public static final String GLOBAL_POLICIES_ROOT = "/admin/topic-policies/global";
+ public static final String LOCAL_POLICIES_ROOT = "/admin/topic-policies/local";
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Map> listeners = new ConcurrentHashMap<>();
+ private MetadataCache localPoliciesCache;
+ private MetadataCache globalPoliciesCache;
+
+ @Override
+ public void start(PulsarService pulsar) {
+ MetadataStore localStore = pulsar.getLocalMetadataStore();
+ MetadataStore configurationStore = pulsar.getConfigurationMetadataStore();
+ this.localPoliciesCache = localStore.getMetadataCache(TopicPolicies.class);
+ this.globalPoliciesCache = configurationStore.getMetadataCache(TopicPolicies.class);
+ localStore.registerListener(notification -> handleNotification(notification, false));
+ configurationStore.registerListener(notification -> handleNotification(notification, true));
+ }
+
+ @Override
+ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) {
+ return deleteTopicPoliciesAsync(topicName, false);
+ }
+
+ @Override
+ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName,
+ boolean keepGlobalPoliciesAfterDeleting) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ if (NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject())) {
+ return CompletableFuture.completedFuture(null);
+ }
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(new BrokerServiceException(getClass().getName() + " is closed."));
+ }
+ CompletableFuture deleteLocal =
+ deleteIfExists(localPoliciesCache, pathFor(partitionedTopicName, false));
+ if (keepGlobalPoliciesAfterDeleting) {
+ return deleteLocal;
+ }
+ CompletableFuture deleteGlobal =
+ deleteIfExists(globalPoliciesCache, pathFor(partitionedTopicName, true));
+ return CompletableFuture.allOf(deleteLocal, deleteGlobal);
+ }
+
+ @Override
+ public CompletableFuture updateTopicPoliciesAsync(TopicName topicName, boolean isGlobalPolicy,
+ boolean skipUpdateWhenTopicPolicyDoesntExist,
+ Consumer policyUpdater) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ if (NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject())) {
+ return CompletableFuture.failedFuture(new BrokerServiceException.NotAllowedException(
+ "Not allowed to update topic policy for the heartbeat topic"));
+ }
+ if (closed.get()) {
+ return CompletableFuture.failedFuture(new BrokerServiceException(getClass().getName() + " is closed."));
+ }
+ MetadataCache cache = cache(isGlobalPolicy);
+ String path = pathFor(partitionedTopicName, isGlobalPolicy);
+ CompletableFuture updateFuture;
+ if (skipUpdateWhenTopicPolicyDoesntExist) {
+ updateFuture = cache.readModifyUpdate(path,
+ current -> updatePolicies(Optional.of(current), isGlobalPolicy, policyUpdater));
+ } else {
+ updateFuture = cache.readModifyUpdateOrCreate(path,
+ current -> updatePolicies(current, isGlobalPolicy, policyUpdater));
+ }
+ return updateFuture.thenAccept(__ -> { }).exceptionally(error -> {
+ if (skipUpdateWhenTopicPolicyDoesntExist
+ && FutureUtil.unwrapCompletionException(error) instanceof NotFoundException) {
+ return null;
+ }
+ throw FutureUtil.wrapToCompletionException(error);
+ });
+ }
+
+ @Override
+ public CompletableFuture> getTopicPoliciesAsync(TopicName topicName, GetType type) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ if (NamespaceService.isHeartbeatNamespace(partitionedTopicName.getNamespaceObject())) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ if (closed.get()) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ boolean global = type == GetType.GLOBAL_ONLY;
+ return cache(global).get(pathFor(partitionedTopicName, global))
+ .thenApply(policies -> policies.map(policy -> cloneWithScope(policy, global)));
+ }
+
+ @Override
+ public boolean registerListener(TopicName topicName, TopicPolicyListener listener) {
+ listeners.compute(normalizeTopicName(topicName), (__, topicListeners) -> {
+ if (topicListeners == null) {
+ topicListeners = new CopyOnWriteArrayList<>();
+ }
+ topicListeners.add(listener);
+ return topicListeners;
+ });
+ return true;
+ }
+
+ @Override
+ public void unregisterListener(TopicName topicName, TopicPolicyListener listener) {
+ listeners.computeIfPresent(normalizeTopicName(topicName), (__, topicListeners) -> {
+ topicListeners.remove(listener);
+ return topicListeners.isEmpty() ? null : topicListeners;
+ });
+ }
+
+ @Override
+ public void close() {
+ if (closed.compareAndSet(false, true)) {
+ listeners.clear();
+ if (localPoliciesCache != null) {
+ localPoliciesCache.invalidateAll();
+ }
+ if (globalPoliciesCache != null) {
+ globalPoliciesCache.invalidateAll();
+ }
+ }
+ }
+
+ private MetadataCache cache(boolean isGlobalPolicy) {
+ return isGlobalPolicy ? globalPoliciesCache : localPoliciesCache;
+ }
+
+ private CompletableFuture deleteIfExists(MetadataCache cache, String path) {
+ return cache.delete(path).handle((__, error) -> {
+ cache.invalidate(path);
+ if (error == null || FutureUtil.unwrapCompletionException(error) instanceof NotFoundException) {
+ return null;
+ }
+ throw FutureUtil.wrapToCompletionException(error);
+ });
+ }
+
+ private static TopicPolicies updatePolicies(Optional currentPolicies,
+ boolean isGlobalPolicy,
+ Consumer policyUpdater) {
+ TopicPolicies policies = currentPolicies.map(TopicPolicies::clone).orElseGet(TopicPolicies::new);
+ policies.setIsGlobal(isGlobalPolicy);
+ policyUpdater.accept(policies);
+ policies.setIsGlobal(isGlobalPolicy);
+ return policies;
+ }
+
+ private void handleNotification(Notification notification, boolean isGlobalPolicy) {
+ if (closed.get()
+ || (notification.getType() != NotificationType.Created
+ && notification.getType() != NotificationType.Modified
+ && notification.getType() != NotificationType.Deleted)) {
+ return;
+ }
+ String path = notification.getPath();
+ String root = isGlobalPolicy ? GLOBAL_POLICIES_ROOT : LOCAL_POLICIES_ROOT;
+ Optional topicName = topicNameFromPath(root, path);
+ if (topicName.isEmpty()) {
+ return;
+ }
+ MetadataCache cache = cache(isGlobalPolicy);
+ cache.invalidate(path);
+ if (notification.getType() == NotificationType.Deleted) {
+ notifyListeners(topicName.get(), null);
+ return;
+ }
+ cache.get(path).whenComplete((policies, error) -> {
+ if (error != null) {
+ log.warn()
+ .attr("path", path)
+ .exception(error)
+ .log("Failed to refresh topic policies after metadata notification");
+ return;
+ }
+ notifyListeners(topicName.get(),
+ policies.map(policy -> cloneWithScope(policy, isGlobalPolicy)).orElse(null));
+ });
+ }
+
+ private void notifyListeners(TopicName topicName, @Nullable TopicPolicies policies) {
+ List topicListeners = listeners.get(topicName);
+ if (topicListeners == null) {
+ return;
+ }
+ for (TopicPolicyListener listener : topicListeners) {
+ try {
+ listener.onUpdate(policies == null ? null : policies.clone());
+ } catch (Throwable error) {
+ log.error().attr("topic", topicName).exception(error).log("Call topic policy listener error");
+ }
+ }
+ }
+
+ private static TopicName normalizeTopicName(TopicName topicName) {
+ return TopicName.get(topicName.getPartitionedTopicName());
+ }
+
+ private static TopicPolicies cloneWithScope(TopicPolicies policies, boolean isGlobalPolicy) {
+ TopicPolicies cloned = policies.clone();
+ cloned.setIsGlobal(isGlobalPolicy);
+ return cloned;
+ }
+
+ @VisibleForTesting
+ public CompletableFuture> getTopicPoliciesDirectFromStore(TopicName topicName,
+ boolean isGlobal) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ String path = pathFor(partitionedTopicName, isGlobal);
+ MetadataCache c = cache(isGlobal);
+ c.invalidate(path);
+ return c.get(path).thenApply(opt -> opt.map(p -> cloneWithScope(p, isGlobal)));
+ }
+
+ @VisibleForTesting
+ static String pathFor(TopicName topicName, boolean isGlobalPolicy) {
+ TopicName partitionedTopicName = normalizeTopicName(topicName);
+ return (isGlobalPolicy ? GLOBAL_POLICIES_ROOT : LOCAL_POLICIES_ROOT)
+ + "/" + partitionedTopicName.getTenant()
+ + "/" + partitionedTopicName.getNamespacePortion()
+ + "/" + partitionedTopicName.getDomain()
+ + "/" + partitionedTopicName.getEncodedLocalName();
+ }
+
+ @VisibleForTesting
+ private static Optional topicNameFromPath(String root, String path) {
+ if (!path.startsWith(root + "/")) {
+ return Optional.empty();
+ }
+ String[] parts = path.substring(root.length() + 1).split("/", 4);
+ if (parts.length != 4) {
+ return Optional.empty();
+ }
+ return Optional.of(TopicName.get(parts[2], parts[0], parts[1], Codec.decode(parts[3])));
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
index 48653883d1a6a..a8f37d0c3894b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java
@@ -667,7 +667,7 @@ protected CompletableFuture> createSystemT
return systemTopicClient.newReaderAsync();
}
- private void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
+ void removeOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
NamespaceName namespace = namespaceBundle.getNamespaceObject();
if (NamespaceService.isHeartbeatNamespace(namespace)) {
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
index 803a18da72d11..5b5fe15723029 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicPoliciesService.java
@@ -96,6 +96,15 @@ default void start(PulsarService pulsar) {
default void close() throws Exception {
}
+
+ /**
+ * @implNote This method is never called unless by the default implementation of
+ * {@link TopicPoliciesService#registerListenerAsync(TopicName, TopicPolicyListener)}, which is actually called
+ * internally. This method is only retained for backward compatibility on custom implementations.
+ */
+ @Deprecated
+ boolean registerListener(TopicName topicName, TopicPolicyListener listener);
+
/**
* Registers a listener for topic policies updates.
*
@@ -106,10 +115,10 @@ default void close() throws Exception {
* guaranteed to be received by the listener.
* In summary, the listener is guaranteed to receive only the latest value.
*
- *
- * @return true if the listener is registered successfully
*/
- boolean registerListener(TopicName topicName, TopicPolicyListener listener);
+ default CompletableFuture registerListenerAsync(TopicName topicName, TopicPolicyListener listener) {
+ return CompletableFuture.completedFuture(registerListener(topicName, listener));
+ }
/**
* Unregister the topic policies listener.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bf795e85c3df7..ed9d0f747f624 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -4901,7 +4901,10 @@ private void updateSubscriptionsDispatcherRateLimiter() {
protected CompletableFuture initTopicPolicy() {
final var topicPoliciesService = brokerService.pulsar().getTopicPoliciesService();
final var partitionedTopicName = TopicName.getPartitionedTopicName(topic);
- if (topicPoliciesService.registerListener(partitionedTopicName, this)) {
+ return topicPoliciesService.registerListenerAsync(partitionedTopicName, this).thenCompose(registered -> {
+ if (!registered) {
+ return CompletableFuture.completedFuture(null);
+ }
if (ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}
@@ -4913,8 +4916,7 @@ protected CompletableFuture initTopicPolicy() {
TopicPoliciesService.GetType.LOCAL_ONLY))
.thenAcceptAsync(optionalPolicies -> optionalPolicies.ifPresent(this::onUpdate),
brokerService.getTopicOrderedExecutor());
- }
- return CompletableFuture.completedFuture(null);
+ });
}
@VisibleForTesting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
new file mode 100644
index 0000000000000..e7fefa164973a
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/MetadataStoreTopicPoliciesTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.broker.admin;
+
+import org.apache.pulsar.broker.service.MetadataStoreTopicPoliciesService;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-admin")
+public class MetadataStoreTopicPoliciesTest extends TopicPoliciesTest {
+
+ @BeforeClass(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName());
+ super.setup();
+ }
+
+ @Override
+ protected void clearTopicPoliciesCache() {
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testTopicPolicyInitialValueWithNamespaceAlreadyLoaded() throws Exception {
+ // This test is specific to SystemTopicBasedTopicPoliciesService (uses getPoliciesCacheInit).
+ // Not applicable to MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testSystemTopicShouldBeCompacted() throws Exception {
+ // Relies on __change_events system topic, which does not exist with MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testPoliciesCanBeDeletedWithTopic() throws Exception {
+ // Directly accesses __change_events PersistentTopic for compaction.
+ // Not applicable to MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testProduceChangesWithEncryptionRequired() throws Exception {
+ // Checks __change_events LAC, which does not exist with MetadataStoreTopicPoliciesService.
+ }
+
+ @Test(enabled = false)
+ @Override
+ public void testTopicPoliciesAfterCompaction(String reloadPolicyType) throws Exception {
+ // The "Recreate_Service" variant creates a new SystemTopicBasedTopicPoliciesService,
+ // which is not applicable to MetadataStoreTopicPoliciesService.
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index d94aa51b73bfe..5ba979534ef55 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -119,7 +119,9 @@
import org.glassfish.jersey.client.JerseyClientBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -144,10 +146,11 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
private final int testTopicPartitions = 2;
- @BeforeMethod
+ @BeforeClass(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.conf.setDefaultNumberOfNamespaceBundles(1);
+ this.conf.setForceDeleteNamespaceAllowed(true);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
@@ -156,17 +159,48 @@ protected void setup() throws Exception {
admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of("test"));
admin.namespaces().createNamespace(myNamespaceV1);
admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
- Producer> producer = pulsarClient.newProducer().topic(testTopic).create();
- producer.close();
- waitForZooKeeperWatchers();
}
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
super.internalCleanup();
}
+ @BeforeMethod
+ void setupTestTopic() throws Exception {
+ // Recreate namespace to clear any policies set by previous tests
+ try {
+ admin.topics().deletePartitionedTopic(testTopic, true);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // topic may already be deleted
+ }
+ try {
+ admin.namespaces().deleteNamespace(myNamespace, true);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // namespace may already be deleted
+ }
+ try {
+ admin.namespaces().deleteNamespace(myNamespaceV1, true);
+ } catch (PulsarAdminException.NotFoundException e) {
+ // namespace may already be deleted
+ }
+ admin.namespaces().createNamespace(testTenant + "/" + testNamespace, Set.of("test"));
+ admin.namespaces().createNamespace(myNamespaceV1);
+ admin.topics().createPartitionedTopic(testTopic, testTopicPartitions);
+ // Acquire namespace bundle ownership so tests that call getOrCreateTopic() directly succeed.
+ // Without this, services that don't create a __change_events reader (e.g. MetadataStoreTopicPoliciesService)
+ // leave the bundle unowned after namespace recreation and the first broker-side topic load fails.
+ admin.lookups().lookupTopic(testTopic + "-partition-0");
+ }
+
+ @AfterMethod(alwaysRun = true)
+ void afterMethodCleanup() throws Exception{
+ admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", "0");
+ admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "0");
+ clearTopicPoliciesCache();
+ }
+
@Test
public void updatePropertiesForAutoCreatedTopicTest() throws Exception {
TopicName topicName = TopicName.get(
@@ -519,8 +553,8 @@ public Object[][] clientRequestType() {
@Test(dataProvider = "clientRequestType")
public void testPriorityOfGlobalPolicies(String clientRequestType) throws Exception {
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
final JerseyClient httpClient = JerseyClientBuilder.createClient();
// create topic and load it up.
final String namespace = myNamespace;
@@ -600,8 +634,8 @@ public void testPriorityOfGlobalPolicies(String clientRequestType) throws Except
@Test(dataProvider = "clientRequestType")
public void testPriorityOfGlobalPolicies2(String clientRequestType) throws Exception {
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
final JerseyClient httpClient = JerseyClientBuilder.createClient();
// create topic and load it up.
final String namespace = myNamespace;
@@ -687,8 +721,8 @@ public void testGlobalPolicyStillAffectsAfterUnloading() throws Exception {
final TopicName topicName = TopicName.get(topic);
admin.topics().createNonPartitionedTopic(topic);
pulsarClient.newProducer().topic(topic).create().close();
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
// Set non-global policy of the limitation of max consumers.
// Set global policy of the limitation of max producers.
@@ -729,8 +763,8 @@ public void testRetentionGlobalPolicyAffects() throws Exception {
final TopicName topicName = TopicName.get(topic);
admin.topics().createNonPartitionedTopic(topic);
pulsarClient.newProducer().topic(topic).create().close();
- final SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+ final TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
// Set non-global policy of the limitation of max consumers.
// Set global policy of the persistence policies.
@@ -2756,10 +2790,8 @@ public void testRemoveSubscribeRate() throws Exception {
@Test
public void testPublishRateInDifferentLevelPolicy() throws Exception {
- cleanup();
- conf.setMaxPublishRatePerTopicInMessages(5);
- conf.setMaxPublishRatePerTopicInBytes(50L);
- setup();
+ admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInMessages", "5");
+ admin.brokers().updateDynamicConfiguration("maxPublishRatePerTopicInBytes", "50");
final String topicName = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
pulsarClient.newProducer().topic(topicName).create().close();
@@ -3050,9 +3082,7 @@ public void testMaxSubscriptionsPerTopicWithExistingSubs() throws Exception {
@Test
public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception {
- cleanup();
- conf.setMaxUnackedMessagesPerSubscription(30);
- setup();
+ restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(30));
final String topic = "persistent://" + myNamespace + "/test-" + UUID.randomUUID();
// init cache
@Cleanup
@@ -3115,6 +3145,9 @@ public void testMaxUnackedMessagesOnSubscriptionPriority() throws Exception {
&& admin.topicPolicies().getMaxUnackedMessagesOnSubscription(topic) == null);
messages = getMsgReceived(consumer1, Integer.MAX_VALUE);
assertEquals(messages.size(), defaultMaxUnackedMsgOnBroker);
+
+ // restore default config
+ restartBroker(conf -> conf.setMaxUnackedMessagesPerSubscription(4 * 50000));
}
private void produceMsg(Producer producer, int msgNum) throws Exception{
@@ -3299,14 +3332,16 @@ public void testGetReplicatorRateApplied() throws Exception {
@Test(timeOut = 30000)
public void testAutoCreationDisabled() throws Exception {
- cleanup();
- conf.setAllowAutoTopicCreation(false);
- setup();
+ admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "false");
+
final String topic = testTopic + UUID.randomUUID();
admin.topics().createPartitionedTopic(topic, 3);
pulsarClient.newProducer().topic(topic).create().close();
//should not fail
assertNull(admin.topicPolicies().getMessageTTL(topic));
+
+ // restore default
+ admin.brokers().updateDynamicConfiguration("allowAutoTopicCreation", "true");
}
@SuppressWarnings("deprecation")
@@ -3431,6 +3466,12 @@ public void testSubscriptionTypesEnabled() throws Exception {
pulsarClient.newConsumer().topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("test")
.subscribe().close();
+
+ // restore dynamic broker config and conf object
+ pulsar.getConfiguration().setSubscriptionTypesEnabled(
+ Set.of("Exclusive", "Shared", "Failover", "Key_Shared"));
+ admin.brokers().updateDynamicConfiguration("subscriptionTypesEnabled",
+ "Exclusive,Shared,Failover,Key_Shared");
}
@Test(timeOut = 20000)
@@ -3765,7 +3806,8 @@ public void testPolicyIsDeleteTogetherAutomatically() throws Exception {
}
@Test
- public void testDoNotCreateSystemTopicForHeartbeatNamespace() {
+ public void testDoNotCreateSystemTopicForHeartbeatNamespace() throws Exception {
+ initEventsTopicAndPartitions();
assertTrue(pulsar.getBrokerService().getTopics().size() > 0);
pulsar.getBrokerService().getTopics().forEach((k, v) -> {
TopicName topicName = TopicName.get(k);
@@ -3826,8 +3868,13 @@ public void testLoopCreateAndDeleteTopicPolicies() throws Exception {
}
private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception {
- PersistentTopic tp =
- (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
+ Optional topicOpt =
+ pulsar.getBrokerService().getTopic(topicName, false).join();
+ if (topicOpt.isEmpty()) {
+ // Topic doesn't exist (e.g., when not using system-topic-based policies service), nothing to compact.
+ return;
+ }
+ PersistentTopic tp = (PersistentTopic) topicOpt.get();
// Wait for the old task finish.
Awaitility.await().untilAsserted(() -> {
CompletableFuture compactionTask = WhiteboxImpl.getInternalState(tp, "currentCompaction");
@@ -3846,7 +3893,7 @@ private void triggerAndWaitNewTopicCompaction(String topicName) throws Exception
* It is not a thread safety method, something will go to a wrong pointer if there is a task is trying to load a
* topic policies.
*/
- private void clearTopicPoliciesCache() {
+ protected void clearTopicPoliciesCache() {
TopicPoliciesService topicPoliciesService = pulsar.getTopicPoliciesService();
if (topicPoliciesService instanceof TopicPoliciesService.TopicPoliciesServiceDisabled) {
return;
@@ -4076,8 +4123,8 @@ public void testGlobalTopicPolicies() throws Exception {
.isNull());
admin.topicPolicies(true).setRetention(topic, new RetentionPolicies(1,
2));
- SystemTopicBasedTopicPoliciesService topicPoliciesService =
- (SystemTopicBasedTopicPoliciesService) pulsar.getTopicPoliciesService();
+ TopicPoliciesService topicPoliciesService =
+ pulsar.getTopicPoliciesService();
// check global topic policies can be added correctly.
Awaitility.await().untilAsserted(() -> assertNotNull(
@@ -4121,6 +4168,7 @@ public void testGlobalTopicPolicies() throws Exception {
@Test
public void testMaxMessageSizeWithChunking() throws Exception {
+ final var maxMessageSize = this.conf.getMaxMessageSize();
this.conf.setMaxMessageSize(1000);
@Cleanup
@@ -4149,6 +4197,7 @@ public void testMaxMessageSizeWithChunking() throws Exception {
// chunk message send success
producer.send(new byte[2000]);
+ this.conf.setMaxMessageSize(maxMessageSize);
}
@Test(timeOut = 30000)
@@ -4202,6 +4251,7 @@ public void testGetTopicPoliciesWhenDeleteTopicPolicy() throws Exception {
@Test
public void testProduceChangesWithEncryptionRequired() throws Exception {
+ initEventsTopicAndPartitions();
final String beforeLac = admin.topics().getInternalStats(topicPolicyEventsTopic).lastConfirmedEntry;
admin.namespaces().setEncryptionRequiredStatus(myNamespace, true);
// just an update to trigger writes on __change_events
@@ -4657,4 +4707,10 @@ public void testGetAppliedOffloadPoliciesWithLegacyNamespacePolicies() throws Ex
assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), (Long) (1024 * 1024 * 10L),
"Should inherit offload threshold from legacy namespace policy");
}
+
+ private void initEventsTopicAndPartitions() throws Exception {
+ try (Producer> producer = pulsarClient.newProducer().topic(testTopic).create()) {
+ // No-op. Creating the producer initializes the events topic and partitions.
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
new file mode 100644
index 0000000000000..47a7de0528de9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/LegacyAwareTopicPoliciesServiceTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ThrowingRunnable;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test order: testUpgrade() -> other tests (with MetadataStoreTopicPoliciesService configured) -> testDowngrade().
+ */
+@Test(groups = "broker")
+public class LegacyAwareTopicPoliciesServiceTest extends MockedPulsarServiceBaseTest {
+
+ private static final String metaNamespace = "public/meta-ns";
+
+ @BeforeClass
+ @Override
+ protected void setup() throws Exception {
+ super.internalSetup();
+ super.setupDefaultTenantAndNamespace();
+ }
+
+ @AfterClass
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test(priority = -1)
+ public void testUpgrade() throws Exception {
+ final var topic = "test-upgrade";
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setCompactionThreshold(topic, 100);
+ waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100));
+
+ restartBroker(conf -> {
+ conf.setSystemTopicEnabled(false);
+ conf.setTopicPoliciesServiceClassName(MetadataStoreTopicPoliciesService.class.getName());
+ });
+ // The policies will be lost because when system topic is disabled, it will not try to read policies from the
+ // __change_events topic
+ assertNull(admin.topicPolicies().getCompactionThreshold(topic));
+
+ restartBroker(conf -> conf.setSystemTopicEnabled(true));
+ // The default namespace still read policies from the __change_events topic if it exists
+ assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 100);
+ assertFalse(pulsar.getLocalMetadataStore().exists(MetadataStoreTopicPoliciesService.LOCAL_POLICIES_ROOT).get());
+
+ // The global policies are still stored in the __change_events topic
+ admin.topicPolicies(true).setCompactionThreshold(topic, 200);
+ waitUntilAssert(() -> assertEquals(admin.topicPolicies(true).getCompactionThreshold(topic), 200));
+ assertFalse(pulsar.getConfigurationMetadataStore()
+ .exists(MetadataStoreTopicPoliciesService.GLOBAL_POLICIES_ROOT).get());
+
+ admin.topicPolicies().deleteTopicPolicies(topic);
+ waitUntilAssert(() -> assertNull(admin.topicPolicies().getCompactionThreshold(topic)));
+
+ admin.namespaces().createNamespace(metaNamespace);
+ }
+
+ @Test(priority = 1)
+ public void testDowngrade() throws Exception {
+ final var topic1 = "downgrade"; // in default namespace
+ admin.topics().createNonPartitionedTopic(topic1);
+ admin.topicPolicies().setCompactionThreshold(topic1, 1);
+ waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1));
+
+ final var topic2 = metaNamespace + "/downgrade";
+ admin.topics().createNonPartitionedTopic(topic2);
+ admin.topicPolicies().setCompactionThreshold(topic2, 2);
+ waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic2), 2));
+
+ restartBroker(conf ->
+ conf.setTopicPoliciesServiceClassName(SystemTopicBasedTopicPoliciesService.class.getName()));
+ assertEquals(admin.topicPolicies().getCompactionThreshold(topic1), 1);
+ // The policies will be lost because they are not stored in the __change_events topic
+ assertNull(admin.topicPolicies().getCompactionThreshold(topic2));
+ }
+
+ @DataProvider
+ public Object[][] namespaces() {
+ return new Object[][] {
+ { "public/default" },
+ { metaNamespace }
+ };
+ }
+
+ @Test(dataProvider = "namespaces")
+ public void testPoliciesOperations(String namespace) throws Exception {
+ final var topicName = TopicName.get(namespace + "/test-policies-operations");
+ final var topic = topicName.toString();
+ admin.topics().createNonPartitionedTopic(topic);
+
+ final var compactionThreshold = new AtomicLong(0);
+ // Verify the exception thrown from one listener does not affect other listeners
+ pulsar.getTopicPoliciesService().registerListenerAsync(topicName, __ -> {
+ throw new RuntimeException("injected failure");
+ }).get();
+ pulsar.getTopicPoliciesService().registerListenerAsync(topicName, policies ->
+ Optional.ofNullable(policies).map(TopicPolicies::getCompactionThreshold).ifPresentOrElse(
+ compactionThreshold::set, () -> compactionThreshold.set(-1))).get();
+
+ // Verify Created events are handled
+ admin.topicPolicies(false).setCompactionThreshold(topic, 100);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 100));
+ final var localStore = pulsar.getLocalMetadataStore();
+ final var configurationStore = pulsar.getConfigurationMetadataStore();
+
+ if (namespace.equals(metaNamespace)) {
+ assertTrue(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, false)).get());
+ assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, true)).get());
+ }
+
+ admin.topicPolicies(true).setCompactionThreshold(topic, 200);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 200));
+ if (namespace.equals(metaNamespace)) {
+ assertTrue(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, true)).get());
+ }
+
+ // Verify Modified events are handled
+ admin.topicPolicies(false).setCompactionThreshold(topic, 300);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 300));
+
+ admin.topicPolicies(true).setCompactionThreshold(topic, 400);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), 400));
+
+ final var readerNamespaces = ((LegacyAwareTopicPoliciesService) pulsar.getTopicPoliciesService())
+ .systemTopicService.getReaderCaches().keySet();
+ assertFalse(readerNamespaces.contains(NamespaceName.get(metaNamespace)));
+
+ // Verify Deleted events are handled
+ admin.topicPolicies(false).deleteTopicPolicies(topic);
+ waitUntilAssert(() -> assertEquals(compactionThreshold.get(), -1));
+ if (namespace.equals(metaNamespace)) {
+ assertFalse(localStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, false)).get());
+ assertFalse(configurationStore.exists(MetadataStoreTopicPoliciesService.pathFor(topicName, true)).get());
+ }
+ }
+
+ @Test
+ public void testUserCreatedEventsTopicAreIgnored() throws Exception {
+ final var topic = TopicName.get(metaNamespace + "/" + System.currentTimeMillis()).toString();
+ admin.topics().createNonPartitionedTopic(topic);
+ admin.topicPolicies().setCompactionThreshold(topic, 1);
+ waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1));
+
+ final var eventsTopic = metaNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+ admin.topics().createNonPartitionedTopic(eventsTopic);
+ // Even if the __change_events topic is created, since it has detected the namespace didn't have the events
+ // topic before, it will be ignored and the policies are still read from metadata store.
+ waitUntilAssert(() -> assertEquals(admin.topicPolicies().getCompactionThreshold(topic), 1));
+ admin.topics().delete(eventsTopic);
+ }
+
+ private static void waitUntilAssert(ThrowingRunnable assertion) {
+ Awaitility.await().atMost(Duration.ofSeconds(1)).untilAsserted(assertion);
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index d02154c81781a..2e8ca8cebec88 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -109,7 +109,7 @@ public void onUpdate(TopicPolicies data) {
CompletableFuture f = CompletableFuture.completedFuture(null).thenRunAsync(() -> {
for (int i = 0; i < 100; i++) {
TopicPolicyListener listener = new TopicPolicyListenerImpl();
- systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+ systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener);
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
@@ -118,7 +118,7 @@ public void onUpdate(TopicPolicies data) {
for (int i = 0; i < 100; i++) {
TopicPolicyListener listener = new TopicPolicyListenerImpl();
- systemTopicBasedTopicPoliciesService.registerListener(topicName, listener);
+ systemTopicBasedTopicPoliciesService.registerListenerAsync(topicName, listener);
Assert.assertNotNull(systemTopicBasedTopicPoliciesService.listeners.get(topicName));
Assert.assertTrue(systemTopicBasedTopicPoliciesService.listeners.get(topicName).size() >= 1);
systemTopicBasedTopicPoliciesService.unregisterListener(topicName, listener);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
index 6b9735d59b21a..7e9c697fb5d03 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicPolicyTestUtils.java
@@ -72,6 +72,13 @@ public static TopicPolicies getGlobalTopicPolicies(TopicPoliciesService topicPol
public static Optional getTopicPoliciesBypassCache(TopicPoliciesService topicPoliciesService,
TopicName topicName, boolean isGlobal)
throws Exception {
+ if (topicPoliciesService instanceof LegacyAwareTopicPoliciesService legacyService) {
+ TopicPoliciesService resolved = legacyService.resolveService(topicName.getNamespaceObject()).get();
+ return getTopicPoliciesBypassCache(resolved, topicName, isGlobal);
+ }
+ if (topicPoliciesService instanceof MetadataStoreTopicPoliciesService metadataStoreService) {
+ return metadataStoreService.getTopicPoliciesDirectFromStore(topicName, isGlobal).get();
+ }
@Cleanup final var reader = ((SystemTopicBasedTopicPoliciesService) topicPoliciesService)
.getNamespaceEventsSystemTopicFactory()
.createTopicPoliciesSystemTopicClient(topicName.getNamespaceObject())