From 087d94fcf260d48f1231c2562f2af24530530b1b Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 5 Feb 2026 16:54:41 +0800 Subject: [PATCH 1/3] fix(pipe): preserve exclusions for complex wildcards --- .../auto/basic/IoTDBTreePatternFormatIT.java | 56 +++++++++++++++++++ .../pipe/pattern/TreePatternPruningTest.java | 18 ++++++ .../datastructure/pattern/TreePattern.java | 38 ++++++++++++- 3 files changed, 110 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java index 00e3078a355c0..0a26291490d3d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java @@ -441,4 +441,60 @@ public void testIoTDBPatternWithExclusionOnlyRealtimeData() throws Exception { "Time,root.db1.d1.s,root.db.d1.t,root.db.d2.s,", expectedResSet); } + + @Test + public void testIoTDBPatternWithInclusionAndExclusionHistoricalData() throws Exception { + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put("source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); + sourceAttributes.put("source.pattern.exclusion", "root.test.g_0.d_20.**"); + sourceAttributes.put("source.inclusion", "data.insert"); + sourceAttributes.put("user", "root"); + + final List insertQueries = + Arrays.asList( + "insert into root.test.g_0.d_21(time, s_1) values (1, 1)", + "insert into root.test.g_0.d_22(time, s_1) values (2, 2)", + "insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)", + "insert into root.test.g_0.d_30(time, s_1) values (4, 4)"); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,null,"); + expectedResSet.add("2,null,2.0,"); + + testPipeWithMultiplePatterns( + sourceAttributes, + insertQueries, + true, // isHistorical = true + "select * from root.test.**", + "Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,", + expectedResSet); + } + + @Test + public void testIoTDBPatternWithInclusionAndExclusionRealtimeData() throws Exception { + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put("source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); + sourceAttributes.put("source.pattern.exclusion", "root.test.g_0.d_20.**"); + sourceAttributes.put("source.inclusion", "data.insert"); + sourceAttributes.put("user", "root"); + + final List insertQueries = + Arrays.asList( + "insert into root.test.g_0.d_21(time, s_1) values (1, 1)", + "insert into root.test.g_0.d_22(time, s_1) values (2, 2)", + "insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)", + "insert into root.test.g_0.d_30(time, s_1) values (4, 4)"); + + final Set expectedResSet = new HashSet<>(); + expectedResSet.add("1,1.0,null,"); + expectedResSet.add("2,null,2.0,"); + + testPipeWithMultiplePatterns( + sourceAttributes, + insertQueries, + false, // isHistorical = false + "select * from root.test.**", + "Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,", + expectedResSet); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java index d5a209e5d87be..cb6acb340a72f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBTreePattern; +import org.apache.iotdb.commons.pipe.datastructure.pattern.WithExclusionIoTDBTreePattern; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -172,4 +173,21 @@ public void testLegacyPathMultipleRulesRejected() { // Expected exception } } + + @Test + public void testWithExclusionPreserved() { + final PipeParameters params = + new PipeParameters( + new HashMap() { + { + put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, + "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); + put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.test.g_0.d_20.**"); + } + }); + + final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); + Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern); + Assert.assertEquals("INCLUSION(root.test.g_0.d_2*.**), EXCLUSION(root.test.g_0.d_20.**)", result.getPattern()); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index 5d6c808a0bbd6..97f1d77fff9e8 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.path.PathPatternUtil; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.utils.TestOnly; @@ -873,6 +874,7 @@ public static int[] checkAndLogPatternCoverage( /** A specialized Trie to efficiently check path coverage. */ private static class PatternTrie { private final TrieNode root = new TrieNode(); + private final List complexPatterns = new ArrayList<>(); private static class TrieNode { // Children nodes mapped by specific path segments (excluding *) @@ -888,6 +890,11 @@ private static class TrieNode { /** Adds a path to the Trie. */ public void add(final PartialPath path) { + if (containsComplexWildcard(path)) { + complexPatterns.add(path); + return; + } + TrieNode node = root; final String[] nodes = path.getNodes(); @@ -924,7 +931,15 @@ public void add(final PartialPath path) { /** Checks if the given path is covered by any existing pattern in the Trie. */ public boolean isCovered(final PartialPath path) { - return checkCoverage(root, path.getNodes(), 0); + if (checkCoverage(root, path.getNodes(), 0)) { + return true; + } + for (final PartialPath complexPattern : complexPatterns) { + if (complexPattern.include(path)) { + return true; + } + } + return false; } private boolean checkCoverage(final TrieNode node, final String[] pathNodes, final int index) { @@ -957,7 +972,15 @@ private boolean checkCoverage(final TrieNode node, final String[] pathNodes, fin /** Checks if the given path overlaps with any pattern in the Trie. */ public boolean overlaps(final PartialPath path) { - return checkOverlap(root, path.getNodes(), 0); + if (checkOverlap(root, path.getNodes(), 0)) { + return true; + } + for (final PartialPath complexPattern : complexPatterns) { + if (complexPattern.overlapWith(path)) { + return true; + } + } + return false; } private boolean checkOverlap(final TrieNode node, final String[] pathNodes, final int index) { @@ -1004,5 +1027,16 @@ private boolean checkOverlap(final TrieNode node, final String[] pathNodes, fina // 5b. Check '*' in Trie (matches specific query node) return node.wildcardNode != null && checkOverlap(node.wildcardNode, pathNodes, index + 1); } + + private static boolean containsComplexWildcard(final PartialPath path) { + for (final String node : path.getNodes()) { + if (PathPatternUtil.hasWildcard(node) + && !IoTDBConstant.ONE_LEVEL_PATH_WILDCARD.equals(node) + && !IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(node)) { + return true; + } + } + return false; + } } } From 249c557977b39376050a51748f9305956b8493c4 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 5 Feb 2026 17:36:13 +0800 Subject: [PATCH 2/3] spotless --- .../auto/basic/IoTDBTreePatternFormatIT.java | 50 ++++++++++--------- .../pipe/pattern/TreePatternPruningTest.java | 32 ++++++------ 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java index 0a26291490d3d..fc5d483ea5c30 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBTreePatternFormatIT.java @@ -445,56 +445,58 @@ public void testIoTDBPatternWithExclusionOnlyRealtimeData() throws Exception { @Test public void testIoTDBPatternWithInclusionAndExclusionHistoricalData() throws Exception { final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); + sourceAttributes.put( + "source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); sourceAttributes.put("source.pattern.exclusion", "root.test.g_0.d_20.**"); sourceAttributes.put("source.inclusion", "data.insert"); sourceAttributes.put("user", "root"); final List insertQueries = - Arrays.asList( - "insert into root.test.g_0.d_21(time, s_1) values (1, 1)", - "insert into root.test.g_0.d_22(time, s_1) values (2, 2)", - "insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)", - "insert into root.test.g_0.d_30(time, s_1) values (4, 4)"); + Arrays.asList( + "insert into root.test.g_0.d_21(time, s_1) values (1, 1)", + "insert into root.test.g_0.d_22(time, s_1) values (2, 2)", + "insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)", + "insert into root.test.g_0.d_30(time, s_1) values (4, 4)"); final Set expectedResSet = new HashSet<>(); expectedResSet.add("1,1.0,null,"); expectedResSet.add("2,null,2.0,"); testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - true, // isHistorical = true - "select * from root.test.**", - "Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,", - expectedResSet); + sourceAttributes, + insertQueries, + true, // isHistorical = true + "select * from root.test.**", + "Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,", + expectedResSet); } @Test public void testIoTDBPatternWithInclusionAndExclusionRealtimeData() throws Exception { final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put("source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); + sourceAttributes.put( + "source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); sourceAttributes.put("source.pattern.exclusion", "root.test.g_0.d_20.**"); sourceAttributes.put("source.inclusion", "data.insert"); sourceAttributes.put("user", "root"); final List insertQueries = - Arrays.asList( - "insert into root.test.g_0.d_21(time, s_1) values (1, 1)", - "insert into root.test.g_0.d_22(time, s_1) values (2, 2)", - "insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)", - "insert into root.test.g_0.d_30(time, s_1) values (4, 4)"); + Arrays.asList( + "insert into root.test.g_0.d_21(time, s_1) values (1, 1)", + "insert into root.test.g_0.d_22(time, s_1) values (2, 2)", + "insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)", + "insert into root.test.g_0.d_30(time, s_1) values (4, 4)"); final Set expectedResSet = new HashSet<>(); expectedResSet.add("1,1.0,null,"); expectedResSet.add("2,null,2.0,"); testPipeWithMultiplePatterns( - sourceAttributes, - insertQueries, - false, // isHistorical = false - "select * from root.test.**", - "Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,", - expectedResSet); + sourceAttributes, + insertQueries, + false, // isHistorical = false + "select * from root.test.**", + "Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,", + expectedResSet); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java index cb6acb340a72f..eaa7e17865367 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/pattern/TreePatternPruningTest.java @@ -174,20 +174,22 @@ public void testLegacyPathMultipleRulesRejected() { } } - @Test - public void testWithExclusionPreserved() { - final PipeParameters params = - new PipeParameters( - new HashMap() { - { - put(PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, - "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); - put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.test.g_0.d_20.**"); - } - }); - - final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); - Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern); - Assert.assertEquals("INCLUSION(root.test.g_0.d_2*.**), EXCLUSION(root.test.g_0.d_20.**)", result.getPattern()); + @Test + public void testWithExclusionPreserved() { + final PipeParameters params = + new PipeParameters( + new HashMap() { + { + put( + PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY, + "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0"); + put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.test.g_0.d_20.**"); + } + }); + + final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params); + Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern); + Assert.assertEquals( + "INCLUSION(root.test.g_0.d_2*.**), EXCLUSION(root.test.g_0.d_20.**)", result.getPattern()); } } From 486866dc60350299635cd0e81b3021a5d3195326 Mon Sep 17 00:00:00 2001 From: VGalaxies Date: Thu, 5 Feb 2026 17:41:52 +0800 Subject: [PATCH 3/3] perf(pipe): avoid extra traversal in pattern trie --- .../datastructure/pattern/TreePattern.java | 22 +++++-------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java index 97f1d77fff9e8..399565ecaab43 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/pattern/TreePattern.java @@ -890,15 +890,16 @@ private static class TrieNode { /** Adds a path to the Trie. */ public void add(final PartialPath path) { - if (containsComplexWildcard(path)) { - complexPatterns.add(path); - return; - } - TrieNode node = root; final String[] nodes = path.getNodes(); for (final String segment : nodes) { + if (PathPatternUtil.hasWildcard(segment) + && !IoTDBConstant.ONE_LEVEL_PATH_WILDCARD.equals(segment) + && !IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(segment)) { + complexPatterns.add(path); + return; + } // If we are at a node that is already a MultiLevelWildcard (**), // everything below is already covered. We can stop adding. if (node.isMultiLevelWildcard) { @@ -1027,16 +1028,5 @@ private boolean checkOverlap(final TrieNode node, final String[] pathNodes, fina // 5b. Check '*' in Trie (matches specific query node) return node.wildcardNode != null && checkOverlap(node.wildcardNode, pathNodes, index + 1); } - - private static boolean containsComplexWildcard(final PartialPath path) { - for (final String node : path.getNodes()) { - if (PathPatternUtil.hasWildcard(node) - && !IoTDBConstant.ONE_LEVEL_PATH_WILDCARD.equals(node) - && !IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(node)) { - return true; - } - } - return false; - } } }