diff --git a/native/Cargo.lock b/native/Cargo.lock index 340f0fe0cd..bc015fc388 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -1834,12 +1834,13 @@ dependencies = [ "aws-config", "aws-credential-types", "bytes", - "crc32fast", "criterion", "datafusion", + "datafusion-comet-common", "datafusion-comet-jni-bridge", "datafusion-comet-objectstore-hdfs", "datafusion-comet-proto", + "datafusion-comet-shuffle", "datafusion-comet-spark-expr", "datafusion-datasource", "datafusion-functions-nested", @@ -1856,7 +1857,6 @@ dependencies = [ "lazy_static", "log", "log4rs", - "lz4_flex 0.13.0", "mimalloc", "num", "object_store", @@ -1872,15 +1872,12 @@ dependencies = [ "rand 0.10.0", "reqwest", "serde_json", - "simd-adler32", - "snap", "tempfile", "tikv-jemalloc-ctl", "tikv-jemallocator", "tokio", "url", "uuid", - "zstd", ] [[package]] @@ -1949,6 +1946,31 @@ dependencies = [ "prost-build", ] +[[package]] +name = "datafusion-comet-shuffle" +version = "0.15.0" +dependencies = [ + "arrow", + "async-trait", + "bytes", + "crc32fast", + "criterion", + "datafusion", + "datafusion-comet-common", + "datafusion-comet-jni-bridge", + "datafusion-comet-spark-expr", + "futures", + "itertools 0.14.0", + "jni", + "log", + "lz4_flex 0.13.0", + "simd-adler32", + "snap", + "tempfile", + "tokio", + "zstd", +] + [[package]] name = "datafusion-comet-spark-expr" version = "0.15.0" diff --git a/native/Cargo.toml b/native/Cargo.toml index 693221b157..e75c1fd241 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -16,8 +16,8 @@ # under the License. [workspace] -default-members = ["core", "spark-expr", "common", "proto", "jni-bridge"] -members = ["core", "spark-expr", "common", "proto", "jni-bridge", "hdfs", "fs-hdfs"] +default-members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle"] +members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle", "hdfs", "fs-hdfs"] resolver = "2" [workspace.package] @@ -46,6 +46,7 @@ datafusion-comet-spark-expr = { path = "spark-expr" } datafusion-comet-common = { path = "common" } datafusion-comet-jni-bridge = { path = "jni-bridge" } datafusion-comet-proto = { path = "proto" } +datafusion-comet-shuffle = { path = "shuffle" } chrono = { version = "0.4", default-features = false, features = ["clock"] } chrono-tz = { version = "0.10" } futures = "0.3.32" diff --git a/native/common/README.md b/native/common/README.md new file mode 100644 index 0000000000..842b441b53 --- /dev/null +++ b/native/common/README.md @@ -0,0 +1,25 @@ + + +# datafusion-comet-common: Common Types + +This crate provides common types shared across Apache DataFusion Comet crates and is maintained as part of the +[Apache DataFusion Comet] subproject. + +[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/ diff --git a/native/common/src/lib.rs b/native/common/src/lib.rs index 9319d7347f..a9549badb1 100644 --- a/native/common/src/lib.rs +++ b/native/common/src/lib.rs @@ -17,6 +17,9 @@ mod error; mod query_context; +pub mod tracing; +mod utils; pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult}; pub use query_context::{create_query_context_map, QueryContext, QueryContextMap}; +pub use utils::bytes_to_i128; diff --git a/native/common/src/tracing.rs b/native/common/src/tracing.rs new file mode 100644 index 0000000000..58bea64a7a --- /dev/null +++ b/native/common/src/tracing.rs @@ -0,0 +1,147 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use datafusion::common::instant::Instant; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Write}; +use std::sync::{Arc, LazyLock, Mutex}; + +pub static RECORDER: LazyLock = LazyLock::new(Recorder::new); + +/// Log events using Chrome trace format JSON +/// https://github.com/catapult-project/catapult/blob/main/tracing/README.md +pub struct Recorder { + now: Instant, + writer: Arc>>, +} + +impl Default for Recorder { + fn default() -> Self { + Self::new() + } +} + +impl Recorder { + pub fn new() -> Self { + let file = OpenOptions::new() + .create(true) + .append(true) + .open("comet-event-trace.json") + .expect("Error writing tracing"); + + let mut writer = BufWriter::new(file); + + // Write start of JSON array. Note that there is no requirement to write + // the closing ']'. + writer + .write_all("[ ".as_bytes()) + .expect("Error writing tracing"); + Self { + now: Instant::now(), + writer: Arc::new(Mutex::new(writer)), + } + } + pub fn begin_task(&self, name: &str) { + self.log_event(name, "B") + } + + pub fn end_task(&self, name: &str) { + self.log_event(name, "E") + } + + pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) { + let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize; + let json = format!( + "{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n", + Self::get_thread_id(), + self.now.elapsed().as_micros() + ); + let mut writer = self.writer.lock().unwrap(); + writer + .write_all(json.as_bytes()) + .expect("Error writing tracing"); + } + + fn log_event(&self, name: &str, ph: &str) { + let json = format!( + "{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n", + name, + Self::get_thread_id(), + self.now.elapsed().as_micros() + ); + let mut writer = self.writer.lock().unwrap(); + writer + .write_all(json.as_bytes()) + .expect("Error writing tracing"); + } + + fn get_thread_id() -> u64 { + let thread_id = std::thread::current().id(); + format!("{thread_id:?}") + .trim_start_matches("ThreadId(") + .trim_end_matches(")") + .parse() + .expect("Error parsing thread id") + } +} + +pub fn trace_begin(name: &str) { + RECORDER.begin_task(name); +} + +pub fn trace_end(name: &str) { + RECORDER.end_task(name); +} + +pub fn log_memory_usage(name: &str, value: u64) { + RECORDER.log_memory_usage(name, value); +} + +pub fn with_trace(label: &str, tracing_enabled: bool, f: F) -> T +where + F: FnOnce() -> T, +{ + if tracing_enabled { + trace_begin(label); + } + + let result = f(); + + if tracing_enabled { + trace_end(label); + } + + result +} + +pub async fn with_trace_async(label: &str, tracing_enabled: bool, f: F) -> T +where + F: FnOnce() -> Fut, + Fut: std::future::Future, +{ + if tracing_enabled { + trace_begin(label); + } + + let result = f().await; + + if tracing_enabled { + trace_end(label); + } + + result +} diff --git a/native/common/src/utils.rs b/native/common/src/utils.rs new file mode 100644 index 0000000000..12283db30d --- /dev/null +++ b/native/common/src/utils.rs @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// Converts a slice of bytes to i128. The bytes are serialized in big-endian order by +/// `BigInteger.toByteArray()` in Java. +pub fn bytes_to_i128(slice: &[u8]) -> i128 { + let mut bytes = [0; 16]; + let mut i = 0; + while i != 16 && i != slice.len() { + bytes[i] = slice[slice.len() - 1 - i]; + i += 1; + } + + // if the decimal is negative, we need to flip all the bits + if (slice[0] as i8) < 0 { + while i < 16 { + bytes[i] = !bytes[i]; + i += 1; + } + } + + i128::from_le_bytes(bytes) +} diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index 3f305a631d..b66830ecb5 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -47,10 +47,6 @@ log = "0.4" log4rs = "1.4.0" prost = "0.14.3" jni = "0.21" -snap = "1.1" -# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation -lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] } -zstd = "0.13.3" rand = { workspace = true } num = { workspace = true } bytes = { workspace = true } @@ -62,11 +58,11 @@ datafusion-physical-expr-adapter = { workspace = true } datafusion-datasource = { workspace = true } datafusion-spark = { workspace = true } once_cell = "1.18.0" -crc32fast = "1.3.2" -simd-adler32 = "0.3.7" +datafusion-comet-common = { workspace = true } datafusion-comet-spark-expr = { workspace = true } datafusion-comet-jni-bridge = { workspace = true } datafusion-comet-proto = { workspace = true } +datafusion-comet-shuffle = { workspace = true } object_store = { workspace = true } url = { workspace = true } aws-config = { workspace = true } @@ -121,14 +117,6 @@ harness = false name = "bit_util" harness = false -[[bench]] -name = "row_columnar" -harness = false - -[[bench]] -name = "shuffle_writer" -harness = false - [[bench]] name = "parquet_decode" harness = false diff --git a/native/core/src/execution/mod.rs b/native/core/src/execution/mod.rs index 85fc672461..f556fce41c 100644 --- a/native/core/src/execution/mod.rs +++ b/native/core/src/execution/mod.rs @@ -23,7 +23,7 @@ pub(crate) mod metrics; pub mod operators; pub(crate) mod planner; pub mod serde; -pub mod shuffle; +pub use datafusion_comet_shuffle as shuffle; pub(crate) mod sort; pub(crate) mod spark_plan; pub use datafusion_comet_spark_expr::timezone; diff --git a/native/core/src/execution/tracing.rs b/native/core/src/execution/tracing.rs index 01351565f5..b02006efb9 100644 --- a/native/core/src/execution/tracing.rs +++ b/native/core/src/execution/tracing.rs @@ -15,128 +15,4 @@ // specific language governing permissions and limitations // under the License. -use datafusion::common::instant::Instant; -use once_cell::sync::Lazy; -use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; -use std::sync::{Arc, Mutex}; - -pub(crate) static RECORDER: Lazy = Lazy::new(Recorder::new); - -/// Log events using Chrome trace format JSON -/// https://github.com/catapult-project/catapult/blob/main/tracing/README.md -pub struct Recorder { - now: Instant, - writer: Arc>>, -} - -impl Recorder { - pub fn new() -> Self { - let file = OpenOptions::new() - .create(true) - .append(true) - .open("comet-event-trace.json") - .expect("Error writing tracing"); - - let mut writer = BufWriter::new(file); - - // Write start of JSON array. Note that there is no requirement to write - // the closing ']'. - writer - .write_all("[ ".as_bytes()) - .expect("Error writing tracing"); - Self { - now: Instant::now(), - writer: Arc::new(Mutex::new(writer)), - } - } - pub fn begin_task(&self, name: &str) { - self.log_event(name, "B") - } - - pub fn end_task(&self, name: &str) { - self.log_event(name, "E") - } - - pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) { - let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize; - let json = format!( - "{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n", - Self::get_thread_id(), - self.now.elapsed().as_micros() - ); - let mut writer = self.writer.lock().unwrap(); - writer - .write_all(json.as_bytes()) - .expect("Error writing tracing"); - } - - fn log_event(&self, name: &str, ph: &str) { - let json = format!( - "{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n", - name, - Self::get_thread_id(), - self.now.elapsed().as_micros() - ); - let mut writer = self.writer.lock().unwrap(); - writer - .write_all(json.as_bytes()) - .expect("Error writing tracing"); - } - - fn get_thread_id() -> u64 { - let thread_id = std::thread::current().id(); - format!("{thread_id:?}") - .trim_start_matches("ThreadId(") - .trim_end_matches(")") - .parse() - .expect("Error parsing thread id") - } -} - -pub(crate) fn trace_begin(name: &str) { - RECORDER.begin_task(name); -} - -pub(crate) fn trace_end(name: &str) { - RECORDER.end_task(name); -} - -pub(crate) fn log_memory_usage(name: &str, value: u64) { - RECORDER.log_memory_usage(name, value); -} - -pub(crate) fn with_trace(label: &str, tracing_enabled: bool, f: F) -> T -where - F: FnOnce() -> T, -{ - if tracing_enabled { - trace_begin(label); - } - - let result = f(); - - if tracing_enabled { - trace_end(label); - } - - result -} - -pub(crate) async fn with_trace_async(label: &str, tracing_enabled: bool, f: F) -> T -where - F: FnOnce() -> Fut, - Fut: std::future::Future, -{ - if tracing_enabled { - trace_begin(label); - } - - let result = f().await; - - if tracing_enabled { - trace_end(label); - } - - result -} +pub(crate) use datafusion_comet_common::tracing::*; diff --git a/native/core/src/execution/utils.rs b/native/core/src/execution/utils.rs index f95423aa70..2fe6f8758f 100644 --- a/native/core/src/execution/utils.rs +++ b/native/core/src/execution/utils.rs @@ -97,23 +97,4 @@ impl SparkArrowConvert for ArrayData { } } -/// Converts a slice of bytes to i128. The bytes are serialized in big-endian order by -/// `BigInteger.toByteArray()` in Java. -pub fn bytes_to_i128(slice: &[u8]) -> i128 { - let mut bytes = [0; 16]; - let mut i = 0; - while i != 16 && i != slice.len() { - bytes[i] = slice[slice.len() - 1 - i]; - i += 1; - } - - // if the decimal is negative, we need to flip all the bits - if (slice[0] as i8) < 0 { - while i < 16 { - bytes[i] = !bytes[i]; - i += 1; - } - } - - i128::from_le_bytes(bytes) -} +pub use datafusion_comet_common::bytes_to_i128; diff --git a/native/jni-bridge/README.md b/native/jni-bridge/README.md new file mode 100644 index 0000000000..d49a3c2565 --- /dev/null +++ b/native/jni-bridge/README.md @@ -0,0 +1,25 @@ + + +# datafusion-comet-jni-bridge: JNI Bridge + +This crate provides the JNI interaction layer for Apache DataFusion Comet and is maintained as part of the +[Apache DataFusion Comet] subproject. + +[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/ diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml new file mode 100644 index 0000000000..e28827edc2 --- /dev/null +++ b/native/shuffle/Cargo.toml @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-comet-shuffle" +description = "Apache DataFusion Comet: shuffle writer and reader" +version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +authors = { workspace = true } +readme = { workspace = true } +license = { workspace = true } +edition = { workspace = true } + +publish = false + +[dependencies] +arrow = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +crc32fast = "1.3.2" +datafusion = { workspace = true } +datafusion-comet-common = { workspace = true } +datafusion-comet-jni-bridge = { workspace = true } +datafusion-comet-spark-expr = { workspace = true } +futures = { workspace = true } +itertools = "0.14.0" +jni = "0.21" +log = "0.4" +lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] } +simd-adler32 = "0.3.7" +snap = "1.1" +tokio = { version = "1", features = ["rt-multi-thread"] } +zstd = "0.13.3" + +[dev-dependencies] +criterion = { version = "0.7", features = ["async", "async_tokio", "async_std"] } +datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } +itertools = "0.14.0" +tempfile = "3.26.0" + +[lib] +name = "datafusion_comet_shuffle" +path = "src/lib.rs" + +[[bench]] +name = "shuffle_writer" +harness = false + +[[bench]] +name = "row_columnar" +harness = false diff --git a/native/shuffle/README.md b/native/shuffle/README.md new file mode 100644 index 0000000000..8fba6b0323 --- /dev/null +++ b/native/shuffle/README.md @@ -0,0 +1,25 @@ + + +# datafusion-comet-shuffle: Shuffle Writer and Reader + +This crate provides the shuffle writer and reader implementation for Apache DataFusion Comet and is maintained as part +of the [Apache DataFusion Comet] subproject. + +[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/ diff --git a/native/core/benches/row_columnar.rs b/native/shuffle/benches/row_columnar.rs similarity index 99% rename from native/core/benches/row_columnar.rs rename to native/shuffle/benches/row_columnar.rs index 4ee1539060..7d3951b4d5 100644 --- a/native/core/benches/row_columnar.rs +++ b/native/shuffle/benches/row_columnar.rs @@ -22,11 +22,11 @@ //! list, and map types. use arrow::datatypes::{DataType as ArrowDataType, Field, Fields}; -use comet::execution::shuffle::spark_unsafe::row::{ +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_comet_shuffle::spark_unsafe::row::{ process_sorted_row_partition, SparkUnsafeObject, SparkUnsafeRow, }; -use comet::execution::shuffle::CompressionCodec; -use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use datafusion_comet_shuffle::CompressionCodec; use std::sync::Arc; use tempfile::Builder; diff --git a/native/core/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs similarity index 99% rename from native/core/benches/shuffle_writer.rs rename to native/shuffle/benches/shuffle_writer.rs index 0857ef78c6..27abd919fa 100644 --- a/native/core/benches/shuffle_writer.rs +++ b/native/shuffle/benches/shuffle_writer.rs @@ -19,9 +19,6 @@ use arrow::array::builder::{Date32Builder, Decimal128Builder, Int32Builder}; use arrow::array::{builder::StringBuilder, Array, Int32Array, RecordBatch}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::row::{RowConverter, SortField}; -use comet::execution::shuffle::{ - CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, -}; use criterion::{criterion_group, criterion_main, Criterion}; use datafusion::datasource::memory::MemorySourceConfig; use datafusion::datasource::source::DataSourceExec; @@ -32,6 +29,9 @@ use datafusion::{ physical_plan::{common::collect, ExecutionPlan}, prelude::SessionContext, }; +use datafusion_comet_shuffle::{ + CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, +}; use itertools::Itertools; use std::io::Cursor; use std::sync::Arc; diff --git a/native/core/src/execution/shuffle/codec.rs b/native/shuffle/src/codec.rs similarity index 99% rename from native/core/src/execution/shuffle/codec.rs rename to native/shuffle/src/codec.rs index 33e6989d4c..c8edc2468c 100644 --- a/native/core/src/execution/shuffle/codec.rs +++ b/native/shuffle/src/codec.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -use crate::errors::{CometError, CometResult}; use arrow::array::RecordBatch; use arrow::datatypes::Schema; use arrow::ipc::reader::StreamReader; @@ -25,6 +24,7 @@ use crc32fast::Hasher; use datafusion::common::DataFusionError; use datafusion::error::Result; use datafusion::physical_plan::metrics::Time; +use datafusion_comet_jni_bridge::errors::{CometError, CometResult}; use simd_adler32::Adler32; use std::io::{Cursor, Seek, SeekFrom, Write}; diff --git a/native/core/src/execution/shuffle/comet_partitioning.rs b/native/shuffle/src/comet_partitioning.rs similarity index 98% rename from native/core/src/execution/shuffle/comet_partitioning.rs rename to native/shuffle/src/comet_partitioning.rs index b8d68cd21e..c269539a62 100644 --- a/native/core/src/execution/shuffle/comet_partitioning.rs +++ b/native/shuffle/src/comet_partitioning.rs @@ -47,7 +47,7 @@ impl CometPartitioning { } } -pub(super) fn pmod(hash: u32, n: usize) -> usize { +pub(crate) fn pmod(hash: u32, n: usize) -> usize { let hash = hash as i32; let n = n as i32; let r = hash % n; diff --git a/native/core/src/execution/shuffle/mod.rs b/native/shuffle/src/lib.rs similarity index 88% rename from native/core/src/execution/shuffle/mod.rs rename to native/shuffle/src/lib.rs index 6018cff50f..7c2fc8403f 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/shuffle/src/lib.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -pub(crate) mod codec; -mod comet_partitioning; -mod metrics; -mod partitioners; +pub mod codec; +pub(crate) mod comet_partitioning; +pub(crate) mod metrics; +pub(crate) mod partitioners; mod shuffle_writer; pub mod spark_unsafe; -mod writers; +pub(crate) mod writers; pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; pub use comet_partitioning::CometPartitioning; diff --git a/native/core/src/execution/shuffle/metrics.rs b/native/shuffle/src/metrics.rs similarity index 84% rename from native/core/src/execution/shuffle/metrics.rs rename to native/shuffle/src/metrics.rs index 33b51c3cd8..1aba4677db 100644 --- a/native/core/src/execution/shuffle/metrics.rs +++ b/native/shuffle/src/metrics.rs @@ -19,34 +19,34 @@ use datafusion::physical_plan::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time, }; -pub(super) struct ShufflePartitionerMetrics { +pub(crate) struct ShufflePartitionerMetrics { /// metrics - pub(super) baseline: BaselineMetrics, + pub(crate) baseline: BaselineMetrics, /// Time to perform repartitioning - pub(super) repart_time: Time, + pub(crate) repart_time: Time, /// Time encoding batches to IPC format - pub(super) encode_time: Time, + pub(crate) encode_time: Time, /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. - pub(super) write_time: Time, + pub(crate) write_time: Time, /// Number of input batches - pub(super) input_batches: Count, + pub(crate) input_batches: Count, /// count of spills during the execution of the operator - pub(super) spill_count: Count, + pub(crate) spill_count: Count, /// total spilled bytes during the execution of the operator - pub(super) spilled_bytes: Count, + pub(crate) spilled_bytes: Count, /// The original size of spilled data. Different to `spilled_bytes` because of compression. - pub(super) data_size: Count, + pub(crate) data_size: Count, } impl ShufflePartitionerMetrics { - pub(super) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { Self { baseline: BaselineMetrics::new(metrics, partition), repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), diff --git a/native/core/src/execution/shuffle/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs similarity index 83% rename from native/core/src/execution/shuffle/partitioners/mod.rs rename to native/shuffle/src/partitioners/mod.rs index b9058f66f4..a6d589677e 100644 --- a/native/core/src/execution/shuffle/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -22,12 +22,12 @@ mod single_partition; use arrow::record_batch::RecordBatch; use datafusion::common::Result; -pub(super) use multi_partition::MultiPartitionShuffleRepartitioner; -pub(super) use partitioned_batch_iterator::PartitionedBatchIterator; -pub(super) use single_partition::SinglePartitionShufflePartitioner; +pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; +pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; +pub(crate) use single_partition::SinglePartitionShufflePartitioner; #[async_trait::async_trait] -pub(super) trait ShufflePartitioner: Send + Sync { +pub(crate) trait ShufflePartitioner: Send + Sync { /// Insert a batch into the partitioner async fn insert_batch(&mut self, batch: RecordBatch) -> Result<()>; /// Write shuffle data and shuffle index file to disk diff --git a/native/core/src/execution/shuffle/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs similarity index 98% rename from native/core/src/execution/shuffle/partitioners/multi_partition.rs rename to native/shuffle/src/partitioners/multi_partition.rs index 9c366ad462..42290c5510 100644 --- a/native/core/src/execution/shuffle/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -15,16 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::partitioned_batch_iterator::{ +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::partitioned_batch_iterator::{ PartitionedBatchIterator, PartitionedBatchesProducer, }; -use crate::execution::shuffle::partitioners::ShufflePartitioner; -use crate::execution::shuffle::writers::{BufBatchWriter, PartitionWriter}; -use crate::execution::shuffle::{ - comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter, -}; -use crate::execution::tracing::{with_trace, with_trace_async}; +use crate::partitioners::ShufflePartitioner; +use crate::writers::{BufBatchWriter, PartitionWriter}; +use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; use arrow::array::{ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; @@ -32,6 +29,7 @@ use datafusion::common::DataFusionError; use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::metrics::Time; +use datafusion_comet_common::tracing::{with_trace, with_trace_async}; use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; use itertools::Itertools; use std::fmt; diff --git a/native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs b/native/shuffle/src/partitioners/partitioned_batch_iterator.rs similarity index 100% rename from native/core/src/execution/shuffle/partitioners/partitioned_batch_iterator.rs rename to native/shuffle/src/partitioners/partitioned_batch_iterator.rs diff --git a/native/core/src/execution/shuffle/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs similarity index 96% rename from native/core/src/execution/shuffle/partitioners/single_partition.rs rename to native/shuffle/src/partitioners/single_partition.rs index eeca4458cc..5801ef613b 100644 --- a/native/core/src/execution/shuffle/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::ShufflePartitioner; -use crate::execution::shuffle::writers::BufBatchWriter; -use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter}; +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::ShufflePartitioner; +use crate::writers::BufBatchWriter; +use crate::{CompressionCodec, ShuffleBlockWriter}; use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs similarity index 98% rename from native/core/src/execution/shuffle/shuffle_writer.rs rename to native/shuffle/src/shuffle_writer.rs index fe1bf0fccf..e649aaac69 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -17,12 +17,11 @@ //! Defines the External shuffle repartition plan. -use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::{ +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::{ MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, }; -use crate::execution::shuffle::{CometPartitioning, CompressionCodec}; -use crate::execution::tracing::with_trace_async; +use crate::{CometPartitioning, CompressionCodec}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; use datafusion::physical_expr::{EquivalenceProperties, Partitioning}; @@ -39,6 +38,7 @@ use datafusion::{ Statistics, }, }; +use datafusion_comet_common::tracing::with_trace_async; use futures::{StreamExt, TryFutureExt, TryStreamExt}; use std::{ any::Any, @@ -265,7 +265,7 @@ async fn external_shuffle( #[cfg(test)] mod test { use super::*; - use crate::execution::shuffle::{read_ipc_compressed, ShuffleBlockWriter}; + use crate::{read_ipc_compressed, ShuffleBlockWriter}; use arrow::array::{Array, StringArray, StringBuilder}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; @@ -591,7 +591,7 @@ mod test { #[test] #[cfg_attr(miri, ignore)] fn test_batch_coalescing_reduces_size() { - use crate::execution::shuffle::writers::BufBatchWriter; + use crate::writers::BufBatchWriter; use arrow::array::Int32Array; // Create a wide schema to amplify per-block schema overhead diff --git a/native/core/src/execution/shuffle/spark_unsafe/list.rs b/native/shuffle/src/spark_unsafe/list.rs similarity index 98% rename from native/core/src/execution/shuffle/spark_unsafe/list.rs rename to native/shuffle/src/spark_unsafe/list.rs index d9c93b1c6e..4eb293895c 100644 --- a/native/core/src/execution/shuffle/spark_unsafe/list.rs +++ b/native/shuffle/src/spark_unsafe/list.rs @@ -15,14 +15,11 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - errors::CometError, - execution::shuffle::spark_unsafe::{ - map::append_map_elements, - row::{ - append_field, downcast_builder_ref, impl_primitive_accessors, SparkUnsafeObject, - SparkUnsafeRow, - }, +use crate::spark_unsafe::{ + map::append_map_elements, + row::{ + append_field, downcast_builder_ref, impl_primitive_accessors, SparkUnsafeObject, + SparkUnsafeRow, }, }; use arrow::array::{ @@ -34,6 +31,7 @@ use arrow::array::{ MapBuilder, }; use arrow::datatypes::{DataType, TimeUnit}; +use datafusion_comet_jni_bridge::errors::CometError; /// Generates bulk append methods for primitive types in SparkUnsafeArray. /// diff --git a/native/core/src/execution/shuffle/spark_unsafe/map.rs b/native/shuffle/src/spark_unsafe/map.rs similarity index 97% rename from native/core/src/execution/shuffle/spark_unsafe/map.rs rename to native/shuffle/src/spark_unsafe/map.rs index 19b67c43dc..57444cee7a 100644 --- a/native/core/src/execution/shuffle/spark_unsafe/map.rs +++ b/native/shuffle/src/spark_unsafe/map.rs @@ -15,12 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::{ - errors::CometError, - execution::shuffle::spark_unsafe::list::{append_to_builder, SparkUnsafeArray}, -}; +use crate::spark_unsafe::list::{append_to_builder, SparkUnsafeArray}; use arrow::array::builder::{ArrayBuilder, MapBuilder, MapFieldNames}; use arrow::datatypes::{DataType, FieldRef}; +use datafusion_comet_jni_bridge::errors::CometError; pub struct SparkUnsafeMap { pub(crate) keys: SparkUnsafeArray, diff --git a/native/core/src/execution/shuffle/spark_unsafe/mod.rs b/native/shuffle/src/spark_unsafe/mod.rs similarity index 100% rename from native/core/src/execution/shuffle/spark_unsafe/mod.rs rename to native/shuffle/src/spark_unsafe/mod.rs diff --git a/native/core/src/execution/shuffle/spark_unsafe/row.rs b/native/shuffle/src/spark_unsafe/row.rs similarity index 99% rename from native/core/src/execution/shuffle/spark_unsafe/row.rs rename to native/shuffle/src/spark_unsafe/row.rs index 7ebf18d8d0..da980af8f9 100644 --- a/native/core/src/execution/shuffle/spark_unsafe/row.rs +++ b/native/shuffle/src/spark_unsafe/row.rs @@ -17,18 +17,10 @@ //! Utils for supporting native sort-based columnar shuffle. -use crate::{ - errors::CometError, - execution::{ - shuffle::{ - codec::{Checksum, ShuffleBlockWriter}, - spark_unsafe::{ - list::{append_list_element, SparkUnsafeArray}, - map::{append_map_elements, get_map_key_value_fields, SparkUnsafeMap}, - }, - }, - utils::bytes_to_i128, - }, +use crate::codec::{Checksum, ShuffleBlockWriter}; +use crate::spark_unsafe::{ + list::{append_list_element, SparkUnsafeArray}, + map::{append_map_elements, get_map_key_value_fields, SparkUnsafeMap}, }; use arrow::array::{ builder::{ @@ -44,6 +36,8 @@ use arrow::compute::cast; use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use arrow::error::ArrowError; use datafusion::physical_plan::metrics::Time; +use datafusion_comet_common::bytes_to_i128; +use datafusion_comet_jni_bridge::errors::CometError; use jni::sys::{jint, jlong}; use std::{ fs::OpenOptions, @@ -403,7 +397,7 @@ macro_rules! get_field_builder { } // Expose the macro for other modules. -use crate::execution::shuffle::CompressionCodec; +use crate::CompressionCodec; pub(crate) use downcast_builder_ref; /// Appends field of row to the given struct builder. `dt` is the data type of the field. diff --git a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs b/native/shuffle/src/writers/buf_batch_writer.rs similarity index 99% rename from native/core/src/execution/shuffle/writers/buf_batch_writer.rs rename to native/shuffle/src/writers/buf_batch_writer.rs index 8d056d7bb0..6344a8e5f2 100644 --- a/native/core/src/execution/shuffle/writers/buf_batch_writer.rs +++ b/native/shuffle/src/writers/buf_batch_writer.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::shuffle::ShuffleBlockWriter; +use crate::ShuffleBlockWriter; use arrow::array::RecordBatch; use arrow::compute::kernels::coalesce::BatchCoalescer; use datafusion::physical_plan::metrics::Time; diff --git a/native/core/src/execution/shuffle/writers/mod.rs b/native/shuffle/src/writers/mod.rs similarity index 89% rename from native/core/src/execution/shuffle/writers/mod.rs rename to native/shuffle/src/writers/mod.rs index d41363b7fb..b58989e46c 100644 --- a/native/core/src/execution/shuffle/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -18,5 +18,5 @@ mod buf_batch_writer; mod partition_writer; -pub(super) use buf_batch_writer::BufBatchWriter; -pub(super) use partition_writer::PartitionWriter; +pub(crate) use buf_batch_writer::BufBatchWriter; +pub(crate) use partition_writer::PartitionWriter; diff --git a/native/core/src/execution/shuffle/writers/partition_writer.rs b/native/shuffle/src/writers/partition_writer.rs similarity index 94% rename from native/core/src/execution/shuffle/writers/partition_writer.rs rename to native/shuffle/src/writers/partition_writer.rs index 7c2dbe0444..48017871db 100644 --- a/native/core/src/execution/shuffle/writers/partition_writer.rs +++ b/native/shuffle/src/writers/partition_writer.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use crate::execution::shuffle::metrics::ShufflePartitionerMetrics; -use crate::execution::shuffle::partitioners::PartitionedBatchIterator; -use crate::execution::shuffle::writers::buf_batch_writer::BufBatchWriter; -use crate::execution::shuffle::ShuffleBlockWriter; +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::PartitionedBatchIterator; +use crate::writers::buf_batch_writer::BufBatchWriter; +use crate::ShuffleBlockWriter; use datafusion::common::DataFusionError; use datafusion::execution::disk_manager::RefCountedTempFile; use datafusion::execution::runtime_env::RuntimeEnv;