Skip to content

feat: DynamicInsert with schema-reflected RowBinary encoding#414

Open
catinspace-au wants to merge 76 commits intoClickHouse:mainfrom
hyperi-io:hyperi/schema-cache-strong
Open

feat: DynamicInsert with schema-reflected RowBinary encoding#414
catinspace-au wants to merge 76 commits intoClickHouse:mainfrom
hyperi-io:hyperi/schema-cache-strong

Conversation

@catinspace-au
Copy link
Copy Markdown
Contributor

Summary

Adds DynamicInsert — a schema-reflected RowBinary insert path that encodes Map<String, Value> rows directly into ClickHouse's native binary format, eliminating server-side JSON parsing overhead.

Key components:

  • DynamicInsert — schema-reflected RowBinary encoding via system.columns metadata
  • DynamicSchemaCache — TTL-based thread-safe schema cache with invalidation
  • DynamicBatcher — async multi-producer batch inserter with auto-flush
  • ParsedType — structured type parser with TypeTag enum dispatch for fast encoding
  • Schema drift recovery — auto-invalidates cache on SchemaMismatch and data errors ("cannot parse", "incorrect data", "type mismatch")
  • Nullable support — proper 1-byte flag encoding for all Nullable(T) types

Highlights

  • Schema fetched once from system.columns, cached with configurable TTL (default 5min)
  • FxHashMap for O(1) column lookups (non-cryptographic, 2-3x faster than std HashMap)
  • TypeTag enum dispatch — pre-computed at schema-fetch time, single jump table per row
  • Cow<str> in value encoding — borrows existing strings, allocates only for type conversions
  • JSON columns encoded as strings with input_format_binary_read_json_as_string=1 setting
  • Comprehensive nullable/non-nullable tests for ALL supported types

Type Coverage

Integer (U/Int 8-256), Float32/64, Bool, String, FixedString, Date/Date32, DateTime/DateTime64, UUID, IPv4/IPv6, Decimal32/64/128, Enum8/16, JSON, Array, Map, Tuple, LowCardinality, LowCardinality(Nullable), and all Nullable variants.

Test plan

  • Unit tests for type parsing, encoding, schema cache
  • Integration tests with real ClickHouse for every supported type
  • Nullable/non-nullable matrix covering all types with NULL and non-NULL values
  • Edge cases: empty strings, zero numerics, epoch dates, empty collections
  • Schema drift recovery: error classification, cache invalidation on data errors

- Map 'Bool' type string to UInt8 in the column type parser (Bool is
  a UInt8 alias on the wire: true=1, false=0)
- Implement sparse (custom) column serialization in the data block
  reader: read offset groups to locate non-default values, read only
  those values, then reconstruct the full column with zero/empty/null
  defaults at the remaining positions
- Add native_bool_type integration test covering SELECT of true/false
  values; also exercises the sparse code path on the cluster where
  ClickHouse sends Bool columns with custom_ser=1
…e, Decimal, Enum, UUID, BFloat16, Point, Time

Adds tests and bug fixes for all previously untested types:

- UInt128/Int128 (u128/i128), UInt256/Int256 ([u8;32] raw LE)
- BFloat16 (u16 raw bits; 1.0 = 0x3F80, 2.0 = 0x4000)
- UUID ([u8;16], ClickHouse stores as two LE uint64s: low-value at byte 8)
- Enum8/Enum16 (wire-compatible with Int8/Int16; read as i8/i16)
- Date/Date32/DateTime/DateTime64 (3, 6, 9 precision) with epoch-relative int values
- Time/Time64(3) — seconds/milliseconds since midnight
- Decimal32/Decimal128/Decimal256 (all sizes; Decimal64 was already tested)
- Point (geo) — fix: Point is Tuple(Float64,Float64) in columnar format,
  not a flat 16-byte blob; all x-values come before all y-values in the
  native stream, so we must read two separate Float64 sub-columns and
  interleave per-row
Each NativeClient now owns a lazily-initialised bounded connection pool
(default size: 10) instead of opening a fresh TCP connection per operation.

Pool model:
- tokio Semaphore caps total (idle + in-use) connections to pool_size
- Idle connections stored in Mutex<VecDeque> (FIFO — reuses recently
  returned connections first)
- Pool created on first acquire() call with a snapshot of the current
  client config; builder methods that change connection params
  (with_addr, with_database, with_user, with_password, with_setting,
  with_lz4) reset the pool so fresh connections use updated config
- clone() shares the same pool across client copies

PooledConnection guard:
- DerefMut<Target=NativeConnection> for transparent use at all call sites
- Returns connection to pool on drop
- discard() marks a connection as broken — it is closed and the
  semaphore slot is freed instead of queuing the conn as idle
- Cursor aborts (exception / incomplete read) call discard() to avoid
  returning misaligned connections to the pool
- NativeInsert::abort() discards the connection (incomplete INSERT)

New public API: NativeClient::with_pool_size(n)

Added native_pool_reuse integration test: 20 sequential pings on a
pool capped to 1 connection verify that the same connection is reused.
…schema cache

New files:
- protocol.rs, io.rs, tcp.rs, connection.rs: low-level TCP + handshake
- block_info.rs, client_info.rs: protocol framing helpers
- writer.rs: packet serialisation (query, data block, ping, addendum)
- compression.rs: LZ4 block decompression
- error_codes.rs: server error parsing
- sparse.rs: sparse column deserialisation (Bool / custom_ser=1)
- encode.rs: RowBinary → columnar transpose for INSERT
- schema.rs: TTL-backed schema cache (NativeSchemaCache)
- inserter.rs: multi-batch NativeInserter<T> (mirrors HTTP Inserter)
- batcher.rs, tests/it/batcher.rs: HTTP batcher (separate feature)

Core changes:
- Cargo.toml: deadpool, zstd, socket2 under native-transport feature
- src/lib.rs: expose native module + public re-exports
- tests/it/main.rs: wire in native and batcher test modules
…k; edge-case tests

Pool:
- Replace hand-rolled Semaphore/Mutex pool with deadpool::managed
- NativeConnectionManager implements Manager::create/recycle
- Poisoned connections rejected in recycle(); dropped instead of recycled
- NativeClient stores NativePool directly (Arc-backed); builder methods
  call rebuild_pool() on config changes

Connection:
- Add poisoned field + is_poisoned(); check_alive() for recycle health check
- check_alive(): rejects if poisoned, BufReader has leftover bytes, or
  non-blocking poll_read returns EOF/unexpected data (noop waker)

Cursor / Query:
- NativeRowCursor: add drain() to consume packets until EndOfStream
- NativeRowCursor: Drop impl discards connection if stream not fully consumed
- fetch_one / fetch_optional: call drain() after retrieving row so the
  connection is returned to the pool in a clean state (fixes RowNotFound
  race when pool connections were reused mid-stream)

Insert:
- NativeInsert<T>: add Drop impl that calls abort() to discard connection
  if end() was never called
- end(): take conn out on success so Drop is a no-op after clean commit

Tests (59 integration tests, 11 sparse unit tests):
- Pool: concurrent (10 tasks / pool_size=2), error recovery, insert+ping
- Bool/sparse: stream alignment, multi-column, large gap (1000 rows),
  single true at start, single true at end
- Sparse unit: single-row, large gap, consecutive non-defaults, state carry
- Cursor: fetch_one on empty, fetch_optional, bind() edge cases
- INSERT: large batch, abort, sequential, bool insert
- Schema cache: miss, clear_all
Encoder (encode.rs):
- Remove strip_low_cardinality() — send original type name to server
- Implement proper LowCardinality wire encoding in write_col_values:
  * version=1, HAS_ADDITIONAL_KEYS flag, dict + indices
  * For LC(Nullable(T)): dict type is T (not Nullable(T)); index 0 is
    the null sentinel (default T value); null inputs → index 0
  * For LC(T): plain T dict; indices map rows to unique values
  * Chooses smallest index type (U8/U16/U32/U64) based on dict size

Reader (columns.rs):
- Fix read_low_cardinality_column for LC(Nullable(T)):
  * Dict is now read using T (not Nullable(T)) — matches ClickHouse wire
  * Index 0 → RowBinary null [0x01]; other indices → [0x00, T bytes]
  * Non-nullable LC unchanged

Tests:
- native_insert_low_cardinality: LowCardinality(String) INSERT + readback
- native_insert_low_cardinality_nullable: LC(Nullable(String)) with None values
Ported from HyperI DFE Loader's per-table buffer + orchestrator pattern
(dfe-loader/src/buffer/). The key improvement is embedding the batching
policy in the library rather than requiring each consumer to re-implement
the orchestrator select! loop.

Architecture: bounded MPSC channel + background tokio task with select!
over command recv and periodic timer tick. Provides concurrent writes,
backpressure, and automatic flush on row/byte/period limits.

HTTP transport:
- AsyncInserter<T> + AsyncInserterConfig + AsyncInserterHandle<T>
- feature = "async-inserter" (depends on "inserter" + tokio time/sync)

Native TCP transport:
- AsyncNativeInserter<T> + AsyncNativeInserterConfig + AsyncNativeInserterHandle<T>
- Same MPSC pattern, uses NativeInserter<T> internally

TableBatcher<T> reworked as thin wrapper over AsyncInserter<T>:
- append() now takes owned T (was &T::Value<'_>)
- Removes Arc<Mutex<Inserter>> in favour of channel-based concurrency

Integration tests: happy path, edge cases (single row, empty flush,
double flush, max_rows=1, large strings, tiny channel, max_bytes trigger),
failure cases (write/flush after end, bad table), and stress/concurrency
tests (20 concurrent writers × 50 rows, interleaved flush).
Realistic, large, deeply nested JSON blobs matching Elastic Beat agent
output in production. Exercises full insert round-trip for both HTTP
and native TCP transports.

Payloads:
- Filebeat nginx access: Unicode URL params (CJK, French, German),
  nested headers, JWT cookie, geo coords, null fields
- Winlogbeat Security 4625: Cyrillic usernames/workstation names,
  Windows SIDs, embedded \n\t in multiline message
- Filebeat Java stack trace: Windows backslash paths, 3 chained
  exceptions, Spring CGLIB frames, diacritics in user_id
- Winlogbeat PowerShell 4104: ScriptBlockText with heavy diacritics,
  4-level deep nesting, escaped quotes, Base64 encoding
- Filebeat Kubernetes: JSON-in-JSON message, CJK payment error,
  Greek symbols, Yen currency, Go goroutine stack trace
- Mixed beats concurrent: all 5 sources from 5 concurrent handles,
  max_rows=15 forcing mid-batch flushes, per-source count verification
…s, and wire format

Create docs/ directory with topic-focused guides:
- native-transport.md: connect, query, insert over TCP
- connection-pooling.md: deadpool pool, health checks, discard pattern
- batching.md: AsyncInserter, AsyncNativeInserter, TableBatcher hierarchy
- types.md: full type coverage matrix across HTTP and native transports
- wire-format.md: LowCardinality, Dynamic, Variant, Array encoding internals
- migration.md: HTTP vs native trade-offs, switching guide, upstream differences

Update root README.md with native transport section and links to docs.
Replace all text-art diagrams with ```mermaid blocks:
- batching.md: inserter hierarchy + MPSC architecture
- connection-pooling.md: cursor drain flow + pool architecture
- native-transport.md: connection lifecycle
- migration.md: branch structure
Update STATE.md branch strategy table with current 5-branch structure.
Update migration.md Mermaid diagram with hyperi/ prefix.
Rich ClickHouse type parser lifted from HyperI dfe-loader. Handles
Nullable, LowCardinality, Array, Map, DateTime64(p, tz), Decimal(p, s),
FixedString(n), Enum. 15 unit tests. DynamicError for schema mismatch,
encoding, and fetch errors.
ColumnDef with ParsedType, DynamicSchemaCache with TTL + invalidation,
fetch_dynamic_schema() queries system.columns. Uses positional tuple
fetch to avoid derive(Row) macro issues inside the crate. 5 unit tests.
Encodes Map<String, Value> to RowBinary using DynamicSchema. Supports
all scalar types, Nullable, FixedString, UUID, IPv4/IPv6, Array, Map,
JSON. Type-appropriate defaults for missing columns. Unknown types
fall back to String encoding for forward compatibility. 14 unit tests.

End-to-end CPU perspective documented: schema-reflected RowBinary
shifts parsing cost from the ClickHouse cluster to the client, where
the work (binary encoding vs JSON serialisation) is roughly equivalent
but the server does zero parsing on ingest.
Client::dynamic_insert() creates a DynamicInsert that fetches schema
from system.columns, encodes Map<String, Value> to RowBinary, and
handles schema mismatch with automatic cache invalidation.

Adds serde_json as a runtime dependency (was dev-only).
Adds dynamic_schema_cache field to Client (shared via Arc).
MPSC bounded channel, background flush task, row count + time thresholds.
Schema recovery on mismatch: invalidate cache, re-fetch, retry once.
DynamicBatcherHandle for multi-producer concurrent writes.
Client::dynamic_batcher() convenience method.
Tests: simple types, nullable columns, default column skipping,
batcher end-flush. Requires running ClickHouse instance.
- fxhash (RUSTSEC-2025-0057, unmaintained) -> rustc-hash 2.x
- linked-hash-map (no releases since 2020) -> indexmap (already a dep)
- cargo update for semver-compatible bumps
…436)

- Drop polonius-the-crab and 3 transitive deps (paste, higher-kinded-types,
  macro_rules_attribute) — clears RUSTSEC-2024-0436 (paste unmaintained)
- Inline the raw-pointer reborrow that polonius wrapped behind a macro
- Fix rustfmt.toml edition 2021 -> 2024 to match Cargo.toml
4 mock-based tests (no ClickHouse needed) + 3 integration tests
covering the unsafe reborrow paths in poll_next and Next::poll:
- single row, multi-row, empty result, fetch_all/fetch_one (mock)
- large result spanning chunks, borrowed rows, small block size (integration)
Approach C (additive wrapper): UnifiedClient holds a Transport enum
dispatching to Client (HTTP) or NativeClient (native TCP, cfg-gated).
Includes UnifiedHttpBuilder and UnifiedNativeBuilder with full builder
delegation and From<Builder> for UnifiedClient conversions.
Introduces `UnifiedQuery` (src/unified_query.rs) holding a `QueryInner`
enum that wraps either the HTTP `Query` or the native `NativeQuery`.
Dispatches `bind`, `execute`, `fetch_all`, `fetch_one`, and
`fetch_optional` to whichever backend is active.

Adds `UnifiedClient::query(&self, sql) -> UnifiedQuery` in `unified.rs`.
Registers `pub mod unified_query` in `lib.rs`.

`bind` uses `impl Display` as the common interface: for HTTP the value
is `.to_string()`-ed (String: Serialize satisfies the Bind trait);
for native it is passed directly to `NativeQuery::bind`.
Introduces `UnifiedCursor<T>` in `src/unified_cursor.rs` — a streaming
row cursor that wraps either `RowCursor<T>` (HTTP) or `NativeRowCursor<T>`
(native TCP) and exposes a single `next() -> Result<Option<T>>` method.

`T: RowOwned + RowRead` is required so both transports can return owned
values: for HTTP `T::Value<'_> = T` (via the `RowOwned` supertrait bound),
for native TCP `next()` already returns `T` directly.

Adds `UnifiedQuery::fetch::<T>() -> Result<UnifiedCursor<T>>` as the
primary constructor.  Registers `unified_cursor` as a public module in
`lib.rs`.
Introduces `UnifiedInsert<T>` in `src/unified_insert.rs` — an INSERT
handle wrapping either `Insert<T>` (HTTP) or `NativeInsert<T>` (native
TCP) and exposing `write(&T::Value<'_>)` and `end()`.

`end()` requires `T: Row` because `NativeInsert<T>` is only implemented
for `T: Row`.  In practice callers always have this bound since
`UnifiedClient::insert::<T>()` itself requires `T: Row`.

`insert_formatted_with()` is added to `UnifiedClient` returning
`UnsupportedTransport` for the native backend, as the InsertFormatted
API is HTTP-only.  Both new methods are wired in `src/unified.rs`.
Registers `unified_insert` as a public module in `lib.rs`.
Adds `UnifiedClient::ping()` which routes to `SELECT 1` (HTTP) or the
native ping packet (native TCP transport).
Add `with_roles()` and `with_default_roles()` to `NativeClient` and
`UnifiedNativeBuilder`, mirroring the existing HTTP `Client` API.

Roles are session-scoped in ClickHouse. The pool manager sends
`SET ROLE role1, role2, …` once per new connection immediately after
the handshake, before the connection enters the idle queue. All
subsequent queries on that connection inherit the active roles without
any per-query overhead.

Implementation:
- `NativeClient`: add `roles: Vec<String>` field, builder methods, and
  propagate through `rebuild_pool()` / `PoolConfig`.
- `PoolConfig`: add `roles` field.
- `NativeConnectionManager::create()`: call `conn.set_roles()` when
  `roles` is non-empty, before returning the connection.
- `NativeConnection::set_roles()`: new method that issues the
  `SET ROLE …` query via `execute_query()`.
- `UnifiedNativeBuilder`: add `with_roles()` / `with_default_roles()`
  delegating to `NativeClient`.
ClickHouse native protocol requires param_ settings to be sent with
the Custom flag (0x02) and values wrapped in single quotes (field dump
format). Without this, the server rejects named parameters with
"Substitution not set" errors.

Also fixes test bugs: use .param() builder, DNS resolution for
multi-host test.
Box-drawing chars -> ASCII art, em-dashes -> --, arrows -> ->,
multiplication signs -> x. Intentional UTF-8 in test data preserved.
- read_exception now loops until has_nested=false, consuming the entire
  exception chain. Previously only read the first exception and left
  nested bytes on the wire, causing stream desync on complex errors.
- Parameters (param_*) sent AFTER the query body in their own block
  (revision >= 54459), not mixed into the settings block.
- Clean up code comments.
Native protocol sends parameter keys as bare names ("val"), not
prefixed ("param_val"). The prefix is HTTP-only (URL params).
20 tests exercising the exact upstream clickhouse-rs public API:
client builders, query/fetch/bind, cursor iteration, typed insert,
borrowed rows, Identifier binding, error types, and new HTTP methods
(ping, server_version, query_id, settings).

Zero regressions from fork changes.
with_addr() previously called expect() on to_socket_addrs(), causing a
panic when the hostname couldn't be resolved. This is wrong for builder
patterns — the error should surface at connect time, not at construction.

Now logs a warning and leaves addrs empty, which will fail on first
connect attempt with a proper error.
Three changes from dfe-loader's battle-tested insert path:

1. Cow<str> in value_to_str() -- borrow string values directly instead
   of cloning on every row. The common case (Value::String) is now
   zero-allocation.

2. FxHashMap for all internal maps -- rustc-hash (2-3x faster than
   std HashMap for string keys, no crypto overhead needed). Swapped in
   DynamicSchema column_index, schema caches, Client options/headers,
   InsertMetadataCache, RowMetadata column_lookup, and LowCardinality
   dedup in native encode.

3. TypeTag enum on ParsedType -- pre-computed integer discriminant
   replaces string comparison (match pt.base.as_str()) on the encode
   hot path. Resolved once at schema-fetch time, used on every row.
   encode_typed() now matches on TypeTag variants.
Document WHY Cow<str>, TypeTag, and FxHashMap were chosen.
These are ported from the dfe-loader production insert path
where they eliminate measurable overhead at scale.
Security fixes from code review (Derek Thoms review):

1. SQL injection in NativeQuery::bind() -- now uses escape::string()
   to properly escape backslashes, quotes, backticks, tabs, newlines.
   bind() wraps values as quoted strings. For typed params use .param().

2. SQL injection in cancel_query() -- switched from format! interpolation
   to server-side parameter binding ({qid:String} + .param()).

3. Incomplete param escaping on native wire -- replaced manual
   single-quote-only replace() with escape::string() which handles
   backslashes (the old code was vulnerable to backslash breakout).

4. Unescaped identifiers in DynamicInsert -- database, table, and column
   names now backtick-escaped via escape::identifier().

5. Unescaped role names in set_roles() -- now backtick-escaped.

6. Unbounded allocation from server num_rows/num_columns -- added
   MAX_BLOCK_COLUMNS (100k) and MAX_BLOCK_ROWS (100M) sanity caps
   to prevent OOM from malicious server responses.

7. Password/JWT redacted from Debug output -- manual Debug impl on
   Authentication replaces derive(Debug) to prevent credential leakage
   in logs, panics, or error messages.

8. #[non_exhaustive] on Progress and ProfileInfo -- prevents semver
   breakage when ClickHouse adds new protocol fields.

9. Eliminated unsafe noop_waker() -- replaced with Waker::noop() (stable
   since Rust 1.85). Removes last unsafe block outside cursors/row.rs.

10. Removed .unwrap() from DynamicInsert library code -- replaced with
    let-else pattern and proper error returns.

11. Fixed remaining em-dashes in with_addr comments.

12. Updated bind_multiple test to use .param() (server-side binding).
Native test database setup now uses:
- DROP DATABASE IF EXISTS ... SYNC (waits for all replicas)
- CREATE DATABASE IF NOT EXISTS (belt-and-braces fallback)

Without SYNC, ON CLUSTER DDL returns before all replicas execute
the DROP -- a fast re-run hits "already exists" because the CREATE
arrives before the DROP propagates. Upstream HTTP tests use
wait_end_of_query=1 which is the HTTP equivalent; SYNC is the
native protocol equivalent.
1. NativeClient::fetch_schema(): database and table names were
   interpolated directly into the WHERE clause without escaping.
   Now uses escape::string() -- matches the HTTP path and
   DynamicInsert schema fetch which already escape correctly.

2. Array and Map column readers: end - prev subtraction could
   underflow if a malicious server sent non-monotonic offsets.
   Added bounds check (end >= prev) before subtraction.
Security:
- Table name in INSERT now escaped via escape::identifier() (both
  HTTP and native paths). Resolves upstream TODO comment.
- HTTP LZ4 decompression: added MAX_UNCOMPRESSED_SIZE (1 GiB) cap
  to prevent decompression bomb from malicious server responses.
  The native path already had this cap; HTTP path did not.

Bugs:
- UUID byte order in rowbinary_to_json_inner: was using from_be_bytes
  but RowBinary stores u64 pairs in little-endian. Fixed to
  from_le_bytes. Affected Dynamic/Variant columns containing UUIDs.
- Date type in dynamic encoder: was grouped with Int16 (signed) but
  ClickHouse Date is UInt16 (unsigned days since epoch). Dates beyond
  day 32767 (2059-09-18) would wrap to negative. Moved to UInt16 arm.

Perf:
- encode_map: map keys were cloned into Value::String just to call
  encode_value. Keys are always String type -- write bytes directly,
  avoiding a heap allocation per map entry per row.
kazmosahebi added 2 commits April 2, 2026 16:30
ClickHouse's Nullable(JSON) in RowBinary does not use the standard
0x00/0x01 Nullable prefix byte. The JSON binary format handles NULL
internally. Sending the Nullable prefix causes ClickHouse to interpret
it as the start of JSON data, resulting in "Unknown type code: 0x73".

- Skip Nullable wrapper when TypeTag is JSON
- Send empty string for NULL JSON values (ClickHouse interprets as NULL)
@CLAassistant
Copy link
Copy Markdown

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ catinspace-au
❌ kazmosahebi


kazmosahebi seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

catinspace-au and others added 5 commits April 2, 2026 17:10
…owBinary inserts

ClickHouse's GA JSON type (25.3+) uses a structured path-value binary
format in RowBinary by default. The encoder sends JSON as a
length-prefixed string, which broke for Nullable(JSON) columns —
ClickHouse misinterpreted the Nullable prefix byte as part of the
structured format, causing "Unknown type code: 0x73" errors.

Set input_format_binary_read_json_as_string=1 on INSERT queries when
JSON columns are detected, telling ClickHouse to accept the string
encoding the encoder already produces.

Fixes hyperi-io/dfe-loader#22
…coder

DateTime64 columns were grouped with Int64 in the encoder, so datetime
strings like "2026-04-07 07:03:00.095" from dfe-loader's transformer
failed with "string not parseable as i64". Adds datetime64_to_epoch()
with a lightweight parser (no chrono dep) that converts datetime strings
to precision-scaled epoch values.

JSON columns used value.to_string() which double-quoted Value::String
content, producing "\"{ ... }\"" that ClickHouse rejected with "Cannot
read JSON object from JSON element". Now writes string content directly
when the value is already a String (the common case for dfe-loader's
_json column which stores raw Kafka payloads).

Fixes dfe-loader RowBinary insert failures for Nullable(JSON) columns
introduced when migrating from Nullable(String) to Nullable(JSON).
…ncoding

fix: RowBinary DateTime64 string parsing and JSON double-quoting
DateTime64: separate from Int64 match arm, add datetime64_to_epoch()
with Howard Hinnant civil days algorithm. Converts "YYYY-MM-DD HH:MM:SS.fff"
strings to precision-scaled epoch values without chrono dependency.

JSON: Value::String writes content directly instead of value.to_string()
which double-quoted and escaped it, causing ClickHouse rejection.

Review fixes (post PR #12):
- Reject non-zero timezone offsets instead of silently dropping them
- Return error on epoch multiplication overflow instead of wrong value
- Eliminate String::replace('T', " ") hot-path allocation
- Clamp fractional seconds to 9 digits to avoid u32 parse overflow
- 21 new tests covering all fixes and edge cases
DynamicInsert::write_map_with_raw() accepts raw bytes for named columns
(e.g. _json), writing them directly as length-prefixed strings in the
RowBinary output. This avoids Value::String wrapping and to_string()
re-serialisation — zero intermediate allocation for columns where the
caller already has the raw bytes (e.g. raw Kafka payload).

encode_dynamic_row_with_raw() is the underlying encoder function that
checks raw_columns before the normal Value encoding path.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants