diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index e30b0c453a7c5..b94f0df3ca02c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -1326,11 +1326,8 @@ private void initMessageFactory() throws IgniteCheckedException { for (IgniteComponentType compType : IgniteComponentType.values()) { MessageFactoryProvider f = compType.messageFactory(); - if (f != null) { - initProvider(f, resolvedClsLdr); - + if (f != null) compMsgs.add(f); - } } DiscoverySpi discoSpi = ctx.config().getDiscoverySpi(); @@ -1338,16 +1335,16 @@ private void initMessageFactory() throws IgniteCheckedException { if (discoSpi instanceof IgniteDiscoverySpi) { MessageFactoryProvider discoMsgs = ((IgniteDiscoverySpi)discoSpi).messageFactoryProvider(); - if (discoMsgs != null) { - initProvider(discoMsgs, resolvedClsLdr); - + if (discoMsgs != null) compMsgs.add(discoMsgs); - } } if (!compMsgs.isEmpty()) msgs = F.concat(msgs, compMsgs.toArray(new MessageFactoryProvider[compMsgs.size()])); + for (MessageFactoryProvider msg : msgs) + initProvider(msg, resolvedClsLdr); + msgFactory = new IgniteMessageFactoryImpl(msgs); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageFactoryMarshallerInitializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageFactoryMarshallerInitializationTest.java new file mode 100644 index 0000000000000..c35ff721f67f7 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageFactoryMarshallerInitializationTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.plugin.AbstractMarshallableMessageFactoryProvider; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.AbstractTestPluginProvider; +import org.apache.ignite.plugin.ExtensionRegistry; +import org.apache.ignite.plugin.PluginContext; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** */ +public class MessageFactoryMarshallerInitializationTest extends GridCommonAbstractTest { + /** */ + private static final AtomicInteger initCnt = new AtomicInteger(); + + /** */ + public MessageFactoryMarshallerInitializationTest() { + super(false); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + initCnt.set(0); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** */ + @Test + public void testInitFromPlugin() throws Exception { + checkInits( + getConfiguration().setPluginProviders(new PluginWithMessageFactory()), + 1); + } + + /** */ + @Test + public void testInitFromDiscovery() throws Exception { + checkInits( + getConfiguration().setDiscoverySpi(new DiscoverySpiWithExtraFactory()), + 1); + } + + /** */ + @Test + public void testInitFromPluginAndDiscovery() throws Exception { + checkInits( + getConfiguration() + .setPluginProviders(new PluginWithMessageFactory()) + .setDiscoverySpi(new DiscoverySpiWithExtraFactory()), + 2); + } + + /** + * @param cfg Configuration. + * @param exp Expected count. + */ + private void checkInits(IgniteConfiguration cfg, int exp) throws Exception { + assertEquals(0, initCnt.get()); + + startGrid(cfg); + + assertEquals(exp, initCnt.get()); + } + + /** Message factory provider, which counts initializations. */ + private static class TestMessageFactoryProvider extends AbstractMarshallableMessageFactoryProvider { + /** {@inheritDoc} */ + @Override public void registerAll(MessageFactory factory) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void init(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, ClassLoader resolvedClsLdr) { + super.init(dfltMarsh, schemaAwareMarsh, resolvedClsLdr); + + initCnt.incrementAndGet(); + } + } + + /** */ + private static class PluginWithMessageFactory extends AbstractTestPluginProvider { + /** {@inheritDoc} */ + @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) { + super.initExtensions(ctx, registry); + + registry.registerExtension(MessageFactoryProvider.class, new TestMessageFactoryProvider()); + } + + /** {@inheritDoc} */ + @Override public String name() { + return "PluginWithMessageFactory"; + } + + /** {@inheritDoc} */ + @Override public void validateNewNode(ClusterNode node, Serializable data) { + super.validateNewNode(node, data); + } + } + + /** */ + private static class DiscoverySpiWithExtraFactory extends TcpDiscoverySpi { + /** {@inheritDoc} */ + @Override public MessageFactoryProvider messageFactoryProvider() { + return new TestMessageFactoryProvider(); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 96c03839da5c1..40140614e7bf5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.communication.CompressedMessageTest; import org.apache.ignite.internal.managers.communication.DefaultEnumMapperTest; import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest; +import org.apache.ignite.internal.managers.communication.MessageFactoryMarshallerInitializationTest; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2Test; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2TestNoOptimizations; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest; @@ -153,6 +154,8 @@ DefaultEnumMapperTest.class, IgniteDataTransferObjectProcessorTest.class, CompressedMessageTest.class, + MessageFactoryMarshallerInitializationTest.class, + LogEvictionResultsTest.class, }) public class IgniteBasicTestSuite {