198 changes: 197 additions & 1 deletion docs/sessions.md
@@ -662,6 +662,198 @@ const myStorage: SessionProvider = {

---

## Postgres (External Database)

The built-in providers use Durable Object SQLite. If you need session data in an external Postgres database (for cross-DO queries, analytics, or shared state), use `PostgresSessionProvider`, `PostgresContextProvider`, and `PostgresSearchProvider`.

These work with any Postgres-compatible database (Neon, Supabase, PlanetScale, etc.) via [Cloudflare Hyperdrive](https://developers.cloudflare.com/hyperdrive/) for connection pooling.

### Setup

#### 1. Create a Postgres database

Use any Postgres provider and copy the connection string.

#### 2. Create a Hyperdrive config

```bash
npx wrangler hyperdrive create my-session-db \
--connection-string="postgresql://user:password@host:port/dbname"
```

Copy the returned Hyperdrive ID.

#### 3. Create the tables

The providers do not create tables for you, and the database user behind Hyperdrive often lacks `CREATE TABLE` permission. Run this migration once in your database console:

```sql
CREATE TABLE IF NOT EXISTS assistant_messages (
id TEXT PRIMARY KEY,
-- Review comment (Contributor): I think this should use session_id as part of
-- the primary key. Since this table is shared across sessions, id by itself
-- makes message ids global to the whole db. If two sessions ever use the same
-- message id, one of them gets silently skipped. Could we make this use
-- PRIMARY KEY (session_id, id) and update the insert conflict to match?
session_id TEXT NOT NULL DEFAULT '',
parent_id TEXT,
role TEXT NOT NULL,
content TEXT NOT NULL,
text_content TEXT NOT NULL DEFAULT '',
created_at TIMESTAMPTZ DEFAULT NOW(),
content_tsv TSVECTOR GENERATED ALWAYS AS (to_tsvector('english', text_content)) STORED
);
CREATE INDEX IF NOT EXISTS idx_assistant_msg_parent ON assistant_messages (parent_id);
CREATE INDEX IF NOT EXISTS idx_assistant_msg_session ON assistant_messages (session_id);
CREATE INDEX IF NOT EXISTS idx_assistant_msg_fts ON assistant_messages USING GIN (content_tsv);

CREATE TABLE IF NOT EXISTS assistant_compactions (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL DEFAULT '',
summary TEXT NOT NULL,
from_message_id TEXT NOT NULL,
to_message_id TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS cf_agents_context_blocks (
label TEXT PRIMARY KEY,
content TEXT NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS cf_agents_search_entries (
label TEXT NOT NULL,
key TEXT NOT NULL,
content TEXT NOT NULL,
content_tsv TSVECTOR GENERATED ALWAYS AS (to_tsvector('english', content)) STORED,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
PRIMARY KEY (label, key)
);
CREATE INDEX IF NOT EXISTS idx_search_entries_fts ON cf_agents_search_entries USING GIN (content_tsv);
```
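
The review comment on `assistant_messages` proposes a composite key, `PRIMARY KEY (session_id, id)`, with the insert's conflict target updated to match. The sketch below shows what such an insert could look like. The helper name `buildMessageInsert` and the `NewMessage` shape are hypothetical, and the SQL the provider actually issues is not shown in this document. It also assumes the composite primary key has been applied, since `ON CONFLICT (session_id, id)` needs a matching unique constraint.

```typescript
// Hypothetical message shape for illustration only.
interface NewMessage {
  id: string;
  parentId: string | null;
  role: string;
  content: string;
  textContent: string;
}

// Builds a parameterized insert whose conflict target matches the proposed
// composite key, so a duplicate id is only skipped within the same session.
function buildMessageInsert(sessionId: string, msg: NewMessage) {
  const text = [
    "INSERT INTO assistant_messages",
    "  (id, session_id, parent_id, role, content, text_content)",
    "VALUES ($1, $2, $3, $4, $5, $6)",
    "ON CONFLICT (session_id, id) DO NOTHING"
  ].join("\n");
  const values: (string | null)[] = [
    msg.id,
    sessionId,
    msg.parentId,
    msg.role,
    msg.content,
    msg.textContent
  ];
  return { text, values };
}
```

The returned `text` and `values` can be passed to a `pg`-style `query(text, values)` call.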

#### 4. Configure wrangler

```jsonc
{
"compatibility_flags": ["nodejs_compat"],
"hyperdrive": [
{
"binding": "HYPERDRIVE",
"id": "<your-hyperdrive-id>"
}
],
"placement": {
"region": "aws:us-east-1" // match your database region
}
}
```

#### 5. Wire it up

```typescript
import { Agent, callable } from "agents";
import {
Session,
PostgresSessionProvider,
PostgresContextProvider,
PostgresSearchProvider
} from "agents/experimental/memory/session";
import { Client } from "pg";

class MyAgent extends Agent<Env> {
private _session?: Session;
private _pgClient?: Client;

/**
* Open a `pg.Client` against Hyperdrive once, and reuse it.
* The providers take the raw client directly — no wrapper needed.
*/
private async getPgClient(): Promise<Client> {
if (this._pgClient) return this._pgClient;
const client = new Client({
connectionString: this.env.HYPERDRIVE.connectionString
});
await client.connect();
// Only cache after connect() resolves so a failed attempt doesn't
// permanently poison the instance.
this._pgClient = client;
return client;
}

private async getSession(): Promise<Session> {
if (this._session) return this._session;

const client = await this.getPgClient();
const sessionId = this.ctx.id.toString();

this._session = Session.create(
new PostgresSessionProvider(client, sessionId)
)
.withContext("soul", {
provider: {
get: async () => "You are a helpful assistant."
}
})
.withContext("memory", {
description: "Short facts",
maxTokens: 1100,
provider: new PostgresContextProvider(client, `memory_${sessionId}`)
})
.withContext("knowledge", {
description: "Searchable knowledge base",
provider: new PostgresSearchProvider(client)
})
.withCachedPrompt(
new PostgresContextProvider(client, `_prompt_${sessionId}`)
);

return this._session;
}
}
```

### How it works

When `Session.create()` receives a `SessionProvider` instead of a `SqlProvider`, it skips all SQLite auto-wiring. This means:

- **Context blocks need explicit providers.** No auto-wiring to SQLite — each `withContext()` call needs a `provider` option, or the block will be read-only with no storage.
- **`withCachedPrompt()` needs an explicit provider.** Pass a `PostgresContextProvider` to persist the frozen system prompt.
- **Broadcaster is skipped.** WebSocket status broadcasts (`CF_AGENT_SESSION` events) only work with `SqlProvider`-based sessions.
- **All Session methods are async.** `getHistory()`, `getMessage()`, etc. return Promises since the underlying storage is async.

### System prompt lifecycle

- **`freezeSystemPrompt()`** — returns the cached prompt from the store. On first call (cache miss), loads blocks from providers, renders, and persists. Subsequent calls return the stored value without re-rendering. This preserves LLM prefix cache hits.
- **`refreshSystemPrompt()`** — force reloads blocks from providers, re-renders, and updates the store. Call this to invalidate the cached prompt (e.g. after `clearMessages`).
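
The freeze/refresh behavior described above can be pictured with a small stand-alone sketch. `PromptStore` and `PromptCache` are hypothetical names invented for this illustration and are not part of the library. The sketch only mirrors the described semantics: render on a cache miss, otherwise return the stored value, and re-render on refresh.

```typescript
// Assumed minimal store shape for this sketch (not the library's interface).
interface PromptStore {
  get(): Promise<string | null>;
  set(value: string): Promise<void>;
}

// Hypothetical illustration of the freeze/refresh semantics.
class PromptCache {
  private readonly store: PromptStore;
  private readonly render: () => Promise<string>;

  constructor(store: PromptStore, render: () => Promise<string>) {
    this.store = store;
    this.render = render;
  }

  // Return the stored prompt; render and persist only on a cache miss.
  async freeze(): Promise<string> {
    const cached = await this.store.get();
    if (cached !== null) return cached;
    return this.refresh();
  }

  // Always re-render and overwrite the stored prompt.
  async refresh(): Promise<string> {
    const prompt = await this.render();
    await this.store.set(prompt);
    return prompt;
  }
}
```

Because `freeze()` returns the same stored string until `refresh()` runs, the prefix sent to the LLM stays byte-identical across turns, which is what lets prefix caching apply.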

### Connection types

The Postgres providers accept either of:

- A raw `pg.Client` (or any object with a compatible `query(text, values)` method) — the recommended path for Hyperdrive.
- Any object implementing `PostgresConnection` — useful for tests or custom drivers.

```typescript
// For tests or custom drivers
interface PostgresConnection {
execute(
query: string,
args?: (string | number | boolean | null)[]
): Promise<{ rows: Record<string, unknown>[] }>;
}
```
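
For tests, one option is a small in-memory double that satisfies this interface. The sketch below is an assumption about how such a double might look: `RecordingConnection` is a hypothetical name, and the interface is re-declared locally so the example stands alone. It records every call and returns canned rows, which is enough to assert on the SQL a provider sends.

```typescript
type SqlArg = string | number | boolean | null;

// Local copy of the interface shown above, so this sketch is self-contained.
interface PostgresConnection {
  execute(
    query: string,
    args?: SqlArg[]
  ): Promise<{ rows: Record<string, unknown>[] }>;
}

// Hypothetical test double: records each call and returns canned rows.
class RecordingConnection implements PostgresConnection {
  readonly calls: { query: string; args: SqlArg[] }[] = [];
  private readonly cannedRows: Record<string, unknown>[];

  constructor(cannedRows: Record<string, unknown>[] = []) {
    this.cannedRows = cannedRows;
  }

  async execute(query: string, args: SqlArg[] = []) {
    this.calls.push({ query, args });
    return { rows: this.cannedRows };
  }
}
```

A test can construct `new RecordingConnection([{ id: "m1" }])`, hand it to a provider in place of a real client, and then inspect `calls` to check the queries and arguments that were issued.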

Internally the providers use `?` placeholders; when a `pg`-style client is passed, those are rewritten to `$1, $2, …` automatically.
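
A minimal sketch of that kind of rewrite is shown below. `toNumberedPlaceholders` is a hypothetical helper written for this illustration, and the library's own rewriting may differ. This version skips `?` inside single-quoted string literals but does not handle dollar-quoted strings, SQL comments, or the JSONB `?` operator.

```typescript
// Rewrites positional `?` placeholders to Postgres-style `$1, $2, ...`.
// Hypothetical helper; limited to plain queries and single-quoted literals.
function toNumberedPlaceholders(sql: string): string {
  let out = "";
  let index = 0;
  let inString = false;
  for (const ch of sql) {
    if (ch === "'") {
      // A doubled quote ('') toggles twice, so the state nets out correctly.
      inString = !inString;
      out += ch;
    } else if (ch === "?" && !inString) {
      index += 1;
      out += "$" + index;
    } else {
      out += ch;
    }
  }
  return out;
}

console.log(toNumberedPlaceholders("SELECT * FROM t WHERE a = ? AND b = ?"));
// prints: SELECT * FROM t WHERE a = $1 AND b = $2
```

The argument array needs no reordering, because the nth `?` maps to `$n`.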

### Search

Two levels of search are available:

- **Message search** — `PostgresSessionProvider.searchMessages()` searches conversation history via the `content_tsv` column on `assistant_messages`.
- **Knowledge search** — `PostgresSearchProvider` provides a searchable context block backed by `cf_agents_search_entries`. The LLM can index content via `set_context` and query it via `search_context`. Uses `tsvector` + GIN index with English stemming and `ts_rank` for relevance ranking.

The migration SQL above includes both tables with tsvector columns and GIN indexes — search works out of the box.
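
To make the message-search path concrete, here is a sketch of a query that uses the `content_tsv` column and GIN index from the migration. The SQL that `searchMessages()` actually runs is not shown in this document, so treat `buildMessageSearchQuery` as a hypothetical helper and the query as one plausible shape, using the standard Postgres functions `plainto_tsquery` and `ts_rank`.

```typescript
interface BuiltQuery {
  text: string;
  values: (string | number)[];
}

// Hypothetical builder for a ranked full-text search scoped to one session.
function buildMessageSearchQuery(
  sessionId: string,
  query: string,
  limit = 10
): BuiltQuery {
  const text = [
    "SELECT id, role, text_content,",
    "       ts_rank(content_tsv, plainto_tsquery('english', $2)) AS rank",
    "FROM assistant_messages",
    "WHERE session_id = $1",
    "  AND content_tsv @@ plainto_tsquery('english', $2)",
    "ORDER BY rank DESC",
    "LIMIT $3"
  ].join("\n");
  return { text, values: [sessionId, query, limit] };
}
```

Running the same statement by hand in a database console (with literal values substituted) is a quick way to confirm the index and generated column are in place.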

---

## Utilities

Exported from `agents/experimental/memory/utils`:
@@ -719,6 +911,9 @@ import {
AgentContextProvider,
AgentSearchProvider,
R2SkillProvider,
PostgresSessionProvider,
PostgresContextProvider,
PostgresSearchProvider,

// Type guards
isWritableProvider,
@@ -741,7 +936,8 @@ import {
type SearchResult,
type SessionProvider,
type StoredCompaction,
type SqlProvider
type SqlProvider,
type PostgresConnection
} from "agents/experimental/memory/session";
```

4 changes: 2 additions & 2 deletions examples/resumable-stream-chat/src/client.tsx
@@ -223,12 +223,12 @@ function Chat() {
ChatMessage
>({
agent,
- onData(part) {
+ onData(part: { type: string; data: unknown }) {
// Capture transient thinking parts from the onData callback.
// These are ephemeral — not persisted and not in message.parts.
if (part.type === "data-thinking") {
// part.data is typed as ThinkingData here — no cast needed
- setThinkingData(part.data);
+ setThinkingData(part.data as ThinkingData);
}
}
});
12 changes: 6 additions & 6 deletions experimental/session-memory/src/server.ts
@@ -79,7 +79,7 @@ export class ChatAgent extends Agent<Env> {
parts: [{ type: "text", text: message }]
});

- const history = this.session.getHistory();
+ const history = await this.session.getHistory();
const truncated = truncateOlderMessages(history);

const result = streamText({
@@ -139,18 +139,18 @@ export class ChatAgent extends Agent<Env> {
}

@callable()
- getMessages(): UIMessage[] {
- return this.session.getHistory() as UIMessage[];
+ async getMessages(): Promise<UIMessage[]> {
+ return (await this.session.getHistory()) as UIMessage[];
}

@callable()
- search(query: string) {
+ async search(query: string) {
return this.session.search(query);
}

@callable()
- clearMessages(): void {
- this.session.clearMessages();
+ async clearMessages(): Promise<void> {
+ await this.session.clearMessages();
}
}

10 changes: 5 additions & 5 deletions experimental/session-multichat/src/server.ts
@@ -80,8 +80,8 @@ export class MultiSessionAgent extends Agent<Env> {
}

@callable()
- deleteChat(chatId: string) {
- this.manager.delete(chatId);
+ async deleteChat(chatId: string) {
+ await this.manager.delete(chatId);
}

// ── Chat ──────────────────────────────────────────────────────
@@ -100,7 +100,7 @@ export class MultiSessionAgent extends Agent<Env> {
parts: [{ type: "text", text: message }]
});

- const history = session.getHistory();
+ const history = await session.getHistory();
const truncated = truncateOlderMessages(history);

const result = streamText({
@@ -148,8 +148,8 @@ export class MultiSessionAgent extends Agent<Env> {
}

@callable()
- getHistory(chatId: string): UIMessage[] {
- return this.manager.getSession(chatId).getHistory() as UIMessage[];
+ async getHistory(chatId: string): Promise<UIMessage[]> {
+ return (await this.manager.getSession(chatId).getHistory()) as UIMessage[];
}

@callable()
79 changes: 79 additions & 0 deletions experimental/session-planetscale/README.md
@@ -0,0 +1,79 @@
# Postgres Session Example

Agent with session history stored in an external Postgres database via [Cloudflare Hyperdrive](https://developers.cloudflare.com/hyperdrive/) instead of Durable Object SQLite.

## Why external Postgres?

DO SQLite is great for per-user state, but sessions live and die with the DO. An external database gives you:

- **Cross-DO queries** — search across all conversations from any Worker
- **Analytics** — run SQL against your conversation data directly
- **Decoupled lifecycle** — session data survives DO eviction, migration, and resets
- **Shared state** — multiple DOs or services can read/write the same session tables

## Setup

### 1. Create a Postgres database

Use any Postgres provider (Neon, Supabase, PlanetScale, etc.) and copy the connection string.

### 2. Create a Hyperdrive config

```bash
npx wrangler hyperdrive create my-session-db \
--connection-string="postgresql://user:password@host:port/dbname"
```

Update `wrangler.jsonc` with the returned Hyperdrive ID.

### 3. Create the tables

Run the migration SQL from [docs/sessions.md](../../docs/sessions.md#3-create-the-tables) in your database console. The providers do not auto-create tables — migrations are managed by you.

### 4. Deploy

```bash
npm install
npm run deploy
```

## How it works

The key difference from the standard `session-memory` example:

```ts
// Standard: auto-wires to DO SQLite
const session = Session.create(this)
.withContext("memory", { maxTokens: 1100 })
.withCachedPrompt();

// Postgres: pass providers explicitly
const conn = wrapPgClient(pgClient);

const session = Session.create(new PostgresSessionProvider(conn, sessionId))
.withContext("memory", {
maxTokens: 1100,
provider: new PostgresContextProvider(conn, `memory_${sessionId}`)
})
.withContext("knowledge", {
provider: new PostgresSearchProvider(conn)
})
.withCachedPrompt(new PostgresContextProvider(conn, `_prompt_${sessionId}`));
```

When `Session.create()` receives a `SessionProvider` (not a `SqlProvider`), it skips all SQLite auto-wiring. Context blocks and the prompt cache need explicit providers since there's no DO storage to fall back to.

## Connection interface

The providers use `?` placeholders internally. This example wraps the `pg` driver to convert them to `$1, $2, ...`:

```ts
interface PostgresConnection {
execute(
query: string,
args?: (string | number | boolean | null)[]
): Promise<{ rows: Record<string, unknown>[] }>;
}
```

Any Postgres driver with a compatible `execute()` method works.
7 changes: 7 additions & 0 deletions experimental/session-planetscale/env.d.ts
@@ -0,0 +1,7 @@
declare namespace Cloudflare {
interface Env {
AI: Ai;
HYPERDRIVE: Hyperdrive;
}
}
interface Env extends Cloudflare.Env {}