From 88e2eb271867782408d1bb1a26ab34d8aa7ea7d3 Mon Sep 17 00:00:00 2001 From: Nikita Amelchev Date: Thu, 5 Mar 2026 14:10:07 +0300 Subject: [PATCH] IGNITE-28061 Migrate ZookeeperDiscoverySpi to new serialization framework for discovery custom messages IGNITE-28061 Migrate ZookeeperDiscoverySpi to new serialization framework for discovery custom messages 3 --- .../zk/internal/DiscoveryMessageParser.java | 168 ++++++++++++++++++ .../internal/ZkDiscoveryCustomEventData.java | 21 ++- .../zk/internal/ZookeeperDiscoveryImpl.java | 51 +++--- 3 files changed, 207 insertions(+), 33 deletions(-) create mode 100644 modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java new file mode 100644 index 0000000000000..9d742fbb0dd88 --- /dev/null +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.discovery.zk.internal; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.zip.DeflaterOutputStream; +import java.util.zip.InflaterInputStream; +import org.apache.ignite.internal.direct.DirectMessageReader; +import org.apache.ignite.internal.direct.DirectMessageWriter; +import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; +import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; +import org.apache.ignite.plugin.extensions.communication.MessageSerializer; +import org.apache.ignite.spi.IgniteSpiException; + +import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType; + +/** + * Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism. + */ +public class DiscoveryMessageParser { + /** Leading byte for messages use {@link JdkMarshaller} for serialization. */ + // TODO: remove these flags after refactoring all discovery messages. + private static final byte JAVA_SERIALIZATION = (byte)1; + + /** Leading byte for messages use {@link MessageSerializer} for serialization. */ + private static final byte MESSAGE_SERIALIZATION = (byte)2; + + /** Size for an intermediate buffer for serializing discovery messages. */ + private static final int MSG_BUFFER_SIZE = 100; + + /** */ + private final MessageFactory msgFactory; + + /** */ + private final Marshaller marsh; + + /** */ + public DiscoveryMessageParser(Marshaller marsh) { + this.marsh = marsh; + this.msgFactory = new IgniteMessageFactoryImpl( + new MessageFactoryProvider[] { new DiscoveryMessageFactory() }); + } + + /** Marshals discovery message to bytes array. */ + public byte[] marshalZip(DiscoveryCustomMessage msg) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) { + if (msg instanceof Message) { + out.write(MESSAGE_SERIALIZATION); + + serializeMessage((Message)msg, out); + } + else { + out.write(JAVA_SERIALIZATION); + + U.marshal(marsh, msg, out); + } + } + catch (Exception e) { + throw new IgniteSpiException("Failed to serialize message: " + msg, e); + } + + return baos.toByteArray(); + } + + /** Unmarshals discovery message from bytes array. */ + public DiscoveryCustomMessage unmarshalZip(byte[] bytes) { + try ( + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + InflaterInputStream in = new InflaterInputStream(bais) + ) { + byte mode = (byte)in.read(); + + if (mode == JAVA_SERIALIZATION) + return U.unmarshal(marsh, in, U.gridClassLoader()); + + if (MESSAGE_SERIALIZATION != mode) + throw new IOException("Received unexpected byte while reading discovery message: " + mode); + + return (DiscoveryCustomMessage)deserializeMessage(in); + } + catch (Exception e) { + throw new IgniteSpiException("Failed to deserialize message.", e); + } + } + + /** */ + private void serializeMessage(Message m, OutputStream out) throws IOException { + DirectMessageWriter msgWriter = new DirectMessageWriter(msgFactory); + ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + + msgWriter.setBuffer(msgBuf); + + MessageSerializer msgSer = msgFactory.serializer(m.directType()); + + boolean finished; + + do { + // Should be cleared before first operation. + msgBuf.clear(); + + finished = msgSer.writeTo(m, msgWriter); + + out.write(msgBuf.array(), 0, msgBuf.position()); + } + while (!finished); + } + + /** */ + private Message deserializeMessage(InputStream in) throws IOException { + DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null); + ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE); + + msgReader.setBuffer(msgBuf); + + Message msg = msgFactory.create(makeMessageType((byte)in.read(), (byte)in.read())); + MessageSerializer msgSer = msgFactory.serializer(msg.directType()); + + boolean finished; + + do { + int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining()); + + if (read == -1) + throw new EOFException("Stream closed before message was fully read."); + + msgBuf.limit(msgBuf.position() + read); + msgBuf.rewind(); + + finished = msgSer.readFrom(msg, msgReader); + + if (!finished) + msgBuf.compact(); + } + while (!finished); + + return msg; + } +} diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java index a4db36079e8f0..87754e169c353 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZkDiscoveryCustomEventData.java @@ -20,7 +20,6 @@ import java.util.UUID; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; /** * @@ -38,8 +37,8 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { /** */ final String evtPath; - /** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */ - DiscoverySpiCustomMessage msg; + /** Message (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */ + byte[] msgBytes; /** Unmarshalled message. */ transient DiscoveryCustomMessage resolvedMsg; @@ -57,7 +56,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { long origEvtId, long topVer, UUID sndNodeId, - DiscoverySpiCustomMessage msg, + DiscoveryCustomMessage msg, String evtPath ) { super(evtId, ZK_EVT_CUSTOM_EVT, topVer); @@ -66,11 +65,23 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData { assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath); this.origEvtId = origEvtId; - this.msg = msg; + this.resolvedMsg = msg; this.sndNodeId = sndNodeId; this.evtPath = evtPath; } + /** */ + public void prepareMarshal(DiscoveryMessageParser parser) { + if (resolvedMsg != null) + msgBytes = parser.marshalZip(resolvedMsg); + } + + /** */ + public void finishUnmarshal(DiscoveryMessageParser parser) { + if (msgBytes != null) + resolvedMsg = parser.unmarshalZip(msgBytes); + } + /** * @return {@code True} for custom event ack message. */ diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index fd414cf3da01c..915ede5c1fe9d 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -216,6 +216,9 @@ public class ZookeeperDiscoveryImpl { /** */ private final ZookeeperDiscoveryStatistics stats; + /** */ + private final DiscoveryMessageParser msgParser; + /** * @param spi Discovery SPI. * @param igniteInstanceName Instance name. @@ -262,6 +265,8 @@ public ZookeeperDiscoveryImpl( this.evtsAckThreshold = evtsAckThreshold; this.stats = stats; + + msgParser = new DiscoveryMessageParser(marsh); } /** @@ -666,14 +671,7 @@ public void sendCustomMessage(DiscoveryCustomMessage msg) { if (!hasServerNode) throw new IgniteException("Failed to send custom message: no server nodes in topology."); - byte[] msgBytes; - - try { - msgBytes = marshalZip(msg); - } - catch (IgniteCheckedException e) { - throw new IgniteSpiException("Failed to marshal custom message: " + msg, e); - } + byte[] msgBytes = msgParser.marshalZip(msg); while (!busyLock.enterBusy()) checkState(); @@ -1486,6 +1484,8 @@ private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat evtsSta new ZkNoServersMessage(), null); + evtData.prepareMarshal(msgParser); + Collection nodesToAck = Collections.emptyList(); evtsData.addEvent(nodesToAck, evtData); @@ -1514,11 +1514,13 @@ private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) throws E if (evtData instanceof ZkDiscoveryCustomEventData) { ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData; + evtData0.finishUnmarshal(msgParser); + // It is possible previous coordinator failed before finished cleanup. - if (evtData0.msg instanceof ZkCommunicationErrorResolveFinishMessage) { + if (evtData0.resolvedMsg instanceof ZkCommunicationErrorResolveFinishMessage) { try { ZkCommunicationErrorResolveFinishMessage msg = - (ZkCommunicationErrorResolveFinishMessage)evtData0.msg; + (ZkCommunicationErrorResolveFinishMessage)evtData0.resolvedMsg; ZkCommunicationErrorResolveResult res = unmarshalZip( ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, msg.futId)); @@ -2472,7 +2474,7 @@ private void generateCustomEvents(List customEvtNodes) throws Exception DiscoveryCustomMessage msg; try { - msg = unmarshalZip(evtBytes); + msg = msgParser.unmarshalZip(evtBytes); } catch (Exception e) { U.error(log, "Failed to unmarshal custom discovery message: " + e, e); @@ -2746,19 +2748,16 @@ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Excep if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order()) break; - DiscoveryCustomMessage msg; + evtData0.finishUnmarshal(msgParser); - if (rtState.crd) { + if (rtState.crd) assert evtData0.resolvedMsg != null : evtData0; - - msg = evtData0.resolvedMsg; - } else { - if (evtData0.msg == null) { + if (evtData0.resolvedMsg == null) { if (evtData0.ackEvent()) { String path = zkPaths.ackEventDataPath(evtData0.origEvtId); - msg = unmarshalZip(zkClient.getData(path)); + evtData0.resolvedMsg = msgParser.unmarshalZip(zkClient.getData(path)); } else { assert evtData0.evtPath != null : evtData0; @@ -2767,19 +2766,15 @@ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Excep evtData0.evtPath, evtData0.sndNodeId); - msg = unmarshalZip(msgBytes); + evtData0.resolvedMsg = msgParser.unmarshalZip(msgBytes); } } - else - msg = evtData0.msg; - - evtData0.resolvedMsg = msg; } - if (msg instanceof ZkInternalMessage) - processInternalMessage(evtData0, (ZkInternalMessage)msg); + if (evtData0.resolvedMsg instanceof ZkInternalMessage) + processInternalMessage(evtData0, (ZkInternalMessage)evtData0.resolvedMsg); else { - notifyCustomEvent(evtData0, msg); + notifyCustomEvent(evtData0, evtData0.resolvedMsg); if (!evtData0.ackEvent()) updateNodeInfo = true; @@ -3455,7 +3450,7 @@ private void onCommunicationErrorResolveStatusReceived(final ZkRuntimeState rtSt msg, null); - evtData.resolvedMsg = msg; + evtData.prepareMarshal(msgParser); evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData); @@ -3770,7 +3765,7 @@ private ZkDiscoveryCustomEventData createAckEvent( long evtId = rtState.evtsData.evtIdGen; - byte[] ackBytes = marshalZip(ack); + byte[] ackBytes = msgParser.marshalZip(ack); String path = zkPaths.ackEventDataPath(origEvt.eventId());