-
Notifications
You must be signed in to change notification settings - Fork 2
Phase 2 TOC Building
This page aggregates all Phase 2 documentation for the TOC Building phase.
Segmentation, summarization, and time hierarchy construction.
phase: 02-toc-building plan: 01 type: execute wave: 1 depends_on: [] files_modified:
- crates/memory-types/src/segment.rs
- crates/memory-types/src/lib.rs
- crates/memory-toc/Cargo.toml
- crates/memory-toc/src/lib.rs
- crates/memory-toc/src/segmenter.rs
- crates/memory-toc/src/config.rs
- Cargo.toml autonomous: true
must_haves: truths: - "Segment struct exists with events, overlap_events, start/end times, token count" - "SegmentationConfig has time_threshold (30min), token_threshold (4K), overlap settings" - "SegmentBuilder detects boundaries based on time gaps and token counts" - "Overlap events from previous segment included for context continuity" - "Token counting works for event text" artifacts: - path: "crates/memory-types/src/segment.rs" provides: "Segment type with overlap support" exports: ["Segment"] - path: "crates/memory-toc/src/segmenter.rs" provides: "Segmentation engine with boundary detection" exports: ["SegmentBuilder", "SegmentationConfig"] - path: "crates/memory-toc/src/config.rs" provides: "TOC configuration types" exports: ["TocConfig", "SegmentationConfig"] key_links: - from: "crates/memory-toc/src/segmenter.rs" to: "crates/memory-types/src/event.rs" via: "Event processing" pattern: "Event"
Purpose: Enable TOC building by creating coherent conversation segments that can be summarized. Output: SegmentBuilder that produces Segment structs with proper overlap handling.
<execution_context> @/Users/richardhightower/.claude/get-shit-done/workflows/execute-plan.md @/Users/richardhightower/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/02-toc-building/02-RESEARCH.md Task 1: Create memory-toc crate and add Segment type - Cargo.toml - crates/memory-toc/Cargo.toml - crates/memory-types/src/segment.rs - crates/memory-types/src/lib.rs Create new memory-toc crate and add Segment type to memory-types.Update workspace Cargo.toml to add memory-toc:
# Add to [workspace] members
members = [
"crates/memory-types",
"crates/memory-storage",
"crates/memory-service",
"crates/memory-daemon",
"crates/memory-toc",
]
# Add to [workspace.dependencies]
tiktoken-rs = "0.5"
async-trait = "0.1"Create crates/memory-toc/Cargo.toml:
[package]
name = "memory-toc"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
[dependencies]
memory-types = { path = "../memory-types" }
memory-storage = { path = "../memory-storage" }
tiktoken-rs = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
ulid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }Create crates/memory-types/src/segment.rs:
//! Segment type for conversation segmentation.
//!
//! Segments group related events for summarization.
//! Per TOC-03: Created on time threshold (30 min) or token threshold (4K).
//! Per TOC-04: Include overlap for context continuity.
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use crate::Event;
/// A segment of conversation events.
///
/// Segments are the leaf nodes of the TOC hierarchy, containing
/// actual events that will be summarized.
///
/// Both timestamps serialize as integer Unix-epoch milliseconds (via the
/// `ts_milliseconds` serde adapters below), so sub-millisecond precision
/// is dropped on a serialize/deserialize round trip.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Segment {
    /// Unique segment identifier
    pub segment_id: String,
    /// Events in the overlap window (from previous segment for context)
    /// Per TOC-04: Provides context continuity
    /// `#[serde(default)]` lets payloads written without this field still
    /// deserialize (as an empty overlap).
    #[serde(default)]
    pub overlap_events: Vec<Event>,
    /// Events in this segment (excluding overlap)
    pub events: Vec<Event>,
    /// Start time of the segment (first event, excluding overlap)
    #[serde(with = "chrono::serde::ts_milliseconds")]
    pub start_time: DateTime<Utc>,
    /// End time of the segment (last event)
    #[serde(with = "chrono::serde::ts_milliseconds")]
    pub end_time: DateTime<Utc>,
    /// Token count of events (excluding overlap)
    pub token_count: usize,
}
impl Segment {
    /// Create a new segment with no overlap events.
    pub fn new(
        segment_id: String,
        events: Vec<Event>,
        start_time: DateTime<Utc>,
        end_time: DateTime<Utc>,
        token_count: usize,
    ) -> Self {
        Self {
            segment_id,
            overlap_events: Vec::new(),
            events,
            start_time,
            end_time,
            token_count,
        }
    }

    /// Attach overlap events carried over from the previous segment (TOC-04).
    pub fn with_overlap(mut self, overlap_events: Vec<Event>) -> Self {
        self.overlap_events = overlap_events;
        self
    }

    /// All events for summarization: overlap first, then this segment's own
    /// events, preserving chronological order.
    pub fn all_events(&self) -> Vec<&Event> {
        let mut combined: Vec<&Event> = self.overlap_events.iter().collect();
        combined.extend(self.events.iter());
        combined
    }

    /// Serialize this segment to JSON bytes.
    pub fn to_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }

    /// Deserialize a segment from JSON bytes.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventRole, EventType};

    // Helper: minimal user-message event with a fresh ULID and "now" as
    // the timestamp; the tests below only inspect counts and ids.
    fn create_test_event(text: &str) -> Event {
        Event::new(
            ulid::Ulid::new().to_string(),
            "session-123".to_string(),
            Utc::now(),
            EventType::UserMessage,
            EventRole::User,
            text.to_string(),
        )
    }

    #[test]
    fn test_segment_creation() {
        let events = vec![create_test_event("Hello"), create_test_event("World")];
        let start = events[0].timestamp;
        let end = events[1].timestamp;
        let segment = Segment::new(
            "seg-123".to_string(),
            events.clone(),
            start,
            end,
            100,
        );
        assert_eq!(segment.events.len(), 2);
        assert_eq!(segment.token_count, 100);
    }

    #[test]
    fn test_segment_with_overlap() {
        let overlap = vec![create_test_event("Context")];
        let events = vec![create_test_event("Main")];
        let start = events[0].timestamp;
        let end = events[0].timestamp;
        let segment = Segment::new("seg-123".to_string(), events, start, end, 50)
            .with_overlap(overlap);
        assert_eq!(segment.overlap_events.len(), 1);
        // all_events() combines overlap + main.
        assert_eq!(segment.all_events().len(), 2);
    }

    #[test]
    fn test_segment_serialization() {
        // JSON round-trip smoke test.
        let events = vec![create_test_event("Test")];
        let start = events[0].timestamp;
        let segment = Segment::new("seg-123".to_string(), events, start, start, 25);
        let bytes = segment.to_bytes().unwrap();
        let decoded = Segment::from_bytes(&bytes).unwrap();
        assert_eq!(segment.segment_id, decoded.segment_id);
        assert_eq!(segment.token_count, decoded.token_count);
    }
}Update crates/memory-types/src/lib.rs to export Segment:
//! Domain types for agent-memory.
//!
//! This crate contains the core data structures used throughout
//! the agent-memory system.
mod config;
mod error;
mod event;
mod grip;
mod outbox;
mod segment;
mod toc;
pub use config::{Settings, SummarizerSettings};
pub use error::TypeError;
pub use event::{Event, EventRole, EventType};
pub use grip::Grip;
pub use outbox::{OutboxEntry, OutboxEntryType};
pub use segment::Segment;
pub use toc::{TocBullet, TocLevel, TocNode};Create crates/memory-toc/src/lib.rs:
//! TOC building library for agent-memory.
//!
//! Provides:
//! - Event segmentation (TOC-03, TOC-04)
//! - Summarization trait (SUMM-01)
//! - TOC hierarchy building (TOC-01, TOC-02, TOC-05)
pub mod config;
pub mod segmenter;
pub use config::{SegmentationConfig, TocConfig};
pub use segmenter::SegmentBuilder;Create crates/memory-toc/src/config.rs:
//! Configuration for TOC building.
use serde::{Deserialize, Serialize};
/// Configuration for event segmentation.
///
/// Per TOC-03: Segment on time threshold (30 min) or token threshold (4K).
/// Per TOC-04: Overlap for context continuity.
///
/// All durations are plain millisecond counts (`i64`), so the config
/// serializes to flat JSON numbers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentationConfig {
    /// Maximum time gap before starting new segment (milliseconds)
    /// Per TOC-03: Default 30 minutes
    pub time_threshold_ms: i64,
    /// Maximum tokens before starting new segment
    /// Per TOC-03: Default 4000 tokens
    pub token_threshold: usize,
    /// Overlap time to include from previous segment (milliseconds)
    /// Per TOC-04: Default 5 minutes
    pub overlap_time_ms: i64,
    /// Overlap tokens to include from previous segment
    /// Per TOC-04: Default 500 tokens
    pub overlap_tokens: usize,
    /// Maximum text length to count for tool results (to avoid explosion)
    /// NOTE(review): the token counter applies this as a *byte* length,
    /// not a character count, despite the field name.
    pub max_tool_result_chars: usize,
}
impl Default for SegmentationConfig {
    /// Defaults per TOC-03/TOC-04: 30-minute gap, 4K tokens, 5-minute
    /// overlap window, 500 overlap tokens, 1000-char tool-result cap.
    fn default() -> Self {
        const MINUTE_MS: i64 = 60 * 1000;
        Self {
            time_threshold_ms: 30 * MINUTE_MS,
            token_threshold: 4000,
            overlap_time_ms: 5 * MINUTE_MS,
            overlap_tokens: 500,
            max_tool_result_chars: 1000,
        }
    }
}
/// Overall TOC configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TocConfig {
    /// Segmentation settings
    pub segmentation: SegmentationConfig,
    /// Minimum events to create a segment
    /// NOTE(review): not consulted by the segmenter in this crate —
    /// confirm which component is meant to enforce it.
    pub min_events_per_segment: usize,
}
impl Default for TocConfig {
fn default() -> Self {
Self {
segmentation: SegmentationConfig::default(),
min_events_per_segment: 2,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults must match the TOC-03/TOC-04 requirements verbatim.
    #[test]
    fn test_default_config() {
        let config = SegmentationConfig::default();
        assert_eq!(config.time_threshold_ms, 30 * 60 * 1000);
        assert_eq!(config.token_threshold, 4000);
        assert_eq!(config.overlap_time_ms, 5 * 60 * 1000);
        assert_eq!(config.overlap_tokens, 500);
    }

    // JSON round-trip smoke test for the nested config.
    #[test]
    fn test_config_serialization() {
        let config = TocConfig::default();
        let json = serde_json::to_string(&config).unwrap();
        let decoded: TocConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(config.segmentation.token_threshold, decoded.segmentation.token_threshold);
    }
}Create crates/memory-toc/src/segmenter.rs:
//! Event segmentation engine.
//!
//! Per TOC-03: Creates segments on time threshold (30 min) or token threshold (4K).
//! Per TOC-04: Includes overlap for context continuity.
use chrono::{DateTime, Utc};
use tracing::{debug, trace};
use memory_types::{Event, EventType, Segment};
use crate::config::SegmentationConfig;
/// Token counter for events.
///
/// Uses the OpenAI `cl100k_base` BPE (via `tiktoken-rs`) when available,
/// falling back to a rough ~4-chars-per-token estimate.
pub struct TokenCounter {
    /// Maximum bytes of tool-result text to count (longer tool output is
    /// truncated before counting to avoid token explosion).
    max_tool_result_chars: usize,
}

impl TokenCounter {
    /// Create a counter that truncates tool results to `max_tool_result_chars`.
    pub fn new(max_tool_result_chars: usize) -> Self {
        Self { max_tool_result_chars }
    }

    /// Count tokens in event text.
    ///
    /// Tool results are truncated (by bytes, snapped back to a UTF-8 char
    /// boundary) before counting; other event types are counted in full.
    pub fn count_event(&self, event: &Event) -> usize {
        let text = if event.event_type == EventType::ToolResult {
            // Truncate tool results to avoid token explosion. `len` is a
            // byte index, so walk back to a char boundary first — slicing
            // mid-codepoint (e.g. multibyte tool output) would panic.
            let mut len = event.text.len().min(self.max_tool_result_chars);
            while !event.text.is_char_boundary(len) {
                len -= 1;
            }
            &event.text[..len]
        } else {
            &event.text
        };
        // Build the BPE once and cache it: `cl100k_base()` parses the full
        // vocabulary and is far too expensive to reconstruct per event
        // (the original code did exactly that). `None` records a failed
        // construction so we fall back to the estimate without retrying.
        static BPE: std::sync::OnceLock<Option<tiktoken_rs::CoreBPE>> =
            std::sync::OnceLock::new();
        match BPE.get_or_init(|| tiktoken_rs::cl100k_base().ok()) {
            Some(bpe) => bpe.encode_with_special_tokens(text).len(),
            // Rough estimate: ~4 chars per token, at least 1.
            None => (text.len() / 4).max(1),
        }
    }

    /// Count total tokens across multiple events.
    pub fn count_events(&self, events: &[Event]) -> usize {
        events.iter().map(|e| self.count_event(e)).sum()
    }
}
/// Builder for creating segments from a stream of events.
///
/// Detects segment boundaries based on:
/// - Time gaps (TOC-03: 30 min default)
/// - Token thresholds (TOC-03: 4K default)
///
/// Includes overlap from previous segment (TOC-04).
///
/// Streaming state machine: feed events via `add_event`, which returns a
/// completed `Segment` whenever a boundary is crossed; call `flush` at end
/// of input to drain the trailing partial segment.
pub struct SegmentBuilder {
    config: SegmentationConfig,
    token_counter: TokenCounter,
    /// Events in current segment being built
    current_events: Vec<Event>,
    /// Token count of current segment
    current_tokens: usize,
    /// Time of last event
    last_event_time: Option<DateTime<Utc>>,
    /// Events to include as overlap in next segment
    overlap_buffer: Vec<Event>,
    /// Tokens in overlap buffer (diagnostic only: emitted in trace logs,
    /// never used for boundary decisions)
    overlap_tokens: usize,
}
impl SegmentBuilder {
    /// Create a new segment builder with the given configuration.
    pub fn new(config: SegmentationConfig) -> Self {
        let token_counter = TokenCounter::new(config.max_tool_result_chars);
        Self {
            config,
            token_counter,
            current_events: Vec::new(),
            current_tokens: 0,
            last_event_time: None,
            overlap_buffer: Vec::new(),
            overlap_tokens: 0,
        }
    }

    /// Add an event to the builder.
    ///
    /// Returns `Some(Segment)` if a boundary was detected: the returned
    /// segment holds the events accumulated *before* this event, and the
    /// event itself becomes the first event of the next segment.
    pub fn add_event(&mut self, event: Event) -> Option<Segment> {
        let event_tokens = self.token_counter.count_event(&event);
        trace!(
            event_id = %event.event_id,
            tokens = event_tokens,
            "Processing event"
        );
        // Single flush path for both boundary kinds; the original code
        // duplicated the flush-then-add sequence per boundary type.
        let completed = if self.crosses_boundary(&event, event_tokens) {
            Some(self.flush_segment())
        } else {
            None
        };
        self.add_event_internal(event, event_tokens);
        completed
    }

    /// Whether accepting `event` would cross a segment boundary (TOC-03).
    ///
    /// Never reports a boundary while the current segment is empty, so a
    /// single event larger than `token_threshold` still forms a segment.
    fn crosses_boundary(&self, event: &Event, event_tokens: usize) -> bool {
        if self.current_events.is_empty() {
            return false;
        }
        // Time-gap boundary. Out-of-order events yield a negative gap and
        // never trigger (same arithmetic as the original).
        if let Some(last_time) = self.last_event_time {
            let gap_ms = event.timestamp.timestamp_millis() - last_time.timestamp_millis();
            if gap_ms > self.config.time_threshold_ms {
                debug!(
                    gap_ms = gap_ms,
                    threshold = self.config.time_threshold_ms,
                    "Time gap boundary detected"
                );
                return true;
            }
        }
        // Token-threshold boundary.
        if self.current_tokens + event_tokens > self.config.token_threshold {
            debug!(
                current_tokens = self.current_tokens,
                event_tokens = event_tokens,
                threshold = self.config.token_threshold,
                "Token threshold boundary detected"
            );
            return true;
        }
        false
    }

    /// Append `event` to the current segment and update running state.
    fn add_event_internal(&mut self, event: Event, event_tokens: usize) {
        self.last_event_time = Some(event.timestamp);
        self.current_events.push(event);
        self.current_tokens += event_tokens;
    }

    /// Flush current events as a completed segment.
    ///
    /// Attaches the overlap carried over from the *previous* segment, then
    /// rebuilds the overlap buffer from this segment's tail for the next one.
    fn flush_segment(&mut self) -> Segment {
        let events = std::mem::take(&mut self.current_events);
        let tokens = self.current_tokens;
        self.current_tokens = 0;
        // Empty `events` only happens via internal misuse (both callers
        // guard on non-empty); fall back to "now" rather than panic.
        let start_time = events.first().map(|e| e.timestamp).unwrap_or_else(Utc::now);
        let end_time = events.last().map(|e| e.timestamp).unwrap_or_else(Utc::now);
        // Overlap collected from the previous segment (TOC-04).
        let overlap = std::mem::take(&mut self.overlap_buffer);
        let segment_id = format!("seg:{}", ulid::Ulid::new());
        debug!(
            segment_id = %segment_id,
            events = events.len(),
            overlap = overlap.len(),
            tokens = tokens,
            "Created segment"
        );
        // Build overlap buffer for the next segment from this one's tail.
        self.build_overlap_buffer(&events);
        Segment::new(segment_id, events, start_time, end_time, tokens).with_overlap(overlap)
    }

    /// Build overlap buffer for next segment from current events (TOC-04).
    ///
    /// Walks backwards from the segment's last event, collecting events that
    /// fall inside the overlap time window until the overlap token budget
    /// would be exceeded, then restores chronological order.
    fn build_overlap_buffer(&mut self, events: &[Event]) {
        if events.is_empty() {
            return;
        }
        let end_time = events.last().unwrap().timestamp;
        let overlap_start_ms = end_time.timestamp_millis() - self.config.overlap_time_ms;
        let mut overlap_events = Vec::new();
        let mut overlap_tokens = 0;
        for event in events.iter().rev() {
            if event.timestamp.timestamp_millis() < overlap_start_ms {
                break;
            }
            let tokens = self.token_counter.count_event(event);
            if overlap_tokens + tokens > self.config.overlap_tokens {
                break;
            }
            overlap_events.push(event.clone());
            overlap_tokens += tokens;
        }
        // Reverse to maintain chronological order.
        overlap_events.reverse();
        self.overlap_buffer = overlap_events;
        self.overlap_tokens = overlap_tokens;
        trace!(
            overlap_events = self.overlap_buffer.len(),
            overlap_tokens = self.overlap_tokens,
            "Built overlap buffer"
        );
    }

    /// Flush any remaining events as a final segment.
    ///
    /// Call this when processing is complete to get any remaining events.
    pub fn flush(&mut self) -> Option<Segment> {
        if self.current_events.is_empty() {
            return None;
        }
        Some(self.flush_segment())
    }

    /// Check if builder has pending events.
    pub fn has_pending(&self) -> bool {
        !self.current_events.is_empty()
    }

    /// Get current token count.
    pub fn current_token_count(&self) -> usize {
        self.current_tokens
    }

    /// Get current event count.
    pub fn current_event_count(&self) -> usize {
        self.current_events.len()
    }
}
/// Process a batch of events into segments.
///
/// Convenience wrapper around [`SegmentBuilder`]: feeds every event through
/// the builder, collecting each completed segment, then flushes the trailing
/// partial segment (if any).
pub fn segment_events(events: Vec<Event>, config: SegmentationConfig) -> Vec<Segment> {
    let mut builder = SegmentBuilder::new(config);
    let mut segments: Vec<Segment> = events
        .into_iter()
        .filter_map(|event| builder.add_event(event))
        .collect();
    // `flush` returns Option, which extends by zero or one segment.
    segments.extend(builder.flush());
    segments
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use memory_types::{EventRole, EventType};

    /// Build a user-message event at a fixed epoch-millisecond timestamp.
    ///
    /// The ULID is built with `from_parts` using the timestamp for both the
    /// time and "random" components: the original called `rand::random()`,
    /// but `rand` is not a declared dependency of this crate (see its
    /// Cargo.toml), and deterministic ids also make failures reproducible.
    fn create_event_at(text: &str, timestamp_ms: i64) -> Event {
        let ulid = ulid::Ulid::from_parts(timestamp_ms as u64, timestamp_ms as u128);
        Event::new(
            ulid.to_string(),
            "session-123".to_string(),
            chrono::Utc.timestamp_millis_opt(timestamp_ms).unwrap(),
            EventType::UserMessage,
            EventRole::User,
            text.to_string(),
        )
    }

    #[test]
    fn test_token_counter_basic() {
        let counter = TokenCounter::new(1000);
        let event = create_event_at("Hello, world!", 1000);
        let tokens = counter.count_event(&event);
        assert!(tokens > 0);
        assert!(tokens < 10); // "Hello, world!" should be ~4 tokens
    }

    #[test]
    fn test_token_counter_truncates_tool_results() {
        let counter = TokenCounter::new(100);
        let mut event = create_event_at(&"x".repeat(1000), 1000);
        event.event_type = EventType::ToolResult;
        let tokens = counter.count_event(&event);
        // Should be based on truncated text (100 chars), not full 1000
        assert!(tokens < 50);
    }

    #[test]
    fn test_segment_builder_time_boundary() {
        let config = SegmentationConfig {
            time_threshold_ms: 1000, // 1 second for testing
            token_threshold: 10000,
            overlap_time_ms: 500,
            overlap_tokens: 100,
            max_tool_result_chars: 1000,
        };
        let mut builder = SegmentBuilder::new(config);
        // Events within 1 second - no boundary
        assert!(builder.add_event(create_event_at("First", 1000)).is_none());
        assert!(builder.add_event(create_event_at("Second", 1500)).is_none());
        // Event after 2 second gap - boundary
        let segment = builder.add_event(create_event_at("After gap", 4000));
        assert!(segment.is_some());
        let seg = segment.unwrap();
        assert_eq!(seg.events.len(), 2);
    }

    #[test]
    fn test_segment_builder_token_boundary() {
        let config = SegmentationConfig {
            time_threshold_ms: 1000000, // Very high to not trigger
            token_threshold: 10,        // Very low to trigger
            overlap_time_ms: 500,
            overlap_tokens: 5,
            max_tool_result_chars: 1000,
        };
        let mut builder = SegmentBuilder::new(config);
        // First event
        assert!(builder.add_event(create_event_at("Short", 1000)).is_none());
        // Long event should trigger boundary
        let segment = builder.add_event(create_event_at(
            "This is a much longer message that should exceed the token threshold",
            2000,
        ));
        assert!(segment.is_some());
    }

    #[test]
    fn test_segment_builder_overlap() {
        let config = SegmentationConfig {
            time_threshold_ms: 1000,
            token_threshold: 10000,
            overlap_time_ms: 500,
            overlap_tokens: 1000,
            max_tool_result_chars: 1000,
        };
        let mut builder = SegmentBuilder::new(config);
        // Add events
        builder.add_event(create_event_at("Early", 1000));
        builder.add_event(create_event_at("Middle", 1200));
        builder.add_event(create_event_at("Late", 1400));
        // Trigger boundary
        let segment1 = builder.add_event(create_event_at("After gap", 5000)).unwrap();
        assert_eq!(segment1.events.len(), 3);
        // Add more events and flush
        builder.add_event(create_event_at("New event", 5500));
        let segment2 = builder.flush().unwrap();
        // Second segment should have overlap from first
        assert!(!segment2.overlap_events.is_empty());
    }

    #[test]
    fn test_segment_events_batch() {
        let config = SegmentationConfig {
            time_threshold_ms: 1000,
            token_threshold: 10000,
            overlap_time_ms: 100,
            overlap_tokens: 50,
            max_tool_result_chars: 1000,
        };
        let events = vec![
            create_event_at("Event 1", 1000),
            create_event_at("Event 2", 1500),
            create_event_at("Event 3", 5000), // Gap
            create_event_at("Event 4", 5500),
        ];
        let segments = segment_events(events, config);
        assert_eq!(segments.len(), 2);
    }

    #[test]
    fn test_flush_empty_builder() {
        let mut builder = SegmentBuilder::new(SegmentationConfig::default());
        assert!(builder.flush().is_none());
    }
}<success_criteria>
- Segment type exists with overlap support (TOC-04)
- SegmentationConfig has correct defaults (TOC-03: 30min, 4K tokens)
- SegmentBuilder detects both time and token boundaries
- Token counting handles tool result truncation
- Overlap buffer correctly populated for context continuity
- All tests pass </success_criteria>
- Created `crates/memory-toc/src/config.rs` with configurable parameters:
  - `time_gap_threshold`: 30 minutes default
  - `token_threshold`: 4000 tokens default
  - `overlap_duration`: 5 minutes default
  - `overlap_tokens`: 500 tokens default
  - `max_tool_result_size`: 1000 bytes default
- Created `crates/memory-types/src/segment.rs` with:
  - `Segment` struct with `overlap_events` and `events` fields
  - `all_events()` method for combining overlap and main events
  - JSON serialization/deserialization support
- Created `crates/memory-toc/src/segmenter.rs` with:
  - `TokenCounter` using tiktoken-rs for accurate counting
  - `SegmentBuilder` with time-gap and token-threshold boundary detection
  - Overlap buffer management for context continuity
  - `segment_events()` convenience function
| File | Purpose | Exports |
|---|---|---|
| config.rs | Segmentation config | SegmentationConfig, TocConfig |
| segment.rs | Segment type | Segment |
| segmenter.rs | Segmentation engine | SegmentBuilder, TokenCounter, segment_events |
- `cargo build -p memory-toc` compiles
- `cargo build -p memory-types` compiles
- All segmentation tests pass (7 tests)
- All memory-types tests pass (13 tests)
- TOC-03: Segmentation parameters configurable
- TOC-04: Overlap events with preceding segments
phase: 02-toc-building plan: 02 type: execute wave: 1 depends_on: [] files_modified:
- crates/memory-toc/src/summarizer.rs
- crates/memory-toc/src/summarizer/mod.rs
- crates/memory-toc/src/summarizer/api.rs
- crates/memory-toc/src/summarizer/mock.rs
- crates/memory-toc/src/lib.rs
- crates/memory-toc/Cargo.toml autonomous: true
must_haves: truths: - "Summarizer trait exists with summarize_events and summarize_children methods" - "Summary struct has title, bullets, keywords fields" - "ApiSummarizer implements Summarizer using HTTP API calls" - "MockSummarizer exists for testing" - "Error handling for API failures with retry capability" artifacts: - path: "crates/memory-toc/src/summarizer/mod.rs" provides: "Summarizer trait and Summary type" exports: ["Summarizer", "Summary", "SummarizerError"] - path: "crates/memory-toc/src/summarizer/api.rs" provides: "API-based summarizer implementation" exports: ["ApiSummarizer", "ApiSummarizerConfig"] - path: "crates/memory-toc/src/summarizer/mock.rs" provides: "Mock summarizer for testing" exports: ["MockSummarizer"] key_links: - from: "crates/memory-toc/src/summarizer/api.rs" to: "reqwest" via: "HTTP client" pattern: "reqwest::Client"
Purpose: Enable TOC nodes to have meaningful summaries (title, bullets, keywords) generated from conversation events. Output: Summarizer trait with API implementation (OpenAI/Claude compatible) and mock for testing.
<execution_context> @/Users/richardhightower/.claude/get-shit-done/workflows/execute-plan.md @/Users/richardhightower/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/02-toc-building/02-RESEARCH.md Task 1: Add summarizer dependencies to memory-toc - crates/memory-toc/Cargo.toml - Cargo.toml Add HTTP client and retry dependencies for API summarizer.Update workspace Cargo.toml [workspace.dependencies]:
# Add these to [workspace.dependencies] if not present
reqwest = { version = "0.12", features = ["json"] }
backoff = { version = "0.4", features = ["tokio"] }
secrecy = { version = "0.10", features = ["serde"] }Update crates/memory-toc/Cargo.toml:
[package]
name = "memory-toc"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
[dependencies]
memory-types = { path = "../memory-types" }
memory-storage = { path = "../memory-storage" }
tiktoken-rs = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }
ulid = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tracing = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
reqwest = { workspace = true }
backoff = { workspace = true }
secrecy = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
wiremock = "0.6"Create crates/memory-toc/src/summarizer/mod.rs:
//! Summarization trait and implementations.
//!
//! Per SUMM-01: Pluggable Summarizer trait (async, supports API and local LLM).
//! Per SUMM-02: Generates title, bullets, keywords from events.
//! Per SUMM-04: Rollup summarizer aggregates child node summaries.
mod api;
mod mock;
pub use api::{ApiSummarizer, ApiSummarizerConfig};
pub use mock::MockSummarizer;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use memory_types::Event;
/// Error type for summarization operations.
#[derive(Debug, Error)]
pub enum SummarizerError {
    /// The underlying API call failed; the message carries details.
    #[error("API request failed: {0}")]
    ApiError(String),
    /// The API responded, but the body did not match the expected schema.
    #[error("Failed to parse API response: {0}")]
    ParseError(String),
    /// The API signalled rate limiting.
    #[error("Rate limit exceeded")]
    RateLimitExceeded,
    /// The summarizer was constructed with invalid settings.
    #[error("Invalid configuration: {0}")]
    ConfigError(String),
    /// The request did not complete in time.
    #[error("Timeout waiting for response")]
    Timeout,
    /// Summarization was requested for an empty input slice.
    #[error("No events to summarize")]
    NoEvents,
}

/// Output from summarization.
///
/// Per SUMM-02: Contains title, bullets, and keywords.
/// The counts in the field docs are targets for implementations, not
/// invariants enforced by this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Summary {
    /// Brief title capturing the main topic (5-10 words)
    pub title: String,
    /// Key points from the conversation (3-5 bullets)
    pub bullets: Vec<String>,
    /// Keywords for search and filtering (3-7 keywords)
    pub keywords: Vec<String>,
}
impl Summary {
    /// Create a new summary from its parts.
    pub fn new(title: String, bullets: Vec<String>, keywords: Vec<String>) -> Self {
        Self { title, bullets, keywords }
    }

    /// Create an empty/placeholder summary (blank title, no bullets,
    /// no keywords).
    pub fn empty() -> Self {
        Self::new(String::new(), Vec::new(), Vec::new())
    }
}
/// Pluggable summarizer trait.
///
/// Per SUMM-01: Async trait supporting API and local LLM.
/// `Send + Sync` so one summarizer instance can be shared across
/// async tasks.
#[async_trait]
pub trait Summarizer: Send + Sync {
    /// Generate a summary from conversation events.
    ///
    /// Per SUMM-02: Generates title, bullets, keywords.
    /// NOTE(review): the mock returns `SummarizerError::NoEvents` for an
    /// empty slice — confirm other implementations follow the same contract.
    async fn summarize_events(&self, events: &[Event]) -> Result<Summary, SummarizerError>;
    /// Generate a rollup summary from child summaries.
    ///
    /// Per SUMM-04: Aggregates child node summaries for parent TOC nodes.
    async fn summarize_children(&self, summaries: &[Summary]) -> Result<Summary, SummarizerError>;
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_summary_creation() {
        let summary = Summary::new(
            "Discussed authentication".to_string(),
            vec!["Implemented JWT".to_string(), "Fixed token refresh".to_string()],
            vec!["auth".to_string(), "jwt".to_string()],
        );
        assert_eq!(summary.title, "Discussed authentication");
        assert_eq!(summary.bullets.len(), 2);
        assert_eq!(summary.keywords.len(), 2);
    }

    #[test]
    fn test_summary_empty() {
        let summary = Summary::empty();
        assert!(summary.title.is_empty());
        assert!(summary.bullets.is_empty());
        assert!(summary.keywords.is_empty());
    }

    // JSON round-trip smoke test; equal titles plus the serde derives is
    // enough coverage here.
    #[test]
    fn test_summary_serialization() {
        let summary = Summary::new(
            "Test".to_string(),
            vec!["Bullet 1".to_string()],
            vec!["keyword".to_string()],
        );
        let json = serde_json::to_string(&summary).unwrap();
        let decoded: Summary = serde_json::from_str(&json).unwrap();
        assert_eq!(summary.title, decoded.title);
    }
}Update crates/memory-toc/src/lib.rs:
//! TOC building library for agent-memory.
//!
//! Provides:
//! - Event segmentation (TOC-03, TOC-04)
//! - Summarization trait (SUMM-01, SUMM-02, SUMM-04)
//! - TOC hierarchy building (TOC-01, TOC-02, TOC-05)
pub mod config;
pub mod segmenter;
pub mod summarizer;
pub use config::{SegmentationConfig, TocConfig};
pub use segmenter::SegmentBuilder;
pub use summarizer::{ApiSummarizer, ApiSummarizerConfig, MockSummarizer, Summary, Summarizer, SummarizerError};Create crates/memory-toc/src/summarizer/mock.rs:
//! Mock summarizer for testing.
use async_trait::async_trait;
use memory_types::Event;
use super::{Summary, Summarizer, SummarizerError};
/// Mock summarizer that generates deterministic summaries.
///
/// Useful for testing without making API calls.
pub struct MockSummarizer {
    /// Prefix for generated titles
    title_prefix: String,
}

impl MockSummarizer {
    /// Create a new mock summarizer with the default "Summary of" prefix.
    pub fn new() -> Self {
        Self::with_title_prefix("Summary of")
    }

    /// Create with a custom title prefix.
    pub fn with_title_prefix(prefix: impl Into<String>) -> Self {
        Self { title_prefix: prefix.into() }
    }
}
impl Default for MockSummarizer {
    /// Equivalent to [`MockSummarizer::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl Summarizer for MockSummarizer {
    // Deterministic summary: title from the event count, bullets from the
    // first/last event texts, keywords from simple frequency extraction.
    async fn summarize_events(&self, events: &[Event]) -> Result<Summary, SummarizerError> {
        if events.is_empty() {
            return Err(SummarizerError::NoEvents);
        }
        // Extract some info from events for mock summary
        let first_event = &events[0];
        let last_event = &events[events.len() - 1];
        let title = format!(
            "{} {} events",
            self.title_prefix,
            events.len()
        );
        let bullets = vec![
            format!("First message: {}", truncate(&first_event.text, 50)),
            format!("Last message: {}", truncate(&last_event.text, 50)),
            format!("Total events: {}", events.len()),
        ];
        // Extract keywords from event text
        let keywords = extract_mock_keywords(events);
        Ok(Summary::new(title, bullets, keywords))
    }

    // Rollup: first bullet of each child (max 5), merged deduped keywords
    // (max 7). Reuses NoEvents for an empty child list.
    async fn summarize_children(&self, summaries: &[Summary]) -> Result<Summary, SummarizerError> {
        if summaries.is_empty() {
            return Err(SummarizerError::NoEvents);
        }
        let title = format!(
            "{} {} child summaries",
            self.title_prefix,
            summaries.len()
        );
        // Collect bullets from children (first bullet from each)
        let bullets: Vec<String> = summaries
            .iter()
            .filter_map(|s| s.bullets.first().cloned())
            .take(5)
            .collect();
        // Merge keywords from all children
        let mut all_keywords: Vec<String> = summaries
            .iter()
            .flat_map(|s| s.keywords.clone())
            .collect();
        all_keywords.sort();
        all_keywords.dedup();
        let keywords = all_keywords.into_iter().take(7).collect();
        Ok(Summary::new(title, bullets, keywords))
    }
}
/// Truncate text to at most `max_len` bytes, appending "..." if truncated.
///
/// `max_len` is a byte budget; the cut point is snapped back to a UTF-8
/// char boundary, since slicing mid-codepoint panics (the original did
/// `&text[..max_len - 3]` and would panic on multibyte text).
fn truncate(text: &str, max_len: usize) -> String {
    if text.len() <= max_len {
        return text.to_string();
    }
    // Reserve 3 bytes for the ellipsis, then back up to a char boundary.
    let mut cut = max_len.saturating_sub(3);
    while cut > 0 && !text.is_char_boundary(cut) {
        cut -= 1;
    }
    format!("{}...", &text[..cut])
}
/// Extract mock keywords from events (simple frequency-based extraction).
///
/// Keeps lowercased non-stopword words longer than 3 chars and returns the
/// 5 most frequent, ties broken alphabetically so output is deterministic.
fn extract_mock_keywords(events: &[Event]) -> Vec<String> {
    let all_text: String = events.iter().map(|e| e.text.as_str()).collect::<Vec<_>>().join(" ");
    // Simple keyword extraction: split by whitespace, filter short words
    let words: Vec<String> = all_text
        .split_whitespace()
        .filter(|w| w.len() > 3)
        .map(|w| w.to_lowercase())
        .filter(|w| !is_stopword(w))
        .collect();
    // Count word frequency
    let mut word_counts: std::collections::HashMap<String, usize> = std::collections::HashMap::new();
    for word in words {
        *word_counts.entry(word).or_insert(0) += 1;
    }
    // Sort by descending count, then alphabetically: HashMap iteration
    // order is randomized per process, so without the tie-break equal-count
    // words came out in a different order on every run — breaking the
    // "deterministic summaries" promise of MockSummarizer.
    let mut sorted: Vec<_> = word_counts.into_iter().collect();
    sorted.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    sorted.into_iter().take(5).map(|(w, _)| w).collect()
}
/// Check whether `word` is one of a small fixed set of common stopwords.
fn is_stopword(word: &str) -> bool {
    const STOPWORDS: &[&str] = &[
        "the", "and", "for", "that", "this", "with", "from", "have", "has",
        "been", "were", "will", "would", "could", "should", "there", "their",
        "what", "when", "where", "which", "about", "into", "through",
    ];
    STOPWORDS.iter().any(|&stop| stop == word)
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use memory_types::{EventRole, EventType};

    // Helper: user-message event stamped "now"; these tests only inspect
    // the generated titles/bullets/keywords.
    fn create_test_event(text: &str) -> Event {
        Event::new(
            ulid::Ulid::new().to_string(),
            "session-123".to_string(),
            Utc::now(),
            EventType::UserMessage,
            EventRole::User,
            text.to_string(),
        )
    }

    #[tokio::test]
    async fn test_mock_summarize_events() {
        let summarizer = MockSummarizer::new();
        let events = vec![
            create_test_event("How do I implement authentication?"),
            create_test_event("Use JWT tokens for stateless auth"),
        ];
        let summary = summarizer.summarize_events(&events).await.unwrap();
        assert!(summary.title.contains("2 events"));
        assert_eq!(summary.bullets.len(), 3);
        assert!(!summary.keywords.is_empty());
    }

    #[tokio::test]
    async fn test_mock_summarize_empty() {
        let summarizer = MockSummarizer::new();
        let result = summarizer.summarize_events(&[]).await;
        assert!(matches!(result, Err(SummarizerError::NoEvents)));
    }

    #[tokio::test]
    async fn test_mock_summarize_children() {
        let summarizer = MockSummarizer::new();
        let summaries = vec![
            Summary::new(
                "Day 1".to_string(),
                vec!["Worked on auth".to_string()],
                vec!["auth".to_string()],
            ),
            Summary::new(
                "Day 2".to_string(),
                vec!["Fixed bugs".to_string()],
                vec!["bugs".to_string()],
            ),
        ];
        let rollup = summarizer.summarize_children(&summaries).await.unwrap();
        assert!(rollup.title.contains("2 child summaries"));
        assert!(rollup.keywords.contains(&"auth".to_string()));
    }

    #[tokio::test]
    async fn test_mock_custom_prefix() {
        let summarizer = MockSummarizer::with_title_prefix("Overview of");
        let events = vec![create_test_event("Test event")];
        let summary = summarizer.summarize_events(&events).await.unwrap();
        assert!(summary.title.starts_with("Overview of"));
    }
}Create crates/memory-toc/src/summarizer/api.rs:
//! API-based summarizer using OpenAI-compatible endpoints.
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use reqwest::Client;
use secrecy::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize};
use std::time::Duration;
use tracing::{debug, error, warn};
use memory_types::Event;
use super::{Summary, Summarizer, SummarizerError};
/// Configuration for API-based summarizer.
///
/// The key is held as a `SecretString` so it is redacted from the
/// derived `Debug` output and zeroized on drop.
#[derive(Debug, Clone)]
pub struct ApiSummarizerConfig {
    /// API base URL (e.g., "https://api.openai.com/v1")
    pub base_url: String,
    /// Model to use (e.g., "gpt-4o-mini", "claude-3-haiku-20240307")
    pub model: String,
    /// API key
    pub api_key: SecretString,
    /// Request timeout
    pub timeout: Duration,
    /// Maximum retries on failure
    pub max_retries: u32,
}
impl ApiSummarizerConfig {
    /// Shared constructor used by the provider presets: applies the
    /// common defaults of a 60-second timeout and 3 retries.
    fn with_base_url(
        base_url: &str,
        api_key: impl Into<String>,
        model: impl Into<String>,
    ) -> Self {
        Self {
            base_url: base_url.to_string(),
            model: model.into(),
            api_key: SecretString::from(api_key.into()),
            timeout: Duration::from_secs(60),
            max_retries: 3,
        }
    }

    /// Create config for OpenAI API.
    pub fn openai(api_key: impl Into<String>, model: impl Into<String>) -> Self {
        Self::with_base_url("https://api.openai.com/v1", api_key, model)
    }

    /// Create config for Claude API.
    pub fn claude(api_key: impl Into<String>, model: impl Into<String>) -> Self {
        Self::with_base_url("https://api.anthropic.com/v1", api_key, model)
    }
}
/// API-based summarizer implementation.
///
/// Wraps a reusable `reqwest::Client` (built once in `new`, so the
/// configured timeout applies to every request) plus the provider config.
pub struct ApiSummarizer {
    client: Client,
    config: ApiSummarizerConfig,
}
impl ApiSummarizer {
/// Create a new API summarizer.
///
/// Builds the HTTP client with the configured request timeout.
///
/// # Errors
/// Returns `SummarizerError::ConfigError` when the client cannot be built.
pub fn new(config: ApiSummarizerConfig) -> Result<Self, SummarizerError> {
    let client = Client::builder()
        .timeout(config.timeout)
        .build()
        .map_err(|e| SummarizerError::ConfigError(e.to_string()))?;
    Ok(Self { client, config })
}
/// Build prompt for event summarization.
///
/// Renders each event as "[timestamp] role: text" (blank-line separated)
/// and instructs the model to answer with a JSON object containing
/// title/bullets/keywords — the shape `parse_summary` expects.
fn build_events_prompt(&self, events: &[Event]) -> String {
    let events_text: String = events
        .iter()
        .map(|e| {
            let timestamp = e.timestamp.format("%Y-%m-%d %H:%M:%S");
            format!("[{}] {}: {}", timestamp, e.role, e.text)
        })
        .collect::<Vec<_>>()
        .join("\n\n");
    format!(
        r#"Summarize this conversation segment for a Table of Contents entry.
CONVERSATION:
{events_text}
Provide your response in JSON format:
{{
"title": "Brief title (5-10 words)",
"bullets": ["Key point 1", "Key point 2", "Key point 3"],
"keywords": ["keyword1", "keyword2", "keyword3"]
}}
Guidelines:
- Title should capture the main topic or activity
- 3-5 bullet points summarizing key discussions or decisions
- 3-7 keywords for search/filtering
- Focus on what would help someone find this conversation later"#
    )
}
/// Build prompt for rollup summarization.
///
/// Serializes each child summary (title, bullets, keywords) into a
/// numbered block and asks the model for a higher-level JSON summary in
/// the same title/bullets/keywords shape used by `parse_summary`.
fn build_rollup_prompt(&self, summaries: &[Summary]) -> String {
    let summaries_text: String = summaries
        .iter()
        .enumerate()
        .map(|(i, s)| {
            let bullets = s.bullets.join("\n - ");
            format!(
                "### Summary {}\nTitle: {}\nBullets:\n - {}\nKeywords: {}",
                i + 1,
                s.title,
                bullets,
                s.keywords.join(", ")
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n");
    format!(
        r#"Create a higher-level summary by aggregating these child summaries.
CHILD SUMMARIES:
{summaries_text}
Provide your response in JSON format:
{{
"title": "Brief title (5-10 words)",
"bullets": ["Key point 1", "Key point 2", "Key point 3"],
"keywords": ["keyword1", "keyword2", "keyword3"]
}}
Guidelines:
- Title should capture the overall theme
- 3-5 bullet points covering the most important topics across all children
- 3-7 keywords representing major themes
- Focus on themes and patterns, not individual details"#
    )
}
/// Call the API with retry logic.
///
/// Retries failed requests with exponential backoff. Gives up when
/// either `config.max_retries` attempts have been made (the first
/// attempt counts toward the budget) or the backoff schedule's
/// 120-second elapsed-time cap is exhausted, returning the last error.
async fn call_api(&self, prompt: &str) -> Result<String, SummarizerError> {
    let mut backoff = ExponentialBackoff {
        max_elapsed_time: Some(Duration::from_secs(120)),
        ..Default::default()
    };
    let mut attempts = 0;
    loop {
        attempts += 1;
        debug!(attempt = attempts, "Calling summarization API");
        match self.make_request(prompt).await {
            Ok(response) => return Ok(response),
            Err(e) => {
                // Retry budget spent — propagate the final error.
                if attempts >= self.config.max_retries {
                    error!(error = %e, "Max retries exceeded");
                    return Err(e);
                }
                match backoff.next_backoff() {
                    Some(duration) => {
                        warn!(
                            error = %e,
                            retry_in_ms = duration.as_millis(),
                            "API call failed, retrying"
                        );
                        tokio::time::sleep(duration).await;
                    }
                    None => {
                        // Backoff schedule ran out before max_retries did.
                        error!(error = %e, "Backoff exhausted");
                        return Err(e);
                    }
                }
            }
        }
    }
}
/// Make a single API request.
///
/// Dispatches on the configured base URL: Anthropic endpoints speak a
/// different wire format than OpenAI-compatible ones.
async fn make_request(&self, prompt: &str) -> Result<String, SummarizerError> {
    if self.config.base_url.contains("anthropic") {
        self.make_anthropic_request(prompt).await
    } else {
        self.make_openai_request(prompt).await
    }
}
/// Make OpenAI-compatible API request.
///
/// POSTs to `{base_url}/chat/completions` with `response_format:
/// json_object` so the model is forced to emit parseable JSON, and
/// returns the first choice's message content. The DTO structs are
/// function-local and mirror only the fields this code uses.
async fn make_openai_request(&self, prompt: &str) -> Result<String, SummarizerError> {
    #[derive(Serialize)]
    struct OpenAIRequest {
        model: String,
        messages: Vec<OpenAIMessage>,
        response_format: OpenAIResponseFormat,
    }
    #[derive(Serialize)]
    struct OpenAIMessage {
        role: String,
        content: String,
    }
    #[derive(Serialize)]
    struct OpenAIResponseFormat {
        // Serialized as "type": the JSON field name is a Rust keyword.
        #[serde(rename = "type")]
        format_type: String,
    }
    #[derive(Deserialize)]
    struct OpenAIResponse {
        choices: Vec<OpenAIChoice>,
    }
    #[derive(Deserialize)]
    struct OpenAIChoice {
        message: OpenAIMessageResponse,
    }
    #[derive(Deserialize)]
    struct OpenAIMessageResponse {
        content: String,
    }
    let request = OpenAIRequest {
        model: self.config.model.clone(),
        messages: vec![OpenAIMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
        response_format: OpenAIResponseFormat {
            format_type: "json_object".to_string(),
        },
    };
    let url = format!("{}/chat/completions", self.config.base_url);
    let response = self
        .client
        .post(&url)
        .header("Authorization", format!("Bearer {}", self.config.api_key.expose_secret()))
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(|e| SummarizerError::ApiError(e.to_string()))?;
    // Surface 429 as its own variant so callers can treat it as retryable.
    if response.status() == 429 {
        return Err(SummarizerError::RateLimitExceeded);
    }
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        return Err(SummarizerError::ApiError(format!(
            "HTTP {}: {}",
            status, body
        )));
    }
    let response_body: OpenAIResponse = response
        .json()
        .await
        .map_err(|e| SummarizerError::ParseError(e.to_string()))?;
    // The assistant's text lives in the first choice.
    response_body
        .choices
        .first()
        .map(|c| c.message.content.clone())
        .ok_or_else(|| SummarizerError::ParseError("No choices in response".to_string()))
}
/// Make Anthropic API request.
///
/// POSTs to `{base_url}/messages` using the `x-api-key` header and the
/// pinned `anthropic-version`, and returns the first content block's
/// text. Unlike the OpenAI path there is no JSON response mode, so
/// `extract_json` later handles any surrounding prose/fences.
async fn make_anthropic_request(&self, prompt: &str) -> Result<String, SummarizerError> {
    #[derive(Serialize)]
    struct AnthropicRequest {
        model: String,
        max_tokens: u32,
        messages: Vec<AnthropicMessage>,
    }
    #[derive(Serialize)]
    struct AnthropicMessage {
        role: String,
        content: String,
    }
    #[derive(Deserialize)]
    struct AnthropicResponse {
        content: Vec<AnthropicContent>,
    }
    #[derive(Deserialize)]
    struct AnthropicContent {
        text: String,
    }
    let request = AnthropicRequest {
        model: self.config.model.clone(),
        // 1024 tokens comfortably covers a title + bullets + keywords reply.
        max_tokens: 1024,
        messages: vec![AnthropicMessage {
            role: "user".to_string(),
            content: prompt.to_string(),
        }],
    };
    let url = format!("{}/messages", self.config.base_url);
    let response = self
        .client
        .post(&url)
        .header("x-api-key", self.config.api_key.expose_secret())
        .header("anthropic-version", "2023-06-01")
        .header("Content-Type", "application/json")
        .json(&request)
        .send()
        .await
        .map_err(|e| SummarizerError::ApiError(e.to_string()))?;
    // Surface 429 as its own variant so callers can treat it as retryable.
    if response.status() == 429 {
        return Err(SummarizerError::RateLimitExceeded);
    }
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        return Err(SummarizerError::ApiError(format!(
            "HTTP {}: {}",
            status, body
        )));
    }
    let response_body: AnthropicResponse = response
        .json()
        .await
        .map_err(|e| SummarizerError::ParseError(e.to_string()))?;
    response_body
        .content
        .first()
        .map(|c| c.text.clone())
        .ok_or_else(|| SummarizerError::ParseError("No content in response".to_string()))
}
/// Parse JSON response into Summary.
///
/// Runs the raw model output through `extract_json` first, since models
/// often wrap the object in prose or markdown fences.
fn parse_summary(&self, response: &str) -> Result<Summary, SummarizerError> {
    let json_str = extract_json(response);
    match serde_json::from_str(&json_str) {
        Ok(summary) => Ok(summary),
        Err(e) => Err(SummarizerError::ParseError(format!(
            "Failed to parse summary JSON: {}",
            e
        ))),
    }
}
}
/// Extract JSON object from text (handles markdown code blocks).
///
/// Tries, in order: a fenced ```json block, any bare ``` fence, then the
/// span from the first '{' to the last '}'; falls back to returning the
/// input unchanged.
fn extract_json(text: &str) -> String {
    // Fenced blocks first; an explicit ```json fence wins over a bare one.
    for fence in ["```json", "```"] {
        if let Some(start) = text.find(fence) {
            let body = &text[start + fence.len()..];
            if let Some(end) = body.find("```") {
                return body[..end].trim().to_string();
            }
        }
    }
    // No closed fence: take the outermost brace-delimited span.
    if let (Some(open), Some(close)) = (text.find('{'), text.rfind('}')) {
        return text[open..=close].to_string();
    }
    text.to_string()
}
#[async_trait]
impl Summarizer for ApiSummarizer {
    /// Summarize raw conversation events into a TOC-entry summary.
    /// Rejects empty input up front rather than spending an API call.
    async fn summarize_events(&self, events: &[Event]) -> Result<Summary, SummarizerError> {
        if events.is_empty() {
            return Err(SummarizerError::NoEvents);
        }
        let prompt = self.build_events_prompt(events);
        let response = self.call_api(&prompt).await?;
        self.parse_summary(&response)
    }

    /// Roll child summaries up into a parent-level summary.
    /// Reuses `NoEvents` for empty input (no dedicated variant exists).
    async fn summarize_children(&self, summaries: &[Summary]) -> Result<Summary, SummarizerError> {
        if summaries.is_empty() {
            return Err(SummarizerError::NoEvents);
        }
        let prompt = self.build_rollup_prompt(summaries);
        let response = self.call_api(&prompt).await?;
        self.parse_summary(&response)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Bare JSON passes through untouched.
    #[test]
    fn test_extract_json_plain() {
        let text = r#"{"title": "Test", "bullets": [], "keywords": []}"#;
        let json = extract_json(text);
        assert_eq!(json, text);
    }

    // JSON inside a fenced ```json block is unwrapped.
    #[test]
    fn test_extract_json_code_block() {
        let text = r#"Here's the summary:
```json
{"title": "Test", "bullets": [], "keywords": []}
```"#;
        let json = extract_json(text);
        assert!(json.contains("Test"));
    }

    // Chatty prose before the object is stripped down to the braces.
    #[test]
    fn test_extract_json_with_prefix() {
        let text = r#"Sure! Here's your summary: {"title": "Test", "bullets": [], "keywords": []}"#;
        let json = extract_json(text);
        assert!(json.starts_with('{'));
        assert!(json.ends_with('}'));
    }

    // Provider presets wire the base URL and model through.
    #[test]
    fn test_openai_config() {
        let config = ApiSummarizerConfig::openai("test-key", "gpt-4o-mini");
        assert!(config.base_url.contains("openai"));
        assert_eq!(config.model, "gpt-4o-mini");
    }

    #[test]
    fn test_claude_config() {
        let config = ApiSummarizerConfig::claude("test-key", "claude-3-haiku-20240307");
        assert!(config.base_url.contains("anthropic"));
        assert_eq!(config.model, "claude-3-haiku-20240307");
    }
}<success_criteria>
- Summarizer trait exists (SUMM-01)
- Summary has title, bullets, keywords (SUMM-02)
- summarize_children for rollup (SUMM-04)
- API implementation with retry
- Mock implementation for testing
- All tests pass </success_criteria>
- Created `crates/memory-toc/src/summarizer/mod.rs` with:
  - `Summarizer` async trait (Send + Sync)
  - `summarize_events()` for conversation events
  - `summarize_children()` for rollup summaries
  - `Summary` struct with title, bullets, keywords
  - `SummarizerError` enum with comprehensive error types
-
- Created
crates/memory-toc/src/summarizer/api.rswith:- OpenAI-compatible API requests
- Anthropic API requests
- Exponential backoff retry logic
- Rate limit handling
- JSON response parsing from markdown code blocks
- Created
crates/memory-toc/src/summarizer/mock.rswith:- Deterministic summaries for testing
- Keyword extraction from events
- Customizable title prefix
| File | Purpose | Exports |
|---|---|---|
summarizer/mod.rs |
Trait definition |
Summarizer, Summary, SummarizerError
|
summarizer/api.rs |
API implementation |
ApiSummarizer, ApiSummarizerConfig
|
summarizer/mock.rs |
Mock for testing | MockSummarizer |
- `cargo build -p memory-toc` compiles
- All summarizer tests pass (10 tests)
- Mock summarizer supports both event and rollup summarization
- SUMM-01: Pluggable Summarizer trait (async, supports API and local LLM)
- SUMM-02: Generates title, bullets, keywords from events
- SUMM-04: Rollup summarizer aggregates child node summaries
phase: 02-toc-building plan: 03 type: execute wave: 2 depends_on: ["02-01", "02-02"] files_modified:
- crates/memory-toc/src/builder.rs
- crates/memory-toc/src/node_id.rs
- crates/memory-toc/src/rollup.rs
- crates/memory-toc/src/lib.rs
- crates/memory-storage/src/db.rs
- crates/memory-storage/src/column_families.rs autonomous: true
must_haves: truths: - "TOC nodes exist at all levels (Year, Month, Week, Day, Segment)" - "Node IDs encode level and time period hierarchically" - "Segments summarized and stored as TOC nodes" - "Rollup jobs aggregate child nodes into parent summaries" - "Checkpoints enable crash recovery for rollup jobs" - "Versioned nodes append new version, don't mutate (TOC-06)" artifacts: - path: "crates/memory-toc/src/builder.rs" provides: "TOC hierarchy builder" exports: ["TocBuilder"] - path: "crates/memory-toc/src/node_id.rs" provides: "Node ID generation and parsing" exports: ["NodeId", "generate_node_id"] - path: "crates/memory-toc/src/rollup.rs" provides: "Rollup job implementation" exports: ["RollupJob", "RollupCheckpoint"] key_links: - from: "crates/memory-toc/src/builder.rs" to: "crates/memory-storage/src/db.rs" via: "Storage for TOC nodes" pattern: "Storage" - from: "crates/memory-toc/src/rollup.rs" to: "crates/memory-toc/src/summarizer/mod.rs" via: "Summarizer for rollups" pattern: "summarize_children"
Purpose: Build the complete TOC hierarchy from segments to year, with crash-recoverable rollup jobs. Output: TocBuilder that creates segment nodes and rollup jobs that aggregate into parent nodes.
<execution_context> @/Users/richardhightower/.claude/get-shit-done/workflows/execute-plan.md @/Users/richardhightower/.claude/get-shit-done/templates/summary.md </execution_context>
@.planning/PROJECT.md @.planning/ROADMAP.md @.planning/STATE.md @.planning/phases/02-toc-building/02-RESEARCH.md @.planning/phases/02-toc-building/02-01-SUMMARY.md @.planning/phases/02-toc-building/02-02-SUMMARY.md Task 1: Add TOC storage methods to Storage - crates/memory-storage/src/db.rs - crates/memory-storage/src/column_families.rs Add methods for storing and retrieving TOC nodes with versioning.Update crates/memory-storage/src/db.rs to add TOC methods:
Add these methods to the Storage impl block:
use crate::column_families::{CF_TOC_NODES, CF_TOC_LATEST};
impl Storage {
// ... existing methods ...
/// Store a TOC node with versioning (TOC-06).
///
/// Appends a new version rather than mutating: the node is written under
/// `toc:{node_id}:v{NNNNNN}` (zero-padded so versions sort
/// lexicographically) and `toc_latest` is updated to point to the new
/// version. Both writes share one WriteBatch, so the pointer can never
/// reference a version that was not persisted.
///
/// NOTE(review): the read-increment-write of the version counter is not
/// atomic across writers; concurrent `put_toc_node` calls for the same
/// node_id could race — confirm the single-writer assumption holds.
pub fn put_toc_node(&self, node: &memory_types::TocNode) -> Result<(), StorageError> {
    let nodes_cf = self.db.cf_handle(CF_TOC_NODES)
        .ok_or_else(|| StorageError::ColumnFamilyNotFound(CF_TOC_NODES.to_string()))?;
    let latest_cf = self.db.cf_handle(CF_TOC_LATEST)
        .ok_or_else(|| StorageError::ColumnFamilyNotFound(CF_TOC_LATEST.to_string()))?;
    // Get current version (defaults to 0 for a brand-new node).
    let latest_key = format!("latest:{}", node.node_id);
    let current_version = self.db.get_cf(&latest_cf, &latest_key)?
        .map(|b| {
            if b.len() >= 4 {
                u32::from_be_bytes([b[0], b[1], b[2], b[3]])
            } else {
                0
            }
        })
        .unwrap_or(0);
    let new_version = current_version + 1;
    let versioned_key = format!("toc:{}:v{:06}", node.node_id, new_version);
    // Stamp a clone with the new version; the caller's node is untouched.
    let mut versioned_node = node.clone();
    versioned_node.version = new_version;
    let node_bytes = versioned_node.to_bytes()
        .map_err(|e| StorageError::SerializationError(e.to_string()))?;
    // Atomic write: node + latest pointer.
    let mut batch = rocksdb::WriteBatch::default();
    batch.put_cf(&nodes_cf, versioned_key.as_bytes(), &node_bytes);
    batch.put_cf(&latest_cf, latest_key.as_bytes(), &new_version.to_be_bytes());
    self.db.write(batch)?;
    debug!(node_id = %node.node_id, version = new_version, "Stored TOC node");
    Ok(())
}
/// Get the latest version of a TOC node.
///
/// Resolves the `toc_latest` pointer first, then loads the matching
/// versioned record. Returns `Ok(None)` when the node was never stored
/// or its pointer is malformed (shorter than 4 bytes).
pub fn get_toc_node(&self, node_id: &str) -> Result<Option<memory_types::TocNode>, StorageError> {
    let nodes_cf = self.db.cf_handle(CF_TOC_NODES)
        .ok_or_else(|| StorageError::ColumnFamilyNotFound(CF_TOC_NODES.to_string()))?;
    let latest_cf = self.db.cf_handle(CF_TOC_LATEST)
        .ok_or_else(|| StorageError::ColumnFamilyNotFound(CF_TOC_LATEST.to_string()))?;
    // Get latest version number.
    let latest_key = format!("latest:{}", node_id);
    let version = match self.db.get_cf(&latest_cf, &latest_key)? {
        Some(b) if b.len() >= 4 => u32::from_be_bytes([b[0], b[1], b[2], b[3]]),
        _ => return Ok(None),
    };
    // Fetch the versioned node (key format mirrors put_toc_node).
    let versioned_key = format!("toc:{}:v{:06}", node_id, version);
    match self.db.get_cf(&nodes_cf, versioned_key.as_bytes())? {
        Some(bytes) => {
            let node = memory_types::TocNode::from_bytes(&bytes)
                .map_err(|e| StorageError::SerializationError(e.to_string()))?;
            Ok(Some(node))
        }
        None => Ok(None),
    }
}
/// Get TOC nodes by level, optionally filtered by time range.
///
/// Scans `toc_latest` pointers under the level prefix, resolves each
/// pointer to its latest versioned record, keeps nodes whose
/// [start_time, end_time] overlaps the optional filter range, and
/// returns them sorted by start time.
///
/// Fixes two defects in the previous version: the versioned key was
/// built as `{node_id}:v…` while `put_toc_node`/`get_toc_node` store
/// under `toc:{node_id}:v…` (so lookups always missed), and the
/// level-prefix string was an unused binding that was then rebuilt with
/// `format!` on every loop iteration.
pub fn get_toc_nodes_by_level(
    &self,
    level: memory_types::TocLevel,
    start_time: Option<chrono::DateTime<chrono::Utc>>,
    end_time: Option<chrono::DateTime<chrono::Utc>>,
) -> Result<Vec<memory_types::TocNode>, StorageError> {
    let nodes_cf = self.db.cf_handle(CF_TOC_NODES)
        .ok_or_else(|| StorageError::ColumnFamilyNotFound(CF_TOC_NODES.to_string()))?;
    let latest_cf = self.db.cf_handle(CF_TOC_LATEST)
        .ok_or_else(|| StorageError::ColumnFamilyNotFound(CF_TOC_LATEST.to_string()))?;
    // Shared by the iterator start position and the per-key bound check.
    let latest_prefix = format!("latest:toc:{}:", level);
    let mut nodes = Vec::new();
    let iter = self.db.iterator_cf(
        &latest_cf,
        rocksdb::IteratorMode::From(latest_prefix.as_bytes(), rocksdb::Direction::Forward),
    );
    for item in iter {
        let (key, value) = item?;
        let key_str = String::from_utf8_lossy(&key);
        // Stop once we've left this level's key range.
        if !key_str.starts_with(&latest_prefix) {
            break;
        }
        // The node_id is everything after the "latest:" marker.
        let node_id = key_str.trim_start_matches("latest:");
        if value.len() >= 4 {
            let version = u32::from_be_bytes([value[0], value[1], value[2], value[3]]);
            // Must match the "toc:{node_id}:v{:06}" format used by put_toc_node.
            let versioned_key = format!("toc:{}:v{:06}", node_id, version);
            if let Some(bytes) = self.db.get_cf(&nodes_cf, versioned_key.as_bytes())? {
                let node = memory_types::TocNode::from_bytes(&bytes)
                    .map_err(|e| StorageError::SerializationError(e.to_string()))?;
                // Keep nodes overlapping the requested time window.
                let include = match (start_time, end_time) {
                    (Some(start), Some(end)) => node.end_time >= start && node.start_time <= end,
                    (Some(start), None) => node.end_time >= start,
                    (None, Some(end)) => node.start_time <= end,
                    (None, None) => true,
                };
                if include {
                    nodes.push(node);
                }
            }
        }
    }
    // Chronological order for callers assembling the hierarchy.
    nodes.sort_by(|a, b| a.start_time.cmp(&b.start_time));
    Ok(nodes)
}
/// Get child nodes of a parent node.
///
/// Resolves each ID in the parent's `child_node_ids` to its latest
/// version, silently skipping children that no longer resolve, and
/// returns them ordered by start time. An unknown parent yields an
/// empty list.
pub fn get_child_nodes(&self, parent_node_id: &str) -> Result<Vec<memory_types::TocNode>, StorageError> {
    let parent = match self.get_toc_node(parent_node_id)? {
        Some(node) => node,
        None => return Ok(Vec::new()),
    };
    let mut children = Vec::with_capacity(parent.child_node_ids.len());
    for child_id in &parent.child_node_ids {
        if let Some(child) = self.get_toc_node(child_id)? {
            children.push(child);
        }
    }
    children.sort_by(|a, b| a.start_time.cmp(&b.start_time));
    Ok(children)
}
}Create crates/memory-toc/src/node_id.rs:
//! TOC node ID generation and parsing.
//!
//! Node IDs encode the level and time period for hierarchical organization.
//! Format: "toc:{level}:{time_identifier}"
use chrono::{DateTime, Datelike, IsoWeek, Utc, Weekday};
use memory_types::TocLevel;
/// Generate a node ID for the given level and time.
///
/// Examples:
/// - Year: "toc:year:2024"
/// - Month: "toc:month:2024:01"
/// - Week: "toc:week:2024:W03"
/// - Day: "toc:day:2024-01-15"
/// - Segment: "toc:segment:2024-01-15:01HN4QXKN6..."
///
/// Week IDs use the ISO week-numbering year (`iso_week().year()`), which
/// can differ from the calendar year around January 1st. Segment IDs
/// embed a fresh ULID, so two calls with the same time produce different
/// IDs — use `generate_segment_node_id` when the ULID must be pinned.
pub fn generate_node_id(level: TocLevel, time: DateTime<Utc>) -> String {
    match level {
        TocLevel::Year => format!("toc:year:{}", time.year()),
        TocLevel::Month => format!("toc:month:{}:{:02}", time.year(), time.month()),
        TocLevel::Week => {
            let iso_week = time.iso_week();
            format!("toc:week:{}:W{:02}", iso_week.year(), iso_week.week())
        }
        TocLevel::Day => format!("toc:day:{}", time.format("%Y-%m-%d")),
        TocLevel::Segment => format!(
            "toc:segment:{}:{}",
            time.format("%Y-%m-%d"),
            ulid::Ulid::new()
        ),
    }
}
/// Generate a node ID for a segment with a specific ULID.
///
/// Same day-scoped format as `generate_node_id(TocLevel::Segment, ..)`,
/// but deterministic: the caller supplies the ULID.
pub fn generate_segment_node_id(time: DateTime<Utc>, segment_ulid: &str) -> String {
    let day = time.format("%Y-%m-%d");
    format!("toc:segment:{}:{}", day, segment_ulid)
}
/// Get the parent node ID for a given node ID.
///
/// Hierarchy: segment -> day -> week -> month -> year.
/// Returns None for year-level nodes (no parent) and for IDs that do
/// not parse as "toc:{level}:...".
pub fn get_parent_node_id(node_id: &str) -> Option<String> {
    let parts: Vec<&str> = node_id.split(':').collect();
    if parts.len() < 3 || parts[0] != "toc" {
        return None;
    }
    match parts[1] {
        "segment" => {
            // toc:segment:2024-01-15:ulid -> toc:day:2024-01-15
            if parts.len() >= 3 {
                Some(format!("toc:day:{}", parts[2]))
            } else {
                None
            }
        }
        "day" => {
            // toc:day:2024-01-15 -> toc:week:2024:W03
            // Week parent uses the ISO week-numbering year, not the calendar year.
            if parts.len() >= 3 {
                if let Ok(date) = chrono::NaiveDate::parse_from_str(parts[2], "%Y-%m-%d") {
                    let iso_week = date.iso_week();
                    return Some(format!("toc:week:{}:W{:02}", iso_week.year(), iso_week.week()));
                }
            }
            None
        }
        "week" => {
            // toc:week:2024:W03 -> toc:month:2024:01
            // A week can span two months; per ISO 8601 convention it is
            // assigned to the month containing its Thursday.
            if parts.len() >= 4 {
                if let (Ok(year), Ok(week)) = (
                    parts[2].parse::<i32>(),
                    parts[3].trim_start_matches('W').parse::<u32>(),
                ) {
                    if let Some(date) = chrono::NaiveDate::from_isoywd_opt(year, week, Weekday::Thu) {
                        return Some(format!("toc:month:{}:{:02}", date.year(), date.month()));
                    }
                }
            }
            None
        }
        "month" => {
            // toc:month:2024:01 -> toc:year:2024
            if parts.len() >= 3 {
                Some(format!("toc:year:{}", parts[2]))
            } else {
                None
            }
        }
        "year" => None, // Year has no parent
        _ => None,
    }
}
/// Parse level from node ID.
///
/// Accepts IDs of the form "toc:{level}:..." and maps the level token
/// onto `TocLevel`; anything else yields None.
pub fn parse_level(node_id: &str) -> Option<TocLevel> {
    let rest = node_id.strip_prefix("toc:")?;
    let level_token = rest.split(':').next().unwrap_or("");
    match level_token {
        "year" => Some(TocLevel::Year),
        "month" => Some(TocLevel::Month),
        "week" => Some(TocLevel::Week),
        "day" => Some(TocLevel::Day),
        "segment" => Some(TocLevel::Segment),
        _ => None,
    }
}
/// Generate human-readable title for a node.
///
/// e.g. "2024", "January 2024", "Week 3 of 2024",
/// "Monday, January 15, 2024", "January 15, 2024 at 12:30".
/// Week titles use the ISO week number/year, matching the node IDs.
pub fn generate_title(level: TocLevel, time: DateTime<Utc>) -> String {
    match level {
        TocLevel::Year => format!("{}", time.year()),
        TocLevel::Month => time.format("%B %Y").to_string(),
        TocLevel::Week => {
            let iso_week = time.iso_week();
            format!("Week {} of {}", iso_week.week(), iso_week.year())
        }
        TocLevel::Day => time.format("%A, %B %d, %Y").to_string(),
        TocLevel::Segment => time.format("%B %d, %Y at %H:%M").to_string(),
    }
}
/// Get the time boundaries for a level at a given time.
///
/// Returns an inclusive [start, end] pair where `end` is 1 millisecond
/// before the next period begins. NOTE(review): a timestamp inside that
/// final millisecond falls outside the range — acceptable at
/// millisecond event resolution, but confirm event timestamps are not
/// finer-grained. Weeks are ISO weeks starting Monday 00:00 UTC.
/// Segments own their boundaries, so a degenerate `(time, time)` range
/// is returned for them.
pub fn get_time_boundaries(level: TocLevel, time: DateTime<Utc>) -> (DateTime<Utc>, DateTime<Utc>) {
    use chrono::{Duration, NaiveTime, TimeZone};
    match level {
        TocLevel::Year => {
            let start = Utc.with_ymd_and_hms(time.year(), 1, 1, 0, 0, 0).unwrap();
            let end = Utc.with_ymd_and_hms(time.year() + 1, 1, 1, 0, 0, 0).unwrap() - Duration::milliseconds(1);
            (start, end)
        }
        TocLevel::Month => {
            let start = Utc.with_ymd_and_hms(time.year(), time.month(), 1, 0, 0, 0).unwrap();
            // December rolls over to January of the next year.
            let next_month = if time.month() == 12 {
                Utc.with_ymd_and_hms(time.year() + 1, 1, 1, 0, 0, 0).unwrap()
            } else {
                Utc.with_ymd_and_hms(time.year(), time.month() + 1, 1, 0, 0, 0).unwrap()
            };
            let end = next_month - Duration::milliseconds(1);
            (start, end)
        }
        TocLevel::Week => {
            let iso_week = time.iso_week();
            let monday = chrono::NaiveDate::from_isoywd_opt(iso_week.year(), iso_week.week(), Weekday::Mon).unwrap();
            let start = Utc.from_utc_datetime(&monday.and_time(NaiveTime::MIN));
            let end = start + Duration::days(7) - Duration::milliseconds(1);
            (start, end)
        }
        TocLevel::Day => {
            let date = time.date_naive();
            let start = Utc.from_utc_datetime(&date.and_time(NaiveTime::MIN));
            let end = start + Duration::days(1) - Duration::milliseconds(1);
            (start, end)
        }
        TocLevel::Segment => {
            // Segments have explicit boundaries, not calculated
            (time, time)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;

    #[test]
    fn test_generate_node_id_year() {
        let time = Utc.with_ymd_and_hms(2024, 6, 15, 12, 0, 0).unwrap();
        let id = generate_node_id(TocLevel::Year, time);
        assert_eq!(id, "toc:year:2024");
    }

    #[test]
    fn test_generate_node_id_month() {
        let time = Utc.with_ymd_and_hms(2024, 1, 15, 12, 0, 0).unwrap();
        let id = generate_node_id(TocLevel::Month, time);
        assert_eq!(id, "toc:month:2024:01");
    }

    // Only the prefix is asserted; the ISO week number is date-dependent.
    #[test]
    fn test_generate_node_id_week() {
        let time = Utc.with_ymd_and_hms(2024, 1, 18, 12, 0, 0).unwrap();
        let id = generate_node_id(TocLevel::Week, time);
        assert!(id.starts_with("toc:week:2024:W"));
    }

    #[test]
    fn test_generate_node_id_day() {
        let time = Utc.with_ymd_and_hms(2024, 1, 15, 12, 0, 0).unwrap();
        let id = generate_node_id(TocLevel::Day, time);
        assert_eq!(id, "toc:day:2024-01-15");
    }

    // Segment IDs end in a fresh ULID, so only the date prefix is predictable.
    #[test]
    fn test_generate_node_id_segment() {
        let time = Utc.with_ymd_and_hms(2024, 1, 15, 12, 0, 0).unwrap();
        let id = generate_node_id(TocLevel::Segment, time);
        assert!(id.starts_with("toc:segment:2024-01-15:"));
    }

    // Day -> ISO week, month -> year, and year has no parent.
    #[test]
    fn test_get_parent_node_id() {
        assert_eq!(
            get_parent_node_id("toc:day:2024-01-15"),
            Some("toc:week:2024:W03".to_string())
        );
        assert_eq!(
            get_parent_node_id("toc:month:2024:01"),
            Some("toc:year:2024".to_string())
        );
        assert_eq!(get_parent_node_id("toc:year:2024"), None);
    }

    #[test]
    fn test_parse_level() {
        assert_eq!(parse_level("toc:year:2024"), Some(TocLevel::Year));
        assert_eq!(parse_level("toc:month:2024:01"), Some(TocLevel::Month));
        assert_eq!(parse_level("toc:day:2024-01-15"), Some(TocLevel::Day));
        assert_eq!(parse_level("invalid"), None);
    }

    #[test]
    fn test_generate_title() {
        let time = Utc.with_ymd_and_hms(2024, 1, 15, 12, 0, 0).unwrap();
        assert_eq!(generate_title(TocLevel::Year, time), "2024");
        assert_eq!(generate_title(TocLevel::Month, time), "January 2024");
    }

    // Day boundaries: midnight start, end strictly inside the same day.
    #[test]
    fn test_get_time_boundaries_day() {
        let time = Utc.with_ymd_and_hms(2024, 1, 15, 12, 30, 0).unwrap();
        let (start, end) = get_time_boundaries(TocLevel::Day, time);
        assert_eq!(start, Utc.with_ymd_and_hms(2024, 1, 15, 0, 0, 0).unwrap());
        assert!(end > start);
        assert!(end < Utc.with_ymd_and_hms(2024, 1, 16, 0, 0, 0).unwrap());
    }
}Update crates/memory-toc/src/lib.rs:
//! TOC building library for agent-memory.
//!
//! Provides:
//! - Event segmentation (TOC-03, TOC-04)
//! - Summarization trait (SUMM-01, SUMM-02, SUMM-04)
//! - TOC hierarchy building (TOC-01, TOC-02, TOC-05)
//! - Node ID generation
pub mod builder;
pub mod config;
pub mod node_id;
pub mod rollup;
pub mod segmenter;
pub mod summarizer;
pub use builder::TocBuilder;
pub use config::{SegmentationConfig, TocConfig};
pub use node_id::{generate_node_id, generate_title, get_parent_node_id, parse_level};
pub use rollup::{RollupCheckpoint, RollupJob};
pub use segmenter::SegmentBuilder;
pub use summarizer::{ApiSummarizer, ApiSummarizerConfig, MockSummarizer, Summary, Summarizer, SummarizerError};Create crates/memory-toc/src/builder.rs:
//! TOC hierarchy builder.
//!
//! Builds TOC nodes from segments and ensures parent nodes exist.
use std::sync::Arc;
use chrono::{DateTime, Utc};
use tracing::{debug, info};
use memory_storage::Storage;
use memory_types::{Segment, TocBullet, TocLevel, TocNode};
use crate::node_id::{generate_node_id, generate_title, get_parent_node_id, get_time_boundaries};
use crate::summarizer::{Summary, Summarizer, SummarizerError};
/// Error type for TOC building.
#[derive(Debug, thiserror::Error)]
pub enum BuilderError {
    /// Underlying storage failure while reading or writing TOC nodes.
    #[error("Storage error: {0}")]
    Storage(#[from] memory_storage::StorageError),
    /// The summarizer failed to produce a summary.
    #[error("Summarization error: {0}")]
    Summarizer(#[from] SummarizerError),
    /// The segment violated an invariant (e.g. contained no events).
    #[error("Invalid segment: {0}")]
    InvalidSegment(String),
}
/// Builder for TOC hierarchy.
///
/// Processes segments and creates TOC nodes at all levels. Holds shared
/// handles so the builder itself is cheap to clone around task
/// boundaries; the summarizer is a trait object so mock and API
/// implementations are interchangeable.
pub struct TocBuilder {
    storage: Arc<Storage>,
    summarizer: Arc<dyn Summarizer>,
}
impl TocBuilder {
/// Create a new TocBuilder.
///
/// `storage` persists TOC nodes; `summarizer` generates segment summaries.
pub fn new(storage: Arc<Storage>, summarizer: Arc<dyn Summarizer>) -> Self {
    Self { storage, summarizer }
}
/// Process a segment and create/update TOC nodes.
///
/// Creates:
/// 1. Segment-level node from the segment
/// 2. Ensures parent nodes exist up to Year level
///
/// # Errors
/// Returns `InvalidSegment` for an empty segment, and propagates
/// summarizer/storage failures.
pub async fn process_segment(&self, segment: &Segment) -> Result<TocNode, BuilderError> {
    if segment.events.is_empty() {
        return Err(BuilderError::InvalidSegment("Segment has no events".to_string()));
    }
    info!(
        segment_id = %segment.segment_id,
        events = segment.events.len(),
        "Processing segment"
    );
    // Summarize over all_events() — presumably the segment's own events
    // plus overlap context from the previous segment; confirm in segment.rs.
    let all_events: Vec<_> = segment.all_events().cloned().collect();
    let summary = self.summarizer.summarize_events(&all_events).await?;
    // Create and persist the segment-level node.
    let segment_node = self.create_segment_node(segment, summary)?;
    self.storage.put_toc_node(&segment_node)?;
    // Ensure parent nodes exist and are updated.
    self.ensure_parents(&segment_node).await?;
    Ok(segment_node)
}
/// Create a segment-level TOC node.
fn create_segment_node(&self, segment: &Segment, summary: Summary) -> Result<TocNode, BuilderError> {
let node_id = format!("toc:segment:{}:{}",
segment.start_time.format("%Y-%m-%d"),
segment.segment_id.trim_start_matches("seg:")
);
let bullets: Vec<TocBullet> = summary.bullets
.into_iter()
.map(TocBullet::new)
.collect();
let mut node = TocNode::new(
node_id,
TocLevel::Segment,
summary.title,
segment.start_time,
segment.end_time,
);
node.bullets = bullets;
node.keywords = summary.keywords;
Ok(node)
}
/// Ensure parent nodes exist up to Year level.
///
/// Walks the hierarchy upward from `child_node` (Segment -> Day -> Week
/// -> Month -> Year). For each ancestor: if it already exists, the child
/// ID is appended to its `child_node_ids` (idempotently); otherwise the
/// ancestor is created with a placeholder summary for the rollup job to
/// replace. (Also restores the `&current_id` references that had been
/// corrupted to `¤t_id` by HTML-entity mangling.)
async fn ensure_parents(&self, child_node: &TocNode) -> Result<(), BuilderError> {
    let mut current_id = child_node.node_id.clone();
    let mut child_level = child_node.level;
    while let Some(parent_level) = child_level.parent() {
        if let Some(parent_id) = get_parent_node_id(&current_id) {
            // Check if parent exists.
            let parent = self.storage.get_toc_node(&parent_id)?;
            if let Some(mut parent_node) = parent {
                // Update parent's child list if needed.
                if !parent_node.child_node_ids.contains(&current_id) {
                    parent_node.child_node_ids.push(current_id.clone());
                    self.storage.put_toc_node(&parent_node)?;
                    debug!(
                        parent = %parent_id,
                        child = %current_id,
                        "Added child to existing parent"
                    );
                }
            } else {
                // Create parent node with placeholder summary.
                let parent_node =
                    self.create_parent_node(&parent_id, parent_level, child_node, &current_id)?;
                self.storage.put_toc_node(&parent_node)?;
                debug!(
                    parent = %parent_id,
                    level = %parent_level,
                    "Created new parent node"
                );
            }
            current_id = parent_id;
            child_level = parent_level;
        } else {
            break;
        }
    }
    Ok(())
}
/// Create a parent node with placeholder summary.
///
/// Time boundaries and title are derived from the child's start time at
/// the parent's level; the single "Summary pending..." bullet marks the
/// node for the rollup job to fill in later.
fn create_parent_node(
    &self,
    parent_id: &str,
    level: TocLevel,
    child: &TocNode,
    child_id: &str,
) -> Result<TocNode, BuilderError> {
    let (start_time, end_time) = get_time_boundaries(level, child.start_time);
    let title = generate_title(level, child.start_time);
    let mut node = TocNode::new(
        parent_id.to_string(),
        level,
        title,
        start_time,
        end_time,
    );
    node.child_node_ids.push(child_id.to_string());
    // Placeholder bullet - will be replaced by rollup job
    node.bullets.push(TocBullet::new("Summary pending..."));
    Ok(node)
}
/// Get all segment nodes for a day.
pub fn get_segments_for_day(&self, date: DateTime<Utc>) -> Result<Vec<TocNode>, BuilderError> {
    let day_id = generate_node_id(TocLevel::Day, date);
    // `?` converts StorageError via the BuilderError From impl.
    Ok(self.storage.get_child_nodes(&day_id)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use memory_types::{Event, EventRole, EventType};
use tempfile::TempDir;
use crate::summarizer::MockSummarizer;
/// Open a fresh Storage backed by a temp directory.
/// The TempDir is returned so it outlives the test.
fn create_test_storage() -> (Arc<Storage>, TempDir) {
    let dir = TempDir::new().unwrap();
    let storage = Storage::open(dir.path()).unwrap();
    (Arc::new(storage), dir)
}
/// Build a user-message Event whose ULID and timestamp both derive from
/// `timestamp_ms`, so ID ordering matches time ordering.
fn create_test_event(text: &str, timestamp_ms: i64) -> Event {
    let id = ulid::Ulid::from_parts(timestamp_ms as u64, rand::random()).to_string();
    let ts = chrono::Utc.timestamp_millis_opt(timestamp_ms).unwrap();
    Event::new(
        id,
        "session-123".to_string(),
        ts,
        EventType::UserMessage,
        EventRole::User,
        text.to_string(),
    )
}
use chrono::TimeZone;
// process_segment should produce a Segment-level node whose bullets are
// populated from the MockSummarizer output.
#[tokio::test]
async fn test_process_segment_creates_node() {
let (storage, _temp) = create_test_storage();
let summarizer = Arc::new(MockSummarizer::new());
let builder = TocBuilder::new(storage.clone(), summarizer);
// Two events ~100s apart, both on 2024-01-29.
let events = vec![
create_test_event("Hello", 1706540400000), // 2024-01-29
create_test_event("World", 1706540500000),
];
let segment = Segment::new(
"seg:test123".to_string(),
events.clone(),
events[0].timestamp,
events[1].timestamp,
100,
);
let node = builder.process_segment(&segment).await.unwrap();
assert_eq!(node.level, TocLevel::Segment);
assert!(!node.bullets.is_empty());
}
// Processing a single segment must also create the whole ancestor chain
// (day and year nodes checked here; IDs match the event's 2024-01-29 date).
#[tokio::test]
async fn test_process_segment_creates_parents() {
let (storage, _temp) = create_test_storage();
let summarizer = Arc::new(MockSummarizer::new());
let builder = TocBuilder::new(storage.clone(), summarizer);
let events = vec![create_test_event("Test", 1706540400000)];
let segment = Segment::new(
"seg:test456".to_string(),
events.clone(),
events[0].timestamp,
events[0].timestamp,
50,
);
builder.process_segment(&segment).await.unwrap();
// Check that day node was created
let day_node = storage.get_toc_node("toc:day:2024-01-29").unwrap();
assert!(day_node.is_some());
// Check that year node was created
let year_node = storage.get_toc_node("toc:year:2024").unwrap();
assert!(year_node.is_some());
}
}Create crates/memory-toc/src/rollup.rs:
//! Rollup jobs for aggregating child TOC nodes.
//!
//! Per TOC-05: Day/Week/Month rollup jobs with checkpointing.
//! Per SUMM-04: Rollup summarizer aggregates child node summaries.
use std::sync::Arc;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use tracing::{debug, info, warn};
use memory_storage::Storage;
use memory_types::{TocBullet, TocLevel, TocNode};
use crate::summarizer::{Summary, Summarizer, SummarizerError};
/// Checkpoint for rollup job crash recovery.
///
/// Per STOR-03 and TOC-05: Enables crash recovery. Serialized to JSON and
/// persisted via the storage checkpoint API after each rolled-up node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RollupCheckpoint {
/// Job identifier (built as `rollup_{level}`)
pub job_name: String,
/// Level being processed
pub level: TocLevel,
/// Last successfully processed time period; a restarted run resumes
/// from here. Serialized as epoch milliseconds.
#[serde(with = "chrono::serde::ts_milliseconds")]
pub last_processed_time: DateTime<Utc>,
/// Number of nodes processed in current run
pub processed_count: usize,
/// When this checkpoint was created. Serialized as epoch milliseconds.
#[serde(with = "chrono::serde::ts_milliseconds")]
pub created_at: DateTime<Utc>,
}
impl RollupCheckpoint {
    /// Fresh checkpoint that has processed nothing yet.
    pub fn new(job_name: String, level: TocLevel) -> Self {
        Self {
            job_name,
            level,
            // MIN_UTC means "resume from the beginning of time".
            last_processed_time: DateTime::<Utc>::MIN_UTC,
            processed_count: 0,
            created_at: Utc::now(),
        }
    }

    /// Serialize to JSON bytes for the checkpoint store.
    pub fn to_bytes(&self) -> Result<Vec<u8>, serde_json::Error> {
        serde_json::to_vec(self)
    }

    /// Deserialize from JSON bytes read back from the checkpoint store.
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, serde_json::Error> {
        serde_json::from_slice(bytes)
    }
}
/// Error type for rollup operations.
#[derive(Debug, thiserror::Error)]
pub enum RollupError {
/// Propagated storage-layer failure.
#[error("Storage error: {0}")]
Storage(#[from] memory_storage::StorageError),
/// Summary generation failed.
#[error("Summarization error: {0}")]
Summarizer(#[from] SummarizerError),
/// Attempted to roll up a level that has no child level.
#[error("No child level for {0}")]
NoChildLevel(TocLevel),
/// Checkpoint (de)serialization failed.
#[error("Checkpoint error: {0}")]
Checkpoint(String),
}
/// Rollup job for aggregating child nodes into parent summaries.
pub struct RollupJob {
// Reads children and writes updated parent nodes.
storage: Arc<Storage>,
// Produces the aggregated summary from child summaries.
summarizer: Arc<dyn Summarizer>,
// The level whose nodes this job updates.
level: TocLevel,
/// Minimum age of period before rollup (avoids rolling up incomplete periods)
min_age: Duration,
}
impl RollupJob {
    /// Create a new rollup job for the specified level.
    ///
    /// min_age: Minimum age of a period before it can be rolled up.
    /// This prevents rolling up periods that are still receiving events.
    pub fn new(
        storage: Arc<Storage>,
        summarizer: Arc<dyn Summarizer>,
        level: TocLevel,
        min_age: Duration,
    ) -> Self {
        Self {
            storage,
            summarizer,
            level,
            min_age,
        }
    }

    /// Create rollup jobs for all levels.
    ///
    /// Higher levels get a longer min_age because their periods close later.
    pub fn create_all(
        storage: Arc<Storage>,
        summarizer: Arc<dyn Summarizer>,
    ) -> Vec<Self> {
        vec![
            Self::new(storage.clone(), summarizer.clone(), TocLevel::Day, Duration::hours(1)),
            Self::new(storage.clone(), summarizer.clone(), TocLevel::Week, Duration::hours(24)),
            Self::new(storage.clone(), summarizer.clone(), TocLevel::Month, Duration::hours(24)),
            Self::new(storage.clone(), summarizer.clone(), TocLevel::Year, Duration::days(7)),
        ]
    }

    /// Run the rollup job.
    ///
    /// Processes nodes that need rollup since the last checkpoint and
    /// returns how many nodes were rolled up in this run.
    pub async fn run(&self) -> Result<usize, RollupError> {
        let job_name = format!("rollup_{}", self.level);
        info!(job = %job_name, level = %self.level, "Starting rollup job");
        // Load checkpoint so a crashed run resumes where it left off.
        let checkpoint = self.load_checkpoint(&job_name)?;
        let start_time = checkpoint.map(|c| c.last_processed_time).unwrap_or(DateTime::<Utc>::MIN_UTC);
        // Only consider periods that closed at least `min_age` ago.
        let cutoff_time = Utc::now() - self.min_age;
        let nodes = self.storage.get_toc_nodes_by_level(
            self.level,
            Some(start_time),
            Some(cutoff_time),
        )?;
        let mut processed = 0;
        for node in nodes {
            // Defensive re-check: skip periods that are still open.
            if node.end_time > cutoff_time {
                debug!(
                    node_id = %node.node_id,
                    "Skipping node - period not yet closed"
                );
                continue;
            }
            // Get children
            let children = self.storage.get_child_nodes(&node.node_id)?;
            if children.is_empty() {
                debug!(node_id = %node.node_id, "Skipping node - no children");
                continue;
            }
            // Convert child nodes into Summary inputs for the rollup summarizer.
            let summaries: Vec<Summary> = children
                .iter()
                .map(|c| Summary::new(
                    c.title.clone(),
                    c.bullets.iter().map(|b| b.text.clone()).collect(),
                    c.keywords.clone(),
                ))
                .collect();
            // Generate rollup summary
            let rollup_summary = self.summarizer.summarize_children(&summaries).await?;
            // Replace the node's placeholder content with the rollup summary.
            let mut updated_node = node.clone();
            updated_node.title = rollup_summary.title;
            updated_node.bullets = rollup_summary.bullets
                .into_iter()
                .map(TocBullet::new)
                .collect();
            updated_node.keywords = rollup_summary.keywords;
            // Ensure child IDs are up to date
            updated_node.child_node_ids = children.iter().map(|c| c.node_id.clone()).collect();
            self.storage.put_toc_node(&updated_node)?;
            processed += 1;
            // Save checkpoint after each node so a crash repeats at most one
            // node's work. NOTE(review): the node write and the checkpoint
            // write are separate operations, not one atomic batch; a crash
            // between them re-rolls this node on restart (idempotent, but
            // duplicated work).
            self.save_checkpoint(&job_name, &updated_node, processed)?;
            debug!(
                node_id = %updated_node.node_id,
                children = children.len(),
                "Rolled up node"
            );
        }
        info!(
            job = %job_name,
            processed = processed,
            "Rollup job complete"
        );
        Ok(processed)
    }

    /// Load checkpoint from storage, if a previous run saved one.
    fn load_checkpoint(&self, job_name: &str) -> Result<Option<RollupCheckpoint>, RollupError> {
        match self.storage.get_checkpoint(job_name)? {
            Some(bytes) => {
                let checkpoint = RollupCheckpoint::from_bytes(&bytes)
                    .map_err(|e| RollupError::Checkpoint(e.to_string()))?;
                Ok(Some(checkpoint))
            }
            None => Ok(None),
        }
    }

    /// Save checkpoint to storage.
    ///
    /// `processed_count` is the cumulative number of nodes handled so far in
    /// this run. (Fix: it was previously hard-coded to 1, which made the
    /// checkpoint's documented "nodes processed in current run" field
    /// meaningless whenever more than one node was rolled up.)
    fn save_checkpoint(&self, job_name: &str, node: &TocNode, processed_count: usize) -> Result<(), RollupError> {
        let checkpoint = RollupCheckpoint {
            job_name: job_name.to_string(),
            level: self.level,
            last_processed_time: node.end_time,
            processed_count,
            created_at: Utc::now(),
        };
        let bytes = checkpoint.to_bytes()
            .map_err(|e| RollupError::Checkpoint(e.to_string()))?;
        self.storage.put_checkpoint(job_name, &bytes)?;
        Ok(())
    }
}
/// Run every rollup job (Day/Week/Month/Year) in sequence and return the
/// total number of nodes rolled up across all levels.
pub async fn run_all_rollups(
    storage: Arc<Storage>,
    summarizer: Arc<dyn Summarizer>,
) -> Result<usize, RollupError> {
    let mut total = 0;
    for job in RollupJob::create_all(storage, summarizer) {
        total += job.run().await?;
    }
    Ok(total)
}
#[cfg(test)]
mod tests {
use super::*;
use memory_types::{Event, EventRole, EventType, Segment};
use tempfile::TempDir;
use chrono::TimeZone;
use crate::summarizer::MockSummarizer;
use crate::builder::TocBuilder;
/// Fresh temp-dir-backed Storage; TempDir returned so it isn't dropped early.
fn create_test_storage() -> (Arc<Storage>, TempDir) {
    let dir = TempDir::new().unwrap();
    (Arc::new(Storage::open(dir.path()).unwrap()), dir)
}
/// User-message Event whose ULID timestamp component matches `timestamp_ms`.
fn create_test_event(text: &str, timestamp_ms: i64) -> Event {
    let id = ulid::Ulid::from_parts(timestamp_ms as u64, rand::random());
    Event::new(
        id.to_string(),
        "session-123".to_string(),
        Utc.timestamp_millis_opt(timestamp_ms).unwrap(),
        EventType::UserMessage,
        EventRole::User,
        text.to_string(),
    )
}
/// Round-trip a checkpoint through JSON and verify key fields survive.
#[test]
fn test_checkpoint_serialization() {
    let original = RollupCheckpoint::new("test_job".to_string(), TocLevel::Day);
    let roundtripped = RollupCheckpoint::from_bytes(&original.to_bytes().unwrap()).unwrap();
    assert_eq!(original.job_name, roundtripped.job_name);
    assert_eq!(original.level, roundtripped.level);
}
// An empty store yields no nodes to roll up; the job must complete cleanly
// and report zero processed.
#[tokio::test]
async fn test_rollup_job_no_children() {
let (storage, _temp) = create_test_storage();
let summarizer = Arc::new(MockSummarizer::new());
let job = RollupJob::new(
storage,
summarizer,
TocLevel::Day,
Duration::zero(), // No min age for testing
);
let result = job.run().await.unwrap();
assert_eq!(result, 0); // No nodes to process
}
// End-to-end smoke test: build a segment two days in the past, then run the
// Day rollup and verify it completes without errors.
#[tokio::test]
async fn test_rollup_job_with_segments() {
    let (storage, _temp) = create_test_storage();
    let summarizer = Arc::new(MockSummarizer::new());
    // First, create some segments using TocBuilder
    let builder = TocBuilder::new(storage.clone(), summarizer.clone());
    // Create segment in the past so its day period is closed w.r.t. min_age.
    let past_time = Utc::now() - Duration::days(2);
    let events = vec![
        Event::new(
            ulid::Ulid::new().to_string(),
            "session".to_string(),
            past_time,
            EventType::UserMessage,
            EventRole::User,
            "Test event".to_string(),
        ),
    ];
    let segment = Segment::new(
        "seg:test".to_string(),
        events.clone(),
        past_time,
        past_time,
        50,
    );
    builder.process_segment(&segment).await.unwrap();
    // Run rollup job
    let job = RollupJob::new(
        storage.clone(),
        summarizer,
        TocLevel::Day,
        Duration::hours(1),
    );
    let result = job.run().await.unwrap();
    // Exactly one day node exists, so at most one node can be rolled up.
    // (Fix: the previous `assert!(result >= 0)` was a tautology on usize
    // and triggered the `unused_comparisons` lint.)
    assert!(result <= 1);
}
}<success_criteria>
- TOC nodes at all levels (Year, Month, Week, Day, Segment) - TOC-01
- Node IDs hierarchically structured
- Versioned node storage (TOC-06)
- Rollup jobs with checkpointing (TOC-05)
- Parent nodes created and linked
- All tests pass </success_criteria>
- Updated `crates/memory-storage/src/db.rs` with:
  - `put_toc_node()` - Store versioned TOC node (TOC-06 compliance)
  - `get_toc_node()` - Get latest version of a node
  - `get_toc_nodes_by_level()` - Query nodes by level with optional time filter
  - `get_child_nodes()` - Get children of a parent node
  - Added 5 new tests for TOC storage methods
-
- Created
crates/memory-toc/src/node_id.rswith:-
generate_node_id()- Create hierarchical node IDs -
get_parent_node_id()- Navigate hierarchy upward -
parse_level()- Extract level from node ID -
generate_title()- Human-readable titles -
get_time_boundaries()- Calculate level time periods
-
- Created `crates/memory-toc/src/builder.rs` with:
  - `TocBuilder` for segment processing
  - Automatic parent node creation up to Year level
  - Summary generation using Summarizer trait
  - Child node linking
-
- Created
crates/memory-toc/src/rollup.rswith:-
RollupJobfor aggregating child nodes -
RollupCheckpointfor crash recovery (STOR-03, TOC-05) -
run_all_rollups()convenience function - Configurable minimum age to avoid incomplete periods
-
| File | Purpose | Exports |
|---|---|---|
db.rs |
TOC storage |
put_toc_node, get_toc_node, etc. |
node_id.rs |
ID generation |
generate_node_id, get_parent_node_id, etc. |
builder.rs |
Hierarchy builder |
TocBuilder, BuilderError
|
rollup.rs |
Rollup jobs |
RollupJob, RollupCheckpoint, run_all_rollups
|
-
`cargo build -p memory-toc` compiles -
`cargo build -p memory-storage` compiles -
`cargo test --workspace` passes all 78 tests: - memory-storage: 14 tests
- memory-toc: 35 tests
- memory-types: 13 tests
- memory-service: 7 tests
- memory-daemon: 9 tests
- TOC-01: TOC nodes at all levels (Year, Month, Week, Day, Segment)
- TOC-02: Nodes store title, bullets, keywords, child_node_ids
- TOC-05: Rollup jobs with checkpointing
- TOC-06: Versioned node storage (append new version, don't mutate)
- STOR-03: Checkpoint storage for crash recovery
Researched: 2026-01-30 Domain: Event segmentation, LLM summarization, time hierarchy construction, background job processing Confidence: HIGH
Phase 2 builds the Table of Contents (TOC) - the hierarchical navigation structure enabling agents to find conversations without brute-force scanning. Research focused on four areas: event segmentation (time/token boundaries with overlap), LLM summarization (pluggable trait supporting API and local inference), time hierarchy construction (Year→Month→Week→Day→Segment), and background job checkpointing.
The standard approach uses time-gap detection (30 min) combined with token counting (4K) for segment boundaries, with overlap windows (5 min or 500 tokens) for context continuity. Summarization is async via a pluggable trait supporting OpenAI/Claude APIs or local models. TOC nodes are built bottom-up from segments, with rollup jobs aggregating children at each level. Checkpoints ensure crash recovery.
Primary recommendation: Start with segmentation engine (02-01), then summarizer trait with API implementation (02-02), finally TOC hierarchy builder with rollup jobs (02-03).
| Library | Version | Purpose | Why Standard |
|---|---|---|---|
| tiktoken-rs | 0.5+ | Token counting for OpenAI models | Accurate token estimation, used by openai-api-rs |
| async-trait | 0.1 | Async traits for Summarizer | Standard async abstraction for traits |
| reqwest | 0.12 | HTTP client for API calls | De facto Rust HTTP client |
| serde_json | 1.0 | JSON serialization | API request/response handling |
| Library | Version | Purpose | When to Use |
|---|---|---|---|
| tokio | 1.49 | Async runtime | Background job execution |
| backoff | 0.4 | Retry with exponential backoff | API rate limiting |
| secrecy | 0.10 | Secret string handling | API key storage |
| Instead of | Could Use | Tradeoff |
|---|---|---|
| tiktoken-rs | tokenizers | tokenizers is HuggingFace-focused, tiktoken matches OpenAI exactly |
| reqwest | hyper | hyper is lower-level, reqwest has better ergonomics |
| Manual retry | tower::retry | tower adds complexity for simple API calls |
What: Detect segment boundaries based on time gaps between events. When to use: Primary segmentation trigger (TOC-03). Example:
/// Segment boundary detection based on time gaps.
/// Time thresholds are in milliseconds; token thresholds are token counts.
pub struct SegmentationConfig {
/// Maximum time gap before starting new segment (TOC-03: 30 min)
pub time_threshold_ms: i64,
/// Maximum tokens before starting new segment (TOC-03: 4K)
pub token_threshold: usize,
/// Overlap for context continuity (TOC-04: 5 min)
pub overlap_time_ms: i64,
/// Overlap tokens (TOC-04: 500 tokens)
pub overlap_tokens: usize,
}
impl Default for SegmentationConfig {
// Defaults per TOC-03/TOC-04: 30-min gap, 4K tokens, 5-min/500-token overlap.
fn default() -> Self {
Self {
time_threshold_ms: 30 * 60 * 1000, // 30 minutes
token_threshold: 4000,
overlap_time_ms: 5 * 60 * 1000, // 5 minutes
overlap_tokens: 500,
}
}
}What: Include overlap events from previous segment for context continuity. When to use: All segmentation (TOC-04). Example:
/// A segment of events with optional overlap from previous segment.
/// Overlap events give the summarizer context but are excluded from
/// `start_time` and `token_count`.
pub struct Segment {
/// Unique segment identifier
pub segment_id: String,
/// Events in the overlap window (from previous segment)
pub overlap_events: Vec<Event>,
/// Events in this segment (excluding overlap)
pub events: Vec<Event>,
/// Start time of the segment (excluding overlap)
pub start_time: DateTime<Utc>,
/// End time of the segment
pub end_time: DateTime<Utc>,
/// Token count (excluding overlap)
pub token_count: usize,
}What: Async trait for generating summaries from events. When to use: All summarization (SUMM-01, SUMM-02). Example:
/// Output from summarization
pub struct Summary {
/// Short human-readable heading for the TOC entry.
pub title: String,
/// Key discussion points.
pub bullets: Vec<String>,
/// Terms for search/filtering.
pub keywords: Vec<String>,
}
/// Pluggable summarizer trait (SUMM-01).
/// `Send + Sync` so jobs can share one implementation behind an `Arc`.
#[async_trait::async_trait]
pub trait Summarizer: Send + Sync {
/// Generate a summary from events (SUMM-02)
async fn summarize_events(&self, events: &[Event]) -> Result<Summary, SummarizerError>;
/// Generate a rollup summary from child summaries (SUMM-04)
async fn summarize_children(&self, summaries: &[Summary]) -> Result<Summary, SummarizerError>;
}What: Summarizer implementation using OpenAI/Claude API. When to use: Default production summarizer. Example:
/// Summarizer backed by a chat-completions HTTP API.
pub struct ApiSummarizer {
// Reused HTTP client.
client: reqwest::Client,
// Wrapped in SecretString so the key stays out of Debug/log output.
api_key: secrecy::SecretString,
// Model name sent to the API (configurable per deployment).
model: String,
base_url: String,
}
#[async_trait::async_trait]
impl Summarizer for ApiSummarizer {
// Both methods share one flow: build prompt -> call API -> parse JSON reply.
async fn summarize_events(&self, events: &[Event]) -> Result<Summary, SummarizerError> {
let prompt = build_events_prompt(events);
let response = self.call_api(&prompt).await?;
parse_summary_response(&response)
}
async fn summarize_children(&self, summaries: &[Summary]) -> Result<Summary, SummarizerError> {
let prompt = build_rollup_prompt(summaries);
let response = self.call_api(&prompt).await?;
parse_summary_response(&response)
}
}What: Hierarchical node IDs that encode level and time period. When to use: All TOC node creation. Example:
/// Generate node ID based on level and time period.
///
/// NOTE(review): Year/Month/Week/Day IDs are deterministic functions of
/// `time`, but the Segment arm embeds a fresh ULID - calling this twice
/// for the same segment yields different IDs. Callers must generate a
/// segment ID once and reuse it.
pub fn generate_node_id(level: TocLevel, time: DateTime<Utc>) -> String {
match level {
TocLevel::Year => format!("toc:year:{}", time.year()),
TocLevel::Month => format!("toc:month:{}:{:02}", time.year(), time.month()),
TocLevel::Week => {
// ISO week-year can differ from the calendar year near Jan 1.
let week = time.iso_week();
format!("toc:week:{}:W{:02}", week.year(), week.week())
}
TocLevel::Day => format!("toc:day:{}", time.format("%Y-%m-%d")),
TocLevel::Segment => format!("toc:segment:{}:{}",
time.format("%Y-%m-%d"),
ulid::Ulid::new()
),
}
}What: Background job that aggregates child nodes into parent summaries. When to use: Day/Week/Month rollup (TOC-05). Example:
/// Checkpoint for crash recovery (STOR-03, TOC-05)
#[derive(Serialize, Deserialize)]
pub struct RollupCheckpoint {
pub job_name: String,
pub level: TocLevel,
/// Resume point: periods up to this time were already rolled up.
pub last_processed_time: DateTime<Utc>,
pub processed_count: usize,
}
// Sketch of the rollup loop: resume from checkpoint, gather child-level
// nodes, aggregate per parent period, checkpoint after each parent.
pub async fn run_rollup_job(
storage: &Storage,
summarizer: &dyn Summarizer,
level: TocLevel,
) -> Result<(), Error> {
let job_name = format!("rollup_{}", level);
// Load checkpoint for crash recovery
let checkpoint = load_checkpoint(storage, &job_name)?;
let start_time = checkpoint.map(|c| c.last_processed_time).unwrap_or(DateTime::UNIX_EPOCH);
// Find nodes at child level that need rollup
let child_level = level.child().ok_or(Error::NoChildLevel)?;
let children = get_nodes_since(storage, child_level, start_time)?;
// Group by parent period and summarize
for (parent_id, child_nodes) in group_by_parent(children) {
let summaries: Vec<Summary> = child_nodes.iter()
.map(|n| Summary { title: n.title.clone(), bullets: n.bullets.clone(), keywords: n.keywords.clone() })
.collect();
let rollup = summarizer.summarize_children(&summaries).await?;
let parent_node = create_or_update_node(storage, &parent_id, level, rollup)?;
// Save checkpoint after each parent
save_checkpoint(storage, &job_name, parent_node.end_time)?;
}
Ok(())
}What: Append new version instead of mutating existing node. When to use: All TOC updates (TOC-06). Example:
/// Storage keys for versioned TOC nodes
/// toc_nodes CF: "toc:{node_id}:v{version}" -> TocNode bytes
/// toc_latest CF: "latest:{node_id}" -> latest version number
///
/// NOTE(review): the version read-increment-write below is not atomic
/// across concurrent writers - two writers could both read version N and
/// both write v{N+1}. Safe only under a single-writer assumption; confirm.
pub fn put_toc_node(storage: &Storage, node: &TocNode) -> Result<(), Error> {
let nodes_cf = storage.cf_handle(CF_TOC_NODES)?;
let latest_cf = storage.cf_handle(CF_TOC_LATEST)?;
// Get current version
let latest_key = format!("latest:{}", node.node_id);
let current_version = storage.get(&latest_cf, &latest_key)?
.map(|b| u32::from_be_bytes(b.try_into().unwrap()))
.unwrap_or(0);
let new_version = current_version + 1;
let versioned_key = format!("toc:{}:v{}", node.node_id, new_version);
let mut node = node.clone();
node.version = new_version;
// Versioned node and latest-pointer written in one atomic batch.
let mut batch = WriteBatch::default();
batch.put_cf(&nodes_cf, versioned_key, node.to_bytes()?);
batch.put_cf(&latest_cf, latest_key, new_version.to_be_bytes());
storage.write(batch)?;
Ok(())
}What goes wrong: Tool results (file contents, command output) inflate token counts dramatically. Why it happens: Naive counting includes full tool output text. How to avoid: Truncate or summarize tool results before counting.
/// Count tokens for an event, truncating tool results so huge command
/// output does not inflate the count.
fn count_tokens_for_event(event: &Event) -> usize {
    let text = if event.event_type == EventType::ToolResult {
        // Truncate tool results to reasonable length.
        // Fix: back up to a char boundary first - slicing at a raw byte
        // offset panics if byte 1000 falls inside a multi-byte UTF-8 char.
        let mut end = event.text.len().min(1000);
        while !event.text.is_char_boundary(end) {
            end -= 1;
        }
        &event.text[..end]
    } else {
        &event.text
    };
    tiktoken_rs::cl100k_base().unwrap().encode_with_special_tokens(text).len()
}What goes wrong: Overlap window too small, summarizer lacks context. Why it happens: Events referenced in current segment occurred in overlap period. How to avoid: Include overlap events in summarization input, mark them as context.
/// Combine overlap + segment events for the summarizer, tagging overlap
/// events so the prompt can treat them as context only.
fn prepare_for_summarization(segment: &Segment) -> Vec<Event> {
let mut all_events = segment.overlap_events.clone();
all_events.extend(segment.events.clone());
// Mark overlap events for summarizer
// (overlap events occupy the first `overlap_events.len()` slots)
for event in &mut all_events[..segment.overlap_events.len()] {
event.metadata.insert("_overlap".to_string(), "true".to_string());
}
all_events
}What goes wrong: Burst of summarization calls hits rate limit, job fails. Why it happens: No backoff or throttling. How to avoid: Use exponential backoff with jitter.
use backoff::{ExponentialBackoff, retry};
// Retry wrapper treating every error as transient (retryable).
// NOTE(review): `backoff::retry` is the blocking API; for an async closure
// this presumably needs `backoff::future::retry` - confirm against the
// backoff crate docs before using.
async fn call_api_with_retry<T, F, Fut>(f: F) -> Result<T, Error>
where
F: Fn() -> Fut,
Fut: std::future::Future<Output = Result<T, Error>>,
{
let backoff = ExponentialBackoff::default();
retry(backoff, || async { f().await.map_err(backoff::Error::transient) }).await
}What goes wrong: Crash between processing and checkpoint, work repeated or lost. Why it happens: Checkpoint written separately from node update. How to avoid: Write node and checkpoint in same atomic batch.
What goes wrong: Day rollup runs before all segments for that day exist. Why it happens: New events still arriving, rollup job triggered too early. How to avoid: Only rollup periods that are "closed" (current period excluded).
// A period may be rolled up only after a grace period past its end.
fn should_rollup_period(period_end: DateTime<Utc>) -> bool {
// Only rollup if period ended at least 1 hour ago
period_end + Duration::hours(1) < Utc::now()
}pub struct SegmentBuilder {
// Thresholds controlling boundary detection.
config: SegmentationConfig,
// Events accumulated since the last flush.
current_events: Vec<Event>,
// Running token total for current_events.
current_tokens: usize,
// Timestamp of the most recently added event (None after a flush).
last_event_time: Option<DateTime<Utc>>,
}
impl SegmentBuilder {
/// Start a builder with no accumulated events.
pub fn new(config: SegmentationConfig) -> Self {
    Self {
        current_events: Vec::new(),
        current_tokens: 0,
        last_event_time: None,
        config,
    }
}
/// Add an event, returns Some(Segment) if a boundary was reached.
///
/// Fix: the original returned the flushed segment WITHOUT adding the
/// boundary-triggering event anywhere, silently dropping it. The event is
/// now always pushed - into the fresh segment when a boundary fires.
pub fn add_event(&mut self, event: Event) -> Option<Segment> {
    let event_tokens = count_tokens(&event.text);
    // Time-gap boundary (TOC-03: 30 min default).
    let time_boundary = self.last_event_time.map_or(false, |last| {
        event.timestamp.timestamp_millis() - last.timestamp_millis() > self.config.time_threshold_ms
    });
    // Token boundary (TOC-03: 4K default); never flush an empty segment
    // (a single oversized event still forms its own segment).
    let token_boundary = !self.current_events.is_empty()
        && self.current_tokens + event_tokens > self.config.token_threshold;
    let flushed = if time_boundary || token_boundary {
        Some(self.flush_segment())
    } else {
        None
    };
    // The new event starts (or continues) the current segment.
    let ts = event.timestamp;
    self.current_events.push(event);
    self.current_tokens += event_tokens;
    self.last_event_time = Some(ts);
    flushed
}
/// Flush current events as a segment.
///
/// Fix: the original zeroed `self.current_tokens` BEFORE reading it into
/// `token_count`, so every segment reported a token count of 0.
fn flush_segment(&mut self) -> Segment {
    let events = std::mem::take(&mut self.current_events);
    // take() returns the accumulated count and resets the field to 0.
    let token_count = std::mem::take(&mut self.current_tokens);
    let start_time = events.first().map(|e| e.timestamp).unwrap_or_else(Utc::now);
    let end_time = events.last().map(|e| e.timestamp).unwrap_or_else(Utc::now);
    self.last_event_time = None;
    Segment {
        segment_id: format!("seg:{}", ulid::Ulid::new()),
        overlap_events: Vec::new(), // Filled by caller
        events,
        start_time,
        end_time,
        token_count,
    }
}
}// Render the segment's events into a single prompt asking for a JSON
// summary (title / bullets / keywords).
fn build_events_prompt(events: &[Event]) -> String {
// One "[timestamp] role: text" line per event, blank-line separated.
let events_text: String = events.iter()
.map(|e| format!("[{}] {}: {}", e.timestamp, e.role, e.text))
.collect::<Vec<_>>()
.join("\n\n");
// Raw string: {{ }} escape literal JSON braces for format!.
format!(r#"Summarize this conversation segment for a Table of Contents entry.
CONVERSATION:
{events_text}
Provide your response in JSON format:
{{
"title": "Brief title (5-10 words)",
"bullets": ["Key point 1", "Key point 2", "Key point 3"],
"keywords": ["keyword1", "keyword2", "keyword3"]
}}
Guidelines:
- Title should capture the main topic or activity
- 3-5 bullet points summarizing key discussions or decisions
- 3-7 keywords for search/filtering
- Focus on what would help someone find this conversation later"#)
}
Summarization API Choice
- What we know: OpenAI and Claude APIs both work well
- What's unclear: Which model is best for summarization (gpt-4o-mini vs claude-3-haiku)
- Recommendation: Start with gpt-4o-mini (cheaper, sufficient quality), make model configurable
-
Real-time vs Batch Segmentation
- What we know: Events arrive continuously via hooks
- What's unclear: Process immediately or batch?
- Recommendation: Batch via outbox processing - check outbox periodically, process segments in batches
-
Rollup Frequency
- What we know: Need rollup for Day→Week→Month→Year
- What's unclear: How often to run rollup jobs
- Recommendation: Day rollup hourly, Week/Month/Year daily via cron-like scheduler
- tiktoken-rs documentation - Token counting
- OpenAI API documentation - Chat completions
- Phase 1 research - Storage patterns, checkpointing
- Claude API documentation - Alternative summarizer
- backoff crate documentation - Retry patterns
Generated by GSD Phase Researcher, 2026-01-30