Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private static class Customer {
int age;
}

@Test(dataProvider = "autoUpdateSchemaParams")
@Test(dataProvider = "autoUpdateSchemaParams", timeOut = 60_000)
public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
Boolean allowAutoUpdateSchemaWithReplicator) throws Exception {
final String ns = BrokerTestUtil.newUniqueName("public/ns");
Expand All @@ -331,17 +331,18 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
RetentionPolicies retentionPolicies = new RetentionPolicies(10, 1);
admin1.namespaces().setRetention(ns, retentionPolicies);
admin2.namespaces().setRetention(ns, retentionPolicies);
PersistentTopic topic1 = (PersistentTopic) broker1.getTopic(topicName, false).join().get();
AtomicReference<PersistentTopic> topic1 = new AtomicReference<>((PersistentTopic) broker1
.getTopic(topicName, false).join().get());
PersistentTopic topic2 = (PersistentTopic) broker2.getTopic(topicName, false).join().get();
Awaitility.await().untilAsserted(() -> {
HierarchyTopicPolicies policies1 = topic1.getHierarchyTopicPolicies();
HierarchyTopicPolicies policies1 = topic1.get().getHierarchyTopicPolicies();
HierarchyTopicPolicies policies2 = topic2.getHierarchyTopicPolicies();
assertEquals(policies1.getSchemaCompatibilityStrategy().get(),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
assertEquals(policies2.getSchemaCompatibilityStrategy().get(),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
assertTrue(topic1.isAllowAutoUpdateSchema);
assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
assertTrue(topic1.get().isAllowAutoUpdateSchema);
assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator);
assertEquals(topic2.isAllowAutoUpdateSchema, isAllowAutoUpdateSchema);
assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
assertEquals(policies1.getRetentionPolicies().get().getRetentionTimeInMinutes(), 10);
Expand Down Expand Up @@ -405,8 +406,8 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, isAllowAutoUpdateSchema,
allowAutoUpdateSchemaWithReplicator);
Awaitility.await().untilAsserted(() -> {
assertTrue(topic1.isAllowAutoUpdateSchema);
assertTrue(topic1.isAllowAutoUpdateSchemaWithReplicator);
assertTrue(topic1.get().isAllowAutoUpdateSchema);
assertTrue(topic1.get().isAllowAutoUpdateSchemaWithReplicator);
assertEquals(topic2.isAllowAutoUpdateSchema, isAllowAutoUpdateSchema);
if (allowAutoUpdateSchemaWithReplicator != null && !allowAutoUpdateSchemaWithReplicator) {
assertFalse(topic2.isAllowAutoUpdateSchemaWithReplicator);
Expand All @@ -426,8 +427,19 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
// The message can not be replicated to the remote side.
TopicStats topicStats = admin1.topics().getStats(topicName);
assertEquals(topicStats.getReplication().get(cluster2).getReplicationBacklog(), 1);
producer1.close();
return;
// Change the policy to allow replicator update schemas.
admin2.namespaces().setIsAllowAutoUpdateSchemaAsync(ns, isAllowAutoUpdateSchema, true);
Awaitility.await().untilAsserted(() -> {
assertEquals(topic2.isAllowAutoUpdateSchema, isAllowAutoUpdateSchema);
assertTrue(topic2.isAllowAutoUpdateSchemaWithReplicator);
});
// Unload topic. Highlight, please do not remove this line, it is in order to test whether the replication
// can be recovered from the following case: the internal producer of replicator is closed when it's state
// is registering schema.
admin1.topics().unload(topicName);
topic1.set((PersistentTopic) broker1.getTopic(topicName, false).join().get());
waitReplicatorStarted(topicName);
//return;
}
Awaitility.await().untilAsserted(() -> {
TopicStats topicStats = admin1.topics().getStats(topicName);
Expand Down Expand Up @@ -464,16 +476,12 @@ public void testMultipleVersionSchemas(boolean isAllowAutoUpdateSchema,
assertEquals(msg21.getValue().getAge(), 16);
consumer2.acknowledge(msg21);
Message<Customer> msg22 = consumer2.receive(5, TimeUnit.SECONDS);
if (allowAutoUpdateSchemaWithReplicator != null && !allowAutoUpdateSchemaWithReplicator) {
assertNull(msg22);
} else {
assertNotNull(msg22);
byte[] bytesVersion22 = msg22.getSchemaVersion();
assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
assertEquals(msg22.getValue().getName(), "Apache");
assertEquals(msg22.getValue().getAge(), 26);
consumer2.acknowledge(msg22);
}
assertNotNull(msg22);
byte[] bytesVersion22 = msg22.getSchemaVersion();
assertEquals(ByteBuffer.wrap(bytesVersion22).getLong(), 1);
assertEquals(msg22.getValue().getName(), "Apache");
assertEquals(msg22.getValue().getAge(), 26);
consumer2.acknowledge(msg22);

// cleanup.
consumer1.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,32 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import lombok.CustomLog;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.InjectedClientCnxClientBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -57,6 +66,7 @@
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.MockExternalJsonSchema;
import org.apache.pulsar.schema.Schemas;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -638,6 +648,64 @@ public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy s

}

@Test
public void testCloseProducerWhenRegisteringNewSchema() throws Exception {
final String ns = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
final String topic = "persistent://" + BrokerTestUtil.newUniqueName(ns + "/tp");
admin.namespaces().createNamespace(ns);
admin.namespaces().setSchemaCompatibilityStrategy(ns, SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
Awaitility.await().untilAsserted(() -> {
assertEquals(admin.namespaces().getSchemaCompatibilityStrategy(ns),
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
});

// Injection: Let the handling response of registering schema delay, then we have enough time to close producer
// when it's state is registering schema.
CountDownLatch handleErrorSignal = new CountDownLatch(1);
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
PulsarClient injectedReplClient = InjectedClientCnxClientBuilder.create(clientBuilder,
(conf, eventLoopGroup) -> {
return new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {

@Override
protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse response) {
if (response.hasErrorCode()) {
try {
handleErrorSignal.await();
} catch (InterruptedException e) {
// Nothing to do.
}
}
super.handleGetOrCreateSchemaResponse(response);
}
};
});

Producer<byte[]> producer = injectedReplClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topic).create();
// Registers a consumer to avoid client to close idle connections.
Consumer consumer = injectedReplClient.newConsumer(Schema.AUTO_CONSUME()).subscriptionName("s1")
.topic(topic).subscribe();
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
assertEquals(persistentTopic.getProducers().size(), 1);
producer.newMessage(Schema.AVRO(Schemas.PersonOne.class)).value(new Schemas.PersonOne(1)).send();
CompletableFuture<MessageId> send2 = producer.newMessage(Schema.AVRO(Schemas.PersonTwo.class))
.value(new Schemas.PersonTwo(2, "2")).sendAsync();
producer.close();
Awaitility.await().untilAsserted(() -> {
assertTrue(send2.isDone());
assertTrue(send2.isCompletedExceptionally());
// Since the producer was closed, the topic should maintain 0 producers.
assertEquals(persistentTopic.getProducers().size(), 0);
});
handleErrorSignal.countDown();

// cleanup.
consumer.close();
injectedReplClient.close();
admin.topics().unload(topic);
}

@Test
public void testExternalSchemaTypeCompatibility() throws Exception {
String namespace = "test-namespace-" + randomName(16);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
closeProducerTasks();

ClientCnx cnx = cnx();
if (cnx == null || currentState != State.Ready) {
if (cnx == null || (currentState != State.Ready && currentState != State.RegisteringSchema)) {
log.info("Closed Producer (not connected)");
closeAndClearPendingMessages();
return CompletableFuture.completedFuture(null);
Expand Down
Loading