From 031b34b3d2a496cf379c85f3afa50f415a9732e5 Mon Sep 17 00:00:00 2001 From: zstan Date: Thu, 26 Feb 2026 13:12:28 +0300 Subject: [PATCH] IGNITE-27996 One SingleNodeMessage can be serialized numerous times during sending --- .../SnapshotPartitionsVerifyHandler.java | 4 +- .../util/distributed/SingleNodeMessage.java | 16 +- .../TestRecordingCommunicationSpi.java | 5 + .../SingleNodeMessageSerializationTest.java | 182 ++++++++++++++++++ .../testsuites/IgniteUtilSelfTestSuite.java | 2 + 5 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 8619b53fc02f0..c701d444901db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -327,7 +327,7 @@ private Map checkSnapshotFiles( closeAllComponents(snpCtx); } - return res; + return Collections.unmodifiableMap(res); } /** */ @@ -386,7 +386,7 @@ private Map checkDumpFiles( ) ); - return partitionHashRecords.stream().collect(Collectors.toMap(PartitionHashRecord::partitionKey, r -> r)); + return partitionHashRecords.stream().collect(Collectors.toUnmodifiableMap(PartitionHashRecord::partitionKey, r -> r)); } catch (Throwable t) { log.error("Error executing handler: ", t); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/SingleNodeMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/SingleNodeMessage.java index 862b2279a9bff..a048277f11846 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/SingleNodeMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/distributed/SingleNodeMessage.java @@ -47,6 +47,9 @@ public class SingleNodeMessage implements Message { /** Single node response. */ private R resp; + /** Response byte representation. */ + private byte[] respBytes; + /** Error. */ private Throwable err; @@ -92,8 +95,14 @@ public SingleNodeMessage(UUID processId, DistributedProcessType type, R resp, Th writer.incrementState(); case 2: - if (!writer.writeByteArray(U.toBytes(resp))) + if (respBytes == null) { + respBytes = toBytes(resp); + } + + if (!writer.writeByteArray(respBytes)) return false; + else + respBytes = null; writer.incrementState(); @@ -152,6 +161,11 @@ public SingleNodeMessage(UUID processId, DistributedProcessType type, R resp, Th return true; } + /** */ + byte[] toBytes(Serializable obj) { + return U.toBytes(obj); + } + /** {@inheritDoc} */ @Override public short directType() { return TYPE_CODE; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 422d5c333a374..fc3f0807a492e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -379,6 +379,11 @@ public static void stopBlockAll() { spi(ignite).stopBlock(true); } + /** Return list of blocked messages. */ + public List blockedMessages() { + return blockedMsgs; + } + /** * @param grpId Group id. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java new file mode 100644 index 0000000000000..ede286eacb87c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/distributed/SingleNodeMessageSerializationTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.distributed; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; +import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessage; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.TEST_PROCESS; +import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** Check {@link SingleNodeMessage} serialization. */ +public class SingleNodeMessageSerializationTest extends GridCommonAbstractTest { + /** Nodes count. */ + public static final int NODES_CNT = 2; + + /** {@inheritDoc} */ + @Override protected void afterTest() { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(instanceName); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setDiscoverySpi(new TestDiscoverySpi() + .setIpFinder(((TcpDiscoverySpi)cfg.getDiscoverySpi()).getIpFinder())); + + return cfg; + } + + /** Test check that serialization raised only once. */ + @Test + public void testSingleSerializedOnce() throws Exception { + startGridsMultiThreaded(NODES_CNT); + startClientGrid(NODES_CNT); + + TestRecordingCommunicationSpi clnCommSpi = TestRecordingCommunicationSpi.spi(grid(NODES_CNT)); + + assertTrue(grid(NODES_CNT).configuration().isClientMode()); + + clnCommSpi.blockMessages((node, msg) -> msg instanceof SingleNodeMessage); + + TestDiscoverySpi discoSpi = (TestDiscoverySpi)grid(NODES_CNT).context().discovery().getInjectedDiscoverySpi(); + + CountDownLatch latch = new CountDownLatch(1); + + discoSpi.messageLatch(latch); + + Set nodeIdsRes = new HashSet<>(); + + List> processes = new ArrayList<>(NODES_CNT + 1); + + for (int i = 0; i < NODES_CNT; i++) + nodeIdsRes.add(grid(i).localNode().id()); + + for (int n = 0; n < NODES_CNT + 1; n++) { + DistributedProcess dp = new TestDistributedProcess( + grid(n).context(), (id, req) -> new InitMessage<>(id, TEST_PROCESS, req, true)); + + processes.add(dp); + } + + int sendBuffSize = clnCommSpi.getSocketSendBuffer(); + + // it will be enough for buffer overflow cause some serialization overhead is present + byte[] arr = new byte[sendBuffSize]; + + byte[] serialized = U.toBytes(arr); + + assertTrue(serialized.length > sendBuffSize); + + processes.get(0).start(UUID.randomUUID(), arr); + + clnCommSpi.waitForBlocked(); + + assertEquals(1, clnCommSpi.blockedMessages().size()); + + TestRecordingCommunicationSpi.BlockedMessageDescriptor blocked = clnCommSpi.blockedMessages().get(0); + + SingleNodeMessage msgSpied = (SingleNodeMessage)spy(blocked.ioMessage().message()); + + setFieldValue(blocked.ioMessage(), "msg", msgSpied); + + clnCommSpi.stopBlock(); + + latch.await(10, TimeUnit.SECONDS); + + // Serialized only once. + verify(msgSpied, times(1)).toBytes(any()); + + // Write to buffer - several times cause buffer size is less than serialization representation. + verify(msgSpied, times(2)).writeTo(any(), any()); + } + + /** */ + private static class TestDistributedProcess extends DistributedProcess { + /** */ + public TestDistributedProcess( + GridKernalContext ctx, + BiFunction> initMsgFactory + ) { + super( + ctx, + TEST_PROCESS, + (req) -> new GridFinishedFuture<>(req), + (uuid, res, err) -> {}, + initMsgFactory); + } + } + + /** */ + private static class TestDiscoverySpi extends TcpDiscoverySpi { + /** */ + private CountDownLatch messageLatch; + + /** Message raized trigger. */ + void messageLatch(CountDownLatch messageLatch) { + this.messageLatch = messageLatch; + } + + /** {@inheritDoc} */ + @Override protected void startMessageProcess(TcpDiscoveryAbstractMessage msg) { + if (messageLatch != null && msg instanceof TcpDiscoveryCustomEventMessage) { + TcpDiscoveryCustomEventMessage discoMsg = (TcpDiscoveryCustomEventMessage)msg; + + try { + DiscoverySpiCustomMessage custMsg = discoMsg.message(marshaller(), + U.resolveClassLoader(ignite().configuration())); + + if (custMsg instanceof CustomMessageWrapper) { + if (((CustomMessageWrapper)custMsg).delegate() instanceof FullMessage) + messageLatch.countDown(); + } + } + catch (Throwable e) { + throw new RuntimeException(e); + } + } + + super.startMessageProcess(msg); + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index abacbcaa46ccb..e26cc843c4585 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -32,6 +32,7 @@ import org.apache.ignite.internal.util.IgniteExceptionRegistrySelfTest; import org.apache.ignite.internal.util.IgniteUtilsSelfTest; import org.apache.ignite.internal.util.IgniteUtilsUnitTest; +import org.apache.ignite.internal.util.distributed.SingleNodeMessageSerializationTest; import org.apache.ignite.internal.util.nio.GridNioDelimitedBufferSelfTest; import org.apache.ignite.internal.util.nio.GridNioSelfTest; import org.apache.ignite.internal.util.nio.GridNioServerTest; @@ -147,6 +148,7 @@ DistributedProcessErrorHandlingTest.class, DistributedProcessCoordinatorLeftTest.class, DistributedProcessClientAwaitTest.class, + SingleNodeMessageSerializationTest.class, BasicRateLimiterTest.class,