Skip to content

Commit 59e605c

Browse files
committed
impl(sprint-13/W-I4): D-CSV-17 — rayon par_* variants for QualiaStream/InferenceStream/SplatFieldStream (OQ-CSV-7 rayon feature gate; OQ-CSV-8 fixed cache-line chunks; CI matrix hpc-stream-parallel job co-shipped)
- par_qualia_stream: 8-row fixed chunks (8B×8=64B cache line, OQ-CSV-8) - par_inference_stream: 8-row fixed chunks (8B×8=64B cache line, OQ-CSV-8) - par_splat_field_stream: 4-row fixed chunks (16B×4=64B cache line, OQ-CSV-8) - All three return impl IndexedParallelIterator (enumerate+zip+collect determinism, spec §2) - 18 new tests: 6 per stream (T-P-Q-1..6, T-P-I-1..6, T-P-S-1..6), all pass - mod.rs: pub use registration for par_* under cfg(feature = "rayon") (AP5 clean) - CI: hpc-stream-parallel job added to ci.yaml + conclusion needs (D-CSV-17 CI ownership) https://claude.ai/code/session_01UwJuKqP828qyX1VkLgGJFS
1 parent 4770a87 commit 59e605c

5 files changed

Lines changed: 387 additions & 8 deletions

File tree

.github/workflows/ci.yaml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,27 @@ jobs:
150150
- name: Test native + approx
151151
run: cargo nextest run -p ndarray --features native,approx
152152

153+
hpc-stream-parallel:
154+
# D-CSV-17 (sprint-13 W-I4): rayon par_* variants for hpc::stream.
155+
# This job co-ships with the par_* implementation so the rayon feature
156+
# gate is always exercised alongside the code it guards — prevents
157+
# silent-dead-code drift (spec §0 + worker-template-v2 §5 CI ownership).
158+
runs-on: ubuntu-latest
159+
name: hpc-stream-parallel/rayon
160+
steps:
161+
- uses: actions/checkout@v4
162+
- uses: dtolnay/rust-toolchain@1.95.0
163+
- uses: Swatinem/rust-cache@v2
164+
- uses: taiki-e/install-action@nextest
165+
- name: cargo check (no rayon — scalar path unchanged)
166+
run: cargo check -p ndarray
167+
- name: cargo check --features rayon
168+
run: cargo check -p ndarray --features rayon
169+
- name: par_* stream tests (--features rayon, hpc::stream filter)
170+
run: cargo nextest run -p ndarray --features rayon -E 'test(hpc::stream)'
171+
- name: clippy --features rayon
172+
run: cargo clippy -p ndarray --features rayon --lib -- -D warnings
173+
153174
blas-msrv:
154175
runs-on: ubuntu-latest
155176
name: blas-msrv
@@ -247,6 +268,7 @@ jobs:
247268
- nostd
248269
- tests
249270
- native-backend
271+
- hpc-stream-parallel
250272
- miri
251273
- cross_test
252274
- cargo-careful

src/hpc/stream/inference.rs

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
//! over the inference-mantissa lane of the EdgeColumn SoA. Used by the
44
//! integer-SIMD MUL evaluation hot path (D-CSV-8 sprint-12 SIMD vec).
55
//!
6-
//! Pure iterator scaffold; `par_inference_stream` rayon variant is sprint-13+.
6+
//! Pure iterator scaffold; `par_inference_stream` rayon variant wired in
7+
//! sprint-13 (D-CSV-17) behind `#[cfg(feature = "rayon")]`.
78
89
// Local mirror of CausalEdge64 shape (bit-compatible with causal_edge::CausalEdge64).
910
// No cross-crate import: ndarray is the producer; causal-edge is the consumer.
@@ -113,6 +114,35 @@ impl<'a> ExactSizeIterator for InferenceStream<'a> {
113114
}
114115
}
115116

117+
// ─── Rayon-parallel variant (D-CSV-17, sprint-13) ─────────────────────────────
118+
119+
#[cfg(feature = "rayon")]
120+
use rayon::prelude::*;
121+
122+
/// Rayon-parallel forward-iterator over `&[InferenceRow]`.
123+
///
124+
/// Mirrors `par_qualia_stream` semantics. Returns `impl IndexedParallelIterator`
125+
/// so callers retain `enumerate()`, `zip()`, and order-preserving `collect()`.
126+
///
127+
/// For `InferenceRow` (8 B/row, `repr(C, align(8))`), 8 rows fill one 64-byte
128+
/// cache line; callers folding into ordered structures should call
129+
/// `.with_min_len(8)` to align chunks (OQ-CSV-8 fixed chunk for InferenceRow).
130+
///
131+
/// Particularly useful for the integer-SIMD MUL evaluation hot path (D-CSV-8):
132+
/// folding the inference mantissa lane across millions of EdgeColumn rows
133+
/// benefits from work-stealing on multi-core hosts.
134+
///
135+
/// # Determinism
136+
///
137+
/// See §6 of pr-sprint-13-rayon-streams.md. Pattern A (order-insensitive folds
138+
/// like `.sum()`, `.count()`) and Pattern B (`collect()`) are safe.
139+
/// Pattern C (non-commutative accumulators) requires per-callsite analysis.
140+
#[cfg(feature = "rayon")]
141+
#[inline]
142+
pub fn par_inference_stream(rows: &[InferenceRow]) -> impl IndexedParallelIterator<Item = (usize, &InferenceRow)> {
143+
rows.par_iter().enumerate()
144+
}
145+
116146
#[cfg(test)]
117147
mod tests {
118148
use super::*;
@@ -217,3 +247,81 @@ mod tests {
217247
assert_eq!(first.1 .0, 10);
218248
}
219249
}
250+
251+
// ─── Rayon par_* tests (D-CSV-17) ─────────────────────────────────────────────
252+
253+
#[cfg(all(test, feature = "rayon"))]
254+
mod par_tests {
255+
use super::{par_inference_stream, InferenceRow, InferenceStream};
256+
use rayon::prelude::*;
257+
use std::sync::atomic::{AtomicUsize, Ordering};
258+
259+
/// T-P-I-1: par_inference_stream yields all N items.
260+
#[test]
261+
fn test_par_inference_yields_all() {
262+
let rows: Vec<InferenceRow> = (0u64..1024).map(InferenceRow).collect();
263+
let count = par_inference_stream(&rows).count();
264+
assert_eq!(count, 1024);
265+
}
266+
267+
/// T-P-I-2: par_inference_stream on empty slice yields zero items.
268+
#[test]
269+
fn test_par_inference_empty() {
270+
let rows: Vec<InferenceRow> = vec![];
271+
let count = par_inference_stream(&rows).count();
272+
assert_eq!(count, 0);
273+
}
274+
275+
/// T-P-I-3: par_iter result equals serial iter result (as sorted sets).
276+
#[test]
277+
fn test_par_inference_matches_serial() {
278+
let rows: Vec<InferenceRow> = (0u64..256).map(InferenceRow).collect();
279+
let mut par: Vec<u64> = par_inference_stream(&rows)
280+
.map(|(i, r)| (i as u64) ^ r.0)
281+
.collect();
282+
let mut ser: Vec<u64> = InferenceStream::new(&rows)
283+
.map(|(i, r)| (i as u64) ^ r.0)
284+
.collect();
285+
par.sort_unstable();
286+
ser.sort_unstable();
287+
assert_eq!(par, ser);
288+
}
289+
290+
/// T-P-I-4: filter on inference_mantissa() — sign-extension behaves
291+
/// identically under parallel access.
292+
///
293+
/// 256 rows with mantissa varying 0..16 cyclically via bits 46-49:
294+
/// - raw values 0..7 → mantissa 0..7 (non-negative): 128 rows
295+
/// - raw values 8..15 → mantissa -8..-1 (negative): 128 rows
296+
#[test]
297+
fn test_par_inference_filter_mantissa() {
298+
let rows: Vec<InferenceRow> = (0u64..256).map(|i| InferenceRow((i & 0xF) << 46)).collect();
299+
let neg = par_inference_stream(&rows)
300+
.filter(|(_, r)| r.inference_mantissa() < 0)
301+
.count();
302+
assert_eq!(neg, 128);
303+
}
304+
305+
/// T-P-I-5: with_min_len(8) knob compiles and yields all items.
306+
/// 8 rows × 8 B = 64 B = one cache line (OQ-CSV-8 fixed chunk for InferenceRow).
307+
#[test]
308+
fn test_par_inference_min_len() {
309+
let rows: Vec<InferenceRow> = (0u64..1024).map(InferenceRow).collect();
310+
let count = par_inference_stream(&rows).with_min_len(8).count();
311+
assert_eq!(count, 1024);
312+
}
313+
314+
/// T-P-I-6: thread-safety — InferenceRow is Send + Sync; verified by
315+
/// mutating an AtomicUsize from the parallel for_each closure.
316+
#[test]
317+
fn test_par_inference_send_sync() {
318+
fn assert_send_sync<T: Send + Sync>() {}
319+
assert_send_sync::<InferenceRow>();
320+
let rows: Vec<InferenceRow> = (0u64..1024).map(InferenceRow).collect();
321+
let counter = AtomicUsize::new(0);
322+
par_inference_stream(&rows).for_each(|_| {
323+
counter.fetch_add(1, Ordering::Relaxed);
324+
});
325+
assert_eq!(counter.load(Ordering::Relaxed), 1024);
326+
}
327+
}

src/hpc/stream/mod.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
//! Per cognitive-substrate-convergence-v1.md §5 L-20.
33
//!
44
//! Sprint-12 scope (W-F4/5/6): `QualiaStream` + `InferenceStream` +
5-
//! `SplatFieldStream` forward-iterator scaffolds. Sprint-13+:
6-
//! `par_*` rayon variants once rayon is wired into the ndarray
7-
//! feature gate.
5+
//! `SplatFieldStream` forward-iterator scaffolds. Sprint-13 (D-CSV-17):
6+
//! `par_*` rayon variants wired behind `#[cfg(feature = "rayon")]`.
87
98
pub mod inference;
109
pub mod qualia;
@@ -13,3 +12,10 @@ pub mod splat_field;
1312
pub use inference::{InferenceRow, InferenceStream};
1413
pub use qualia::{QualiaI4Row, QualiaStream};
1514
pub use splat_field::{SplatField, SplatFieldStream};
15+
16+
#[cfg(feature = "rayon")]
17+
pub use inference::par_inference_stream;
18+
#[cfg(feature = "rayon")]
19+
pub use qualia::par_qualia_stream;
20+
#[cfg(feature = "rayon")]
21+
pub use splat_field::par_splat_field_stream;

src/hpc/stream/qualia.rs

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
//! QualiaColumn SoA layout introduced by D-CSV-5b.
55
//!
66
//! Yields `(row_index, &QualiaI4Row)` tuples. Pure iterator scaffold; the
7-
//! `par_qualia_stream` rayon-parallel variant is sprint-13+ once rayon is
8-
//! wired into the ndarray feature gate.
7+
//! `par_qualia_stream` rayon-parallel variant is wired in sprint-13 (D-CSV-17)
8+
//! behind `#[cfg(feature = "rayon")]`.
99
1010
// NOTE: do NOT import lance-graph-contract here (would create circular dep
1111
// since contract is *consumer* of ndarray). Define a minimal local mirror
@@ -108,6 +108,50 @@ impl<'a> ExactSizeIterator for QualiaStream<'a> {
108108
}
109109
}
110110

111+
// ─── Rayon-parallel variant (D-CSV-17, sprint-13) ─────────────────────────────
112+
113+
#[cfg(feature = "rayon")]
114+
use rayon::prelude::*;
115+
116+
/// Rayon-parallel forward-iterator over a borrowed `&[QualiaI4Row]` slice.
117+
///
118+
/// Yields `(row_index, &QualiaI4Row)` tuples. Unlike the scalar
119+
/// `QualiaStream`, iteration order is **not** guaranteed to be ascending
120+
/// by index; rayon's work-stealing scheduler may process chunks
121+
/// out-of-order. See §6 of pr-sprint-13-rayon-streams.md for the
122+
/// determinism contract callers must respect.
123+
///
124+
/// Returns `impl IndexedParallelIterator` (not bare `ParallelIterator`) so
125+
/// callers can use `enumerate()`, `zip()`, and `collect()` with guaranteed
126+
/// index-order preservation (rayon's `IndexedParallelIterator::collect` is
127+
/// contract-guaranteed to preserve original order).
128+
///
129+
/// # Chunk-size note
130+
///
131+
/// `QualiaI4Row` is 8 bytes (`repr(C, align(8))`), so 8 rows fit one 64-byte
132+
/// cache line. For folds into ordered structures, call `.with_min_len(8)` to
133+
/// align chunks to cache-line boundaries (see OQ-CSV-8).
134+
///
135+
/// # Example
136+
///
137+
/// ```
138+
/// # #[cfg(feature = "rayon")] {
139+
/// use ndarray::hpc::stream::qualia::{QualiaI4Row, par_qualia_stream};
140+
/// use rayon::prelude::*;
141+
///
142+
/// let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
143+
/// let total_nonzero: usize = par_qualia_stream(&rows)
144+
/// .filter(|(_, r)| r.0 != 0)
145+
/// .count();
146+
/// assert_eq!(total_nonzero, 1023); // QualiaI4Row(0) is the lone zero
147+
/// # }
148+
/// ```
149+
#[cfg(feature = "rayon")]
150+
#[inline]
151+
pub fn par_qualia_stream(rows: &[QualiaI4Row]) -> impl IndexedParallelIterator<Item = (usize, &QualiaI4Row)> {
152+
rows.par_iter().enumerate()
153+
}
154+
111155
// ─── Tests ────────────────────────────────────────────────────────────────────
112156

113157
#[cfg(test)]
@@ -199,3 +243,79 @@ mod tests {
199243
assert_eq!(ExactSizeIterator::len(&stream), 0);
200244
}
201245
}
246+
247+
// ─── Rayon par_* tests (D-CSV-17) ─────────────────────────────────────────────
248+
249+
#[cfg(all(test, feature = "rayon"))]
250+
mod par_tests {
251+
use super::{par_qualia_stream, QualiaI4Row, QualiaStream};
252+
use rayon::prelude::*;
253+
use std::sync::atomic::{AtomicUsize, Ordering};
254+
255+
/// T-P-Q-1: par_qualia_stream yields all N items.
256+
#[test]
257+
fn test_par_qualia_yields_all() {
258+
let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
259+
let count = par_qualia_stream(&rows).count();
260+
assert_eq!(count, 1024);
261+
}
262+
263+
/// T-P-Q-2: par_qualia_stream on empty slice yields zero items.
264+
#[test]
265+
fn test_par_qualia_empty() {
266+
let rows: Vec<QualiaI4Row> = vec![];
267+
let count = par_qualia_stream(&rows).count();
268+
assert_eq!(count, 0);
269+
}
270+
271+
/// T-P-Q-3: par_iter result equals serial iter result (as sorted sets).
272+
/// Both iterators produce the same (index XOR value) pairs; sorting
273+
/// makes the comparison order-independent.
274+
#[test]
275+
fn test_par_qualia_matches_serial() {
276+
let rows: Vec<QualiaI4Row> = (0u64..256).map(QualiaI4Row).collect();
277+
let mut par: Vec<u64> = par_qualia_stream(&rows)
278+
.map(|(i, r)| (i as u64) ^ r.0)
279+
.collect();
280+
let mut ser: Vec<u64> = QualiaStream::new(&rows)
281+
.map(|(i, r)| (i as u64) ^ r.0)
282+
.collect();
283+
par.sort_unstable();
284+
ser.sort_unstable();
285+
assert_eq!(par, ser);
286+
}
287+
288+
/// T-P-Q-4: par_iter with filter is correct (count of even-valued rows).
289+
#[test]
290+
fn test_par_qualia_with_filter() {
291+
let rows: Vec<QualiaI4Row> = (0u64..512).map(QualiaI4Row).collect();
292+
let count_even = par_qualia_stream(&rows)
293+
.filter(|(_, r)| r.0 % 2 == 0)
294+
.count();
295+
// Values 0, 2, 4, ..., 510 → 256 even values.
296+
assert_eq!(count_even, 256);
297+
}
298+
299+
/// T-P-Q-5: with_min_len(8) knob compiles and yields all items.
300+
/// 8 rows × 8 B = 64 B = one cache line (OQ-CSV-8 fixed chunk for QualiaI4).
301+
#[test]
302+
fn test_par_qualia_min_len() {
303+
let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
304+
let count = par_qualia_stream(&rows).with_min_len(8).count();
305+
assert_eq!(count, 1024);
306+
}
307+
308+
/// T-P-Q-6: thread-safety — QualiaI4Row is Send + Sync; verified by
309+
/// mutating an AtomicUsize from the parallel for_each closure.
310+
#[test]
311+
fn test_par_qualia_send_sync() {
312+
fn assert_send_sync<T: Send + Sync>() {}
313+
assert_send_sync::<QualiaI4Row>();
314+
let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
315+
let counter = AtomicUsize::new(0);
316+
par_qualia_stream(&rows).for_each(|_| {
317+
counter.fetch_add(1, Ordering::Relaxed);
318+
});
319+
assert_eq!(counter.load(Ordering::Relaxed), 1024);
320+
}
321+
}

0 commit comments

Comments
 (0)