Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions checkstyle/import-control-server-common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@

<allow class="org.apache.kafka.server.util.TopicFilter.IncludeList" />
<allow class="org.apache.kafka.test.TestUtils" />
<!-- ServerTestUtils uses yammer metrics for test cleanup -->
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="com.yammer.metrics.core" />
<subpackage name="timer">
<allow class="org.apache.kafka.server.util.MockTime" />
<allow class="org.apache.kafka.server.util.ShutdownableThread" />
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control-storage.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
<subpackage name="purgatory">
<allow pkg="kafka.utils" />
<allow pkg="org.apache.kafka.server.storage.log" />
<allow pkg="org.apache.kafka.server.util" />
</subpackage>
</subpackage>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.test.NoRetryException;
import org.apache.kafka.test.TestUtils;

Expand Down Expand Up @@ -184,7 +185,7 @@ public void setup() {

@AfterEach
public void tearDown() {
kafka.utils.TestUtils.clearYammerMetrics();
ServerTestUtils.clearYammerMetrics();
}

@ClusterTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
Expand Down Expand Up @@ -123,7 +124,7 @@ public class DelayedShareFetchTest {

@BeforeEach
public void setUp() {
kafka.utils.TestUtils.clearYammerMetrics();
ServerTestUtils.clearYammerMetrics();
mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
new SystemTimer("DelayedShareFetchTestTimer"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.ServerTestUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
Expand Down Expand Up @@ -173,7 +174,7 @@ public class SharePartitionManagerTest {
@BeforeEach
public void setUp() {
time = new MockTime();
kafka.utils.TestUtils.clearYammerMetrics();
ServerTestUtils.clearYammerMetrics();
brokerTopicStats = new BrokerTopicStats();
mockReplicaManager = mock(ReplicaManager.class);
Partition partition = mockPartition();
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/java/kafka/server/share/SharePartitionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@

import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.yammerMetricValue;
import static org.apache.kafka.server.util.ServerTestUtils.clearYammerMetrics;
import static org.apache.kafka.server.util.ServerTestUtils.yammerMetricValue;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -138,7 +139,7 @@ public class SharePartitionTest {

@BeforeEach
public void setUp() {
kafka.utils.TestUtils.clearYammerMetrics();
clearYammerMetrics();
mockTimer = new MockTimer();
sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID, TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.kafka.server.{ClientMetricsManager, ServerSocketFactory}
import org.apache.kafka.server.common.{MetadataVersion, TransactionVersion}
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.timer.SystemTimer
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag, TestInfo}
Expand Down Expand Up @@ -349,7 +350,7 @@ abstract class QuorumTestHarness extends Logging {
}
Exit.resetExitProcedure()
Exit.resetHaltProcedure()
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
Configuration.setConfiguration(null)
faultHandler.maybeRethrowFirstException()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, MockConfig
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.partition.AlterPartitionListener
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.server.util.MockAlterPartitionManager
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
Expand Down Expand Up @@ -65,7 +66,7 @@ class AbstractPartitionTest {

@BeforeEach
def setup(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()

val logProps = createLogProperties(Map.empty)
logConfig = new LogConfig(logProps)
Expand Down Expand Up @@ -111,7 +112,7 @@ class AbstractPartitionTest {
if (tmpDir.exists()) {
logManager.shutdown()
Utils.delete(tmpDir)
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.{ConnectionThrottledException, SocketServer, SocketServerConfigs, TooManyConnectionsException}
import org.apache.kafka.server.config.{QuotaConfig, ReplicationConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.MockTime
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api._
Expand Down Expand Up @@ -86,7 +87,7 @@ class ConnectionQuotasTest {
@BeforeEach
def setUp(): Unit = {
// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()

val metricsPackage = "kafka.network"
val metricsClassName = "ConnectionQuotasTest"
Expand All @@ -109,7 +110,7 @@ class ConnectionQuotasTest {
connectionQuotas.close()
}
metrics.close()
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
blockedPercentMeters.clear()
}

Expand Down
5 changes: 3 additions & 2 deletions core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.kafka.server.config.QuotaConfig
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.network.ConnectionDisconnectListener
import org.apache.kafka.server.quota.{ThrottleCallback, ThrottledChannel}
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
import org.apache.logging.log4j.{Level, LogManager}
import org.apache.logging.log4j.core.config.Configurator
Expand Down Expand Up @@ -82,7 +83,7 @@ class SocketServerTest {
val localAddress = InetAddress.getLoopbackAddress

// Clean-up any metrics left around by previous tests
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()

private val apiVersionManager = new SimpleApiVersionManager(ListenerType.BROKER, true,
() => new FinalizedFeatures(MetadataVersion.latestTesting(), util.Map.of[String, java.lang.Short], 0))
Expand Down Expand Up @@ -113,7 +114,7 @@ class SocketServerTest {
sockets.foreach(_.close())
sockets.clear()
Configurator.setLevel(kafkaLogger.getName, logLevelToRestore)
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}

def sendRequest(socket: Socket, request: Array[Byte], id: Option[Short] = None, flush: Boolean = true): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.AuthorizerTestServerInfo
import org.apache.kafka.security.authorizer.AclEntry.{WILDCARD_HOST, WILDCARD_PRINCIPAL_STRING}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -97,7 +98,7 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
override def tearDown(): Unit = {
authorizer1.close()
authorizer2.close()
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
super.tearDown()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package kafka.server

import com.yammer.metrics.core.Gauge
import kafka.utils.TestUtils
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
import org.apache.kafka.common.metrics.Metrics
Expand All @@ -33,6 +32,7 @@ import org.apache.kafka.server.ResultWithPartitions
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.quota.ReplicationQuotaManager
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.storage.internals.log.LogAppendInfo
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
import org.junit.jupiter.api.Assertions._
Expand All @@ -48,7 +48,7 @@ class AbstractFetcherManagerTest {

@BeforeEach
def cleanMetricRegistry(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}

private def getMetricValue(name: String): Any = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
import org.apache.kafka.server.{PartitionFetchState, ReplicaState}
import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource

Expand All @@ -53,7 +54,7 @@ class AbstractFetcherThreadTest {

@BeforeEach
def cleanMetricRegistry(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}

private def allMetricsNames: Set[String] = KafkaYammerMetrics.defaultRegistry().allMetrics().asScala.keySet.map(_.getName)
Expand Down Expand Up @@ -3850,4 +3851,4 @@ class AbstractFetcherThreadTest {
// LogEndOffset is unchanged
assertEquals(0, replicaState.logEndOffset)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package kafka.server

import org.apache.kafka.common.test.api.ClusterTest
import kafka.utils.TestUtils
import org.apache.kafka.common.test.ClusterInstance
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions.assertEquals

Expand All @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
class BrokerMetricNamesTest(cluster: ClusterInstance) {
@AfterEach
def tearDown(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}

@ClusterTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package kafka.server

import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.DelayedProduce
import org.apache.kafka.server.util.ServerTestUtils
import org.junit.jupiter.api.{AfterEach, Test}
import org.junit.jupiter.api.Assertions._

Expand All @@ -30,7 +30,7 @@ class DelayedProduceTest {

@AfterEach
def tearDown(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}

@Test
Expand Down
3 changes: 2 additions & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
import org.apache.kafka.server.share.context.{FinalContext, ShareSessionContext}
import org.apache.kafka.server.share.session.{ShareSession, ShareSessionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, RecordValidationStats, UnifiedLog}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
Expand Down Expand Up @@ -162,7 +163,7 @@ class KafkaApisTest extends Logging {
Utils.swallow(this.logger.underlying, () => quotas.shutdown())
if (kafkaApis != null)
Utils.swallow(this.logger.underlying, () => kafkaApis.close())
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
metrics.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.util.ServerTestUtils

import org.apache.kafka.server.quota.{ReplicaQuota, ReplicationQuotaManager}
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogConfig, LogManager, RecordValidationStats, UnifiedLog}
Expand Down Expand Up @@ -82,7 +83,7 @@ class ReplicaFetcherThreadTest {

@AfterEach
def cleanup(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}

private def createReplicaFetcherThread(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPa
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation.{ADD_PARTITION, GENERIC_ERROR_SUPPORTED}
import org.apache.kafka.server.util.ServerTestUtils
import org.apache.kafka.server.util.timer.{MockTimer, SystemTimer}
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
Expand Down Expand Up @@ -108,7 +109,7 @@ import scala.jdk.OptionConverters.{RichOption, RichOptional}
object ReplicaManagerTest {
@AfterAll
def tearDownClass(): Unit = {
TestUtils.clearYammerMetrics()
ServerTestUtils.clearYammerMetrics()
}
}

Expand Down
5 changes: 0 additions & 5 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1396,11 +1396,6 @@ object TestUtils extends Logging {
}.sum
}

def clearYammerMetrics(): Unit = {
for (metricName <- KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala)
KafkaYammerMetrics.defaultRegistry.removeMetric(metricName)
}

/**
* Find an Authorizer that we can call createAcls or deleteAcls on.
*/
Expand Down
Loading
Loading