diff --git a/Cargo.lock b/Cargo.lock index 5c6e2b009f1..e62a4e38f38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,18 @@ dependencies = [ "regex-syntax", ] +[[package]] +name = "as-slice" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45403b49e3954a4b8428a0ac21a4b7afadccf92bfd96273f1a58cd4812496ae0" +dependencies = [ + "generic-array 0.12.4", + "generic-array 0.13.3", + "generic-array 0.14.8", + "stable_deref_trait", +] + [[package]] name = "async-channel" version = "2.5.0" @@ -594,6 +606,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-polyfill" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cf2bce30dfe09ef0bfaef228b9d414faaf7e563035494d7fe092dba54b300f4" +dependencies = [ + "critical-section", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -954,6 +975,58 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59446ce19cd142f8833f856eb31f3eb097812d1479ab224f54d72428ca21ea22" +dependencies = [ + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backon" 
version = "1.5.2" @@ -1070,6 +1143,26 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bincode" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" +dependencies = [ + "bincode_derive", + "serde", + "unty", +] + +[[package]] +name = "bincode_derive" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" +dependencies = [ + "virtue", +] + [[package]] name = "bindgen" version = "0.72.1" @@ -1151,7 +1244,7 @@ version = "0.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" dependencies = [ - "generic-array", + "generic-array 0.14.8", ] [[package]] @@ -1160,7 +1253,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8894febbff9f758034a5b8e12d87918f56dfc64a8e1fe757d65e29041538d93" dependencies = [ - "generic-array", + "generic-array 0.14.8", ] [[package]] @@ -1479,7 +1572,7 @@ checksum = "af491d569909a7e4dee0ad7db7f5341fef5c614d5b8ec8cf765732aba3cff681" dependencies = [ "serde", "termcolor", - "unicode-width 0.1.14", + "unicode-width 0.2.0", ] [[package]] @@ -1555,7 +1648,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1785,6 +1878,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "critical-section" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "790eea4361631c5e7d22598ecd5723ff611904e3344ce8720784c93e3d83d40b" + [[package]] name = "crossbeam-channel" version = "0.5.15" @@ -1883,7 +1982,7 @@ version = "0.1.6" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ - "generic-array", + "generic-array 0.14.8", "typenum", ] @@ -2801,7 +2900,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -2862,6 +2961,16 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" +[[package]] +name = "earcutr" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79127ed59a85d7687c409e9978547cffb7dc79675355ed22da6b66fd5f6ead01" +dependencies = [ + "itertools 0.11.0", + "num-traits", +] + [[package]] name = "either" version = "1.15.0" @@ -2970,7 +3079,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -3045,6 +3154,19 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8eb564c5c7423d25c886fb561d1e4ee69f72354d16918afa32c08811f6b6a55" +[[package]] +name = "fastbloom" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18c1ddb9231d8554c2d6bdf4cfaabf0c59251658c68b6c95cd52dd0c513a912a" +dependencies = [ + "getrandom 0.3.4", + "libm", + "rand 0.9.2", + "serde", + "siphasher", +] + [[package]] name = "fastdivide" version = "0.4.2" @@ -3128,6 +3250,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float_eq" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28a80e3145d8ad11ba0995949bbcf48b9df2be62772b3d351ef017dff6ecb853" + +[[package]] +name = "float_next_after" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8" + [[package]] name = "fnv" version = "1.0.7" @@ -3333,6 +3467,24 @@ dependencies = [ "windows", ] +[[package]] +name = "generic-array" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffdf9f34f1447443d37393cc6c2b8313aebddcd96906caf34e54c68d8e57d7bd" +dependencies = [ + "typenum", +] + +[[package]] +name = "generic-array" +version = "0.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f797e67af32588215eaaab8327027ee8e71b9dd0b2b26996aedf20c030fce309" +dependencies = [ + "typenum", +] + [[package]] name = "generic-array" version = "0.14.8" @@ -3343,6 +3495,77 @@ dependencies = [ "version_check", ] +[[package]] +name = "geo" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fc1a1678e54befc9b4bcab6cd43b8e7f834ae8ea121118b0fd8c42747675b4a" +dependencies = [ + "earcutr", + "float_next_after", + "geo-types", + "geographiclib-rs", + "i_overlay", + "log", + "num-traits", + "robust", + "rstar 0.12.2", + "spade", +] + +[[package]] +name = "geo-types" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24f8647af4005fa11da47cd56252c6ef030be8fa97bdbf355e7dfb6348f0a82c" +dependencies = [ + "approx", + "num-traits", + "rayon", + "rstar 0.10.0", + "rstar 0.11.0", + "rstar 0.12.2", + "rstar 0.8.4", + "rstar 0.9.3", + "serde", +] + +[[package]] +name = "geographiclib-rs" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f611040a2bb37eaa29a78a128d1e92a378a03e0b6e66ae27398d42b1ba9a7841" +dependencies = [ + "libm", +] + +[[package]] +name = "geojson" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e26f3c45b36fccc9cf2805e61d4da6bc4bbd5a3a9589b01afa3a40eff703bd79" +dependencies = [ + "log", + "serde", + "serde_json", + "thiserror 
2.0.17", +] + +[[package]] +name = "geozero" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5f28f34864745eb2f123c990c6ffd92c1584bd39439b3f27ff2a0f4ea5b309b" +dependencies = [ + "geo-types", + "geojson", + "log", + "scroll", + "serde_json", + "thiserror 1.0.69", + "wkt", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -3425,6 +3648,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3o" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a78209b5ec22ba72456c935d8822a035d5f36f50f5bd5b289640dc93afa4965" +dependencies = [ + "ahash", + "either", + "float_eq", + "geo", + "h3o-bit", + "libm", + "serde", + "serde_repr", +] + +[[package]] +name = "h3o-bit" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b42eb4efef1f96510ae1a33b2682562a677d504641e9903a77bf5c666b9013e" + [[package]] name = "half" version = "2.7.1" @@ -3445,6 +3690,33 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ed72152641d513a6084a3907bfcba3f35ae2d3335c22ce2242969c25ff8e46" +[[package]] +name = "hash32" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4041af86e63ac4298ce40e5cca669066e75b6f1aa3390fe2561ffa5e1d9f4cc" +dependencies = [ + "byteorder", +] + +[[package]] +name = "hash32" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c35f58762feb77d74ebe43bdbc3210f09be9fe6742234d573bacc26ed92b67" +dependencies = [ + "byteorder", +] + +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + [[package]] name = "hashbrown" version = "0.14.5" @@ -3477,6 +3749,41 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = 
"heapless" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "634bd4d29cbf24424d0a4bfcbf80c6960129dc24424752a7d1d1390607023422" +dependencies = [ + "as-slice", + "generic-array 0.14.8", + "hash32 0.1.1", + "stable_deref_trait", +] + +[[package]] +name = "heapless" +version = "0.7.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdc6457c0eb62c71aac4bc17216026d8410337c4126773b9c5daba343f17964f" +dependencies = [ + "atomic-polyfill", + "hash32 0.2.1", + "rustc_version", + "spin", + "stable_deref_trait", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32 0.3.1", + "stable_deref_trait", +] + [[package]] name = "heck" version = "0.5.0" @@ -3581,6 +3888,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "humansize" version = "2.1.3" @@ -3610,6 +3923,7 @@ dependencies = [ "http 1.3.1", "http-body 1.0.1", "httparse", + "httpdate", "itoa", "pin-project-lite", "pin-utils", @@ -3671,6 +3985,49 @@ dependencies = [ "serde", ] +[[package]] +name = "i_float" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "010025c2c532c8d82e42d0b8bb5184afa449fa6f06c709ea9adcb16c49ae405b" +dependencies = [ + "libm", +] + +[[package]] +name = "i_key_sort" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9190f86706ca38ac8add223b2aed8b1330002b5cdbbce28fb58b10914d38fc27" + +[[package]] +name = "i_overlay" +version = "4.0.6" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcccbd4e4274e0f80697f5fbc6540fdac533cce02f2081b328e68629cce24f9" +dependencies = [ + "i_float", + "i_key_sort", + "i_shape", + "i_tree", + "rayon", +] + +[[package]] +name = "i_shape" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ea154b742f7d43dae2897fcd5ead86bc7b5eefcedd305a7ebf9f69d44d61082" +dependencies = [ + "i_float", +] + +[[package]] +name = "i_tree" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35e6d558e6d4c7b82bc51d9c771e7a927862a161a7d87bf2b0541450e0e20915" + [[package]] name = "iana-time-zone" version = "0.1.64" @@ -3835,7 +4192,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" dependencies = [ "equivalent", - "hashbrown 0.15.5", + "hashbrown 0.16.1", ] [[package]] @@ -3865,7 +4222,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "879f10e63c20629ecabbb64a8010319738c66a5cd0c29b02d63d272b03751d01" dependencies = [ "block-padding", - "generic-array", + "generic-array 0.14.8", ] [[package]] @@ -3929,6 +4286,15 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" +[[package]] +name = "itertools" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1c173a5686ce8bfa551b3563d0c2170bf24ca44da99c7ca4bfdab5418c3fe57" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.12.1" @@ -3983,7 +4349,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -4861,6 +5227,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "matrixmultiply" version = "0.3.10" @@ -5188,7 +5560,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -5722,6 +6094,12 @@ dependencies = [ "rand_xoshiro 0.6.0", ] +[[package]] +name = "pdqselect" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec91767ecc0a0bbe558ce8c9da33c068066c57ecc8bb8477ef8c1ad3ef77c27" + [[package]] name = "pem" version = "3.0.6" @@ -6265,7 +6643,7 @@ dependencies = [ "once_cell", "socket2", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -6627,6 +7005,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "robust" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e27ee8bb91ca0adcf0ecb116293afa12d393f9c2b9b9cd54d33e8078fe19839" + [[package]] name = "rsa" version = "0.9.8" @@ -6648,6 +7032,67 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rstar" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a45c0e8804d37e4d97e55c6f258bc9ad9c5ee7b07437009dd152d764949a27c" +dependencies = [ + "heapless 0.6.1", + "num-traits", + "pdqselect", + "serde", + "smallvec", +] + +[[package]] +name = "rstar" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b40f1bfe5acdab44bc63e6699c28b74f75ec43afb59f3eda01e145aff86a25fa" +dependencies = [ + "heapless 0.7.17", + "num-traits", + "serde", + "smallvec", +] + +[[package]] +name = "rstar" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"1f39465655a1e3d8ae79c6d9e007f4953bfc5d55297602df9dc38f9ae9f1359a" +dependencies = [ + "heapless 0.7.17", + "num-traits", + "serde", + "smallvec", +] + +[[package]] +name = "rstar" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73111312eb7a2287d229f06c00ff35b51ddee180f017ab6dec1f69d62ac098d6" +dependencies = [ + "heapless 0.7.17", + "num-traits", + "serde", + "smallvec", +] + +[[package]] +name = "rstar" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "421400d13ccfd26dfa5858199c30a5d76f9c54e0dba7575273025b43c5175dbb" +dependencies = [ + "heapless 0.8.0", + "num-traits", + "serde", + "smallvec", +] + [[package]] name = "rstest" version = "0.26.1" @@ -6739,7 +7184,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6752,7 +7197,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6870,6 +7315,12 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d68f2ec51b097e4c1a75b681a8bec621909b5e91f15bb7b840c4f2f7b01148b2" +[[package]] +name = "scroll" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04c565b551bafbef4157586fa379538366e4385d42082f255bfd96e4fe8519da" + [[package]] name = "scrypt" version = "0.11.0" @@ -6969,6 +7420,17 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -7156,6 +7618,9 @@ name = "siphasher" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +dependencies = [ + "serde", +] [[package]] name = "sketches-ddsketch" @@ -7241,11 +7706,26 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "spade" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb313e1c8afee5b5647e00ee0fe6855e3d529eb863a0fdae1d60006c4d1e9990" +dependencies = [ + "hashbrown 0.15.5", + "num-traits", + "robust", + "smallvec", +] + [[package]] name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" @@ -7663,7 +8143,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -8004,6 +8484,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -8058,6 +8539,7 @@ version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d15d90a0b5c19378952d479dc858407149d7bb45a14de0142f6c534b16fc647" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -8234,6 +8716,12 @@ version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" +[[package]] +name = "unty" +version = "0.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" + [[package]] name = "url" version = "2.5.7" @@ -8294,18 +8782,30 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" +[[package]] +name = "virtue" +version = "0.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" + [[package]] name = "vortex" version = "0.1.0" dependencies = [ "anyhow", "arrow-array", + "axum", "codspeed-divan-compat", "fastlanes", + "futures", + "geo", + "geo-types", + "geozero", "itertools 0.14.0", "mimalloc", "parquet", "rand 0.9.2", + "serde", "serde_json", "tokio", "tracing", @@ -8388,6 +8888,9 @@ dependencies = [ "enum-map", "flatbuffers", "futures", + "geo", + "geo-types", + "geozero", "getrandom 0.3.4", "goldenfile", "humansize", @@ -8897,8 +9400,14 @@ dependencies = [ "arrow-buffer", "async-stream", "async-trait", + "bincode", + "fastbloom", "flatbuffers", "futures", + "geo", + "geo-types", + "geozero", + "h3o", "itertools 0.14.0", "kanal", "log", @@ -8912,6 +9421,7 @@ dependencies = [ "prost 0.14.1", "rstest", "rustc-hash", + "serde", "termtree", "tokio", "uuid", @@ -9356,7 +9866,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -9752,6 +10262,18 @@ dependencies = [ "serde-value", ] +[[package]] +name = "wkt" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54f7f1ff4ea4c18936d6cd26a6fd24f0003af37e951a8e0e8b9e9a2d0bd0a46d" +dependencies = [ + "geo-types", + "log", + "num-traits", + "thiserror 1.0.69", +] + [[package]] name = "writeable" version = "0.6.1" diff --git a/Cargo.toml b/Cargo.toml index 7a676a4006e..202a40bdd9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,6 +75,7 @@ async-compat = "0.2.5" async-fs = "2.2.0" async-stream = "0.3.6" async-trait = "0.1.89" +bincode = "2" bindgen = "0.72.0" bit-vec = "0.8.0" bitvec = "1.0.1" @@ -116,13 +117,18 @@ dyn-hash = "1.0.0" enum-iterator = "2.0.0" enum-map = "2.7.3" erased-serde = "0.4" +fastbloom = "0.14.0" fastlanes = "0.5" flatbuffers = "25.2.10" fsst-rs = "0.5.5" futures = { version = 
"0.3.31", default-features = false } fuzzy-matcher = "0.3" +geo = "0.31.0" +geo-types = "0.7.18" +geozero = "0.14.0" glob = "0.3.2" goldenfile = "1" +h3o = "0.9.3" half = { version = "2.6", features = ["std", "num-traits"] } hashbrown = "0.16.0" humansize = "2.1.3" diff --git a/encodings/zstd/src/array.rs b/encodings/zstd/src/array.rs index aba5f711c02..c6d0d475941 100644 --- a/encodings/zstd/src/array.rs +++ b/encodings/zstd/src/array.rs @@ -306,6 +306,8 @@ impl ZstdArray { (Some(ByteBuffer::from(dict)), compressor) }; + // TODO(aduffy): dictionary training + let mut frame_metas = vec![]; let mut frames = vec![]; for i in 0..n_frames { diff --git a/presentation/adam.png b/presentation/adam.png new file mode 100644 index 00000000000..8b46fbc497c Binary files /dev/null and b/presentation/adam.png differ diff --git a/presentation/blog.png b/presentation/blog.png new file mode 100644 index 00000000000..aaeac183fb4 Binary files /dev/null and b/presentation/blog.png differ diff --git a/presentation/geozero.png b/presentation/geozero.png new file mode 100644 index 00000000000..1c9b4ac97d9 Binary files /dev/null and b/presentation/geozero.png differ diff --git a/presentation/h3.png b/presentation/h3.png new file mode 100644 index 00000000000..63d6b567257 Binary files /dev/null and b/presentation/h3.png differ diff --git a/presentation/hackweek.md b/presentation/hackweek.md new file mode 100644 index 00000000000..dd5d70b1786 --- /dev/null +++ b/presentation/hackweek.md @@ -0,0 +1,131 @@ +--- +title: Putting Vortex on the Map +sub_title: adding geospatial features to Vortex (again) +author: Andrew Duffy +--- + +# Background + +* Vortex puts pluggability front and center + * Layouts + * Encodings + * Expressions +* It would be fun to apply Vortex to new domains that we + don't have a lot of prebuilt demos for +* I like mapping data + + + + + +# Background: Spatial data + +*all of this is about vector data, not raster + +* Encode _geometry_ alongside some set of _properties_ 
+* Query patterns: filtered scans (`ST_Contains`, `ST_Intersects`), aggregates +* Different encoding schemes, GeoParquet uses WKB to be compact + +![wkb](wkb.png) + +* Lean on Rust geo ecosystem + +![geozero](geozero.png) + + + + # Background: The Dataset + +* Microsoft OpenBuildings + +![blog](blog.png) + +* Global coverage of the world + + + + # Plan: Indexing + +* Implement a new `GeoLayout`, like ZonedLayout but for spatial indexing + +* For each 8K row chunk, build a bloom filter of *H3 Cell IDs* + +![h3](h3.png) + +* Can treat cell IDs like `u64` and insert into bloom filter + + + +# Plan: Layouts + + +* Implement an `ST_Contains` expression, implement pruning for it in our new layout +* Implement new strategy to write compact files with the custom index structure + +```rust +/// Make a strategy which has special handling for DType::Binary chunks named "geometry". +fn make_rtree_strategy() -> Arc { + let validity = Arc::new(FlatLayoutStrategy::default()); + let fallback = WriteStrategyBuilder::new() + .with_compressor(CompactCompressor::default()) + .build(); + + // override the handling of the "geometry" column + let leaf_writers = HashMap::from_iter([( + FieldPath::from_name(FieldName::from("geometry")), + geometry_writer(), + )]); + + Arc::new(PathStrategy::new(leaf_writers, validity, fallback)) +} +``` + +* (Bonus: built new strategy that allows you to override the writer for leaf columns by fieldpath) + + + +# How'd we do + +Source GeoParquet file: + +> 1.2 GB + +Vortex File with H3 Bloom Filter Index: + +> 0.9 GB! 
+ +**~22% smaller!** + + + +## Follow up work: + +* Improve write performance +* Add BBOX for coarse filtering +* Implement more geospatial functions with pushdown + * `to_geojson(col("geom"))` + * `ST_Area`, `ST_Intersect`, see [geodatafusion](https://github.com/datafusion-contrib/geodatafusion) +* Integrate into SedonaDB + +![sedonadb](sedonadb.png) + + + +# What People are Saying + +![pmarca](pmarca.png) + + + +![roon](roon.png) + + + +![adam](adam.png) + + + + +# Demo diff --git a/presentation/pmarca.png b/presentation/pmarca.png new file mode 100644 index 00000000000..245209b2a74 Binary files /dev/null and b/presentation/pmarca.png differ diff --git a/presentation/qgis_fail.png b/presentation/qgis_fail.png new file mode 100644 index 00000000000..2e4e121b477 Binary files /dev/null and b/presentation/qgis_fail.png differ diff --git a/presentation/roon.png b/presentation/roon.png new file mode 100644 index 00000000000..34515fd1883 Binary files /dev/null and b/presentation/roon.png differ diff --git a/presentation/sedonadb.png b/presentation/sedonadb.png new file mode 100644 index 00000000000..9a84ddad0cf Binary files /dev/null and b/presentation/sedonadb.png differ diff --git a/presentation/trump.png b/presentation/trump.png new file mode 100644 index 00000000000..f983aa4ae92 Binary files /dev/null and b/presentation/trump.png differ diff --git a/presentation/wkb.png b/presentation/wkb.png new file mode 100644 index 00000000000..db342130dae Binary files /dev/null and b/presentation/wkb.png differ diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index ae29d06ae81..68c06234387 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -37,6 +37,9 @@ enum-iterator = { workspace = true } enum-map = { workspace = true } flatbuffers = { workspace = true } futures = { workspace = true, features = ["alloc", "async-await", "std"] } +geo = { workspace = true } +geo-types = { workspace = true } +geozero = { workspace = true, features = ["with-wkb"] 
} getrandom_v03 = { workspace = true } goldenfile = { workspace = true, optional = true } humansize = { workspace = true } diff --git a/vortex-array/src/expr/exprs/geospatial/mod.rs b/vortex-array/src/expr/exprs/geospatial/mod.rs new file mode 100644 index 00000000000..97de9765ba2 --- /dev/null +++ b/vortex-array/src/expr/exprs/geospatial/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod st_contains; diff --git a/vortex-array/src/expr/exprs/geospatial/st_contains.rs b/vortex-array/src/expr/exprs/geospatial/st_contains.rs new file mode 100644 index 00000000000..b8dbf3d2578 --- /dev/null +++ b/vortex-array/src/expr/exprs/geospatial/st_contains.rs @@ -0,0 +1,214 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! An implementation of an ST_Contains expression type. 
+ +// use crate::expr::functions::{ArgName, Arity, EmptyOptions, ExecutionArgs, FunctionId, VTable}; +use std::fmt::Formatter; + +use geo::Centroid; +use geo::Contains; +use geo_types::Geometry; +use geozero::GeozeroGeometry; +use geozero::geo_types::GeoWriter; +use geozero::wkb; +use vortex_buffer::BitBuffer; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_error::vortex_ensure; +use vortex_scalar::Scalar; + +use crate::Array; +use crate::ArrayRef; +use crate::IntoArray; +use crate::ToCanonical; +use crate::accessor::ArrayAccessor; +use crate::arrays::BoolArray; +use crate::arrays::ConstantArray; +use crate::expr::ChildName; +use crate::expr::ExprId; +use crate::expr::ExpressionView; +use crate::expr::Literal; +use crate::expr::VTable; +use crate::expr::traversal::Node; +use crate::vtable::ValidityHelper; + +pub struct STContains; + +impl VTable for STContains { + type Instance = (); + + fn id(&self) -> ExprId { + ExprId::from("vortex.geo.contains") + } + + fn validate(&self, expr: &ExpressionView) -> VortexResult<()> { + vortex_ensure!( + expr.children_count() == 2, + "ST_Contains expression must have exactly 2 children" + ); + + let _lhs = &expr.children()[0]; + let _rhs = &expr.children()[1]; + + // TODO(aduffy): do other checks on the lhs/rhs + + Ok(()) + } + + fn child_name(&self, _instance: &Self::Instance, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::new_ref("geom_a"), + 1 => ChildName::new_ref("geom_b"), + _ => unreachable!("child_name called with invalid child_idx"), + } + } + + fn fmt_sql(&self, expr: &ExpressionView, f: &mut Formatter<'_>) -> std::fmt::Result { + let lhs = &expr.children()[0]; + let rhs = &expr.children()[1]; + + write!(f, "ST_CONTAINS(")?; + lhs.fmt_sql(f)?; + write!(f, ", ")?; + rhs.fmt_sql(f)?; + write!(f, ")") + } + + fn return_dtype(&self, expr: &ExpressionView, scope: &DType) -> VortexResult { + let lhs = &expr.children()[0]; + let rhs = 
&expr.children()[1]; + let nullability = + lhs.return_dtype(scope)?.nullability() | rhs.return_dtype(scope)?.nullability(); + Ok(DType::Bool(nullability)) + } + + fn evaluate(&self, expr: &ExpressionView, scope: &ArrayRef) -> VortexResult { + let lhs = &expr.children()[0]; + let rhs = &expr.children()[1]; + + match (lhs.as_opt::(), rhs.as_opt::()) { + (Some(l), Some(r)) => { + // Both are literals + let len = scope.len(); + + let l_v = l.data().as_binary().value(); + let r_v = r.data().as_binary().value(); + let constant = match (l_v, r_v) { + (Some(wkb_l), Some(wkb_r)) => { + let geom_l = parse_wkb(&wkb_l); + let geom_r = parse_wkb(&wkb_r); + Scalar::bool(geom_l.contains(&geom_r), Nullability::NonNullable) + } + _ => Scalar::null(DType::Bool(Nullability::Nullable)), + }; + + Ok(ConstantArray::new(constant, len).into_array()) + } + (Some(l), None) => { + // lhs is literal, rhs is an array that we need to iterate over. + let rhs = rhs.evaluate(scope)?; + let len = rhs.len(); + + let Some(wkb_l) = l.data().as_binary().value() else { + return Ok(ConstantArray::new( + Scalar::null(DType::Bool(Nullability::Nullable)), + len, + ) + .into_array()); + }; + + let geom_l = parse_wkb(&wkb_l); + + let rhs = rhs.to_varbinview(); + let validity = rhs.validity().clone(); + + rhs.with_iterator(|iter| { + let matches = iter + .map(|rhs_value| match rhs_value { + None => false, + Some(wkb_r) => { + let geom_r = parse_wkb(wkb_r); + // Get centroid of the geometry + let _centroid = geom_r.centroid(); + geom_l.contains(&geom_r) + } + }) + .collect::(); + + Ok(BoolArray::from_bit_buffer(matches, validity).into_array()) + }) + } + (None, Some(r)) => { + // rhs is literal, lhs is an array that we need to iterate over + let lhs = lhs.evaluate(scope)?; + let len = lhs.len(); + + let Some(wkb_r) = r.data().as_binary().value() else { + return Ok(ConstantArray::new( + Scalar::null(DType::Bool(Nullability::Nullable)), + len, + ) + .into_array()); + }; + + let geom_r = parse_wkb(&wkb_r); + + let 
lhs = lhs.to_varbinview(); + let validity = lhs.validity().clone(); + lhs.with_iterator(|iter| { + let matches = iter + .map(|v| match v { + None => false, + Some(wkb_l) => { + let geom_l = parse_wkb(wkb_l); + geom_l.contains(&geom_r) + } + }) + .collect::(); + + Ok(BoolArray::from_bit_buffer(matches, validity).into_array()) + }) + } + (None, None) => { + // lhs and rhs are both arrays, we need to zip/iterate them both. + let lhs = lhs.evaluate(scope)?.to_varbinview(); + let rhs = rhs.evaluate(scope)?.to_varbinview(); + + // And the validities together. + let validity = lhs.validity().clone().and(rhs.validity().clone()); + + let len = rhs.len(); + + // TODO(aduffy): hoist validity checking + let matches = BitBuffer::collect_bool(len, |index| { + if lhs.is_invalid(index) || rhs.is_invalid(index) { + return false; + } + + let l_v = lhs.bytes_at(index); + let r_v = rhs.bytes_at(index); + + let geom_l = parse_wkb(&l_v); + let geom_r = parse_wkb(&r_v); + + geom_l.contains(&geom_r) + }); + + Ok(BoolArray::from_bit_buffer(matches, validity).into_array()) + } + } + } +} + +fn parse_wkb(wkb: &[u8]) -> Geometry { + let mut writer = GeoWriter::new(); + wkb::Wkb(wkb) + .process_geom(&mut writer) + .expect("wkb parsing left"); + writer.take_geometry().expect("wkb should yield geometry") +} diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index 7965311aae2..d0b30a7a72c 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -5,6 +5,7 @@ pub(crate) mod between; pub(crate) mod binary; pub(crate) mod cast; pub(crate) mod dynamic; +pub(crate) mod geospatial; pub(crate) mod get_item; pub(crate) mod is_null; pub(crate) mod like; @@ -22,6 +23,7 @@ pub use between::*; pub use binary::*; pub use cast::*; pub use dynamic::*; +pub use geospatial::*; pub use get_item::*; pub use is_null::*; pub use like::*; diff --git a/vortex-array/src/expr/exprs/scalar_fn.rs b/vortex-array/src/expr/exprs/scalar_fn.rs index 
20d898c8530..3575c5007a7 100644 --- a/vortex-array/src/expr/exprs/scalar_fn.rs +++ b/vortex-array/src/expr/exprs/scalar_fn.rs @@ -119,15 +119,15 @@ impl VTable for ScalarFnExpr { }) } - fn stat_falsification( - &self, - _expr: &ExpressionView, - _catalog: &dyn StatsCatalog, - ) -> Option { - // TODO(ngates): ideally this is implemented as optimizer rules over a `falsify` and - // `verify` expressions. - todo!() - } + // fn stat_falsification( + // &self, + // _expr: &ExpressionView, + // _catalog: &dyn StatsCatalog, + // ) -> Option { + // // TODO(ngates): ideally this is implemented as optimizer rules over a `falsify` and + // // `verify` expressions. + // todo!() + // } fn stat_expression( &self, diff --git a/vortex-array/src/scalar_fns/geospatial/mod.rs b/vortex-array/src/scalar_fns/geospatial/mod.rs new file mode 100644 index 00000000000..4f3563cc2a5 --- /dev/null +++ b/vortex-array/src/scalar_fns/geospatial/mod.rs @@ -0,0 +1,178 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! An implementation of an ST_Contains expression type as a ScalarFn. +//! +//! The Vectors don't seem to be complete enough to use ScalarFn for non-trivial things +//! so for now this is unused. 
+ +use std::ops::BitAnd; + +use geo::Contains; +use geo_types::Geometry; +use geozero::GeozeroGeometry; +use geozero::geo_types::GeoWriter; +use geozero::wkb; +use vortex_buffer::BitBuffer; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_vector::Datum; +use vortex_vector::Scalar; +use vortex_vector::Vector; +use vortex_vector::VectorOps; +use vortex_vector::bool::BoolScalar; +use vortex_vector::bool::BoolVector; + +use crate::expr::functions::ArgName; +use crate::expr::functions::Arity; +use crate::expr::functions::EmptyOptions; +use crate::expr::functions::ExecutionArgs; +use crate::expr::functions::FunctionId; +use crate::expr::functions::VTable; + +pub struct STContains; + +impl VTable for STContains { + type Options = EmptyOptions; + + fn id(&self) -> FunctionId { + FunctionId::from("vortex.geo.contains") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(2) + } + + fn arg_name(&self, _options: &Self::Options, arg_idx: usize) -> ArgName { + match arg_idx { + 0 => ArgName::new_ref("GeomA"), + 1 => ArgName::new_ref("GeomB"), + _ => unreachable!("ST_Contains must be called with exactly 2 arguments"), + } + } + + fn return_dtype(&self, _options: &Self::Options, arg_types: &[DType]) -> VortexResult { + // The result is a Bool column with the nullability of its arguments + let result_nullability = arg_types[0].nullability() | arg_types[1].nullability(); + Ok(DType::Bool(result_nullability)) + } + + fn execute(&self, _options: &Self::Options, args: &ExecutionArgs) -> VortexResult { + // Force each element to perform a datum operation here. + // The inner option must be a Literal + let geoma = args.input_datums(0); + let geomb = args.input_datums(1); + + vortex_ensure!( + args.input_type(0).is_binary() && args.input_type(1).is_binary(), + "Arguments to ST_Contains must be binary" + ); + + // If we have two values, compare them. 
+ match (geoma, geomb) { + (Datum::Scalar(geoma_scalar), Datum::Scalar(geomb_scalar)) => { + let geoma = geoma_scalar + .as_binary() + .value() + .ok_or_else(|| vortex_err!("literal argument to ST_Contains cannot be NULL"))?; + + let geomb = geomb_scalar + .as_binary() + .value() + .ok_or_else(|| vortex_err!("literal argument to ST_Contains cannot be NULL"))?; + + let contain = parse_wkb(&geoma).contains(&parse_wkb(&geomb)); + + Ok(Datum::Scalar(Scalar::Bool(BoolScalar::new(Some(contain))))) + } + (Datum::Scalar(geoma_scalar), Datum::Vector(geomb_vector)) => { + let geoma = geoma_scalar + .as_binary() + .value() + .ok_or_else(|| vortex_err!("literal argument to ST_Contains cannot be NULL"))?; + + let geoma = parse_wkb(&geoma); + + let geomb_bin = geomb_vector.as_binary(); + let matches = BitBuffer::collect_bool(geomb_bin.len(), |index| { + match geomb_bin.get_ref(index) { + None => false, + Some(geomb_buf) => { + let geomb = parse_wkb(geomb_buf); + geoma.contains(&geomb) + } + } + }); + Ok(Datum::Vector(Vector::Bool(BoolVector::new( + matches, + geomb_vector.validity().clone(), + )))) + } + (Datum::Vector(geoma_vector), Datum::Scalar(geomb_scalar)) => { + let geomb = geomb_scalar + .as_binary() + .value() + .ok_or_else(|| vortex_err!("literal argument to ST_Contains cannot be NULL"))?; + + let geomb = parse_wkb(&geomb); + + let geoma_bin = geoma_vector.as_binary(); + let matches = BitBuffer::collect_bool(geoma_bin.len(), |index| { + match geoma_bin.get_ref(index) { + None => false, + Some(geoma_buf) => { + let geoma = parse_wkb(geoma_buf); + geoma.contains(&geomb) + } + } + }); + Ok(Datum::Vector(Vector::Bool(BoolVector::new( + matches, + geoma_vector.validity().clone(), + )))) + } + (Datum::Vector(geoma_vector), Datum::Vector(geomb_vector)) => { + vortex_ensure!( + geoma_vector.len() == geomb_vector.len(), + "ST_Contains input vectors must have same length" + ); + + let geoma_bin = geoma_vector.as_binary(); + let geomb_bin = geomb_vector.as_binary(); + + let 
matches = BitBuffer::collect_bool(geoma_bin.len(), |index| { + let wkb_a = geoma_bin.get_ref(index); + let wkb_b = geomb_bin.get_ref(index); + + match (wkb_a, wkb_b) { + (Some(a), Some(b)) => contains_wkb_slow(a, b), + _ => false, + } + }); + + let validity = geoma_bin.validity().bitand(geomb_bin.validity()); + + Ok(Datum::Vector(Vector::Bool(BoolVector::new( + matches, validity, + )))) + } + } + } +} + +fn contains_wkb_slow(left: &[u8], right: &[u8]) -> bool { + let left_geom = parse_wkb(left); + let right_geom = parse_wkb(right); + + left_geom.contains(&right_geom) +} + +fn parse_wkb(wkb: &[u8]) -> Geometry { + let mut writer = GeoWriter::new(); + wkb::Wkb(wkb) + .process_geom(&mut writer) + .expect("wkb parsing left"); + writer.take_geometry().expect("wkb should yield geometry") +} diff --git a/vortex-array/src/scalar_fns/mod.rs b/vortex-array/src/scalar_fns/mod.rs index f00b0d0e7c2..e0b50458102 100644 --- a/vortex-array/src/scalar_fns/mod.rs +++ b/vortex-array/src/scalar_fns/mod.rs @@ -25,6 +25,8 @@ pub mod get_item; pub mod is_null; pub mod mask; pub mod not; +// HACKHACK(aduffy): should put this in a separate crate +pub mod geospatial; /// A collection of built-in scalar functions that can be applied to expressions or arrays. 
pub trait ExprBuiltins: Sized { diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index c929b1befc1..63c0b9b5c45 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -42,7 +42,7 @@ vortex-utils = { workspace = true, features = ["dashmap"] } [dev-dependencies] anyhow = { workspace = true } -datafusion = { workspace = true } +datafusion = { workspace = true, features = ["parquet"] } insta = { workspace = true } rstest = { workspace = true } tempfile = { workspace = true } diff --git a/vortex-datafusion/examples/decode_file.rs b/vortex-datafusion/examples/decode_file.rs new file mode 100644 index 00000000000..35cd1cff444 --- /dev/null +++ b/vortex-datafusion/examples/decode_file.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! This was a one-off file to test out to see if building a pre-trained dictionary on WKB helped +//! us compress more. The answer seems to be no, probably because our files are large enough that it +//! doesn't end up mattering. 
+ +use datafusion::arrow::array::AsArray; +use datafusion::arrow::datatypes::BinaryType; +use datafusion::parquet::arrow::ProjectionMask; +use datafusion::parquet::arrow::arrow_reader::ArrowReaderBuilder; +use futures::StreamExt; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; + +#[tokio::main] +pub async fn main() { + let f = File::open( + "/Users/aduffy/Downloads/BuildingsParquet/custom_download_20251204_095222.parquet", + ) + .await + .unwrap(); + + let mut reader = ArrowReaderBuilder::new(f).await.unwrap(); + + let schema = reader.parquet_schema(); + let projection_mask = ProjectionMask::roots(&schema, [7]); + + reader = reader.with_projection(projection_mask); + let mut reader = reader.build().unwrap(); + + let mut packed = File::create("/Users/aduffy/Downloads/wkb_all.bin") + .await + .unwrap(); + + let mut index = 0; + while let Some(next) = reader.next().await { + let next = next.expect("read error"); + let bytes = next.column(0).as_bytes::(); + let (_, buffer, _) = bytes.clone().into_parts(); + std::fs::write( + format!("/Users/aduffy/Downloads/wkb/{index}.bin"), + buffer.as_slice(), + ) + .unwrap(); + packed.write_all(&buffer).await.unwrap(); + index += 1; + } +} diff --git a/vortex-layout/Cargo.toml b/vortex-layout/Cargo.toml index 4e062f66ba3..d6db97272d5 100644 --- a/vortex-layout/Cargo.toml +++ b/vortex-layout/Cargo.toml @@ -21,8 +21,14 @@ arcref = { workspace = true } arrow-buffer = { workspace = true } async-stream = { workspace = true } async-trait = { workspace = true } +bincode = { workspace = true, features = ["serde"] } +fastbloom = { workspace = true, features = ["serde"] } flatbuffers = { workspace = true } futures = { workspace = true, features = ["alloc", "async-await", "executor"] } +geo = { workspace = true } +geo-types = { workspace = true, features = ["serde"] } +geozero = { workspace = true, features = ["with-wkb"] } +h3o = { workspace = true, features = ["geo", "serde"] } itertools = { workspace = true } kanal = { workspace = 
true } log = { workspace = true } @@ -35,6 +41,7 @@ pco = { workspace = true } pin-project-lite = { workspace = true } prost = { workspace = true } rustc-hash = { workspace = true } +serde = { workspace = true } termtree = { workspace = true } tokio = { workspace = true, features = ["rt"], optional = true } uuid = { workspace = true } diff --git a/vortex-layout/src/layouts/geo/mod.rs b/vortex-layout/src/layouts/geo/mod.rs new file mode 100644 index 00000000000..fdd349845e9 --- /dev/null +++ b/vortex-layout/src/layouts/geo/mod.rs @@ -0,0 +1,283 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +mod writer; + +use std::sync::Arc; +use std::sync::OnceLock; + +use fastbloom::BloomFilter; +use futures::future::BoxFuture; +use futures::future::Shared; +use geo::ConvexHull; +use geo_types::Geometry; +use geozero::GeozeroGeometry; +use geozero::geo_types::GeoWriter; +use geozero::wkb; +use h3o::Resolution; +use h3o::geom::ContainmentMode; +use h3o::geom::TilerBuilder; +use itertools::Itertools; +use vortex_array::Array; +use vortex_array::ArrayContext; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::DeserializeMetadata; +use vortex_array::SerializeMetadata; +use vortex_array::accessor::ArrayAccessor; +use vortex_array::arrays::VarBinVTable; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::StructFields; +use vortex_dtype::TryFromBytes; +use vortex_error::SharedVortexResult; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +pub use writer::*; + +use crate::LayoutChildType; +use crate::LayoutChildren; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::LazyReaderChildren; +use crate::VTable; +use crate::children::OwnedLayoutChildren; +use 
crate::layouts::geo::reader::GeoReader; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +/// A layout that stores a bloom filter of H3 tile IDs. +#[derive(Clone, Debug)] +pub struct GeoLayout { + dtype: DType, + children: Arc, + /// How many rows are in a zone (except the last zone which might be shorter) + zone_len: usize, +} + +impl GeoLayout { + #[allow(clippy::panic)] + pub fn new(data: LayoutRef, filter: LayoutRef, zone_len: usize) -> Self { + assert!(zone_len > 0); + assert_eq!( + filter.dtype(), + &GeoFilter::dtype(), + "Invalid DType for filters child" + ); + + // TODO(aduffy): check the DType for the r-tree table + Self { + dtype: data.dtype().clone(), + children: OwnedLayoutChildren::layout_children(vec![data, filter]), + zone_len, + } + } + + pub fn nzones(&self) -> usize { + usize::try_from(self.children.child_row_count(1)).vortex_expect("Invalid number of zones") + } +} + +// the whole RTree is very large. +#[derive(Debug, PartialEq, Eq, Clone)] +pub struct GeoMetadata { + pub(super) zone_len: u32, +} + +impl DeserializeMetadata for GeoMetadata { + type Output = Self; + + fn deserialize(metadata: &[u8]) -> VortexResult { + let zone_len = u32::try_from_le_bytes(&metadata[0..4])?; + Ok(Self { zone_len }) + } +} + +impl SerializeMetadata for GeoMetadata { + fn serialize(self) -> Vec { + let mut output = Vec::with_capacity(4); + output.extend_from_slice(self.zone_len.to_le_bytes().as_slice()); + output + } +} + +#[derive(Debug)] +pub struct GeoLayoutEncoding; + +vtable!(Geo); + +impl VTable for GeoVTable { + type Layout = GeoLayout; + type Encoding = GeoLayoutEncoding; + type Metadata = GeoMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_ref("vortex.geo.rtree") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(GeoLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + // 0-th child is the data + 
layout.children.child_row_count(0) + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + #[allow(clippy::cast_possible_truncation)] + fn metadata(layout: &Self::Layout) -> Self::Metadata { + GeoMetadata { + zone_len: layout.zone_len as u32, + } + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(_layout: &Self::Layout) -> usize { + 2 + } + + fn child(layout: &Self::Layout, idx: usize) -> VortexResult { + match idx { + 0 => layout.children.child(0, layout.dtype()), + 1 => layout.children.child(1, &GeoFilter::dtype()), + _ => vortex_bail!("Invalid child index for RTreeLayout {idx}"), + } + } + + fn child_type(_layout: &Self::Layout, idx: usize) -> LayoutChildType { + match idx { + 0 => LayoutChildType::Transparent(Arc::from("data")), + 1 => LayoutChildType::Auxiliary(Arc::from("rtree")), + _ => unreachable!("invalid child index for RTreeLayout"), + } + } + + fn new_reader( + layout: &GeoLayout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ) -> VortexResult { + let names = vec![Arc::from(format!("{name}.data")), Arc::from("{name}.rtree")]; + let children = LazyReaderChildren::new( + layout.children.clone(), + vec![layout.dtype.clone(), GeoFilter::dtype()], + names, + segment_source, + session.clone(), + ); + + Ok(Arc::new(GeoReader { + name, + layout: layout.clone(), + children, + geo_filter: OnceLock::new(), + })) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + _row_count: u64, + metadata: &GeoMetadata, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: ArrayContext, + ) -> VortexResult { + Ok(GeoLayout { + zone_len: metadata.zone_len as usize, + dtype: dtype.clone(), + children: children.to_arc(), + }) + } +} + +/// Make a `GeometryObject` from a WKB-encoded geometry value. 
+pub(crate) fn make_geom(wkb: &[u8]) -> Option { + let mut geo = GeoWriter::new(); + wkb::Wkb(wkb).process_geom(&mut geo).unwrap(); + geo.take_geometry() +} + +/// A filter over geospatial data that allows for efficient pruning of intersection and containment +/// queries. +/// +/// Physically, it is a set of zone-level bloom filters over the H3 cell IDs contained within the +/// chunk, up to a certain resolution level. +/// +/// TODO(aduffy): make the cell ID resolution adaptive based on the chunk overview. +#[derive(Clone)] +pub(crate) struct GeoFilter { + inner: Vec, +} + +impl GeoFilter { + pub fn dtype() -> DType { + DType::Struct( + StructFields::from_iter([("filter", DType::Binary(Nullability::Nullable))]), + Nullability::NonNullable, + ) + } + + pub fn try_load(array: ArrayRef) -> VortexResult { + // What is the purpose of this large set of files? + let Canonical::Struct(struct_array) = array.to_canonical() else { + vortex_bail!("expected StructArray from GeoFilter layout"); + }; + + let filters_col = struct_array.fields()[0].clone(); + // TODO(aduffy): this is dumb and we shouldn't actually force this, but for now we + // force to VarBin encoding b/c VarBinView is strictly larger. + let filters_col_bin = filters_col.as_::(); + let decoded_filters: Vec = filters_col_bin.with_iterator(|binary| { + binary + .map(|b| match b { + None => VortexResult::Ok(BloomFilter::from_vec(vec![0]).expected_items(0)), + Some(v) => { + let (filter, _) = + bincode::serde::decode_from_slice(v, bincode::config::standard()) + .map_err(|e| vortex_err!("failed to decode BloomFilter: {e}"))?; + VortexResult::Ok(filter) + } + }) + .try_collect() + })?; + + Ok(Self { + inner: decoded_filters, + }) + } + + /// Probe the GeoFilter to see if the given zone *may* contain a geometry that is covered + /// by the given geometry. 
+ pub fn filter_contains(&self, zone_id: usize, cell_ids: &[u64]) -> bool { + let filter = &self.inner[zone_id]; + + // If some cells of the geometry are contained by this zone, then it's possible that there + // may be some geometry in this zone that would pass a query `ST_CONTAINS(geom, zone_geometry)`. + for &cell_id in cell_ids { + if filter.contains_hash(cell_id) { + return true; + } + } + + false + } +} + +type SharedGeoFilter = Shared>>; diff --git a/vortex-layout/src/layouts/geo/reader.rs b/vortex-layout/src/layouts/geo/reader.rs new file mode 100644 index 00000000000..7595d6e0e42 --- /dev/null +++ b/vortex-layout/src/layouts/geo/reader.rs @@ -0,0 +1,249 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::BTreeSet; +use std::ops::BitAnd; +use std::ops::Range; +use std::sync::Arc; +use std::sync::OnceLock; + +use futures::FutureExt; +use futures::TryFutureExt; +use geo::ConvexHull; +use geo_types::Geometry; +use geozero::GeozeroGeometry; +use geozero::geo_types::GeoWriter; +use geozero::wkb; +use h3o::Resolution; +use h3o::geom::ContainmentMode; +use h3o::geom::TilerBuilder; +use itertools::Itertools; +use vortex_array::Array; +use vortex_array::MaskFuture; +use vortex_array::expr::Expression; +use vortex_array::expr::Literal; +use vortex_array::expr::root; +use vortex_array::expr::st_contains::STContains; +use vortex_buffer::BitBufferMut; +use vortex_dtype::DType; +use vortex_dtype::FieldMask; +use vortex_error::VortexError; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_panic; +use vortex_mask::Mask; + +use crate::ArrayFuture; +use crate::LayoutReader; +use crate::LayoutReaderRef; +use crate::LazyReaderChildren; +use crate::layouts::geo::GeoFilter; +use crate::layouts::geo::GeoLayout; +use crate::layouts::geo::SharedGeoFilter; + +pub struct GeoReader { + pub(crate) name: Arc, + pub(crate) layout: GeoLayout, + pub(crate) children: 
LazyReaderChildren, + pub(crate) geo_filter: OnceLock, + // TODO(aduffy): cache the pruning result +} + +impl GeoReader { + fn data_child(&self) -> VortexResult<&LayoutReaderRef> { + self.children.get(0) + } + + /// Get the range of zone IDs containing a row range. + pub(crate) fn zone_range(&self, row_range: &Range) -> Range { + // Zone length is guaranteed to be > 0 by ZonedLayout::new validation + debug_assert!(self.layout.zone_len > 0, "zone_len must be > 0"); + let zone_len_u64 = self.layout.zone_len as u64; + let zone_start = row_range.start / zone_len_u64; + let zone_end = row_range.end.div_ceil(zone_len_u64); + zone_start..zone_end + } + + /// Get the row index for the first row in a zone with the given `zone_index`. + pub(crate) fn first_row_offset(&self, zone_idx: u64) -> u64 { + zone_idx + .saturating_mul(self.layout.zone_len as u64) + .min(self.layout.row_count()) + } + + fn geo_filter(&self) -> SharedGeoFilter { + self.geo_filter + .get_or_init(move || { + let nzones = self.layout.nzones(); + + let zones_eval = self + .children + .get(1) + .vortex_expect("failed to get zone child") + .projection_evaluation( + &(0..nzones as u64), + &root(), + MaskFuture::new_true(nzones), + ) + .vortex_expect("Failed construct zone map evaluation"); + + async move { + let zones_array = zones_eval.await?; + GeoFilter::try_load(zones_array) + } + .map_err(Arc::new) + .boxed() + .shared() + }) + .clone() + } +} + +impl LayoutReader for GeoReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + &self.layout.dtype + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + // Register splits from the data. + self.children + .get(0)? 
+ .register_splits(field_mask, row_range, splits)?; + + Ok(()) + } + + fn pruning_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: Mask, + ) -> VortexResult { + let data_eval = self + .data_child()? + .pruning_evaluation(row_range, expr, mask.clone())?; + + if let Some(st_contains) = expr.as_opt::() { + // Get the st_contains by scanning the input + let lhs = st_contains.child(0); + let rhs = st_contains.child(1); + + if let Some(lhs_lit) = lhs.as_opt::() { + // Return the applied version from this instead. + let Some(v) = lhs_lit.data().as_binary_opt() else { + return Ok(MaskFuture::ready(mask)); + }; + + let Some(wkb) = v.value() else { + return Ok(MaskFuture::ready(mask)); + }; + + let geometry = parse_wkb(&wkb); + + // Get the literal from it here. + let geo_filter = self.geo_filter(); + + // Append the new mask instead here + let len = mask.len(); + let zone_len = self.layout.zone_len; + let row_count = row_range.end - row_range.start; + let zone_range = self.zone_range(row_range); + let zone_lengths: Vec<_> = zone_range + .clone() + .map(|zone_idx| { + // Figure out the range in the mask that corresponds to the zone + let start = usize::try_from( + self.first_row_offset(zone_idx) + .saturating_sub(row_range.start), + )?; + let end = usize::try_from( + self.first_row_offset(zone_idx + 1) + .saturating_sub(row_range.start) + .min(row_count), + )?; + Ok::<_, VortexError>(end - start) + }) + .try_collect()?; + + return Ok(MaskFuture::new(len, async move { + let geo_filter = geo_filter.await?; + let mut builder = BitBufferMut::with_capacity(mask.len()); + // Generate the cell IDs for the geometry. + // TODO(aduffy): store this config centrally so the read/write paths don't drift. 
+ let mut tiler = TilerBuilder::new(Resolution::Eight) + .containment_mode(ContainmentMode::Covers) + .build(); + tiler + .add(geometry.convex_hull()) + .unwrap_or_else(|e| vortex_panic!("failed to add polygon to tiler: {e}")); + let cell_ids = tiler.into_coverage().map(u64::from).collect_vec(); + + for (zone_idx, &zone_length) in zone_range.clone().zip_eq(&zone_lengths) { + builder.append_n( + geo_filter.filter_contains(usize::try_from(zone_idx)?, &cell_ids), + zone_length, + ); + } + + let stats_mask = Mask::from(builder.freeze()); + assert_eq!(stats_mask.len(), mask.len(), "Mask length mismatch"); + + // Intersect the masks. + let mut stats_mask = mask.bitand(&stats_mask); + + // Forward to data child for further pruning. + if !stats_mask.all_false() { + let data_mask = data_eval.await?; + stats_mask = stats_mask.bitand(&data_mask); + } + + Ok(stats_mask) + })); + } + } + + // Re-run the data eval. + self.data_child()?.pruning_evaluation(row_range, expr, mask) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + self.data_child()?.filter_evaluation(row_range, expr, mask) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // TODO(aduffy): can we do anything better here? + self.data_child()? + .projection_evaluation(row_range, expr, mask) + } +} + +fn parse_wkb(wkb: &[u8]) -> Geometry { + let mut writer = GeoWriter::new(); + wkb::Wkb(wkb) + .process_geom(&mut writer) + .expect("wkb parsing left"); + writer.take_geometry().expect("wkb should yield geometry") +} diff --git a/vortex-layout/src/layouts/geo/writer.rs b/vortex-layout/src/layouts/geo/writer.rs new file mode 100644 index 00000000000..25916ab88b2 --- /dev/null +++ b/vortex-layout/src/layouts/geo/writer.rs @@ -0,0 +1,193 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! 
Strategy that writes data with an embedded geospatial index. +//! +//! This strategy expects to receive a chunk stream of `BINARY` data that corresponds to WKB +//! encoded geometry objects. Each chunk yields a new bloom filter composed of the H3 cell IDs for +//! all geometries in the chunk. This allows us to do very fast handling of contains queries, +//! without needing to read the full data or decode the WKBs until after a large pruning step. + +use std::sync::Arc; +use std::sync::Mutex; + +use async_trait::async_trait; +use fastbloom::BloomFilter; +use futures::StreamExt; +use geo::ConvexHull; +use h3o::Resolution; +use h3o::geom::ContainmentMode; +use h3o::geom::TilerBuilder; +use vortex_array::Array; +use vortex_array::ArrayContext; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::accessor::ArrayAccessor; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::builder::VarBinBuilder; +use vortex_array::validity::Validity; +use vortex_dtype::DType; +use vortex_dtype::FieldNames; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_error::vortex_panic; +use vortex_io::runtime::Handle; + +use crate::IntoLayout; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::geo::GeoLayout; +use crate::layouts::geo::make_geom; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialArrayStreamExt; +use crate::sequence::SequentialStreamAdapter; +use crate::sequence::SequentialStreamExt; + +#[derive(Clone)] +pub struct GeoStrategy { + data_child: Arc, + geo_filter_child: Arc, + zone_len: usize, +} + +impl GeoStrategy { + pub fn new( + data_child: Arc, + geo_filter_child: Arc, + zone_len: usize, + ) -> Self { + Self { + data_child, + geo_filter_child, + zone_len, + } + } +} + +#[async_trait] +impl LayoutStrategy for GeoStrategy { + async fn write_stream( + &self, 
+ ctx: ArrayContext, + segment_sink: SegmentSinkRef, + stream: SendableSequentialStream, + mut eof: SequencePointer, + handle: Handle, + ) -> VortexResult { + // split off data stream write to complete first. + let data_eof = eof.split_off(); + let accum = H3BloomAccumulator::new(); + + let accum2 = accum.clone(); + let stream = SequentialStreamAdapter::new( + stream.dtype().clone(), + stream.map(move |chunk| { + // simple stream transformation that passes the chunk through to the accumulator + if let Ok((_, ref geoms)) = chunk { + accum2.push_chunk(geoms); + } + chunk + }), + ) + .sendable(); + + let data_layout = self + .data_child + .write_stream( + ctx.clone(), + segment_sink.clone(), + stream, + data_eof, + handle.clone(), + ) + .await?; + + // After child write completes, get back the inner type. + let geo_filter = accum.finish(); + + // Write the rtree to a separate node + let geo_filter_stream = geo_filter + .to_array_stream() + .sequenced(eof.split_off()) + .sendable(); + + let geo_filter_layout = self + .geo_filter_child + .write_stream(ctx, segment_sink, geo_filter_stream, eof, handle) + .await?; + + Ok(GeoLayout::new(data_layout, geo_filter_layout, self.zone_len).into_layout()) + } + + fn buffered_bytes(&self) -> u64 { + self.data_child.buffered_bytes() + } +} + +#[derive(Clone)] +struct H3BloomAccumulator { + inner: Arc>>, +} + +impl H3BloomAccumulator { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(VarBinBuilder::with_capacity(128))), + } + } +} + +impl H3BloomAccumulator { + fn push_chunk(&self, chunk: &ArrayRef) { + let chunk = chunk.to_varbinview(); + chunk.with_iterator(|iter| { + let mut tiler = TilerBuilder::new(Resolution::Eight) + .containment_mode(ContainmentMode::Covers) + .build(); + + for geom in iter.filter_map(|v| make_geom(v?)) { + // We use the H3 library to turn each geometry into a set of H3 cells that + // fully cover the geometry. + // TODO(aduffy): how expensive is this? 
+ let cv = geom.convex_hull(); + + // Add to the tiler + tiler + .add(cv) + .unwrap_or_else(|e| vortex_panic!("Failed to tile geometry: {e}")); + } + // TODO(aduffy): tweak these params + let mut filter = BloomFilter::with_false_pos(0.01).expected_items(8192); + for cell_id in tiler.into_coverage() { + filter.insert_hash(cell_id.into()); + } + + // Return the serialized copy of the bloom filter + let encoded = bincode::serde::encode_to_vec(&filter, bincode::config::standard()) + .expect("rtree serde"); + + self.inner.lock().expect("poisoned").append_value(encoded); + }); + } + + /// Finish the accumulator yielding the RTree table + fn finish(&self) -> ArrayRef { + // Slice and contain only these chunks. + let mut inner = self.inner.lock().expect("poisoned"); + let builder = std::mem::take(&mut *inner); + let column = builder.finish(DType::Binary(Nullability::Nullable)); + let len = column.len(); + + // Build the output array + StructArray::new( + FieldNames::from(vec!["filter"]), + vec![column.into_array()], + len, + Validity::NonNullable, + ) + .into_array() + } +} diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 3ba9b01b727..55d5c4d830d 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -17,7 +17,9 @@ pub mod compressed; pub mod dict; pub mod file_stats; pub mod flat; +pub mod geo; pub(crate) mod partitioned; +pub mod path; pub mod repartition; pub mod row_idx; pub mod struct_; diff --git a/vortex-layout/src/layouts/path.rs b/vortex-layout/src/layouts/path.rs new file mode 100644 index 00000000000..50f469c8c8e --- /dev/null +++ b/vortex-layout/src/layouts/path.rs @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Special strategy configurable to write based on the schema path. 
+ +use std::sync::Arc; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::TryStreamExt; +use futures::future::try_join_all; +use futures::pin_mut; +use itertools::Itertools; +use vortex_array::ArrayContext; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_dtype::DType; +use vortex_dtype::Field; +use vortex_dtype::FieldName; +use vortex_dtype::FieldPath; +use vortex_dtype::Nullability; +use vortex_error::VortexError; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_io::kanal_ext::KanalExt; +use vortex_io::runtime::Handle; +use vortex_utils::aliases::DefaultHashBuilder; +use vortex_utils::aliases::hash_map::HashMap; +use vortex_utils::aliases::hash_set::HashSet; + +use crate::IntoLayout; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::struct_::StructLayout; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequenceId; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialStreamAdapter; +use crate::sequence::SequentialStreamExt; + +pub struct PathStrategy { + // A set of leaf field overrides, e.g. to force one column to be compact-compressed. 
+ leaf_writers: HashMap>, + // The writer for any validity arrays that may be present + validity: Arc, + // The fallback writer for any fields that do not have an explicit writer set in `leaf_writers` + fallback: Arc, +} + +impl PathStrategy { + /// Create a new field writer with the given path validity + pub fn new( + leaf_writers: HashMap>, + validity: Arc, + fallback: Arc, + ) -> Self { + Self { + leaf_writers, + validity, + fallback, + } + } +} + +impl PathStrategy { + fn descend(&self, field: &Field) -> Self { + // Start with the existing set of overrides, then only retain the ones that contain + // the current field + let mut new_writers = self.leaf_writers.clone(); + new_writers.retain(|k, _| k.starts_with_field(field)); + + Self { + leaf_writers: new_writers, + validity: self.validity.clone(), + fallback: self.fallback.clone(), + } + } +} + +/// Specialized strategy for when we exactly know the input schema. +#[async_trait] +impl LayoutStrategy for PathStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + stream: SendableSequentialStream, + mut eof: SequencePointer, + handle: Handle, + ) -> VortexResult { + let dtype = stream.dtype().clone(); + let struct_dtype = dtype.as_struct_fields(); + + // Check for unique field names at write time. + if HashSet::<_, DefaultHashBuilder>::from_iter(struct_dtype.names().iter()).len() + != struct_dtype.names().len() + { + vortex_bail!("StructLayout must have unique field names"); + } + let is_nullable = dtype.is_nullable(); + + // Optimization: when there are no fields, don't spawn any work and just write a trivial + // StructLayout. 
+ if struct_dtype.nfields() == 0 && !is_nullable { + let row_count = stream + .try_fold( + 0u64, + |acc, (_, arr)| async move { Ok(acc + arr.len() as u64) }, + ) + .await?; + return Ok(StructLayout::new(row_count, dtype, vec![]).into_layout()); + } + + // stream -> stream> + let columns_vec_stream = stream.map(move |chunk| { + let (sequence_id, chunk) = chunk?; + let mut sequence_pointer = sequence_id.descend(); + let struct_chunk = chunk.to_struct(); + let mut columns: Vec<(SequenceId, ArrayRef)> = Vec::new(); + if is_nullable { + columns.push(( + sequence_pointer.advance(), + chunk.validity_mask().into_array(), + )); + } + + columns.extend( + struct_chunk + .fields() + .iter() + .map(|field| (sequence_pointer.advance(), field.to_array())), + ); + + Ok(columns) + }); + + let mut stream_count = struct_dtype.nfields(); + if is_nullable { + stream_count += 1; + } + + let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) = + (0..stream_count).map(|_| kanal::bounded_async(1)).unzip(); + + // Spawn a task to fan out column chunks to their respective transposed streams + handle + .spawn(async move { + pin_mut!(columns_vec_stream); + while let Some(result) = columns_vec_stream.next().await { + match result { + Ok(columns) => { + for (tx, column) in column_streams_tx.iter().zip_eq(columns.into_iter()) + { + let _ = tx.send(Ok(column)).await; + } + } + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx.send(Err(VortexError::from(e.clone()))).await; + } + break; + } + } + } + }) + .detach(); + + // First child column is the validity, subsequence children are the individual struct fields + let column_dtypes: Vec = if is_nullable { + std::iter::once(DType::Bool(Nullability::NonNullable)) + .chain(struct_dtype.fields()) + .collect() + } else { + struct_dtype.fields().collect() + }; + + let column_names: Vec = if is_nullable { + std::iter::once(FieldName::from("__validity")) + .chain(struct_dtype.names().iter().cloned()) + 
.collect() + } else { + struct_dtype.names().iter().cloned().collect() + }; + + let layout_futures: Vec<_> = column_dtypes + .into_iter() + .zip_eq(column_streams_rx) + .zip_eq(column_names) + .enumerate() + .map(move |(index, ((dtype, recv), name))| { + println!("PathStrategy visiting {name}"); + let column_stream = + SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed()) + .sendable(); + let child_eof = eof.split_off(); + let field = Field::Name(name.clone()); + handle.spawn_nested(|h| { + let fallback = self.fallback.clone(); + let validity = self.validity.clone(); + // descend further and try with new fields + let writer = self + .leaf_writers + .get(&FieldPath::from_name(name)) + .cloned() + .unwrap_or_else(|| { + if dtype.is_struct() { + // Step into the field path for struct columns + Arc::new(self.descend(&field)) + } else { + // Use fallback for leaf columns + self.fallback.clone() + } + }); + let ctx = ctx.clone(); + let dtype = dtype.clone(); + let segment_sink = segment_sink.clone(); + + async move { + // If we have a matching writer, we use it. + // Otherwise, we descend into a new modified one. + // Write validity stream + if index == 0 && is_nullable { + validity + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else { + // Use the underlying writer, otherwise use the fallback writer. 
+ writer + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } + } + }) + }) + .collect(); + + let column_layouts = try_join_all(layout_futures).await?; + // TODO(os): transposed stream could count row counts as well, + // This must hold though, all columns must have the same row count of the struct layout + let row_count = column_layouts.first().map(|l| l.row_count()).unwrap_or(0); + Ok(StructLayout::new(row_count, dtype, column_layouts).into_layout()) + } +} diff --git a/vortex/Cargo.toml b/vortex/Cargo.toml index 3f7918b54c8..3f7c7cfdb8e 100644 --- a/vortex/Cargo.toml +++ b/vortex/Cargo.toml @@ -55,10 +55,16 @@ vortex-zstd = { workspace = true, optional = true } [dev-dependencies] anyhow = { workspace = true } arrow-array = { workspace = true } +axum = { version = "0.8.7" } divan = { workspace = true } +futures = { workspace = true } +geo = { workspace = true } +geo-types = { workspace = true } +geozero = { workspace = true, features = ["with-wkb"] } itertools = { workspace = true } mimalloc = { workspace = true } -parquet = { workspace = true } +parquet = { workspace = true, features = ["async", "tokio"] } +serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tracing = { workspace = true } diff --git a/vortex/examples/geo_web.rs b/vortex/examples/geo_web.rs new file mode 100644 index 00000000000..afca14e3e44 --- /dev/null +++ b/vortex/examples/geo_web.rs @@ -0,0 +1,274 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; +use std::sync::LazyLock; + +use axum::extract::Query; +use axum::extract::State; +use axum::response::Html; +use axum::routing::get; +use axum::Json; +use axum::Router; +use futures::pin_mut; +use futures::StreamExt; +use futures::TryStreamExt; +use geo::algorithm::centroid::Centroid; +use geo_types::Geometry; +use geo_types::Rect; +use geozero::geo_types::GeoWriter; +use geozero::wkb; 
+use geozero::wkb::WkbDialect; +use geozero::GeozeroGeometry; +use itertools::Itertools; +use serde::Deserialize; +use serde::Serialize; +use vortex::VortexSessionDefault; +use vortex_array::accessor::ArrayAccessor; +use vortex_array::expr::col; +use vortex_array::expr::lit; +use vortex_array::expr::pack; +use vortex_array::expr::session::ExprSessionExt; +use vortex_array::expr::st_contains::STContains; +use vortex_array::expr::ExprVTable; +use vortex_array::expr::VTableExt; +use vortex_array::Array; +use vortex_array::ToCanonical; +use vortex_buffer::ByteBuffer; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_file::OpenOptionsSessionExt; +use vortex_file::VortexFile; +use vortex_layout::layouts::geo::GeoLayoutEncoding; +use vortex_layout::session::LayoutSessionExt; +use vortex_layout::LayoutEncodingRef; +use vortex_session::VortexSession; + +#[derive(Deserialize)] +struct ViewportQuery { + south: f64, + west: f64, + north: f64, + east: f64, + zoom: u8, +} + +#[derive(Serialize)] +struct CountResponse { + count: usize, +} + +#[derive(Serialize)] +struct GeoJsonResponse { + r#type: String, + features: Vec, +} + +#[derive(Serialize)] +struct GeoJsonFeature { + r#type: String, + geometry: GeoJsonGeometry, + properties: GeoJsonProperties, +} + +#[derive(Serialize)] +struct GeoJsonGeometry { + r#type: String, + coordinates: [f64; 2], +} + +#[derive(Serialize)] +struct GeoJsonProperties { + name: String, + description: String, +} + +#[derive(Clone)] +struct AppState { + /// File with geospatial data backing the state service. 
+ vxf: VortexFile, +} + +static SESSION: LazyLock = LazyLock::new(|| { + let session = VortexSession::default(); + + // Register our custom geospatial layouts and functions + session + .layouts() + .register(LayoutEncodingRef::new_ref(GeoLayoutEncoding.as_ref())); + session + .expressions() + .register(ExprVTable::new_static(&STContains)); + + session +}); + +#[tokio::main] +pub async fn main() { + // Open up the Vortex file. + let vxf = SESSION + .open_options() + .open("buildings_large_2.vortex") + .await + .expect("Opening file failed"); + + let state = Arc::new(AppState { vxf }); + + let api_routes = Router::new() + .route("/hello", get(hello_handler)) + .route("/pins", get(pins_handler)) + .route("/counts", get(count_handler)) + .with_state(state.clone()); + + let app = Router::new() + .route("/index.html", get(index_handler)) + .nest("/api", api_routes) + .with_state(state.clone()); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:3001") + .await + .unwrap(); + + println!("Server running on http://127.0.0.1:3001"); + println!(" - Static page: http://127.0.0.1:3001/index.html"); + println!(" - API endpoint: http://127.0.0.1:3001/api/hello"); + println!(" - API endpoint: http://127.0.0.1:3001/api/pins"); + + axum::serve(listener, app).await.unwrap(); +} + +async fn index_handler() -> Html<&'static str> { + Html(include_str!("index.html")) +} + +async fn hello_handler() -> &'static str { + "Hello, World!" +} + +// Query the backing file using the pins instead. 
+async fn pins_handler( + State(state): State>, + Query(params): Query, +) -> Json { + // For now, generate some sample pins within the viewport + // In a real application, you'd query a database here + let rect = Geometry::Rect(Rect::new( + [params.west, params.south], + [params.east, params.north], + )); + + let mut target_wkb: Vec = vec![]; + let mut writer = wkb::WkbWriter::new(&mut target_wkb, WkbDialect::Wkb); + rect.process_geom(&mut writer).unwrap(); + let target_wkb = ByteBuffer::from(target_wkb); + + let filter = STContains + .try_new_expr((), [lit(target_wkb), col("geometry")]) + .expect("failed to build filter"); + + // Perform the scan operation over the file. + let stream = state + .vxf + .scan() + .expect("creating scan") + .with_filter(filter) + .with_projection(pack( + [ + ("geometry", col("geometry")), + ("occupancy", col("occupancy")), + ("last_update", col("last_update")), + ], + Nullability::NonNullable, + )) + .into_array_stream() + .expect("into_array_stream"); + pin_mut!(stream); + + let result = stream + .try_collect::>() + .await + .expect("reading stream failed"); + + let features = result + .into_iter() + .flat_map(|building| { + // Parse the WKB and extract out the centroid of it. 
+ let building = building.to_struct(); + let geometry = building.field_by_name("geometry").unwrap(); + geometry + .to_varbinview() + .with_iterator(|wkbs| wkbs.filter_map(|wkb| Some(parse_wkb(wkb?))).collect_vec()) + .into_iter() + .filter_map(|geom| geom.centroid()) + .map(|point| GeoJsonFeature { + r#type: "Feature".to_string(), + geometry: GeoJsonGeometry { + r#type: "Point".to_string(), + coordinates: [point.x(), point.y()], + }, + properties: GeoJsonProperties { + name: "a point".to_string(), + description: "a description".to_string(), + }, + }) + }) + .collect_vec(); + + Json(GeoJsonResponse { + r#type: "FeatureCollection".to_string(), + features, + }) +} + +async fn count_handler( + State(state): State>, + Query(params): Query, +) -> Json { + // For now, generate some sample pins within the viewport + // In a real application, you'd query a database here + let rect = Geometry::Rect(Rect::new( + [params.west, params.south], + [params.east, params.north], + )); + + let mut target_wkb: Vec = vec![]; + let mut writer = wkb::WkbWriter::new(&mut target_wkb, WkbDialect::Wkb); + rect.process_geom(&mut writer).unwrap(); + let target_wkb = ByteBuffer::from(target_wkb); + + let filter = STContains + .try_new_expr((), [lit(target_wkb), col("geometry")]) + .expect("failed to build filter"); + + // Perform the scan operation over the file. 
+ let stream = state + .vxf + .scan() + .expect("creating scan") + .with_filter(filter) + .with_projection(pack( + [("occupancy", col("occupancy"))], + Nullability::NonNullable, + )) + .into_array_stream() + .expect("into_array_stream"); + + pin_mut!(stream); + + let counts = stream + .map(|chunk| VortexResult::Ok(chunk?.len())) + .try_collect::>() + .await + .expect("counting stream failed"); + let count = counts.into_iter().sum::(); + + Json(CountResponse { count }) +} + +fn parse_wkb(wkb: &[u8]) -> Geometry { + let mut writer = GeoWriter::new(); + wkb::Wkb(wkb) + .process_geom(&mut writer) + .expect("wkb parsing left"); + writer.take_geometry().expect("wkb should yield geometry") +} diff --git a/vortex/examples/geospatial_scan.rs b/vortex/examples/geospatial_scan.rs new file mode 100644 index 00000000000..cf3a6e03795 --- /dev/null +++ b/vortex/examples/geospatial_scan.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use futures::StreamExt; +use futures::pin_mut; +use geo_types::Coord; +use geo_types::Geometry; +use geo_types::Rect; +use geozero::GeozeroGeometry; +use geozero::wkb; +use geozero::wkb::WkbDialect; +use vortex::VortexSessionDefault; +use vortex_array::Array; +use vortex_array::expr::ExprVTable; +use vortex_array::expr::VTableExt; +use vortex_array::expr::col; +use vortex_array::expr::lit; +use vortex_array::expr::session::ExprSessionExt; +use vortex_array::expr::st_contains::STContains; +use vortex_buffer::ByteBuffer; +use vortex_file::OpenOptionsSessionExt; +use vortex_layout::LayoutEncodingRef; +use vortex_layout::layouts::geo::GeoLayoutEncoding; +use vortex_layout::session::LayoutSessionExt; +use vortex_session::VortexSession; + +#[tokio::main] +pub async fn main() { + let session = VortexSession::default(); + + // Regsiter the ST_CONTAINS expression. 
+ session + .expressions() + .register(ExprVTable::new_static(&STContains)); + + session + .layouts() + .register(LayoutEncodingRef::new_ref(GeoLayoutEncoding.as_ref())); + + let mut target_wkb: Vec = vec![]; + let mut writer = wkb::WkbWriter::new(&mut target_wkb, WkbDialect::Wkb); + + // This should only yield 1 final building. + let target = Geometry::Rect(Rect::new( + Coord { + x: -96.9582104, + y: 20.1394955, + }, + Coord { + x: -96.9573294, + y: 20.1400545, + }, + )); + target.process_geom(&mut writer).unwrap(); + let target_wkb = ByteBuffer::from(target_wkb); + + let st_contains_filter = STContains + .try_new_expr((), [lit(target_wkb), col("geometry")]) + .expect("building new ST_Contains expression"); + + println!("executing scan with row filter {st_contains_filter}"); + + // Create the scan. + let vxf = session + .open_options() + .open("buildings_rtree.vortex") + .await + .expect("open file"); + + let stream = vxf + .scan() + .unwrap() + .with_filter(st_contains_filter) + .into_array_stream() + .unwrap(); + + pin_mut!(stream); + + let mut total_rows = 0; + while let Some(next) = stream.next().await { + let count = next.unwrap().len(); + println!("Rows matched: {:?}", count); + total_rows += count; + } + + println!("Total rows: {}", total_rows); +} diff --git a/vortex/examples/geospatial_write.rs b/vortex/examples/geospatial_write.rs new file mode 100644 index 00000000000..406f08a4049 --- /dev/null +++ b/vortex/examples/geospatial_write.rs @@ -0,0 +1,157 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; +use std::sync::LazyLock; + +use futures::StreamExt; +use parquet::arrow::ParquetRecordBatchStreamBuilder; +use parquet::arrow::ProjectionMask; +use parquet::arrow::arrow_reader::ArrowReaderBuilder; +use parquet::arrow::async_reader::ParquetRecordBatchStream; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use vortex::VortexSessionDefault; +use vortex_array::ArrayRef; +use 
vortex_array::arrow::FromArrowArray; +use vortex_array::stream::ArrayStreamAdapter; +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_dtype::FieldPath; +use vortex_dtype::arrow::FromArrowType; +use vortex_error::VortexError; +use vortex_file::WriteOptionsSessionExt; +use vortex_file::WriteStrategyBuilder; +use vortex_layout::LayoutStrategy; +use vortex_layout::layouts::buffered::BufferedStrategy; +use vortex_layout::layouts::chunked::writer::ChunkedLayoutStrategy; +use vortex_layout::layouts::compact::CompactCompressor; +use vortex_layout::layouts::compressed::CompressingStrategy; +use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; +use vortex_layout::layouts::geo::GeoStrategy; +use vortex_layout::layouts::path::PathStrategy; +use vortex_layout::layouts::repartition::RepartitionStrategy; +use vortex_layout::layouts::repartition::RepartitionWriterOptions; +use vortex_session::VortexSession; +use vortex_utils::aliases::hash_map::HashMap; + +/// Special strategy for writing chunks of data where we add extra index structures +/// to pushdown `ST_Contains` queries. +pub static COMPACT_RTREE_STRATEGY: LazyLock> = + LazyLock::new(|| make_rtree_strategy()); + +async fn make_reader(path: &str) -> ParquetRecordBatchStream { + let f = File::open(path).await.unwrap(); + + let mut reader = ParquetRecordBatchStreamBuilder::new(f).await.unwrap(); + + let schema = reader.parquet_schema(); + + // Drop the bbox column, since we don't use it for pruning and instead use a custom RTreeLayout + let projection_mask = ProjectionMask::roots(&schema, [0, 1, 2, 3, 4, 5, 6, 7, 9]); + + reader = reader.with_projection(projection_mask); + reader.build().unwrap() +} + +#[tokio::main] +pub async fn main() { + // Load data from the Parquet dataset into our special format with the RTree indices. 
+ // let f = File::open( + // "/Users/aduffy/Downloads/BuildingsParquet/custom_download_20251204_095222.parquet", + // ) + // .await + // .unwrap(); + // let reader1 = make_reader( + // "/Users/aduffy/Downloads/BuildingsParquet/custom_download_20251204_095222.parquet", + // ) + // .await; + + let reader = make_reader("/Users/aduffy/Downloads/building.120213.parquet").await; + + let dtype = DType::from_arrow(reader.schema().as_ref()); + + let array_stream = reader + .map(|record_batch| { + record_batch + .map_err(|e| VortexError::generic(e.into())) + .map(|rb| ArrayRef::from_arrow(rb, false)) + }) + .boxed(); + + // Setup the Vortex write to stream the records out. + let mut file = File::create("buildings_large_2.vortex").await.unwrap(); + + let session = VortexSession::default(); + let summary = session + .write_options() + .with_strategy(COMPACT_RTREE_STRATEGY.clone()) + .write(&mut file, ArrayStreamAdapter::new(dtype, array_stream)) + .await + .unwrap(); + drop(summary); + + file.shutdown().await.unwrap(); +} + +/// Make a strategy which has special handling for DType::Binary chunks named "geometry". +fn make_rtree_strategy() -> Arc { + let validity = Arc::new(FlatLayoutStrategy::default()); + let fallback = WriteStrategyBuilder::new() + .with_compressor(CompactCompressor::default()) + .build(); + + // override the handling of the "geometry" column + let leaf_writers = HashMap::from_iter([( + FieldPath::from_name(FieldName::from("geometry")), + geometry_writer(), + )]); + + Arc::new(PathStrategy::new(leaf_writers, validity, fallback)) +} + +fn geometry_writer() -> Arc { + // 7. for each chunk create a flat layout + let chunked = ChunkedLayoutStrategy::new(FlatLayoutStrategy::default()); + // 6. buffer chunks so they end up with closer segment ids physically + let buffered = BufferedStrategy::new(chunked, 2 * 1024 * 1024); // 2MB + // 5. 
compress each chunk with ZSTD/PCodec + let compressing = CompressingStrategy::new_compact(buffered, CompactCompressor::default()); + + // 4. prior to compression, coalesce up to a minimum size + let coalescing = RepartitionStrategy::new( + compressing, + RepartitionWriterOptions { + block_size_minimum: 1024 * 1024, + block_len_multiple: 8_192, + canonicalize: true, + }, + ); + + // 2.1. | 3.1. compress stats tables and dict values. + let compress_then_flat = CompressingStrategy::new_compact( + FlatLayoutStrategy::default(), + CompactCompressor::default(), + ); + + // 2. calculate rtree for each block + let stats = GeoStrategy::new( + Arc::new(coalescing), + Arc::new(FlatLayoutStrategy::default()), + 8_192, + ); + + // 1. repartition each column to fixed row counts + let repartition = RepartitionStrategy::new( + stats, + RepartitionWriterOptions { + // No minimum block size in bytes + block_size_minimum: 0, + // Always repartition into 8K row blocks + block_len_multiple: 8_192, + canonicalize: false, + }, + ); + + Arc::new(repartition) +} diff --git a/vortex/examples/index.html b/vortex/examples/index.html new file mode 100644 index 00000000000..f62edadfc3e --- /dev/null +++ b/vortex/examples/index.html @@ -0,0 +1,155 @@ + + + + + + Vortex Geo Web + + + + +
+
+

Information Panel

+
+

Move the map to see building counts...

+
+
+ + + + +