From 6ef0cb78b89962f9d40761473077485c10513e42 Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 7 May 2026 01:00:33 +0200 Subject: [PATCH 1/8] check stale records, fetch if they exist --- apps/service/src/temporal/activities/index.ts | 2 + .../temporal/activities/reconcile-cleanup.ts | 93 +++++++++ .../service/src/temporal/workflows/_shared.ts | 6 + e2e/stripe-reconcile-cleanup.test.ts | 179 ++++++++++++++++++ packages/destination-postgres/src/index.ts | 36 ++++ packages/protocol/src/protocol.ts | 26 +++ packages/source-stripe/src/index.ts | 108 ++++++++++- 7 files changed, 449 insertions(+), 1 deletion(-) create mode 100644 apps/service/src/temporal/activities/reconcile-cleanup.ts create mode 100644 e2e/stripe-reconcile-cleanup.test.ts diff --git a/apps/service/src/temporal/activities/index.ts b/apps/service/src/temporal/activities/index.ts index 5179b9912..c1decfed2 100644 --- a/apps/service/src/temporal/activities/index.ts +++ b/apps/service/src/temporal/activities/index.ts @@ -4,6 +4,7 @@ import { createDiscoverCatalogActivity } from './discover-catalog.js' import { createPipelineSetupActivity } from './pipeline-setup.js' import { createPipelineSyncActivity } from './pipeline-sync.js' import { createPipelineTeardownActivity } from './pipeline-teardown.js' +import { createReconcileCleanupActivity } from './reconcile-cleanup.js' import type { PipelineStore } from '../../lib/stores.js' export function createActivities(opts: { engineUrl: string; pipelineStore: PipelineStore }) { @@ -15,6 +16,7 @@ export function createActivities(opts: { engineUrl: string; pipelineStore: Pipel pipelineSync: createPipelineSyncActivity(context), pipelineTeardown: createPipelineTeardownActivity(context), updatePipelineStatus: createUpdatePipelineStatusActivity(context), + reconcileCleanup: createReconcileCleanupActivity(context), } } diff --git a/apps/service/src/temporal/activities/reconcile-cleanup.ts b/apps/service/src/temporal/activities/reconcile-cleanup.ts new file mode 100644 index 000000000..4824c4ca1 --- /dev/null +++ b/apps/service/src/temporal/activities/reconcile-cleanup.ts @@ -0,0 +1,93 @@ +import { heartbeat } from '@temporalio/activity' +import { createStripeSource, type Config as StripeSourceConfig } from '@stripe/sync-source-stripe' +import destinationPostgres, { + type Config as PostgresDestConfig, +} from '@stripe/sync-destination-postgres' +import type { ActivitiesContext } from './_shared.js' +import { log } from '../../logger.js' + +export function createReconcileCleanupActivity(context: ActivitiesContext) { + return async function reconcileCleanup( + pipelineId: string, + syncRunStartedAt: string + ): Promise { + const pipeline = await context.pipelineStore.get(pipelineId) + const { source, destination, streams } = pipeline + + if (destination.type !== 'postgres' || source.type !== 'stripe') { + // Only stripe→postgres is supported today. + return + } + + // Configs were validated against connector schemas at pipeline create time, + // so the runtime shape matches the connector's strict Config type. + const sourceConfig = source[source.type] as unknown as StripeSourceConfig + const destConfig = destination[destination.type] as unknown as PostgresDestConfig + + const catalog = { + streams: + streams?.map((s) => ({ + stream: { name: s.name, newer_than_field: '_updated_at', primary_key: [['id']] }, + sync_mode: s.sync_mode || 'incremental', + destination_sync_mode: 'append_dedup' as const, + })) ?? 
[], + } + if (catalog.streams.length === 0) return + + // Restrict cleanup to records owned by this Stripe account so multi-tenant + // schemas don't accidentally hard-delete rows that belong to a sibling sync. + const filter = sourceConfig.account_id ? { _account_id: sourceConfig.account_id } : undefined + if (!filter) { + log.warn( + { pipelineId }, + 'reconcile_cleanup: source has no account_id — running unscoped (unsafe in multi-tenant schemas)' + ) + } + + const stripeSource = createStripeSource() + + try { + heartbeat({ phase: 'starting', pipelineId }) + + // Wrap the destination's batches so we heartbeat per stream. + async function* heartbeatedStaleRecords() { + const inner = destinationPostgres.getStaleRecords!({ + config: destConfig, + catalog, + syncRunStartedAt, + filter, + }) + for await (const batch of inner) { + heartbeat({ phase: 'verifying', stream: batch.stream, ids: batch.ids.length }) + yield batch + } + } + + const verificationMessages = stripeSource.verifyRecords!( + { config: sourceConfig, catalog }, + heartbeatedStaleRecords() + ) + + const writeOutput = destinationPostgres.write( + { config: destConfig, catalog }, + verificationMessages + ) + + let deleteCount = 0 + let lastHb = Date.now() + for await (const m of writeOutput) { + if (m.type === 'record' && m.record.recordDeleted) deleteCount++ + if (Date.now() - lastHb >= 15_000) { + heartbeat({ phase: 'writing', deletes: deleteCount }) + lastHb = Date.now() + } + } + + log.info({ pipelineId, deleteCount, syncRunStartedAt }, 'reconcile_cleanup: completed') + } catch (err) { + // Cleanup is best-effort — log and swallow so the workflow's reconcile + // loop keeps running on the next interval. + log.error({ err, pipelineId, syncRunStartedAt }, 'reconcile_cleanup: failed') + } + } +} diff --git a/apps/service/src/temporal/workflows/_shared.ts b/apps/service/src/temporal/workflows/_shared.ts index c3f09d46f..1500ae7e3 100644 --- a/apps/service/src/temporal/workflows/_shared.ts +++ b/apps/service/src/temporal/workflows/_shared.ts @@ -19,6 +19,12 @@ export const { pipelineSync } = proxyActivities({ retry: retryPolicy, }) +export const { reconcileCleanup } = proxyActivities({ + startToCloseTimeout: '1h', // Could take a while due to rate limiting + heartbeatTimeout: '2m', + retry: retryPolicy, +}) + export const { discoverCatalog } = proxyActivities({ startToCloseTimeout: '10m', heartbeatTimeout: '2m', diff --git a/e2e/stripe-reconcile-cleanup.test.ts b/e2e/stripe-reconcile-cleanup.test.ts new file mode 100644 index 000000000..10d32a129 --- /dev/null +++ b/e2e/stripe-reconcile-cleanup.test.ts @@ -0,0 +1,179 @@ +/** + * Verifies the Temporal `reconcileCleanup` activity tombstones rows for + * records that were hard-deleted in Stripe without the corresponding + * `*.deleted` event being processed — the "missed delete" path that + * complements stripe-delete.test.ts (the event-driven path). + * + * Seeds destination rows via in-process engine, then runs the production + * activity through `MockActivityEnvironment` so the composition + * (`pg.getStaleRecords` → `stripe.verifyRecords` → `pg.write`) is exercised + * end-to-end with a Temporal Activity Context active (heartbeats become no-ops). 
+ */ +import pg from 'pg' +import Stripe from 'stripe' +import { afterAll, beforeAll, expect, it } from 'vitest' +import { MockActivityEnvironment } from '@temporalio/testing' +import source from '@stripe/sync-source-stripe' +import destinationPostgres from '@stripe/sync-destination-postgres' +import { createEngine } from '@stripe/sync-engine' +import type { ConnectorResolver } from '@stripe/sync-engine' +import { createActivities } from '@stripe/sync-service' +import type { Pipeline } from '@stripe/sync-service' +import { drain } from '@stripe/sync-protocol' +import { describeWithEnv } from './test-helpers.js' + +const POSTGRES_URL = + process.env.POSTGRES_URL ?? 'postgresql://postgres:postgres@localhost:5432/postgres' +const ts = new Date() + .toISOString() + .replace(/[-:T.Z]/g, '') + .slice(0, 15) +const STREAM = 'customer' +const BACKFILL_LIMIT = 10 + +function memoryPipelineStore() { + const data = new Map() + return { + async get(id: string) { + const p = data.get(id) + if (!p) throw new Error(`Pipeline not found: ${id}`) + return p + }, + async set(id: string, pipeline: Pipeline) { + data.set(id, pipeline) + }, + async update(id: string, patch: Partial>) { + const existing = data.get(id) + if (!existing) throw new Error(`Pipeline not found: ${id}`) + const updated = { ...existing, ...patch, id } as Pipeline + data.set(id, updated) + return updated + }, + async delete(id: string) { + data.delete(id) + }, + async list() { + return [...data.values()] + }, + } +} + +describeWithEnv( + 'temporal reconcile-cleanup activity → postgres (missed delete)', + ['STRIPE_API_KEY'], + ({ STRIPE_API_KEY }) => { + const SCHEMA = `e2e_recon_pg_${ts}` + const PIPELINE_ID = `pipe_recon_${ts}` + let pool: pg.Pool + let stripe: Stripe + + const sourceConfig = { api_key: STRIPE_API_KEY, backfill_limit: BACKFILL_LIMIT } + const destConfig = { url: POSTGRES_URL, schema: SCHEMA, batch_size: 100 } + + const resolver: ConnectorResolver = { + resolveSource: async (name) => { + if (name !== 'stripe') throw new Error(`Unknown source: ${name}`) + return source + }, + resolveDestination: async (name) => { + if (name !== 'postgres') throw new Error(`Unknown destination: ${name}`) + return destinationPostgres + }, + sources: () => new Map(), + destinations: () => new Map(), + } + + function makePipeline() { + return { + source: { type: 'stripe', stripe: sourceConfig }, + destination: { type: 'postgres', postgres: destConfig }, + streams: [{ name: STREAM }], + } + } + + beforeAll(async () => { + pool = new pg.Pool({ connectionString: POSTGRES_URL }) + await pool.query('SELECT 1') + await pool.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + stripe = new Stripe(STRIPE_API_KEY) + const account = await stripe.accounts.retrieve() + console.log(`\n Postgres: ${POSTGRES_URL} (schema: ${SCHEMA})`) + console.log(` Stripe account: ${account.id}`) + console.log(` Pipeline: ${PIPELINE_ID}`) + }) + + afterAll(async () => { + if (!pool) return + if (!process.env.KEEP_TEST_DATA) { + await pool.query(`DROP SCHEMA IF EXISTS "${SCHEMA}" CASCADE`) + } + await pool.end() + }) + + it('tombstones customers deleted in stripe without a delete event', async () => { + const engine = await createEngine(resolver) + const pipeline = makePipeline() + const pipelineStore = memoryPipelineStore() + await pipelineStore.set(PIPELINE_ID, { id: PIPELINE_ID, ...pipeline } as Pipeline) + + await drain(engine.pipeline_setup(pipeline)) + + const survivor = await stripe.customers.create({ + name: `e2e-recon-survivor-${Date.now()}`, + }) + const doomed = 
await stripe.customers.create({ + name: `e2e-recon-doomed-${Date.now()}`, + }) + const cleanupIds = new Set([survivor.id, doomed.id]) + + try { + // Backfill-only sync (no websocket, no event polling) — both rows + // land in postgres with `_synced_at ≈ T0`. + for await (const _msg of engine.pipeline_sync(pipeline)) { + void _msg + } + + const seeded = await pool.query<{ id: string }>( + `SELECT id FROM "${SCHEMA}"."${STREAM}" WHERE id = ANY($1)`, + [[survivor.id, doomed.id]] + ) + expect(new Set(seeded.rows.map((r) => r.id))).toEqual(new Set([survivor.id, doomed.id])) + + // Hard-delete one customer WITHOUT replaying the customer.deleted + // event — this is the "missed delete" reconcile-cleanup catches. + await stripe.customers.del(doomed.id) + cleanupIds.delete(doomed.id) + + // `_synced_at` is set with millisecond precision by the destination, + // so a small forward skew guarantees `syncRunStartedAt > _synced_at`. + await new Promise((r) => setTimeout(r, 50)) + const syncRunStartedAt = new Date().toISOString() + + // engineUrl is unused by reconcileCleanup (it instantiates connectors + // in-process); other activities in the bundle don't run here. + const activities = createActivities({ engineUrl: 'http://unused', pipelineStore }) + + const env = new MockActivityEnvironment() + await env.run(activities.reconcileCleanup, PIPELINE_ID, syncRunStartedAt) + + const after = await pool.query<{ id: string }>( + `SELECT id FROM "${SCHEMA}"."${STREAM}" WHERE id = ANY($1)`, + [[survivor.id, doomed.id]] + ) + const remaining = new Set(after.rows.map((r) => r.id)) + expect(remaining.has(survivor.id), `survivor ${survivor.id} was tombstoned`).toBe(true) + expect(remaining.has(doomed.id), `doomed ${doomed.id} was not tombstoned`).toBe(false) + console.log(` Survived: ${survivor.id}`) + console.log(` Tombstoned: ${doomed.id}`) + } finally { + if (!process.env.KEEP_TEST_DATA) { + for (const id of cleanupIds) { + try { + await stripe.customers.del(id) + } catch {} + } + } + } + }, 180_000) + } +) diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 411f986b4..25c7fa043 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -550,6 +550,42 @@ const destination = { await endPool(pool, 'write') } }, + async *getStaleRecords({ config, catalog, syncRunStartedAt, filter }) { + const pool = await createInstrumentedPool(config, 'getStaleRecords') + const BATCH_SIZE = 1000 + + try { + for (const configuredStream of catalog.streams) { + const streamName = configuredStream.stream.name + const filterEntries = Object.entries(filter ?? 
{}) + const filterClauses = filterEntries.map(([col], i) => `${ident(col)} = $${i + 2}`) + const whereClauses = ['_synced_at < $1', ...filterClauses].join(' AND ') + const query = sql`SELECT id FROM ${qualifiedTable(config.schema, streamName)} WHERE ${whereClauses}` + const params: unknown[] = [syncRunStartedAt, ...filterEntries.map(([, v]) => v)] + + try { + const result = await pool.query<{ id: string }>(query, params) + log.debug( + { stream: streamName, schema: config.schema, count: result.rows.length }, + 'getStaleRecords: query complete' + ) + if (result.rows.length === 0) continue + + for (let i = 0; i < result.rows.length; i += BATCH_SIZE) { + const batch = result.rows.slice(i, i + BATCH_SIZE).map((r) => r.id) + yield { stream: streamName, ids: batch } + } + } catch (err: unknown) { + const error = err as Error & { code?: string } + // 42P01 = undefined_table — ignore (first run before setup, or stream not yet created). + if (error.code === '42P01') continue + log.error({ stream: streamName, err }, 'getStaleRecords: query failed') + } + } + } finally { + await endPool(pool, 'getStaleRecords') + } + }, } satisfies Destination export default destination diff --git a/packages/protocol/src/protocol.ts b/packages/protocol/src/protocol.ts index 19e4314fb..0a596e3b6 100644 --- a/packages/protocol/src/protocol.ts +++ b/packages/protocol/src/protocol.ts @@ -701,6 +701,11 @@ export type TeardownOutput = z.infer // // Every method returns AsyncIterable — everything is a stream. +export interface StaleRecordsBatch { + stream: string + ids: string[] +} + /** * Reads data from an upstream system by emitting messages. * @@ -744,6 +749,15 @@ export interface Source< $stdin?: AsyncIterable ): AsyncIterable + /** + * Verify whether stale records still exist in the source system. + * Yields RecordMessage with recordDeleted: true for any that are missing. + */ + verifyRecords?( + params: { config: TConfig; catalog: ConfiguredCatalog }, + $stdin: AsyncIterable + ): AsyncIterable + /** Provision external resources (webhook endpoints, replication slots, etc.). */ setup?(params: { config: TConfig; catalog: ConfiguredCatalog }): AsyncIterable @@ -787,6 +801,18 @@ export interface Destination = Record ): AsyncIterable + /** + * Get batches of IDs for records that weren't updated in the current sync run. + * `filter` (e.g. `{ _account_id: 'acct_X' }`) scopes the query so multi-tenant + * schemas only return records owned by the caller. + */ + getStaleRecords?(params: { + config: TConfig + catalog: ConfiguredCatalog + syncRunStartedAt: string + filter?: Record + }): AsyncIterable + /** Provision downstream resources (schemas, tables, etc.). 
*/ setup?(params: { config: TConfig; catalog: ConfiguredCatalog }): AsyncIterable diff --git a/packages/source-stripe/src/index.ts b/packages/source-stripe/src/index.ts index 8d86388a7..6ea5f8ae7 100644 --- a/packages/source-stripe/src/index.ts +++ b/packages/source-stripe/src/index.ts @@ -6,6 +6,7 @@ import type { DiscoverOutput, SetupOutput, TeardownOutput, + StaleRecordsBatch, } from '@stripe/sync-protocol' import { createSourceMessageFactory, withAbortOnReturn } from '@stripe/sync-protocol' import defaultSpec from './spec.js' @@ -13,7 +14,12 @@ import type { Config } from './spec.js' import type { StripeEvent } from './spec.js' import { buildResourceRegistry, EXCLUDED_TABLES } from './resourceRegistry.js' import { catalogFromOpenApi, stampAccountIdEnum } from './catalog.js' -import { BUNDLED_API_VERSION, resolveOpenApiSpec, SpecParser } from '@stripe/sync-openapi' +import { + BUNDLED_API_VERSION, + resolveOpenApiSpec, + SpecParser, + StripeApiRequestError, +} from '@stripe/sync-openapi' import { processStripeEvent } from './process-event.js' import { processWebhookInput, createInputQueue, startWebhookServer } from './src-webhook.js' import { listApiBackfill, errorToConnectionStatus } from './src-list-api.js' @@ -274,6 +280,106 @@ export function createStripeSource( } }, + verifyRecords({ config, catalog }, $stdin) { + return withAbortOnReturn((signal) => + (async function* () { + const apiVersion = config.api_version ?? BUNDLED_API_VERSION + const liveMode = + config.api_key.startsWith('sk_live_') || config.api_key.startsWith('rk_live_') + const maxRequestsPerSecond = config.rate_limit ?? (liveMode ? 50 : 10) + const rateLimiter = externalRateLimiter ?? createInMemoryRateLimiter(maxRequestsPerSecond) + + const client = makeClient({ ...config, api_version: apiVersion }, undefined, signal) + const resolved = await resolveOpenApiSpec({ apiVersion }, makeApiFetch(signal)) + const streamNames = new Set(catalog.streams.map((s) => s.stream.name)) + const registry = buildResourceRegistry( + resolved.spec, + config.api_key, + resolved.apiVersion, + config.base_url, + streamNames, + signal + ) + + let accountId = config.account_id + if (!accountId) { + try { + accountId = (await resolveAccountMetadata(config, client)).accountId + } catch (err) { + log.warn({ err }, 'verifyRecords: failed to resolve account_id') + } + } + + log.info( + { account_id: accountId, streams: catalog.streams.length }, + 'verifyRecords: started' + ) + + for await (const batch of $stdin) { + const resourceConfig = Object.values(registry).find( + (cfg) => cfg.tableName === batch.stream + ) + if (!resourceConfig?.retrieveFn) { + log.warn( + { stream: batch.stream }, + 'verifyRecords: stream has no retrieveFn — skipping' + ) + continue + } + + const startedAt = Date.now() + let confirmedDeleted = 0 + + for (let i = 0; i < batch.ids.length; i += maxRequestsPerSecond) { + const chunk = batch.ids.slice(i, i + maxRequestsPerSecond) + const results = await Promise.all( + chunk.map(async (id) => { + await rateLimiter() + signal?.throwIfAborted() + try { + const result = (await resourceConfig.retrieveFn!(id)) as { + deleted?: boolean + } | null + return result?.deleted === true ? 
id : null
+              } catch (err: unknown) {
+                if (err instanceof StripeApiRequestError && err.status === 404) return id
+                log.warn({ id, stream: batch.stream, err }, 'verifyRecords: lookup failed')
+                return null
+              }
+            })
+          )
+
+          for (const id of results) {
+            if (!id) continue
+            confirmedDeleted++
+            const data: Record<string, unknown> = {
+              id,
+              _updated_at: Math.floor(Date.now() / 1000),
+            }
+            if (accountId) data._account_id = accountId
+            yield msg.record({
+              stream: batch.stream,
+              recordDeleted: true,
+              data,
+              emitted_at: new Date().toISOString(),
+            })
+          }
+        }
+
+        log.info(
+          {
+            stream: batch.stream,
+            checked: batch.ids.length,
+            deleted: confirmedDeleted,
+            elapsed_ms: Date.now() - startedAt,
+          },
+          'verifyRecords: stream done'
+        )
+      }
+    })()
+  )
+},
+
   read({ config, catalog, state }, $stdin?) {
     return withAbortOnReturn((signal) =>
       (async function* () {

From 8dd9c17e85226865dedc21c59e28f71695d381e8 Mon Sep 17 00:00:00 2001
From: Yostra 
Date: Thu, 7 May 2026 06:01:43 +0200
Subject: [PATCH 2/8] sheet destination

---
 .../temporal/activities/reconcile-cleanup.ts |  54 ++++--
 e2e/stripe-reconcile-cleanup.test.ts         | 152 +++++++++++++--
 .../src/index.test.ts                        | 173 ++++++++++++++++++
 .../destination-google-sheets/src/index.ts   |  76 +++++++-
 4 files changed, 431 insertions(+), 24 deletions(-)

diff --git a/apps/service/src/temporal/activities/reconcile-cleanup.ts b/apps/service/src/temporal/activities/reconcile-cleanup.ts
index 4824c4ca1..87892ef1b 100644
--- a/apps/service/src/temporal/activities/reconcile-cleanup.ts
+++ b/apps/service/src/temporal/activities/reconcile-cleanup.ts
@@ -3,9 +3,27 @@ import { createStripeSource, type Config as StripeSourceConfig } from '@stripe/s
 import destinationPostgres, {
   type Config as PostgresDestConfig,
 } from '@stripe/sync-destination-postgres'
+import destinationSheets, {
+  type Config as SheetsDestConfig,
+} from '@stripe/sync-destination-google-sheets'
+import type { Destination } from '@stripe/sync-protocol'
 import type { ActivitiesContext } from './_shared.js'
 import { log } from '../../logger.js'
 
+type SupportedDestType = 'postgres' | 'google_sheets'
+
+function resolveDestination(
+  type: string
+): { destination: Destination<Record<string, unknown>>; type: SupportedDestType } | undefined {
+  if (type === 'postgres') {
+    return { destination: destinationPostgres as Destination<Record<string, unknown>>, type }
+  }
+  if (type === 'google_sheets') {
+    return { destination: destinationSheets as Destination<Record<string, unknown>>, type }
+  }
+  return undefined
+}
+
 export function createReconcileCleanupActivity(context: ActivitiesContext) {
   return async function reconcileCleanup(
     pipelineId: string,
@@ -14,15 +32,22 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) {
     const pipeline = await context.pipelineStore.get(pipelineId)
     const { source, destination, streams } = pipeline
 
-    if (destination.type !== 'postgres' || source.type !== 'stripe') {
-      // Only stripe→postgres is supported today.
+    if (source.type !== 'stripe') {
+      // Only stripe sources support verifyRecords today.
+      return
+    }
+    const resolved = resolveDestination(destination.type)
+    if (!resolved) {
+      // Destination doesn't implement getStaleRecords yet.
       return
     }
 
     // Configs were validated against connector schemas at pipeline create time,
-    // so the runtime shape matches the connector's strict Config type.
+    // so the runtime shape matches each connector's strict Config type.
const sourceConfig = source[source.type] as unknown as StripeSourceConfig - const destConfig = destination[destination.type] as unknown as PostgresDestConfig + const destConfig = destination[destination.type] as unknown as + | PostgresDestConfig + | SheetsDestConfig const catalog = { streams: @@ -39,20 +64,24 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) { const filter = sourceConfig.account_id ? { _account_id: sourceConfig.account_id } : undefined if (!filter) { log.warn( - { pipelineId }, + { pipelineId, destinationType: resolved.type }, 'reconcile_cleanup: source has no account_id — running unscoped (unsafe in multi-tenant schemas)' ) } const stripeSource = createStripeSource() + const dest = resolved.destination + // Guaranteed by `resolveDestination`'s whitelist: every type that resolves + // here is a destination that ships a `getStaleRecords` implementation. + const getStaleRecords = dest.getStaleRecords! try { - heartbeat({ phase: 'starting', pipelineId }) + heartbeat({ phase: 'starting', pipelineId, destinationType: resolved.type }) // Wrap the destination's batches so we heartbeat per stream. async function* heartbeatedStaleRecords() { - const inner = destinationPostgres.getStaleRecords!({ - config: destConfig, + const inner = getStaleRecords({ + config: destConfig as Record, catalog, syncRunStartedAt, filter, @@ -68,8 +97,8 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) { heartbeatedStaleRecords() ) - const writeOutput = destinationPostgres.write( - { config: destConfig, catalog }, + const writeOutput = dest.write( + { config: destConfig as Record, catalog }, verificationMessages ) @@ -83,7 +112,10 @@ export function createReconcileCleanupActivity(context: ActivitiesContext) { } } - log.info({ pipelineId, deleteCount, syncRunStartedAt }, 'reconcile_cleanup: completed') + log.info( + { pipelineId, destinationType: resolved.type, deleteCount, syncRunStartedAt }, + 'reconcile_cleanup: completed' + ) } catch (err) { // Cleanup is best-effort — log and swallow so the workflow's reconcile // loop keeps running on the next interval. diff --git a/e2e/stripe-reconcile-cleanup.test.ts b/e2e/stripe-reconcile-cleanup.test.ts index 10d32a129..ba4476c0a 100644 --- a/e2e/stripe-reconcile-cleanup.test.ts +++ b/e2e/stripe-reconcile-cleanup.test.ts @@ -1,20 +1,17 @@ /** * Verifies the Temporal `reconcileCleanup` activity tombstones rows for - * records that were hard-deleted in Stripe without the corresponding - * `*.deleted` event being processed — the "missed delete" path that - * complements stripe-delete.test.ts (the event-driven path). - * - * Seeds destination rows via in-process engine, then runs the production - * activity through `MockActivityEnvironment` so the composition - * (`pg.getStaleRecords` → `stripe.verifyRecords` → `pg.write`) is exercised - * end-to-end with a Temporal Activity Context active (heartbeats become no-ops). + * records hard-deleted in Stripe without a `*.deleted` event — the "missed + * delete" path complementing stripe-delete.test.ts. Two suites (postgres, + * google_sheets) run the production activity via `MockActivityEnvironment`. 
*/ import pg from 'pg' import Stripe from 'stripe' +import { google } from 'googleapis' import { afterAll, beforeAll, expect, it } from 'vitest' import { MockActivityEnvironment } from '@temporalio/testing' import source from '@stripe/sync-source-stripe' import destinationPostgres from '@stripe/sync-destination-postgres' +import destinationSheets, { readSheet } from '@stripe/sync-destination-google-sheets' import { createEngine } from '@stripe/sync-engine' import type { ConnectorResolver } from '@stripe/sync-engine' import { createActivities } from '@stripe/sync-service' @@ -129,9 +126,7 @@ describeWithEnv( try { // Backfill-only sync (no websocket, no event polling) — both rows // land in postgres with `_synced_at ≈ T0`. - for await (const _msg of engine.pipeline_sync(pipeline)) { - void _msg - } + await drain(engine.pipeline_sync(pipeline)) const seeded = await pool.query<{ id: string }>( `SELECT id FROM "${SCHEMA}"."${STREAM}" WHERE id = ANY($1)`, @@ -177,3 +172,138 @@ describeWithEnv( }, 180_000) } ) + +// MARK: - Google Sheets + +describeWithEnv( + 'temporal reconcile-cleanup activity → google sheets (missed delete)', + ['STRIPE_API_KEY', 'GOOGLE_CLIENT_ID', 'GOOGLE_CLIENT_SECRET', 'GOOGLE_REFRESH_TOKEN'], + ({ STRIPE_API_KEY, GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET, GOOGLE_REFRESH_TOKEN }) => { + const PIPELINE_ID = `pipe_recon_sheets_${ts}` + let stripe: Stripe + let sheetsClient: ReturnType + let driveClient: ReturnType + let spreadsheetId = process.env.GOOGLE_SPREADSHEET_ID ?? '' + let createdSpreadsheetHere = false + + const sourceConfig = { api_key: STRIPE_API_KEY, backfill_limit: BACKFILL_LIMIT } + + const resolver: ConnectorResolver = { + resolveSource: async (name) => { + if (name !== 'stripe') throw new Error(`Unknown source: ${name}`) + return source + }, + resolveDestination: async (name) => { + if (name !== 'google_sheets') throw new Error(`Unknown destination: ${name}`) + return destinationSheets + }, + sources: () => new Map(), + destinations: () => new Map(), + } + + function makePipeline() { + return { + source: { type: 'stripe', stripe: sourceConfig }, + destination: { + type: 'google_sheets', + google_sheets: { + client_id: GOOGLE_CLIENT_ID, + client_secret: GOOGLE_CLIENT_SECRET, + refresh_token: GOOGLE_REFRESH_TOKEN, + ...(spreadsheetId ? { spreadsheet_id: spreadsheetId } : {}), + spreadsheet_title: `e2e-recon-sheets-${ts}`, + batch_size: 50, + }, + }, + streams: [{ name: STREAM }], + } + } + + beforeAll(async () => { + stripe = new Stripe(STRIPE_API_KEY) + const auth = new google.auth.OAuth2(GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET) + auth.setCredentials({ refresh_token: GOOGLE_REFRESH_TOKEN }) + sheetsClient = google.sheets({ version: 'v4', auth }) + driveClient = google.drive({ version: 'v3', auth }) + }) + + afterAll(async () => { + if (createdSpreadsheetHere && spreadsheetId && !process.env.KEEP_TEST_DATA) { + try { + await driveClient.files.delete({ fileId: spreadsheetId }) + } catch {} + } + }) + + it('tombstones customers deleted in stripe without a delete event', async () => { + const engine = await createEngine(resolver) + + // pipeline_setup creates the spreadsheet if needed and emits the new + // id via destination_config — capture so the second pipeline run reuses it. 
+ for await (const m of engine.pipeline_setup(makePipeline())) { + if ( + m.type === 'control' && + m.control.control_type === 'destination_config' && + typeof m.control.destination_config.spreadsheet_id === 'string' && + m.control.destination_config.spreadsheet_id !== spreadsheetId + ) { + spreadsheetId = m.control.destination_config.spreadsheet_id + createdSpreadsheetHere = true + } + } + expect(spreadsheetId, 'no spreadsheet_id available (env or destination)').toBeTruthy() + console.log(`\n Sheets: https://docs.google.com/spreadsheets/d/${spreadsheetId}/`) + console.log(` Pipeline: ${PIPELINE_ID}`) + + const pipeline = makePipeline() + const pipelineStore = memoryPipelineStore() + await pipelineStore.set(PIPELINE_ID, { id: PIPELINE_ID, ...pipeline } as Pipeline) + + const survivor = await stripe.customers.create({ + name: `e2e-recon-sheets-survivor-${Date.now()}`, + }) + const doomed = await stripe.customers.create({ + name: `e2e-recon-sheets-doomed-${Date.now()}`, + }) + const cleanupIds = new Set([survivor.id, doomed.id]) + + try { + // Backfill seeds both customers with `_synced_at ≈ T0`. + await drain(engine.pipeline_sync(pipeline)) + + const seededRows = await readSheet(sheetsClient, spreadsheetId, STREAM) + const seededHeader = (seededRows[0] ?? []) as string[] + const idIdx = seededHeader.indexOf('id') + expect(idIdx, 'id column missing in sheet header').toBeGreaterThanOrEqual(0) + const seededIds = new Set(seededRows.slice(1).map((row) => String(row[idIdx] ?? ''))) + expect(seededIds.has(survivor.id)).toBe(true) + expect(seededIds.has(doomed.id)).toBe(true) + + await stripe.customers.del(doomed.id) + cleanupIds.delete(doomed.id) + + await new Promise((r) => setTimeout(r, 50)) + const syncRunStartedAt = new Date().toISOString() + + const activities = createActivities({ engineUrl: 'http://unused', pipelineStore }) + const env = new MockActivityEnvironment() + await env.run(activities.reconcileCleanup, PIPELINE_ID, syncRunStartedAt) + + const afterRows = await readSheet(sheetsClient, spreadsheetId, STREAM) + const afterIds = new Set(afterRows.slice(1).map((row) => String(row[idIdx] ?? 
''))) + expect(afterIds.has(survivor.id), `survivor ${survivor.id} was tombstoned`).toBe(true) + expect(afterIds.has(doomed.id), `doomed ${doomed.id} was not tombstoned`).toBe(false) + console.log(` Survived: ${survivor.id}`) + console.log(` Tombstoned: ${doomed.id}`) + } finally { + if (!process.env.KEEP_TEST_DATA) { + for (const id of cleanupIds) { + try { + await stripe.customers.del(id) + } catch {} + } + } + } + }, 240_000) + } +) diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts index a1d1d6aff..a044fb570 100644 --- a/packages/destination-google-sheets/src/index.test.ts +++ b/packages/destination-google-sheets/src/index.test.ts @@ -2050,3 +2050,176 @@ describe('enum constraints on any column', () => { ).toBeUndefined() }) }) + +describe('getStaleRecords', () => { + const catalog: ConfiguredCatalog = { + streams: [ + { + stream: { name: 'customer', primary_key: [['id']], newer_than_field: '_updated_at' }, + sync_mode: 'incremental', + destination_sync_mode: 'append_dedup', + }, + ], + } + + it('returns ids whose _synced_at predates syncRunStartedAt', async () => { + const { sheets, getSpreadsheetIds } = createMemorySheets() + const dest = createDestination(sheets) + + await collect( + dest.write( + { config: cfg(), catalog }, + toAsyncIter([ + record('customer', { id: 'cus_1', name: 'Alice', _account_id: 'acct_A' }), + record('customer', { id: 'cus_2', name: 'Bob', _account_id: 'acct_A' }), + ]) + ) + ) + + const spreadsheetId = getSpreadsheetIds()[0] + const syncRunStartedAt = new Date(Date.now() + 60_000).toISOString() + + const batches: { stream: string; ids: string[] }[] = [] + for await (const batch of dest.getStaleRecords!({ + config: cfg({ spreadsheet_id: spreadsheetId }), + catalog, + syncRunStartedAt, + })) { + batches.push(batch) + } + + expect(batches).toEqual([{ stream: 'customer', ids: ['cus_1', 'cus_2'] }]) + }) + + it('does not return rows whose _synced_at is at or after syncRunStartedAt', async () => { + const { sheets, getSpreadsheetIds } = createMemorySheets() + const dest = createDestination(sheets) + + await collect( + dest.write( + { config: cfg(), catalog }, + toAsyncIter([record('customer', { id: 'cus_1', name: 'Alice', _account_id: 'acct_A' })]) + ) + ) + + const spreadsheetId = getSpreadsheetIds()[0] + const syncRunStartedAt = new Date(0).toISOString() + + const batches: { stream: string; ids: string[] }[] = [] + for await (const batch of dest.getStaleRecords!({ + config: cfg({ spreadsheet_id: spreadsheetId }), + catalog, + syncRunStartedAt, + })) { + batches.push(batch) + } + + expect(batches).toEqual([]) + }) + + it('scopes results to rows matching the filter (e.g. 
_account_id)', async () => { + const { sheets, getSpreadsheetIds } = createMemorySheets() + const dest = createDestination(sheets) + + await collect( + dest.write( + { config: cfg(), catalog }, + toAsyncIter([ + record('customer', { id: 'cus_a1', name: 'Alice', _account_id: 'acct_A' }), + record('customer', { id: 'cus_b1', name: 'Bob', _account_id: 'acct_B' }), + record('customer', { id: 'cus_a2', name: 'Carol', _account_id: 'acct_A' }), + ]) + ) + ) + + const spreadsheetId = getSpreadsheetIds()[0] + const syncRunStartedAt = new Date(Date.now() + 60_000).toISOString() + + const batches: { stream: string; ids: string[] }[] = [] + for await (const batch of dest.getStaleRecords!({ + config: cfg({ spreadsheet_id: spreadsheetId }), + catalog, + syncRunStartedAt, + filter: { _account_id: 'acct_A' }, + })) { + batches.push(batch) + } + + expect(batches).toEqual([{ stream: 'customer', ids: ['cus_a1', 'cus_a2'] }]) + }) + + it('skips streams whose tab is missing', async () => { + const { sheets, getSpreadsheetIds } = createMemorySheets() + const dest = createDestination(sheets) + + await collect( + dest.write( + { config: cfg(), catalog }, + toAsyncIter([record('customer', { id: 'cus_1', name: 'Alice', _account_id: 'acct_A' })]) + ) + ) + const spreadsheetId = getSpreadsheetIds()[0] + + const multiCatalog: ConfiguredCatalog = { + streams: [ + ...catalog.streams, + { + stream: { name: 'invoice', primary_key: [['id']], newer_than_field: '_updated_at' }, + sync_mode: 'incremental', + destination_sync_mode: 'append_dedup', + }, + ], + } + + const batches: { stream: string; ids: string[] }[] = [] + for await (const batch of dest.getStaleRecords!({ + config: cfg({ spreadsheet_id: spreadsheetId }), + catalog: multiCatalog, + syncRunStartedAt: new Date(Date.now() + 60_000).toISOString(), + })) { + batches.push(batch) + } + + expect(batches.map((b) => b.stream)).toEqual(['customer']) + }) + + it('batches results in chunks of 1000', async () => { + const { sheets, getSpreadsheetIds } = createMemorySheets() + const dest = createDestination(sheets) + const COUNT = 2_500 + + const records = Array.from({ length: COUNT }, (_, i) => + record('customer', { id: `cus_${i}`, name: `n${i}`, _account_id: 'acct_A' }) + ) + await collect(dest.write({ config: cfg({ batch_size: 500 }), catalog }, toAsyncIter(records))) + const spreadsheetId = getSpreadsheetIds()[0] + + const batches: { stream: string; ids: string[] }[] = [] + for await (const batch of dest.getStaleRecords!({ + config: cfg({ spreadsheet_id: spreadsheetId }), + catalog, + syncRunStartedAt: new Date(Date.now() + 60_000).toISOString(), + })) { + batches.push(batch) + } + + expect(batches.map((b) => b.ids.length)).toEqual([1000, 1000, 500]) + expect(batches.flatMap((b) => b.ids)).toHaveLength(COUNT) + }) + + it('returns nothing when spreadsheet_id is unset', async () => { + const { sheets } = createMemorySheets() + const dest = createDestination(sheets) + + const batches: { stream: string; ids: string[] }[] = [] + for await (const batch of dest.getStaleRecords!({ + config: cfg(), + catalog, + syncRunStartedAt: new Date().toISOString(), + })) { + batches.push(batch) + } + + expect(batches).toEqual([]) + }) +}) diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts index bb55c2f29..c264715bb 100644 --- a/packages/destination-google-sheets/src/index.ts +++ b/packages/destination-google-sheets/src/index.ts @@ -667,8 +667,8 @@ export function createDestination( } if (deleteRowNumbers.size > 0) { - // Google 
sheets API omits trailing blank cells, so we add - // an extra empty cell. + // Google sheets API omits trailing blank cells, so we add + // an extra empty cell. const blankRow = new Array(headers.length + 1).fill('') const deleteList = [...deleteRowNumbers].sort((a, b) => a - b) @@ -969,6 +969,78 @@ export function createDestination( } } }, + + async *getStaleRecords({ config, catalog, syncRunStartedAt, filter }) { + if (!config.spreadsheet_id) return + const streamNames = catalog.streams.map((cs) => cs.stream.name) + if (streamNames.length === 0) return + + const sheets = sheetsClient ?? makeSheetsClient(config) + const BATCH_SIZE = 1000 + const filterEntries = Object.entries(filter ?? {}) + + // One `values.batchGet` covers all streams to stay under the read quota. + // If any tab is missing the call 400s and cleanup skips this run. + let perStream: Map + try { + perStream = await batchReadSheets(sheets, config.spreadsheet_id, streamNames) + } catch (err) { + log.warn( + { err, spreadsheetId: config.spreadsheet_id, streams: streamNames.length }, + 'getStaleRecords: batchGet failed — skipping (will retry next interval)' + ) + return + } + + for (const streamName of streamNames) { + const rows = perStream.get(streamName) ?? [] + if (rows.length < 2) continue + + const headers = (rows[0] as unknown[]).map((h) => String(h ?? '')) + const idIdx = headers.indexOf('id') + const syncedAtIdx = headers.indexOf('_synced_at') + if (idIdx < 0 || syncedAtIdx < 0) { + log.warn( + { stream: streamName, headers }, + 'getStaleRecords: missing id or _synced_at column — skipping stream' + ) + continue + } + const filterIdx = filterEntries.map(([col, value]) => ({ + idx: headers.indexOf(col), + value, + col, + })) + if (filterIdx.some((f) => f.idx < 0)) { + log.warn( + { stream: streamName, filter: filterEntries.map(([col]) => col) }, + 'getStaleRecords: filter column missing — skipping stream (refusing to run unscoped)' + ) + continue + } + + const stale: string[] = [] + for (let r = 1; r < rows.length; r++) { + const row = (rows[r] ?? []) as unknown[] + const syncedAt = String(row[syncedAtIdx] ?? '') + // ISO 8601 UTC with fixed precision (`new Date().toISOString()`), + // so lexicographic compare is equivalent to chronological. + if (!syncedAt || syncedAt >= syncRunStartedAt) continue + if (filterIdx.some((f) => String(row[f.idx] ?? '') !== f.value)) continue + const id = String(row[idIdx] ?? 
'') + if (!id) continue + stale.push(id) + } + + log.debug( + { stream: streamName, spreadsheetId: config.spreadsheet_id, count: stale.length }, + 'getStaleRecords: scan complete' + ) + for (let i = 0; i < stale.length; i += BATCH_SIZE) { + yield { stream: streamName, ids: stale.slice(i, i + BATCH_SIZE) } + } + } + }, } satisfies Destination return destination From 3f5a3e885c475cac9137f3a86a7a46616f567497 Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 7 May 2026 08:17:01 +0200 Subject: [PATCH 3/8] main reverted _synced_at --- .../__tests__/integration.test.ts | 8 +++++-- .../src/index.test.ts | 24 +++++++++---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/packages/destination-google-sheets/__tests__/integration.test.ts b/packages/destination-google-sheets/__tests__/integration.test.ts index 0707faae8..b0e771be0 100644 --- a/packages/destination-google-sheets/__tests__/integration.test.ts +++ b/packages/destination-google-sheets/__tests__/integration.test.ts @@ -25,8 +25,12 @@ describeWithEnv( } function stripUpdatedAt(rows: unknown[][]): unknown[][] { - const idx = rows[0]?.indexOf('_updated_at') ?? -1 - return idx < 0 ? rows : rows.map((row) => row.filter((_, i) => i !== idx)) + const header = rows[0] ?? [] + const indexes = new Set( + ['_updated_at', '_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0) + ) + if (indexes.size === 0) return rows + return rows.map((row) => row.filter((_, i) => !indexes.has(i))) } it.skip('writes records to an existing spreadsheet and reads them back', async () => { diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts index a044fb570..b8d221ea5 100644 --- a/packages/destination-google-sheets/src/index.test.ts +++ b/packages/destination-google-sheets/src/index.test.ts @@ -18,22 +18,19 @@ import { import { createMemorySheets } from '../__tests__/memory-sheets.js' /** - * Strip the source-provided `_updated_at` column from a 2D rows array. + * Strip metadata timestamp columns from a 2D rows array. * - * The source stamps every record with `_updated_at` (unix seconds, from - * `event.created` for webhooks or the HTTP `Date` header for backfill); - * the destination passes the value through verbatim. Most tests don't - * care about its exact value and just want to assert on source data; - * use this helper at the assertion site to drop the column before - * comparing. Tests that exercise the column itself stay in the - * `_updated_at column` describe block at the bottom of this file. + * The destination stamps `_synced_at`; the source may stamp `_updated_at`. + * Most tests only care about source data, so drop both at assertion sites. */ function stripUpdatedAt(rows: unknown[][] | undefined): unknown[][] { if (!rows || rows.length === 0) return rows ?? [] const header = rows[0] as unknown[] - const idx = header.indexOf('_updated_at') - if (idx < 0) return rows - return rows.map((row) => row.filter((_, i) => i !== idx)) + const indexes = new Set( + ['_updated_at', '_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0) + ) + if (indexes.size === 0) return rows + return rows.map((row) => row.filter((_, i) => !indexes.has(i))) } /** Collect all output from the destination's write() generator. */ @@ -1789,8 +1786,11 @@ describe('_updated_at column (source-owned, passthrough)', () => { ) const rows = getData(getSpreadsheetIds()[0], 'users')! 
- expect(rows[0]).toEqual(['id', 'name']) + expect(rows[0]).toEqual(['id', 'name', '_synced_at']) expect(rows[0]).not.toContain('_updated_at') + const syncedAtIdx = (rows[0] as string[]).indexOf('_synced_at') + expect(syncedAtIdx).toBeGreaterThanOrEqual(0) + expect(Date.parse(String(rows[1][syncedAtIdx]))).not.toBeNaN() }) it('passes the source-provided _updated_at through verbatim', async () => { From a210f93f429e81d7502271a45e553c8e99cc604a Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 7 May 2026 18:56:20 +0200 Subject: [PATCH 4/8] customers --- e2e/stripe-reconcile-cleanup.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/stripe-reconcile-cleanup.test.ts b/e2e/stripe-reconcile-cleanup.test.ts index ba4476c0a..b0f9a831c 100644 --- a/e2e/stripe-reconcile-cleanup.test.ts +++ b/e2e/stripe-reconcile-cleanup.test.ts @@ -25,7 +25,7 @@ const ts = new Date() .toISOString() .replace(/[-:T.Z]/g, '') .slice(0, 15) -const STREAM = 'customer' +const STREAM = 'customers' const BACKFILL_LIMIT = 10 function memoryPipelineStore() { From 5234b4e7fcd14de07d2f6691d8ef63e6c42ac495 Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 7 May 2026 19:19:19 +0200 Subject: [PATCH 5/8] use last_synced_at --- packages/destination-postgres/src/index.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/packages/destination-postgres/src/index.ts b/packages/destination-postgres/src/index.ts index 25c7fa043..43162f2e5 100644 --- a/packages/destination-postgres/src/index.ts +++ b/packages/destination-postgres/src/index.ts @@ -117,6 +117,7 @@ export async function upsertMany( skipped_count: 0, } + const syncedAt = new Date().toISOString() const records = entries.map((e) => { const ts = e[newerThanField] as unknown if (typeof ts !== 'number' || !Number.isFinite(ts)) { @@ -124,7 +125,11 @@ export async function upsertMany( `upsertMany: record missing source-stamped "${newerThanField}" (table=${schema}.${table}, id=${String(e.id)}). See DDR-009.` ) } - return { _raw_data: e, _updated_at: new Date(ts * 1000).toISOString() } + return { + _raw_data: e, + _last_synced_at: syncedAt, + _updated_at: new Date(ts * 1000).toISOString(), + } }) return await upsertWithStats(pool, records, { @@ -559,7 +564,7 @@ const destination = { const streamName = configuredStream.stream.name const filterEntries = Object.entries(filter ?? {}) const filterClauses = filterEntries.map(([col], i) => `${ident(col)} = $${i + 2}`) - const whereClauses = ['_synced_at < $1', ...filterClauses].join(' AND ') + const whereClauses = ['_last_synced_at < $1', ...filterClauses].join(' AND ') const query = sql`SELECT id FROM ${qualifiedTable(config.schema, streamName)} WHERE ${whereClauses}` const params: unknown[] = [syncRunStartedAt, ...filterEntries.map(([, v]) => v)] From e46ce39a0e39d0766e7ba67989cab34624a6baa3 Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 7 May 2026 19:19:40 +0200 Subject: [PATCH 6/8] disable vercel ci test --- .github/workflows/ci.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 36208e8cd..47e367eb8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -855,9 +855,11 @@ jobs: # --------------------------------------------------------------------------- # Docs — deploy to Vercel (always, so e2e_cdn can test the live CDN) + # Disabled: Vercel project deleted. Re-enable by removing `if: false`. 
# ---------------------------------------------------------------------------
   docs:
     name: Deploy Docs
+    if: false
     runs-on: ubuntu-24.04-arm
     outputs:
       deployment_url: ${{ steps.deploy.outputs.url }}
@@ -964,9 +966,11 @@ jobs:
 
   # ---------------------------------------------------------------------------
   # E2E CDN — verify stripe-sync.dev/stripe-api-specs after Vercel deploy
+  # Disabled with `docs` (depends on its deployment_url output).
   # ---------------------------------------------------------------------------
   e2e_cdn:
     name: E2E CDN
+    if: false
     needs: [docs]
     runs-on: ubuntu-24.04-arm
 

From 33e8dab670cc4fc2a956c6fae6b8d1e1d8fad429 Mon Sep 17 00:00:00 2001
From: Yostra 
Date: Thu, 7 May 2026 19:39:37 +0200
Subject: [PATCH 7/8] make sure it's called _last_synced_at in all places

---
 e2e/stripe-reconcile-cleanup.test.ts                  |  8 ++++----
 .../__tests__/integration.test.ts                     |  2 +-
 packages/destination-google-sheets/src/index.test.ts  | 12 ++++++------
 packages/destination-google-sheets/src/index.ts       | 12 ++++++++----
 4 files changed, 19 insertions(+), 15 deletions(-)

diff --git a/e2e/stripe-reconcile-cleanup.test.ts b/e2e/stripe-reconcile-cleanup.test.ts
index b0f9a831c..2707b1056 100644
--- a/e2e/stripe-reconcile-cleanup.test.ts
+++ b/e2e/stripe-reconcile-cleanup.test.ts
@@ -125,7 +125,7 @@ describeWithEnv(
 
       try {
         // Backfill-only sync (no websocket, no event polling) — both rows
-        // land in postgres with `_synced_at ≈ T0`.
+        // land in postgres with `_last_synced_at ≈ T0`.
         await drain(engine.pipeline_sync(pipeline))
 
         const seeded = await pool.query<{ id: string }>(
@@ -139,8 +139,8 @@ describeWithEnv(
         await stripe.customers.del(doomed.id)
         cleanupIds.delete(doomed.id)
 
-        // `_synced_at` is set with millisecond precision by the destination,
-        // so a small forward skew guarantees `syncRunStartedAt > _synced_at`.
+        // `_last_synced_at` is set with millisecond precision by the destination,
+        // so a small forward skew guarantees `syncRunStartedAt > _last_synced_at`.
         await new Promise((r) => setTimeout(r, 50))
         const syncRunStartedAt = new Date().toISOString()
 
@@ -268,7 +268,7 @@ describeWithEnv(
       const cleanupIds = new Set([survivor.id, doomed.id])
 
       try {
-        // Backfill seeds both customers with `_synced_at ≈ T0`.
+        // Backfill seeds both customers with `_last_synced_at ≈ T0`.
         await drain(engine.pipeline_sync(pipeline))
 
         const seededRows = await readSheet(sheetsClient, spreadsheetId, STREAM)
diff --git a/packages/destination-google-sheets/__tests__/integration.test.ts b/packages/destination-google-sheets/__tests__/integration.test.ts
index b0e771be0..d301596ca 100644
--- a/packages/destination-google-sheets/__tests__/integration.test.ts
+++ b/packages/destination-google-sheets/__tests__/integration.test.ts
@@ -27,7 +27,7 @@ describeWithEnv(
     function stripUpdatedAt(rows: unknown[][]): unknown[][] {
       const header = rows[0] ?? 
[]
       const indexes = new Set(
-        ['_updated_at', '_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
+        ['_updated_at', '_last_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
       )
       if (indexes.size === 0) return rows
       return rows.map((row) => row.filter((_, i) => !indexes.has(i)))
diff --git a/packages/destination-google-sheets/src/index.test.ts b/packages/destination-google-sheets/src/index.test.ts
index b8d221ea5..7162b23a3 100644
--- a/packages/destination-google-sheets/src/index.test.ts
+++ b/packages/destination-google-sheets/src/index.test.ts
@@ -20,14 +20,14 @@ import { createMemorySheets } from '../__tests__/memory-sheets.js'
 /**
  * Strip metadata timestamp columns from a 2D rows array.
  *
- * The destination stamps `_synced_at`; the source may stamp `_updated_at`.
+ * The destination stamps `_last_synced_at`; the source may stamp `_updated_at`.
  * Most tests only care about source data, so drop both at assertion sites.
  */
 function stripUpdatedAt(rows: unknown[][] | undefined): unknown[][] {
   if (!rows || rows.length === 0) return rows ?? []
   const header = rows[0] as unknown[]
   const indexes = new Set(
-    ['_updated_at', '_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
+    ['_updated_at', '_last_synced_at'].map((name) => header.indexOf(name)).filter((idx) => idx >= 0)
   )
   if (indexes.size === 0) return rows
   return rows.map((row) => row.filter((_, i) => !indexes.has(i)))
 }
@@ -1786,9 +1786,9 @@ describe('_updated_at column (source-owned, passthrough)', () => {
     )
 
     const rows = getData(getSpreadsheetIds()[0], 'users')!
-    expect(rows[0]).toEqual(['id', 'name', '_synced_at'])
+    expect(rows[0]).toEqual(['id', 'name', '_last_synced_at'])
     expect(rows[0]).not.toContain('_updated_at')
-    const syncedAtIdx = (rows[0] as string[]).indexOf('_synced_at')
+    const syncedAtIdx = (rows[0] as string[]).indexOf('_last_synced_at')
     expect(syncedAtIdx).toBeGreaterThanOrEqual(0)
     expect(Date.parse(String(rows[1][syncedAtIdx]))).not.toBeNaN()
   })
@@ -2062,7 +2062,7 @@ describe('getStaleRecords', () => {
     ],
   }
 
-  it('returns ids whose _synced_at predates syncRunStartedAt', async () => {
+  it('returns ids whose _last_synced_at predates syncRunStartedAt', async () => {
     const { sheets, getSpreadsheetIds } = createMemorySheets()
     const dest = createDestination(sheets)
 
@@ -2091,7 +2091,7 @@ describe('getStaleRecords', () => {
     expect(batches).toEqual([{ stream: 'customer', ids: ['cus_1', 'cus_2'] }])
   })
 
-  it('does not return rows whose _synced_at is at or after syncRunStartedAt', async () => {
+  it('does not return rows whose _last_synced_at is at or after syncRunStartedAt', async () => {
     const { sheets, getSpreadsheetIds } = createMemorySheets()
     const dest = createDestination(sheets)
 
diff --git a/packages/destination-google-sheets/src/index.ts b/packages/destination-google-sheets/src/index.ts
index c264715bb..ba0636f3d 100644
--- a/packages/destination-google-sheets/src/index.ts
+++ b/packages/destination-google-sheets/src/index.ts
@@ -846,10 +846,14 @@ export function createDestination(
         }
       }
 
-      const headers = await ensureHeadersForRecord(stream, cleanData)
+      const syncedData: Record<string, unknown> = {
+        ...cleanData,
+        _last_synced_at: new Date().toISOString(),
+      }
+      const headers = await ensureHeadersForRecord(stream, syncedData)
       const tsFields = streamTimestampFields.get(stream)
       const row = headers.map((header) => {
-        const value = cleanData[header]
+        const value = syncedData[header]
         if (tsFields?.has(header) && typeof value === 'number') {
           return unixToIso(value)
         }
@@ -998,11 
+1002,11 @@ export function createDestination( const headers = (rows[0] as unknown[]).map((h) => String(h ?? '')) const idIdx = headers.indexOf('id') - const syncedAtIdx = headers.indexOf('_synced_at') + const syncedAtIdx = headers.indexOf('_last_synced_at') if (idIdx < 0 || syncedAtIdx < 0) { log.warn( { stream: streamName, headers }, - 'getStaleRecords: missing id or _synced_at column — skipping stream' + 'getStaleRecords: missing id or _last_synced_at column — skipping stream' ) continue } From 30748284a28f638a56dff474cd609edc624a39be Mon Sep 17 00:00:00 2001 From: Yostra Date: Thu, 7 May 2026 20:11:25 +0200 Subject: [PATCH 8/8] add a second object type --- e2e/stripe-reconcile-cleanup.test.ts | 123 ++++++++++++++++++++++----- 1 file changed, 101 insertions(+), 22 deletions(-) diff --git a/e2e/stripe-reconcile-cleanup.test.ts b/e2e/stripe-reconcile-cleanup.test.ts index 2707b1056..494cd2d2f 100644 --- a/e2e/stripe-reconcile-cleanup.test.ts +++ b/e2e/stripe-reconcile-cleanup.test.ts @@ -25,7 +25,8 @@ const ts = new Date() .toISOString() .replace(/[-:T.Z]/g, '') .slice(0, 15) -const STREAM = 'customers' +const CUSTOMERS_STREAM = 'customers' +const PRODUCTS_STREAM = 'products' const BACKFILL_LIMIT = 10 function memoryPipelineStore() { @@ -84,7 +85,7 @@ describeWithEnv( return { source: { type: 'stripe', stripe: sourceConfig }, destination: { type: 'postgres', postgres: destConfig }, - streams: [{ name: STREAM }], + streams: [{ name: CUSTOMERS_STREAM }, { name: PRODUCTS_STREAM }], } } @@ -107,7 +108,7 @@ describeWithEnv( await pool.end() }) - it('tombstones customers deleted in stripe without a delete event', async () => { + it('tombstones deleted customers and products', async () => { const engine = await createEngine(resolver) const pipeline = makePipeline() const pipelineStore = memoryPipelineStore() @@ -121,23 +122,39 @@ describeWithEnv( const doomed = await stripe.customers.create({ name: `e2e-recon-doomed-${Date.now()}`, }) - const cleanupIds = new Set([survivor.id, doomed.id]) + const productSurvivor = await stripe.products.create({ + name: `e2e-recon-product-survivor-${Date.now()}`, + }) + const productDoomed = await stripe.products.create({ + name: `e2e-recon-product-doomed-${Date.now()}`, + }) + const cleanupCustomerIds = new Set([survivor.id, doomed.id]) + const cleanupProductIds = new Set([productSurvivor.id, productDoomed.id]) try { - // Backfill-only sync (no websocket, no event polling) — both rows + // Backfill-only sync (no websocket, no event polling) — all rows // land in postgres with `_last_synced_at ≈ T0`. await drain(engine.pipeline_sync(pipeline)) const seeded = await pool.query<{ id: string }>( - `SELECT id FROM "${SCHEMA}"."${STREAM}" WHERE id = ANY($1)`, + `SELECT id FROM "${SCHEMA}"."${CUSTOMERS_STREAM}" WHERE id = ANY($1)`, [[survivor.id, doomed.id]] ) expect(new Set(seeded.rows.map((r) => r.id))).toEqual(new Set([survivor.id, doomed.id])) + const seededProducts = await pool.query<{ id: string }>( + `SELECT id FROM "${SCHEMA}"."${PRODUCTS_STREAM}" WHERE id = ANY($1)`, + [[productSurvivor.id, productDoomed.id]] + ) + expect(new Set(seededProducts.rows.map((r) => r.id))).toEqual( + new Set([productSurvivor.id, productDoomed.id]) + ) - // Hard-delete one customer WITHOUT replaying the customer.deleted + // Hard-delete one object per stream WITHOUT replaying the *.deleted // event — this is the "missed delete" reconcile-cleanup catches. 
await stripe.customers.del(doomed.id) - cleanupIds.delete(doomed.id) + cleanupCustomerIds.delete(doomed.id) + await stripe.products.del(productDoomed.id) + cleanupProductIds.delete(productDoomed.id) // `_last_synced_at` is set with millisecond precision by the destination, // so a small forward skew guarantees `syncRunStartedAt > _last_synced_at`. @@ -152,21 +169,43 @@ describeWithEnv( await env.run(activities.reconcileCleanup, PIPELINE_ID, syncRunStartedAt) const after = await pool.query<{ id: string }>( - `SELECT id FROM "${SCHEMA}"."${STREAM}" WHERE id = ANY($1)`, + `SELECT id FROM "${SCHEMA}"."${CUSTOMERS_STREAM}" WHERE id = ANY($1)`, [[survivor.id, doomed.id]] ) const remaining = new Set(after.rows.map((r) => r.id)) expect(remaining.has(survivor.id), `survivor ${survivor.id} was tombstoned`).toBe(true) expect(remaining.has(doomed.id), `doomed ${doomed.id} was not tombstoned`).toBe(false) - console.log(` Survived: ${survivor.id}`) - console.log(` Tombstoned: ${doomed.id}`) + + const afterProducts = await pool.query<{ id: string }>( + `SELECT id FROM "${SCHEMA}"."${PRODUCTS_STREAM}" WHERE id = ANY($1)`, + [[productSurvivor.id, productDoomed.id]] + ) + const remainingProducts = new Set(afterProducts.rows.map((r) => r.id)) + expect( + remainingProducts.has(productSurvivor.id), + `product survivor ${productSurvivor.id} was tombstoned` + ).toBe(true) + expect( + remainingProducts.has(productDoomed.id), + `product doomed ${productDoomed.id} was not tombstoned` + ).toBe(false) + + console.log(` Customer survived: ${survivor.id}`) + console.log(` Customer tombstoned: ${doomed.id}`) + console.log(` Product survived: ${productSurvivor.id}`) + console.log(` Product tombstoned: ${productDoomed.id}`) } finally { if (!process.env.KEEP_TEST_DATA) { - for (const id of cleanupIds) { + for (const id of cleanupCustomerIds) { try { await stripe.customers.del(id) } catch {} } + for (const id of cleanupProductIds) { + try { + await stripe.products.del(id) + } catch {} + } } } }, 180_000) @@ -215,7 +254,7 @@ describeWithEnv( batch_size: 50, }, }, - streams: [{ name: STREAM }], + streams: [{ name: CUSTOMERS_STREAM }, { name: PRODUCTS_STREAM }], } } @@ -235,7 +274,7 @@ describeWithEnv( } }) - it('tombstones customers deleted in stripe without a delete event', async () => { + it('tombstones deleted customers and products', async () => { const engine = await createEngine(resolver) // pipeline_setup creates the spreadsheet if needed and emits the new @@ -265,13 +304,20 @@ describeWithEnv( const doomed = await stripe.customers.create({ name: `e2e-recon-sheets-doomed-${Date.now()}`, }) - const cleanupIds = new Set([survivor.id, doomed.id]) + const productSurvivor = await stripe.products.create({ + name: `e2e-recon-sheets-product-survivor-${Date.now()}`, + }) + const productDoomed = await stripe.products.create({ + name: `e2e-recon-sheets-product-doomed-${Date.now()}`, + }) + const cleanupCustomerIds = new Set([survivor.id, doomed.id]) + const cleanupProductIds = new Set([productSurvivor.id, productDoomed.id]) try { - // Backfill seeds both customers with `_last_synced_at ≈ T0`. + // Backfill seeds both streams with `_last_synced_at ≈ T0`. await drain(engine.pipeline_sync(pipeline)) - const seededRows = await readSheet(sheetsClient, spreadsheetId, STREAM) + const seededRows = await readSheet(sheetsClient, spreadsheetId, CUSTOMERS_STREAM) const seededHeader = (seededRows[0] ?? 
[]) as string[] const idIdx = seededHeader.indexOf('id') expect(idIdx, 'id column missing in sheet header').toBeGreaterThanOrEqual(0) @@ -279,8 +325,20 @@ describeWithEnv( expect(seededIds.has(survivor.id)).toBe(true) expect(seededIds.has(doomed.id)).toBe(true) + const seededProducts = await readSheet(sheetsClient, spreadsheetId, PRODUCTS_STREAM) + const seededProductHeader = (seededProducts[0] ?? []) as string[] + const productIdIdx = seededProductHeader.indexOf('id') + expect(productIdIdx, 'id column missing in products header').toBeGreaterThanOrEqual(0) + const seededProductIds = new Set( + seededProducts.slice(1).map((row) => String(row[productIdIdx] ?? '')) + ) + expect(seededProductIds.has(productSurvivor.id)).toBe(true) + expect(seededProductIds.has(productDoomed.id)).toBe(true) + await stripe.customers.del(doomed.id) - cleanupIds.delete(doomed.id) + cleanupCustomerIds.delete(doomed.id) + await stripe.products.del(productDoomed.id) + cleanupProductIds.delete(productDoomed.id) await new Promise((r) => setTimeout(r, 50)) const syncRunStartedAt = new Date().toISOString() @@ -289,19 +347,40 @@ describeWithEnv( const env = new MockActivityEnvironment() await env.run(activities.reconcileCleanup, PIPELINE_ID, syncRunStartedAt) - const afterRows = await readSheet(sheetsClient, spreadsheetId, STREAM) + const afterRows = await readSheet(sheetsClient, spreadsheetId, CUSTOMERS_STREAM) const afterIds = new Set(afterRows.slice(1).map((row) => String(row[idIdx] ?? ''))) expect(afterIds.has(survivor.id), `survivor ${survivor.id} was tombstoned`).toBe(true) expect(afterIds.has(doomed.id), `doomed ${doomed.id} was not tombstoned`).toBe(false) - console.log(` Survived: ${survivor.id}`) - console.log(` Tombstoned: ${doomed.id}`) + + const afterProducts = await readSheet(sheetsClient, spreadsheetId, PRODUCTS_STREAM) + const afterProductIds = new Set( + afterProducts.slice(1).map((row) => String(row[productIdIdx] ?? '')) + ) + expect( + afterProductIds.has(productSurvivor.id), + `product survivor ${productSurvivor.id} was tombstoned` + ).toBe(true) + expect( + afterProductIds.has(productDoomed.id), + `product doomed ${productDoomed.id} was not tombstoned` + ).toBe(false) + + console.log(` Customer survived: ${survivor.id}`) + console.log(` Customer tombstoned: ${doomed.id}`) + console.log(` Product survived: ${productSurvivor.id}`) + console.log(` Product tombstoned: ${productDoomed.id}`) } finally { if (!process.env.KEEP_TEST_DATA) { - for (const id of cleanupIds) { + for (const id of cleanupCustomerIds) { try { await stripe.customers.del(id) } catch {} } + for (const id of cleanupProductIds) { + try { + await stripe.products.del(id) + } catch {} + } } } }, 240_000)