From ccbbd6d6bcd7e6882ee0040e8fb3976aa5d8b533 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 10 Dec 2025 23:10:25 +0000 Subject: [PATCH 1/5] Immutable session + registries Signed-off-by: Nicholas Gates --- bench-vortex/src/lib.rs | 7 +- encodings/alp/src/alp/array.rs | 3 +- .../bitpacking/array/bitpack_decompress.rs | 3 +- fuzz/src/lib.rs | 16 ++- vortex-array/src/array/mod.rs | 4 +- .../src/arrays/scalar_fn/vtable/mod.rs | 5 +- vortex-array/src/execution/mod.rs | 8 +- vortex-array/src/expr/functions/session.rs | 3 +- vortex-array/src/expr/session/mod.rs | 9 +- vortex-array/src/session/mod.rs | 9 +- vortex-cxx/src/lib.rs | 10 +- vortex-datafusion/examples/vortex_table.rs | 4 +- vortex-datafusion/src/persistent/cache.rs | 6 +- vortex-datafusion/src/persistent/format.rs | 13 +- vortex-datafusion/src/persistent/opener.rs | 7 +- vortex-datafusion/src/persistent/sink.rs | 6 +- vortex-datafusion/src/persistent/source.rs | 6 +- vortex-duckdb/src/lib.rs | 10 +- vortex-ffi/examples/hello_vortex.rs | 10 +- vortex-ffi/src/file.rs | 4 +- vortex-ffi/src/session.rs | 9 +- vortex-file/src/file.rs | 4 +- vortex-file/src/footer/deserializer.rs | 6 +- vortex-file/src/footer/mod.rs | 6 +- vortex-file/src/lib.rs | 6 +- vortex-file/src/open.rs | 20 ++- vortex-file/src/tests.rs | 9 +- vortex-file/src/writer.rs | 17 ++- vortex-io/src/session.rs | 39 ++++-- vortex-jni/src/lib.rs | 10 +- vortex-layout/src/layout.rs | 6 +- vortex-layout/src/layouts/chunked/mod.rs | 4 +- vortex-layout/src/layouts/chunked/reader.rs | 4 +- vortex-layout/src/layouts/dict/mod.rs | 4 +- vortex-layout/src/layouts/dict/reader.rs | 4 +- vortex-layout/src/layouts/flat/mod.rs | 4 +- vortex-layout/src/layouts/row_idx/mod.rs | 6 +- vortex-layout/src/layouts/struct_/mod.rs | 7 +- vortex-layout/src/layouts/struct_/reader.rs | 6 +- vortex-layout/src/layouts/zoned/mod.rs | 4 +- vortex-layout/src/layouts/zoned/reader.rs | 4 +- vortex-layout/src/reader.rs | 6 +- vortex-layout/src/session.rs | 9 +- vortex-layout/src/test.rs | 4 +- vortex-layout/src/vtable.rs | 4 +- vortex-python/src/lib.rs | 10 +- vortex-python/src/registry.rs | 25 ++-- vortex-scan/src/gpu/gpubuilder.rs | 6 +- vortex-scan/src/repeated_scan.rs | 6 +- vortex-scan/src/scan_builder.rs | 8 +- vortex-scan/src/test.rs | 4 +- vortex-session/src/lib.rs | 129 +++++++----------- vortex-session/src/registry.rs | 13 +- vortex-tui/src/main.rs | 7 +- vortex/examples/tracing_vortex.rs | 9 +- vortex/src/lib.rs | 29 +++- 56 files changed, 311 insertions(+), 280 deletions(-) diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index ea766a1076f..2e03d82964a 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -48,15 +48,16 @@ pub use datasets::file; pub use engines::df; use vortex::VortexSessionDefault; pub use vortex::error::vortex_panic; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; // All benchmarks run with mimalloc for consistency. #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; -pub static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_tokio()); +pub static SESSION: LazyLock = + LazyLock::new(|| VortexSession::new_with_defaults().with_tokio().freeze()); #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, Serialize)] pub struct Target { diff --git a/encodings/alp/src/alp/array.rs b/encodings/alp/src/alp/array.rs index d2812f32d9f..138dc50bf48 100644 --- a/encodings/alp/src/alp/array.rs +++ b/encodings/alp/src/alp/array.rs @@ -460,11 +460,12 @@ mod tests { use vortex_array::vtable::ValidityHelper; use vortex_dtype::PTypeDowncast; use vortex_session::VortexSession; + use vortex_session::VortexSessionRef; use vortex_vector::VectorOps; use super::*; - static SESSION: LazyLock = LazyLock::new(VortexSession::empty); + static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().freeze()); #[rstest] #[case(0)] diff --git a/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs b/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs index 7c6cb0eb0b3..c53a8379a8d 100644 --- a/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs +++ b/encodings/fastlanes/src/bitpacking/array/bitpack_decompress.rs @@ -211,6 +211,7 @@ mod tests { use vortex_buffer::buffer; use vortex_dtype::Nullability; use vortex_session::VortexSession; + use vortex_session::VortexSessionRef; use vortex_vector::VectorMutOps; use vortex_vector::VectorOps; @@ -218,7 +219,7 @@ mod tests { use crate::BitPackedVTable; use crate::bitpack_compress::bitpack_encode; - static SESSION: LazyLock = LazyLock::new(VortexSession::empty); + static SESSION: LazyLock = LazyLock::new(|| VortexSession::empty().freeze()); fn compression_roundtrip(n: usize) { let values = PrimitiveArray::from_iter((0..n).map(|i| (i % 2047) as u16)); diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs index ad3904dc298..b3c8aa720a8 100644 --- a/fuzz/src/lib.rs +++ b/fuzz/src/lib.rs @@ -26,12 +26,16 @@ mod native_runtime { use vortex::VortexSessionDefault; use vortex_io::runtime::BlockingRuntime; use vortex_io::runtime::current::CurrentThreadRuntime; - use vortex_io::session::RuntimeSessionExt; + use vortex_io::session::RuntimeSessionMutExt; use vortex_session::VortexSession; + use vortex_session::VortexSessionRef; pub static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); - pub static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); + pub static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze() + }); } #[cfg(not(target_arch = "wasm32"))] @@ -46,10 +50,10 @@ mod wasm_runtime { use vortex::VortexSessionDefault; use vortex_io::runtime::wasm::WasmRuntime; use vortex_io::session::RuntimeSessionExt; - use vortex_session::VortexSession; + use vortex_session::VortexSessionRef; - pub static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(WasmRuntime::handle())); + pub static SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with_handle(WasmRuntime::handle())); } #[cfg(target_arch = "wasm32")] diff --git a/vortex-array/src/array/mod.rs b/vortex-array/src/array/mod.rs index 71df1db4c34..c269111c177 100644 --- a/vortex-array/src/array/mod.rs +++ b/vortex-array/src/array/mod.rs @@ -22,7 +22,7 @@ use vortex_error::vortex_ensure; use vortex_error::vortex_panic; use vortex_mask::Mask; use vortex_scalar::Scalar; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_vector::Vector; use vortex_vector::VectorOps; @@ -366,7 +366,7 @@ impl dyn Array + '_ { /// Execute the array and return the resulting vector. /// /// This entry-point function will choose an appropriate CPU-based execution strategy. - pub fn execute(&self, session: &VortexSession) -> VortexResult { + pub fn execute(&self, session: &VortexSessionRef) -> VortexResult { let mut ctx = ExecutionCtx::new(session.clone()); self.batch_execute(&mut ctx) } diff --git a/vortex-array/src/arrays/scalar_fn/vtable/mod.rs b/vortex-array/src/arrays/scalar_fn/vtable/mod.rs index 37a5cf643e8..3e0766bf764 100644 --- a/vortex-array/src/arrays/scalar_fn/vtable/mod.rs +++ b/vortex-array/src/arrays/scalar_fn/vtable/mod.rs @@ -19,6 +19,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_vector::Vector; use crate::Array; @@ -43,8 +44,8 @@ use crate::vtable::VTable; // TODO(ngates): canonicalize doesn't currently take a session, therefore we cannot dispatch // to registered scalar function kernels. We therefore hold our own non-pluggable session here // that contains all the built-in kernels while we migrate over to "execute" instead of canonicalize. -static SCALAR_FN_SESSION: LazyLock = - LazyLock::new(|| VortexSession::empty().with::()); +static SCALAR_FN_SESSION: LazyLock = + LazyLock::new(|| VortexSession::empty().with::().freeze()); vtable!(ScalarFn); diff --git a/vortex-array/src/execution/mod.rs b/vortex-array/src/execution/mod.rs index 0c20c86f901..5a39880174c 100644 --- a/vortex-array/src/execution/mod.rs +++ b/vortex-array/src/execution/mod.rs @@ -7,24 +7,24 @@ mod validity; pub use batch::*; pub use mask::*; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; /// Execution context for batch array compute. // NOTE(ngates): This context will eventually hold cached resources for execution, such as CSE // nodes, and may well eventually support a type-map interface for arrays to stash arbitrary // execution-related data. pub struct ExecutionCtx { - session: VortexSession, + session: VortexSessionRef, } impl ExecutionCtx { /// Create a new execution context with the given session. - pub(crate) fn new(session: VortexSession) -> Self { + pub(crate) fn new(session: VortexSessionRef) -> Self { Self { session } } /// Get the session associated with this execution context. - pub fn session(&self) -> &VortexSession { + pub fn session(&self) -> &VortexSessionRef { &self.session } } diff --git a/vortex-array/src/expr/functions/session.rs b/vortex-array/src/expr/functions/session.rs index a596f041d95..3bde37d7ad1 100644 --- a/vortex-array/src/expr/functions/session.rs +++ b/vortex-array/src/expr/functions/session.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::registry::Registry; @@ -19,7 +18,7 @@ impl FunctionSession { } pub trait ScalarFuncSessionExt: SessionExt { - fn functions(&self) -> Ref<'_, FunctionSession> { + fn functions(&self) -> &FunctionSession { self.get::() } } diff --git a/vortex-array/src/expr/session/mod.rs b/vortex-array/src/expr/session/mod.rs index d2f8be2bd30..9b1e44180f8 100644 --- a/vortex-array/src/expr/session/mod.rs +++ b/vortex-array/src/expr/session/mod.rs @@ -4,7 +4,6 @@ mod rewrite; pub use rewrite::RewriteRuleRegistry; -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::registry::Registry; @@ -54,12 +53,12 @@ impl ExprSession { } /// Register an expression vtable in the session, replacing any existing vtable with the same ID. - pub fn register(&self, expr: ExprVTable) { + pub fn register(&mut self, expr: ExprVTable) { self.registry.register(expr) } /// Register expression vtables in the session, replacing any existing vtables with the same IDs. - pub fn register_many(&self, exprs: impl IntoIterator) { + pub fn register_many(&mut self, exprs: impl IntoIterator) { self.registry.register_many(exprs); } @@ -151,7 +150,7 @@ impl ExprSession { impl Default for ExprSession { fn default() -> Self { - let expressions = ExprRegistry::default(); + let mut expressions = ExprRegistry::default(); // Register built-in expressions here if needed. expressions.register_many([ @@ -187,7 +186,7 @@ impl Default for ExprSession { /// Extension trait for accessing expression session data. pub trait ExprSessionExt: SessionExt { /// Returns the expression vtable registry. - fn expressions(&self) -> Ref<'_, ExprSession> { + fn expressions(&self) -> &ExprSession { self.get::() } } diff --git a/vortex-array/src/session/mod.rs b/vortex-array/src/session/mod.rs index e214904d7ee..4b5bd196e6c 100644 --- a/vortex-array/src/session/mod.rs +++ b/vortex-array/src/session/mod.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::registry::Registry; @@ -52,19 +51,19 @@ impl ArraySession { } /// Register a new array encoding, replacing any existing encoding with the same ID. - pub fn register(&self, encoding: ArrayVTable) { + pub fn register(&mut self, encoding: ArrayVTable) { self.registry.register(encoding) } /// Register many array encodings, replacing any existing encodings with the same ID. - pub fn register_many(&self, encodings: impl IntoIterator) { + pub fn register_many(&mut self, encodings: impl IntoIterator) { self.registry.register_many(encodings); } } impl Default for ArraySession { fn default() -> Self { - let encodings = ArrayRegistry::default(); + let mut encodings = ArrayRegistry::default(); // Register the canonical encodings. encodings.register_many([ @@ -108,7 +107,7 @@ impl Default for ArraySession { /// Session data for Vortex arrays. pub trait ArraySessionExt: SessionExt { /// Returns the array encoding registry. - fn arrays(&self) -> Ref<'_, ArraySession> { + fn arrays(&self) -> &ArraySession { self.get::() } } diff --git a/vortex-cxx/src/lib.rs b/vortex-cxx/src/lib.rs index 1cc63071ebc..663b37fa1e2 100644 --- a/vortex-cxx/src/lib.rs +++ b/vortex-cxx/src/lib.rs @@ -17,8 +17,9 @@ use scalar::*; use vortex::VortexSessionDefault; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use write::*; /// By default, the C++ API uses a current-thread runtime, providing control of the threading @@ -28,8 +29,11 @@ use write::*; // this runtime. pub(crate) static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); -pub(crate) static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +pub(crate) static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze() +}); #[cxx::bridge(namespace = "vortex::ffi")] #[allow(let_underscore_drop)] diff --git a/vortex-datafusion/examples/vortex_table.rs b/vortex-datafusion/examples/vortex_table.rs index 6c49bb68026..6867ce15cab 100644 --- a/vortex-datafusion/examples/vortex_table.rs +++ b/vortex-datafusion/examples/vortex_table.rs @@ -19,13 +19,13 @@ use vortex::array::validity::Validity; use vortex::buffer::buffer; use vortex::error::vortex_err; use vortex::file::WriteOptionsSessionExt; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; use vortex_datafusion::VortexFormat; #[tokio::main] async fn main() -> anyhow::Result<()> { - let session = VortexSession::default().with_tokio(); + let session = VortexSession::new_with_defaults().with_tokio().freeze(); let temp_dir = tempdir()?; let strings = ChunkedArray::from_iter([ diff --git a/vortex-datafusion/src/persistent/cache.rs b/vortex-datafusion/src/persistent/cache.rs index 8f8220e470f..b3fb29f7b47 100644 --- a/vortex-datafusion/src/persistent/cache.rs +++ b/vortex-datafusion/src/persistent/cache.rs @@ -25,14 +25,14 @@ use vortex::file::VortexFile; use vortex::layout::segments::SegmentCache; use vortex::layout::segments::SegmentId; use vortex::metrics::MetricsSessionExt; -use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use vortex::utils::aliases::DefaultHashBuilder; #[derive(Clone)] pub(crate) struct VortexFileCache { file_cache: Cache, segment_cache: Cache, - session: VortexSession, + session: VortexSessionRef, } /// Cache key for a [`VortexFile`]. @@ -59,7 +59,7 @@ struct SegmentKey { } impl VortexFileCache { - pub fn new(size_mb: usize, segment_size_mb: usize, session: VortexSession) -> Self { + pub fn new(size_mb: usize, segment_size_mb: usize, session: VortexSessionRef) -> Self { let file_cache = Cache::builder() .max_capacity(size_mb as u64 * (1 << 20)) .eviction_listener(|k: Arc, _v: VortexFile, cause| { diff --git a/vortex-datafusion/src/persistent/format.rs b/vortex-datafusion/src/persistent/format.rs index f8a64c0320c..36d2811183f 100644 --- a/vortex-datafusion/src/persistent/format.rs +++ b/vortex-datafusion/src/persistent/format.rs @@ -54,6 +54,7 @@ use vortex::expr::stats::Stat; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::scalar::Scalar; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use super::cache::VortexFileCache; use super::sink::VortexSink; @@ -63,7 +64,7 @@ use crate::convert::TryToDataFusion; /// Vortex implementation of a DataFusion [`FileFormat`]. pub struct VortexFormat { - session: VortexSession, + session: VortexSessionRef, file_cache: VortexFileCache, opts: VortexOptions, } @@ -95,7 +96,7 @@ impl Eq for VortexOptions {} /// Minimal factory to create [`VortexFormat`] instances. #[derive(Debug)] pub struct VortexFormatFactory { - session: VortexSession, + session: VortexSessionRef, options: Option, } @@ -106,7 +107,7 @@ impl GetExt for VortexFormatFactory { } impl VortexFormatFactory { - /// Creates a new instance with a default [`VortexSession`] and default options. + /// Creates a new instance with a default [`VortexSessionRef`] and default options. #[expect( clippy::new_without_default, reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing" @@ -121,7 +122,7 @@ impl VortexFormatFactory { /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory. /// /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`]. - pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self { + pub fn new_with_options(session: VortexSessionRef, options: VortexOptions) -> Self { Self { session, options: Some(options), @@ -175,12 +176,12 @@ impl FileFormatFactory for VortexFormatFactory { impl VortexFormat { /// Create a new instance with default options. - pub fn new(session: VortexSession) -> Self { + pub fn new(session: VortexSessionRef) -> Self { Self::new_with_options(session, VortexOptions::default()) } /// Creates a new instance with configured by a [`VortexOptions`]. - pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self { + pub fn new_with_options(session: VortexSessionRef, opts: VortexOptions) -> Self { Self { session: session.clone(), file_cache: VortexFileCache::new( diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index f7ccd43e03b..84e70573a5b 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -40,7 +40,7 @@ use vortex::expr::select; use vortex::layout::LayoutReader; use vortex::metrics::VortexMetrics; use vortex::scan::ScanBuilder; -use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::dash_map::Entry; @@ -50,7 +50,7 @@ use crate::convert::exprs::make_vortex_predicate; #[derive(Clone)] pub(crate) struct VortexOpener { - pub session: VortexSession, + pub session: VortexSessionRef, pub object_store: Arc, /// Projection by index of the file's columns pub projection: Option>, @@ -437,10 +437,11 @@ mod tests { use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; use vortex::session::VortexSession; + use vortex::session::VortexSessionRef; use super::*; - static SESSION: LazyLock = LazyLock::new(VortexSession::default); + static SESSION: LazyLock = LazyLock::new(VortexSession::default); #[rstest] #[case(0..100, 100, 100, 0..100)] diff --git a/vortex-datafusion/src/persistent/sink.rs b/vortex-datafusion/src/persistent/sink.rs index c5cdea8ee7f..b93513acec6 100644 --- a/vortex-datafusion/src/persistent/sink.rs +++ b/vortex-datafusion/src/persistent/sink.rs @@ -35,16 +35,16 @@ use vortex::error::VortexResult; use vortex::file::WriteOptionsSessionExt; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; -use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; pub struct VortexSink { config: FileSinkConfig, schema: SchemaRef, - session: VortexSession, + session: VortexSessionRef, } impl VortexSink { - pub fn new(config: FileSinkConfig, schema: SchemaRef, session: VortexSession) -> Self { + pub fn new(config: FileSinkConfig, schema: SchemaRef, session: VortexSessionRef) -> Self { Self { config, schema, diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index d42436eb652..4e4a9db16b6 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -32,7 +32,7 @@ use vortex::error::VortexExpect as _; use vortex::file::VORTEX_FILE_EXTENSION; use vortex::layout::LayoutReader; use vortex::metrics::MetricsSessionExt; -use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use vortex_utils::aliases::dash_map::DashMap; use super::cache::VortexFileCache; @@ -45,7 +45,7 @@ use crate::convert::exprs::can_be_pushed_down; /// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec #[derive(Clone)] pub struct VortexSource { - pub(crate) session: VortexSession, + pub(crate) session: VortexSessionRef, pub(crate) file_cache: VortexFileCache, /// Combined predicate expression containing all filters from DataFusion query planning. /// Used with FilePruner to skip files based on statistics and partition values. @@ -67,7 +67,7 @@ pub struct VortexSource { } impl VortexSource { - pub(crate) fn new(session: VortexSession, file_cache: VortexFileCache) -> Self { + pub(crate) fn new(session: VortexSessionRef, file_cache: VortexFileCache) -> Self { Self { session, file_cache, diff --git a/vortex-duckdb/src/lib.rs b/vortex-duckdb/src/lib.rs index 5d12cf19b8e..d1667bb4a99 100644 --- a/vortex-duckdb/src/lib.rs +++ b/vortex-duckdb/src/lib.rs @@ -12,8 +12,9 @@ use vortex::error::VortexExpect; use vortex::error::VortexResult; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use crate::copy::VortexCopyFunction; use crate::duckdb::Config; @@ -41,8 +42,11 @@ mod e2e_test; // A global runtime for Vortex operations within DuckDB. static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); -static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze() +}); /// Register Vortex extension configuration options with DuckDB. /// This must be called before `register_table_functions` to take effect. diff --git a/vortex-ffi/examples/hello_vortex.rs b/vortex-ffi/examples/hello_vortex.rs index c6ca24d9b8f..bdf37f1d4e5 100644 --- a/vortex-ffi/examples/hello_vortex.rs +++ b/vortex-ffi/examples/hello_vortex.rs @@ -30,12 +30,16 @@ use vortex::file::WriteOptionsSessionExt; use vortex::io::VortexWrite; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::current::CurrentThreadRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; static RUNTIME: LazyLock = LazyLock::new(CurrentThreadRuntime::new); -static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze() +}); const BIN_NAME: &str = "hello_vortex"; diff --git a/vortex-ffi/src/file.rs b/vortex-ffi/src/file.rs index e38671b4716..1011cbb08b8 100644 --- a/vortex-ffi/src/file.rs +++ b/vortex-ffi/src/file.rs @@ -42,7 +42,7 @@ use vortex::io::runtime::BlockingRuntime; use vortex::proto::expr::Expr; use vortex::scan::ScanBuilder; use vortex::scan::SplitBy; -use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use crate::RUNTIME; use crate::arc_wrapper; @@ -126,7 +126,7 @@ impl vx_file_scan_options { /// Processes FFI scan options. /// /// Extracts and converts a scan configuration from an FFI options struct. - fn process_scan_options(&self, session: &VortexSession) -> VortexResult { + fn process_scan_options(&self, session: &VortexSessionRef) -> VortexResult { // Extract field names for projection. let projection_expr = extract_expression( session.expressions().registry(), diff --git a/vortex-ffi/src/session.rs b/vortex-ffi/src/session.rs index 8950abb90d4..ecc7cfb511b 100644 --- a/vortex-ffi/src/session.rs +++ b/vortex-ffi/src/session.rs @@ -3,15 +3,16 @@ use vortex::VortexSessionDefault; use vortex::io::runtime::BlockingRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use crate::RUNTIME; use crate::box_wrapper; box_wrapper!( /// A handle to a Vortex session. - VortexSession, + VortexSessionRef, vx_session ); @@ -21,6 +22,8 @@ box_wrapper!( #[unsafe(no_mangle)] pub unsafe extern "C-unwind" fn vx_session_new() -> *mut vx_session { vx_session::new(Box::new( - VortexSession::default().with_handle(RUNTIME.handle()), + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze(), )) } diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index 83249667ec1..5e219eccdea 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -25,7 +25,7 @@ use vortex_layout::segments::SegmentSource; use vortex_metrics::VortexMetrics; use vortex_scan::ScanBuilder; use vortex_scan::SplitBy; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_utils::aliases::hash_map::HashMap; use crate::footer::Footer; @@ -45,7 +45,7 @@ pub struct VortexFile { /// Metrics tied to the file. pub(crate) metrics: VortexMetrics, /// The Vortex session used to open this file - pub(crate) session: VortexSession, + pub(crate) session: VortexSessionRef, } impl VortexFile { diff --git a/vortex-file/src/footer/deserializer.rs b/vortex-file/src/footer/deserializer.rs index 144355d9163..a87bfa7ee1e 100644 --- a/vortex-file/src/footer/deserializer.rs +++ b/vortex-file/src/footer/deserializer.rs @@ -12,7 +12,7 @@ use vortex_error::vortex_err; use vortex_flatbuffers::FlatBuffer; use vortex_flatbuffers::ReadFlatBuffer; use vortex_flatbuffers::dtype as fbd; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::EOF_SIZE; use crate::Footer; @@ -30,7 +30,7 @@ pub struct FooterDeserializer { // the caller. buffer: ByteBuffer, // The session to use for deserialization. - session: VortexSession, + session: VortexSessionRef, // The DType, if provided externally. dtype: Option, @@ -43,7 +43,7 @@ pub struct FooterDeserializer { } impl FooterDeserializer { - pub(super) fn new(initial_read: ByteBuffer, session: VortexSession) -> Self { + pub(super) fn new(initial_read: ByteBuffer, session: VortexSessionRef) -> Self { Self { buffer: initial_read, session, diff --git a/vortex-file/src/footer/mod.rs b/vortex-file/src/footer/mod.rs index 62724b27c36..ec53feb1c42 100644 --- a/vortex-file/src/footer/mod.rs +++ b/vortex-file/src/footer/mod.rs @@ -38,7 +38,7 @@ use vortex_layout::LayoutContext; use vortex_layout::LayoutRef; use vortex_layout::layout_from_flatbuffer; use vortex_layout::session::LayoutSessionExt; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; /// Captures the layout information of a Vortex file. #[derive(Debug, Clone)] @@ -71,7 +71,7 @@ impl Footer { layout_bytes: FlatBuffer, dtype: DType, statistics: Option, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { let fb_footer = root::(&footer_bytes)?; @@ -145,7 +145,7 @@ impl Footer { } /// Create a deserializer for a Vortex file footer. - pub fn deserializer(eof_buffer: ByteBuffer, session: VortexSession) -> FooterDeserializer { + pub fn deserializer(eof_buffer: ByteBuffer, session: VortexSessionRef) -> FooterDeserializer { FooterDeserializer::new(eof_buffer, session) } } diff --git a/vortex-file/src/lib.rs b/vortex-file/src/lib.rs index 8fd03b26470..d3d2e9837ae 100644 --- a/vortex-file/src/lib.rs +++ b/vortex-file/src/lib.rs @@ -109,7 +109,7 @@ pub use strategy::*; use vortex_alp::ALPRDVTable; use vortex_alp::ALPVTable; use vortex_array::arrays::DictVTable; -use vortex_array::session::ArraySessionExt; +use vortex_array::session::ArraySession; use vortex_array::vtable::ArrayVTableExt; use vortex_bytebool::ByteBoolVTable; use vortex_datetime_parts::DateTimePartsVTable; @@ -163,8 +163,8 @@ mod forever_constant { /// /// NOTE: this function will be changed in the future to encapsulate logic for using different /// Vortex "Editions" that may support different sets of encodings. -pub fn register_default_encodings(session: &VortexSession) { - session.arrays().register_many([ +pub fn register_default_encodings(session: &mut VortexSession) { + session.get_mut::().register_many([ ALPVTable.as_vtable(), ALPRDVTable.as_vtable(), BitPackedVTable.as_vtable(), diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 1ee6d08feb2..ed12b5d46db 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use futures::executor::block_on; use parking_lot::RwLock; -use vortex_array::session::ArraySessionExt; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; @@ -22,10 +21,9 @@ use vortex_layout::segments::SegmentCacheMetrics; use vortex_layout::segments::SegmentCacheSourceAdapter; use vortex_layout::segments::SegmentId; use vortex_layout::segments::SharedSegmentSource; -use vortex_layout::session::LayoutSessionExt; use vortex_metrics::MetricsSessionExt; use vortex_metrics::VortexMetrics; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_utils::aliases::hash_map::HashMap; use crate::DeserializeStep; @@ -41,7 +39,7 @@ const INITIAL_READ_SIZE: usize = 1 << 20; // 1 MB /// Open options for a Vortex file reader. pub struct VortexOpenOptions { /// The session to use for opening the file. - session: VortexSession, + session: VortexSessionRef, /// Cache to use for file segments. segment_cache: Arc, /// The number of bytes to read when parsing the footer. @@ -58,13 +56,15 @@ pub struct VortexOpenOptions { metrics: VortexMetrics, } -pub trait OpenOptionsSessionExt: - ArraySessionExt + LayoutSessionExt + MetricsSessionExt + RuntimeSessionExt -{ +pub trait OpenOptionsSessionExt { /// Create a new [`VortexOpenOptions`] using the provided session to open a file. + fn open_options(&self) -> VortexOpenOptions; +} + +impl OpenOptionsSessionExt for VortexSessionRef { fn open_options(&self) -> VortexOpenOptions { VortexOpenOptions { - session: self.session(), + session: self.clone(), segment_cache: Arc::new(NoOpSegmentCache), initial_read_size: INITIAL_READ_SIZE, file_size: None, @@ -75,10 +75,6 @@ pub trait OpenOptionsSessionExt: } } } -impl - OpenOptionsSessionExt for S -{ -} impl VortexOpenOptions { /// Configure the initial read size for the Vortex file. diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index de616355e9d..70cdb1c7890 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -63,6 +63,7 @@ use vortex_metrics::VortexMetrics; use vortex_scalar::Scalar; use vortex_scan::ScanBuilder; use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::OpenOptionsSessionExt; use crate::V1_FOOTER_FBS_SIZE; @@ -70,17 +71,17 @@ use crate::VERSION; use crate::VortexFile; use crate::WriteOptionsSessionExt; -static SESSION: LazyLock = LazyLock::new(|| { - let session = VortexSession::empty() +static SESSION: LazyLock = LazyLock::new(|| { + let mut session = VortexSession::empty() .with::() .with::() .with::() .with::() .with::(); - crate::register_default_encodings(&session); + crate::register_default_encodings(&mut session); - session + session.freeze() }); #[tokio::test] diff --git a/vortex-file/src/writer.rs b/vortex-file/src/writer.rs index 50c109438ea..07d91d15129 100644 --- a/vortex-file/src/writer.rs +++ b/vortex-file/src/writer.rs @@ -41,8 +41,7 @@ use vortex_layout::layouts::file_stats::accumulate_stats; use vortex_layout::sequence::SequenceId; use vortex_layout::sequence::SequentialStreamAdapter; use vortex_layout::sequence::SequentialStreamExt; -use vortex_session::SessionExt; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::Footer; use crate::MAGIC_BYTES; @@ -57,18 +56,23 @@ use crate::segments::writer::BufferedSegmentSink; /// Unless overridden, the default [write strategy][crate::WriteStrategyBuilder] will be used with no /// additional configuration. pub struct VortexWriteOptions { - session: VortexSession, + session: VortexSessionRef, strategy: Arc, exclude_dtype: bool, max_variable_length_statistics_size: usize, file_statistics: Vec, } -pub trait WriteOptionsSessionExt: SessionExt { +pub trait WriteOptionsSessionExt { + /// Create [`VortexWriteOptions`] for writing to a Vortex file. + fn write_options(&self) -> VortexWriteOptions; +} + +impl WriteOptionsSessionExt for VortexSessionRef { /// Create [`VortexWriteOptions`] for writing to a Vortex file. fn write_options(&self) -> VortexWriteOptions { VortexWriteOptions { - session: self.session(), + session: self.clone(), strategy: WriteStrategyBuilder::new().build(), exclude_dtype: false, file_statistics: PRUNING_STATS.to_vec(), @@ -76,11 +80,10 @@ pub trait WriteOptionsSessionExt: SessionExt { } } } -impl WriteOptionsSessionExt for S {} impl VortexWriteOptions { /// Create a new [`VortexWriteOptions`] with the given session. - pub fn new(session: VortexSession) -> Self { + pub fn new(session: VortexSessionRef) -> Self { VortexWriteOptions { session, strategy: WriteStrategyBuilder::new().build(), diff --git a/vortex-io/src/session.rs b/vortex-io/src/session.rs index 38500d248a0..d2618aef29f 100644 --- a/vortex-io/src/session.rs +++ b/vortex-io/src/session.rs @@ -5,6 +5,7 @@ use std::fmt::Debug; use vortex_error::VortexExpect; use vortex_session::SessionExt; +use vortex_session::VortexSession; use crate::runtime::Handle; @@ -13,6 +14,8 @@ pub struct RuntimeSession { handle: Option, } +impl RuntimeSession {} + impl Default for RuntimeSession { fn default() -> Self { Self { @@ -27,30 +30,40 @@ impl Debug for RuntimeSession { } } -/// Extension trait for accessing runtime session data. -pub trait RuntimeSessionExt: SessionExt { - /// Returns a handle for this session's runtime. - fn handle(&self) -> Handle { - self.get::().handle - .as_ref() - .vortex_expect("Runtime handle not configured in Vortex session. Please setup a `CurrentThreadRuntime`, or configure the session for `with_tokio`.") - .clone() - } - +/// Extension trait for setting runtime session data. +pub trait RuntimeSessionMutExt: Sized { /// Configure the runtime session to use the application's Tokio runtime. /// /// For example, if the application is launched using `#[tokio::main]`. #[cfg(feature = "tokio")] - fn with_tokio(self) -> Self { + fn with_tokio(self) -> Self; + + /// Configure the runtime session to use a specific Vortex runtime handle. + fn with_handle(self, handle: Handle) -> Self; +} + +impl RuntimeSessionMutExt for VortexSession { + #[cfg(feature = "tokio")] + fn with_tokio(mut self) -> Self { self.get_mut::().handle = Some(crate::runtime::tokio::TokioRuntime::current()); self } - /// Configure the runtime session to use a specific Vortex runtime handle. - fn with_handle(self, handle: Handle) -> Self { + fn with_handle(mut self, handle: Handle) -> Self { self.get_mut::().handle = Some(handle); self } } + +/// Extension trait for accessing runtime session data. +pub trait RuntimeSessionExt: SessionExt { + /// Returns a handle for this session's runtime. + fn handle(&self) -> Handle { + self.get::().handle + .as_ref() + .vortex_expect("Runtime handle not configured in Vortex session. Please setup a `CurrentThreadRuntime`, or configure the session for `with_tokio`.") + .clone() + } +} impl RuntimeSessionExt for S {} diff --git a/vortex-jni/src/lib.rs b/vortex-jni/src/lib.rs index f1f513a491f..2b1439181ca 100644 --- a/vortex-jni/src/lib.rs +++ b/vortex-jni/src/lib.rs @@ -10,8 +10,9 @@ use vortex::VortexSessionDefault; use vortex::error::VortexExpect; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::tokio::TokioRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; macro_rules! throw_runtime { ($($tt:tt)*) => { @@ -38,5 +39,8 @@ static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { static RUNTIME: LazyLock = LazyLock::new(|| TokioRuntime::from(TOKIO_RUNTIME.handle().clone())); /// Shared Vortex session for the JNI instance. -static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze() +}); diff --git a/vortex-layout/src/layout.rs b/vortex-layout/src/layout.rs index 368d7c6bdc8..2fe9302a7a0 100644 --- a/vortex-layout/src/layout.rs +++ b/vortex-layout/src/layout.rs @@ -15,7 +15,7 @@ use vortex_dtype::FieldName; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutEncodingId; use crate::LayoutEncodingRef; @@ -74,7 +74,7 @@ pub trait Layout: 'static + Send + Sync + Debug + private::Sealed { &self, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult; } @@ -327,7 +327,7 @@ impl Layout for LayoutAdapter { &self, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { V::new_reader(&self.0, name, segment_source, session) } diff --git a/vortex-layout/src/layouts/chunked/mod.rs b/vortex-layout/src/layouts/chunked/mod.rs index 3691923f360..428f865d3d0 100644 --- a/vortex-layout/src/layouts/chunked/mod.rs +++ b/vortex-layout/src/layouts/chunked/mod.rs @@ -11,7 +11,7 @@ use vortex_array::DeserializeMetadata; use vortex_array::EmptyMetadata; use vortex_dtype::DType; use vortex_error::VortexResult; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutChildType; use crate::LayoutEncodingRef; @@ -72,7 +72,7 @@ impl VTable for ChunkedVTable { layout: &Self::Layout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { Ok(Arc::new(ChunkedReader::new( layout.clone(), diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index d5bc645ff94..e0ae5a12df8 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -20,7 +20,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_panic; use vortex_mask::Mask; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutReaderRef; use crate::LazyReaderChildren; @@ -42,7 +42,7 @@ impl ChunkedReader { layout: ChunkedLayout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> Self { let nchildren = layout.nchildren(); diff --git a/vortex-layout/src/layouts/dict/mod.rs b/vortex-layout/src/layouts/dict/mod.rs index c7866faab87..220adeb193b 100644 --- a/vortex-layout/src/layouts/dict/mod.rs +++ b/vortex-layout/src/layouts/dict/mod.rs @@ -17,7 +17,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutChildType; use crate::LayoutEncodingRef; @@ -89,7 +89,7 @@ impl VTable for DictVTable { layout: &Self::Layout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { Ok(Arc::new(DictReader::try_new( layout.clone(), diff --git a/vortex-layout/src/layouts/dict/reader.rs b/vortex-layout/src/layouts/dict/reader.rs index 02487311a90..bec2e2160db 100644 --- a/vortex-layout/src/layouts/dict/reader.rs +++ b/vortex-layout/src/layouts/dict/reader.rs @@ -27,7 +27,7 @@ use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_mask::Mask; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_utils::aliases::dash_map::DashMap; use super::DictLayout; @@ -57,7 +57,7 @@ impl DictReader { layout: DictLayout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { let values_len = usize::try_from(layout.values.row_count())?; let values = layout.values.new_reader( diff --git a/vortex-layout/src/layouts/flat/mod.rs b/vortex-layout/src/layouts/flat/mod.rs index 7e5c7da3147..9dd75c7cd15 100644 --- a/vortex-layout/src/layouts/flat/mod.rs +++ b/vortex-layout/src/layouts/flat/mod.rs @@ -15,7 +15,7 @@ use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutChildType; use crate::LayoutEncodingRef; @@ -84,7 +84,7 @@ impl VTable for FlatVTable { layout: &Self::Layout, name: Arc, segment_source: Arc, - _session: &VortexSession, + _session: &VortexSessionRef, ) -> VortexResult { Ok(Arc::new(FlatReader::new( layout.clone(), diff --git a/vortex-layout/src/layouts/row_idx/mod.rs b/vortex-layout/src/layouts/row_idx/mod.rs index 1160c91fc04..3b2f82247c3 100644 --- a/vortex-layout/src/layouts/row_idx/mod.rs +++ b/vortex-layout/src/layouts/row_idx/mod.rs @@ -37,7 +37,7 @@ use vortex_error::VortexResult; use vortex_mask::Mask; use vortex_scalar::PValue; use vortex_sequence::SequenceArray; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_utils::aliases::dash_map::DashMap; use crate::ArrayFuture; @@ -53,8 +53,8 @@ pub struct RowIdxLayoutReader { } impl RowIdxLayoutReader { - pub fn new(row_offset: u64, child: Arc, session: &VortexSession) -> Self { - let expr_optimizer = ExprOptimizer::new(&session.expressions()); + pub fn new(row_offset: u64, child: Arc, session: &VortexSessionRef) -> Self { + let expr_optimizer = ExprOptimizer::new(session.expressions()); Self { name: child.name().clone(), row_offset, diff --git a/vortex-layout/src/layouts/struct_/mod.rs b/vortex-layout/src/layouts/struct_/mod.rs index bcc1e77449e..f879336df6c 100644 --- a/vortex-layout/src/layouts/struct_/mod.rs +++ b/vortex-layout/src/layouts/struct_/mod.rs @@ -20,8 +20,7 @@ use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_ensure; use vortex_error::vortex_err; -use vortex_session::SessionExt; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutChildType; use crate::LayoutEncodingRef; @@ -114,13 +113,13 @@ impl VTable for StructVTable { layout: &Self::Layout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { Ok(Arc::new(StructReader::try_new( layout.clone(), name, segment_source, - session.session(), + session.clone(), )?)) } diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 342c78832d4..a24f951d75a 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -36,7 +36,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_mask::Mask; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::hash_map::HashMap; @@ -68,7 +68,7 @@ impl StructReader { layout: StructLayout, name: Arc, segment_source: Arc, - session: VortexSession, + session: VortexSessionRef, ) -> VortexResult { let struct_dt = layout.struct_fields(); @@ -106,7 +106,7 @@ impl StructReader { let expanded_root_expr = replace_root_fields(root(), struct_dt); // Create the expression optimizer once during construction - let expr_optimizer = ExprOptimizer::new(&session.expressions()); + let expr_optimizer = ExprOptimizer::new(session.expressions()); // This is where we need to do some complex things with the scan in order to split it into // different scans for different fields. diff --git a/vortex-layout/src/layouts/zoned/mod.rs b/vortex-layout/src/layouts/zoned/mod.rs index e0105d306ad..c6d1d8914bb 100644 --- a/vortex-layout/src/layouts/zoned/mod.rs +++ b/vortex-layout/src/layouts/zoned/mod.rs @@ -24,7 +24,7 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_panic; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::LayoutChildType; use crate::LayoutEncodingRef; @@ -101,7 +101,7 @@ impl VTable for ZonedVTable { layout: &Self::Layout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult { Ok(Arc::new(ZonedReader::try_new( layout.clone(), diff --git a/vortex-layout/src/layouts/zoned/reader.rs b/vortex-layout/src/layouts/zoned/reader.rs index d33ae9918d6..22b44ed589e 100644 --- a/vortex-layout/src/layouts/zoned/reader.rs +++ b/vortex-layout/src/layouts/zoned/reader.rs @@ -31,7 +31,7 @@ use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_mask::Mask; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use vortex_utils::aliases::dash_map::DashMap; use crate::LayoutReader; @@ -66,7 +66,7 @@ impl ZonedReader { layout: ZonedLayout, name: Arc, segment_source: Arc, - session: VortexSession, + session: VortexSessionRef, ) -> VortexResult { let dtypes = vec![ layout.dtype.clone(), diff --git a/vortex-layout/src/reader.rs b/vortex-layout/src/reader.rs index e26e33372e5..a7371ea598d 100644 --- a/vortex-layout/src/reader.rs +++ b/vortex-layout/src/reader.rs @@ -16,7 +16,7 @@ use vortex_dtype::FieldMask; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_mask::Mask; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::children::LayoutChildren; use crate::segments::SegmentSource; @@ -108,7 +108,7 @@ pub struct LazyReaderChildren { dtypes: Vec, names: Vec>, segment_source: Arc, - session: VortexSession, + session: VortexSessionRef, // TODO(ngates): we may want a hash map of some sort here? cache: Vec>, } @@ -119,7 +119,7 @@ impl LazyReaderChildren { dtypes: Vec, names: Vec>, segment_source: Arc, - session: VortexSession, + session: VortexSessionRef, ) -> Self { let nchildren = children.nchildren(); let cache = (0..nchildren).map(|_| OnceCell::new()).collect(); diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index 0250a855084..d3d02234753 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use vortex_session::Ref; use vortex_session::SessionExt; use vortex_session::registry::Registry; @@ -22,12 +21,12 @@ pub struct LayoutSession { impl LayoutSession { /// Register a layout encoding in the session, replacing any existing encoding with the same ID. - pub fn register(&self, layout: LayoutEncodingRef) { + pub fn register(&mut self, layout: LayoutEncodingRef) { self.registry.register(layout); } /// Register layout encodings in the session, replacing any existing encodings with the same IDs. - pub fn register_many(&self, layouts: impl IntoIterator) { + pub fn register_many(&mut self, layouts: impl IntoIterator) { self.registry.register_many(layouts); } @@ -39,7 +38,7 @@ impl LayoutSession { impl Default for LayoutSession { fn default() -> Self { - let layouts = LayoutRegistry::default(); + let mut layouts = LayoutRegistry::default(); // Register the built-in layout encodings. layouts.register_many([ @@ -57,7 +56,7 @@ impl Default for LayoutSession { /// Extension trait for accessing layout session data. pub trait LayoutSessionExt: SessionExt { /// Returns the layout encoding registry. - fn layouts(&self) -> Ref<'_, LayoutSession> { + fn layouts(&self) -> &LayoutSession { self.get::() } } diff --git a/vortex-layout/src/test.rs b/vortex-layout/src/test.rs index 47ebfa594eb..a44b81651e2 100644 --- a/vortex-layout/src/test.rs +++ b/vortex-layout/src/test.rs @@ -8,14 +8,16 @@ use vortex_array::session::ArraySession; use vortex_io::session::RuntimeSession; use vortex_metrics::VortexMetrics; use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::session::LayoutSession; -pub static SESSION: LazyLock = LazyLock::new(|| { +pub static SESSION: LazyLock = LazyLock::new(|| { VortexSession::empty() .with::() .with::() .with::() .with::() .with::() + .freeze() }); diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index f94195f6ceb..f26ffbab8bc 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -10,7 +10,7 @@ use vortex_array::DeserializeMetadata; use vortex_array::SerializeMetadata; use vortex_dtype::DType; use vortex_error::VortexResult; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::IntoLayout; use crate::Layout; @@ -61,7 +61,7 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { layout: &Self::Layout, name: Arc, segment_source: Arc, - session: &VortexSession, + session: &VortexSessionRef, ) -> VortexResult; #[cfg(gpu_unstable)] diff --git a/vortex-python/src/lib.rs b/vortex-python/src/lib.rs index 48f0f59f083..a0b4ecbce44 100644 --- a/vortex-python/src/lib.rs +++ b/vortex-python/src/lib.rs @@ -32,8 +32,9 @@ use vortex::error::VortexError; use vortex::error::VortexExpect as _; use vortex::io::runtime::BlockingRuntime; use vortex::io::runtime::tokio::TokioRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { Runtime::new() @@ -42,8 +43,11 @@ static TOKIO_RUNTIME: LazyLock = LazyLock::new(|| { }); static RUNTIME: LazyLock = LazyLock::new(|| TokioRuntime::new(TOKIO_RUNTIME.handle().clone())); -static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_handle(RUNTIME.handle())); +static SESSION: LazyLock = LazyLock::new(|| { + VortexSession::new_with_defaults() + .with_handle(RUNTIME.handle()) + .freeze() +}); /// Vortex is an Apache Arrow-compatible toolkit for working with compressed array data. #[pymodule] diff --git a/vortex-python/src/registry.rs b/vortex-python/src/registry.rs index 2bea754f0e9..59153f77039 100644 --- a/vortex-python/src/registry.rs +++ b/vortex-python/src/registry.rs @@ -5,11 +5,7 @@ use pyo3::Bound; use pyo3::PyResult; use pyo3::Python; use pyo3::prelude::*; -use vortex::array::session::ArraySessionExt; -use vortex::array::vtable::ArrayVTableExt; -use crate::SESSION; -use crate::arrays::py::PythonVTable; use crate::install_module; /// Register serde functions and classes. @@ -18,17 +14,18 @@ pub(crate) fn init(py: Python, parent: &Bound) -> PyResult<()> { parent.add_submodule(&m)?; install_module("vortex._lib.registry", &m)?; - m.add_function(wrap_pyfunction!(register, &m)?)?; + // m.add_function(wrap_pyfunction!(register, &m)?)?; Ok(()) } -/// Register an array encoding implemented by subclassing `PyArray`. -/// -/// It's not currently possible to register a layout encoding from Python. -#[pyfunction] -pub(crate) fn register(cls: PythonVTable) -> PyResult<()> { - let vtable = ArrayVTableExt::into_vtable(cls); - SESSION.arrays().register(vtable); - Ok(()) -} +// TODO(ngates): add a session builder to the Python API. +// /// Register an array encoding implemented by subclassing `PyArray`. +// /// +// /// It's not currently possible to register a layout encoding from Python. +// #[pyfunction] +// pub(crate) fn register(cls: PythonVTable) -> PyResult<()> { +// let vtable = ArrayVTableExt::into_vtable(cls); +// SESSION.arrays().register(vtable); +// Ok(()) +// } diff --git a/vortex-scan/src/gpu/gpubuilder.rs b/vortex-scan/src/gpu/gpubuilder.rs index 58859bfe036..013e1d038cf 100644 --- a/vortex-scan/src/gpu/gpubuilder.rs +++ b/vortex-scan/src/gpu/gpubuilder.rs @@ -14,21 +14,21 @@ use vortex_gpu::GpuVector; use vortex_io::runtime::BlockingRuntime; use vortex_io::session::RuntimeSessionExt; use vortex_layout::gpu::GpuLayoutReaderRef; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::gpu::GpuScan; use crate::gpu::gputask::TaskFuture; use crate::scan_builder::filter_and_projection_masks; pub struct GpuScanBuilder { - session: VortexSession, + session: VortexSessionRef, layout_reader: GpuLayoutReaderRef, projection: Expression, map_fn: Arc) -> VortexResult> + Send + Sync>, } impl GpuScanBuilder { - pub fn new(session: VortexSession, layout_reader: GpuLayoutReaderRef) -> Self { + pub fn new(session: VortexSessionRef, layout_reader: GpuLayoutReaderRef) -> Self { Self { session, layout_reader, diff --git a/vortex-scan/src/repeated_scan.rs b/vortex-scan/src/repeated_scan.rs index cf70243ec5f..95b6ac69833 100644 --- a/vortex-scan/src/repeated_scan.rs +++ b/vortex-scan/src/repeated_scan.rs @@ -21,7 +21,7 @@ use vortex_error::VortexResult; use vortex_io::runtime::BlockingRuntime; use vortex_io::session::RuntimeSessionExt; use vortex_layout::LayoutReaderRef; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::filter::FilterExpr; use crate::selection::Selection; @@ -34,7 +34,7 @@ use crate::tasks::split_exec; /// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this /// data source. pub struct RepeatedScan { - session: VortexSession, + session: VortexSessionRef, layout_reader: LayoutReaderRef, projection: Expression, filter: Option, @@ -84,7 +84,7 @@ impl RepeatedScan { reason = "all arguments are needed for scan construction" )] pub(super) fn new( - session: VortexSession, + session: VortexSessionRef, layout_reader: LayoutReaderRef, projection: Expression, filter: Option, diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index d60b0748f6b..4a25cedf2d2 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -31,7 +31,7 @@ use vortex_layout::LayoutReader; use vortex_layout::LayoutReaderRef; use vortex_layout::layouts::row_idx::RowIdxLayoutReader; use vortex_metrics::VortexMetrics; -use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; use crate::RepeatedScan; use crate::selection::Selection; @@ -42,7 +42,7 @@ use crate::splits::attempt_split_ranges; /// A struct for building a scan operation. pub struct ScanBuilder { expr_optimizer: ExprOptimizer, - session: VortexSession, + session: VortexSessionRef, layout_reader: LayoutReaderRef, projection: Expression, filter: Option, @@ -70,8 +70,8 @@ pub struct ScanBuilder { } impl ScanBuilder { - pub fn new(session: VortexSession, layout_reader: Arc) -> Self { - let expr_optimizer = ExprOptimizer::new(&session.expressions()); + pub fn new(session: VortexSessionRef, layout_reader: Arc) -> Self { + let expr_optimizer = ExprOptimizer::new(session.expressions()); Self { expr_optimizer, session, diff --git a/vortex-scan/src/test.rs b/vortex-scan/src/test.rs index d29bf38d3b5..a1c95fdb8dc 100644 --- a/vortex-scan/src/test.rs +++ b/vortex-scan/src/test.rs @@ -9,12 +9,14 @@ use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; -pub static SESSION: LazyLock = LazyLock::new(|| { +pub static SESSION: LazyLock = LazyLock::new(|| { VortexSession::empty() .with::() .with::() .with::() .with::() .with::() + .freeze() }); diff --git a/vortex-session/src/lib.rs b/vortex-session/src/lib.rs index f1e22a1a529..85028954eee 100644 --- a/vortex-session/src/lib.rs +++ b/vortex-session/src/lib.rs @@ -13,17 +13,20 @@ use std::ops::Deref; use std::ops::DerefMut; use std::sync::Arc; -use dashmap::DashMap; -use dashmap::Entry; use vortex_error::VortexExpect; +use vortex_error::vortex_err; use vortex_error::vortex_panic; +use vortex_utils::aliases::hash_map::Entry; +use vortex_utils::aliases::hash_map::HashMap; + +pub type VortexSessionRef = Arc; /// A Vortex session encapsulates the set of extensible arrays, layouts, compute functions, dtypes, /// etc. that are available for use in a given context. /// /// It is also the entry-point passed to dynamic libraries to initialize Vortex plugins. -#[derive(Clone, Debug)] -pub struct VortexSession(Arc); +#[derive(Debug)] +pub struct VortexSession(SessionVars); impl VortexSession { /// Create a new [`VortexSession`] with no session state. @@ -33,12 +36,17 @@ impl VortexSession { Self(Default::default()) } + /// Freeze the session into an Arc for sharing. + pub fn freeze(self) -> VortexSessionRef { + Arc::new(self) + } + /// Inserts a new session variable of type `V` with its default value. /// /// # Panics /// /// If a variable of that type already exists. - pub fn with(self) -> Self { + pub fn with(mut self) -> Self { match self.0.entry(TypeId::of::()) { Entry::Occupied(_) => { vortex_panic!( @@ -52,97 +60,62 @@ impl VortexSession { } self } + + /// Returns the mutable scope variable of type `V`, or inserts a default one if it does not exist. + /// + /// Note that the returned value internally holds a lock on the variable. + pub fn get_mut(&mut self) -> &mut V { + self.0 + .entry(TypeId::of::()) + .or_insert_with(|| Box::new(V::default())) + .as_any_mut() + .downcast_mut::() + .vortex_expect("Type mismatch - this is a bug") + } } /// Trait for accessing and modifying the state of a Vortex session. pub trait SessionExt: Sized + private::Sealed { - /// Returns the [`VortexSession`]. - fn session(&self) -> VortexSession; - - /// Returns the scope variable of type `V`, or inserts a default one if it does not exist. - fn get(&self) -> Ref<'_, V>; - - /// Returns the scope variable of type `V` if it exists. - fn get_opt(&self) -> Option>; - - /// Returns the scope variable of type `V`, or inserts a default one if it does not exist. - /// - /// Note that the returned value internally holds a lock on the variable. - fn get_mut(&self) -> RefMut<'_, V>; + // Returns the [`VortexSessionRef`]. + // fn session(&self) -> VortexSessionRef; - /// Returns the scope variable of type `V`, if it exists. - /// - /// Note that the returned value internally holds a lock on the variable. - fn get_mut_opt(&self) -> Option>; -} + /// Returns the scope variable of type `V`. + fn get(&self) -> &V; -mod private { - pub trait Sealed {} - impl Sealed for super::VortexSession {} + /// Returns the scope variable of type `V`. + fn get_opt(&self) -> Option<&V>; } impl SessionExt for VortexSession { - fn session(&self) -> VortexSession { - self.clone() - } - - /// Returns the scope variable of type `V`, or inserts a default one if it does not exist. - fn get(&self) -> Ref<'_, V> { - Ref(self - .0 - .entry(TypeId::of::()) - .or_insert_with(|| Box::new(V::default())) - .downgrade() - .map(|v| { - (**v) - .as_any() - .downcast_ref::() - .vortex_expect("Type mismatch - this is a bug") - })) + fn get(&self) -> &V { + self.get_opt::() + .ok_or_else(|| { + vortex_err!( + "Session variable of type {} does not exist", + type_name::() + ) + }) + .vortex_expect("Session variable missing") } - fn get_opt(&self) -> Option> { + fn get_opt(&self) -> Option<&V> { self.0.get(&TypeId::of::()).map(|v| { - Ref(v.map(|v| { - (**v) - .as_any() - .downcast_ref::() - .vortex_expect("Type mismatch - this is a bug") - })) + (**v) + .as_any() + .downcast_ref::() + .vortex_expect("Type mismatch - this is a bug") }) } +} - /// Returns the scope variable of type `V`, or inserts a default one if it does not exist. - /// - /// Note that the returned value internally holds a lock on the variable. - fn get_mut(&self) -> RefMut<'_, V> { - RefMut( - self.0 - .entry(TypeId::of::()) - .or_insert_with(|| Box::new(V::default())) - .map(|v| { - (**v) - .as_any_mut() - .downcast_mut::() - .vortex_expect("Type mismatch - this is a bug") - }), - ) - } - - fn get_mut_opt(&self) -> Option> { - self.0.get_mut(&TypeId::of::()).map(|v| { - RefMut(v.map(|v| { - (**v) - .as_any_mut() - .downcast_mut::() - .vortex_expect("Type mismatch - this is a bug") - })) - }) - } +mod private { + pub trait Sealed {} + impl Sealed for super::VortexSession {} + impl Sealed for super::VortexSessionRef {} } /// A TypeMap based on `https://docs.rs/http/1.2.0/src/http/extensions.rs.html#41-266`. -type SessionVars = DashMap, BuildHasherDefault>; +type SessionVars = HashMap, BuildHasherDefault>; /// With TypeIds as keys, there's no need to hash them. They are already hashes /// themselves, coming from the compiler. The IdHasher just holds the u64 of diff --git a/vortex-session/src/registry.rs b/vortex-session/src/registry.rs index 5f0e5b39191..a020ee79998 100644 --- a/vortex-session/src/registry.rs +++ b/vortex-session/src/registry.rs @@ -6,14 +6,13 @@ use std::fmt::Display; use std::ops::Deref; -use std::sync::Arc; -use vortex_utils::aliases::dash_map::DashMap; +use vortex_utils::aliases::hash_map::HashMap; /// A registry of items that are keyed by a string identifier. // TODO(ngates): define a RegistryItem trait that has a custom key to avoid to_string calls. #[derive(Clone, Debug)] -pub struct Registry(Arc>); +pub struct Registry(HashMap); impl Default for Registry { fn default() -> Self { @@ -28,7 +27,7 @@ impl Registry { /// List the items in the registry. pub fn items(&self) -> impl Iterator + '_ { - self.0.iter().map(|i| i.value().clone()) + self.0.iter().map(|(_key, item)| item.clone()) } /// Return the items with the given IDs. @@ -41,16 +40,16 @@ impl Registry { /// Find the item with the given ID. pub fn find(&self, id: &str) -> Option { - self.0.get(id).as_deref().cloned() + self.0.get(id).cloned() } /// Register a new item, replacing any existing item with the same ID. - pub fn register(&self, item: T) { + pub fn register(&mut self, item: T) { self.0.insert(item.to_string(), item); } /// Register a new item, replacing any existing item with the same ID. - pub fn register_many>(&self, items: I) { + pub fn register_many>(&mut self, items: I) { for item in items { self.0.insert(item.to_string(), item); } diff --git a/vortex-tui/src/main.rs b/vortex-tui/src/main.rs index f9f4ad4a8fe..0f08c4a0833 100644 --- a/vortex-tui/src/main.rs +++ b/vortex-tui/src/main.rs @@ -17,8 +17,9 @@ use tree::TreeArgs; use tree::exec_tree; use vortex::VortexSessionDefault; use vortex::error::VortexExpect; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; +use vortex::session::VortexSessionRef; use crate::inspect::InspectArgs; @@ -55,8 +56,8 @@ impl Commands { } } -pub(crate) static SESSION: LazyLock = - LazyLock::new(|| VortexSession::default().with_tokio()); +pub(crate) static SESSION: LazyLock = + LazyLock::new(|| VortexSession::new_with_defaults().with_tokio().freeze()); #[tokio::main] async fn main() -> anyhow::Result<()> { diff --git a/vortex/examples/tracing_vortex.rs b/vortex/examples/tracing_vortex.rs index a27730ac6cf..c03c1fa1b20 100644 --- a/vortex/examples/tracing_vortex.rs +++ b/vortex/examples/tracing_vortex.rs @@ -46,6 +46,7 @@ use vortex_array::stream::ArrayStreamExt; use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; #[tokio::main] async fn main() -> Result<(), Box> { @@ -189,7 +190,7 @@ impl ShutdownSignal { impl VortexLayer { async fn new( - session: VortexSession, + session: VortexSessionRef, output_dir: PathBuf, batch_size: usize, ) -> (Self, WriterHandle, ShutdownSignal) { @@ -273,7 +274,7 @@ struct WriterHandle { impl WriterHandle { fn spawn( - session: VortexSession, + session: VortexSessionRef, mut rx: mpsc::UnboundedReceiver, output_dir: PathBuf, batch_size: usize, @@ -310,7 +311,7 @@ impl WriterHandle { /// Writes a batch of events to a Vortex file async fn write_batch_to_vortex( - session: VortexSession, + session: VortexSessionRef, output_dir: &Path, events: &[TraceEvent], file_index: usize, @@ -409,7 +410,7 @@ async fn write_batch_to_vortex( /// Reads and displays trace files async fn read_trace_files( - session: &VortexSession, + session: &VortexSessionRef, output_dir: &Path, ) -> Result<(), Box> { let mut entries = tokio::fs::read_dir(output_dir).await?; diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 436ba8ae971..9584df2d7b9 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -15,6 +15,7 @@ use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_session::VortexSession; +use vortex_session::VortexSessionRef; // We re-export like so in order to allow users to search inside subcrates when using the Rust docs. @@ -140,14 +141,24 @@ pub mod encodings { } /// Extension trait to create a default Vortex session. -pub trait VortexSessionDefault { +pub trait VortexSessionDefault: Sized { + /// Creates a new [`VortexSession`] with defaults enabled. + fn new_with_defaults() -> VortexSession; + + /// Enables the standard arrays, layouts, and expressions. + fn with_defaults(self) -> VortexSession; + /// Creates a default Vortex session with the standard arrays, layouts, and expressions. - fn default() -> VortexSession; + fn default() -> VortexSessionRef; } impl VortexSessionDefault for VortexSession { - fn default() -> VortexSession { - let session = VortexSession::empty() + fn new_with_defaults() -> VortexSession { + VortexSession::empty().with_defaults() + } + + fn with_defaults(self) -> VortexSession { + let mut this = self .with::() .with::() .with::() @@ -155,9 +166,13 @@ impl VortexSessionDefault for VortexSession { .with::(); #[cfg(feature = "files")] - file::register_default_encodings(&session); + file::register_default_encodings(&mut this); - session + this + } + + fn default() -> VortexSessionRef { + VortexSession::empty().with_defaults().freeze() } } @@ -183,9 +198,9 @@ mod test { use vortex_file::WriteOptionsSessionExt; use vortex_file::WriteStrategyBuilder; use vortex_layout::layouts::compact::CompactCompressor; - use vortex_session::VortexSession; use crate as vortex; + use crate::VortexSession; use crate::VortexSessionDefault; #[test] From ab358e6fc3dfbf0fbc4577c29b040fe4860c0f81 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 10 Dec 2025 23:20:46 +0000 Subject: [PATCH 2/5] Immutable session + registries Signed-off-by: Nicholas Gates --- java/testfiles/src/main.rs | 12 +++++++++--- vortex-session/src/lib.rs | 6 ++++-- vortex/src/lib.rs | 1 + 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/java/testfiles/src/main.rs b/java/testfiles/src/main.rs index 206db638726..45094bcd1ee 100644 --- a/java/testfiles/src/main.rs +++ b/java/testfiles/src/main.rs @@ -6,9 +6,13 @@ use std::path::Path; use vortex::array::arrays::StructArray; -use vortex::array::builders::{ArrayBuilder, DecimalBuilder, VarBinViewBuilder}; +use vortex::array::builders::ArrayBuilder; +use vortex::array::builders::DecimalBuilder; +use vortex::array::builders::VarBinViewBuilder; use vortex::array::validity::Validity; -use vortex::dtype::{DType, DecimalDType, Nullability}; +use vortex::dtype::DType; +use vortex::dtype::DecimalDType; +use vortex::dtype::Nullability; use vortex::file::WriteOptionsSessionExt; use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::io::runtime::BlockingRuntime; @@ -32,7 +36,9 @@ use vortex::VortexSessionDefault; /// | John | 10000 | VA | fn main() { let runtime = CurrentThreadRuntime::new(); - let session = VortexSession::default().with_handle(runtime.handle()); + let session = VortexSession::new_with_defaults() + .with_handle(runtime.handle()) + .freeze(); let mut names = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::NonNullable), 10); names.append_value("Alice"); diff --git a/vortex-session/src/lib.rs b/vortex-session/src/lib.rs index 85028954eee..15363d95897 100644 --- a/vortex-session/src/lib.rs +++ b/vortex-session/src/lib.rs @@ -65,9 +65,11 @@ impl VortexSession { /// /// Note that the returned value internally holds a lock on the variable. pub fn get_mut(&mut self) -> &mut V { - self.0 + let v = self + .0 .entry(TypeId::of::()) - .or_insert_with(|| Box::new(V::default())) + .or_insert_with(|| Box::new(V::default())); + (**v) .as_any_mut() .downcast_mut::() .vortex_expect("Type mismatch - this is a bug") diff --git a/vortex/src/lib.rs b/vortex/src/lib.rs index 9584df2d7b9..199100ef1ec 100644 --- a/vortex/src/lib.rs +++ b/vortex/src/lib.rs @@ -157,6 +157,7 @@ impl VortexSessionDefault for VortexSession { VortexSession::empty().with_defaults() } + #[allow(unused_mut)] fn with_defaults(self) -> VortexSession { let mut this = self .with::() From 0b280bc8f4737d77eeee9192a9b26f3065d25c4a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 10 Dec 2025 23:45:54 +0000 Subject: [PATCH 3/5] Immutable session + registries Signed-off-by: Nicholas Gates --- java/testfiles/src/main.rs | 2 +- vortex-python/python/vortex/registry.py | 4 ---- vortex-python/test/test_pyarray.py | 32 ++++++++++++++----------- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/java/testfiles/src/main.rs b/java/testfiles/src/main.rs index 45094bcd1ee..f52b638e85b 100644 --- a/java/testfiles/src/main.rs +++ b/java/testfiles/src/main.rs @@ -16,7 +16,7 @@ use vortex::dtype::Nullability; use vortex::file::WriteOptionsSessionExt; use vortex::io::runtime::current::CurrentThreadRuntime; use vortex::io::runtime::BlockingRuntime; -use vortex::io::session::RuntimeSessionExt; +use vortex::io::session::RuntimeSessionMutExt; use vortex::session::VortexSession; use vortex::VortexSessionDefault; diff --git a/vortex-python/python/vortex/registry.py b/vortex-python/python/vortex/registry.py index 465f6aa1be4..26ed55ba506 100644 --- a/vortex-python/python/vortex/registry.py +++ b/vortex-python/python/vortex/registry.py @@ -1,6 +1,2 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright the Vortex contributors - -from vortex._lib.registry import register # pyright: ignore[reportMissingModuleSource] - -__all__ = ["register"] diff --git a/vortex-python/test/test_pyarray.py b/vortex-python/test/test_pyarray.py index 5713ff669f0..8464f7093b7 100644 --- a/vortex-python/test/test_pyarray.py +++ b/vortex-python/test/test_pyarray.py @@ -3,8 +3,6 @@ from __future__ import annotations -from typing import final - import numpy as np import pyarrow as pa import pytest @@ -14,6 +12,7 @@ from pcodec import ( # pyright: ignore[reportMissingTypeStubs] wrapped as pco, # pyright: ignore[reportAttributeAccessIssue, reportUnknownVariableType] ) +from typing import final from typing_extensions import override import vortex as vx @@ -36,21 +35,23 @@ def dtype(self) -> vx.DType: return self._dtype def __init__( - self, - length: int, - dtype: vx.DType, - file_header: memoryview, - chunk_header: memoryview, - data: memoryview, + self, + length: int, + dtype: vx.DType, + file_header: memoryview, + chunk_header: memoryview, + data: memoryview, ): - (fd, _bytes_read) = pco.FileDecompressor.new(file_header) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + (fd, _bytes_read) = pco.FileDecompressor.new( + file_header) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] if dtype == vx.int_(64, nullable=True): dt = "i64" else: raise ValueError(f"Unsupported dtype: {dtype}") - (cd, _bytes_read) = fd.read_chunk_meta(chunk_header, dt) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] + (cd, _bytes_read) = fd.read_chunk_meta(chunk_header, + dt) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] dst = np.array([0] * length, dtype=np.int64) cd.read_page_into( # pyright: ignore[reportUnknownMemberType] @@ -66,7 +67,8 @@ def __init__( self._data = data @classmethod - def encode(cls, array: pa.Array[pa.Scalar[pa.DataType]], config: ChunkConfig | None = None) -> PCodecArray: # pyright: ignore[reportUnknownParameterType] + def encode(cls, array: pa.Array[pa.Scalar[pa.DataType]], + config: ChunkConfig | None = None) -> PCodecArray: # pyright: ignore[reportUnknownParameterType] assert array.null_count == 0, "Cannot compress arrays with nulls" config = config or ChunkConfig() # pyright: ignore[reportUnknownVariableType] @@ -74,11 +76,13 @@ def encode(cls, array: pa.Array[pa.Scalar[pa.DataType]], config: ChunkConfig | N fc = pco.FileCompressor() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] file_header = fc.write_header() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] - cc = fc.chunk_compressor(array.to_numpy(), config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + cc = fc.chunk_compressor(array.to_numpy(), + config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] chunk_header = cc.write_chunk_meta() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] data = b"" - for i, _n in enumerate(cc.n_per_page()): # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportUnknownVariableType] + for i, _n in enumerate( + cc.n_per_page()): # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportUnknownVariableType] data += cc.write_page(i) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] return PCodecArray( @@ -101,4 +105,4 @@ def decode(cls, parts: vx.ArrayParts, ctx: vx.ArrayContext, dtype: vx.DType, len def test_pcodec(): _ = PCodecArray.encode(pa.array([0, 1, 2, 3, 4])) # pyright: ignore[reportUnknownMemberType] - vx.registry.register(PCodecArray) + # vx.registry.register(PCodecArray) From 953b6c814e123ae6cb2909dfdd2d1a2c592afc96 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 10 Dec 2025 23:49:17 +0000 Subject: [PATCH 4/5] Immutable session + registries Signed-off-by: Nicholas Gates --- vortex-python/test/test_pyarray.py | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/vortex-python/test/test_pyarray.py b/vortex-python/test/test_pyarray.py index 8464f7093b7..772d1c0ecc1 100644 --- a/vortex-python/test/test_pyarray.py +++ b/vortex-python/test/test_pyarray.py @@ -35,23 +35,21 @@ def dtype(self) -> vx.DType: return self._dtype def __init__( - self, - length: int, - dtype: vx.DType, - file_header: memoryview, - chunk_header: memoryview, - data: memoryview, + self, + length: int, + dtype: vx.DType, + file_header: memoryview, + chunk_header: memoryview, + data: memoryview, ): - (fd, _bytes_read) = pco.FileDecompressor.new( - file_header) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + (fd, _bytes_read) = pco.FileDecompressor.new(file_header) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] if dtype == vx.int_(64, nullable=True): dt = "i64" else: raise ValueError(f"Unsupported dtype: {dtype}") - (cd, _bytes_read) = fd.read_chunk_meta(chunk_header, - dt) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] + (cd, _bytes_read) = fd.read_chunk_meta(chunk_header, dt) # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] dst = np.array([0] * length, dtype=np.int64) cd.read_page_into( # pyright: ignore[reportUnknownMemberType] @@ -67,8 +65,7 @@ def __init__( self._data = data @classmethod - def encode(cls, array: pa.Array[pa.Scalar[pa.DataType]], - config: ChunkConfig | None = None) -> PCodecArray: # pyright: ignore[reportUnknownParameterType] + def encode(cls, array: pa.Array[pa.Scalar[pa.DataType]], config: ChunkConfig | None = None) -> PCodecArray: # pyright: ignore[reportUnknownParameterType] assert array.null_count == 0, "Cannot compress arrays with nulls" config = config or ChunkConfig() # pyright: ignore[reportUnknownVariableType] @@ -76,13 +73,11 @@ def encode(cls, array: pa.Array[pa.Scalar[pa.DataType]], fc = pco.FileCompressor() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] file_header = fc.write_header() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] - cc = fc.chunk_compressor(array.to_numpy(), - config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] + cc = fc.chunk_compressor(array.to_numpy(), config) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] chunk_header = cc.write_chunk_meta() # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] data = b"" - for i, _n in enumerate( - cc.n_per_page()): # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportUnknownVariableType] + for i, _n in enumerate(cc.n_per_page()): # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportUnknownVariableType] data += cc.write_page(i) # pyright: ignore[reportUnknownMemberType, reportUnknownVariableType] return PCodecArray( From 773d7e165eef3574d796f03c8e6bede089b30280 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 10 Dec 2025 23:53:00 +0000 Subject: [PATCH 5/5] Immutable session + registries Signed-off-by: Nicholas Gates --- vortex-python/test/test_pyarray.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vortex-python/test/test_pyarray.py b/vortex-python/test/test_pyarray.py index 772d1c0ecc1..024d1580cff 100644 --- a/vortex-python/test/test_pyarray.py +++ b/vortex-python/test/test_pyarray.py @@ -3,6 +3,8 @@ from __future__ import annotations +from typing import final + import numpy as np import pyarrow as pa import pytest @@ -12,7 +14,6 @@ from pcodec import ( # pyright: ignore[reportMissingTypeStubs] wrapped as pco, # pyright: ignore[reportAttributeAccessIssue, reportUnknownVariableType] ) -from typing import final from typing_extensions import override import vortex as vx