Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.ignite.internal.processors.cache.binary.MetadataRemoveProposedMessageSerializer;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessageSerializer;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessageSerializer;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessageSerializer;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
Expand Down Expand Up @@ -168,5 +170,6 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
new TxTimeoutOnPartitionMapExchangeChangeMessageSerializer());
factory.register((short)510, UserAcceptedMessage::new, new UserAcceptedMessageSerializer());
factory.register((short)511, UserProposedMessage::new, new UserProposedMessageSerializer());
factory.register((short)512, MetadataUpdateProposedMessage::new, new MetadataUpdateProposedMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
//coordinator receives update request
if (metaVerInfo != null) {
if (metaVerInfo.removing()) {
msg.markRejected(new BinaryObjectException("The type is removing now [typeId=" + typeId + ']'));
msg.markRejected("The type is removing now [typeId=" + typeId + ']');

pendingVer = REMOVED_VERSION;
acceptedVer = REMOVED_VERSION;
Expand Down Expand Up @@ -589,7 +589,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
catch (BinaryObjectException err) {
log.warning("Exception with merging metadata for typeId: " + typeId, err);

msg.markRejected(err);
msg.markRejected(err.getMessage());
}
}
}
Expand All @@ -602,7 +602,7 @@ private final class MetadataUpdateProposedListener implements CustomEventListene
MetadataUpdateResultFuture fut = unlabeledFutures.poll();

if (msg.rejected())
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionErrorMessage()));
else {
if (clientNode) {
boolean success = casBinaryMetadata(typeId, new BinaryMetadataVersionInfo(msg.metadata(), pendingVer, acceptedVer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@
package org.apache.ignite.internal.processors.cache.binary;

import java.util.UUID;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryMetadataHandler;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.marshaller.Marshallers.jdk;

/**
* <b>MetadataUpdateProposedMessage</b> and {@link MetadataUpdateAcceptedMessage} messages make a basis for
* discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches.
Expand Down Expand Up @@ -70,33 +76,50 @@
* it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with <b>accepted version</b>
* equals to <b>pending version</b> of this metadata to the moment when is was initially read by the thread.
*/
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage {
public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** */
private final IgniteUuid id = IgniteUuid.randomUuid();
@Order(0)
IgniteUuid id;

/** Node UUID which initiated metadata update. */
private final UUID origNodeId;
@Order(1)
UUID origNodeId;

/** */
private BinaryMetadata metadata;

/** Serialized {@link #metadata}. */
@Order(value = 2, method = "metadataBytes")
@SuppressWarnings("unused")
byte[] metadataBytesHolder;

/** Metadata type id. */
private final int typeId;
@Order(3)
int typeId;

/** Metadata version which is pending for update. */
private int pendingVer;
@Order(4)
int pendingVer;

/** Metadata version which is already accepted by entire cluster. */
private int acceptedVer;
@Order(5)
int acceptedVer;

/** Message acceptance status. */
private ProposalStatus status = ProposalStatus.SUCCESSFUL;
@Order(6)
boolean rejected;

/** */
private BinaryObjectException err;
@Order(7)
String errMsg;

/** Constructor. */
public MetadataUpdateProposedMessage() {
// No-op.
}

/**
* @param metadata {@link BinaryMetadata} requested to be updated.
Expand All @@ -106,6 +129,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
assert origNodeId != null;
assert metadata != null;

id = IgniteUuid.randomUuid();
this.origNodeId = origNodeId;

this.metadata = metadata;
Expand All @@ -123,7 +147,7 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
* {@inheritDoc}
*/
@Nullable @Override public DiscoveryCustomMessage ackMessage() {
return (status == ProposalStatus.SUCCESSFUL) ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
return !rejected ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
}

/**
Expand All @@ -140,25 +164,25 @@ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
}

/**
* @param err Error caused this update to be rejected.
* @param errMsg Error message caused this update to be rejected.
*/
void markRejected(BinaryObjectException err) {
status = ProposalStatus.REJECTED;
this.err = err;
void markRejected(String errMsg) {
rejected = true;
this.errMsg = errMsg;
}

/**
*
*/
boolean rejected() {
return status == ProposalStatus.REJECTED;
return rejected;
}

/**
*
*/
BinaryObjectException rejectionError() {
return err;
String rejectionErrorMessage() {
return errMsg;
}

/**
Expand Down Expand Up @@ -210,20 +234,42 @@ public void metadata(BinaryMetadata metadata) {
this.metadata = metadata;
}

/**
* @return Serialized binary metadata.
*/
public byte[] metadataBytes() {
try {
return U.marshal(jdk(), metadata);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to marshal binary metadata", e);
}
}

/**
* @param metadataBytes Serialized binary metadata.
*/
public void metadataBytes(byte[] metadataBytes) {
if (metadataBytes != null && metadata == null) {
try {
metadata = U.unmarshal(jdk(), metadataBytes, U.gridClassLoader());
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to unmarshal binary metadata", e);
}
}
}

/**
*
*/
public int typeId() {
return typeId;
}

/** Message acceptance status. */
private enum ProposalStatus {
/** */
SUCCESSFUL,

/** */
REJECTED
/** {@inheritDoc} */
@Override public short directType() {
return 512;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,6 @@ org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage$ProposalStatus
org.apache.ignite.internal.processors.cache.binary.MetadataUpdateResult$ResultType
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$BlockSetCallable
org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager$QueueHeaderPredicate
Expand Down