From f1bef676baf056adce64311731571d845f7f59c0 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Mon, 4 May 2026 08:36:05 +0300 Subject: [PATCH 1/6] IGNITE-28477 Merge StartRoutineDiscoveryMessageV2 into StartRoutineDiscoveryMessage --- .../ignite/internal/CoreMessagesProvider.java | 4 -- .../continuous/GridContinuousProcessor.java | 30 +++------ .../StartRoutineDiscoveryMessage.java | 26 +++++++- .../StartRoutineDiscoveryMessageV2.java | 62 ------------------- .../resources/META-INF/classnames.properties | 1 - .../messaging/GridMessagingSelfTest.java | 5 +- 6 files changed, 33 insertions(+), 95 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 836e36254e1eb..702e2ee14553e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -184,9 +184,7 @@ import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.continuous.StartRequestData; -import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; -import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2; import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; @@ -512,8 +510,6 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withSchema(AtomicApplicationAttributesAwareRequest.class); withNoSchema(StartRequestData.class); withNoSchema(StartRoutineDiscoveryMessage.class); - withNoSchema(StartRoutineAckDiscoveryMessage.class); - withNoSchema(StartRoutineDiscoveryMessageV2.class); withNoSchema(StoredCacheData.class); // [10600-10800]: Affinity & partition maps. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 7509a88fa178a..545627f8aab40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker; import org.apache.ignite.internal.thread.OomExceptionHandler; @@ -211,26 +212,13 @@ public GridContinuousProcessor(GridKernalContext ctx) { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StartRoutineDiscoveryMessage msg) { - assert !immutableDiscoCustomMsg; - if (ctx.isStopping()) return; - processStartRequestMutable(snd, msg); - } - }); - - ctx.discovery().setCustomEventListener(StartRoutineDiscoveryMessageV2.class, - new CustomEventListener() { - @Override public void onCustomEvent(AffinityTopologyVersion topVer, - ClusterNode snd, - StartRoutineDiscoveryMessageV2 msg) { - assert immutableDiscoCustomMsg; - - if (ctx.isStopping()) - return; - - processStartRequestImmutable(topVer, snd, msg); + if (immutableDiscoCustomMsg) + processStartRequestImmutable(topVer, snd, msg); + else + processStartRequestMutable(snd, msg); } }); @@ -992,9 +980,7 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, reqData.prepareMarshal(ctx); if (!immutableDiscoCustomMsg) { - StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage( - routineId, - reqData); + StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE); if (hnd.updateCounters() != null) msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); @@ -1002,7 +988,7 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, return msg; } else - return new StartRoutineDiscoveryMessageV2(routineId, reqData); + return new StartRoutineDiscoveryMessage(routineId, reqData, Mode.IMMUTABLE); } /** @@ -1468,7 +1454,7 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart */ private void processStartRequestImmutable(final AffinityTopologyVersion topVer, final ClusterNode snd, - final StartRoutineDiscoveryMessageV2 msg) { + final StartRoutineDiscoveryMessage msg) { StartRequestData reqData = msg.startRequestData(); ContinuousRoutineInfo routineInfo = new ContinuousRoutineInfo(snd.id(), diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index af18f362fe0d7..ddf9e63f6c424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -26,15 +26,26 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; +import static org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE; + /** * Discovery message used for Continuous Query registration. */ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { + /** Discovery message mode. */ + enum Mode { + /** Mutable discovery mode. */ + MUTABLE, + + /** Immutable discovery mode. */ + IMMUTABLE + } + /** */ @Order(0) StartRequestData startReqData; - /** */ + /** Errors collected by mutable discovery. */ @Order(1) Map errs = new HashMap<>(); @@ -46,14 +57,20 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { @Order(3) Map> updateCntrsPerNode; + /** Discovery message mode. */ + @Order(4) + Mode mode; + /** * @param routineId Routine id. * @param startReqData Start request data. + * @param mode Discovery message mode. */ - public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { + StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, Mode mode) { super(routineId); this.startReqData = startReqData; + this.mode = mode; } /** */ @@ -110,11 +127,14 @@ public void addUpdateCounters(UUID nodeId, Map cntrs) { /** {@inheritDoc} */ @Override public boolean isMutable() { - return true; + return mode == MUTABLE; } /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { + if (!isMutable()) + return null; + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java deleted file mode 100644 index 15b5783652849..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessageV2.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.continuous; - -import java.util.UUID; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.util.typedef.internal.S; - -/** - * - */ -public class StartRoutineDiscoveryMessageV2 extends AbstractContinuousMessage { - /** */ - @Order(0) - StartRequestData startReqData; - - /** */ - public StartRoutineDiscoveryMessageV2() {} - - /** - * @param routineId Routine id. - * @param startReqData Start request data. - */ - StartRoutineDiscoveryMessageV2(UUID routineId, StartRequestData startReqData) { - super(routineId); - - this.startReqData = startReqData; - } - - /** - * @return Start request data. - */ - public StartRequestData startRequestData() { - return startReqData; - } - - /** {@inheritDoc} */ - @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRoutineDiscoveryMessageV2.class, this, "routineId", routineId()); - } -} diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index 954755b375ad7..e13c53c74e0ab 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1518,7 +1518,6 @@ org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRo org.apache.ignite.internal.processors.continuous.StartRequestData org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage -org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2 org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$3 diff --git a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java index 6a7760ec5584c..0af9e2f28762d 100644 --- a/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/messaging/GridMessagingSelfTest.java @@ -40,7 +40,6 @@ import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.internal.DiscoverySpiTestListener; import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; -import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessageV2; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.typedef.P2; @@ -1051,7 +1050,7 @@ public void testAsyncOld() throws Exception { } }, IllegalStateException.class, null); - lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); + lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class); final String topic = "topic"; @@ -1149,7 +1148,7 @@ public void testAsync() throws Exception { discoSpi.setInternalListener(lsnr); - lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class, StartRoutineDiscoveryMessageV2.class); + lsnr.blockCustomEvent(StartRoutineDiscoveryMessage.class); final String topic = "topic"; From 97dd47f4fa4e0990c66343e75e58254dcf10a86b Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Wed, 6 May 2026 09:47:53 +0300 Subject: [PATCH 2/6] IGNITE-28477 Remove mutable path --- .../continuous/AbstractContinuousMessage.java | 6 + .../continuous/GridContinuousProcessor.java | 577 ++---------------- .../StartRoutineAckDiscoveryMessage.java | 99 --- .../StartRoutineDiscoveryMessage.java | 91 +-- .../StopRoutineAckDiscoveryMessage.java | 7 - .../resources/META-INF/classnames.properties | 3 - .../ContinuousQueryBuffersCleanupTest.java | 4 - .../discovery/tcp/TcpDiscoverySelfTest.java | 8 +- 8 files changed, 52 insertions(+), 743 deletions(-) delete mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java index d16ef9f59d741..50c65d984ab27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -21,6 +21,7 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * @@ -50,4 +51,9 @@ protected AbstractContinuousMessage(UUID id) { public UUID routineId() { return routineId; } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 545627f8aab40..ed7d95cffd977 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -17,12 +17,7 @@ package org.apache.ignite.internal.processors.continuous; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -72,7 +67,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; -import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker; import org.apache.ignite.internal.thread.OomExceptionHandler; @@ -80,10 +74,8 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -127,9 +119,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Local infos. */ private final ConcurrentMap locInfos = new ConcurrentHashMap<>(); - /** Local infos. */ - private final ConcurrentMap> clientInfos = new ConcurrentHashMap<>(); - /** Remote infos. */ private final ConcurrentMap rmtInfos = new ConcurrentHashMap<>(); @@ -172,9 +161,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** */ private ContinuousRoutinesInfo routinesInfo; - /** Whether Discovery SPI uses immutable custom messages. */ - private boolean immutableDiscoCustomMsg; - /** * @param ctx Kernal context. */ @@ -189,10 +175,7 @@ public GridContinuousProcessor(GridKernalContext ctx) { new ReadOnlyCollectionView2X<>(rmtInfos.entrySet(), locInfos.entrySet()), e -> new ContinuousQueryView(e.getKey(), e.getValue())); - immutableDiscoCustomMsg = !ctx.discovery().mutableCustomMessages(); - - if (immutableDiscoCustomMsg) - routinesInfo = new ContinuousRoutinesInfo(); + routinesInfo = new ContinuousRoutinesInfo(); retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); @@ -215,22 +198,7 @@ public GridContinuousProcessor(GridKernalContext ctx) { if (ctx.isStopping()) return; - if (immutableDiscoCustomMsg) - processStartRequestImmutable(topVer, snd, msg); - else - processStartRequestMutable(snd, msg); - } - }); - - ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, - new CustomEventListener() { - @Override public void onCustomEvent(AffinityTopologyVersion topVer, - ClusterNode snd, - StartRoutineAckDiscoveryMessage msg) { - if (ctx.isStopping()) - return; - - processStartAckRequest(topVer, msg); + processStartRequest(topVer, snd, msg); } }); @@ -239,8 +207,7 @@ public GridContinuousProcessor(GridKernalContext ctx) { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { - if (immutableDiscoCustomMsg) - routinesInfo.removeRoutine(msg.routineId); + routinesInfo.removeRoutine(msg.routineId); if (ctx.isStopping()) return; @@ -334,11 +301,6 @@ Map localRoutineInfos() { return Collections.unmodifiableMap(locInfos); } - /** */ - Map> clientRoutineInfos() { - return Collections.unmodifiableMap(clientInfos); - } - /** * @return {@code true} if lock successful, {@code false} if processor already stopped. */ @@ -396,106 +358,12 @@ public void unlockStopping() { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - if (immutableDiscoCustomMsg) { - routinesInfo.collectJoiningNodeData(dataBag); - - return; - } - - Serializable data = getDiscoveryData(dataBag.joiningNodeId()); - - if (data != null) - dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data); + routinesInfo.collectJoiningNodeData(dataBag); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - if (immutableDiscoCustomMsg) { - routinesInfo.collectGridNodeData(dataBag); - - return; - } - - Serializable data = getDiscoveryData(dataBag.joiningNodeId()); - - if (data != null) - dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data); - } - - /** - * @param joiningNodeId Joining node id. - */ - private Serializable getDiscoveryData(UUID joiningNodeId) { - if (log.isDebugEnabled()) { - log.debug("collectDiscoveryData [node=" + joiningNodeId + - ", loc=" + ctx.localNodeId() + - ", locInfos=" + locInfos + - ", clientInfos=" + clientInfos + - ']'); - } - - if (!joiningNodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { - Map> clientInfos0 = copyClientInfos(clientInfos); - - if (joiningNodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) { - Map infos = copyLocalInfos(locInfos); - - clientInfos0.put(ctx.localNodeId(), infos); - } - - DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); - - // Collect listeners information (will be sent to joining node during discovery process). - for (Map.Entry e : locInfos.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); - - assert !ctx.config().isPeerClassLoadingEnabled() || - !(info.hnd instanceof CacheContinuousQueryHandler) || - ((CacheContinuousQueryHandler)info.hnd).isMarshalled(); - - data.addItem(new DiscoveryDataItem(routineId, - info.prjPred, - info.hnd, - info.bufSize, - info.interval, - info.autoUnsubscribe)); - } - - return data; - } - - return null; - } - - /** - * @param clientInfos Client infos. - */ - private Map> copyClientInfos(Map> clientInfos) { - Map> res = U.newHashMap(clientInfos.size()); - - for (Map.Entry> e : clientInfos.entrySet()) { - Map cp = U.newHashMap(e.getValue().size()); - - for (Map.Entry e0 : e.getValue().entrySet()) - cp.put(e0.getKey(), e0.getValue()); - - res.put(e.getKey(), cp); - } - - return res; - } - - /** - * @param locInfos Locale infos. - */ - private Map copyLocalInfos(Map locInfos) { - Map res = U.newHashMap(locInfos.size()); - - for (Map.Entry e : locInfos.entrySet()) - res.put(e.getKey(), e.getValue()); - - return res; + routinesInfo.collectGridNodeData(dataBag); } /** {@inheritDoc} */ @@ -507,102 +375,41 @@ private Map copyLocalInfos(Map l ']'); } - if (immutableDiscoCustomMsg) { - if (data.hasJoiningNodeData()) { - ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) - data.joiningNodeData(); + if (data.hasJoiningNodeData()) { + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) + data.joiningNodeData(); - for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { - routinesInfo.addRoutineInfo(routineInfo); + for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { + routinesInfo.addRoutineInfo(routineInfo); - onDiscoveryDataReceivedImmutable(routineInfo); - } + onDiscoveryDataReceived(routineInfo); } } - else { - if (data.hasJoiningNodeData()) - onDiscoveryDataReceivedMutable((DiscoveryData)data.joiningNodeData()); - } } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - if (immutableDiscoCustomMsg) { - if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); - - for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { - if (routinesInfo.routineExists(routineInfo.routineId)) - continue; - - routinesInfo.addRoutineInfo(routineInfo); - - onDiscoveryDataReceivedImmutable(routineInfo); - } - } - } - else { - Map nodeSpecData = data.nodeSpecificData(); - - if (nodeSpecData != null) { - for (Map.Entry e : nodeSpecData.entrySet()) - onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue()); - } - } - } - - /** - * Processes data received in a discovery message. - * Used when Discovery SPI supports mutable custom messages. - * - * @param data received discovery data. - */ - private void onDiscoveryDataReceivedMutable(DiscoveryData data) { - if (data != null) { - for (DiscoveryDataItem item : data.items) { - if (!locInfos.containsKey(item.routineId)) { - registerHandlerOnJoin(data.nodeId, item.routineId, item.prjPred, - item.hnd, item.bufSize, item.interval, item.autoUnsubscribe); - } - - if (!item.autoUnsubscribe) { - locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(data.nodeId, - item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe)); - } - } - - // Process CQs started on clients. - for (Map.Entry> entry : data.clientInfos.entrySet()) { - UUID clientNodeId = entry.getKey(); - - if (!ctx.localNodeId().equals(clientNodeId)) { - Map clientRoutineMap = entry.getValue(); - - for (Map.Entry e : clientRoutineMap.entrySet()) { - UUID routineId = e.getKey(); - LocalRoutineInfo info = e.getValue(); + if (data.commonData() != null) { + ContinuousRoutinesCommonDiscoveryData commonData = + (ContinuousRoutinesCommonDiscoveryData)data.commonData(); - registerHandlerOnJoin(clientNodeId, routineId, info.prjPred, - info.hnd, info.bufSize, info.interval, info.autoUnsubscribe); - } - } + for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { + if (routinesInfo.routineExists(routineInfo.routineId)) + continue; - Map map = - clientInfos.computeIfAbsent(clientNodeId, k -> new HashMap<>()); + routinesInfo.addRoutineInfo(routineInfo); - map.putAll(entry.getValue()); + onDiscoveryDataReceived(routineInfo); } } } /** * Processes data received in a discovery message. - * Used when Discovery SPI doesn't support mutable custom messages. * * @param routineInfo Routine info. */ - private void onDiscoveryDataReceivedImmutable(ContinuousRoutineInfo routineInfo) { + private void onDiscoveryDataReceived(ContinuousRoutineInfo routineInfo) { IgnitePredicate nodeFilter; try { @@ -780,16 +587,14 @@ public UUID registerStaticRoutine( LocalRoutineInfo routineInfo = new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, 1, 0, true); - if (immutableDiscoCustomMsg) { - routinesInfo.addRoutineInfo(createRoutineInfo( - ctx.localNodeId(), - routineId, - hnd, - prjPred, - routineInfo.bufSize, - routineInfo.interval, - routineInfo.autoUnsubscribe)); - } + routinesInfo.addRoutineInfo(createRoutineInfo( + ctx.localNodeId(), + routineId, + hnd, + prjPred, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe)); locInfos.put(routineId, routineInfo); @@ -979,16 +784,7 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, reqData.prepareMarshal(ctx); - if (!immutableDiscoCustomMsg) { - StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE); - - if (hnd.updateCounters() != null) - msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); - - return msg; - } - else - return new StartRoutineDiscoveryMessage(routineId, reqData, Mode.IMMUTABLE); + return new StartRoutineDiscoveryMessage(routineId, reqData); } /** @@ -1058,7 +854,7 @@ public IgniteInternalFuture stopRoutine(UUID routineId) { unregisterHandler(routineId, routine.hnd, true); } - if (!stop && immutableDiscoCustomMsg) + if (!stop) stop = routinesInfo.routineExists(routineId); // Finish if routine is not found (wrong ID is provided). @@ -1226,11 +1022,8 @@ public void addNotification(UUID nodeId, @Override public void onDisconnected(IgniteFuture reconnectFut) { cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected.")); - if (log.isDebugEnabled()) { - log.debug("onDisconnected [rmtInfos=" + rmtInfos + - ", locInfos=" + locInfos + - ", clientInfos=" + clientInfos + ']'); - } + if (log.isDebugEnabled()) + log.debug("onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + ']'); for (Map.Entry e : rmtInfos.entrySet()) { RemoteRoutineInfo info = e.getValue(); @@ -1244,16 +1037,10 @@ public void addNotification(UUID nodeId, rmtInfos.clear(); - clientInfos.clear(); + routinesInfo.onClientDisconnected(locInfos.keySet()); - if (immutableDiscoCustomMsg) - routinesInfo.onClientDisconnected(locInfos.keySet()); - - if (log.isDebugEnabled()) { - log.debug("after onDisconnected [rmtInfos=" + rmtInfos + - ", locInfos=" + locInfos + - ", clientInfos=" + clientInfos + ']'); - } + if (log.isDebugEnabled()) + log.debug("after onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + ']'); } /** @@ -1297,7 +1084,7 @@ private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) { /** * @param snd Sender node. - * @param msg Message/ + * @param msg Message. */ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) { if (!snd.id().equals(ctx.localNodeId())) { @@ -1305,135 +1092,6 @@ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg unregisterRemote(routineId); } - - for (Map clientInfo : clientInfos.values()) { - if (clientInfo.remove(msg.routineId()) != null) - break; - } - } - - /** - * @param topVer Topology version. - * @param msg Message. - */ - private void processStartAckRequest(AffinityTopologyVersion topVer, - StartRoutineAckDiscoveryMessage msg) { - StartFuture fut = startFuts.remove(msg.routineId()); - - if (fut != null) { - fut.onAllRemoteRegistered( - topVer, - msg.errors(), - msg.updateCountersPerNode(), - msg.updateCounters()); - } - } - - /** - * @param node Sender. - * @param req Start request. - */ - private void processStartRequestMutable(ClusterNode node, StartRoutineDiscoveryMessage req) { - if (node.id().equals(ctx.localNodeId())) - return; - - UUID routineId = req.routineId(); - - StartRequestData data = req.startRequestData(); - - IgniteCheckedException err = null; - - try { - data.finishUnmarshal(ctx, node.id()); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to unmarshal start request data [nodeId=" + node.id() + - ", routineId=" + routineId + ']', e); - - // Tolerate missing classes exceptions (e.g. remote filter class). - // We need this means because CQ registration process assumes that an "ack message" will be sent. - if (X.hasCause(e, ClassNotFoundException.class)) { - if (checkNodeFilter(req)) - req.addError(node.id(), e); - - return; - } - - err = e; - } - - GridContinuousHandler hnd = data.handler(); - - if (node.isClient()) { - Map clientRoutineMap = clientInfos.get(node.id()); - - if (clientRoutineMap == null) { - clientRoutineMap = new HashMap<>(); - - Map old = clientInfos.put(node.id(), clientRoutineMap); - - assert old == null; - } - - clientRoutineMap.put(routineId, new LocalRoutineInfo(node.id(), - data.nodeFilter(), - hnd, - data.bufferSize(), - data.interval(), - data.autoUnsubscribe())); - } - - if (err == null) { - try { - IgnitePredicate prjPred = data.nodeFilter(); - - if (prjPred != null) - ctx.resource().injectGeneric(prjPred); - - if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && - !locInfos.containsKey(routineId)) { - if (ctx.config().isPeerClassLoadingEnabled()) - hnd.p2pUnmarshal(node.id(), ctx); - - registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), - data.autoUnsubscribe(), false); - - // Load partition counters. - if (err == null && hnd.isQuery()) { - GridCacheProcessor proc = ctx.cache(); - - if (proc != null) { - GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); - - if (cache != null && cache.context().userCache()) - req.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); - } - } - } - - if (!data.autoUnsubscribe()) - // Register routine locally. - locInfos.putIfAbsent(routineId, new LocalRoutineInfo( - node.id(), prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe())); - } - catch (IgniteCheckedException e) { - err = e; - - U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); - } - } - - if (err != null) - req.addError(ctx.localNodeId(), err); - } - - /** */ - private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) { - StartRequestData reqData = req.startRequestData(); - IgnitePredicate nodeFilter; - - return reqData == null || (nodeFilter = reqData.nodeFilter()) == null - || nodeFilter.apply(ctx.discovery().localNode()); } /** @@ -1452,7 +1110,7 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart * @param snd Sender. * @param msg Start request. */ - private void processStartRequestImmutable(final AffinityTopologyVersion topVer, + private void processStartRequest(final AffinityTopologyVersion topVer, final ClusterNode snd, final StartRoutineDiscoveryMessage msg) { StartRequestData reqData = msg.startRequestData(); @@ -1894,14 +1552,10 @@ private class DiscoveryListener implements GridLocalEventListener, HighPriorityL UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - if (immutableDiscoCustomMsg) { - routinesInfo.onNodeFail(nodeId); - - for (StartFuture fut : startFuts.values()) - fut.onNodeFail(nodeId); - } + routinesInfo.onNodeFail(nodeId); - clientInfos.remove(nodeId); + for (StartFuture fut : startFuts.values()) + fut.onNodeFail(nodeId); // Unregister handlers created by left node. for (Map.Entry e : rmtInfos.entrySet()) { @@ -2277,157 +1931,6 @@ IgniteBiTuple checkInterval() { } } - /** - * Discovery data. - */ - private static class DiscoveryData implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Node ID. */ - private UUID nodeId; - - /** Items. */ - @GridToStringInclude - private Collection items; - - /** */ - private Map> clientInfos; - - /** - * Required by {@link Externalizable}. - */ - public DiscoveryData() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param clientInfos Client information. - */ - DiscoveryData(UUID nodeId, Map> clientInfos) { - assert nodeId != null; - - this.nodeId = nodeId; - - this.clientInfos = clientInfos; - - items = new ArrayList<>(); - } - - /** - * @param item Item. - */ - public void addItem(DiscoveryDataItem item) { - items.add(item); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, nodeId); - U.writeCollection(out, items); - U.writeMap(out, clientInfos); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - nodeId = U.readUuid(in); - items = U.readCollection(in); - clientInfos = U.readMap(in); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoveryData.class, this); - } - } - - /** - * Discovery data item. - */ - private static class DiscoveryDataItem implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Consume ID. */ - private UUID routineId; - - /** Projection predicate. */ - private IgnitePredicate prjPred; - - /** Handler. */ - private GridContinuousHandler hnd; - - /** Buffer size. */ - private int bufSize; - - /** Time interval. */ - private long interval; - - /** Automatic unsubscribe flag. */ - private boolean autoUnsubscribe; - - /** - * Required by {@link Externalizable}. - */ - public DiscoveryDataItem() { - // No-op. - } - - /** - * @param routineId Consume ID. - * @param prjPred Projection predicate. - * @param hnd Handler. - * @param bufSize Buffer size. - * @param interval Time interval. - * @param autoUnsubscribe Automatic unsubscribe flag. - */ - DiscoveryDataItem(UUID routineId, - @Nullable IgnitePredicate prjPred, - GridContinuousHandler hnd, - int bufSize, - long interval, - boolean autoUnsubscribe - ) { - assert routineId != null; - assert hnd != null; - assert bufSize > 0; - assert interval >= 0; - - this.routineId = routineId; - this.prjPred = prjPred; - this.hnd = hnd; - this.bufSize = bufSize; - this.interval = interval; - this.autoUnsubscribe = autoUnsubscribe; - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - U.writeUuid(out, routineId); - out.writeObject(prjPred); - out.writeObject(hnd); - out.writeInt(bufSize); - out.writeLong(interval); - out.writeBoolean(autoUnsubscribe); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - routineId = U.readUuid(in); - prjPred = (IgnitePredicate)in.readObject(); - hnd = (GridContinuousHandler)in.readObject(); - bufSize = in.readInt(); - interval = in.readLong(); - autoUnsubscribe = in.readBoolean(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(DiscoveryDataItem.class, this); - } - } - /** * Future for start routine. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java deleted file mode 100644 index ff4901dfd1c98..0000000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.continuous; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.communication.ErrorMessage; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; - -/** - * - */ -public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { - /** */ - @Order(0) - Map errs; - - /** */ - @GridToStringExclude - @Order(1) - Map updateCntrs; - - /** */ - @GridToStringExclude - @Order(2) - Map> updateCntrsPerNode; - - /** - * @param routineId Routine id. - * @param errs Errs. - * @param cntrs Partition counters. - * @param cntrsPerNode Partition counters per node. - */ - public StartRoutineAckDiscoveryMessage(UUID routineId, - Map errs, - Map cntrs, - Map> cntrsPerNode) { - super(routineId); - - this.errs = new HashMap<>(errs); - updateCntrs = cntrs; - updateCntrsPerNode = cntrsPerNode; - } - - /** */ - public StartRoutineAckDiscoveryMessage() {} - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - - /** - * @return Update counters for partitions. - */ - public Map updateCounters() { - return updateCntrs; - } - - /** - * @return Update counters for partitions per each node. - */ - public Map> updateCountersPerNode() { - return updateCntrsPerNode; - } - - /** - * @return Errs. - */ - public Map errors() { - return F.viewReadOnly(errs, m -> ErrorMessage.error(m)); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(StartRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index ddf9e63f6c424..93b8f5d0dcef5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -17,60 +17,26 @@ package org.apache.ignite.internal.processors.continuous; -import java.util.HashMap; -import java.util.Map; import java.util.UUID; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; -import org.apache.ignite.internal.managers.communication.ErrorMessage; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; -import static org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE; - /** * Discovery message used for Continuous Query registration. */ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { - /** Discovery message mode. */ - enum Mode { - /** Mutable discovery mode. */ - MUTABLE, - - /** Immutable discovery mode. */ - IMMUTABLE - } - /** */ @Order(0) StartRequestData startReqData; - /** Errors collected by mutable discovery. */ - @Order(1) - Map errs = new HashMap<>(); - - /** */ - @Order(2) - Map updateCntrs; - - /** */ - @Order(3) - Map> updateCntrsPerNode; - - /** Discovery message mode. */ - @Order(4) - Mode mode; - /** * @param routineId Routine id. * @param startReqData Start request data. - * @param mode Discovery message mode. */ - StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, Mode mode) { + public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { super(routineId); this.startReqData = startReqData; - this.mode = mode; } /** */ @@ -83,61 +49,6 @@ public StartRequestData startRequestData() { return startReqData; } - /** - * @param nodeId Node id. - * @param e Exception. - */ - public void addError(UUID nodeId, IgniteCheckedException e) { - if (errs == null) - errs = new HashMap<>(); - - errs.put(nodeId, new ErrorMessage(e)); - } - - /** - * @param cntrs Update counters. - */ - private void addUpdateCounters(Map cntrs) { - if (updateCntrs == null) - updateCntrs = new HashMap<>(); - - for (Map.Entry e : cntrs.entrySet()) { - Long cntr0 = updateCntrs.get(e.getKey()); - Long cntr1 = e.getValue(); - - if (cntr0 == null || cntr1 > cntr0) - updateCntrs.put(e.getKey(), cntr1); - } - } - - /** - * @param nodeId Local node ID. - * @param cntrs Update counters. - */ - public void addUpdateCounters(UUID nodeId, Map cntrs) { - addUpdateCounters(cntrs); - - if (updateCntrsPerNode == null) - updateCntrsPerNode = new HashMap<>(); - - Map old = updateCntrsPerNode.put(nodeId, cntrs); - - assert old == null : old; - } - - /** {@inheritDoc} */ - @Override public boolean isMutable() { - return mode == MUTABLE; - } - - /** {@inheritDoc} */ - @Override public DiscoveryCustomMessage ackMessage() { - if (!isMutable()) - return null; - - return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode); - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index e889b5fd45fba..91361996918c1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -18,9 +18,7 @@ package org.apache.ignite.internal.processors.continuous; import java.util.UUID; -import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; -import org.jetbrains.annotations.Nullable; /** * @@ -38,11 +36,6 @@ public StopRoutineAckDiscoveryMessage(UUID routineId) { super(routineId); } - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index e13c53c74e0ab..dc44b38a0f7d1 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1512,11 +1512,8 @@ org.apache.ignite.internal.processors.continuous.GridContinuousMessage org.apache.ignite.internal.processors.continuous.GridContinuousMessageType org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$11$1 org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$9 -org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData -org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryDataItem org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRoutineInfo org.apache.ignite.internal.processors.continuous.StartRequestData -org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java index 8f453ca4a876e..d5356de0826ab 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java @@ -194,10 +194,6 @@ private boolean isContinuesQueryBufferEmpty(Ignite ignite) { contProc.localRoutineInfos().forEach((routineId, locRoutineInfo) -> cqHandlers.add((CacheContinuousQueryHandler)locRoutineInfo.handler())); - contProc.clientRoutineInfos().forEach((nodeId, rmtRoutineInfos) -> - rmtRoutineInfos.forEach((routineId, locRoutineInfo) -> - cqHandlers.add((CacheContinuousQueryHandler)locRoutineInfo.handler()))); - for (CacheContinuousQueryHandler cqHnd : cqHandlers) { for (CacheContinuousQueryEventBuffer evtBuf : partitionContinuesQueryEntryBuffers(cqHnd).values()) { if (backupQueueSize(evtBuf) != 0) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index ad98a2936b98f..febcb9eea0635 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -57,7 +57,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; +import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -2073,7 +2073,9 @@ public void testCustomEventAckNotSend() throws Exception { spi0.stopBeforeSndAck = true; - ignite1.message().remoteListen("test", new DummyPredicate()); + UUID routineId = ignite1.message().remoteListen("test", new DummyPredicate()); + + ignite1.message().stopRemoteListen(routineId); waitNodeStop(ignite0.name()); @@ -2649,7 +2651,7 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { DiscoveryCustomMessage custMsg = U.unwrapCustomMessage(evtMsg.message()); - if (custMsg instanceof StartRoutineAckDiscoveryMessage) { + if (custMsg instanceof StopRoutineAckDiscoveryMessage) { log.info("Skip message send and stop node: " + msg); ses.socket().close(); From 4103b2b7aa8385b1e8128ba0fad36547a72338f8 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Wed, 13 May 2026 18:35:05 +0300 Subject: [PATCH 3/6] IGNITE-28477 Handle missing filter class --- .../continuous/GridContinuousProcessor.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index ed7d95cffd977..0f19aa131a39f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -1094,6 +1095,14 @@ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg } } + /** */ + private boolean checkNodeFilter(StartRequestData reqData) { + IgnitePredicate nodeFilter; + + return reqData == null || (nodeFilter = reqData.nodeFilter()) == null + || nodeFilter.apply(ctx.discovery().localNode()); + } + /** * @param sndId Sender node ID. * @param msg Message. @@ -1152,6 +1161,12 @@ private void processStartRequest(final AffinityTopologyVersion topVer, U.error(log, "Failed to unmarshal continuous request data [" + "routineId=" + msg.routineId + ", srcNodeId=" + snd.id() + ']', e); + + if (X.hasCause(e, ClassNotFoundException.class) && !checkNodeFilter(reqData)) { + sendMessageStartResult(snd, msg.routineId(), null, null); + + return; + } } IgnitePredicate nodeFilter = reqData.nodeFilter(); From 05de69cf581e889d4daffbcfcbfc09594ade8cf9 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Thu, 14 May 2026 14:22:57 +0300 Subject: [PATCH 4/6] Revert "IGNITE-28477 Handle missing filter class" This reverts commit c2270ee83a7db22b89d00bc753bbf2937f2ab84f. --- .../continuous/GridContinuousProcessor.java | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 0f19aa131a39f..ed7d95cffd977 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -76,7 +76,6 @@ import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -1095,14 +1094,6 @@ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg } } - /** */ - private boolean checkNodeFilter(StartRequestData reqData) { - IgnitePredicate nodeFilter; - - return reqData == null || (nodeFilter = reqData.nodeFilter()) == null - || nodeFilter.apply(ctx.discovery().localNode()); - } - /** * @param sndId Sender node ID. * @param msg Message. @@ -1161,12 +1152,6 @@ private void processStartRequest(final AffinityTopologyVersion topVer, U.error(log, "Failed to unmarshal continuous request data [" + "routineId=" + msg.routineId + ", srcNodeId=" + snd.id() + ']', e); - - if (X.hasCause(e, ClassNotFoundException.class) && !checkNodeFilter(reqData)) { - sendMessageStartResult(snd, msg.routineId(), null, null); - - return; - } } IgnitePredicate nodeFilter = reqData.nodeFilter(); From d7d2acb1351d5c0a630486b2ef9d363cc97a8a22 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Thu, 14 May 2026 14:22:57 +0300 Subject: [PATCH 5/6] Revert "IGNITE-28477 Remove mutable path" This reverts commit 7815031914cf261896eac49cdb99104dd104e522. --- .../continuous/AbstractContinuousMessage.java | 6 - .../continuous/GridContinuousProcessor.java | 577 ++++++++++++++++-- .../StartRoutineAckDiscoveryMessage.java | 99 +++ .../StartRoutineDiscoveryMessage.java | 91 ++- .../StopRoutineAckDiscoveryMessage.java | 7 + .../resources/META-INF/classnames.properties | 3 + .../ContinuousQueryBuffersCleanupTest.java | 4 + .../discovery/tcp/TcpDiscoverySelfTest.java | 8 +- 8 files changed, 743 insertions(+), 52 deletions(-) create mode 100644 modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java index 50c65d984ab27..d16ef9f59d741 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/AbstractContinuousMessage.java @@ -21,7 +21,6 @@ import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.lang.IgniteUuid; -import org.jetbrains.annotations.Nullable; /** * @@ -51,9 +50,4 @@ protected AbstractContinuousMessage(UUID id) { public UUID routineId() { return routineId; } - - /** {@inheritDoc} */ - @Nullable @Override public DiscoveryCustomMessage ackMessage() { - return null; - } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index ed7d95cffd977..545627f8aab40 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -17,7 +17,12 @@ package org.apache.ignite.internal.processors.continuous; +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -67,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; +import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.systemview.ContinuousQueryViewWalker; import org.apache.ignite.internal.thread.OomExceptionHandler; @@ -74,8 +80,10 @@ import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.lang.gridfunc.ReadOnlyCollectionView2X; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -119,6 +127,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** Local infos. */ private final ConcurrentMap locInfos = new ConcurrentHashMap<>(); + /** Local infos. */ + private final ConcurrentMap> clientInfos = new ConcurrentHashMap<>(); + /** Remote infos. */ private final ConcurrentMap rmtInfos = new ConcurrentHashMap<>(); @@ -161,6 +172,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter { /** */ private ContinuousRoutinesInfo routinesInfo; + /** Whether Discovery SPI uses immutable custom messages. */ + private boolean immutableDiscoCustomMsg; + /** * @param ctx Kernal context. */ @@ -175,7 +189,10 @@ public GridContinuousProcessor(GridKernalContext ctx) { new ReadOnlyCollectionView2X<>(rmtInfos.entrySet(), locInfos.entrySet()), e -> new ContinuousQueryView(e.getKey(), e.getValue())); - routinesInfo = new ContinuousRoutinesInfo(); + immutableDiscoCustomMsg = !ctx.discovery().mutableCustomMessages(); + + if (immutableDiscoCustomMsg) + routinesInfo = new ContinuousRoutinesInfo(); retryDelay = ctx.config().getNetworkSendRetryDelay(); retryCnt = ctx.config().getNetworkSendRetryCount(); @@ -198,7 +215,22 @@ public GridContinuousProcessor(GridKernalContext ctx) { if (ctx.isStopping()) return; - processStartRequest(topVer, snd, msg); + if (immutableDiscoCustomMsg) + processStartRequestImmutable(topVer, snd, msg); + else + processStartRequestMutable(snd, msg); + } + }); + + ctx.discovery().setCustomEventListener(StartRoutineAckDiscoveryMessage.class, + new CustomEventListener() { + @Override public void onCustomEvent(AffinityTopologyVersion topVer, + ClusterNode snd, + StartRoutineAckDiscoveryMessage msg) { + if (ctx.isStopping()) + return; + + processStartAckRequest(topVer, msg); } }); @@ -207,7 +239,8 @@ public GridContinuousProcessor(GridKernalContext ctx) { @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, StopRoutineDiscoveryMessage msg) { - routinesInfo.removeRoutine(msg.routineId); + if (immutableDiscoCustomMsg) + routinesInfo.removeRoutine(msg.routineId); if (ctx.isStopping()) return; @@ -301,6 +334,11 @@ Map localRoutineInfos() { return Collections.unmodifiableMap(locInfos); } + /** */ + Map> clientRoutineInfos() { + return Collections.unmodifiableMap(clientInfos); + } + /** * @return {@code true} if lock successful, {@code false} if processor already stopped. */ @@ -358,12 +396,106 @@ public void unlockStopping() { /** {@inheritDoc} */ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) { - routinesInfo.collectJoiningNodeData(dataBag); + if (immutableDiscoCustomMsg) { + routinesInfo.collectJoiningNodeData(dataBag); + + return; + } + + Serializable data = getDiscoveryData(dataBag.joiningNodeId()); + + if (data != null) + dataBag.addJoiningNodeData(CONTINUOUS_PROC.ordinal(), data); } /** {@inheritDoc} */ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) { - routinesInfo.collectGridNodeData(dataBag); + if (immutableDiscoCustomMsg) { + routinesInfo.collectGridNodeData(dataBag); + + return; + } + + Serializable data = getDiscoveryData(dataBag.joiningNodeId()); + + if (data != null) + dataBag.addNodeSpecificData(CONTINUOUS_PROC.ordinal(), data); + } + + /** + * @param joiningNodeId Joining node id. + */ + private Serializable getDiscoveryData(UUID joiningNodeId) { + if (log.isDebugEnabled()) { + log.debug("collectDiscoveryData [node=" + joiningNodeId + + ", loc=" + ctx.localNodeId() + + ", locInfos=" + locInfos + + ", clientInfos=" + clientInfos + + ']'); + } + + if (!joiningNodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) { + Map> clientInfos0 = copyClientInfos(clientInfos); + + if (joiningNodeId.equals(ctx.localNodeId()) && ctx.discovery().localNode().isClient()) { + Map infos = copyLocalInfos(locInfos); + + clientInfos0.put(ctx.localNodeId(), infos); + } + + DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos0); + + // Collect listeners information (will be sent to joining node during discovery process). + for (Map.Entry e : locInfos.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); + + assert !ctx.config().isPeerClassLoadingEnabled() || + !(info.hnd instanceof CacheContinuousQueryHandler) || + ((CacheContinuousQueryHandler)info.hnd).isMarshalled(); + + data.addItem(new DiscoveryDataItem(routineId, + info.prjPred, + info.hnd, + info.bufSize, + info.interval, + info.autoUnsubscribe)); + } + + return data; + } + + return null; + } + + /** + * @param clientInfos Client infos. + */ + private Map> copyClientInfos(Map> clientInfos) { + Map> res = U.newHashMap(clientInfos.size()); + + for (Map.Entry> e : clientInfos.entrySet()) { + Map cp = U.newHashMap(e.getValue().size()); + + for (Map.Entry e0 : e.getValue().entrySet()) + cp.put(e0.getKey(), e0.getValue()); + + res.put(e.getKey(), cp); + } + + return res; + } + + /** + * @param locInfos Locale infos. + */ + private Map copyLocalInfos(Map locInfos) { + Map res = U.newHashMap(locInfos.size()); + + for (Map.Entry e : locInfos.entrySet()) + res.put(e.getKey(), e.getValue()); + + return res; } /** {@inheritDoc} */ @@ -375,41 +507,102 @@ public void unlockStopping() { ']'); } - if (data.hasJoiningNodeData()) { - ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) - data.joiningNodeData(); + if (immutableDiscoCustomMsg) { + if (data.hasJoiningNodeData()) { + ContinuousRoutinesJoiningNodeDiscoveryData nodeData = (ContinuousRoutinesJoiningNodeDiscoveryData) + data.joiningNodeData(); - for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { - routinesInfo.addRoutineInfo(routineInfo); + for (ContinuousRoutineInfo routineInfo : nodeData.startedRoutines) { + routinesInfo.addRoutineInfo(routineInfo); - onDiscoveryDataReceived(routineInfo); + onDiscoveryDataReceivedImmutable(routineInfo); + } } } + else { + if (data.hasJoiningNodeData()) + onDiscoveryDataReceivedMutable((DiscoveryData)data.joiningNodeData()); + } } /** {@inheritDoc} */ @Override public void onGridDataReceived(GridDiscoveryData data) { - if (data.commonData() != null) { - ContinuousRoutinesCommonDiscoveryData commonData = - (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + if (immutableDiscoCustomMsg) { + if (data.commonData() != null) { + ContinuousRoutinesCommonDiscoveryData commonData = + (ContinuousRoutinesCommonDiscoveryData)data.commonData(); + + for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { + if (routinesInfo.routineExists(routineInfo.routineId)) + continue; + + routinesInfo.addRoutineInfo(routineInfo); + + onDiscoveryDataReceivedImmutable(routineInfo); + } + } + } + else { + Map nodeSpecData = data.nodeSpecificData(); + + if (nodeSpecData != null) { + for (Map.Entry e : nodeSpecData.entrySet()) + onDiscoveryDataReceivedMutable((DiscoveryData)e.getValue()); + } + } + } + + /** + * Processes data received in a discovery message. + * Used when Discovery SPI supports mutable custom messages. + * + * @param data received discovery data. + */ + private void onDiscoveryDataReceivedMutable(DiscoveryData data) { + if (data != null) { + for (DiscoveryDataItem item : data.items) { + if (!locInfos.containsKey(item.routineId)) { + registerHandlerOnJoin(data.nodeId, item.routineId, item.prjPred, + item.hnd, item.bufSize, item.interval, item.autoUnsubscribe); + } + + if (!item.autoUnsubscribe) { + locInfos.putIfAbsent(item.routineId, new LocalRoutineInfo(data.nodeId, + item.prjPred, item.hnd, item.bufSize, item.interval, item.autoUnsubscribe)); + } + } + + // Process CQs started on clients. + for (Map.Entry> entry : data.clientInfos.entrySet()) { + UUID clientNodeId = entry.getKey(); + + if (!ctx.localNodeId().equals(clientNodeId)) { + Map clientRoutineMap = entry.getValue(); + + for (Map.Entry e : clientRoutineMap.entrySet()) { + UUID routineId = e.getKey(); + LocalRoutineInfo info = e.getValue(); - for (ContinuousRoutineInfo routineInfo : commonData.startedRoutines) { - if (routinesInfo.routineExists(routineInfo.routineId)) - continue; + registerHandlerOnJoin(clientNodeId, routineId, info.prjPred, + info.hnd, info.bufSize, info.interval, info.autoUnsubscribe); + } + } - routinesInfo.addRoutineInfo(routineInfo); + Map map = + clientInfos.computeIfAbsent(clientNodeId, k -> new HashMap<>()); - onDiscoveryDataReceived(routineInfo); + map.putAll(entry.getValue()); } } } /** * Processes data received in a discovery message. + * Used when Discovery SPI doesn't support mutable custom messages. * * @param routineInfo Routine info. */ - private void onDiscoveryDataReceived(ContinuousRoutineInfo routineInfo) { + private void onDiscoveryDataReceivedImmutable(ContinuousRoutineInfo routineInfo) { IgnitePredicate nodeFilter; try { @@ -587,14 +780,16 @@ public UUID registerStaticRoutine( LocalRoutineInfo routineInfo = new LocalRoutineInfo(ctx.localNodeId(), prjPred, hnd, 1, 0, true); - routinesInfo.addRoutineInfo(createRoutineInfo( - ctx.localNodeId(), - routineId, - hnd, - prjPred, - routineInfo.bufSize, - routineInfo.interval, - routineInfo.autoUnsubscribe)); + if (immutableDiscoCustomMsg) { + routinesInfo.addRoutineInfo(createRoutineInfo( + ctx.localNodeId(), + routineId, + hnd, + prjPred, + routineInfo.bufSize, + routineInfo.interval, + routineInfo.autoUnsubscribe)); + } locInfos.put(routineId, routineInfo); @@ -784,7 +979,16 @@ private AbstractContinuousMessage createStartMessage(UUID routineId, reqData.prepareMarshal(ctx); - return new StartRoutineDiscoveryMessage(routineId, reqData); + if (!immutableDiscoCustomMsg) { + StartRoutineDiscoveryMessage msg = new StartRoutineDiscoveryMessage(routineId, reqData, Mode.MUTABLE); + + if (hnd.updateCounters() != null) + msg.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); + + return msg; + } + else + return new StartRoutineDiscoveryMessage(routineId, reqData, Mode.IMMUTABLE); } /** @@ -854,7 +1058,7 @@ public IgniteInternalFuture stopRoutine(UUID routineId) { unregisterHandler(routineId, routine.hnd, true); } - if (!stop) + if (!stop && immutableDiscoCustomMsg) stop = routinesInfo.routineExists(routineId); // Finish if routine is not found (wrong ID is provided). @@ -1022,8 +1226,11 @@ public void addNotification(UUID nodeId, @Override public void onDisconnected(IgniteFuture reconnectFut) { cancelFutures(new IgniteClientDisconnectedCheckedException(reconnectFut, "Client node disconnected.")); - if (log.isDebugEnabled()) - log.debug("onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + ']'); + if (log.isDebugEnabled()) { + log.debug("onDisconnected [rmtInfos=" + rmtInfos + + ", locInfos=" + locInfos + + ", clientInfos=" + clientInfos + ']'); + } for (Map.Entry e : rmtInfos.entrySet()) { RemoteRoutineInfo info = e.getValue(); @@ -1037,10 +1244,16 @@ public void addNotification(UUID nodeId, rmtInfos.clear(); - routinesInfo.onClientDisconnected(locInfos.keySet()); + clientInfos.clear(); - if (log.isDebugEnabled()) - log.debug("after onDisconnected [rmtInfos=" + rmtInfos + ", locInfos=" + locInfos + ']'); + if (immutableDiscoCustomMsg) + routinesInfo.onClientDisconnected(locInfos.keySet()); + + if (log.isDebugEnabled()) { + log.debug("after onDisconnected [rmtInfos=" + rmtInfos + + ", locInfos=" + locInfos + + ", clientInfos=" + clientInfos + ']'); + } } /** @@ -1084,7 +1297,7 @@ private void processStopAckRequest(StopRoutineAckDiscoveryMessage msg) { /** * @param snd Sender node. - * @param msg Message. + * @param msg Message/ */ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg) { if (!snd.id().equals(ctx.localNodeId())) { @@ -1092,6 +1305,135 @@ private void processStopRequest(ClusterNode snd, StopRoutineDiscoveryMessage msg unregisterRemote(routineId); } + + for (Map clientInfo : clientInfos.values()) { + if (clientInfo.remove(msg.routineId()) != null) + break; + } + } + + /** + * @param topVer Topology version. + * @param msg Message. + */ + private void processStartAckRequest(AffinityTopologyVersion topVer, + StartRoutineAckDiscoveryMessage msg) { + StartFuture fut = startFuts.remove(msg.routineId()); + + if (fut != null) { + fut.onAllRemoteRegistered( + topVer, + msg.errors(), + msg.updateCountersPerNode(), + msg.updateCounters()); + } + } + + /** + * @param node Sender. + * @param req Start request. + */ + private void processStartRequestMutable(ClusterNode node, StartRoutineDiscoveryMessage req) { + if (node.id().equals(ctx.localNodeId())) + return; + + UUID routineId = req.routineId(); + + StartRequestData data = req.startRequestData(); + + IgniteCheckedException err = null; + + try { + data.finishUnmarshal(ctx, node.id()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal start request data [nodeId=" + node.id() + + ", routineId=" + routineId + ']', e); + + // Tolerate missing classes exceptions (e.g. remote filter class). + // We need this means because CQ registration process assumes that an "ack message" will be sent. + if (X.hasCause(e, ClassNotFoundException.class)) { + if (checkNodeFilter(req)) + req.addError(node.id(), e); + + return; + } + + err = e; + } + + GridContinuousHandler hnd = data.handler(); + + if (node.isClient()) { + Map clientRoutineMap = clientInfos.get(node.id()); + + if (clientRoutineMap == null) { + clientRoutineMap = new HashMap<>(); + + Map old = clientInfos.put(node.id(), clientRoutineMap); + + assert old == null; + } + + clientRoutineMap.put(routineId, new LocalRoutineInfo(node.id(), + data.nodeFilter(), + hnd, + data.bufferSize(), + data.interval(), + data.autoUnsubscribe())); + } + + if (err == null) { + try { + IgnitePredicate prjPred = data.nodeFilter(); + + if (prjPred != null) + ctx.resource().injectGeneric(prjPred); + + if ((prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) && + !locInfos.containsKey(routineId)) { + if (ctx.config().isPeerClassLoadingEnabled()) + hnd.p2pUnmarshal(node.id(), ctx); + + registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), + data.autoUnsubscribe(), false); + + // Load partition counters. + if (err == null && hnd.isQuery()) { + GridCacheProcessor proc = ctx.cache(); + + if (proc != null) { + GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); + + if (cache != null && cache.context().userCache()) + req.addUpdateCounters(ctx.localNodeId(), hnd.updateCounters()); + } + } + } + + if (!data.autoUnsubscribe()) + // Register routine locally. + locInfos.putIfAbsent(routineId, new LocalRoutineInfo( + node.id(), prjPred, hnd, data.bufferSize(), data.interval(), data.autoUnsubscribe())); + } + catch (IgniteCheckedException e) { + err = e; + + U.error(log, "Failed to register handler [nodeId=" + node.id() + ", routineId=" + routineId + ']', e); + } + } + + if (err != null) + req.addError(ctx.localNodeId(), err); + } + + /** */ + private boolean checkNodeFilter(StartRoutineDiscoveryMessage req) { + StartRequestData reqData = req.startRequestData(); + IgnitePredicate nodeFilter; + + return reqData == null || (nodeFilter = reqData.nodeFilter()) == null + || nodeFilter.apply(ctx.discovery().localNode()); } /** @@ -1110,7 +1452,7 @@ private void processRoutineStartResultMessage(UUID sndId, ContinuousRoutineStart * @param snd Sender. * @param msg Start request. */ - private void processStartRequest(final AffinityTopologyVersion topVer, + private void processStartRequestImmutable(final AffinityTopologyVersion topVer, final ClusterNode snd, final StartRoutineDiscoveryMessage msg) { StartRequestData reqData = msg.startRequestData(); @@ -1552,10 +1894,14 @@ private class DiscoveryListener implements GridLocalEventListener, HighPriorityL UUID nodeId = ((DiscoveryEvent)evt).eventNode().id(); - routinesInfo.onNodeFail(nodeId); + if (immutableDiscoCustomMsg) { + routinesInfo.onNodeFail(nodeId); + + for (StartFuture fut : startFuts.values()) + fut.onNodeFail(nodeId); + } - for (StartFuture fut : startFuts.values()) - fut.onNodeFail(nodeId); + clientInfos.remove(nodeId); // Unregister handlers created by left node. for (Map.Entry e : rmtInfos.entrySet()) { @@ -1931,6 +2277,157 @@ IgniteBiTuple checkInterval() { } } + /** + * Discovery data. + */ + private static class DiscoveryData implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Node ID. */ + private UUID nodeId; + + /** Items. */ + @GridToStringInclude + private Collection items; + + /** */ + private Map> clientInfos; + + /** + * Required by {@link Externalizable}. + */ + public DiscoveryData() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param clientInfos Client information. + */ + DiscoveryData(UUID nodeId, Map> clientInfos) { + assert nodeId != null; + + this.nodeId = nodeId; + + this.clientInfos = clientInfos; + + items = new ArrayList<>(); + } + + /** + * @param item Item. + */ + public void addItem(DiscoveryDataItem item) { + items.add(item); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, nodeId); + U.writeCollection(out, items); + U.writeMap(out, clientInfos); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + nodeId = U.readUuid(in); + items = U.readCollection(in); + clientInfos = U.readMap(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryData.class, this); + } + } + + /** + * Discovery data item. + */ + private static class DiscoveryDataItem implements Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Consume ID. */ + private UUID routineId; + + /** Projection predicate. */ + private IgnitePredicate prjPred; + + /** Handler. */ + private GridContinuousHandler hnd; + + /** Buffer size. */ + private int bufSize; + + /** Time interval. */ + private long interval; + + /** Automatic unsubscribe flag. */ + private boolean autoUnsubscribe; + + /** + * Required by {@link Externalizable}. + */ + public DiscoveryDataItem() { + // No-op. + } + + /** + * @param routineId Consume ID. + * @param prjPred Projection predicate. + * @param hnd Handler. + * @param bufSize Buffer size. + * @param interval Time interval. + * @param autoUnsubscribe Automatic unsubscribe flag. + */ + DiscoveryDataItem(UUID routineId, + @Nullable IgnitePredicate prjPred, + GridContinuousHandler hnd, + int bufSize, + long interval, + boolean autoUnsubscribe + ) { + assert routineId != null; + assert hnd != null; + assert bufSize > 0; + assert interval >= 0; + + this.routineId = routineId; + this.prjPred = prjPred; + this.hnd = hnd; + this.bufSize = bufSize; + this.interval = interval; + this.autoUnsubscribe = autoUnsubscribe; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + U.writeUuid(out, routineId); + out.writeObject(prjPred); + out.writeObject(hnd); + out.writeInt(bufSize); + out.writeLong(interval); + out.writeBoolean(autoUnsubscribe); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + routineId = U.readUuid(in); + prjPred = (IgnitePredicate)in.readObject(); + hnd = (GridContinuousHandler)in.readObject(); + bufSize = in.readInt(); + interval = in.readLong(); + autoUnsubscribe = in.readBoolean(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryDataItem.class, this); + } + } + /** * Future for start routine. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java new file mode 100644 index 0000000000000..ff4901dfd1c98 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.continuous; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { + /** */ + @Order(0) + Map errs; + + /** */ + @GridToStringExclude + @Order(1) + Map updateCntrs; + + /** */ + @GridToStringExclude + @Order(2) + Map> updateCntrsPerNode; + + /** + * @param routineId Routine id. + * @param errs Errs. + * @param cntrs Partition counters. + * @param cntrsPerNode Partition counters per node. + */ + public StartRoutineAckDiscoveryMessage(UUID routineId, + Map errs, + Map cntrs, + Map> cntrsPerNode) { + super(routineId); + + this.errs = new HashMap<>(errs); + updateCntrs = cntrs; + updateCntrsPerNode = cntrsPerNode; + } + + /** */ + public StartRoutineAckDiscoveryMessage() {} + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** + * @return Update counters for partitions. + */ + public Map updateCounters() { + return updateCntrs; + } + + /** + * @return Update counters for partitions per each node. + */ + public Map> updateCountersPerNode() { + return updateCntrsPerNode; + } + + /** + * @return Errs. + */ + public Map errors() { + return F.viewReadOnly(errs, m -> ErrorMessage.error(m)); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(StartRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 93b8f5d0dcef5..ddf9e63f6c424 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -17,26 +17,60 @@ package org.apache.ignite.internal.processors.continuous; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.Order; +import org.apache.ignite.internal.managers.communication.ErrorMessage; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; +import static org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage.Mode.MUTABLE; + /** * Discovery message used for Continuous Query registration. */ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { + /** Discovery message mode. */ + enum Mode { + /** Mutable discovery mode. */ + MUTABLE, + + /** Immutable discovery mode. */ + IMMUTABLE + } + /** */ @Order(0) StartRequestData startReqData; + /** Errors collected by mutable discovery. */ + @Order(1) + Map errs = new HashMap<>(); + + /** */ + @Order(2) + Map updateCntrs; + + /** */ + @Order(3) + Map> updateCntrsPerNode; + + /** Discovery message mode. */ + @Order(4) + Mode mode; + /** * @param routineId Routine id. * @param startReqData Start request data. + * @param mode Discovery message mode. */ - public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) { + StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, Mode mode) { super(routineId); this.startReqData = startReqData; + this.mode = mode; } /** */ @@ -49,6 +83,61 @@ public StartRequestData startRequestData() { return startReqData; } + /** + * @param nodeId Node id. + * @param e Exception. + */ + public void addError(UUID nodeId, IgniteCheckedException e) { + if (errs == null) + errs = new HashMap<>(); + + errs.put(nodeId, new ErrorMessage(e)); + } + + /** + * @param cntrs Update counters. + */ + private void addUpdateCounters(Map cntrs) { + if (updateCntrs == null) + updateCntrs = new HashMap<>(); + + for (Map.Entry e : cntrs.entrySet()) { + Long cntr0 = updateCntrs.get(e.getKey()); + Long cntr1 = e.getValue(); + + if (cntr0 == null || cntr1 > cntr0) + updateCntrs.put(e.getKey(), cntr1); + } + } + + /** + * @param nodeId Local node ID. + * @param cntrs Update counters. + */ + public void addUpdateCounters(UUID nodeId, Map cntrs) { + addUpdateCounters(cntrs); + + if (updateCntrsPerNode == null) + updateCntrsPerNode = new HashMap<>(); + + Map old = updateCntrsPerNode.put(nodeId, cntrs); + + assert old == null : old; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return mode == MUTABLE; + } + + /** {@inheritDoc} */ + @Override public DiscoveryCustomMessage ackMessage() { + if (!isMutable()) + return null; + + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateCntrs, updateCntrsPerNode); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(StartRoutineDiscoveryMessage.class, this, "routineId", routineId()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java index 91361996918c1..e889b5fd45fba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StopRoutineAckDiscoveryMessage.java @@ -18,7 +18,9 @@ package org.apache.ignite.internal.processors.continuous; import java.util.UUID; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; /** * @@ -36,6 +38,11 @@ public StopRoutineAckDiscoveryMessage(UUID routineId) { super(routineId); } + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(StopRoutineAckDiscoveryMessage.class, this, "routineId", routineId()); diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties index dc44b38a0f7d1..e13c53c74e0ab 100644 --- a/modules/core/src/main/resources/META-INF/classnames.properties +++ b/modules/core/src/main/resources/META-INF/classnames.properties @@ -1512,8 +1512,11 @@ org.apache.ignite.internal.processors.continuous.GridContinuousMessage org.apache.ignite.internal.processors.continuous.GridContinuousMessageType org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$11$1 org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$9 +org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData +org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryDataItem org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$LocalRoutineInfo org.apache.ignite.internal.processors.continuous.StartRequestData +org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java index d5356de0826ab..8f453ca4a876e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/ContinuousQueryBuffersCleanupTest.java @@ -194,6 +194,10 @@ private boolean isContinuesQueryBufferEmpty(Ignite ignite) { contProc.localRoutineInfos().forEach((routineId, locRoutineInfo) -> cqHandlers.add((CacheContinuousQueryHandler)locRoutineInfo.handler())); + contProc.clientRoutineInfos().forEach((nodeId, rmtRoutineInfos) -> + rmtRoutineInfos.forEach((routineId, locRoutineInfo) -> + cqHandlers.add((CacheContinuousQueryHandler)locRoutineInfo.handler()))); + for (CacheContinuousQueryHandler cqHnd : cqHandlers) { for (CacheContinuousQueryEventBuffer evtBuf : partitionContinuesQueryEntryBuffers(cqHnd).values()) { if (backupQueueSize(evtBuf) != 0) diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index febcb9eea0635..ad98a2936b98f 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -57,7 +57,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage; +import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.lang.GridAbsPredicate; @@ -2073,9 +2073,7 @@ public void testCustomEventAckNotSend() throws Exception { spi0.stopBeforeSndAck = true; - UUID routineId = ignite1.message().remoteListen("test", new DummyPredicate()); - - ignite1.message().stopRemoteListen(routineId); + ignite1.message().remoteListen("test", new DummyPredicate()); waitNodeStop(ignite0.name()); @@ -2651,7 +2649,7 @@ private static class TestCustomerEventAckSpi extends TcpDiscoverySpi { DiscoveryCustomMessage custMsg = U.unwrapCustomMessage(evtMsg.message()); - if (custMsg instanceof StopRoutineAckDiscoveryMessage) { + if (custMsg instanceof StartRoutineAckDiscoveryMessage) { log.info("Skip message send and stop node: " + msg); ses.socket().close(); From d697d8892b4c06c70e239f206ab33c921d2cdd41 Mon Sep 17 00:00:00 2001 From: chesnokoff Date: Sat, 16 May 2026 15:58:09 +0300 Subject: [PATCH 6/6] IGNITE-28477 Register start routine ack discovery message --- .../java/org/apache/ignite/internal/CoreMessagesProvider.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java index 702e2ee14553e..a79448bfcb4c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java @@ -184,6 +184,7 @@ import org.apache.ignite.internal.processors.continuous.ContinuousRoutineStartResultMessage; import org.apache.ignite.internal.processors.continuous.GridContinuousMessage; import org.apache.ignite.internal.processors.continuous.StartRequestData; +import org.apache.ignite.internal.processors.continuous.StartRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StartRoutineDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StopRoutineAckDiscoveryMessage; import org.apache.ignite.internal.processors.continuous.StopRoutineDiscoveryMessage; @@ -509,6 +510,7 @@ public CoreMessagesProvider(Marshaller dfltMarsh, Marshaller schemaAwareMarsh, C withNoSchema(LatchAckMessage.class); withSchema(AtomicApplicationAttributesAwareRequest.class); withNoSchema(StartRequestData.class); + withNoSchema(StartRoutineAckDiscoveryMessage.class); withNoSchema(StartRoutineDiscoveryMessage.class); withNoSchema(StoredCacheData.class);