From c1c2c5cda3d7647b2fe8edd185dd26b0925a7e40 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Fri, 20 Mar 2026 17:40:25 +0000 Subject: [PATCH 1/4] C Scan API: initial --- proposed/0030-c-scan-api.md | 393 ++++++++++++++++++++++++++++++++++++ 1 file changed, 393 insertions(+) create mode 100644 proposed/0030-c-scan-api.md diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md new file mode 100644 index 0000000..706deb5 --- /dev/null +++ b/proposed/0030-c-scan-api.md @@ -0,0 +1,393 @@ +- Start Date: 2026-03-13 +- Authors: Mikhail Kot + +# C Scan API + +There is a scan API for Rust-compatible code available at +https://github.com/vortex-data/vortex/tree/develop/vortex-scan. + +The goal of introducing C scan API is to make integration with non-Rust query +engines like Velox easier. In theory, such engines can use cxx.rs, but it +requires a lot of binding code and runtime bridging (see below). + +There exists a partial scan API for C exposed over files [1], but it's limited +to single-file URIs without globs, and it's also not thread-safe. Its main +flaws, however, are: + +- Inability to export to well-known format like ArrowArrayStream, +- Lack of introspection over produced `vx_array`s, and +- Inability to control scan on a level lower than just getting partitions and + `vx_array`s with filters and projections pre-configured. + + Why does Scan API need to expose `vx_array`s? What's the benefit of using + own format over ArrowArrayStream? + + The answer is "compression". Vortex DTypes don't exactly match with Arrow + physical encodings, so if you have i.e. a ConstantArray, you need to + decompress it into something Arrow-compatible. This was a major regression + in Duckdb integration. + +C++ API works it out by allowing to produce an ArrowArrayStream interface out of +ScanBuilder, but it uses Rust code directly via cxx.rs which we want to avoid +while adding C interfaces. 
C++ API future is outside of scope of this proposal +but it's expected to wrap C API directly over time, removing dependency on +cxx.rs for vortex-cxx. + +## Customization points + +Main goal of providing customization points is to do as little work as possible +in Vortex code and as much work as possible in the query engine. Some engines +may request control over scan execution like pruning. Engines like Duckdb have +own remote storage, caching, and globbing implementations. API still needs an +ability to fall back to own implementation. + +Still, Scan API is a relatively high-level concept, and if its level is not +suffifient, engines can resort to using a layout reader plan and executing it +directly. + +## Datasource + +A Datasource is a reference to multiple possibly remote files. When created, it +opens first file to determine the schema from DType, all other operations are +deferred till a scan is requested. You can request multiple file scans from a +Datasource. + +```c +// Opaque, generated by bindgen +typedef struct vx_data_source vx_data_source; +typedef struct vx_file_handle vx_file_handle; + +typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir); +typedef void (*vx_glob_callback)(void* userdata, const char* file); + +typedef struct vx_data_source_options { + // (1) Filesystem customization + + bool (*fs_use_vortex)(const char* schema, const char* path); + void (*fs_set_userdata)(void* userdata); + + // should be called after glob expansion, single-file mode + vx_file_handle (*fs_open)(void* userdata, const char* path, vx_error** err); + vx_file_handle (*fs_create)(void* userdata, const char* path, vx_error** err); + + // non-recursive, callback is invoked with each path + void fs_list(void* userdata, const char* path, vx_list_callback cb, vx_error *err); + + void fs_close(vx_file_handle handle); + uint64_t fs_size(vx_file_handle handle, vx_error *err); + + // positional read, doesn't change file open cursor + void 
fs_read(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, + vx_error *err); + + // not needed for scanning but makes FS API complete + void fs_write(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, + vx_error *err); + void fs_sync(vx_file_handle handle, vx_error *err); + + // (2) Globbing customization + + void (*glob)(const char* glob, vx_glob_callback cb, vx_error* err); + + /// (3) Cache customization + + void* (*cache_init)(vx_error* err); + void (*cache_free)(void* cache, vx_error* err); + void (*cache_get)(void* cache, const char* key, void** value, vx_error* err); + void (*cache_put)(void* cache, const char* key, void* value, vx_error* err); + void (*cache_delete)(void* cache, const char* key, vx_error* err); +} vx_data_source_options; + +// Addition to existing DType API, returns owned ArrowSchema which needs to +// be freed by the caller using release callback. If err is populated, out +// is not set. +void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, vx_error* err); + +/// Create a new owned datasource which must be freed by the caller. +const vx_data_source * +vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err); + +// datasource is Arc'd inside so a clone creates another reference rather +// than cloning the state fully. +const vx_data_source *vx_data_source_clone(const vx_data_source *ptr); + +// vx_dtype's lifetime is bound to datasource's lifetime, caller doesn't need +// to free it +const vx_dtype *vx_data_source_dtype(const vx_data_source *ds); + +typedef enum { + VX_CARD_UNKNOWN = 0, + VX_CARD_ESTIMATE = 1, + VX_CARD_MAXIMUM = 2, +} vx_cardinality; +typedef struct { + vx_cardinality cardinality; + uint64_t rows; +} vx_data_source_row_count; + +void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc); +void vx_data_source_free(const vx_data_source *ptr); +``` + +1. Open local or remote file. 
Allow using vortex's filesystem or query engine's + filesystem e.g. duckdb fs. Allow partial customization e.g. duckdb fs for + local reads, but vortex fs for remote reads. Remote filesystem customization + point has the benefit of not duplicating credentials e.g. S3 access key + between query engine and vortex. Local implementation may be more performant. + Vortex rolled back full duckdb fs usage due to performance implications. +2. Open single file or multiple files. Query engines may have their own glob + expansion [4] which does HTTP IO. +3. Cache intermediate results. Main use case for Vortex is caching schema in + memory for footer cache and conversion results. Benefits are in no + requirement to open first file in a glob eagerly if there's a cache hit. + Vortex had had an integration with Duckdb object cache which was deleted in + favor of own implementation which led to a performance regression. + +When all three customization points are implemented, Vortex offloads all IO +to a query engine. + + Why not expose API to consume byte ranges or emit byte range requests for + the query engine to read and populate buffers? + + This approach is indeed easier than using a vtable with a specific + implementation, and requires slightly more scaffolding on the query engine + side, but it's significantly easier to implement on Vortex side and it is + coherent with current Rust implementation. + Similar implementation can be found in Arrow's RandomAccessFile or + parquet-rs's AsyncFileReader. + + However, as we're thinking of changing our Rust API, we can try to invest + time into this approach as well. + +Coupled with https://github.com/vortex-data/vortex/pull/7012 it also allows +Duckdb integration to abstract memory allocations to the database. + +## Runtime bridging + +In Rust API, a Datasource produces a stream of Partitions. A Partition produces +a stream of Arrays. API is required to be used in an async runtime, current +runtime for Vortex is tokio. 
+ +Velox uses a non-coroutine but async runtime based on Folly executors. Engines +like CoroBase use a coroutine-based runtime. Duckdb and ClickHouse runtimes are sync +based on thread pools. Postgres runtime is sync based on processes. Some of +engines may use OS-specific IO like `io_uring`. + +All potential usages of our API may be grouped into 4 cases: + +- sync, single-thread runtime, trivial. (1) +- sync, multi-thread runtime. (2) +- async, multi-thread/coroutine. (3) +- async, single-thread. (4) + +Scan/Partition suggestions outlined below play well with (2) but not with (3) +and (4) because Vortex has its own runtime which will block on current thread +when i.e. getting an Array out of Partition. An async-friendly API basically +means exposing a coroutine/state machine which hands control over to the host on +IO. + +As Joe mentioned, we want to get away from the concept of partitions and emit +chunks of vx_array's directly from the scan. In this case, such state machine +may be expressed with roughly the following states: + +``` + Passed a file handle +START -> NEED_IO (offset, len) -> EXECUTE -> DONE + ^ When passed a file handle, instructs host to read following byte + range into buffer and return a handle to this buffer. + ^ Decompresses the buffer (executes the Array) + one step into other buffer + ^ + Array is executed/canonicalized to the form host can work with. + Host now transfers data from buffers to its own output format. +``` + +However, as the future of such approach is unclear, a async-unfriendly option is +described below + +## Scan + +Scan iterators: + +```c +``` + +Scan options: + +```c +typedef enum { + VX_S_INCLUDE_ALL = 0, + VX_S_INCLUDE_RANGE = 1, + VX_S_EXCLUDE_RANGE = 2, +} vx_scan_selection_include; + +typedef struct { + uint64_t *idx; + size_t idx_len; + // Roaring bitmaps won't be supported as for now + // If selection is VX_S_INCLUDE_ALL, these are not read. 
idx is copied by query + // engine on scan invocation and can be freed after a scan iterator is requested + vx_scan_selection_include include; +} vx_scan_selection; + +typedef struct vx_scan_selection { + const size_t* idx; + size_t idx_len; +} vx_scan_selection; + +typedef struct vx_scan_options { + // May be NULL which means "return all columns" + const vx_expression *projection; + + // May be NULL which means "no filter" + const vx_expression *filter; + + // Set both to 0 to indicate no range request + // Inclusive + uint64_t row_range_begin; + // Exclusive + uint64_t row_range_end; + + vx_scan_selection selection; + + // 0 for no limit + uint64_t limit; + int ordered; +} vx_scan_options; +``` + +Scan interface: + +```c +typedef struct vx_scan vx_scan; + +/** + * A partition is a contiguous chunk of memory from which you can interatively + * get vx_arrays. + */ +typedef struct vx_partition vx_partition; + +typedef enum { + VX_ESTIMATE_UNKNOWN = 0, + VX_ESTIMATE_EXACT = 1, + VX_ESTIMATE_INEXACT = 2, +} vx_estimate_boundary; + +typedef struct { + // If type is VX_P_ESTIMATE_UNKNOWN, estimate field is not populated + uint64_t estimate; + vx_estimate_boundary boundary; +} vx_estimate; + +// Users are encouraged to create worker threads depending on est->estimate to +// distribute work. +// opts and est may be nullptr. +// Requesting a scan doesn't do anything unless vx_partition_next is called. +vx_scan * +vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err); + +/** + * Get next owned partition out of a scan request. + * Caller must free this partition using vx_partition_free. + * This method is thread-safe. + * If using in a sync multi-thread runtime, users are encouraged to create a + * worker thread per partition. + * Returns NULL and doesn't set err on exhaustion. + * Returns NULL and sets err on error. 
+ */ +vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err); + +// Request an array stream in Arrow format from a partition, consuming it +// fully. Not thread-safe, should be called once. +// stream is owned and must be freed using the release callback +void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream, vx_error_out err); + +// Thread-unsafe. Get an owned vx_array of an iterator. +// Returns NULL if iterator is exhausted. Array is owned and must be +// freed by caller. +const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err); +``` + +There are examples of APIs also exposing batch reads, but I doubt this is a good +option as for every ArrayRef the work that needs to be done to execute it may be +significant, and if you want to parallelize work, you can use this with +partitions, so each thread will be still busy with one ArrayRef at a time. +It can be introduced in the future. + +Scan functions are lazy as they operate on streams and it is +consumer's code responsibility to use parallelism at the desired degree. + +## What to do with `vx_array` + +The main question is how to transform outputs of iteration, vx_array, into +something query engines can operate with. You need to execute the array +iteratively till you recognize data and start exporting it. Duckdb integration +is mostly written in Rust with C++ code calling Rust's vtable functions. Rust +code does all data export. PoC implementation moves Duckdb to use C API but +leaves existing Rust code for exporting `vx_array` into DataChunk. + +However, the goal is not to interface with Rust code, so as a baseline the API +provides a way to scan partitions directly into ArrowArrayStream which should be +good enough for most consumers. + +## Cancellation + +There will be no option to cancel the scan as this isn't possibe on Rust API +either and this is a low priority task. + +## Testing + +C API doesn't have any testing. 
I suggest setting up a Catch2 testing target and
Each thread +pulls out partitions from `vx_scan_next` (thus it's thread-safe) and then +works on its own partition without synchronization. + +[1] `vx_file_scan` + +[2] Need to control pruning + https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999 + +[4] e.g. Duckdb MultiFileReader / MultiFileList From ae6ccfdd14d4e7b37cad1b7e7de85c17be775f62 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Tue, 31 Mar 2026 11:57:47 +0100 Subject: [PATCH 2/4] split API into 3 layers --- proposed/0030-c-scan-api.md | 483 +++++++++++++++++++----------------- 1 file changed, 251 insertions(+), 232 deletions(-) diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md index 706deb5..0dcc0fc 100644 --- a/proposed/0030-c-scan-api.md +++ b/proposed/0030-c-scan-api.md @@ -2,110 +2,153 @@ - Authors: Mikhail Kot # C Scan API +## Summary + +Provide a layered C Scan API for non-Rust clients[^1], each layer exposing more +capabilities to the host: + +- High level layer is intended for sync multi-threaded or multi-process runtimes + like DuckDB, ClickHouse, or Postgres. It provides a `DataSource -> + Partition -> Array` chain with thread safety. It manages the event loop inside + Vortex but allows offloading IO and memory allocations to the host. This layer + roughly matches scan API + [exposed](https://github.com/vortex-data/vortex/tree/develop/vortex-scan) + in Rust. +- Middle level layer is intended for async runtimes like Velox which want to run + their own event loop. In addition to offloaded IO, it doesn't do any things + eagerly[^2] but returns control on IO or CPU work for host to schedule. + This layer exposes a coroutine-like state machine. +- Low level layer is intended for clients which need greater control over + scan scheduling. This layer roughly matches a + [LayoutReader](https://github.com/vortex-data/vortex/tree/develop/vortex-layout/src/reader.rs) + in Rust. 
+ +| Level | Allocations | IO | Event loop | Scan planning | +| ----- | ----------- | -- | ---------- | ------------- | +| High | Host/Vortex | Host/Vortex | Vortex | Vortex | +| Middle | Host | Host | Host | Vortex | +| Low | Host | Host | Host | Host | + +## Motivation + +Vortex is a Rust project, and thus integration with non-Rust clients requires a +decent amount of scaffolding[^3] which lowers down the probability of such +integration. As reading files is most important feature for a file format, +exposing it to C-compatible uses lowers down the integration costs. + +Additional motivation is our DuckDB integration which requires C++-to-Rust +and Rust-to-C++ bridging. Replacing it with direct calls to C API would make +the project much smaller. + +[Current](https://github.com/vortex-data/vortex/tree/develop/vortex-ffi/src/file.rs) +C Scan API in the form of `vx_file_scan` is incomplete for the following +reasons: + +- Thread unsafety: host needs to serialize calls to `vx_array_iterator`. +- Lack of support for file globs. +- Lack of support for exporting to `ArrowSchema` and `ArrowArrayStream`. +- Lack of introspection over produced `vx_array` objects. + + Vortex also has a C++ API in vortex-cxx which wraps Rust code. Its future is + outside of scope of this proposal but it's expected to wrap C scan API, + removing dependency on cxx.rs for vortex-cxx. + +## Motivation for layered API + +Velox uses a non-coroutine but async runtime based on Folly executors. CoroBase +uses a coroutine-based runtime. DuckDB and ClickHouse runtimes are sync based on +thread pools. Postgres runtime is sync based on processes. Some of engines may +use OS-specific IO like `io_uring`. + +Scan API usage may be grouped into two distinct cases: + +- sync, single-thread/multi-thread/multi-process runtime, and +- async, multi-thread/coroutine runtime. + +Exposing just the high level would not work well with async hosts as there's +no way to suspend to host on IO operations. 
Exposing only the middle layer +requires sync hosts to manage their own event loop which is tedious. + +Another benefit of splitting API into layers is that it can be discussed and +implemented separately. + +## High level API -There is a scan API for Rust-compatible code available at -https://github.com/vortex-data/vortex/tree/develop/vortex-scan. - -The goal of introducing C scan API is to make integration with non-Rust query -engines like Velox easier. In theory, such engines can use cxx.rs, but it -requires a lot of binding code and runtime bridging (see below). - -There exists a partial scan API for C exposed over files [1], but it's limited -to single-file URIs without globs, and it's also not thread-safe. Its main -flaws, however, are: - -- Inability to export to well-known format like ArrowArrayStream, -- Lack of introspection over produced `vx_array`s, and -- Inability to control scan on a level lower than just getting partitions and - `vx_array`s with filters and projections pre-configured. - - Why does Scan API need to expose `vx_array`s? What's the benefit of using - own format over ArrowArrayStream? - - The answer is "compression". Vortex DTypes don't exactly match with Arrow - physical encodings, so if you have i.e. a ConstantArray, you need to - decompress it into something Arrow-compatible. This was a major regression - in Duckdb integration. - -C++ API works it out by allowing to produce an ArrowArrayStream interface out of -ScanBuilder, but it uses Rust code directly via cxx.rs which we want to avoid -while adding C interfaces. C++ API future is outside of scope of this proposal -but it's expected to wrap C API directly over time, removing dependency on -cxx.rs for vortex-cxx. - -## Customization points - -Main goal of providing customization points is to do as little work as possible -in Vortex code and as much work as possible in the query engine. Some engines -may request control over scan execution like pruning. 
Engines like Duckdb have -own remote storage, caching, and globbing implementations. API still needs an -ability to fall back to own implementation. - -Still, Scan API is a relatively high-level concept, and if its level is not -suffifient, engines can resort to using a layout reader plan and executing it -directly. +``` +┌──────────┐ +│DataSource│ +└─┬────────┘ + ▼ produces a +┌────┐ +│Scan│ +└┬┬┬─┘ + │││ worker threads pull a + ▼▼▼ +┌───────────┐ +│ Partition │ (thread safe) +└┬──────────┘ + │ which produces an + └─►Array (thread unsafe) +``` -## Datasource +### DataSource -A Datasource is a reference to multiple possibly remote files. When created, it +A DataSource is a reference to multiple possibly remote files. When created, it opens first file to determine the schema from DType, all other operations are -deferred till a scan is requested. You can request multiple file scans from a -Datasource. +deferred till a scan is requested and executed. You can request multiple file +scans from a DataSource. 
```c -// Opaque, generated by bindgen +// bindgen typedef struct vx_data_source vx_data_source; typedef struct vx_file_handle vx_file_handle; +typedef struct vx_cache_handle *vx_cache_handle; typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir); typedef void (*vx_glob_callback)(void* userdata, const char* file); typedef struct vx_data_source_options { // (1) Filesystem customization - bool (*fs_use_vortex)(const char* schema, const char* path); void (*fs_set_userdata)(void* userdata); - // should be called after glob expansion, single-file mode - vx_file_handle (*fs_open)(void* userdata, const char* path, vx_error** err); - vx_file_handle (*fs_create)(void* userdata, const char* path, vx_error** err); + // Called after glob expansion, single-file mode + vx_file_handle (*fs_open)(void* userdata, const char* path, + vx_error** err); + vx_file_handle (*fs_create)(void* userdata, const char* path, + vx_error** err); // non-recursive, callback is invoked with each path - void fs_list(void* userdata, const char* path, vx_list_callback cb, vx_error *err); + void fs_list(void* userdata, const char* path, vx_list_callback cb, + vx_error** err); void fs_close(vx_file_handle handle); uint64_t fs_size(vx_file_handle handle, vx_error *err); // positional read, doesn't change file open cursor - void fs_read(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, - vx_error *err); + void fs_read(vx_file_handle handle, uint64_t offset, size_t len, + uint8_t *buffer, vx_error** err); // not needed for scanning but makes FS API complete - void fs_write(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, - vx_error *err); - void fs_sync(vx_file_handle handle, vx_error *err); - - // (2) Globbing customization - - void (*glob)(const char* glob, vx_glob_callback cb, vx_error* err); + void fs_write(vx_file_handle handle, uint64_t offset, size_t len, + uint8_t *buffer, vx_error** err); - /// (3) Cache customization + void 
fs_sync(vx_file_handle handle, vx_error** err); - void* (*cache_init)(vx_error* err); - void (*cache_free)(void* cache, vx_error* err); - void (*cache_get)(void* cache, const char* key, void** value, vx_error* err); - void (*cache_put)(void* cache, const char* key, void* value, vx_error* err); - void (*cache_delete)(void* cache, const char* key, vx_error* err); + // (2) Globbing customization + void (*glob)(const char* glob, vx_glob_callback cb, vx_error** err); } vx_data_source_options; // Addition to existing DType API, returns owned ArrowSchema which needs to // be freed by the caller using release callback. If err is populated, out // is not set. -void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, vx_error* err); +void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, + vx_error* err); /// Create a new owned datasource which must be freed by the caller. const vx_data_source * -vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error_out err); +vx_data_source_new(const vx_session *session, + const vx_data_source_options *opts, vx_error_out err); // datasource is Arc'd inside so a clone creates another reference rather // than cloning the state fully. @@ -125,92 +168,38 @@ typedef struct { uint64_t rows; } vx_data_source_row_count; -void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc); -void vx_data_source_free(const vx_data_source *ptr); +void vx_data_source_get_row_count(const vx_data_source *ds, + vx_data_source_row_count *rc); +void vx_data_source_free(const vx_data_source *ds); ``` -1. Open local or remote file. Allow using vortex's filesystem or query engine's - filesystem e.g. duckdb fs. Allow partial customization e.g. duckdb fs for - local reads, but vortex fs for remote reads. Remote filesystem customization +1. Open local or remote file. Allow using Vortex's filesystem or host's + filesystem e.g. DuckDB fs. Allow partial customization e.g. 
DuckDB fs for + local reads, but Vortex fs for remote reads. Host filesystem customization point has the benefit of not duplicating credentials e.g. S3 access key - between query engine and vortex. Local implementation may be more performant. - Vortex rolled back full duckdb fs usage due to performance implications. -2. Open single file or multiple files. Query engines may have their own glob - expansion [4] which does HTTP IO. -3. Cache intermediate results. Main use case for Vortex is caching schema in - memory for footer cache and conversion results. Benefits are in no - requirement to open first file in a glob eagerly if there's a cache hit. - Vortex had had an integration with Duckdb object cache which was deleted in - favor of own implementation which led to a performance regression. - -When all three customization points are implemented, Vortex offloads all IO -to a query engine. - - Why not expose API to consume byte ranges or emit byte range requests for - the query engine to read and populate buffers? - - This approach is indeed easier than using a vtable with a specific - implementation, and requires slightly more scaffolding on the query engine - side, but it's significantly easier to implement on Vortex side and it is - coherent with current Rust implementation. - Similar implementation can be found in Arrow's RandomAccessFile or - parquet-rs's AsyncFileReader. - - However, as we're thinking of changing our Rust API, we can try to invest - time into this approach as well. - -Coupled with https://github.com/vortex-data/vortex/pull/7012 it also allows -Duckdb integration to abstract memory allocations to the database. - -## Runtime bridging - -In Rust API, a Datasource produces a stream of Partitions. A Partition produces -a stream of Arrays. API is required to be used in an async runtime, current -runtime for Vortex is tokio. + between host and Vortex. Local implementation may be more performant. +2. Open single file or multiple files. 
Hosts may have their own glob expansion + [^4] which does HTTP IO. -Velox uses a non-coroutine but async runtime based on Folly executors. Engines -like CoroBase use a coroutine-based runtime. Duckdb and ClickHouse runtimes are sync -based on thread pools. Postgres runtime is sync based on processes. Some of -engines may use OS-specific IO like `io_uring`. - -All potential usages of our API may be grouped into 4 cases: - -- sync, single-thread runtime, trivial. (1) -- sync, multi-thread runtime. (2) -- async, multi-thread/coroutine. (3) -- async, single-thread. (4) - -Scan/Partition suggestions outlined below play well with (2) but not with (3) -and (4) because Vortex has its own runtime which will block on current thread -when i.e. getting an Array out of Partition. An async-friendly API basically -means exposing a coroutine/state machine which hands control over to the host on -IO. - -As Joe mentioned, we want to get away from the concept of partitions and emit -chunks of vx_array's directly from the scan. In this case, such state machine -may be expressed with roughly the following states: +When both customization points are implemented, Vortex offloads all IO +to a query engine. -``` - Passed a file handle -START -> NEED_IO (offset, len) -> EXECUTE -> DONE - ^ When passed a file handle, instructs host to read following byte - range into buffer and return a handle to this buffer. - ^ Decompresses the buffer (executes the Array) - one step into other buffer - ^ - Array is executed/canonicalized to the form host can work with. - Host now transfers data from buffers to its own output format. -``` + Memory allocation customization is out of scope of this proposal, but it's + possible for Vortex to expose bringing allocator from outside for buffers. -However, as the future of such approach is unclear, a async-unfriendly option is -described below +### Scan -## Scan +A Scan is a one-time traversal of files in a DataSource. A Scan can't be +restarted once requested. 
Hosts are encouraged to utilize multiple threads for +scanning. Scan thread-safely produces Partitions[^5] which are chunks of work +for a thread[^6]. -Scan iterators: +Host may instruct a Scan about its parallelism degree by passing `max_threads`. +Vortex may return less partitions than there are `max_threads` but won't return +more. -```c -``` +There will be no option to cancel a scan as this isn't possible on Rust side +either and this is a low priority task. Scan options: @@ -221,50 +210,34 @@ typedef enum { VX_S_EXCLUDE_RANGE = 2, } vx_scan_selection_include; +// Roaring bitmaps won't be supported. +// If selection is VX_S_INCLUDE_ALL, idx is not read. idx is copied +// by host on scan invocation and can be freed after first partition is +// requested typedef struct { uint64_t *idx; size_t idx_len; - // Roaring bitmaps won't be supported as for now - // If selection is VX_S_INCLUDE_ALL, these are not read. idx is copied by query - // engine on scan invocation and can be freed after a scan iterator is requested vx_scan_selection_include include; } vx_scan_selection; -typedef struct vx_scan_selection { - const size_t* idx; - size_t idx_len; -} vx_scan_selection; - typedef struct vx_scan_options { - // May be NULL which means "return all columns" - const vx_expression *projection; - - // May be NULL which means "no filter" - const vx_expression *filter; - - // Set both to 0 to indicate no range request - // Inclusive - uint64_t row_range_begin; - // Exclusive - uint64_t row_range_end; - + const vx_expression *projection; // NULL means "return all columns" + const vx_expression *filter; // NULL means "no filter" + uint64_t row_range_begin; // Inclusive, [0; 0) means "all rows" + uint64_t row_range_end; // Exclusive vx_scan_selection selection; - - // 0 for no limit - uint64_t limit; - int ordered; + uint64_t limit; // 0 means no limit + uint64_t max_threads; // 0 means no limit. 
+ bool ordered; // 0 means unordered scan } vx_scan_options; ``` Scan interface: ```c +// bindgen typedef struct vx_scan vx_scan; - -/** - * A partition is a contiguous chunk of memory from which you can interatively - * get vx_arrays. - */ +// A Partition is a chunk of work for a thread. typedef struct vx_partition vx_partition; typedef enum { @@ -273,86 +246,121 @@ typedef enum { VX_ESTIMATE_INEXACT = 2, } vx_estimate_boundary; +// If type is VX_P_ESTIMATE_UNKNOWN, estimate is not populated. typedef struct { - // If type is VX_P_ESTIMATE_UNKNOWN, estimate field is not populated uint64_t estimate; - vx_estimate_boundary boundary; + vx_estimate_boundary type; } vx_estimate; -// Users are encouraged to create worker threads depending on est->estimate to -// distribute work. -// opts and est may be nullptr. -// Requesting a scan doesn't do anything unless vx_partition_next is called. +// Users are encouraged to create worker threads depending on +// estimate->estimate to distribute work. +// options and estimate fields may be NUL. +// Calling vx_data_source_scan doesn't do IO unless vx_scan_next is called. +// vx_scan can outlive vx_data_source. vx_scan * -vx_data_source_scan(const vx_data_source *data_source, const vx_scan_options *options, vx_error_out err); - -/** - * Get next owned partition out of a scan request. - * Caller must free this partition using vx_partition_free. - * This method is thread-safe. - * If using in a sync multi-thread runtime, users are encouraged to create a - * worker thread per partition. - * Returns NULL and doesn't set err on exhaustion. - * Returns NULL and sets err on error. - */ -vx_partition *vx_scan_next(vx_scan *scan, vx_error_out err); +vx_data_source_scan(const vx_data_source *ds, const vx_scan_options *options, + vx_estimate* estimate, vx_error **err); +``` + +### Partition + +A Partition allows a worker thread to produce Arrays thread-unsafely. 
+Partitions also allow exporting the data to ArrowArrayStream for hosts +that don't want to introspect Arrays. Hosts should call `vx_scan_next` +and `vx_partition_next` till they return NULL which signals there's no more +data. + +```c +// Get next owned partition out of a scan request. +// Caller must free this partition using vx_partition_free. +// This method is thread-safe. +// Hosts are encouraged to create a worker thread per partition. +// Returns NULL and doesn't set err on exhaustion. +// Returns NULL and sets err on error. +vx_partition *vx_scan_next(vx_scan *scan, vx_error **err); // Request an array stream in Arrow format from a partition, consuming it // fully. Not thread-safe, should be called once. // stream is owned and must be freed using the release callback -void vx_partition_scan_arrow(const vx_partition *partition, FFI_ArrowArrayStream *stream, vx_error_out err); +void vx_partition_scan_arrow(const vx_partition *partition, + ArrowArrayStream *stream, vx_error_out err); // Thread-unsafe. Get an owned vx_array of an iterator. -// Returns NULL if iterator is exhausted. Array is owned and must be -// freed by caller. -const vx_array *vx_partition_next(vx_partition *partition, vx_error_out err); +// Returns NULL and sets err to NULL if iterator is exhausted. +// Array must be freed by caller. +const vx_array *vx_partition_next(vx_partition *partition, vx_error **err); ``` -There are examples of APIs also exposing batch reads, but I doubt this is a good -option as for every ArrayRef the work that needs to be done to execute it may be -significant, and if you want to parallelize work, you can use this with -partitions, so each thread will be still busy with one ArrayRef at a time. -It can be introduced in the future. +### Array introspection -Scan functions are lazy as they operate on streams and it is -consumer's code responsibility to use parallelism at the desired degree. 
+The main question is how to transform outputs of iteration, `vx_array`, into +something query engines can operate with. You need to execute the array +iteratively till you recognize data and start exporting it. Thus API provides a +way to scan partitions directly into ArrowArrayStream which should be good +enough for most hosts. -## What to do with `vx_array` +## Middle level -The main question is how to transform outputs of iteration, vx_array, into -something query engines can operate with. You need to execute the array -iteratively till you recognize data and start exporting it. Duckdb integration -is mostly written in Rust with C++ code calling Rust's vtable functions. Rust -code does all data export. PoC implementation moves Duckdb to use C API but -leaves existing Rust code for exporting `vx_array` into DataChunk. +TODO -However, the goal is not to interface with Rust code, so as a baseline the API -provides a way to scan partitions directly into ArrowArrayStream which should be -good enough for most consumers. +## Low level -## Cancellation +TODO -There will be no option to cancel the scan as this isn't possibe on Rust API -either and this is a low priority task. +## Compatibility + +No impact. Existing C Scan API will exist for some time but will be removed +eventually. + +## Drawbacks + +TODO + +## Alternatives -## Testing +TODO -C API doesn't have any testing. I suggest setting up a Catch3 testing target and -a CMake library for C API using FetchContent to download Catch. This way people -not working on Duckdb integration or FFI wouldn't need CMake and Catch. To -integrate C tests with `cargo test`, we can write a `build.rs` extension which -parses C test names and codegenerates rust tests targets calling to Catch. +## Prior Art -## Duckdb integration PoC +- Dividing scan requests into Partitions for threads is taken from DataFusion's + [partitioned streams](https://docs.rs/datafusion/latest/datafusion/physical_plan/enum.Partitioning.html). 
+- Making Partitions produce Arrays without synchronization mimicks + [Arrow Stream](https://arrow.apache.org/docs/format/CStreamInterface.html) + interface. + +## Unresolved Questions + +- Should high level API have a way to use host's persistent caching options? + In-memory caching may be implemented using host allocator only but for + persistent caching we need additional filesystem customizations i.e. cache + location path. +- Should high level API expose batch reads from Partition? There are plans to + deprecate Partitions on Rust side. +- What introspecion should high level API have for hosts which aren't satisfied + with `Vortex -> ArrowArrayStream` conversion? Should there be iterative + execution API? +- How should API expose definitions of `ArrowSchema` and `ArrowArrayStream`? + Rust's implementation exposes `FFI_` structs and you need to typedef + them manually. Should API bundle `nanoarrow` or let hosts handle typedefs + themselves since the writer is ABI-compatible? Should we gate it behind a + macro? + + ```c + typedef FFI_ArrowSchema ArrowSchema; + #include "vortex.h" + ``` + +## High level API integration example: DuckDB ``` -before: -Duckdb side Vortex side +Before: + +DuckDB Vortex C++ C++ Rust duckdb -> TableFunction vtable -> ffi wrapping -> vtable implementation -after: +After: C++ C++ C duckdb -> TableFunction vtable -> ffi_wrapping -> vx_scan()* @@ -385,9 +393,20 @@ i.e. one worker thread operating on one or multiple partitions. Each thread pulls out partitions from `vx_scan_next` (thus it's thread-safe) and then works on its own partition without synchronization. -[1] `vx_file_scan` - -[2] Need to control pruning - https://spiraldb.slack.com/archives/C0AJS0HDS6R/p1773068549282999 - -[4] e.g. Duckdb MultiFileReader / MultiFileList +## Footnotes + +[^1]: Clients of this API would mostly be query engines like Velox or + ClickHouse, but may as well be our own integrations like vortex-duckdb. 
+[^2]: Like opening the first file in a glob expression to determine schema. +[^3]: Exposed Rust ABI is not stable, so clients can't use cbindgen. C++ clients + can use cxx.rs but this still requires writing manual bindings and runtime + bridging. +[^4]: DuckDB MultiFileReader and MultiFileList. +[^5]: The name may be misleading as it doesn't correspond to Rust side's + Partitions. +[^6]: DuckDB integration currently hides Partitions and Arrays behind a single + [thread-safe iterator](https://github.com/vortex-data/vortex/blob/e8cd130c8ccac45082a0b458b1f843c4313555bf/vortex-duckdb/src/datasource.rs#L151) + which implies unnecessary intra-thread synchronization on pulling data. + On the other hand, the producer, an async crossbeam queue, allows smoothing + out uneven workloads from the Vortex side, and if that's removed, Vortex's + partition scheduling must be precise. From 3e8a9a4efb00096a236dac660d968df0aed35083 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 1 Apr 2026 15:18:34 +0100 Subject: [PATCH 3/4] focus only on high level as for now --- proposed/0030-c-scan-api.md | 31 +++++++++---------------------- 1 file changed, 9 insertions(+), 22 deletions(-) diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md index 0dcc0fc..bc06b20 100644 --- a/proposed/0030-c-scan-api.md +++ b/proposed/0030-c-scan-api.md @@ -1,7 +1,7 @@ - Start Date: 2026-03-13 - Authors: Mikhail Kot -# C Scan API +# High level C Scan API ## Summary Provide a layered C Scan API for non-Rust clients[^1], each layer exposing more @@ -72,7 +72,10 @@ requires sync hosts to manage their own event loop which is tedious. Another benefit of splitting API into layers is that it can be discussed and implemented separately. -## High level API +As this is a big change, this PR will from now on focus just on the high level +scan API. Other levels may be implemented later on demand. + +## Overview ``` ┌──────────┐ @@ -91,7 +94,7 @@ implemented separately. 
└─►Array (thread unsafe) ``` -### DataSource +## DataSource A DataSource is a reference to multiple possibly remote files. When created, it opens first file to determine the schema from DType, all other operations are @@ -187,7 +190,7 @@ to a query engine. Memory allocation customization is out of scope of this proposal, but it's possible for Vortex to expose bringing allocator from outside for buffers. -### Scan +## Scan A Scan is a one-time traversal of files in a DataSource. A Scan can't be restarted once requested. Hosts are encouraged to utilize multiple threads for @@ -262,7 +265,7 @@ vx_data_source_scan(const vx_data_source *ds, const vx_scan_options *options, vx_estimate* estimate, vx_error **err); ``` -### Partition +## Partition A Partition allows a worker thread to produce Arrays thread-unsafely. Partitions also allow exporting the data to ArrowArrayStream for hosts @@ -291,7 +294,7 @@ void vx_partition_scan_arrow(const vx_partition *partition, const vx_array *vx_partition_next(vx_partition *partition, vx_error **err); ``` -### Array introspection +## Array introspection The main question is how to transform outputs of iteration, `vx_array`, into something query engines can operate with. You need to execute the array @@ -299,27 +302,11 @@ iteratively till you recognize data and start exporting it. Thus API provides a way to scan partitions directly into ArrowArrayStream which should be good enough for most hosts. -## Middle level - -TODO - -## Low level - -TODO - ## Compatibility No impact. Existing C Scan API will exist for some time but will be removed eventually. 
-## Drawbacks - -TODO - -## Alternatives - -TODO - ## Prior Art - Dividing scan requests into Partitions for threads is taken from DataFusion's From 23c6fda1ef1ab929f91be8eb12d47e18079670a9 Mon Sep 17 00:00:00 2001 From: Mikhail Kot Date: Wed, 1 Apr 2026 17:07:34 +0100 Subject: [PATCH 4/4] internal review --- proposed/0030-c-scan-api.md | 168 ++++++++++++++++++++++++------------ 1 file changed, 112 insertions(+), 56 deletions(-) diff --git a/proposed/0030-c-scan-api.md b/proposed/0030-c-scan-api.md index bc06b20..baddf39 100644 --- a/proposed/0030-c-scan-api.md +++ b/proposed/0030-c-scan-api.md @@ -32,9 +32,9 @@ capabilities to the host: ## Motivation Vortex is a Rust project, and thus integration with non-Rust clients requires a -decent amount of scaffolding[^3] which lowers down the probability of such +decent amount of scaffolding[^3] which lowers the probability of such integration. As reading files is most important feature for a file format, -exposing it to C-compatible uses lowers down the integration costs. +exposing it to C-compatible users lowers integration costs. Additional motivation is our DuckDB integration which requires C++-to-Rust and Rust-to-C++ bridging. Replacing it with direct calls to C API would make @@ -49,9 +49,9 @@ reasons: - Lack of support for exporting to `ArrowSchema` and `ArrowArrayStream`. - Lack of introspection over produced `vx_array` objects. - Vortex also has a C++ API in vortex-cxx which wraps Rust code. Its future is - outside of scope of this proposal but it's expected to wrap C scan API, - removing dependency on cxx.rs for vortex-cxx. +> Vortex also has a C++ API in vortex-cxx which wraps Rust code. Its future is +> outside of scope of this proposal but it's expected to wrap C scan API, +> removing dependency on cxx.rs for vortex-cxx. ## Motivation for layered API @@ -70,10 +70,9 @@ no way to suspend to host on IO operations. Exposing only the middle layer requires sync hosts to manage their own event loop which is tedious. 
Another benefit of splitting API into layers is that it can be discussed and
-implemented separately.
-
-As this is a big change, this PR will from now on focus just on the high level
-scan API. Other levels may be implemented later on demand.
+implemented separately. As this is a big change, this PR will from now on focus
+just on the high level scan API. Other levels may be implemented later on
+demand.
 
 ## Overview
 
@@ -94,6 +93,15 @@ scan API. Other levels may be implemented later on demand.
 └─►Array (thread unsafe)
 ```
 
+## Event loop
+
+Vortex manages the event loop by using a CurrentThreadRuntime runtime handle
+which in turn has a `smol::Executor<'static>` without a background thread pool.
+This means the event loop will run on the host's thread pool scheduler (in
+other terms, use the calling thread) implicitly. In case of multiple threads,
+the underlying executor is shared between them, and is driven from all of
+these threads. The executor is initialized once on loading the FFI library.
+
 ## DataSource
 
 A DataSource is a reference to multiple possibly remote files. When created, it
 opens first file to determine the schema from DType, all other operations are
@@ -105,38 +113,41 @@ scans from a DataSource. 
// bindgen typedef struct vx_data_source vx_data_source; typedef struct vx_file_handle vx_file_handle; -typedef struct vx_cache_handle *vx_cache_handle; -typedef void (*vx_list_callback)(void* userdata, const char* name, int is_dir); -typedef void (*vx_glob_callback)(void* userdata, const char* file); +typedef void* vx_fs_userdata; +typedef void (*vx_list_callback)(vx_fs_userdata userdata, const char* name, + int is_dir); +typedef void (*vx_glob_callback)(vx_fs_userdata userdata, const char* file); typedef struct vx_data_source_options { + const char* files; + // (1) Filesystem customization - bool (*fs_use_vortex)(const char* schema, const char* path); - void (*fs_set_userdata)(void* userdata); + vx_fs_userdata fs_userdata; + void (*fs_free_userdata)(vx_fs_userdata fs_userdata); // Called after glob expansion, single-file mode - vx_file_handle (*fs_open)(void* userdata, const char* path, + vx_file_handle (*fs_open)(vx_fs_userdata userdata, const char* path, vx_error** err); - vx_file_handle (*fs_create)(void* userdata, const char* path, + vx_file_handle (*fs_create)(vx_fs_userdata userdata, const char* path, vx_error** err); // non-recursive, callback is invoked with each path - void fs_list(void* userdata, const char* path, vx_list_callback cb, - vx_error** err); + void (*fs_list)(vx_fs_userdata userdata, const char* path, + vx_list_callback cb, vx_error** err); - void fs_close(vx_file_handle handle); - uint64_t fs_size(vx_file_handle handle, vx_error *err); + void (*fs_close)(vx_file_handle handle); + uint64_t (*fs_size)(vx_file_handle handle, vx_error** err); // positional read, doesn't change file open cursor - void fs_read(vx_file_handle handle, uint64_t offset, size_t len, + void (*fs_read)(vx_file_handle handle, uint64_t offset, uint64_t len, uint8_t *buffer, vx_error** err); // not needed for scanning but makes FS API complete - void fs_write(vx_file_handle handle, uint64_t offset, size_t len, + void (*fs_write)(vx_file_handle handle, uint64_t offset, 
uint64_t len, uint8_t *buffer, vx_error** err); - void fs_sync(vx_file_handle handle, vx_error** err); + void (*fs_sync)(vx_file_handle handle, vx_error** err); // (2) Globbing customization void (*glob)(const char* glob, vx_glob_callback cb, vx_error** err); @@ -146,12 +157,12 @@ typedef struct vx_data_source_options { // be freed by the caller using release callback. If err is populated, out // is not set. void vx_dtype_to_arrow_schema(const vx_dtype* dtype, ArrowSchema* out, - vx_error* err); + vx_error** err); /// Create a new owned datasource which must be freed by the caller. const vx_data_source * vx_data_source_new(const vx_session *session, - const vx_data_source_options *opts, vx_error_out err); + const vx_data_source_options *opts, vx_error** err); // datasource is Arc'd inside so a clone creates another reference rather // than cloning the state fully. @@ -176,19 +187,32 @@ void vx_data_source_get_row_count(const vx_data_source *ds, void vx_data_source_free(const vx_data_source *ds); ``` +> const vx_data_source* may be misleading as it's actually owned, but that's +> limitation of arc_dyn_wrapper! as for now. This would likely be changed soon + 1. Open local or remote file. Allow using Vortex's filesystem or host's - filesystem e.g. DuckDB fs. Allow partial customization e.g. DuckDB fs for - local reads, but Vortex fs for remote reads. Host filesystem customization + filesystem e.g. DuckDB fs. Host filesystem customization point has the benefit of not duplicating credentials e.g. S3 access key between host and Vortex. Local implementation may be more performant. + User must either implement all `fs_*` callback and set `fs_userdata`, or set + all callbacks in (1) and `fs_userdata` to NULL. + 2. Open single file or multiple files. Hosts may have their own glob expansion [^4] which does HTTP IO. When both customization points are implemented, Vortex offloads all IO to a query engine. 
- Memory allocation customization is out of scope of this proposal, but it's
-  possible for Vortex to expose bringing allocator from outside for buffers.
+> Memory allocation customization is out of scope of this proposal, but it's
+> possible for Vortex to expose bringing allocator from outside for buffers.
+
+To allow stateful filesystem operations, a DataSource allows providing a custom
+filesystem userdata with associated deleter. It may be set per DataSource.
+Filesystem customizations do synchronous IO without batching. Clients who want
+control over IO should use the middle layer or lower.
+As filesystem callbacks are synchronous, middle layer would use different option
+structs, and there would be no way to re-use DataSource or other structures
+between layers.
 
 ## Scan
 
 A Scan is a one-time traversal of files in a DataSource. A Scan can't be
 restarted once requested. Hosts are encouraged to utilize multiple threads for
@@ -207,13 +231,14 @@ either and this is a low priority task.
 Scan options:
 
 ```c
+// roaring bitmap include and exclude won't be supported for now because of
+// exposure complexity
 typedef enum {
   VX_S_INCLUDE_ALL = 0,
   VX_S_INCLUDE_RANGE = 1,
   VX_S_EXCLUDE_RANGE = 2,
 } vx_scan_selection_include;
 
-// Roaring bitmaps won't be supported.
 // If selection is VX_S_INCLUDE_ALL, idx is not read. idx is copied
 // by host on scan invocation and can be freed after first partition is
 // requested
@@ -226,12 +251,13 @@ typedef struct {
 typedef struct vx_scan_options {
   const vx_expression *projection; // NULL means "return all columns"
   const vx_expression *filter; // NULL means "no filter"
-  uint64_t row_range_begin; // Inclusive, [0; 0) means "all rows"
+  // special case: row_range_begin == 0 && row_range_end == 0 means "all rows"
+  uint64_t row_range_begin; // Inclusive
   uint64_t row_range_end; // Exclusive
   vx_scan_selection selection;
   uint64_t limit; // 0 means no limit
   uint64_t max_threads; // 0 means no limit. 
- bool ordered; // 0 means unordered scan + int ordered; // 0 means unordered scan } vx_scan_options; ``` @@ -249,7 +275,7 @@ typedef enum { VX_ESTIMATE_INEXACT = 2, } vx_estimate_boundary; -// If type is VX_P_ESTIMATE_UNKNOWN, estimate is not populated. +// If type is VX_ESTIMATE_UNKNOWN, estimate is not populated. typedef struct { uint64_t estimate; vx_estimate_boundary type; @@ -259,10 +285,17 @@ typedef struct { // estimate->estimate to distribute work. // options and estimate fields may be NUL. // Calling vx_data_source_scan doesn't do IO unless vx_scan_next is called. -// vx_scan can outlive vx_data_source. +// +// As an implementation detail, a scan has a reference to a data source thus +// a scan can outlive a data source. vx_scan * vx_data_source_scan(const vx_data_source *ds, const vx_scan_options *options, vx_estimate* estimate, vx_error **err); + +// Return scan progress from 0.0 to 1.0. +// Reports estimates of read vs. total rows. Thread-safe. +// May be inaccurate, main use case is progress bars in clients. +double vx_scan_progress(const vx_scan *scan); ``` ## Partition @@ -277,21 +310,36 @@ data. // Get next owned partition out of a scan request. // Caller must free this partition using vx_partition_free. // This method is thread-safe. +// You can call vx_scan_free or vx_data_source_free while you are holding +// partitions. // Hosts are encouraged to create a worker thread per partition. -// Returns NULL and doesn't set err on exhaustion. -// Returns NULL and sets err on error. +// Returns NULL and sets *err to NULL on exhaustion. +// Returns NULL and sets *err on error. vx_partition *vx_scan_next(vx_scan *scan, vx_error **err); // Request an array stream in Arrow format from a partition, consuming it -// fully. Not thread-safe, should be called once. -// stream is owned and must be freed using the release callback +// fully. Calling vx_partition_next on partition after calling this function +// is undefined behaviour. 
+// User does not need to call vx_partition_free on the partition after calling
+// this function.
+// Not thread-safe, should be called once.
+// stream is owned and must be freed using the release callback.
+// Does not modify stream and sets err on error.
 void vx_partition_scan_arrow(const vx_partition *partition,
-                             ArrowArrayStream *stream, vx_error_out err);
+                             ArrowArrayStream *stream, vx_error** err);
 
 // Thread-unsafe. Get an owned vx_array of an iterator.
-// Returns NULL and sets err to NULL if iterator is exhausted.
+// Calling from multiple threads is undefined behaviour.
 // Array must be freed by caller.
+// You can call vx_scan_free, vx_data_source_free, or vx_partition_free
+// while you are holding arrays from it.
+// Returns NULL and sets *err to NULL on exhaustion.
+// Returns NULL and sets *err on error.
 const vx_array *vx_partition_next(vx_partition *partition, vx_error **err);
+
+// Return a (possibly exact) estimate of rows in a partition.
+void vx_partition_row_count(const vx_partition *partition, vx_estimate *count,
+                            vx_error **err);
 ```
 
 ## Array introspection
 
@@ -300,7 +348,9 @@ The main question is how to transform outputs of iteration, `vx_array`, into
 something query engines can operate with. You need to execute the array
 iteratively till you recognize data and start exporting it. Thus API provides a
 way to scan partitions directly into ArrowArrayStream which should be good
-enough for most hosts.
+enough for most hosts. However, some hosts may want to work with vx_array
+directly as it provides types like ConstantArray which Arrow doesn't have, and
+thus conversion requires CPU-intensive work.
 
 ## Compatibility
 
@@ -311,19 +361,21 @@ eventually.
 
 - Dividing scan requests into Partitions for threads is taken from DataFusion's
   [partitioned streams](https://docs.rs/datafusion/latest/datafusion/physical_plan/enum.Partitioning.html). 
-- Making Partitions produce Arrays without synchronization mimicks +- Making Partitions produce Arrays without synchronization mimics [Arrow Stream](https://arrow.apache.org/docs/format/CStreamInterface.html) interface. ## Unresolved Questions +- Should high level API support reading a file from a buffer and not from a + path? Is there a use case for that? - Should high level API have a way to use host's persistent caching options? In-memory caching may be implemented using host allocator only but for persistent caching we need additional filesystem customizations i.e. cache location path. - Should high level API expose batch reads from Partition? There are plans to deprecate Partitions on Rust side. -- What introspecion should high level API have for hosts which aren't satisfied +- What introspection should high level API have for hosts which aren't satisfied with `Vortex -> ArrowArrayStream` conversion? Should there be iterative execution API? - How should API expose definitions of `ArrowSchema` and `ArrowArrayStream`? @@ -337,6 +389,9 @@ eventually. #include "vortex.h" ``` + Current implementation copy-pastes ArrowSchema and ArrowArrayStream and gates + them behind a macro. + ## High level API integration example: DuckDB ``` @@ -355,15 +410,15 @@ duckdb -> TableFunction vtable -> ffi_wrapping -> vx_scan()* * - vx_array -> DataChunk reuses existing Rust code ``` -https://github.com/vortex-data/vortex/tree/myrrc/scan-api-duckdb has an -implementation of using C Scan API for Duckdb scan integration. Duckdb has a -sync multi-threaded runtime, and table function is called from multiple threads -simultaneously. Users can save a per-thread state. +https://github.com/vortex-data/vortex/tree/myrrc/scan-api-duckdb has a PoC (not +production-ready) implementation of using C Scan API for Duckdb scan +integration. Duckdb has a sync multi-threaded runtime, and table function is +called from multiple threads simultaneously. Users can save a per-thread state. 
The integration splits into following parts: -- DType -> LogicalType integration, done sans Temporal extension. +- DType -> LogicalType integration, done except for Temporal extension support. - Table function binding (creating a DataSource), done. -- Global state initialization (creating a Scan), done sans filter pushdown. +- Global state initialization (creating a Scan), done except filter pushdown. - Local state initialization (export batch id), done. - Utility functions like cardinality estimates, done. - vx_array -> DataChunk export, delegated to existing Rust code. @@ -385,15 +440,16 @@ works on its own partition without synchronization. [^1]: Clients of this API would mostly be query engines like Velox or ClickHouse, but may as well be our own integrations like vortex-duckdb. [^2]: Like opening the first file in a glob expression to determine schema. -[^3]: Exposed Rust ABI is not stable, so clients can't use cbindgen. C++ clients - can use cxx.rs but this still requires writing manual bindings and runtime - bridging. +[^3]: Rust ABI is not stable if you don't use `repr(C)`, so clients can't use + cbindgen directly. C++ clients can use cxx.rs but this still requires + writing manual bindings and runtime bridging. [^4]: DuckDB MultiFileReader and MultiFileList. [^5]: The name may be misleading as it doesn't correspond to Rust side's - Partitions. + Partitions. Alternative considered was vx_split but it conflicts with + register_splits in LayoutReader. [^6]: DuckDB integration currently hides Partitions and Arrays behind a single [thread-safe iterator](https://github.com/vortex-data/vortex/blob/e8cd130c8ccac45082a0b458b1f843c4313555bf/vortex-duckdb/src/datasource.rs#L151) - which implies unnecessary intra-thread synchronization on pulling data. 
- On the other hand, the producer, an async crossbeam queue, allows smoothing - out uneven workloads from the Vortex side, and if that's removed, Vortex's + which implies unnecessary intra-thread synchronization on pulling data. On + the other hand, the producer, an async crossbeam queue, allows smoothing out + uneven workloads from the Vortex side, and if that's removed, Vortex's partition scheduling must be precise.