Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -550,30 +551,45 @@ private CompletableFuture<Set<String>> getReplicationClusters() {
}

protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null && metadata.partitions > 0) {
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.CREATE_TOPIC)
.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error()
.attr("topic", topicName)
.log("Failed to create partitions for topic");
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
} else {
throw new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName));
}
}).exceptionally(ex -> {
Consumer<Throwable> errorHandler = ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error()
.attr("topic", topicName)
.log("Failed to create partitions for topic");
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
};
pulsar().getBrokerService().isCurrentClusterAllowed(topicName).thenAccept(allowed -> {
Comment thread
poorbarcode marked this conversation as resolved.
if (!allowed) {
resumeAsyncResponseExceptionally(asyncResponse,
Comment thread
poorbarcode marked this conversation as resolved.
new RestException(Status.BAD_REQUEST, String.format("Topic [%s] is not allowed to be loaded"
+ " up on the current cluster, please recheck replication polices.", topicName)));
return;
}
getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
if (metadata != null && metadata.partitions > 0) {
validateNamespaceOperationAsync(topicName.getNamespaceObject(),
NamespaceOperation.CREATE_TOPIC)
.thenCompose(__ -> tryCreatePartitionsAsync(metadata.partitions)).thenAccept(v -> {
asyncResponse.resume(Response.noContent().build());
}).exceptionally(e -> {
log.error()
.attr("topic", topicName)
.log("Failed to create partitions for topic");
resumeAsyncResponseExceptionally(asyncResponse, e);
return null;
});
} else {
resumeAsyncResponseExceptionally(asyncResponse,
new RestException(Status.NOT_FOUND, String.format("Topic %s does not exist", topicName)));
}
}).exceptionally(ex -> {
errorHandler.accept(ex);
return null;
});
}).exceptionally(ex -> {
errorHandler.accept(ex);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,14 @@
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.NonAppendableLedgerOffloader;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.ImmutableTriple;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -2076,29 +2079,43 @@ private CompletableFuture<Void> checkTopicAlreadyMigrated(TopicName topicName) {
return result;
}

public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull TopicName topicName) {
/**
* @return Triple [namespace policies, global topic policies, topic policies].
*/
public CompletableFuture<Triple<Optional<Policies>, Optional<TopicPolicies>, Optional<TopicPolicies>>>
getCombinedTopicPolicies(@NonNull TopicName topicName) {
if (topicName == null) {
return FutureUtil.failedFuture(new NullPointerException("topicName"));
}
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();

NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources();
LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies();
final CompletableFuture<Optional<TopicPolicies>> topicPoliciesFuture =
getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.LOCAL_ONLY);
final CompletableFuture<Optional<TopicPolicies>> globalTopicPoliciesFuture =
getTopicPoliciesBypassSystemTopic(topicName, TopicPoliciesService.GetType.GLOBAL_ONLY);
final CompletableFuture<Optional<Policies>> nsPolicies = nsr.getPoliciesAsync(namespace);
final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace);
return topicPoliciesFuture.thenCombine(globalTopicPoliciesFuture, (topicP, globalTopicP) -> {
return new ImmutablePair<>(topicP, globalTopicP);
}).thenCombine(nsPolicies, (topicPoliciesPair, np) -> {
return new ImmutablePair<>(topicPoliciesPair, np);
}).thenCombine(lcPolicies, (combined, localPolicies) -> {
Optional<TopicPolicies> topicP = combined.getLeft().getLeft();
Optional<TopicPolicies> globalTopicP = combined.getLeft().getRight();
Optional<Policies> policies = combined.getRight();
Optional<TopicPolicies> topicP = topicPoliciesPair.getLeft();
Optional<TopicPolicies> globalTopicP = topicPoliciesPair.getRight();
return new ImmutableTriple<>(np, globalTopicP, topicP);
});
}

public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@NonNull TopicName topicName) {
if (topicName == null) {
return FutureUtil.failedFuture(new NullPointerException("topicName"));
}
NamespaceName namespace = topicName.getNamespaceObject();
ServiceConfiguration serviceConfig = pulsar.getConfiguration();

LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies();
final CompletableFuture<Optional<LocalPolicies>> lcPolicies = lpr.getLocalPoliciesAsync(namespace);
return getCombinedTopicPolicies(topicName).thenCombine(lcPolicies, (combined, localPolicies) -> {
Optional<Policies> policies = combined.getLeft();
Optional<TopicPolicies> globalTopicP = combined.getMiddle();
Optional<TopicPolicies> topicP = combined.getRight();

PersistencePolicies persistencePolicies = null;
RetentionPolicies retentionPolicies = null;
Expand Down Expand Up @@ -4136,6 +4153,38 @@ public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory
this.pulsarChannelInitFactory = factory;
}

/**
* @return CompletableFuture<Triple<Optional<Policies>, whether the current cluster is allowed to access the topic.
*/
public CompletableFuture<Boolean> isCurrentClusterAllowed(@NonNull TopicName topicName) {
final String cluster = getPulsar().getConfig().getClusterName();
return getCombinedTopicPolicies(topicName).thenApply(triple -> {
Optional<TopicPolicies> topicP = triple.getRight();
Optional<TopicPolicies> globalTopicP = triple.getMiddle();
Optional<Policies> nsPolicies = triple.getLeft();
// Disabled a cluster for a namespace manually.
if (nsPolicies.isPresent() && !isCurrentClusterAllowed(topicName.getNamespaceObject(), nsPolicies.get())) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible regression: isCurrentClusterAllowed(NamespaceName, Policies) returns false when namespace replication_clusters excludes the current cluster (with empty allowed_clusters). But topic-level replicationClusters is supposed to override namespace-level defaults — e.g. ns replication_clusters=[c1], topic replicationClusters=[c1,c2] should be allowed on c2, but this short-circuit returns false.

Suggest only short-circuiting on the allowed_clusters hard gate (PIP-321), then deferring to topic-level checks before falling back to ns replication_clusters.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possible regression: isCurrentClusterAllowed(NamespaceName, Policies) returns false when namespace replication_clusters excludes the current cluster (with empty allowed_clusters). But topic-level replicationClusters is supposed to override namespace-level defaults — e.g. ns replication_clusters=[c1], topic replicationClusters=[c1,c2] should be allowed on c2, but this short-circuit returns false.

This situation does not exist. Let's review all possible use cases:

  1. Both clusters use a shared metadata store
    a. The topic will never be allowed to be accessed by the current cluster, even if the topic-level replication policy contains {current cluster}.
    b. The PIP-321 Introduce allowed-cluster at the namespace level introduces concept namespace level allowed_clusters to let clusters that using shared metadata store can enable topic level replication.
  2. Both clusters use their own metadata store
    a. All namespace-level replication_clutsers will contain the current cluster, and namespace-level replication cluster can not be set to a empty value.
    b. No one will remove current cluster from namespace-level replication_clusters, which is meaningless

return false;
}
// Manually enabled topic-level replication, which can skip to set a namespace-level replication.
if (topicP.isPresent() && CollectionUtils.isNotEmpty(topicP.get().getReplicationClusters())) {
if (topicP.get().getReplicationClusters().contains(cluster)) {
return true;
} else {
return false;
}
}
if (globalTopicP.isPresent() && CollectionUtils.isNotEmpty(globalTopicP.get().getReplicationClusters())) {
if (globalTopicP.get().getReplicationClusters().contains(cluster)) {
return true;
} else {
return false;
}
}
return true;
});
}

/***
* After PIP-321 Introduce allowed-cluster at the namespace level, the condition that whether the cluster is
* allowed to access by the current cluster was defined by two fields:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@ public void testDeleteRemoteTopicByGlobalPolicy() throws Exception {
});
waitReplicatorStopped(subTopic, pulsar1, pulsar2, true);

try {
admin2.topics().createMissedPartitions(topicName);
Comment thread
poorbarcode marked this conversation as resolved.
fail("The action that creates mission partitions should have thrown exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains("is not allowed to be loaded up"));
}
assertFalse(admin2.topics().getList(replicatedNamespace)
.contains(TopicName.get(topicName).getPartition(0).toString()));

// Remove global policy.
admin1.topicPolicies(true).removeReplicationClusters(topicName);
Producer<byte[]> producer2 = client1.newProducer().topic(topicName).create();
Expand Down
Loading