diff --git a/.changeset/record-persistence-boundary.md b/.changeset/record-persistence-boundary.md new file mode 100644 index 0000000..493e8a3 --- /dev/null +++ b/.changeset/record-persistence-boundary.md @@ -0,0 +1,6 @@ +--- +"@moonshot-ai/agent-core": patch +"@moonshot-ai/kimi-code": patch +--- + +Move wire metadata handling into the record layer and keep persistence backends limited to storage operations. diff --git a/packages/agent-core/src/agent/index.ts b/packages/agent-core/src/agent/index.ts index d502ae3..5423ea5 100644 --- a/packages/agent-core/src/agent/index.ts +++ b/packages/agent-core/src/agent/index.ts @@ -146,9 +146,6 @@ export class Agent { : undefined), ); this.records.onRecord = config.onRecord; - this.records.onError = (error, record) => { - this.emitRecordsWriteError(error, record); - }; this.fullCompaction = new FullCompaction(this, config.compactionStrategy); this.context = new ContextMemory(this); this.config = new ConfigState(this); diff --git a/packages/agent-core/src/agent/records/index.ts b/packages/agent-core/src/agent/records/index.ts index 29de0cf..cc0fed9 100644 --- a/packages/agent-core/src/agent/records/index.ts +++ b/packages/agent-core/src/agent/records/index.ts @@ -1,15 +1,24 @@ import type { Agent } from '..'; -import type { AgentRecord, AgentRecordPersistence } from './types'; +import { + AGENT_WIRE_PROTOCOL_VERSION, + type AgentRecord, + type AgentRecordPersistence, +} from './types'; export * from './types'; -export { FileSystemAgentRecordPersistence } from './wire-file'; -export type { FileSystemAgentRecordPersistenceOptions } from './wire-file'; +export { + FileSystemAgentRecordPersistence, + InMemoryAgentRecordPersistence, +} from './persistence'; +export type { FileSystemAgentRecordPersistenceOptions } from './persistence'; // Contract: restore MUST NOT emit UI events, call the LLM, execute tools, or // touch the filesystem in a way that triggers external side effects. Each case // should reproduce the in-memory state the live handler left behind, nothing more. export function restoreAgentRecord(agent: Agent, input: AgentRecord): void { switch (input.type) { + case 'metadata': + return; case 'turn.prompt': agent.turn.restorePrompt(); return; @@ -82,10 +91,9 @@ export function restoreAgentRecord(agent: Agent, input: AgentRecord): void { } export class AgentRecords { - private readonly records: AgentRecord[] = []; private _restoring = false; + private metadataInitialized = false; onRecord?: (record: AgentRecord) => void; - onError?: (error: unknown, record: AgentRecord) => void; constructor( private readonly restoreRecord: (record: AgentRecord) => void, @@ -100,11 +108,23 @@ export class AgentRecords { if (this._restoring) return; const stamped: AgentRecord = record.time !== undefined ? record : { ...record, time: Date.now() }; - this.records.push(stamped); + if ( + this.persistence !== undefined && + !this.metadataInitialized && + stamped.type !== 'metadata' + ) { + this.persistence.append({ + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: Date.now(), + }); + this.metadataInitialized = true; + } + if (stamped.type === 'metadata') { + this.metadataInitialized = true; + } + this.persistence?.append(stamped); this.onRecord?.(stamped); - void this.persistence?.append(stamped).catch((error) => { - this.onError?.(error, stamped); - }); } restore(record: AgentRecord): void { @@ -119,20 +139,13 @@ export class AgentRecords { async replay(): Promise { if (!this.persistence) throw new Error('No persistence provided for AgentRecords'); for await (const record of this.persistence.read()) { - this.records.push(record); - this._restoring = true; - try { - this.restoreRecord(record); - } finally { - this._restoring = false; + if (!this.metadataInitialized) { + this.metadataInitialized = true; } + this.restore(record); } } - snapshot(): readonly AgentRecord[] { - return [...this.records]; - } - async flush(): Promise { await this.persistence?.flush(); } diff --git a/packages/agent-core/src/agent/records/persistence.ts b/packages/agent-core/src/agent/records/persistence.ts new file mode 100644 index 0000000..b89aa58 --- /dev/null +++ b/packages/agent-core/src/agent/records/persistence.ts @@ -0,0 +1,203 @@ +import { createReadStream } from 'node:fs'; +import { mkdir, open } from 'node:fs/promises'; +import { dirname } from 'node:path'; + +import { syncDir } from '../../utils/fs'; +import { type AgentRecord, type AgentRecordPersistence } from './types'; + +export interface FileSystemAgentRecordPersistenceOptions { + readonly onError?: ((error: unknown) => void) | undefined; +} + +export class InMemoryAgentRecordPersistence implements AgentRecordPersistence { + readonly records: AgentRecord[] = []; + + constructor(records: readonly AgentRecord[] = []) { + this.records.push(...records); + } + + async *read(): AsyncIterable { + for (const record of this.records) { + yield record; + } + } + + append(input: AgentRecord): void { + this.records.push(input); + } + + rewrite(records: readonly AgentRecord[]): void { + this.records.splice(0, this.records.length, ...records); + } + + async flush(): Promise {} + + async close(): Promise {} +} + +export class FileSystemAgentRecordPersistence implements AgentRecordPersistence { + private readonly pendingRecords: AgentRecord[] = []; + private shouldClear = false; + private directorySynced = false; + private flushPromise: Promise | undefined; + private error: unknown; + + constructor( + private readonly filePath: string, + private readonly options: FileSystemAgentRecordPersistenceOptions = {}, + ) {} + + async *read(): AsyncIterable { + await this.flush(); + + let line = ''; + let lineNumber = 0; + const stream = createReadStream(this.filePath, { encoding: 'utf8' }); + try { + for await (const chunk of stream) { + line += chunk; + let newlineIndex = line.indexOf('\n'); + while (newlineIndex !== -1) { + const rawLine = line.slice(0, newlineIndex); + line = line.slice(newlineIndex + 1); + lineNumber++; + + const record = parseRecordLine( + rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine, + lineNumber, + this.filePath, + false, + ); + if (record !== undefined) yield record; + + newlineIndex = line.indexOf('\n'); + } + } + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === 'ENOENT') return; + throw error; + } + + if (line.length > 0) { + lineNumber++; + const record = parseRecordLine(line, lineNumber, this.filePath, true); + if (record !== undefined) yield record; + } + } + + append(input: AgentRecord): void { + this.throwIfError(); + this.pendingRecords.push(input); + this.scheduleFlush(); + } + + rewrite(records: readonly AgentRecord[]): void { + this.throwIfError(); + this.shouldClear = true; + this.pendingRecords.splice(0, this.pendingRecords.length, ...records); + this.scheduleFlush(); + } + + async flush(): Promise { + this.throwIfError(); + while ( + this.flushPromise !== undefined || + this.shouldClear || + this.pendingRecords.length > 0 + ) { + await this.ensureFlush(); + this.throwIfError(); + } + } + + async close(): Promise { + await this.flush(); + } + + private scheduleFlush(): void { + void this.ensureFlush().catch((error) => { + this.options.onError?.(error); + }); + } + + private ensureFlush(): Promise { + if (this.flushPromise !== undefined) return this.flushPromise; + + const promise = this.drainPendingRecords() + .catch((error: unknown) => { + this.error = error; + // oxlint-disable-next-line typescript-eslint/only-throw-error + throw error; + }) + .finally(() => { + if (this.flushPromise === promise) { + this.flushPromise = undefined; + } + if ( + this.error === undefined && + (this.shouldClear || this.pendingRecords.length > 0) + ) { + this.scheduleFlush(); + } + }); + this.flushPromise = promise; + return promise; + } + + private throwIfError(): void { + // oxlint-disable-next-line typescript-eslint/only-throw-error + if (this.error !== undefined) throw this.error; + } + + private async drainPendingRecords(): Promise { + while (this.shouldClear || this.pendingRecords.length > 0) { + await this.drainBatch(); + } + } + + private async drainBatch(): Promise { + const shouldClear = this.shouldClear; + const batch = this.pendingRecords.splice(0); + this.shouldClear = false; + + const content = batch.map((e) => JSON.stringify(e) + '\n').join(''); + const directory = dirname(this.filePath); + await mkdir(directory, { recursive: true }); + + const fh = await open(this.filePath, shouldClear ? 'w' : 'a'); + try { + if (content.length > 0) { + await fh.writeFile(content, 'utf8'); + } + await fh.sync(); + } finally { + await fh.close(); + } + + if (!this.directorySynced) { + await syncDir(directory); + this.directorySynced = true; + } + } +} + +function parseRecordLine( + line: string, + lineNumber: number, + filePath: string, + allowTruncated: boolean, +): AgentRecord | undefined { + if (line.length === 0) return undefined; + try { + return JSON.parse(line) as AgentRecord; + } catch (parseError) { + // Tolerate a truncated trailing line — last write may have crashed + // mid-flush; everything before is still well-formed. + if (allowTruncated) return undefined; + throw new Error( + `wire.jsonl: corrupted line ${lineNumber} in ${filePath}: ${String(parseError)}`, + { cause: parseError }, + ); + } +} diff --git a/packages/agent-core/src/agent/records/types.ts b/packages/agent-core/src/agent/records/types.ts index 83442af..f67625b 100644 --- a/packages/agent-core/src/agent/records/types.ts +++ b/packages/agent-core/src/agent/records/types.ts @@ -10,6 +10,11 @@ import type { UserToolRegistration } from '../tool'; import type { UsageRecordScope } from '../usage'; export interface AgentRecordEvents { + metadata: { + protocol_version: string; + created_at: number; + }; + 'turn.prompt': { input: readonly ContentPart[]; origin: PromptOrigin; @@ -85,7 +90,8 @@ export const AGENT_WIRE_PROTOCOL_VERSION = '1.0'; export interface AgentRecordPersistence { read(): AsyncIterable; - append(input: AgentRecord): Promise; + append(input: AgentRecord): void; + rewrite(records: readonly AgentRecord[]): void; flush(): Promise; close(): Promise; } diff --git a/packages/agent-core/src/agent/records/wire-file.ts b/packages/agent-core/src/agent/records/wire-file.ts deleted file mode 100644 index d16fe62..0000000 --- a/packages/agent-core/src/agent/records/wire-file.ts +++ /dev/null @@ -1,193 +0,0 @@ -import { mkdir, open, readFile, stat } from 'node:fs/promises'; -import { dirname } from 'node:path'; - -import { syncDir } from '../../utils/fs'; -import { - AGENT_WIRE_PROTOCOL_VERSION, - type AgentRecord, - type AgentRecordPersistence, -} from './types'; - -interface AgentWireMetadata { - readonly type: 'metadata'; - readonly protocol_version: string; - readonly created_at: number; -} - -type WireFileRecord = AgentRecord | AgentWireMetadata; - -export interface FileSystemAgentRecordPersistenceOptions { - readonly onError?: ((error: unknown) => void) | undefined; -} - -class AsyncSerialQueue { - private tail: Promise = Promise.resolve(); - - run(task: () => Promise): Promise { - const next = this.tail.then(task, task); - this.tail = next.catch(() => { - /* swallow so a rejected task does not poison the chain */ - }); - return next; - } -} - -// Single-writer per file: the "is file empty?" check before emitting the -// header is racy across processes, but multi-writer wire.jsonl is unsupported. -export class FileSystemAgentRecordPersistence implements AgentRecordPersistence { - private readonly queue = new AsyncSerialQueue(); - private readonly pending: WireFileRecord[] = []; - private closed = false; - private closing = false; - private directorySynced = false; - private drainScheduled = false; - private lastBackgroundError: Error | undefined; - private headerPromise: Promise | undefined; - - constructor( - private readonly filePath: string, - private readonly options: FileSystemAgentRecordPersistenceOptions = {}, - ) {} - - async *read(): AsyncIterable { - await this.flush(); - - let text: string; - try { - text = await readFile(this.filePath, 'utf8'); - } catch (error) { - const code = (error as NodeJS.ErrnoException).code; - if (code === 'ENOENT') return; - throw error; - } - - const lines = text.split('\n'); - for (let i = 0; i < lines.length; i++) { - const line = lines[i]; - if (line === undefined || line.length === 0) continue; - - let parsed: WireFileRecord; - try { - parsed = JSON.parse(line) as WireFileRecord; - } catch (parseError) { - // Tolerate a truncated trailing line — last write may have crashed - // mid-flush; everything before is still well-formed. - if (i === lines.length - 1) continue; - throw new Error( - `wire.jsonl: corrupted line ${i + 1} in ${this.filePath}: ${String(parseError)}`, - { cause: parseError }, - ); - } - if (parsed.type === 'metadata') continue; - yield parsed; - } - } - - async append(input: AgentRecord): Promise { - if (this.closed || this.closing) { - throw new Error('FileSystemAgentRecordPersistence: append on closed persistence'); - } - await this.ensureHeader(); - if (this.closed || this.closing) { - throw new Error('FileSystemAgentRecordPersistence: append on closed persistence'); - } - this.pending.push(input); - this.scheduleDrain(); - } - - async flush(): Promise { - await this.headerPromise; - try { - await this.queue.run(async () => { - while (this.pending.length > 0 && !this.closed) { - await this.drainBatch(); - } - }); - } catch (error) { - this.options.onError?.(error); - throw error; - } - - if (this.lastBackgroundError !== undefined) { - const error = this.lastBackgroundError; - this.lastBackgroundError = undefined; - throw error; - } - } - - async close(): Promise { - if (this.closed) return; - this.closing = true; - try { - await this.flush(); - this.closed = true; - } catch (error) { - this.closing = false; - throw error; - } - } - - private ensureHeader(): Promise { - this.headerPromise ??= this.writeHeaderIfNeeded(); - return this.headerPromise; - } - - private async writeHeaderIfNeeded(): Promise { - let isEmpty = true; - try { - const stats = await stat(this.filePath); - isEmpty = stats.size === 0; - } catch (error) { - const code = (error as NodeJS.ErrnoException).code; - if (code !== 'ENOENT') throw error; - } - if (!isEmpty) return; - - this.pending.unshift({ - type: 'metadata', - protocol_version: AGENT_WIRE_PROTOCOL_VERSION, - created_at: Date.now(), - }); - } - - private scheduleDrain(): void { - if (this.drainScheduled || this.closed) return; - this.drainScheduled = true; - queueMicrotask(() => { - this.drainScheduled = false; - if (this.closed || this.pending.length === 0) return; - this.queue - .run(async () => { - while (this.pending.length > 0 && !this.closed) { - await this.drainBatch(); - } - }) - .catch((error) => { - this.lastBackgroundError = error as Error; - this.options.onError?.(error); - }); - }); - } - - private async drainBatch(): Promise { - if (this.pending.length === 0) return; - - const batch = this.pending.splice(0); - const lines = batch.map((e) => JSON.stringify(e) + '\n'); - - await mkdir(dirname(this.filePath), { recursive: true }); - - const fh = await open(this.filePath, 'a'); - try { - await fh.appendFile(lines.join(''), 'utf8'); - await fh.sync(); - } finally { - await fh.close(); - } - - if (!this.directorySynced) { - await syncDir(dirname(this.filePath)); - this.directorySynced = true; - } - } -} diff --git a/packages/agent-core/test/agent/file-system-record-persistence.test.ts b/packages/agent-core/test/agent/file-system-record-persistence.test.ts index 723955e..41d84cd 100644 --- a/packages/agent-core/test/agent/file-system-record-persistence.test.ts +++ b/packages/agent-core/test/agent/file-system-record-persistence.test.ts @@ -7,7 +7,9 @@ import { afterEach, describe, expect, it } from 'vitest'; import { AGENT_WIRE_PROTOCOL_VERSION, + AgentRecords, FileSystemAgentRecordPersistence, + InMemoryAgentRecordPersistence, type AgentRecord, } from '../../src/agent/records'; @@ -32,11 +34,11 @@ async function readLines(path: string): Promise { } describe('FileSystemAgentRecordPersistence', () => { - it('writes a metadata header on the first append', async () => { + it('writes only the appended record', async () => { const wirePath = await makeWirePath(); const persistence = new FileSystemAgentRecordPersistence(wirePath); - await persistence.append({ + persistence.append({ type: 'turn.prompt', input: [{ type: 'text', text: 'hello' }], origin: { kind: 'user' }, @@ -44,20 +46,15 @@ describe('FileSystemAgentRecordPersistence', () => { await persistence.close(); const lines = await readLines(wirePath); - expect(lines).toHaveLength(2); - const meta = JSON.parse(lines[0]!) as Record; - expect(meta).toMatchObject({ - type: 'metadata', - protocol_version: AGENT_WIRE_PROTOCOL_VERSION, - }); - expect(typeof meta['created_at']).toBe('number'); + expect(lines).toHaveLength(1); + expect(JSON.parse(lines[0]!)['type']).toBe('turn.prompt'); }); - it('does not re-emit a metadata header when the file already has content', async () => { + it('appends to an existing file without injecting records', async () => { const wirePath = await makeWirePath(); const first = new FileSystemAgentRecordPersistence(wirePath); - await first.append({ + first.append({ type: 'turn.prompt', input: [{ type: 'text', text: 'one' }], origin: { kind: 'user' }, @@ -65,7 +62,7 @@ describe('FileSystemAgentRecordPersistence', () => { await first.close(); const second = new FileSystemAgentRecordPersistence(wirePath); - await second.append({ + second.append({ type: 'turn.prompt', input: [{ type: 'text', text: 'two' }], origin: { kind: 'user' }, @@ -73,16 +70,22 @@ describe('FileSystemAgentRecordPersistence', () => { await second.close(); const lines = await readLines(wirePath); - // 1 metadata + 2 turn.prompt records. - expect(lines).toHaveLength(3); - const metaLines = lines.filter((l) => l.includes('"type":"metadata"')); - expect(metaLines).toHaveLength(1); + expect(lines).toHaveLength(2); + expect(lines.map((line) => JSON.parse(line)['type'])).toEqual([ + 'turn.prompt', + 'turn.prompt', + ]); }); - it('filters the metadata header out of read() output', async () => { + it('returns appended metadata records from read() output', async () => { const wirePath = await makeWirePath(); const persistence = new FileSystemAgentRecordPersistence(wirePath); - await persistence.append({ + persistence.append({ + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: 1, + }); + persistence.append({ type: 'turn.prompt', input: [{ type: 'text', text: 'hi' }], origin: { kind: 'user' }, @@ -92,28 +95,236 @@ describe('FileSystemAgentRecordPersistence', () => { const reader = new FileSystemAgentRecordPersistence(wirePath); const records: AgentRecord[] = []; for await (const r of reader.read()) records.push(r); - expect(records).toHaveLength(1); - expect(records[0]!.type).toBe('turn.prompt'); + expect(records).toHaveLength(2); + expect(records[0]).toMatchObject({ + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + }); + expect(records[1]!.type).toBe('turn.prompt'); }); - it('rejects an append that resumes after close starts', async () => { + it('rewrites records from the beginning and then appends after them', async () => { const wirePath = await makeWirePath(); const persistence = new FileSystemAgentRecordPersistence(wirePath); + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'old' }], + origin: { kind: 'user' }, + }); + persistence.rewrite([ + { + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: 1, + }, + { + type: 'turn.prompt', + input: [{ type: 'text', text: 'new' }], + origin: { kind: 'user' }, + }, + ]); + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'later' }], + origin: { kind: 'user' }, + }); + await persistence.flush(); - const appendPromise = persistence.append({ + const lines = await readLines(wirePath); + expect(lines.map((line) => JSON.parse(line)['type'])).toEqual([ + 'metadata', + 'turn.prompt', + 'turn.prompt', + ]); + expect(JSON.parse(lines[1]!)['input'][0]['text']).toBe('new'); + expect(JSON.parse(lines[2]!)['input'][0]['text']).toBe('later'); + }); + + it('rewrites already flushed records from the beginning', async () => { + const wirePath = await makeWirePath(); + const persistence = new FileSystemAgentRecordPersistence(wirePath); + persistence.append({ type: 'turn.prompt', - input: [{ type: 'text', text: 'late' }], + input: [{ type: 'text', text: 'old' }], origin: { kind: 'user' }, }); - const closePromise = persistence.close(); + await persistence.flush(); - await expect(appendPromise).rejects.toThrow( - 'FileSystemAgentRecordPersistence: append on closed persistence', - ); - await closePromise; + persistence.rewrite([ + { + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: 1, + }, + { + type: 'turn.prompt', + input: [{ type: 'text', text: 'new' }], + origin: { kind: 'user' }, + }, + ]); + await persistence.flush(); + + const lines = await readLines(wirePath); + expect(lines.map((line) => JSON.parse(line)['type'])).toEqual([ + 'metadata', + 'turn.prompt', + ]); + expect(JSON.parse(lines[1]!)['input'][0]['text']).toBe('new'); + }); + + it('flushes pending records on close', async () => { + const wirePath = await makeWirePath(); + const persistence = new FileSystemAgentRecordPersistence(wirePath); + + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'late' }], + origin: { kind: 'user' }, + }); + await persistence.close(); const lines = await readLines(wirePath); expect(lines).toHaveLength(1); - expect(JSON.parse(lines[0]!)['type']).toBe('metadata'); + expect(JSON.parse(lines[0]!)['type']).toBe('turn.prompt'); + }); + + it('enters error state after a write failure', async () => { + const wirePath = await makeWirePath(); + await mkdir(wirePath); + const persistence = new FileSystemAgentRecordPersistence(wirePath); + + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'first' }], + origin: { kind: 'user' }, + }); + await expect(persistence.flush()).rejects.toBeInstanceOf(Error); + + expect(() => { + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'second' }], + origin: { kind: 'user' }, + }); + }).toThrow(); + expect(() => { + persistence.rewrite([ + { + type: 'turn.prompt', + input: [{ type: 'text', text: 'rewrite' }], + origin: { kind: 'user' }, + }, + ]); + }).toThrow(); + await expect(persistence.flush()).rejects.toBeInstanceOf(Error); + }); +}); + +describe('InMemoryAgentRecordPersistence', () => { + it('stores appended records and replaces them on rewrite', async () => { + const persistence = new InMemoryAgentRecordPersistence(); + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'one' }], + origin: { kind: 'user' }, + }); + persistence.rewrite([ + { + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: 1, + }, + ]); + + const records: AgentRecord[] = []; + for await (const record of persistence.read()) records.push(record); + + expect(records).toEqual([ + { + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: 1, + }, + ]); + expect(persistence.records).toEqual(records); + }); +}); + +describe('AgentRecords persistence metadata', () => { + it('writes metadata before the first persisted record', async () => { + const wirePath = await makeWirePath(); + const persistence = new FileSystemAgentRecordPersistence(wirePath); + const records = new AgentRecords(() => {}, persistence); + + records.logRecord({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'hello' }], + origin: { kind: 'user' }, + }); + await records.flush(); + + const lines = await readLines(wirePath); + expect(lines).toHaveLength(2); + const meta = JSON.parse(lines[0]!) as Record; + expect(meta).toMatchObject({ + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + }); + expect(typeof meta['created_at']).toBe('number'); + expect(JSON.parse(lines[1]!)['type']).toBe('turn.prompt'); + }); + + it('does not write metadata when replaying an empty stream', async () => { + const wirePath = await makeWirePath(); + const persistence = new FileSystemAgentRecordPersistence(wirePath); + const records = new AgentRecords(() => {}, persistence); + + await records.replay(); + records.logRecord({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'one' }], + origin: { kind: 'user' }, + }); + await records.flush(); + + const lines = await readLines(wirePath); + expect(lines).toHaveLength(2); + expect(lines.map((line) => JSON.parse(line)['type'])).toEqual([ + 'metadata', + 'turn.prompt', + ]); + }); + + it('does not duplicate metadata after replaying existing records', async () => { + const wirePath = await makeWirePath(); + const persistence = new FileSystemAgentRecordPersistence(wirePath); + persistence.append({ + type: 'metadata', + protocol_version: AGENT_WIRE_PROTOCOL_VERSION, + created_at: 1, + }); + persistence.append({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'one' }], + origin: { kind: 'user' }, + }); + await persistence.flush(); + + const records = new AgentRecords(() => {}, persistence); + await records.replay(); + records.logRecord({ + type: 'turn.prompt', + input: [{ type: 'text', text: 'two' }], + origin: { kind: 'user' }, + }); + await records.flush(); + + const lines = await readLines(wirePath); + expect(lines.map((line) => JSON.parse(line)['type'])).toEqual([ + 'metadata', + 'turn.prompt', + 'turn.prompt', + ]); + expect(lines.filter((line) => JSON.parse(line)['type'] === 'metadata')).toHaveLength(1); }); }); diff --git a/packages/agent-core/test/agent/harness/agent.ts b/packages/agent-core/test/agent/harness/agent.ts index 3e57bb6..43a72b1 100644 --- a/packages/agent-core/test/agent/harness/agent.ts +++ b/packages/agent-core/test/agent/harness/agent.ts @@ -14,6 +14,7 @@ import { } from '../../../src/agent'; import type { CompactionStrategy } from '../../../src/agent/compaction'; import type { ApprovalResponse } from '../../../src/agent/permission'; +import { InMemoryAgentRecordPersistence } from '../../../src/agent/records'; import type { KimiConfig } from '../../../src/config'; import type { ExecutableToolResult } from '../../../src/loop'; import type { Logger } from '../../../src/logging'; @@ -283,7 +284,7 @@ export class AgentTestContext { providerManager: this.agent.providerManager, generate: failOnResumeGenerate, compactionStrategy: this.options.compactionStrategy, - persistence: new ReplayAgentPersistence(this.recordHistory), + persistence: new InMemoryAgentRecordPersistence(this.recordHistory.map(cloneRecord)), }); await resumed.agent.resume(); @@ -439,6 +440,7 @@ export class AgentTestContext { return { read: () => this.readAndCapturePersistence(persistence), append: (event) => persistence.append(event), + rewrite: (records) => persistence.rewrite(records), flush: () => persistence.flush(), close: () => persistence.close(), }; @@ -483,24 +485,6 @@ export class AgentTestContext { } } -class ReplayAgentPersistence implements AgentRecordPersistence { - constructor(private readonly events: readonly AgentRecord[]) {} - - async *read(): AsyncIterable { - for (const event of this.events) { - yield cloneRecord(event); - } - } - - async append(_event: AgentRecord): Promise { - throw new Error('Resume replay unexpectedly appended a record'); - } - - async flush(): Promise {} - - async close(): Promise {} -} - const failOnResumeGenerate: GenerateFn = async () => { throw new Error('Resume replay unexpectedly called the LLM'); }; diff --git a/packages/agent-core/test/agent/resume.test.ts b/packages/agent-core/test/agent/resume.test.ts index c75a283..c32c5ef 100644 --- a/packages/agent-core/test/agent/resume.test.ts +++ b/packages/agent-core/test/agent/resume.test.ts @@ -4,7 +4,8 @@ import { join } from 'node:path'; import { describe, expect, it, vi } from 'vitest'; -import type { AgentRecord, AgentRecordPersistence } from '../../src/agent'; +import type { AgentRecord } from '../../src/agent'; +import { InMemoryAgentRecordPersistence } from '../../src/agent/records'; import { appendTaskOutput, writeTask } from '../../src/tools/background/persist'; import { createFakeKaos } from '../tools/fixtures/fake-kaos'; import { testAgent } from './harness/agent'; @@ -282,24 +283,13 @@ describe('Agent resume', () => { }); }); -class RecordingAgentPersistence implements AgentRecordPersistence { +class RecordingAgentPersistence extends InMemoryAgentRecordPersistence { readonly appended: AgentRecord[] = []; - constructor(private readonly events: readonly AgentRecord[]) {} - - async *read(): AsyncIterable { - for (const event of this.events) { - yield event; - } - } - - async append(input: AgentRecord): Promise { + override append(input: AgentRecord): void { this.appended.push(input); + super.append(input); } - - async flush(): Promise {} - - async close(): Promise {} } function resumeHistory(): AgentRecord[] { diff --git a/packages/agent-core/test/agent/skill-tool-manager.test.ts b/packages/agent-core/test/agent/skill-tool-manager.test.ts index 7fadfff..9a4ddd4 100644 --- a/packages/agent-core/test/agent/skill-tool-manager.test.ts +++ b/packages/agent-core/test/agent/skill-tool-manager.test.ts @@ -5,7 +5,7 @@ import { join } from 'node:path'; import { localKaos } from '@moonshot-ai/kaos'; import { describe, expect, it, vi } from 'vitest'; -import { Agent } from '../../src/agent'; +import { Agent, type AgentRecord } from '../../src/agent'; import { ProviderManager } from '../../src/providers/provider-manager'; import type { ApprovalResponse, SDKAgentRPC, SDKSessionRPC } from '../../src/rpc'; import { Session } from '../../src/session'; @@ -136,6 +136,10 @@ describe('ToolManager SkillTool registration', () => { const skills = new SkillRegistry(); skills.register(makeSkill('review')); const agent = makeAgent(skills); + const wireRecords: AgentRecord[] = []; + agent.records.onRecord = (record) => { + wireRecords.push(record); + }; const skillTool = agent.tools.loopTools.find((tool) => tool.name === 'Skill'); if (!(skillTool instanceof SkillTool)) { throw new Error('Expected SkillTool to be active'); @@ -149,9 +153,7 @@ describe('ToolManager SkillTool registration', () => { }); expect(result.output).toContain('loaded inline'); - expect( - agent.records.snapshot().find((record) => record.type === 'context.append_message'), - ).toMatchObject({ + expect(wireRecords.find((record) => record.type === 'context.append_message')).toMatchObject({ type: 'context.append_message', message: { role: 'user',