Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 160 additions & 2 deletions internal/server/mock/logs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@ import (
"context"
"fmt"
"slices"
"strings"
"time"

"google.golang.org/protobuf/types/known/timestamppb"

"github.com/skpr/api/pb"
)

Expand All @@ -14,6 +17,13 @@ const (
StreamFPM = "fpm"
)

// Raw JSON payloads reused from the Tail mock, kept here so fixture events
// look realistic without duplicating the strings across methods.
const (
rawNginx = `{ "body_bytes_sent": "37", "http_forward": "10.0.39.194", "http_header": "-", "http_referrer": "-", "http_user_agent": "ELB-HealthChecker/2.0", "http_x_amzn_trace_id": "-", "remote_addr": "10.0.39.194", "remote_user": "-", "request": "GET /readyz HTTP/1.1", "request_id": "6a06b1bf387b54f5f88cd1cac8c75de1", "request_method": "GET", "request_time": "0.001", "request_uri": "/readyz", "request_uri_query": "-", "server_name": "", "status": "200", "timestamp": "2025-03-25T01:21:50+00:00", "upstream_addr": "127.0.0.1:9000", "upstream_cache_status": "-", "upstream_http_x_drupal_cache": "-", "upstream_http_x_drupal_dynamic_cache": "-", "upstream_response_length": "23", "upstream_response_time": "0.002", "upstream_status": "200" }`
rawFPM = `{ "body_bytes_sent": "0", "client_ip": "-", "cpu": "0.00", "headers": { "Cache-Control": "no-cache, no-store, must-revalidate, max-age=0" }, "http_referrer": "", "http_user_agent": "kube-probe/1.31+", "memory": "2097152", "remote_addr": "127.0.0.1", "remote_user": "", "request_id": "eaff1b73b352356ea0a321a250c1d591", "request_time": "0.001", "request_uri": "/readyz", "skpr_component": "fpm", "status": "200", "timestamp": "2025-03-25T01:21:26+0000" }`
)

// Server implements the GRPC "events" definition.
type Server struct {
pb.UnimplementedLogsServer
Expand Down Expand Up @@ -48,9 +58,9 @@ func (s *Server) Tail(req *pb.LogTailRequest, server pb.Logs_TailServer) error {
message := ""
switch req.Stream {
case StreamNginx:
message = `{ "body_bytes_sent": "37", "http_forward": "10.0.39.194", "http_header": "-", "http_referrer": "-", "http_user_agent": "ELB-HealthChecker/2.0", "http_x_amzn_trace_id": "-", "remote_addr": "10.0.39.194", "remote_user": "-", "request": "GET /readyz HTTP/1.1", "request_id": "6a06b1bf387b54f5f88cd1cac8c75de1", "request_method": "GET", "request_time": "0.001", "request_uri": "/readyz", "request_uri_query": "-", "server_name": "", "status": "200", "timestamp": "2025-03-25T01:21:50+00:00", "upstream_addr": "127.0.0.1:9000", "upstream_cache_status": "-", "upstream_http_x_drupal_cache": "-", "upstream_http_x_drupal_dynamic_cache": "-", "upstream_response_length": "23", "upstream_response_time": "0.002", "upstream_status": "200" }`
message = rawNginx
case StreamFPM:
message = `{ "body_bytes_sent": "0", "client_ip": "-", "cpu": "0.00", "headers": { "Cache-Control": "no-cache, no-store, must-revalidate, max-age=0" }, "http_referrer": "", "http_user_agent": "kube-probe/1.31+", "memory": "2097152", "remote_addr": "127.0.0.1", "remote_user": "", "request_id": "eaff1b73b352356ea0a321a250c1d591", "request_time": "0.001", "request_uri": "/readyz", "skpr_component": "fpm", "status": "200", "timestamp": "2025-03-25T01:21:26+0000" }`
message = rawFPM
}
pbMessage := &pb.LogTailResponse{
Message: message,
Expand All @@ -65,3 +75,151 @@ func (s *Server) Tail(req *pb.LogTailRequest, server pb.Logs_TailServer) error {

return nil
}

// buildMockEvents returns a fresh fixture with timestamps relative to now.
// Built per request so events always appear recent to the caller.
func buildMockEvents() []*pb.LogEvent {
now := time.Now()
return []*pb.LogEvent{
{
Timestamp: timestamppb.New(now.Add(-1 * time.Minute)),
Stream: StreamNginx,
Message: rawNginx,
},
{
Timestamp: timestamppb.New(now.Add(-3 * time.Minute)),
Stream: StreamFPM,
Message: rawFPM,
},
{
Timestamp: timestamppb.New(now.Add(-7 * time.Minute)),
Stream: StreamNginx,
Message: rawNginx,
},
{
Timestamp: timestamppb.New(now.Add(-12 * time.Minute)),
Stream: StreamFPM,
Message: rawFPM,
},
{
Timestamp: timestamppb.New(now.Add(-25 * time.Minute)),
Stream: StreamNginx,
Message: rawNginx,
},
{
Timestamp: timestamppb.New(now.Add(-45 * time.Minute)),
Stream: StreamFPM,
Message: rawFPM,
},
}
}

// Query streams matching log events followed by a terminal metadata message.
// The Window oneof (Timeframe or TimeRange) on the filter is accepted but ignored by the mock.
func (s *Server) Query(req *pb.LogQueryRequest, stream pb.Logs_QueryServer) error {
if req.Filter == nil {
return fmt.Errorf("filter not provided")
}

if req.Filter.Environment == "" {
return fmt.Errorf("environment not provided")
}

events := buildMockEvents()
scanned := int64(len(events))

// Filter by stream selection if provided.
if len(req.Filter.Streams) > 0 {
filtered := events[:0:0]
for _, evt := range events {
if slices.Contains(req.Filter.Streams, evt.Stream) {
filtered = append(filtered, evt)
}
}
events = filtered
}

// Filter by substring queries if provided. Each entry must be satisfied:
// include entries require the substring to be present, exclude entries
// require it to be absent. Empty Value entries are skipped.
if len(req.Filter.Contains) > 0 {
filtered := events[:0:0]
for _, evt := range events {
ok := true
for _, f := range req.Filter.Contains {
if f == nil || f.Value == "" {
continue
}
has := strings.Contains(evt.Message, f.Value)
if f.Exclude && has {
ok = false
break
}
if !f.Exclude && !has {
ok = false
break
}
}
if ok {
filtered = append(filtered, evt)
}
}
events = filtered
}

// Apply result cap.
if req.Limit > 0 && int(req.Limit) < len(events) {
events = events[:req.Limit]
}

const batchSize = 2
for i := 0; i < len(events); i += batchSize {
end := i + batchSize
if end > len(events) {
end = len(events)
}
resp := &pb.LogQueryResponse{
Body: &pb.LogQueryResponse_Batch{
Batch: &pb.LogEventBatch{Events: events[i:end]},
},
}
if err := stream.Send(resp); err != nil {
return err
}
}

meta := &pb.LogQueryResponse{
Body: &pb.LogQueryResponse_Meta{
Meta: &pb.LogQueryMeta{
Scanned: scanned,
RanAt: timestamppb.Now(),
},
},
}
return stream.Send(meta)
}

// Summarise returns a canned AI-style summary of the requested log window.
// The Prompt field is ignored by the mock.
func (s *Server) Summarise(ctx context.Context, req *pb.LogSummariseRequest) (*pb.LogSummariseResponse, error) {
if req.Filter == nil {
return nil, fmt.Errorf("filter not provided")
}

if req.Filter.Environment == "" {
return nil, fmt.Errorf("environment not provided")
}

return &pb.LogSummariseResponse{
Overview: "Traffic is nominal with a small number of 5xx errors originating from the fpm stream; nginx is healthy.",
Bullets: []string{
"99.2% of requests returned 2xx",
"3 elevated-error windows detected in fpm between 01:20-01:40 UTC",
"No WAF blocks observed in the window",
},
SuggestedActions: []string{
"Inspect fpm error logs around 01:30 UTC for root cause",
"Consider increasing the fpm worker count if load continues to rise",
},
}, nil
}
94 changes: 94 additions & 0 deletions logs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@ syntax = "proto3";

package workflow;

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

option go_package = "./pb";

service logs {
rpc Tail (LogTailRequest) returns (stream LogTailResponse) {}
rpc ListStreams (LogListStreamsRequest) returns (LogListStreamsResponse) {}
rpc Query (LogQueryRequest) returns (stream LogQueryResponse) {}
rpc Summarise (LogSummariseRequest) returns (LogSummariseResponse) {}
}

/**
Expand Down Expand Up @@ -39,3 +44,92 @@ message LogListStreamsResponse {
repeated string Streams = 1; // Streams available for an environment
string Default = 2; // Default stream
}

/**
* Shared filter block for bounded log queries and summarisation
*/
message LogFilter {
string Environment = 1; // Name of the environment
repeated string Streams = 2; // Stream IDs to include; empty means all
// Time window for the query; exactly one variant should be set.
oneof Window {
google.protobuf.Duration Timeframe = 3; // Window duration relative to now
LogTimeRange TimeRange = 4; // Custom absolute time range
}
repeated LogContainsFilter Contains = 5; // Substring filters; an event must satisfy every entry (include AND, exclude AND)
}

/**
* A single substring filter applied to log event messages
*/
message LogContainsFilter {
string Value = 1; // Substring to match against the event message
bool Exclude = 2; // false = event message must contain Value; true = event message must NOT contain Value
}

/**
* Custom time range for a log query
*/
message LogTimeRange {
google.protobuf.Timestamp From = 1; // Start of range
google.protobuf.Timestamp To = 2; // End of range
}

/**
* Run a bounded log query
*/
message LogQueryRequest {
LogFilter Filter = 1; // Filter block
int32 Limit = 2; // Cap on returned events; 0 for server default
}

/**
* Streamed query response: batches of events followed by a final Meta message
*/
message LogQueryResponse {
oneof Body {
LogEventBatch Batch = 1; // A batch of matched log events
LogQueryMeta Meta = 2; // Final message summarising the run
}
}

/**
* A batch of log events emitted as a single streamed response chunk
*/
message LogEventBatch {
repeated LogEvent Events = 1; // Events in this batch
}

/**
* A single log event returned by a query
*/
message LogEvent {
google.protobuf.Timestamp Timestamp = 1; // Event timestamp
string Stream = 2; // Stream ID e.g. "nginx","fpm","cli","cloudfront","invalidations","waf"
string Message = 3; // Original payload (typically JSON), serialised as a string
}

/**
* Metadata for a completed query run, emitted as the final stream message
*/
message LogQueryMeta {
int64 Scanned = 1; // Total events scanned across selected streams
google.protobuf.Timestamp RanAt = 2; // Timestamp of when the query ran
}

/**
* Summarise a log window using a natural-language prompt
*/
message LogSummariseRequest {
LogFilter Filter = 1; // Filter block describing the window to summarise
string Prompt = 2; // Natural-language question for the summariser
}

/**
* AI-generated summary of a log window
*/
message LogSummariseResponse {
string Overview = 1; // One-paragraph summary
repeated string Bullets = 2; // Notable events / observations
repeated string SuggestedActions = 3; // Suggested follow-up actions
}
Loading