Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 78 additions & 17 deletions src/daemon/uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ export class EpisodeUploader {
async upload(): Promise<{
episodesUploaded: number;
sessionsProcessed: number;
cursorAdvanced: boolean;
sources: Array<{ source: string; episodes: number; sessions: number }>;
}> {
let totalEpisodes = 0;
let totalSessions = 0;
let anyCursorAdvanced = false;
const sources: Array<{
source: string;
episodes: number;
Expand All @@ -65,6 +67,7 @@ export class EpisodeUploader {
const result = await this.uploadSource(reader);
totalEpisodes += result.episodes;
totalSessions += result.sessions;
if (result.cursorAdvanced) anyCursorAdvanced = true;
if (result.episodes > 0 || result.sessions > 0) {
sources.push({
source: reader.source,
Expand All @@ -91,13 +94,14 @@ export class EpisodeUploader {
return {
episodesUploaded: totalEpisodes,
sessionsProcessed: totalSessions,
cursorAdvanced: anyCursorAdvanced,
sources,
};
}

private async uploadSource(
reader: IEpisodeReader,
): Promise<{ episodes: number; sessions: number }> {
): Promise<{ episodes: number; sessions: number; cursorAdvanced: boolean }> {
// Daemon cursor and pending_episodes both live in the server-local DB.
const cursor = await this.serverStateDb.getDaemonCursor(reader.source);

Expand All @@ -107,7 +111,7 @@ export class EpisodeUploader {
);

if (candidateSessions.length === 0) {
return { episodes: 0, sessions: 0 };
return { episodes: 0, sessions: 0, cursorAdvanced: false };
}

const candidateIds = candidateSessions.map((s) => s.id);
Expand All @@ -131,7 +135,7 @@ export class EpisodeUploader {
episodes = reader.getNewEpisodes(candidateIds, processedRanges);
} catch (err) {
logger.error(`[daemon/${reader.source}] getNewEpisodes failed:`, err);
return { episodes: 0, sessions: 0 };
return { episodes: 0, sessions: 0, cursorAdvanced: false };
}

const newEpisodes = episodes.filter(
Expand Down Expand Up @@ -180,22 +184,56 @@ export class EpisodeUploader {
}
}

// Advance daemon cursor — mirrors consolidation engine's boundary-safety logic.
// Uses lastSuccessMaxTime (not all episodes) so a failed insert doesn't
// silently advance the cursor past episodes that were never uploaded.
// Advance daemon cursor.
//
// There are three cases to handle:
//
// 1. Some episodes were uploaded successfully → start from the max
// message time of the last uploaded episode (lastSuccessMaxTime).
// If we also hit the batch limit, cap at (lastSession.maxMessageTime - 1)
// to avoid skipping sessions that share the boundary timestamp.
// If the batch was not full, raise the cursor to
// lastSession.maxMessageTime so every examined session is passed.
//
// 2. No episodes were produced (all sessions skipped — e.g. too few
// messages, or all episodes already uploaded) and no insert failures →
// advance cursor past all examined sessions. Without this, the daemon
// would re-fetch the same batch of ineligible sessions every cycle and
// the cursor would never advance (starvation bug).
//
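// 3. An insert failure caused the upload loop to break early → the intent
// is to advance only to lastSuccessMaxTime, which stays at the old cursor
// if nothing was uploaded before the failure, so the failed episode is
// retried on the next daemon run.
// NOTE(review): the branch chain below only guarantees this when the
// batch limit was hit. With a partial batch, a failure falls through to
// the final `else`, which raises the cursor to lastSession.maxMessageTime
// and may skip the failed episode — confirm, and consider guarding that
// branch with allUploadsSucceeded.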
const lastSession = candidateSessions[candidateSessions.length - 1];
const hitBatchLimit =
candidateSessions.length === config.consolidation.maxSessionsPerRun;

const maxTime = lastSuccessMaxTime;
// Detect whether the upload loop completed without insert failures.
// If it did, we've fully examined all candidate sessions and can safely
// advance past them even if none produced episodes.
const allUploadsSucceeded = uploadedCount === newEpisodes.length;

let newCursor = maxTime;
if (hitBatchLimit) {
let newCursor = lastSuccessMaxTime;
if (allUploadsSucceeded && uploadedCount === 0) {
// Case 2: all sessions examined, none produced episodes.
// Advance past the entire batch so the next run picks up fresh sessions.
if (hitBatchLimit) {
// Cap at lastSession - 1 to avoid skipping a session that might
// share the boundary timestamp with the next batch's first session.
newCursor = Math.max(
newCursor,
lastSession.maxMessageTime - 1,
);
} else {
newCursor = Math.max(newCursor, lastSession.maxMessageTime);
}
} else if (hitBatchLimit) {
// Case 1 with batch limit: cap so we don't skip boundary sessions.
const cap = lastSession.maxMessageTime - 1;
if (cap > cursor.lastMessageTimeCreated) {
newCursor = Math.min(newCursor, cap);
}
} else {
// Case 1 without batch limit: advance past all examined sessions.
newCursor = Math.max(newCursor, lastSession.maxMessageTime);
}

Expand All @@ -215,6 +253,7 @@ export class EpisodeUploader {
return {
episodes: uploadedCount,
sessions: uploadedSessionIds.size,
cursorAdvanced: newCursor > cursor.lastMessageTimeCreated,
};
}

Expand All @@ -233,16 +272,33 @@ export class EpisodeUploader {
`[daemon] Starting. Upload interval: ${Math.round(intervalMs / 1000)}s. User: ${this.userId}`,
);

// Run immediately on start
await this.upload();
let timeoutHandle: ReturnType<typeof setTimeout> | null = null;

const interval = setInterval(async () => {
await this.upload().catch((err) => {
// Arms a one-shot timer for the next upload cycle: a 0 ms delay when
// `immediate` is true, otherwise the regular intervalMs. Once shutdown has
// begun nothing is queued. The timer handle is kept in timeoutHandle so a
// pending cycle can be cancelled.
const scheduleNext = (immediate: boolean) => {
  if (!shuttingDown) {
    const delayMs = immediate ? 0 : intervalMs;
    timeoutHandle = setTimeout(() => {
      // Fire-and-forget: runCycle handles its own errors and reschedules.
      void runCycle();
    }, delayMs);
  }
};

const runCycle = async () => {
if (shuttingDown) return;
try {
const result = await this.upload();
// Run immediately if episodes were uploaded OR the cursor advanced
// (e.g. past ineligible sessions). Only sleep for intervalMs when
// all sources are genuinely caught up with no new sessions.
scheduleNext(
result.episodesUploaded > 0 || result.cursorAdvanced,
);
} catch (err) {
logger.error("[daemon] Upload cycle failed:", err);
});
}, intervalMs);
scheduleNext(false);
}
};

// Graceful shutdown: stop the interval, run caller cleanup, then exit.
// Graceful shutdown: stop the timeout, run caller cleanup, then exit.
// Uses process.on (not once) so both SIGTERM and SIGINT are always handled.
// The re-entrancy guard prevents double-cleanup if both signals fire in rapid
// succession before the async onShutdown resolves (process.once would leave
Expand All @@ -251,7 +307,7 @@ export class EpisodeUploader {
const cleanup = async () => {
if (shuttingDown) return;
shuttingDown = true;
clearInterval(interval);
if (timeoutHandle !== null) clearTimeout(timeoutHandle);
logger.log("[daemon] Stopping…");
if (onShutdown) {
await onShutdown().catch((err) => {
Expand All @@ -262,7 +318,12 @@ export class EpisodeUploader {
process.exit(0);
};

// Register shutdown handlers before first run so a SIGTERM during
// the initial upload cycle is handled cleanly.
process.on("SIGTERM", () => void cleanup());
process.on("SIGINT", () => void cleanup());

// Run immediately on start, then self-schedule via scheduleNext.
await runCycle();
}
}
59 changes: 59 additions & 0 deletions tests/daemon.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,65 @@ describe("EpisodeUploader.upload", () => {
expect(pending[0].id).toBe("pre-existing");
});

it("advances cursor past sessions that produce zero episodes (starvation fix)", async () => {
  // Reproduces the starvation bug: when all candidate sessions have too few
  // messages (e.g. 2-message automated sessions), getNewEpisodes returns [],
  // and the old cursor logic would never advance — re-fetching the same
  // batch of ineligible sessions on every daemon cycle.
  const now = Date.now();
  const oldCursor = now - 100000;
  await db.updateDaemonCursor("opencode", {
    lastMessageTimeCreated: oldCursor,
  });

  // 50 sessions that all produce 0 episodes (reader returns empty array).
  // Each session's maxMessageTime is strictly after oldCursor and strictly
  // increasing, so the last element is the newest session in the batch.
  const sessions = Array.from({ length: 50 }, (_, i) => ({
    id: `empty-session-${i}`,
    maxMessageTime: oldCursor + 1000 * (i + 1),
  }));

  const reader = makeMockReader("opencode", sessions, []);

  const uploader = new EpisodeUploader([reader], db, "alice");
  const result = await uploader.upload();

  expect(result.episodesUploaded).toBe(0);
  // upload() must report the cursor movement even though nothing was
  // uploaded: the daemon loop keys its "run again immediately" decision on
  // this flag, so a false value here would reintroduce the slow drain.
  expect(result.cursorAdvanced).toBe(true);

  const cursor = await db.getDaemonCursor("opencode");
  // Cursor must advance past the examined sessions — it must NOT stay at oldCursor.
  // With hitBatchLimit=true (50 sessions = default maxSessionsPerRun), the cursor
  // should advance to lastSession.maxMessageTime - 1.
  const expectedCursor = sessions[sessions.length - 1].maxMessageTime - 1;
  expect(cursor.lastMessageTimeCreated).toBe(expectedCursor);
});

it("advances cursor past all sessions when batch is not full and zero episodes", async () => {
  // When the batch is NOT full (fewer sessions than maxSessionsPerRun),
  // cursor should advance to lastSession.maxMessageTime (not -1).
  const now = Date.now();
  const oldCursor = now - 100000;
  await db.updateDaemonCursor("opencode", {
    lastMessageTimeCreated: oldCursor,
  });

  // Only 3 sessions, well below the batch limit
  const sessions = [
    { id: "s1", maxMessageTime: oldCursor + 1000 },
    { id: "s2", maxMessageTime: oldCursor + 2000 },
    { id: "s3", maxMessageTime: oldCursor + 3000 },
  ];

  // The reader yields no episodes for any of the sessions.
  const reader = makeMockReader("opencode", sessions, []);

  const uploader = new EpisodeUploader([reader], db, "alice");
  const result = await uploader.upload();

  // Nothing was uploaded, yet the run must still report that the cursor
  // moved so callers can tell "skipped ineligible sessions" apart from
  // "fully caught up".
  expect(result.episodesUploaded).toBe(0);
  expect(result.cursorAdvanced).toBe(true);

  const cursor = await db.getDaemonCursor("opencode");
  expect(cursor.lastMessageTimeCreated).toBe(
    sessions[sessions.length - 1].maxMessageTime,
  );
});

it("skips a failed reader and continues with others", async () => {
const now = Date.now();
const failingReader: IEpisodeReader = {
Expand Down