Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 121 additions & 80 deletions codeflash/verification/coverage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,68 @@ def load_from_jest_json(
class JacocoCoverageUtils:
"""Coverage utils class for parsing JaCoCo XML reports (Java)."""

@staticmethod
def _extract_lines_for_method(
method_start_line: int | None, all_method_start_lines: list[int], line_data: dict[int, dict[str, int]]
) -> tuple[list[int], list[int], list[list[int]], list[list[int]]]:
"""Extract executed/unexecuted lines and branches for a method given its start line."""
executed_lines: list[int] = []
unexecuted_lines: list[int] = []
executed_branches: list[list[int]] = []
unexecuted_branches: list[list[int]] = []

if method_start_line:
method_end_line = None
for start_line in all_method_start_lines:
if start_line > method_start_line:
method_end_line = start_line - 1
break
if method_end_line is None:
all_lines = sorted(line_data.keys())
method_end_line = max(all_lines) if all_lines else method_start_line

for line_nr, data in sorted(line_data.items()):
if method_start_line <= line_nr <= method_end_line:
if data["ci"] > 0:
executed_lines.append(line_nr)
elif data["mi"] > 0:
unexecuted_lines.append(line_nr)
if data["cb"] > 0:
for i in range(data["cb"]):
executed_branches.append([line_nr, i])
if data["mb"] > 0:
for i in range(data["mb"]):
unexecuted_branches.append([line_nr, data["cb"] + i])
else:
for line_nr, data in sorted(line_data.items()):
if data["ci"] > 0:
executed_lines.append(line_nr)
elif data["mi"] > 0:
unexecuted_lines.append(line_nr)
if data["cb"] > 0:
for i in range(data["cb"]):
executed_branches.append([line_nr, i])
if data["mb"] > 0:
for i in range(data["mb"]):
unexecuted_branches.append([line_nr, data["cb"] + i])

return executed_lines, unexecuted_lines, executed_branches, unexecuted_branches

@staticmethod
def _compute_coverage_pct(executed_lines: list[int], unexecuted_lines: list[int], method_elem: Any | None) -> float:
"""Compute coverage %, preferring method-level LINE counter over line-by-line calculation."""
total_lines = set(executed_lines) | set(unexecuted_lines)
coverage_pct = (len(executed_lines) / len(total_lines) * 100) if total_lines else 0.0
if method_elem is not None:
for counter in method_elem.findall("counter"):
if counter.get("type") == "LINE":
missed = int(counter.get("missed", 0))
covered = int(counter.get("covered", 0))
if missed + covered > 0:
coverage_pct = covered / (missed + covered) * 100
break
return coverage_pct

@staticmethod
def load_from_jacoco_xml(
jacoco_xml_path: Path,
Expand Down Expand Up @@ -241,32 +303,31 @@ def load_from_jacoco_xml(
# Determine expected source file name from path
source_filename = source_code_path.name

# Find the matching sourcefile element and collect all method start lines
# Find the matching sourcefile element and collect all methods
sourcefile_elem = None
method_elem = None
method_start_line = None
all_method_start_lines: list[int] = []
# bare method name -> (element, start_line) for dependent function lookup
all_methods: dict[str, tuple[Any, int]] = {}

for package in root.findall(".//package"):
# Look for the sourcefile matching our source file
for sf in package.findall("sourcefile"):
if sf.get("name") == source_filename:
sourcefile_elem = sf
break

# Look for the class and method, collect all method start lines
for cls in package.findall("class"):
cls_source = cls.get("sourcefilename")
if cls_source == source_filename:
# Collect all method start lines for boundary detection
for method in cls.findall("method"):
method_line = int(method.get("line", 0))
if method_line > 0:
all_method_start_lines.append(method_line)

# Check if this is our target method
method_name = method.get("name")
if method_name == function_name:
bare_name = method.get("name")
if bare_name:
all_methods[bare_name] = (method, method_line)
if bare_name == function_name:
method_elem = method
method_start_line = method_line

Expand All @@ -277,16 +338,9 @@ def load_from_jacoco_xml(
logger.debug(f"No coverage data found for {source_filename} in JaCoCo report")
return CoverageData.create_empty(source_code_path, function_name, code_context)

# Sort method start lines to determine boundaries
all_method_start_lines = sorted(set(all_method_start_lines))

# Parse line-level coverage from sourcefile
executed_lines: list[int] = []
unexecuted_lines: list[int] = []
executed_branches: list[list[int]] = []
unexecuted_branches: list[list[int]] = []

# Get all line data
# Get all line data from the sourcefile element
line_data: dict[int, dict[str, int]] = {}
for line in sourcefile_elem.findall("line"):
line_nr = int(line.get("nr", 0))
Expand All @@ -297,67 +351,11 @@ def load_from_jacoco_xml(
"cb": int(line.get("cb", 0)), # covered branches
}

# Determine method boundaries
if method_start_line:
# Find the next method's start line to determine this method's end
method_end_line = None
for start_line in all_method_start_lines:
if start_line > method_start_line:
# Next method starts here, so our method ends before this
method_end_line = start_line - 1
break

# If no next method found, use the max line in the file
if method_end_line is None:
all_lines = sorted(line_data.keys())
method_end_line = max(all_lines) if all_lines else method_start_line

# Filter to lines within the method boundaries
for line_nr, data in sorted(line_data.items()):
if method_start_line <= line_nr <= method_end_line:
# Line is covered if it has covered instructions
if data["ci"] > 0:
executed_lines.append(line_nr)
elif data["mi"] > 0:
unexecuted_lines.append(line_nr)

# Branch coverage
if data["cb"] > 0:
# Covered branches - each branch is [line, branch_id]
for i in range(data["cb"]):
executed_branches.append([line_nr, i])
if data["mb"] > 0:
# Missed branches
for i in range(data["mb"]):
unexecuted_branches.append([line_nr, data["cb"] + i])
else:
# No method found - use all lines in the file
for line_nr, data in sorted(line_data.items()):
if data["ci"] > 0:
executed_lines.append(line_nr)
elif data["mi"] > 0:
unexecuted_lines.append(line_nr)

if data["cb"] > 0:
for i in range(data["cb"]):
executed_branches.append([line_nr, i])
if data["mb"] > 0:
for i in range(data["mb"]):
unexecuted_branches.append([line_nr, data["cb"] + i])

# Calculate coverage percentage
total_lines = set(executed_lines) | set(unexecuted_lines)
coverage_pct = (len(executed_lines) / len(total_lines) * 100) if total_lines else 0.0

# If we found method-level counters, use them as the authoritative source
if method_elem is not None:
for counter in method_elem.findall("counter"):
if counter.get("type") == "LINE":
missed = int(counter.get("missed", 0))
covered = int(counter.get("covered", 0))
if missed + covered > 0:
coverage_pct = covered / (missed + covered) * 100
break
# Extract main function coverage
executed_lines, unexecuted_lines, executed_branches, unexecuted_branches = (
JacocoCoverageUtils._extract_lines_for_method(method_start_line, all_method_start_lines, line_data)
)
coverage_pct = JacocoCoverageUtils._compute_coverage_pct(executed_lines, unexecuted_lines, method_elem)

main_func_coverage = FunctionCoverage(
name=function_name,
Expand All @@ -368,6 +366,42 @@ def load_from_jacoco_xml(
unexecuted_branches=unexecuted_branches,
)

# Find dependent (helper) function — mirrors Python behavior: only when exactly 1 helper exists
dependent_func_coverage = None
dep_helpers = code_context.helper_functions
if len(dep_helpers) == 1:
dep_helper = dep_helpers[0]
dep_bare_name = dep_helper.only_function_name
if dep_bare_name in all_methods:
dep_method_elem, dep_start_line = all_methods[dep_bare_name]
dep_executed, dep_unexecuted, dep_exec_branches, dep_unexec_branches = (
JacocoCoverageUtils._extract_lines_for_method(dep_start_line, all_method_start_lines, line_data)
)
dep_coverage_pct = JacocoCoverageUtils._compute_coverage_pct(
dep_executed, dep_unexecuted, dep_method_elem
)
dependent_func_coverage = FunctionCoverage(
name=dep_helper.qualified_name,
coverage=dep_coverage_pct,
executed_lines=sorted(dep_executed),
unexecuted_lines=sorted(dep_unexecuted),
executed_branches=dep_exec_branches,
unexecuted_branches=dep_unexec_branches,
)

# Total coverage = main function + helper (if any), matching Python behavior
total_executed = set(executed_lines)
total_unexecuted = set(unexecuted_lines)
if dependent_func_coverage:
total_executed.update(dependent_func_coverage.executed_lines)
total_unexecuted.update(dependent_func_coverage.unexecuted_lines)
total_lines_set = total_executed | total_unexecuted
total_coverage_pct = (len(total_executed) / len(total_lines_set) * 100) if total_lines_set else coverage_pct

functions_being_tested = [function_name]
if dependent_func_coverage:
functions_being_tested.append(dependent_func_coverage.name)

graph = {
function_name: {
"executed_lines": set(executed_lines),
Expand All @@ -376,16 +410,23 @@ def load_from_jacoco_xml(
"unexecuted_branches": unexecuted_branches,
}
}
if dependent_func_coverage:
graph[dependent_func_coverage.name] = {
"executed_lines": set(dependent_func_coverage.executed_lines),
"unexecuted_lines": set(dependent_func_coverage.unexecuted_lines),
"executed_branches": dependent_func_coverage.executed_branches,
"unexecuted_branches": dependent_func_coverage.unexecuted_branches,
}

return CoverageData(
file_path=source_code_path,
coverage=coverage_pct,
coverage=total_coverage_pct,
function_name=function_name,
functions_being_tested=[function_name],
functions_being_tested=functions_being_tested,
graph=graph,
code_context=code_context,
main_func_coverage=main_func_coverage,
dependent_func_coverage=None,
dependent_func_coverage=dependent_func_coverage,
status=CoverageStatus.PARSED_SUCCESSFULLY,
)

Expand Down
Loading
Loading