Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "slopometry"
version = "2026.3.11"
version = "2026.3.20"
description = "Opinionated code quality metrics for code agents and humans"
readme = "README.md"
requires-python = ">=3.13"
Expand Down
2 changes: 1 addition & 1 deletion src/slopometry/core/code_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def analyze_file(self, file_path: Path) -> FileAnalysisResult:
tokens=count_file_tokens(file_path),
)
except Exception as e:
logger.warning("Failed to analyze %s: %s", file_path, e)
logger.debug("Failed to analyze %s: %s", file_path, e)
return FileAnalysisResult(
path=str(file_path),
complexity=0,
Expand Down
10 changes: 5 additions & 5 deletions src/slopometry/core/compact_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def analyze_transcript(self, transcript_path: Path) -> list[CompactEvent]:
compact_events.append(compact_event)

except OSError as e:
logger.warning(f"Failed to read transcript file {transcript_path}: {e}")
logger.debug(f"Failed to read transcript file {transcript_path}: {e}")

return compact_events

Expand Down Expand Up @@ -110,13 +110,13 @@ def _create_compact_event(

timestamp_str = boundary.timestamp or summary.timestamp
if not timestamp_str:
logger.warning(f"Compact event at line {line_number} missing timestamp, skipping")
logger.debug(f"Compact event at line {line_number} missing timestamp, skipping")
return None

try:
timestamp = datetime.fromisoformat(timestamp_str.replace("Z", "+00:00"))
except ValueError:
logger.warning(f"Compact event at line {line_number} has invalid timestamp '{timestamp_str}', skipping")
logger.debug(f"Compact event at line {line_number} has invalid timestamp '{timestamp_str}', skipping")
return None

return CompactEvent(
Expand Down Expand Up @@ -179,7 +179,7 @@ def _transcript_matches_project(transcript_path: Path, working_directory: Path)
return False
return Path(cwd).resolve() == working_directory
except (OSError, json.JSONDecodeError) as e:
logger.warning(f"Failed to read transcript {transcript_path} for project matching: {e}")
logger.debug(f"Failed to read transcript {transcript_path} for project matching: {e}")
return False


Expand Down Expand Up @@ -217,7 +217,7 @@ def find_compact_instructions(transcript_path: Path, compact_line_number: int, l
continue

except OSError as e:
logger.warning(f"Failed to read transcript {transcript_path} for compact instructions: {e}")
logger.debug(f"Failed to read transcript {transcript_path} for compact instructions: {e}")

return None

Expand Down
4 changes: 2 additions & 2 deletions src/slopometry/core/complexity_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def _build_files_by_loc(self, python_files: list[Path], target_dir: Path) -> dic
relative_path = self._get_relative_path(file_path, target_dir)
files_by_loc[relative_path] = code_loc
except (OSError, UnicodeDecodeError) as e:
logger.warning(f"Skipping unreadable file {file_path}: {e}")
logger.debug(f"Skipping unreadable file {file_path}: {e}")
continue
return files_by_loc

Expand Down Expand Up @@ -294,7 +294,7 @@ def _analyze_files_parallel(self, files: list[Path], max_workers: int | None = N
results.append(result)
except Exception as e:
file_path = futures[future]
logger.warning(f"Failed to analyze {file_path}: {e}")
logger.debug(f"Failed to analyze {file_path}: {e}")
results.append(
FileAnalysisResult(
path=str(file_path),
Expand Down
94 changes: 43 additions & 51 deletions src/slopometry/core/hook_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from slopometry.core.lock import SlopometryLock
from slopometry.core.models.complexity import ComplexityDelta, ExtendedComplexityMetrics
from slopometry.core.models.hook import (
FeedbackCacheState,
HookEvent,
HookEventType,
HookInputUnion,
Expand Down Expand Up @@ -262,23 +263,18 @@ def _get_feedback_cache_path(working_directory: str) -> Path:
return cache_dir / "feedback_cache.json"


def _compute_feedback_cache_key(working_directory: str, edited_files: set[str], feedback_hash: str) -> str:
"""Compute a cache key for the current state.
def _compute_working_tree_cache_key(working_directory: str) -> str:
"""Compute a cache key based solely on working tree state.

Uses language-aware change detection to avoid cache invalidation from
non-source file changes (like uv.lock, submodules, build artifacts, etc.).

The languages parameter defaults to None (all supported languages).
Currently only Python is supported; future languages will be auto-detected
via LanguageDetector when added to the registry.
The cache key depends only on the git commit and source file contents,
making it stable across sessions and independent of smell analysis output.
This ensures the hook fires exactly once per code state change.

Args:
working_directory: Path to the working directory
edited_files: Set of edited file paths
feedback_hash: Hash of the feedback content

Returns:
Cache key string
Cache key string (BLAKE2b hex digest)
"""
tracker = GitTracker(Path(working_directory))
git_state = tracker.get_git_state()
Expand All @@ -288,42 +284,38 @@ def _compute_feedback_cache_key(working_directory: str, edited_files: set[str],
has_source_changes = bool(wt_calculator._get_modified_source_files_from_git())
working_tree_hash = wt_calculator.calculate_working_tree_hash(commit_sha) if has_source_changes else "clean"

files_key = ",".join(sorted(edited_files))
key_parts = f"{commit_sha}:{working_tree_hash}:{files_key}:{feedback_hash}"
key_parts = f"{commit_sha}:{working_tree_hash}"
return hashlib.blake2b(key_parts.encode(), digest_size=8).hexdigest()


def _load_feedback_cache(working_directory: str) -> FeedbackCacheState | None:
    """Load the feedback cache state from disk.

    Args:
        working_directory: Path to the working directory

    Returns:
        FeedbackCacheState if cache exists and is valid, None otherwise
    """
    cache_path = _get_feedback_cache_path(working_directory)
    if not cache_path.exists():
        return None

    # Read and validate in separate steps; any I/O or parse/validation
    # failure is treated as "no cache" rather than an error.
    try:
        raw = cache_path.read_text()
    except OSError:
        return None

    try:
        return FeedbackCacheState.model_validate_json(raw)
    except (json.JSONDecodeError, ValueError):
        # json.JSONDecodeError and pydantic's ValidationError are both
        # ValueError subclasses; listed explicitly for readability.
        return None


def _save_feedback_cache(working_directory: str, cache_key: str, file_hashes: dict[str, str]) -> None:
    """Save the feedback cache state with per-file content hashes.

    Args:
        working_directory: Path to the working directory
        cache_key: Working tree cache key
        file_hashes: Per-file content hashes at the time of this cache save
    """
    state = FeedbackCacheState(last_key=cache_key, file_hashes=file_hashes)
    cache_path = _get_feedback_cache_path(working_directory)
    try:
        cache_path.write_text(state.model_dump_json())
    except OSError as e:
        # Best-effort persistence: a failed save only means feedback may
        # fire again next time, so log at debug and move on.
        logger.debug(f"Failed to save feedback cache: {e}")

Expand Down Expand Up @@ -356,16 +348,26 @@ def handle_stop_event(session_id: str, parsed_input: "StopInput | SubagentStopIn

current_metrics, delta = db.calculate_extended_complexity_metrics(stats.working_directory)

feedback_parts: list[str] = []
cache_stable_parts: list[str] = []
# Determine which files changed since the last time feedback was shown.
# Uses per-file content hashes from the feedback cache to filter out
# pre-existing uncommitted changes that haven't changed.
wt_calculator = WorkingTreeStateCalculator(stats.working_directory, languages=None)
cached_state = _load_feedback_cache(stats.working_directory)

# Get edited files from git (more reliable than transcript-based context coverage)
try:
wt_calculator = WorkingTreeStateCalculator(stats.working_directory, languages=None)
# Early exit: if working tree state hasn't changed since last feedback, skip
cache_key = _compute_working_tree_cache_key(stats.working_directory)
if cached_state is not None and cached_state.last_key == cache_key:
return 0

current_file_hashes = wt_calculator.get_source_file_content_hashes()

if cached_state is not None:
edited_files = wt_calculator.get_files_changed_since(cached_state.file_hashes)
else:
# No cache yet (first run) — treat all modified source files as edited
edited_files = wt_calculator.get_modified_source_file_paths()
except (ValueError, OSError) as e:
logger.debug(f"Failed to get modified source files: {e}")
edited_files = set()

feedback_parts: list[str] = []

# Smell feedback: split into code-based (stable) and context-derived (unstable)
# Context-derived smells (e.g., unread_related_tests) change with every transcript
Expand All @@ -381,14 +383,12 @@ def handle_stop_event(session_id: str, parsed_input: "StopInput | SubagentStopIn
code_feedback, has_code_smells, _ = format_code_smell_feedback(code_smells, session_id)
if has_code_smells:
feedback_parts.append(code_feedback)
cache_stable_parts.append(code_feedback)

context_smell_feedback, has_context_smells, _ = format_code_smell_feedback(context_smells, session_id)
if has_context_smells:
feedback_parts.append(context_smell_feedback)

# Context coverage - informational but NOT stable (changes with every Read/Glob/Grep)
# Excluded from cache hash to avoid invalidation on tool calls
if settings.enable_complexity_feedback and stats.context_coverage and stats.context_coverage.has_gaps:
context_feedback = format_context_coverage_feedback(stats.context_coverage)
if context_feedback:
Expand All @@ -399,25 +399,17 @@ def handle_stop_event(session_id: str, parsed_input: "StopInput | SubagentStopIn
if dev_guidelines:
feedback_parts.append(f"\n**Project Development Guidelines:**\n{dev_guidelines}")

# Save cache with current file hashes regardless of whether feedback is shown.
# This ensures the next stop event compares against this point in time.
_save_feedback_cache(stats.working_directory, cache_key, current_file_hashes)

if feedback_parts:
feedback = "\n\n".join(feedback_parts)

# Cache key uses only code-based smell feedback — context coverage
# changes with every tool call and would invalidate cache
cache_content = "\n\n".join(cache_stable_parts) if cache_stable_parts else ""
feedback_hash = hashlib.blake2b(cache_content.encode(), digest_size=8).hexdigest()

feedback += (
f"\n\n---\n**Session**: `{session_id}` | Details: `slopometry solo show {session_id} --smell-details`"
)

cache_key = _compute_feedback_cache_key(stats.working_directory, edited_files, feedback_hash)

if _is_feedback_cached(stats.working_directory, cache_key):
return 0

_save_feedback_cache(stats.working_directory, cache_key)

hook_output = {"decision": "block", "reason": feedback}
print(json.dumps(hook_output))
return 2
Expand Down
2 changes: 1 addition & 1 deletion src/slopometry/core/language_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def _get_tracked_files(self) -> list[str]:
return [line for line in result.stdout.strip().split("\n") if line]

except subprocess.TimeoutExpired:
logger.warning("Language detection timed out for %s", self.repo_path)
logger.debug("Language detection timed out for %s", self.repo_path)
return []
except FileNotFoundError:
logger.debug("git not found, cannot detect languages in %s", self.repo_path)
Expand Down
15 changes: 15 additions & 0 deletions src/slopometry/core/models/hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ class GitState(BaseModel):
commit_sha: str | None = None


class FeedbackCacheState(BaseModel):
    """Persisted state of the feedback cache for change-based firing.

    Stored in .slopometry/feedback_cache.json. The hook only fires when
    the working tree state changes since the last time feedback was shown.
    Per-file content hashes enable computing which specific files changed.
    """

    # Composite key identifying the working tree state at the last fire.
    last_key: str = Field(description="Cache key from last fire: commit_sha:working_tree_hash")
    # default_factory=dict keeps older cache files (written before this
    # field existed, without a "file_hashes" key) loadable as an empty map.
    file_hashes: dict[str, str] = Field(
        default_factory=dict,
        description="Per-file content hashes (rel_path -> BLAKE2b hex) at time of last fire",
    )


class HookEvent(BaseModel):
"""Represents a single hook invocation event."""

Expand Down
5 changes: 2 additions & 3 deletions src/slopometry/core/python_feature_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class FeatureStats(BaseModel):
orphan_comment_count: int = SmellField(
label="Orphan Comments",
files_field="orphan_comment_files",
guidance="Make sure inline code comments add meaningful information about non-obvious design tradeoffs or explain tech debt or performance implications. Consider if these could be docstrings or field descriptors instead",
guidance="Make sure inline code comments add meaningful information about non-obvious design tradeoffs or explain tech debt or performance implications. Consider if these could be docstrings or field descriptors instead. Prefix with `# NOTE:`, `# REASON:`, `# PERF:`, `# SAFETY:`, `# WORKAROUND:`, `# CAVEAT:`, `# COMPAT:`, or `# IMPORTANT:` to mark intentional design decisions (these are excluded from the count)",
)
untracked_todo_count: int = SmellField(
label="Untracked TODOs",
Expand Down Expand Up @@ -166,7 +166,6 @@ def _analyze_single_file_features(file_path: Path) -> FeatureStats | None:
total_loc, code_loc = _count_loc(content)
path_str = str(file_path)

# 4 smells come from non-AST analysis; rest from FeatureVisitor
non_ast_counts: dict[str, int] = {
"orphan_comment_count": orphan_comments,
"untracked_todo_count": untracked_todos,
Expand Down Expand Up @@ -331,7 +330,7 @@ def _analyze_files_parallel(self, files: list[Path], max_workers: int | None = N
results.append(result)
except Exception as e:
file_path = futures[future]
logger.warning(f"Failed to analyze features for {file_path}: {e}")
logger.debug(f"Failed to analyze features for {file_path}: {e}")
results.append(None)

return results
Expand Down
2 changes: 1 addition & 1 deletion src/slopometry/core/tokenizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def count_file_tokens(file_path: Path) -> int | TokenCountError:
content = file_path.read_text(encoding="utf-8")
return count_tokens(content)
except Exception as e:
logger.warning("Failed to read file for token counting %s: %s", file_path, e)
logger.debug("Failed to read file for token counting %s: %s", file_path, e)
return TokenCountError(message=str(e), path=str(file_path))


Expand Down
8 changes: 4 additions & 4 deletions src/slopometry/core/transcript_token_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ def extract_transcript_metadata(transcript_path: Path) -> TranscriptMetadata:
if agent_version is not None and model is not None and git_branch is not None:
break
except OSError as e:
logger.warning(f"Failed to read transcript for metadata: {e}")
logger.debug(f"Failed to read transcript for metadata: {e}")

if skipped_lines:
logger.warning(f"Skipped {skipped_lines} unparseable line(s) in metadata extraction from {transcript_path}")
logger.debug(f"Skipped {skipped_lines} unparseable line(s) in metadata extraction from {transcript_path}")

return TranscriptMetadata(agent_version=agent_version, model=model, git_branch=git_branch)

Expand Down Expand Up @@ -144,10 +144,10 @@ def analyze_transcript(self, transcript_path: Path) -> TokenUsage:
self._process_event(event, usage)

except OSError as e:
logger.warning(f"Failed to read transcript file {transcript_path}: {e}")
logger.debug(f"Failed to read transcript file {transcript_path}: {e}")

if skipped_lines:
logger.warning(f"Skipped {skipped_lines} unparseable line(s) in {transcript_path}")
logger.debug(f"Skipped {skipped_lines} unparseable line(s) in {transcript_path}")

usage.final_context_input_tokens = self._latest_raw_input_tokens
usage.subagent_tokens = usage.explore_subagent_tokens + usage.non_explore_subagent_tokens
Expand Down
Loading
Loading