diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 5f110f503abb9..afaedfe9822d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -47,6 +47,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; import javax.ws.rs.WebApplicationException; @@ -550,23 +551,7 @@ private CompletableFuture> getReplicationClusters() { } protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { - getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { - if (metadata != null && metadata.partitions > 0) { - validateNamespaceOperationAsync(topicName.getNamespaceObject(), - NamespaceOperation.CREATE_TOPIC) - .thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { - asyncResponse.resume(Response.noContent().build()); - }).exceptionally(e -> { - log.error() - .attr("topic", topicName) - .log("Failed to create partitions for topic"); - resumeAsyncResponseExceptionally(asyncResponse, e); - return null; - }); - } else { - throw new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName)); - } - }).exceptionally(ex -> { + Consumer errorHandler = ex -> { // If the exception is not redirect exception we need to log it. if (!isRedirectException(ex)) { log.error() @@ -574,6 +559,37 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { .log("Failed to create partitions for topic"); } resumeAsyncResponseExceptionally(asyncResponse, ex); + }; + pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> { + if (!allowed) { + resumeAsyncResponseExceptionally(asyncResponse, + new RestException(Status.BAD_REQUEST, String.format("Topic [%s] is not allowed to be loaded" + + " up on the current cluster, please recheck replication polices.", topicName))); + return; + } + getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { + if (metadata != null && metadata.partitions > 0) { + validateNamespaceOperationAsync(topicName.getNamespaceObject(), + NamespaceOperation.CREATE_TOPIC) + .thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> { + asyncResponse.resume(Response.noContent().build()); + }).exceptionally(e -> { + log.error() + .attr("topic", topicName) + .log("Failed to create partitions for topic"); + resumeAsyncResponseExceptionally(asyncResponse, e); + return null; + }); + } else { + resumeAsyncResponseExceptionally(asyncResponse, + new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName))); + } + }).exceptionally(ex -> { + errorHandler.accept(ex); + return null; + }); + }).exceptionally(ex -> { + errorHandler.accept(ex); return null; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 366e19dd7a57f..ae8fba51f98e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -94,11 +94,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; +import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.ImmutableTriple; import org.apache.commons.lang3.tuple.Pair; +import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -2076,29 +2079,43 @@ private CompletableFuture checkTopicAlreadyMigrated(TopicName topicName) { return result; } - public CompletableFuture getManagedLedgerConfig(@NonNull TopicName topicName) { + /** + * @return Triple [namespace policies, global topic policies, topic policies]. + */ + public CompletableFuture, Optional, Optional>> + getCombinedTopicPolicies(@NonNull TopicName topicName) { if (topicName == null) { return FutureUtil.failedFuture(new NullPointerException("topicName")); } NamespaceName namespace = topicName.getNamespaceObject(); - ServiceConfiguration serviceConfig = pulsar.getConfiguration(); - NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); - LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); final CompletableFuture> topicPoliciesFuture = getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY); final CompletableFuture> globalTopicPoliciesFuture = getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.GLOBAL_ONLY); final CompletableFuture> nsPolicies = nsr.getPoliciesAsync(namespace); - final CompletableFuture> lcPolicies = lpr.getLocalPoliciesAsync(namespace); return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture, (topicP, globalTopicP) -> { return new ImmutablePair<>(topicP, globalTopicP); }).thenCombine(nsPolicies, (topicPoliciesPair, np) -> { - return new ImmutablePair<>(topicPoliciesPair, np); - }).thenCombine(lcPolicies, (combined, localPolicies) -> { - Optional topicP = combined.getLeft().getLeft(); - Optional globalTopicP = combined.getLeft().getRight(); - Optional policies = combined.getRight(); + Optional topicP = topicPoliciesPair.getLeft(); + Optional globalTopicP = topicPoliciesPair.getRight(); + return new ImmutableTriple<>(np, globalTopicP, topicP); + }); + } + + public CompletableFuture getManagedLedgerConfig(@NonNull TopicName topicName) { + if (topicName == null) { + return FutureUtil.failedFuture(new NullPointerException("topicName")); + } + NamespaceName namespace = topicName.getNamespaceObject(); + ServiceConfiguration serviceConfig = pulsar.getConfiguration(); + + LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); + final CompletableFuture> lcPolicies = lpr.getLocalPoliciesAsync(namespace); + return getCombinedTopicPolicies(topicName).thenCombine(lcPolicies, (combined, localPolicies) -> { + Optional policies = combined.getLeft(); + Optional globalTopicP = combined.getMiddle(); + Optional topicP = combined.getRight(); PersistencePolicies persistencePolicies = null; RetentionPolicies retentionPolicies = null; @@ -4136,6 +4153,38 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory this.pulsarChannelInitFactory = factory; } + /** + * @return CompletableFuture, whether the current cluster is allowed to access the topic. + */ + public CompletableFuture isCurrentClusterAllowed(@NonNull TopicName topicName) { + final String cluster = getPulsar().getConfig().getClusterName(); + return getCombinedTopicPolicies(topicName).thenApply(triple -> { + Optional topicP = triple.getRight(); + Optional globalTopicP = triple.getMiddle(); + Optional nsPolicies = triple.getLeft(); + // Disabled a cluster for a namespace manually. + if (nsPolicies.isPresent() && !isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) { + return false; + } + // Manually enabled topic-level replication, which can skip to set a namespace-level replication. + if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) { + if (topicP.get().getReplicationClusters().contains(cluster)) { + return true; + } else { + return false; + } + } + if (globalTopicP.isPresent() && CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) { + if (globalTopicP.get().getReplicationClusters().contains(cluster)) { + return true; + } else { + return false; + } + } + return true; + }); + } + /*** * After PIP-321 Introduce allowed-cluster at the namespace level, the condition that whether the cluster is * allowed to access by the current cluster was defined by two fields: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index 2a9b0ac489692..708583fd62e04 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -149,6 +149,15 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception { }); waitReplicatorStopped(subTopic, pulsar1, pulsar2, true); + try { + admin2.topics().createMissedPartitions(topicName); + fail("The action that creates mission partitions should have thrown exception"); + } catch (Exception e) { + assertTrue(e.getMessage().contains("is not allowed to be loaded up")); + } + assertFalse(admin2.topics().getList(replicatedNamespace) + .contains(TopicName.get(topicName).getPartition(0).toString())); + // Remove global policy. admin1.topicPolicies(true).removeReplicationClusters(topicName); Producer producer2 = client1.newProducer().topic(topicName).create();