diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index f15a181026a50..fbffb1aa92c09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -41,6 +41,8 @@
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageSerializer;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
@@ -168,5 +170,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
new TxTimeoutOnPartitionMapExchangeChangeMessageSerializer());
factory.register((short)510, UserAcceptedMessage::new, new UserAcceptedMessageSerializer());
factory.register((short)511, UserProposedMessage::new, new UserProposedMessageSerializer());
+ factory.register((short)512, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageSerializer());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 0cfb2d96768b9..7c6c8a9dbc005 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -550,7 +550,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
//coordinator receives update request
if (metaVerInfo != null) {
if (metaVerInfo.removing()) {
- msg.markRejected(new BinaryObjectException("The type is removing now [typeId=" + typeId + ']'));
+ msg.markRejected("The type is removing now [typeId=" + typeId + ']');
pendingVer = REMOVED_VERSION;
acceptedVer = REMOVED_VERSION;
@@ -589,7 +589,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
catch (BinaryObjectException err) {
log.warning("Exception with merging metadata for typeId: " + typeId, err);
- msg.markRejected(err);
+ msg.markRejected(err.getMessage());
}
}
}
@@ -602,7 +602,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
MetadataUpdateResultFuture fut = unlabeledFutures.poll();
if (msg.rejected())
- fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
+ fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionErrorMessage()));
else {
if (clientNode) {
boolean success = casBinaryMetadata(typeId, new BinaryMetadataVersionInfo(msg.metadata(), pendingVer, acceptedVer));
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
index 4da098ab1014c..e136ea8a04a92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.processors.cache.binary;
import java.util.UUID;
-import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
@@ -25,9 +27,13 @@
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.marshaller.Marshallers.jdk;
+
/**
* MetadataUpdateProposedMessage and {@link MetadataUpdateAcceptedMessage} messages make a basis for
* discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches.
@@ -70,33 +76,50 @@
* it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with accepted version
* equals to pending version of this metadata to the moment when is was initially read by the thread.
*/
-public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage {
+public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;
/** */
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ IgniteUuid id;
/** Node UUID which initiated metadata update. */
- private final UUID origNodeId;
+ @Order(1)
+ UUID origNodeId;
/** */
private BinaryMetadata metadata;
+ /** Serialized {@link #metadata}. */
+ @Order(value = 2, method = "metadataBytes")
+ @SuppressWarnings("unused")
+ byte[] metadataBytesHolder;
+
/** Metadata type id. */
- private final int typeId;
+ @Order(3)
+ int typeId;
/** Metadata version which is pending for update. */
- private int pendingVer;
+ @Order(4)
+ int pendingVer;
/** Metadata version which is already accepted by entire cluster. */
- private int acceptedVer;
+ @Order(5)
+ int acceptedVer;
/** Message acceptance status. */
- private ProposalStatus status = ProposalStatus.SUCCESSFUL;
+ @Order(6)
+ boolean rejected;
/** */
- private BinaryObjectException err;
+ @Order(7)
+ String errMsg;
+
+ /** Constructor. */
+ public MetadataUpdateProposedMessage() {
+ // No-op.
+ }
/**
* @param metadata {@link BinaryMetadata} requested to be updated.
@@ -106,6 +129,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
assert origNodeId != null;
assert metadata != null;
+ id = IgniteUuid.randomUuid();
this.origNodeId = origNodeId;
this.metadata = metadata;
@@ -123,7 +147,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
* {@inheritDoc}
*/
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
- return (status == ProposalStatus.SUCCESSFUL) ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
+ return !rejected ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
}
/**
@@ -140,25 +164,25 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
}
/**
- * @param err Error caused this update to be rejected.
+ * @param errMsg Error message caused this update to be rejected.
*/
- void markRejected(BinaryObjectException err) {
- status = ProposalStatus.REJECTED;
- this.err = err;
+ void markRejected(String errMsg) {
+ rejected = true;
+ this.errMsg = errMsg;
}
/**
*
*/
boolean rejected() {
- return status == ProposalStatus.REJECTED;
+ return rejected;
}
/**
*
*/
- BinaryObjectException rejectionError() {
- return err;
+ String rejectionErrorMessage() {
+ return errMsg;
}
/**
@@ -210,6 +234,32 @@ public void metadata(BinaryMetadata metadata) {
this.metadata = metadata;
}
+ /**
+ * @return Serialized binary metadata.
+ */
+ public byte[] metadataBytes() {
+ try {
+ return U.marshal(jdk(), metadata);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal binary metadata", e);
+ }
+ }
+
+ /**
+ * @param metadataBytes Serialized binary metadata.
+ */
+ public void metadataBytes(byte[] metadataBytes) {
+ if (metadataBytes != null && metadata == null) {
+ try {
+ metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader());
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to unmarshal binary metadata", e);
+ }
+ }
+ }
+
/**
*
*/
@@ -217,13 +267,9 @@ public int typeId() {
return typeId;
}
- /** Message acceptance status. */
- private enum ProposalStatus {
- /** */
- SUCCESSFUL,
-
- /** */
- REJECTED
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 512;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index 501daa170d1fd..a475e63c8183f 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1032,7 +1032,6 @@ org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage
-org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage$ProposalStatus
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateResult$ResultType
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$BlockSetCallable
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate