Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 153 additions & 0 deletions src/cache/single-flight.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/**
* Single-flight wrapper for cache-aside fetches.
*
* Wraps a result cache with an in-flight Map so concurrent misses for the
* same key share one fetch. The shared fetch is reference-counted against
* its waiters: each caller's `signal` rejects only that caller's wait,
* but if every waiter has aborted the underlying fetch is aborted too and
* the pending entry is dropped, so the next caller starts fresh instead of
* being stuck behind a hung request. New waiters arriving while the fetch
* is in flight bump the refcount, so a single straggler keeps the work
* alive for everyone who follows.
*/

export interface CacheLike<K, V> {
get(key: K): V | undefined;
set(key: K, value: V): void;
}

export interface SingleFlight<K, V> {
/**
* Resolve `key` from the cache, falling through to `fetcher` on miss.
* Concurrent callers for the same key share one in-flight `fetcher`
* invocation; the result is written to `cache` on success.
*
* `signal` rejects only the caller's await. The underlying fetch keeps
* running unless every waiter has aborted, in which case the
* `AbortSignal` passed to `fetcher` fires and the pending entry is
* dropped so the next caller starts fresh.
*
* `fetcher` may throw synchronously; the throw is converted to a
* rejected promise so `load`'s `Promise<V>` contract holds.
*/
load(
key: K,
fetcher: (signal: AbortSignal) => Promise<V>,
signal?: AbortSignal,
): Promise<V>;
}

interface Pending<V> {
promise: Promise<V>;
controller: AbortController;
refCount: number;
}

/** Build a SingleFlight backed by `cache`. */
export function singleFlight<K, V>(cache: CacheLike<K, V>): SingleFlight<K, V> {
const pending = new Map<K, Pending<V>>();

return {
load(
key: K,
fetcher: (signal: AbortSignal) => Promise<V>,
signal?: AbortSignal,
): Promise<V> {
// Reject (don't throw) when the caller is already aborted — callers
// expect a Promise back, including in the pre-aborted case.
if (signal?.aborted) return Promise.reject(makeAbortError());

const hit = cache.get(key);
if (hit !== undefined) return Promise.resolve(hit);

let entry = pending.get(key);
if (!entry) {
const controller = new AbortController();
const promise = invokeFetcher(fetcher, controller.signal)
.then((value) => {
cache.set(key, value);
return value;
})
.finally(() => {
if (pending.get(key)?.promise === promise) pending.delete(key);
});
const fresh: Pending<V> = { promise, controller, refCount: 0 };
pending.set(key, fresh);
entry = fresh;
}
const owned = entry;
owned.refCount++;

const release = () => {
owned.refCount--;
if (owned.refCount > 0) return;
// Last waiter just released. If the entry is still in `pending`
// the fetch hasn't settled yet — drop the slot synchronously so
// any new caller starts fresh, then abort the in-flight work.
// If `.finally` already cleared it, the fetch resolved and the
// controller.abort() is a no-op.
if (pending.get(key) === owned) {
pending.delete(key);
owned.controller.abort();
}
};

if (!signal) {
return owned.promise.then(
(value) => {
release();
return value;
},
(error) => {
release();
throw error;
},
);
}

return new Promise<V>((resolve, reject) => {
let done = false;
// `finish` ensures release runs exactly once per caller and wins
// the resolve/reject race between abort and underlying settle.
const finish = (): boolean => {
if (done) return false;
done = true;
signal.removeEventListener("abort", handleAbort);
release();
return true;
};
const handleAbort = () => {
if (finish()) reject(makeAbortError());
};
signal.addEventListener("abort", handleAbort, { once: true });
owned.promise.then(
(value) => {
if (finish()) resolve(value);
},
(error) => {
if (finish()) reject(error);
},
);
});
},
};
}

/**
* Invoke `fetcher`, converting a synchronous throw into a rejected
* Promise so callers always observe a Promise-shaped failure.
*/
function invokeFetcher<V>(
fetcher: (signal: AbortSignal) => Promise<V>,
signal: AbortSignal,
): Promise<V> {
try {
return fetcher(signal);
} catch (error) {
return Promise.reject(error);
}
}

function makeAbortError(): Error {
return new DOMException("The operation was aborted.", "AbortError");
Comment thread
Shane98c marked this conversation as resolved.
}
56 changes: 39 additions & 17 deletions src/reader/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
} from "../storage/storage.js";
import { decompress } from "fzstd";
import { LRUCache } from "../cache/lru.js";
import { singleFlight, type SingleFlight } from "../cache/single-flight.js";
import {
parseHeader,
validateFileType,
Expand Down Expand Up @@ -84,6 +85,14 @@ export class ReadSession {
private snapshot: Snapshot;
private specVersion: SpecVersion;
private manifestCache: LRUCache<string, Manifest>;
/**
* Single-flight loader over `manifestCache`: concurrent misses for the
* same manifest share one fetch instead of racing N identical GETs.
* Critical for fan-out reads that all need the same manifest (e.g. the
* many parallel chunk reads issued when a renderer crosses into a new
* pyramid level).
*/
private manifestLoader: SingleFlight<string, Manifest>;
/**
* Bounded cache of `AsyncReadable`s wrapped with zarrita's range coalescer.
* The cache key includes request-option identities that must not share one
Expand All @@ -110,6 +119,7 @@ export class ReadSession {
this.snapshot = snapshot;
this.specVersion = specVersion;
this.manifestCache = new LRUCache(maxManifestCacheSize);
this.manifestLoader = singleFlight(this.manifestCache);
}

private getFetchClientKey(fetchClient: FetchClient | undefined): string {
Expand Down Expand Up @@ -281,38 +291,50 @@ export class ReadSession {
};
}

/** Load and parse a manifest from storage */
private async loadManifest(
/**
* Load and parse a manifest from storage.
*
* Concurrent calls for the same manifest are coalesced through
* `manifestLoader` so a fan-out of chunk reads — the typical case
* when many tiles cross into a new pyramid level at once — issues one
* HTTP GET instead of one per caller. A caller's `signal` rejects only
* that caller's await; the shared fetch is aborted only once every
* active waiter has aborted.
*/
private loadManifest(
manifestId: ObjectId12,
options?: RequestOptions,
): Promise<Manifest> {
const idStr = encodeObjectId12(manifestId);
return this.manifestLoader.load(
idStr,
(signal) => this.fetchManifest(idStr, signal),
options?.signal,
);
}

// Check cache first
const cached = this.manifestCache.get(idStr);
if (cached) return cached;

// Load from storage with signal
/**
* Fetch and parse a single manifest. The `signal` here is the
* single-flight loader's own signal — it fires only when every waiter
* has aborted, so a hung manifest fetch doesn't trap later callers
* behind it.
*/
private async fetchManifest(
idStr: string,
signal: AbortSignal,
): Promise<Manifest> {
const path = getManifestPath(idStr);
const data = await this.storage.getObject(path, undefined, options);
const data = await this.storage.getObject(path, undefined, { signal });

// Parse header
const header = parseHeader(data);
validateFileType(header, FileType.Manifest);

// Decompress if needed
let flatbufferData = getDataAfterHeader(data);
if (header.compression === CompressionAlgorithm.Zstd) {
flatbufferData = decompress(flatbufferData);
}

// Parse FlatBuffer
const manifest = parseManifest(flatbufferData);

// Cache it
this.manifestCache.set(idStr, manifest);

return manifest;
return parseManifest(flatbufferData);
}

/**
Expand Down
Loading
Loading