Summary
Adds a reconcile-cleanup path that detects records hard-deleted in Stripe but missed by the event stream, and tombstones them in the destination on the next reconcile pass.
This introduces two optional connector capabilities:

- `Destination.getStaleRecords`: emits batches of `{ stream, ids[] }` for rows whose destination-stamped `_last_synced_at` predates the current `syncRunStartedAt`, optionally scoped by a `filter` (e.g. `{ _account_id }`) for safe multi-tenant operation.
- `Source.verifyRecords`: given those batches, re-fetches each record upstream and yields a `recordDeleted: true` message for anything Stripe returns as `404` or `{ deleted: true }`.

Composing the two via the destination's existing `write` path turns "missing in source" into a tombstone in the destination, with no special delete primitive required.

Destination stamp column
The "is this row stale?" check needs a destination-stamped timestamp that advances on every successful sync. Both destinations now use the same column name:
- `postgres`, `_last_synced_at`: `main`; this PR makes `upsertMany` actually populate it.
- `google_sheets`, `_last_synced_at`: `main`, aligned with the Postgres column name for cross-destination consistency.

`_last_synced_at` is intentionally not marked as a volatile column in the Postgres upsert: every sync (including no-op rows) advances the timestamp, so each reconcile pass converges on a shrinking set of stale ids instead of an ever-growing pile.

Temporal activity
`reconcileCleanup(pipelineId, syncRunStartedAt)` resolves the destination via a small whitelist (`postgres`, `google_sheets`), composes `getStaleRecords → verifyRecords → write`, and heartbeats per stream and every 15s while writing. Failures are logged and swallowed so the next reconcile interval re-runs.

The activity is registered in `createActivities` and a proxy is declared in `workflows/_shared.ts` for future workflow integration. No production workflow calls it yet; it is kept inert pending a separate change to wire it into `pipeline-lifecycle`.

CI
The `docs` (Vercel deploy) and `e2e_cdn` jobs are disabled (`if: false`) because the Vercel project was deleted. Re-enable them by removing the flag once a new project is provisioned. Both jobs are kept in the workflow file unchanged, so flipping them back on is a one-character change.

Test plan
E2E (`MockActivityEnvironment` runs the production activity end-to-end). Each suite seeds two customers via the in-process engine, hard-deletes one in Stripe without replaying the `customer.deleted` event, runs the activity, and asserts the doomed row is tombstoned while the survivor remains. The `customers` stream name matches the Stripe source's catalog on `main`.
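
For reviewers who want the scenario in miniature, here is a rough in-memory sketch of the same flow. It is not the code in this PR: `Row`, `StaleBatch`, `DeletedMessage`, `Upstream`, the boolean `deleted` tombstone flag, the numeric timestamps, and every function body are hypothetical stand-ins. The real capabilities emit batches incrementally and call Stripe, whereas this sketch uses plain arrays and a lookup object.

```typescript
// Hypothetical in-memory model of one reconcile-cleanup pass.
// All types and bodies are illustrative assumptions, not the PR's code.

interface Row {
  id: string;
  _account_id: string;
  _last_synced_at: number; // stamped by the destination on every write
  deleted: boolean; // assumed tombstone representation
}

interface StaleBatch {
  stream: string;
  ids: string[];
}

interface DeletedMessage {
  stream: string;
  id: string;
  recordDeleted: true;
}

// Missing key models a 404; `{ deleted: true }` models a deleted stub.
type Upstream = { [id: string]: { deleted?: boolean } | undefined };

// Destination side: ids whose stamp predates this run, scoped to one account.
function getStaleRecords(
  rows: Row[],
  stream: string,
  syncRunStartedAt: number,
  filter: { _account_id: string },
): StaleBatch[] {
  const ids = rows
    .filter((r) => r._account_id === filter._account_id)
    .filter((r) => r._last_synced_at < syncRunStartedAt)
    .map((r) => r.id);
  return ids.length > 0 ? [{ stream: stream, ids: ids }] : [];
}

// Source side: "re-fetch" each stale id and report only the ones that are gone.
function verifyRecords(upstream: Upstream, batches: StaleBatch[]): DeletedMessage[] {
  const out: DeletedMessage[] = [];
  batches.forEach((batch) => {
    batch.ids.forEach((id) => {
      const found = upstream[id];
      if (found === undefined || found.deleted === true) {
        out.push({ stream: batch.stream, id: id, recordDeleted: true });
      }
    });
  });
  return out;
}

// Write path reduced to the tombstone case; stamping here is an assumption.
function write(rows: Row[], messages: DeletedMessage[], now: number): void {
  messages.forEach((m) => {
    rows.forEach((r) => {
      if (r.id === m.id) {
        r.deleted = true;
        r._last_synced_at = now;
      }
    });
  });
}

// Two seeded customers; one is hard-deleted upstream with no event replayed.
function runScenario(): Row[] {
  const rows: Row[] = [
    { id: "cus_survivor", _account_id: "acct_1", _last_synced_at: 100, deleted: false },
    { id: "cus_doomed", _account_id: "acct_1", _last_synced_at: 100, deleted: false },
  ];
  const upstream: Upstream = { cus_survivor: {} }; // cus_doomed now "404s"
  const syncRunStartedAt = 200;

  const stale = getStaleRecords(rows, "customers", syncRunStartedAt, { _account_id: "acct_1" });
  const deletions = verifyRecords(upstream, stale);
  write(rows, deletions, syncRunStartedAt);
  return rows;
}
```

In this sketch both rows are stale, so it is the verify step that protects the survivor: a row is tombstoned only when the upstream lookup confirms it is gone. A filter for a different account yields no batches at all, which is the multi-tenant safety property the `filter` argument is meant to provide.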