diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java index 4f7d0747f2822..8216e825979aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ClustersBase.java @@ -738,15 +738,16 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus } catch (PulsarServerException e) { return FutureUtil.failedFuture(e); } - // compile regex patterns once - List namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList(); - // TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option + Set combinedNamespaces = new HashSet<>(policyData.getNamespaces()); + final List oldNamespaces = new ArrayList<>(); + if (oldPolicy != null) { + oldNamespaces.addAll(oldPolicy.getNamespaces()); + combinedNamespaces.addAll(oldNamespaces); + } return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> { List>> filteredNamespacesForEachTenant = tenants.stream() .map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> { List> namespaceNamesInCluster = namespaces.stream() - .filter(namespaceName -> namespacePatterns.stream() - .anyMatch(pattern -> pattern.matcher(namespaceName).matches())) .map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName) .thenApply(policies -> policies.replication_clusters.contains(cluster) ? namespaceName : null)) @@ -762,46 +763,44 @@ private CompletableFuture filterAndUnloadMatchedNamespaceAsync(String clus .map(CompletableFuture::join) .flatMap(List::stream) .collect(Collectors.toList())); - }).thenCompose(shouldUnloadNamespaces -> { - if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) { + }).thenCompose(clusterLocalNamespaces -> { + if (CollectionUtils.isEmpty(clusterLocalNamespaces)) { return CompletableFuture.completedFuture(null); } // If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might // actually have been changed. log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData); - if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) { - // We also compare that the previous primary broker list is same as current, in case all namespaces need - // to be placed again anyway. - if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) { - // list is same, so we continue finding the changed namespaces. - // We create a union regex list contains old + new regexes - Set combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces()); - combinedNamespaces.addAll(policyData.getNamespaces()); - // We create a intersection of the old and new regexes. These won't need to be unloaded - Set commonNamespaces = new HashSet<>(oldPolicy.getNamespaces()); - commonNamespaces.retainAll(policyData.getNamespaces()); + boolean unloadAllNamespaces = false; + // We also compare that the previous primary broker list is same as current, in case all namespaces need + // to be placed again anyway. + if (NamespaceIsolationPolicyUnloadScope.all_matching.equals(policyData.getUnloadScope()) + || (oldPolicy != null + && !CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary()))) { + unloadAllNamespaces = true; + } + // list is same, so we continue finding the changed namespaces. - log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces); + // We create a intersection of the old and new regexes. These won't need to be unloaded. + Set commonNamespaces = new HashSet<>(policyData.getNamespaces()); + commonNamespaces.retainAll(oldNamespaces); - // Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old) - combinedNamespaces.removeAll(commonNamespaces); + log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, commonNamespaces); - log.debug("changed regexes: {}", commonNamespaces); + if (!unloadAllNamespaces) { + // Find the changed regexes ((new U old) - (new ∩ old)). + combinedNamespaces.removeAll(commonNamespaces); + log.debug("changed regexes: {}", commonNamespaces); + } - // Now we further filter the filtered namespaces based on this combinedNamespaces set - shouldUnloadNamespaces = shouldUnloadNamespaces.stream() - .filter(name -> combinedNamespaces.stream() - .map(Pattern::compile) - .anyMatch(pattern -> pattern.matcher(name).matches()) - ).toList(); + // Now we further filter the filtered namespaces based on this combinedNamespaces set + List namespacePatterns = combinedNamespaces.stream().map(Pattern::compile).toList(); + clusterLocalNamespaces = clusterLocalNamespaces.stream() + .filter(name -> namespacePatterns.stream().anyMatch(pattern -> pattern.matcher(name).matches())) + .toList(); - } - } - // unload type is either null or not in (changed, none), so we proceed to unload all namespaces - // TODO - default in 4.x should become `changed` - List> futures = shouldUnloadNamespaces.stream() + List> futures = clusterLocalNamespaces.stream() .map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName)) .collect(Collectors.toList()); return FutureUtil.waitForAll(futures).thenAccept(__ -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index f8218fa54f636..eba00324da641 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -3581,7 +3581,7 @@ private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadSc parameters1.put("usage_threshold", "100"); List nsRegexList = new ArrayList<>(namespaces); - return NamespaceIsolationData.builder() + NamespaceIsolationData.Builder build = NamespaceIsolationData.builder() // "prop-ig/ns1" is present in test cluster, policy set on test2 should work .namespaces(nsRegexList) .primary(primaryBrokers) @@ -3589,9 +3589,11 @@ private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadSc .autoFailoverPolicy(AutoFailoverPolicyData.builder() .policyType(AutoFailoverPolicyType.min_available) .parameters(parameters1) - .build()) - .unloadScope(scope) - .build(); + .build()); + if (scope != null) { + build.unloadScope(scope); + } + return build.build(); } private boolean allTopicsUnloaded(List topics) { @@ -3717,18 +3719,42 @@ public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) thr testIsolationPolicyUnloadsNSWithScope( topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-c.*"), List.of("b1", "b2"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithChangedScope1(final String topicType) throws Exception { + String nsPrefix1 = newUniqueName(defaultTenant + "/") + "-unload-test-"; + // Addition case + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-changed1", nsPrefix1, List.of("a1", "a2", "b1", "b2", "c1"), + all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), + changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), + Collections.singletonList(".*") + ); + } + + @Test(dataProvider = "topicType") + public void testIsolationPolicyUnloadsNSWithChangedScope2(final String topicType) throws Exception { + String nsPrefix2 = newUniqueName(defaultTenant + "/") + "-unload-test-"; + // removal case + testIsolationPolicyUnloadsNSWithScope( + topicType, "policy-changed2", nsPrefix2, List.of("a1", "a2", "b1", "b2", "c1"), all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"), + changed, List.of(".*-unload-test-c.*"), List.of("b1", "b2", "c1"), Collections.singletonList(".*") ); } @Test(dataProvider = "topicType") - public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception { + public void testIsolationPolicyUnloadsNSWithScopeMissing(final String topicType) throws Exception { String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-"; testIsolationPolicyUnloadsNSWithScope( topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"), all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"), - changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), + null, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"), Collections.singletonList(".*") ); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java index 0771144d31d9a..54cb5dcee4c97 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaceIsolationPolicy.java @@ -75,12 +75,12 @@ private class SetPolicy extends CliCommand { private Map autoFailoverPolicyParams; @Parameter(names = "--unload-scope", description = "configure the type of unload to do -" - + " ['all_matching', 'none', 'changed'] namespaces. By default, all namespaces matching the namespaces" - + " regex will be unloaded and placed again. You can choose to not unload any namespace while setting" - + " this new policy by choosing `none` or choose to unload only the namespaces whose placement will" - + " actually change. If you chose 'none', you will need to manually unload the namespaces for them to" - + " be placed correctly, or wait till some namespaces get load balanced automatically based on load" - + " shedding configurations.") + + " ['all_matching', 'none', 'changed'] namespaces. By default, only namespaces whose placement will" + + " actually change would be unloaded and placed again. You can choose to not unload any namespace" + + " while setting this new policy by choosing `none` or choose to unload all namespaces matching" + + " old (if any) and new namespace regex. If you chose 'none', you will need to manually unload the" + + " namespaces for them to be placed correctly, or wait till some namespaces get load balanced" + + " automatically based on load shedding configurations.") private NamespaceIsolationPolicyUnloadScope unloadScope; void run() throws PulsarAdminException { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java index 1e72f0e50ee05..85be8090f52a1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/NamespaceIsolationDataImpl.java @@ -78,8 +78,8 @@ public class NamespaceIsolationDataImpl implements NamespaceIsolationData { @ApiModelProperty( name = "unload_scope", value = "The type of unload to perform while applying the new isolation policy.", - example = "'all_matching' (default) for unloading all matching namespaces. 'none' for not unloading " - + "any namespace. 'changed' for unloading only the namespaces whose placement is actually changing" + example = "'changed' (default) for unloading only the namespaces whose placement is actually changing. " + + "'all_matching' for unloading all matching namespaces. 'none' for not unloading any namespaces." ) @JsonProperty("unload_scope") private NamespaceIsolationPolicyUnloadScope unloadScope;