Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/healthy-feet-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite': patch
---

Updated live.incrementalQuery to maintain object references across changes.
5 changes: 5 additions & 0 deletions .changeset/olive-clouds-knock.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@electric-sql/pglite': patch
---

Fixed truncated result set when overwriting incrementalQuery items at same location within a transaction.
69 changes: 37 additions & 32 deletions packages/pglite/src/live/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ import type {
Results,
Transaction,
} from '../interface'
import { debounceMutex, DoublyLinkedList, formatQuery, uuid } from '../utils.js'
import type {
LiveQueryOptions,
LiveIncrementalQueryOptions,
Change,
LiveChanges,
LiveChangesOptions,
LiveIncrementalQueryOptions,
LiveNamespace,
LiveQuery,
LiveChanges,
Change,
LiveQueryOptions,
LiveQueryResults,
} from './interface'
import { uuid, formatQuery, debounceMutex } from '../utils.js'

export type {
Change,
LiveChanges,
LiveNamespace,
LiveQuery,
LiveChanges,
Change,
LiveQueryResults,
} from './interface.js'

Expand Down Expand Up @@ -326,7 +326,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
...(
await tx.query<any>(`
SELECT column_name, data_type, udt_name
FROM information_schema.columns
FROM information_schema.columns
WHERE table_name = 'live_query_${id}_view'
`)
).rows,
Expand All @@ -349,7 +349,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}),
data_diff AS (
-- INSERT operations: Include all columns
SELECT
SELECT
'INSERT' AS __op__,
${columns
.map(
Expand All @@ -363,7 +363,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
WHERE prev.${key} IS NULL
UNION ALL
-- DELETE operations: Include only the primary key
SELECT
SELECT
'DELETE' AS __op__,
${columns
.map(({ column_name, data_type, udt_name }) => {
Expand All @@ -380,14 +380,14 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
WHERE curr.${key} IS NULL
UNION ALL
-- UPDATE operations: Include only changed columns
SELECT
SELECT
'UPDATE' AS __op__,
${columns
.map(({ column_name, data_type, udt_name }) =>
column_name === key
? `curr."${column_name}" AS "${column_name}"`
: `CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
: `CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN curr."${column_name}"
ELSE NULL${data_type === 'USER-DEFINED' ? `::${udt_name}` : ``}
END AS "${column_name}"`,
Expand All @@ -398,9 +398,9 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
.map(
({ column_name }) =>
`CASE
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN '${column_name}'
ELSE NULL
WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}"
THEN '${column_name}'
ELSE NULL
END`,
)
.join(
Expand Down Expand Up @@ -440,7 +440,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
await pg.transaction(async (tx) => {
// Populate the state table
await tx.exec(`
INSERT INTO live_query_${id}_state${stateSwitch}
INSERT INTO live_query_${id}_state${stateSwitch}
SELECT * FROM live_query_${id}_view;
`)

Expand Down Expand Up @@ -575,7 +575,8 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
? [callback]
: []
const rowsMap: Map<any, any> = new Map()
const afterMap: Map<any, any> = new Map()
const idList = new DoublyLinkedList<any>()
const rowCache = new WeakMap<any, any>()
let lastRows: T[] = []
let firstRun = true

Expand All @@ -594,28 +595,27 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
switch (op) {
case 'RESET':
rowsMap.clear()
afterMap.clear()
idList.clear()
break
case 'INSERT':
rowsMap.set(obj[key], obj)
afterMap.set(obj.__after__, obj[key])

idList.insert(obj[key], obj.__after__)
break
case 'DELETE': {
const oldObj = rowsMap.get(obj[key])
case 'DELETE':
rowsMap.delete(obj[key])
// null is the starting point, we don't delete it as another insert
// may have happened thats replacing it
if (oldObj.__after__ !== null) {
afterMap.delete(oldObj.__after__)
}

idList.delete(obj[key])
break
}
case 'UPDATE': {
const newObj = { ...(rowsMap.get(obj[key]) ?? {}) }
const oldObj = rowsMap.get(obj[key])
const newObj = { ...(oldObj ?? {}) }

for (const columnName of changedColumns) {
newObj[columnName] = obj[columnName]
if (columnName === '__after__') {
afterMap.set(obj.__after__, obj[key])
idList.delete(obj[key])
idList.insert(newObj[key], newObj.__after__)
}
}
rowsMap.set(obj[key], newObj)
Expand All @@ -628,13 +628,18 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => {
const rows: T[] = []
let lastKey: any = null
for (let i = 0; i < rowsMap.size; i++) {
const nextKey = afterMap.get(lastKey)
const nextKey = idList.getAfter(lastKey)
const obj = rowsMap.get(nextKey)
if (!obj) {
break
}
// Remove the __after__ key from the exposed row
const cleanObj = { ...obj }
const cleanObj = rowCache.get(obj) ?? { ...obj }

if (!rowCache.has(obj)) {
rowCache.set(obj, cleanObj)
}

delete cleanObj.__after__
rows.push(cleanObj)
lastKey = nextKey
Expand Down
48 changes: 47 additions & 1 deletion packages/pglite/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { PGliteInterface, Transaction } from './interface.js'
import { serialize as serializeProtocol } from '@electric-sql/pg-protocol'
import type { PGliteInterface, Transaction } from './interface.js'
import { parseDescribeStatementResults } from './parse.js'
import { TEXT } from './types.js'

Expand Down Expand Up @@ -247,3 +247,49 @@ export function toPostgresName(input: string): string {
}
return output
}

export class DoublyLinkedList<T> {
#afterMap = new Map<T | null, T>()
#beforeMap = new Map<T | null, T>()

clear() {
this.#afterMap.clear()
this.#beforeMap.clear()
}

getAfter(afterId: T) {
return this.#afterMap.get(afterId)
}

insert(id: T, afterId: T) {
const existingNext = this.#afterMap.get(afterId)
if (existingNext !== undefined) {
this.#afterMap.set(id, existingNext)
this.#beforeMap.set(existingNext, id)
}
this.#afterMap.set(afterId, id)
this.#beforeMap.set(id, afterId)
}

delete(id: T) {
const prevKey = this.#beforeMap.get(id)
const nextKey = this.#afterMap.get(id)

if (prevKey !== null && prevKey !== undefined) {
if (nextKey !== null && nextKey !== undefined) {
this.#afterMap.set(prevKey, nextKey)
this.#beforeMap.set(nextKey, prevKey)
} else {
this.#afterMap.delete(prevKey)
}
} else {
if (nextKey === null || prevKey === undefined) {
this.#afterMap.delete(prevKey!)
}
this.#beforeMap.delete(nextKey!)
}

this.#afterMap.delete(id)
this.#beforeMap.delete(id)
}
}
72 changes: 69 additions & 3 deletions packages/pglite/tests/live.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, it, expect } from 'vitest'
import { describe, expect, it } from 'vitest'
import { testEsmCjsAndDTC } from './test-utils.ts'

await testEsmCjsAndDTC(async (importType) => {
Expand Down Expand Up @@ -461,6 +461,9 @@ await testEsmCjsAndDTC(async (importType) => {
eventTarget.addEventListener('change', resolve, { once: true }),
)

// Check that references haven't changed between updates.
expect(initialResults.rows[0]).toBe(updatedResults.rows[0])

expect(updatedResults.rows).toEqual([
{ id: 1, number: 10 },
{ id: 2, number: 20 },
Expand Down Expand Up @@ -498,6 +501,69 @@ await testEsmCjsAndDTC(async (importType) => {
])
})

it('basic live incremental query with overwrites', async () => {
const db = await PGlite.create({
extensions: { live },
})

await db.exec(`
CREATE TABLE IF NOT EXISTS testTable (
id INT PRIMARY KEY,
number INT
);
`)

await db.exec(`
CREATE TABLE IF NOT EXISTS childTable (
id INT PRIMARY KEY,
parentId INT
);
`)

await db.exec(`
INSERT INTO testTable (id, number)
SELECT i, i*10 AS number FROM generate_series(1, 5) AS t(i);
`)

let updatedResults
const eventTarget = new EventTarget()

const { initialResults } = await db.live.incrementalQuery(
'SELECT * FROM testTable WHERE number <= 50 ORDER BY number;',
[],
'id',
(result) => {
updatedResults = result
eventTarget.dispatchEvent(new Event('change'))
},
)

expect(initialResults.rows).toEqual([
{ id: 1, number: 10 },
{ id: 2, number: 20 },
{ id: 3, number: 30 },
{ id: 4, number: 40 },
{ id: 5, number: 50 },
])

await db.transaction(async (tx) => {
await tx.exec('DELETE FROM testTable WHERE id = 3;')
await tx.exec('INSERT INTO testTable (id, number) VALUES (6, 35);')
})

await new Promise((resolve) =>
eventTarget.addEventListener('change', resolve, { once: true }),
)

expect(updatedResults.rows).toEqual([
{ id: 1, number: 10 },
{ id: 2, number: 20 },
{ id: 6, number: 35 },
{ id: 4, number: 40 },
{ id: 5, number: 50 },
])
})

it('basic live incremental query with limit 1', async () => {
const db = await PGlite.create({
extensions: { live },
Expand Down Expand Up @@ -1292,9 +1358,9 @@ await testEsmCjsAndDTC(async (importType) => {
const { initialResults, unsubscribe } = await db.live.query(
`SELECT
id,
statement
statement
FROM testTable
WHERE
WHERE
statement ILIKE '%pglite%'
ORDER BY id;`,
[],
Expand Down
Loading