Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,16 @@ curl -X POST -H "Authorization: Bearer <token>" \
http://127.0.0.1:3179/consolidate # HTTP API
```

### Process inbox (batch / cron)

```bash
knowledge-server process-inbox [dir]
```

One-shot command that runs the full pipeline end-to-end: uploads Markdown files from the inbox directory, consolidates all pending episodes, and runs KB synthesis. Designed for batch or cron use (e.g. Cloud Run Jobs) — no long-running server needed.

The directory defaults to `LOCAL_FILES_DIR` (or `~/knowledge`) if omitted.

### Test knowledge activation

```bash
Expand Down
86 changes: 9 additions & 77 deletions src/commands/consolidate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ import { ConsolidationEngine } from "../consolidation/consolidate.js";
import { PendingEpisodesReader } from "../consolidation/readers/pending.js";
import { StoreRegistry } from "../db/store-registry.js";
import { logger } from "../logger.js";
import {
drainConsolidation,
runEmbeddingCheck,
runSynthesisPass,
} from "./consolidation-drain.js";

/**
* `knowledge-server consolidate`
Expand Down Expand Up @@ -36,13 +41,7 @@ export async function runConsolidate(): Promise<void> {
// Check for embedding model change before consolidating — ensures all
// vectors are consistent when reconsolidation compares new extractions
// against existing entries.
try {
await activation.checkAndReEmbed();
} catch (e) {
console.error(
`Warning: embedding model check failed — ${e instanceof Error ? e.message : String(e)}`,
);
}
await runEmbeddingCheck(activation);

const { pendingSessions } = await consolidation.checkPending();

Expand All @@ -57,81 +56,14 @@ export async function runConsolidate(): Promise<void> {
);
}

let batch = 1;
let totalSessions = 0;
let totalCreated = 0;
let totalUpdated = 0;

// tryLock timeout — spin at most 10 seconds before giving up.
// In CLI mode there should be no concurrent callers, but if the server is
// running and holds the lock we surface a clear error rather than looping
// indefinitely.
const LOCK_TIMEOUT_MS = 10_000;
const LOCK_POLL_MS = 500;

while (true) {
if (!consolidation.tryLock()) {
// Shouldn't happen in CLI mode (no concurrent callers), but guard
// against a running server holding the lock.
let waited = 0;
while (!consolidation.tryLock()) {
if (waited >= LOCK_TIMEOUT_MS) {
console.error(
"Could not acquire consolidation lock after 10 s — is the server running? Stop it first.",
);
process.exit(1);
}
await new Promise((r) => setTimeout(r, LOCK_POLL_MS));
waited += LOCK_POLL_MS;
}
}
try {
const result = await consolidation.consolidate();

if (result.sessionsProcessed === 0) break;

totalSessions += result.sessionsProcessed;
totalCreated += result.entriesCreated;
totalUpdated += result.entriesUpdated;

console.log(
` Batch ${batch}: ${result.sessionsProcessed} sessions → ` +
`${result.entriesCreated} created, ${result.entriesUpdated} updated`,
);
batch++;
} catch (err) {
console.error(
`Batch ${batch} failed: ${err instanceof Error ? err.message : String(err)}`,
);
break;
} finally {
consolidation.unlock();
}
}
const { totalSessions, totalCreated, totalUpdated } =
await drainConsolidation(consolidation);

// Run KB synthesis once after all batches, same as the server-side drain.
// Runs unconditionally — existing entries may still be ripe even when no
// new sessions were processed (e.g. after a re-embedding pass).
// Wrapped in its own try/catch so a synthesis failure doesn't suppress the
// completion summary.
console.log("\nRunning KB synthesis pass...");
try {
if (consolidation.tryLock()) {
try {
await consolidation.runSynthesis();
} finally {
consolidation.unlock();
}
} else {
console.warn(
"Warning: could not acquire lock for synthesis — skipping.",
);
}
} catch (e) {
console.error(
`Warning: KB synthesis failed — ${e instanceof Error ? e.message : String(e)}`,
);
}
await runSynthesisPass(consolidation);

console.log("");
console.log("Consolidation complete.");
Expand Down
129 changes: 129 additions & 0 deletions src/commands/consolidation-drain.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import type { ActivationEngine } from "../activation/activate.js";
import type { ConsolidationEngine } from "../consolidation/consolidate.js";

/**
 * Results from a full consolidation drain (all batches).
 */
export interface DrainResult {
  /** Sessions consolidated, summed across all batches. */
  totalSessions: number;
  /** Knowledge entries newly created, summed across all batches. */
  totalCreated: number;
  /** Existing knowledge entries updated, summed across all batches. */
  totalUpdated: number;
}

/**
 * Run the embedding model consistency check before consolidation.
 *
 * Non-fatal — a failure is logged as a warning and execution continues, so a
 * broken embedding check never blocks the consolidation pipeline.
 */
export async function runEmbeddingCheck(
  activation: ActivationEngine,
): Promise<void> {
  try {
    await activation.checkAndReEmbed();
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    console.error(`Warning: embedding model check failed — ${reason}`);
  }
}

/**
 * Acquire the consolidation lock, polling up to `timeoutMs`.
 *
 * Calls `process.exit(1)` if the lock cannot be acquired within the timeout —
 * in CLI mode this indicates the server is holding the lock and the user should
 * stop it first.
 *
 * @param consolidation Engine exposing `tryLock()`.
 * @param timeoutMs Maximum total time to keep polling before giving up.
 * @param pollMs Delay between successive lock attempts.
 */
async function acquireLockOrExit(
  consolidation: ConsolidationEngine,
  timeoutMs = 10_000,
  pollMs = 500,
): Promise<void> {
  let waited = 0;
  while (!consolidation.tryLock()) {
    if (waited >= timeoutMs) {
      // Report the actual configured timeout — the message previously
      // hard-coded "10 s" even when callers passed a different timeoutMs.
      console.error(
        `Could not acquire consolidation lock after ${timeoutMs / 1000} s — is the server running? Stop it first.`,
      );
      process.exit(1);
    }
    await new Promise((r) => setTimeout(r, pollMs));
    waited += pollMs;
  }
}

/**
 * Drain all pending sessions through consolidation in batches.
 *
 * Repeatedly calls `consolidation.consolidate()` until no sessions remain or a
 * batch fails. Each batch acquires/releases the consolidation lock independently.
 *
 * @returns Aggregate totals across all batches.
 */
export async function drainConsolidation(
  consolidation: ConsolidationEngine,
): Promise<DrainResult> {
  const totals: DrainResult = {
    totalSessions: 0,
    totalCreated: 0,
    totalUpdated: 0,
  };

  // One iteration per batch; the loop ends when a batch processes nothing
  // (drained) or throws (failure is logged, remaining work left for later).
  for (let batchNo = 1; ; batchNo++) {
    await acquireLockOrExit(consolidation);

    try {
      const outcome = await consolidation.consolidate();

      if (outcome.sessionsProcessed === 0) break;

      totals.totalSessions += outcome.sessionsProcessed;
      totals.totalCreated += outcome.entriesCreated;
      totals.totalUpdated += outcome.entriesUpdated;

      console.log(
        ` Batch ${batchNo}: ${outcome.sessionsProcessed} sessions → ` +
          `${outcome.entriesCreated} created, ${outcome.entriesUpdated} updated`,
      );
    } catch (err) {
      console.error(
        `Batch ${batchNo} failed: ${err instanceof Error ? err.message : String(err)}`,
      );
      break;
    } finally {
      // The lock is released after every batch so a concurrent server is
      // never starved for longer than a single batch.
      consolidation.unlock();
    }
  }

  return totals;
}

/**
 * Run KB synthesis under the consolidation lock.
 *
 * Non-fatal — logs a warning and continues if the lock can't be acquired or
 * synthesis fails.
 */
export async function runSynthesisPass(
  consolidation: ConsolidationEngine,
): Promise<void> {
  try {
    // Guard clause: a held lock just means we skip this pass, not an error.
    if (!consolidation.tryLock()) {
      console.warn("Warning: could not acquire lock for synthesis — skipping.");
      return;
    }
    try {
      await consolidation.runSynthesis();
    } finally {
      consolidation.unlock();
    }
  } catch (e) {
    console.error(
      `Warning: KB synthesis failed — ${e instanceof Error ? e.message : String(e)}`,
    );
  }
}
137 changes: 137 additions & 0 deletions src/commands/process-inbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import { existsSync } from "node:fs";
import { resolve } from "node:path";
import { ActivationEngine } from "../activation/activate.js";
import { resolveUserId } from "../config-file.js";
import { ConsolidationEngine } from "../consolidation/consolidate.js";
import { PendingEpisodesReader } from "../consolidation/readers/pending.js";
import { LocalFilesEpisodeReader } from "../daemon/readers/local-files.js";
import { EpisodeUploader } from "../daemon/uploader.js";
import { DaemonDB } from "../db/daemon/index.js";
import { StoreRegistry } from "../db/store-registry.js";
import { logger } from "../logger.js";
import {
drainConsolidation,
runEmbeddingCheck,
runSynthesisPass,
} from "./consolidation-drain.js";

/**
 * `knowledge-server process-inbox [dir]`
 *
 * One-shot command that runs the full pipeline end-to-end:
 *
 * 1. **Upload** — reads Markdown files from the inbox directory (like the
 *    daemon's local-files reader) and inserts them as pending episodes.
 * 2. **Consolidate** — drains all pending episodes through the full
 *    consolidation pipeline (extraction → reconsolidation → contradiction
 *    scan → decay → embeddings).
 * 3. **KB synthesis** — runs a synthesis pass over the knowledge graph to
 *    discover higher-order principles from accumulated entries.
 *
 * Designed for batch/cron use (e.g. Cloud Run Jobs): start, process, exit.
 * No long-running server or daemon — just the pipeline.
 *
 * @param args CLI arguments. First positional argument is the inbox directory
 *   path. Falls back to LOCAL_FILES_DIR / ~/knowledge if omitted.
 */
export async function runProcessInbox(args: string[]): Promise<void> {
  const { config } = await import("../config.js");
  logger.init(config.logPath);

  // ── Resolve inbox directory ──────────────────────────────────────────────
  const inboxDir = args[0] ? resolve(args[0]) : config.localFilesDir;

  if (!existsSync(inboxDir)) {
    console.error(
      `Inbox directory not found: ${inboxDir}\nPass a directory path as the first argument, or set LOCAL_FILES_DIR.`,
    );
    process.exit(1);
  }

  console.log(`Inbox directory: ${inboxDir}`);

  const registry = await StoreRegistry.create();

  // Everything after registry creation runs under one try/finally so the
  // registry (and its stores) is closed even when an early phase throws.
  // Previously a Phase-1 upload failure skipped registry.close() entirely,
  // leaking the open stores.
  try {
    // ── Phase 1: Upload (inbox → pending_episodes) ──────────────────────────
    console.log("\n── Phase 1: Upload ──");

    const db = registry.writableStore();
    const { serverStateDb } = registry;

    // Daemon-local DB for cursor tracking (always local SQLite).
    const daemonDb = new DaemonDB();
    const userId = resolveUserId();

    // Only the local-files reader, pointed at the inbox directory.
    const reader = new LocalFilesEpisodeReader(inboxDir);
    const uploader = new EpisodeUploader(
      [reader],
      serverStateDb,
      daemonDb,
      userId,
    );

    let uploadResult: Awaited<ReturnType<typeof uploader.upload>>;
    try {
      uploadResult = await uploader.upload();
    } finally {
      // Reader/daemon DB are only needed for the upload phase.
      reader.close();
      daemonDb.close();
    }

    if (uploadResult.episodesUploaded > 0) {
      console.log(
        ` Uploaded ${uploadResult.episodesUploaded} episodes from ${uploadResult.sessionsProcessed} sessions.`,
      );
    } else {
      console.log(" No new files to process.");
    }

    // ── Phase 2: Consolidation (pending_episodes → knowledge) ───────────────
    console.log("\n── Phase 2: Consolidate ──");

    const activation = new ActivationEngine(
      db,
      registry.readStores(),
      registry.writableStores(),
    );
    const consolidation = new ConsolidationEngine(
      db,
      serverStateDb,
      activation,
      [new PendingEpisodesReader(serverStateDb)],
      registry.domainRouter,
    );

    try {
      // Check embedding model consistency before consolidating.
      await runEmbeddingCheck(activation);

      const { pendingSessions } = await consolidation.checkPending();

      if (pendingSessions > 0) {
        console.log(
          ` ${pendingSessions} sessions pending — starting consolidation...`,
        );
      } else {
        console.log(" No pending sessions. Running KB synthesis pass only...");
      }

      const { totalSessions, totalCreated, totalUpdated } =
        await drainConsolidation(consolidation);

      // ── Phase 3: KB Synthesis ────────────────────────────────────────────
      console.log("\n── Phase 3: KB Synthesis ──");
      await runSynthesisPass(consolidation);

      // ── Summary ──────────────────────────────────────────────────────────
      console.log("");
      console.log("Pipeline complete.");
      console.log(` Files uploaded: ${uploadResult.episodesUploaded}`);
      console.log(` Sessions processed: ${totalSessions}`);
      console.log(` Entries created: ${totalCreated}`);
      console.log(` Entries updated: ${totalUpdated}`);
    } finally {
      consolidation.close();
    }
  } finally {
    await registry.close();
  }
}
Loading
Loading