diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 8255abdc2a871..721831616fd39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -19,6 +19,10 @@ import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage; import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessageSerializer; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket; @@ -123,6 +127,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer()); factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new, new TcpDiscoveryServerOnlyCustomEventMessageSerializer()); + factory.register((short)23, DistributedMetaStorageUpdateMessage::new, new DistributedMetaStorageUpdateMessageSerializer()); + factory.register((short)24, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer()); // DiscoveryCustomMessage factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java index 30dda350ec66a..c293b949ed7e4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java @@ -29,8 +29,8 @@ class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAc private final boolean updated; /** */ - public DistributedMetaStorageCasAckMessage(UUID reqId, String errorMsg, boolean updated) { - super(reqId, errorMsg); + public DistributedMetaStorageCasAckMessage(UUID reqId, boolean updated) { + super(reqId); this.updated = updated; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java index fa279596cba85..fb7e1bc08ad16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java @@ -18,46 +18,46 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** */ -class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage { +public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final byte[] expectedVal; + @Order(4) + byte[] expectedVal; /** */ - private boolean matches = true; + @Order(5) + boolean matches; + + /** Empty constructor for {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageCasMessage() { + // No-op. + } /** */ public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) { super(reqId, key, valBytes); expectedVal = expValBytes; + matches = true; } - /** */ - public byte[] expectedValue() { - return expectedVal; - } - - /** */ - public void setMatches(boolean matches) { - this.matches = matches; - } - - /** */ - public boolean matches() { - return matches; + /** {@inheritDoc} */ + @Override @Nullable public DiscoveryCustomMessage ackMessage() { + return new DistributedMetaStorageCasAckMessage(reqId, matches); } /** {@inheritDoc} */ - @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageCasAckMessage(requestId(), errorMessage(), matches); + @Override public short directType() { + return 24; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 147b98ceead25..738f39ab02583 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -1128,16 +1128,13 @@ private void onUpdateMessage( ClusterNode node, DistributedMetaStorageUpdateMessage msg ) { - if (msg.errorMessage() != null) - return; - lock.writeLock().lock(); try { if (msg instanceof DistributedMetaStorageCasMessage) completeCas((DistributedMetaStorageCasMessage)msg); else - completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value())); + completeWrite(new DistributedMetaStorageHistoryItem(msg.key, msg.valBytes)); } catch (IgniteInterruptedCheckedException e) { throw U.convertException(e); @@ -1166,17 +1163,11 @@ private void onAckMessage( GridFutureAdapter fut = updateFuts.remove(msg.requestId()); if (fut != null) { - String errorMsg = msg.errorMessage(); - - if (errorMsg == null) { - Boolean res = msg instanceof DistributedMetaStorageCasAckMessage - ? ((DistributedMetaStorageCasAckMessage)msg).updated() - : null; + Boolean res = msg instanceof DistributedMetaStorageCasAckMessage + ? ((DistributedMetaStorageCasAckMessage)msg).updated() + : null; - fut.onDone(res); - } - else - fut.onDone(new IllegalStateException(errorMsg)); + fut.onDone(res); } } @@ -1322,21 +1313,21 @@ private void completeWrite( private void completeCas( DistributedMetaStorageCasMessage msg ) throws IgniteCheckedException { - if (!msg.matches()) + if (!msg.matches) return; - Serializable oldVal = bridge.read(msg.key()); + Serializable oldVal = bridge.read(msg.key); - Serializable expVal = unmarshal(marshaller, msg.expectedValue()); + Serializable expVal = unmarshal(marshaller, msg.expectedVal); if (!Objects.deepEquals(oldVal, expVal)) { - msg.setMatches(false); + msg.matches = false; // Do nothing if expected value doesn't match with the actual one. return; } - completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value())); + completeWrite(new DistributedMetaStorageHistoryItem(msg.key, msg.valBytes)); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java index 9008f8b72579e..8f032ddfcef60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java @@ -38,12 +38,8 @@ class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage { private final UUID reqId; /** */ - private final String errorMsg; - - /** */ - public DistributedMetaStorageUpdateAckMessage(UUID reqId, String errorMsg) { + public DistributedMetaStorageUpdateAckMessage(UUID reqId) { this.reqId = reqId; - this.errorMsg = errorMsg; } /** {@inheritDoc} */ @@ -56,11 +52,6 @@ public UUID requestId() { return reqId; } - /** */ - public String errorMessage() { - return errorMsg; - } - /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { return null; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java index 3b9e462200133..e87a21a8c737a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java @@ -18,39 +18,50 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** */ -class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage { +public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Request ID. */ + @Order(1) @GridToStringInclude - private final UUID reqId; + UUID reqId; /** */ + @Order(2) @GridToStringInclude - private final String key; + String key; /** */ - private final byte[] valBytes; + @Order(3) + byte[] valBytes; - /** */ - private String errorMsg; + /** Empty constructor for {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageUpdateMessage() { + // No-op. + } /** */ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) { + id = IgniteUuid.randomUuid(); + this.reqId = reqId; this.key = key; this.valBytes = valBytes; @@ -61,29 +72,9 @@ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valByt return id; } - /** */ - public UUID requestId() { - return reqId; - } - - /** */ - public String key() { - return key; - } - - /** */ - public byte[] value() { - return valBytes; - } - - /** */ - protected String errorMessage() { - return errorMsg; - } - /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageUpdateAckMessage(reqId, errorMsg); + return new DistributedMetaStorageUpdateAckMessage(reqId); } /** {@inheritDoc} */ @@ -100,6 +91,11 @@ protected String errorMessage() { throw new UnsupportedOperationException("createDiscoCache"); } + /** {@inheritDoc} */ + @Override public short directType() { + return 23; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DistributedMetaStorageUpdateMessage.class, this);