From 2970a21346c1e07c2323e61ed16b5902f4cf7235 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Thu, 30 Apr 2026 15:44:09 +0800 Subject: [PATCH 1/6] Avoid repeated NamespaceBundles construction in bundleRange validation. --- .../broker/admin/impl/NamespacesBase.java | 26 +++-- .../broker/admin/impl/ResourceQuotasBase.java | 3 +- .../broker/admin/v2/NonPersistentTopics.java | 5 +- .../pulsar/broker/web/PulsarWebResource.java | 97 +++++++++---------- 4 files changed, 60 insertions(+), 71 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index c94c59df2bfaf..6ecded356457e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -612,8 +612,7 @@ protected CompletableFuture internalDeleteNamespaceBundleAsync(String bund } return future .thenCompose(__ -> - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, - bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, true)) .thenCompose(bundle -> { return pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName) @@ -1528,9 +1527,8 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR } }) .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> - isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange) + .thenCompose(__ -> + isBundleOwnedByAnyBroker(namespaceName, bundleRange) .thenCompose(flag -> { if (!flag) { log.info() @@ -1540,7 +1538,7 @@ public CompletableFuture internalUnloadNamespaceBundleAsync(String bundleR return CompletableFuture.completedFuture(null); } Optional destinationBrokerOpt = Optional.ofNullable(destinationBroker); - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, true) .thenCompose(nsBundle -> pulsar().getNamespaceService() .unloadNamespaceBundle(nsBundle, destinationBrokerOpt)); @@ -1585,7 +1583,7 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl .thenCompose(bundleRange -> { return getNamespacePoliciesAsync(namespaceName) .thenCompose(policies -> - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, false)) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, pulsar().getNamespaceService() @@ -1604,7 +1602,7 @@ protected CompletableFuture internalGetTopicHashPositionsAsy PolicyOperation.READ) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenCompose(policies -> { - return validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, false, true) .thenCompose(nsBundle -> pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(nsBundle)) @@ -1972,11 +1970,10 @@ protected CompletableFuture internalClearNamespaceBundleBacklogAsync(Strin // check cluster ownership for a given global namespace: redirect if peer-cluster owns it return validateGlobalNamespaceOwnershipAsync(namespaceName); }) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> + .thenCompose(__ -> // Allow acquiring ownership for an unassigned bundle so backlog can be cleared // even if not loaded. - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, false)) .thenCompose(bundle -> clearBacklogAsync(bundle, null)) .thenRun(() -> log.info() @@ -2028,11 +2025,10 @@ protected CompletableFuture internalClearNamespaceBundleBacklogForSubscrip // check cluster ownership for a given global namespace: redirect if peer-cluster owns it return validateGlobalNamespaceOwnershipAsync(namespaceName); }) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> + .thenCompose(__ -> // Allow acquiring ownership for an unassigned bundle so backlog can be cleared // even if not loaded. - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, false)) .thenCompose(bundle -> clearBacklogAsync(bundle, subscription)) .thenRun(() -> log.info() @@ -2082,7 +2078,7 @@ protected CompletableFuture internalUnsubscribeNamespaceBundleAsync(String .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)) .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) .thenCompose(policies -> - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, authoritative, false)) .thenCompose(bundle -> unsubscribeAsync(bundle, subscription)) .thenRun(() -> log.info() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java index 2b9c13596c1dc..0c681a3041987 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java @@ -69,7 +69,6 @@ private CompletableFuture getNamespaceBundleRangeAsync(String b } }); return ret - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenApply(policies -> validateNamespaceBundleRange(namespaceName, policies.bundles, bundleRange)); + .thenComposeAsync(__ -> validateNamespaceBundleRangeAsync(namespaceName, bundleRange)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 2f33c15d77f0a..52c4addd9914e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -478,12 +478,11 @@ public void getListFromBundle( .attr("bundleRange", bundleRange) .log("list of topics on namespace bundle"); validateNamespaceOperation(namespaceName, NamespaceOperation.GET_BUNDLE); - Policies policies = getNamespacePolicies(namespaceName); // check cluster ownership for a given global namespace: redirect if peer-cluster owns it validateGlobalNamespaceOwnership(namespaceName); - isBundleOwnedByAnyBroker(namespaceName, policies.bundles, bundleRange).thenAccept(flag -> { + isBundleOwnedByAnyBroker(namespaceName, bundleRange).thenAccept(flag -> { if (!flag) { log.info() .attr("namespace", namespaceName) @@ -491,7 +490,7 @@ public void getListFromBundle( .log("Namespace bundle is not owned by any broker"); asyncResponse.resume(Response.noContent().build()); } else { - validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, true, true) + validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, true, true) .thenAccept(nsBundle -> { final var bundleTopics = pulsar().getBrokerService().getMultiLayerTopicsMap() .get(namespaceName.toString()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 109d44ad8513e..b8ddd40f037f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -585,7 +585,7 @@ static boolean isValidCluster(PulsarService pulsarService, String cluster) { return !pulsarService.getConfiguration().isAuthorizationEnabled(); } - protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, BundlesData bundles, + protected CompletableFuture validateNamespaceBundleRangeAsync(NamespaceName fqnn, String bundleRange) { try { checkArgument(bundleRange.contains("_"), "Invalid bundle range: " + bundleRange); @@ -596,77 +596,72 @@ protected NamespaceBundle validateNamespaceBundleRange(NamespaceName fqnn, Bundl (upperEndpoint.equals(NamespaceBundles.FULL_UPPER_BOUND)) ? BoundType.CLOSED : BoundType.OPEN); NamespaceBundle nsBundle = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundle(fqnn, hashRange); - NamespaceBundles nsBundles = pulsar().getNamespaceService().getNamespaceBundleFactory().getBundles(fqnn, - bundles); - nsBundles.validateBundle(nsBundle); - return nsBundle; + return pulsar().getNamespaceService().getNamespaceBundleFactory().getBundlesAsync(fqnn) + .thenApply(nsBundles -> { + try { + nsBundles.validateBundle(nsBundle); + return nsBundle; + } catch (IllegalArgumentException e) { + log.error() + .attr("namespace", fqnn.toString()) + .attr("bundleRange", bundleRange) + .exceptionMessage(e) + .log("Invalid bundle range"); + throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()); + } catch (Exception e) { + log.error() + .attr("namespace", fqnn.toString()) + .attr("bundleRange", bundleRange) + .exception(e) + .log("Failed to validate namespace bundle"); + throw new RestException(e); + } + }); } catch (IllegalArgumentException e) { log.error() .attr("namespace", fqnn.toString()) .attr("bundleRange", bundleRange) .exceptionMessage(e) .log("Invalid bundle range"); - throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage()); + return CompletableFuture.failedFuture( + new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage())); + } catch (RestException e) { + return CompletableFuture.failedFuture(e); } catch (Exception e) { log.error() - .attr("bundle", fqnn.toString()) + .attr("namespace", fqnn.toString()) .attr("bundleRange", bundleRange) .exception(e) .log("Failed to validate namespace bundle"); - throw new RestException(e); + return CompletableFuture.failedFuture(new RestException(e)); } } /** * Checks whether a given bundle is currently loaded by any broker. */ - protected CompletableFuture isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundles, - String bundleRange) { - NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); - NamespaceService nsService = pulsar().getNamespaceService(); - - if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { - return nsService.checkOwnershipPresentAsync(nsBundle); - } - - LookupOptions options = LookupOptions.builder() - .authoritative(false) - .requestHttps(isRequestHttps()) - .readOnly(true) - .loadTopicsInBundle(false).build(); - - return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent); - } - - protected NamespaceBundle validateNamespaceBundleOwnership(NamespaceName fqnn, BundlesData bundles, - String bundleRange, boolean authoritative, boolean readOnly) { - try { - NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); - validateBundleOwnership(nsBundle, authoritative, readOnly); - return nsBundle; - } catch (WebApplicationException wae) { - throw wae; - } catch (Exception e) { - log.error() - .attr("bundle", fqnn.toString()) - .attr("bundleRange", bundleRange) - .exception(e) - .log("Failed to validate namespace bundle"); - throw new RestException(e); - } + protected CompletableFuture isBundleOwnedByAnyBroker(NamespaceName fqnn, String bundleRange) { + return validateNamespaceBundleRangeAsync(fqnn, bundleRange) + .thenCompose(nsBundle -> { + NamespaceService nsService = pulsar().getNamespaceService(); + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + return nsService.checkOwnershipPresentAsync(nsBundle); + } + LookupOptions options = LookupOptions.builder() + .authoritative(false) + .requestHttps(isRequestHttps()) + .readOnly(true) + .loadTopicsInBundle(false).build(); + return nsService.getWebServiceUrlAsync(nsBundle, options).thenApply(Optional::isPresent); + }); } protected CompletableFuture validateNamespaceBundleOwnershipAsync( - NamespaceName fqnn, BundlesData bundles, String bundleRange, + NamespaceName fqnn, String bundleRange, boolean authoritative, boolean readOnly) { - NamespaceBundle nsBundle; - try { - nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange); - } catch (WebApplicationException wae) { - return CompletableFuture.failedFuture(wae); - } - return validateBundleOwnershipAsync(nsBundle, authoritative, readOnly) - .thenApply(__ -> nsBundle); + return validateNamespaceBundleRangeAsync(fqnn, bundleRange) + .thenCompose(nsBundle -> validateBundleOwnershipAsync(nsBundle, authoritative, readOnly) + .thenApply(__ -> nsBundle)); } public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritative, boolean readOnly) From ded16f0eeb3865b11e4b3fb2b270f0650aa3ac87 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Fri, 1 May 2026 10:19:59 +0800 Subject: [PATCH 2/6] Fix checkstyle: remove unused import BundlesData --- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index b8ddd40f037f4..4d1275a88e6dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -79,7 +79,6 @@ import org.apache.pulsar.common.naming.NamespaceBundles; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.NamespaceOperation; From cf01954fddb2361f2037aa34417060e522f24eb0 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Sat, 2 May 2026 02:32:16 +0800 Subject: [PATCH 3/6] Fix comments. --- .../org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java | 2 +- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java index 0c681a3041987..2c448a5e9d5da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/ResourceQuotasBase.java @@ -69,6 +69,6 @@ private CompletableFuture getNamespaceBundleRangeAsync(String b } }); return ret - .thenComposeAsync(__ -> validateNamespaceBundleRangeAsync(namespaceName, bundleRange)); + .thenCompose(__ -> validateNamespaceBundleRangeAsync(namespaceName, bundleRange)); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 4d1275a88e6dc..fbb65de9fe71e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -624,8 +624,6 @@ protected CompletableFuture validateNamespaceBundleRangeAsync(N .log("Invalid bundle range"); return CompletableFuture.failedFuture( new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage())); - } catch (RestException e) { - return CompletableFuture.failedFuture(e); } catch (Exception e) { log.error() .attr("namespace", fqnn.toString()) From 1c7fc8cadf73c8c4654a33ffd02470b382989d5f Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Sat, 2 May 2026 10:04:05 +0800 Subject: [PATCH 4/6] fix. --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 6ecded356457e..9aa8dea23bdc9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1581,10 +1581,8 @@ protected CompletableFuture internalSplitNamespaceBundleAsync(String bundl .thenCompose(__ -> validatePoliciesReadOnlyAccessAsync()) .thenCompose(__ -> getBundleRangeAsync(bundleName)) .thenCompose(bundleRange -> { - return getNamespacePoliciesAsync(namespaceName) - .thenCompose(policies -> - validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, - authoritative, false)) + return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, + authoritative, false) .thenCompose(nsBundle -> pulsar().getNamespaceService().splitAndOwnBundle(nsBundle, unload, pulsar().getNamespaceService() .getNamespaceBundleSplitAlgorithmByName(splitAlgorithmName), From 43517f515d960e334da44e9b52a09e6f2723f355 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Tue, 5 May 2026 16:16:38 +0800 Subject: [PATCH 5/6] fix. --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 9aa8dea23bdc9..1d88892553195 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -2074,10 +2074,8 @@ protected CompletableFuture internalUnsubscribeNamespaceBundleAsync(String return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName)) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> - validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, - authoritative, false)) + .thenCompose(__ -> validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, + authoritative, false)) .thenCompose(bundle -> unsubscribeAsync(bundle, subscription)) .thenRun(() -> log.info() .attr("subscription", subscription) From cb5f337476d4c714184cede512a4ff559f261362 Mon Sep 17 00:00:00 2001 From: zhaizhibo Date: Tue, 5 May 2026 16:31:14 +0800 Subject: [PATCH 6/6] Fix comments. --- .../broker/admin/impl/NamespacesBase.java | 3 +- .../pulsar/broker/admin/NamespacesTest.java | 120 ++++++++++++++++++ 2 files changed, 121 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 1d88892553195..4820a61eebe36 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1598,8 +1598,7 @@ protected CompletableFuture internalGetTopicHashPositionsAsy .log("Getting hash position for topic list , bundle"); return validateNamespacePolicyOperationAsync(namespaceName, PolicyName.PERSISTENCE, PolicyOperation.READ) - .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) - .thenCompose(policies -> { + .thenCompose(__ -> { return validateNamespaceBundleOwnershipAsync(namespaceName, bundleRange, false, true) .thenCompose(nsBundle -> diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 4ac64405c607d..e3f5dba1e6b6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -2571,4 +2571,124 @@ public void testGetClusterAntiAffinityNamespaces() throws Exception { assertEquals(namespacesResp, namespacesWithFullPath); } + @Test + public void testBundleValidationWithNonExistentNamespace() throws Exception { + String nonExistentNs = "non-existent-namespace"; + String bundleRange = "0x00000000_0x80000000"; + + // Test unload on non-existent namespace - should return 404 + // The error is thrown by validateGlobalNamespaceOwnershipAsync before reaching bundle validation + AsyncResponse unloadResponse = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadResponse, testTenant, nonExistentNs, + bundleRange, false, null); + ArgumentCaptor unloadCaptor = ArgumentCaptor.forClass(RestException.class); + verify(unloadResponse, timeout(5000).times(1)).resume(unloadCaptor.capture()); + assertEquals(unloadCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + + // Test split on non-existent namespace - should return 404 + AsyncResponse splitResponse = mock(AsyncResponse.class); + namespaces.splitNamespaceBundle(splitResponse, testTenant, nonExistentNs, + bundleRange, false, true, null, null); + ArgumentCaptor splitCaptor = ArgumentCaptor.forClass(RestException.class); + verify(splitResponse, timeout(5000).times(1)).resume(splitCaptor.capture()); + assertEquals(splitCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + + // Test clear backlog on non-existent namespace - should return 404 + AsyncResponse clearResponse = mock(AsyncResponse.class); + namespaces.clearNamespaceBundleBacklog(clearResponse, testTenant, nonExistentNs, + bundleRange, false); + ArgumentCaptor clearCaptor = ArgumentCaptor.forClass(RestException.class); + verify(clearResponse, timeout(5000).times(1)).resume(clearCaptor.capture()); + assertEquals(clearCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + + // Test clear backlog for subscription on non-existent namespace - should return 404 + AsyncResponse clearSubResponse = mock(AsyncResponse.class); + namespaces.clearNamespaceBundleBacklogForSubscription(clearSubResponse, testTenant, nonExistentNs, + bundleRange, "test-sub", false); + ArgumentCaptor clearSubCaptor = ArgumentCaptor.forClass(RestException.class); + verify(clearSubResponse, timeout(5000).times(1)).resume(clearSubCaptor.capture()); + assertEquals(clearSubCaptor.getValue().getResponse().getStatus(), + Response.Status.NOT_FOUND.getStatusCode(), + "Non-existent namespace should return 404"); + } + + @Test + public void testBundleValidationAfterSplit() throws Exception { + URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + String bundledNsLocal = "test-bundle-validation-after-split"; + List boundaries = List.of("0x00000000", "0xffffffff"); + BundlesData bundleData = BundlesData.builder() + .boundaries(boundaries) + .numBundles(boundaries.size() - 1) + .build(); + createBundledTestNamespaces(this.testTenant, bundledNsLocal, bundleData); + final NamespaceName testNs = NamespaceName.get(this.testTenant, bundledNsLocal); + + OwnershipCache mockOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache()); + doReturn(CompletableFuture.completedFuture(null)).when(mockOwnershipCache) + .disableOwnership(any(NamespaceBundle.class)); + Field ownership = NamespaceService.class.getDeclaredField("ownershipCache"); + ownership.setAccessible(true); + ownership.set(pulsar.getNamespaceService(), mockOwnershipCache); + mockWebUrl(localWebServiceUrl, testNs); + + // Split the bundle + AsyncResponse splitResponse = mock(AsyncResponse.class); + namespaces.splitNamespaceBundle(splitResponse, testTenant, bundledNsLocal, + "0x00000000_0xffffffff", + false, true, null, null); + ArgumentCaptor splitCaptor = ArgumentCaptor.forClass(Response.class); + verify(splitResponse, timeout(5000).times(1)).resume(splitCaptor.capture()); + + // Verify split was successful + BundlesData bundlesDataAfterSplit = (BundlesData) asyncRequests(ctx -> namespaces.getBundlesData(ctx, + testTenant, bundledNsLocal)); + assertNotNull(bundlesDataAfterSplit); + assertEquals(bundlesDataAfterSplit.getBoundaries().size(), 3); + assertEquals(bundlesDataAfterSplit.getBoundaries().get(0), "0x00000000"); + assertEquals(bundlesDataAfterSplit.getBoundaries().get(1), "0x7fffffff"); + assertEquals(bundlesDataAfterSplit.getBoundaries().get(2), "0xffffffff"); + + // Now test bundle validation with the old (invalid) bundle range - should return 412 + AsyncResponse unloadOldBundleResponse = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadOldBundleResponse, testTenant, bundledNsLocal, + "0x00000000_0xffffffff", false, null); + ArgumentCaptor unloadOldCaptor = ArgumentCaptor.forClass(RestException.class); + verify(unloadOldBundleResponse, timeout(5000).times(1)).resume(unloadOldCaptor.capture()); + assertEquals(unloadOldCaptor.getValue().getResponse().getStatus(), + Response.Status.PRECONDITION_FAILED.getStatusCode(), + "Old bundle range after split should return 412"); + + // Test bundle validation with new valid bundle ranges - should succeed + doReturn(true).when(nsSvc) + .isServiceUnitOwned(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(testNs))); + doReturn(CompletableFuture.completedFuture(null)).when(nsSvc) + .unloadNamespaceBundle(any(NamespaceBundle.class)); + + AsyncResponse unloadNewBundle1Response = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadNewBundle1Response, testTenant, bundledNsLocal, + "0x00000000_0x7fffffff", false, null); + ArgumentCaptor newBundle1Captor = ArgumentCaptor.forClass(Response.class); + verify(unloadNewBundle1Response, timeout(5000).times(1)).resume(newBundle1Captor.capture()); + assertEquals(newBundle1Captor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode(), + "New bundle range should be valid"); + + AsyncResponse unloadNewBundle2Response = mock(AsyncResponse.class); + namespaces.unloadNamespaceBundle(unloadNewBundle2Response, testTenant, bundledNsLocal, + "0x7fffffff_0xffffffff", false, null); + ArgumentCaptor newBundle2Captor = ArgumentCaptor.forClass(Response.class); + verify(unloadNewBundle2Response, timeout(5000).times(1)).resume(newBundle2Captor.capture()); + assertEquals(newBundle2Captor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode(), + "New bundle range should be valid"); + + // cleanup + resetBroker(); + } + }