fix(codec): await nested Arrow in torch tensors #30
Conversation
@CodeRabbit full review
✅ Actions performed: Full review triggered.
📝 Walkthrough
The changes add asynchronous envelope decoding support by introducing decodeEnvelopeAsync and wiring decodeValueAsync to it.
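Concretely, the nested envelope the fix targets is a torch.tensor payload whose inner ndarray crossed the wire as Arrow; a sketch of that shape (field names mirror the decoder shown further down, the base64 string is a placeholder):

// A torch.tensor envelope wrapping an Arrow-encoded ndarray (sketch).
const payload = {
  __tywrap__: 'torch.tensor',
  encoding: 'ndarray',
  value: { __tywrap__: 'ndarray', encoding: 'arrow', b64: '<arrow-ipc-base64>' },
  shape: [2, 2],
  dtype: 'float32',
  device: 'cpu',
};
// Before this change the async path could leave TorchTensor.data as an unresolved
// Promise for such a payload; decodeValueAsync(payload) now awaits the inner decode.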
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
📜 Review details
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
src/utils/codec.ts
test/runtime_codec.test.ts
🧰 Additional context used
🧬 Code graph analysis (2)
test/runtime_codec.test.ts (1)
src/utils/codec.ts (3)
registerArrowDecoder (102-106), CodecEnvelope (43-90), decodeValueAsync (374-376)
src/utils/codec.ts (1)
src/index.ts (1)
decodeValueAsync (71-71)
🔇 Additional comments (3)
src/utils/codec.ts (2)
331-346: The async torch tensor decode fix looks correct. The key change at line 339 properly awaits the nested decodeEnvelopeAsync call, ensuring that when the inner ndarray uses Arrow encoding, the Promise is resolved before being assigned to TorchTensor.data. This directly addresses issue #21.
374-376: LGTM! The change to use decodeEnvelopeAsync with tryDecodeArrowTable correctly enables async decoding throughout the envelope hierarchy.
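For reference, the three lines being approved here (codec.ts 374-376) are not shown in this diff; they presumably amount to something like the following sketch (the DecodedValue return type and the cast are assumptions):

export async function decodeValueAsync(value: unknown): Promise<DecodedValue> {
  // Route envelopes through the async decoder, using the module's Arrow-table
  // decoder for any arrow-encoded payloads encountered along the way.
  return (await decodeEnvelopeAsync(value, tryDecodeArrowTable)) as DecodedValue;
}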
test/runtime_codec.test.ts (1)
395-419: Test is correct as written; no changes needed. The registerArrowDecoder API accepts synchronous decoders only (type signature: (bytes: Uint8Array) => ArrowTable | Uint8Array), and the sync decoder in the test correctly matches this contract. The bug fix being addressed was about properly awaiting nested envelope decoding in decodeEnvelopeAsync (see line 339 in codec.ts), not about supporting async decoders. The test validates this by confirming that a torch tensor containing a nested Arrow-encoded ndarray is correctly decoded with proper await semantics in the recursive decoding path. Likely an incorrect or invalid review comment.
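A minimal sketch of the kind of regression test described above (the real test lives at test/runtime_codec.test.ts lines 395-419; the vitest imports, relative import paths, and the tiny byte payload are assumptions here):

import { describe, expect, it } from 'vitest';
import { registerArrowDecoder } from '../src/utils/codec';
import { decodeValueAsync } from '../src/index';

describe('decodeValueAsync', () => {
  it('awaits an Arrow-encoded ndarray nested inside a torch.tensor envelope', async () => {
    // A synchronous decoder, matching the (bytes: Uint8Array) => ArrowTable | Uint8Array contract.
    registerArrowDecoder(bytes => bytes);

    const envelope = {
      __tywrap__: 'torch.tensor',
      encoding: 'ndarray',
      value: {
        __tywrap__: 'ndarray',
        encoding: 'arrow',
        b64: Buffer.from([1, 2, 3]).toString('base64'),
      },
      shape: [3],
      dtype: 'uint8',
      device: 'cpu',
    };

    const tensor = (await decodeValueAsync(envelope)) as { data: unknown };
    // The regression being guarded: data must be a concrete value, never a Promise.
    expect(tensor.data).not.toBeInstanceOf(Promise);
    expect(tensor.data).toBeInstanceOf(Uint8Array);
  });
});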
async function decodeEnvelopeAsync<T>(
  value: unknown,
  decodeArrow: (bytes: Uint8Array) => T | Promise<T>
): Promise<T | unknown> {
  if (!isObject(value)) {
    return value;
  }
  const marker = (value as { __tywrap__?: unknown }).__tywrap__;
  if (
    (marker === 'dataframe' || marker === 'series') &&
    (value as { encoding?: unknown }).encoding === 'arrow' &&
    typeof (value as { b64?: unknown }).b64 === 'string'
  ) {
    const bytes = fromBase64(String((value as { b64: string }).b64));
    return await decodeArrow(bytes);
  }
  if (
    marker === 'dataframe' &&
    (value as { encoding?: unknown }).encoding === 'json' &&
    'data' in (value as object)
  ) {
    return (value as { data: unknown }).data;
  }
  if (
    marker === 'series' &&
    (value as { encoding?: unknown }).encoding === 'json' &&
    'data' in (value as object)
  ) {
    return (value as { data: unknown }).data;
  }
  if (marker === 'ndarray') {
    if (
      (value as { encoding?: unknown }).encoding === 'arrow' &&
      typeof (value as { b64?: unknown }).b64 === 'string'
    ) {
      const bytes = fromBase64(String((value as { b64: string }).b64));
      return await decodeArrow(bytes);
    }
    if ((value as { encoding?: unknown }).encoding === 'json' && 'data' in (value as object)) {
      return (value as { data: unknown }).data;
    }
  }
  if (
    marker === 'scipy.sparse' &&
    (value as { encoding?: unknown }).encoding === 'json' &&
    typeof (value as { format?: unknown }).format === 'string' &&
    Array.isArray((value as { shape?: unknown }).shape) &&
    Array.isArray((value as { data?: unknown }).data)
  ) {
    const sparse = value as {
      format: 'csr' | 'csc' | 'coo';
      shape: readonly number[];
      data: readonly unknown[];
      indices?: readonly number[];
      indptr?: readonly number[];
      row?: readonly number[];
      col?: readonly number[];
      dtype?: string;
    };
    return {
      format: sparse.format,
      shape: sparse.shape,
      data: sparse.data,
      indices: sparse.indices,
      indptr: sparse.indptr,
      row: sparse.row,
      col: sparse.col,
      dtype: sparse.dtype,
    } satisfies SparseMatrix;
  }
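  // The fix for #21: a torch.tensor envelope wraps another envelope in `value`
  // (often an Arrow-encoded ndarray), so the nested decode below must be awaited
  // before it is assigned to TorchTensor.data.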
  if (marker === 'torch.tensor' && (value as { encoding?: unknown }).encoding === 'ndarray') {
    const torchValue = value as {
      value?: unknown;
      shape?: readonly number[];
      dtype?: string;
      device?: string;
    };
    if ('value' in (torchValue as object)) {
      const decoded = await decodeEnvelopeAsync(torchValue.value, decodeArrow);
      return {
        data: decoded,
        shape: torchValue.shape,
        dtype: torchValue.dtype,
        device: torchValue.device,
      } satisfies TorchTensor;
    }
  }
  if (
    marker === 'sklearn.estimator' &&
    (value as { encoding?: unknown }).encoding === 'json' &&
    typeof (value as { className?: unknown }).className === 'string' &&
    typeof (value as { module?: unknown }).module === 'string' &&
    isObject((value as { params?: unknown }).params)
  ) {
    const estimator = value as {
      className: string;
      module: string;
      version?: string;
      params: Record<string, unknown>;
    };
    return {
      className: estimator.className,
      module: estimator.module,
      version: estimator.version,
      params: estimator.params,
    } satisfies SklearnEstimator;
  }
  return value as unknown;
}
🧹 Nitpick | 🔵 Trivial
Significant code duplication between decodeEnvelope and decodeEnvelopeAsync.
The new async function duplicates ~100 lines from the sync version. This creates maintenance burden—any future envelope type or logic change must be applied in both places.
Consider refactoring to share the common logic. One approach: make the core function always async-compatible by accepting a decoder that may return T | Promise<T>, then have the sync wrapper use a blocking pattern or keep sync-only paths separate.
♻️ Suggested approach to reduce duplication
// Example: Single implementation that handles both sync and async
async function decodeEnvelopeCore<T>(
  value: unknown,
  decodeArrow: (bytes: Uint8Array) => T | Promise<T>,
  recurse: (v: unknown) => T | Promise<T | unknown>
): Promise<T | unknown> {
  // ... shared envelope matching logic ...
  // For torch.tensor:
  if (marker === 'torch.tensor' && ...) {
    const decoded = await recurse(torchValue.value);
    return { data: decoded, ... };
  }
  // ...
}

// Sync version wraps with sync decoder
export function decodeValue(value: unknown): DecodedValue {
  // Use sync-only path or validate decoder is sync
}

// Async version
export async function decodeValueAsync(value: unknown): Promise<DecodedValue> {
  return decodeEnvelopeCore(value, tryDecodeArrowTable,
    v => decodeEnvelopeCore(v, tryDecodeArrowTable, ...));
}
🤖 Prompt for AI Agents
In `@src/utils/codec.ts` around lines 261 - 369, decodeEnvelopeAsync duplicates
most logic from the synchronous decodeEnvelope; extract the shared
envelope-matching/field-extraction logic into a single core function (e.g.,
decodeEnvelopeCore) that takes a decodeArrow decoder returning T | Promise<T>
and a recurse/callback to decode nested values, then have decodeEnvelope call
the core synchronously (or wrap a sync decoder) and decodeEnvelopeAsync await
the core; update torch.tensor handling to call the provided recurse for nested
value decoding and remove the duplicated branches from decodeEnvelopeAsync so
both variants reuse the same implementation.
Fixes #21

- Make async decoding await nested Arrow ndarrays inside torch tensor envelopes.
- Add a regression test to ensure TorchTensor.data is a concrete value, not a Promise.