Skip to content
Open
32 changes: 27 additions & 5 deletions native/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions native/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
# under the License.

[workspace]
default-members = ["core", "spark-expr", "common", "proto", "jni-bridge"]
members = ["core", "spark-expr", "common", "proto", "jni-bridge", "hdfs", "fs-hdfs"]
default-members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle"]
members = ["core", "spark-expr", "common", "proto", "jni-bridge", "shuffle", "hdfs", "fs-hdfs"]
resolver = "2"

[workspace.package]
Expand Down Expand Up @@ -46,6 +46,7 @@ datafusion-comet-spark-expr = { path = "spark-expr" }
datafusion-comet-common = { path = "common" }
datafusion-comet-jni-bridge = { path = "jni-bridge" }
datafusion-comet-proto = { path = "proto" }
datafusion-comet-shuffle = { path = "shuffle" }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono-tz = { version = "0.10" }
futures = "0.3.32"
Expand Down
25 changes: 25 additions & 0 deletions native/common/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# datafusion-comet-common: Common Types

This crate provides common types shared across Apache DataFusion Comet crates and is maintained as part of the
[Apache DataFusion Comet] subproject.

[Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/
3 changes: 3 additions & 0 deletions native/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

mod error;
mod query_context;
pub mod tracing;
mod utils;

pub use error::{decimal_overflow_error, SparkError, SparkErrorWithContext, SparkResult};
pub use query_context::{create_query_context_map, QueryContext, QueryContextMap};
pub use utils::bytes_to_i128;
147 changes: 147 additions & 0 deletions native/common/src/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::common::instant::Instant;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::sync::{Arc, LazyLock, Mutex};

/// Global trace recorder, initialized lazily on first use.
/// Note: first access opens/creates the trace output file (see [`Recorder::new`]).
pub static RECORDER: LazyLock<Recorder> = LazyLock::new(Recorder::new);

/// Log events using Chrome trace format JSON
/// https://github.com/catapult-project/catapult/blob/main/tracing/README.md
pub struct Recorder {
    // Baseline instant captured at construction; event timestamps ("ts") are
    // microseconds elapsed since this point.
    now: Instant,
    // Shared, mutex-guarded buffered writer to the trace output file, so
    // events from multiple threads are serialized onto one stream.
    writer: Arc<Mutex<BufWriter<File>>>,
}

impl Default for Recorder {
    /// Equivalent to [`Recorder::new`]; note this opens/creates the trace
    /// output file as a side effect.
    fn default() -> Self {
        Self::new()
    }
}

impl Recorder {
    /// Creates a recorder that appends Chrome-trace JSON events to
    /// `comet-event-trace.json` in the current working directory.
    ///
    /// # Panics
    /// Panics if the file cannot be opened or the opening `[` cannot be
    /// written.
    pub fn new() -> Self {
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open("comet-event-trace.json")
            .expect("Error writing tracing");

        let mut writer = BufWriter::new(file);

        // Write start of JSON array. Note that there is no requirement to write
        // the closing ']'.
        // NOTE(review): append mode means a pre-existing file gets a second
        // "[ " mid-stream — presumably acceptable for the trace viewer, but
        // worth confirming.
        writer
            .write_all("[ ".as_bytes())
            .expect("Error writing tracing");
        Self {
            now: Instant::now(),
            writer: Arc::new(Mutex::new(writer)),
        }
    }
    /// Records the start of a named duration event (Chrome trace phase "B").
    pub fn begin_task(&self, name: &str) {
        self.log_event(name, "B")
    }

    /// Records the end of a named duration event (Chrome trace phase "E").
    pub fn end_task(&self, name: &str) {
        self.log_event(name, "E")
    }

    /// Records a counter event (Chrome trace phase "C") with the usage
    /// converted from bytes to whole mebibytes (truncated).
    ///
    /// # Panics
    /// Panics if the event cannot be written to the trace file.
    pub fn log_memory_usage(&self, name: &str, usage_bytes: u64) {
        let usage_mb = (usage_bytes as f64 / 1024.0 / 1024.0) as usize;
        // NOTE(review): `name` is interpolated into JSON unescaped — a name
        // containing '"' or '\' would corrupt the trace file. Callers are
        // assumed to pass plain identifiers; confirm at call sites.
        let json = format!(
            "{{ \"name\": \"{name}\", \"cat\": \"PERF\", \"ph\": \"C\", \"pid\": 1, \"tid\": {}, \"ts\": {}, \"args\": {{ \"{name}\": {usage_mb} }} }},\n",
            Self::get_thread_id(),
            self.now.elapsed().as_micros()
        );
        let mut writer = self.writer.lock().unwrap();
        writer
            .write_all(json.as_bytes())
            .expect("Error writing tracing");
    }

    /// Writes a single trace event with the given name and phase (`ph`),
    /// timestamped as microseconds since the recorder was created.
    fn log_event(&self, name: &str, ph: &str) {
        // NOTE(review): same unescaped-`name` caveat as log_memory_usage.
        let json = format!(
            "{{ \"name\": \"{}\", \"cat\": \"PERF\", \"ph\": \"{ph}\", \"pid\": 1, \"tid\": {}, \"ts\": {} }},\n",
            name,
            Self::get_thread_id(),
            self.now.elapsed().as_micros()
        );
        let mut writer = self.writer.lock().unwrap();
        writer
            .write_all(json.as_bytes())
            .expect("Error writing tracing");
    }

    /// Returns the current thread's numeric id for the "tid" field.
    ///
    /// NOTE(review): this parses the `Debug` output of `ThreadId`
    /// ("ThreadId(N)"), which is not a stable, documented format — it may
    /// break on future Rust versions (the stable alternative,
    /// `ThreadId::as_u64`, is not yet stabilized).
    ///
    /// # Panics
    /// Panics if the `Debug` output does not match the expected shape.
    fn get_thread_id() -> u64 {
        let thread_id = std::thread::current().id();
        format!("{thread_id:?}")
            .trim_start_matches("ThreadId(")
            .trim_end_matches(")")
            .parse()
            .expect("Error parsing thread id")
    }
}

/// Records the start of a named task on the global [`RECORDER`].
/// First call initializes the recorder (opens the trace file).
pub fn trace_begin(name: &str) {
    RECORDER.begin_task(name);
}

/// Records the end of a named task on the global [`RECORDER`].
pub fn trace_end(name: &str) {
    RECORDER.end_task(name);
}

/// Records a named memory-usage counter (in bytes) on the global [`RECORDER`].
pub fn log_memory_usage(name: &str, value: u64) {
    RECORDER.log_memory_usage(name, value);
}

/// Runs `f`, emitting begin/end trace events around it when `tracing_enabled`
/// is true; when tracing is disabled this is a plain call to `f`.
pub fn with_trace<T, F>(label: &str, tracing_enabled: bool, f: F) -> T
where
    F: FnOnce() -> T,
{
    // Fast path: no tracing, just run the closure.
    if !tracing_enabled {
        return f();
    }
    trace_begin(label);
    let output = f();
    trace_end(label);
    output
}

/// Awaits the future produced by `f`, emitting begin/end trace events around
/// it when `tracing_enabled` is true; when tracing is disabled this simply
/// awaits `f()`.
pub async fn with_trace_async<F, Fut, T>(label: &str, tracing_enabled: bool, f: F) -> T
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = T>,
{
    // Fast path: no tracing, just drive the future to completion.
    if !tracing_enabled {
        return f().await;
    }
    trace_begin(label);
    let output = f().await;
    trace_end(label);
    output
}
37 changes: 37 additions & 0 deletions native/common/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Converts a slice of bytes to i128. The bytes are serialized in big-endian order by
/// `BigInteger.toByteArray()` in Java.
/// Converts a slice of bytes to i128. The bytes are serialized in big-endian
/// two's-complement order, as produced by `BigInteger.toByteArray()` in Java.
/// Values shorter than 16 bytes are sign-extended; longer inputs contribute
/// only their 16 least-significant bytes.
pub fn bytes_to_i128(slice: &[u8]) -> i128 {
    // Negative values need the unused high bytes filled with 1-bits
    // (two's-complement sign extension); non-negative values are zero-filled.
    let fill = if (slice[0] as i8) < 0 { 0xFF } else { 0x00 };
    let mut bytes = [fill; 16];

    // Copy the input in reverse (big-endian -> little-endian), stopping at
    // whichever runs out first: the 16-byte buffer or the input slice.
    for (dst, src) in bytes.iter_mut().zip(slice.iter().rev()) {
        *dst = *src;
    }

    i128::from_le_bytes(bytes)
}
16 changes: 2 additions & 14 deletions native/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,6 @@ log = "0.4"
log4rs = "1.4.0"
prost = "0.14.3"
jni = "0.21"
snap = "1.1"
# we disable default features in lz4_flex to force the use of the faster unsafe encoding and decoding implementation
lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] }
zstd = "0.13.3"
rand = { workspace = true }
num = { workspace = true }
bytes = { workspace = true }
Expand All @@ -62,11 +58,11 @@ datafusion-physical-expr-adapter = { workspace = true }
datafusion-datasource = { workspace = true }
datafusion-spark = { workspace = true }
once_cell = "1.18.0"
crc32fast = "1.3.2"
simd-adler32 = "0.3.7"
datafusion-comet-common = { workspace = true }
datafusion-comet-spark-expr = { workspace = true }
datafusion-comet-jni-bridge = { workspace = true }
datafusion-comet-proto = { workspace = true }
datafusion-comet-shuffle = { workspace = true }
object_store = { workspace = true }
url = { workspace = true }
aws-config = { workspace = true }
Expand Down Expand Up @@ -121,14 +117,6 @@ harness = false
name = "bit_util"
harness = false

[[bench]]
name = "row_columnar"
harness = false

[[bench]]
name = "shuffle_writer"
harness = false

[[bench]]
name = "parquet_decode"
harness = false
Expand Down
2 changes: 1 addition & 1 deletion native/core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub(crate) mod metrics;
pub mod operators;
pub(crate) mod planner;
pub mod serde;
pub mod shuffle;
pub use datafusion_comet_shuffle as shuffle;
pub(crate) mod sort;
pub(crate) mod spark_plan;
pub use datafusion_comet_spark_expr::timezone;
Expand Down
Loading
Loading