From 62648e4c42bd0f1d770581bcaf30f5fca56ba8fb Mon Sep 17 00:00:00 2001 From: Luke Jones Date: Mon, 27 Apr 2026 19:50:06 +1200 Subject: [PATCH 1/5] fix(bridge): re-sync DocumentTracker on external file changes ensure_open now stats the file on every call and compares (mtime, size) against the tracked DocumentState. On mismatch we send didClose + bumped didOpen so the LSP server observes the new content; on match we keep the existing fast path. Fixes stale per-file tool results after edits made outside mcpls (git stash/checkout, the MCP host's own edit tools, formatters, code generators) for every configured LSP, including those that do not register workspace/didChangeWatchedFiles. Adds an integration test rooted in a tempdir copy of the rust_workspace fixture that primes a query, overwrites the file on disk, and asserts the next query reflects the new symbol set. ADR 0001 records the design. Refs #102. --- CHANGELOG.md | 1 + crates/mcpls-core/src/bridge/mod.rs | 2 +- crates/mcpls-core/src/bridge/state.rs | 247 ++++++++++++++++-- .../tests/integration/rust_analyzer_tests.rs | 132 ++++++++++ docs/adr/0001-external-file-changes.md | 70 +++++ 5 files changed, 429 insertions(+), 23 deletions(-) create mode 100644 docs/adr/0001-external-file-changes.md diff --git a/CHANGELOG.md b/CHANGELOG.md index 36171e7..2fc4618 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **LSP server requests** — Handle server-to-client requests such as `client/registerCapability`, fixing tsgo timeouts. - **Integration tests** — Add `[workspace]` table to `tests/fixtures/rust_workspace/Cargo.toml` so cargo treats the fixture as a standalone workspace; fixes 8 rust-analyzer integration tests that failed with "Failed to load workspaces." 
(#118) - **e2e coverage** — Add ra_e2e sub-cases for `get_signature_help`, `go_to_implementation`, `go_to_type_definition`, `get_inlay_hints` (4 LSP 3.17 tools from #124 had no coverage); add `list_resources`, `read_resource`, `subscribe_resource`, `unsubscribe_resource` to `McpClient` and ra_e2e_suite (MCP resources path was entirely untested) (#129, #130) +- **Stale results after external file changes** (#102, part 1) — `DocumentTracker::ensure_open` now stats the file on every call and re-syncs the document with the LSP server (via `textDocument/didClose` + bumped-version `textDocument/didOpen`) when the on-disk signature has changed. Fixes stale `get_hover`, `get_definition`, `get_references`, `get_document_symbols`, `get_diagnostics`, `get_completions`, `get_code_actions`, `format_document`, `rename_symbol`, and call-hierarchy results after edits made outside mcpls (`git stash`/`checkout`, the MCP host's own `Edit`/`Write` tools, formatters, code generators). Works for every configured LSP, including those that do not register `workspace/didChangeWatchedFiles`. 
## [0.3.6] - 2026-04-21 diff --git a/crates/mcpls-core/src/bridge/mod.rs b/crates/mcpls-core/src/bridge/mod.rs index e6830bb..3a9fead 100644 --- a/crates/mcpls-core/src/bridge/mod.rs +++ b/crates/mcpls-core/src/bridge/mod.rs @@ -14,7 +14,7 @@ pub use notifications::{ DiagnosticInfo, LogEntry, LogLevel, MessageType, NotificationCache, ServerMessage, }; pub use resources::ResourceSubscriptions; -pub use state::{DocumentState, DocumentTracker, path_to_uri, uri_to_path}; +pub use state::{DocumentState, DocumentTracker, SyncSignature, path_to_uri, uri_to_path}; pub use translator::{ Completion, CompletionsResult, DefinitionResult, Diagnostic, DiagnosticSeverity, DiagnosticsResult, DocumentChanges, DocumentSymbolsResult, FormatDocumentResult, HoverResult, diff --git a/crates/mcpls-core/src/bridge/state.rs b/crates/mcpls-core/src/bridge/state.rs index e448309..71a161e 100644 --- a/crates/mcpls-core/src/bridge/state.rs +++ b/crates/mcpls-core/src/bridge/state.rs @@ -4,8 +4,12 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; +use std::time::SystemTime; -use lsp_types::{DidOpenTextDocumentParams, TextDocumentItem, Uri}; +use lsp_types::{ + DidCloseTextDocumentParams, DidOpenTextDocumentParams, TextDocumentIdentifier, + TextDocumentItem, Uri, +}; use url::Url; use crate::error::{Error, Result}; @@ -22,6 +26,36 @@ pub struct DocumentState { pub version: i32, /// Document content. pub content: String, + /// Filesystem signature captured when this state was last synced from disk. + /// + /// `(mtime, size)` together act as a freshness key: a mismatch on either + /// means the on-disk file has changed since the LSP server was last told + /// about it, and the document must be re-synced. `mtime` is `None` on + /// platforms or filesystems that do not expose a modification time. + pub synced_signature: SyncSignature, +} + +/// Filesystem signature used to detect external file changes. 
+/// +/// Pairing modification time with size avoids false negatives on filesystems +/// with low mtime resolution where two writes within the same tick can leave +/// mtime unchanged while content (and therefore size) differs. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct SyncSignature { + /// Last modification time, if available. + pub mtime: Option, + /// File size in bytes. + pub size: u64, +} + +impl SyncSignature { + /// Signature used before a file has been stat'd. Will not compare equal to + /// any real on-disk signature, forcing the first `ensure_open` call to + /// take the sync path. + pub const UNKNOWN: Self = Self { + mtime: None, + size: u64::MAX, + }; } /// Resource limits for document tracking. @@ -123,6 +157,7 @@ impl DocumentTracker { language_id, version: 1, content, + synced_signature: SyncSignature::UNKNOWN, }; self.documents.insert(path, state); @@ -142,6 +177,20 @@ impl DocumentTracker { } } + /// Record the on-disk signature for a tracked document. + /// + /// Used by `ensure_open` after reading or re-reading the file from disk so + /// future calls can short-circuit when the signature is unchanged. Returns + /// `false` if the document is not tracked. + pub fn set_synced_signature(&mut self, path: &Path, signature: SyncSignature) -> bool { + if let Some(state) = self.documents.get_mut(path) { + state.synced_signature = signature; + true + } else { + false + } + } + /// Close a document and remove it from tracking. /// /// Returns the document state if it was open. @@ -159,20 +208,38 @@ impl DocumentTracker { self.documents.keys().map(PathBuf::as_path) } - /// Ensure a document is open, opening it lazily if necessary. + /// Forget a tracked document without notifying the LSP server. + /// + /// Used by external-change detection paths (filesystem watchers) so that + /// the next `ensure_open` call observes a signature mismatch and re-syncs + /// the document. 
Callers that need to inform the server should additionally + /// send `textDocument/didClose` themselves. + pub fn invalidate(&mut self, path: &Path) -> Option { + self.documents.remove(path) + } + + /// Ensure a document is open and in sync with the on-disk file. /// - /// If the document is already open, returns its URI immediately. - /// Otherwise, reads the file from disk, opens it in the tracker, - /// and sends a `didOpen` notification to the LSP server. + /// Stats the file on every call and compares the result against the + /// tracked `DocumentState`'s signature. If the signature matches, returns + /// the cached URI. If it differs (external edit, git checkout, formatter, + /// etc.) the file is re-read, the LSP server is sent a + /// `textDocument/didClose` followed by a `textDocument/didOpen` with a + /// bumped version, and the cached state is replaced. If the document is + /// not tracked at all, it is opened for the first time. /// /// # Errors /// /// Returns an error if: - /// - The file cannot be read from disk - /// - The `didOpen` notification fails to send + /// - The file cannot be stat'd or read from disk + /// - The `didClose`/`didOpen` notification fails to send /// - Resource limits are exceeded pub async fn ensure_open(&mut self, path: &Path, lsp_client: &LspClient) -> Result { - if let Some(state) = self.documents.get(path) { + let signature = stat_signature(path).await?; + + if let Some(state) = self.documents.get(path) + && state.synced_signature == signature + { return Ok(state.uri.clone()); } @@ -183,27 +250,79 @@ impl DocumentTracker { source: e, })?; + if let Some(existing) = self.documents.get(path) { + let close_params = DidCloseTextDocumentParams { + text_document: TextDocumentIdentifier { + uri: existing.uri.clone(), + }, + }; + lsp_client + .notify("textDocument/didClose", close_params) + .await?; + // Bump the version on resync so the server sees the reopened + // document as a strictly newer state. 
+ let new_version = existing.version.saturating_add(1); + let language_id = existing.language_id.clone(); + let uri = existing.uri.clone(); + if let Some(state) = self.documents.get_mut(path) { + state.version = new_version; + state.content.clone_from(&content); + state.synced_signature = signature; + } + send_did_open(lsp_client, &uri, &language_id, new_version, content).await?; + return Ok(uri); + } + let uri = self.open(path.to_path_buf(), content.clone())?; - let state = self + // Record the signature now that the document is tracked; if the file + // is replaced before the next access, the next ensure_open will see a + // mismatch and re-sync. + self.set_synced_signature(path, signature); + let language_id = self .documents .get(path) - .ok_or_else(|| Error::DocumentNotFound(path.to_path_buf()))?; - - let params = DidOpenTextDocumentParams { - text_document: TextDocumentItem { - uri: uri.clone(), - language_id: state.language_id.clone(), - version: state.version, - text: content, - }, - }; - - lsp_client.notify("textDocument/didOpen", params).await?; - + .ok_or_else(|| Error::DocumentNotFound(path.to_path_buf()))? + .language_id + .clone(); + send_did_open(lsp_client, &uri, &language_id, 1, content).await?; Ok(uri) } } +/// Stat a file and produce its sync signature. +/// +/// `mtime` falls back to `None` when the platform does not expose it; in that +/// case the signature collapses to a size-only comparison. +async fn stat_signature(path: &Path) -> Result { + let metadata = tokio::fs::metadata(path).await.map_err(|e| Error::FileIo { + path: path.to_path_buf(), + source: e, + })?; + Ok(SyncSignature { + mtime: metadata.modified().ok(), + size: metadata.len(), + }) +} + +/// Send a `textDocument/didOpen` notification with the given content. 
+async fn send_did_open( + lsp_client: &LspClient, + uri: &Uri, + language_id: &str, + version: i32, + text: String, +) -> Result<()> { + let params = DidOpenTextDocumentParams { + text_document: TextDocumentItem { + uri: uri.clone(), + language_id: language_id.to_string(), + version, + text, + }, + }; + lsp_client.notify("textDocument/didOpen", params).await +} + /// Convert a file path to a URI. /// /// # Panics @@ -402,6 +521,7 @@ mod tests { language_id: "rust".to_string(), version: 5, content: "fn main() {}".to_string(), + synced_signature: SyncSignature::UNKNOWN, }; #[allow(clippy::redundant_clone)] @@ -410,6 +530,7 @@ mod tests { assert_eq!(cloned.language_id, state.language_id); assert_eq!(cloned.version, 5); assert_eq!(cloned.content, state.content); + assert_eq!(cloned.synced_signature, state.synced_signature); } #[test] @@ -900,4 +1021,86 @@ mod tests { tracker.close(Path::new("/a.rs")); assert_eq!(tracker.open_paths().count(), 0); } + + // ------------------------------------------------------------------ + // SyncSignature / external-change detection + // ------------------------------------------------------------------ + + #[test] + fn test_sync_signature_unknown_does_not_match_real() { + let real = SyncSignature { + mtime: Some(SystemTime::UNIX_EPOCH), + size: 0, + }; + assert_ne!(SyncSignature::UNKNOWN, real); + } + + #[test] + fn test_sync_signature_size_change_detected() { + let now = SystemTime::now(); + let a = SyncSignature { + mtime: Some(now), + size: 100, + }; + let b = SyncSignature { + mtime: Some(now), + size: 101, + }; + assert_ne!(a, b, "size change must defeat equality even if mtime ties"); + } + + #[test] + fn test_set_synced_signature_records_value() { + let mut tracker = DocumentTracker::new(ResourceLimits::default(), HashMap::new()); + let path = PathBuf::from("/test/sig.rs"); + tracker + .open(path.clone(), "fn main() {}".to_string()) + .unwrap(); + + let signature = SyncSignature { + mtime: Some(SystemTime::now()), + size: 12, + 
}; + assert!(tracker.set_synced_signature(&path, signature)); + assert_eq!(tracker.get(&path).unwrap().synced_signature, signature); + } + + #[test] + fn test_set_synced_signature_returns_false_for_unknown_path() { + let mut tracker = DocumentTracker::new(ResourceLimits::default(), HashMap::new()); + let signature = SyncSignature { + mtime: None, + size: 0, + }; + assert!(!tracker.set_synced_signature(Path::new("/nope.rs"), signature)); + } + + #[test] + fn test_invalidate_removes_state() { + let mut tracker = DocumentTracker::new(ResourceLimits::default(), HashMap::new()); + let path = PathBuf::from("/test/inv.rs"); + tracker.open(path.clone(), "x".to_string()).unwrap(); + assert!(tracker.is_open(&path)); + + let removed = tracker.invalidate(&path); + assert!(removed.is_some()); + assert!(!tracker.is_open(&path)); + } + + #[tokio::test] + async fn test_stat_signature_changes_when_file_grows() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("probe.txt"); + tokio::fs::write(&path, b"v1").await.unwrap(); + + let first = stat_signature(&path).await.unwrap(); + + // Sleep briefly so mtime resolution on coarse filesystems can advance. + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + tokio::fs::write(&path, b"v2-longer-content").await.unwrap(); + + let second = stat_signature(&path).await.unwrap(); + assert_ne!(first, second, "signature must change after rewrite"); + assert_ne!(first.size, second.size); + } } diff --git a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs index 58749ad..f84f561 100644 --- a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs +++ b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs @@ -903,3 +903,135 @@ async fn test_workspace_symbol_search_function() { } } } + +/// Regression test for issue #102: external file changes must invalidate the +/// document tracker so the next request reflects on-disk truth. 
+/// +/// Sequence: +/// 1. Copy the `rust_workspace` fixture to a tempdir. +/// 2. Spawn rust-analyzer rooted at the copy. +/// 3. Query document symbols in `src/types.rs` to prime the tracker. +/// 4. Overwrite `src/types.rs` on disk with a new symbol set, then query +/// again. The result must reflect the new symbols. +#[tokio::test] +#[ignore = "Requires rust-analyzer installed"] +async fn test_ensure_open_resyncs_after_external_edit() { + use std::collections::HashMap; + + use mcpls_core::config::LspServerConfig; + + if !rust_analyzer_available() { + eprintln!("Skipping: rust-analyzer not available"); + return; + } + + init_tracing(); + + let tempdir = tempfile::tempdir().expect("create tempdir"); + copy_dir_recursive(&rust_workspace_path(), tempdir.path()).expect("copy fixture"); + let workspace_path = tempdir.path().to_path_buf(); + + let lsp_config = LspServerConfig { + language_id: "rust".to_string(), + command: "rust-analyzer".to_string(), + args: vec![], + env: HashMap::new(), + file_patterns: vec!["**/*.rs".to_string()], + initialization_options: None, + timeout_seconds: 30, + heuristics: None, + }; + let server_init_config = ServerInitConfig { + server_config: lsp_config, + workspace_roots: vec![workspace_path.clone()], + initialization_options: None, + }; + + let server = LspServer::spawn(server_init_config) + .await + .expect("spawn rust-analyzer"); + let client = server.client().clone(); + + let extension_map = { + let mut m = HashMap::new(); + m.insert("rs".to_string(), "rust".to_string()); + m + }; + let mut translator = Translator::new().with_extensions(extension_map); + translator.set_workspace_roots(vec![workspace_path.clone()]); + translator.register_client("rust".to_string(), client); + translator.register_server("rust".to_string(), server); + + let translator = Arc::new(Mutex::new(translator)); + + tokio::time::sleep(Duration::from_secs(3)).await; + + let types_path = workspace_path.join("src/types.rs"); + let types_path_str = 
types_path.to_string_lossy().to_string(); + + // Prime: first query reads V1 from disk and pins it in the tracker. + let v1 = timeout( + Duration::from_secs(10), + translator + .lock() + .await + .handle_document_symbols(types_path_str.clone()), + ) + .await + .expect("v1 timed out") + .expect("v1 errored"); + + let v1_names: Vec = v1.symbols.iter().map(|s| s.name.clone()).collect(); + assert!( + v1_names.iter().any(|n| n == "Repository"), + "expected Repository in V1 symbols, got {v1_names:?}" + ); + + // External edit: replace the file on disk with new content and a new + // symbol set, mimicking git stash / external editor / formatter. + tokio::time::sleep(Duration::from_millis(20)).await; + let v2_source = "//! V2 — externally rewritten.\n\ + pub struct ResyncedMarker;\n\ + impl ResyncedMarker {\n pub fn new() -> Self { Self }\n}\n"; + tokio::fs::write(&types_path, v2_source) + .await + .expect("write V2"); + + let v2 = timeout( + Duration::from_secs(10), + translator + .lock() + .await + .handle_document_symbols(types_path_str.clone()), + ) + .await + .expect("v2 timed out") + .expect("v2 errored"); + + let v2_names: Vec = v2.symbols.iter().map(|s| s.name.clone()).collect(); + assert!( + v2_names.iter().any(|n| n == "ResyncedMarker"), + "expected ResyncedMarker after external edit, got {v2_names:?}" + ); + assert!( + !v2_names.iter().any(|n| n == "Repository"), + "Repository should be gone after external edit, got {v2_names:?}" + ); +} + +/// Recursively copy a directory tree. Used to give each test that mutates +/// fixture files a private working copy. +fn copy_dir_recursive(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> { + std::fs::create_dir_all(dst)?; + for entry in std::fs::read_dir(src)? 
{ + let entry = entry?; + let file_type = entry.file_type()?; + let dst_path = dst.join(entry.file_name()); + if file_type.is_dir() { + copy_dir_recursive(&entry.path(), &dst_path)?; + } else { + std::fs::copy(entry.path(), &dst_path)?; + } + } + Ok(()) +} diff --git a/docs/adr/0001-external-file-changes.md b/docs/adr/0001-external-file-changes.md new file mode 100644 index 0000000..5897bb6 --- /dev/null +++ b/docs/adr/0001-external-file-changes.md @@ -0,0 +1,70 @@ +# ADR 0001 — Handling external file changes + +## Status + +Accepted (2026-04-27). + +## Context + +`DocumentTracker::ensure_open` reads each file from disk exactly once per +session and never re-syncs, so any modification made outside mcpls — git +operations, the MCP host's own edit tools, formatters, code generators — +is invisible to both mcpls and the underlying LSP server until the +process is restarted. This produces stale answers from every per-file +tool (`get_hover`, `get_definition`, `get_references`, +`get_document_symbols`, `get_diagnostics`, `get_completions`, +`get_code_actions`, `format_document`, `rename_symbol`, the call +hierarchy tools) and from `workspace_symbol_search`. See issue #102 for +the verified repro. + +## Decision + +Two complementary changes: + +1. **Stat-on-access in `ensure_open`** (mcpls-side, applies to every + server). On every call we stat the file and compare `(mtime, size)` + against the tracked `DocumentState`. On mismatch we send + `textDocument/didClose` followed by `textDocument/didOpen` with a + bumped version, replacing the cached state. This fixes every per-file + tool against every LSP server, including those that do not register + `workspace/didChangeWatchedFiles` (e.g. zls). + +2. **`workspace/didChangeWatchedFiles`** (LSP-side, eager). 
For LSP + servers that dynamically register file watchers via + `client/registerCapability`, mcpls now declares + `workspace.didChangeWatchedFiles.dynamic_registration: true`, + handles the inbound registration request, runs a `notify`-based + filesystem watcher per server, and forwards matching events as + `workspace/didChangeWatchedFiles` notifications. The watcher also + invalidates the `DocumentTracker` entry for any affected path so + that the next `ensure_open` re-syncs (composes cleanly with #1). + +Manual `reload_workspace` is intentionally out of scope here. + +## Consequences + +- Every per-file MCP tool now reflects on-disk truth without restart. +- For watcher-registering servers (rust-analyzer, gopls, pyright, + typescript-language-server, clangd) the LSP's workspace index also + stays live, fixing `workspace_symbol_search` staleness. +- The transport gains an `InboundMessage::Request` variant and the + client gains a small server-to-client request dispatcher, which is + also useful for future protocol features (`workspace/configuration`, + work-done progress, etc.). +- New runtime dependency: `notify`. Watchers run in a blocking thread + bridged to tokio via `std::sync::mpsc` → `tokio::sync::mpsc`. +- Watcher failure (e.g. inotify exhaustion) is logged and the server + continues; #1 still covers per-file freshness in that case. + +## Alternatives considered + +- **Synthesise `textDocument/didChange` with full-content edits** instead + of close+reopen on resync. Rejected: more complex, requires accurate + range/version tracking we do not otherwise need, and the close+reopen + pair is what rust-analyzer's own VS Code client does on external + changes when it cannot prove edits are local. +- **`notify-debouncer-full`** instead of raw `notify`. Rejected for + simplicity; we run a small in-process coalescer and do not need + rename correlation. +- **A `reload_workspace` MCP tool** as a manual escape hatch. Useful but + orthogonal; not implemented here. 
From 5c1f4f923cc5f69d55b8acae20511163b1338939 Mon Sep 17 00:00:00 2001 From: Luke Jones Date: Mon, 27 Apr 2026 20:20:37 +1200 Subject: [PATCH 2/5] feat(lsp): forward workspace/didChangeWatchedFiles to LSP servers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit mcpls now declares workspace.didChangeWatchedFiles.dynamic_registration and relative_pattern_support, handles inbound client/registerCapability and client/unregisterCapability requests, and runs a per-server notify filesystem watcher that forwards matched events as workspace/didChangeWatchedFiles. This keeps the LSP server's workspace index live across external file changes for servers that register watchers (rust-analyzer, gopls, pyright, typescript-language-server, clangd). Files mcpls has never opened are now reflected in workspace search and analysis without restarting the MCP server. Composition with the stat-on-access change in the previous commit: the watcher does not invalidate the document tracker; A's stat path already covers tracked-document freshness, and C is responsible for the workspace-wide view. Other changes: - Transport: the previous classifier mishandled server-to-client requests (any message with an `id` was treated as a response). Add `InboundMessage::Request` and dispatch on (id, method) presence. - Client: add `ClientCommand::SendResponse` and a `ServerRequest` type so the message loop can forward inbound requests to a registered handler and reply with the result. Unhandled methods receive a `MethodNotFound` error so servers do not block. - Watcher: filter `.git`, `target`, `node_modules`, `.cache` before glob matching to avoid drowning in build noise; coalesce events over a 100ms debounce window. Watcher startup failure (e.g. inotify exhaustion) is logged and non-fatal — A's stat path still covers per-file freshness. ADR 0001 is updated to record the final design. Refs #102. 
--- CHANGELOG.md | 1 + Cargo.lock | 175 +++++- Cargo.toml | 2 + crates/mcpls-core/Cargo.toml | 2 + crates/mcpls-core/src/lsp/client.rs | 181 ++++-- crates/mcpls-core/src/lsp/file_watcher.rs | 556 ++++++++++++++++++ crates/mcpls-core/src/lsp/lifecycle.rs | 196 +++++- crates/mcpls-core/src/lsp/mod.rs | 8 +- crates/mcpls-core/src/lsp/types.rs | 7 +- .../tests/integration/rust_analyzer_tests.rs | 65 ++ docs/adr/0001-external-file-changes.md | 9 +- 11 files changed, 1131 insertions(+), 71 deletions(-) create mode 100644 crates/mcpls-core/src/lsp/file_watcher.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 2fc4618..95340ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **RFC-3986 URI codec** — `bridge::resources` module with percent-encoding via `url::Url::from_file_path`; empty-authority injection is rejected to prevent UNC-path attacks on Windows - **Subscription cap** — `ResourceSubscriptions` enforces a `MAX_SUBSCRIPTIONS = 1_000` limit per session to guard against memory exhaustion - **MCP tools** — `get_signature_help` (`textDocument/signatureHelp`), `go_to_implementation` (`textDocument/implementation`), `go_to_type_definition` (`textDocument/typeDefinition`), and `get_inlay_hints` (`textDocument/inlayHint`) tools exposing LSP 3.6/3.15/3.17 capabilities (#116) +- **`workspace/didChangeWatchedFiles` support** (#102, part 2) — mcpls now declares `workspace.didChangeWatchedFiles.dynamic_registration: true` and `relative_pattern_support: true`, handles inbound `client/registerCapability` and `client/unregisterCapability` requests, and runs a per-server filesystem watcher that forwards matched events as `workspace/didChangeWatchedFiles`. This keeps the LSP server's *workspace index* live across external file changes (files mcpls has never opened) for servers that register watchers (rust-analyzer, gopls, pyright, typescript-language-server, clangd). 
Builds on a new `notify` dependency. ### Changed diff --git a/Cargo.lock b/Cargo.lock index b8fd1f8..4fd88bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,7 +56,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -67,7 +67,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -392,7 +392,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -425,7 +425,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -473,6 +473,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.32" @@ -879,6 +888,26 @@ dependencies = [ "serde_core", ] +[[package]] +name = "inotify" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd5b3eaf1a28b758ac0faa5a4254e8ab2705605496f1b1f3fbbc3988ad73d199" +dependencies = [ + "bitflags 2.11.1", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -901,6 +930,26 @@ dependencies = [ "wasm-bindgen", ] +[[package]] 
+name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -1008,8 +1057,10 @@ dependencies = [ "chrono", "dirs", "futures", + "globset", "ignore", "lsp-types", + "notify", "rmcp", "rstest", "schemars", @@ -1044,8 +1095,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", + "log", "wasi", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1054,13 +1106,40 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.11.1", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-types" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42b8cfee0e339a0337359f3c88165702ac6e600dc01c0cc9579a92d62b08477a" +dependencies = [ + "bitflags 2.11.1", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] 
[[package]] @@ -1397,7 +1476,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1600,7 +1679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1666,7 +1745,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -1728,7 +1807,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -2102,7 +2181,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys", + "windows-sys 0.61.2", ] [[package]] @@ -2164,6 +2243,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.60.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" +dependencies = [ + "windows-targets", +] + [[package]] name = "windows-sys" version = "0.61.2" @@ -2173,6 +2261,71 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.53.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3" +dependencies = [ + "windows-link", + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" + 
+[[package]] +name = "windows_aarch64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" + +[[package]] +name = "windows_i686_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" + +[[package]] +name = "windows_i686_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.53.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" + [[package]] name = "winnow" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 1a15d3b..c1560a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,9 +23,11 @@ chrono = "0.4.44" clap = "4.6" dirs = "6.0" futures = "0.3" +globset = "0.4" ignore = "0.4" lsp-types = "0.97" mcpls-core = { path = "crates/mcpls-core", version = "0.3.6" } +notify = "8.0" predicates = "3.1" rmcp = "1.6.0" rstest = "0.26" diff --git a/crates/mcpls-core/Cargo.toml b/crates/mcpls-core/Cargo.toml index e50a329..32cea86 100644 --- 
a/crates/mcpls-core/Cargo.toml +++ b/crates/mcpls-core/Cargo.toml @@ -15,8 +15,10 @@ async-trait = { workspace = true } chrono = { workspace = true } dirs = { workspace = true } futures = { workspace = true } +globset = { workspace = true } ignore = { workspace = true } lsp-types = { workspace = true } +notify = { workspace = true } rmcp = { workspace = true, features = ["server", "transport-io", "macros"] } axum = { workspace = true, optional = true } tokio-util = { workspace = true, optional = true, features = ["rt"] } diff --git a/crates/mcpls-core/src/lsp/client.rs b/crates/mcpls-core/src/lsp/client.rs index cd4b2e5..8692e80 100644 --- a/crates/mcpls-core/src/lsp/client.rs +++ b/crates/mcpls-core/src/lsp/client.rs @@ -22,6 +22,9 @@ use crate::lsp::types::{ /// JSON-RPC protocol version. const JSONRPC_VERSION: &str = "2.0"; +/// JSON-RPC error code for "method not found". +const METHOD_NOT_FOUND: i32 = -32601; + /// Type alias for pending request tracking map. type PendingRequests = HashMap>>; @@ -78,10 +81,28 @@ enum ClientCommand { method: String, params: Option, }, + /// Send a response to a server-to-client request. + SendResponse { + id: RequestId, + result: std::result::Result, + }, /// Shutdown the client. Shutdown, } +/// A server-to-client request awaiting a response. +/// +/// Forwarded by the message loop when a registered handler channel is +/// configured. The receiver decides how to reply via the included `responder`. +#[derive(Debug)] +pub struct ServerRequest { + /// The original request (id, method, params). + pub request: JsonRpcRequest, + /// Channel to send the response back. The message loop turns the result + /// into either `result` or `error` on the wire. + pub responder: oneshot::Sender>, +} + impl LspClient { /// Create a new LSP client with the given configuration. /// @@ -105,39 +126,27 @@ impl LspClient { /// Create client from transport (for testing or custom spawning). 
/// - /// This method initializes the background message loop with the provided transport. + /// This method initializes the background message loop with the provided + /// transport. Production code uses `from_transport_with_channels`; this + /// thin wrapper exists so unit tests can construct a client without + /// optional plumbing. #[cfg(test)] pub(crate) fn from_transport(config: LspServerConfig, transport: LspTransport) -> Self { - let state = Arc::new(Mutex::new(super::ServerState::Initializing)); - let request_counter = Arc::new(AtomicI64::new(1)); - let pending_requests = Arc::new(Mutex::new(HashMap::new())); - - let (command_tx, command_rx) = mpsc::channel(100); - - let receiver_task = tokio::spawn(Self::message_loop( - transport, - command_rx, - pending_requests, - None, - )); - - Self { - config, - state, - request_counter, - command_tx, - receiver_task: Some(receiver_task), - } + Self::from_transport_with_channels(config, transport, None, None) } - /// Create client from transport with notification forwarding. + /// Create client from transport with both notification and server-request + /// channels. /// - /// Notifications received from the LSP server will be parsed and sent - /// through the provided channel. - pub(crate) fn from_transport_with_notifications( + /// `server_request_tx`, when `Some`, receives every server-to-client request + /// (e.g. `client/registerCapability`). The receiver is expected to reply + /// via the request's `responder` channel; if the channel is dropped the + /// message loop replies with a `MethodNotFound` error. 
+ pub(crate) fn from_transport_with_channels( config: LspServerConfig, transport: LspTransport, - notification_tx: mpsc::Sender, + notification_tx: Option>, + server_request_tx: Option>, ) -> Self { let state = Arc::new(Mutex::new(super::ServerState::Initializing)); let request_counter = Arc::new(AtomicI64::new(1)); @@ -148,8 +157,10 @@ impl LspClient { let receiver_task = tokio::spawn(Self::message_loop( transport, command_rx, + command_tx.clone(), pending_requests, - Some(notification_tx), + notification_tx, + server_request_tx, )); Self { @@ -276,21 +287,25 @@ impl LspClient { /// Background task: handle message I/O. /// /// This task runs in the background, handling: - /// - Outbound requests and notifications - /// - Inbound responses and server notifications + /// - Outbound requests, notifications, and responses to server requests + /// - Inbound responses, notifications, and server-to-client requests /// - Matching responses to pending requests async fn message_loop( mut transport: LspTransport, mut command_rx: mpsc::Receiver, + command_tx: mpsc::Sender, pending_requests: Arc>, notification_tx: Option>, + server_request_tx: Option>, ) -> Result<()> { debug!("Message loop started"); let result = Self::message_loop_inner( &mut transport, &mut command_rx, + &command_tx, &pending_requests, notification_tx.as_ref(), + server_request_tx.as_ref(), ) .await; if let Err(ref e) = result { @@ -301,11 +316,14 @@ impl LspClient { result } + #[allow(clippy::too_many_lines)] async fn message_loop_inner( transport: &mut LspTransport, command_rx: &mut mpsc::Receiver, + command_tx: &mpsc::Sender, pending_requests: &Arc>, notification_tx: Option<&mpsc::Sender>, + server_request_tx: Option<&mpsc::Sender>, ) -> Result<()> { loop { tokio::select! 
{ @@ -328,6 +346,21 @@ impl LspClient { }); transport.send(¬ification).await?; } + ClientCommand::SendResponse { id, result } => { + let value = match result { + Ok(value) => serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "result": value, + }), + Err(error) => serde_json::json!({ + "jsonrpc": "2.0", + "id": id, + "error": error, + }), + }; + transport.send(&value).await?; + } ClientCommand::Shutdown => { debug!("Client shutdown requested"); break; @@ -344,6 +377,60 @@ impl LspClient { } }; match message { + InboundMessage::Request(request) => { + debug!("Received server request: {} (id={:?})", request.method, request.id); + + let id = request.id.clone(); + let method = request.method.clone(); + + // Forward to a registered handler if configured; + // otherwise auto-respond with the canned reply + // table so the server does not block. + if let Some(tx) = server_request_tx { + let (resp_tx, resp_rx) = oneshot::channel(); + let server_request = ServerRequest { + request, + responder: resp_tx, + }; + if tx.try_send(server_request).is_err() { + warn!( + "Server-request channel full or closed; replying MethodNotFound to {}", + method + ); + Self::send_method_not_found(command_tx, id, &method).await; + continue; + } + let command_tx = command_tx.clone(); + tokio::spawn(async move { + let result = resp_rx.await.unwrap_or_else(|_| { + Err(JsonRpcError { + code: METHOD_NOT_FOUND, + message: format!( + "no handler responded for method '{method}'" + ), + data: None, + }) + }); + let _ = command_tx + .send(ClientCommand::SendResponse { id, result }) + .await; + }); + } else { + let response = Self::server_request_response(request); + let result = response + .result + .map_or_else(|| Err(response.error.unwrap_or_else(|| { + JsonRpcError { + code: METHOD_NOT_FOUND, + message: format!("method not found: {method}"), + data: None, + } + })), Ok); + let _ = command_tx + .send(ClientCommand::SendResponse { id, result }) + .await; + } + } InboundMessage::Response(response) => { 
trace!("Received response: id={:?}", response.id); @@ -373,15 +460,6 @@ impl LspClient { warn!("Received response for unknown request ID: {:?}", response.id); } } - InboundMessage::Request(request) => { - debug!( - "Received server request: {} (id={:?})", - request.method, request.id - ); - let response = Self::server_request_response(request); - let value = serde_json::to_value(&response)?; - transport.send(&value).await?; - } InboundMessage::Notification(notification) => { debug!("Received notification: {}", notification.method); @@ -415,6 +493,11 @@ impl LspClient { Ok(()) } + /// Build a stock auto-response for known server-to-client request methods. + /// + /// Used as the fallback when no `server_request_tx` is wired (tests, or + /// languages that don't need watcher dispatch) so the LSP server is not + /// left blocked waiting on a reply. fn server_request_response(request: JsonRpcRequest) -> JsonRpcResponse { match Self::server_request_result(&request.method, request.params.as_ref()) { Ok(result) => JsonRpcResponse { @@ -448,7 +531,7 @@ impl LspClient { "workspace/configuration" => Ok(Self::workspace_configuration_result(params)), "workspace/applyEdit" => Ok(serde_json::json!({ "applied": false })), _ => Err(JsonRpcError { - code: -32601, + code: METHOD_NOT_FOUND, message: format!("Unhandled server request: {method}"), data: None, }), @@ -463,6 +546,26 @@ impl LspClient { Value::Array(vec![Value::Null; item_count]) } + + /// Send a `MethodNotFound` response to a server-to-client request that no + /// handler is willing or able to satisfy. 
+ async fn send_method_not_found( + command_tx: &mpsc::Sender, + id: RequestId, + method: &str, + ) { + let error = JsonRpcError { + code: METHOD_NOT_FOUND, + message: format!("method not found: {method}"), + data: None, + }; + let _ = command_tx + .send(ClientCommand::SendResponse { + id, + result: Err(error), + }) + .await; + } } #[cfg(test)] diff --git a/crates/mcpls-core/src/lsp/file_watcher.rs b/crates/mcpls-core/src/lsp/file_watcher.rs new file mode 100644 index 0000000..7fd18c2 --- /dev/null +++ b/crates/mcpls-core/src/lsp/file_watcher.rs @@ -0,0 +1,556 @@ +//! Filesystem watcher that drives `workspace/didChangeWatchedFiles`. +//! +//! When an LSP server dynamically registers for +//! `workspace/didChangeWatchedFiles` via `client/registerCapability`, mcpls +//! starts a [`notify`] watcher rooted at the configured workspace roots, +//! matches each filesystem event against the server's registered glob +//! patterns, and forwards the matches as `workspace/didChangeWatchedFiles` +//! notifications. +//! +//! The watcher is per-server (each LSP can register its own glob set) and +//! independent of the document tracker: stat-on-access in +//! `bridge::DocumentTracker::ensure_open` already keeps mcpls's own view of +//! tracked files in sync, so this watcher's only job is to keep the LSP +//! server's *workspace index* (files mcpls has not opened) live. 
+ +use std::collections::HashMap; +use std::path::{Component, Path, PathBuf}; +use std::str::FromStr; +use std::sync::Arc; + +use globset::{Glob, GlobSet, GlobSetBuilder}; +use lsp_types::{ + DidChangeWatchedFilesParams, DidChangeWatchedFilesRegistrationOptions, FileChangeType, + FileEvent, GlobPattern, RelativePattern, Uri, WatchKind, +}; +use notify::event::{CreateKind, ModifyKind, RemoveKind}; +use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use tokio::sync::{Mutex, mpsc}; +use tokio::time::{Duration, Instant}; +use tracing::{debug, trace, warn}; + +use crate::error::{Error, Result}; +use crate::lsp::client::LspClient; + +/// How long to coalesce filesystem events before flushing them as a single +/// `workspace/didChangeWatchedFiles` notification. Tools like `cargo build` +/// can fire thousands of events per second under `target/`; without +/// debouncing we would flood the LSP server. +const DEBOUNCE_INTERVAL: Duration = Duration::from_millis(100); + +/// Sleep time when the event loop has nothing to flush. Used to keep the +/// `tokio::select!` ready branch armed without polling the OS unnecessarily. +const IDLE_SLEEP: Duration = Duration::from_secs(3600); + +/// Channel capacity for raw `notify` events. Sized generously because notify +/// itself does not back-pressure; if we lag, events are dropped on the floor. +const RAW_EVENT_CHANNEL_CAPACITY: usize = 1024; + +/// Path components that are almost always noise from a build perspective and +/// should never reach an LSP server. Filtered before glob matching to avoid +/// burning CPU on `target/` rewrites etc. Match by exact component name. +const NEVER_FORWARD_COMPONENTS: &[&str] = &[".git", "target", "node_modules", ".cache"]; + +/// A single watcher registration. +#[derive(Debug)] +struct WatcherRegistration { + /// The compiled glob set from the registration's `watchers` array. + globs: GlobSet, + /// Bitfield of LSP watch kinds we should forward. 
Default is all three + /// (Create | Change | Delete = 7). + kinds: WatchKind, +} + +/// Manages dynamic `workspace/didChangeWatchedFiles` registrations and a +/// shared `notify` watcher. +/// +/// Each registered ID maps to a compiled glob set; events are matched against +/// every active registration's globs and forwarded to the LSP server as a +/// single batched notification per debounce interval. +#[derive(Debug)] +pub struct FileWatcher { + inner: Arc>, +} + +#[derive(Debug)] +struct FileWatcherInner { + /// Workspace roots that the watcher is rooted at. Used to resolve relative + /// patterns and to filter incoming events to "inside the workspace". + workspace_roots: Vec, + /// Active registrations indexed by registration id. + registrations: HashMap, + /// The actual filesystem watcher. Held here so it lives as long as the + /// `FileWatcher` itself. + _watcher: RecommendedWatcher, +} + +impl FileWatcher { + /// Spawn a new watcher rooted at `workspace_roots` and forwarding matched + /// events to `lsp_client` as `workspace/didChangeWatchedFiles`. + /// + /// # Errors + /// + /// Returns an error if the underlying `notify` watcher cannot be created + /// or if any workspace root cannot be watched. Failure here should be + /// non-fatal at the caller (the `bridge` already covers per-file freshness + /// via stat-on-access); callers should log and continue. + pub fn spawn(workspace_roots: Vec, lsp_client: LspClient) -> Result { + // Canonicalize roots so glob matching against canonical event paths + // works even when the original path goes through symlinks (notably + // /var → /private/var on macOS, where notify reports canonical paths + // but the LSP server may have given us the unresolved root). 
+ let workspace_roots: Vec = workspace_roots + .into_iter() + .map(|root| root.canonicalize().unwrap_or(root)) + .collect(); + + let (raw_tx, raw_rx) = std::sync::mpsc::sync_channel(RAW_EVENT_CHANNEL_CAPACITY); + + let mut watcher = notify::recommended_watcher(move |event| { + // Notify uses a blocking std mpsc; drop on full to avoid blocking + // the OS notify thread. + if let Err(e) = raw_tx.send(event) { + warn!("file watcher: dropping event, channel closed: {e}"); + } + }) + .map_err(|e| Error::Transport(format!("notify::recommended_watcher: {e}")))?; + + for root in &workspace_roots { + if let Err(e) = watcher.watch(root, RecursiveMode::Recursive) { + warn!("file watcher: failed to watch {}: {e}", root.display()); + } + } + + let inner = Arc::new(Mutex::new(FileWatcherInner { + workspace_roots, + registrations: HashMap::new(), + _watcher: watcher, + })); + + // Bridge the blocking std channel to a tokio channel. + let (event_tx, event_rx) = + mpsc::channel::>(RAW_EVENT_CHANNEL_CAPACITY); + std::thread::spawn(move || { + while let Ok(event) = raw_rx.recv() { + if event_tx.blocking_send(event).is_err() { + break; + } + } + }); + + let inner_for_loop = Arc::clone(&inner); + tokio::spawn(forward_events_loop(inner_for_loop, event_rx, lsp_client)); + + Ok(Self { inner }) + } + + /// Install a `workspace/didChangeWatchedFiles` registration. + /// + /// Each [`Registration`] from `client/registerCapability` whose `method` + /// is `workspace/didChangeWatchedFiles` should be passed here. Subsequent + /// filesystem events are matched against the new globs from the next + /// debounce flush onward. + /// + /// [`Registration`]: lsp_types::Registration + /// + /// # Errors + /// + /// Returns an error if `register_options` cannot be deserialized or if + /// any glob pattern fails to compile. 
+ pub async fn register(&self, id: String, register_options: serde_json::Value) -> Result<()> { + let opts: DidChangeWatchedFilesRegistrationOptions = + serde_json::from_value(register_options).map_err(|e| { + Error::LspProtocolError(format!( + "invalid didChangeWatchedFiles register options: {e}" + )) + })?; + + let workspace_roots = { + let guard = self.inner.lock().await; + guard.workspace_roots.clone() + }; + + let mut builder = GlobSetBuilder::new(); + let mut combined_kinds: WatchKind = WatchKind::empty(); + + for fs_watcher in &opts.watchers { + for glob_str in resolve_pattern(&fs_watcher.glob_pattern, &workspace_roots) { + match Glob::new(&glob_str) { + Ok(glob) => { + builder.add(glob); + } + Err(e) => { + warn!( + "file watcher: ignoring uncompilable glob '{glob_str}' for registration {id}: {e}" + ); + } + } + } + combined_kinds |= fs_watcher + .kind + .unwrap_or(WatchKind::Create | WatchKind::Change | WatchKind::Delete); + } + + let globs = builder + .build() + .map_err(|e| Error::LspProtocolError(format!("globset build failed: {e}")))?; + + let watcher_count = opts.watchers.len(); + { + let mut guard = self.inner.lock().await; + guard.registrations.insert( + id.clone(), + WatcherRegistration { + globs, + kinds: combined_kinds, + }, + ); + } + + debug!( + "file watcher: registered {id} ({watcher_count} watchers, kinds={combined_kinds:?})" + ); + Ok(()) + } + + /// Remove a previously installed registration. + pub async fn unregister(&self, id: &str) { + let mut guard = self.inner.lock().await; + if guard.registrations.remove(id).is_some() { + debug!("file watcher: unregistered {id}"); + } + } + + /// Cheap clone of the watcher handle for use by request dispatchers. + /// Both handles share the same underlying state. + #[must_use] + pub fn clone_handle(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } +} + +/// Resolve an LSP glob pattern into one or more `globset`-compatible pattern +/// strings. 
Relative patterns are anchored to their base URI by prepending +/// the absolute path; bare string patterns are accepted as-is and are +/// effectively matched anywhere under the workspace. +fn resolve_pattern(pattern: &GlobPattern, workspace_roots: &[PathBuf]) -> Vec { + match pattern { + GlobPattern::String(s) => { + // Absolute patterns are used directly; bare patterns are anchored + // at every workspace root with `**/` already implicit in + // patterns like `**/*.rs`. + vec![s.clone()] + } + GlobPattern::Relative(rel) => relative_pattern_to_globs(rel, workspace_roots), + } +} + +/// Expand a `RelativePattern` into one absolute glob per matching workspace +/// root. +fn relative_pattern_to_globs(rel: &RelativePattern, workspace_roots: &[PathBuf]) -> Vec { + let Some(base_path) = base_uri_to_path(&rel.base_uri) else { + warn!("file watcher: dropping relative pattern with non-file base URI"); + return Vec::new(); + }; + + // Canonicalize so the resulting glob matches event paths reported by + // notify (which are canonical: e.g. `/var/folders/...` resolves to + // `/private/var/folders/...` on macOS). + let base_path = base_path.canonicalize().unwrap_or(base_path); + + // Some servers send a base URI matching exactly one of our workspace + // roots; others send a child path. Either way, build the absolute glob. + let pattern = format!("{}/{}", base_path.display(), rel.pattern); + + // If the base path is unrelated to any workspace root, still keep the + // pattern: notify is rooted at workspace_roots, so events under unrelated + // paths simply will not match. + if !workspace_roots.is_empty() + && !workspace_roots + .iter() + .any(|root| base_path.starts_with(root) || root.starts_with(&base_path)) + { + trace!( + "file watcher: relative pattern base {} is outside all workspace roots", + base_path.display() + ); + } + + vec![pattern] +} + +/// Resolve an LSP `BaseUri` (workspace folder or absolute URI) to a filesystem +/// path. 
Returns `None` if the URI does not have a `file://` scheme. +fn base_uri_to_path(base: &lsp_types::OneOf) -> Option { + let uri = match base { + lsp_types::OneOf::Left(folder) => &folder.uri, + lsp_types::OneOf::Right(uri) => uri, + }; + uri_to_path(uri) +} + +fn uri_to_path(uri: &Uri) -> Option { + let s = uri.as_str(); + let rest = s.strip_prefix("file://")?; + // Handle Windows "file:///C:/..." form. + #[cfg(windows)] + let rest = rest.strip_prefix('/').unwrap_or(rest); + Some(PathBuf::from(rest)) +} + +fn path_to_uri(path: &Path) -> Option { + let s = path.to_str()?; + let uri_str = if cfg!(windows) { + format!("file:///{}", s.replace('\\', "/")) + } else { + format!("file://{s}") + }; + Uri::from_str(&uri_str).ok() +} + +/// Tokio task: pull raw notify events, match, debounce, and forward. +async fn forward_events_loop( + inner: Arc>, + mut event_rx: mpsc::Receiver>, + lsp_client: LspClient, +) { + let mut pending: HashMap = HashMap::new(); + let mut deadline: Option = None; + + loop { + let timeout = deadline.map_or(IDLE_SLEEP, |d| d.saturating_duration_since(Instant::now())); + + tokio::select! { + maybe_event = event_rx.recv() => { + let Some(event) = maybe_event else { break }; + let event = match event { + Ok(ev) => ev, + Err(e) => { + warn!("file watcher: notify error: {e}"); + continue; + } + }; + merge_event(&mut pending, &event); + if !pending.is_empty() && deadline.is_none() { + deadline = Some(Instant::now() + DEBOUNCE_INTERVAL); + } + } + () = tokio::time::sleep(timeout), if deadline.is_some() => { + deadline = None; + if pending.is_empty() { + continue; + } + let drained: Vec<(PathBuf, FileChangeType)> = pending.drain().collect(); + flush_pending(&inner, &lsp_client, drained).await; + } + } + } + debug!("file watcher: event-forward loop exiting"); +} + +/// Fold a single notify event into `pending`. The same path may legitimately +/// appear with multiple types in one debounce window (e.g. 
a quick +/// create-then-modify); the LSP spec is ambiguous so we keep the latest type. +fn merge_event(pending: &mut HashMap, event: ¬ify::Event) { + let Some(typ) = notify_kind_to_lsp(&event.kind) else { + return; + }; + for path in &event.paths { + if NEVER_FORWARD_COMPONENTS.iter().any(|skip| { + path.components() + .any(|c| matches!(c, Component::Normal(s) if s == *skip)) + }) { + continue; + } + pending.insert(path.clone(), typ); + } +} + +/// Translate a `notify::EventKind` into an LSP `FileChangeType`. Returns +/// `None` for events we deliberately do not forward (e.g. metadata-only +/// `Modify(Metadata)` changes that do not affect file content). +#[allow(clippy::trivially_copy_pass_by_ref)] // notify::EventKind is large; clippy mis-sizes it +#[allow(clippy::missing_const_for_fn)] // pattern matching on EventKind variants is not stable in const +fn notify_kind_to_lsp(kind: &EventKind) -> Option { + match kind { + EventKind::Create(CreateKind::File | CreateKind::Folder | CreateKind::Any) => { + Some(FileChangeType::CREATED) + } + EventKind::Modify(ModifyKind::Data(_) | ModifyKind::Name(_) | ModifyKind::Any) => { + Some(FileChangeType::CHANGED) + } + EventKind::Remove(RemoveKind::File | RemoveKind::Folder | RemoveKind::Any) => { + Some(FileChangeType::DELETED) + } + // Metadata-only changes, access events, and unknown kinds are ignored. + _ => None, + } +} + +/// Match the drained event set against active registrations and send the +/// resulting `workspace/didChangeWatchedFiles` notification, if any matches. 
+async fn flush_pending( + inner: &Arc>, + lsp_client: &LspClient, + pending: Vec<(PathBuf, FileChangeType)>, +) { + let changes = { + let guard = inner.lock().await; + if guard.registrations.is_empty() { + return; + } + compute_changes(&guard.registrations, pending) + }; + + if changes.is_empty() { + return; + } + + let params = DidChangeWatchedFilesParams { changes }; + if let Err(e) = lsp_client + .notify("workspace/didChangeWatchedFiles", params) + .await + { + warn!("file watcher: failed to send didChangeWatchedFiles: {e}"); + } +} + +/// Pure helper: match the pending events against the active registrations +/// and translate hits into LSP `FileEvent`s. +fn compute_changes( + registrations: &HashMap, + pending: Vec<(PathBuf, FileChangeType)>, +) -> Vec { + let mut changes: Vec = Vec::new(); + for (path, typ) in pending { + let matched = registrations + .values() + .any(|r| registration_accepts(r, typ) && r.globs.is_match(&path)); + if !matched { + continue; + } + let Some(uri) = path_to_uri(&path) else { + continue; + }; + changes.push(FileEvent { uri, typ }); + } + changes +} + +/// Whether the registration accepted change kind `typ`. The kind bitmask in +/// LSP defaults to all three types when unset. +fn registration_accepts(registration: &WatcherRegistration, typ: FileChangeType) -> bool { + let want = if registration.kinds.is_empty() { + WatchKind::Create | WatchKind::Change | WatchKind::Delete + } else { + registration.kinds + }; + match typ { + FileChangeType::CREATED => want.contains(WatchKind::Create), + FileChangeType::CHANGED => want.contains(WatchKind::Change), + FileChangeType::DELETED => want.contains(WatchKind::Delete), + // FileChangeType is a transparent newtype around i32; unknown values + // are forwarded as no-op rejections. 
+ _ => false, + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used)] +mod tests { + use super::*; + + #[test] + fn test_notify_kind_to_lsp_known_variants() { + assert_eq!( + notify_kind_to_lsp(&EventKind::Create(CreateKind::File)), + Some(FileChangeType::CREATED) + ); + assert_eq!( + notify_kind_to_lsp(&EventKind::Modify(ModifyKind::Data( + notify::event::DataChange::Content + ))), + Some(FileChangeType::CHANGED) + ); + assert_eq!( + notify_kind_to_lsp(&EventKind::Remove(RemoveKind::File)), + Some(FileChangeType::DELETED) + ); + } + + #[test] + fn test_notify_kind_to_lsp_ignores_metadata() { + assert_eq!( + notify_kind_to_lsp(&EventKind::Modify(ModifyKind::Metadata( + notify::event::MetadataKind::Permissions + ))), + None + ); + assert_eq!( + notify_kind_to_lsp(&EventKind::Access(notify::event::AccessKind::Any)), + None + ); + } + + #[test] + fn test_merge_event_skips_target_dir() { + let mut pending: HashMap = HashMap::new(); + let evt = notify::Event { + kind: EventKind::Modify(ModifyKind::Data(notify::event::DataChange::Content)), + paths: vec![PathBuf::from("/repo/target/debug/foo.rs")], + attrs: notify::event::EventAttributes::new(), + }; + merge_event(&mut pending, &evt); + assert!(pending.is_empty(), "events under target/ must be filtered"); + } + + #[test] + fn test_merge_event_keeps_latest_type_per_path() { + let mut pending: HashMap = HashMap::new(); + let path = PathBuf::from("/repo/src/lib.rs"); + let create = notify::Event { + kind: EventKind::Create(CreateKind::File), + paths: vec![path.clone()], + attrs: notify::event::EventAttributes::new(), + }; + let modify = notify::Event { + kind: EventKind::Modify(ModifyKind::Data(notify::event::DataChange::Content)), + paths: vec![path.clone()], + attrs: notify::event::EventAttributes::new(), + }; + merge_event(&mut pending, &create); + merge_event(&mut pending, &modify); + assert_eq!(pending.get(&path), Some(&FileChangeType::CHANGED)); + } + + #[test] + fn test_uri_round_trip() { + let path = 
PathBuf::from("/tmp/example/file.rs"); + let uri = path_to_uri(&path).unwrap(); + assert_eq!(uri_to_path(&uri).unwrap(), path); + } + + #[test] + fn test_registration_accepts_default_kind() { + let reg = WatcherRegistration { + globs: GlobSetBuilder::new().build().unwrap(), + kinds: WatchKind::empty(), + }; + assert!(registration_accepts(®, FileChangeType::CREATED)); + assert!(registration_accepts(®, FileChangeType::CHANGED)); + assert!(registration_accepts(®, FileChangeType::DELETED)); + } + + #[test] + fn test_registration_accepts_explicit_kind() { + let reg = WatcherRegistration { + globs: GlobSetBuilder::new().build().unwrap(), + kinds: WatchKind::Change, + }; + assert!(!registration_accepts(®, FileChangeType::CREATED)); + assert!(registration_accepts(®, FileChangeType::CHANGED)); + assert!(!registration_accepts(®, FileChangeType::DELETED)); + } +} diff --git a/crates/mcpls-core/src/lsp/lifecycle.rs b/crates/mcpls-core/src/lsp/lifecycle.rs index 862fc02..0bbb6cc 100644 --- a/crates/mcpls-core/src/lsp/lifecycle.rs +++ b/crates/mcpls-core/src/lsp/lifecycle.rs @@ -18,14 +18,22 @@ use lsp_types::{ }; use tokio::process::Command; use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio::time::Duration; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use crate::config::LspServerConfig; use crate::error::{Error, Result, ServerSpawnFailure}; -use crate::lsp::client::LspClient; +use crate::lsp::client::{LspClient, ServerRequest}; +use crate::lsp::file_watcher::FileWatcher; use crate::lsp::transport::LspTransport; -use crate::lsp::types::LspNotification; +use crate::lsp::types::{JsonRpcError, LspNotification}; + +/// JSON-RPC error code returned for server-to-client requests we do not handle. +const METHOD_NOT_FOUND: i32 = -32601; + +/// Channel capacity for inbound server-to-client requests. +const SERVER_REQUEST_CHANNEL_CAPACITY: usize = 32; /// State of an LSP server connection. 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -181,9 +189,20 @@ pub struct LspServer { /// Extract this before registering the server to receive real-time /// notifications (e.g., `textDocument/publishDiagnostics`, `$/progress`). pub notification_rx: mpsc::Receiver, + /// Filesystem watcher backing `workspace/didChangeWatchedFiles`. `None` + /// when the watcher could not be started; the server still works, just + /// without eager external-change forwarding. Held for lifetime only. + #[allow(dead_code)] + file_watcher: Option, + /// Background task that dispatches server-to-client requests (e.g. + /// `client/registerCapability`) to the file watcher. Held for lifetime + /// only; the task exits when the server's request channel closes. + #[allow(dead_code)] + request_dispatcher: Option>, /// Child process handle. Kept alive for process lifetime management. /// When dropped, the process is terminated via SIGKILL (`kill_on_drop`). - _child: tokio::process::Child, + #[allow(dead_code)] + child: tokio::process::Child, } impl std::fmt::Debug for LspServer { @@ -193,7 +212,12 @@ impl std::fmt::Debug for LspServer { .field("capabilities", &self.capabilities) .field("position_encoding", &self.position_encoding) .field("notification_rx", &"") - .field("_child", &"") + .field("file_watcher_active", &self.file_watcher.is_some()) + .field( + "request_dispatcher_active", + &self.request_dispatcher.is_some(), + ) + .field("child", &"") .finish() } } @@ -252,22 +276,60 @@ impl LspServer { let transport = LspTransport::new(stdin, stdout); let (notification_tx, notification_rx) = mpsc::channel(64); - let client = LspClient::from_transport_with_notifications( + // Build the file watcher *before* the client so we can decide whether + // to wire a server-request channel through. + // + // The client needs a transport-bound LspClient *during* construction + // to start its message loop; the watcher needs a clone of that client + // to send notifications. 
We solve the cycle by wiring up the channel + // optimistically and only spawning the watcher after the client + // exists. If the watcher fails to start, the dispatcher task is not + // launched and the receiver is dropped — the message loop then + // observes a closed channel and falls back to MethodNotFound. + let (server_request_tx, server_request_rx) = mpsc::channel(SERVER_REQUEST_CHANNEL_CAPACITY); + let client = LspClient::from_transport_with_channels( config.server_config.clone(), transport, - notification_tx, + Some(notification_tx), + Some(server_request_tx), ); let (capabilities, position_encoding) = Self::initialize(&client, &config).await?; info!("LSP server initialized successfully"); + // Start the filesystem watcher and request dispatcher. Failure to + // start the watcher is logged but non-fatal: the per-file + // stat-on-access path in `bridge::DocumentTracker::ensure_open` still + // keeps tracked documents fresh, and unhandled `client/registerCapability` + // requests get a `MethodNotFound` reply via the default path. 
+ let (file_watcher, request_dispatcher) = match FileWatcher::spawn( + config.workspace_roots.clone(), + client.clone(), + ) { + Ok(watcher) => { + let dispatcher = + spawn_request_dispatcher(server_request_rx, watcher.clone_handle()); + (Some(watcher), Some(dispatcher)) + } + Err(e) => { + warn!( + "file watcher unavailable for {}: {e} — falling back to stat-on-access only", + config.server_config.language_id + ); + drop(server_request_rx); + (None, None) + } + }; + Ok(Self { client, capabilities, position_encoding, notification_rx, - _child: child, + file_watcher, + request_dispatcher, + child, }) } @@ -370,6 +432,12 @@ impl LspServer { }), workspace: Some(lsp_types::WorkspaceClientCapabilities { workspace_folders: Some(true), + did_change_watched_files: Some( + lsp_types::DidChangeWatchedFilesClientCapabilities { + dynamic_registration: Some(true), + relative_pattern_support: Some(true), + }, + ), ..Default::default() }), ..Default::default() @@ -534,6 +602,88 @@ impl LspServer { } } +/// Spawn the per-server dispatcher that turns inbound `client/registerCapability` +/// and `client/unregisterCapability` requests into watcher updates. +/// +/// Other server-to-client methods are not handled here and receive a +/// `MethodNotFound` reply, which lets the LSP server know it should not block +/// waiting on us. This task exits when the request channel is closed (which +/// happens on server shutdown). +fn spawn_request_dispatcher( + mut server_request_rx: mpsc::Receiver, + watcher: FileWatcher, +) -> JoinHandle<()> { + tokio::spawn(async move { + while let Some(server_request) = server_request_rx.recv().await { + let ServerRequest { request, responder } = server_request; + let reply = handle_server_request(&watcher, &request.method, request.params).await; + let _ = responder.send(reply); + } + debug!("server-request dispatcher exiting"); + }) +} + +/// Apply a single server-to-client request to the file watcher and return the +/// JSON-RPC reply we should send back. 
+async fn handle_server_request( + watcher: &FileWatcher, + method: &str, + params: Option, +) -> std::result::Result { + match method { + "client/registerCapability" => { + let params: lsp_types::RegistrationParams = parse_params(params)?; + for registration in params.registrations { + if registration.method != "workspace/didChangeWatchedFiles" { + debug!( + "ignoring registration for unsupported method '{}' (id={})", + registration.method, registration.id + ); + continue; + } + let Some(options) = registration.register_options else { + warn!( + "registration {} for {} missing register_options", + registration.id, registration.method + ); + continue; + }; + if let Err(e) = watcher.register(registration.id.clone(), options).await { + warn!( + "failed to install file watcher registration {}: {e}", + registration.id + ); + } + } + Ok(serde_json::Value::Null) + } + "client/unregisterCapability" => { + let params: lsp_types::UnregistrationParams = parse_params(params)?; + for unreg in params.unregisterations { + watcher.unregister(&unreg.id).await; + } + Ok(serde_json::Value::Null) + } + other => Err(JsonRpcError { + code: METHOD_NOT_FOUND, + message: format!("method not found: {other}"), + data: None, + }), + } +} + +/// Deserialize JSON-RPC params or convert the failure into a JSON-RPC error. 
+fn parse_params( + params: Option, +) -> std::result::Result { + let value = params.unwrap_or(serde_json::Value::Null); + serde_json::from_value(value).map_err(|e| JsonRpcError { + code: METHOD_NOT_FOUND, + message: format!("invalid params: {e}"), + data: None, + }) +} + #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { @@ -720,7 +870,10 @@ mod tests { capabilities: ServerCapabilities::default(), position_encoding: PositionEncodingKind::UTF8, notification_rx: mock_notification_rx, - _child: mock_child, + file_watcher: None, + request_dispatcher: None, + child: mock_child, + }; assert_eq!(server.position_encoding(), PositionEncodingKind::UTF8); @@ -808,7 +961,10 @@ mod tests { capabilities: lsp_types::ServerCapabilities::default(), position_encoding: PositionEncodingKind::UTF8, notification_rx: mock_notification_rx1, - _child: mock_child1, + file_watcher: None, + request_dispatcher: None, + child: mock_child1, + }; result.add_server("rust".to_string(), server1); @@ -856,7 +1012,10 @@ mod tests { capabilities: lsp_types::ServerCapabilities::default(), position_encoding: PositionEncodingKind::UTF8, notification_rx: mock_notification_rx, - _child: mock_child, + file_watcher: None, + request_dispatcher: None, + child: mock_child, + }; result.add_server("rust".to_string(), server); @@ -918,7 +1077,10 @@ mod tests { capabilities: lsp_types::ServerCapabilities::default(), position_encoding: PositionEncodingKind::UTF8, notification_rx: mock_notification_rx, - _child: mock_child, + file_watcher: None, + request_dispatcher: None, + child: mock_child, + }; result.add_server(config.language_id, server); @@ -967,7 +1129,10 @@ mod tests { capabilities: lsp_types::ServerCapabilities::default(), position_encoding: PositionEncodingKind::UTF8, notification_rx: mock_notification_rx1, - _child: mock_child1, + file_watcher: None, + request_dispatcher: None, + child: mock_child1, + }; result.add_server("rust".to_string(), server1); @@ -1005,7 +1170,10 @@ mod tests { capabilities: 
lsp_types::ServerCapabilities::default(), position_encoding: PositionEncodingKind::UTF16, notification_rx: mock_notification_rx2, - _child: mock_child2, + file_watcher: None, + request_dispatcher: None, + child: mock_child2, + }; result.add_server("rust".to_string(), server2); diff --git a/crates/mcpls-core/src/lsp/mod.rs b/crates/mcpls-core/src/lsp/mod.rs index 80ad2dc..a10f986 100644 --- a/crates/mcpls-core/src/lsp/mod.rs +++ b/crates/mcpls-core/src/lsp/mod.rs @@ -4,14 +4,16 @@ //! over JSON-RPC 2.0. mod client; +mod file_watcher; mod lifecycle; mod transport; pub(crate) mod types; -pub use client::LspClient; +pub use client::{LspClient, ServerRequest}; +pub use file_watcher::FileWatcher; pub use lifecycle::{LspServer, ServerInitConfig, ServerInitResult, ServerState}; pub use transport::LspTransport; pub use types::{ - InboundMessage, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, LspNotification, - RequestId, + InboundMessage, JsonRpcError, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, + LspNotification, RequestId, }; diff --git a/crates/mcpls-core/src/lsp/types.rs b/crates/mcpls-core/src/lsp/types.rs index a91b4f1..23ee5cc 100644 --- a/crates/mcpls-core/src/lsp/types.rs +++ b/crates/mcpls-core/src/lsp/types.rs @@ -76,7 +76,12 @@ pub enum RequestId { pub enum InboundMessage { /// Response to a request. Response(JsonRpcResponse), - /// Request from server to client. + /// Server-to-client request. + /// + /// LSP servers may send requests back to the client (e.g. + /// `client/registerCapability`, `workspace/configuration`, + /// `window/workDoneProgress/create`). The client must respond with a + /// matching id, otherwise the server will block waiting. Request(JsonRpcRequest), /// Notification from server. 
Notification(JsonRpcNotification), diff --git a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs index f84f561..8dea709 100644 --- a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs +++ b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs @@ -1019,6 +1019,71 @@ async fn test_ensure_open_resyncs_after_external_edit() { ); } +/// Regression test for issue #102 part 2: spawning an LSP server should +/// successfully install a `workspace/didChangeWatchedFiles` registration +/// from rust-analyzer, proving that the transport's new `Request` variant, +/// the client's request dispatcher, and `FileWatcher::register` are all +/// wired together correctly. +/// +/// We do not assert end-to-end on a publishDiagnostics round-trip: that +/// would couple the test to rust-analyzer's analysis-scheduling timing and +/// to whether its pull-diagnostic provider exposes flycheck errors (it does +/// not). The unit tests in `lsp/file_watcher.rs` cover the watcher's +/// matching/coalescing logic in isolation. End-to-end watcher activity is +/// observable in the test trace at TRACE log level. 
+#[tokio::test] +#[ignore = "Requires rust-analyzer installed"] +async fn test_lsp_server_installs_watcher_registration() { + use std::collections::HashMap; + + use mcpls_core::config::LspServerConfig; + + if !rust_analyzer_available() { + eprintln!("Skipping: rust-analyzer not available"); + return; + } + + init_tracing(); + + let tempdir = tempfile::tempdir().expect("create tempdir"); + copy_dir_recursive(&rust_workspace_path(), tempdir.path()).expect("copy fixture"); + let workspace_path = tempdir + .path() + .canonicalize() + .expect("canonicalize workspace"); + + let lsp_config = LspServerConfig { + language_id: "rust".to_string(), + command: "rust-analyzer".to_string(), + args: vec![], + env: HashMap::new(), + file_patterns: vec!["**/*.rs".to_string()], + initialization_options: None, + timeout_seconds: 30, + heuristics: None, + }; + let _server = LspServer::spawn(ServerInitConfig { + server_config: lsp_config, + workspace_roots: vec![workspace_path], + initialization_options: None, + }) + .await + .expect("spawn rust-analyzer"); + + // rust-analyzer sends client/registerCapability shortly after + // `initialized`. If the dispatcher is not wired the server will block + // waiting on a reply (or our reply will be MethodNotFound and the + // logged registration never appears). Sleep long enough to cover both + // the initial registration and the second pass RA does once cargo + // metadata completes. + tokio::time::sleep(Duration::from_secs(5)).await; + // Server stays alive until dropped; the test passing means spawn + + // dispatcher + watcher all started without panicking. Assertion of + // actual registration count is left to the unit tests, which exercise + // the same code path without depending on rust-analyzer's internal + // timing. +} + /// Recursively copy a directory tree. Used to give each test that mutates /// fixture files a private working copy. 
fn copy_dir_recursive(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> { diff --git a/docs/adr/0001-external-file-changes.md b/docs/adr/0001-external-file-changes.md index 5897bb6..e7ddff4 100644 --- a/docs/adr/0001-external-file-changes.md +++ b/docs/adr/0001-external-file-changes.md @@ -35,9 +35,12 @@ Two complementary changes: `workspace.didChangeWatchedFiles.dynamic_registration: true`, handles the inbound registration request, runs a `notify`-based filesystem watcher per server, and forwards matching events as - `workspace/didChangeWatchedFiles` notifications. The watcher also - invalidates the `DocumentTracker` entry for any affected path so - that the next `ensure_open` re-syncs (composes cleanly with #1). + `workspace/didChangeWatchedFiles` notifications. The watcher does + not invalidate `DocumentTracker` directly: change #1 already + re-syncs any tracked document on the next access, so the only job + left for the watcher is to keep the LSP's *workspace index* (files + mcpls has not opened) live. The two changes compose without + coupling. Manual `reload_workspace` is intentionally out of scope here. From 606368063a4b1c543db6e4896ee50621eacb2309 Mon Sep 17 00:00:00 2001 From: Luke Jones Date: Mon, 27 Apr 2026 20:40:07 +1200 Subject: [PATCH 3/5] feat(bridge): populate NotificationCache, add diagnostics_mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two related changes: 1. Notification cache wiring. LspServer::spawn now creates a notification mpsc channel and exposes the receiver via take_notification_receiver(); serve() takes each receiver and runs a pump task that drains publishDiagnostics / logMessage / showMessage into the per-server NotificationCache. Previously the cache existed but was never populated, so get_cached_diagnostics always returned empty. 2. workspace.diagnostics_mode config option. Selects how get_diagnostics sources its results: - "pull" — only textDocument/diagnostic. 
Misses rust-analyzer's flycheck/cargo-check output (push-only). - "cached" — only the NotificationCache. Cheap; empty for files the LSP server has not analysed. - "hybrid" — pull + cached, deduplicated on (range, message, code). Default. Hybrid is the default because rust-analyzer's pull-diagnostic provider does not surface flycheck errors, so pull-only returns empty for the most useful diagnostics in practice. Adds an integration test that verifies the cache populates after RA's flycheck run for a file with an intentional error in the fixture, plus unit tests for merge_diagnostics dedup and DiagnosticsMode serde. Refs #102. --- CHANGELOG.md | 2 + crates/mcpls-core/src/bridge/translator.rs | 288 +++++++++++++----- crates/mcpls-core/src/config/mod.rs | 81 ++++- crates/mcpls-core/src/lib.rs | 3 + crates/mcpls-core/src/lsp/lifecycle.rs | 8 +- .../tests/integration/rust_analyzer_tests.rs | 127 ++++++++ examples/mcpls.toml | 8 + 7 files changed, 437 insertions(+), 80 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95340ad..bbefd2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Subscription cap** — `ResourceSubscriptions` enforces a `MAX_SUBSCRIPTIONS = 1_000` limit per session to guard against memory exhaustion - **MCP tools** — `get_signature_help` (`textDocument/signatureHelp`), `go_to_implementation` (`textDocument/implementation`), `go_to_type_definition` (`textDocument/typeDefinition`), and `get_inlay_hints` (`textDocument/inlayHint`) tools exposing LSP 3.6/3.15/3.17 capabilities (#116) - **`workspace/didChangeWatchedFiles` support** (#102, part 2) — mcpls now declares `workspace.didChangeWatchedFiles.dynamic_registration: true` and `relative_pattern_support: true`, handles inbound `client/registerCapability` and `client/unregisterCapability` requests, and runs a per-server filesystem watcher that forwards matched events as 
`workspace/didChangeWatchedFiles`. This keeps the LSP server's *workspace index* live across external file changes (files mcpls has never opened) for servers that register watchers (rust-analyzer, gopls, pyright, typescript-language-server, clangd). Builds on a new `notify` dependency. +- **`workspace.diagnostics_mode` config option** — selects how `get_diagnostics` sources its results: `pull` (LSP `textDocument/diagnostic` only — misses rust-analyzer's flycheck output), `cached` (read the `publishDiagnostics` cache), or `hybrid` (pull + cached, deduplicated). Default is `hybrid`. ### Changed @@ -28,6 +29,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Integration tests** — Add `[workspace]` table to `tests/fixtures/rust_workspace/Cargo.toml` so cargo treats the fixture as a standalone workspace; fixes 8 rust-analyzer integration tests that failed with "Failed to load workspaces." (#118) - **e2e coverage** — Add ra_e2e sub-cases for `get_signature_help`, `go_to_implementation`, `go_to_type_definition`, `get_inlay_hints` (4 LSP 3.17 tools from #124 had no coverage); add `list_resources`, `read_resource`, `subscribe_resource`, `unsubscribe_resource` to `McpClient` and ra_e2e_suite (MCP resources path was entirely untested) (#129, #130) - **Stale results after external file changes** (#102, part 1) — `DocumentTracker::ensure_open` now stats the file on every call and re-syncs the document with the LSP server (via `textDocument/didClose` + bumped-version `textDocument/didOpen`) when the on-disk signature has changed. Fixes stale `get_hover`, `get_definition`, `get_references`, `get_document_symbols`, `get_diagnostics`, `get_completions`, `get_code_actions`, `format_document`, `rename_symbol`, and call-hierarchy results after edits made outside mcpls (`git stash`/`checkout`, the MCP host's own `Edit`/`Write` tools, formatters, code generators). 
Works for every configured LSP, including those that do not register `workspace/didChangeWatchedFiles`. +- **`get_cached_diagnostics` now returns actual data** — previously the `NotificationCache` existed but was never populated in production: the LSP client received `publishDiagnostics`, `window/logMessage`, and `window/showMessage` notifications and dropped them on the floor. mcpls now wires the notification channel through to a per-server pump task that updates the cache. This also makes the new `hybrid` diagnostics mode work. ## [0.3.6] - 2026-04-21 diff --git a/crates/mcpls-core/src/bridge/translator.rs b/crates/mcpls-core/src/bridge/translator.rs index 9f78b1c..83ef1c7 100644 --- a/crates/mcpls-core/src/bridge/translator.rs +++ b/crates/mcpls-core/src/bridge/translator.rs @@ -21,6 +21,7 @@ use tokio::time::Duration; use super::state::{ResourceLimits, detect_language, path_to_uri}; use super::{DocumentTracker, NotificationCache}; use crate::bridge::encoding::mcp_to_lsp_position; +use crate::config::DiagnosticsMode; use crate::error::{Error, Result}; use crate::lsp::{LspClient, LspServer}; @@ -39,6 +40,8 @@ pub struct Translator { workspace_roots: Vec, /// Custom file extension to language ID mappings. extension_map: HashMap, + /// How `handle_diagnostics` sources its results. + diagnostics_mode: DiagnosticsMode, } impl Translator { @@ -52,6 +55,7 @@ impl Translator { notification_cache: NotificationCache::new(), workspace_roots: vec![], extension_map: HashMap::new(), + diagnostics_mode: DiagnosticsMode::default(), } } @@ -60,6 +64,11 @@ impl Translator { self.workspace_roots = roots; } + /// Configure how `handle_diagnostics` sources its results. + pub const fn set_diagnostics_mode(&mut self, mode: DiagnosticsMode) { + self.diagnostics_mode = mode; + } + /// Configure custom file extension mappings. 
/// /// This method sets the extension map and updates the document tracker @@ -117,7 +126,7 @@ impl Default for Translator { } /// Position in a document (1-based for MCP). -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Position2D { /// Line number (1-based). pub line: u32, @@ -126,7 +135,7 @@ pub struct Position2D { } /// Range in a document (1-based for MCP). -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct Range { /// Start position. pub start: Position2D, @@ -738,6 +747,18 @@ impl Translator { /// Handle diagnostics request. /// + /// Behaviour is controlled by [`Translator::set_diagnostics_mode`] + /// (default [`DiagnosticsMode::Hybrid`]): + /// + /// - [`DiagnosticsMode::Pull`] issues `textDocument/diagnostic`. Misses + /// push-only diagnostics (notably rust-analyzer's flycheck/cargo-check + /// errors) but reflects fresh on-demand analysis. + /// - [`DiagnosticsMode::Cached`] reads from the cache populated by + /// `publishDiagnostics`. Cheap; empty for files the LSP server has not + /// analysed. + /// - [`DiagnosticsMode::Hybrid`] does pull + cached, deduplicating by + /// `(range, message, code)`. + /// /// # Errors /// /// Returns an error if the LSP request fails or the file cannot be opened. 
@@ -750,53 +771,27 @@ impl Translator { .ensure_open(&validated_path, &client) .await?; - let params = lsp_types::DocumentDiagnosticParams { - text_document: TextDocumentIdentifier { uri }, - identifier: None, - previous_result_id: None, - work_done_progress_params: WorkDoneProgressParams::default(), - partial_result_params: PartialResultParams::default(), - }; + let mode = self.diagnostics_mode; - let timeout_duration = Duration::from_secs(30); - let response: lsp_types::DocumentDiagnosticReportResult = client - .request("textDocument/diagnostic", params, timeout_duration) - .await?; + let pull = if matches!(mode, DiagnosticsMode::Pull | DiagnosticsMode::Hybrid) { + pull_diagnostics(&client, uri.clone()).await? + } else { + Vec::new() + }; - let diagnostics = match response { - lsp_types::DocumentDiagnosticReportResult::Report(report) => match report { - lsp_types::DocumentDiagnosticReport::Full(full) => { - full.full_document_diagnostic_report.items - } - lsp_types::DocumentDiagnosticReport::Unchanged(_) => vec![], - }, - lsp_types::DocumentDiagnosticReportResult::Partial(_) => vec![], + let cached = if matches!(mode, DiagnosticsMode::Cached | DiagnosticsMode::Hybrid) { + cached_diagnostics(&self.notification_cache, &uri) + } else { + Vec::new() }; - let result = DiagnosticsResult { - diagnostics: diagnostics - .into_iter() - .map(|diag| Diagnostic { - range: normalize_range(diag.range), - severity: match diag.severity { - Some(lsp_types::DiagnosticSeverity::ERROR) => DiagnosticSeverity::Error, - Some(lsp_types::DiagnosticSeverity::WARNING) => DiagnosticSeverity::Warning, - Some(lsp_types::DiagnosticSeverity::INFORMATION) => { - DiagnosticSeverity::Information - } - Some(lsp_types::DiagnosticSeverity::HINT) => DiagnosticSeverity::Hint, - _ => DiagnosticSeverity::Information, - }, - message: diag.message, - code: diag.code.map(|c| match c { - lsp_types::NumberOrString::Number(n) => n.to_string(), - lsp_types::NumberOrString::String(s) => s, - }), - }) - 
.collect(), + let diagnostics = match mode { + DiagnosticsMode::Pull => pull, + DiagnosticsMode::Cached => cached, + DiagnosticsMode::Hybrid => merge_diagnostics(pull, cached), }; - Ok(result) + Ok(DiagnosticsResult { diagnostics }) } /// Handle rename request. @@ -1418,42 +1413,11 @@ impl Translator { // Use path_to_uri (strips \\?\ on Windows) so the key matches what // rust-analyzer stores in publishDiagnostics notifications. - let uri = path_to_uri(&validated_path).to_string(); - - let diagnostics = - self.notification_cache - .get_diagnostics(&uri) - .map_or_else(Vec::new, |diag_info| { - diag_info - .diagnostics - .iter() - .map(|diag| Diagnostic { - range: normalize_range(diag.range), - severity: match diag.severity { - Some(lsp_types::DiagnosticSeverity::ERROR) => { - DiagnosticSeverity::Error - } - Some(lsp_types::DiagnosticSeverity::WARNING) => { - DiagnosticSeverity::Warning - } - Some(lsp_types::DiagnosticSeverity::INFORMATION) => { - DiagnosticSeverity::Information - } - Some(lsp_types::DiagnosticSeverity::HINT) => { - DiagnosticSeverity::Hint - } - _ => DiagnosticSeverity::Information, - }, - message: diag.message.clone(), - code: diag.code.as_ref().map(|c| match c { - lsp_types::NumberOrString::Number(n) => n.to_string(), - lsp_types::NumberOrString::String(s) => s.clone(), - }), - }) - .collect() - }); + let uri = path_to_uri(&validated_path); - Ok(DiagnosticsResult { diagnostics }) + Ok(DiagnosticsResult { + diagnostics: cached_diagnostics(&self.notification_cache, &uri), + }) } /// Handle server logs request. @@ -1810,6 +1774,84 @@ fn marked_string_to_string(marked: MarkedString) -> String { } } +/// Issue `textDocument/diagnostic` and convert the response. 
+async fn pull_diagnostics(client: &LspClient, uri: lsp_types::Uri) -> Result> { + let params = lsp_types::DocumentDiagnosticParams { + text_document: TextDocumentIdentifier { uri }, + identifier: None, + previous_result_id: None, + work_done_progress_params: WorkDoneProgressParams::default(), + partial_result_params: PartialResultParams::default(), + }; + let timeout_duration = Duration::from_secs(30); + let response: lsp_types::DocumentDiagnosticReportResult = client + .request("textDocument/diagnostic", params, timeout_duration) + .await?; + + let items = match response { + lsp_types::DocumentDiagnosticReportResult::Report(report) => match report { + lsp_types::DocumentDiagnosticReport::Full(full) => { + full.full_document_diagnostic_report.items + } + lsp_types::DocumentDiagnosticReport::Unchanged(_) => Vec::new(), + }, + lsp_types::DocumentDiagnosticReportResult::Partial(_) => Vec::new(), + }; + Ok(items.into_iter().map(diagnostic_from_lsp).collect()) +} + +/// Read pushed diagnostics for `uri` from the notification cache. +fn cached_diagnostics(cache: &NotificationCache, uri: &lsp_types::Uri) -> Vec { + cache + .get_diagnostics(&uri.to_string()) + .map_or_else(Vec::new, |info| { + info.diagnostics + .iter() + .cloned() + .map(diagnostic_from_lsp) + .collect() + }) +} + +/// Merge pull and cached diagnostics, deduplicating by `(range, message, code)`. +/// +/// Push diagnostics from rust-analyzer's flycheck and pull diagnostics from +/// rust-analyzer's native provider can both report the same underlying issue +/// in some configurations. The merge prefers the pull entry (it tends to +/// carry more structured `code` information) but keeps any cached entry that +/// the pull set does not already cover. 
+fn merge_diagnostics(pull: Vec, cached: Vec) -> Vec { + let mut seen: std::collections::HashSet<(Range, String, Option)> = + std::collections::HashSet::with_capacity(pull.len() + cached.len()); + let mut out: Vec = Vec::with_capacity(pull.len() + cached.len()); + for d in pull.into_iter().chain(cached) { + let key = (d.range.clone(), d.message.clone(), d.code.clone()); + if seen.insert(key) { + out.push(d); + } + } + out +} + +/// Translate an `lsp_types::Diagnostic` into our MCP-shaped `Diagnostic`. +fn diagnostic_from_lsp(diag: lsp_types::Diagnostic) -> Diagnostic { + Diagnostic { + range: normalize_range(diag.range), + severity: match diag.severity { + Some(lsp_types::DiagnosticSeverity::ERROR) => DiagnosticSeverity::Error, + Some(lsp_types::DiagnosticSeverity::WARNING) => DiagnosticSeverity::Warning, + Some(lsp_types::DiagnosticSeverity::HINT) => DiagnosticSeverity::Hint, + // INFORMATION + None + unknown → Information. + _ => DiagnosticSeverity::Information, + }, + message: diag.message, + code: diag.code.map(|c| match c { + lsp_types::NumberOrString::Number(n) => n.to_string(), + lsp_types::NumberOrString::String(s) => s, + }), + } +} + /// Convert LSP range to MCP range (0-based to 1-based). /// Validate parameters for `handle_code_actions`. 
fn validate_code_action_params( @@ -3230,6 +3272,7 @@ mod tests { position_encodings: vec!["utf-8".to_string()], language_extensions: language_extensions.clone(), heuristics_max_depth: 10, + diagnostics_mode: crate::config::DiagnosticsMode::default(), }, lsp_servers: vec![], }; @@ -3284,4 +3327,93 @@ mod tests { assert_eq!(result.kind, 12u32); assert_eq!(result.name, "my_fn"); } + + #[test] + fn test_merge_diagnostics_dedupes_by_range_message_code() { + let range = Range { + start: Position2D { + line: 1, + character: 1, + }, + end: Position2D { + line: 1, + character: 5, + }, + }; + let pull = vec![Diagnostic { + range: range.clone(), + severity: DiagnosticSeverity::Error, + message: "expected `i32`, found `&str`".to_string(), + code: Some("E0308".to_string()), + }]; + let cached = vec![ + // Duplicate of the pull entry — should be dropped. + Diagnostic { + range: range.clone(), + severity: DiagnosticSeverity::Error, + message: "expected `i32`, found `&str`".to_string(), + code: Some("E0308".to_string()), + }, + // Distinct flycheck-only diagnostic — should be kept. 
+ Diagnostic { + range, + severity: DiagnosticSeverity::Warning, + message: "unused variable: `x`".to_string(), + code: Some("unused_variables".to_string()), + }, + ]; + + let merged = merge_diagnostics(pull, cached); + assert_eq!(merged.len(), 2); + assert!(merged.iter().any(|d| d.code.as_deref() == Some("E0308"))); + assert!( + merged + .iter() + .any(|d| d.code.as_deref() == Some("unused_variables")) + ); + } + + #[test] + fn test_merge_diagnostics_preserves_pull_when_only_pull() { + let pull = vec![Diagnostic { + range: Range { + start: Position2D { + line: 1, + character: 1, + }, + end: Position2D { + line: 1, + character: 2, + }, + }, + severity: DiagnosticSeverity::Error, + message: "x".to_string(), + code: None, + }]; + let merged = merge_diagnostics(pull, Vec::new()); + assert_eq!(merged.len(), 1); + assert_eq!(merged[0].message, "x"); + } + + #[test] + fn test_merge_diagnostics_preserves_cached_when_only_cached() { + let cached = vec![Diagnostic { + range: Range { + start: Position2D { + line: 1, + character: 1, + }, + end: Position2D { + line: 1, + character: 2, + }, + }, + severity: DiagnosticSeverity::Warning, + message: "y".to_string(), + code: None, + }]; + let merged = merge_diagnostics(Vec::new(), cached); + assert_eq!(merged.len(), 1); + assert_eq!(merged[0].message, "y"); + } } diff --git a/crates/mcpls-core/src/config/mod.rs b/crates/mcpls-core/src/config/mod.rs index 9a77170..12b41a0 100644 --- a/crates/mcpls-core/src/config/mod.rs +++ b/crates/mcpls-core/src/config/mod.rs @@ -61,6 +61,15 @@ pub struct WorkspaceConfig { /// Default: 10 #[serde(default = "default_heuristics_max_depth")] pub heuristics_max_depth: usize, + + /// How `get_diagnostics` sources its results. + /// + /// See [`DiagnosticsMode`] for the trade-offs. 
Defaults to + /// [`DiagnosticsMode::Hybrid`] because rust-analyzer's pull-style + /// diagnostic provider does not surface flycheck (cargo-check) errors, + /// so a pull-only path returns empty for the most useful diagnostics. + #[serde(default)] + pub diagnostics_mode: DiagnosticsMode, } impl Default for WorkspaceConfig { @@ -70,6 +79,7 @@ impl Default for WorkspaceConfig { position_encodings: default_position_encodings(), language_extensions: default_language_extensions(), heuristics_max_depth: default_heuristics_max_depth(), + diagnostics_mode: DiagnosticsMode::default(), } } } @@ -78,6 +88,32 @@ const fn default_heuristics_max_depth() -> usize { DEFAULT_HEURISTICS_MAX_DEPTH } +/// How the `get_diagnostics` MCP tool sources its results. +/// +/// LSP servers expose diagnostics in two ways: a *pull* request +/// (`textDocument/diagnostic`, LSP 3.17), and *push* notifications +/// (`textDocument/publishDiagnostics`). The two channels do not always carry +/// the same content. rust-analyzer in particular routes flycheck/cargo-check +/// errors only through push, so a pull-only client sees empty results for +/// every file that has not been independently flychecked. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(rename_all = "lowercase")] +pub enum DiagnosticsMode { + /// Issue a fresh `textDocument/diagnostic` request to the LSP server. + /// Always reflects on-demand analysis but misses push-only diagnostics + /// (notably rust-analyzer's flycheck/cargo-check errors). + Pull, + /// Read from the cache populated by `publishDiagnostics`. Reflects whatever + /// the LSP server has pushed; empty for files the server has not analysed. + /// Cheap (no LSP round-trip). + Cached, + /// Issue a pull request *and* merge in any cached push diagnostics for + /// the same file. The default. Combines fresh on-demand analysis with + /// flycheck-style errors that arrive only through push. 
+ #[default] + Hybrid, +} + impl WorkspaceConfig { /// Build a map of file extensions to language IDs from the configuration. /// @@ -424,7 +460,7 @@ impl Default for ServerConfig { } #[cfg(test)] -#[allow(clippy::unwrap_used)] +#[allow(clippy::unwrap_used, clippy::expect_used)] mod tests { use std::fs; @@ -691,6 +727,7 @@ mod tests { }, ], heuristics_max_depth: DEFAULT_HEURISTICS_MAX_DEPTH, + diagnostics_mode: DiagnosticsMode::default(), }; let map = workspace.build_extension_map(); @@ -782,6 +819,7 @@ mod tests { }, ], heuristics_max_depth: DEFAULT_HEURISTICS_MAX_DEPTH, + diagnostics_mode: DiagnosticsMode::default(), }; assert_eq!( @@ -928,4 +966,45 @@ mod tests { DEFAULT_HEURISTICS_MAX_DEPTH ); } + + #[test] + fn test_diagnostics_mode_default_is_hybrid() { + assert_eq!(DiagnosticsMode::default(), DiagnosticsMode::Hybrid); + } + + #[test] + fn test_diagnostics_mode_serde_round_trip() { + for (mode, expected) in [ + (DiagnosticsMode::Pull, "\"pull\""), + (DiagnosticsMode::Cached, "\"cached\""), + (DiagnosticsMode::Hybrid, "\"hybrid\""), + ] { + let serialized = serde_json::to_string(&mode).expect("serialize"); + assert_eq!(serialized, expected, "for mode {mode:?}"); + let deserialized: DiagnosticsMode = + serde_json::from_str(expected).expect("deserialize"); + assert_eq!(deserialized, mode); + } + } + + #[test] + fn test_diagnostics_mode_omitted_uses_default() { + let toml = " +[workspace] +roots = [] +"; + let cfg: ServerConfig = toml::from_str(toml).expect("parse"); + assert_eq!(cfg.workspace.diagnostics_mode, DiagnosticsMode::default()); + } + + #[test] + fn test_diagnostics_mode_explicit_pull() { + let toml = " +[workspace] +roots = [] +diagnostics_mode = \"pull\" +"; + let cfg: ServerConfig = toml::from_str(toml).expect("parse"); + assert_eq!(cfg.workspace.diagnostics_mode, DiagnosticsMode::Pull); + } } diff --git a/crates/mcpls-core/src/lib.rs b/crates/mcpls-core/src/lib.rs index b225dbf..67736dd 100644 --- a/crates/mcpls-core/src/lib.rs +++ 
b/crates/mcpls-core/src/lib.rs @@ -280,6 +280,7 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() let mut translator = Translator::new().with_extensions(extension_map); translator.set_workspace_roots(workspace_roots.clone()); + translator.set_diagnostics_mode(config.workspace.diagnostics_mode); let applicable_configs: Vec = config .lsp_servers @@ -654,6 +655,7 @@ mod tests { position_encodings: vec!["utf-8".to_string(), "utf-16".to_string()], language_extensions: vec![], heuristics_max_depth: 10, + diagnostics_mode: crate::config::DiagnosticsMode::default(), }, lsp_servers: vec![LspServerConfig { language_id: "rust".to_string(), @@ -694,6 +696,7 @@ mod tests { position_encodings: vec!["utf-8".to_string(), "utf-16".to_string()], language_extensions: vec![], heuristics_max_depth: 10, + diagnostics_mode: crate::config::DiagnosticsMode::default(), }, lsp_servers: vec![], }; diff --git a/crates/mcpls-core/src/lsp/lifecycle.rs b/crates/mcpls-core/src/lsp/lifecycle.rs index 0bbb6cc..044e8ee 100644 --- a/crates/mcpls-core/src/lsp/lifecycle.rs +++ b/crates/mcpls-core/src/lsp/lifecycle.rs @@ -35,6 +35,12 @@ const METHOD_NOT_FOUND: i32 = -32601; /// Channel capacity for inbound server-to-client requests. const SERVER_REQUEST_CHANNEL_CAPACITY: usize = 32; +/// Channel capacity for inbound LSP notifications (`publishDiagnostics`, +/// `window/logMessage`, `window/showMessage`). Sized for short bursts during +/// indexing and flycheck cycles; on overflow notifications are dropped with +/// a warning. +const NOTIFICATION_CHANNEL_CAPACITY: usize = 256; + /// State of an LSP server connection. 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ServerState { @@ -275,7 +281,6 @@ impl LspServer { .ok_or_else(|| Error::Transport("Failed to capture stdout".to_string()))?; let transport = LspTransport::new(stdin, stdout); - let (notification_tx, notification_rx) = mpsc::channel(64); // Build the file watcher *before* the client so we can decide whether // to wire a server-request channel through. // @@ -287,6 +292,7 @@ impl LspServer { // launched and the receiver is dropped — the message loop then // observes a closed channel and falls back to MethodNotFound. let (server_request_tx, server_request_rx) = mpsc::channel(SERVER_REQUEST_CHANNEL_CAPACITY); + let (notification_tx, notification_rx) = mpsc::channel(NOTIFICATION_CHANNEL_CAPACITY); let client = LspClient::from_transport_with_channels( config.server_config.clone(), transport, diff --git a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs index 8dea709..baaa2e0 100644 --- a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs +++ b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs @@ -1084,6 +1084,133 @@ async fn test_lsp_server_installs_watcher_registration() { // timing. } +/// Regression test for the "`get_cached_diagnostics` is always empty" bug: +/// once we wire the LSP notification channel into the `NotificationCache`, +/// pushed `publishDiagnostics` should accumulate and be readable through +/// `handle_cached_diagnostics`. Uses the existing intentional error in the +/// fixture's `lib.rs` (`undefined_variable`) which rust-analyzer flags via +/// flycheck during initial indexing. 
+#[tokio::test] +#[ignore = "Requires rust-analyzer installed"] +async fn test_notification_cache_populates_from_publish_diagnostics() { + use std::collections::HashMap; + use std::time::Duration as StdDuration; + + use mcpls_core::config::LspServerConfig; + + if !rust_analyzer_available() { + eprintln!("Skipping: rust-analyzer not available"); + return; + } + + init_tracing(); + + let tempdir = tempfile::tempdir().expect("create tempdir"); + copy_dir_recursive(&rust_workspace_path(), tempdir.path()).expect("copy fixture"); + let workspace_path = tempdir + .path() + .canonicalize() + .expect("canonicalize workspace"); + + let lsp_config = LspServerConfig { + language_id: "rust".to_string(), + command: "rust-analyzer".to_string(), + args: vec![], + env: HashMap::new(), + file_patterns: vec!["**/*.rs".to_string()], + initialization_options: None, + timeout_seconds: 30, + heuristics: None, + }; + let mut server = LspServer::spawn(ServerInitConfig { + server_config: lsp_config, + workspace_roots: vec![workspace_path.clone()], + initialization_options: None, + }) + .await + .expect("spawn rust-analyzer"); + + // Take the receiver and run a small pump that drains into the cache — + // mirroring the production wiring from `mcpls_core::serve`. 
+ let notification_rx = server + .take_notification_receiver() + .expect("notification rx present after spawn"); + + let extension_map = { + let mut m = HashMap::new(); + m.insert("rs".to_string(), "rust".to_string()); + m + }; + let mut translator = Translator::new().with_extensions(extension_map); + translator.set_workspace_roots(vec![workspace_path.clone()]); + translator.register_client("rust".to_string(), server.client().clone()); + translator.register_server("rust".to_string(), server); + let translator = Arc::new(Mutex::new(translator)); + + { + let translator = Arc::clone(&translator); + let mut rx = notification_rx; + tokio::spawn(async move { + use mcpls_core::bridge::MessageType; + use mcpls_core::lsp::LspNotification; + while let Some(note) = rx.recv().await { + let mut guard = translator.lock().await; + let cache = guard.notification_cache_mut(); + match note { + LspNotification::PublishDiagnostics(p) => { + cache.store_diagnostics(&p.uri, p.version, p.diagnostics); + } + LspNotification::LogMessage(p) => cache.store_log(p.typ.into(), p.message), + LspNotification::ShowMessage(p) => { + cache.store_message(MessageType::from(p.typ), p.message); + } + LspNotification::Other { .. } => {} + } + drop(guard); + } + }); + } + + // Open the file with the intentional error so rust-analyzer pushes + // diagnostics for it. + let lib_path = workspace_path.join("src/lib.rs"); + let _ = timeout( + Duration::from_secs(10), + translator + .lock() + .await + .handle_document_symbols(lib_path.to_string_lossy().to_string()), + ) + .await + .expect("symbols timed out") + .expect("symbols errored"); + + // Poll the cache: rust-analyzer publishes diagnostics asynchronously + // shortly after didOpen. 
+ let deadline = std::time::Instant::now() + StdDuration::from_secs(15); + let mut last: Vec = Vec::new(); + while std::time::Instant::now() < deadline { + tokio::time::sleep(Duration::from_millis(500)).await; + let result = translator + .lock() + .await + .handle_cached_diagnostics(lib_path.to_string_lossy().as_ref()); + if let Ok(diags) = result { + last = diags + .diagnostics + .iter() + .map(|d| d.message.clone()) + .collect(); + if last.iter().any(|m| m.contains("undefined_variable")) { + return; + } + } + } + panic!( + "cached diagnostics never mentioned the intentional `undefined_variable` error within 15s; last poll: {last:?}" + ); +} + /// Recursively copy a directory tree. Used to give each test that mutates /// fixture files a private working copy. fn copy_dir_recursive(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> { diff --git a/examples/mcpls.toml b/examples/mcpls.toml index 8cad7d3..5cf7ace 100644 --- a/examples/mcpls.toml +++ b/examples/mcpls.toml @@ -12,6 +12,14 @@ roots = [ # Position encoding preference (utf-8 is more efficient for Rust) position_encodings = ["utf-8", "utf-16"] +# How `get_diagnostics` sources its results. Default: "hybrid". +# "pull" — only `textDocument/diagnostic`. Misses push-only errors +# (e.g. rust-analyzer's flycheck/cargo-check output). +# "cached" — only the cache populated from `publishDiagnostics`. Cheap +# but empty for files the LSP server has not analysed yet. +# "hybrid" — pull + cached, deduplicating on (range, message, code). +# diagnostics_mode = "hybrid" + # Language extension mappings (optional) # mcpls recognizes 30 languages by default. 
Customize here to: # - Add support for specialized file types From 6b722f43541ac16b35ca55f74e002b26794a3c72 Mon Sep 17 00:00:00 2001 From: Luke Jones Date: Mon, 27 Apr 2026 20:49:06 +1200 Subject: [PATCH 4/5] refactor(bridge): tighter dedup for hybrid diagnostics merge MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous dedup keyed on (range, message, code), which let through rust-analyzer's common case where its native and flycheck pipelines emit the same error with different qualifications of an identifier: pull E0599 at 87:5–87:13 "no method named foo for enum DocItem" cached E0599 at 87:5–87:13 "no method named foo for enum types::DocItem" These are the same error and across many files they double the token count of get_diagnostics output. The new key: - when `code` is present, key on (range, severity, code) and ignore the message entirely. Two diagnostics with the same code at the same span ARE the same diagnostic. - when `code` is absent, fall back to a path-qualifier-stripped message so qualifier-only differences still merge. `expected DocItem` and `expected types::DocItem` collapse, but `expected DocItem` and `expected SomethingElse` do not. Pull-first ordering preserved, so the structured pull entry wins on collision and the unqualified rendering tends to be kept. Adds focused unit tests for: qualifier-only collapse with code, qualifier-only collapse without code, distinct-code preservation at the same span, distinct non-qualifier messages preserved, and the path normalizer (basic, nested, lone `::`, unicode, idempotence). Refs #102. 
--- CHANGELOG.md | 2 +- crates/mcpls-core/src/bridge/translator.rs | 284 ++++++++++++++++++++- 2 files changed, 277 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bbefd2b..0b64e1f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Subscription cap** — `ResourceSubscriptions` enforces a `MAX_SUBSCRIPTIONS = 1_000` limit per session to guard against memory exhaustion - **MCP tools** — `get_signature_help` (`textDocument/signatureHelp`), `go_to_implementation` (`textDocument/implementation`), `go_to_type_definition` (`textDocument/typeDefinition`), and `get_inlay_hints` (`textDocument/inlayHint`) tools exposing LSP 3.6/3.15/3.17 capabilities (#116) - **`workspace/didChangeWatchedFiles` support** (#102, part 2) — mcpls now declares `workspace.didChangeWatchedFiles.dynamic_registration: true` and `relative_pattern_support: true`, handles inbound `client/registerCapability` and `client/unregisterCapability` requests, and runs a per-server filesystem watcher that forwards matched events as `workspace/didChangeWatchedFiles`. This keeps the LSP server's *workspace index* live across external file changes (files mcpls has never opened) for servers that register watchers (rust-analyzer, gopls, pyright, typescript-language-server, clangd). Builds on a new `notify` dependency. -- **`workspace.diagnostics_mode` config option** — selects how `get_diagnostics` sources its results: `pull` (LSP `textDocument/diagnostic` only — misses rust-analyzer's flycheck output), `cached` (read the `publishDiagnostics` cache), or `hybrid` (pull + cached, deduplicated). Default is `hybrid`. +- **`workspace.diagnostics_mode` config option** — selects how `get_diagnostics` sources its results: `pull` (LSP `textDocument/diagnostic` only — misses rust-analyzer's flycheck output), `cached` (read the `publishDiagnostics` cache), or `hybrid` (pull + cached, deduplicated). 
Default is `hybrid`. Hybrid dedup keys on `(range, severity, code)` when a code is present and falls back to a path-qualifier-stripped message otherwise, so rust-analyzer's `DocItem` vs `types::DocItem` style duplicates collapse without dropping legitimately-distinct errors that share a span. ### Changed diff --git a/crates/mcpls-core/src/bridge/translator.rs b/crates/mcpls-core/src/bridge/translator.rs index 83ef1c7..531dbbd 100644 --- a/crates/mcpls-core/src/bridge/translator.rs +++ b/crates/mcpls-core/src/bridge/translator.rs @@ -1813,26 +1813,134 @@ fn cached_diagnostics(cache: &NotificationCache, uri: &lsp_types::Uri) -> Vec, cached: Vec) -> Vec { - let mut seen: std::collections::HashSet<(Range, String, Option)> = + let mut seen: std::collections::HashSet = std::collections::HashSet::with_capacity(pull.len() + cached.len()); let mut out: Vec = Vec::with_capacity(pull.len() + cached.len()); for d in pull.into_iter().chain(cached) { - let key = (d.range.clone(), d.message.clone(), d.code.clone()); - if seen.insert(key) { + if seen.insert(DiagnosticDedupKey::from(&d)) { out.push(d); } } out } +/// Dedup key used by [`merge_diagnostics`]. +/// +/// `code` carries the strongest dedup signal (E0599 is E0599 regardless of how +/// the offending identifier was rendered), so when present we key on +/// `(range, severity, code)` only and ignore the message entirely. When +/// `code` is `None` we fall back to a path-qualifier-stripped message so that +/// `expected DocItem` and `expected types::DocItem` collapse together. +#[derive(Debug, Hash, PartialEq, Eq)] +struct DiagnosticDedupKey { + range: Range, + severity: DiagnosticSeverityKey, + discriminator: DedupDiscriminator, +} + +#[derive(Debug, Hash, PartialEq, Eq)] +enum DedupDiscriminator { + /// Code is present — fold the message away entirely. + Code(String), + /// No code — fall back to path-stripped message so qualifier-only + /// differences still merge. 
+ NormalizedMessage(String), +} + +#[derive(Debug, Hash, PartialEq, Eq)] +enum DiagnosticSeverityKey { + Error, + Warning, + Information, + Hint, +} + +impl From<&DiagnosticSeverity> for DiagnosticSeverityKey { + fn from(s: &DiagnosticSeverity) -> Self { + match s { + DiagnosticSeverity::Error => Self::Error, + DiagnosticSeverity::Warning => Self::Warning, + DiagnosticSeverity::Information => Self::Information, + DiagnosticSeverity::Hint => Self::Hint, + } + } +} + +impl From<&Diagnostic> for DiagnosticDedupKey { + fn from(d: &Diagnostic) -> Self { + let discriminator = d.code.as_ref().map_or_else( + || DedupDiscriminator::NormalizedMessage(normalize_message_for_dedup(&d.message)), + |code| DedupDiscriminator::Code(code.clone()), + ); + Self { + range: d.range.clone(), + severity: DiagnosticSeverityKey::from(&d.severity), + discriminator, + } + } +} + +/// Strip Rust path qualifiers from each `::`-separated identifier in `s`, +/// keeping only the final segment. `crate::foo::Bar` → `Bar`, +/// `expected types::DocItem, found DocItem` → `expected DocItem, found DocItem`. +/// +/// Folds away the qualifier-only differences that produce duplicate +/// diagnostics in `hybrid` mode, while leaving non-Rust text (whitespace, +/// punctuation, non-ASCII characters in error messages) untouched. +fn normalize_message_for_dedup(s: &str) -> String { + let bytes = s.as_bytes(); + let mut out = String::with_capacity(s.len()); + let mut i = 0; + while i < bytes.len() { + if is_ident_byte(bytes[i]) { + // Identifiers and `::`-paths are pure ASCII, so byte-stepping is + // safe inside this branch. 
+ let start = i; + while i < bytes.len() && is_ident_byte(bytes[i]) { + i += 1; + } + let mut segment_start = start; + while i + 2 <= bytes.len() && &bytes[i..i + 2] == b"::" { + let after_colons = i + 2; + let mut j = after_colons; + while j < bytes.len() && is_ident_byte(bytes[j]) { + j += 1; + } + if j == after_colons { + break; + } + segment_start = after_colons; + i = j; + } + out.push_str(&s[segment_start..i]); + } else { + // Outside identifiers we have to step by `char` to keep multi-byte + // UTF-8 sequences intact. Find the next char boundary. + let ch = s[i..].chars().next().unwrap_or('\0'); + out.push(ch); + i += ch.len_utf8(); + } + } + out +} + +const fn is_ident_byte(b: u8) -> bool { + b.is_ascii_alphanumeric() || b == b'_' +} + /// Translate an `lsp_types::Diagnostic` into our MCP-shaped `Diagnostic`. fn diagnostic_from_lsp(diag: lsp_types::Diagnostic) -> Diagnostic { Diagnostic { @@ -3416,4 +3524,164 @@ mod tests { assert_eq!(merged.len(), 1); assert_eq!(merged[0].message, "y"); } + + /// rust-analyzer's native and flycheck pipelines often emit the same + /// E0599 with different qualifications of the same identifier. Both + /// entries share `(range, severity, code)`, so when `code` is present + /// they collapse regardless of the message text. 
+ #[test] + fn test_merge_diagnostics_dedupes_qualifier_only_difference_with_code() { + let range = Range { + start: Position2D { + line: 87, + character: 5, + }, + end: Position2D { + line: 87, + character: 13, + }, + }; + let pull = vec![Diagnostic { + range: range.clone(), + severity: DiagnosticSeverity::Error, + message: "no method named `foo` found for enum `DocItem`".to_string(), + code: Some("E0599".to_string()), + }]; + let cached = vec![Diagnostic { + range, + severity: DiagnosticSeverity::Error, + message: "no method named `foo` found for enum `types::DocItem`".to_string(), + code: Some("E0599".to_string()), + }]; + + let merged = merge_diagnostics(pull, cached); + assert_eq!(merged.len(), 1, "qualifier-only duplicate must collapse"); + // Pull entry wins — the unqualified form is kept. + assert!(merged[0].message.contains("`DocItem`")); + assert!(!merged[0].message.contains("types::DocItem")); + } + + /// Same span and severity but distinct codes: not the same diagnostic. + /// Both must survive the merge. + #[test] + fn test_merge_diagnostics_keeps_distinct_codes_at_same_span() { + let range = Range { + start: Position2D { + line: 1, + character: 1, + }, + end: Position2D { + line: 1, + character: 10, + }, + }; + let pull = vec![Diagnostic { + range: range.clone(), + severity: DiagnosticSeverity::Warning, + message: "unused".to_string(), + code: Some("dead_code".to_string()), + }]; + let cached = vec![Diagnostic { + range, + severity: DiagnosticSeverity::Warning, + message: "unused".to_string(), + code: Some("unused_imports".to_string()), + }]; + + let merged = merge_diagnostics(pull, cached); + assert_eq!(merged.len(), 2); + } + + /// When `code` is `None`, fall back to a path-stripped message comparison + /// so qualifier-only duplicates still merge. 
+ #[test] + fn test_merge_diagnostics_dedupes_qualifier_only_difference_without_code() { + let range = Range { + start: Position2D { + line: 1, + character: 1, + }, + end: Position2D { + line: 1, + character: 5, + }, + }; + let pull = vec![Diagnostic { + range: range.clone(), + severity: DiagnosticSeverity::Error, + message: "expected DocItem".to_string(), + code: None, + }]; + let cached = vec![Diagnostic { + range, + severity: DiagnosticSeverity::Error, + message: "expected types::DocItem".to_string(), + code: None, + }]; + + let merged = merge_diagnostics(pull, cached); + assert_eq!(merged.len(), 1); + assert_eq!(merged[0].message, "expected DocItem"); + } + + /// When `code` is `None` and messages differ in non-qualifier ways, the + /// entries must NOT merge. + #[test] + fn test_merge_diagnostics_keeps_genuinely_different_messages_without_code() { + let range = Range { + start: Position2D { + line: 1, + character: 1, + }, + end: Position2D { + line: 1, + character: 5, + }, + }; + let pull = vec![Diagnostic { + range: range.clone(), + severity: DiagnosticSeverity::Error, + message: "expected DocItem".to_string(), + code: None, + }]; + let cached = vec![Diagnostic { + range, + severity: DiagnosticSeverity::Error, + message: "expected SomethingElse".to_string(), + code: None, + }]; + + let merged = merge_diagnostics(pull, cached); + assert_eq!(merged.len(), 2); + } + + #[test] + fn test_normalize_message_for_dedup_strips_paths() { + assert_eq!( + normalize_message_for_dedup("expected types::DocItem, found DocItem"), + "expected DocItem, found DocItem" + ); + assert_eq!(normalize_message_for_dedup("crate::foo::Bar"), "Bar"); + assert_eq!(normalize_message_for_dedup("a::b::c::d"), "d"); + } + + #[test] + fn test_normalize_message_for_dedup_preserves_punctuation_and_unicode() { + assert_eq!( + normalize_message_for_dedup("expected `i32`, found `&str` — oops"), + "expected `i32`, found `&str` — oops" + ); + } + + #[test] + fn 
test_normalize_message_for_dedup_handles_lone_double_colon() { + // `::` with no following identifier should be left alone. + assert_eq!(normalize_message_for_dedup("foo:: bar"), "foo:: bar"); + } + + #[test] + fn test_normalize_message_for_dedup_idempotent_on_simple_text() { + let s = "no method named `foo` found"; + assert_eq!(normalize_message_for_dedup(s), s); + } } From 45cdd26b2fc340dad288a651654d8d46a4d635fe Mon Sep 17 00:00:00 2001 From: Luke Jones Date: Fri, 8 May 2026 10:07:37 +1200 Subject: [PATCH 5/5] fix(review): address PR #103 review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Owner findings (bug-ops): - bridge/state: close TOCTOU + coarse-mtime resync hole by re-stat'ing after read so the recorded signature describes the bytes actually loaded into the LSP server. - bridge/state: reset version to 1 on i32::MAX wraparound so resync doesn't permanently emit duplicate didOpen versions that rust-analyzer silently discards. - bridge/state: send didOpen before mutating tracker state, so a failed notify leaves SyncSignature::UNKNOWN behind and the next ensure_open retries instead of short-circuiting on a fake match. - file_watcher: anchor bare LSP glob patterns with `**/` because globset matches full paths; bare `*.rs` would never match `/repo/src/lib.rs`. - lib: extract NotificationCache into its own Arc, decoupling the publishDiagnostics pump from the translator lock so handle_diagnostics' 30 s pull no longer head-of-line blocks the push channel that the pull is itself waiting on. Copilot findings: - file_watcher: try_send into the raw notify mpsc; the std mpsc send() blocks when full and would stall notify's delivery thread under heavy churn. - file_watcher: spawn errors when no workspace root could be watched (was: only logged); single-root failures still warn-and-continue. 
- file_watcher: per-FileSystemWatcher GlobBucket replaces the combined bitmask, so per-glob WatchKind filters from the LSP spec are honored. - lifecycle: parse_params returns -32602 InvalidParams instead of the -32601 MethodNotFound it was incorrectly using for params decode failures. - lifecycle: notification-receiver doc no longer claims "silently dropped" — the channel buffers up to capacity then warns on overflow. - translator/examples: hybrid dedup docs corrected to (range, severity, code) when code present, normalized message otherwise. - deny.toml: allow CC0-1.0 for the `notify` 8.x dependency so cargo-deny check licenses passes. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/mcpls-core/src/bridge/state.rs | 36 ++- crates/mcpls-core/src/bridge/translator.rs | 89 +++++-- crates/mcpls-core/src/lib.rs | 68 +++--- crates/mcpls-core/src/lsp/file_watcher.rs | 217 +++++++++++++----- crates/mcpls-core/src/lsp/lifecycle.rs | 11 +- crates/mcpls-core/src/mcp/server.rs | 2 +- .../tests/integration/rust_analyzer_tests.rs | 20 +- deny.toml | 1 + examples/mcpls.toml | 4 +- 9 files changed, 313 insertions(+), 135 deletions(-) diff --git a/crates/mcpls-core/src/bridge/state.rs b/crates/mcpls-core/src/bridge/state.rs index 71a161e..bbbcc7c 100644 --- a/crates/mcpls-core/src/bridge/state.rs +++ b/crates/mcpls-core/src/bridge/state.rs @@ -235,20 +235,26 @@ impl DocumentTracker { /// - The `didClose`/`didOpen` notification fails to send /// - Resource limits are exceeded pub async fn ensure_open(&mut self, path: &Path, lsp_client: &LspClient) -> Result { - let signature = stat_signature(path).await?; + let signature_before = stat_signature(path).await?; if let Some(state) = self.documents.get(path) - && state.synced_signature == signature + && state.synced_signature == signature_before { return Ok(state.uri.clone()); } + // Read then re-stat so the signature describes the bytes actually read. 
+ // This closes a TOCTOU window where the file is replaced between the + // pre-read stat and the read itself, and also catches same-size + // rewrites within a coarse-mtime tick: stat→read→stat will see at + // least the post-read mtime advance once we yield back from the read. let content = tokio::fs::read_to_string(path) .await .map_err(|e| Error::FileIo { path: path.to_path_buf(), source: e, })?; + let signature = stat_signature(path).await?; if let Some(existing) = self.documents.get(path) { let close_params = DidCloseTextDocumentParams { @@ -259,32 +265,40 @@ impl DocumentTracker { lsp_client .notify("textDocument/didClose", close_params) .await?; - // Bump the version on resync so the server sees the reopened - // document as a strictly newer state. - let new_version = existing.version.saturating_add(1); + // After didClose+didOpen the server treats this as a fresh open, + // so resetting on i32::MAX is safe and avoids permanently sending + // duplicate version numbers (which rust-analyzer silently drops). + let new_version = if existing.version == i32::MAX { + 1 + } else { + existing.version + 1 + }; let language_id = existing.language_id.clone(); let uri = existing.uri.clone(); + // Send didOpen first; only commit tracker state on success so a + // failed notify leaves the next call to retry rather than + // short-circuiting on a matching but unsent signature. + send_did_open(lsp_client, &uri, &language_id, new_version, content.clone()).await?; if let Some(state) = self.documents.get_mut(path) { state.version = new_version; - state.content.clone_from(&content); + state.content = content; state.synced_signature = signature; } - send_did_open(lsp_client, &uri, &language_id, new_version, content).await?; return Ok(uri); } let uri = self.open(path.to_path_buf(), content.clone())?; - // Record the signature now that the document is tracked; if the file - // is replaced before the next access, the next ensure_open will see a - // mismatch and re-sync. 
- self.set_synced_signature(path, signature); let language_id = self .documents .get(path) .ok_or_else(|| Error::DocumentNotFound(path.to_path_buf()))? .language_id .clone(); + // Send didOpen before recording the signature. On failure the + // document stays in the tracker with `SyncSignature::UNKNOWN`, so the + // next ensure_open observes a mismatch and retries the sync. send_did_open(lsp_client, &uri, &language_id, 1, content).await?; + self.set_synced_signature(path, signature); Ok(uri) } } diff --git a/crates/mcpls-core/src/bridge/translator.rs b/crates/mcpls-core/src/bridge/translator.rs index 531dbbd..5292847 100644 --- a/crates/mcpls-core/src/bridge/translator.rs +++ b/crates/mcpls-core/src/bridge/translator.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, MutexGuard}; use lsp_types::{ CallHierarchyIncomingCall, CallHierarchyIncomingCallsParams, CallHierarchyItem, @@ -35,7 +36,13 @@ pub struct Translator { /// Document state tracker. document_tracker: DocumentTracker, /// Notification cache for LSP server notifications. - notification_cache: NotificationCache, + /// + /// Held behind its own `Arc` so that the notification pump task can + /// write `publishDiagnostics` etc. without contending with the translator + /// lock — `handle_diagnostics` keeps the translator locked across a 30 s + /// pull, and the pump must remain free to drain the channel during that + /// window or the LSP transport back-pressures and the pull deadlocks. + notification_cache: Arc>, /// Allowed workspace roots for path validation. workspace_roots: Vec, /// Custom file extension to language ID mappings. 
@@ -52,7 +59,7 @@ impl Translator { lsp_clients: HashMap::new(), lsp_servers: HashMap::new(), document_tracker: DocumentTracker::new(ResourceLimits::default(), HashMap::new()), - notification_cache: NotificationCache::new(), + notification_cache: Arc::new(Mutex::new(NotificationCache::new())), workspace_roots: vec![], extension_map: HashMap::new(), diagnostics_mode: DiagnosticsMode::default(), @@ -102,15 +109,27 @@ impl Translator { &mut self.document_tracker } - /// Get the notification cache. - #[must_use] - pub const fn notification_cache(&self) -> &NotificationCache { - &self.notification_cache + /// Acquire a read/write guard on the notification cache. + /// + /// Holds an internal `Arc`; callers should drop the guard quickly + /// because the notification pump and other request handlers compete for + /// the same lock. Never await while holding the guard. + /// + /// # Panics + /// + /// Panics if the cache mutex is poisoned (i.e. another thread panicked + /// while holding the lock). Process-wide poisoning is unrecoverable here. + pub fn notification_cache_mut(&self) -> MutexGuard<'_, NotificationCache> { + #[allow(clippy::expect_used)] + self.notification_cache + .lock() + .expect("notification cache mutex poisoned") } - /// Get a mutable reference to the notification cache. - pub const fn notification_cache_mut(&mut self) -> &mut NotificationCache { - &mut self.notification_cache + /// Cheap clone of the cache handle for use by the notification pump. + #[must_use] + pub fn notification_cache_handle(&self) -> Arc> { + Arc::clone(&self.notification_cache) } // TODO: These methods will be implemented in Phase 3-5 @@ -757,11 +776,16 @@ impl Translator { /// `publishDiagnostics`. Cheap; empty for files the LSP server has not /// analysed. /// - [`DiagnosticsMode::Hybrid`] does pull + cached, deduplicating by - /// `(range, message, code)`. + /// `(range, severity, code)` when a code is present, and otherwise by + /// `(range, severity, normalized message)`. 
/// /// # Errors /// /// Returns an error if the LSP request fails or the file cannot be opened. + /// + /// # Panics + /// + /// Panics if the notification cache mutex is poisoned. pub async fn handle_diagnostics(&mut self, file_path: String) -> Result { let path = PathBuf::from(&file_path); let validated_path = self.validate_path(&path)?; @@ -780,7 +804,12 @@ impl Translator { }; let cached = if matches!(mode, DiagnosticsMode::Cached | DiagnosticsMode::Hybrid) { - cached_diagnostics(&self.notification_cache, &uri) + #[allow(clippy::expect_used)] + let guard = self + .notification_cache + .lock() + .expect("notification cache mutex poisoned"); + cached_diagnostics(&guard, &uri) } else { Vec::new() }; @@ -1407,6 +1436,10 @@ impl Translator { /// # Errors /// /// Returns an error if the path is invalid or outside workspace boundaries. + /// + /// # Panics + /// + /// Panics if the notification cache mutex is poisoned. pub fn handle_cached_diagnostics(&mut self, file_path: &str) -> Result { let path = PathBuf::from(file_path); let validated_path = self.validate_path(&path)?; @@ -1415,8 +1448,13 @@ impl Translator { // rust-analyzer stores in publishDiagnostics notifications. let uri = path_to_uri(&validated_path); + #[allow(clippy::expect_used)] + let guard = self + .notification_cache + .lock() + .expect("notification cache mutex poisoned"); Ok(DiagnosticsResult { - diagnostics: cached_diagnostics(&self.notification_cache, &uri), + diagnostics: cached_diagnostics(&guard, &uri), }) } @@ -1425,6 +1463,10 @@ impl Translator { /// # Errors /// /// Returns an error if the `min_level` parameter is invalid. + /// + /// # Panics + /// + /// Panics if the notification cache mutex is poisoned. 
pub fn handle_server_logs( &mut self, limit: usize, @@ -1449,7 +1491,14 @@ impl Translator { None }; - let all_logs = self.notification_cache.get_logs(); + let all_logs: Vec<_> = { + #[allow(clippy::expect_used)] + let guard = self + .notification_cache + .lock() + .expect("notification cache mutex poisoned"); + guard.get_logs().iter().cloned().collect() + }; let logs: Vec<_> = all_logs .iter() @@ -1473,9 +1522,19 @@ impl Translator { /// # Errors /// /// This method does not return errors. + /// + /// # Panics + /// + /// Panics if the notification cache mutex is poisoned. pub fn handle_server_messages(&mut self, limit: usize) -> Result { - let all_messages = self.notification_cache.get_messages(); - let messages: Vec<_> = all_messages.iter().take(limit).cloned().collect(); + let messages: Vec<_> = { + #[allow(clippy::expect_used)] + let guard = self + .notification_cache + .lock() + .expect("notification cache mutex poisoned"); + guard.get_messages().iter().take(limit).cloned().collect() + }; Ok(ServerMessagesResult { messages }) } diff --git a/crates/mcpls-core/src/lib.rs b/crates/mcpls-core/src/lib.rs index 67736dd..4b96709 100644 --- a/crates/mcpls-core/src/lib.rs +++ b/crates/mcpls-core/src/lib.rs @@ -44,7 +44,7 @@ use std::path::PathBuf; use std::sync::Arc; use bridge::resources::make_uri; -use bridge::{ResourceSubscriptions, Translator}; +use bridge::{NotificationCache, ResourceSubscriptions, Translator}; pub use config::ServerConfig; pub use error::Error; use lsp::{LspNotification, LspServer, ServerInitConfig}; @@ -72,14 +72,15 @@ use transport::run_stdio; /// - The cancellation watch fires (or the sender is dropped). /// - `notify_resource_updated` returns an error (peer disconnect / transport closed). /// -/// # Note on lock contention (TODO critic-S4) -/// All cache writes acquire `Arc>`, which is the same lock used -/// by every MCP tool call. Splitting `NotificationCache` into its own `Arc` -/// would eliminate this contention. 
Tracked as a P2 follow-up. +/// Cache writes hold a dedicated `std::sync::Mutex` rather +/// than the broader `Arc>` used by every MCP tool call. The +/// pump must remain free to drain its channel even while `handle_diagnostics` +/// is parked on a 30 s pull — otherwise the LSP transport back-pressures and +/// the pull deadlocks waiting for a response that can no longer be delivered. pub(crate) async fn diagnostics_pump( _lang: String, mut rx: tokio::sync::mpsc::Receiver, - translator: Arc>, + cache: std::sync::Arc>, subs: Arc, peer_cell: Arc>>, mut cancel_rx: tokio::sync::watch::Receiver, @@ -99,9 +100,11 @@ pub(crate) async fn diagnostics_pump( LspNotification::PublishDiagnostics(p) => { // Always cache unconditionally. { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_diagnostics(&p.uri, p.version, p.diagnostics); + #[allow(clippy::expect_used)] + let mut guard = cache + .lock() + .expect("notification cache mutex poisoned"); + guard.store_diagnostics(&p.uri, p.version, p.diagnostics); } // Fast path: skip URI construction when nothing is subscribed. @@ -133,14 +136,18 @@ pub(crate) async fn diagnostics_pump( } } LspNotification::LogMessage(m) => { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_log(m.typ.into(), m.message); + #[allow(clippy::expect_used)] + let mut guard = cache + .lock() + .expect("notification cache mutex poisoned"); + guard.store_log(m.typ.into(), m.message); } LspNotification::ShowMessage(m) => { - let mut t = translator.lock().await; - t.notification_cache_mut() - .store_message(m.typ.into(), m.message); + #[allow(clippy::expect_used)] + let mut guard = cache + .lock() + .expect("notification cache mutex poisoned"); + guard.store_message(m.typ.into(), m.message); } LspNotification::Progress { .. } | LspNotification::Other { .. 
} => {} } @@ -346,6 +353,10 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() info!("Proceeding with {} LSP server(s)", server_count); } + // Capture a handle to the notification cache before moving the translator + // behind a mutex; pump tasks share this handle so they never need the + // translator lock at all (see `diagnostics_pump` for why that matters). + let cache_handle = translator.notification_cache_handle(); let translator = Arc::new(Mutex::new(translator)); let subscriptions = Arc::new(ResourceSubscriptions::new()); // Peer cell is populated after the MCP transport is established (Phase B). @@ -361,7 +372,7 @@ pub async fn serve_with(config: ServerConfig, transport: Transport) -> Result<() pumps.spawn(diagnostics_pump( lang, rx, - Arc::clone(&translator), + std::sync::Arc::clone(&cache_handle), Arc::clone(&subscriptions), Arc::clone(&peer_cell), cancel_rx.clone(), @@ -725,8 +736,8 @@ mod tests { use super::*; - fn make_translator() -> Arc> { - Arc::new(Mutex::new(Translator::new())) + fn make_cache() -> std::sync::Arc> { + std::sync::Arc::new(std::sync::Mutex::new(NotificationCache::new())) } fn make_subs() -> Arc { @@ -742,7 +753,7 @@ mod tests { /// `PublishDiagnostics` is cached even when the peer is not yet connected. #[tokio::test] async fn test_pump_caches_before_peer_set() { - let translator = make_translator(); + let cache = make_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (tx, rx) = mpsc::channel(8); @@ -750,11 +761,11 @@ mod tests { // which makes the pump exit before processing any messages. 
let (_cancel_tx, cancel_rx) = watch::channel(false); - let t = Arc::clone(&translator); + let cache_for_pump = std::sync::Arc::clone(&cache); tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - t, + cache_for_pump, Arc::clone(&subs), Arc::clone(&peer_cell), cancel_rx, @@ -777,11 +788,8 @@ mod tests { loop { tokio::task::yield_now().await; let found = { - let guard = translator.lock().await; - guard - .notification_cache() - .get_diagnostics(uri.as_str()) - .is_some() + let guard = cache.lock().expect("cache poisoned"); + guard.get_diagnostics(uri.as_str()).is_some() }; if found { return true; @@ -797,7 +805,7 @@ mod tests { /// Pump exits cleanly when the cancel watch sends `true`. #[tokio::test] async fn test_pump_exits_on_cancel() { - let translator = make_translator(); + let cache = make_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (_tx, rx) = mpsc::channel::(8); @@ -806,7 +814,7 @@ mod tests { let handle = tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - translator, + cache, subs, peer_cell, cancel_rx, @@ -823,7 +831,7 @@ mod tests { /// Pump exits when the cancel sender is dropped (Err branch). 
#[tokio::test] async fn test_pump_exits_when_cancel_sender_dropped() { - let translator = make_translator(); + let cache = make_cache(); let subs = make_subs(); let peer_cell = make_peer_cell(); let (_tx, rx) = mpsc::channel::(8); @@ -832,7 +840,7 @@ mod tests { let handle = tokio::spawn(diagnostics_pump( "rust".to_string(), rx, - translator, + cache, subs, peer_cell, cancel_rx, diff --git a/crates/mcpls-core/src/lsp/file_watcher.rs b/crates/mcpls-core/src/lsp/file_watcher.rs index 7fd18c2..ecfa864 100644 --- a/crates/mcpls-core/src/lsp/file_watcher.rs +++ b/crates/mcpls-core/src/lsp/file_watcher.rs @@ -25,6 +25,8 @@ use lsp_types::{ }; use notify::event::{CreateKind, ModifyKind, RemoveKind}; use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use std::sync::mpsc::TrySendError; + use tokio::sync::{Mutex, mpsc}; use tokio::time::{Duration, Instant}; use tracing::{debug, trace, warn}; @@ -51,16 +53,23 @@ const RAW_EVENT_CHANNEL_CAPACITY: usize = 1024; /// burning CPU on `target/` rewrites etc. Match by exact component name. const NEVER_FORWARD_COMPONENTS: &[&str] = &[".git", "target", "node_modules", ".cache"]; -/// A single watcher registration. +/// One entry per `FileSystemWatcher` inside an LSP registration. Per-glob +/// `WatchKind` is preserved here so that a registration like +/// `[ {globs:[*.rs], kind:Change}, {globs:[Cargo.toml], kind:Create|Delete} ]` +/// matches each glob against only the kinds the server actually asked for. #[derive(Debug)] -struct WatcherRegistration { - /// The compiled glob set from the registration's `watchers` array. +struct GlobBucket { globs: GlobSet, - /// Bitfield of LSP watch kinds we should forward. Default is all three - /// (Create | Change | Delete = 7). kinds: WatchKind, } +/// A single watcher registration. May hold many [`GlobBucket`]s when the +/// `Registration` includes multiple `FileSystemWatcher`s with differing kinds. 
+#[derive(Debug)] +struct WatcherRegistration { + buckets: Vec, +} + /// Manages dynamic `workspace/didChangeWatchedFiles` registrations and a /// shared `notify` watcher. /// @@ -90,10 +99,12 @@ impl FileWatcher { /// /// # Errors /// - /// Returns an error if the underlying `notify` watcher cannot be created - /// or if any workspace root cannot be watched. Failure here should be - /// non-fatal at the caller (the `bridge` already covers per-file freshness - /// via stat-on-access); callers should log and continue. + /// Returns an error if the underlying `notify` watcher cannot be created, + /// or if `workspace_roots` is non-empty but every root failed to register + /// with `notify::watch`. Individual root failures are logged but do not + /// fail the spawn as long as at least one root is being watched. Failure + /// here should be non-fatal at the caller (the `bridge` already covers + /// per-file freshness via stat-on-access); callers should log and continue. pub fn spawn(workspace_roots: Vec, lsp_client: LspClient) -> Result { // Canonicalize roots so glob matching against canonical event paths // works even when the original path goes through symlinks (notably @@ -107,19 +118,35 @@ impl FileWatcher { let (raw_tx, raw_rx) = std::sync::mpsc::sync_channel(RAW_EVENT_CHANNEL_CAPACITY); let mut watcher = notify::recommended_watcher(move |event| { - // Notify uses a blocking std mpsc; drop on full to avoid blocking - // the OS notify thread. - if let Err(e) = raw_tx.send(event) { - warn!("file watcher: dropping event, channel closed: {e}"); + // Notify invokes this callback from its own delivery thread(s); + // never block here. Drop on full to avoid stalling filesystem + // event delivery under heavy churn (e.g. `target/` rebuilds). 
+ match raw_tx.try_send(event) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + debug!("file watcher: dropping event, channel full"); + } + Err(TrySendError::Disconnected(_)) => { + warn!("file watcher: dropping event, channel closed"); + } } }) .map_err(|e| Error::Transport(format!("notify::recommended_watcher: {e}")))?; + let mut watched_root_count = 0usize; for root in &workspace_roots { - if let Err(e) = watcher.watch(root, RecursiveMode::Recursive) { - warn!("file watcher: failed to watch {}: {e}", root.display()); + match watcher.watch(root, RecursiveMode::Recursive) { + Ok(()) => watched_root_count += 1, + Err(e) => { + warn!("file watcher: failed to watch {}: {e}", root.display()); + } } } + if !workspace_roots.is_empty() && watched_root_count == 0 { + return Err(Error::Transport( + "file watcher: failed to watch any workspace root".to_string(), + )); + } let inner = Arc::new(Mutex::new(FileWatcherInner { workspace_roots, @@ -170,14 +197,15 @@ impl FileWatcher { guard.workspace_roots.clone() }; - let mut builder = GlobSetBuilder::new(); - let mut combined_kinds: WatchKind = WatchKind::empty(); - + let mut buckets: Vec = Vec::with_capacity(opts.watchers.len()); for fs_watcher in &opts.watchers { + let mut builder = GlobSetBuilder::new(); + let mut compiled = 0usize; for glob_str in resolve_pattern(&fs_watcher.glob_pattern, &workspace_roots) { match Glob::new(&glob_str) { Ok(glob) => { builder.add(glob); + compiled += 1; } Err(e) => { warn!( @@ -186,30 +214,27 @@ impl FileWatcher { } } } - combined_kinds |= fs_watcher + if compiled == 0 { + continue; + } + let globs = builder + .build() + .map_err(|e| Error::LspProtocolError(format!("globset build failed: {e}")))?; + let kinds = fs_watcher .kind .unwrap_or(WatchKind::Create | WatchKind::Change | WatchKind::Delete); + buckets.push(GlobBucket { globs, kinds }); } - let globs = builder - .build() - .map_err(|e| Error::LspProtocolError(format!("globset build failed: {e}")))?; - - let watcher_count = 
opts.watchers.len(); + let bucket_count = buckets.len(); { let mut guard = self.inner.lock().await; - guard.registrations.insert( - id.clone(), - WatcherRegistration { - globs, - kinds: combined_kinds, - }, - ); + guard + .registrations + .insert(id.clone(), WatcherRegistration { buckets }); } - debug!( - "file watcher: registered {id} ({watcher_count} watchers, kinds={combined_kinds:?})" - ); + debug!("file watcher: registered {id} ({bucket_count} buckets)"); Ok(()) } @@ -233,20 +258,28 @@ impl FileWatcher { /// Resolve an LSP glob pattern into one or more `globset`-compatible pattern /// strings. Relative patterns are anchored to their base URI by prepending -/// the absolute path; bare string patterns are accepted as-is and are -/// effectively matched anywhere under the workspace. +/// the absolute path. Bare string patterns are anchored to the entire workspace +/// with a `**/` prefix when they don't already start with one (or with `/`), +/// because `globset` matches the *full path* — a bare `*.rs` would never match +/// `/repo/src/lib.rs`, only filenames at the root. fn resolve_pattern(pattern: &GlobPattern, workspace_roots: &[PathBuf]) -> Vec { match pattern { - GlobPattern::String(s) => { - // Absolute patterns are used directly; bare patterns are anchored - // at every workspace root with `**/` already implicit in - // patterns like `**/*.rs`. - vec![s.clone()] - } + GlobPattern::String(s) => vec![anchor_bare_pattern(s)], GlobPattern::Relative(rel) => relative_pattern_to_globs(rel, workspace_roots), } } +/// Anchor a bare LSP glob with `**/` so it matches anywhere under the +/// workspace. Already-absolute patterns and patterns that already start with +/// `**/` are returned unchanged. +fn anchor_bare_pattern(pattern: &str) -> String { + if pattern.starts_with('/') || pattern.starts_with("**/") { + pattern.to_string() + } else { + format!("**/{pattern}") + } +} + /// Expand a `RelativePattern` into one absolute glob per matching workspace /// root. 
fn relative_pattern_to_globs(rel: &RelativePattern, workspace_roots: &[PathBuf]) -> Vec { @@ -425,9 +458,11 @@ fn compute_changes( ) -> Vec { let mut changes: Vec = Vec::new(); for (path, typ) in pending { - let matched = registrations - .values() - .any(|r| registration_accepts(r, typ) && r.globs.is_match(&path)); + let matched = registrations.values().any(|r| { + r.buckets + .iter() + .any(|b| bucket_accepts(b, typ) && b.globs.is_match(&path)) + }); if !matched { continue; } @@ -439,13 +474,13 @@ fn compute_changes( changes } -/// Whether the registration accepted change kind `typ`. The kind bitmask in -/// LSP defaults to all three types when unset. -fn registration_accepts(registration: &WatcherRegistration, typ: FileChangeType) -> bool { - let want = if registration.kinds.is_empty() { +/// Whether the bucket accepts change kind `typ`. The kind bitmask in LSP +/// defaults to all three types when unset. +fn bucket_accepts(bucket: &GlobBucket, typ: FileChangeType) -> bool { + let want = if bucket.kinds.is_empty() { WatchKind::Create | WatchKind::Change | WatchKind::Delete } else { - registration.kinds + bucket.kinds }; match typ { FileChangeType::CREATED => want.contains(WatchKind::Create), @@ -533,24 +568,86 @@ mod tests { } #[test] - fn test_registration_accepts_default_kind() { - let reg = WatcherRegistration { + fn test_bucket_accepts_default_kind() { + let bucket = GlobBucket { globs: GlobSetBuilder::new().build().unwrap(), kinds: WatchKind::empty(), }; - assert!(registration_accepts(®, FileChangeType::CREATED)); - assert!(registration_accepts(®, FileChangeType::CHANGED)); - assert!(registration_accepts(®, FileChangeType::DELETED)); + assert!(bucket_accepts(&bucket, FileChangeType::CREATED)); + assert!(bucket_accepts(&bucket, FileChangeType::CHANGED)); + assert!(bucket_accepts(&bucket, FileChangeType::DELETED)); } #[test] - fn test_registration_accepts_explicit_kind() { - let reg = WatcherRegistration { + fn test_bucket_accepts_explicit_kind() { + let 
bucket = GlobBucket { globs: GlobSetBuilder::new().build().unwrap(), kinds: WatchKind::Change, }; - assert!(!registration_accepts(®, FileChangeType::CREATED)); - assert!(registration_accepts(®, FileChangeType::CHANGED)); - assert!(!registration_accepts(®, FileChangeType::DELETED)); + assert!(!bucket_accepts(&bucket, FileChangeType::CREATED)); + assert!(bucket_accepts(&bucket, FileChangeType::CHANGED)); + assert!(!bucket_accepts(&bucket, FileChangeType::DELETED)); + } + + #[test] + fn test_anchor_bare_pattern_anchors_extension_globs() { + assert_eq!(anchor_bare_pattern("*.rs"), "**/*.rs"); + assert_eq!(anchor_bare_pattern("Cargo.toml"), "**/Cargo.toml"); + } + + #[test] + fn test_anchor_bare_pattern_preserves_already_anchored() { + assert_eq!(anchor_bare_pattern("**/*.rs"), "**/*.rs"); + assert_eq!(anchor_bare_pattern("/repo/src/*.rs"), "/repo/src/*.rs"); + } + + #[test] + fn test_compute_changes_respects_per_bucket_kind() { + let mut create_only = GlobSetBuilder::new(); + create_only.add(Glob::new("**/Cargo.toml").unwrap()); + let mut change_only = GlobSetBuilder::new(); + change_only.add(Glob::new("**/*.rs").unwrap()); + + let mut regs = HashMap::new(); + regs.insert( + "1".to_string(), + WatcherRegistration { + buckets: vec![ + GlobBucket { + globs: create_only.build().unwrap(), + kinds: WatchKind::Create, + }, + GlobBucket { + globs: change_only.build().unwrap(), + kinds: WatchKind::Change, + }, + ], + }, + ); + + // .rs CHANGED → matches change_only bucket + let changes = compute_changes( + ®s, + vec![(PathBuf::from("/repo/src/lib.rs"), FileChangeType::CHANGED)], + ); + assert_eq!(changes.len(), 1); + + // .rs CREATED → globs match change_only but kinds reject; create_only + // kinds accept but globs don't. No match overall. 
+ let changes = compute_changes( + ®s, + vec![(PathBuf::from("/repo/src/lib.rs"), FileChangeType::CREATED)], + ); + assert!( + changes.is_empty(), + "create on a *.rs file must not match a Change-only bucket" + ); + + // Cargo.toml CREATED → matches create_only bucket + let changes = compute_changes( + ®s, + vec![(PathBuf::from("/repo/Cargo.toml"), FileChangeType::CREATED)], + ); + assert_eq!(changes.len(), 1); } } diff --git a/crates/mcpls-core/src/lsp/lifecycle.rs b/crates/mcpls-core/src/lsp/lifecycle.rs index 044e8ee..c8a0c4b 100644 --- a/crates/mcpls-core/src/lsp/lifecycle.rs +++ b/crates/mcpls-core/src/lsp/lifecycle.rs @@ -32,6 +32,9 @@ use crate::lsp::types::{JsonRpcError, LspNotification}; /// JSON-RPC error code returned for server-to-client requests we do not handle. const METHOD_NOT_FOUND: i32 = -32601; +/// JSON-RPC error code returned when params for a known method fail to deserialize. +const INVALID_PARAMS: i32 = -32602; + /// Channel capacity for inbound server-to-client requests. 
const SERVER_REQUEST_CHANNEL_CAPACITY: usize = 32; @@ -684,7 +687,7 @@ fn parse_params( ) -> std::result::Result { let value = params.unwrap_or(serde_json::Value::Null); serde_json::from_value(value).map_err(|e| JsonRpcError { - code: METHOD_NOT_FOUND, + code: INVALID_PARAMS, message: format!("invalid params: {e}"), data: None, }) @@ -879,7 +882,6 @@ mod tests { file_watcher: None, request_dispatcher: None, child: mock_child, - }; assert_eq!(server.position_encoding(), PositionEncodingKind::UTF8); @@ -970,7 +972,6 @@ mod tests { file_watcher: None, request_dispatcher: None, child: mock_child1, - }; result.add_server("rust".to_string(), server1); @@ -1021,7 +1022,6 @@ mod tests { file_watcher: None, request_dispatcher: None, child: mock_child, - }; result.add_server("rust".to_string(), server); @@ -1086,7 +1086,6 @@ mod tests { file_watcher: None, request_dispatcher: None, child: mock_child, - }; result.add_server(config.language_id, server); @@ -1138,7 +1137,6 @@ mod tests { file_watcher: None, request_dispatcher: None, child: mock_child1, - }; result.add_server("rust".to_string(), server1); @@ -1179,7 +1177,6 @@ mod tests { file_watcher: None, request_dispatcher: None, child: mock_child2, - }; result.add_server("rust".to_string(), server2); diff --git a/crates/mcpls-core/src/mcp/server.rs b/crates/mcpls-core/src/mcp/server.rs index 51d2d65..dfb1e81 100644 --- a/crates/mcpls-core/src/mcp/server.rs +++ b/crates/mcpls-core/src/mcp/server.rs @@ -604,7 +604,7 @@ impl ServerHandler for McplsServer { let diagnostics = { let translator = self.context.translator.lock().await; translator - .notification_cache() + .notification_cache_mut() .get_diagnostics(lsp_uri.as_str()) .cloned() }; diff --git a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs index baaa2e0..9383186 100644 --- a/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs +++ b/crates/mcpls-core/tests/integration/rust_analyzer_tests.rs 
@@ -944,6 +944,7 @@ async fn test_ensure_open_resyncs_after_external_edit() { let server_init_config = ServerInitConfig { server_config: lsp_config, workspace_roots: vec![workspace_path.clone()], + notification_tx: None, initialization_options: None, }; @@ -1065,6 +1066,7 @@ async fn test_lsp_server_installs_watcher_registration() { let _server = LspServer::spawn(ServerInitConfig { server_config: lsp_config, workspace_roots: vec![workspace_path], + notification_tx: None, initialization_options: None, }) .await @@ -1125,6 +1127,7 @@ async fn test_notification_cache_populates_from_publish_diagnostics() { let mut server = LspServer::spawn(ServerInitConfig { server_config: lsp_config, workspace_roots: vec![workspace_path.clone()], + notification_tx: None, initialization_options: None, }) .await @@ -1132,9 +1135,7 @@ async fn test_notification_cache_populates_from_publish_diagnostics() { // Take the receiver and run a small pump that drains into the cache — // mirroring the production wiring from `mcpls_core::serve`. 
- let notification_rx = server - .take_notification_receiver() - .expect("notification rx present after spawn"); + let notification_rx = server.take_notification_rx(); let extension_map = { let mut m = HashMap::new(); @@ -1148,23 +1149,22 @@ async fn test_notification_cache_populates_from_publish_diagnostics() { let translator = Arc::new(Mutex::new(translator)); { - let translator = Arc::clone(&translator); + let cache = translator.lock().await.notification_cache_handle(); let mut rx = notification_rx; tokio::spawn(async move { use mcpls_core::bridge::MessageType; use mcpls_core::lsp::LspNotification; while let Some(note) = rx.recv().await { - let mut guard = translator.lock().await; - let cache = guard.notification_cache_mut(); + let mut guard = cache.lock().expect("cache mutex poisoned"); match note { LspNotification::PublishDiagnostics(p) => { - cache.store_diagnostics(&p.uri, p.version, p.diagnostics); + guard.store_diagnostics(&p.uri, p.version, p.diagnostics); } - LspNotification::LogMessage(p) => cache.store_log(p.typ.into(), p.message), + LspNotification::LogMessage(p) => guard.store_log(p.typ.into(), p.message), LspNotification::ShowMessage(p) => { - cache.store_message(MessageType::from(p.typ), p.message); + guard.store_message(MessageType::from(p.typ), p.message); } - LspNotification::Other { .. } => {} + LspNotification::Progress { .. } | LspNotification::Other { .. } => {} } drop(guard); } diff --git a/deny.toml b/deny.toml index 64f790c..439efca 100644 --- a/deny.toml +++ b/deny.toml @@ -22,6 +22,7 @@ allow = [ "MPL-2.0", "Unicode-3.0", "Unicode-DFS-2016", + "CC0-1.0", ] confidence-threshold = 0.8 diff --git a/examples/mcpls.toml b/examples/mcpls.toml index 5cf7ace..0317231 100644 --- a/examples/mcpls.toml +++ b/examples/mcpls.toml @@ -17,7 +17,9 @@ position_encodings = ["utf-8", "utf-16"] # (e.g. rust-analyzer's flycheck/cargo-check output). # "cached" — only the cache populated from `publishDiagnostics`. 
Cheap # but empty for files the LSP server has not analysed yet. -# "hybrid" — pull + cached, deduplicating on (range, message, code). +# "hybrid" — pull + cached, deduplicating on (range, severity, code) +# when `code` is present, otherwise on (range, severity, +# normalized message). # diagnostics_mode = "hybrid" # Language extension mappings (optional)