diff --git a/.github/workflows/daily-observability-report.md b/.github/workflows/daily-observability-report.md index 8f3631b3b0..8cd7806b30 100644 --- a/.github/workflows/daily-observability-report.md +++ b/.github/workflows/daily-observability-report.md @@ -192,16 +192,24 @@ For each firewall-enabled workflow run, check: ## Phase 3: Analyze MCP Gateway Logs -The MCP Gateway logs tool execution in `gateway.jsonl` format. +The MCP Gateway logs tool execution. Two log formats may be present depending on engine version: -### Key Log File: gateway.jsonl +- **`gateway.jsonl`**: Structured gateway log with per-event metrics (preferred format) +- **`mcp-logs/rpc-messages.jsonl`**: Raw JSON-RPC message log written by the Copilot CLI (canonical fallback) -For each run that uses MCP servers, check: +### Key Log Files: gateway.jsonl or rpc-messages.jsonl -1. **gateway.jsonl existence**: Look for the file in run logs - - Path pattern: `/tmp/gh-aw/aw-mcp/logs/run-/gateway.jsonl` +For each run that uses MCP servers, check in this order: -2. **gateway.jsonl content quality**: +1. **gateway.jsonl existence** (preferred): Look for the file in run logs + - Path pattern: `/tmp/gh-aw/aw-mcp/logs/run-/mcp-logs/gateway.jsonl` + +2. **rpc-messages.jsonl existence** (canonical fallback): Check when gateway.jsonl is missing + - Path pattern: `/tmp/gh-aw/aw-mcp/logs/run-/mcp-logs/rpc-messages.jsonl` + - This file is written by the Copilot CLI and contains raw JSON-RPC protocol messages + - A run with this file present has MCP telemetry and should NOT be reported as Critical + +3. **gateway.jsonl content quality** (when present): - Are log entries valid JSONL format? - Do entries contain required fields: - `timestamp`: When the event occurred @@ -213,20 +221,33 @@ For each run that uses MCP servers, check: - `duration`: Execution time in milliseconds - `status`: Request status (success, error) -3. **Metrics coverage**: +4. 
**rpc-messages.jsonl content quality** (when used as fallback): + - Are entries valid JSONL format? + - Do entries contain required fields: + - `timestamp`: When the message was sent/received + - `direction`: "IN" (from server) or "OUT" (to server) + - `type`: "REQUEST" or "RESPONSE" + - `server_id`: MCP server identifier + - `payload`: JSON-RPC payload with `method`, `params`, `result`, or `error` + - Tool call count derived from outgoing `tools/call` requests + +5. **Metrics coverage** (from whichever log is available): - Tool call counts per server - Error rates - - Response times (min, max, avg) + - Response times (min, max, avg) — available in gateway.jsonl; computed from request/response pairing in rpc-messages.jsonl ### MCP Gateway Analysis Criteria | Status | Condition | |--------|-----------| | ✅ **Healthy** | gateway.jsonl present with proper JSONL entries and metrics | -| ⚠️ **Warning** | gateway.jsonl present but missing key fields or has parse errors | -| 🔴 **Critical** | gateway.jsonl missing from MCP-enabled run | +| ✅ **Healthy** | rpc-messages.jsonl present (canonical fallback) with valid JSON-RPC entries | +| ⚠️ **Warning** | gateway.jsonl or rpc-messages.jsonl present but missing key fields or has parse errors | +| 🔴 **Critical** | Neither gateway.jsonl nor rpc-messages.jsonl found in MCP-enabled run | | ℹ️ **N/A** | No MCP servers configured for this workflow | +**Important**: When reporting MCP telemetry coverage, treat a run as having MCP telemetry if **either** `gateway.jsonl` **or** `rpc-messages.jsonl` is present. Only flag as Critical when both files are absent. 
+ ## Phase 4: Analyze Additional Telemetry Check for other observability artifacts: @@ -261,7 +282,8 @@ firewall_logs_present = count_runs_with_access_log() firewall_coverage = (firewall_logs_present / firewall_enabled_workflows) * 100 if firewall_enabled_workflows > 0 else "N/A" mcp_enabled_workflows = count_runs_with_mcp() -gateway_logs_present = count_runs_with_gateway_jsonl() +# A run has MCP telemetry if gateway.jsonl OR rpc-messages.jsonl is present +gateway_logs_present = count_runs_with_gateway_jsonl_or_rpc_messages() gateway_coverage = (gateway_logs_present / mcp_enabled_workflows) * 100 if mcp_enabled_workflows > 0 else "N/A" # Calculate observability_coverage_percentage for overall health @@ -297,7 +319,7 @@ Follow the formatting guidelines above. Use the following structure: [Critical missing logs or observability gaps that need immediate attention. If none, state "No critical issues detected." Always visible.] 🔴 **Critical Issues:** -- [List any runs missing critical logs - access.log for firewall runs, gateway.jsonl for MCP runs] +- [List any runs missing critical logs - access.log for firewall runs, gateway.jsonl AND rpc-messages.jsonl both absent for MCP runs] ⚠️ **Warnings:** - [List runs with incomplete or low-quality logs] @@ -307,7 +329,7 @@ Follow the formatting guidelines above. Use the following structure: | Component | Runs Analyzed | Logs Present | Coverage | Status | |-----------|--------------|--------------|----------|--------| | AWF Firewall (access.log) | X (`firewall_enabled_workflows`) | Y (`runs_with_complete_logs`) | Z% (`observability_coverage_percentage`) | ✅/⚠️/🔴 | -| MCP Gateway (gateway.jsonl) | X (`mcp_enabled_workflows`) | Y (`runs_with_complete_logs`) | Z% (`observability_coverage_percentage`) | ✅/⚠️/🔴 | +| MCP Gateway (gateway.jsonl or rpc-messages.jsonl) | X (`mcp_enabled_workflows`) | Y (`runs_with_complete_logs`) | Z% (`observability_coverage_percentage`) | ✅/⚠️/🔴 | [Always visible. 
Summary table showing high-level coverage metrics.] @@ -328,11 +350,11 @@ Follow the formatting guidelines above. Use the following structure: #### MCP-Enabled Runs -| Workflow | Run ID | gateway.jsonl | Entries | Servers | Tool Calls | Errors | Status | -|----------|--------|---------------|---------|---------|------------|--------|--------| -| ... | ... | ✅/❌ | N | N | N | N | ✅/⚠️/🔴 | +| Workflow | Run ID | Telemetry Source | Entries | Servers | Tool Calls | Errors | Status | +|----------|--------|-----------------|---------|---------|------------|--------|--------| +| ... | ... | gateway.jsonl / rpc-messages.jsonl / ❌ None | N | N | N | N | ✅/⚠️/🔴 | -#### Missing Gateway Logs (gateway.jsonl) +#### Missing MCP Telemetry (no gateway.jsonl or rpc-messages.jsonl) | Workflow | Run ID | Date | Link | |----------|--------|------|------| @@ -352,11 +374,12 @@ Follow the formatting guidelines above. Use the following structure: #### Gateway Log Quality -- Total gateway.jsonl entries analyzed: N +- Telemetry source: gateway.jsonl (preferred) or rpc-messages.jsonl (canonical fallback) +- Total entries analyzed: N - MCP servers used: server1, server2 - Total tool calls: N - Error rate: X% -- Average response time: Xms +- Average response time: Xms (N/A when derived from rpc-messages.jsonl without duration pairing) #### Healthy Runs Summary @@ -396,7 +419,7 @@ Follow the formatting guidelines above. Use the following structure: ### Severity Classification -- **CRITICAL**: Missing logs that would prevent debugging (access.log for firewall runs, gateway.jsonl for MCP runs) +- **CRITICAL**: Missing logs that would prevent debugging (access.log for firewall runs; **both** gateway.jsonl and rpc-messages.jsonl absent for MCP runs) - **WARNING**: Logs present but with quality issues (empty, missing fields, parse errors) - **HEALTHY**: Complete observability coverage with quality logs @@ -412,9 +435,9 @@ Follow the formatting guidelines above. 
Use the following structure: A successful run will: - ✅ Download and analyze logs from the past 7 days of workflow runs - ✅ Check all firewall-enabled runs for access.log presence -- ✅ Check all MCP-enabled runs for gateway.jsonl presence +- ✅ Check all MCP-enabled runs for gateway.jsonl **or** rpc-messages.jsonl presence - ✅ Calculate coverage percentages and identify gaps -- ✅ Flag any runs missing critical logs as CRITICAL +- ✅ Flag any runs missing **all** MCP telemetry (neither gateway.jsonl nor rpc-messages.jsonl) as CRITICAL - ✅ Create a new discussion with comprehensive report (previous discussions automatically closed) - ✅ Include actionable recommendations diff --git a/pkg/cli/gateway_logs.go b/pkg/cli/gateway_logs.go index 1e3f5bee26..49d88adc8d 100644 --- a/pkg/cli/gateway_logs.go +++ b/pkg/cli/gateway_logs.go @@ -1,9 +1,10 @@ // This file provides command-line interface functionality for gh-aw. // This file (gateway_logs.go) contains functions for parsing and analyzing -// MCP gateway logs from gateway.jsonl files. +// MCP gateway logs from gateway.jsonl or rpc-messages.jsonl files. // // Key responsibilities: -// - Parsing gateway.jsonl JSONL format logs +// - Parsing gateway.jsonl JSONL format logs (preferred) +// - Parsing rpc-messages.jsonl JSONL format logs (canonical fallback) // - Extracting server and tool usage metrics // - Aggregating gateway statistics // - Rendering gateway metrics tables @@ -29,6 +30,9 @@ import ( var gatewayLogsLog = logger.New("cli:gateway_logs") +// maxScannerBufferSize is the maximum scanner buffer for large JSONL payloads (1 MB). 
+const maxScannerBufferSize = 1024 * 1024 + // GatewayLogEntry represents a single log entry from gateway.jsonl type GatewayLogEntry struct { Timestamp string `json:"timestamp"` @@ -80,7 +84,218 @@ type GatewayMetrics struct { TotalDuration float64 // in milliseconds } -// parseGatewayLogs parses a gateway.jsonl file and extracts metrics +// RPCMessageEntry represents a single entry from rpc-messages.jsonl. +// This file is written by the Copilot CLI and contains raw JSON-RPC protocol messages +// exchanged between the AI engine and MCP servers. +type RPCMessageEntry struct { + Timestamp string `json:"timestamp"` + Direction string `json:"direction"` // "IN" = received from server, "OUT" = sent to server + Type string `json:"type"` // "REQUEST" or "RESPONSE" + ServerID string `json:"server_id"` + Payload json.RawMessage `json:"payload"` +} + +// rpcRequestPayload represents the JSON-RPC request payload fields we care about. +type rpcRequestPayload struct { + Method string `json:"method"` + ID any `json:"id"` + Params json.RawMessage `json:"params"` +} + +// rpcToolCallParams represents the params for a tools/call request. +type rpcToolCallParams struct { + Name string `json:"name"` +} + +// rpcResponsePayload represents the JSON-RPC response payload fields we care about. +type rpcResponsePayload struct { + ID any `json:"id"` + Error *rpcError `json:"error,omitempty"` +} + +// rpcError represents a JSON-RPC error object. +type rpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// rpcPendingRequest tracks an in-flight tool call for duration calculation. +type rpcPendingRequest struct { + ServerID string + ToolName string + Timestamp time.Time +} + +// parseRPCMessages parses a rpc-messages.jsonl file and extracts GatewayMetrics. +// This is the canonical fallback when gateway.jsonl is not available. 
+func parseRPCMessages(logPath string, verbose bool) (*GatewayMetrics, error) { + gatewayLogsLog.Printf("Parsing rpc-messages.jsonl from: %s", logPath) + + file, err := os.Open(logPath) + if err != nil { + return nil, fmt.Errorf("failed to open rpc-messages.jsonl: %w", err) + } + defer file.Close() + + metrics := &GatewayMetrics{ + Servers: make(map[string]*GatewayServerMetrics), + } + + // Track pending requests by (serverID, id) for duration calculation. + // Key format: "/" + pendingRequests := make(map[string]*rpcPendingRequest) + + scanner := bufio.NewScanner(file) + // Increase scanner buffer for large payloads + buf := make([]byte, maxScannerBufferSize) + scanner.Buffer(buf, maxScannerBufferSize) + lineNum := 0 + + for scanner.Scan() { + lineNum++ + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + var entry RPCMessageEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + gatewayLogsLog.Printf("Failed to parse rpc-messages.jsonl line %d: %v", lineNum, err) + if verbose { + fmt.Fprintln(os.Stderr, console.FormatWarningMessage( + fmt.Sprintf("Failed to parse rpc-messages.jsonl line %d: %v", lineNum, err))) + } + continue + } + + // Update time range + if entry.Timestamp != "" { + if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { + if metrics.StartTime.IsZero() || t.Before(metrics.StartTime) { + metrics.StartTime = t + } + if metrics.EndTime.IsZero() || t.After(metrics.EndTime) { + metrics.EndTime = t + } + } + } + + if entry.ServerID == "" { + continue + } + + switch { + case entry.Direction == "OUT" && entry.Type == "REQUEST": + // Outgoing request from AI engine to MCP server + var req rpcRequestPayload + if err := json.Unmarshal(entry.Payload, &req); err != nil { + continue + } + if req.Method != "tools/call" { + continue + } + + // Extract tool name + var params rpcToolCallParams + if err := json.Unmarshal(req.Params, ¶ms); err != nil || params.Name == "" { + continue + } + + 
metrics.TotalRequests++ + server := getOrCreateServer(metrics, entry.ServerID) + server.RequestCount++ + metrics.TotalToolCalls++ + server.ToolCallCount++ + + tool := getOrCreateTool(server, params.Name) + tool.CallCount++ + + // Store pending request for duration calculation + if req.ID != nil && entry.Timestamp != "" { + if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { + key := fmt.Sprintf("%s/%v", entry.ServerID, req.ID) + pendingRequests[key] = &rpcPendingRequest{ + ServerID: entry.ServerID, + ToolName: params.Name, + Timestamp: t, + } + } + } + + case entry.Direction == "IN" && entry.Type == "RESPONSE": + // Incoming response from MCP server to AI engine + var resp rpcResponsePayload + if err := json.Unmarshal(entry.Payload, &resp); err != nil { + continue + } + + // Track errors + if resp.Error != nil { + metrics.TotalErrors++ + server := getOrCreateServer(metrics, entry.ServerID) + server.ErrorCount++ + } + + // Calculate duration by matching with pending request + if resp.ID != nil && entry.Timestamp != "" { + key := fmt.Sprintf("%s/%v", entry.ServerID, resp.ID) + if pending, ok := pendingRequests[key]; ok { + delete(pendingRequests, key) + if t, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil { + durationMs := float64(t.Sub(pending.Timestamp).Milliseconds()) + if durationMs >= 0 { + server := getOrCreateServer(metrics, entry.ServerID) + server.TotalDuration += durationMs + metrics.TotalDuration += durationMs + + tool := getOrCreateTool(server, pending.ToolName) + tool.TotalDuration += durationMs + if tool.MaxDuration == 0 || durationMs > tool.MaxDuration { + tool.MaxDuration = durationMs + } + if tool.MinDuration == 0 || durationMs < tool.MinDuration { + tool.MinDuration = durationMs + } + + if resp.Error != nil { + tool.ErrorCount++ + } + } + } + } + } + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading rpc-messages.jsonl: %w", err) + } + + calculateGatewayAggregates(metrics) + 
+ gatewayLogsLog.Printf("Successfully parsed rpc-messages.jsonl: %d servers, %d total requests", + len(metrics.Servers), metrics.TotalRequests) + + return metrics, nil +} + +// findRPCMessagesPath returns the path to rpc-messages.jsonl if it exists, or "" if not found. +func findRPCMessagesPath(logDir string) string { + // Check mcp-logs subdirectory (standard location) + mcpLogsPath := filepath.Join(logDir, "mcp-logs", "rpc-messages.jsonl") + if _, err := os.Stat(mcpLogsPath); err == nil { + return mcpLogsPath + } + // Check root directory as fallback + rootPath := filepath.Join(logDir, "rpc-messages.jsonl") + if _, err := os.Stat(rootPath); err == nil { + return rootPath + } + return "" +} + +// parseGatewayLogs parses a gateway.jsonl file and extracts metrics. +// Falls back to rpc-messages.jsonl (canonical fallback) when gateway.jsonl is not present. func parseGatewayLogs(logDir string, verbose bool) (*GatewayMetrics, error) { // Try root directory first (for older logs where gateway.jsonl was in the root) gatewayLogPath := filepath.Join(logDir, "gateway.jsonl") @@ -92,6 +307,12 @@ func parseGatewayLogs(logDir string, verbose bool) (*GatewayMetrics, error) { // /tmp/gh-aw/ is stripped during artifact upload, resulting in mcp-logs/gateway.jsonl after download mcpLogsPath := filepath.Join(logDir, "mcp-logs", "gateway.jsonl") if _, err := os.Stat(mcpLogsPath); os.IsNotExist(err) { + // Fall back to rpc-messages.jsonl (canonical fallback when gateway.jsonl is missing) + rpcPath := findRPCMessagesPath(logDir) + if rpcPath != "" { + gatewayLogsLog.Printf("gateway.jsonl not found; falling back to rpc-messages.jsonl: %s", rpcPath) + return parseRPCMessages(rpcPath, verbose) + } gatewayLogsLog.Printf("gateway.jsonl not found at: %s or %s", gatewayLogPath, mcpLogsPath) return nil, errors.New("gateway.jsonl not found") } @@ -376,12 +597,144 @@ func getSortedServerNames(metrics *GatewayMetrics) []string { return names } +// buildToolCallsFromRPCMessages reads 
rpc-messages.jsonl and builds MCPToolCall records. +// Duration is computed by pairing outgoing requests with incoming responses. +// Input/output sizes are not available in rpc-messages.jsonl and will be 0. +func buildToolCallsFromRPCMessages(logPath string) ([]MCPToolCall, error) { + file, err := os.Open(logPath) + if err != nil { + return nil, fmt.Errorf("failed to open rpc-messages.jsonl: %w", err) + } + defer file.Close() + + type pendingCall struct { + serverID string + toolName string + timestamp time.Time + } + pending := make(map[string]*pendingCall) // key: "/" + + // Collect requests first to pair with responses + type rawEntry struct { + entry RPCMessageEntry + req rpcRequestPayload + resp rpcResponsePayload + valid bool + } + var entries []rawEntry + + scanner := bufio.NewScanner(file) + buf := make([]byte, maxScannerBufferSize) + scanner.Buffer(buf, maxScannerBufferSize) + + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + var e RPCMessageEntry + if err := json.Unmarshal([]byte(line), &e); err != nil { + continue + } + entries = append(entries, rawEntry{entry: e, valid: true}) + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading rpc-messages.jsonl: %w", err) + } + + // First pass: index outgoing tool-call requests by (serverID, id) + for i := range entries { + e := &entries[i] + if e.entry.Direction != "OUT" || e.entry.Type != "REQUEST" { + continue + } + if err := json.Unmarshal(e.entry.Payload, &e.req); err != nil || e.req.Method != "tools/call" { + continue + } + var params rpcToolCallParams + if err := json.Unmarshal(e.req.Params, ¶ms); err != nil || params.Name == "" { + continue + } + if e.req.ID == nil { + continue + } + t, err := time.Parse(time.RFC3339Nano, e.entry.Timestamp) + if err != nil { + continue + } + key := fmt.Sprintf("%s/%v", e.entry.ServerID, e.req.ID) + pending[key] = &pendingCall{ + serverID: e.entry.ServerID, + toolName: params.Name, + timestamp: 
t, + } + } + + // Second pass: build MCPToolCall records + var toolCalls []MCPToolCall + processedKeys := make(map[string]bool) + + for i := range entries { + e := &entries[i] + switch { + case e.entry.Direction == "OUT" && e.entry.Type == "REQUEST": + // Outgoing tool-call request – we'll emit the record when we see the response + // (or after if no response found) + case e.entry.Direction == "IN" && e.entry.Type == "RESPONSE": + if err := json.Unmarshal(e.entry.Payload, &e.resp); err != nil { + continue + } + if e.resp.ID == nil { + continue + } + key := fmt.Sprintf("%s/%v", e.entry.ServerID, e.resp.ID) + p, ok := pending[key] + if !ok { + continue + } + processedKeys[key] = true + + call := MCPToolCall{ + Timestamp: p.timestamp.Format(time.RFC3339Nano), + ServerName: p.serverID, + ToolName: p.toolName, + Status: "success", + } + if e.resp.Error != nil { + call.Status = "error" + call.Error = e.resp.Error.Message + } + if t, err := time.Parse(time.RFC3339Nano, e.entry.Timestamp); err == nil { + d := t.Sub(p.timestamp) + if d >= 0 { + call.Duration = timeutil.FormatDuration(d) + } + } + toolCalls = append(toolCalls, call) + } + } + + // Emit any requests that never received a response + for key, p := range pending { + if !processedKeys[key] { + toolCalls = append(toolCalls, MCPToolCall{ + Timestamp: p.timestamp.Format(time.RFC3339Nano), + ServerName: p.serverID, + ToolName: p.toolName, + Status: "unknown", + }) + } + } + + return toolCalls, nil +} + // extractMCPToolUsageData creates detailed MCP tool usage data from gateway metrics func extractMCPToolUsageData(logDir string, verbose bool) (*MCPToolUsageData, error) { - // Parse gateway logs + // Parse gateway logs (falls back to rpc-messages.jsonl automatically) gatewayMetrics, err := parseGatewayLogs(logDir, verbose) if err != nil { - // Return nil if gateway.jsonl doesn't exist (not an error for workflows without MCP) + // Return nil if no log file exists (not an error for workflows without MCP) if 
strings.Contains(err.Error(), "not found") { return nil, nil } @@ -398,72 +751,87 @@ func extractMCPToolUsageData(logDir string, verbose bool) (*MCPToolUsageData, er Servers: []MCPServerStats{}, } - // Read gateway.jsonl again to get individual tool call records - // Try root directory first (for older logs where gateway.jsonl was in the root) + // Read the log file again to get individual tool call records. + // Prefer gateway.jsonl; fall back to rpc-messages.jsonl when not available. gatewayLogPath := filepath.Join(logDir, "gateway.jsonl") + usingRPCMessages := false - // Check if gateway.jsonl exists in root if _, err := os.Stat(gatewayLogPath); os.IsNotExist(err) { - // Try mcp-logs subdirectory (new path after artifact download) mcpLogsPath := filepath.Join(logDir, "mcp-logs", "gateway.jsonl") if _, err := os.Stat(mcpLogsPath); os.IsNotExist(err) { - return nil, errors.New("gateway.jsonl not found") + // Fall back to rpc-messages.jsonl + rpcPath := findRPCMessagesPath(logDir) + if rpcPath == "" { + return nil, errors.New("gateway.jsonl not found") + } + gatewayLogPath = rpcPath + usingRPCMessages = true + } else { + gatewayLogPath = mcpLogsPath } - gatewayLogPath = mcpLogsPath } - file, err := os.Open(gatewayLogPath) - if err != nil { - return nil, fmt.Errorf("failed to open gateway.jsonl: %w", err) - } - defer file.Close() - - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := strings.TrimSpace(scanner.Text()) - if line == "" { - continue + if usingRPCMessages { + // Build tool call records from rpc-messages.jsonl + toolCalls, err := buildToolCallsFromRPCMessages(gatewayLogPath) + if err != nil { + return nil, fmt.Errorf("failed to read rpc-messages.jsonl: %w", err) } - - var entry GatewayLogEntry - if err := json.Unmarshal([]byte(line), &entry); err != nil { - continue // Skip malformed lines + mcpData.ToolCalls = toolCalls + } else { + file, err := os.Open(gatewayLogPath) + if err != nil { + return nil, fmt.Errorf("failed to open 
gateway.jsonl: %w", err) } + defer file.Close() - // Only process tool call events - if entry.Event == "tool_call" || entry.Event == "rpc_call" || entry.Event == "request" { - toolName := entry.ToolName - if toolName == "" { - toolName = entry.Method - } - - // Skip entries without tool information - if entry.ServerName == "" || toolName == "" { + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" { continue } - // Create individual tool call record - toolCall := MCPToolCall{ - Timestamp: entry.Timestamp, - ServerName: entry.ServerName, - ToolName: toolName, - Method: entry.Method, - InputSize: entry.InputSize, - OutputSize: entry.OutputSize, - Status: entry.Status, - Error: entry.Error, + var entry GatewayLogEntry + if err := json.Unmarshal([]byte(line), &entry); err != nil { + continue // Skip malformed lines } - if entry.Duration > 0 { - toolCall.Duration = timeutil.FormatDuration(time.Duration(entry.Duration * float64(time.Millisecond))) - } + // Only process tool call events + if entry.Event == "tool_call" || entry.Event == "rpc_call" || entry.Event == "request" { + toolName := entry.ToolName + if toolName == "" { + toolName = entry.Method + } + + // Skip entries without tool information + if entry.ServerName == "" || toolName == "" { + continue + } + + // Create individual tool call record + toolCall := MCPToolCall{ + Timestamp: entry.Timestamp, + ServerName: entry.ServerName, + ToolName: toolName, + Method: entry.Method, + InputSize: entry.InputSize, + OutputSize: entry.OutputSize, + Status: entry.Status, + Error: entry.Error, + } + + if entry.Duration > 0 { + toolCall.Duration = timeutil.FormatDuration(time.Duration(entry.Duration * float64(time.Millisecond))) + } - mcpData.ToolCalls = append(mcpData.ToolCalls, toolCall) + mcpData.ToolCalls = append(mcpData.ToolCalls, toolCall) + } } - } - if err := scanner.Err(); err != nil { - return nil, fmt.Errorf("error reading gateway.jsonl: %w", err) + 
if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("error reading gateway.jsonl: %w", err) + } } // Build summary statistics from aggregated metrics diff --git a/pkg/cli/gateway_logs_test.go b/pkg/cli/gateway_logs_test.go index f2879c6c27..4b2f83aff1 100644 --- a/pkg/cli/gateway_logs_test.go +++ b/pkg/cli/gateway_logs_test.go @@ -486,3 +486,201 @@ func TestParseGatewayLogsFromMCPLogsSubdirectory(t *testing.T) { require.True(t, ok, "should have github server metrics") assert.Equal(t, 3, githubMetrics.RequestCount, "should have 3 total calls for github server") } + +func TestParseRPCMessages(t *testing.T) { + tests := []struct { + name string + logContent string + wantServers int + wantRequests int + wantToolCalls int + wantErrors int + wantErr bool + }{ + { + name: "valid rpc-messages with tool calls", + logContent: `{"timestamp":"2024-01-12T10:00:00.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_repository","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:00.150000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"result":{"content":[]}}} +{"timestamp":"2024-01-12T10:00:01.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"list_issues","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:01.250000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":2,"result":{"content":[]}}} +`, + wantServers: 1, + wantRequests: 2, + wantToolCalls: 2, + wantErrors: 0, + wantErr: false, + }, + { + name: "rpc-messages with error response", + logContent: `{"timestamp":"2024-01-12T10:00:00.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"get_repository","arguments":{}}}} 
+{"timestamp":"2024-01-12T10:00:00.050000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"error":{"code":-32000,"message":"connection timeout"}}} +`, + wantServers: 1, + wantRequests: 1, + wantToolCalls: 1, + wantErrors: 1, + wantErr: false, + }, + { + name: "rpc-messages with multiple servers", + logContent: `{"timestamp":"2024-01-12T10:00:00.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_repos","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:00.100000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"result":{}}} +{"timestamp":"2024-01-12T10:00:01.000000000Z","direction":"OUT","type":"REQUEST","server_id":"playwright","payload":{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"navigate","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:01.500000000Z","direction":"IN","type":"RESPONSE","server_id":"playwright","payload":{"jsonrpc":"2.0","id":2,"result":{}}} +`, + wantServers: 2, + wantRequests: 2, + wantToolCalls: 2, + wantErrors: 0, + wantErr: false, + }, + { + name: "rpc-messages skips non-tools/call methods", + logContent: `{"timestamp":"2024-01-12T10:00:00.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"method":"tools/list","params":{}}} +{"timestamp":"2024-01-12T10:00:00.010000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"result":{"tools":[]}}} +{"timestamp":"2024-01-12T10:00:01.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"get_repository","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:01.150000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":2,"result":{}}} +`, + wantServers: 1, + wantRequests: 1, + 
wantToolCalls: 1, + wantErrors: 0, + wantErr: false, + }, + { + name: "empty file", + logContent: "", + wantServers: 0, + wantRequests: 0, + wantErrors: 0, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tmpDir := t.TempDir() + logPath := filepath.Join(tmpDir, "rpc-messages.jsonl") + err := os.WriteFile(logPath, []byte(tt.logContent), 0644) + require.NoError(t, err, "should write test rpc-messages.jsonl") + + metrics, err := parseRPCMessages(logPath, false) + + if tt.wantErr { + require.Error(t, err) + return + } + + require.NoError(t, err, "parseRPCMessages should not return error") + require.NotNil(t, metrics, "metrics should not be nil") + + assert.Len(t, metrics.Servers, tt.wantServers, "server count mismatch") + assert.Equal(t, tt.wantRequests, metrics.TotalRequests, "total requests mismatch") + assert.Equal(t, tt.wantToolCalls, metrics.TotalToolCalls, "total tool calls mismatch") + assert.Equal(t, tt.wantErrors, metrics.TotalErrors, "total errors mismatch") + }) + } +} + +func TestParseGatewayLogsFallsBackToRPCMessages(t *testing.T) { + tmpDir := t.TempDir() + + // Create mcp-logs/rpc-messages.jsonl (no gateway.jsonl present) + mcpLogsDir := filepath.Join(tmpDir, "mcp-logs") + require.NoError(t, os.MkdirAll(mcpLogsDir, 0755)) + + rpcContent := `{"timestamp":"2024-01-12T10:00:00.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_issues","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:00.200000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"result":{}}} +` + err := os.WriteFile(filepath.Join(mcpLogsDir, "rpc-messages.jsonl"), []byte(rpcContent), 0644) + require.NoError(t, err, "should write rpc-messages.jsonl") + + // parseGatewayLogs should fall back to rpc-messages.jsonl + metrics, err := parseGatewayLogs(tmpDir, false) + require.NoError(t, err, "should fall back to 
rpc-messages.jsonl") + require.NotNil(t, metrics, "metrics should not be nil") + + assert.Equal(t, 1, metrics.TotalRequests, "should have 1 request from rpc-messages.jsonl") + assert.Len(t, metrics.Servers, 1, "should have 1 server") + + _, hasGitHub := metrics.Servers["github"] + assert.True(t, hasGitHub, "should have github server") +} + +func TestFindRPCMessagesPath(t *testing.T) { + t.Run("rpc-messages in mcp-logs subdirectory", func(t *testing.T) { + tmpDir := t.TempDir() + mcpDir := filepath.Join(tmpDir, "mcp-logs") + require.NoError(t, os.MkdirAll(mcpDir, 0755)) + rpcPath := filepath.Join(mcpDir, "rpc-messages.jsonl") + require.NoError(t, os.WriteFile(rpcPath, []byte("{}"), 0644)) + + result := findRPCMessagesPath(tmpDir) + assert.Equal(t, rpcPath, result, "should find rpc-messages.jsonl in mcp-logs") + }) + + t.Run("rpc-messages in root directory", func(t *testing.T) { + tmpDir := t.TempDir() + rpcPath := filepath.Join(tmpDir, "rpc-messages.jsonl") + require.NoError(t, os.WriteFile(rpcPath, []byte("{}"), 0644)) + + result := findRPCMessagesPath(tmpDir) + assert.Equal(t, rpcPath, result, "should find rpc-messages.jsonl in root") + }) + + t.Run("mcp-logs subdirectory takes priority over root", func(t *testing.T) { + tmpDir := t.TempDir() + mcpDir := filepath.Join(tmpDir, "mcp-logs") + require.NoError(t, os.MkdirAll(mcpDir, 0755)) + mcpPath := filepath.Join(mcpDir, "rpc-messages.jsonl") + rootPath := filepath.Join(tmpDir, "rpc-messages.jsonl") + require.NoError(t, os.WriteFile(mcpPath, []byte("{}"), 0644)) + require.NoError(t, os.WriteFile(rootPath, []byte("{}"), 0644)) + + result := findRPCMessagesPath(tmpDir) + assert.Equal(t, mcpPath, result, "mcp-logs should take priority over root") + }) + + t.Run("not found returns empty string", func(t *testing.T) { + tmpDir := t.TempDir() + result := findRPCMessagesPath(tmpDir) + assert.Empty(t, result, "should return empty string when not found") + }) +} + +func TestBuildToolCallsFromRPCMessages(t *testing.T) { + 
tmpDir := t.TempDir() + + rpcContent := `{"timestamp":"2024-01-12T10:00:00.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"list_issues","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:00.200000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":1,"result":{}}} +{"timestamp":"2024-01-12T10:00:01.000000000Z","direction":"OUT","type":"REQUEST","server_id":"github","payload":{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"get_repository","arguments":{}}}} +{"timestamp":"2024-01-12T10:00:01.050000000Z","direction":"IN","type":"RESPONSE","server_id":"github","payload":{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"rate limit"}}} +` + logPath := filepath.Join(tmpDir, "rpc-messages.jsonl") + require.NoError(t, os.WriteFile(logPath, []byte(rpcContent), 0644)) + + calls, err := buildToolCallsFromRPCMessages(logPath) + require.NoError(t, err, "should build tool calls without error") + require.Len(t, calls, 2, "should have 2 tool calls") + + // Find each call (order may vary) + var listIssues, getRepo *MCPToolCall + for i := range calls { + switch calls[i].ToolName { + case "list_issues": + listIssues = &calls[i] + case "get_repository": + getRepo = &calls[i] + } + } + + require.NotNil(t, listIssues, "should have list_issues call") + assert.Equal(t, "github", listIssues.ServerName, "server name should be github") + assert.Equal(t, "success", listIssues.Status, "status should be success") + assert.NotEmpty(t, listIssues.Duration, "duration should be set for paired request/response") + + require.NotNil(t, getRepo, "should have get_repository call") + assert.Equal(t, "github", getRepo.ServerName, "server name should be github") + assert.Equal(t, "error", getRepo.Status, "status should be error") + assert.Equal(t, "rate limit", getRepo.Error, "error message should be set") +}