diff --git a/src/providers/index.ts b/src/providers/index.ts index 5f71566..4febb95 100644 --- a/src/providers/index.ts +++ b/src/providers/index.ts @@ -5,6 +5,7 @@ import { Mem0Provider } from "./mem0" import { ZepProvider } from "./zep" import { FilesystemProvider } from "./filesystem" import { RAGProvider } from "./rag" +import { RagZoomProvider } from "./ragzoom" const providers: Record<ProviderName, Provider> = { supermemory: SupermemoryProvider, @@ -12,6 +13,7 @@ const providers: Record<ProviderName, Provider> = { zep: ZepProvider, filesystem: FilesystemProvider, rag: RAGProvider, + ragzoom: RagZoomProvider, } export function createProvider(name: ProviderName): Provider { @@ -39,4 +41,4 @@ export function getProviderInfo(name: ProviderName): { } } -export { SupermemoryProvider, Mem0Provider, ZepProvider, FilesystemProvider, RAGProvider } +export { SupermemoryProvider, Mem0Provider, ZepProvider, FilesystemProvider, RAGProvider, RagZoomProvider } diff --git a/src/providers/ragzoom/bridge.py b/src/providers/ragzoom/bridge.py new file mode 100644 index 0000000..dca7703 --- /dev/null +++ b/src/providers/ragzoom/bridge.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +""" +RagZoom HTTP Bridge Server + +A thin FastAPI server that wraps the RagZoom gRPC Python client, +exposing a simple REST API for the memorybench TypeScript provider.
+ +Endpoints: + POST /ingest — batch_append content into a document + POST /search — agentic search over a document + GET /status/:id — document indexing status (completion_pct) + POST /clear — clear a document + GET /health — health check + +Start: python3 bridge.py [--port 8079] [--grpc-address localhost:50052] +""" + +from __future__ import annotations + +import argparse +import logging +import os +import sys +import time + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +import uvicorn + +# Ensure ragzoom is importable (pip install -e ~/dynamic-summary) +try: + from ragzoom.wrapper import RagZoom, AppendUnit +except ImportError as exc: + print( + f"ERROR: Cannot import ragzoom: {exc}\n" + "Make sure ragzoom is installed: pip install -e ~/dynamic-summary", + file=sys.stderr, + ) + sys.exit(1) + +logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") +logger = logging.getLogger("ragzoom-bridge") + +app = FastAPI(title="RagZoom MemoryBench Bridge", version="0.1.0") + +# Global RagZoom client — initialized at startup +_rz: RagZoom | None = None + + +def get_rz() -> RagZoom: + if _rz is None: + raise RuntimeError("RagZoom client not initialized") + return _rz + + +# ── Request/Response models ────────────────────────────────────────────────── + + +class AppendUnitPayload(BaseModel): + text: str + time_start: str | None = None + time_end: str | None = None + + +class IngestRequest(BaseModel): + document_id: str + units: list[AppendUnitPayload] + + +class IngestResponse(BaseModel): + document_id: str + chunks_created: int + leaf_count: int + + +class SearchRequest(BaseModel): + question: str + document_id: str + + +class SearchResponse(BaseModel): + answer: str + + +class StatusResponse(BaseModel): + document_id: str + exists: bool + completion_pct: float + leaf_count: int + node_count: int + has_pending_work: bool + + +class ClearRequest(BaseModel): + document_id: str + + +class ClearResponse(BaseModel): + 
document_id: str + cleared: bool + + +# ── Endpoints ──────────────────────────────────────────────────────────────── + + +@app.get("/health") +def health(): + return {"status": "ok"} + + +@app.post("/ingest", response_model=IngestResponse) +def ingest(req: IngestRequest): + rz = get_rz() + units = [] + for u in req.units: + if u.time_start and u.time_end: + units.append(AppendUnit(text=u.text, time_start=u.time_start, time_end=u.time_end)) + else: + units.append(AppendUnit(text=u.text)) + + try: + result = rz.batch_append(req.document_id, units) + except Exception as exc: + logger.error("batch_append failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + + # Get leaf count from status + try: + status = rz.get_document_status(req.document_id) + leaf_count = status.leaf_count + except Exception: + leaf_count = 0 + + return IngestResponse( + document_id=req.document_id, + chunks_created=result.chunks_created, + leaf_count=leaf_count, + ) + + +@app.post("/search", response_model=SearchResponse) +def search(req: SearchRequest): + rz = get_rz() + try: + result = rz.search(req.question, req.document_id) + except Exception as exc: + logger.error("search failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + + return SearchResponse(answer=result.answer) + + +@app.get("/status/{document_id}", response_model=StatusResponse) +def status(document_id: str): + rz = get_rz() + try: + s = rz.get_document_status(document_id) + except Exception as exc: + logger.error("get_document_status failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + + # Also check work queue status for has_pending_work + has_pending = False + if s.exists: + try: + from ragzoom.client.grpc_client import GrpcRagzoomClient + # Re-use the address from the RagZoom wrapper + addr = rz._address + if addr: + with GrpcRagzoomClient(addr) as client: + work = client.get_document_work_status(document_id) + has_pending = work.has_pending_work + except 
Exception: + # If we can't check work status, infer from completion_pct + has_pending = s.completion_pct < 100.0 + + return StatusResponse( + document_id=s.document_id, + exists=s.exists, + completion_pct=s.completion_pct, + leaf_count=s.leaf_count, + node_count=s.node_count, + has_pending_work=has_pending, + ) + + +@app.post("/clear", response_model=ClearResponse) +def clear(req: ClearRequest): + rz = get_rz() + try: + rz.clear(req.document_id) + except Exception as exc: + logger.error("clear failed: %s", exc) + raise HTTPException(status_code=500, detail=str(exc)) + + return ClearResponse(document_id=req.document_id, cleared=True) + + +# ── Main ───────────────────────────────────────────────────────────────────── + + +def main(): + global _rz + + parser = argparse.ArgumentParser(description="RagZoom HTTP Bridge for MemoryBench") + parser.add_argument( + "--port", type=int, default=int(os.environ.get("RAGZOOM_BRIDGE_PORT", "8079")), + help="HTTP port (default: 8079 or RAGZOOM_BRIDGE_PORT env)", + ) + parser.add_argument( + "--grpc-address", + default=os.environ.get("RAGZOOM_SERVER_ADDRESS", "localhost:50052"), + help="RagZoom gRPC address (default: localhost:50052 or RAGZOOM_SERVER_ADDRESS env)", + ) + parser.add_argument( + "--timeout", type=float, default=300.0, + help="gRPC timeout in seconds for long operations like search (default: 300)", + ) + args = parser.parse_args() + + logger.info("Connecting to RagZoom gRPC at %s", args.grpc_address) + _rz = RagZoom(server_address=args.grpc_address, timeout=args.timeout) + + logger.info("Starting bridge on port %d", args.port) + uvicorn.run(app, host="0.0.0.0", port=args.port, log_level="info") + + +if __name__ == "__main__": + main() diff --git a/src/providers/ragzoom/index.ts b/src/providers/ragzoom/index.ts new file mode 100644 index 0000000..2dadf48 --- /dev/null +++ b/src/providers/ragzoom/index.ts @@ -0,0 +1,282 @@ +import type { + Provider, + ProviderConfig, + IngestOptions, + IngestResult, + SearchOptions, + 
IndexingProgressCallback, +} from "../../types/provider" +import type { UnifiedSession } from "../../types/unified" +import { logger } from "../../utils/logger" +import { RAGZOOM_PROMPTS } from "./prompts" + +/** + * RagZoom Memory Provider + * + * Connects to the RagZoom system via a thin HTTP bridge server that wraps + * RagZoom's gRPC Python API. The bridge must be running before this + * provider is used. + * + * Architecture: + * memorybench (TS) --HTTP--> bridge.py (Python) --gRPC--> ragzoom daemon + * + * Configuration (ProviderConfig): + * - baseUrl: URL of the bridge server (default: http://localhost:8079) + * - apiKey: not required (passed through but unused — RagZoom is local) + * + * Each benchmark run uses the containerTag as the RagZoom document_id, + * providing full namespace isolation between runs. + */ +export class RagZoomProvider implements Provider { + name = "ragzoom" + prompts = RAGZOOM_PROMPTS + concurrency = { + default: 5, + ingest: 3, + indexing: 10, + } + + private baseUrl: string = "" + + async initialize(config: ProviderConfig): Promise<void> { + this.baseUrl = (config.baseUrl || "http://localhost:8079").replace(/\/$/, "") + + // Health check + try { + const resp = await fetch(`${this.baseUrl}/health`) + if (!resp.ok) { + throw new Error(`Bridge health check failed: ${resp.status} ${resp.statusText}`) + } + const data = (await resp.json()) as { status: string } + if (data.status !== "ok") { + throw new Error(`Bridge health check returned unexpected status: ${data.status}`) + } + } catch (err) { + if (err instanceof TypeError && String(err).includes("fetch")) { + throw new Error( + `Cannot connect to RagZoom bridge at ${this.baseUrl}. ` + + `Make sure the bridge server is running: python3 src/providers/ragzoom/bridge.py` + ) + } + throw err + } + + logger.info(`Initialized RagZoom provider (bridge: ${this.baseUrl})`) + } + + /** + * Ingest sessions by converting them to AppendUnit payloads and + * sending to the bridge's /ingest endpoint.
+ * + * Each session's messages are formatted as a text block with + * speaker/role labels and optional timestamps. The containerTag + * is used as the RagZoom document_id. + */ + async ingest(sessions: UnifiedSession[], options: IngestOptions): Promise<IngestResult> { + const documentId = options.containerTag + const units: Array<{ text: string; time_start?: string; time_end?: string }> = [] + + for (const session of sessions) { + // Format session messages into a single text block + const formattedDate = session.metadata?.formattedDate as string + const isoDate = session.metadata?.date as string + + let sessionText = "" + if (formattedDate) { + sessionText += `[Session: ${session.sessionId} | Date: ${formattedDate}]\n\n` + } else { + sessionText += `[Session: ${session.sessionId}]\n\n` + } + + for (const msg of session.messages) { + const speaker = msg.speaker || msg.role + const timestamp = msg.timestamp ? ` (${msg.timestamp})` : "" + sessionText += `${speaker}${timestamp}: ${msg.content}\n\n` + } + + // Build the AppendUnit with optional temporal metadata + const unit: { text: string; time_start?: string; time_end?: string } = { + text: sessionText.trim(), + } + + // If messages have timestamps, use the first and last as the time range + const timestamps = session.messages + .map((m) => m.timestamp) + .filter((t): t is string => !!t) + + if (timestamps.length > 0) { + unit.time_start = timestamps[0] + unit.time_end = timestamps[timestamps.length - 1] + } else if (isoDate) { + // Fall back to session-level date + unit.time_start = isoDate + unit.time_end = isoDate + } + + units.push(unit) + } + + if (units.length === 0) { + return { documentIds: [] } + } + + const resp = await fetch(`${this.baseUrl}/ingest`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + document_id: documentId, + units, + }), + }) + + if (!resp.ok) { + const detail = await resp.text() + throw new Error(`RagZoom ingest failed: ${resp.status} ${detail}`) + } +
+ const result = (await resp.json()) as { document_id: string; chunks_created: number } + logger.debug( + `Ingested ${sessions.length} sessions into doc "${documentId}" (${result.chunks_created} chunks created)` + ) + + // Return the document_id as the single "document" for indexing tracking + return { documentIds: [documentId] } + } + + /** + * Poll the bridge's /status endpoint until the document is fully indexed. + * + * RagZoom indexes asynchronously via background workers that build the + * summary tree. We poll completion_pct until it reaches 100% or there + * is no pending work. + */ + async awaitIndexing( + result: IngestResult, + containerTag: string, + onProgress?: IndexingProgressCallback + ): Promise<void> { + const documentIds = result.documentIds + if (documentIds.length === 0) { + onProgress?.({ completedIds: [], failedIds: [], total: 0 }) + return + } + + const total = documentIds.length + const completedIds: string[] = [] + const failedIds: string[] = [] + let backoffMs = 1000 + + onProgress?.({ completedIds: [], failedIds: [], total }) + + const pending = new Set(documentIds) + + while (pending.size > 0) { + for (const docId of Array.from(pending)) { + try { + const resp = await fetch(`${this.baseUrl}/status/${encodeURIComponent(docId)}`) + if (!resp.ok) { + logger.warn(`Status check failed for ${docId}: ${resp.status}`) + continue + } + + const status = (await resp.json()) as { + exists: boolean + completion_pct: number + has_pending_work: boolean + } + + if (!status.exists) { + // Document hasn't been created yet — still processing + continue + } + + // Done when completion is 100% or no pending work + if (status.completion_pct >= 100.0 || !status.has_pending_work) { + pending.delete(docId) + completedIds.push(docId) + logger.debug(`Document "${docId}" indexing complete (${status.completion_pct}%)`) + } else { + logger.debug( + `Document "${docId}" indexing: ${status.completion_pct.toFixed(1)}%, pending work: ${status.has_pending_work}` + ) + } + }
catch (err) { + logger.warn(`Error checking status for ${docId}: ${err}`) + } + } + + onProgress?.({ completedIds: [...completedIds], failedIds: [...failedIds], total }) + + if (pending.size > 0) { + await new Promise((r) => setTimeout(r, backoffMs)) + backoffMs = Math.min(backoffMs * 1.5, 10000) + } + } + + if (failedIds.length > 0) { + logger.warn(`${failedIds.length} documents failed indexing`) + } + } + + /** + * Search via RagZoom's agentic search endpoint. + * + * Returns an array with a single result object containing the + * synthesized answer from the search agent. + */ + async search(query: string, options: SearchOptions): Promise<unknown[]> { + const documentId = options.containerTag + + const resp = await fetch(`${this.baseUrl}/search`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + question: query, + document_id: documentId, + }), + }) + + if (!resp.ok) { + const detail = await resp.text() + throw new Error(`RagZoom search failed: ${resp.status} ${detail}`) + } + + const result = (await resp.json()) as { answer: string } + + logger.debug(`Search for "${query.substring(0, 50)}..." returned answer (${result.answer.length} chars)`) + + // Return as array for consistency with other providers + return [ + { + answer: result.answer, + document_id: documentId, + question: query, + }, + ] + } + + /** + * Clear a document by sending a POST to the bridge's /clear endpoint.
+ */ + async clear(containerTag: string): Promise<void> { + try { + const resp = await fetch(`${this.baseUrl}/clear`, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ document_id: containerTag }), + }) + + if (!resp.ok) { + const detail = await resp.text() + logger.warn(`Clear failed for "${containerTag}": ${resp.status} ${detail}`) + return + } + + logger.info(`Cleared RagZoom document: ${containerTag}`) + } catch (err) { + logger.warn(`Error clearing document "${containerTag}": ${err}`) + } + } +} + +export default RagZoomProvider diff --git a/src/providers/ragzoom/prompts.ts b/src/providers/ragzoom/prompts.ts new file mode 100644 index 0000000..0f0d5c9 --- /dev/null +++ b/src/providers/ragzoom/prompts.ts @@ -0,0 +1,72 @@ +import type { ProviderPrompts } from "../../types/prompts" + +/** + * RagZoom search results contain a single synthesized answer from the + * agentic search agent, not an array of chunks. We wrap it as context. + */ +interface RagZoomSearchResult { + answer: string + document_id: string + question: string +} + +function buildRagZoomContext(context: unknown[]): string { + const results = context as RagZoomSearchResult[] + + if (results.length === 0) { + return "No relevant information was retrieved." + } + + // RagZoom search returns a single synthesized answer per query + return results + .map((result, i) => { + return `[Memory Search Result ${i + 1}]\n${result.answer}` + }) + .join("\n\n---\n\n") +} + +export function buildRagZoomAnswerPrompt( + question: string, + context: unknown[], + questionDate?: string +): string { + const retrievedContext = buildRagZoomContext(context) + + return `You are a question-answering system. Based on the retrieved memory context below, answer the question.
+ +Question: ${question} +Question Date: ${questionDate || "Not specified"} + +Retrieved Memory Context (from RagZoom agentic search): +${retrievedContext} + +**Understanding the Context:** +The context comes from RagZoom's agentic search, which iteratively zooms into a hierarchical +summary tree of conversation history. The search agent has already done multi-step retrieval +and reasoning to produce a synthesized answer. The context above is the search agent's output. + +**How to Answer:** +1. Read the search result carefully — it is a targeted answer from the memory system +2. Use the information provided to answer the question directly +3. If the search result addresses the question, relay the relevant information +4. If the search result says it could not find the information, respond with "I don't know" +5. For temporal questions, use any date references in the search result + +Instructions: +- Base your answer ONLY on the provided context +- Provide a clear, concise answer +- Do not make up information not present in the context +- If the context does not contain enough information, respond with "I don't know" + +Reasoning: +[Your step-by-step reasoning process here] + +Answer: +[Your final answer here]` +} + +export const RAGZOOM_PROMPTS: ProviderPrompts = { + answerPrompt: buildRagZoomAnswerPrompt, +} + +export default RAGZOOM_PROMPTS diff --git a/src/types/provider.ts b/src/types/provider.ts index cdc0228..afee1bd 100644 --- a/src/types/provider.ts +++ b/src/types/provider.ts @@ -47,4 +47,4 @@ export interface Provider { clear(containerTag: string): Promise<void> } -export type ProviderName = "supermemory" | "mem0" | "zep" | "filesystem" | "rag" +export type ProviderName = "supermemory" | "mem0" | "zep" | "filesystem" | "rag" | "ragzoom" diff --git a/src/utils/config.ts b/src/utils/config.ts index 8ac1268..0c5fa10 100644 --- a/src/utils/config.ts +++ b/src/utils/config.ts @@ -30,6 +30,8 @@ export function getProviderConfig(provider: string): { apiKey: string;
baseUrl?: string } { return { apiKey: config.openaiApiKey } // Filesystem uses OpenAI for memory extraction case "rag": return { apiKey: config.openaiApiKey } // RAG provider uses OpenAI for embeddings + case "ragzoom": + return { apiKey: "", baseUrl: process.env.RAGZOOM_BRIDGE_URL || "http://localhost:8079" } // RagZoom is local, no API key needed default: throw new Error(`Unknown provider: ${provider}`)