diff --git a/.gitignore b/.gitignore index 65b40030d57..2ed384868d7 100644 --- a/.gitignore +++ b/.gitignore @@ -224,3 +224,6 @@ compile_commands.json # cargo sweep output sweep.timestamp +# Pending cargo-insta snapshots +*.pending-snap + diff --git a/Cargo.lock b/Cargo.lock index a2936c9bdb5..92c6180c42d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -159,12 +159,9 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.8.0" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d03449bb8ca2cc2ef70869af31463d1ae5ccc8fa3e334b307203fbf815207e" -dependencies = [ - "rustversion", -] +checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "arcref" @@ -651,9 +648,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.36" +version = "0.4.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "98ec5f6c2f8bc326c994cb9e241cc257ddaba9afa8555a43cffbb5dd86efaa37" +checksum = "0e86f6d3dc9dc4352edeea6b8e499e13e3f5dc3b964d7ca5fd411415a3498473" dependencies = [ "compression-codecs", "compression-core", @@ -753,7 +750,7 @@ checksum = "3b43422f69d8ff38f95f1b2bb76517c91589a924d1559a0e935d7c8ce0274c11" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -793,7 +790,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -810,7 +807,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -854,9 +851,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.12" +version = "1.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96571e6996817bf3d58f6b569e4b9fd2e9d2fcf9f7424eed07b2ce9bb87535e5" +checksum = "a0149602eeaf915158e14029ba0c78dedb8c08d554b024d54c8f239aab46511d" dependencies = [ "aws-credential-types", "aws-runtime", @@ -884,9 +881,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.11" +version = "1.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3cd362783681b15d136480ad555a099e82ecd8e2d10a841e14dfd0078d67fee3" +checksum = "b01c9521fa01558f750d183c8c68c81b0155b9d193a4ba7f84c36bd1b6d04a06" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -896,9 +893,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.15.2" +version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a88aab2464f1f25453baa7a07c84c5b7684e274054ba06817f382357f77a288" +checksum = "6b5ce75405893cd713f9ab8e297d8e438f624dde7d706108285f7e17a25a180f" dependencies = [ "aws-lc-sys", "zeroize", @@ -906,9 +903,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.35.0" +version = "0.34.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45afffdee1e7c9126814751f88dddc747f41d91da16c9551a0f1e8a11e788a1" +checksum = "179c3777a8b5e70e90ea426114ffc565b2c1a9f82f6c4a0c5a34aa6ef5e781b6" dependencies = [ "cc", "cmake", @@ -918,9 +915,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.5.17" +version = "1.5.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d81b5b2898f6798ad58f484856768bca817e3cd9de0974c24ae0f1113fe88f1b" 
+checksum = "7ce527fb7e53ba9626fc47824f25e256250556c40d8f81d27dd92aa38239d632" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -942,9 +939,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.91.0" +version = "1.90.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee6402a36f27b52fe67661c6732d684b2635152b676aa2babbfb5204f99115d" +checksum = "4f18e53542c522459e757f81e274783a78f8c81acdfc8d1522ee8a18b5fb1c66" dependencies = [ "aws-credential-types", "aws-runtime", @@ -964,9 +961,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.93.0" +version = "1.92.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a45a7f750bbd170ee3677671ad782d90b894548f4e4ae168302c57ec9de5cb3e" +checksum = "532f4d866012ffa724a4385c82e8dd0e59f0ca0e600f3f22d4c03b6824b34e4a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -986,9 +983,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.95.0" +version = "1.94.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55542378e419558e6b1f398ca70adb0b2088077e79ad9f14eb09441f2f7b2164" +checksum = "1be6fbbfa1a57724788853a623378223fe828fc4c09b146c992f0c95b6256174" dependencies = [ "aws-credential-types", "aws-runtime", @@ -1009,9 +1006,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.3.7" +version = "1.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e523e1c4e8e7e8ff219d732988e22bfeae8a1cafdbe6d9eca1546fa080be7c" +checksum = "c35452ec3f001e1f2f6db107b6373f1f48f05ec63ba2c5c9fa91f07dad32af11" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -1031,9 +1028,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "1.2.7" +version = "1.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ee19095c7c4dda59f1697d028ce704c24b2d33c6718790c7f1d5a3015b4107c" +checksum = "127fcfad33b7dfc531141fda7e1c402ac65f88aca5511a4d31e2e3d2cd01ce9c" dependencies = [ "futures-util", "pin-project-lite", @@ -1042,9 +1039,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.62.6" +version = "0.62.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "826141069295752372f8203c17f28e30c464d22899a43a0c9fd9c458d469c88b" +checksum = "445d5d720c99eed0b4aa674ed00d835d9b1427dd73e04adaf2f94c6b2d6f9fca" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -1063,9 +1060,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-client" -version = "1.1.5" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59e62db736db19c488966c8d787f52e6270be565727236fd5579eaa301e7bc4a" +checksum = "623254723e8dfd535f566ee7b2381645f8981da086b5c4aa26c0c41582bb1d2c" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -1087,27 +1084,27 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.61.9" +version = "0.61.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49fa1213db31ac95288d981476f78d05d9cbb0353d22cdf3472cc05bb02f6551" +checksum = "2db31f727935fc63c6eeae8b37b438847639ec330a9161ece694efba257e0c54" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-observability" -version = "0.1.5" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17f616c3f2260612fe44cede278bafa18e73e6479c4e393e2c4518cf2a9a228a" +checksum = 
"2d1881b1ea6d313f9890710d65c158bdab6fb08c91ea825f74c1c8c357baf4cc" dependencies = [ "aws-smithy-runtime-api", ] [[package]] name = "aws-smithy-query" -version = "0.60.9" +version = "0.60.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae5d689cf437eae90460e944a58b5668530d433b4ff85789e69d2f2a556e057d" +checksum = "d28a63441360c477465f80c7abac3b9c4d075ca638f982e605b7dc2a2c7156c9" dependencies = [ "aws-smithy-types", "urlencoding", @@ -1115,9 +1112,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.9.5" +version = "1.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a392db6c583ea4a912538afb86b7be7c5d8887d91604f50eb55c262ee1b4a5f5" +checksum = "0bbe9d018d646b96c7be063dd07987849862b0e6d07c778aad7d93d1be6c1ef0" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -1139,9 +1136,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.9.3" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab0d43d899f9e508300e587bf582ba54c27a452dd0a9ea294690669138ae14a2" +checksum = "ec7204f9fd94749a7c53b26da1b961b4ac36bf070ef1e0b94bb09f79d4f6c193" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -1156,9 +1153,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.3.5" +version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "905cb13a9895626d49cf2ced759b062d913834c7482c38e49557eac4e6193f01" +checksum = "25f535879a207fce0db74b679cfc3e91a3159c8144d717d55f5832aea9eef46e" dependencies = [ "base64-simd", "bytes", @@ -1179,18 +1176,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.13" +version = "0.60.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b2f670422ff42bf7065031e72b45bc52a3508bd089f743ea90731ca2b6ea57" +checksum = "eab77cdd036b11056d2a30a7af7b775789fb024bf216acc13884c6c97752ae56" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "1.3.11" +version = "1.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d980627d2dd7bfc32a3c025685a033eeab8d365cc840c631ef59d1b8f428164" +checksum = "d79fb68e3d7fe5d4833ea34dc87d2e97d26d3086cb3da660bb6b1f76d98680b6" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -1229,9 +1226,9 @@ dependencies = [ [[package]] name = "base64ct" -version = "1.8.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d809780667f4410e7c41b07f52439b94d2bdf8528eeedc287fa38d3b7f95d82" +checksum = "55248b47b0caf0546f7988906588779981c43bb1bc9d0c44087278f80cdb44ba" [[package]] name = "better_io" @@ -1241,9 +1238,9 @@ checksum = "ef0a3155e943e341e557863e69a708999c94ede624e37865c8e2a91b94efa78f" [[package]] name = "bigdecimal" -version = "0.4.10" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6867f1565b3aad85681f1015055b087fcfd840d6aeee6eee7f2da317603695" +checksum = "560f42649de9fa436b73517378a147ec21f6c997a546581df4b4b31677828934" dependencies = [ "autocfg", "libm", @@ -1269,7 +1266,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -1401,7 +1398,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -1424,7 +1421,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -1580,7 
+1577,7 @@ dependencies = [ "quote", "serde", "serde_json", - "syn 2.0.110", + "syn 2.0.113", "tempfile", "toml", ] @@ -1707,7 +1704,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -1718,9 +1715,9 @@ checksum = "a1d728cc89cf3aee9ff92b05e62b19ee65a02b5702cff7d5a377e32c6ae29d8d" [[package]] name = "cmake" -version = "0.1.57" +version = "0.1.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" dependencies = [ "cc", ] @@ -1738,9 +1735,9 @@ dependencies = [ [[package]] name = "codspeed" -version = "4.2.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f0d98d97fd75ca4489a1a0997820a6521531085e7c8a98941bd0e1264d567dd" +checksum = "c3b847e05a34be5c38f3f2a5052178a3bd32e6b5702f3ea775efde95c483a539" dependencies = [ "anyhow", "cc", @@ -1756,9 +1753,9 @@ dependencies = [ [[package]] name = "codspeed-divan-compat" -version = "4.2.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4179ec5518e79efcd02ed50aa483ff807902e43c85146e87fff58b9cffc06078" +checksum = "f0f0e9fe5eaa39995ec35e46407f7154346cc25bd1300c64c21636f3d00cb2cc" dependencies = [ "clap", "codspeed", @@ -1769,23 +1766,23 @@ dependencies = [ [[package]] name = "codspeed-divan-compat-macros" -version = "4.2.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15eaee97aa5bceb32cc683fe25cd6373b7fc48baee5c12471996b58b6ddf0d7c" +checksum = "88c8babf2a40fd2206a2e030cf020d0d58144cd56e1dc408bfba02cdefb08b4f" dependencies = [ "divan-macros", "itertools 0.14.0", "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] name = "codspeed-divan-compat-walltime" -version = "4.2.1" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c38671153aa73be075d6019cab5ab1e6b31d36644067c1ac4cef73bf9723ce33" +checksum = "7f26092328e12a36704ffc552f379c6405dd94d3149970b79b22d371717c2aae" dependencies = [ "cfg-if", "clap", @@ -1861,7 +1858,7 @@ dependencies = [ "indicatif", "itertools 0.14.0", "lance-bench", - "parquet 57.0.0", + "parquet 57.1.0", "regex", "serde", "tokio", @@ -1872,9 +1869,9 @@ dependencies = [ [[package]] name = "compression-codecs" -version = "0.4.35" +version = "0.4.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0f7ac3e5b97fdce45e8922fb05cae2c37f7bbd63d30dd94821dacfd8f3f2bf2" +checksum = "302266479cb963552d11bd042013a58ef1adc56768016c8b82b4199488f2d4ad" dependencies = [ "compression-core", "flate2", @@ -2031,9 +2028,9 @@ dependencies = [ [[package]] name = "crc" -version = "3.4.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +checksum = "9710d3b3739c2e349eb44fe848ad0b7c8cb1e42bd87ee49371df2f7acaf3e675" dependencies = [ "crc-catalog", ] @@ -2216,7 +2213,7 @@ dependencies = [ "proc-macro2", "quote", "scratch", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2230,7 +2227,7 @@ dependencies = [ "indexmap", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2248,7 +2245,7 @@ dependencies = [ "indexmap", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2282,7 +2279,7 @@ dependencies = [ "proc-macro2", 
"quote", "strsim", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2296,7 +2293,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2307,7 +2304,7 @@ checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ "darling_core 0.20.11", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2318,7 +2315,7 @@ checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" dependencies = [ "darling_core 0.21.3", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -2387,8 +2384,7 @@ dependencies = [ [[package]] name = "datafusion" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ba7cb113e9c0bedf9e9765926031e132fa05a1b09ba6e93a6d1a4d7044457b8" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "arrow-schema 57.1.0", @@ -2425,10 +2421,9 @@ dependencies = [ "log", "object_store", "parking_lot", - "parquet 57.0.0", + "parquet 57.1.0", "rand 0.9.2", "regex", - "rstest", "sqlparser 0.59.0", "tempfile", "tokio", @@ -2488,8 +2483,7 @@ dependencies = [ [[package]] name = "datafusion-catalog" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a3a799f914a59b1ea343906a0486f17061f39509af74e874a866428951130d" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2536,8 +2530,7 @@ dependencies = [ [[package]] name = "datafusion-catalog-listing" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db1b113c80d7a0febcd901476a57aef378e717c54517a163ed51417d87621b0" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2554,7 +2547,6 @@ dependencies = [ "itertools 0.14.0", "log", "object_store", - "tokio", ] [[package]] @@ -2583,20 +2575,19 @@ dependencies = [ [[package]] name = "datafusion-common" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c10f7659e96127d25e8366be7c8be4109595d6a2c3eac70421f380a7006a1b0" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "ahash 0.8.12", "arrow 57.1.0", "arrow-ipc 57.1.0", "chrono", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "libc", "log", "object_store", - "parquet 57.0.0", + "parquet 57.1.0", "paste", "sqlparser 0.59.0", "tokio", @@ -2617,8 +2608,7 @@ dependencies = [ [[package]] name = "datafusion-common-runtime" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b92065bbc6532c6651e2f7dd30b55cba0c7a14f860c7e1d15f165c41a1868d95" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "futures", "log", @@ -2657,8 +2647,7 @@ dependencies = [ [[package]] name = "datafusion-datasource" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde13794244bc7581cd82f6fff217068ed79cdc344cafe4ab2c3a1c3510b38d6" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2686,8 +2675,7 @@ dependencies = [ 
[[package]] name = "datafusion-datasource-arrow" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804fa9b4ecf3157982021770617200ef7c1b2979d57bec9044748314775a9aea" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "arrow-ipc 57.1.0", @@ -2735,8 +2723,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-csv" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a1641a40b259bab38131c5e6f48fac0717bedb7dc93690e604142a849e0568" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2783,8 +2770,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-json" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adeacdb00c1d37271176f8fb6a1d8ce096baba16ea7a4b2671840c5c9c64fe85" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2805,8 +2791,7 @@ dependencies = [ [[package]] name = "datafusion-datasource-parquet" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43d0b60ffd66f28bfb026565d62b0a6cbc416da09814766a3797bba7d85a3cd9" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2828,7 +2813,7 @@ dependencies = [ "log", "object_store", "parking_lot", - "parquet 57.0.0", + "parquet 57.1.0", "tokio", ] @@ -2841,8 +2826,7 @@ checksum = "99ee6b1d9a80d13f9deb2291f45c07044b8e62fb540dbde2453a18be17a36429" [[package]] name = "datafusion-doc" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b99e13947667b36ad713549237362afb054b2d8f8cc447751e23ec61202db07" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" [[package]] name = "datafusion-execution" @@ -2867,11 +2851,11 @@ dependencies = [ [[package]] name = "datafusion-execution" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63695643190679037bc946ad46a263b62016931547bf119859c511f7ff2f5178" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", + "chrono", "dashmap", "datafusion-common 51.0.0", "datafusion-expr 51.0.0", @@ -2908,8 +2892,7 @@ dependencies = [ [[package]] name = "datafusion-expr" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9a4787cbf5feb1ab351f789063398f67654a6df75c4d37d7f637dc96f951a91" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -2943,8 +2926,7 @@ dependencies = [ [[package]] name = "datafusion-expr-common" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ce2fb1b8c15c9ac45b0863c30b268c69dc9ee7a1ee13ecf5d067738338173dc" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "datafusion-common 51.0.0", @@ -2985,13 +2967,13 @@ dependencies = [ [[package]] name = 
"datafusion-functions" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "794a9db7f7b96b3346fc007ff25e994f09b8f0511b4cf7dff651fadfe3ebb28f" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "arrow-buffer 57.1.0", "base64", "chrono", + "chrono-tz", "datafusion-common 51.0.0", "datafusion-doc 51.0.0", "datafusion-execution 51.0.0", @@ -3032,8 +3014,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c25210520a9dcf9c2b2cbbce31ebd4131ef5af7fc60ee92b266dc7d159cb305" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "ahash 0.8.12", "arrow 57.1.0", @@ -3066,8 +3047,7 @@ dependencies = [ [[package]] name = "datafusion-functions-aggregate-common" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62f4a66f3b87300bb70f4124b55434d2ae3fe80455f3574701d0348da040b55d" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "ahash 0.8.12", "arrow 57.1.0", @@ -3101,8 +3081,7 @@ dependencies = [ [[package]] name = "datafusion-functions-nested" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae5c06eed03918dc7fe7a9f082a284050f0e9ecf95d72f57712d1496da03b8c4" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "arrow-ord 57.1.0", @@ -3140,8 +3119,7 @@ dependencies = [ [[package]] name = "datafusion-functions-table" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db4fed1d71738fbe22e2712d71396db04c25de4111f1ec252b8f4c6d3b25d7f5" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "async-trait", @@ -3174,8 +3152,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d92206aa5ae21892f1552b4d61758a862a70956e6fd7a95cb85db1de74bc6d1" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "datafusion-common 51.0.0", @@ -3202,8 +3179,7 @@ dependencies = [ [[package]] name = "datafusion-functions-window-common" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53ae9bcc39800820d53a22d758b3b8726ff84a5a3e24cecef04ef4e5fdf1c7cc" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "datafusion-common 51.0.0", "datafusion-physical-expr-common 51.0.0", @@ -3217,18 +3193,17 @@ checksum = "ec6f637bce95efac05cdfb9b6c19579ed4aa5f6b94d951cfa5bb054b7bb4f730" dependencies = [ "datafusion-expr 50.3.0", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] name = "datafusion-macros" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1063ad4c9e094b3f798acee16d9a47bd7372d9699be2de21b05c3bd3f34ab848" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "datafusion-doc 
51.0.0", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -3253,8 +3228,7 @@ dependencies = [ [[package]] name = "datafusion-optimizer" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f35f9ec5d08b87fd1893a30c2929f2559c2f9806ca072d8fefca5009dc0f06a" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "chrono", @@ -3295,8 +3269,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c30cc8012e9eedcb48bbe112c6eff4ae5ed19cf3003cb0f505662e88b7014c5d" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "ahash 0.8.12", "arrow 57.1.0", @@ -3306,12 +3279,13 @@ dependencies = [ "datafusion-functions-aggregate-common 51.0.0", "datafusion-physical-expr-common 51.0.0", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.14.0", "parking_lot", "paste", "petgraph 0.8.3", + "tokio", ] [[package]] @@ -3332,8 +3306,7 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-adapter" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9ff2dbd476221b1f67337699eff432781c4e6e1713d2aefdaa517dfbf79768" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "datafusion-common 51.0.0", @@ -3361,15 +3334,17 @@ dependencies = [ [[package]] name = "datafusion-physical-expr-common" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90da43e1ec550b172f34c87ec68161986ced70fd05c8d2a2add66eef9c276f03" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "ahash 0.8.12", "arrow 57.1.0", + "chrono", "datafusion-common 51.0.0", "datafusion-expr-common 51.0.0", - "hashbrown 0.14.5", + "hashbrown 0.16.1", + "indexmap", "itertools 0.14.0", + "parking_lot", ] [[package]] @@ -3394,8 +3369,7 @@ dependencies = [ [[package]] name = "datafusion-physical-optimizer" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce9804f799acd7daef3be7aaffe77c0033768ed8fdbf5fb82fc4c5f2e6bc14e6" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "datafusion-common 51.0.0", @@ -3443,26 +3417,25 @@ dependencies = [ [[package]] name = "datafusion-physical-plan" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0acf0ad6b6924c6b1aa7d213b181e012e2d3ec0a64ff5b10ee6282ab0f8532ac" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "ahash 0.8.12", "arrow 57.1.0", "arrow-ord 57.1.0", "arrow-schema 57.1.0", "async-trait", - "chrono", "datafusion-common 51.0.0", "datafusion-common-runtime 51.0.0", "datafusion-execution 51.0.0", "datafusion-expr 51.0.0", + "datafusion-functions 51.0.0", "datafusion-functions-aggregate-common 51.0.0", "datafusion-functions-window-common 51.0.0", "datafusion-physical-expr 51.0.0", "datafusion-physical-expr-common 51.0.0", "futures", "half", - "hashbrown 0.14.5", + "hashbrown 0.16.1", "indexmap", "itertools 0.14.0", "log", @@ -3492,8 +3465,7 @@ dependencies = 
[ [[package]] name = "datafusion-pruning" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac2c2498a1f134a9e11a9f5ed202a2a7d7e9774bd9249295593053ea3be999db" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "datafusion-common 51.0.0", @@ -3533,8 +3505,7 @@ dependencies = [ [[package]] name = "datafusion-session" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f96eebd17555386f459037c65ab73aae8df09f464524c709d6a3134ad4f4776" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "async-trait", "datafusion-common 51.0.0", @@ -3563,8 +3534,7 @@ dependencies = [ [[package]] name = "datafusion-sql" version = "51.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fc195fe60634b2c6ccfd131b487de46dc30eccae8a3c35a13f136e7f440414f" +source = "git+https://github.com/apache/datafusion?branch=branch-52#aee5cd9f3517b2ac9536fd4eb254f1e1349711df" dependencies = [ "arrow 57.1.0", "bigdecimal", @@ -3638,7 +3608,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -3659,7 +3629,7 @@ dependencies = [ "convert_case", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -3703,7 +3673,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -3714,7 +3684,7 @@ checksum = "8dc51d98e636f5e3b0759a39257458b22619cac7e96d932da6eeb052891bb67c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -3813,7 +3783,7 @@ checksum = "685adfa4d6f3d765a26bc5dbc936577de9abf756c1feeb3089b01dd395034842" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -4204,7 +4174,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -4254,17 +4224,16 @@ dependencies = [ [[package]] name = "generator" -version = "0.8.8" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52f04ae4152da20c76fe800fa48659201d5cf627c5149ca0b707b69d7eef6cf9" +checksum = "605183a538e3e2a9c1038635cc5c2d194e2ee8fd0d1b66b8349fad7dbacce5a2" dependencies = [ "cc", "cfg-if", "libc", "log", "rustversion", - "windows-link 0.2.1", - "windows-result 0.3.4", + "windows", ] [[package]] @@ -4348,9 +4317,9 @@ checksum = "f9e2d4c0a8296178d8802098410ca05d86b17a10bb5ab559b3fb404c1f948220" [[package]] name = "h2" -version = "0.4.13" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" +checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" dependencies = [ "atomic-waker", "bytes", @@ -4603,9 +4572,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.19" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" +checksum = "52e9a2a24dc5c6821e71a7030e1e14b7b632acac55c40e9d2e082c621261bb56" dependencies = [ "base64", "bytes", @@ -4785,7 +4754,7 @@ 
checksum = "a0eb5a3343abf848c0984fe4604b2b105da9539376e24fc0a3b0007411ae4fd9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -4833,14 +4802,13 @@ dependencies = [ [[package]] name = "insta" -version = "1.46.0" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b66886d14d18d420ab5052cbff544fc5d34d0b2cdd35eb5976aaa10a4a472e5" +checksum = "e8732d3774162a0851e3f2b150eb98f31a9885dd75985099421d393385a01dfd" dependencies = [ "console 0.15.11", "once_cell", "similar", - "tempfile", ] [[package]] @@ -4853,7 +4821,7 @@ dependencies = [ "indoc", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -4879,9 +4847,9 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" [[package]] name = "iri-string" -version = "0.7.10" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397" dependencies = [ "memchr", "serde", @@ -4958,7 +4926,7 @@ checksum = "b787bebb543f8969132630c51fd0afab173a86c6abae56ff3b9e5e3e3f9f6e58" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -5704,9 +5672,9 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.12" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d0b95e02c851351f877147b7deea7b1afb1df71b63aa5f8270716e0c5720616" +checksum = "416f7e718bdb06000964960ffa43b4335ad4012ae8b99060261aa4a8088d5ccb" dependencies = [ "bitflags 2.10.0", "libc", @@ -6062,7 +6030,7 @@ checksum = "b093064383341eb3271f42e381cb8f10a01459478446953953c75d24bd339fc0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", "target-features", ] @@ -6081,7 +6049,7 @@ dependencies = [ "libc", "log", "openssl", - "openssl-probe 0.1.6", + "openssl-probe", "openssl-sys", "schannel", "security-framework 2.11.1", @@ -6246,9 +6214,9 @@ dependencies = [ [[package]] name = "ntapi" -version = "0.4.2" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c70f219e21142367c70c0b30c6a9e3a14d55b4d12a204d897fbec83a0363f081" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" dependencies = [ "winapi", ] @@ -6325,7 +6293,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -6397,7 +6365,7 @@ checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -6554,7 +6522,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -6563,12 +6531,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" -[[package]] -name = "openssl-probe" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f50d9b3dabb09ecd771ad0aa242ca6894994c130308ca3d7684634df8037391" - [[package]] name = "openssl-sys" version = "0.9.111" @@ -6747,7 +6709,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] 
[[package]] @@ -6814,9 +6776,9 @@ dependencies = [ [[package]] name = "parquet" -version = "57.0.0" +version = "57.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0f31027ef1af7549f7cec603a9a21dce706d3f8d7c2060a68f43c1773be95a" +checksum = "be3e4f6d320dd92bfa7d612e265d7d08bba0a240bab86af3425e1d255a511d89" dependencies = [ "ahash 0.8.12", "arrow-array 57.1.0", @@ -6834,7 +6796,7 @@ dependencies = [ "futures", "half", "hashbrown 0.16.1", - "lz4_flex 0.11.5", + "lz4_flex 0.12.0", "num-bigint", "num-integer", "num-traits", @@ -6950,7 +6912,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7034,7 +6996,7 @@ dependencies = [ "phf_shared 0.11.3", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7072,7 +7034,7 @@ checksum = "6e918e4ff8c4549eb882f14b3a4bc8c8bc93de829416eacf579f1207a8fbf861" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7225,7 +7187,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7267,7 +7229,7 @@ dependencies = [ "proc-macro-error-attr2", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7325,7 +7287,7 @@ dependencies = [ "prost 0.13.5", "prost-types 0.13.5", "regex", - "syn 2.0.110", + "syn 2.0.113", "tempfile", ] @@ -7345,7 +7307,7 @@ dependencies = [ "prost 0.14.1", "prost-types 0.14.1", "regex", - "syn 2.0.110", + "syn 2.0.113", "tempfile", ] @@ -7359,7 +7321,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7372,7 +7334,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7385,7 +7347,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7492,7 +7454,7 @@ dependencies = [ "proc-macro2", "pyo3-macros-backend", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7505,7 +7467,7 @@ dependencies = [ "proc-macro2", "pyo3-build-config", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7580,7 +7542,7 @@ dependencies = [ "once_cell", "socket2 0.6.1", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -7729,9 +7691,9 @@ dependencies = [ [[package]] name = "rangemap" -version = "1.7.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "973443cf09a9c8656b574a866ab68dfa19f0867d0340648c7d2f6a71b8a8ea68" +checksum = "acbbbbea733ec66275512d0b9694f34102e7d5406fdbe2ad8d21b28dce92887c" [[package]] name = "ratatui" @@ -7881,7 +7843,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -7968,9 +7930,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.12.28" +version = "0.12.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +checksum = "3b4c14b2d9afca6a60277086b0cc6a6ae0b568f6f7916c943a8cdc79f8be240f" dependencies = [ "base64", "bytes", @@ -8032,9 +7994,9 @@ dependencies = [ [[package]] name = "rkyv" -version = "0.7.46" +version = "0.7.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum 
= "2297bf9c81a3f0dc96bc9521370b88f054168c29826a75e89c55ff196e7ed6a1" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" dependencies = [ "bitvec", "bytecheck", @@ -8050,9 +8012,9 @@ dependencies = [ [[package]] name = "rkyv_derive" -version = "0.7.46" +version = "0.7.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84d7b42d4b8d06048d3ac8db0eb31bcb942cbeb709f0b5f2b2ebde398d3038f5" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" dependencies = [ "proc-macro2", "quote", @@ -8081,9 +8043,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.9.10" +version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" +checksum = "40a0376c50d0358279d9d643e4bf7b7be212f1f4ff1da9070a7b54d22ef75c88" dependencies = [ "const-oid", "digest", @@ -8125,7 +8087,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.110", + "syn 2.0.113", "unicode-ident", ] @@ -8137,7 +8099,7 @@ checksum = "b3a8fb4672e840a587a66fc577a5491375df51ddb88f2a2c2a792598c326fe14" dependencies = [ "quote", "rand 0.8.5", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -8225,9 +8187,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.36" +version = "0.23.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" +checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" dependencies = [ "aws-lc-rs", "once_cell", @@ -8240,11 +8202,11 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.8.3" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +checksum = "9980d917ebb0c0536119ba501e90834767bffc3d60641457fd84a1f3fd337923" dependencies = [ - "openssl-probe 0.2.0", + "openssl-probe", "rustls-pki-types", "schannel", "security-framework 3.5.1", @@ -8261,9 +8223,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.13.2" +version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21e6f2ab2928ca4291b86736a8bd920a277a399bba1589409d72154ff87c1282" +checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a" dependencies = [ "web-time", "zeroize", @@ -8452,7 +8414,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -8476,14 +8438,14 @@ checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] name = "serde_spanned" -version = "1.0.4" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8bbf91e5a4d6315eee45e704372590b30e260ee83af6639d64557f51b067776" +checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392" dependencies = [ "serde_core", ] @@ -8710,7 +8672,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -8783,7 +8745,7 @@ checksum = "da5fc6819faabb412da764b99d3b713bb55083c11e7e0c00144d386cd6a1939c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -8854,7 +8816,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", 
- "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -8866,7 +8828,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -8888,9 +8850,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.110" +version = "2.0.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a99801b5bd34ede4cf3fc688c5919368fea4e4814a4664359503e6015b280aea" +checksum = "678faa00651c9eb72dd2020cbdf275d92eccb2400d568e419efdd64838145cb4" dependencies = [ "proc-macro2", "quote", @@ -8914,7 +8876,7 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -9280,7 +9242,7 @@ dependencies = [ "quote", "regex", "reqwest", - "syn 2.0.110", + "syn 2.0.113", "sysinfo 0.35.2", "uzers", "which", @@ -9321,7 +9283,7 @@ checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -9332,7 +9294,7 @@ checksum = "3ff15c8ecd7de3849db632e14d18d2571fa09dfc5ed93479bc4485c7a517c913" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -9466,7 +9428,7 @@ checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -9560,9 +9522,9 @@ dependencies = [ [[package]] name = "toml_writer" -version = "1.0.6+spec-1.1.0" +version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" +checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" [[package]] name = "tonic" @@ -9682,7 +9644,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -9782,9 +9744,9 @@ dependencies = [ [[package]] name = "unicase" -version = "2.9.0" +version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" [[package]] name = "unicode-ident" @@ -9883,14 +9845,14 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.18.1" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" +checksum = "e2e054861b4bd027cd373e18e8d8d8e6548085000e41290d95ce0c373a654b4a" dependencies = [ "atomic", "getrandom 0.3.4", "js-sys", - "serde", + "serde_core", "wasm-bindgen", ] @@ -9932,7 +9894,7 @@ dependencies = [ "fastlanes", "itertools 0.14.0", "mimalloc", - "parquet 57.0.0", + "parquet 57.1.0", "rand 0.9.2", "serde_json", "tokio", @@ -10074,7 +10036,7 @@ dependencies = [ "noodles-bgzf", "noodles-vcf", "parking_lot", - "parquet 57.0.0", + "parquet 57.1.0", "rand 0.9.2", "regex", "reqwest", @@ -10784,7 +10746,7 @@ dependencies = [ "humansize", "indicatif", "itertools 0.14.0", - "parquet 57.0.0", + "parquet 57.1.0", "ratatui", "taffy", "tokio", @@ -10940,7 +10902,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", "wasm-bindgen-shared", ] @@ -10988,9 +10950,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "1.0.5" 
+version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c" +checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" dependencies = [ "rustls-pki-types", ] @@ -11163,7 +11125,7 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -11174,7 +11136,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -11597,7 +11559,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", "synstructure", ] @@ -11618,7 +11580,7 @@ checksum = "c640b22cd9817fae95be82f0d2f90b11f7605f6c319d16705c459b27ac2cbc26" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -11638,7 +11600,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", "synstructure", ] @@ -11653,13 +11615,13 @@ dependencies = [ [[package]] name = "zeroize_derive" -version = "1.4.3" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] @@ -11692,7 +11654,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.110", + "syn 2.0.113", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6c62d41c9d1..320834f6b0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,16 +63,16 @@ anyhow = "1.0.95" arbitrary = "1.3.2" arcref = "0.2.0" arrayref = "0.3.7" -arrow-arith = "57" -arrow-array = "57" -arrow-buffer = "57" -arrow-cast = "57" -arrow-data = "57" -arrow-ipc = "57" -arrow-ord = "57" -arrow-schema = "57" -arrow-select = "57" -arrow-string = "57" +arrow-arith = "57.1" +arrow-array = "57.1" +arrow-buffer = "57.1" +arrow-cast = "57.1" +arrow-data = "57.1" +arrow-ipc = "57.1" +arrow-ord = "57.1" +arrow-schema = "57.1" +arrow-select = "57.1" +arrow-string = "57.1" async-compat = "0.2.5" async-fs = "2.2.0" async-stream = "0.3.6" @@ -80,7 +80,7 @@ async-trait = "0.1.89" bindgen = "0.72.0" bit-vec = "0.8.0" bitvec = "1.0.1" -bytes = "1.10" +bytes = "1.11" bzip2 = "0.6.0" cbindgen = "0.29.0" cc = "1.2" @@ -122,7 +122,7 @@ get_dir = "0.5.0" glob = "0.3.2" goldenfile = "1" half = { version = "2.7.1", features = ["std", "num-traits"] } -hashbrown = "0.16.0" +hashbrown = "0.16.1" humansize = "2.1.3" indicatif = "0.18.0" insta = "1.43" @@ -149,7 +149,7 @@ opentelemetry = "0.31.0" opentelemetry-otlp = "0.31.0" opentelemetry_sdk = "0.31.0" parking_lot = { version = "0.12.3", features = ["nightly"] } -parquet = "57" +parquet = "57.1" paste = "1.0.15" pco = "0.4.4" pin-project-lite = "0.2.15" @@ -203,7 +203,7 @@ tracing = { version = "0.1.41", default-features = false } tracing-perfetto = "0.1.5" tracing-subscriber = "0.3" url = "2.5.7" -uuid = { version = "1.18", features = ["js"] } +uuid = { version = "1.19", features = ["js"] } walkdir = "2.5.0" wasm-bindgen-futures = "0.4.39" witchcraft-metrics = "1.0.1" @@ -335,3 +335,20 
diff --git a/benchmarks/datafusion-bench/src/lib.rs b/benchmarks/datafusion-bench/src/lib.rs
index 29ee77b422a..97dce7e89c3 100644
--- a/benchmarks/datafusion-bench/src/lib.rs
+++ b/benchmarks/datafusion-bench/src/lib.rs
@@ -11,9 +11,9 @@ use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::provider::DefaultTableFactory;
use datafusion::execution::SessionStateBuilder;
+use datafusion::execution::cache::DefaultListFilesCache;
use datafusion::execution::cache::cache_manager::CacheManagerConfig;
use datafusion::execution::cache::cache_unit::DefaultFileStatisticsCache;
-use datafusion::execution::cache::cache_unit::DefaultListFilesCache;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::SessionConfig;
use datafusion::prelude::SessionContext;
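The one-line change above follows the patched DataFusion branch: `DefaultListFilesCache` is now imported from `datafusion::execution::cache` directly rather than from `datafusion::execution::cache::cache_unit`.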
diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs
index 080a44dfc71..8d2b32f39fb 100644
--- a/vortex-datafusion/src/convert/exprs.rs
+++ b/vortex-datafusion/src/convert/exprs.rs
@@ -5,10 +5,17 @@ use std::sync::Arc;
use arrow_schema::DataType;
use arrow_schema::Schema;
+use datafusion_common::Result as DFResult;
+use datafusion_common::exec_datafusion_err;
+use datafusion_common::tree_node::TreeNode;
+use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_expr::Operator as DFOperator;
use datafusion_functions::core::getfield::GetFieldFunc;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::ScalarFunctionExpr;
+use datafusion_physical_expr::projection::ProjectionExpr;
+use datafusion_physical_expr::projection::ProjectionExprs;
+use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
use datafusion_physical_plan::expressions as df_expr;
use itertools::Itertools;
@@ -16,9 +23,6 @@ use vortex::compute::LikeOptions;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::dtype::arrow::FromArrowType;
-use vortex::error::VortexResult;
-use vortex::error::vortex_bail;
-use vortex::error::vortex_err;
use vortex::expr::Binary;
use vortex::expr::Expression;
use vortex::expr::Like;
@@ -31,20 +35,27 @@ use vortex::expr::is_null;
use vortex::expr::list_contains;
use vortex::expr::lit;
use vortex::expr::not;
+use vortex::expr::pack;
use vortex::expr::root;
use vortex::scalar::Scalar;

use crate::convert::FromDataFusion;

+/// Result of splitting a projection into Vortex expressions and leftover DataFusion projections.
+pub struct ProcessedProjection {
+    pub scan_projection: Expression,
+    pub leftover_projection: ProjectionExprs,
+}

/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
pub(crate) fn make_vortex_predicate(
    expr_convertor: &dyn ExpressionConvertor,
    predicate: &[Arc<dyn PhysicalExpr>],
-) -> VortexResult<Option<Expression>> {
+) -> DFResult<Option<Expression>> {
    let exprs = predicate
        .iter()
        .map(|e| expr_convertor.convert(e.as_ref()))
-        .collect::<VortexResult<Vec<_>>>()?;
+        .collect::<DFResult<Vec<_>>>()?;
    Ok(exprs.into_iter().reduce(and))
}

@@ -55,7 +66,16 @@ pub trait ExpressionConvertor: Send + Sync {
    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool;

    /// Try and convert a DataFusion [`PhysicalExpr`] into a Vortex [`Expression`].
-    fn convert(&self, expr: &dyn PhysicalExpr) -> VortexResult<Expression>;
+    fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<Expression>;
+
+    /// Split a projection into Vortex expressions that can be pushed down and leftover
+    /// DataFusion projections that need to be evaluated after the scan.
+    fn split_projection(
+        &self,
+        source_projection: ProjectionExprs,
+        input_schema: &Schema,
+        output_schema: &Schema,
+    ) -> DFResult<ProcessedProjection>;
}

/// The default [`ExpressionConvertor`].
@@ -64,46 +84,44 @@ pub struct DefaultExpressionConvertor {}

impl DefaultExpressionConvertor {
    /// Attempts to convert a DataFusion ScalarFunctionExpr to a Vortex expression.
-    fn try_convert_scalar_function(
-        &self,
-        scalar_fn: &ScalarFunctionExpr,
-    ) -> VortexResult<Expression> {
+    fn try_convert_scalar_function(&self, scalar_fn: &ScalarFunctionExpr) -> DFResult<Expression> {
        if let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn) {
            let source_expr = get_field_fn
                .args()
                .first()
-                .ok_or_else(|| vortex_err!("get_field missing source expression"))?
+                .ok_or_else(|| exec_datafusion_err!("get_field missing source expression"))?
                .as_ref();
            let field_name_expr = get_field_fn
                .args()
                .get(1)
-                .ok_or_else(|| vortex_err!("get_field missing field name argument"))?;
+                .ok_or_else(|| exec_datafusion_err!("get_field missing field name argument"))?;
            let field_name = field_name_expr
                .as_any()
                .downcast_ref::<df_expr::Literal>()
-                .ok_or_else(|| vortex_err!("get_field field name must be a literal"))?
+                .ok_or_else(|| exec_datafusion_err!("get_field field name must be a literal"))?
                .value()
                .try_as_str()
                .flatten()
-                .ok_or_else(|| vortex_err!("get_field field name must be a UTF-8 string"))?;
+                .ok_or_else(|| {
+                    exec_datafusion_err!("get_field field name must be a UTF-8 string")
+                })?;

            return Ok(get_item(field_name.to_string(), self.convert(source_expr)?));
        }

-        tracing::debug!(
-            function_name = scalar_fn.name(),
-            "Unsupported ScalarFunctionExpr"
-        );
-        vortex_bail!("Unsupported ScalarFunctionExpr: {}", scalar_fn.name())
+        Err(exec_datafusion_err!(
+            "Unsupported ScalarFunctionExpr: {}",
+            scalar_fn.name()
+        ))
    }
}

impl ExpressionConvertor for DefaultExpressionConvertor {
    fn can_be_pushed_down(&self, expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
-        can_be_pushed_down(expr, schema)
+        can_be_pushed_down_impl(expr, schema)
    }

-    fn convert(&self, df: &dyn PhysicalExpr) -> VortexResult<Expression> {
+    fn convert(&self, df: &dyn PhysicalExpr) -> DFResult<Expression> {
        // TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep
        // for that node, up to any `and` or `or` node.
        if let Some(binary_expr) = df.as_any().downcast_ref::<df_expr::BinaryExpr>() {
@@ -168,7 +186,7 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
                    if let Some(lit) = e.as_any().downcast_ref::<df_expr::Literal>() {
                        Ok(Scalar::from_df(lit.value()))
                    } else {
-                        Err(vortex_err!("Failed to cast sub-expression"))
+                        Err(exec_datafusion_err!("Failed to cast sub-expression"))
                    }
                })
                .try_collect()?;
@@ -187,11 +205,93 @@ impl ExpressionConvertor for DefaultExpressionConvertor {
        if let Some(scalar_fn) = df.as_any().downcast_ref::<ScalarFunctionExpr>() {
            return self.try_convert_scalar_function(scalar_fn);
        }

-        vortex_bail!("Couldn't convert DataFusion physical {df} expression to a vortex expression")
+        Err(exec_datafusion_err!(
+            "Couldn't convert DataFusion physical {df} expression to a vortex expression"
+        ))
+    }
+
+    fn split_projection(
+        &self,
+        source_projection: ProjectionExprs,
+        input_schema: &Schema,
+        output_schema: &Schema,
+    ) -> DFResult<ProcessedProjection> {
+        let mut scan_projection = vec![];
+        let mut leftover_projection: Vec<ProjectionExpr> = vec![];
+
+        for projection_expr in source_projection.iter() {
+            let r = projection_expr.expr.apply(|node| {
+                // We only pull column children of scalar functions that we can't push into the scan.
+                if let Some(scalar_fn_expr) = node.as_any().downcast_ref::<ScalarFunctionExpr>()
+                    && !can_scalar_fn_be_pushed_down(scalar_fn_expr)
+                {
+                    scan_projection.extend(
+                        collect_columns(node)
+                            .into_iter()
+                            .map(|c| (c.name().to_string(), get_item(c.name(), root()))),
+                    );
+
+                    leftover_projection.push(projection_expr.clone());
+                    return Ok(TreeNodeRecursion::Stop);
+                }
+
+                // If the projection contains a `CastColumnExpr` that casts to physical types that don't have a 1:1 mapping
+                // with Vortex's type system, we make sure to pull the input from the file and keep the projection
+                if let Some(cast_expr) = node.as_any().downcast_ref::<df_expr::CastColumnExpr>()
+                    && is_dtype_incompatible(cast_expr.target_field().data_type())
+                {
+                    scan_projection.push((
+                        cast_expr.input_field().name().clone(),
+                        self.convert(cast_expr.expr().as_ref())?,
+                    ));
+                    leftover_projection.push(projection_expr.clone());
+                    return Ok(TreeNodeRecursion::Stop);
+                }
+
+                // DataFusion assumes different decimal types can be coerced.
+                // Vortex expects a perfect match so we don't push it down.
+                if let Some(binary_expr) = node.as_any().downcast_ref::<df_expr::BinaryExpr>()
+                    && binary_expr.op().is_numerical_operators()
+                    && (is_decimal(&binary_expr.left().data_type(input_schema)?)
+                        && is_decimal(&binary_expr.right().data_type(input_schema)?))
+                {
+                    scan_projection.extend(
+                        collect_columns(node)
+                            .into_iter()
+                            .map(|c| (c.name().to_string(), get_item(c.name(), root()))),
+                    );
+
+                    leftover_projection.push(projection_expr.clone());
+                    return Ok(TreeNodeRecursion::Stop);
+                }
+
+                Ok(TreeNodeRecursion::Continue)
+            })?;
+
+            // If we didn't stop early, the whole expression converts cleanly and can be pushed into the scan.
+            if matches!(r, TreeNodeRecursion::Continue) {
+                scan_projection.push((
+                    projection_expr.alias.clone(),
+                    self.convert(projection_expr.expr.as_ref())?,
+                ));
+                leftover_projection.push(ProjectionExpr {
+                    expr: Arc::new(df_expr::Column::new_with_schema(
+                        projection_expr.alias.as_str(),
+                        output_schema,
+                    )?),
+                    alias: projection_expr.alias.clone(),
+                });
+            }
+        }
+
+        Ok(ProcessedProjection {
+            scan_projection: pack(scan_projection, Nullability::NonNullable),
+            leftover_projection: leftover_projection.into(),
+        })
+    }
 }

-fn try_operator_from_df(value: &DFOperator) -> VortexResult<Operator> {
+fn try_operator_from_df(value: &DFOperator) -> DFResult<Operator> {
     match value {
         DFOperator::Eq => Ok(Operator::Eq),
         DFOperator::NotEq => Ok(Operator::NotEq),
@@ -236,12 +336,14 @@ fn try_operator_from_df(value: &DFOperator) -> VortexResult<Operator> {
         | DFOperator::QuestionAnd
         | DFOperator::QuestionPipe => {
            tracing::debug!(operator = %value, "Can't push down binary operator");
-            Err(vortex_err!("Unsupported datafusion operator {value}"))
+            Err(exec_datafusion_err!(
+                "Unsupported datafusion operator {value}"
+            ))
        }
    }
 }

-pub(crate) fn can_be_pushed_down(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
+fn can_be_pushed_down_impl(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
     // We currently do not support pushdown of dynamic expressions in DF.
     // See issue: https://github.com/vortex-data/vortex/issues/4034
     if is_dynamic_physical_expr(df_expr) {
@@ -257,7 +359,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schem
         .ok()
         .is_some_and(|field| supported_data_types(field.data_type()))
     } else if let Some(like) = expr.downcast_ref::<df_expr::LikeExpr>() {
-        can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
+        can_be_pushed_down_impl(like.expr(), schema)
+            && can_be_pushed_down_impl(like.pattern(), schema)
     } else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
         supported_data_types(&lit.value().data_type())
     } else if expr.downcast_ref::().is_some()
@@ -265,12 +368,15 @@
     {
         true
     } else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
-        can_be_pushed_down(is_null.arg(), schema)
+        can_be_pushed_down_impl(is_null.arg(), schema)
     } else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
-        can_be_pushed_down(is_not_null.arg(), schema)
+        can_be_pushed_down_impl(is_not_null.arg(), schema)
     } else if let Some(in_list) = expr.downcast_ref::<df_expr::InListExpr>() {
-        can_be_pushed_down(in_list.expr(), schema)
-            && in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
+        can_be_pushed_down_impl(in_list.expr(), schema)
+            && in_list
+                .list()
+                .iter()
+                .all(|e| can_be_pushed_down_impl(e, schema))
     } else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
         can_scalar_fn_be_pushed_down(scalar_fn)
     } else {
@@ -282,8 +388,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schem
 fn can_binary_be_pushed_down(binary: &df_expr::BinaryExpr, schema: &Schema) -> bool {
     let is_op_supported = try_operator_from_df(binary.op()).is_ok();
     is_op_supported
-        && can_be_pushed_down(binary.left(), schema)
-        && can_be_pushed_down(binary.right(), schema)
+        && can_be_pushed_down_impl(binary.left(), schema)
+        && can_be_pushed_down_impl(binary.right(), schema)
 }

 fn supported_data_types(dt: &DataType) -> bool {
@@ -324,6 +430,36 @@ fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
     ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
 }

+fn is_dtype_incompatible(dt: &DataType) -> bool {
+    use DataType::*;
+
+    dt.is_run_ends_type()
+        || is_decimal(dt)
+        || matches!(
+            dt,
+            Dictionary(..)
+                | Utf8
+                | LargeUtf8
+                | Binary
+                | LargeBinary
+                | FixedSizeBinary(_)
+                | FixedSizeList(..)
+                | ListView(..)
+                | LargeListView(..)
+        )
+}
+
+// TODO(adam): Replace with `DataType::is_decimal` once it's released.
+fn is_decimal(dt: &DataType) -> bool {
+    matches!(
+        dt,
+        DataType::Decimal32(_, _)
+            | DataType::Decimal64(_, _)
+            | DataType::Decimal128(_, _)
+            | DataType::Decimal256(_, _)
+    )
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -548,7 +684,7 @@ mod tests {
     fn test_can_be_pushed_down_column_supported(test_schema: Schema) {
         let col_expr = Arc::new(df_expr::Column::new("id", 0)) as Arc<dyn PhysicalExpr>;

-        assert!(can_be_pushed_down(&col_expr, &test_schema));
+        assert!(can_be_pushed_down_impl(&col_expr, &test_schema));
     }

     #[rstest]
@@ -556,14 +692,14 @@
         let col_expr =
             Arc::new(df_expr::Column::new("unsupported_list", 5)) as Arc<dyn PhysicalExpr>;

-        assert!(!can_be_pushed_down(&col_expr, &test_schema));
+        assert!(!can_be_pushed_down_impl(&col_expr, &test_schema));
     }

     #[rstest]
     fn test_can_be_pushed_down_column_not_found(test_schema: Schema) {
         let col_expr = Arc::new(df_expr::Column::new("nonexistent", 99)) as Arc<dyn PhysicalExpr>;

-        assert!(!can_be_pushed_down(&col_expr, &test_schema));
+        assert!(!can_be_pushed_down_impl(&col_expr, &test_schema));
     }

     #[rstest]
@@ -571,7 +707,7 @@
         let lit_expr =
             Arc::new(df_expr::Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;

-        assert!(can_be_pushed_down(&lit_expr, &test_schema));
+        assert!(can_be_pushed_down_impl(&lit_expr, &test_schema));
     }

     #[rstest]
@@ -581,7 +717,7 @@
         let lit_expr =
             Arc::new(df_expr::Literal::new(unsupported_literal)) as Arc<dyn PhysicalExpr>;

-        assert!(!can_be_pushed_down(&lit_expr, &test_schema));
+        assert!(!can_be_pushed_down_impl(&lit_expr, &test_schema));
     }

     #[rstest]
@@ -592,7 +728,7 @@
         let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
             as Arc<dyn PhysicalExpr>;

-        assert!(can_be_pushed_down(&binary_expr, &test_schema));
+        assert!(can_be_pushed_down_impl(&binary_expr, &test_schema));
     }

     #[rstest]
@@ -606,7 +742,7 @@
             right,
         )) as Arc<dyn PhysicalExpr>;

-        assert!(!can_be_pushed_down(&binary_expr, &test_schema));
+        assert!(!can_be_pushed_down_impl(&binary_expr, &test_schema));
     }

     #[rstest]
@@ -617,7 +753,7 @@
         let binary_expr = Arc::new(df_expr::BinaryExpr::new(left, DFOperator::Eq, right))
             as Arc<dyn PhysicalExpr>;

-        assert!(!can_be_pushed_down(&binary_expr, &test_schema));
+        assert!(!can_be_pushed_down_impl(&binary_expr, &test_schema));
     }

     #[rstest]
@@ -629,7 +765,7 @@
         let like_expr =
             Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

-        assert!(can_be_pushed_down(&like_expr, &test_schema));
+        assert!(can_be_pushed_down_impl(&like_expr, &test_schema));
     }

     #[rstest]
@@ -641,6 +777,6 @@
         let like_expr =
             Arc::new(df_expr::LikeExpr::new(false, false, expr, pattern)) as Arc<dyn PhysicalExpr>;

-        assert!(!can_be_pushed_down(&like_expr, &test_schema));
+        assert!(!can_be_pushed_down_impl(&like_expr, &test_schema));
     }
 }
diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index 245f5547ac3..00aafe0f946 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -10,7 +10,6 @@ use vortex::expr::stats::Precision;
 mod convert;
 mod persistent;
-pub mod vendor;

 pub use convert::exprs::ExpressionConvertor;
 pub use persistent::*;
diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs
index f8a64c0320c..98182eb6b5c 100644
--- a/vortex-datafusion/src/persistent/format.rs
+++ b/vortex-datafusion/src/persistent/format.rs
@@ -17,16 +17,17 @@ use datafusion_common::Result as DFResult;
 use datafusion_common::Statistics;
 use datafusion_common::config::ConfigField;
 use datafusion_common::config_namespace;
+use datafusion_common::internal_datafusion_err;
 use datafusion_common::not_impl_err;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::stats::Precision;
 use datafusion_common_runtime::SpawnedTask;
+use datafusion_datasource::TableSchema;
 use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_format::FileFormat;
 use datafusion_datasource::file_format::FileFormatFactory;
 use datafusion_datasource::file_scan_config::FileScanConfig;
-use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::file_sink_config::FileSinkConfig;
 use datafusion_datasource::sink::DataSinkExec;
 use datafusion_datasource::source::DataSourceExec;
@@ -363,6 +364,7 @@ impl FileFormat for VortexFormat {
                         is_constant.as_exact().map(|_| Precision::Exact(1))
                     })
                     .unwrap_or(Precision::Absent),
+                byte_size: Precision::Absent,
             }
         })
         .collect::<Vec<_>>();
@@ -386,14 +388,12 @@ impl FileFormat for VortexFormat {
         _state: &dyn Session,
         file_scan_config: FileScanConfig,
     ) -> DFResult<Arc<dyn ExecutionPlan>> {
-        let source = VortexSource::new(self.session.clone(), self.file_cache.clone());
-        let source = Arc::new(source);
+        // We make sure the scan's source is the right type, but we don't have anything else to do here.
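+        // The source itself is created by `file_source()` below and configured through the
+        // projection/filter pushdown hooks on `VortexSource`, so by the time this runs the
+        // FileScanConfig should already carry a fully configured source.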
+        if !file_scan_config.file_source().as_any().is::<VortexSource>() {
+            return Err(internal_datafusion_err!("Expected VortexSource"));
+        }

-        Ok(DataSourceExec::from_data_source(
-            FileScanConfigBuilder::from(file_scan_config)
-                .with_source(source)
-                .build(),
-        ))
+        Ok(DataSourceExec::from_data_source(file_scan_config))
     }

     async fn create_writer_physical_plan(
@@ -413,8 +413,9 @@ impl FileFormat for VortexFormat {
         Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
     }

-    fn file_source(&self) -> Arc<dyn FileSource> {
+    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
         Arc::new(VortexSource::new(
+            table_schema,
             self.session.clone(),
             self.file_cache.clone(),
         ))
diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
index a409c4a4d27..21033dbbccb 100644
--- a/vortex-datafusion/src/persistent/mod.rs
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -66,7 +66,6 @@ mod tests {
     use vortex::array::arrays::VarBinArray;
     use vortex::array::validity::Validity;
     use vortex::buffer::buffer;
-    use vortex::error::vortex_err;
     use vortex::file::WriteOptionsSessionExt;
     use vortex::session::VortexSession;

@@ -75,10 +74,8 @@ mod tests {
     use crate::persistent::register_vortex_format_factory;

     #[rstest]
-    #[case(Some(1))]
-    #[case(None)]
     #[tokio::test]
-    async fn query_file(#[case] limit: Option<usize>) -> anyhow::Result<()> {
+    async fn test_query_file(#[values(Some(1), None)] limit: Option<usize>) -> anyhow::Result<()> {
         let session = VortexSession::default();
         let temp_dir = tempdir()?;
         let strings = ChunkedArray::from_iter([
@@ -100,28 +97,26 @@ mod tests {
             Validity::NonNullable,
         )?;

-        let filepath = temp_dir.path().join("data.vortex");
-
         let mut f = OpenOptions::new()
             .write(true)
-            .create(true)
-            .truncate(true)
-            .open(&filepath)
+            .create_new(true)
+            .open(temp_dir.path().join("data.vortex"))
             .await?;

-        session
+        let summary = session
             .write_options()
             .write(&mut f, st.to_array_stream())
             .await?;

+        assert_eq!(summary.row_count(), 8);
+
+        // The write handle must be dropped (closing the file) before we read it back below;
+        // on Windows the still-open handle otherwise makes the read fail.
+        drop(f);
+
         let ctx = SessionContext::default();
         let format = Arc::new(VortexFormat::new(session));
-        let table_url = ListingTableUrl::parse(
-            temp_dir
-                .path()
-                .to_str()
-                .ok_or_else(|| vortex_err!("Path is not valid UTF-8"))?,
-        )?;
+
+        let table_url = ListingTableUrl::parse(temp_dir.path().to_str().unwrap())?;
         assert!(table_url.is_collection());

         let config = ListingTableConfig::new(table_url)
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index b5474f7bc1e..6aabe4d0348 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -5,20 +5,23 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::sync::Weak;

-use arrow_schema::ArrowError;
 use datafusion_common::DataFusionError;
 use datafusion_common::Result as DFResult;
+use datafusion_common::ScalarValue;
 use datafusion_common::arrow::array::RecordBatch;
+use datafusion_common::exec_datafusion_err;
 use datafusion_datasource::FileRange;
 use datafusion_datasource::PartitionedFile;
 use datafusion_datasource::TableSchema;
 use datafusion_datasource::file_stream::FileOpenFuture;
 use datafusion_datasource::file_stream::FileOpener;
-use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_physical_expr::PhysicalExprRef;
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
 use datafusion_physical_expr::split_conjunction;
+use datafusion_physical_expr::utils::reassign_expr_columns;
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
+use datafusion_physical_expr_adapter::replace_columns_with_literals;
 use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
 use datafusion_physical_plan::metrics::Count;
 use datafusion_pruning::FilePruner;
@@ -29,15 +32,9 @@ use futures::stream;
 use object_store::ObjectStore;
 use object_store::path::Path;
 use tracing::Instrument;
-use vortex::array::Array;
 use vortex::array::ArrayRef;
 use vortex::array::arrow::ArrowArrayExecutor;
-use vortex::dtype::FieldName;
 use vortex::error::VortexError;
-use vortex::error::VortexResult;
-use vortex::error::vortex_err;
-use vortex::expr::root;
-use vortex::expr::select;
 use vortex::layout::LayoutReader;
 use vortex::layout::layouts::USE_VORTEX_OPERATORS;
 use vortex::metrics::VortexMetrics;
@@ -49,7 +46,7 @@ use vortex_utils::aliases::dash_map::Entry;

 use super::cache::VortexFileCache;
 use crate::VortexAccessPlan;
 use crate::convert::exprs::ExpressionConvertor;
-use crate::convert::exprs::can_be_pushed_down;
+use crate::convert::exprs::ProcessedProjection;
 use crate::convert::exprs::make_vortex_predicate;
 use crate::persistent::stream::PrunableStream;

@@ -59,7 +56,7 @@ pub(crate) struct VortexOpener {
     pub object_store: Arc<dyn ObjectStore>,
-    /// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is
-    /// all fields in the final scan result not including the partition columns.
-    pub projection: Option<Arc<[usize]>>,
+    /// Projection over the `table_schema`, i.e. all fields in the final scan result,
+    /// not including the partition columns.
+    pub projection: ProjectionExprs,
     /// Filter expression optimized for pushdown into Vortex scan operations.
     /// This may be a subset of file_pruning_predicate containing only expressions
     /// that Vortex can efficiently evaluate.
@@ -67,8 +64,7 @@ pub(crate) struct VortexOpener {
     /// Filter expression used by DataFusion's FilePruner to eliminate files based on
     /// statistics and partition values without opening them.
     pub file_pruning_predicate: Option<PhysicalExprRef>,
-    pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
-    pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
+    pub expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
-    /// This is the table's schema without partition columns. It may contain fields which do
-    /// not exist in the file, and are supplied by the `schema_adapter_factory`.
+    /// This is the table's schema without partition columns. It may contain fields which do
+    /// not exist in the file, and are filled in via the `expr_adapter_factory`.
     pub table_schema: TableSchema,
@@ -95,34 +91,43 @@ impl FileOpener for VortexOpener {
     fn open(&self, file: PartitionedFile) -> DFResult<FileOpenFuture> {
         let session = self.session.clone();
         let object_store = self.object_store.clone();
-        let projection = self.projection.clone();
+
+        let mut projection = self.projection.clone();
         let mut filter = self.filter.clone();
+
         let file_pruning_predicate = self.file_pruning_predicate.clone();
         let expr_adapter_factory = self.expr_adapter_factory.clone();
         let file_cache = self.file_cache.clone();
         let table_schema = self.table_schema.clone();
+        let logical_file_schema = table_schema.file_schema().clone();
         let batch_size = self.batch_size;
         let limit = self.limit;
         let metrics = self.metrics.clone();
         let layout_reader = self.layout_readers.clone();
         let has_output_ordering = self.has_output_ordering;

-        let projected_schema = match projection.as_ref() {
-            None => table_schema.file_schema().clone(),
-            Some(indices) => Arc::new(table_schema.file_schema().project(indices)?),
-        };
-
-        let schema_adapter = self
-            .schema_adapter_factory
-            .create(projected_schema, table_schema.table_schema().clone());
-
-        // Update partition column access in the filter to use literals instead
-        let partition_fields = self.table_schema.table_partition_cols().clone();
-        let table_schema = self.table_schema.clone();
-
         let expr_convertor = self.expression_convertor.clone();

+        // Replace column access for partition columns with literals
+        #[allow(clippy::disallowed_types)]
+        let literal_value_cols = table_schema
+            .table_partition_cols()
+            .iter()
+            .map(|f| f.name())
+            .cloned()
+            .zip(file.partition_values.clone())
+            .collect::<std::collections::HashMap<_, _>>();
+
+        if !literal_value_cols.is_empty() {
+            projection = projection.try_map_exprs(|expr| {
+                replace_columns_with_literals(Arc::clone(&expr), &literal_value_cols)
+            })?;
+            filter = filter
+                .map(|p| replace_columns_with_literals(p, &literal_value_cols))
+                .transpose()?;
+        }
+
         Ok(async move {
             // Create FilePruner when we have a predicate and either dynamic expressions
             // or file statistics available. The pruner can eliminate files without
@@ -135,20 +140,18 @@
                     // to work with. Static predicates without stats won't benefit from pruning.
                     is_dynamic_physical_expr(p) || file.has_statistics()
                 })
-                .map(|predicate| {
-                    FilePruner::new(
+                .and_then(|predicate| {
+                    FilePruner::try_new(
                         predicate.clone(),
-                        table_schema.file_schema(),
-                        partition_fields,
-                        file.clone(),
+                        &logical_file_schema,
+                        &file,
                         Count::default(),
                     )
-                })
-                .transpose()?;
+                });

             // Check if this file should be pruned based on statistics/partition values.
             // Returns empty stream if file can be skipped entirely.
-            if let Some(file_pruner) = &mut file_pruner
+            if let Some(file_pruner) = file_pruner.as_mut()
                 && file_pruner.should_prune()?
            {
                return Ok(stream::empty().boxed());
            }
@@ -157,54 +160,53 @@
            let vxf = file_cache
                .try_get(&file.object_meta, object_store)
                .await
-                .map_err(|e| {
-                    DataFusionError::Execution(format!("Failed to open Vortex file {e}"))
-                })?;
+                .map_err(|e| exec_datafusion_err!("Failed to open Vortex file {e}"))?;

            let physical_file_schema = Arc::new(vxf.dtype().to_arrow_schema().map_err(|e| {
-                DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}"))
+                exec_datafusion_err!("Failed to convert file schema to arrow: {e}")
            })?);

-            if let Some(expr_adapter_factory) = expr_adapter_factory {
-                // Replace column access for partition columns with literals
-                let partition_values = table_schema
-                    .table_partition_cols()
-                    .iter()
-                    .cloned()
-                    .zip(file.partition_values)
-                    .collect();
-
-                // The adapter rewrites the expression to the local file schema, allowing
-                // for schema evolution and divergence between the table's schema and individual files.
-                filter = filter
-                    .map(|filter| {
-                        let expr = expr_adapter_factory
-                            .create(
-                                Arc::clone(table_schema.file_schema()),
-                                Arc::clone(&physical_file_schema),
-                            )
-                            .with_partition_values(partition_values)
-                            .rewrite(filter)?;
-
-                        // Expression might now reference columns that don't exist in the file, so we can give it
-                        // another simplification pass.
-                        PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr)
-                    })
-                    .transpose()?;
-            }
+            let projected_physical_schema = projection.project_schema(&logical_file_schema)?;
+
+            let expr_adapter = expr_adapter_factory.create(
+                Arc::clone(&logical_file_schema),
+                Arc::clone(&physical_file_schema),
+            );
+
+            let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
+
+            // The adapter rewrites the expressions to the local file schema, allowing
+            // for schema evolution and divergence between the table's schema and individual files.
+            let filter = filter
+                .map(|filter| {
+                    // Expression might now reference columns that don't exist in the file, so we can give it
+                    // another simplification pass.
+                    simplifier.simplify(expr_adapter.rewrite(filter)?)
+                })
+                .transpose()?;
+            let projection =
+                projection.try_map_exprs(|p| simplifier.simplify(expr_adapter.rewrite(p)?))?;
+
+            let ProcessedProjection {
+                scan_projection,
+                leftover_projection,
+            } = expr_convertor.split_projection(
+                projection,
+                &physical_file_schema,
+                &projected_physical_schema,
+            )?;
+
+            let stream_schema = scan_projection
+                .return_dtype(vxf.dtype())
+                .and_then(|dtype| dtype.to_arrow_schema())
+                .map_err(|e| {
+                    exec_datafusion_err!("Failed to derive Arrow schema for the scan output: {e}")
+                })?;
+
+            let leftover_projection = leftover_projection
+                .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?;
+
+            let projected_physical_schema = leftover_projection.project_schema(&stream_schema)?;
+            let projector = leftover_projection.make_projector(&stream_schema)?;

-            // Use the pre-created schema adapter to map logical_file_schema to projected_schema.
-            // Since logical_file_schema has the same field names as logical_schema (which the adapter
-            // was created with), this works correctly and gives us the projection indices.
-            let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(&physical_file_schema)?;
-
-            // Build the Vortex projection expression using field names from logical_file_schema
-            let field_names: Vec<FieldName> = adapted_projections
-                .into_iter()
-                .map(|index| FieldName::from(physical_file_schema.field(index).name().as_str()))
-                .collect();
-            let projection_expr = select(field_names, root());

            // We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
            let layout_reader = match layout_reader.entry(file.object_meta.location.clone()) {
                Entry::Occupied(mut occupied_entry) => {
@@ -261,10 +263,12 @@
                        split_conjunction(&f)
                            .into_iter()
                            .cloned()
-                            .partition(|expr| can_be_pushed_down(expr, &physical_file_schema));
+                            .partition(|expr| {
+                                expr_convertor.can_be_pushed_down(expr, &physical_file_schema)
+                            });

                        if !unpushed.is_empty() {
-                            return Some(VortexResult::Err(vortex_err!(
+                            return Some(Err(exec_datafusion_err!(
                                r#"VortexSource accepted but failed to push {} filters.
This should never happen if a properly configured PhysicalExprAdapterFactory is set
on the source.
@@ -279,8 +283,7 @@
                                make_vortex_predicate(expr_convertor.as_ref(), &pushed).transpose()
                            })
-                            .transpose()
-                            .map_err(|e| DataFusionError::External(e.into()))?;
+                            .transpose()?;

            if let Some(limit) = limit
                && filter.is_none()
@@ -291,21 +294,18 @@
            let chunk_session = session.clone();
            let stream = scan_builder
                .with_metrics(metrics)
-                .with_projection(projection_expr)
+                .with_projection(scan_projection)
                .with_some_filter(filter)
                .with_ordered(has_output_ordering)
                .map(move |chunk| {
                    if *USE_VORTEX_OPERATORS {
-                        let schema = chunk.dtype().to_arrow_schema()?;
-                        chunk.execute_record_batch(&schema, &chunk_session)
+                        chunk.execute_record_batch(&projected_physical_schema, &chunk_session)
                    } else {
                        RecordBatch::try_from(chunk.as_ref())
                    }
                })
                .into_stream()
-                .map_err(|e| {
-                    DataFusionError::Execution(format!("Failed to create Vortex stream: {e}"))
-                })?
+                .map_err(|e| exec_datafusion_err!("Failed to create Vortex stream: {e}"))?
                .map_ok(move |rb| {
                    // We try to slice the stream so that it respects DataFusion's configured batch size.
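+                    // For example, with a configured batch size of 100, a 250-row chunk is
+                    // re-emitted as batches of 100, 100 and 50 rows.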
stream::iter( @@ -328,13 +328,19 @@ impl FileOpener for VortexOpener { ) }) .map_err(move |e: VortexError| { - ArrowError::ExternalError(Box::new(e.with_context(format!( + DataFusionError::External(Box::new(e.with_context(format!( "Failed to read Vortex file: {}", file.object_meta.location )))) }) .try_flatten() - .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b))) + .map(move |batch| { + if projector.projection().as_ref().is_empty() { + batch + } else { + batch.and_then(|b| projector.project_batch(&b)) + } + }) .boxed(); if let Some(file_pruner) = file_pruner { @@ -390,27 +396,30 @@ mod tests { use datafusion::arrow::util::display::FormatOptions; use datafusion::arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::record_batch; - use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion::logical_expr::col; use datafusion::logical_expr::lit; use datafusion::physical_expr::planner::logical2physical; use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion::scalar::ScalarValue; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions as df_expr; + use datafusion_physical_expr::projection::ProjectionExpr; use insta::assert_snapshot; use itertools::Itertools; use object_store::memory::InMemory; use rstest::rstest; use vortex::VortexSessionDefault; use vortex::array::arrow::FromArrowArray; + use vortex::buffer::Buffer; use vortex::file::WriteOptionsSessionExt; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; + use vortex::scan::Selection; use vortex::session::VortexSession; use super::*; use crate::VortexAccessPlan; use crate::convert::exprs::DefaultExpressionConvertor; - use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory; static SESSION: LazyLock = LazyLock::new(VortexSession::default); @@ -478,17 +487,15 @@ mod tests { object_store: Arc, table_schema: TableSchema, filter: Option, - expr_adapter_factory: Option>, ) -> VortexOpener { VortexOpener { session: SESSION.clone(), object_store, - projection: Some([0].into()), + projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), filter, file_pruning_predicate: None, - // no adapter - expr_adapter_factory, - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + file_cache: VortexFileCache::new(1, 1, SESSION.clone()), table_schema, batch_size: 100, @@ -501,10 +508,7 @@ mod tests { } #[tokio::test] - async fn test_open_with_adapter() -> anyhow::Result<()> { - let expr_adapter_factory: Arc = - Arc::new(DF52PhysicalExprAdapterFactory); - + async fn test_open() -> anyhow::Result<()> { let object_store = Arc::new(InMemory::new()) as Arc; let file_path = "part=1/file.vortex"; let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); @@ -524,12 +528,7 @@ mod tests { let filter = col("part").eq(lit(1)); let filter = logical2physical(&filter, table_schema.table_schema()); - let opener = make_opener( - object_store.clone(), - table_schema.clone(), - Some(filter), - Some(expr_adapter_factory.clone()), - ); + let opener = make_opener(object_store.clone(), table_schema.clone(), Some(filter)); let stream = opener.open(file.clone()).unwrap().await.unwrap(); let data = stream.try_collect::>().await?; @@ -542,12 +541,7 @@ mod tests { let filter = col("part").eq(lit(2)); let filter = logical2physical(&filter, table_schema.table_schema()); - let opener = make_opener( - object_store.clone(), 
- table_schema.clone(), - Some(filter), - Some(expr_adapter_factory), - ); + let opener = make_opener(object_store.clone(), table_schema.clone(), Some(filter)); let stream = opener.open(file.clone()).unwrap().await.unwrap(); let data = stream.try_collect::>().await?; @@ -557,51 +551,10 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_open_without_adapter() -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "part=1/file.vortex"; - let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size = - write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - - let file_schema = batch.schema(); - let mut file = PartitionedFile::new(file_path.to_string(), data_size); - file.partition_values = vec![ScalarValue::Int32(Some(1))]; - - let table_schema = TableSchema::new( - file_schema.clone(), - vec![Arc::new(Field::new("part", DataType::Int32, false))], - ); - - // filter matches partition value - let filter = col("part").eq(lit(1)); - let filter = logical2physical(&filter, table_schema.table_schema()); - - let opener = make_opener( - object_store.clone(), - table_schema.clone(), - Some(filter), - None, - ); - - let result = opener.open(file.clone()).unwrap().await; - assert!(result.is_err()); - - Ok(()) - } #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _))] - // If we don't have a physical expr adapter, we just drop filters on partition values. - // This is currently not supported, the work to support it requires to rewrite the predicate with appropriate casts. - // Seems like datafusion is moving towards having DefaultPhysicalExprAdapterFactory be always provided, which would make it work OOTB. - // See: https://github.com/apache/datafusion/issues/16800 - // #[case(None)] #[tokio::test] - async fn test_open_files_different_table_schema( - #[case] expr_adapter_factory: Option>, - ) -> anyhow::Result<()> { + async fn test_open_files_different_table_schema() -> anyhow::Result<()> { let object_store = Arc::new(InMemory::new()) as Arc; let file1 = { @@ -630,11 +583,10 @@ mod tests { let make_opener = |filter| VortexOpener { session: SESSION.clone(), object_store: object_store.clone(), - projection: Some([0].into()), + projection: ProjectionExprs::from_indices(&[0], table_schema.file_schema()), filter: Some(filter), file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), file_cache: VortexFileCache::new(1, 1, SESSION.clone()), table_schema: table_schema.clone(), batch_size: 100, @@ -714,11 +666,10 @@ mod tests { let opener = VortexOpener { session: SESSION.clone(), object_store: object_store.clone(), - projection: Some([0, 1, 2].into()), + projection: ProjectionExprs::from_indices(&[0, 1, 2], &table_schema), filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), file_cache: VortexFileCache::new(1, 1, SESSION.clone()), table_schema: TableSchema::from_file_schema(table_schema.clone()), batch_size: 100, @@ -808,7 +759,6 @@ mod tests { &col("my_struct").is_not_null(), table_schema.table_schema(), )), - Some(Arc::new(DF52PhysicalExprAdapterFactory) as _), ); // The opener should be able to open the file with a filter 
on the @@ -865,11 +815,13 @@ mod tests { let opener = VortexOpener { session: SESSION.clone(), object_store: object_store.clone(), - projection: Some(projection.into()), + projection: ProjectionExprs::from_indices( + projection.as_ref(), + table_schema.file_schema(), + ), filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), file_cache: VortexFileCache::new(1, 1, SESSION.clone()), table_schema: table_schema.clone(), batch_size: 100, @@ -917,7 +869,7 @@ mod tests { fn make_test_opener( object_store: Arc, schema: SchemaRef, - projection: Option>, + projection: ProjectionExprs, ) -> VortexOpener { VortexOpener { session: SESSION.clone(), @@ -925,8 +877,7 @@ mod tests { projection, filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), file_cache: VortexFileCache::new(1, 1, SESSION.clone()), table_schema: TableSchema::from_file_schema(schema), batch_size: 100, @@ -952,7 +903,7 @@ mod tests { let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - let table_schema = batch.schema(); + let schema = batch.schema(); let mut file = PartitionedFile::new(file_path.to_string(), data_size); file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection( Selection::IncludeByIndex(Buffer::from_iter(vec![1, 3, 5, 7])), @@ -960,8 +911,8 @@ mod tests { let opener = make_test_opener( object_store.clone(), - table_schema.clone(), - Some(vec![0, 1].into()), + schema.clone(), + ProjectionExprs::from_indices(&[0, 1], &schema), ); let stream = opener.open(file)?.await?; @@ -986,10 +937,6 @@ mod tests { #[tokio::test] // Test that Selection::ExcludeByIndex excludes specific row indices. 
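+    // (Excluding indices 0, 2, 4, 6 and 8 leaves only the odd-indexed rows in the output.)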
async fn test_selection_exclude_by_index() -> anyhow::Result<()> { - use datafusion::arrow::util::pretty::pretty_format_batches_with_options; - use vortex::buffer::Buffer; - use vortex::scan::Selection; - let object_store = Arc::new(InMemory::new()) as Arc; let file_path = "/path/file.vortex"; @@ -997,7 +944,7 @@ mod tests { let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - let table_schema = batch.schema(); + let schema = batch.schema(); let mut file = PartitionedFile::new(file_path.to_string(), data_size); file.extensions = Some(Arc::new(VortexAccessPlan::default().with_selection( Selection::ExcludeByIndex(Buffer::from_iter(vec![0, 2, 4, 6, 8])), @@ -1005,8 +952,8 @@ mod tests { let opener = make_test_opener( object_store.clone(), - table_schema.clone(), - Some(vec![0, 1].into()), + schema.clone(), + ProjectionExprs::from_indices(&[0, 1], &schema), ); let stream = opener.open(file)?.await?; @@ -1041,7 +988,7 @@ mod tests { let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - let table_schema = batch.schema(); + let schema = batch.schema(); let mut file = PartitionedFile::new(file_path.to_string(), data_size); file.extensions = Some(Arc::new( VortexAccessPlan::default().with_selection(Selection::All), @@ -1049,8 +996,8 @@ mod tests { let opener = make_test_opener( object_store.clone(), - table_schema.clone(), - Some(vec![0].into()), + schema.clone(), + ProjectionExprs::from_indices(&[0], &schema), ); let stream = opener.open(file)?.await?; @@ -1072,14 +1019,14 @@ mod tests { let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - let table_schema = batch.schema(); + let schema = batch.schema(); let file = PartitionedFile::new(file_path.to_string(), data_size); // file.extensions is None by default let opener = make_test_opener( object_store.clone(), - table_schema.clone(), - Some(vec![0].into()), + schema.clone(), + ProjectionExprs::from_indices(&[0], &schema), ); let stream = opener.open(file)?.await?; @@ -1090,4 +1037,74 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_projection_expr_pushdown() -> anyhow::Result<()> { + let object_store = Arc::new(InMemory::new()) as Arc; + let file_path = "/path/file.vortex"; + + let batch = record_batch!( + ("a", Int32, vec![Some(1), Some(2), Some(3)]), + ("b", Int32, vec![Some(10), Some(20), Some(30)]) + ) + .unwrap(); + let data_size = + write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; + + let file_schema = batch.schema(); + let table_schema = TableSchema::from_file_schema(file_schema.clone()); + + // Create a projection that includes an arithmetic expression: a + b * 2 + let col_a = df_expr::col("a", &file_schema)?; + let col_b = df_expr::col("b", &file_schema)?; + let two = df_expr::lit(ScalarValue::Int32(Some(2))); + + // b * 2 + let b_times_2 = df_expr::binary(col_b, Operator::Multiply, two, &file_schema)?; + // a + (b * 2) + let a_plus_b_times_2 = df_expr::binary(col_a, Operator::Plus, b_times_2, &file_schema)?; + + let projection = ProjectionExprs::new(vec![ProjectionExpr::new( + a_plus_b_times_2, + "result".to_string(), + )]); + + let opener = VortexOpener { + session: SESSION.clone(), + object_store: object_store.clone(), + projection, + filter: None, + file_pruning_predicate: None, + expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory), + file_cache: VortexFileCache::new(1, 1, SESSION.clone()), + table_schema, + batch_size: 100, + limit: None, + metrics: 
Default::default(), + layout_readers: Default::default(), + has_output_ordering: false, + expression_convertor: Arc::new(DefaultExpressionConvertor::default()), + }; + + let file = PartitionedFile::new(file_path.to_string(), data_size); + let stream = opener.open(file)?.await?; + let data = stream.try_collect::>().await?; + + // Expected: a + b * 2 + // row 0: 1 + 10 * 2 = 21 + // row 1: 2 + 20 * 2 = 42 + // row 2: 3 + 30 * 2 = 63 + assert_snapshot!(pretty_format_batches_with_options(&data, &FormatOptions::new().with_types_info(true))?.to_string(), @r" + +--------+ + | result | + | Int32 | + +--------+ + | 21 | + | 42 | + | 63 | + +--------+ + "); + + Ok(()) + } } diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index c5cdea8ee7f..1c18d92fb44 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -3,13 +3,12 @@ use std::any::Any; use std::sync::Arc; -use std::sync::atomic::AtomicU64; -use std::sync::atomic::Ordering; use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; +use datafusion_common::exec_datafusion_err; use datafusion_common_runtime::JoinSet; use datafusion_common_runtime::SpawnedTask; use datafusion_datasource::file_sink_config::FileSink; @@ -33,6 +32,7 @@ use vortex::dtype::DType; use vortex::dtype::arrow::FromArrowType; use vortex::error::VortexResult; use vortex::file::WriteOptionsSessionExt; +use vortex::file::WriteSummary; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; use vortex::session::VortexSession; @@ -108,16 +108,12 @@ impl FileSink for VortexSink { mut file_stream_rx: DemuxedStreamReceiver, object_store: Arc, ) -> DFResult { - // This is a hack - let row_counter = Arc::new(AtomicU64::new(0)); - - let mut file_write_tasks: JoinSet> = JoinSet::new(); + let mut file_write_tasks: JoinSet> = JoinSet::new(); // TODO(adamg): // 1. We can probably be better at signaling how much memory we're consuming (potentially when reading too), see ParquetSink::spawn_writer_tasks_and_join. while let Some((path, rx)) = file_stream_rx.recv().await { let session = self.session.clone(); - let row_counter = row_counter.clone(); let object_store = object_store.clone(); let writer_schema = get_writer_schema(&self.config); let dtype = DType::from_arrow(writer_schema); @@ -125,41 +121,39 @@ impl FileSink for VortexSink { // We need to spawn work because there's a dependency between the different files. If one file has too many batches buffered, // the demux task might deadlock itself. 
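+        // The JoinSet also lets us drain each writer's result (path plus WriteSummary) as it
+        // finishes, in any order, which is where the row count tallied below comes from.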
file_write_tasks.spawn(async move { - let stream = ReceiverStream::new(rx).map(move |rb| { - row_counter.fetch_add(rb.num_rows() as u64, Ordering::Relaxed); - VortexResult::Ok(ArrayRef::from_arrow(rb, false)) - }); + let stream = ReceiverStream::new(rx) + .map(move |rb| VortexResult::Ok(ArrayRef::from_arrow(rb, false))); let stream_adapter = ArrayStreamAdapter::new(dtype, stream); - let mut sink = ObjectStoreWriter::new(object_store.clone(), &path) + let mut object_writer = ObjectStoreWriter::new(object_store, &path) .await - .map_err(|e| { - DataFusionError::Execution(format!( - "Failed to create ObjectStoreWriter: {e}" - )) - })?; + .map_err(|e| exec_datafusion_err!("Failed to create ObjectStoreWriter: {e}"))?; - session + let summary = session .write_options() - .write(&mut sink, stream_adapter) + .write(&mut object_writer, stream_adapter) .await - .map_err(|e| { - DataFusionError::Execution(format!("Failed to write Vortex file: {e}")) - })?; + .map_err(|e| exec_datafusion_err!("Failed to write Vortex file: {e}"))?; - sink.shutdown().await.map_err(|e| { - DataFusionError::Execution(format!("Failed to shutdown Vortex writer: {e}")) - })?; + object_writer + .shutdown() + .await + .map_err(|e| exec_datafusion_err!("Failed to shutdown Vortex writer: {e}"))?; - Ok(path) + Ok((path, summary)) }); } + let mut row_count = 0; + while let Some(result) = file_write_tasks.join_next().await { match result { - Ok(path) => { - let path = path?; + Ok(r) => { + let (path, summary) = r?; + + row_count += summary.row_count(); + tracing::info!(path = %path, "Successfully written file"); } Err(e) => { @@ -177,7 +171,7 @@ impl FileSink for VortexSink { .await .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??; - Ok(row_counter.load(Ordering::SeqCst)) + Ok(row_count) } } diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 584396279fd..cab3ea275d9 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -7,17 +7,15 @@ use std::sync::Arc; use std::sync::Weak; use datafusion_common::Result as DFResult; -use datafusion_common::Statistics; use datafusion_common::config::ConfigOptions; use datafusion_datasource::TableSchema; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::conjunction; -use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; +use datafusion_physical_expr::projection::ProjectionExprs; +use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; @@ -39,8 +37,6 @@ use super::metrics::PARTITION_LABEL; use super::opener::VortexOpener; use crate::convert::exprs::DefaultExpressionConvertor; use crate::convert::exprs::ExpressionConvertor; -use crate::convert::exprs::can_be_pushed_down; -use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory; /// Execution plan for reading one or more Vortex files, intended to be consumed by [`DataSourceExec`]. 
/// @@ -49,6 +45,8 @@ use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory; pub struct VortexSource { pub(crate) session: VortexSession, pub(crate) file_cache: VortexFileCache, + pub(crate) table_schema: TableSchema, + pub(crate) projection: ProjectionExprs, /// Combined predicate expression containing all filters from DataFusion query planning. /// Used with FilePruner to skip files based on statistics and partition values. pub(crate) full_predicate: Option, @@ -56,10 +54,6 @@ pub struct VortexSource { /// These are expressions that Vortex can efficiently evaluate during scanning. pub(crate) vortex_predicate: Option, pub(crate) batch_size: Option, - pub(crate) projected_statistics: Option, - pub(crate) table_schema: Option, - pub(crate) schema_adapter_factory: Option>, - pub(crate) expr_adapter_factory: Option>, _unused_df_metrics: ExecutionPlanMetricsSet, /// Shared layout readers, the source only lives as long as one scan. /// @@ -69,17 +63,23 @@ pub struct VortexSource { } impl VortexSource { - pub(crate) fn new(session: VortexSession, file_cache: VortexFileCache) -> Self { + pub(crate) fn new( + table_schema: TableSchema, + session: VortexSession, + file_cache: VortexFileCache, + ) -> Self { + let full_schema = table_schema.table_schema(); + let indices = (0..full_schema.fields().len()).collect::>(); + let projection = ProjectionExprs::from_indices(&indices, full_schema); + Self { session, file_cache, + table_schema, + projection, full_predicate: None, vortex_predicate: None, batch_size: None, - projected_statistics: None, - table_schema: None, - schema_adapter_factory: None, - expr_adapter_factory: None, _unused_df_metrics: Default::default(), layout_readers: Arc::new(DashMap::default()), expression_convertor: Arc::new(DefaultExpressionConvertor::default()), @@ -102,7 +102,7 @@ impl FileSource for VortexSource { object_store: Arc, base_config: &FileScanConfig, partition: usize, - ) -> Arc { + ) -> DFResult> { let partition_metrics = self .session .metrics() @@ -112,51 +112,19 @@ impl FileSource for VortexSource { .batch_size .vortex_expect("batch_size must be supplied to VortexSource"); - let expr_adapter = self + let expr_adapter_factory = base_config .expr_adapter_factory - .as_ref() - .or(base_config.expr_adapter_factory.as_ref()); - - if expr_adapter.is_some() { - tracing::warn!( - "Schema evolution with VortexSource may not work as expected if you override the adapter." - ); - } - - let schema_adapter = self.schema_adapter_factory.as_ref(); - - // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details. 
- let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) { - (Some(expr_adapter), Some(schema_adapter)) => { - (Some(expr_adapter.clone()), schema_adapter.clone()) - } - (Some(expr_adapter), None) => ( - Some(expr_adapter.clone()), - Arc::new(DefaultSchemaAdapterFactory) as _, - ), - (None, Some(schema_adapter)) => { - // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory` - (None, schema_adapter.clone()) - } - (None, None) => ( - Some(Arc::new(DF52PhysicalExprAdapterFactory) as _), - Arc::new(DefaultSchemaAdapterFactory) as _, - ), - }; - - let projection = base_config.file_column_projection_indices().map(Arc::from); - - let table_schema = base_config.table_schema.clone(); + .clone() + .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory)); let opener = VortexOpener { session: self.session.clone(), object_store, - projection, + projection: self.projection.clone(), filter: self.vortex_predicate.clone(), file_pruning_predicate: self.full_predicate.clone(), expr_adapter_factory, - schema_adapter_factory, - table_schema, + table_schema: self.table_schema.clone(), file_cache: self.file_cache.clone(), batch_size, limit: base_config.limit, @@ -166,7 +134,7 @@ impl FileSource for VortexSource { expression_convertor: Arc::new(DefaultExpressionConvertor::default()), }; - Arc::new(opener) + Ok(Arc::new(opener)) } fn as_any(&self) -> &dyn Any { @@ -179,22 +147,6 @@ impl FileSource for VortexSource { Arc::new(source) } - fn with_schema(&self, schema: TableSchema) -> Arc { - let mut source = self.clone(); - source.table_schema = Some(schema); - Arc::new(source) - } - - fn with_projection(&self, _config: &FileScanConfig) -> Arc { - Arc::new(self.clone()) - } - - fn with_statistics(&self, statistics: Statistics) -> Arc { - let mut source = self.clone(); - source.projected_statistics = Some(statistics); - Arc::new(source) - } - fn filter(&self) -> Option> { self.vortex_predicate.clone() } @@ -203,19 +155,6 @@ impl FileSource for VortexSource { &self._unused_df_metrics } - fn statistics(&self) -> DFResult { - let statistics = self - .projected_statistics - .clone() - .vortex_expect("projected_statistics must be set"); - - if self.vortex_predicate.is_some() { - Ok(statistics.to_inexact()) - } else { - Ok(statistics) - } - } - fn file_type(&self) -> &str { VORTEX_FILE_EXTENSION } @@ -237,6 +176,10 @@ impl FileSource for VortexSource { Ok(()) } + fn supports_repartitioning(&self) -> bool { + true + } + fn try_pushdown_filters( &self, filters: Vec>, @@ -248,12 +191,6 @@ impl FileSource for VortexSource { )); } - let Some(table_schema) = self.table_schema.as_ref() else { - return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; filters.len()], - )); - }; - let mut source = self.clone(); // Combine new filters with existing predicate for file pruning. 
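The next hunk reduces the pushdown handshake to a per-conjunct test: each filter is marked supported or unsupported, and the supported ones double as the scan predicate. A minimal sketch of that pattern under stated assumptions — the helper name and the `can_push` closure are hypothetical, `PushedDownPredicate` and `conjunction` are the DataFusion items this file already uses, and the `filter_pushdown` module path is from memory and may differ across DataFusion versions:

use std::sync::Arc;

use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::conjunction;
use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;

/// Hypothetical helper: mark each filter as supported/unsupported for the scan and
/// fold the supported ones into a single conjunction usable for file pruning.
fn partition_filters(
    filters: Vec<PhysicalExprRef>,
    can_push: impl Fn(&PhysicalExprRef) -> bool,
) -> (Vec<PushedDownPredicate>, Option<PhysicalExprRef>) {
    let mut pushed = Vec::new();
    let marked = filters
        .into_iter()
        .map(|expr| {
            if can_push(&expr) {
                // Keep a copy for the combined scan predicate.
                pushed.push(Arc::clone(&expr));
                PushedDownPredicate::supported(expr)
            } else {
                PushedDownPredicate::unsupported(expr)
            }
        })
        .collect();
    // Fold the pushable conjuncts back into one predicate; None if nothing was pushable.
    let combined = (!pushed.is_empty()).then(|| conjunction(pushed));
    (marked, combined)
}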
@@ -268,7 +205,10 @@ impl FileSource for VortexSource { let supported_filters = filters .into_iter() .map(|expr| { - if can_be_pushed_down(&expr, table_schema.file_schema()) { + if self + .expression_convertor + .can_be_pushed_down(&expr, self.table_schema.file_schema()) + { PushedDownPredicate::supported(expr) } else { PushedDownPredicate::unsupported(expr) @@ -309,16 +249,20 @@ impl FileSource for VortexSource { .with_updated_node(Arc::new(source) as _)) } - fn with_schema_adapter_factory( + fn try_pushdown_projection( &self, - factory: Arc, - ) -> DFResult> { + projection: &ProjectionExprs, + ) -> DFResult>> { let mut source = self.clone(); - source.schema_adapter_factory = Some(factory); - Ok(Arc::new(source)) + source.projection = self.projection.try_merge(projection)?; + Ok(Some(Arc::new(source))) + } + + fn projection(&self) -> Option<&ProjectionExprs> { + Some(&self.projection) } - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() + fn table_schema(&self) -> &TableSchema { + &self.table_schema } } diff --git a/vortex-datafusion/src/vendor/mod.rs b/vortex-datafusion/src/vendor/mod.rs deleted file mode 100644 index f673c9cbc5f..00000000000 --- a/vortex-datafusion/src/vendor/mod.rs +++ /dev/null @@ -1,7 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! We are vendoring the physical expression adapter fixes for nested structs -//! that will be released in DF 52. - -pub mod schema_rewriter; diff --git a/vortex-datafusion/src/vendor/schema_rewriter.rs b/vortex-datafusion/src/vendor/schema_rewriter.rs deleted file mode 100644 index f9c5fcdf347..00000000000 --- a/vortex-datafusion/src/vendor/schema_rewriter.rs +++ /dev/null @@ -1,304 +0,0 @@ -// SPDX-FileCopyrightText: 2016-2025 Copyright The Apache Software Foundation -// SPDX-FileCopyrightText: 2025 Copyright the Vortex contributors -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileComment: Derived from upstream file datafusion/physical-expr-adapter/src/schema_rewriter.rs at commit e571b49 at https://github.com/apache/datafusion -// SPDX-FileNotice: https://github.com/apache/datafusion/blob/e571b49e0983892597a8f92e5d1502b17a15b180/NOTICE.txt - -#![allow(missing_docs)] - -//! Physical expression schema rewriting utilities -//! -//! NOTE(aduffy): this is vendored until DF 52 is released, at which point this should -//! all be deleted. 
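With the vendored copy deleted, the same rewrite behaviour now comes from DataFusion's own `DefaultPhysicalExprAdapterFactory`, created and applied per file exactly as in `VortexOpener::open` above. A minimal sketch, assuming the upstream factory mirrors the vendored semantics for columns missing from a file (the schemas and expression below are placeholders):

use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::expressions as df_expr;
use datafusion_physical_expr_adapter::{
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
};

fn rewrite_for_file() -> Result<()> {
    // Table ("logical") schema has columns `a` and `b`; this particular file only stores `a`.
    let logical = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Int64, true),
    ]));
    let physical = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

    // Same two calls the opener makes per file.
    let adapter = DefaultPhysicalExprAdapterFactory.create(Arc::clone(&logical), physical);
    let expr = df_expr::col("b", &logical)?;

    // `b` is missing from the file, so the rewrite replaces the column reference
    // with a typed null literal instead of failing at scan time.
    let rewritten = adapter.rewrite(expr)?;
    println!("{rewritten}");
    Ok(())
}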
- -use std::sync::Arc; - -use datafusion_common::Result; -use datafusion_common::ScalarValue; -use datafusion_common::arrow::compute::can_cast_types; -use datafusion_common::arrow::datatypes::DataType; -use datafusion_common::arrow::datatypes::FieldRef; -use datafusion_common::arrow::datatypes::Schema; -use datafusion_common::arrow::datatypes::SchemaRef; -use datafusion_common::exec_err; -use datafusion_common::nested_struct::validate_struct_compatibility; -use datafusion_common::tree_node::Transformed; -use datafusion_common::tree_node::TransformedResult; -use datafusion_common::tree_node::TreeNode; -use datafusion_functions::core::getfield::GetFieldFunc; -use datafusion_physical_expr::ScalarFunctionExpr; -use datafusion_physical_expr::expressions::CastColumnExpr; -use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::expressions::{self}; -use datafusion_physical_expr_adapter::PhysicalExprAdapter; -use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; -use datafusion_physical_expr_common::physical_expr::PhysicalExpr; - -#[derive(Debug, Clone)] -pub struct DF52PhysicalExprAdapterFactory; - -impl PhysicalExprAdapterFactory for DF52PhysicalExprAdapterFactory { - fn create( - &self, - logical_file_schema: SchemaRef, - physical_file_schema: SchemaRef, - ) -> Arc { - Arc::new(DF52PhysicalExprAdapter { - logical_file_schema, - physical_file_schema, - partition_values: Vec::new(), - }) - } -} - -#[derive(Debug, Clone)] -pub struct DF52PhysicalExprAdapter { - logical_file_schema: SchemaRef, - physical_file_schema: SchemaRef, - partition_values: Vec<(FieldRef, ScalarValue)>, -} - -impl DF52PhysicalExprAdapter { - /// Create a new instance of the default physical expression adapter. - /// - /// This adapter rewrites expressions to match the physical schema of the file being scanned, - /// handling type mismatches and missing columns by filling them with default values. - pub fn new(logical_file_schema: SchemaRef, physical_file_schema: SchemaRef) -> Self { - Self { - logical_file_schema, - physical_file_schema, - partition_values: Vec::new(), - } - } -} - -impl PhysicalExprAdapter for DF52PhysicalExprAdapter { - fn rewrite(&self, expr: Arc) -> Result> { - let rewriter = DefaultPhysicalExprAdapterRewriter { - logical_file_schema: &self.logical_file_schema, - physical_file_schema: &self.physical_file_schema, - partition_fields: &self.partition_values, - }; - expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr))) - .data() - } - - fn with_partition_values( - &self, - partition_values: Vec<(FieldRef, ScalarValue)>, - ) -> Arc { - Arc::new(DF52PhysicalExprAdapter { - partition_values, - ..self.clone() - }) - } -} - -struct DefaultPhysicalExprAdapterRewriter<'a> { - logical_file_schema: &'a Schema, - physical_file_schema: &'a Schema, - partition_fields: &'a [(FieldRef, ScalarValue)], -} - -impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { - fn rewrite_expr( - &self, - expr: Arc, - ) -> Result>> { - if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? { - return Ok(Transformed::yes(transformed)); - } - - if let Some(column) = expr.as_any().downcast_ref::() { - return self.rewrite_column(Arc::clone(&expr), column); - } - - Ok(Transformed::no(expr)) - } - - /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema. - /// Note that this does *not* handle nested struct fields, only top-level struct field access. - /// See for more details. 
- fn try_rewrite_struct_field_access( - &self, - expr: &Arc, - ) -> Result>> { - let get_field_expr = - match ScalarFunctionExpr::try_downcast_func::(expr.as_ref()) { - Some(expr) => expr, - None => return Ok(None), - }; - - let source_expr = match get_field_expr.args().first() { - Some(expr) => expr, - None => return Ok(None), - }; - - let field_name_expr = match get_field_expr.args().get(1) { - Some(expr) => expr, - None => return Ok(None), - }; - - let lit = match field_name_expr - .as_any() - .downcast_ref::() - { - Some(lit) => lit, - None => return Ok(None), - }; - - let field_name = match lit.value().try_as_str().flatten() { - Some(name) => name, - None => return Ok(None), - }; - - let column = match source_expr.as_any().downcast_ref::() { - Some(column) => column, - None => return Ok(None), - }; - - let physical_field = match self.physical_file_schema.field_with_name(column.name()) { - Ok(field) => field, - Err(_) => return Ok(None), - }; - - let physical_struct_fields = match physical_field.data_type() { - DataType::Struct(fields) => fields, - _ => return Ok(None), - }; - - if physical_struct_fields - .iter() - .any(|f| f.name() == field_name) - { - return Ok(None); - } - - let logical_field = match self.logical_file_schema.field_with_name(column.name()) { - Ok(field) => field, - Err(_) => return Ok(None), - }; - - let logical_struct_fields = match logical_field.data_type() { - DataType::Struct(fields) => fields, - _ => return Ok(None), - }; - - let logical_struct_field = match logical_struct_fields - .iter() - .find(|f| f.name() == field_name) - { - Some(field) => field, - None => return Ok(None), - }; - - let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?; - Ok(Some(expressions::lit(null_value))) - } - - fn rewrite_column( - &self, - expr: Arc, - column: &Column, - ) -> Result>> { - // Get the logical field for this column if it exists in the logical schema - let logical_field = match self.logical_file_schema.field_with_name(column.name()) { - Ok(field) => field, - Err(e) => { - // If the column is a partition field, we can use the partition value - if let Some(partition_value) = self.get_partition_value(column.name()) { - return Ok(Transformed::yes(expressions::lit(partition_value))); - } - // This can be hit if a custom rewrite injected a reference to a column that doesn't exist in the logical schema. - // For example, a pre-computed column that is kept only in the physical schema. - // If the column exists in the physical schema, we can still use it. - if let Ok(physical_field) = self.physical_file_schema.field_with_name(column.name()) - { - // If the column exists in the physical schema, we can use it in place of the logical column. - // This is nice to users because if they do a rewrite that results in something like `physical_int32_col = 123u64` - // we'll at least handle the casts for them. - physical_field - } else { - // A completely unknown column that doesn't exist in either schema! - // This should probably never be hit unless something upstream broke, but nonetheless it's better - // for us to return a handleable error than to panic / do something unexpected. 
- return Err(e.into()); - } - } - }; - - // Check if the column exists in the physical schema - let physical_column_index = match self.physical_file_schema.index_of(column.name()) { - Ok(index) => index, - Err(_) => { - if !logical_field.is_nullable() { - return exec_err!( - "Non-nullable column '{}' is missing from the physical schema", - column.name() - ); - } - // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. - // TODO: do we need to sync this with what the `SchemaAdapter` actually does? - // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else! - // See https://github.com/apache/datafusion/issues/16527 - let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?; - return Ok(Transformed::yes(expressions::lit(null_value))); - } - }; - let physical_field = self.physical_file_schema.field(physical_column_index); - - let column = match ( - column.index() == physical_column_index, - logical_field.data_type() == physical_field.data_type(), - ) { - // If the column index matches and the data types match, we can use the column as is - (true, true) => return Ok(Transformed::no(expr)), - // If the indexes or data types do not match, we need to create a new column expression - (true, _) => column.clone(), - (false, _) => Column::new_with_schema(logical_field.name(), self.physical_file_schema)?, - }; - - if logical_field.data_type() == physical_field.data_type() { - // If the data types match, we can use the column as is - return Ok(Transformed::yes(Arc::new(column))); - } - - // We need to cast the column to the logical data type - // TODO: add optimization to move the cast from the column to literal expressions in the case of `col = 123` - // since that's much cheaper to evalaute. - // See https://github.com/apache/datafusion/issues/15780#issuecomment-2824716928 - match (physical_field.data_type(), logical_field.data_type()) { - (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => { - validate_struct_compatibility(physical_fields, logical_fields)?; - } - _ => { - let is_compatible = - can_cast_types(physical_field.data_type(), logical_field.data_type()); - if !is_compatible { - return exec_err!( - "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", - column.name(), - physical_field.data_type(), - logical_field.data_type() - ); - } - } - } - - let cast_expr = Arc::new(CastColumnExpr::new( - Arc::new(column), - Arc::new(physical_field.clone()), - Arc::new(logical_field.clone()), - None, - )); - - Ok(Transformed::yes(cast_expr)) - } - - fn get_partition_value(&self, column_name: &str) -> Option { - self.partition_fields - .iter() - .find(|(field, _)| field.name() == column_name) - .map(|(_, value)| value.clone()) - } -}
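Taken together, the new scan path is exercised end to end by `test_query_file` in `persistent/mod.rs` above. A sketch of the same wiring outside a test — the directory path is a placeholder, and the listing-table calls are standard DataFusion APIs whose exact signatures vary by version:

use std::sync::Arc;

use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::SessionContext;
use vortex::session::VortexSession;
use vortex_datafusion::VortexFormat;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let ctx = SessionContext::default();
    let format = Arc::new(VortexFormat::new(VortexSession::default()));

    // Point a listing table at a directory of `.vortex` files.
    let table_url = ListingTableUrl::parse("/data/tables/events/")?;
    let config = ListingTableConfig::new(table_url)
        .with_listing_options(ListingOptions::new(format))
        .infer_schema(&ctx.state())
        .await?;
    ctx.register_table("events", Arc::new(ListingTable::try_new(config)?))?;

    // Projections and supported filters are pushed into the Vortex scan;
    // anything the convertor rejects is evaluated by DataFusion afterwards.
    ctx.sql("SELECT a + b * 2 AS result FROM events WHERE a > 1")
        .await?
        .show()
        .await?;
    Ok(())
}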