Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@

import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheStatisticsModeChangeMessageSerializer;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageCasMessageSerializer;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessage;
import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageUpdateMessageSerializer;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
Expand Down Expand Up @@ -123,6 +127,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)21, TcpDiscoveryCustomEventMessage::new, new TcpDiscoveryCustomEventMessageSerializer());
factory.register((short)22, TcpDiscoveryServerOnlyCustomEventMessage::new,
new TcpDiscoveryServerOnlyCustomEventMessageSerializer());
factory.register((short)23, DistributedMetaStorageUpdateMessage::new, new DistributedMetaStorageUpdateMessageSerializer());
factory.register((short)24, DistributedMetaStorageCasMessage::new, new DistributedMetaStorageCasMessageSerializer());

// DiscoveryCustomMessage
factory.register((short)500, CacheStatisticsModeChangeMessage::new, new CacheStatisticsModeChangeMessageSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ class DistributedMetaStorageCasAckMessage extends DistributedMetaStorageUpdateAc
private final boolean updated;

/** */
public DistributedMetaStorageCasAckMessage(UUID reqId, String errorMsg, boolean updated) {
super(reqId, errorMsg);
public DistributedMetaStorageCasAckMessage(UUID reqId, boolean updated) {
super(reqId);

this.updated = updated;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,46 @@
package org.apache.ignite.internal.processors.metastorage.persistence;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;

/** */
class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
public class DistributedMetaStorageCasMessage extends DistributedMetaStorageUpdateMessage {
/** */
private static final long serialVersionUID = 0L;

/** */
private final byte[] expectedVal;
@Order(4)
byte[] expectedVal;

/** */
private boolean matches = true;
@Order(5)
boolean matches;

/** Empty constructor for {@link DiscoveryMessageFactory}. */
public DistributedMetaStorageCasMessage() {
// No-op.
}

/** */
public DistributedMetaStorageCasMessage(UUID reqId, String key, byte[] expValBytes, byte[] valBytes) {
super(reqId, key, valBytes);

expectedVal = expValBytes;
matches = true;
}

/** */
public byte[] expectedValue() {
return expectedVal;
}

/** */
public void setMatches(boolean matches) {
this.matches = matches;
}

/** */
public boolean matches() {
return matches;
/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return new DistributedMetaStorageCasAckMessage(reqId, matches);
}

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return new DistributedMetaStorageCasAckMessage(requestId(), errorMessage(), matches);
@Override public short directType() {
return 24;
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1128,16 +1128,13 @@ private void onUpdateMessage(
ClusterNode node,
DistributedMetaStorageUpdateMessage msg
) {
if (msg.errorMessage() != null)
return;

lock.writeLock().lock();

try {
if (msg instanceof DistributedMetaStorageCasMessage)
completeCas((DistributedMetaStorageCasMessage)msg);
else
completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
completeWrite(new DistributedMetaStorageHistoryItem(msg.key, msg.valBytes));
}
catch (IgniteInterruptedCheckedException e) {
throw U.convertException(e);
Expand Down Expand Up @@ -1166,17 +1163,11 @@ private void onAckMessage(
GridFutureAdapter<Boolean> fut = updateFuts.remove(msg.requestId());

if (fut != null) {
String errorMsg = msg.errorMessage();

if (errorMsg == null) {
Boolean res = msg instanceof DistributedMetaStorageCasAckMessage
? ((DistributedMetaStorageCasAckMessage)msg).updated()
: null;
Boolean res = msg instanceof DistributedMetaStorageCasAckMessage
? ((DistributedMetaStorageCasAckMessage)msg).updated()
: null;

fut.onDone(res);
}
else
fut.onDone(new IllegalStateException(errorMsg));
fut.onDone(res);
}
}

Expand Down Expand Up @@ -1322,21 +1313,21 @@ private void completeWrite(
private void completeCas(
DistributedMetaStorageCasMessage msg
) throws IgniteCheckedException {
if (!msg.matches())
if (!msg.matches)
return;

Serializable oldVal = bridge.read(msg.key());
Serializable oldVal = bridge.read(msg.key);

Serializable expVal = unmarshal(marshaller, msg.expectedValue());
Serializable expVal = unmarshal(marshaller, msg.expectedVal);

if (!Objects.deepEquals(oldVal, expVal)) {
msg.setMatches(false);
msg.matches = false;

// Do nothing if expected value doesn't match with the actual one.
return;
}

completeWrite(new DistributedMetaStorageHistoryItem(msg.key(), msg.value()));
completeWrite(new DistributedMetaStorageHistoryItem(msg.key, msg.valBytes));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,8 @@ class DistributedMetaStorageUpdateAckMessage implements DiscoveryCustomMessage {
private final UUID reqId;

/** */
private final String errorMsg;

/** */
public DistributedMetaStorageUpdateAckMessage(UUID reqId, String errorMsg) {
public DistributedMetaStorageUpdateAckMessage(UUID reqId) {
this.reqId = reqId;
this.errorMsg = errorMsg;
}

/** {@inheritDoc} */
Expand All @@ -56,11 +52,6 @@ public UUID requestId() {
return reqId;
}

/** */
public String errorMessage() {
return errorMsg;
}

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,50 @@
package org.apache.ignite.internal.processors.metastorage.persistence;

import java.util.UUID;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.DiscoveryMessageFactory;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/** */
class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage {
public class DistributedMetaStorageUpdateMessage implements DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** */
private final IgniteUuid id = IgniteUuid.randomUuid();
@Order(0)
IgniteUuid id;

/** Request ID. */
@Order(1)
@GridToStringInclude
private final UUID reqId;
UUID reqId;

/** */
@Order(2)
@GridToStringInclude
private final String key;
String key;

/** */
private final byte[] valBytes;
@Order(3)
byte[] valBytes;

/** */
private String errorMsg;
/** Empty constructor for {@link DiscoveryMessageFactory}. */
public DistributedMetaStorageUpdateMessage() {
// No-op.
}

/** */
public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valBytes) {
id = IgniteUuid.randomUuid();

this.reqId = reqId;
this.key = key;
this.valBytes = valBytes;
Expand All @@ -61,29 +72,9 @@ public DistributedMetaStorageUpdateMessage(UUID reqId, String key, byte[] valByt
return id;
}

/** */
public UUID requestId() {
return reqId;
}

/** */
public String key() {
return key;
}

/** */
public byte[] value() {
return valBytes;
}

/** */
protected String errorMessage() {
return errorMsg;
}

/** {@inheritDoc} */
@Override @Nullable public DiscoveryCustomMessage ackMessage() {
return new DistributedMetaStorageUpdateAckMessage(reqId, errorMsg);
return new DistributedMetaStorageUpdateAckMessage(reqId);
}

/** {@inheritDoc} */
Expand All @@ -100,6 +91,11 @@ protected String errorMessage() {
throw new UnsupportedOperationException("createDiscoCache");
}

/** {@inheritDoc} */
@Override public short directType() {
return 23;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DistributedMetaStorageUpdateMessage.class, this);
Expand Down