Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 97 additions & 125 deletions datajunction-server/tests/api/nodes_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,12 @@ async def test_hard_deleting_node_with_versions(
):
"""
Test that hard deleting a node will remove all previous node revisions.

Tests representative nodes of each type to verify hard delete works correctly:
- Source: default.dispatchers
- Transform: default.regional_level_agg
- Dimension with versions: default.repair_order (has 3 versions after patches)
- Metric: default.num_repair_orders
"""
# Create a few revisions for the `default.repair_order` dimension
await client_with_roads.patch(
Expand All @@ -1465,37 +1471,48 @@ async def test_hard_deleting_node_with_versions(
response = await client_with_roads.get("/nodes/default.repair_order")
assert response.json()["version"] == "v3.0"

# Hard delete all nodes and verify after each delete
default_nodes = (await client_with_roads.get("/namespaces/default/")).json()
for node_name in default_nodes:
# Test representative nodes of each type instead of all nodes
# This covers the key scenarios while reducing test time from ~24s to ~5s
representative_nodes = [
"default.dispatchers", # source (no downstream deps)
"default.regional_level_agg", # transform
"default.repair_order", # dimension with multiple versions
"default.num_repair_orders", # metric
]

# Hard delete representative nodes and verify after each delete
for node_name in representative_nodes:
await self.verify_complete_hard_delete(
session,
client_with_roads,
node_name["name"],
node_name,
)

# Check that all nodes under the `default` namespace and their revisions have been deleted
nodes = (
(await session.execute(select(Node).where(Node.namespace == "default")))
.unique()
.scalars()
.all()
)
assert len(nodes) == 0
# Check that the representative nodes and their revisions have been deleted
for node_name in representative_nodes:
nodes = (
(await session.execute(select(Node).where(Node.name == node_name)))
.unique()
.scalars()
.all()
)
assert len(nodes) == 0, f"Node {node_name} should have been deleted"

revisions = (
(
await session.execute(
select(NodeRevision).where(
NodeRevision.name.like("default%"), # type: ignore
),
revisions = (
(
await session.execute(
select(NodeRevision).where(
NodeRevision.name == node_name,
),
)
)
.unique()
.scalars()
.all()
)
assert len(revisions) == 0, (
f"Revisions for {node_name} should have been deleted"
)
.unique()
.scalars()
.all()
)
assert len(revisions) == 0

@pytest.mark.asyncio
async def test_hard_deleting_a_node(
Expand Down Expand Up @@ -4800,7 +4817,14 @@ async def test_node_column_lineage(self, client_with_roads: AsyncClient):
@pytest.mark.asyncio
async def test_revalidating_existing_nodes(self, client_with_roads: AsyncClient):
"""
Test revalidating all example nodes and confirm that they are set to valid
Test revalidating representative nodes and confirm that they are set to valid.

Tests one node of each type to verify validation works correctly:
- Source: default.repair_orders
- Transform: default.repair_orders_fact
- Dimension: default.hard_hat
- Metric: default.num_repair_orders
- Cube: default.repairs_cube (created in test)
"""
await client_with_roads.post(
"/nodes/cube/",
Expand All @@ -4821,19 +4845,29 @@ async def test_revalidating_existing_nodes(self, client_with_roads: AsyncClient)
"name": "default.repairs_cube",
},
)
for node in (await client_with_roads.get("/nodes/")).json():
if node.startswith("default."):
status = (
await client_with_roads.post(
f"/nodes/{node}/validate/",
)
).json()["status"]
assert status == "valid"

# Test representative nodes of each type instead of all nodes
# This covers the key scenarios while reducing test time from ~15s to ~3s
representative_nodes = [
"default.repair_orders", # source
"default.repair_orders_fact", # transform
"default.hard_hat", # dimension
"default.num_repair_orders", # metric
"default.repairs_cube", # cube
]

for node in representative_nodes:
status = (
await client_with_roads.post(
f"/nodes/{node}/validate/",
)
).json()["status"]
assert status == "valid", f"Node {node} should be valid after revalidation"

# Confirm that they still show as valid server-side
for node in (await client_with_roads.get("/nodes/")).json():
if node.startswith("default."):
node = (await client_with_roads.get(f"/nodes/{node}")).json()
assert node["status"] == "valid"
for node in representative_nodes:
node_data = (await client_with_roads.get(f"/nodes/{node}")).json()
assert node_data["status"] == "valid", f"Node {node} should still be valid"

@pytest.mark.asyncio
async def test_lineage_on_complex_transforms(self, client_with_roads: AsyncClient):
Expand Down Expand Up @@ -6063,8 +6097,18 @@ async def test_copy_nodes(
metric_with_required_dim_payload,
):
"""
Test copying all nodes in the roads database
Test copying representative nodes of each type in the roads database.

Tests one node of each type to verify copy functionality works correctly:
- Source with dimension links: default.repair_orders
- Source without dimension links: default.dispatchers
- Transform: default.repair_orders_fact
- Dimension with links: default.hard_hat
- Dimension without links: default.dispatcher
- Metric: default.num_repair_orders
- Cube: default.repairs_cube
"""
# Expected dimension links for nodes that have them (after copy, join SQL references _copy)
expected_dimension_links = {
"default.repair_orders": [
{
Expand Down Expand Up @@ -6094,36 +6138,6 @@ async def test_copy_nodes(
"role": None,
},
],
"default.repair_order_details": [
{
"dimension": {"name": "default.repair_order"},
"join_type": "inner",
"join_sql": "default.repair_order_details_copy.repair_order_id "
"= default.repair_order.repair_order_id",
"join_cardinality": "many_to_one",
"role": None,
"foreign_keys": {
"default.repair_order_details_copy.repair_order_id": (
"default.repair_order.repair_order_id"
),
},
},
],
"default.repair_type": [
{
"dimension": {"name": "default.contractor"},
"join_type": "inner",
"join_sql": "default.repair_type_copy.contractor_id = "
"default.contractor.contractor_id",
"join_cardinality": "many_to_one",
"role": None,
"foreign_keys": {
"default.repair_type_copy.contractor_id": (
"default.contractor.contractor_id"
),
},
},
],
"default.repair_orders_fact": [
{
"dimension": {"name": "default.dispatcher"},
Expand Down Expand Up @@ -6190,74 +6204,32 @@ async def test_copy_nodes(
},
},
],
"default.repair_order": [
{
"dimension": {"name": "default.dispatcher"},
"join_type": "inner",
"join_sql": "default.repair_order_copy.dispatcher_id = "
"default.dispatcher.dispatcher_id",
"join_cardinality": "many_to_one",
"role": None,
"foreign_keys": {
"default.repair_order_copy.dispatcher_id": (
"default.dispatcher.dispatcher_id"
),
},
},
{
"dimension": {"name": "default.hard_hat"},
"join_type": "inner",
"join_sql": "default.repair_order_copy.hard_hat_id = "
"default.hard_hat.hard_hat_id",
"join_cardinality": "many_to_one",
"role": None,
"foreign_keys": {
"default.repair_order_copy.hard_hat_id": (
"default.hard_hat.hard_hat_id"
),
},
},
{
"dimension": {"name": "default.hard_hat_to_delete"},
"join_type": "left",
"join_sql": "default.repair_order_copy.hard_hat_id = "
"default.hard_hat_to_delete.hard_hat_id",
"join_cardinality": "many_to_one",
"role": None,
"foreign_keys": {
"default.repair_order_copy.hard_hat_id": (
"default.hard_hat_to_delete.hard_hat_id"
),
},
},
{
"dimension": {"name": "default.municipality_dim"},
"join_type": "inner",
"join_sql": "default.repair_order_copy.municipality_id = "
"default.municipality_dim.municipality_id",
"join_cardinality": "many_to_one",
"role": None,
"foreign_keys": {
"default.repair_order_copy.municipality_id": (
"default.municipality_dim.municipality_id"
),
},
},
],
}
await client_with_roads.post("/nodes/cube", json=repairs_cube_payload)
await client_with_roads.post(
"/nodes/metric",
json=metric_with_required_dim_payload,
)

# Copy all nodes to a node name with _copy appended
nodes = (await client_with_roads.get("/nodes")).json()
for node in sorted(nodes):
# Test representative nodes of each type instead of all nodes
# This covers: source (with/without links), transform, dimension (with/without links),
# metric, and cube - reducing test time from ~55s to ~5s
representative_nodes = [
"default.repair_orders", # source with dimension links
"default.dispatchers", # source without dimension links
"default.repair_orders_fact", # transform with dimension links
"default.hard_hat", # dimension with links to us_state
"default.dispatcher", # dimension without outgoing links
"default.num_repair_orders", # metric
"default.repairs_cube", # cube
]

# Copy representative nodes
for node in representative_nodes:
await client_with_roads.post(f"/nodes/{node}/copy?new_name={node}_copy")

# Check that each node was successfully copied by comparing against the original
for node in nodes:
for node in representative_nodes:
original = (await client_with_roads.get(f"/nodes/{node}")).json()
copied = (await client_with_roads.get(f"/nodes/{node}_copy")).json()
for field in ["name", "node_id", "node_revision_id", "updated_at"]:
Expand Down
Loading