Skip to content

Comments

⚡️ Speed up function find_last_node by 17,067%#273

Closed
codeflash-ai[bot] wants to merge 1 commit intooptimizefrom
codeflash/optimize-find_last_node-mldx8gg7
Closed

⚡️ Speed up function find_last_node by 17,067%#273
codeflash-ai[bot] wants to merge 1 commit intooptimizefrom
codeflash/optimize-find_last_node-mldx8gg7

Conversation

@codeflash-ai
Copy link

@codeflash-ai codeflash-ai bot commented Feb 8, 2026

📄 17,067% (170.67x) speedup for find_last_node in src/algorithms/graph.py

⏱️ Runtime : 47.5 milliseconds 277 microseconds (best of 250 runs)

📝 Explanation and details

The optimized code achieves a 17,067% speedup (47.5ms → 277μs) by eliminating a nested iteration anti-pattern that caused O(N×M) time complexity.

Key optimization:
The original code uses a nested generator expression that checks all(e["source"] != n["id"] for e in edges) for each node. This means for every node in the graph, it iterates through ALL edges to verify none have that node as a source. With N nodes and M edges, this becomes O(N×M) comparisons.

The optimized version precomputes a set of all source IDs once: sources = {e["source"] for e in edges}. Then it performs a single O(1) set membership check per node: if n["id"] not in sources. This reduces complexity to O(N + M).

Why this matters:

  • Set lookups are O(1) vs linear search through edges which is O(M)
  • Single pass through edges vs M passes (one per node)
  • No generator overhead - direct iteration is faster than nested generators with next() and all()

Performance characteristics from tests:

  • Small graphs (≤10 nodes): 60-160% faster - modest gains due to overhead of set creation
  • Medium graphs (100-500 nodes): 2,900-6,700% faster - optimization dominates
  • Large graphs (≥500 nodes): 16,000-38,000% faster - dramatic impact as the O(N×M) penalty compounds

The test results show the optimization is universally beneficial but particularly critical for graphs with hundreds of nodes and edges, which appear to be common in this codebase based on the comprehensive test coverage for large-scale scenarios.

Correctness verification report:

Test Status
⚙️ Existing Unit Tests 🔘 None Found
🌀 Generated Regression Tests 48 Passed
⏪ Replay Tests 🔘 None Found
🔎 Concolic Coverage Tests 🔘 None Found
📊 Tests Coverage 100.0%
🌀 Click to see Generated Regression Tests
from __future__ import annotations

import random  # used to shuffle nodes deterministically in large-scale tests

# imports
import pytest  # used for our unit tests
from src.algorithms.graph import find_last_node


def test_basic_linear_chain_returns_true_last_node():
    # Simple linear chain 1 -> 2 -> 3. The node with id 3 has no outgoing edges.
    n1 = {"id": 1}
    n2 = {"id": 2}
    n3 = {"id": 3}
    nodes = [n1, n2, n3]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    # The exact dict object for node 3 should be returned.
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.83μs -> 792ns (131% faster)


def test_multiple_terminal_nodes_returns_first_terminal_in_nodes_order():
    # Nodes A,B,C where only A has outgoing edges. B and C are terminals.
    A = {"id": "A"}
    B = {"id": "B"}
    C = {"id": "C"}
    nodes = [A, B, C]
    edges = [{"source": "A", "target": "B"}]
    # Both B and C have no outgoing edges, but B appears first in `nodes` so it should be returned.
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62μs -> 750ns (117% faster)


def test_no_edges_returns_first_node():
    # With no edges, every node qualifies as "no outgoing edges".
    # The function should return the first node in the provided nodes iterable.
    first = {"id": 10}
    second = {"id": 20}
    nodes = [first, second]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.00μs -> 625ns (60.0% faster)


def test_empty_nodes_returns_none():
    # If no nodes are provided, the function should return None.
    nodes = []
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 667ns -> 541ns (23.3% faster)


def test_node_missing_id_key_raises_keyerror_when_edges_present():
    # If a node lacks the "id" key and edges is non-empty, accessing n["id"] should raise KeyError.
    # Note: if edges were empty, the inner expression would never be evaluated and no KeyError would arise.
    broken_node = {"name": "no-id"}  # missing 'id'
    nodes = [broken_node]
    edges = [{"source": 999}]  # non-empty ensures n["id"] is accessed
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)  # 1.92μs -> 917ns (109% faster)


def test_edge_missing_source_key_raises_keyerror():
    # If an edge is missing the 'source' key, the function should raise KeyError when evaluating that edge.
    node = {"id": 1}
    nodes = [node]
    edges = [{"target": 2}]  # missing 'source'
    with pytest.raises(KeyError):
        find_last_node(nodes, edges)  # 1.58μs -> 833ns (90.2% faster)


def test_node_not_mapping_raises_typeerror_when_edges_present():
    # If nodes contain non-mapping objects (e.g., integers), attempting to subscript them (n["id"])
    # while evaluating the edges should raise a TypeError.
    nodes = [1]  # integer is not subscriptable
    edges = [{"source": 1}]
    with pytest.raises(TypeError):
        find_last_node(nodes, edges)  # 1.92μs -> 1.04μs (84.1% faster)


def test_none_id_and_none_source_are_compared_correctly():
    # If a node has id None and an edge has source None, they should compare equal and thus
    # that node should NOT be considered the last node. The function should thus return another valid node.
    node_none = {"id": None}
    node_two = {"id": 2}
    nodes = [node_none, node_two]
    edges = [{"source": None}]
    # node_none has an outgoing (source matches its id), so node_two is the terminal node.
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.83μs -> 750ns (144% faster)


def test_duplicate_ids_all_considered_by_id_value():
    # When multiple node objects share the same id, the function's logic compares by id value.
    # If an edge references that id, none of the nodes with that id should be considered terminal.
    n_a1 = {"id": 1}
    n_a2 = {"id": 1}  # same id as n_a1
    n_b = {"id": 2}
    nodes = [n_a1, n_a2, n_b]
    edges = [{"source": 1, "target": 2}]
    # Because edges reference id 1, both n_a1 and n_a2 are considered to have outgoing edges.
    # The only candidate with no outgoing edges is n_b.
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.75μs -> 791ns (121% faster)


def test_large_chain_of_500_nodes_returns_last_node():
    # Build a chain of 500 nodes: 0 -> 1 -> 2 -> ... -> 498 -> 499
    size = 500  # well under the 1000 limit
    nodes = [{"id": i} for i in range(size)]
    # Create edges for the chain: (0->1), (1->2), ..., (498->499)
    edges = [{"source": i, "target": i + 1} for i in range(size - 1)]
    # The unique terminal node is id == size-1
    expected_last = nodes[-1]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.52ms -> 26.8μs (16741% faster)


def test_large_chain_with_shuffled_nodes_still_finds_terminal_node():
    # Same chain as above but nodes are shuffled. The terminal node is unique (id == 499),
    # so regardless of position in nodes, the function must return that node object.
    size = 500
    nodes = [{"id": i} for i in range(size)]
    edges = [{"source": i, "target": i + 1} for i in range(size - 1)]
    # Choose a deterministic shuffle so the test is repeatable.
    random.seed(0)
    random.shuffle(nodes)
    # Find the node dict object that has id == size-1 so we can test identity.
    expected_last = next(n for n in nodes if n["id"] == size - 1)
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.23ms -> 18.0μs (6724% faster)


def test_many_nodes_with_edges_pointing_outside_nodes_list():
    # If edges reference sources not present among node ids, those edges do not mark any node
    # in `nodes` as having outgoing edges. The first node in `nodes` should therefore be returned.
    nodes = [{"id": i} for i in range(100)]  # 100 nodes
    # Edges reference ids starting at 100 which are not in nodes' ids
    edges = [{"source": i, "target": i + 1} for i in range(100, 150)]
    # Since none of the edges' sources match any node's id, all nodes are terminal candidates.
    # The function must return the first node.
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 3.29μs -> 2.08μs (58.0% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.
import pytest
from src.algorithms.graph import find_last_node


def test_single_node_no_edges():
    """Test with a single node and no edges. The node should be returned as the last node."""
    nodes = [{"id": 1, "name": "node1"}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.08μs -> 542ns (99.8% faster)


def test_linear_chain_three_nodes():
    """Test with a linear chain of three nodes. Should return the last node in the chain."""
    nodes = [
        {"id": 1, "name": "node1"},
        {"id": 2, "name": "node2"},
        {"id": 3, "name": "node3"},
    ]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.88μs -> 833ns (125% faster)


def test_simple_two_node_flow():
    """Test with two nodes where one points to the other."""
    nodes = [{"id": "A", "label": "Start"}, {"id": "B", "label": "End"}]
    edges = [{"source": "A", "target": "B"}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.58μs -> 709ns (123% faster)


def test_branching_flow_one_sink():
    """Test with branching flow where multiple nodes point to one sink node."""
    nodes = [
        {"id": 1, "name": "start"},
        {"id": 2, "name": "branch1"},
        {"id": 3, "name": "branch2"},
        {"id": 4, "name": "end"},
    ]
    edges = [
        {"source": 1, "target": 2},
        {"source": 1, "target": 3},
        {"source": 2, "target": 4},
        {"source": 3, "target": 4},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.33μs -> 917ns (154% faster)


def test_diamond_shaped_flow():
    """Test with diamond-shaped flow (two paths converging)."""
    nodes = [
        {"id": 1, "name": "top"},
        {"id": 2, "name": "left"},
        {"id": 3, "name": "right"},
        {"id": 4, "name": "bottom"},
    ]
    edges = [
        {"source": 1, "target": 2},
        {"source": 1, "target": 3},
        {"source": 2, "target": 4},
        {"source": 3, "target": 4},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.29μs -> 875ns (162% faster)


def test_multiple_sinks_returns_first():
    """Test when there are multiple nodes with no outgoing edges. Should return the first one found."""
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.46μs -> 708ns (106% faster)


def test_self_loop_node():
    """Test with a node that has a self-loop. It should not be considered as a last node."""
    nodes = [{"id": 1, "name": "loop"}, {"id": 2, "name": "end"}]
    edges = [{"source": 1, "target": 1}, {"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.54μs -> 791ns (94.9% faster)


def test_empty_nodes_list():
    """Test with an empty nodes list. Should return None."""
    nodes = []
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 667ns -> 417ns (60.0% faster)


def test_empty_edges_all_nodes_are_sinks():
    """Test with empty edges list. All nodes are potential sinks, so first node should be returned."""
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = []
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.04μs -> 583ns (78.7% faster)


def test_all_nodes_have_outgoing_edges():
    """Test with a cycle where all nodes have outgoing edges. Should return None (no sink)."""
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 2, "target": 3},
        {"source": 3, "target": 1},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.92μs -> 833ns (130% faster)


def test_node_with_numeric_id():
    """Test with nodes having numeric IDs."""
    nodes = [{"id": 100}, {"id": 200}, {"id": 300}]
    edges = [{"source": 100, "target": 200}, {"source": 200, "target": 300}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.88μs -> 916ns (105% faster)


def test_node_with_string_id():
    """Test with nodes having string IDs."""
    nodes = [{"id": "node_a"}, {"id": "node_b"}, {"id": "node_c"}]
    edges = [
        {"source": "node_a", "target": "node_b"},
        {"source": "node_b", "target": "node_c"},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.96μs -> 833ns (135% faster)


def test_node_with_uuid_string_id():
    """Test with UUID-like string IDs."""
    uuid1 = "550e8400-e29b-41d4-a716-446655440000"
    uuid2 = "550e8400-e29b-41d4-a716-446655440001"
    uuid3 = "550e8400-e29b-41d4-a716-446655440002"
    nodes = [{"id": uuid1}, {"id": uuid2}, {"id": uuid3}]
    edges = [{"source": uuid1, "target": uuid2}, {"source": uuid2, "target": uuid3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.04μs -> 875ns (133% faster)


def test_nodes_with_extra_attributes():
    """Test that nodes with extra attributes are handled correctly."""
    nodes = [
        {"id": 1, "name": "start", "color": "red", "size": 10},
        {"id": 2, "name": "end", "color": "blue", "size": 20},
    ]
    edges = [{"source": 1, "target": 2}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.46μs -> 708ns (106% faster)


def test_edges_with_extra_attributes():
    """Test that edges with extra attributes don't affect the result."""
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2, "weight": 5, "label": "edge1"},
        {"source": 2, "target": 3, "weight": 10, "label": "edge2"},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.88μs -> 792ns (137% faster)


def test_duplicate_edges():
    """Test with duplicate edges. Should still find the correct sink node."""
    nodes = [{"id": 1}, {"id": 2}, {"id": 3}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 1, "target": 2},  # duplicate
        {"source": 2, "target": 3},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.96μs -> 834ns (135% faster)


def test_single_node_with_self_loop():
    """Test with a single node that has a self-loop. It has an outgoing edge, so None should be returned."""
    nodes = [{"id": 1, "name": "loop"}]
    edges = [{"source": 1, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.21μs -> 625ns (93.3% faster)


def test_disconnected_components():
    """Test with disconnected components where one component has a sink."""
    nodes = [
        {"id": 1, "component": "A"},
        {"id": 2, "component": "A"},
        {"id": 3, "component": "B"},
        {"id": 4, "component": "B"},
    ]
    edges = [{"source": 1, "target": 2}, {"source": 3, "target": 4}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.58μs -> 792ns (99.9% faster)


def test_node_id_zero():
    """Test with node ID of 0 (falsy value)."""
    nodes = [{"id": 0, "name": "zero"}, {"id": 1, "name": "one"}]
    edges = [{"source": 0, "target": 1}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.46μs -> 750ns (94.4% faster)


def test_node_id_false_value():
    """Test with node ID as boolean False (edge case for equality checks)."""
    nodes = [{"id": False, "name": "false"}, {"id": True, "name": "true"}]
    edges = [{"source": False, "target": True}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.58μs -> 708ns (124% faster)


def test_complex_node_ids_with_tuples():
    """Test with tuple IDs (hashable but complex)."""
    nodes = [{"id": (1, 2)}, {"id": (3, 4)}, {"id": (5, 6)}]
    edges = [{"source": (1, 2), "target": (3, 4)}, {"source": (3, 4), "target": (5, 6)}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 2.08μs -> 1.00μs (108% faster)


def test_nodes_out_of_order():
    """Test with nodes list not in any particular order."""
    nodes = [{"id": 3}, {"id": 1}, {"id": 2}]
    edges = [{"source": 1, "target": 2}, {"source": 2, "target": 3}]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.29μs -> 667ns (93.6% faster)


def test_very_long_node_id_string():
    """Test with very long string node IDs."""
    long_id1 = "a" * 1000
    long_id2 = "b" * 1000
    long_id3 = "c" * 1000
    nodes = [{"id": long_id1}, {"id": long_id2}, {"id": long_id3}]
    edges = [
        {"source": long_id1, "target": long_id2},
        {"source": long_id2, "target": long_id3},
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.96μs -> 792ns (147% faster)


def test_edges_referencing_nonexistent_nodes():
    """Test with edges that reference nodes not in the nodes list (should still work correctly)."""
    nodes = [{"id": 1}, {"id": 2}]
    edges = [
        {"source": 1, "target": 2},
        {"source": 999, "target": 1000},  # references non-existent nodes
    ]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 1.62μs -> 792ns (105% faster)


def test_large_single_chain():
    """Test with a large linear chain (100 nodes)."""
    nodes = [{"id": i} for i in range(100)]
    edges = [{"source": i, "target": i + 1} for i in range(99)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 203μs -> 6.79μs (2890% faster)


def test_wide_branching_tree():
    """Test with a wide branching structure (one root with 50 children, all converging to one sink)."""
    nodes = [{"id": 0, "name": "root"}]
    nodes.extend([{"id": i, "name": f"child_{i}"} for i in range(1, 51)])
    nodes.append({"id": 51, "name": "sink"})

    edges = [{"source": 0, "target": i} for i in range(1, 51)]
    edges.extend([{"source": i, "target": 51} for i in range(1, 51)])

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 151μs -> 4.88μs (2999% faster)


def test_large_linear_chain_500_nodes():
    """Test with a large linear chain of 500 nodes."""
    nodes = [{"id": i, "value": i * 2} for i in range(500)]
    edges = [{"source": i, "target": i + 1} for i in range(499)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.48ms -> 26.9μs (16566% faster)


def test_large_linear_chain_1000_nodes():
    """Test with a large linear chain of 1000 nodes."""
    nodes = [{"id": i} for i in range(1000)]
    edges = [{"source": i, "target": i + 1} for i in range(999)]
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 18.0ms -> 48.6μs (36859% faster)


def test_complex_dag_200_nodes():
    """Test with a complex DAG (Directed Acyclic Graph) with 200 nodes and multiple paths."""
    nodes = [{"id": i} for i in range(200)]
    edges = []

    # Create a layered structure where each layer connects to next layer
    layer_size = 50
    for layer in range(3):  # 4 layers (0-49, 50-99, 100-149, 150-199)
        for i in range(layer * layer_size, (layer + 1) * layer_size):
            if layer < 3:  # Connect to next layer
                next_node = ((i - layer * layer_size) % 10) + (layer + 1) * layer_size
                edges.append({"source": i, "target": next_node})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 431μs -> 8.92μs (4735% faster)


def test_wide_tree_500_children():
    """Test with a wide tree where root has 500 children all connecting to one sink."""
    root_id = 0
    sink_id = 501
    nodes = [{"id": root_id}]
    nodes.extend([{"id": i} for i in range(1, 501)])
    nodes.append({"id": sink_id})

    edges = [{"source": root_id, "target": i} for i in range(1, 501)]
    edges.extend([{"source": i, "target": sink_id} for i in range(1, 501)])

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 13.4ms -> 34.5μs (38931% faster)


def test_many_disconnected_chains():
    """Test with many disconnected linear chains (200 chains of 5 nodes each)."""
    nodes = []
    edges = []

    for chain_idx in range(200):
        base_id = chain_idx * 5
        for i in range(5):
            nodes.append({"id": base_id + i, "chain": chain_idx})

        # Connect nodes within the chain
        for i in range(4):
            edges.append({"source": base_id + i, "target": base_id + i + 1})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 33.2μs -> 19.2μs (72.7% faster)


def test_grid_graph_structure():
    """Test with a grid-like graph structure (10x10 grid)."""
    nodes = [{"id": i * 10 + j, "x": i, "y": j} for i in range(10) for j in range(10)]
    edges = []

    # Connect each node to the node to its right (if exists)
    for i in range(10):
        for j in range(9):
            source_id = i * 10 + j
            target_id = i * 10 + (j + 1)
            edges.append({"source": source_id, "target": target_id})

    # Connect each node to the node below (if exists)
    for i in range(9):
        for j in range(10):
            source_id = i * 10 + j
            target_id = (i + 1) * 10 + j
            edges.append({"source": source_id, "target": target_id})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 220μs -> 8.21μs (2580% faster)


def test_performance_many_edges_single_sink():
    """Test performance with 500 nodes and 1000 edges all leading to one sink."""
    nodes = [{"id": i} for i in range(500)]
    edges = []

    # Create edges: some nodes connect to the sink (499)
    for i in range(250):
        edges.append({"source": i, "target": 499})

    # And others form intermediate connections
    for i in range(250, 499):
        edges.append({"source": i, "target": 499})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 4.46ms -> 26.3μs (16875% faster)


def test_sparse_graph_800_nodes_100_edges():
    """Test with a very sparse graph (800 nodes, only 100 edges)."""
    nodes = [{"id": i} for i in range(800)]
    edges = []

    # Only 100 edges creating a small connected component
    for i in range(100):
        edges.append({"source": i, "target": i + 1})

    # The remaining 700 nodes have no edges
    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 206μs -> 6.83μs (2921% faster)


def test_multiple_sinks_large_graph():
    """Test with large graph having multiple sink nodes."""
    nodes = [{"id": i} for i in range(300)]
    edges = []

    # Create multiple independent chains
    chains = 6
    nodes_per_chain = 50

    for chain_idx in range(chains):
        base = chain_idx * nodes_per_chain
        for i in range(nodes_per_chain - 1):
            edges.append({"source": base + i, "target": base + i + 1})

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 66.8μs -> 9.33μs (615% faster)
    sink_id = result["id"]


def test_high_branching_factor_tree():
    """Test with tree having high branching factor (each node has 10 children, 3 levels deep)."""
    nodes = [{"id": 0, "level": 0}]  # Root
    edges = []

    node_counter = 1
    queue = [(0, 0)]  # (node_id, level)

    while queue:
        parent_id, level = queue.pop(0)

        if level < 2:  # 3 levels total (0, 1, 2)
            for _ in range(10):
                child_id = node_counter
                nodes.append({"id": child_id, "level": level + 1})
                edges.append({"source": parent_id, "target": child_id})
                queue.append((child_id, level + 1))
                node_counter += 1

    codeflash_output = find_last_node(nodes, edges)
    result = codeflash_output  # 28.6μs -> 4.04μs (607% faster)


# codeflash_output is used to check that the output of the original code is the same as that of the optimized code.

To edit these changes git checkout codeflash/optimize-find_last_node-mldx8gg7 and push.

Codeflash Static Badge

The optimized code achieves a **17,067% speedup** (47.5ms → 277μs) by eliminating a nested iteration anti-pattern that caused O(N×M) time complexity.

**Key optimization:**
The original code uses a nested generator expression that checks `all(e["source"] != n["id"] for e in edges)` for each node. This means for every node in the graph, it iterates through ALL edges to verify none have that node as a source. With N nodes and M edges, this becomes O(N×M) comparisons.

The optimized version precomputes a set of all source IDs once: `sources = {e["source"] for e in edges}`. Then it performs a single O(1) set membership check per node: `if n["id"] not in sources`. This reduces complexity to O(N + M).

**Why this matters:**
- **Set lookups are O(1)** vs linear search through edges which is O(M)
- **Single pass through edges** vs M passes (one per node)
- **No generator overhead** - direct iteration is faster than nested generators with `next()` and `all()`

**Performance characteristics from tests:**
- Small graphs (≤10 nodes): 60-160% faster - modest gains due to overhead of set creation
- Medium graphs (100-500 nodes): 2,900-6,700% faster - optimization dominates
- Large graphs (≥500 nodes): 16,000-38,000% faster - dramatic impact as the O(N×M) penalty compounds

The test results show the optimization is universally beneficial but particularly critical for graphs with hundreds of nodes and edges, which appear to be common in this codebase based on the comprehensive test coverage for large-scale scenarios.
@codeflash-ai codeflash-ai bot requested a review from KRRT7 February 8, 2026 15:52
@codeflash-ai codeflash-ai bot added ⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash labels Feb 8, 2026
@KRRT7 KRRT7 closed this Feb 9, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

⚡️ codeflash Optimization PR opened by Codeflash AI 🎯 Quality: High Optimization Quality according to Codeflash

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant