From 90e5e4580cd70aca8949265c7223a7192e8caf42 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 31 Mar 2026 23:21:21 +0800 Subject: [PATCH 1/5] [fix][broker]The partitions of the topic was wrongly created even though this cluster was not allowed to access it --- .../admin/impl/PersistentTopicsBase.java | 50 +++++++----- .../pulsar/broker/service/BrokerService.java | 76 ++++++++++++++++--- .../OneWayReplicatorUsingGlobalZKTest.java | 7 ++ 3 files changed, 102 insertions(+), 31 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 5f110f503abb9..ea23e17b9248b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -550,31 +550,39 @@ private CompletableFuture> getReplicationClusters() { } protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { - getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { - if (metadata != null && metadata.partitions > 0) { - validateNamespaceOperationAsync(topicName.getNamespaceObject(), + pulsar().getBrokerService().isAllowedCurrentClusterAccess(topicName).thenAccept(allowed -> { + if (!allowed) { + resumeAsyncResponseExceptionally(asyncResponse, + new RestException(Status.BAD_REQUEST, String.format("Topic [%s] is not allowed to be loaded" + + " up on the current cluster, please recheck replication polices.", topicName))); + } + getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { + if (metadata != null && metadata.partitions > 0) { + validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC) - .thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(e -> { + .thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(e -> { + log.error() + .attr("topic", topicName) + .log("Failed to create partitions for topic"); + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); + } else { + resumeAsyncResponseExceptionally(asyncResponse, + new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName))); + } + }).exceptionally(ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { log.error() - .attr("topic", topicName) - .log("Failed to create partitions for topic"); - resumeAsyncResponseExceptionally(asyncResponse, e); - return null; - }); - } else { - throw new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName)); - } - }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error() .attr("topic", topicName) .log("Failed to create partitions for topic"); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); - return null; + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 366e19dd7a57f..b1b63ac7003b8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -94,11 +94,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -2076,29 +2079,82 @@ private CompletableFuture checkTopicAlreadyMigrated(TopicName topicName) { return result; } - public CompletableFuture getManagedLedgerConfig(@NonNull TopicName topicName) { + /** + * @return Triple [namespace policies, global topic policies, topic policies]. + */ + public CompletableFuture isAllowedCurrentClusterAccess(@NonNull TopicName topicName) { + final String cluster = getPulsar().getConfig().getClusterName(); + return getCombinedTopicPolicies(topicName).thenApply(triple -> { + Optional topicP = triple.getRight(); + Optional globalTopicP = triple.getMiddle(); + Optional nsPolicies = triple.getLeft(); + // Disabled a cluster for a namespace manually. + if (nsPolicies.isPresent() && !nsPolicies.get().allowed_clusters.isEmpty() + && !nsPolicies.get().allowed_clusters.contains(cluster)) { + return false; + } + // Manually enabled topic-level replication, which can skip to set a namespace-level replication. + if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) { + if (topicP.get().getReplicationClusters().contains(cluster)) { + return true; + } else { + return false; + } + } + if (globalTopicP.isPresent() && CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) { + if (globalTopicP.get().getReplicationClusters().contains(cluster)) { + return true; + } else { + return false; + } + } + // No settings for replication/allowed_clusters. + if (nsPolicies.isEmpty()) { + return true; + } + // Namespace level settings. + return nsPolicies.get().replication_clusters.isEmpty() + || nsPolicies.get().replication_clusters.contains(cluster); + }); + } + + /** + * @return Triple [namespace policies, global topic policies, topic policies]. + */ + public CompletableFuture, Optional, Optional>> + getCombinedTopicPolicies(@NonNull TopicName topicName) { if (topicName == null) { return FutureUtil.failedFuture(new NullPointerException("topicName")); } NamespaceName namespace = topicName.getNamespaceObject(); - ServiceConfiguration serviceConfig = pulsar.getConfiguration(); - NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); - LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); final CompletableFuture> topicPoliciesFuture = getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY); final CompletableFuture> globalTopicPoliciesFuture = getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.GLOBAL_ONLY); final CompletableFuture> nsPolicies = nsr.getPoliciesAsync(namespace); - final CompletableFuture> lcPolicies = lpr.getLocalPoliciesAsync(namespace); return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture, (topicP, globalTopicP) -> { return new ImmutablePair<>(topicP, globalTopicP); }).thenCombine(nsPolicies, (topicPoliciesPair, np) -> { - return new ImmutablePair<>(topicPoliciesPair, np); - }).thenCombine(lcPolicies, (combined, localPolicies) -> { - Optional topicP = combined.getLeft().getLeft(); - Optional globalTopicP = combined.getLeft().getRight(); - Optional policies = combined.getRight(); + Optional topicP = topicPoliciesPair.getLeft(); + Optional globalTopicP = topicPoliciesPair.getRight(); + return new ImmutableTriple<>(np, globalTopicP, topicP); + }); + } + + public CompletableFuture getManagedLedgerConfig(@NonNull TopicName topicName) { + if (topicName == null) { + return FutureUtil.failedFuture(new NullPointerException("topicName")); + } + NamespaceName namespace = topicName.getNamespaceObject(); + ServiceConfiguration serviceConfig = pulsar.getConfiguration(); + + LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); + final CompletableFuture> lcPolicies = lpr.getLocalPoliciesAsync(namespace); + return getCombinedTopicPolicies(topicName).thenCombine(lcPolicies, (combined, localPolicies) -> { + Optional policies = combined.getLeft(); + Optional globalTopicP = combined.getMiddle(); + Optional topicP = combined.getRight(); PersistencePolicies persistencePolicies = null; RetentionPolicies retentionPolicies = null; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 2a9b0ac489692..1a20b1c667654 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -149,6 +149,13 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception { }); waitReplicatorStopped(subTopic, pulsar1, pulsar2, true); + try { + admin2.topics().createMissedPartitions(topicName); + fail("The action that creates mission partitions should have thrown exception"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("is not allowed to be loaded up")); + } + // Remove global policy. admin1.topicPolicies(true).removeReplicationClusters(topicName); Producer producer2 = client1.newProducer().topic(topicName).create(); From 877b73414ac1155554610d9a2fad7640cc045615 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 1 Apr 2026 15:29:13 +0800 Subject: [PATCH 2/5] address comments --- .../admin/impl/PersistentTopicsBase.java | 2 +- .../pulsar/broker/service/BrokerService.java | 72 +++++++++---------- 2 files changed, 34 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index ea23e17b9248b..bb642e35383e3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -550,7 +550,7 @@ private CompletableFuture> getReplicationClusters() { } protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { - pulsar().getBrokerService().isAllowedCurrentClusterAccess(topicName).thenAccept(allowed -> { + pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> { if (!allowed) { resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.BAD_REQUEST, String.format("Topic [%s] is not allowed to be loaded" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index b1b63ac7003b8..8a7134caef550 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2079,45 +2079,6 @@ private CompletableFuture checkTopicAlreadyMigrated(TopicName topicName) { return result; } - /** - * @return Triple [namespace policies, global topic policies, topic policies]. - */ - public CompletableFuture isAllowedCurrentClusterAccess(@NonNull TopicName topicName) { - final String cluster = getPulsar().getConfig().getClusterName(); - return getCombinedTopicPolicies(topicName).thenApply(triple -> { - Optional topicP = triple.getRight(); - Optional globalTopicP = triple.getMiddle(); - Optional nsPolicies = triple.getLeft(); - // Disabled a cluster for a namespace manually. - if (nsPolicies.isPresent() && !nsPolicies.get().allowed_clusters.isEmpty() - && !nsPolicies.get().allowed_clusters.contains(cluster)) { - return false; - } - // Manually enabled topic-level replication, which can skip to set a namespace-level replication. - if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) { - if (topicP.get().getReplicationClusters().contains(cluster)) { - return true; - } else { - return false; - } - } - if (globalTopicP.isPresent() && CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) { - if (globalTopicP.get().getReplicationClusters().contains(cluster)) { - return true; - } else { - return false; - } - } - // No settings for replication/allowed_clusters. - if (nsPolicies.isEmpty()) { - return true; - } - // Namespace level settings. - return nsPolicies.get().replication_clusters.isEmpty() - || nsPolicies.get().replication_clusters.contains(cluster); - }); - } - /** * @return Triple [namespace policies, global topic policies, topic policies]. */ @@ -4192,6 +4153,39 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory this.pulsarChannelInitFactory = factory; } + /** + * @return Triple [namespace policies, global topic policies, topic policies]. + */ + public CompletableFuture isCurrentClusterAllowed(@NonNull TopicName topicName) { + final String cluster = getPulsar().getConfig().getClusterName(); + return getCombinedTopicPolicies(topicName).thenApply(triple -> { + Optional topicP = triple.getRight(); + Optional globalTopicP = triple.getMiddle(); + Optional nsPolicies = triple.getLeft(); + // Disabled a cluster for a namespace manually. + if (nsPolicies.isPresent() && !isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) { + return false; + } + // Manually enabled topic-level replication, which can skip to set a namespace-level replication. + if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) { + if (topicP.get().getReplicationClusters().contains(cluster)) { + return true; + } else { + return false; + } + } + if (globalTopicP.isPresent() && CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) { + if (globalTopicP.get().getReplicationClusters().contains(cluster)) { + return true; + } else { + return false; + } + } + // No settings for replication/allowed_clusters. + return nsPolicies.isEmpty(); + }); + } + /*** * After PIP-321 Introduce allowed-cluster at the namespace level, the condition that whether the cluster is * allowed to access by the current cluster was defined by two fields: From f3907fe623919b1f531367f856728899e901d086 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 22 Apr 2026 11:46:23 +0800 Subject: [PATCH 3/5] fix issues --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8a7134caef550..1f8bdfb3c707f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -4181,8 +4181,7 @@ public CompletableFuture isCurrentClusterAllowed(@NonNull TopicName top return false; } } - // No settings for replication/allowed_clusters. - return nsPolicies.isEmpty(); + return true; }); } From 2fe9beca4d768cae439aff8d834e7b6898b47f1b Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 23 Apr 2026 21:33:24 +0800 Subject: [PATCH 4/5] address comments --- .../admin/impl/PersistentTopicsBase.java | 22 +++++++++++++------ .../OneWayReplicatorUsingGlobalZKTest.java | 2 ++ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index bb642e35383e3..afaedfe9822d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.ws.rs.WebApplicationException; @@ -550,11 +551,21 @@ private CompletableFuture> getReplicationClusters() { } protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { + Consumer errorHandler = ex -> { + // If the exception is not redirect exception we need to log it. + if (!isRedirectException(ex)) { + log.error() + .attr("topic", topicName) + .log("Failed to create partitions for topic"); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + }; pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> { if (!allowed) { resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.BAD_REQUEST, String.format("Topic [%s] is not allowed to be loaded" + " up on the current cluster, please recheck replication polices.", topicName))); + return; } getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { if (metadata != null && metadata.partitions > 0) { @@ -574,15 +585,12 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName))); } }).exceptionally(ex -> { - // If the exception is not redirect exception we need to log it. - if (!isRedirectException(ex)) { - log.error() - .attr("topic", topicName) - .log("Failed to create partitions for topic"); - } - resumeAsyncResponseExceptionally(asyncResponse, ex); + errorHandler.accept(ex); return null; }); + }).exceptionally(ex -> { + errorHandler.accept(ex); + return null; }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 1a20b1c667654..708583fd62e04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -155,6 +155,8 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception { } catch (Exception e) { assertTrue(e.getMessage().contains("is not allowed to be loaded up")); } + assertFalse(admin2.topics().getList(replicatedNamespace) + .contains(TopicName.get(topicName).getPartition(0).toString())); // Remove global policy. admin1.topicPolicies(true).removeReplicationClusters(topicName); From 8d9cabceb8d926be73d196d80a21a4f3a5bf263d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Wed, 6 May 2026 23:20:03 +0800 Subject: [PATCH 5/5] fix stale java doc --- .../java/org/apache/pulsar/broker/service/BrokerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 1f8bdfb3c707f..ae8fba51f98e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -4154,7 +4154,7 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory } /** - * @return Triple [namespace policies, global topic policies, topic policies]. + * @return CompletableFuture, whether the current cluster is allowed to access the topic. */ public CompletableFuture isCurrentClusterAllowed(@NonNull TopicName topicName) { final String cluster = getPulsar().getConfig().getClusterName();