Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions src/reader/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*/

import type { Storage, RequestOptions } from "../storage/storage.js";
import { NotFoundError, AbortError } from "../storage/storage.js";
import { NotFoundError, isAbortError } from "../storage/storage.js";
import {
getBranchRefDirPath,
getBranchRefPath,
Expand Down Expand Up @@ -63,7 +63,7 @@ export class Repository {
* @param options - Optional request options (signal for cancellation)
* @returns ParsedRepo if v2 format, null if v1 format
* @throws Error if repo file exists but fails to parse
* @throws AbortError if the operation was aborted
* @throws An error named "AbortError" if the operation was aborted
*/
private async loadRepoInfo(
options?: RequestOptions,
Expand All @@ -83,7 +83,7 @@ export class Repository {
return this.repoInfo;
} catch (error) {
// Propagate abort errors
if (error instanceof AbortError) {
if (isAbortError(error)) {
this.repoInfoAttempted = false; // Allow retry
throw error;
}
Expand Down Expand Up @@ -167,7 +167,7 @@ export class Repository {
}
} catch (error) {
// Propagate abort errors
if (error instanceof AbortError) {
if (isAbortError(error)) {
throw error;
}
// Listing not supported - try legacy path only
Expand Down Expand Up @@ -208,7 +208,7 @@ export class Repository {
}
} catch (error) {
// Propagate abort errors
if (error instanceof AbortError) {
if (isAbortError(error)) {
throw error;
}
// Listing not supported - return legacy path without checking exists.
Expand Down Expand Up @@ -265,7 +265,7 @@ export class Repository {
}
}
} catch (error) {
if (error instanceof AbortError) {
if (isAbortError(error)) {
throw error;
}
throw new Error("Cannot list branches: storage does not support listing");
Expand Down Expand Up @@ -322,7 +322,7 @@ export class Repository {
}
}
} catch (error) {
if (error instanceof AbortError) {
if (isAbortError(error)) {
throw error;
}
throw new Error("Cannot list tags: storage does not support listing");
Expand Down
94 changes: 32 additions & 62 deletions src/reader/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/

import type { Storage, ByteRange, RequestOptions } from "../storage/storage.js";
import { AbortError } from "../storage/storage.js";
import { decompress } from "fzstd";
import { LRUCache } from "../cache/lru.js";
import {
Expand Down Expand Up @@ -307,8 +306,7 @@ export class ReadSession {
coords: number[],
options?: RequestOptions,
): Promise<Uint8Array | null> {
// Early abort check
if (options?.signal?.aborted) return null;
options?.signal?.throwIfAborted();

const node = this.getNode(path);
if (!node || node.nodeData.type !== "array") {
Expand All @@ -318,28 +316,22 @@ export class ReadSession {
// Find the manifest that contains this chunk
const arrayData = node.nodeData;

try {
for (const manifestRef of arrayData.manifests) {
// Check if this manifest covers the requested coordinates
if (!this.coordsInExtents(coords, manifestRef.extents)) {
continue;
}
for (const manifestRef of arrayData.manifests) {
// Check if this manifest covers the requested coordinates
if (!this.coordsInExtents(coords, manifestRef.extents)) {
continue;
}

// Load the manifest with signal
const manifest = await this.loadManifest(manifestRef.objectId, options);
// Load the manifest with signal
const manifest = await this.loadManifest(manifestRef.objectId, options);

// Find the chunk reference
const chunkRef = findChunkRef(manifest, node.id, coords);
if (!chunkRef) continue;
// Find the chunk reference
const chunkRef = findChunkRef(manifest, node.id, coords);
if (!chunkRef) continue;

// Fetch the chunk data based on payload type with signal
const payload = getChunkPayload(chunkRef);
return this.fetchChunkPayload(payload, options);
}
} catch (error) {
// Mid-flight abort → return null
if (error instanceof AbortError) return null;
throw error;
// Fetch the chunk data based on payload type with signal
const payload = getChunkPayload(chunkRef);
return this.fetchChunkPayload(payload, options);
}

return null;
Expand All @@ -364,7 +356,7 @@ export class ReadSession {
range: { offset: number; length: number } | { suffixLength: number },
options?: RequestOptions,
): Promise<Uint8Array | null> {
if (options?.signal?.aborted) return null;
options?.signal?.throwIfAborted();

const node = this.getNode(path);
if (!node || node.nodeData.type !== "array") {
Expand All @@ -373,22 +365,17 @@ export class ReadSession {

const arrayData = node.nodeData;

try {
for (const manifestRef of arrayData.manifests) {
if (!this.coordsInExtents(coords, manifestRef.extents)) {
continue;
}
for (const manifestRef of arrayData.manifests) {
if (!this.coordsInExtents(coords, manifestRef.extents)) {
continue;
}

const manifest = await this.loadManifest(manifestRef.objectId, options);
const chunkRef = findChunkRef(manifest, node.id, coords);
if (!chunkRef) continue;
const manifest = await this.loadManifest(manifestRef.objectId, options);
const chunkRef = findChunkRef(manifest, node.id, coords);
if (!chunkRef) continue;

const payload = getChunkPayload(chunkRef);
return this.fetchChunkPayloadRange(payload, range, options);
}
} catch (error) {
if (error instanceof AbortError) return null;
throw error;
const payload = getChunkPayload(chunkRef);
return this.fetchChunkPayloadRange(payload, range, options);
}

return null;
Expand Down Expand Up @@ -469,19 +456,10 @@ export class ReadSession {
signal: options?.signal,
};

let response: Response;
try {
const client = options?.fetchClient;
response = client
? await client.fetch(httpUrl, fetchInit)
: await fetch(httpUrl, fetchInit);
} catch (error) {
// Translate abort errors to our class (handles DOMException and other implementations)
if (error instanceof Error && error.name === "AbortError") {
throw new AbortError();
}
throw error;
}
const client = options?.fetchClient;
const response = client
? await client.fetch(httpUrl, fetchInit)
: await fetch(httpUrl, fetchInit);

if (response.status === 412) {
throw new Error(
Expand Down Expand Up @@ -596,18 +574,10 @@ export class ReadSession {
signal: options?.signal,
};

let response: Response;
try {
const client = options?.fetchClient;
response = client
? await client.fetch(httpUrl, fetchInit)
: await fetch(httpUrl, fetchInit);
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
throw new AbortError();
}
throw error;
}
const client = options?.fetchClient;
const response = client
? await client.fetch(httpUrl, fetchInit)
: await fetch(httpUrl, fetchInit);

if (response.status === 412) {
throw new Error(
Expand Down
22 changes: 5 additions & 17 deletions src/storage/http-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import type { Storage, ByteRange, RequestOptions } from "./storage.js";
import { NotFoundError, StorageError, AbortError } from "./storage.js";
import { NotFoundError, StorageError, isAbortError } from "./storage.js";

/** Options for HTTP storage */
export interface HttpStorageOptions {
Expand Down Expand Up @@ -65,10 +65,7 @@ export class HttpStorage implements Storage {
range?: ByteRange,
options?: RequestOptions,
): Promise<Uint8Array> {
// Early abort check
if (options?.signal?.aborted) {
throw new AbortError();
}
options?.signal?.throwIfAborted();

const url = this.getUrl(path);
const headers = this.getHeaders(range);
Expand All @@ -83,10 +80,7 @@ export class HttpStorage implements Storage {
signal: options?.signal,
});
} catch (error) {
// Translate abort errors to our class (handles DOMException and other implementations)
if (error instanceof Error && error.name === "AbortError") {
throw new AbortError();
}
if (isAbortError(error)) throw error;
throw new StorageError(
`Failed to fetch ${url}: ${error instanceof Error ? error.message : String(error)}`,
error instanceof Error ? error : undefined,
Expand All @@ -109,10 +103,7 @@ export class HttpStorage implements Storage {
}

async exists(path: string, options?: RequestOptions): Promise<boolean> {
// Early abort check
if (options?.signal?.aborted) {
throw new AbortError();
}
options?.signal?.throwIfAborted();

const url = this.getUrl(path);

Expand All @@ -126,10 +117,7 @@ export class HttpStorage implements Storage {

return response.ok;
} catch (error) {
// Rethrow abort errors (handles DOMException and other implementations)
if (error instanceof Error && error.name === "AbortError") {
throw new AbortError();
}
if (isAbortError(error)) throw error;
return false;
}
}
Expand Down
21 changes: 13 additions & 8 deletions src/storage/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,17 @@ export class StorageError extends Error {
}
}

/** Error thrown when an operation is aborted */
export class AbortError extends Error {
constructor(message = "Operation aborted") {
super(message);
this.name = "AbortError";
}
type ErrorWithName = { name: unknown };

function hasErrorName(error: unknown): error is ErrorWithName {
return typeof error === "object" && error !== null && "name" in error;
}

/** True for web-platform abort errors from fetch/AbortSignal. */
export function isAbortError(
error: unknown,
): error is ErrorWithName & { name: "AbortError" } {
return hasErrorName(error) && error.name === "AbortError";
}

/**
Expand All @@ -105,7 +110,7 @@ export interface Storage {
* @returns Object data as bytes
* @throws NotFoundError if the object doesn't exist
* @throws StorageError for other errors
* @throws AbortError if the operation was aborted
* @throws An error named "AbortError" if the operation was aborted
*/
getObject(
path: string,
Expand All @@ -119,7 +124,7 @@ export interface Storage {
* @param path - Path to the object
* @param options - Optional request options (signal for cancellation)
* @returns True if the object exists
* @throws AbortError if the operation was aborted
* @throws An error named "AbortError" if the operation was aborted
*/
exists(path: string, options?: RequestOptions): Promise<boolean>;

Expand Down
24 changes: 18 additions & 6 deletions src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Repository } from "./reader/repository.js";
import { ReadSession } from "./reader/session.js";
import { HttpStorage } from "./storage/http-storage.js";
import type { Storage, FetchClient } from "./storage/storage.js";
import { NotFoundError } from "./storage/storage.js";
import type { NodeSnapshot } from "./format/flatbuffers/types.js";

/**
Expand Down Expand Up @@ -219,7 +220,7 @@ export class IcechunkStore implements AsyncReadable {
key: AbsolutePath,
opts?: { signal?: AbortSignal },
): Promise<Uint8Array | undefined> {
if (opts?.signal?.aborted) return undefined;
opts?.signal?.throwIfAborted();

const parsed = parseZarrKey(key);
const resolvedPath = this.resolvePath(parsed.path);
Expand All @@ -237,8 +238,15 @@ export class IcechunkStore implements AsyncReadable {
azureAccount: this.azureAccount,
});
return chunk ?? undefined;
} catch {
return undefined;
} catch (err) {
// Only treat "chunk legitimately absent" as undefined. Propagate
// everything else — swallowing aborts/network errors here causes
// zarrita's Array.getChunk to return a fillValue chunk, which
// downstream consumers cache as valid data (e.g. zarr-layer commits
// it to region.data, producing permanent blank regions even after
// repainting).
if (err instanceof NotFoundError) return undefined;
throw err;
}
}

Expand All @@ -259,7 +267,7 @@ export class IcechunkStore implements AsyncReadable {
range: RangeQuery,
opts?: { signal?: AbortSignal },
): Promise<Uint8Array | undefined> {
if (opts?.signal?.aborted) return undefined;
opts?.signal?.throwIfAborted();

const parsed = parseZarrKey(key);
const resolvedPath = this.resolvePath(parsed.path);
Expand Down Expand Up @@ -289,8 +297,12 @@ export class IcechunkStore implements AsyncReadable {
return data.slice(-range.suffixLength);
}
return data.slice(range.offset, range.offset + range.length);
} catch {
return undefined;
} catch (err) {
// Same rationale as `get()`: don't swallow aborts or real errors,
// or zarrita will silently fall back to a fillValue chunk and
// consumers will cache that garbage as valid data.
if (err instanceof NotFoundError) return undefined;
throw err;
}
}

Expand Down
11 changes: 11 additions & 0 deletions tests/reader/repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ describe("Repository", () => {
);
});

it("should propagate native abort errors while opening", async () => {
const abortError = new DOMException("Aborted", "AbortError");
const storage = new (class extends MockStorage {
async getObject(): Promise<Uint8Array> {
throw abortError;
}
})();

await expect(Repository.open({ storage })).rejects.toBe(abortError);
});

it("should open a valid v1 repository with main branch", async () => {
const snapshotId = createMockSnapshotId(1);
const storage = new MockStorage({
Expand Down
Loading
Loading