diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 221e7708ae047..868b6fe3ed642 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -17,14 +17,21 @@ package org.apache.ignite.internal.managers.discovery; +import java.util.function.Supplier; +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveAcceptedMessageSerializer; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacketSerializer; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -49,6 +56,10 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequestSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponseSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; @@ -65,6 +76,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryJoinRequestMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryLoopbackProblemMessageSerializer; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMarshallableMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryMetricsUpdateMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage; @@ -87,11 +99,35 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustomEventMessageSerializer; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessageSerializer; +import org.jetbrains.annotations.Nullable; -/** Message factory for discovery messages. */ +/** + * Message factory for discovery messages. Allows to create an enhanced {@link MessageFactory} allowing to create + * automated pre- and post- marshalling message serializer for {@link TcpDiscoveryMarshallableMessage}. + */ public class DiscoveryMessageFactory implements MessageFactoryProvider { + /** Custom data marshaller. */ + private final @Nullable Marshaller cstDataMarshall; + + /** Class loader for the custom data marshalling. */ + private final @Nullable ClassLoader cstDataMarshallClsLdr; + + /** + * @param cstDataMarshall Custom data marshaller. + * @param cstDataMarshallClsLdr Class loader for the custom data marshalling. + */ + public DiscoveryMessageFactory(@Nullable Marshaller cstDataMarshall, @Nullable ClassLoader cstDataMarshallClsLdr) { + assert cstDataMarshall == null && cstDataMarshallClsLdr == null || cstDataMarshall != null && cstDataMarshallClsLdr != null; + + this.cstDataMarshall = cstDataMarshall; + this.cstDataMarshallClsLdr = cstDataMarshallClsLdr; + } + /** {@inheritDoc} */ @Override public void registerAll(MessageFactory factory) { + factory = enhanceMessageFactory(factory); + + factory.register((short)-108, TcpDiscoveryCollectionMessage::new, new TcpDiscoveryCollectionMessageSerializer()); factory.register((short)-107, NodeSpecificData::new, new NodeSpecificDataSerializer()); factory.register((short)-106, DiscoveryDataPacket::new, new DiscoveryDataPacketSerializer()); factory.register((short)-105, TcpDiscoveryNodeFullMetricsMessage::new, @@ -127,6 +163,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer()); factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new, new TcpDiscoveryServerOnlyCustomEventMessageSerializer()); + factory.register((short)23, TcpDiscoveryClientReconnectMessage::new, new TcpDiscoveryClientReconnectMessageSerializer()); // DiscoveryCustomMessage factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer()); @@ -134,4 +171,67 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)502, MetadataRemoveAcceptedMessage::new, new MetadataRemoveAcceptedMessageSerializer()); factory.register((short)503, MetadataRemoveProposedMessage::new, new MetadataRemoveProposedMessageSerializer()); } + + /** + * @return Enhanced {@link MessageFactory} allowing to create automated pre- and post- marshalling message serializer + * for {@link TcpDiscoveryMarshallableMessage}. + */ + private MessageFactory enhanceMessageFactory(MessageFactory mf) { + if (cstDataMarshall == null || cstDataMarshallClsLdr == null) + return mf; + + return new MessageFactory() { + @Override public void register( + short directType, + Supplier supplier, + MessageSerializer serializer + ) throws IgniteException { + if (supplier.get() instanceof TcpDiscoveryMarshallableMessage) { + final MessageSerializer serializer0 = serializer; + + serializer = new MessageSerializer() { + private boolean marshallableMsg; + + @Override public boolean writeTo(Message msg, MessageWriter writer) { + if (msg instanceof TcpDiscoveryMarshallableMessage && !marshallableMsg) { + marshallableMsg = true; + + ((TcpDiscoveryMarshallableMessage)msg).prepareMarshal(cstDataMarshall); + } + + boolean res = serializer0.writeTo(msg, writer); + + if (res && marshallableMsg) + marshallableMsg = false; + + return res; + } + + @Override public boolean readFrom(Message msg, MessageReader reader) { + boolean res = serializer0.readFrom(msg, reader); + + if (res && msg instanceof TcpDiscoveryMarshallableMessage) + ((TcpDiscoveryMarshallableMessage)msg).finishUnmarshal(cstDataMarshall, cstDataMarshallClsLdr); + + return res; + } + }; + } + + mf.register(directType, supplier, serializer); + } + + @Override public void register(short directType, Supplier supplier) throws IgniteException { + mf.register(directType, supplier); + } + + @Override public Message create(short type) { + return mf.create(type); + } + + @Override public MessageSerializer serializer(short type) { + return mf.serializer(type); + } + }; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index be87617d45b8d..d59b9cfe9a4dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -788,8 +788,6 @@ private static void sleepEx(long millis, Runnable before, Runnable after) throws TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(node, discoveryData); - joinReqMsg.prepareMarshal(spi.marshaller()); - TcpDiscoveryNode nodef = node; joinReqMsg.spanContainer().span( @@ -2241,7 +2239,7 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { nodeAdded = true; - if (msg.topologyHistory() != null) + if (!F.isEmpty(msg.topologyHistory())) topHist.putAll(msg.topologyHistory()); } else { @@ -2310,8 +2308,6 @@ private void processNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage ms delayDiscoData.clear(); } - msg.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - locNode.setAttributes(msg.clientNodeAttributes()); clearNodeSensitiveData(locNode); @@ -2548,6 +2544,10 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms return; if (getLocalNodeId().equals(msg.creatorNodeId())) { + Collection pendingMsgs = msg.pendingMsgsMsg == null + ? Collections.emptyList() + : msg.pendingMsgsMsg.messages(); + if (reconnector != null) { assert msg.success() : msg; @@ -2558,7 +2558,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms reconnector = null; - for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { if (log.isDebugEnabled()) log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']'); @@ -2568,7 +2568,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms else { if (joinLatch.getCount() > 0) { if (msg.success()) { - for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { + for (TcpDiscoveryAbstractMessage pendingMsg : pendingMsgs) { if (log.isDebugEnabled()) log.debug("Process pending message on connect [msg=" + pendingMsg + ']'); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 01a3733da9823..27e21b05bedba 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -137,6 +137,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingRequest; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientPingResponse; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryClientReconnectMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCollectionMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryConnectionCheckMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryDiscardMessage; @@ -1124,8 +1125,6 @@ private void joinTopology() throws IgniteSpiException { TcpDiscoveryJoinRequestMessage joinReqMsg = new TcpDiscoveryJoinRequestMessage(locNode, discoveryData); - joinReqMsg.prepareMarshal(spi.marshaller()); - joinReqMsg.spanContainer().span( tracing.create(TraceableMessagesTable.traceName(joinReqMsg.getClass())) .addTag(SpanTags.tag(SpanTags.EVENT_NODE, SpanTags.ID), () -> locNode.id().toString()) @@ -2493,8 +2492,6 @@ else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) { if (addFinishMsg.clientDiscoData() != null) { addFinishMsg = new TcpDiscoveryNodeAddFinishedMessage(addFinishMsg); - addFinishMsg.prepareMarshal(spi.marshaller()); - msg = addFinishMsg; DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData(); @@ -3308,9 +3305,6 @@ private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { if (msg instanceof TraceableMessage) tracing.messages().beforeSend((TraceableMessage)msg); - if (msg instanceof TcpDiscoveryJoinRequestMessage) - ((TcpDiscoveryJoinRequestMessage)msg).prepareMarshal(spi.marshaller()); - sendMessageToClients(msg); List failedNodes; @@ -4144,7 +4138,7 @@ private void processJoinRequestMessage(final TcpDiscoveryJoinRequestMessage msg) Collection msgs = msgHist.messages(null, node); if (msgs != null) { - reconMsg.pendingMessages(msgs); + reconMsg.pendingMsgsMsg = new TcpDiscoveryCollectionMessage(msgs); reconMsg.success(true); } @@ -4869,8 +4863,6 @@ private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { addFinishMsg.clientDiscoData(msg.gridDiscoveryData()); addFinishMsg.clientNodeAttributes(node.attributes()); - - addFinishMsg.prepareMarshal(spi.marshaller()); } addFinishMsg = tracing.messages().branch(addFinishMsg, msg); @@ -5086,7 +5078,9 @@ else if (spiState == CONNECTING) joiningNodesDiscoDataList = new ArrayList<>(); topHist.clear(); - topHist.putAll(msg.topologyHistory()); + + if (!F.isEmpty(msg.topologyHistory())) + topHist.putAll(msg.topologyHistory()); pendingMsgs.reset(msg.messages()); } @@ -6966,8 +6960,6 @@ else if (e.hasCause(ObjectStreamException.class) || else if (msg instanceof TcpDiscoveryJoinRequestMessage) { TcpDiscoveryJoinRequestMessage req = (TcpDiscoveryJoinRequestMessage)msg; - req.finishUnmarshal(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); - // Current node holds connection with the node that is joining the cluster. Therefore, it can // save certificates with which the connection was established to joining node attributes. if (spi.nodeAuth != null && nodeId.equals(req.node().id())) @@ -7379,7 +7371,7 @@ private void processClientReconnectMessage(TcpDiscoveryClientReconnectMessage ms if (pending != null) { msg.verifierNodeId(locNodeId); - msg.pendingMessages(pending); + msg.pendingMsgsMsg = new TcpDiscoveryCollectionMessage(pending); msg.success(true); if (log.isDebugEnabled()) { diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java index 4658abb1456cd..30e9b1b73f810 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryIoSession.java @@ -79,7 +79,7 @@ public class TcpDiscoveryIoSession { private final Socket sock; /** */ - final DirectMessageWriter msgWriter; + private final DirectMessageWriter msgWriter; /** */ private final DirectMessageReader msgReader; @@ -91,7 +91,7 @@ public class TcpDiscoveryIoSession { private final CompositeInputStream in; /** Intermediate buffer for serializing discovery messages. */ - final ByteBuffer msgBuf; + private final ByteBuffer msgBuf; /** * Creates a new discovery I/O session bound to the given socket. diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java index 0d99d795a195a..c91dffcb5e7ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java @@ -2119,7 +2119,7 @@ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) { registerMBean(igniteInstanceName, new TcpDiscoverySpiMBeanImpl(this), TcpDiscoverySpiMBean.class); msgFactory = new IgniteMessageFactoryImpl( - new MessageFactoryProvider[] { new DiscoveryMessageFactory() }); + new MessageFactoryProvider[] { new DiscoveryMessageFactory(marshaller(), U.resolveClassLoader(ignite().configuration())) }); impl.spiStart(igniteInstanceName); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java index 3a826058c06b2..493d8c6f4253e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryClientReconnectMessage.java @@ -17,30 +17,39 @@ package org.apache.ignite.spi.discovery.tcp.messages; -import java.util.Collection; import java.util.Objects; import java.util.UUID; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; /** * Message telling that client node is reconnecting to topology. */ @TcpDiscoveryEnsureDelivery -public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage { +public class TcpDiscoveryClientReconnectMessage extends TcpDiscoveryAbstractMessage implements Message { /** */ private static final long serialVersionUID = 0L; /** New router nodeID. */ - private final UUID routerNodeId; + @Order(0) + public UUID routerNodeId; /** Last message ID. */ - private final IgniteUuid lastMsgId; + @Order(1) + public IgniteUuid lastMsgId; - /** Pending messages. */ - @GridToStringExclude - private Collection msgs; + /** Pending messages holder. */ + @Order(2) + @Nullable public TcpDiscoveryCollectionMessage pendingMsgsMsg; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryClientReconnectMessage() { + // No-op. + } /** * @param creatorNodeId Creator node ID. @@ -68,20 +77,6 @@ public IgniteUuid lastMessageId() { return lastMsgId; } - /** - * @param msgs Pending messages. - */ - public void pendingMessages(Collection msgs) { - this.msgs = msgs; - } - - /** - * @return Pending messages. - */ - public Collection pendingMessages() { - return msgs; - } - /** * @param success Success flag. */ @@ -111,6 +106,11 @@ public boolean success() { Objects.equals(lastMsgId, other.lastMsgId); } + /** {@inheritDoc} */ + @Override public short directType() { + return 23; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpDiscoveryClientReconnectMessage.class, this, "super", super.toString()); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java new file mode 100644 index 0000000000000..bdaae72d2592d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryCollectionMessage.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.jetbrains.annotations.Nullable; + +/** + * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + * Message to transfer a collection of {@link TcpDiscoveryAbstractMessage}. + * Several of them might be a {@link Message}, several may not and require the original marshalling. + * Allows not to keep old-style marshalling for entyre messages collection if just one of them is not a {@link Message}. + * Should be removed when all the messages implement {@link Message}. + */ +public class TcpDiscoveryCollectionMessage implements TcpDiscoveryMarshallableMessage { + /** Pending messages which can be serialized with {@link MessageSerializer}. */ + @Order(0) + @Nullable Map writableMsgs; + + /** Marshallable or Java-serializable pending messages which still require old-style serialization. */ + @Nullable Map marshallableMsgs; + + /** Marshalled {@link #marshallableMsgs}. */ + @Order(1) + @GridToStringExclude + @Nullable byte[] marshallableMsgsBytes; + + /** Constructor for {@link DiscoveryMessageFactory}. */ + public TcpDiscoveryCollectionMessage() { + // No-op. + } + + /** @param msgs Discovery messages to hold. */ + public TcpDiscoveryCollectionMessage(Collection msgs) { + writableMsgs = null; + marshallableMsgsBytes = null; + marshallableMsgs = null; + + if (F.isEmpty(msgs)) + return; + + // Keeps the original message order. + int idx = 0; + + for (TcpDiscoveryAbstractMessage m : msgs) { + if (m instanceof Message) { + if (writableMsgs == null) + writableMsgs = U.newHashMap(msgs.size()); + + writableMsgs.put(idx++, (Message)m); + + continue; + } + + if (marshallableMsgs == null) + marshallableMsgs = U.newHashMap(msgs.size()); + + marshallableMsgs.put(idx++, m); + } + } + + /** @param marsh marshaller. */ + @Override public void prepareMarshal(Marshaller marsh) { + if (marshallableMsgs != null && marshallableMsgsBytes == null) { + try { + marshallableMsgsBytes = U.marshal(marsh, marshallableMsgs); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal marshallable pending messages.", e); + } + } + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (marshallableMsgsBytes != null && marshallableMsgs == null) { + try { + marshallableMsgs = U.unmarshal(marsh, marshallableMsgsBytes, clsLdr); + + marshallableMsgsBytes = null; + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal marshallable pending messages.", e); + } + } + } + + /** + * Gets pending messages sent to new node by its previous. + * + * @return Pending messages from previous node. + */ + public Collection messages() { + if (F.isEmpty(writableMsgs) && F.isEmpty(marshallableMsgs)) + return Collections.emptyList(); + + int totalSz = (F.isEmpty(writableMsgs) ? 0 : writableMsgs.size()) + + (F.isEmpty(marshallableMsgs) ? 0 : marshallableMsgs.size()); + + List res = new ArrayList<>(totalSz); + + for (int i = 0; i < totalSz; ++i) { + Message m = F.isEmpty(writableMsgs) ? null : writableMsgs.get(i); + + if (m == null) { + TcpDiscoveryAbstractMessage adm = marshallableMsgs.get(i); + + assert adm != null; + + res.add(adm); + } + else { + assert marshallableMsgs == null || marshallableMsgs.get(i) == null; + assert m instanceof TcpDiscoveryAbstractMessage; + + res.add((TcpDiscoveryAbstractMessage)m); + } + } + + return res; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -108; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TcpDiscoveryCollectionMessage.class, this, "super", super.toString()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java index 6ddbb8f7af2c4..d0c37dadb16ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java @@ -23,7 +23,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; @@ -33,7 +32,7 @@ * Initial message sent by a node that wants to enter topology. * Sent to random node during SPI start. Then forwarded directly to coordinator. */ -public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { +public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -95,10 +94,8 @@ public void responded(boolean responded) { setFlag(RESPONDED_FLAG_POS, responded); } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) { if (node != null && nodeBytes == null) { try { nodeBytes = U.marshal(marsh, node); @@ -109,11 +106,8 @@ public void prepareMarshal(Marshaller marsh) { } } - /** - * @param marsh Marshaller. - * @param clsLdr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { if (nodeBytes != null && node == null) { try { node = U.unmarshal(marsh, nodeBytes, clsLdr); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java new file mode 100644 index 0000000000000..cf1854275c565 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryMarshallableMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.tcp.messages; + +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; + +/** + * Base class for TCP Discovery messages which still require external pre- and post- marshalling. + *
+ * TODO: Remove/revise after https://issues.apache.org/jira/browse/IGNITE-25883 + */ +public interface TcpDiscoveryMarshallableMessage extends Message { + /** + * Should be idempotent. + * + * @param marsh Marshaller. + */ + void prepareMarshal(Marshaller marsh); + + /** + * Should be idempotent. + * + * @param marsh Marshaller. + * @param clsLdr Class loader. + */ + void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr); +} diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java index 86fecaed00fa3..544a7b3e660f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java @@ -27,7 +27,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; import org.jetbrains.annotations.Nullable; @@ -36,7 +35,7 @@ */ @TcpDiscoveryEnsureDelivery @TcpDiscoveryRedirectToClient -public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements Message { +public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractTraceableMessage implements TcpDiscoveryMarshallableMessage { /** */ private static final long serialVersionUID = 0L; @@ -85,6 +84,7 @@ public TcpDiscoveryNodeAddFinishedMessage(TcpDiscoveryNodeAddFinishedMessage msg nodeId = msg.nodeId; clientDiscoData = msg.clientDiscoData; clientNodeAttrs = msg.clientNodeAttrs; + clientNodeAttrsBytes = msg.clientNodeAttrsBytes; } /** @@ -112,24 +112,24 @@ public void clientDiscoData(DiscoveryDataPacket clientDiscoData) { assert clientDiscoData == null || !clientDiscoData.hasDataFromNode(nodeId); } - /** - * @return Client node attributes. - */ - public Map clientNodeAttributes() { + /** @return Client attributes. */ + public @Nullable Map clientNodeAttributes() { return clientNodeAttrs; } /** - * @param clientNodeAttrs New client node attributes. + * Sets new client attributes and ensures that thet will be serialized. + * + * @param attrs Client attributes. */ - public void clientNodeAttributes(Map clientNodeAttrs) { - this.clientNodeAttrs = clientNodeAttrs; + public void clientNodeAttributes(@Nullable Map attrs) { + clientNodeAttrs = F.isEmpty(attrs) ? null : attrs; + // Ensure new data will be serialized. + clientNodeAttrsBytes = null; } - /** - * @param marsh Marshaller. - */ - public void prepareMarshal(Marshaller marsh) { + /** {@inheritDoc} */ + @Override public void prepareMarshal(Marshaller marsh) { if (clientNodeAttrs != null && clientNodeAttrsBytes == null) { try { clientNodeAttrsBytes = U.marshal(marsh, clientNodeAttrs); @@ -140,14 +140,9 @@ public void prepareMarshal(Marshaller marsh) { } } - /** - * @param marsh Marshaller. - * @param clsLdr Class loader. - */ - public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { - if (F.isEmpty(clientNodeAttrsBytes)) - clientNodeAttrs = null; - else { + /** {@inheritDoc} */ + @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) { + if (clientNodeAttrsBytes != null && clientNodeAttrs == null) { try { clientNodeAttrs = U.unmarshal(marsh, clientNodeAttrsBytes, clsLdr); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java index cb6c40576fe0a..44f44be77f700 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java @@ -101,7 +101,7 @@ private static DirectByteBufferStream createStream(ByteBuffer buff) { @Override public MessageSerializer serializer(short type) { return null; } - }, null); + }); stream.setBuffer(buff); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java index 6adada121f281..f6eaab9c754ce 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/discovery/IgniteDiscoveryMessageSerializationTest.java @@ -24,6 +24,6 @@ public class IgniteDiscoveryMessageSerializationTest extends AbstractMessageSerializationTest { /** {@inheritDoc} */ @Override protected MessageFactoryProvider messageFactory() { - return new DiscoveryMessageFactory(); + return new DiscoveryMessageFactory(null, null); } }