diff --git a/.changeset/healthy-feet-exercise.md b/.changeset/healthy-feet-exercise.md new file mode 100644 index 000000000..fd37bf4d4 --- /dev/null +++ b/.changeset/healthy-feet-exercise.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite': patch +--- + +Updated live.incrementalQuery to maintain object references across changes. diff --git a/.changeset/olive-clouds-knock.md b/.changeset/olive-clouds-knock.md new file mode 100644 index 000000000..f27ccbc9b --- /dev/null +++ b/.changeset/olive-clouds-knock.md @@ -0,0 +1,5 @@ +--- +'@electric-sql/pglite': patch +--- + +Fixed truncated result set when overwriting incrementalQuery items at same location within a transaction. diff --git a/packages/pglite/src/live/index.ts b/packages/pglite/src/live/index.ts index b698dd587..484999af3 100644 --- a/packages/pglite/src/live/index.ts +++ b/packages/pglite/src/live/index.ts @@ -4,23 +4,23 @@ import type { Results, Transaction, } from '../interface' +import { debounceMutex, DoublyLinkedList, formatQuery, uuid } from '../utils.js' import type { - LiveQueryOptions, - LiveIncrementalQueryOptions, + Change, + LiveChanges, LiveChangesOptions, + LiveIncrementalQueryOptions, LiveNamespace, LiveQuery, - LiveChanges, - Change, + LiveQueryOptions, LiveQueryResults, } from './interface' -import { uuid, formatQuery, debounceMutex } from '../utils.js' export type { + Change, + LiveChanges, LiveNamespace, LiveQuery, - LiveChanges, - Change, LiveQueryResults, } from './interface.js' @@ -326,7 +326,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { ...( await tx.query(` SELECT column_name, data_type, udt_name - FROM information_schema.columns + FROM information_schema.columns WHERE table_name = 'live_query_${id}_view' `) ).rows, @@ -349,7 +349,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { curr AS (SELECT LAG("${key}") OVER () as __after__, * FROM live_query_${id}_state${curr}), data_diff AS ( -- INSERT operations: Include all columns - SELECT + SELECT 'INSERT' AS __op__, ${columns .map( @@ -363,7 +363,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { WHERE prev.${key} IS NULL UNION ALL -- DELETE operations: Include only the primary key - SELECT + SELECT 'DELETE' AS __op__, ${columns .map(({ column_name, data_type, udt_name }) => { @@ -380,14 +380,14 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { WHERE curr.${key} IS NULL UNION ALL -- UPDATE operations: Include only changed columns - SELECT + SELECT 'UPDATE' AS __op__, ${columns .map(({ column_name, data_type, udt_name }) => column_name === key ? `curr."${column_name}" AS "${column_name}"` - : `CASE - WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" + : `CASE + WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" THEN curr."${column_name}" ELSE NULL${data_type === 'USER-DEFINED' ? `::${udt_name}` : ``} END AS "${column_name}"`, @@ -398,9 +398,9 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { .map( ({ column_name }) => `CASE - WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" - THEN '${column_name}' - ELSE NULL + WHEN curr."${column_name}" IS DISTINCT FROM prev."${column_name}" + THEN '${column_name}' + ELSE NULL END`, ) .join( @@ -440,7 +440,7 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { await pg.transaction(async (tx) => { // Populate the state table await tx.exec(` - INSERT INTO live_query_${id}_state${stateSwitch} + INSERT INTO live_query_${id}_state${stateSwitch} SELECT * FROM live_query_${id}_view; `) @@ -575,7 +575,8 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { ? [callback] : [] const rowsMap: Map = new Map() - const afterMap: Map = new Map() + const idList = new DoublyLinkedList() + const rowCache = new WeakMap() let lastRows: T[] = [] let firstRun = true @@ -594,28 +595,27 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { switch (op) { case 'RESET': rowsMap.clear() - afterMap.clear() + idList.clear() break case 'INSERT': rowsMap.set(obj[key], obj) - afterMap.set(obj.__after__, obj[key]) + + idList.insert(obj[key], obj.__after__) break - case 'DELETE': { - const oldObj = rowsMap.get(obj[key]) + case 'DELETE': rowsMap.delete(obj[key]) - // null is the starting point, we don't delete it as another insert - // may have happened thats replacing it - if (oldObj.__after__ !== null) { - afterMap.delete(oldObj.__after__) - } + + idList.delete(obj[key]) break - } case 'UPDATE': { - const newObj = { ...(rowsMap.get(obj[key]) ?? {}) } + const oldObj = rowsMap.get(obj[key]) + const newObj = { ...(oldObj ?? {}) } + for (const columnName of changedColumns) { newObj[columnName] = obj[columnName] if (columnName === '__after__') { - afterMap.set(obj.__after__, obj[key]) + idList.delete(obj[key]) + idList.insert(newObj[key], newObj.__after__) } } rowsMap.set(obj[key], newObj) @@ -628,13 +628,18 @@ const setup = async (pg: PGliteInterface, _emscriptenOpts: any) => { const rows: T[] = [] let lastKey: any = null for (let i = 0; i < rowsMap.size; i++) { - const nextKey = afterMap.get(lastKey) + const nextKey = idList.getAfter(lastKey) const obj = rowsMap.get(nextKey) if (!obj) { break } // Remove the __after__ key from the exposed row - const cleanObj = { ...obj } + const cleanObj = rowCache.get(obj) ?? { ...obj } + + if (!rowCache.has(obj)) { + rowCache.set(obj, cleanObj) + } + delete cleanObj.__after__ rows.push(cleanObj) lastKey = nextKey diff --git a/packages/pglite/src/utils.ts b/packages/pglite/src/utils.ts index 0cbde2f34..c4b7bd62e 100644 --- a/packages/pglite/src/utils.ts +++ b/packages/pglite/src/utils.ts @@ -1,5 +1,5 @@ -import type { PGliteInterface, Transaction } from './interface.js' import { serialize as serializeProtocol } from '@electric-sql/pg-protocol' +import type { PGliteInterface, Transaction } from './interface.js' import { parseDescribeStatementResults } from './parse.js' import { TEXT } from './types.js' @@ -247,3 +247,49 @@ export function toPostgresName(input: string): string { } return output } + +export class DoublyLinkedList { + #afterMap = new Map() + #beforeMap = new Map() + + clear() { + this.#afterMap.clear() + this.#beforeMap.clear() + } + + getAfter(afterId: T) { + return this.#afterMap.get(afterId) + } + + insert(id: T, afterId: T) { + const existingNext = this.#afterMap.get(afterId) + if (existingNext !== undefined) { + this.#afterMap.set(id, existingNext) + this.#beforeMap.set(existingNext, id) + } + this.#afterMap.set(afterId, id) + this.#beforeMap.set(id, afterId) + } + + delete(id: T) { + const prevKey = this.#beforeMap.get(id) + const nextKey = this.#afterMap.get(id) + + if (prevKey !== null && prevKey !== undefined) { + if (nextKey !== null && nextKey !== undefined) { + this.#afterMap.set(prevKey, nextKey) + this.#beforeMap.set(nextKey, prevKey) + } else { + this.#afterMap.delete(prevKey) + } + } else { + if (nextKey === null || prevKey === undefined) { + this.#afterMap.delete(prevKey!) + } + this.#beforeMap.delete(nextKey!) + } + + this.#afterMap.delete(id) + this.#beforeMap.delete(id) + } +} diff --git a/packages/pglite/tests/live.test.ts b/packages/pglite/tests/live.test.ts index f955ee9d7..5307f1078 100644 --- a/packages/pglite/tests/live.test.ts +++ b/packages/pglite/tests/live.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from 'vitest' +import { describe, expect, it } from 'vitest' import { testEsmCjsAndDTC } from './test-utils.ts' await testEsmCjsAndDTC(async (importType) => { @@ -461,6 +461,9 @@ await testEsmCjsAndDTC(async (importType) => { eventTarget.addEventListener('change', resolve, { once: true }), ) + // Check that references haven't changed between updates. + expect(initialResults.rows[0]).toBe(updatedResults.rows[0]) + expect(updatedResults.rows).toEqual([ { id: 1, number: 10 }, { id: 2, number: 20 }, @@ -498,6 +501,69 @@ await testEsmCjsAndDTC(async (importType) => { ]) }) + it('basic live incremental query with overwrites', async () => { + const db = await PGlite.create({ + extensions: { live }, + }) + + await db.exec(` + CREATE TABLE IF NOT EXISTS testTable ( + id INT PRIMARY KEY, + number INT + ); + `) + + await db.exec(` + CREATE TABLE IF NOT EXISTS childTable ( + id INT PRIMARY KEY, + parentId INT + ); + `) + + await db.exec(` + INSERT INTO testTable (id, number) + SELECT i, i*10 AS number FROM generate_series(1, 5) AS t(i); + `) + + let updatedResults + const eventTarget = new EventTarget() + + const { initialResults } = await db.live.incrementalQuery( + 'SELECT * FROM testTable WHERE number <= 50 ORDER BY number;', + [], + 'id', + (result) => { + updatedResults = result + eventTarget.dispatchEvent(new Event('change')) + }, + ) + + expect(initialResults.rows).toEqual([ + { id: 1, number: 10 }, + { id: 2, number: 20 }, + { id: 3, number: 30 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]) + + await db.transaction(async (tx) => { + await tx.exec('DELETE FROM testTable WHERE id = 3;') + await tx.exec('INSERT INTO testTable (id, number) VALUES (6, 35);') + }) + + await new Promise((resolve) => + eventTarget.addEventListener('change', resolve, { once: true }), + ) + + expect(updatedResults.rows).toEqual([ + { id: 1, number: 10 }, + { id: 2, number: 20 }, + { id: 6, number: 35 }, + { id: 4, number: 40 }, + { id: 5, number: 50 }, + ]) + }) + it('basic live incremental query with limit 1', async () => { const db = await PGlite.create({ extensions: { live }, @@ -1292,9 +1358,9 @@ await testEsmCjsAndDTC(async (importType) => { const { initialResults, unsubscribe } = await db.live.query( `SELECT id, - statement + statement FROM testTable - WHERE + WHERE statement ILIKE '%pglite%' ORDER BY id;`, [],