
Conversation

@guan404ming (Member) commented Dec 4, 2025

Purpose of PR

Integrates Apache Arrow to enable efficient columnar data processing for quantum encoding operations. This addition provides a standardized path for handling structured data inputs alongside the existing raw &[f64] interface.
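As a rough illustration of the two input paths (the encoder calls shown in comments are hypothetical placeholders, not the exact API added by this PR):

use arrow::array::Float64Array;

fn example() {
    // Existing path: raw slice input.
    let raw: &[f64] = &[0.1, 0.2, 0.3, 0.4];
    // encoder.encode(raw);            // hypothetical existing slice-based call

    // New path: Arrow columnar input.
    let column = Float64Array::from(raw.to_vec());
    // encoder.encode_arrow(&column);  // hypothetical Arrow-based call
    let _ = column;
}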

Related Issues or PRs

Changes Made

  • Bug fix
  • New feature
  • Refactoring
  • Documentation
  • Test
  • CI/CD pipeline
  • Other

Breaking Changes

  • Yes
  • No

Checklist

  • Added or updated unit tests for all changes
  • Added or updated documentation for all changes
  • Successfully built and ran all unit tests or manual tests locally
  • PR title follows "MAHOUT-XXX: Brief Description" format (if related to an issue)
  • Code follows ASF guidelines

@guan404ming guan404ming changed the base branch from main to dev-qdp December 4, 2025 11:29
@guan404ming guan404ming marked this pull request as ready for review December 4, 2025 11:33
@guan404ming guan404ming changed the title Integrate Apache Arrow for data processing [QDP] Integrate Apache Arrow for data processing Dec 4, 2025
@guan404ming guan404ming requested a review from rich7420 December 4, 2025 11:35
@guan404ming guan404ming marked this pull request as draft December 4, 2025 11:41
@rich7420 (Contributor) commented Dec 4, 2025

Thanks for the patch @guan404ming!!!
I think we should check the zero-copy parts, such as arrow_to_vec and Vec::push; they cause an extra memory copy and memory reallocation.
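For illustration, a minimal sketch of the copy-heavy path versus a borrowed view (arrow_to_vec is the function under discussion; arrow_as_slice is a hypothetical name, and the borrowed path assumes the array has no nulls):

use arrow::array::Float64Array;

// Copying path: materializes a new Vec<f64>, allocating and copying every value.
fn arrow_to_vec(array: &Float64Array) -> Vec<f64> {
    let mut out = Vec::new();
    for i in 0..array.len() {
        out.push(array.value(i)); // repeated Vec::push may also trigger reallocation
    }
    out
}

// Borrowed path: view the underlying Arrow buffer directly (valid when there are no nulls).
fn arrow_as_slice(array: &Float64Array) -> &[f64] {
    array.values() // no allocation, no copy
}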

@guan404ming guan404ming marked this pull request as ready for review December 4, 2025 16:38
@guan404ming (Member Author)

> Thanks for the patch @guan404ming!!!
> I think we should check the zero-copy parts, such as arrow_to_vec and Vec::push; they cause an extra memory copy and memory reallocation.

Nice suggestion, I've updated it with a much more optimized version. Thanks!

)]));

// Create Float64Array from slice
let array = Float64Array::from(Vec::from(data));
You could use Float64Array::from_iter_values(data.iter().copied()) to avoid the extra allocation.
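For reference, a sketch of the two variants side by side (wrapped in a function just so it compiles):

use arrow::array::Float64Array;

fn example(data: &[f64]) {
    // Current: copies the slice into a Vec, then hands ownership of the Vec to Arrow.
    let _copied = Float64Array::from(Vec::from(data));

    // Suggested: build the Arrow buffer directly from the iterator values.
    let _direct = Float64Array::from_iter_values(data.iter().copied());
}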

Comment on lines 191 to 194
pub fn read_parquet_to_arrow<P: AsRef<Path>>(path: P) -> Result<Float64Array> {
let data = read_parquet(path)?;
Ok(Float64Array::from(data))
}
Directly constructing an Arrow array via ParquetRecordBatchReader would avoid an extra copy.
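A rough sketch of that approach, assuming a single Float64 column per batch (the function name and error handling here are placeholders, not the actual io.rs API):

use std::fs::File;
use std::path::Path;

use arrow::array::{Array, Float64Array};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

// Read the first Float64 column of every record batch without round-tripping through Vec<f64>.
fn read_parquet_chunks<P: AsRef<Path>>(path: P) -> Result<Vec<Float64Array>, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;

    let mut chunks = Vec::new();
    for batch in reader {
        let batch = batch?;
        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or("expected a Float64 column")?
            .clone(); // cheap: clones the Arc'd buffers, not the values
        chunks.push(column);
    }
    Ok(chunks)
}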

@rich7420 (Contributor) commented Dec 5, 2025

Looks good, but the current implementation forces a memory copy (a Vec allocation) even when we want to use Arrow directly. We should refactor io.rs so that read_parquet_to_arrow is the base implementation, ensuring true zero-copy performance for the pipeline.
Current: Disk -> Arrow -> Vec (copy) -> Arrow (copy) -> GPU
Needed: Disk -> Arrow -> Arrow (zero-copy reference) -> GPU (via pointer)
I think so, please correct me if I'm wrong.

@400Ping commented Dec 5, 2025

> Looks good, but the current implementation forces a memory copy (a Vec allocation) even when we want to use Arrow directly. We should refactor io.rs so that read_parquet_to_arrow is the base implementation, ensuring true zero-copy performance for the pipeline.
> Current: Disk -> Arrow -> Vec (copy) -> Arrow (copy) -> GPU
> Needed: Disk -> Arrow -> Arrow (zero-copy reference) -> GPU (via pointer)
> I think so, please correct me if I'm wrong.

I agree with @rich7420 as well.

@guan404ming (Member Author) commented Dec 5, 2025

> Looks good, but the current implementation forces a memory copy (a Vec allocation) even when we want to use Arrow directly. We should refactor io.rs so that read_parquet_to_arrow is the base implementation, ensuring true zero-copy performance for the pipeline.
> Current: Disk -> Arrow -> Vec (copy) -> Arrow (copy) -> GPU
> Needed: Disk -> Arrow -> Arrow (zero-copy reference) -> GPU (via pointer)
> I think so, please correct me if I'm wrong.

I think you're right, thanks for pointing that out. I've updated the implementation so the data path is now:
Disk ----> Arrow Buffers (pointer only) -----> GPU
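Roughly, the idea is that the device upload reads straight from the Arrow buffer; the upload function below is a hypothetical stand-in for whatever the QDP kernel layer actually calls:

use arrow::array::Float64Array;

// Hypothetical GPU upload taking a host pointer and an element count;
// in practice this would be a cudaMemcpy (or equivalent) into device memory.
fn upload_f64_to_gpu(ptr: *const f64, len: usize) {
    let _ = (ptr, len);
}

fn encode_chunk(chunk: &Float64Array) {
    // No intermediate Vec: pass the Arrow buffer's pointer and length straight to the device copy.
    let values: &[f64] = chunk.values();
    upload_f64_to_gpu(values.as_ptr(), values.len());
}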

}

#[test]
fn test_chunked_zero_copy_api() {
@ryankert01 (Contributor) Dec 5, 2025


I know it's hard, but should we verify that the memory addresses are the same? It looks like the test only checks values.

It might be something like this (which only ensures the pointer lies within the mmap region):

// Check that the chunk's backing data lives inside the mmapped file region.
let chunk_ptr = chunks[0].values().as_ptr() as *const u8;
assert!(chunk_ptr >= mmap_ptr);
assert!(chunk_ptr < unsafe { mmap_ptr.add(mmap_len) });

@rich7420 (Contributor) Dec 5, 2025


@ryankert01 you're right. The current test verifies data integrity but not the zero-copy behavior. Strictly verifying memory addresses requires unsafe access to the underlying Arrow buffer pointers. We should add a test case that inspects array.values().as_ptr() to make sure it matches expectations, though fully verifying mmap alignment is tricky in tests.


This can be a follow-up too.

@rich7420 (Contributor) commented Dec 5, 2025

@guan404ming Great work on the io refactor! Thanks!
We need to update amplitude.rs because encode_from_parquet currently falls back to encode_chunked, which merges all chunks into one huge Vec on the CPU. This breaks zero-copy for large files.
We should override encode_chunked in AmplitudeEncoder to stream chunks directly to the GPU without merging.
What do you think?
I could send a PR for this part.
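A minimal sketch of the streaming idea; the function names and the GPU upload call below are assumptions for illustration, not the actual AmplitudeEncoder API:

use arrow::array::Float64Array;

// Hypothetical device upload; stands in for whatever the encoder's GPU layer provides.
fn upload_chunk_to_gpu(values: &[f64], offset: usize) {
    let _ = (values, offset);
}

// Stream each Arrow chunk to the GPU at its running offset instead of first
// concatenating everything into one huge Vec<f64> on the host.
fn encode_chunked_streaming(chunks: &[Float64Array]) {
    let mut offset = 0;
    for chunk in chunks {
        let values: &[f64] = chunk.values();
        upload_chunk_to_gpu(values, offset);
        offset += values.len();
    }
    // Amplitude normalization would then run on-device over the assembled state vector.
}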

@guan404ming (Member Author) commented Dec 5, 2025

> @guan404ming Great work on the io refactor! Thanks!
> We need to update amplitude.rs because encode_from_parquet currently falls back to encode_chunked, which merges all chunks into one huge Vec on the CPU. This breaks zero-copy for large files.
> We should override encode_chunked in AmplitudeEncoder to stream chunks directly to the GPU without merging.
> What do you think?
> I could send a PR for this part.

I previously planned to send a follow-up PR for this but forgot to add it to the PR description. You could definitely help with this, thanks!

@400Ping left a comment

Overall LGTM

@guan404ming (Member Author) commented Dec 5, 2025

After our offline discussion and investigation, we confirmed that Parquet cannot achieve true zero-copy from disk to memory because its data is stored in compressed, encoded form and must be decoded before use. We'll keep Parquet for now given its convenience and practicality. I've also updated the TODO to migrate the encoder to a chunk-based API.

I'll follow up on Arrow IPC as a potential path toward a true zero-copy data pipeline.
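For reference, a sketch of what the Arrow IPC read path could look like (the function name is hypothetical; FileReader decodes IPC record batches from any Read + Seek source, and a memory-mapped variant would be the thing to evaluate for the actual zero-copy follow-up):

use std::fs::File;

use arrow::array::{Array, Float64Array};
use arrow::ipc::reader::FileReader;

fn read_ipc_chunks(path: &str) -> Result<Vec<Float64Array>, Box<dyn std::error::Error>> {
    let reader = FileReader::try_new(File::open(path)?, None)?;

    let mut chunks = Vec::new();
    for batch in reader {
        let batch = batch?;
        let column = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float64Array>()
            .ok_or("expected a Float64 column")?
            .clone(); // clones the Arc'd buffers, not the values
        chunks.push(column);
    }
    Ok(chunks)
}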

@guan404ming guan404ming changed the title [QDP] Integrate Apache Arrow for data processing [QDP] Integrate Apache Arrow and Parquet for data processing Dec 5, 2025
@guan404ming (Member Author)

cc @rich7420 @ryankert01 @400Ping

@rich7420 (Contributor) commented Dec 5, 2025

Sure! I will improve the RAM -> GPU path in #689.

@guan404ming (Member Author)

I'm going to merge this; please feel free to open a PR to refine it. Thanks for all the reviews and discussion!

@guan404ming guan404ming merged commit 64637fa into apache:dev-qdp Dec 5, 2025
2 checks passed
@guan404ming guan404ming deleted the integrate-arrow-rs branch December 5, 2025 15:15
@guan404ming (Member Author)

LOL

@rich7420 (Contributor) commented Dec 5, 2025

Speed as Rocket.

guan404ming added a commit to guan404ming/mahout that referenced this pull request Dec 11, 2025

* Integrate Apache Arrow & Parquet for data processing

* Optimize Arrow Float64Array handling in io

* Add chunked Arrow Float64Array support

* Refactor encoding to support chunked Arrow Float64Array input

* Refactor I/O and encoding documentation to remove zero-copy
