Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.spi.discovery.zk.internal;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.IgniteMessageFactoryImpl;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
import org.apache.ignite.spi.IgniteSpiException;

import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.makeMessageType;

/**
* Class is responsible for serializing discovery messages using RU-ready {@link MessageSerializer} mechanism.
*/
public class DiscoveryMessageParser {
/** Leading byte for messages use {@link JdkMarshaller} for serialization. */
// TODO: remove these flags after refactoring all discovery messages.

Check warning on line 50 in modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/DiscoveryMessageParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Complete the task associated to this TODO comment.

See more on https://sonarcloud.io/project/issues?id=apache_ignite&issues=AZy9wGz73rRGJkHAMmY5&open=AZy9wGz73rRGJkHAMmY5&pullRequest=12860
private static final byte JAVA_SERIALIZATION = (byte)1;

/** Leading byte for messages use {@link MessageSerializer} for serialization. */
private static final byte MESSAGE_SERIALIZATION = (byte)2;

/** Size for an intermediate buffer for serializing discovery messages. */
private static final int MSG_BUFFER_SIZE = 100;

/** */
private final MessageFactory msgFactory;

/** */
private final Marshaller marsh;

/** */
public DiscoveryMessageParser(Marshaller marsh) {
this.marsh = marsh;
this.msgFactory = new IgniteMessageFactoryImpl(
new MessageFactoryProvider[] { new DiscoveryMessageFactory() });
}

/** Marshals discovery message to bytes array. */
public byte[] marshalZip(DiscoveryCustomMessage msg) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();

try (DeflaterOutputStream out = new DeflaterOutputStream(baos)) {
if (msg instanceof Message) {
out.write(MESSAGE_SERIALIZATION);

serializeMessage((Message)msg, out);
}
else {
out.write(JAVA_SERIALIZATION);

U.marshal(marsh, msg, out);
}
}
catch (Exception e) {
throw new IgniteSpiException("Failed to serialize message: " + msg, e);
}

return baos.toByteArray();
}

/** Unmarshals discovery message from bytes array. */
public DiscoveryCustomMessage unmarshalZip(byte[] bytes) {
try (
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
InflaterInputStream in = new InflaterInputStream(bais)
) {
byte mode = (byte)in.read();

if (mode == JAVA_SERIALIZATION)
return U.unmarshal(marsh, in, U.gridClassLoader());

if (MESSAGE_SERIALIZATION != mode)
throw new IOException("Received unexpected byte while reading discovery message: " + mode);

return (DiscoveryCustomMessage)deserializeMessage(in);
}
catch (Exception e) {
throw new IgniteSpiException("Failed to deserialize message.", e);
}
}

/** */
private void serializeMessage(Message m, OutputStream out) throws IOException {
DirectMessageWriter msgWriter = new DirectMessageWriter(msgFactory);
ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);

msgWriter.setBuffer(msgBuf);

MessageSerializer msgSer = msgFactory.serializer(m.directType());

boolean finished;

do {
// Should be cleared before first operation.
msgBuf.clear();

finished = msgSer.writeTo(m, msgWriter);

out.write(msgBuf.array(), 0, msgBuf.position());
}
while (!finished);
}

/** */
private Message deserializeMessage(InputStream in) throws IOException {
DirectMessageReader msgReader = new DirectMessageReader(msgFactory, null);
ByteBuffer msgBuf = ByteBuffer.allocate(MSG_BUFFER_SIZE);

msgReader.setBuffer(msgBuf);

Message msg = msgFactory.create(makeMessageType((byte)in.read(), (byte)in.read()));
MessageSerializer msgSer = msgFactory.serializer(msg.directType());

boolean finished;

do {
int read = in.read(msgBuf.array(), msgBuf.position(), msgBuf.remaining());

if (read == -1)
throw new EOFException("Stream closed before message was fully read.");

msgBuf.limit(msgBuf.position() + read);
msgBuf.rewind();

finished = msgSer.readFrom(msg, msgReader);

if (!finished)
msgBuf.compact();
}
while (!finished);

return msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.util.UUID;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;

/**
*
Expand All @@ -38,8 +37,8 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
/** */
final String evtPath;

/** Message instance (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */
DiscoverySpiCustomMessage msg;
/** Message (can be marshalled as part of ZkDiscoveryCustomEventData or stored in separate znode. */
byte[] msgBytes;

/** Unmarshalled message. */
transient DiscoveryCustomMessage resolvedMsg;
Expand All @@ -57,7 +56,7 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
long origEvtId,
long topVer,
UUID sndNodeId,
DiscoverySpiCustomMessage msg,
DiscoveryCustomMessage msg,
String evtPath
) {
super(evtId, ZK_EVT_CUSTOM_EVT, topVer);
Expand All @@ -66,11 +65,23 @@ class ZkDiscoveryCustomEventData extends ZkDiscoveryEventData {
assert msg != null || origEvtId != 0 || !F.isEmpty(evtPath);

this.origEvtId = origEvtId;
this.msg = msg;
this.resolvedMsg = msg;
this.sndNodeId = sndNodeId;
this.evtPath = evtPath;
}

/** */
public void prepareMarshal(DiscoveryMessageParser parser) {
if (resolvedMsg != null)
msgBytes = parser.marshalZip(resolvedMsg);
}

/** */
public void finishUnmarshal(DiscoveryMessageParser parser) {
if (msgBytes != null)
resolvedMsg = parser.unmarshalZip(msgBytes);
}

/**
* @return {@code True} for custom event ack message.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ public class ZookeeperDiscoveryImpl {
/** */
private final ZookeeperDiscoveryStatistics stats;

/** */
private final DiscoveryMessageParser msgParser;

/**
* @param spi Discovery SPI.
* @param igniteInstanceName Instance name.
Expand Down Expand Up @@ -262,6 +265,8 @@ public ZookeeperDiscoveryImpl(
this.evtsAckThreshold = evtsAckThreshold;

this.stats = stats;

msgParser = new DiscoveryMessageParser(marsh);
}

/**
Expand Down Expand Up @@ -666,14 +671,7 @@ public void sendCustomMessage(DiscoveryCustomMessage msg) {
if (!hasServerNode)
throw new IgniteException("Failed to send custom message: no server nodes in topology.");

byte[] msgBytes;

try {
msgBytes = marshalZip(msg);
}
catch (IgniteCheckedException e) {
throw new IgniteSpiException("Failed to marshal custom message: " + msg, e);
}
byte[] msgBytes = msgParser.marshalZip(msg);

while (!busyLock.enterBusy())
checkState();
Expand Down Expand Up @@ -1486,6 +1484,8 @@ private void generateNoServersEvent(ZkDiscoveryEventsData evtsData, Stat evtsSta
new ZkNoServersMessage(),
null);

evtData.prepareMarshal(msgParser);

Collection<ZookeeperClusterNode> nodesToAck = Collections.emptyList();

evtsData.addEvent(nodesToAck, evtData);
Expand Down Expand Up @@ -1514,11 +1514,13 @@ private void previousCoordinatorCleanup(ZkDiscoveryEventsData lastEvts) throws E
if (evtData instanceof ZkDiscoveryCustomEventData) {
ZkDiscoveryCustomEventData evtData0 = (ZkDiscoveryCustomEventData)evtData;

evtData0.finishUnmarshal(msgParser);

// It is possible previous coordinator failed before finished cleanup.
if (evtData0.msg instanceof ZkCommunicationErrorResolveFinishMessage) {
if (evtData0.resolvedMsg instanceof ZkCommunicationErrorResolveFinishMessage) {
try {
ZkCommunicationErrorResolveFinishMessage msg =
(ZkCommunicationErrorResolveFinishMessage)evtData0.msg;
(ZkCommunicationErrorResolveFinishMessage)evtData0.resolvedMsg;

ZkCommunicationErrorResolveResult res = unmarshalZip(
ZkDistributedCollectDataFuture.readResult(rtState.zkClient, zkPaths, msg.futId));
Expand Down Expand Up @@ -2472,7 +2474,7 @@ private void generateCustomEvents(List<String> customEvtNodes) throws Exception
DiscoveryCustomMessage msg;

try {
msg = unmarshalZip(evtBytes);
msg = msgParser.unmarshalZip(evtBytes);
}
catch (Exception e) {
U.error(log, "Failed to unmarshal custom discovery message: " + e, e);
Expand Down Expand Up @@ -2746,19 +2748,16 @@ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Excep
if (evtData0.ackEvent() && evtData0.topologyVersion() < locNode.order())
break;

DiscoveryCustomMessage msg;
evtData0.finishUnmarshal(msgParser);

if (rtState.crd) {
if (rtState.crd)
assert evtData0.resolvedMsg != null : evtData0;

msg = evtData0.resolvedMsg;
}
else {
if (evtData0.msg == null) {
if (evtData0.resolvedMsg == null) {
if (evtData0.ackEvent()) {
String path = zkPaths.ackEventDataPath(evtData0.origEvtId);

msg = unmarshalZip(zkClient.getData(path));
evtData0.resolvedMsg = msgParser.unmarshalZip(zkClient.getData(path));
}
else {
assert evtData0.evtPath != null : evtData0;
Expand All @@ -2767,19 +2766,15 @@ private void processNewEvents(final ZkDiscoveryEventsData evtsData) throws Excep
evtData0.evtPath,
evtData0.sndNodeId);

msg = unmarshalZip(msgBytes);
evtData0.resolvedMsg = msgParser.unmarshalZip(msgBytes);
}
}
else
msg = evtData0.msg;

evtData0.resolvedMsg = msg;
}

if (msg instanceof ZkInternalMessage)
processInternalMessage(evtData0, (ZkInternalMessage)msg);
if (evtData0.resolvedMsg instanceof ZkInternalMessage)
processInternalMessage(evtData0, (ZkInternalMessage)evtData0.resolvedMsg);
else {
notifyCustomEvent(evtData0, msg);
notifyCustomEvent(evtData0, evtData0.resolvedMsg);

if (!evtData0.ackEvent())
updateNodeInfo = true;
Expand Down Expand Up @@ -3455,7 +3450,7 @@ private void onCommunicationErrorResolveStatusReceived(final ZkRuntimeState rtSt
msg,
null);

evtData.resolvedMsg = msg;
evtData.prepareMarshal(msgParser);

evtsData.addEvent(rtState.top.nodesByOrder.values(), evtData);

Expand Down Expand Up @@ -3770,7 +3765,7 @@ private ZkDiscoveryCustomEventData createAckEvent(

long evtId = rtState.evtsData.evtIdGen;

byte[] ackBytes = marshalZip(ack);
byte[] ackBytes = msgParser.marshalZip(ack);

String path = zkPaths.ackEventDataPath(origEvt.eventId());

Expand Down
Loading