From 3ce7878d7e32c1c477424bd1d813d5cb152e238e Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 15 May 2026 11:18:08 +0300 Subject: [PATCH 1/2] impl --- .../dht/PartitionUpdateCountersMessage.java | 47 ++++++++++--------- .../cache/transactions/IgniteTxHandler.java | 2 + .../transactions/IgniteTxLocalAdapter.java | 2 + 3 files changed, 29 insertions(+), 22 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index 55a90f49a3374..e0c280cc9aebe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -19,35 +19,37 @@ import java.util.Arrays; import java.util.Map; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.MarshallableMessage; import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageFactory; /** * Partition update counters message. + * + * @see #finishUpdating() */ -public class PartitionUpdateCountersMessage implements MarshallableMessage { +public class PartitionUpdateCountersMessage implements Message { /** */ private static final int ITEM_SIZE = 4 /* partition */ + 8 /* initial counter */ + 8 /* updates count */; - /** Byte representation of partition counters. */ + /** */ @Order(0) - byte[] data; + int cacheId; - /** */ + /** Byte representation of partition counters. */ @Order(1) - int cacheId; + byte[] data; /** */ - private int size; + @Order(2) + int size; /** Used for assigning counters to cache entries during tx finish. */ private Map counters; - /** */ + /** Empty constructor for a {@link MessageFactory}. */ public PartitionUpdateCountersMessage() { // No-op. } @@ -120,6 +122,8 @@ public long updatesCount(int idx) { * @param part Partition number. * @param init Init partition counter. * @param updatesCnt Update counter delta. + * + * @see #finishUpdating() */ public void add(int part, long init, long updatesCnt) { ensureSpace(size + 1); @@ -131,6 +135,15 @@ public void add(int part, long init, long updatesCnt) { GridUnsafe.putLong(data, off, updatesCnt); } + /** Optimizes the memory used after adding counters with {@link #add(int, long, long)}. */ + public void finishUpdating() { + if (data != null && data.length != size) { + assert data.length > size; + + data = Arrays.copyOf(data, size * ITEM_SIZE); + } + } + /** * Calculate next counter for partition. * @@ -157,11 +170,11 @@ public Long nextCounter(int partId) { private void ensureSpace(int newSize) { int req = newSize * ITEM_SIZE; + // Calling of #finishUpdating() isn't mantatory. If not called, let's do not use too much extra memory. if (data.length < req) - data = Arrays.copyOf(data, data.length << 1); + data = Arrays.copyOf(data, (int)(data.length * 1.33f)); } - /** {@inheritDoc} */ @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -182,14 +195,4 @@ private void ensureSpace(int newSize) { ", cntrs=" + sb + '}'; } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(Marshaller marsh) throws IgniteCheckedException { - data = Arrays.copyOf(data, size * ITEM_SIZE); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(Marshaller marsh, ClassLoader clsLdr) throws IgniteCheckedException { - size = data == null ? 0 : data.length / ITEM_SIZE; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 10f38a3d105e1..392f37d32b060 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -2197,6 +2197,8 @@ public void applyPartitionsUpdatesCounters( resCntrs.add(part, partCntrs.initialCounter(i), partCntrs.updatesCount(i)); } + resCntrs.finishUpdating(); + if (resCntrs.size() > 0) res.add(resCntrs); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 71c79fe8f9bac..bf2ea8938c245 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -475,6 +475,8 @@ public void calculatePartitionUpdateCounters() throws IgniteTxRollbackCheckedExc } } + msg.finishUpdating(); + if (msg.size() > 0) cntrMsgs.add(msg); } From fd409747b53c0ce32cf2cc84f4ad9894847d5e8f Mon Sep 17 00:00:00 2001 From: Vladimir Steshin Date: Fri, 15 May 2026 21:31:19 +0300 Subject: [PATCH 2/2] minor --- .../cache/distributed/dht/PartitionUpdateCountersMessage.java | 1 - 1 file changed, 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java index e0c280cc9aebe..3ad1cb40e4983 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/PartitionUpdateCountersMessage.java @@ -170,7 +170,6 @@ public Long nextCounter(int partId) { private void ensureSpace(int newSize) { int req = newSize * ITEM_SIZE; - // Calling of #finishUpdating() isn't mantatory. If not called, let's do not use too much extra memory. if (data.length < req) data = Arrays.copyOf(data, (int)(data.length * 1.33f)); }