diff --git a/src/cli.ts b/src/cli.ts index 40ed606..b9b5302 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -15,6 +15,8 @@ import { commit, url, rm, + rmTracked, + resync, ls, config, watch, @@ -248,6 +250,39 @@ program await rm(path); }); +// Rm-tracked command — remove a specific path from the snapshot +program + .command("rm-tracked") + .summary("Give up tracking a chronically unavailable path") + .description( + "Remove a file from the pushwork snapshot (and by default from the local filesystem). " + + "Use this when a tracked file has been unavailable on the remote for so many syncs " + + "that you want pushwork to stop trying to reconcile it." + ) + .argument("", "Relative path of the tracked file") + .argument("[path]", "Directory path (default: current directory)", ".") + .option("--keep-local", "Keep the local file; only remove it from the snapshot", false) + .action(async (file, pathArg, opts) => { + await rmTracked(pathArg, file, { + keepLocal: opts.keepLocal, + }); + }); + +// Resync command — re-push a tracked file as a fresh document +program + .command("resync") + .summary("Re-push a tracked file as a fresh Automerge document") + .description( + "Create a new Automerge document from the current local content and update " + + "the parent directory entry to point at it. Useful when the existing document " + + "has become chronically unavailable or corrupted on the remote." + ) + .argument("", "Relative path of the tracked file") + .argument("[path]", "Directory path (default: current directory)", ".") + .action(async (file, pathArg) => { + await resync(pathArg, file); + }); + // List command program .command("ls") diff --git a/src/commands.ts b/src/commands.ts index 9b949af..78824a0 100644 --- a/src/commands.ts +++ b/src/commands.ts @@ -66,8 +66,15 @@ async function initializeRepository( } // Create repository and sync engine - const repo = await createRepo(resolvedPath, config, sub); - const syncEngine = new SyncEngine(repo, resolvedPath, config); + const { repo, requiresRehydrate, recoveryReason } = await createRepo( + resolvedPath, + config, + sub + ); + const syncEngine = new SyncEngine(repo, resolvedPath, config, { + requiresRehydrate, + recoveryReason, + }); return { config, repo, syncEngine }; } @@ -122,10 +129,17 @@ async function setupCommandContext( } // Create repo with config - const repo = await createRepo(resolvedPath, config, sub); + const { repo, requiresRehydrate, recoveryReason } = await createRepo( + resolvedPath, + config, + sub + ); // Create sync engine - const syncEngine = new SyncEngine(repo, resolvedPath, config); + const syncEngine = new SyncEngine(repo, resolvedPath, config, { + requiresRehydrate, + recoveryReason, + }); return { repo, @@ -531,23 +545,52 @@ export async function status( // Show verbose details if requested if (options.verbose && syncStatus.snapshot?.rootDirectoryUrl) { - const rootHandle = await repo.find( - syncStatus.snapshot.rootDirectoryUrl - ); - const rootDoc = await rootHandle.doc(); - - if (rootDoc) { - out.infoBlock("HEADS"); - out.arr(rootHandle.heads()); - - if (syncStatus.snapshot && syncStatus.snapshot.files.size > 0) { - out.infoBlock("TRACKED FILES"); - const filesObj: Record = {}; - syncStatus.snapshot.files.forEach((entry, filePath) => { - filesObj[filePath] = entry.url; - }); - out.obj(filesObj); + try { + const rootHandle = await repo.find( + syncStatus.snapshot.rootDirectoryUrl + ); + const rootDoc = await rootHandle.doc(); + + if (rootDoc) { + out.infoBlock("HEADS"); + out.arr(rootHandle.heads()); + + if (syncStatus.snapshot && syncStatus.snapshot.files.size > 0) { + out.infoBlock("TRACKED FILES"); + const filesObj: Record = {}; + syncStatus.snapshot.files.forEach((entry, filePath) => { + filesObj[filePath] = entry.url; + }); + out.obj(filesObj); + } } + } catch (error) { + out.warn(`Warning: Could not load root document details: ${error}`); + } + } + + // Chronic-unavailable section: paths whose consecutive-unavailable + // counter is non-zero. These are files pushwork has repeatedly + // declined to reconcile because their remote state could not be + // confirmed. Always shown in verbose mode, independent of root-doc + // availability, because the counter is a pure snapshot property. + if (options.verbose && syncStatus.snapshot) { + const chronic: Array<{ path: string; count: number }> = []; + syncStatus.snapshot.files.forEach((entry, filePath) => { + const n = entry.consecutiveUnavailableCount ?? 0; + if (n > 0) chronic.push({ path: filePath, count: n }); + }); + if (chronic.length > 0) { + chronic.sort((a, b) => b.count - a.count); + out.infoBlock("CHRONICALLY UNAVAILABLE"); + const obj: Record = {}; + for (const { path: p, count } of chronic) { + obj[p] = `${count} consecutive sync(s) unavailable`; + } + out.obj(obj); + out.info( + "Resolve with: pushwork rm-tracked or pushwork resync " + ); } } @@ -749,6 +792,76 @@ export async function rm(targetPath: string = "."): Promise { process.exit(); } +/** + * Remove a tracked path from the snapshot. Used to give up on a file + * that has been chronically unavailable on the remote. + */ +export async function rmTracked( + targetPath: string, + relativePath: string, + options: { keepLocal?: boolean } = {} +): Promise { + const { syncEngine, repo } = await setupCommandContext(targetPath, { + syncEnabled: false, + }); + + const deleteLocal = !options.keepLocal; + out.task( + `Removing ${relativePath} from snapshot${deleteLocal ? " and local disk" : ""}` + ); + try { + const removed = await syncEngine.removeTrackedPath(relativePath, { + deleteLocal, + }); + if (!removed) { + out.done(); + out.warn(`Path not tracked: ${relativePath}`); + await safeRepoShutdown(repo); + out.exit(1); + return; + } + out.done(); + out.success(`Removed ${relativePath}`); + } catch (error) { + out.crash(error); + } finally { + await safeRepoShutdown(repo); + } +} + +/** + * Force-recreate the remote document for a tracked path from current + * local content. Used to recover a file whose Automerge document has + * become chronically unavailable — mints a fresh doc and updates the + * parent directory entry to point at it. + */ +export async function resync( + targetPath: string, + relativePath: string +): Promise { + const { syncEngine, repo } = await setupCommandContext(targetPath); + + out.task(`Re-syncing ${relativePath}`); + try { + const ok = await syncEngine.resyncPath(relativePath); + if (!ok) { + out.done(); + out.warn( + `Could not re-sync ${relativePath} (not tracked or local file missing)` + ); + await safeRepoShutdown(repo); + out.exit(1); + return; + } + out.done(); + out.success(`Re-synced ${relativePath} to a fresh document`); + } catch (error) { + out.crash(error); + } finally { + await safeRepoShutdown(repo); + } +} + export async function commit( targetPath: string, _options: CommandOptions = {} diff --git a/src/core/change-detection.ts b/src/core/change-detection.ts index 933a356..d7a84d8 100644 --- a/src/core/change-detection.ts +++ b/src/core/change-detection.ts @@ -21,6 +21,7 @@ import { joinAndNormalizePath, getPlainUrl, readDocContent, + RemoteLookup, } from "../utils" import {isContentEqual, contentHash} from "../utils/content" import {out} from "../utils/output" @@ -34,6 +35,23 @@ function debug(...args: any[]) { * Change detection engine */ export class ChangeDetector { + /** + * Paths (snapshot-tracked files) for which `detectRemoteChanges` + * observed a `RemoteLookup` of kind `unavailable` during the most + * recent `detectChanges` call. These paths were deliberately not + * emitted as deletions (Phase 1/2) because we cannot confirm their + * absence on the remote. Callers may use this to maintain a + * consecutive-unavailable counter per path — see `SnapshotFileEntry`. + */ + private _lastSkippedUnavailablePaths: Set = new Set() + + /** + * Paths for which `detectRemoteChanges` observed a successful lookup + * (kind `found` or `absent`) during the most recent `detectChanges` + * call. Callers should reset the per-path counter for these paths. + */ + private _lastConfirmedPaths: Set = new Set() + constructor( private repo: Repo, private rootPath: string, @@ -41,6 +59,23 @@ export class ChangeDetector { private artifactDirectories: string[] = [] ) {} + /** + * Paths whose remote state could not be determined during the last + * `detectChanges` call. See `SnapshotFileEntry.consecutiveUnavailableCount`. + */ + getLastSkippedUnavailablePaths(): ReadonlySet { + return this._lastSkippedUnavailablePaths + } + + /** + * Paths whose remote state was confirmed (present or absent) during + * the last `detectChanges` call. The per-path unavailable counter + * should be reset to 0 for these paths. + */ + getLastConfirmedPaths(): ReadonlySet { + return this._lastConfirmedPaths + } + /** * Check if a file path is inside an artifact directory. * Artifact files use RawString and are always replaced wholesale, @@ -57,6 +92,8 @@ export class ChangeDetector { */ async detectChanges(snapshot: SyncSnapshot, excludePaths?: Set): Promise { const changes: DetectedChange[] = [] + this._lastSkippedUnavailablePaths = new Set() + this._lastConfirmedPaths = new Set() // Get current filesystem state const currentFiles = await this.getCurrentFilesystemState() @@ -247,13 +284,32 @@ export class ChangeDetector { Array.from(snapshot.files.entries()).map( async ([relativePath, snapshotEntry]) => { // Find the file's current entry in the remote directory hierarchy - const remoteEntry = await this.findInRemoteDirectory( + const lookup = await this.findInRemoteDirectory( snapshot.rootDirectoryUrl, relativePath ) - if (!remoteEntry) { - // File was removed from remote directory listing + if (lookup.kind === "unavailable") { + // We could not read the authoritative directory for + // this path. Do NOT emit a change: we have no basis + // to conclude anything about the remote state. The + // next sync will try again. Record the skip so the + // caller can maintain a consecutive-unavailable + // counter — chronic unavailability requires user + // intervention. + this._lastSkippedUnavailablePaths.add(relativePath) + debug( + `detectRemoteChanges: skipping ${relativePath} — directory unavailable (${lookup.reason})` + ) + return + } + + // Any kind of successful lookup (found or absent) counts + // as confirmed for the chronic-unavailability tracker. + this._lastConfirmedPaths.add(relativePath) + + if (lookup.kind === "absent") { + // File was confirmed absent from the remote directory listing const localContent = await this.getLocalContent(relativePath) // Only report as deleted if local file still exists @@ -267,11 +323,15 @@ export class ChangeDetector { remoteContent: null, // File deleted remotely localHead: snapshotEntry.head, remoteHead: snapshotEntry.head, + confirmedAbsent: true, }) } return } + // lookup.kind === "found" + const remoteEntry = lookup.entry + // Check if the document was replaced entirely (new URL). // This happens when a peer replaces an artifact file, fixes a // legacy immutable string, or recreates a failed document. @@ -696,13 +756,22 @@ export class ChangeDetector { /** * Find a file's entry in the remote directory hierarchy. - * Returns the entry (with name, type, url) or null if not found. + * + * Returns a tri-state `RemoteLookup` that distinguishes: + * - `found`: the directory doc was read and the file entry exists + * - `absent`: the directory doc was read and the file entry is absent + * - `unavailable`: the directory doc could not be read (transient) + * + * Callers that perform destructive operations MUST check for `absent` + * specifically, not just "not found". */ private async findInRemoteDirectory( rootDirectoryUrl: AutomergeUrl | undefined, filePath: string - ): Promise<{ name: string; type: string; url: AutomergeUrl } | null> { - if (!rootDirectoryUrl) return null + ): Promise { + if (!rootDirectoryUrl) { + return { kind: "unavailable", reason: "no root directory URL" } + } return findFileInDirectoryHierarchy( this.repo, rootDirectoryUrl, diff --git a/src/core/sync-engine.ts b/src/core/sync-engine.ts index e0dc773..e77e311 100644 --- a/src/core/sync-engine.ts +++ b/src/core/sync-engine.ts @@ -30,7 +30,10 @@ import { getPlainUrl, updateTextContent, readDocContent, + writeSyncLock, + clearSyncLock, } from "../utils" +import type {RecoveryReason} from "../utils/repo-factory" import {isContentEqual, contentHash} from "../utils/content" import {waitForSync, waitForBidirectionalSync} from "../utils/network-sync" import {SnapshotManager} from "./snapshot" @@ -134,13 +137,28 @@ export class SyncEngine { // Path depth determines sync order (deepest first) private handlesByPath: Map> = new Map() private config: DirectoryConfig + /** + * True when the next sync must rehydrate or catch up before running + * ordinary change detection. Set when the repo was created from a + * wiped cache (torn-write) or when the previous sync's lock file + * was still present (incomplete previous sync). The gate in sync() + * interprets this together with `recoveryReason`. + */ + private requiresRehydrate: boolean + private recoveryReason: RecoveryReason constructor( private repo: Repo, private rootPath: string, - config: DirectoryConfig + config: DirectoryConfig, + options: { + requiresRehydrate?: boolean + recoveryReason?: RecoveryReason + } = {} ) { this.config = config + this.requiresRehydrate = options.requiresRehydrate ?? false + this.recoveryReason = options.recoveryReason ?? null this.snapshotManager = new SnapshotManager(rootPath) this.changeDetector = new ChangeDetector( repo, @@ -221,6 +239,127 @@ export class SyncEngine { await this.snapshotManager.save(snapshot) } + /** + * Remove a tracked path from the snapshot. Used by the `rm-tracked` + * command to let the user give up on a chronically unavailable path. + * + * - If `deleteLocal` is true, the local file is also removed from disk. + * - If `deleteLocal` is false (default), the local file is left alone. + * + * Either way, the snapshot entry is removed AND the entry is removed + * from the parent directory document on the remote side (so a peer + * observing the directory won't keep trying to sync a doc we've + * disavowed). The orphaned FileDocument is left on the server. + * + * Returns `true` if a tracked entry was found and removed, `false` + * if the path was not tracked. + */ + async removeTrackedPath( + relativePath: string, + options: { deleteLocal?: boolean } = {} + ): Promise { + const snapshot = await this.snapshotManager.load() + if (!snapshot) return false + if (!snapshot.files.has(relativePath)) return false + + debug(`rm-tracked: removing ${relativePath} (deleteLocal=${!!options.deleteLocal})`) + + // Remove entry from parent directory doc (best effort; the + // snapshot-level removal is authoritative for pushwork's state). + try { + await this.removeFileFromDirectory(snapshot, relativePath) + } catch (e) { + debug(`rm-tracked: failed to remove from parent directory: ${e}`) + } + + this.snapshotManager.removeFileEntry(snapshot, relativePath) + + if (options.deleteLocal) { + const localPath = joinAndNormalizePath(this.rootPath, relativePath) + try { + await removePath(localPath) + } catch (e) { + debug(`rm-tracked: local delete failed (may be absent): ${e}`) + } + } + + await this.snapshotManager.save(snapshot) + return true + } + + /** + * Force-recreate the remote document for `relativePath` from the + * current local content. Used by the `resync` command when the + * snapshot's URL points to a document that's chronically + * unavailable — rather than give up and delete, we mint a fresh doc + * and rewire the parent directory entry to point at it. + * + * Returns `true` on success, `false` if the local file is missing. + */ + async resyncPath(relativePath: string): Promise { + const snapshot = await this.snapshotManager.load() + if (!snapshot) return false + + const existing = snapshot.files.get(relativePath) + if (!existing) { + debug(`resync: ${relativePath} not tracked`) + return false + } + + // Read current local content. + const localPath = joinAndNormalizePath(this.rootPath, relativePath) + let content: string | Uint8Array + try { + const { readFileContent } = await import("../utils/fs") + content = await readFileContent(localPath) + } catch (e) { + debug(`resync: failed to read local ${localPath}: ${e}`) + return false + } + + const fakeChange: DetectedChange = { + path: relativePath, + changeType: ChangeType.LOCAL_ONLY, + fileType: + typeof content === "string" ? FileType.TEXT : FileType.BINARY, + localContent: content, + remoteContent: null, + } + + debug(`resync: creating fresh doc for ${relativePath}`) + const newHandle = await this.createRemoteFile(fakeChange) + if (!newHandle) return false + + // Replace the directory entry with the new URL. + const parentDir = relativePath.includes("/") + ? relativePath.slice(0, relativePath.lastIndexOf("/")) + : "" + try { + await this.removeFileFromDirectory(snapshot, relativePath) + } catch (e) { + debug(`resync: stale parent-dir removal failed (OK): ${e}`) + } + await this.ensureDirectoryDocument(snapshot, parentDir) + const entryUrl = this.getEntryUrl(newHandle, relativePath) + await this.addFileToDirectory(snapshot, relativePath, entryUrl) + + // Update snapshot. + this.snapshotManager.updateFileEntry(snapshot, relativePath, { + path: localPath, + url: entryUrl, + head: newHandle.heads(), + extension: getFileExtension(relativePath), + mimeType: getEnhancedMimeType(relativePath), + ...(this.isArtifactPath(relativePath) + ? { contentHash: contentHash(content) } + : {}), + consecutiveUnavailableCount: 0, + }) + + await this.snapshotManager.save(snapshot) + return true + } + /** * Reset the snapshot, clearing all tracked files and directories. * Preserves the rootDirectoryUrl so sync can still operate. @@ -318,6 +457,213 @@ export class SyncEngine { } } + /** + * Fetch every document referenced by the snapshot and wait for each + * to become ready. Used as a post-recovery gate when the local + * automerge cache was wiped (torn-write recovery) to ensure change + * detection sees a full remote view rather than mistaking unavailable + * documents for deletions. + * + * Retries each document with exponential backoff. Returns `ok: false` + * with the list of still-unavailable paths if any document cannot be + * fetched; callers should abort the sync in that case. + * + * Does NOT verify directory contents or file content validity — only + * that `repo.find(url)` resolves and `doc()` returns a non-nullish + * document. Further validation happens in change detection. + */ + private async rehydrateTrackedDocuments( + snapshot: SyncSnapshot + ): Promise<{ ok: true } | { ok: false; missing: string[] }> { + const maxAttempts = 6 + const baseDelayMs = 500 + const maxDelayMs = 5000 + + // Collect all URLs to rehydrate: root, tracked directories, tracked files. + const targets: { path: string; url: AutomergeUrl }[] = [] + if (snapshot.rootDirectoryUrl) { + targets.push({ path: "", url: getPlainUrl(snapshot.rootDirectoryUrl) }) + } + for (const [dirPath, entry] of snapshot.directories.entries()) { + targets.push({ path: dirPath || "(root)", url: getPlainUrl(entry.url) }) + } + for (const [filePath, entry] of snapshot.files.entries()) { + targets.push({ path: filePath, url: getPlainUrl(entry.url) }) + } + + debug( + `rehydrate: fetching ${targets.length} documents from sync server` + ) + + const missing: string[] = [] + let checked = 0 + const reportEvery = Math.max(1, Math.floor(targets.length / 10)) + + await Promise.all( + targets.map(async ({ path: docPath, url }) => { + let lastError: unknown + for (let attempt = 0; attempt < maxAttempts; attempt++) { + try { + const handle = await this.repo.find(url) + const doc = handle.doc() + if (doc != null) { + checked++ + if (checked % reportEvery === 0) { + out.taskLine( + `Rehydrated ${checked}/${targets.length}`, + false + ) + } + return + } + lastError = new Error("doc() returned nullish") + } catch (e) { + lastError = e + } + const delay = Math.min( + baseDelayMs * Math.pow(2, attempt), + maxDelayMs + ) + await new Promise(r => setTimeout(r, delay)) + } + debug( + `rehydrate: FAILED ${docPath} after ${maxAttempts} attempts (${lastError})` + ) + missing.push(docPath) + }) + ) + + if (missing.length > 0) { + return { ok: false, missing } + } + return { ok: true } + } + + /** + * Update per-file `consecutiveUnavailableCount` based on what the + * most recent `detectChanges` observed, then decide whether sync + * can proceed. + * + * - For paths in `getLastSkippedUnavailablePaths()`: increment. + * - For paths in `getLastConfirmedPaths()`: reset to 0. + * + * Escalation policy: + * count >= WARN_AT: prominent warning with recovery hints + * count >= BLOCK_AT: hard error; sync aborts until user resolves + */ + private updateUnavailableCountsAndGate( + snapshot: SyncSnapshot, + result: SyncResult + ): "proceed" | "abort" { + const WARN_AT = 5 + const BLOCK_AT = 20 + + const skipped = this.changeDetector.getLastSkippedUnavailablePaths() + const confirmed = this.changeDetector.getLastConfirmedPaths() + + // Reset counters for confirmed paths. + for (const p of confirmed) { + const entry = snapshot.files.get(p) + if (entry && entry.consecutiveUnavailableCount) { + debug(`chronic-unavail: ${p} recovered; resetting count`) + entry.consecutiveUnavailableCount = 0 + } + } + + // Increment counters for skipped paths, collect warn/block lists. + const blocked: { path: string; count: number }[] = [] + const warning: { path: string; count: number }[] = [] + for (const p of skipped) { + const entry = snapshot.files.get(p) + if (!entry) continue + const next = (entry.consecutiveUnavailableCount ?? 0) + 1 + entry.consecutiveUnavailableCount = next + debug(`chronic-unavail: ${p} unavailable; count=${next}`) + if (next >= BLOCK_AT) { + blocked.push({ path: p, count: next }) + } else if (next >= WARN_AT) { + warning.push({ path: p, count: next }) + } + } + + // Emit warnings for paths that crossed the warn threshold. + for (const { path: p, count } of warning) { + out.taskLine( + `File ${p} has been unavailable on the remote for ${count} consecutive syncs. ` + + `Possible causes: server unhealthy, document orphaned, or peer deleted the file ` + + `(deletion not confirmed so pushwork is preserving it locally). ` + + `Recovery: \`pushwork rm-tracked ${p}\` to give up, \`pushwork resync ${p}\` to re-push local content.`, + true + ) + result.warnings.push( + `${p}: unavailable ${count} consecutive syncs (warning)` + ) + } + + // Hard-abort if any path reached the block threshold. + if (blocked.length > 0) { + const msg = + `${blocked.length} file(s) have been unavailable on the remote for 20+ consecutive syncs. ` + + `Sync blocked to prevent silent staleness. Run \`pushwork status --verbose\` to inspect, ` + + `then resolve each with \`pushwork rm-tracked \` or \`pushwork resync \`.` + out.taskLine(msg, true) + for (const { path: p, count } of blocked) { + out.taskLine(` ${p} (count=${count})`, true) + } + result.errors.push({ + path: "sync", + operation: "chronic-unavailable", + error: new Error(msg), + recoverable: true, + }) + result.success = false + return "abort" + } + + return "proceed" + } + + /** + * Catch-up pull for incomplete-sync recovery. + * + * Used when a sync.lock was left from an unclean previous exit. We + * can't assume the local filesystem reflects all the remote changes + * that arrived during (or after) the interrupted run — so before + * ordinary change detection we pull any outstanding remote changes. + * + * This is idempotent: running it when there are no remote changes is + * a no-op. If the process dies again during catch-up, the sync.lock + * (still present from sync() startup) stays in place and the next + * run repeats the catch-up. + * + * We deliberately run only REMOTE_ONLY changes here — not + * BOTH_CHANGED, which is handled by the full sync's merge path. + * Applying BOTH_CHANGED in the catch-up phase risks overwriting a + * legitimate local edit with stale remote content in cases where + * change-detection misattributes. + */ + private async catchUpRemoteChanges( + snapshot: SyncSnapshot + ): Promise { + const changes = await this.changeDetector.detectChanges(snapshot) + const remoteOnly = changes.filter( + c => c.changeType === ChangeType.REMOTE_ONLY + ) + debug( + `catch-up: ${changes.length} changes detected, ${remoteOnly.length} remote-only to apply` + ) + if (remoteOnly.length === 0) { + return { + success: true, + filesChanged: 0, + directoriesChanged: 0, + errors: [], + warnings: [], + } + } + return this.pullRemoteChanges(remoteOnly, snapshot) + } + /** * Recreate documents that failed to sync. Creates new Automerge documents * with the same content and updates all references (snapshot, parent directory). @@ -441,6 +787,19 @@ export class SyncEngine { // Reset tracked handles for sync this.handlesByPath = new Map() + // Write the sync-in-progress marker. If this sync does NOT exit + // cleanly (Ctrl-C, crash, SIGKILL), the marker persists and the + // next startup will treat this as an incomplete-sync recovery and + // run a catch-up pull before ordinary change detection. + const pushworkDir = path.join(this.rootPath, ".pushwork") + try { + await writeSyncLock(pushworkDir) + } catch (e) { + debug(`sync: failed to write sync.lock: ${e}`) + // Non-fatal: proceed anyway. Worst case we lose the recovery + // signal if this sync dies uncleanly. + } + try { // Load current snapshot const snapshot = @@ -498,6 +857,90 @@ export class SyncEngine { } } + // Phase 3a: rehydrate gate after torn-write recovery. + // + // If the local automerge cache was wiped due to corrupt storage, + // every snapshot-tracked document must be re-fetched from the + // sync server before change detection is safe. Running + // detectChanges while documents are still in flight causes + // change-detection to observe `remoteContent: null` for + // unavailable docs, which (absent the Phase 1 guard) would + // manifest as spurious local file deletions. + // + // We abort the sync if rehydration cannot complete within the + // retry window. Aborting is safer than proceeding: the user's + // local files remain intact and the next sync can try again. + if ( + this.requiresRehydrate && + this.config.sync_enabled && + snapshot.rootDirectoryUrl + ) { + if (this.recoveryReason === "torn-write") { + out.update("Rehydrating documents after cache recovery") + debug( + `sync: rehydrate-gate (torn-write): ${snapshot.files.size} files + ${snapshot.directories.size} directories` + ) + const rehydrated = await this.rehydrateTrackedDocuments(snapshot) + if (!rehydrated.ok) { + const msg = + `Cannot verify remote state after cache recovery: ` + + `${rehydrated.missing.length} document(s) unavailable ` + + `(${rehydrated.missing.slice(0, 5).join(", ")}` + + `${rehydrated.missing.length > 5 ? "..." : ""}). ` + + `Sync aborted; local files untouched. Retry later.` + debug(`sync: rehydrate-gate ABORT: ${msg}`) + out.taskLine(msg, true) + result.success = false + result.errors.push({ + path: "sync", + operation: "rehydrate", + error: new Error(msg), + recoverable: true, + }) + return result + } + debug("sync: rehydrate-gate passed, proceeding to change detection") + } else if (this.recoveryReason === "incomplete-sync") { + // Previous sync exited uncleanly (Ctrl-C, crash, SIGKILL). + // Before running ordinary change detection, pull any + // remote changes that arrived during or after the + // interrupted run. Otherwise local-change detection + // may see stale local content (pre-interrupted-pull) + // and push it back over newer remote state. + out.update("Catching up on remote changes after incomplete sync") + debug( + `sync: catch-up pull (incomplete-sync): ${snapshot.files.size} files + ${snapshot.directories.size} directories` + ) + try { + const caughtUp = await this.catchUpRemoteChanges(snapshot) + debug( + `sync: catch-up complete: ${caughtUp.filesChanged} file(s) pulled, ${caughtUp.errors.length} errors` + ) + if (caughtUp.errors.length > 0) { + // Non-fatal: log and proceed. If critical docs are + // truly unavailable the normal pull phase will also + // fail cleanly and Phase 1/2 guards will prevent + // destructive actions. + result.warnings.push( + ...caughtUp.errors.map( + e => `catch-up pull for ${e.path}: ${e.error.message}` + ) + ) + } + } catch (e) { + debug(`sync: catch-up pull threw: ${e}`) + out.taskLine( + `Catch-up pull failed: ${e instanceof Error ? e.message : e}. Proceeding with normal sync.`, + true + ) + } + } + // One-shot: don't re-run the gate on subsequent syncs in + // the same process. + this.requiresRehydrate = false + this.recoveryReason = null + } + // Detect all changes debug("sync: detecting changes") out.update("Detecting local and remote changes") @@ -505,6 +948,15 @@ export class SyncEngine { const prePushFilePaths = new Set(snapshot.files.keys()) const changes = await this.changeDetector.detectChanges(snapshot) + // Phase 5.i: update the consecutive-unavailable counter for + // each tracked file based on what change-detection observed. + // Then decide whether to warn (N=5) or hard-abort (N=20). + const gateResult = this.updateUnavailableCountsAndGate(snapshot, result) + if (gateResult === "abort") { + await this.snapshotManager.save(snapshot) // persist updated counts + return result + } + // Detect moves const {moves, remainingChanges} = await this.moveDetector.detectMoves( changes, @@ -727,6 +1179,17 @@ export class SyncEngine { recoverable: false, }) return result + } finally { + // Clear the sync-in-progress marker. We reach this block on + // ALL normal termination paths (success, caught error, explicit + // return). If the process dies without running this (SIGKILL, + // crash, power loss), the marker persists and next startup + // treats that as an incomplete-sync recovery. + try { + await clearSyncLock(pushworkDir) + } catch (e) { + debug(`sync: failed to clear sync.lock: ${e}`) + } } } @@ -1040,22 +1503,87 @@ export class SyncEngine { ) } + // Determine the content to write. Most changes carry it directly + // on `change.remoteContent`, but artifact-path changes deliberately + // skip content reads in change detection and set `remoteContent` + // to null. For those, we must fetch the content here before + // deciding what to do. + let contentToWrite: string | Uint8Array | null = change.remoteContent + + if (contentToWrite === null && this.isArtifactPath(change.path) && !change.confirmedAbsent) { + // Artifact path with no content carried: fetch it live. + // "null content" here means "we skipped the read", NOT "delete + // the local file". + const artifactUrl = change.remoteUrl ?? snapshot.files.get(change.path)?.url + if (artifactUrl) { + try { + const handle = await this.repo.find(getPlainUrl(artifactUrl)) + contentToWrite = readDocContent(handle.doc()?.content) + } catch (e) { + debug( + `skip-delete path=${change.path} reason=artifact-fetch-failed: ${e}` + ) + out.taskLine( + `Preserved artifact ${change.path}: remote fetch failed (${e})`, + true + ) + return + } + } + + if (contentToWrite === null) { + debug( + `skip-delete path=${change.path} reason=artifact-content-unavailable` + ) + out.taskLine( + `Preserved artifact ${change.path}: remote content unavailable`, + true + ) + return + } + } + // Check for null (empty string/Uint8Array are valid content) - if (change.remoteContent === null) { - // File was deleted remotely + if (contentToWrite === null) { + // A local deletion is only safe when we have positive evidence + // that the file is genuinely absent from the authoritative + // remote state (e.g. the parent directory document was + // successfully read and the file was not in its entries). + // + // `remoteContent === null` by itself is ambiguous: it can mean + // the doc was unavailable, the fetch timed out, the parent + // directory had not yet synced, or a parse error occurred. + // Treating any of these as a deletion causes data loss during + // torn-write recovery and slow-server syncs. + if (!change.confirmedAbsent) { + out.taskLine( + `Preserved ${change.path}: remote content unavailable, deletion not confirmed`, + true + ) + debug( + `skip-delete path=${change.path} reason=unconfirmed-absent` + ) + return + } + // File was deleted remotely (confirmed via directory read) + debug(`apply-delete path=${change.path} reason=confirmed-absent`) await removePath(localPath) this.snapshotManager.removeFileEntry(snapshot, change.path) return } // Create or update local file - await writeFileContent(localPath, change.remoteContent) + await writeFileContent(localPath, contentToWrite) // Update or create snapshot entry for this file const snapshotEntry = snapshot.files.get(change.path) if (snapshotEntry) { // Update existing entry snapshotEntry.head = change.remoteHead + // Successful pull: clear any chronic-unavailability count. + if (snapshotEntry.consecutiveUnavailableCount) { + snapshotEntry.consecutiveUnavailableCount = 0 + } // If the remote document was replaced (new URL), update the snapshot URL if (change.remoteUrl) { const fileHandle = await this.repo.find(change.remoteUrl) @@ -1066,14 +1594,14 @@ export class SyncEngine { // We need to find the remote file's URL from the directory hierarchy if (snapshot.rootDirectoryUrl) { try { - const fileEntry = await findFileInDirectoryHierarchy( + const lookup = await findFileInDirectoryHierarchy( this.repo, snapshot.rootDirectoryUrl, change.path ) - if (fileEntry) { - const fileHandle = await this.repo.find(fileEntry.url) + if (lookup.kind === "found") { + const fileHandle = await this.repo.find(lookup.entry.url) const entryUrl = this.getEntryUrl(fileHandle, change.path) this.snapshotManager.updateFileEntry(snapshot, change.path, { path: localPath, diff --git a/src/types/documents.ts b/src/types/documents.ts index b675d4b..e70c714 100644 --- a/src/types/documents.ts +++ b/src/types/documents.ts @@ -88,4 +88,18 @@ export interface DetectedChange { remoteHead?: UrlHeads /** New remote URL when the remote document was replaced (artifact URL change) */ remoteUrl?: AutomergeUrl + /** + * True if this change was emitted on the basis of a successful read of + * the authoritative remote state (e.g. the parent directory document + * was read and the file was observed to be absent from its entries). + * + * False or undefined means the change was inferred from a failed read, + * timeout, or missing document — which can happen transiently during + * torn-write recovery, slow-server syncs, or propagation delays. + * + * Consumers that perform destructive operations (e.g. deleting a local + * file on the basis of `remoteContent === null`) MUST check this flag + * and refuse to act on unconfirmed absences. + */ + confirmedAbsent?: boolean } diff --git a/src/types/snapshot.ts b/src/types/snapshot.ts index 71f751c..41ef038 100644 --- a/src/types/snapshot.ts +++ b/src/types/snapshot.ts @@ -10,6 +10,18 @@ export interface SnapshotFileEntry { extension: string; // File extension mimeType: string; // MIME type contentHash?: string; // SHA-256 of content at last sync (used by artifact files to skip remote reads) + /** + * Number of consecutive syncs for which this path could not be + * looked up on the remote (RemoteLookup returned `unavailable`). + * + * Reset to 0 on any successful `found` or `absent` lookup AND after + * a successful pull that wrote this file's content locally. + * + * At 5, pushwork prints a prominent warning with recovery hints. + * At 20, the sync is blocked with a hard error until the user + * resolves via `pushwork rm-tracked ` or `pushwork resync `. + */ + consecutiveUnavailableCount?: number; } /** diff --git a/src/utils/directory.ts b/src/utils/directory.ts index 52e7a21..f5277b4 100644 --- a/src/utils/directory.ts +++ b/src/utils/directory.ts @@ -4,7 +4,7 @@ import { parseAutomergeUrl, stringifyAutomergeUrl, } from "@automerge/automerge-repo"; -import { DirectoryDocument } from "../types"; +import { DirectoryDocument, DirectoryEntry } from "../types"; /** * Get a plain URL (without heads) from any URL. @@ -16,6 +16,27 @@ export function getPlainUrl(url: AutomergeUrl): AutomergeUrl { return stringifyAutomergeUrl({ documentId }); } +/** + * Result of a remote directory lookup. Distinguishes three cases that + * callers must handle differently: + * + * - `found`: the authoritative directory document was read and the target + * file entry was present in it. + * - `absent`: the authoritative directory document was read and the target + * file entry was NOT present in it. This is positive evidence that the + * file was removed from the remote directory. + * - `unavailable`: the lookup could not be completed (document not yet + * synced, fetch timed out, parse error, etc.). The caller does NOT know + * whether the file is present or absent remotely. + * + * Destructive operations (e.g. deleting a local file) must only act on + * `absent`, never on `unavailable`. + */ +export type RemoteLookup = + | { kind: "found"; entry: DirectoryEntry } + | { kind: "absent" } + | { kind: "unavailable"; reason: string }; + /** * Find a file in the directory hierarchy by path. * @@ -26,48 +47,97 @@ export function getPlainUrl(url: AutomergeUrl): AutomergeUrl { * 2. These URLs may have been captured when the subdirectory was empty * 3. Using versioned URLs would make files appear to not exist * 4. This would trigger false "remote deletion" detection + * + * Returns a tri-state `RemoteLookup`: + * - `found` when the directory doc was read and the file entry exists + * - `absent` when the directory doc was read and the file entry does NOT exist + * - `unavailable` when any directory doc along the path could not be read + * + * Never conflates "not in directory" with "could not read directory". */ export async function findFileInDirectoryHierarchy( repo: Repo, directoryUrl: AutomergeUrl, filePath: string -): Promise<{ name: string; type: string; url: AutomergeUrl } | null> { - try { - const pathParts = filePath.split("/"); - let currentDirUrl = getPlainUrl(directoryUrl); +): Promise { + const pathParts = filePath.split("/"); + let currentDirUrl = getPlainUrl(directoryUrl); + + // Navigate through directories to find the parent directory + for (let i = 0; i < pathParts.length - 1; i++) { + const dirName = pathParts[i]; + let dirDoc: DirectoryDocument | undefined; - // Navigate through directories to find the parent directory - for (let i = 0; i < pathParts.length - 1; i++) { - const dirName = pathParts[i]; + try { const dirHandle = await repo.find(currentDirUrl); - const dirDoc = await dirHandle.doc(); + dirDoc = dirHandle.doc(); + } catch (error) { + return { + kind: "unavailable", + reason: `failed to fetch intermediate directory at ${pathParts.slice(0, i + 1).join("/")}: ${error}`, + }; + } - if (!dirDoc) return null; + if (!dirDoc) { + return { + kind: "unavailable", + reason: `intermediate directory not ready at ${pathParts.slice(0, i + 1).join("/")}`, + }; + } - const subDirEntry = dirDoc.docs.find( - (entry: { name: string; type: string; url: AutomergeUrl }) => - entry.name === dirName && entry.type === "folder" - ); + const subDirEntry = dirDoc.docs.find( + (entry: { name: string; type: string; url: AutomergeUrl }) => + entry.name === dirName && entry.type === "folder" + ); - if (!subDirEntry) return null; - currentDirUrl = getPlainUrl(subDirEntry.url); + // The directory was read successfully but the intermediate folder is + // not in its listing. From the caller's perspective this means the + // target path is absent from the remote hierarchy — whoever was + // holding it removed the whole parent folder. + if (!subDirEntry) { + return { kind: "absent" }; } + currentDirUrl = getPlainUrl(subDirEntry.url); + } + + // Now look for the file in the final directory + const fileName = pathParts[pathParts.length - 1]; + let finalDirDoc: DirectoryDocument | undefined; - // Now look for the file in the final directory - const fileName = pathParts[pathParts.length - 1]; + try { const finalDirHandle = await repo.find(currentDirUrl); - const finalDirDoc = await finalDirHandle.doc(); + finalDirDoc = finalDirHandle.doc(); + } catch (error) { + return { + kind: "unavailable", + reason: `failed to fetch parent directory of ${filePath}: ${error}`, + }; + } - if (!finalDirDoc) return null; + if (!finalDirDoc) { + return { + kind: "unavailable", + reason: `parent directory not ready for ${filePath}`, + }; + } - const fileEntry = finalDirDoc.docs.find( - (entry: { name: string; type: string; url: AutomergeUrl }) => - entry.name === fileName && entry.type === "file" - ); + const fileEntry = finalDirDoc.docs.find( + (entry: { name: string; type: string; url: AutomergeUrl }) => + entry.name === fileName && entry.type === "file" + ); - return fileEntry || null; - } catch (error) { - // Failed to find file in hierarchy - return null; + if (!fileEntry) { + return { kind: "absent" }; } + + // Spread into a plain object so callers never hold onto an Automerge + // proxy past the dirDoc.docs iteration. + return { + kind: "found", + entry: { + name: fileEntry.name, + type: fileEntry.type as "file" | "folder", + url: fileEntry.url, + }, + }; } diff --git a/src/utils/index.ts b/src/utils/index.ts index 454bb58..4807298 100644 --- a/src/utils/index.ts +++ b/src/utils/index.ts @@ -2,3 +2,4 @@ export * from "./fs" export * from "./mime-types" export * from "./directory" export * from "./text-diff" +export * from "./sync-lock" diff --git a/src/utils/repo-factory.ts b/src/utils/repo-factory.ts index 4ec1697..861150b 100644 --- a/src/utils/repo-factory.ts +++ b/src/utils/repo-factory.ts @@ -3,6 +3,7 @@ import { NodeFSStorageAdapter } from "@automerge/automerge-repo-storage-nodefs"; import * as fs from "fs/promises"; import * as path from "path"; import { DirectoryConfig } from "../types"; +import { readSyncLock, isStaleSyncLock, clearSyncLock } from "./sync-lock"; /** * Perform a real ESM dynamic import that tsc won't rewrite to require(). @@ -76,6 +77,36 @@ async function hasCorruptStorage(dir: string): Promise { return false; } +/** + * Reason the next sync needs to rehydrate / catch up before running + * normal change detection. + * + * - `torn-write`: 0-byte file(s) detected in .pushwork/automerge/, + * the cache was wiped. Every document must be re-fetched from the + * sync server before change detection is safe. + * - `incomplete-sync`: a `.pushwork/sync.lock` marker was present at + * startup, indicating the previous sync did not exit cleanly + * (Ctrl-C, crash, SIGKILL, etc.). A catch-up pull is required + * before ordinary sync to avoid overwriting remote changes that + * arrived during the interrupted run. + * - `null`: no recovery needed; previous sync (if any) completed cleanly. + */ +export type RecoveryReason = "torn-write" | "incomplete-sync" | null; + +/** + * Result of `createRepo`. + * + * `requiresRehydrate` is true when the next sync must perform extra + * steps before running ordinary change detection — either a full + * rehydrate from the server (torn write) or a catch-up pull of remote + * changes (incomplete previous sync). See `sync-engine.ts`. + */ +export interface CreateRepoResult { + repo: Repo; + requiresRehydrate: boolean; + recoveryReason: RecoveryReason; +} + /** * Create an Automerge repository with configuration-based setup. * @@ -91,7 +122,7 @@ export async function createRepo( workingDir: string, config: DirectoryConfig, sub: boolean = false -): Promise { +): Promise { const RepoClass = await getRepoClass(); const syncToolDir = path.join(workingDir, ".pushwork"); @@ -100,10 +131,26 @@ export async function createRepo( // Detect and recover from corrupt local storage (0-byte files left by // incomplete writes from a previous run). Wipe the cache so the Repo // hydrates cleanly from the sync server. + let recoveryReason: RecoveryReason = null; if (await hasCorruptStorage(automergeDir)) { console.warn("[pushwork] Corrupt local storage detected, clearing cache..."); await fs.rm(automergeDir, { recursive: true, force: true }); await fs.mkdir(automergeDir, { recursive: true }); + recoveryReason = "torn-write"; + } else { + // Check for a stale sync.lock left over from an unclean exit. A + // live lock (e.g. a second pushwork process running concurrently) + // is NOT treated as incomplete-sync — only stale locks are. + const lock = await readSyncLock(syncToolDir); + if (lock !== null && isStaleSyncLock(lock)) { + console.warn( + `[pushwork] Previous sync did not complete cleanly (pid=${lock.pid}, age=${Math.round( + (Date.now() - lock.startedAt) / 1000 + )}s). Will run catch-up pull before normal sync.` + ); + await clearSyncLock(syncToolDir); + recoveryReason = "incomplete-sync"; + } } const storage = new NodeFSStorageAdapter(automergeDir); @@ -114,10 +161,15 @@ export async function createRepo( endpoints.push(config.sync_server); } - return new RepoClass({ + const repo = new RepoClass({ storage, subductionWebsocketEndpoints: endpoints, }); + return { + repo, + requiresRehydrate: recoveryReason !== null, + recoveryReason, + }; } // Default: WebSocket sync adapter @@ -138,5 +190,10 @@ export async function createRepo( repoConfig.network = [networkAdapter]; } - return new RepoClass(repoConfig); + const repo = new RepoClass(repoConfig); + return { + repo, + requiresRehydrate: recoveryReason !== null, + recoveryReason, + }; } diff --git a/src/utils/sync-lock.ts b/src/utils/sync-lock.ts new file mode 100644 index 0000000..69e1064 --- /dev/null +++ b/src/utils/sync-lock.ts @@ -0,0 +1,103 @@ +/** + * Sync-in-progress marker. + * + * A `.pushwork/sync.lock` file is written at the start of every sync + * operation and cleared on clean completion. If the file is still + * present at startup, it means the previous sync did NOT finish + * cleanly (Ctrl-C, SIGKILL, OOM, crash, power loss, etc.). Pushwork + * treats that as "requires rehydrate" and runs a catch-up pull before + * ordinary change detection — see `SyncEngine.sync`. + * + * The lock records PID and start time so stale locks from dead + * processes can be recognized and ignored. + */ + +import * as fs from "fs/promises"; +import * as path from "path"; + +export interface SyncLock { + /** Process ID that wrote the lock. */ + pid: number; + /** Unix millisecond timestamp when the lock was written. */ + startedAt: number; +} + +/** Max lock age before it's treated as stale regardless of PID liveness. */ +const MAX_LOCK_AGE_MS = 24 * 60 * 60 * 1000; + +/** + * Path to the lock file for a given `.pushwork` directory. + */ +export function syncLockPath(pushworkDir: string): string { + return path.join(pushworkDir, "sync.lock"); +} + +/** + * Write the sync-in-progress marker. Overwrites any existing lock. + */ +export async function writeSyncLock(pushworkDir: string): Promise { + const lock: SyncLock = { + pid: process.pid, + startedAt: Date.now(), + }; + await fs.mkdir(pushworkDir, { recursive: true }); + await fs.writeFile(syncLockPath(pushworkDir), JSON.stringify(lock), "utf8"); +} + +/** + * Read the sync lock if it exists. Returns `null` when no lock is + * present or the file is malformed. + */ +export async function readSyncLock( + pushworkDir: string +): Promise { + try { + const raw = await fs.readFile(syncLockPath(pushworkDir), "utf8"); + const parsed = JSON.parse(raw); + if ( + typeof parsed !== "object" || + parsed === null || + typeof parsed.pid !== "number" || + typeof parsed.startedAt !== "number" + ) { + return null; + } + return parsed as SyncLock; + } catch { + return null; + } +} + +/** + * Remove the sync lock. Idempotent: succeeds even if the lock is absent. + */ +export async function clearSyncLock(pushworkDir: string): Promise { + try { + await fs.unlink(syncLockPath(pushworkDir)); + } catch (e: any) { + if (e?.code !== "ENOENT") throw e; + } +} + +/** + * True if the given lock appears to be stale — either it's older than + * MAX_LOCK_AGE_MS or its recorded PID is no longer alive. + * + * Note: `process.kill(pid, 0)` is a no-op signal check. It succeeds if + * the process exists and we have permission to signal it, and throws + * ESRCH if the process is gone. We treat EPERM (permission) as "exists" + * to be conservative — if another user owns that PID, the lock is not + * necessarily ours to invalidate. + */ +export function isStaleSyncLock(lock: SyncLock, now: number = Date.now()): boolean { + if (now - lock.startedAt > MAX_LOCK_AGE_MS) return true; + if (lock.pid === process.pid) return false; + try { + process.kill(lock.pid, 0); + return false; // process is alive + } catch (e: any) { + if (e?.code === "ESRCH") return true; // process gone + if (e?.code === "EPERM") return false; // exists, not ours to kill + return true; // unknown error: treat as stale + } +} diff --git a/test/integration/chronic-unavailable.test.ts b/test/integration/chronic-unavailable.test.ts new file mode 100644 index 0000000..a3c7744 --- /dev/null +++ b/test/integration/chronic-unavailable.test.ts @@ -0,0 +1,210 @@ +/** + * Integration test for the Phase 5.i user-facing levers: + * - `pushwork status --verbose` reports chronically unavailable paths + * - `pushwork rm-tracked ` removes the tracked entry + * - `pushwork rm-tracked --keep-local` preserves the local file + * + * We simulate chronic unavailability by directly editing + * `.pushwork/snapshot.json` to set `consecutiveUnavailableCount` on a + * tracked file entry. This exercises the user-facing commands without + * requiring a flaky sync server. + */ + +import * as fs from "fs/promises"; +import * as path from "path"; +import * as tmp from "tmp"; +import { execSync } from "child_process"; + +describe("chronic-unavailable recovery levers", () => { + let tmpDir: string; + let cleanup: () => void; + const pushworkCmd = `node "${path.join(__dirname, "../../dist/cli.js")}"`; + + beforeAll(() => { + execSync("pnpm build", { + cwd: path.join(__dirname, "../.."), + stdio: "pipe", + }); + }); + + beforeEach(() => { + const tmpObj = tmp.dirSync({ unsafeCleanup: true }); + tmpDir = tmpObj.name; + cleanup = tmpObj.removeCallback; + }); + + afterEach(() => { + cleanup(); + }); + + /** + * Load snapshot.json, apply a mutation, and write it back. Useful + * for seeding test state (consecutiveUnavailableCount, etc). + */ + async function mutateSnapshot( + mutator: (raw: any) => void + ): Promise { + const snapPath = path.join(tmpDir, ".pushwork", "snapshot.json"); + const raw = JSON.parse(await fs.readFile(snapPath, "utf8")); + mutator(raw); + await fs.writeFile(snapPath, JSON.stringify(raw, null, 2), "utf8"); + } + + /** + * Construct a minimal pushwork state without calling `pushwork init`. + * We can't rely on the CLI init here because it connects to a real + * sync server and blocks; this test only exercises local-only + * commands (status, rm-tracked, resync) so we can fabricate the + * snapshot directly. + * + * The fabricated snapshot has plausible-looking Automerge URLs but + * no actual documents backing them — commands that don't touch the + * repo (like rm-tracked on a non-artifact path) work fine. + */ + async function initTestRepo(files: Record): Promise { + for (const [p, contents] of Object.entries(files)) { + const full = path.join(tmpDir, p); + await fs.mkdir(path.dirname(full), { recursive: true }); + await fs.writeFile(full, contents); + } + + const pushworkDir = path.join(tmpDir, ".pushwork"); + await fs.mkdir(pushworkDir, { recursive: true }); + await fs.mkdir(path.join(pushworkDir, "automerge"), { recursive: true }); + + await fs.writeFile( + path.join(pushworkDir, "config.json"), + JSON.stringify( + { + sync_enabled: false, + exclude_patterns: [ + ".git", + "node_modules", + "*.tmp", + ".pushwork", + ".DS_Store", + ], + artifact_directories: ["dist"], + sync: { move_detection_threshold: 0.7 }, + }, + null, + 2 + ) + ); + + // Generate the Automerge URLs via the installed library so they + // pass validation. + const { generateAutomergeUrl } = await import("@automerge/automerge-repo"); + const rootUrl = generateAutomergeUrl(); + + const fileEntries: Array<[string, unknown]> = []; + for (const p of Object.keys(files)) { + fileEntries.push([ + p, + { + path: path.join(tmpDir, p), + url: generateAutomergeUrl(), + head: [], + extension: path.extname(p).slice(1), + mimeType: "text/plain", + }, + ]); + } + + await fs.writeFile( + path.join(pushworkDir, "snapshot.json"), + JSON.stringify( + { + timestamp: Date.now(), + rootPath: tmpDir, + rootDirectoryUrl: rootUrl, + files: fileEntries, + directories: [], + }, + null, + 2 + ) + ); + } + + it("status --verbose surfaces paths with non-zero consecutiveUnavailableCount", async () => { + await initTestRepo({ "chronic.txt": "hello" }); + + // Seed a chronic count on chronic.txt. + await mutateSnapshot(raw => { + for (const pair of raw.files) { + const [relPath, entry] = pair; + if (relPath === "chronic.txt") { + entry.consecutiveUnavailableCount = 7; + } + } + }); + + const output = execSync(`${pushworkCmd} status --verbose "${tmpDir}"`, { + stdio: "pipe", + timeout: 30000, + }).toString("utf8"); + + expect(output).toMatch(/CHRONICALLY UNAVAILABLE/); + expect(output).toMatch(/chronic\.txt/); + expect(output).toMatch(/7/); + expect(output).toMatch(/rm-tracked|resync/); + }, 120000); + + it("rm-tracked removes the entry from the snapshot and the local file by default", async () => { + await initTestRepo({ "doomed.txt": "bye" }); + + execSync(`${pushworkCmd} rm-tracked doomed.txt "${tmpDir}"`, { + stdio: "pipe", + timeout: 30000, + }); + + // Snapshot entry gone. + const snapPath = path.join(tmpDir, ".pushwork", "snapshot.json"); + const snap = JSON.parse(await fs.readFile(snapPath, "utf8")); + const paths = new Set(snap.files.map((pair: [string, unknown]) => pair[0])); + expect(paths.has("doomed.txt")).toBe(false); + + // Local file also gone (default behavior). + await expect(fs.access(path.join(tmpDir, "doomed.txt"))).rejects.toThrow(); + }, 120000); + + it("rm-tracked --keep-local preserves the local file", async () => { + await initTestRepo({ "stays.txt": "stays here" }); + + execSync( + `${pushworkCmd} rm-tracked stays.txt --keep-local "${tmpDir}"`, + { + stdio: "pipe", + timeout: 30000, + } + ); + + const snapPath = path.join(tmpDir, ".pushwork", "snapshot.json"); + const snap = JSON.parse(await fs.readFile(snapPath, "utf8")); + const paths = new Set(snap.files.map((pair: [string, unknown]) => pair[0])); + expect(paths.has("stays.txt")).toBe(false); + + // Local file preserved. + const content = await fs.readFile( + path.join(tmpDir, "stays.txt"), + "utf8" + ); + expect(content).toBe("stays here"); + }, 120000); + + it("rm-tracked exits non-zero for an untracked path", async () => { + await initTestRepo({ "only.txt": "x" }); + + let exitCode = 0; + try { + execSync(`${pushworkCmd} rm-tracked nonexistent.txt "${tmpDir}"`, { + stdio: "pipe", + timeout: 15000, + }); + } catch (e: any) { + exitCode = e.status ?? -1; + } + expect(exitCode).not.toBe(0); + }, 60000); +}); diff --git a/test/integration/sync-lock-recovery.test.ts b/test/integration/sync-lock-recovery.test.ts new file mode 100644 index 0000000..ca36b01 --- /dev/null +++ b/test/integration/sync-lock-recovery.test.ts @@ -0,0 +1,154 @@ +/** + * Integration test for incomplete-sync recovery via the sync.lock + * marker (Phase 3a.i + 3a.ii). + * + * When a sync is interrupted (Ctrl-C, crash, SIGKILL, power loss), the + * `.pushwork/sync.lock` it wrote on startup is never cleared. On the + * next sync, pushwork detects the stale lock and runs a catch-up pull + * before ordinary change detection. + * + * Invariants tested: + * 1. A stale lock with a dead PID is detected and cleared. + * 2. The subsequent sync completes without deleting any local files. + * 3. A fresh run (no prior lock) doesn't trigger catch-up behavior. + * 4. The sync.lock is removed after a clean sync. + */ + +import * as fs from "fs/promises"; +import * as path from "path"; +import * as tmp from "tmp"; +import { execSync } from "child_process"; + +describe("sync-lock recovery preserves local files", () => { + let tmpDir: string; + let cleanup: () => void; + const pushworkCmd = `node "${path.join(__dirname, "../../dist/cli.js")}"`; + + beforeAll(() => { + execSync("pnpm build", { + cwd: path.join(__dirname, "../.."), + stdio: "pipe", + }); + }); + + beforeEach(() => { + const tmpObj = tmp.dirSync({ unsafeCleanup: true }); + tmpDir = tmpObj.name; + cleanup = tmpObj.removeCallback; + }); + + afterEach(() => { + cleanup(); + }); + + async function disableSync() { + const configPath = path.join(tmpDir, ".pushwork", "config.json"); + const cfg = JSON.parse(await fs.readFile(configPath, "utf8")); + cfg.sync_enabled = false; + await fs.writeFile(configPath, JSON.stringify(cfg, null, 2)); + } + + async function writeStaleLock(pid: number, startedAt: number) { + const lockPath = path.join(tmpDir, ".pushwork", "sync.lock"); + await fs.writeFile( + lockPath, + JSON.stringify({ pid, startedAt }), + "utf8" + ); + } + + async function lockExists(): Promise { + try { + await fs.access(path.join(tmpDir, ".pushwork", "sync.lock")); + return true; + } catch { + return false; + } + } + + it("clears a stale sync.lock from a dead PID and preserves local files", async () => { + await fs.writeFile(path.join(tmpDir, "preserved.txt"), "keep me"); + + try { + execSync(`${pushworkCmd} init "${tmpDir}"`, { + stdio: "pipe", + timeout: 60000, + }); + } catch { + // Acceptable on flaky network; we only need the snapshot to exist. + } + await expect( + fs.access(path.join(tmpDir, ".pushwork", "snapshot.json")) + ).resolves.toBeUndefined(); + + await disableSync(); + + // Simulate an interrupted previous sync by writing a lock whose + // PID is definitely dead. PID 999999 is essentially guaranteed + // to not be running. + await writeStaleLock(999999, Date.now() - 60000); + expect(await lockExists()).toBe(true); + + // Read the stale lock's pid so we can confirm it's different from + // whatever pid might end up in a fresh lock below. + const lockPathStr = path.join(tmpDir, ".pushwork", "sync.lock"); + const staleContent = JSON.parse(await fs.readFile(lockPathStr, "utf8")); + expect(staleContent.pid).toBe(999999); + + // Run a new sync. createRepo should detect the stale lock and + // clear it, marking recoveryReason="incomplete-sync". The sync + // itself must not delete the user's file. + try { + execSync(`${pushworkCmd} sync "${tmpDir}"`, { + stdio: "pipe", + timeout: 45000, + }); + } catch { + // Sync may timeout or fail. The invariants we check below still + // hold: file preservation + stale-lock clearance. + } + + // File must still exist with original content. + const content = await fs.readFile( + path.join(tmpDir, "preserved.txt"), + "utf8" + ); + expect(content).toBe("keep me"); + + // The stale lock (pid=999999) must have been cleared by + // createRepo. If a NEW lock is present afterward it means sync + // didn't finish cleanly (e.g. subprocess timeout) — that's + // acceptable and the NEXT sync will treat *that* as incomplete. + // What's not acceptable is the original stale lock persisting. + try { + const current = JSON.parse(await fs.readFile(lockPathStr, "utf8")); + expect(current.pid).not.toBe(999999); + } catch (e: any) { + if (e.code !== "ENOENT") { + // File exists but couldn't be parsed; either way not the stale lock + // we're protecting against. + } + // ENOENT means the lock was fully cleared. Accept. + } + }, 150000); + + it("does not flag a fresh install as requiring catch-up recovery", async () => { + // Plain init with no prior state should not produce a sync.lock + // after it completes (lock is written at sync start, cleared at + // sync end). We're only checking that the final state has no lock + // — whether init succeeds against the sync server isn't our concern + // for this invariant. + await fs.writeFile(path.join(tmpDir, "a.txt"), "a"); + + try { + execSync(`${pushworkCmd} init "${tmpDir}"`, { + stdio: "pipe", + timeout: 60000, + }); + } catch { + // Acceptable. + } + + expect(await lockExists()).toBe(false); + }, 90000); +}); diff --git a/test/integration/torn-write-recovery.test.ts b/test/integration/torn-write-recovery.test.ts new file mode 100644 index 0000000..9f7f592 --- /dev/null +++ b/test/integration/torn-write-recovery.test.ts @@ -0,0 +1,163 @@ +/** + * Integration test for torn-write recovery. + * + * Before the Phase 1/2/3a fixes, this sequence could cause data loss: + * + * 1. pushwork init (creates snapshot + cache) + * 2. Simulate torn write: truncate one file in .pushwork/automerge/ + * to 0 bytes + * 3. pushwork sync + * -> repo-factory wipes the entire cache (hasCorruptStorage detected) + * -> sync engine starts change detection + * -> documents not yet refetched from server + * -> change-detection reports "remote content unavailable" as + * "file deleted remotely" (remoteContent === null) + * -> applyRemoteChangeToLocal deletes the user's file + * + * After the fixes, pushwork must preserve the local file even when the + * remote state cannot be read — either by skipping the pull phase for + * unconfirmed absences (Phase 1) or by aborting before change detection + * when rehydration fails (Phase 3a). + * + * We disable sync by pre-writing a `.pushwork/config.json` before the + * init so the test is fully hermetic: no network calls, no sync-server + * dependency, no flaky timing on retries. The critical invariant we + * test is "local files are NOT deleted on a sync that encounters + * unavailable remote state", which holds regardless of whether the + * network is involved. + */ + +import * as fs from "fs/promises"; +import * as path from "path"; +import * as tmp from "tmp"; +import { execSync } from "child_process"; + +describe("torn-write recovery preserves local files", () => { + let tmpDir: string; + let cleanup: () => void; + const pushworkCmd = `node "${path.join(__dirname, "../../dist/cli.js")}"`; + + beforeAll(() => { + execSync("pnpm build", { + cwd: path.join(__dirname, "../.."), + stdio: "pipe", + }); + }); + + beforeEach(() => { + const tmpObj = tmp.dirSync({ unsafeCleanup: true }); + tmpDir = tmpObj.name; + cleanup = tmpObj.removeCallback; + }); + + /** + * Disable sync in the .pushwork/config.json after init. Used to make + * the test hermetic (no sync-server dependency) for the corruption + * + recovery phase. + */ + async function disableSync() { + const configPath = path.join(tmpDir, ".pushwork", "config.json"); + const cfg = JSON.parse(await fs.readFile(configPath, "utf8")); + cfg.sync_enabled = false; + await fs.writeFile(configPath, JSON.stringify(cfg, null, 2)); + } + + afterEach(() => { + cleanup(); + }); + + /** Recursively list all files under a directory. */ + async function listAllFiles(dir: string): Promise { + const entries = await fs.readdir(dir, { withFileTypes: true }); + const out: string[] = []; + for (const entry of entries) { + const full = path.join(dir, entry.name); + if (entry.isDirectory()) { + out.push(...(await listAllFiles(full))); + } else if (entry.isFile()) { + out.push(full); + } + } + return out; + } + + it("does not delete local files after torn-write cache recovery", async () => { + // 1. Lay down some user files. + const userFiles = [ + "a.txt", + "b.txt", + "subdir/c.txt", + "subdir/nested/d.txt", + ]; + await fs.mkdir(path.join(tmpDir, "subdir", "nested"), { recursive: true }); + for (const f of userFiles) { + await fs.writeFile(path.join(tmpDir, f), `content of ${f}`); + } + + // init creates snapshot + cache. This contacts the real sync + // server (default behavior); allow up to 60s for that round-trip + // since it's comparable to other existing integration tests. Once + // init is done we disable sync so subsequent commands are local-only. + try { + execSync(`${pushworkCmd} init "${tmpDir}"`, { + stdio: "pipe", + timeout: 60000, + }); + } catch { + // If the sync server is unreachable during CI, accept the + // failure — we still proceed if the snapshot was written. + } + + // Confirm init actually wrote the snapshot and automerge cache. + const snapshotPath = path.join(tmpDir, ".pushwork", "snapshot.json"); + await expect(fs.access(snapshotPath)).resolves.toBeUndefined(); + + // Disable sync for the remainder of the test so the corruption + // + recovery phase runs hermetically. + await disableSync(); + + // Sanity: user files exist post-init. + for (const f of userFiles) { + await expect( + fs.access(path.join(tmpDir, f)) + ).resolves.toBeUndefined(); + } + + // 2. Simulate a torn write by truncating ONE file in the cache. + // hasCorruptStorage will detect this and wipe the whole cache. + const automergeDir = path.join(tmpDir, ".pushwork", "automerge"); + const cacheFiles = await listAllFiles(automergeDir); + expect(cacheFiles.length).toBeGreaterThan(0); + await fs.truncate(cacheFiles[0], 0); + + // 3. Run sync. The cache is wiped, in-memory docs are gone. + // Without the fixes this would cause change detection to see + // all snapshot files as "remote content unavailable → delete". + // With the fixes, Phase 1 (confirmedAbsent) skips all those + // deletions. + // + // Sync may exit 0 or non-zero (Phase 3a doesn't abort because + // sync is disabled; Phase 1 does its job and skips). Either way + // user files must be preserved. + try { + execSync(`${pushworkCmd} sync "${tmpDir}"`, { + stdio: "pipe", + timeout: 30000, + }); + } catch { + // Acceptable: we care about file preservation, not sync success. + } + + // 4. Verify: every user file still on disk with original content. + const missing: string[] = []; + for (const f of userFiles) { + try { + const content = await fs.readFile(path.join(tmpDir, f), "utf8"); + expect(content).toBe(`content of ${f}`); + } catch { + missing.push(f); + } + } + expect(missing).toEqual([]); + }, 120000); +}); diff --git a/test/unit/directory-lookup.test.ts b/test/unit/directory-lookup.test.ts new file mode 100644 index 0000000..dba59b8 --- /dev/null +++ b/test/unit/directory-lookup.test.ts @@ -0,0 +1,245 @@ +/** + * Tests for `findFileInDirectoryHierarchy`, which returns a tri-state + * `RemoteLookup`: { kind: "found" | "absent" | "unavailable" }. + * + * The distinction matters because destructive operations (e.g. deleting + * a local file on the basis of remote state) must only act on a + * confirmed "absent" result, never on "unavailable". + * + * We construct a fake Repo that implements only the subset of the + * interface that `findFileInDirectoryHierarchy` uses (`find(url)` + * returning a handle with `.doc()`), allowing us to exercise the + * lookup logic without Wasm or network dependencies. + */ + +import { generateAutomergeUrl } from "@automerge/automerge-repo"; +import { findFileInDirectoryHierarchy } from "../../src/utils/directory"; +import { DirectoryDocument } from "../../src/types"; + +type FakeDirDoc = DirectoryDocument | undefined | "throw"; + +class FakeRepo { + // url (plain, no heads) -> directory document contents, undefined to + // simulate "not ready", "throw" to simulate repo.find rejection. + private docs = new Map(); + + setDir(url: string, doc: FakeDirDoc) { + this.docs.set(url, doc); + } + + async find(url: string): Promise<{ doc(): T | undefined }> { + const entry = this.docs.get(url); + if (entry === "throw") { + throw new Error("document unavailable"); + } + return { + doc: () => entry as unknown as T, + }; + } +} + +function mkDir( + url: string, + entries: { name: string; type: "file" | "folder"; url: string }[] +): DirectoryDocument { + return { + "@patchwork": { type: "folder" }, + name: url, + title: url, + docs: entries as any, + }; +} + +describe("findFileInDirectoryHierarchy tri-state RemoteLookup", () => { + // Use generated Automerge URLs because parseAutomergeUrl validates + // the base58 encoding — arbitrary strings like "automerge:root" fail. + const ROOT = generateAutomergeUrl(); + const SUB = generateAutomergeUrl(); + const SUB2 = generateAutomergeUrl(); + const FILE = generateAutomergeUrl(); + + it("returns {kind: 'found'} when the file is present in the final directory", async () => { + const repo = new FakeRepo(); + repo.setDir( + ROOT, + mkDir("root", [{ name: "foo.txt", type: "file", url: FILE }]) + ); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "foo.txt" + ); + + expect(result.kind).toBe("found"); + if (result.kind === "found") { + expect(result.entry.name).toBe("foo.txt"); + expect(result.entry.type).toBe("file"); + expect(result.entry.url).toBe(FILE); + } + }); + + it("returns {kind: 'absent'} when the directory was read but the file is not in it", async () => { + const repo = new FakeRepo(); + repo.setDir( + ROOT, + mkDir("root", [ + { name: "something-else.txt", type: "file", url: FILE }, + ]) + ); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "missing.txt" + ); + + expect(result.kind).toBe("absent"); + }); + + it("returns {kind: 'unavailable'} when the final directory document is not ready", async () => { + const repo = new FakeRepo(); + repo.setDir(ROOT, undefined); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "foo.txt" + ); + + expect(result.kind).toBe("unavailable"); + if (result.kind === "unavailable") { + expect(result.reason).toMatch(/not ready/i); + } + }); + + it("returns {kind: 'unavailable'} when repo.find rejects for the final directory", async () => { + const repo = new FakeRepo(); + repo.setDir(ROOT, "throw"); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "foo.txt" + ); + + expect(result.kind).toBe("unavailable"); + if (result.kind === "unavailable") { + expect(result.reason).toMatch(/failed to fetch/i); + } + }); + + it("returns {kind: 'unavailable'} when an intermediate directory is not ready", async () => { + // Need a 3-deep path so 'sub' is truly intermediate (not the final parent). + const repo = new FakeRepo(); + repo.setDir( + ROOT, + mkDir("root", [{ name: "sub", type: "folder", url: SUB }]) + ); + repo.setDir(SUB, undefined); // intermediate not ready + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "sub/inner/foo.txt" + ); + + expect(result.kind).toBe("unavailable"); + if (result.kind === "unavailable") { + expect(result.reason).toMatch(/intermediate/i); + } + }); + + it("returns {kind: 'unavailable'} when the parent directory of the target file is not ready", async () => { + const repo = new FakeRepo(); + repo.setDir( + ROOT, + mkDir("root", [{ name: "sub", type: "folder", url: SUB }]) + ); + repo.setDir(SUB, undefined); // parent (final) dir not ready + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "sub/foo.txt" + ); + + expect(result.kind).toBe("unavailable"); + if (result.kind === "unavailable") { + expect(result.reason).toMatch(/parent directory/i); + } + }); + + it("returns {kind: 'absent'} when an intermediate folder is missing from its parent (authoritative read)", async () => { + // Read of root succeeds; root has no "sub" folder entry. + // From the caller's perspective this is positive evidence that the + // target path is absent from the remote hierarchy. + const repo = new FakeRepo(); + repo.setDir(ROOT, mkDir("root", [])); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "sub/foo.txt" + ); + + expect(result.kind).toBe("absent"); + }); + + it("navigates multi-level directory hierarchies correctly", async () => { + const repo = new FakeRepo(); + repo.setDir( + ROOT, + mkDir("root", [{ name: "a", type: "folder", url: SUB }]) + ); + repo.setDir( + SUB, + mkDir("a", [{ name: "b", type: "folder", url: SUB2 }]) + ); + repo.setDir( + SUB2, + mkDir("b", [{ name: "c.txt", type: "file", url: FILE }]) + ); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "a/b/c.txt" + ); + + expect(result.kind).toBe("found"); + if (result.kind === "found") { + expect(result.entry.url).toBe(FILE); + } + }); + + it("returns a plain object for the found entry (no Automerge proxy leak)", async () => { + const repo = new FakeRepo(); + repo.setDir( + ROOT, + mkDir("root", [{ name: "foo.txt", type: "file", url: FILE }]) + ); + + const result = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "foo.txt" + ); + + expect(result.kind).toBe("found"); + if (result.kind === "found") { + // The returned entry should be a plain object that doesn't reference + // the doc's internal representation. Mutating it must not affect + // the original doc contents. + result.entry.name = "mutated"; + const second = await findFileInDirectoryHierarchy( + repo as any, + ROOT, + "foo.txt" + ); + if (second.kind === "found") { + expect(second.entry.name).toBe("foo.txt"); + } + } + }); +}); diff --git a/test/unit/sync-lock.test.ts b/test/unit/sync-lock.test.ts new file mode 100644 index 0000000..26dd4f7 --- /dev/null +++ b/test/unit/sync-lock.test.ts @@ -0,0 +1,127 @@ +/** + * Unit tests for `src/utils/sync-lock.ts`. + * + * The sync lock is written at the start of every sync() and cleared on + * clean completion. If a lock is still present at startup, the previous + * sync didn't finish cleanly — treat as incomplete-sync recovery. + */ + +import * as fs from "fs/promises"; +import * as path from "path"; +import { tmpdir } from "os"; +import { + writeSyncLock, + readSyncLock, + clearSyncLock, + isStaleSyncLock, + syncLockPath, + SyncLock, +} from "../../src/utils/sync-lock"; + +describe("sync-lock", () => { + let dir: string; + + beforeEach(async () => { + dir = await fs.mkdtemp(path.join(tmpdir(), "pushwork-sync-lock-test-")); + }); + + afterEach(async () => { + await fs.rm(dir, { recursive: true, force: true }); + }); + + describe("write + read round-trip", () => { + it("writes and reads a lock with pid and startedAt", async () => { + await writeSyncLock(dir); + const lock = await readSyncLock(dir); + expect(lock).not.toBeNull(); + expect(lock!.pid).toBe(process.pid); + expect(typeof lock!.startedAt).toBe("number"); + expect(Date.now() - lock!.startedAt).toBeLessThan(5000); + }); + + it("creates the directory if it does not exist", async () => { + const nested = path.join(dir, "does-not-exist-yet"); + await writeSyncLock(nested); + const lock = await readSyncLock(nested); + expect(lock).not.toBeNull(); + }); + + it("overwrites an existing lock", async () => { + await writeSyncLock(dir); + const first = await readSyncLock(dir); + await new Promise(r => setTimeout(r, 5)); + await writeSyncLock(dir); + const second = await readSyncLock(dir); + expect(second!.startedAt).toBeGreaterThanOrEqual(first!.startedAt); + }); + }); + + describe("readSyncLock", () => { + it("returns null when the lock file does not exist", async () => { + const lock = await readSyncLock(dir); + expect(lock).toBeNull(); + }); + + it("returns null when the lock file is malformed JSON", async () => { + await fs.writeFile(syncLockPath(dir), "not json", "utf8"); + const lock = await readSyncLock(dir); + expect(lock).toBeNull(); + }); + + it("returns null when the lock payload is missing required fields", async () => { + await fs.writeFile( + syncLockPath(dir), + JSON.stringify({ wrong: "shape" }), + "utf8" + ); + const lock = await readSyncLock(dir); + expect(lock).toBeNull(); + }); + }); + + describe("clearSyncLock", () => { + it("removes an existing lock", async () => { + await writeSyncLock(dir); + expect(await readSyncLock(dir)).not.toBeNull(); + await clearSyncLock(dir); + expect(await readSyncLock(dir)).toBeNull(); + }); + + it("is idempotent when no lock exists", async () => { + await expect(clearSyncLock(dir)).resolves.toBeUndefined(); + await expect(clearSyncLock(dir)).resolves.toBeUndefined(); + }); + }); + + describe("isStaleSyncLock", () => { + it("returns false for a lock whose pid is this process", () => { + const lock: SyncLock = { pid: process.pid, startedAt: Date.now() }; + expect(isStaleSyncLock(lock)).toBe(false); + }); + + it("returns true for a lock whose pid does not exist", () => { + // PID 999999 is extremely unlikely to be a running process. + const lock: SyncLock = { pid: 999999, startedAt: Date.now() }; + expect(isStaleSyncLock(lock)).toBe(true); + }); + + it("returns true for an ancient lock regardless of pid liveness", () => { + const ancient: SyncLock = { + pid: process.pid, + startedAt: Date.now() - 48 * 60 * 60 * 1000, // 48 hours ago + }; + expect(isStaleSyncLock(ancient)).toBe(true); + }); + + it("respects the now parameter for age checks", () => { + const lock: SyncLock = { + pid: process.pid, + startedAt: 1_000_000_000_000, // old + }; + // If `now` is right after startedAt, not stale (young). + expect(isStaleSyncLock(lock, 1_000_000_001_000)).toBe(false); + // If `now` is far in the future, stale (too old). + expect(isStaleSyncLock(lock, 1_000_000_000_000 + 48 * 60 * 60 * 1000 + 1)).toBe(true); + }); + }); +});