Webhooks + API for automating conversation flows and knowledge base management #27
GeorgiZhelev commented on Mar 13, 2026
- feat(convex): add automation api and webhook scaffolding
- feat(convex): refine automation api and webhook scaffolding
- Make API and webhooks more robust
- add automation API CRUD for articles & collections (2.1b)
@GeorgiZhelev is attempting to deploy a commit to the djanogly's projects Team on Vercel. A member of the Team first needs to authorize it.
Review Summary by Qodo

Automation API and webhook system with full CRUD operations, event delivery, and credential management
Walkthrough

Description

- **Comprehensive automation API and webhook system** with full CRUD operations for conversations, messages, visitors, tickets, articles, collections, and outbound messages
- **Webhook delivery system** with HMAC-SHA256 signing, exponential backoff retry logic (5 attempts), and manual replay functionality
- **API credential management** with secure osk_ prefixed credentials, SHA-256 hashing, scope-based access control, and audit logging
- **Authentication and rate limiting** via withAutomationAuth middleware with workspace-level (120 req/min) and credential-level (60 req/min) limits
- **Conversation claim management** for external automation with 5-minute lease expiration, renewal support, and escalation/release operations
- **Event emission system** that triggers webhook deliveries for matching subscriptions with cursor-based pagination and filtering
- **AI agent automation claim suppression** to prevent responses when a conversation is claimed by external automation
- **Article and collection write operations** refactored into reusable core helpers with embedding management and status transitions
- **Database schema** for automation credentials, events, webhooks, deliveries, conversation claims, and idempotency keys
- **HTTP API routes** exposing 40+ endpoints under the /api/v1/ namespace with proper error handling and validation
- **Webhook secret encryption** using AES-GCM with environment-based key management
- **Comprehensive test coverage** including CRUD operations, security, idempotency, rate limiting, pagination, and event emission
- **React hooks and UI components** for managing credentials, webhooks, and delivery logs in settings
- **Cron jobs** for expiring stale claims and cleaning up expired idempotency keys
- **Audit logging** with 24 new automation-related action types

Diagram

```mermaid
flowchart LR
    A["API Credentials<br/>osk_ prefix<br/>SHA-256 hash"] -->|withAutomationAuth| B["Rate Limiting<br/>120/min workspace<br/>60/min credential"]
    B -->|validates| C["HTTP Routes<br/>40+ endpoints<br/>/api/v1/"]
    C -->|CRUD ops| D["Resources<br/>Conversations<br/>Messages<br/>Tickets<br/>Articles"]
    D -->|emits| E["Automation Events<br/>Cursor pagination<br/>Filtering"]
    E -->|triggers| F["Webhook Subscriptions<br/>Event filtering<br/>Resource types"]
    F -->|delivers| G["Webhook Worker<br/>HMAC-SHA256<br/>Exponential backoff<br/>5 retries"]
    H["Conversation Claims<br/>5-min lease<br/>Renewal support"] -->|suppresses| I["AI Agent<br/>Claim detection<br/>Graceful handling"]
    J["Article/Collection<br/>Core Helpers<br/>Embedding mgmt"] -->|refactored| D
```
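The walkthrough notes that webhook deliveries are signed with HMAC-SHA256. The PR does not show the delivery format, so the following consumer-side verification sketch is an assumption: it supposes the signature is a hex HMAC-SHA256 digest of the raw request body, with `signPayload`/`verifySignature` and the `whsec_` secret prefix being hypothetical names for illustration.

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Compute the hex HMAC-SHA256 digest of a raw webhook payload.
// The exact signing input (raw body vs. timestamp + body) is an assumption here.
function signPayload(secret: string, rawBody: string): string {
  return createHmac("sha256", secret).update(rawBody).digest("hex");
}

// Verify a received signature in constant time so the comparison
// itself does not leak which prefix of the digest matched.
function verifySignature(secret: string, rawBody: string, received: string): boolean {
  const expected = signPayload(secret, rawBody);
  if (expected.length !== received.length) return false;
  return timingSafeEqual(Buffer.from(expected, "utf8"), Buffer.from(received, "utf8"));
}
```

Consumers would compute the digest over the exact raw bytes they received (before any JSON re-serialization), since even whitespace differences change the HMAC.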
File Changes

1. packages/convex/tests/automationFixes.test.ts
Code Review by Qodo
1.
```ts
// Check idempotency key if provided
if (args.idempotencyKey) {
  const existing = await ctx.db
    .query("automationIdempotencyKeys")
    .withIndex("by_workspace_key", (q) =>
      q.eq("workspaceId", args.workspaceId).eq("key", args.idempotencyKey!)
    )
    .first();

  if (existing && existing.expiresAt >= Date.now()) {
    return { cached: true, result: existing.responseSnapshot };
  }
}

// Perform the message send (same logic as sendMessageForAutomation)
const conv = await ctx.db.get(args.conversationId);
if (!conv || conv.workspaceId !== args.workspaceId) {
  throw new Error("Conversation not found");
}

const claim = await ctx.db
  .query("automationConversationClaims")
  .withIndex("by_conversation_status", (q) =>
    q.eq("conversationId", args.conversationId).eq("status", "active")
  )
  .first();

if (!claim || claim.credentialId !== args.credentialId) {
  throw new Error("No active claim for this conversation. Claim the conversation first.");
}

if (claim.expiresAt < Date.now()) {
  throw new Error("Claim has expired. Renew or re-claim the conversation.");
}

const now = Date.now();
const messageId = await ctx.db.insert("messages", {
  conversationId: args.conversationId,
  senderId: `automation:${args.actorName}`,
  senderType: "bot",
  content: args.content,
  automationCredentialId: args.credentialId,
  createdAt: now,
});

await ctx.db.patch(args.conversationId, {
  updatedAt: now,
  lastMessageAt: now,
  unreadByVisitor: (conv.unreadByVisitor || 0) + 1,
});

await ctx.db.patch(claim._id, {
  expiresAt: now + 5 * 60 * 1000,
});

await logAudit(ctx, {
  workspaceId: args.workspaceId,
  actorType: "api",
  action: "automation.message.sent",
  resourceType: "message",
  resourceId: String(messageId),
  metadata: { credentialId: String(args.credentialId) },
});

await emitAutomationEvent(ctx, {
  workspaceId: args.workspaceId,
  eventType: "message.created",
  resourceType: "message",
  resourceId: messageId,
  data: { conversationId: args.conversationId, senderType: "bot", channel: conv.channel ?? "chat" },
});

const result = { id: messageId };

// Store idempotency key if provided
if (args.idempotencyKey) {
  await ctx.db.insert("automationIdempotencyKeys", {
    workspaceId: args.workspaceId,
    key: args.idempotencyKey,
    credentialId: args.credentialId,
    resourceType: "message",
    resourceId: String(messageId),
    responseSnapshot: result,
    expiresAt: now + IDEMPOTENCY_TTL_MS,
  });
}
```
2. Idempotency race duplicates 🐞 Bug ✓ Correctness
sendMessageIdempotent is vulnerable to concurrent requests using the same Idempotency-Key: both can miss the initial lookup and create separate messages before inserting their idempotency rows. This breaks the endpoint’s idempotency guarantee and can create duplicate outbound messages on client retries.
Agent Prompt
### Issue description
`sendMessageIdempotent` implements idempotency as a read-then-write pattern. Under concurrent requests with the same `Idempotency-Key`, both can see no existing key and both create a message before writing the idempotency row, producing duplicates.
### Issue Context
Convex indexes do not enforce uniqueness. When the initial lookup returns no documents, the mutation reads no documents, so concurrent mutations can both proceed without conflicts.
### Fix Focus Areas
- packages/convex/convex/automationApiInternals.ts[583-684]
- packages/convex/convex/schema/automationTables.ts[94-110]
### Implementation direction
- Introduce a deterministic, conflict-inducing “lock”/reservation mechanism for `(workspaceId, key)` before creating the message (e.g., reserve an idempotency record first and make concurrent requests conflict on a shared document), then patch it with `responseSnapshot` after success.
- Ensure subsequent requests:
- return the stored `responseSnapshot` when present and unexpired
- do **not** create a new message when a reservation exists for the same key
False positive. Convex mutations are serializable and automatically retried on OCC conflicts. This mutation reads the idempotency-key range before inserting, so a concurrent insert for the same key causes a retry; the retried execution then returns the cached result.
```ts
// 1. Check workspace-level rate limit first (120 req/min)
const wsRateLimit = await ctx.db
  .query("automationWorkspaceRateLimits")
  .withIndex("by_workspace", (q) => q.eq("workspaceId", args.workspaceId))
  .first();

if (wsRateLimit) {
  if (now > wsRateLimit.windowStart + RATE_LIMIT_WINDOW_MS) {
    await ctx.db.patch(wsRateLimit._id, { windowStart: now, count: 1 });
  } else if (wsRateLimit.count >= WORKSPACE_RATE_LIMIT) {
    const retryAfter = Math.ceil((wsRateLimit.windowStart + RATE_LIMIT_WINDOW_MS - now) / 1000);
    return { allowed: false, retryAfter };
  } else {
    await ctx.db.patch(wsRateLimit._id, { count: wsRateLimit.count + 1 });
  }
} else {
  await ctx.db.insert("automationWorkspaceRateLimits", {
    workspaceId: args.workspaceId,
    windowStart: now,
    count: 1,
  });
}
```
3. Workspace rate-limit bypass 🐞 Bug ⛯ Reliability
checkRateLimit can create multiple automationWorkspaceRateLimits rows for the same workspace under concurrent first requests, and later requests read an arbitrary .first() row. This makes workspace-level rate limiting unreliable and can allow exceeding the intended per-workspace cap.
Agent Prompt
### Issue description
Workspace-level rate limiting can be bypassed because `checkRateLimit` inserts a new `automationWorkspaceRateLimits` doc when none exists; under concurrency, multiple docs can be created, and later `.first()` reads only one of them.
### Issue Context
Convex indexes do not enforce uniqueness. If multiple rows exist for the same workspace, the code does not reconcile them.
### Fix Focus Areas
- packages/convex/convex/lib/automationAuth.ts[141-175]
- packages/convex/convex/schema/automationTables.ts[106-110]
### Implementation direction
- Make the rate-limit row a true singleton per workspace:
- create it at workspace provisioning time (preferred), or
- store counters on the `workspaces` document itself, or
- add a dedicated singleton table keyed by workspace with a conflict-inducing update path.
- If existing duplicates are possible, add reconciliation/cleanup logic (pick one canonical row and delete/merge others).
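The "store counters on the workspace document" direction can be sketched like this. It is a simulation under stated assumptions: a `Map` stands in for the `workspaces` table, `rateWindowStart`/`rateCount` are hypothetical field names, and the fixed-window numbers mirror the PR (120 requests per workspace per 60-second window). Because the counter lives on the one workspace record, there is no separate rate-limit row to race on at cold start.

```typescript
// Fixed-window workspace rate limiter with the counter stored on the
// workspace record itself, so exactly one counter exists per workspace.
const WORKSPACE_RATE_LIMIT = 120;
const RATE_LIMIT_WINDOW_MS = 60_000;

type Workspace = { rateWindowStart: number; rateCount: number };
const workspaces = new Map<string, Workspace>(); // stands in for the workspaces table

function checkRateLimit(
  workspaceId: string,
  now: number
): { allowed: boolean; retryAfter?: number } {
  let ws = workspaces.get(workspaceId);
  if (!ws) {
    // Cold start: initialize the counter fields on the workspace record.
    ws = { rateWindowStart: now, rateCount: 0 };
    workspaces.set(workspaceId, ws);
  }
  if (now > ws.rateWindowStart + RATE_LIMIT_WINDOW_MS) {
    // The previous window has elapsed: start a new one.
    ws.rateWindowStart = now;
    ws.rateCount = 1;
    return { allowed: true };
  }
  if (ws.rateCount >= WORKSPACE_RATE_LIMIT) {
    // Over the cap: report seconds until the window resets.
    const retryAfter = Math.ceil((ws.rateWindowStart + RATE_LIMIT_WINDOW_MS - now) / 1000);
    return { allowed: false, retryAfter };
  }
  ws.rateCount += 1;
  return { allowed: true };
}
```

The same shape would apply if the counters were created at workspace provisioning time instead: the key property is one canonical counter per workspace, not the storage location.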
False positive. The same Convex serializable OCC reasoning applies here: the mutation reads the workspace-scoped index range before inserting, so a concurrent insert into that range causes a retry rather than two committed cold-start rows.