[AURON #1693] split one large batch into many small batches after sort merge join #1694
base: master
Conversation
Pull request overview
This PR addresses issue #1693 by introducing batch size normalization for sort merge join output. The sort merge join operator can produce very large batches that exceed the configured batch size, leading to memory and performance issues. The solution splits these large batches into smaller ones and then applies coalescing to ensure optimal batch sizes.
Key Changes:
- Added `split_with_default_batch_size` method to `ExecutionContext` that splits large batches into chunks of the target batch size
- Modified `SortMergeJoinExec` to apply both split and coalesce operations to its output stream
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `native-engine/datafusion-ext-plans/src/common/execution_context.rs` | Introduces `split_with_default_batch_size` method with an internal `SplitLargeBatchStream` that slices large batches into smaller chunks using the configured batch size |
| `native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs` | Updates `output_with_streams` to wrap the output with both split and coalesce operations for batch size normalization |
```rust
pub fn split_with_default_batch_size(
    self: &Arc<Self>,
    input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
```
Copilot AI · Dec 3, 2025
The `split_with_default_batch_size` method lacks documentation. Consider adding a doc comment that explains:
- The purpose of this method (splitting large batches into smaller ones)
- The behavior when batches are already smaller than the target size
- The target batch size used (from `batch_size()`)
- The relationship with `coalesce_with_default_batch_size`
Example:
```rust
/// Splits large record batches into smaller batches with sizes not exceeding
/// the default batch size. Batches smaller than or equal to the target size
/// are passed through unchanged. Empty batches are filtered out.
///
/// This is typically used in combination with `coalesce_with_default_batch_size`
/// to normalize batch sizes in a stream.
```
```rust
pub fn split_with_default_batch_size(
    self: &Arc<Self>,
    input: SendableRecordBatchStream,
) -> SendableRecordBatchStream {
    struct SplitLargeBatchStream {
        input: SendableRecordBatchStream,
        current_batch: Option<RecordBatch>,
        current_offset: usize,
    }

    impl SplitLargeBatchStream {
        fn split_next_chunk(&mut self) -> Option<RecordBatch> {
            let batch = self.current_batch.as_ref()?;
            let target_batch_size = batch_size();
            let num_rows = batch.num_rows();

            if self.current_offset >= num_rows {
                self.current_batch = None;
                return None;
            }

            let chunk_size = std::cmp::min(target_batch_size, num_rows - self.current_offset);
            let chunk = batch.slice(self.current_offset, chunk_size);
            self.current_offset += chunk_size;

            if self.current_offset >= num_rows {
                self.current_batch = None;
            }

            Some(chunk)
        }
    }

    impl RecordBatchStream for SplitLargeBatchStream {
        fn schema(&self) -> SchemaRef {
            self.input.schema()
        }
    }

    impl Stream for SplitLargeBatchStream {
        type Item = Result<RecordBatch>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
            loop {
                if let Some(chunk) = self.split_next_chunk() {
                    return Poll::Ready(Some(Ok(chunk)));
                }

                match ready!(self.input.as_mut().poll_next_unpin(cx)) {
                    Some(Ok(batch)) => {
                        if batch.is_empty() {
                            continue;
                        }

                        let target_batch_size = batch_size();
                        if target_batch_size == 0 {
                            return Poll::Ready(Some(Err(DataFusionError::Internal(
                                "Invalid batch size: 0".to_string(),
                            ))));
                        }

                        let num_rows = batch.num_rows();
                        if num_rows <= target_batch_size {
                            return Poll::Ready(Some(Ok(batch)));
                        } else {
                            self.current_batch = Some(batch);
                            self.current_offset = 0;
                        }
                    }
                    Some(Err(e)) => return Poll::Ready(Some(Err(e))),
                    None => return Poll::Ready(None),
                }
            }
        }
    }

    Box::pin(SplitLargeBatchStream {
        input,
        current_batch: None,
        current_offset: 0,
    })
}
```
Copilot AI · Dec 3, 2025
The new `split_with_default_batch_size` method lacks test coverage. Consider adding unit tests to verify:
- Splitting large batches (e.g., 10000 rows) into multiple smaller batches
- Passing through batches already at or below target size
- Handling empty batches
- Error handling for zero batch size
- Edge cases like batches with exactly `target_batch_size` rows

This is important to ensure the splitting logic works correctly, especially for the error case at lines 199-202.
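A minimal sketch of what such a test could look like, assuming a hypothetical `make_test_exec_ctx()` helper that builds an `Arc<ExecutionContext>` with a configured batch size of 8192, DataFusion's `MemoryStream` as the input source, and tokio as the test runtime:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::memory::MemoryStream;
use futures::TryStreamExt;

#[tokio::test]
async fn splits_large_batch_into_target_sized_chunks() -> Result<()> {
    // Hypothetical helper; the real project will have its own way to
    // construct an ExecutionContext with batch_size() == 8192.
    let exec_ctx = make_test_exec_ctx();

    // One oversized input batch of 20000 rows.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_iter_values(0..20000)) as ArrayRef],
    )?;
    let input = Box::pin(MemoryStream::try_new(vec![batch], schema, None)?);

    let output: Vec<RecordBatch> = exec_ctx
        .split_with_default_batch_size(input)
        .try_collect()
        .await?;

    // No rows are lost, and no chunk exceeds the target (8192, 8192, 3616).
    assert_eq!(output.iter().map(|b| b.num_rows()).sum::<usize>(), 20000);
    assert!(output.iter().all(|b| b.num_rows() <= 8192));
    Ok(())
}
```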
```rust
Ok(exec_ctx
    .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
```
Copilot AI · Dec 3, 2025
[nitpick] The composition `coalesce_with_default_batch_size(split_with_default_batch_size(output))` may create unnecessary overhead. After splitting large batches to the target size, the coalesce operation will immediately try to merge them back if they're too small (less than 1/4 of `batch_size`, based on line 275 of execution_context.rs).
Consider whether both operations are needed here, or if `split_with_default_batch_size` alone would suffice for the sort merge join output. The coalesce operation is typically used for small batches from sources like filters or unions, but after splitting, batches should already be close to the target size.
```diff
-Ok(exec_ctx
-    .coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
+Ok(exec_ctx.split_with_default_batch_size(output))
```
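For intuition, a simplified, self-contained sketch of the buffering rule this comment describes; the 1/4 threshold and the function name are assumptions taken from the review comment, not verified against execution_context.rs:

```rust
// Assumed rule: coalesce buffers a batch only when it is smaller than a
// quarter of the target size, and passes it through otherwise.
fn should_buffer_for_coalesce(batch_rows: usize, target_batch_size: usize) -> bool {
    batch_rows < target_batch_size / 4
}

fn main() {
    let target = 8192;
    // A trailing chunk from the splitter (e.g. 3616 of 20000 rows) is above
    // target / 4 == 2048, so coalesce would pass it through unchanged,
    // supporting the point that coalescing after splitting adds little.
    assert!(!should_buffer_for_coalesce(3616, target));
    // Only a very small trailing chunk (e.g. 100 rows) would be buffered.
    assert!(should_buffer_for_coalesce(100, target));
}
```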
@XorSum I wonder why we need to do this manual batch split; the join algorithm itself already has batch_size handling logic. Is there a bug in it?
auron/native-engine/datafusion-ext-plans/src/joins/smj/full_join.rs, lines 55 to 57 in 1051790
Thanks for the comment! I do not yet understand how the join batch_size handling logic works; however, I did find a reproducible bug (#1693): an extremely large batch (100 million rows) is generated after the join operation, which leads to the failure reported there. We can verify this by printing the num_rows of the output batches.
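For example, a hypothetical debug helper (not part of the PR) that wraps the join output stream and logs each batch's row count, using `futures::StreamExt::inspect`:

```rust
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{Stream, StreamExt};

// Hypothetical debug wrapper: print the row count of every batch the sort
// merge join emits, to confirm that oversized batches are being produced.
fn log_batch_sizes(
    stream: SendableRecordBatchStream,
) -> impl Stream<Item = Result<RecordBatch>> {
    stream.inspect(|item| {
        if let Ok(batch) = item {
            eprintln!("smj output batch num_rows = {}", batch.num_rows());
        }
    })
}
```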
In some extreme cases with a lot of duplicated keys on both join sides, the join output for a single key group can grow far beyond the configured batch size; for example, 10,000 duplicated keys on each side would produce 10,000 × 10,000 = 100,000,000 joined rows at once.


Which issue does this PR close?
Closes #1693.
Rationale for this change
What changes are included in this PR?
- Added the `split_with_default_batch_size` method, which splits a large batch into many small batches with the default batch_size.

Are there any user-facing changes?
No
How was this patch tested?
Performed manual testing with the code specified in the issue.