Skip to content
6 changes: 6 additions & 0 deletions .changeset/record-persistence-boundary.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@moonshot-ai/agent-core": patch
"@moonshot-ai/kimi-code": patch
---

Move wire metadata handling into the record layer and keep persistence backends limited to storage operations.
3 changes: 0 additions & 3 deletions packages/agent-core/src/agent/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,6 @@ export class Agent {
: undefined),
);
this.records.onRecord = config.onRecord;
this.records.onError = (error, record) => {
this.emitRecordsWriteError(error, record);
};
this.fullCompaction = new FullCompaction(this, config.compactionStrategy);
this.context = new ContextMemory(this);
this.config = new ConfigState(this);
Expand Down
51 changes: 32 additions & 19 deletions packages/agent-core/src/agent/records/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
import type { Agent } from '..';
import type { AgentRecord, AgentRecordPersistence } from './types';
import {
AGENT_WIRE_PROTOCOL_VERSION,
type AgentRecord,
type AgentRecordPersistence,
} from './types';

export * from './types';
export { FileSystemAgentRecordPersistence } from './wire-file';
export type { FileSystemAgentRecordPersistenceOptions } from './wire-file';
export {
FileSystemAgentRecordPersistence,
InMemoryAgentRecordPersistence,
} from './persistence';
export type { FileSystemAgentRecordPersistenceOptions } from './persistence';

// Contract: restore MUST NOT emit UI events, call the LLM, execute tools, or
// touch the filesystem in a way that triggers external side effects. Each case
// should reproduce the in-memory state the live handler left behind, nothing more.
export function restoreAgentRecord(agent: Agent, input: AgentRecord): void {
switch (input.type) {
case 'metadata':
return;
case 'turn.prompt':
agent.turn.restorePrompt();
return;
Expand Down Expand Up @@ -82,10 +91,9 @@ export function restoreAgentRecord(agent: Agent, input: AgentRecord): void {
}

export class AgentRecords {
private readonly records: AgentRecord[] = [];
private _restoring = false;
private metadataInitialized = false;
onRecord?: (record: AgentRecord) => void;
onError?: (error: unknown, record: AgentRecord) => void;

constructor(
private readonly restoreRecord: (record: AgentRecord) => void,
Expand All @@ -100,11 +108,23 @@ export class AgentRecords {
if (this._restoring) return;
const stamped: AgentRecord =
record.time !== undefined ? record : { ...record, time: Date.now() };
this.records.push(stamped);
if (
this.persistence !== undefined &&
!this.metadataInitialized &&
stamped.type !== 'metadata'
) {
this.persistence.append({
type: 'metadata',
protocol_version: AGENT_WIRE_PROTOCOL_VERSION,
created_at: Date.now(),
});
this.metadataInitialized = true;
}
if (stamped.type === 'metadata') {
this.metadataInitialized = true;
}
this.persistence?.append(stamped);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve async append error handling in record logging

AgentRecords.logRecord now calls persistence.append synchronously and ignores its return value, so a custom persistence passed via AgentConfig.persistence that still implements append as async (the pre-change contract) can reject without being observed. In that case record-write failures are no longer routed through the existing error path and may surface as unhandled promise rejections while turn APIs continue as if persistence succeeded, which is a regression from the previous append(...).catch(...) behavior.

Useful? React with 👍 / 👎.

this.onRecord?.(stamped);
void this.persistence?.append(stamped).catch((error) => {
this.onError?.(error, stamped);
});
}

restore(record: AgentRecord): void {
Expand All @@ -119,20 +139,13 @@ export class AgentRecords {
async replay(): Promise<void> {
if (!this.persistence) throw new Error('No persistence provided for AgentRecords');
for await (const record of this.persistence.read()) {
this.records.push(record);
this._restoring = true;
try {
this.restoreRecord(record);
} finally {
this._restoring = false;
if (!this.metadataInitialized) {
this.metadataInitialized = true;
}
this.restore(record);
}
}

snapshot(): readonly AgentRecord[] {
return [...this.records];
}

async flush(): Promise<void> {
await this.persistence?.flush();
}
Expand Down
203 changes: 203 additions & 0 deletions packages/agent-core/src/agent/records/persistence.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
import { createReadStream } from 'node:fs';
import { mkdir, open } from 'node:fs/promises';
import { dirname } from 'node:path';

import { syncDir } from '../../utils/fs';
import { type AgentRecord, type AgentRecordPersistence } from './types';

export interface FileSystemAgentRecordPersistenceOptions {
readonly onError?: ((error: unknown) => void) | undefined;
}

export class InMemoryAgentRecordPersistence implements AgentRecordPersistence {
readonly records: AgentRecord[] = [];

constructor(records: readonly AgentRecord[] = []) {
this.records.push(...records);
}

async *read(): AsyncIterable<AgentRecord> {
for (const record of this.records) {
yield record;
}
}

append(input: AgentRecord): void {
this.records.push(input);
}

rewrite(records: readonly AgentRecord[]): void {
this.records.splice(0, this.records.length, ...records);
}

async flush(): Promise<void> {}

async close(): Promise<void> {}
}

export class FileSystemAgentRecordPersistence implements AgentRecordPersistence {
private readonly pendingRecords: AgentRecord[] = [];
private shouldClear = false;
private directorySynced = false;
private flushPromise: Promise<void> | undefined;
private error: unknown;

constructor(
private readonly filePath: string,
private readonly options: FileSystemAgentRecordPersistenceOptions = {},
) {}

async *read(): AsyncIterable<AgentRecord> {
await this.flush();

let line = '';
let lineNumber = 0;
const stream = createReadStream(this.filePath, { encoding: 'utf8' });
try {
for await (const chunk of stream) {
line += chunk;
let newlineIndex = line.indexOf('\n');
while (newlineIndex !== -1) {
const rawLine = line.slice(0, newlineIndex);
line = line.slice(newlineIndex + 1);
lineNumber++;

const record = parseRecordLine(
rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine,
lineNumber,
this.filePath,
false,
);
if (record !== undefined) yield record;

newlineIndex = line.indexOf('\n');
}
}
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === 'ENOENT') return;
throw error;
}

if (line.length > 0) {
lineNumber++;
const record = parseRecordLine(line, lineNumber, this.filePath, true);
if (record !== undefined) yield record;
}
}

append(input: AgentRecord): void {
this.throwIfError();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent record logging from throwing after persistence errors

The filesystem backend now latches write failures (this.error) and append() throws synchronously on every later call via throwIfError(). Because AgentRecords.logRecord calls append without a try/catch, any subsequent prompt/config/tool action that logs a record can fail with a raw filesystem exception instead of only emitting records.write_failed, effectively making the agent unusable after a transient I/O failure.

Useful? React with 👍 / 👎.

this.pendingRecords.push(input);
this.scheduleFlush();
}

rewrite(records: readonly AgentRecord[]): void {
this.throwIfError();
this.shouldClear = true;
this.pendingRecords.splice(0, this.pendingRecords.length, ...records);
this.scheduleFlush();
}

async flush(): Promise<void> {
this.throwIfError();
while (
this.flushPromise !== undefined ||
this.shouldClear ||
this.pendingRecords.length > 0
) {
await this.ensureFlush();
this.throwIfError();
}
}

async close(): Promise<void> {
await this.flush();
}

private scheduleFlush(): void {
void this.ensureFlush().catch((error) => {
this.options.onError?.(error);
});
}

private ensureFlush(): Promise<void> {
if (this.flushPromise !== undefined) return this.flushPromise;

const promise = this.drainPendingRecords()
.catch((error: unknown) => {
this.error = error;
// oxlint-disable-next-line typescript-eslint/only-throw-error
throw error;
})
.finally(() => {
if (this.flushPromise === promise) {
this.flushPromise = undefined;
}
if (
this.error === undefined &&
(this.shouldClear || this.pendingRecords.length > 0)
) {
this.scheduleFlush();
}
});
this.flushPromise = promise;
return promise;
}

private throwIfError(): void {
// oxlint-disable-next-line typescript-eslint/only-throw-error
if (this.error !== undefined) throw this.error;
}

private async drainPendingRecords(): Promise<void> {
while (this.shouldClear || this.pendingRecords.length > 0) {
await this.drainBatch();
}
}

private async drainBatch(): Promise<void> {
const shouldClear = this.shouldClear;
const batch = this.pendingRecords.splice(0);
this.shouldClear = false;

const content = batch.map((e) => JSON.stringify(e) + '\n').join('');
const directory = dirname(this.filePath);
await mkdir(directory, { recursive: true });

const fh = await open(this.filePath, shouldClear ? 'w' : 'a');
try {
if (content.length > 0) {
await fh.writeFile(content, 'utf8');
}
await fh.sync();
} finally {
await fh.close();
}

if (!this.directorySynced) {
await syncDir(directory);
this.directorySynced = true;
}
}
}

function parseRecordLine(
line: string,
lineNumber: number,
filePath: string,
allowTruncated: boolean,
): AgentRecord | undefined {
if (line.length === 0) return undefined;
try {
return JSON.parse(line) as AgentRecord;
} catch (parseError) {
// Tolerate a truncated trailing line — last write may have crashed
// mid-flush; everything before is still well-formed.
if (allowTruncated) return undefined;
throw new Error(
`wire.jsonl: corrupted line ${lineNumber} in ${filePath}: ${String(parseError)}`,
{ cause: parseError },
);
}
}
8 changes: 7 additions & 1 deletion packages/agent-core/src/agent/records/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import type { UserToolRegistration } from '../tool';
import type { UsageRecordScope } from '../usage';

export interface AgentRecordEvents {
metadata: {
protocol_version: string;
created_at: number;
};

'turn.prompt': {
input: readonly ContentPart[];
origin: PromptOrigin;
Expand Down Expand Up @@ -85,7 +90,8 @@ export const AGENT_WIRE_PROTOCOL_VERSION = '1.0';

export interface AgentRecordPersistence {
read(): AsyncIterable<AgentRecord>;
append(input: AgentRecord): Promise<void>;
append(input: AgentRecord): void;
rewrite(records: readonly AgentRecord[]): void;
Comment on lines +93 to +94
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid breaking exported persistence interface in patch release

AgentRecordPersistence is exported from the public agent API, and this change makes rewrite mandatory while changing append from Promise<void> to void. Any external persistence implementation compiled against the previous contract will now fail type-checking (missing rewrite) or silently change behavior around async append completion, so this is a breaking API change that should be compatibility-gated rather than shipped as a patch-level contract swap.

Useful? React with 👍 / 👎.

flush(): Promise<void>;
close(): Promise<void>;
}
Loading
Loading