Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,9 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case GRANT_PERMISSION:
case GET_PERMISSION:
case REVOKE_PERMISSION:
case GET_PROPERTIES:
case UPDATE_PROPERTIES:
case DELETE_PROPERTIES:
return CompletableFuture.completedFuture(false);
default:
return FutureUtil.failedFuture(new IllegalStateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2759,7 +2759,7 @@ protected void internalSetMaxTopicsPerNamespace(Integer maxTopicsPerNamespace) {
}

protected void internalSetProperty(String key, String value, AsyncResponse asyncResponse) {
validateAdminAccessForTenantAsync(namespaceName.getTenant())
validateBothAdminAccessForTenantAndNamespaceOperationAsync(namespaceName, NamespaceOperation.UPDATE_PROPERTIES)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.put(key, value);
Expand All @@ -2779,7 +2779,7 @@ protected void internalSetProperty(String key, String value, AsyncResponse async
}

protected void internalSetProperties(Map<String, String> properties, AsyncResponse asyncResponse) {
validateAdminAccessForTenantAsync(namespaceName.getTenant())
validateBothAdminAccessForTenantAndNamespaceOperationAsync(namespaceName, NamespaceOperation.UPDATE_PROPERTIES)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
policies.properties.putAll(properties);
Expand All @@ -2799,7 +2799,7 @@ protected void internalSetProperties(Map<String, String> properties, AsyncRespon
}

protected void internalGetProperty(String key, AsyncResponse asyncResponse) {
validateAdminAccessForTenantAsync(namespaceName.getTenant())
validateBothAdminAccessForTenantAndNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PROPERTIES)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties.get(key)))
.exceptionally(ex -> {
Expand All @@ -2812,7 +2812,7 @@ protected void internalGetProperty(String key, AsyncResponse asyncResponse) {
}

protected void internalGetProperties(AsyncResponse asyncResponse) {
validateAdminAccessForTenantAsync(namespaceName.getTenant())
validateBothAdminAccessForTenantAndNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_PROPERTIES)
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenAccept(policies -> asyncResponse.resume(policies.properties))
.exceptionally(ex -> {
Expand All @@ -2825,7 +2825,7 @@ protected void internalGetProperties(AsyncResponse asyncResponse) {

protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) {
AtomicReference<String> oldVal = new AtomicReference<>(null);
validateAdminAccessForTenantAsync(namespaceName.getTenant())
validateBothAdminAccessForTenantAndNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_PROPERTIES)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
oldVal.set(policies.properties.remove(key));
Expand All @@ -2845,7 +2845,7 @@ protected void internalRemoveProperty(String key, AsyncResponse asyncResponse) {

protected void internalClearProperties(AsyncResponse asyncResponse) {
AtomicReference<Integer> clearedCount = new AtomicReference<>(0);
validateAdminAccessForTenantAsync(namespaceName.getTenant())
validateBothAdminAccessForTenantAndNamespaceOperationAsync(namespaceName, NamespaceOperation.DELETE_PROPERTIES)
.thenCompose(__ -> validatePoliciesReadOnlyAccessAsync())
.thenCompose(__ -> updatePoliciesAsync(namespaceName, policies -> {
clearedCount.set(policies.properties.size());
Expand Down Expand Up @@ -3206,4 +3206,41 @@ private CompletableFuture<BundlesData> getDefaultBundleDataAsync() {
getBundles(config().getDefaultNumberOfNamespaceBundles()));
}


protected CompletableFuture<Void> validateBothAdminAccessForTenantAndNamespaceOperationAsync(
NamespaceName namespaceName, NamespaceOperation operation) {
final var tenantAdminValidation = validateAdminAccessForTenantAsync(namespaceName.getTenant());
final var namespaceOperationValidation = validateNamespaceOperationAsync(namespaceName, operation);
return FutureUtil.waitForAll(List.of(tenantAdminValidation, namespaceOperationValidation))
.handle((result, err) -> {
if (!tenantAdminValidation.isCompletedExceptionally()
|| !namespaceOperationValidation.isCompletedExceptionally()) {
return null;
}
if (log.isDebugEnabled()) {
Throwable tenantAdminValidationException = null;
try {
tenantAdminValidation.join();
} catch (Throwable ex) {
tenantAdminValidationException = FutureUtil.unwrapCompletionException(ex);
}
Throwable namespaceOperationValidationException = null;
try {
namespaceOperationValidation.join();
} catch (Throwable ex) {
namespaceOperationValidationException = FutureUtil.unwrapCompletionException(ex);
}
log.debug("validateBothAdminAccessForTenantAndNamespaceOperationAsync failed."
+ " originalPrincipal={} clientAppId={} operation={} namespace={} "
+ "tenantAdminValidationError={} namespaceOperationValidationError={}",
originalPrincipal(), clientAppId(), operation.toString(), namespaceName,
tenantAdminValidationException, namespaceOperationValidationException);
}
throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to validateBothAdminAccessForTenantAndNamespaceOperationAsync"
+ " for originalPrincipal [%s] and clientAppId [%s] about operation [%s]"
+ " on namespace [%s]",
originalPrincipal(), clientAppId(), operation.toString(), namespaceName));
});
Comment on lines +3212 to +3244
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

validateBothAdminAccessForTenantAndNamespaceOperationAsync always starts both authorization checks and waits for both to complete, even if the tenant-admin check succeeds. This adds extra auth traffic/latency to every namespace properties request and can make the endpoint as slow as the slower of the two checks. Consider short-circuiting on the first successful validation (and only running the second path if the first fails).

Suggested change
final var tenantAdminValidation = validateAdminAccessForTenantAsync(namespaceName.getTenant());
final var namespaceOperationValidation = validateNamespaceOperationAsync(namespaceName, operation);
return FutureUtil.waitForAll(List.of(tenantAdminValidation, namespaceOperationValidation))
.handle((result, err) -> {
if (!tenantAdminValidation.isCompletedExceptionally()
|| !namespaceOperationValidation.isCompletedExceptionally()) {
return null;
}
if (log.isDebugEnabled()) {
Throwable tenantAdminValidationException = null;
try {
tenantAdminValidation.join();
} catch (Throwable ex) {
tenantAdminValidationException = FutureUtil.unwrapCompletionException(ex);
}
Throwable namespaceOperationValidationException = null;
try {
namespaceOperationValidation.join();
} catch (Throwable ex) {
namespaceOperationValidationException = FutureUtil.unwrapCompletionException(ex);
}
log.debug("validateBothAdminAccessForTenantAndNamespaceOperationAsync failed."
+ " originalPrincipal={} clientAppId={} operation={} namespace={} "
+ "tenantAdminValidationError={} namespaceOperationValidationError={}",
originalPrincipal(), clientAppId(), operation.toString(), namespaceName,
tenantAdminValidationException, namespaceOperationValidationException);
}
throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to validateBothAdminAccessForTenantAndNamespaceOperationAsync"
+ " for originalPrincipal [%s] and clientAppId [%s] about operation [%s]"
+ " on namespace [%s]",
originalPrincipal(), clientAppId(), operation.toString(), namespaceName));
});
return validateAdminAccessForTenantAsync(namespaceName.getTenant())
.handle((result, tenantAdminValidationError) -> {
if (tenantAdminValidationError == null) {
return CompletableFuture.<Void>completedFuture(null);
}
final Throwable tenantAdminValidationException =
FutureUtil.unwrapCompletionException(tenantAdminValidationError);
return validateNamespaceOperationAsync(namespaceName, operation)
.handle((namespaceResult, namespaceOperationValidationError) -> {
if (namespaceOperationValidationError == null) {
return null;
}
final Throwable namespaceOperationValidationException =
FutureUtil.unwrapCompletionException(namespaceOperationValidationError);
if (log.isDebugEnabled()) {
log.debug("validateBothAdminAccessForTenantAndNamespaceOperationAsync failed."
+ " originalPrincipal={} clientAppId={} operation={} "
+ "namespace={} tenantAdminValidationError={} "
+ "namespaceOperationValidationError={}",
originalPrincipal(), clientAppId(), operation.toString(), namespaceName,
tenantAdminValidationException, namespaceOperationValidationException);
}
throw new RestException(Status.UNAUTHORIZED,
String.format("Unauthorized to "
+ "validateBothAdminAccessForTenantAndNamespaceOperationAsync"
+ " for originalPrincipal [%s] and clientAppId [%s] "
+ "about operation [%s] on namespace [%s]",
originalPrincipal(), clientAppId(), operation.toString(),
namespaceName));
});
}).thenCompose(Function.identity());

Copilot uses AI. Check for mistakes.
Comment on lines +3239 to +3244
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When both validations fail, this helper throws a new RestException(Status.UNAUTHORIZED, ...), which can change the HTTP status and semantics compared to the underlying checks (e.g., validateNamespaceOperationAsync uses 403 FORBIDDEN; validateAdminAccessForTenantAsync can return 404 NOT_FOUND for missing tenants). This risks returning 401 for cases that should remain 403/404 and also discards the original error details. Prefer propagating one of the original exceptions (or selecting the most appropriate status) instead of always creating a new 401.

Copilot uses AI. Check for mistakes.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.List;
Copy link

Copilot AI Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added java.util.List import appears to be unused in this file (the only List usage is still fully-qualified as java.util.List), which will fail style checks (unused imports) and should be removed or the fully-qualified usages should be switched to the imported type.

Suggested change
import java.util.List;

Copilot uses AI. Check for mistakes.
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,24 +242,38 @@ public void testProperties() {
tenantManagerAdmin.namespaces().clearProperties(namespace);

// test nobody
AtomicBoolean execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.UPDATE_PROPERTIES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setProperties(namespace, properties));
Assert.assertTrue(execFlag.get());

execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.UPDATE_PROPERTIES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().setProperty(namespace, "key2", "value2"));
Assert.assertTrue(execFlag.get());

execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PROPERTIES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getProperties(namespace));
Assert.assertTrue(execFlag.get());

execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.GET_PROPERTIES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().getProperty(namespace, "key2"));
Assert.assertTrue(execFlag.get());


execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.DELETE_PROPERTIES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().removeProperty(namespace, "key2"));
Assert.assertTrue(execFlag.get());

execFlag = setAuthorizationOperationChecker(subject, NamespaceOperation.DELETE_PROPERTIES);
Assert.assertThrows(PulsarAdminException.NotAuthorizedException.class,
() -> subAdmin.namespaces().clearProperties(namespace));
Assert.assertTrue(execFlag.get());

clearAuthorizationOperationChecker();

for (AuthAction action : AuthAction.values()) {
superUserAdmin.namespaces().grantPermissionOnNamespace(namespace, subject, Set.of(action));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public enum NamespaceOperation {
GET_PERMISSION,
GRANT_PERMISSION,
REVOKE_PERMISSION,
GET_PROPERTIES,
UPDATE_PROPERTIES,
DELETE_PROPERTIES,
Comment on lines +37 to +39
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering if adding new enums breaks existing 3rd party AuthorizationProviders which many Pulsar users use currently in production?


CLEAR_BACKLOG,
UNSUBSCRIBE,
Expand Down
Loading