Skip to content
Open
8 changes: 7 additions & 1 deletion src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
* Mounts auth, workspace, session, chat, skills, and sources routes.
*/
import { createRouter } from '@agentuity/runtime';
import { auth, authMiddleware, authRoutes } from '../auth';
import { auth, authMiddleware, apiKeyMiddleware, authRoutes } from '../auth';
import workspaceRoutes from '../routes/workspaces';
import taskRoutes from '../routes/tasks';
import sessionRoutes from '../routes/sessions';
import sessionDetailRoutes from '../routes/session-detail';
import chatRoutes from '../routes/chat';
Expand Down Expand Up @@ -36,6 +37,11 @@ api.get('/auth-methods', (c) => {
// Shared session routes (public — no authentication required)
api.route('/shared', sharedRoutes);

// Public Tasks API (API key authentication)
api.use('/v1/tasks/*', apiKeyMiddleware);
api.use('/v1/tasks', apiKeyMiddleware);
api.route('/v1/tasks', taskRoutes);

// All other routes require authentication
api.use('/*', authMiddleware);

Expand Down
80 changes: 80 additions & 0 deletions src/lib/webhook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Webhook invocation utility with retry logic.
*
* Fires a POST to the caller-supplied webhook URL when a task
* reaches a terminal state (completed, error, terminated).
*/

export interface WebhookPayload {
taskId: string;
status: 'completed' | 'error' | 'terminated';
repoUrl?: string;
branch?: string;
summary?: string;
prUrl?: string;
error?: string;
completedAt: string;
}

interface WebhookOptions {
/** Maximum number of delivery attempts (default: 3). */
maxAttempts?: number;
/** Initial backoff in ms before the first retry (default: 1000). */
initialBackoffMs?: number;
}

/**
* Deliver a webhook payload via POST with exponential-backoff retry.
*
* Returns `true` if the webhook was delivered (2xx response),
* `false` if all attempts failed.
*/
export async function deliverWebhook(
url: string,
payload: WebhookPayload,
options: WebhookOptions = {},
): Promise<boolean> {
const maxAttempts = options.maxAttempts ?? 3;
const initialBackoffMs = options.initialBackoffMs ?? 1_000;

for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'User-Agent': 'Agentuity-Coder/1.0',
'X-Webhook-Attempt': String(attempt),
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(10_000), // 10s timeout per attempt
});

if (response.ok) {
return true;
}

// Non-retryable client errors (4xx except 429)
if (response.status >= 400 && response.status < 500 && response.status !== 429) {
console.warn(
`[webhook] Non-retryable ${response.status} from ${url} (attempt ${attempt}/${maxAttempts})`,
);
return false;
}
} catch (err) {
console.warn(
`[webhook] Delivery attempt ${attempt}/${maxAttempts} to ${url} failed:`,
err instanceof Error ? err.message : err,
);
}

// Exponential backoff before next retry
if (attempt < maxAttempts) {
const backoff = initialBackoffMs * Math.pow(2, attempt - 1);
await new Promise((r) => setTimeout(r, backoff));
}
}

console.error(`[webhook] All ${maxAttempts} attempts to ${url} failed`);
return false;
}
90 changes: 81 additions & 9 deletions src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import { and, eq } from '@agentuity/drizzle';
import { getOpencodeClient, buildBasicAuthHeader } from '../opencode';
import { sandboxListFiles, sandboxReadFile, sandboxExecute, sandboxWriteFiles } from '@agentuity/server';
import { normalizeSandboxPath } from '../lib/path-utils';
import { parseMetadata } from '../lib/parse-metadata';
import { deliverWebhook } from '../lib/webhook';
import type { WebhookPayload } from '../lib/webhook';
import { decrypt } from '../lib/encryption';
import { SpanStatusCode } from '@opentelemetry/api';
import { COMMAND_TO_AGENT, TEMPLATE_COMMANDS } from '../lib/agent-commands';
Expand Down Expand Up @@ -94,6 +97,70 @@ function isAllowedFilename(filename: string) {

const api = createRouter();

// ---------------------------------------------------------------------------
// Session completion detection & webhook delivery
// ---------------------------------------------------------------------------

/**
* Detect whether an SSE event signals session completion (session.idle).
* When detected, update DB status to 'completed' and fire webhook if configured.
*/
export async function handleSessionCompletionEvent(
event: any,
sessionId: string,
opencodeSessionId: string,
): Promise<void> {
// OpenCode emits "session.idle" when the AI finishes its work
const eventType = event?.type;
if (eventType !== 'session.idle') return;

// Check if this event is for our session
const props = event?.properties;
const eventSessionId =
props?.sessionID || props?.info?.sessionID || props?.info?.id || props?.part?.sessionID;
if (eventSessionId && eventSessionId !== opencodeSessionId) return;

// Only handle completion for API-created tasks — leave web UI sessions untouched
const [current] = await db
.select()
.from(chatSessions)
.where(eq(chatSessions.id, sessionId))
.limit(1);
if (!current) return;

const metadata = parseMetadata(current);
if (metadata.source !== 'api') return;
if (current.status === 'completed') return; // Already processed — prevent duplicate webhooks

// Update session status to 'completed' (only if still active, to win the race)
const [updated] = await db
.update(chatSessions)
.set({ status: 'completed', updatedAt: new Date() })
.where(and(eq(chatSessions.id, sessionId), eq(chatSessions.status, 'active')))
.returning();

if (!updated) return;

// Check for webhook URL in metadata
const webhookUrl = typeof metadata.webhookUrl === 'string' ? metadata.webhookUrl : null;
if (!webhookUrl) return;

// Build webhook payload
const payload: WebhookPayload = {
taskId: sessionId,
status: 'completed',
repoUrl: typeof metadata.repoUrl === 'string' ? metadata.repoUrl : undefined,
branch: typeof metadata.branch === 'string' ? metadata.branch : undefined,
prUrl: (metadata.pullRequest as any)?.url ?? undefined,
completedAt: new Date().toISOString(),
};

// Fire-and-forget webhook delivery (don't block SSE)
deliverWebhook(webhookUrl, payload).catch((err) => {
console.error(`[webhook] Failed to deliver for session ${sessionId}:`, err);
});
}

// ---------------------------------------------------------------------------
// GET /api/sessions/:id/messages — fetch existing messages for page load
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -371,20 +438,25 @@ api.get(
const jsonStr = line.slice(6).trim();
if (!jsonStr) continue;

try {
const event = JSON.parse(jsonStr);
// Filter by session
const props = (event as any)?.properties;
const eventSessionId =
props?.sessionID ||
props?.info?.sessionID ||
props?.info?.id ||
props?.part?.sessionID;
try {
const event = JSON.parse(jsonStr);
// Filter by session
const props = (event as any)?.properties;
const eventSessionId =
props?.sessionID ||
props?.info?.sessionID ||
props?.info?.id ||
props?.part?.sessionID;

if (eventSessionId && eventSessionId !== session.opencodeSessionId) {
continue;
}

// Detect session completion and trigger webhook (fire-and-forget)
handleSessionCompletionEvent(event, session.id, session.opencodeSessionId!).catch(
(err) => console.error('[webhook] Completion event handling failed:', err),
);

await safeWrite({ data: JSON.stringify(event) });
} catch {
// Skip malformed events
Expand Down
Loading