diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 66189823f4aba..2c2655cad43e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -43,6 +43,14 @@ import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasAckMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateAckMessageSerializer; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage; +import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessageSerializer; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer; import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; @@ -157,6 +165,10 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new, new TcpDiscoveryServerOnlyCustomEventMessageSerializer()); factory.register((short)23, TcpConnectionRequestDiscoveryMessage::new, new TcpConnectionRequestDiscoveryMessageSerializer()); + factory.register((short)24, DistributedMetaStorageUpdateMessage::new, new DistributedMetaStorageUpdateMessageSerializer()); + factory.register((short)25, DistributedMetaStorageUpdateAckMessage::new, new DistributedMetaStorageUpdateAckMessageSerializer()); + factory.register((short)26, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer()); + factory.register((short)27, DistributedMetaStorageCasAckMessage::new, new DistributedMetaStorageCasAckMessageSerializer()); // DiscoveryCustomMessage factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java index 30dda350ec66a..8c3b7b7e14450 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasAckMessage.java @@ -18,19 +18,27 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; /** */ -class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage { +public class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAckMessage { /** */ private static final long serialVersionUID = 0L; /** */ - private final boolean updated; + @Order(0) + boolean updated; + + /** Empty constructor of {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageCasAckMessage() { + // No-op. + } /** */ - public DistributedMetaStorageCasAckMessage(UUID reqId, String errorMsg, boolean updated) { - super(reqId, errorMsg); + public DistributedMetaStorageCasAckMessage(UUID reqId, boolean updated) { + super(reqId); this.updated = updated; } @@ -40,6 +48,11 @@ public boolean updated() { return updated; } + /** {@inheritDoc} */ + @Override public short directType() { + return 27; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DistributedMetaStorageCasAckMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java index fa279596cba85..17532aff37be4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageCasMessage.java @@ -18,26 +18,36 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** */ -class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage { +public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage { /** */ private static final long serialVersionUID = 0L; - /** */ - private final byte[] expectedVal; + /** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */ + @Order(0) + byte[] expectedVal; /** */ - private boolean matches = true; + @Order(1) + boolean matches; + + /** Empty constructor for {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageCasMessage() { + // No-op. + } /** */ public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) { super(reqId, key, valBytes); expectedVal = expValBytes; + matches = true; } /** */ @@ -57,7 +67,12 @@ public boolean matches() { /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageCasAckMessage(requestId(), errorMessage(), matches); + return new DistributedMetaStorageCasAckMessage(requestId(), matches); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 26; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java index 147b98ceead25..c9c0d6c49f9a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java @@ -1128,9 +1128,6 @@ private void onUpdateMessage( ClusterNode node, DistributedMetaStorageUpdateMessage msg ) { - if (msg.errorMessage() != null) - return; - lock.writeLock().lock(); try { @@ -1166,17 +1163,11 @@ private void onAckMessage( GridFutureAdapter fut = updateFuts.remove(msg.requestId()); if (fut != null) { - String errorMsg = msg.errorMessage(); - - if (errorMsg == null) { - Boolean res = msg instanceof DistributedMetaStorageCasAckMessage - ? ((DistributedMetaStorageCasAckMessage)msg).updated() - : null; + Boolean res = msg instanceof DistributedMetaStorageCasAckMessage + ? ((DistributedMetaStorageCasAckMessage)msg).updated() + : null; - fut.onDone(res); - } - else - fut.onDone(new IllegalStateException(errorMsg)); + fut.onDone(res); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java index 9008f8b72579e..cbb435810fce8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateAckMessage.java @@ -18,32 +18,39 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** */ -class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage { +public class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Request ID. */ - private final UUID reqId; + @Order(1) + UUID reqId; - /** */ - private final String errorMsg; + /** Empty constructor of {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageUpdateAckMessage() { + // No-op. + } /** */ - public DistributedMetaStorageUpdateAckMessage(UUID reqId, String errorMsg) { + public DistributedMetaStorageUpdateAckMessage(UUID reqId) { + id = IgniteUuid.randomUuid(); this.reqId = reqId; - this.errorMsg = errorMsg; } /** {@inheritDoc} */ @@ -56,11 +63,6 @@ public UUID requestId() { return reqId; } - /** */ - public String errorMessage() { - return errorMsg; - } - /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { return null; @@ -80,6 +82,11 @@ public String errorMessage() { throw new UnsupportedOperationException(); } + /** {@inheritDoc} */ + @Override public short directType() { + return 25; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DistributedMetaStorageUpdateAckMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java index 3b9e462200133..3cc625dfea750 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageUpdateMessage.java @@ -18,39 +18,50 @@ package org.apache.ignite.internal.processors.metastorage.persistence; import java.util.UUID; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** */ -class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage { +public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + IgniteUuid id; /** Request ID. */ @GridToStringInclude - private final UUID reqId; + @Order(1) + UUID reqId; /** */ @GridToStringInclude - private final String key; + @Order(2) + String key; - /** */ - private final byte[] valBytes; + /** TODO: revise the external serialization https://issues.apache.org/jira/browse/IGNITE-28058. */ + @Order(3) + byte[] valBytes; - /** */ - private String errorMsg; + /** Empty constructor for {@link DiscoveryMessageFactory}. */ + public DistributedMetaStorageUpdateMessage() { + // No-op. + } /** */ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) { + id = IgniteUuid.randomUuid(); + this.reqId = reqId; this.key = key; this.valBytes = valBytes; @@ -76,14 +87,9 @@ public byte[] value() { return valBytes; } - /** */ - protected String errorMessage() { - return errorMsg; - } - /** {@inheritDoc} */ @Override @Nullable public DiscoveryCustomMessage ackMessage() { - return new DistributedMetaStorageUpdateAckMessage(reqId, errorMsg); + return new DistributedMetaStorageUpdateAckMessage(reqId); } /** {@inheritDoc} */ @@ -100,6 +106,11 @@ protected String errorMessage() { throw new UnsupportedOperationException("createDiscoCache"); } + /** {@inheritDoc} */ + @Override public short directType() { + return 24; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DistributedMetaStorageUpdateMessage.class, this);