Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,35 +19,37 @@

import java.util.Arrays;
import java.util.Map;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.MarshallableMessage;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;

/**
* Partition update counters message.
*
* @see #finishUpdating()
*/
public class PartitionUpdateCountersMessage implements MarshallableMessage {
public class PartitionUpdateCountersMessage implements Message {
/** */
private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */;

/** Byte representation of partition counters. */
/** */
@Order(0)
byte[] data;
int cacheId;

/** */
/** Byte representation of partition counters. */
@Order(1)
int cacheId;
byte[] data;

/** */
private int size;
@Order(2)
int size;

/** Used for assigning counters to cache entries during tx finish. */
private Map<Integer, Long> counters;

/** */
/** Empty constructor for a {@link MessageFactory}. */
public PartitionUpdateCountersMessage() {
// No-op.
}
Expand Down Expand Up @@ -120,6 +122,8 @@ public long updatesCount(int idx) {
* @param part Partition number.
* @param init Init partition counter.
* @param updatesCnt Update counter delta.
*
* @see #finishUpdating()
*/
public void add(int part, long init, long updatesCnt) {
ensureSpace(size + 1);
Expand All @@ -131,6 +135,15 @@ public void add(int part, long init, long updatesCnt) {
GridUnsafe.putLong(data, off, updatesCnt);
}

/** Optimizes the memory used after adding counters with {@link #add(int, long, long)}. */
public void finishUpdating() {
if (data != null && data.length != size) {
assert data.length > size;

data = Arrays.copyOf(data, size * ITEM_SIZE);
}
}

/**
* Calculate next counter for partition.
*
Expand Down Expand Up @@ -158,10 +171,9 @@ private void ensureSpace(int newSize) {
int req = newSize * ITEM_SIZE;

if (data.length < req)
data = Arrays.copyOf(data, data.length << 1);
data = Arrays.copyOf(data, (int)(data.length * 1.33f));
}


/** {@inheritDoc} */
@Override public String toString() {
StringBuilder sb = new StringBuilder();
Expand All @@ -182,14 +194,4 @@ private void ensureSpace(int newSize) {
", cntrs=" + sb +
'}';
}

/** {@inheritDoc} */
@Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException {
data = Arrays.copyOf(data, size * ITEM_SIZE);
}

/** {@inheritDoc} */
@Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException {
size = data == null ? 0 : data.length / ITEM_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2197,6 +2197,8 @@ public void applyPartitionsUpdatesCounters(
resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i));
}

resCntrs.finishUpdating();

if (resCntrs.size() > 0)
res.add(resCntrs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,8 @@ public void calculatePartitionUpdateCounters() throws IgniteTxRollbackCheckedExc
}
}

msg.finishUpdating();

if (msg.size() > 0)
cntrMsgs.add(msg);
}
Expand Down
Loading