From 81ff9b6bf379fba20f86c2139d2d2666bbd715aa Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Sat, 9 May 2026 03:37:39 +0800 Subject: [PATCH] [fix][client]Broker-side memory leak and client side repeatedly register a same producer if closes a producer which state is regitering schema --- .../broker/service/OneWayReplicatorTest.java | 46 +++++++------ .../SchemaCompatibilityCheckTest.java | 68 +++++++++++++++++++ .../pulsar/client/impl/ProducerImpl.java | 2 +- 3 files changed, 96 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 8665a8cd53ab4..c4f5bdeab74e7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -312,7 +312,7 @@ private static class Customer { int age; } - @Test(dataProvider = "autoUpdateSchemaParams") + @Test(dataProvider = "autoUpdateSchemaParams", timeOut = 60_000) public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema, Boolean allowAutoUpdateSchemaWithReplicator) throws Exception { final String ns = BrokerTestUtil.newUniqueName("public/ns"); @@ -331,17 +331,18 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema, RetentionPolicies retentionPolicies = new RetentionPolicies(10, 1); admin1.namespaces().setRetention(ns, retentionPolicies); admin2.namespaces().setRetention(ns, retentionPolicies); - PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName, false).join().get(); + AtomicReference topic1 = new AtomicReference<>((PersistentTopic) broker1 + .getTopic(topicName, false).join().get()); PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName, false).join().get(); Awaitility.await().untilAsserted(() -> { - HierarchyTopicPolicies policies1 = topic1.getHierarchyTopicPolicies(); + HierarchyTopicPolicies policies1 = topic1.get().getHierarchyTopicPolicies(); HierarchyTopicPolicies policies2 = topic2.getHierarchyTopicPolicies(); assertEquals(policies1.getSchemaCompatibilityStrategy().get(), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); assertEquals(policies2.getSchemaCompatibilityStrategy().get(), SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE); - assertTrue(topic1.isAllowAutoUpdateSchema); - assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator); + assertTrue(topic1.get().isAllowAutoUpdateSchema); + assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator); assertEquals(topic2.isAllowAutoUpdateSchema, isAllowAutoUpdateSchema); assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator); assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(), 10); @@ -405,8 +406,8 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema, admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, isAllowAutoUpdateSchema, allowAutoUpdateSchemaWithReplicator); Awaitility.await().untilAsserted(() -> { - assertTrue(topic1.isAllowAutoUpdateSchema); - assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator); + assertTrue(topic1.get().isAllowAutoUpdateSchema); + assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator); assertEquals(topic2.isAllowAutoUpdateSchema, isAllowAutoUpdateSchema); if (allowAutoUpdateSchemaWithReplicator != null && !allowAutoUpdateSchemaWithReplicator) { assertFalse(topic2.isAllowAutoUpdateSchemaWithReplicator); @@ -426,8 +427,19 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema, // The message can not be replicated to the remote side. TopicStats topicStats = admin1.topics().getStats(topicName); assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 1); - producer1.close(); - return; + // Change the policy to allow replicator update schemas. + admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, isAllowAutoUpdateSchema, true); + Awaitility.await().untilAsserted(() -> { + assertEquals(topic2.isAllowAutoUpdateSchema, isAllowAutoUpdateSchema); + assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator); + }); + // Unload topic. Highlight, please do not remove this line, it is in order to test whether the replication + // can be recovered from the following case: the internal producer of replicator is closed when it's state + // is registering schema. + admin1.topics().unload(topicName); + topic1.set((PersistentTopic) broker1.getTopic(topicName, false).join().get()); + waitReplicatorStarted(topicName); + //return; } Awaitility.await().untilAsserted(() -> { TopicStats topicStats = admin1.topics().getStats(topicName); @@ -464,16 +476,12 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema, assertEquals(msg21.getValue().getAge(), 16); consumer2.acknowledge(msg21); Message msg22 = consumer2.receive(5, TimeUnit.SECONDS); - if (allowAutoUpdateSchemaWithReplicator != null && !allowAutoUpdateSchemaWithReplicator) { - assertNull(msg22); - } else { - assertNotNull(msg22); - byte[] bytesVersion22 = msg22.getSchemaVersion(); - assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1); - assertEquals(msg22.getValue().getName(), "Apache"); - assertEquals(msg22.getValue().getAge(), 26); - consumer2.acknowledge(msg22); - } + assertNotNull(msg22); + byte[] bytesVersion22 = msg22.getSchemaVersion(); + assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1); + assertEquals(msg22.getValue().getName(), "Apache"); + assertEquals(msg22.getValue().getAge(), 26); + consumer2.acknowledge(msg22); // cleanup. consumer1.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java index 52c6628d37c98..27b03d18e0a42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.java @@ -29,15 +29,20 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import lombok.CustomLog; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; @@ -45,7 +50,11 @@ import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.api.schema.SchemaDefinition; +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.metrics.InstrumentProvider; import org.apache.pulsar.client.impl.schema.SchemaInfoImpl; +import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; @@ -57,6 +66,7 @@ import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.schema.MockExternalJsonSchema; import org.apache.pulsar.schema.Schemas; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -638,6 +648,64 @@ public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy s } + @Test + public void testCloseProducerWhenRegisteringNewSchema() throws Exception { + final String ns = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns"); + final String topic = "persistent://" + BrokerTestUtil.newUniqueName(ns + "/tp"); + admin.namespaces().createNamespace(ns); + admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(ns), + SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE); + }); + + // Injection: Let the handling response of registering schema delay, then we have enough time to close producer + // when it's state is registering schema. + CountDownLatch handleErrorSignal = new CountDownLatch(1); + ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString()); + PulsarClient injectedReplClient = InjectedClientCnxClientBuilder.create(clientBuilder, + (conf, eventLoopGroup) -> { + return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) { + + @Override + protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse response) { + if (response.hasErrorCode()) { + try { + handleErrorSignal.await(); + } catch (InterruptedException e) { + // Nothing to do. + } + } + super.handleGetOrCreateSchemaResponse(response); + } + }; + }); + + Producer producer = injectedReplClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create(); + // Registers a consumer to avoid client to close idle connections. + Consumer consumer = injectedReplClient.newConsumer(Schema.AUTO_CONSUME()).subscriptionName("s1") + .topic(topic).subscribe(); + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + assertEquals(persistentTopic.getProducers().size(), 1); + producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new Schemas.PersonOne(1)).send(); + CompletableFuture send2 = producer.newMessage(Schema.AVRO(Schemas.PersonTwo.class)) + .value(new Schemas.PersonTwo(2, "2")).sendAsync(); + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertTrue(send2.isDone()); + assertTrue(send2.isCompletedExceptionally()); + // Since the producer was closed, the topic should maintain 0 producers. + assertEquals(persistentTopic.getProducers().size(), 0); + }); + handleErrorSignal.countDown(); + + // cleanup. + consumer.close(); + injectedReplClient.close(); + admin.topics().unload(topic); + } + @Test public void testExternalSchemaTypeCompatibility() throws Exception { String namespace = "test-namespace-" + randomName(16); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index cf5ba97f43669..ca733da998c2e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1262,7 +1262,7 @@ public synchronized CompletableFuture closeAsync() { closeProducerTasks(); ClientCnx cnx = cnx(); - if (cnx == null || currentState != State.Ready) { + if (cnx == null || (currentState != State.Ready && currentState != State.RegisteringSchema)) { log.info("Closed Producer (not connected)"); closeAndClearPendingMessages(); return CompletableFuture.completedFuture(null);