Skip to content

Commit 28e1b52

Browse files
authored
Merge pull request #34 from microsoft/bug_fixes
Bug fixes
2 parents e06571a + 60b067e commit 28e1b52

File tree

9 files changed

+214
-22
lines changed

9 files changed

+214
-22
lines changed

flowquery-py/src/parsing/operations/limit.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@ def __init__(self, limit: int):
1111
self._count = 0
1212
self._limit = limit
1313

14+
@property
15+
def is_limit_reached(self) -> bool:
16+
return self._count >= self._limit
17+
18+
def increment(self) -> None:
19+
self._count += 1
20+
1421
async def run(self) -> None:
1522
if self._count >= self._limit:
1623
return

flowquery-py/src/parsing/operations/return_op.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from typing import TYPE_CHECKING, Any, Dict, List, Optional
55

66
from ..ast_node import ASTNode
7+
from .limit import Limit
78
from .projection import Projection
89

910
if TYPE_CHECKING:
@@ -24,6 +25,7 @@ def __init__(self, expressions: List[ASTNode]) -> None:
2425
super().__init__(expressions)
2526
self._where: Optional['Where'] = None
2627
self._results: List[Dict[str, Any]] = []
28+
self._limit: Optional[Limit] = None
2729

2830
@property
2931
def where(self) -> Any:
@@ -35,16 +37,28 @@ def where(self) -> Any:
3537
def where(self, where: 'Where') -> None:
3638
self._where = where
3739

40+
@property
41+
def limit(self) -> Optional[Limit]:
42+
return self._limit
43+
44+
@limit.setter
45+
def limit(self, limit: Limit) -> None:
46+
self._limit = limit
47+
3848
async def run(self) -> None:
3949
if not self.where:
4050
return
51+
if self._limit is not None and self._limit.is_limit_reached:
52+
return
4153
record: Dict[str, Any] = {}
4254
for expression, alias in self.expressions():
4355
raw = expression.value()
4456
# Deep copy objects to preserve their state
4557
value = copy.deepcopy(raw) if isinstance(raw, (dict, list)) else raw
4658
record[alias] = value
4759
self._results.append(record)
60+
if self._limit is not None:
61+
self._limit.increment()
4862

4963
async def initialize(self) -> None:
5064
self._results = []

flowquery-py/src/parsing/parser.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,9 @@ def _parse_tokenized(self, is_sub_query: bool = False) -> ASTNode:
116116
if self.token.is_union():
117117
break
118118

119+
if self.token.is_eof():
120+
break
121+
119122
operation = self._parse_operation()
120123
if operation is None and not is_sub_query:
121124
raise ValueError("Expected one of WITH, UNWIND, RETURN, LOAD, OR CALL")
@@ -145,8 +148,11 @@ def _parse_tokenized(self, is_sub_query: bool = False) -> ASTNode:
145148

146149
limit = self._parse_limit()
147150
if limit is not None:
148-
operation.add_sibling(limit)
149-
operation = limit
151+
if isinstance(operation, Return):
152+
operation.limit = limit
153+
else:
154+
operation.add_sibling(limit)
155+
operation = limit
150156

151157
previous = operation
152158

@@ -539,13 +545,11 @@ def _parse_node(self) -> Optional[Node]:
539545
node.properties = dict(self._parse_properties())
540546
if identifier is not None and identifier in self._state.variables:
541547
reference = self._state.variables.get(identifier)
542-
# Resolve through Expression -> Reference -> Node (e.g., after WITH)
543-
ref_child = reference.first_child() if isinstance(reference, Expression) else None
544-
if isinstance(ref_child, Reference):
545-
inner = ref_child.referred
546-
if isinstance(inner, Node):
547-
reference = inner
548-
if reference is None or (not isinstance(reference, Node) and not isinstance(reference, Unwind)):
548+
if reference is None or (
549+
not isinstance(reference, Node)
550+
and not isinstance(reference, Unwind)
551+
and not isinstance(reference, Expression)
552+
):
549553
raise ValueError(f"Undefined node reference: {identifier}")
550554
node = NodeReference(node, reference)
551555
elif identifier is not None:

flowquery-py/tests/compute/test_runner.py

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,20 @@ async def test_limit(self):
925925
results = runner.results
926926
assert len(results) == 50
927927

928+
@pytest.mark.asyncio
929+
async def test_limit_as_last_operation(self):
930+
"""Test limit as the last operation after return."""
931+
runner = Runner(
932+
"""
933+
unwind range(1, 10) as i
934+
return i
935+
limit 5
936+
"""
937+
)
938+
await runner.run()
939+
results = runner.results
940+
assert len(results) == 5
941+
928942
@pytest.mark.asyncio
929943
async def test_range_lookup(self):
930944
"""Test range lookup."""
@@ -1457,6 +1471,71 @@ async def test_match_with_referenced_to_previous_variable(self):
14571471
assert results[0] == {"name1": "Person 1", "name2": "Person 2", "name3": "Person 3"}
14581472
assert results[1] == {"name1": "Person 2", "name2": "Person 3", "name3": "Person 4"}
14591473

1474+
@pytest.mark.asyncio
1475+
async def test_match_with_aggregated_with_and_subsequent_match(self):
1476+
"""Test match with aggregated WITH followed by another match using the same node reference."""
1477+
await Runner(
1478+
"""
1479+
CREATE VIRTUAL (:AggUser) AS {
1480+
unwind [
1481+
{id: 1, name: 'Alice'},
1482+
{id: 2, name: 'Bob'},
1483+
{id: 3, name: 'Carol'}
1484+
] as record
1485+
RETURN record.id as id, record.name as name
1486+
}
1487+
"""
1488+
).run()
1489+
await Runner(
1490+
"""
1491+
CREATE VIRTUAL (:AggUser)-[:KNOWS]-(:AggUser) AS {
1492+
unwind [
1493+
{left_id: 1, right_id: 2},
1494+
{left_id: 1, right_id: 3}
1495+
] as record
1496+
RETURN record.left_id as left_id, record.right_id as right_id
1497+
}
1498+
"""
1499+
).run()
1500+
await Runner(
1501+
"""
1502+
CREATE VIRTUAL (:AggProject) AS {
1503+
unwind [
1504+
{id: 1, name: 'Project A'},
1505+
{id: 2, name: 'Project B'}
1506+
] as record
1507+
RETURN record.id as id, record.name as name
1508+
}
1509+
"""
1510+
).run()
1511+
await Runner(
1512+
"""
1513+
CREATE VIRTUAL (:AggUser)-[:WORKS_ON]-(:AggProject) AS {
1514+
unwind [
1515+
{left_id: 1, right_id: 1},
1516+
{left_id: 1, right_id: 2}
1517+
] as record
1518+
RETURN record.left_id as left_id, record.right_id as right_id
1519+
}
1520+
"""
1521+
).run()
1522+
match = Runner(
1523+
"""
1524+
MATCH (u:AggUser)-[:KNOWS]->(s:AggUser)
1525+
WITH u, count(s) as acquaintances
1526+
MATCH (u)-[:WORKS_ON]->(p:AggProject)
1527+
RETURN u.name as name, acquaintances, collect(p.name) as projects
1528+
"""
1529+
)
1530+
await match.run()
1531+
results = match.results
1532+
assert len(results) == 1
1533+
assert results[0] == {
1534+
"name": "Alice",
1535+
"acquaintances": 2,
1536+
"projects": ["Project A", "Project B"],
1537+
}
1538+
14601539
@pytest.mark.asyncio
14611540
async def test_match_and_return_full_node(self):
14621541
"""Test match and return full node."""

src/parsing/operations/group_by.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,12 @@ class GroupBy extends Projection {
5252
let node: Node = this.current;
5353
for (const mapper of this.mappers) {
5454
const value: any = mapper.value();
55-
let child: Node | undefined = node.children.get(value);
55+
const key: string =
56+
typeof value === "object" && value !== null ? JSON.stringify(value) : String(value);
57+
let child: Node | undefined = node.children.get(key);
5658
if (child === undefined) {
5759
child = new Node(value);
58-
node.children.set(value, child);
60+
node.children.set(key, child);
5961
}
6062
node = child;
6163
}

src/parsing/operations/limit.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ class Limit extends Operation {
77
super();
88
this.limit = limit;
99
}
10+
public get isLimitReached(): boolean {
11+
return this.count >= this.limit;
12+
}
13+
public increment(): void {
14+
this.count++;
15+
}
1016
public async run(): Promise<void> {
1117
if (this.count >= this.limit) {
1218
return;
@@ -19,4 +25,4 @@ class Limit extends Operation {
1925
}
2026
}
2127

22-
export default Limit;
28+
export default Limit;

src/parsing/operations/return.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import Limit from "./limit";
12
import Projection from "./projection";
23
import Where from "./where";
34

@@ -15,6 +16,7 @@ import Where from "./where";
1516
class Return extends Projection {
1617
protected _where: Where | null = null;
1718
protected _results: Record<string, any>[] = [];
19+
private _limit: Limit | null = null;
1820
public set where(where: Where) {
1921
this._where = where;
2022
}
@@ -24,17 +26,26 @@ class Return extends Projection {
2426
}
2527
return this._where.value();
2628
}
29+
public set limit(limit: Limit) {
30+
this._limit = limit;
31+
}
2732
public async run(): Promise<void> {
2833
if (!this.where) {
2934
return;
3035
}
36+
if (this._limit !== null && this._limit.isLimitReached) {
37+
return;
38+
}
3139
const record: Map<string, any> = new Map();
3240
for (const [expression, alias] of this.expressions()) {
3341
const raw = expression.value();
3442
const value: any = typeof raw === "object" && raw !== null ? structuredClone(raw) : raw;
3543
record.set(alias, value);
3644
}
3745
this._results.push(Object.fromEntries(record));
46+
if (this._limit !== null) {
47+
this._limit.increment();
48+
}
3849
}
3950
public async initialize(): Promise<void> {
4051
this._results = [];

src/parsing/parser.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ class Parser extends BaseParser {
112112
if (this.token.isUnion()) {
113113
break;
114114
}
115+
if (this.token.isEOF()) {
116+
break;
117+
}
115118
operation = this.parseOperation();
116119
if (operation === null && !isSubQuery) {
117120
throw new Error("Expected one of WITH, UNWIND, RETURN, LOAD, OR CALL");
@@ -142,8 +145,12 @@ class Parser extends BaseParser {
142145
}
143146
const limit = this.parseLimit();
144147
if (limit !== null) {
145-
operation!.addSibling(limit);
146-
operation = limit;
148+
if (operation instanceof Return) {
149+
(operation as Return).limit = limit;
150+
} else {
151+
operation!.addSibling(limit);
152+
operation = limit;
153+
}
147154
}
148155
previous = operation;
149156
}
@@ -494,16 +501,11 @@ class Parser extends BaseParser {
494501
node.label = label!;
495502
if (identifier !== null && this._state.variables.has(identifier)) {
496503
let reference = this._state.variables.get(identifier);
497-
// Resolve through Expression -> Reference -> Node (e.g., after WITH)
498-
if (reference instanceof Expression && reference.firstChild() instanceof Reference) {
499-
const inner = (reference.firstChild() as Reference).referred;
500-
if (inner instanceof Node) {
501-
reference = inner;
502-
}
503-
}
504504
if (
505505
reference === undefined ||
506-
(!(reference instanceof Node) && !(reference instanceof Unwind))
506+
(!(reference instanceof Node) &&
507+
!(reference instanceof Unwind) &&
508+
!(reference instanceof Expression))
507509
) {
508510
throw new Error(`Undefined node reference: ${identifier}`);
509511
}

tests/compute/runner.test.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,6 +842,19 @@ test("Test limit", async () => {
842842
expect(results.length).toBe(50);
843843
});
844844

845+
test("Test limit as last operation", async () => {
846+
const runner = new Runner(
847+
`
848+
unwind range(1, 10) as i
849+
return i
850+
limit 5
851+
`
852+
);
853+
await runner.run();
854+
const results = runner.results;
855+
expect(results.length).toBe(5);
856+
});
857+
845858
test("Test range lookup", async () => {
846859
const runner = new Runner(
847860
`
@@ -1327,6 +1340,60 @@ test("Test match with referenced to previous variable", async () => {
13271340
expect(results[1]).toEqual({ name1: "Person 2", name2: "Person 3", name3: "Person 4" });
13281341
});
13291342

1343+
test("Test match with aggregated with and subsequent match", async () => {
1344+
await new Runner(`
1345+
CREATE VIRTUAL (:User) AS {
1346+
unwind [
1347+
{id: 1, name: 'Alice'},
1348+
{id: 2, name: 'Bob'},
1349+
{id: 3, name: 'Carol'}
1350+
] as record
1351+
RETURN record.id as id, record.name as name
1352+
}
1353+
`).run();
1354+
await new Runner(`
1355+
CREATE VIRTUAL (:User)-[:KNOWS]-(:User) AS {
1356+
unwind [
1357+
{left_id: 1, right_id: 2},
1358+
{left_id: 1, right_id: 3}
1359+
] as record
1360+
RETURN record.left_id as left_id, record.right_id as right_id
1361+
}
1362+
`).run();
1363+
await new Runner(`
1364+
CREATE VIRTUAL (:Project) AS {
1365+
unwind [
1366+
{id: 1, name: 'Project A'},
1367+
{id: 2, name: 'Project B'}
1368+
] as record
1369+
RETURN record.id as id, record.name as name
1370+
}
1371+
`).run();
1372+
await new Runner(`
1373+
CREATE VIRTUAL (:User)-[:WORKS_ON]-(:Project) AS {
1374+
unwind [
1375+
{left_id: 1, right_id: 1},
1376+
{left_id: 1, right_id: 2}
1377+
] as record
1378+
RETURN record.left_id as left_id, record.right_id as right_id
1379+
}
1380+
`).run();
1381+
const match = new Runner(`
1382+
MATCH (u:User)-[:KNOWS]->(s:User)
1383+
WITH u, count(s) as acquaintances
1384+
MATCH (u)-[:WORKS_ON]->(p:Project)
1385+
RETURN u.name as name, acquaintances, collect(p.name) as projects
1386+
`);
1387+
await match.run();
1388+
const results = match.results;
1389+
expect(results.length).toBe(1);
1390+
expect(results[0]).toEqual({
1391+
name: "Alice",
1392+
acquaintances: 2,
1393+
projects: ["Project A", "Project B"],
1394+
});
1395+
});
1396+
13301397
test("Test match and return full node", async () => {
13311398
await new Runner(`
13321399
CREATE VIRTUAL (:Person) AS {

0 commit comments

Comments
 (0)