Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,62 @@ public void testIoTDBPatternWithExclusionOnlyRealtimeData() throws Exception {
"Time,root.db1.d1.s,root.db.d1.t,root.db.d2.s,",
expectedResSet);
}

@Test
public void testIoTDBPatternWithInclusionAndExclusionHistoricalData() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
sourceAttributes.put(
"source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0");
sourceAttributes.put("source.pattern.exclusion", "root.test.g_0.d_20.**");
sourceAttributes.put("source.inclusion", "data.insert");
sourceAttributes.put("user", "root");

final List<String> insertQueries =
Arrays.asList(
"insert into root.test.g_0.d_21(time, s_1) values (1, 1)",
"insert into root.test.g_0.d_22(time, s_1) values (2, 2)",
"insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)",
"insert into root.test.g_0.d_30(time, s_1) values (4, 4)");

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,null,");
expectedResSet.add("2,null,2.0,");

testPipeWithMultiplePatterns(
sourceAttributes,
insertQueries,
true, // isHistorical = true
"select * from root.test.**",
"Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,",
expectedResSet);
}

@Test
public void testIoTDBPatternWithInclusionAndExclusionRealtimeData() throws Exception {
final Map<String, String> sourceAttributes = new HashMap<>();
sourceAttributes.put(
"source.pattern.inclusion", "root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0");
sourceAttributes.put("source.pattern.exclusion", "root.test.g_0.d_20.**");
sourceAttributes.put("source.inclusion", "data.insert");
sourceAttributes.put("user", "root");

final List<String> insertQueries =
Arrays.asList(
"insert into root.test.g_0.d_21(time, s_1) values (1, 1)",
"insert into root.test.g_0.d_22(time, s_1) values (2, 2)",
"insert into root.test.g_0.d_20(time, s_0, s_1) values (3, 3, 3)",
"insert into root.test.g_0.d_30(time, s_1) values (4, 4)");

final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1,1.0,null,");
expectedResSet.add("2,null,2.0,");

testPipeWithMultiplePatterns(
sourceAttributes,
insertQueries,
false, // isHistorical = false
"select * from root.test.**",
"Time,root.test.g_0.d_21.s_1,root.test.g_0.d_22.s_1,",
expectedResSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.UnionIoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.WithExclusionIoTDBTreePattern;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.exception.PipeException;

Expand Down Expand Up @@ -172,4 +173,23 @@ public void testLegacyPathMultipleRulesRejected() {
// Expected exception
}
}

@Test
public void testWithExclusionPreserved() {
final PipeParameters params =
new PipeParameters(
new HashMap<String, String>() {
{
put(
PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY,
"root.test.g_0.d_2*.**,root.test.g_0.d_20.s_0");
put(PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY, "root.test.g_0.d_20.**");
}
});

final TreePattern result = TreePattern.parsePipePatternFromSourceParameters(params);
Assert.assertTrue(result instanceof WithExclusionIoTDBTreePattern);
Assert.assertEquals(
"INCLUSION(root.test.g_0.d_2*.**), EXCLUSION(root.test.g_0.d_20.**)", result.getPattern());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternUtil;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.commons.utils.TestOnly;
Expand Down Expand Up @@ -873,6 +874,7 @@ public static int[] checkAndLogPatternCoverage(
/** A specialized Trie to efficiently check path coverage. */
private static class PatternTrie {
private final TrieNode root = new TrieNode();
private final List<PartialPath> complexPatterns = new ArrayList<>();

private static class TrieNode {
// Children nodes mapped by specific path segments (excluding *)
Expand All @@ -892,6 +894,12 @@ public void add(final PartialPath path) {
final String[] nodes = path.getNodes();

for (final String segment : nodes) {
if (PathPatternUtil.hasWildcard(segment)
&& !IoTDBConstant.ONE_LEVEL_PATH_WILDCARD.equals(segment)
&& !IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD.equals(segment)) {
complexPatterns.add(path);
return;
}
// If we are at a node that is already a MultiLevelWildcard (**),
// everything below is already covered. We can stop adding.
if (node.isMultiLevelWildcard) {
Expand Down Expand Up @@ -924,7 +932,15 @@ public void add(final PartialPath path) {

/** Checks if the given path is covered by any existing pattern in the Trie. */
public boolean isCovered(final PartialPath path) {
return checkCoverage(root, path.getNodes(), 0);
if (checkCoverage(root, path.getNodes(), 0)) {
return true;
}
for (final PartialPath complexPattern : complexPatterns) {
if (complexPattern.include(path)) {
return true;
}
}
return false;
}

private boolean checkCoverage(final TrieNode node, final String[] pathNodes, final int index) {
Expand Down Expand Up @@ -957,7 +973,15 @@ private boolean checkCoverage(final TrieNode node, final String[] pathNodes, fin

/** Checks if the given path overlaps with any pattern in the Trie. */
public boolean overlaps(final PartialPath path) {
return checkOverlap(root, path.getNodes(), 0);
if (checkOverlap(root, path.getNodes(), 0)) {
return true;
}
for (final PartialPath complexPattern : complexPatterns) {
if (complexPattern.overlapWith(path)) {
return true;
}
}
return false;
}

private boolean checkOverlap(final TrieNode node, final String[] pathNodes, final int index) {
Expand Down