Skip to content

Commit bf7b119

Browse files
hyperpolymathclaude
andcommitted
feat(verisimdb): add Groove health mesh and feedback-o-tron endpoints
New endpoints on the VeriSimDB groove router: - GET /.well-known/groove/mesh — returns cached peer health mesh status from background monitor that probes groove peers every 30s - POST /.well-known/groove/feedback — accepts feedback events routed through the Groove mesh, stores in an in-memory feedback store - GET /.well-known/groove/feedback — lists all stored feedback entries Architecture follows existing Axum patterns with separate state types per sub-router, merged via Router::merge(). Mesh monitor runs as a tokio background task using spawn_blocking for TCP probes. Manifest updated to declare feedback and health-mesh capabilities. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6d29f29 commit bf7b119

File tree

1 file changed

+317
-2
lines changed

1 file changed

+317
-2
lines changed

verisimdb/rust-core/verisim-api/src/groove.rs

Lines changed: 317 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,22 @@ fn manifest() -> serde_json::Value {
155155
"endpoint": "/api/v1/vql/execute",
156156
"requires_auth": false,
157157
"panel_compatible": true
158+
},
159+
"feedback": {
160+
"type": "feedback",
161+
"description": "Groove-routed feedback receiver — stores feedback targeted at VeriSimDB",
162+
"protocol": "http",
163+
"endpoint": "/.well-known/groove/feedback",
164+
"requires_auth": false,
165+
"panel_compatible": false
166+
},
167+
"health-mesh": {
168+
"type": "health-mesh",
169+
"description": "Inter-service health mesh — monitors peer status via groove probing",
170+
"protocol": "http",
171+
"endpoint": "/.well-known/groove/mesh",
172+
"requires_auth": false,
173+
"panel_compatible": true
158174
}
159175
},
160176
"consumes": ["integrity", "scanning"],
@@ -427,15 +443,294 @@ async fn groove_status_handler(State(groove): State<GrooveState>) -> impl IntoRe
427443
}))
428444
}
429445

446+
// --- Health Mesh ---
447+
448+
/// Cached health state of groove peers, updated by the mesh monitor.
449+
#[derive(Debug, Clone, Serialize)]
450+
pub struct PeerHealth {
451+
/// Peer's self-reported service ID.
452+
pub service_id: String,
453+
/// Port the peer was discovered on.
454+
pub port: u16,
455+
/// "up", "degraded", or "down".
456+
pub status: String,
457+
/// Unix timestamp (milliseconds) of last successful probe.
458+
pub last_seen_ms: u64,
459+
}
460+
461+
/// Shared mesh state: list of peer health entries.
462+
#[derive(Debug, Clone)]
463+
pub struct MeshState {
464+
peers: Arc<Mutex<Vec<PeerHealth>>>,
465+
last_probe_ms: Arc<Mutex<u64>>,
466+
}
467+
468+
impl MeshState {
469+
/// Create an empty mesh state.
470+
pub fn new() -> Self {
471+
Self {
472+
peers: Arc::new(Mutex::new(Vec::new())),
473+
last_probe_ms: Arc::new(Mutex::new(0)),
474+
}
475+
}
476+
}
477+
478+
impl Default for MeshState {
479+
fn default() -> Self {
480+
Self::new()
481+
}
482+
}
483+
484+
/// Known ports to probe for groove peers (excluding our own port).
485+
const MESH_PROBE_PORTS: &[u16] = &[6473, 8000, 8081, 8091, 8092];
486+
487+
/// Probe all known groove peers and update the mesh state.
488+
///
489+
/// Called periodically by the mesh monitor background task.
490+
fn probe_mesh_peers(mesh: &MeshState) {
491+
let now_ms = unix_now_ms();
492+
let mut results = Vec::new();
493+
494+
for &port in MESH_PROBE_PORTS {
495+
let addr_str = format!("127.0.0.1:{}", port);
496+
let addr: std::net::SocketAddr = match addr_str.parse() {
497+
Ok(a) => a,
498+
Err(_) => continue,
499+
};
500+
501+
match std::net::TcpStream::connect_timeout(&addr, std::time::Duration::from_millis(500)) {
502+
Ok(mut stream) => {
503+
use std::io::{Read, Write};
504+
stream.set_read_timeout(Some(std::time::Duration::from_millis(500))).ok();
505+
stream.set_write_timeout(Some(std::time::Duration::from_millis(500))).ok();
506+
507+
let request = format!(
508+
"GET /.well-known/groove/status HTTP/1.0\r\nHost: {}\r\nConnection: close\r\n\r\n",
509+
addr_str
510+
);
511+
512+
if stream.write_all(request.as_bytes()).is_ok() {
513+
let mut buf = vec![0u8; 4096];
514+
let service_id = match stream.read(&mut buf) {
515+
Ok(n) if n > 0 => {
516+
let resp = String::from_utf8_lossy(&buf[..n]);
517+
extract_service_id(&resp)
518+
}
519+
_ => "unknown".to_string(),
520+
};
521+
results.push(PeerHealth {
522+
service_id,
523+
port,
524+
status: "up".to_string(),
525+
last_seen_ms: now_ms,
526+
});
527+
}
528+
}
529+
Err(_) => {
530+
// Peer unreachable — don't include in results.
531+
}
532+
}
533+
}
534+
535+
if let Ok(mut peers) = mesh.peers.lock() {
536+
*peers = results;
537+
}
538+
if let Ok(mut ts) = mesh.last_probe_ms.lock() {
539+
*ts = now_ms;
540+
}
541+
}
542+
543+
/// Extract service_id from a groove status HTTP response body.
544+
fn extract_service_id(response: &str) -> String {
545+
// Find body after headers.
546+
let body = if let Some(idx) = response.find("\r\n\r\n") {
547+
&response[idx + 4..]
548+
} else {
549+
response
550+
};
551+
552+
if let Ok(v) = serde_json::from_str::<serde_json::Value>(body) {
553+
if let Some(id) = v.get("service").and_then(|s| s.as_str()) {
554+
return id.to_string();
555+
}
556+
if let Some(id) = v.get("service_id").and_then(|s| s.as_str()) {
557+
return id.to_string();
558+
}
559+
}
560+
561+
"unknown".to_string()
562+
}
563+
564+
/// Spawn the mesh health monitor as a background tokio task.
565+
///
566+
/// Probes peers every 30 seconds. The MeshState is shared with the
567+
/// HTTP handler via Arc.
568+
pub fn spawn_mesh_monitor(mesh: MeshState) {
569+
tokio::spawn(async move {
570+
loop {
571+
// Run probe on a blocking thread to avoid blocking the async runtime.
572+
let mesh_clone = mesh.clone();
573+
let _ = tokio::task::spawn_blocking(move || {
574+
probe_mesh_peers(&mesh_clone);
575+
})
576+
.await;
577+
578+
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
579+
}
580+
});
581+
}
582+
583+
/// GET /.well-known/groove/mesh — Return the cached mesh health view.
584+
async fn groove_mesh_handler(State(mesh): State<MeshState>) -> impl IntoResponse {
585+
let peers = mesh.peers.lock().unwrap_or_else(|e| e.into_inner()).clone();
586+
let last_probe = *mesh.last_probe_ms.lock().unwrap_or_else(|e| e.into_inner());
587+
588+
Json(serde_json::json!({
589+
"service_id": "verisimdb",
590+
"timestamp_ms": unix_now_ms(),
591+
"last_probe_ms": last_probe,
592+
"peer_count": peers.len(),
593+
"peers": peers,
594+
}))
595+
}
596+
597+
// --- Feedback ---
598+
599+
/// Shared feedback store: timestamped feedback entries.
600+
#[derive(Debug, Clone)]
601+
pub struct FeedbackStore {
602+
entries: Arc<Mutex<Vec<FeedbackEntry>>>,
603+
}
604+
605+
/// A single feedback entry received via the Groove mesh.
606+
#[derive(Debug, Clone, Serialize)]
607+
pub struct FeedbackEntry {
608+
pub id: String,
609+
pub timestamp_ms: u64,
610+
pub source_service: String,
611+
pub target_service: String,
612+
pub category: String,
613+
pub message: String,
614+
pub metadata: serde_json::Value,
615+
}
616+
617+
/// Maximum stored feedback entries.
618+
const MAX_FEEDBACK_ENTRIES: usize = 10_000;
619+
620+
impl FeedbackStore {
621+
/// Create an empty feedback store.
622+
pub fn new() -> Self {
623+
Self {
624+
entries: Arc::new(Mutex::new(Vec::new())),
625+
}
626+
}
627+
}
628+
629+
impl Default for FeedbackStore {
630+
fn default() -> Self {
631+
Self::new()
632+
}
633+
}
634+
635+
/// Body for POST /.well-known/groove/feedback.
636+
#[derive(Debug, Deserialize)]
637+
pub struct FeedbackRequest {
638+
#[serde(default = "default_feedback_type")]
639+
pub r#type: String,
640+
#[serde(default = "default_verisimdb")]
641+
pub target_service: String,
642+
#[serde(default = "default_other")]
643+
pub category: String,
644+
#[serde(default)]
645+
pub message: String,
646+
#[serde(default)]
647+
pub metadata: serde_json::Value,
648+
#[serde(default = "default_unknown")]
649+
pub source_service: String,
650+
}
651+
652+
fn default_feedback_type() -> String { "feedback".to_string() }
653+
fn default_verisimdb() -> String { "verisimdb".to_string() }
654+
fn default_other() -> String { "other".to_string() }
655+
fn default_unknown() -> String { "unknown".to_string() }
656+
657+
/// POST /.well-known/groove/feedback — Receive feedback from the Groove mesh.
658+
async fn groove_feedback_handler(
659+
State(store): State<FeedbackStore>,
660+
Json(req): Json<FeedbackRequest>,
661+
) -> impl IntoResponse {
662+
let valid_categories = ["bug", "feature", "ux", "performance", "other"];
663+
if !valid_categories.contains(&req.category.as_str()) {
664+
return (
665+
StatusCode::BAD_REQUEST,
666+
Json(serde_json::json!({
667+
"ok": false,
668+
"error": format!("invalid category: {}", req.category),
669+
})),
670+
);
671+
}
672+
673+
let now_ms = unix_now_ms();
674+
let id = format!("groove-feedback-{now_ms}");
675+
676+
let entry = FeedbackEntry {
677+
id: id.clone(),
678+
timestamp_ms: now_ms,
679+
source_service: req.source_service,
680+
target_service: req.target_service.clone(),
681+
category: req.category,
682+
message: req.message,
683+
metadata: req.metadata,
684+
};
685+
686+
{
687+
let mut entries = store.entries.lock().expect("feedback lock poisoned");
688+
entries.push(entry);
689+
if entries.len() > MAX_FEEDBACK_ENTRIES {
690+
entries.remove(0);
691+
}
692+
}
693+
694+
info!(id = %id, "Groove feedback accepted");
695+
696+
(
697+
StatusCode::OK,
698+
Json(serde_json::json!({
699+
"ok": true,
700+
"routed_to": req.target_service,
701+
"id": id,
702+
})),
703+
)
704+
}
705+
706+
/// GET /.well-known/groove/feedback — List stored feedback entries.
707+
async fn groove_feedback_list_handler(
708+
State(store): State<FeedbackStore>,
709+
) -> impl IntoResponse {
710+
let entries = store.entries.lock().unwrap_or_else(|e| e.into_inner()).clone();
711+
Json(serde_json::json!({
712+
"count": entries.len(),
713+
"entries": entries,
714+
}))
715+
}
716+
430717
/// Build the groove sub-router.
431718
///
432719
/// Mounted at `/.well-known/groove` in the main application router.
433720
/// Uses its own `GrooveState` (not `AppState`) to keep the connection
434721
/// tracker lightweight and independent of the database layer.
722+
///
723+
/// Includes health mesh monitoring and feedback-o-tron endpoints.
435724
pub fn groove_router() -> Router {
436725
let groove_state = GrooveState::new();
726+
let mesh_state = MeshState::new();
727+
let feedback_store = FeedbackStore::new();
728+
729+
// Spawn the background mesh monitor task.
730+
spawn_mesh_monitor(mesh_state.clone());
437731

438-
Router::new()
732+
// Connection lifecycle sub-router (uses GrooveState).
733+
let connection_router = Router::new()
439734
.route(
440735
"/.well-known/groove",
441736
get(groove_manifest_handler),
@@ -456,7 +751,27 @@ pub fn groove_router() -> Router {
456751
"/.well-known/groove/status",
457752
get(groove_status_handler),
458753
)
459-
.with_state(groove_state)
754+
.with_state(groove_state);
755+
756+
// Health mesh sub-router (uses MeshState).
757+
let mesh_router = Router::new()
758+
.route(
759+
"/.well-known/groove/mesh",
760+
get(groove_mesh_handler),
761+
)
762+
.with_state(mesh_state);
763+
764+
// Feedback sub-router (uses FeedbackStore).
765+
let feedback_router = Router::new()
766+
.route(
767+
"/.well-known/groove/feedback",
768+
get(groove_feedback_list_handler).post(groove_feedback_handler),
769+
)
770+
.with_state(feedback_store);
771+
772+
connection_router
773+
.merge(mesh_router)
774+
.merge(feedback_router)
460775
}
461776

462777
// --- Helpers ---

0 commit comments

Comments
 (0)