Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260220143557050413.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "finalize_graph streaming"
}
4 changes: 4 additions & 0 deletions .semversioner/next-release/patch-20260223133523034773.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "patch",
"description": "create_final_documents streaming"
}
76 changes: 49 additions & 27 deletions packages/graphrag/graphrag/index/operations/finalize_entities.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,56 @@
# Copyright (c) 2024 Microsoft Corporation.
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""All the steps to transform final entities."""
"""Stream-finalize entity rows into an output Table."""

from typing import Any
from uuid import uuid4

import pandas as pd
from graphrag_storage.tables.table import Table

from graphrag.data_model.schemas import ENTITIES_FINAL_COLUMNS
from graphrag.graphs.compute_degree import compute_degree


def finalize_entities(
    entities: pd.DataFrame,
    relationships: pd.DataFrame,
) -> pd.DataFrame:
    """All the steps to transform final entities.

    Joins entities with per-node degree computed from relationships,
    deduplicates by title, and assigns sequential human-readable ids
    plus fresh UUIDs.

    Args
    ----
    entities: pd.DataFrame
        Extracted entity rows; must contain a "title" column.
    relationships: pd.DataFrame
        Relationship rows used to compute node degree.

    Returns
    -------
    pd.DataFrame
        Final entities restricted to ENTITIES_FINAL_COLUMNS.
    """
    degrees = compute_degree(relationships)
    final_entities = entities.merge(degrees, on="title", how="left").drop_duplicates(
        subset="title"
    )
    # BUG FIX: the mask must come from the deduplicated frame itself.
    # Masking with `entities["title"].notna()` misaligns indices after
    # drop_duplicates and can keep/drop the wrong rows.
    final_entities = final_entities.loc[
        final_entities["title"].notna()
    ].reset_index(drop=True)
    # Disconnected nodes (and those with no community even at level 0)
    # can be missing a degree after the left merge.
    final_entities["degree"] = final_entities["degree"].fillna(0).astype(int)
    # Single reset above already gave a clean 0..n-1 index; the original
    # double reset_index leaked stray "index"/"level_0" columns.
    final_entities["human_readable_id"] = final_entities.index
    final_entities["id"] = [str(uuid4()) for _ in range(len(final_entities))]
    return final_entities.loc[
        :,
        ENTITIES_FINAL_COLUMNS,
    ]


async def finalize_entities(
    entities_table: Table,
    degree_map: dict[str, int],
) -> list[dict[str, Any]]:
    """Stream entity rows, enrich each with its degree, and write back.

    Iterates the entities table once, skipping rows without a title and
    rows whose title was already seen. Each surviving row gets its degree
    from the pre-computed map, a sequential human_readable_id, and a fresh
    UUID, then is projected onto the final column set and written back to
    the same table (safe when the table writes through a temp file, e.g.
    truncate=True — confirm with the provider).

    Args
    ----
    entities_table: Table
        Opened table used for both reading input and writing output.
    degree_map: dict[str, int]
        Pre-computed mapping of entity title to node degree.

    Returns
    -------
    list[dict[str, Any]]
        Sample of up to 5 finalized entity rows for logging.
    """
    preview: list[dict[str, Any]] = []
    processed: set[str] = set()
    next_id = 0

    async for record in entities_table:
        name = record.get("title")
        if not name or name in processed:
            continue
        processed.add(name)
        record["degree"] = degree_map.get(name, 0)
        record["human_readable_id"] = next_id
        record["id"] = str(uuid4())
        next_id += 1
        finalized = {column: record.get(column) for column in ENTITIES_FINAL_COLUMNS}
        await entities_table.write(finalized)
        if len(preview) < 5:
            preview.append(finalized)

    return preview
Original file line number Diff line number Diff line change
@@ -1,42 +1,55 @@
# Copyright (c) 2024 Microsoft Corporation.
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""All the steps to transform final relationships."""
"""Stream-finalize relationship rows into an output Table."""

from typing import Any
from uuid import uuid4

import pandas as pd
from graphrag_storage.tables.table import Table

from graphrag.data_model.schemas import RELATIONSHIPS_FINAL_COLUMNS
from graphrag.graphs.compute_degree import compute_degree
from graphrag.index.operations.compute_edge_combined_degree import (
compute_edge_combined_degree,
)


def finalize_relationships(
    relationships: pd.DataFrame,
) -> pd.DataFrame:
    """All the steps to transform final relationships.

    Deduplicates relationships by (source, target), computes the combined
    degree of each edge's endpoints, and assigns sequential human-readable
    ids plus fresh UUIDs.

    Args
    ----
    relationships: pd.DataFrame
        Relationship rows with "source" and "target" columns.

    Returns
    -------
    pd.DataFrame
        Final relationships restricted to RELATIONSHIPS_FINAL_COLUMNS.
    """
    degrees = compute_degree(relationships)

    # FIX: copy the deduplicated slice before assigning new columns —
    # drop_duplicates can return a view of the caller's frame, and writing
    # to it raises SettingWithCopyWarning / risks silently lost writes.
    final_relationships = relationships.drop_duplicates(
        subset=["source", "target"]
    ).copy()
    final_relationships["combined_degree"] = compute_edge_combined_degree(
        final_relationships,
        degrees,
        node_name_column="title",
        node_degree_column="degree",
        edge_source_column="source",
        edge_target_column="target",
    )

    # drop=True avoids leaking the old index as a stray "index" column.
    final_relationships.reset_index(drop=True, inplace=True)
    final_relationships["human_readable_id"] = final_relationships.index
    final_relationships["id"] = [
        str(uuid4()) for _ in range(len(final_relationships))
    ]

    return final_relationships.loc[
        :,
        RELATIONSHIPS_FINAL_COLUMNS,
    ]


async def finalize_relationships(
    relationships_table: Table,
    degree_map: dict[str, int],
) -> list[dict[str, Any]]:
    """Stream relationship rows, dedupe, add combined degree, and write.

    Iterates the relationships table once, keeping only the first row for
    each (source, target) pair. combined_degree is the sum of the two
    endpoint degrees looked up in the pre-computed map (missing titles
    count as 0). Each kept row gets a sequential human_readable_id and a
    fresh UUID, then is projected onto the final columns and written back.

    Args
    ----
    relationships_table: Table
        Opened table for reading and writing relationship rows.
    degree_map: dict[str, int]
        Pre-computed mapping of entity title to node degree.

    Returns
    -------
    list[dict[str, Any]]
        Sample of up to 5 finalized relationship rows for logging.
    """
    preview: list[dict[str, Any]] = []
    visited: set[tuple[str, str]] = set()
    next_id = 0

    async for record in relationships_table:
        source = record.get("source", "")
        target = record.get("target", "")
        if (source, target) in visited:
            continue
        visited.add((source, target))
        record["combined_degree"] = (
            degree_map.get(source, 0) + degree_map.get(target, 0)
        )
        record["human_readable_id"] = next_id
        record["id"] = str(uuid4())
        next_id += 1
        finalized = {
            column: record.get(column) for column in RELATIONSHIPS_FINAL_COLUMNS
        }
        await relationships_table.write(finalized)
        if len(preview) < 5:
            preview.append(finalized)

    return preview
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# Copyright (c) 2024 Microsoft Corporation.
# Copyright (C) 2026 Microsoft
# Licensed under the MIT License

"""A module containing run_workflow method definition."""
"""Workflow to create final documents with text unit mappings."""

import logging
from typing import Any

import pandas as pd
from graphrag_storage.tables.table import Table

from graphrag.config.models.graph_rag_config import GraphRagConfig
from graphrag.data_model.data_reader import DataReader
from graphrag.data_model.row_transformers import (
transform_document_row,
)
from graphrag.data_model.schemas import DOCUMENTS_FINAL_COLUMNS
from graphrag.index.typing.context import PipelineRunContext
from graphrag.index.typing.workflow import WorkflowFunctionOutput
async def run_workflow(
    _config: GraphRagConfig,
    context: PipelineRunContext,
) -> WorkflowFunctionOutput:
    """Transform final documents via streaming Table reads/writes.

    Opens the text_units table for reading and the documents table twice
    (once for reading with a row transformer, once for writing), then
    delegates the streaming join to create_final_documents.

    NOTE(review): this span of the diff interleaved the removed pandas
    body with the added streaming body; this is the reconstructed merged
    version. Reading and writing "documents" concurrently is safe only if
    the provider writes through a temp file — confirm with the Table
    implementation.
    """
    logger.info("Workflow started: create_final_documents")
    async with (
        context.output_table_provider.open(
            "text_units",
        ) as text_units_table,
        context.output_table_provider.open(
            "documents",
            transformer=transform_document_row,
        ) as documents_table,
        context.output_table_provider.open(
            "documents",
        ) as output_table,
    ):
        sample = await create_final_documents(
            text_units_table,
            documents_table,
            output_table,
        )

    logger.info("Workflow completed: create_final_documents")
    return WorkflowFunctionOutput(result=sample)


def create_final_documents(
    documents: pd.DataFrame, text_units: pd.DataFrame
) -> pd.DataFrame:
    """All the steps to transform final documents.

    Attaches to each document the list of ids of its text units and
    selects the final output columns.

    Args
    ----
    documents: pd.DataFrame
        Document rows with an "id" column.
    text_units: pd.DataFrame
        Text-unit rows with "id", "document_id", and "text" columns.

    Returns
    -------
    pd.DataFrame
        Final documents restricted to DOCUMENTS_FINAL_COLUMNS.
    """
    # Keep only the join-relevant columns, renamed so they cannot clash
    # with the documents frame's own "id"/"text" columns.
    renamed = text_units.loc[:, ["id", "document_id", "text"]].rename(
        columns={
            "document_id": "chunk_doc_id",
            "id": "chunk_id",
            "text": "chunk_text",
        }
    )

    joined = renamed.merge(
        documents,
        left_on="chunk_doc_id",
        right_on="id",
        how="inner",
        copy=False,
    )

    # "id" becomes the index here; pandas merges on that index level name
    # in the right-join below.
    docs_with_text_units = joined.groupby("id", sort=False).agg(
        text_unit_ids=("chunk_id", list)
    )

    rejoined = docs_with_text_units.merge(
        documents,
        on="id",
        how="right",
        copy=False,
    ).reset_index(drop=True)

    # CONSISTENCY FIX: documents with no text units come out of the right
    # join with NaN; normalize to an empty list so the output matches the
    # streaming implementation (which uses mapping.get(id, [])) and gives
    # consumers a uniform list type.
    rejoined["text_unit_ids"] = rejoined["text_unit_ids"].apply(
        lambda ids: ids if isinstance(ids, list) else []
    )

    return rejoined.loc[:, DOCUMENTS_FINAL_COLUMNS]


async def create_final_documents(
    text_units_table: Table,
    documents_table: Table,
    output_table: Table,
) -> list[dict[str, Any]]:
    """Collect text-unit ids per document, then stream-enrich documents.

    First pass builds an in-memory map from document_id to the list of
    text-unit ids (rows with an empty/missing document_id are skipped).
    Second pass streams the documents, attaches each one's text_unit_ids
    (empty list when none), projects onto the final columns, and writes
    the row to the output table.

    Returns a sample of up to 5 finalized document rows for logging.
    """
    doc_to_units: dict[str, list[str]] = {}
    async for unit in text_units_table:
        parent = unit.get("document_id", "")
        if parent:
            doc_to_units.setdefault(parent, []).append(unit["id"])

    preview: list[dict[str, Any]] = []
    async for doc in documents_table:
        doc["text_unit_ids"] = doc_to_units.get(doc["id"], [])
        finalized = {column: doc.get(column) for column in DOCUMENTS_FINAL_COLUMNS}
        await output_table.write(finalized)
        if len(preview) < 5:
            preview.append(finalized)

    return preview
Loading
Loading