diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java index 255d6c0379779..30e3eac8054a4 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.authorization; import static java.util.concurrent.TimeUnit.SECONDS; +import io.opentelemetry.api.OpenTelemetry; import java.net.SocketAddress; import java.util.List; import java.util.Map; @@ -33,6 +34,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationParameters; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.client.admin.GrantTopicPermissionOptions; import org.apache.pulsar.client.admin.RevokeTopicPermissionOptions; @@ -58,13 +60,22 @@ @CustomLog public class AuthorizationService { + private static final String UNKNOWN_OPERATION = "unknown"; + private final PulsarResources resources; private final AuthorizationProvider provider; private final ServiceConfiguration conf; + private final AuthorizationMetrics authorizationMetrics; public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources) throws PulsarServerException { + this(conf, pulsarResources, OpenTelemetry.noop()); + } + + public AuthorizationService(ServiceConfiguration conf, PulsarResources pulsarResources, OpenTelemetry openTelemetry) + throws PulsarServerException { this.conf = conf; + this.authorizationMetrics = new AuthorizationMetrics(openTelemetry); try { final String providerClassname = conf.getAuthorizationProvider(); if (StringUtils.isNotBlank(providerClassname)) { @@ -101,12 +112,14 @@ public CompletableFuture isSuperUser(AuthenticationParameters authParam } public CompletableFuture isSuperUser(String user, AuthenticationDataSource authenticationData) { - return provider.isSuperUser(user, authenticationData, conf); + return recordAuthorizationOperation(provider.isSuperUser(user, authenticationData, conf), + AuthorizationMetrics.RESOURCE_TYPE_SUPERUSER, "check"); } public CompletableFuture isTenantAdmin(String tenant, String role, TenantInfo tenantInfo, AuthenticationDataSource authenticationData) { - return provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData); + return recordAuthorizationOperation(provider.isTenantAdmin(tenant, role, tenantInfo, authenticationData), + AuthorizationMetrics.RESOURCE_TYPE_TENANT_ADMIN, "check"); } /** @@ -547,7 +560,8 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTenantOperationAsync(tenantName, role, operation, authData); + return recordAuthorizationOperation(provider.allowTenantOperationAsync(tenantName, role, operation, authData), + AuthorizationMetrics.RESOURCE_TYPE_TENANT, operationName(operation)); } public CompletableFuture allowTenantOperationAsync(String tenantName, @@ -556,7 +570,7 @@ public CompletableFuture allowTenantOperationAsync(String tenantName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TENANT, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTenantOperationAsync( @@ -577,18 +591,23 @@ public CompletableFuture allowBrokerOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, - brokerOperation, role, authData); - final var isOriginalAuthorizedFuture = provider.allowBrokerOperationAsync(clusterName, brokerId, - brokerOperation, originalRole, authData); + final var isRoleAuthorizedFuture = recordAuthorizationOperation( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), + AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); + final var isOriginalAuthorizedFuture = recordAuthorizationOperation( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, originalRole, + authData), + AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData); + return recordAuthorizationOperation( + provider.allowBrokerOperationAsync(clusterName, brokerId, brokerOperation, role, authData), + AuthorizationMetrics.RESOURCE_TYPE_BROKER, operationName(brokerOperation)); } } @@ -598,40 +617,48 @@ public CompletableFuture allowClusterOperationAsync(String clusterName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, - clusterOperation, role, authData); - final var isOriginalAuthorizedFuture = provider.allowClusterOperationAsync(clusterName, - clusterOperation, originalRole, authData); + final var isRoleAuthorizedFuture = recordAuthorizationOperation( + provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); + final var isOriginalAuthorizedFuture = recordAuthorizationOperation( + provider.allowClusterOperationAsync(clusterName, clusterOperation, originalRole, authData), + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData); + return recordAuthorizationOperation( + provider.allowClusterOperationAsync(clusterName, clusterOperation, role, authData), + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER, operationName(clusterOperation)); } } public CompletableFuture allowClusterPolicyOperationAsync(String clusterName, PolicyName policy, PolicyOperation operation, - String originalRole, - String role, - AuthenticationDataSource authData) { + String originalRole, + String role, + AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { - final var isRoleAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, role, - policy, operation, authData); - final var isOriginalAuthorizedFuture = provider.allowClusterPolicyOperationAsync(clusterName, originalRole, - policy, operation, authData); + final var isRoleAuthorizedFuture = recordAuthorizationOperation( + provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); + final var isOriginalAuthorizedFuture = recordAuthorizationOperation( + provider.allowClusterPolicyOperationAsync(clusterName, originalRole, policy, operation, authData), + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); return isRoleAuthorizedFuture.thenCombine(isOriginalAuthorizedFuture, (isRoleAuthorized, isOriginalAuthorized) -> isRoleAuthorized && isOriginalAuthorized); } else { - return provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData); + return recordAuthorizationOperation( + provider.allowClusterPolicyOperationAsync(clusterName, role, policy, operation, authData), + AuthorizationMetrics.RESOURCE_TYPE_CLUSTER_POLICY, operationName(operation)); } } @@ -675,7 +702,8 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowNamespaceOperationAsync(namespaceName, role, operation, authData); + return recordAuthorizationOperation(provider.allowNamespaceOperationAsync(namespaceName, role, operation, + authData), AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operationName(operation)); } public CompletableFuture allowNamespaceOperationAsync(NamespaceName namespaceName, @@ -684,7 +712,7 @@ public CompletableFuture allowNamespaceOperationAsync(NamespaceName nam String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespaceOperationAsync( @@ -718,7 +746,9 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData); + return recordAuthorizationOperation( + provider.allowNamespacePolicyOperationAsync(namespaceName, policy, operation, role, authData), + AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operationName(operation)); } public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceName namespaceName, @@ -728,7 +758,7 @@ public CompletableFuture allowNamespacePolicyOperationAsync(NamespaceNa String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowNamespacePolicyOperationAsync( @@ -781,7 +811,8 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic if (!this.conf.isAuthorizationEnabled()) { return CompletableFuture.completedFuture(true); } - return provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, authData); + return recordAuthorizationOperation(provider.allowTopicPolicyOperationAsync(topicName, role, policy, operation, + authData), AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operationName(operation)); } public CompletableFuture allowTopicPolicyOperationAsync(TopicName topicName, @@ -791,7 +822,7 @@ public CompletableFuture allowTopicPolicyOperationAsync(TopicName topic String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC_POLICY, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicPolicyOperationAsync( @@ -852,8 +883,9 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, return CompletableFuture.completedFuture(true); } - CompletableFuture allowFuture = - provider.allowTopicOperationAsync(topicName, role, operation, authData); + CompletableFuture allowFuture = recordAuthorizationOperation( + provider.allowTopicOperationAsync(topicName, role, operation, authData), + AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); return allowFuture.whenComplete((allowed, exception) -> { if (exception == null) { if (allowed) { @@ -885,7 +917,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, AuthenticationDataSource originalAuthData, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, originalAuthData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -905,7 +937,7 @@ public CompletableFuture allowTopicOperationAsync(TopicName topicName, String role, AuthenticationDataSource authData) { if (!isValidOriginalPrincipal(role, originalRole, authData)) { - return CompletableFuture.completedFuture(false); + return deniedFuture(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, operationName(operation)); } if (isProxyRole(role) && !isWebsocketPrinciple(originalRole)) { CompletableFuture isRoleAuthorizedFuture = allowTopicOperationAsync( @@ -953,4 +985,29 @@ public CompletableFuture>> getPermissionsAsync(Names public CompletableFuture>> getSubscriptionPermissionsAsync(NamespaceName namespaceName) { return provider.getSubscriptionPermissionsAsync(namespaceName); } + + private CompletableFuture deniedFuture(String resourceType, String operation) { + authorizationMetrics.recordFailure(resourceType, operation); + return CompletableFuture.completedFuture(false); + } + + private static String operationName(Enum operation) { + return operation == null ? UNKNOWN_OPERATION : operation.name().toLowerCase(); + } + + private CompletableFuture recordAuthorizationOperation(CompletableFuture authorizationFuture, + String resourceType, + String operation) { + return authorizationFuture.whenComplete((allowed, exception) -> { + if (exception != null) { + authorizationMetrics.recordError(resourceType, operation); + } else { + if (Boolean.TRUE.equals(allowed)) { + authorizationMetrics.recordSuccess(resourceType, operation); + } else if (Boolean.FALSE.equals(allowed)) { + authorizationMetrics.recordFailure(resourceType, operation); + } + } + }); + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java new file mode 100644 index 0000000000000..c227642ece7e0 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/AuthorizationMetrics.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.authorization.metrics; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; +import io.prometheus.client.Counter; + +public class AuthorizationMetrics { + public static final String AUTHORIZATION_OPERATIONS_METRIC_NAME = "pulsar_authorization_operations_total"; + public static final String AUTHORIZATION_COUNTER_METRIC_NAME = "pulsar.authorization.operation.count"; + public static final String INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.authorization"; + public static final String RESULT_SUCCESS = "success"; + public static final String RESULT_FAILURE = "failure"; + public static final String RESULT_ERROR = "error"; + public static final String RESOURCE_TYPE_SUPERUSER = "superuser"; + public static final String RESOURCE_TYPE_TENANT_ADMIN = "tenant_admin"; + public static final String RESOURCE_TYPE_TENANT = "tenant"; + public static final String RESOURCE_TYPE_BROKER = "broker"; + public static final String RESOURCE_TYPE_CLUSTER = "cluster"; + public static final String RESOURCE_TYPE_CLUSTER_POLICY = "cluster_policy"; + public static final String RESOURCE_TYPE_NAMESPACE = "namespace"; + public static final String RESOURCE_TYPE_NAMESPACE_POLICY = "namespace_policy"; + public static final String RESOURCE_TYPE_TOPIC = "topic"; + public static final String RESOURCE_TYPE_TOPIC_POLICY = "topic_policy"; + public static final AttributeKey RESOURCE_TYPE_KEY = + AttributeKey.stringKey("pulsar.authorization.resource.type"); + public static final AttributeKey OPERATION_KEY = AttributeKey.stringKey("pulsar.authorization.operation"); + public static final AttributeKey RESULT_KEY = AttributeKey.stringKey("pulsar.authorization.result"); + + private static final Counter authorizationOperations = Counter.build() + .name(AUTHORIZATION_OPERATIONS_METRIC_NAME) + .help("Pulsar authorization operations") + .labelNames("resource_type", "operation", "result") + .register(); + + private final LongCounter authorizationCounter; + + public AuthorizationMetrics(OpenTelemetry openTelemetry) { + var meter = openTelemetry.getMeter(INSTRUMENTATION_SCOPE_NAME); + authorizationCounter = meter.counterBuilder(AUTHORIZATION_COUNTER_METRIC_NAME) + .setDescription("The number of authorization operations") + .setUnit("{operation}") + .build(); + } + + public void recordSuccess(String resourceType, String operation) { + record(resourceType, operation, RESULT_SUCCESS); + } + + public void recordFailure(String resourceType, String operation) { + record(resourceType, operation, RESULT_FAILURE); + } + + public void recordError(String resourceType, String operation) { + record(resourceType, operation, RESULT_ERROR); + } + + private void record(String resourceType, String operation, String result) { + authorizationOperations.labels(resourceType, operation, result).inc(); + authorizationCounter.add(1, Attributes.of(RESOURCE_TYPE_KEY, resourceType, + OPERATION_KEY, operation, + RESULT_KEY, result)); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java new file mode 100644 index 0000000000000..a0ec5d99f5815 --- /dev/null +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/metrics/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Authorization metrics. + */ +package org.apache.pulsar.broker.authorization.metrics; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java index 6f9dffa11b948..96ef696fb47cc 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/AuthorizationServiceTest.java @@ -20,9 +20,13 @@ import static org.testng.AssertJUnit.assertFalse; import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; +import io.prometheus.client.CollectorRegistry; import java.util.HashSet; +import java.util.concurrent.ExecutionException; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.NamespaceOperation; @@ -132,4 +136,68 @@ public void testTopicPolicyOperationAsync(String role, String originalRole, bool PolicyName.ALL, PolicyOperation.READ, originalRole, role, null).get(); checkResult(shouldPass, isAuthorized); } + + @Test + public void testAuthorizationFailureMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); + boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "fail.client", null).get(); + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); + + assertFalse(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationFailureMetricForInvalidOriginalPrincipal() throws Exception { + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, + NamespaceOperation.PACKAGES.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); + boolean isAuthorized = authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"), + NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", null).get(); + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, + NamespaceOperation.PACKAGES.name().toLowerCase(), AuthorizationMetrics.RESULT_FAILURE); + + assertFalse(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationSuccessMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_SUCCESS); + boolean isAuthorized = authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "pass.client", null).get(); + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_SUCCESS); + + assertTrue(isAuthorized); + assertTrue(after - before == 1.0d); + } + + @Test + public void testAuthorizationErrorMetricForTopicOperation() throws Exception { + double before = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_ERROR); + try { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "error.client", null).get(); + fail("Expected authorization provider error"); + } catch (ExecutionException e) { + // Expected. + } + double after = getAuthorizationOperations(AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + TopicOperation.PRODUCE.name().toLowerCase(), AuthorizationMetrics.RESULT_ERROR); + + assertTrue(after - before == 1.0d); + } + + private double getAuthorizationOperations(String resourceType, String operation, String result) { + Double sample = CollectorRegistry.defaultRegistry.getSampleValue( + AuthorizationMetrics.AUTHORIZATION_OPERATIONS_METRIC_NAME, + new String[] {"resource_type", "operation", "result"}, + new String[] {resourceType, operation, result}); + return sample == null ? 0.0d : sample; + } } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java index 9f9cd92cd0a75..e254dfb50bed6 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authorization/MockAuthorizationProvider.java @@ -38,6 +38,9 @@ public class MockAuthorizationProvider implements AuthorizationProvider { private CompletableFuture shouldPass(String role) { + if (role != null && role.startsWith("error")) { + return CompletableFuture.failedFuture(new RuntimeException("Authorization provider error")); + } return CompletableFuture.completedFuture(role != null && role.startsWith("pass")); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 7ecb6f2bdfd4b..3b144f3730aab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -374,7 +374,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws this.statsUpdater = new SingleThreadNonConcurrentFixedRateScheduler("pulsar-stats-updater"); this.authorizationService = new AuthorizationService( - pulsar.getConfiguration(), pulsar().getPulsarResources()); + pulsar.getConfiguration(), pulsar().getPulsarResources(), pulsar.getOpenTelemetry().getOpenTelemetry()); this.entryFilterProvider = new EntryFilterProvider(pulsar.getConfiguration()); pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java new file mode 100644 index 0000000000000..061e4785e354b --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryAuthorizationStatsTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; +import static org.testng.AssertJUnit.fail; +import io.opentelemetry.api.common.Attributes; +import java.util.HashSet; +import java.util.concurrent.ExecutionException; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.authorization.AuthorizationService; +import org.apache.pulsar.broker.authorization.MockAuthorizationProvider; +import org.apache.pulsar.broker.authorization.metrics.AuthorizationMetrics; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.NamespaceOperation; +import org.apache.pulsar.common.policies.data.TopicOperation; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class OpenTelemetryAuthorizationStatsTest extends BrokerTestBase { + + private AuthorizationService authorizationService; + + @BeforeMethod(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.baseSetup(); + + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setAuthorizationEnabled(true); + conf.setAuthorizationProvider(MockAuthorizationProvider.class.getName()); + HashSet proxyRoles = new HashSet<>(); + proxyRoles.add("pass.proxy"); + proxyRoles.add("fail.proxy"); + conf.setProxyRoles(proxyRoles); + authorizationService = new AuthorizationService(conf, null, pulsar.getOpenTelemetry().getOpenTelemetry()); + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) { + super.customizeMainPulsarTestContextBuilder(builder); + builder.enableOpenTelemetry(true); + } + + @Test + public void testAuthorizationSuccess() throws Exception { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "pass.client", null).get(); + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + AuthorizationMetrics.OPERATION_KEY, "produce", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_SUCCESS), + 1); + } + + @Test + public void testAuthorizationFailure() throws Exception { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "fail.client", null).get(); + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + AuthorizationMetrics.OPERATION_KEY, "produce", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), + 1); + } + + @Test + public void testAuthorizationFailureForInvalidOriginalPrincipal() throws Exception { + authorizationService.allowNamespaceOperationAsync(NamespaceName.get("public/default"), + NamespaceOperation.PACKAGES, "pass.client", "pass.not-proxy", null).get(); + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_NAMESPACE, + AuthorizationMetrics.OPERATION_KEY, "packages", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_FAILURE), + 1); + } + + @Test + public void testAuthorizationError() throws Exception { + try { + authorizationService.allowTopicOperationAsync(TopicName.get("topic"), + TopicOperation.PRODUCE, null, "error.client", null).get(); + fail("Expected authorization provider error"); + } catch (ExecutionException e) { + // Expected. + } + + assertMetricLongSumValue(pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(), + AuthorizationMetrics.AUTHORIZATION_COUNTER_METRIC_NAME, + Attributes.of(AuthorizationMetrics.RESOURCE_TYPE_KEY, AuthorizationMetrics.RESOURCE_TYPE_TOPIC, + AuthorizationMetrics.OPERATION_KEY, "produce", + AuthorizationMetrics.RESULT_KEY, AuthorizationMetrics.RESULT_ERROR), + 1); + } +}