diff --git a/Cargo.lock b/Cargo.lock index c55c102409..42a02c8b64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3355,6 +3355,7 @@ dependencies = [ "bimap", "bytes", "chrono", + "crc32fast", "derive_builder", "expect-test", "fastnum", diff --git a/Cargo.toml b/Cargo.toml index 7f612c44bf..03719e5c42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ bytes = "1.11" cfg-if = "1" chrono = "0.4.41" clap = { version = "4.5.48", features = ["derive", "cargo"] } +crc32fast = "1" dashmap = "6" datafusion = "53.1.0" datafusion-cli = "53.0.0" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 18729176dc..6dc15ad717 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -52,6 +52,7 @@ base64 = { workspace = true } bimap = { workspace = true } bytes = { workspace = true } chrono = { workspace = true } +crc32fast = { workspace = true } derive_builder = { workspace = true } expect-test = { workspace = true } flate2 = { workspace = true } diff --git a/crates/iceberg/src/delete_vector.rs b/crates/iceberg/src/delete_vector.rs index df8a10193c..4224bb3907 100644 --- a/crates/iceberg/src/delete_vector.rs +++ b/crates/iceberg/src/delete_vector.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::io::Cursor; use std::ops::BitOrAssign; use roaring::RoaringTreemap; @@ -23,6 +24,17 @@ use roaring::treemap::BitmapIter; use crate::{Error, ErrorKind, Result}; +#[allow(dead_code)] +const DV_BLOB_MAGIC: [u8; 4] = [0xD1, 0xD3, 0x39, 0x64]; +#[allow(dead_code)] +const DV_LENGTH_FIELD_SIZE: usize = 4; +#[allow(dead_code)] +const DV_MAGIC_SIZE: usize = 4; +#[allow(dead_code)] +const DV_CRC_SIZE: usize = 4; +#[allow(dead_code)] +const DV_MIN_BLOB_SIZE: usize = DV_LENGTH_FIELD_SIZE + DV_MAGIC_SIZE + DV_CRC_SIZE; + #[derive(Debug, Default)] pub struct DeleteVector { inner: RoaringTreemap, @@ -68,6 +80,88 @@ impl DeleteVector { pub fn len(&self) -> u64 { self.inner.len() } + + #[allow(dead_code)] + pub fn deserialize_blob(blob_bytes: &[u8], expected_cardinality: Option) -> Result { + if blob_bytes.len() < DV_MIN_BLOB_SIZE { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "deletion-vector-v1 blob too short: {} bytes (minimum {DV_MIN_BLOB_SIZE})", + blob_bytes.len() + ), + )); + } + + // 2GB cap matches Java's signed-i32 length field. + if blob_bytes.len() > i32::MAX as usize { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "deletion-vector-v1 blob exceeds 2GB: {} bytes", + blob_bytes.len() + ), + )); + } + + let declared_len = + u32::from_be_bytes(blob_bytes[0..DV_LENGTH_FIELD_SIZE].try_into().unwrap()) as usize; + let expected_len = blob_bytes.len() - DV_LENGTH_FIELD_SIZE - DV_CRC_SIZE; + if declared_len != expected_len { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "deletion-vector-v1 length mismatch: declared {declared_len}, actual {expected_len}" + ), + )); + } + + let magic_end = DV_LENGTH_FIELD_SIZE + DV_MAGIC_SIZE; + let magic = &blob_bytes[DV_LENGTH_FIELD_SIZE..magic_end]; + if magic != DV_BLOB_MAGIC { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "deletion-vector-v1 bad magic: {magic:02X?}, expected {DV_BLOB_MAGIC:02X?}" + ), + )); + } + + let roaring_end = blob_bytes.len() - DV_CRC_SIZE; + let mut cursor = Cursor::new(&blob_bytes[magic_end..roaring_end]); + let inner = RoaringTreemap::deserialize_from(&mut cursor).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "failed to deserialize deletion-vector-v1 roaring payload", + ) + .with_source(e) + })?; + + let expected_crc = u32::from_be_bytes(blob_bytes[roaring_end..].try_into().unwrap()); + let computed_crc = crc32fast::hash(&blob_bytes[DV_LENGTH_FIELD_SIZE..roaring_end]); + if computed_crc != expected_crc { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "deletion-vector-v1 CRC mismatch: computed 0x{computed_crc:08x}, expected 0x{expected_crc:08x}" + ), + )); + } + + if let Some(expected) = expected_cardinality { + let actual = inner.len(); + if actual != expected { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "deletion-vector-v1 cardinality mismatch: got {actual}, expected {expected}" + ), + )); + } + } + + Ok(DeleteVector { inner }) + } } // Ideally, we'd just wrap `roaring::RoaringTreemap`'s iterator, `roaring::treemap::Iter` here. @@ -198,4 +292,214 @@ mod tests { let res = dv.insert_positions(&positions); assert!(res.is_err()); } + + // ----- deletion-vector-v1 blob deserialization ----- + + /// Inverse of [`DeleteVector::deserialize_blob`]; promoted to a public + /// `serialize_blob` when the DV write path lands (PR-D). + fn serialize_blob_for_test(dv: &DeleteVector) -> Vec { + let mut roaring = Vec::with_capacity(dv.inner.serialized_size()); + dv.inner.serialize_into(&mut roaring).unwrap(); + + let bitmap_data_len = DV_MAGIC_SIZE + roaring.len(); + let mut blob = Vec::with_capacity(DV_LENGTH_FIELD_SIZE + bitmap_data_len + DV_CRC_SIZE); + blob.extend_from_slice(&(bitmap_data_len as u32).to_be_bytes()); + blob.extend_from_slice(&DV_BLOB_MAGIC); + blob.extend_from_slice(&roaring); + let crc = crc32fast::hash(&blob[DV_LENGTH_FIELD_SIZE..]); + blob.extend_from_slice(&crc.to_be_bytes()); + blob + } + + fn dv_from(positions: impl IntoIterator) -> DeleteVector { + let mut dv = DeleteVector::default(); + for p in positions { + dv.insert(p); + } + dv + } + + fn assert_positions(dv: &DeleteVector, expected: &[u64]) { + let mut got: Vec = dv.iter().collect(); + got.sort_unstable(); + let mut want = expected.to_vec(); + want.sort_unstable(); + assert_eq!(got, want); + } + + #[test] + fn dv_blob_roundtrip_empty() { + let dv = DeleteVector::default(); + let bytes = serialize_blob_for_test(&dv); + let parsed = DeleteVector::deserialize_blob(&bytes, Some(0)).unwrap(); + assert_eq!(parsed.len(), 0); + assert_positions(&parsed, &[]); + } + + #[test] + fn dv_blob_roundtrip_small() { + let dv = dv_from([0, 5, 100]); + let bytes = serialize_blob_for_test(&dv); + let parsed = DeleteVector::deserialize_blob(&bytes, Some(3)).unwrap(); + assert_positions(&parsed, &[0, 5, 100]); + } + + #[test] + fn dv_blob_roundtrip_64bit_keys() { + // Positions span multiple inner 32-bit bitmaps. + let positions = [1u64, 1u64 << 33, (1u64 << 33) + 5]; + let dv = dv_from(positions); + let bytes = serialize_blob_for_test(&dv); + let parsed = DeleteVector::deserialize_blob(&bytes, Some(3)).unwrap(); + assert_positions(&parsed, &positions); + } + + #[test] + fn dv_blob_roundtrip_dense_range() { + let dv = dv_from(0..10_000); + let bytes = serialize_blob_for_test(&dv); + let parsed = DeleteVector::deserialize_blob(&bytes, Some(10_000)).unwrap(); + assert_eq!(parsed.len(), 10_000); + } + + #[test] + fn dv_blob_cardinality_mismatch_errors() { + let bytes = serialize_blob_for_test(&dv_from([0, 5, 100])); + let err = DeleteVector::deserialize_blob(&bytes, Some(99)).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("cardinality mismatch"), + "unexpected error: {err}" + ); + } + + #[test] + fn dv_blob_cardinality_skip_when_none() { + let bytes = serialize_blob_for_test(&dv_from([0, 5, 100])); + let parsed = DeleteVector::deserialize_blob(&bytes, None).unwrap(); + assert_positions(&parsed, &[0, 5, 100]); + } + + #[test] + fn dv_blob_rejects_short_buffer() { + let err = DeleteVector::deserialize_blob(&[0u8; 11], None).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("too short"), + "unexpected error: {err}" + ); + } + + #[test] + fn dv_blob_rejects_wrong_magic() { + let mut bytes = serialize_blob_for_test(&dv_from([0])); + bytes[DV_LENGTH_FIELD_SIZE] ^= 0xFF; + let err = DeleteVector::deserialize_blob(&bytes, None).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("bad magic"), + "unexpected error: {err}" + ); + } + + #[test] + fn dv_blob_rejects_bad_crc() { + let mut bytes = serialize_blob_for_test(&dv_from([0, 1, 2])); + // Corrupt a roaring payload byte; roaring may also reject before CRC check. + let mid = DV_LENGTH_FIELD_SIZE + DV_MAGIC_SIZE + 1; + bytes[mid] ^= 0xFF; + let err = DeleteVector::deserialize_blob(&bytes, None).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + let msg = err.message(); + assert!( + msg.contains("CRC mismatch") || msg.contains("roaring"), + "expected CRC or roaring failure, got: {msg}" + ); + } + + #[test] + fn dv_blob_rejects_length_mismatch() { + let mut bytes = serialize_blob_for_test(&dv_from([0])); + let original_len = u32::from_be_bytes(bytes[0..4].try_into().unwrap()); + bytes[0..4].copy_from_slice(&(original_len - 1).to_be_bytes()); + let err = DeleteVector::deserialize_blob(&bytes, None).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.message().contains("length mismatch"), + "unexpected error: {err}" + ); + } + + // ----- golden fixtures (see testdata/deletes/README.md) ----- + + fn fixture_path(name: &str) -> std::path::PathBuf { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("testdata") + .join("deletes") + .join(name) + } + + fn read_fixture(name: &str) -> Vec { + let path = fixture_path(name); + std::fs::read(&path).unwrap_or_else(|e| panic!("missing fixture {}: {e}", path.display())) + } + + #[test] + fn dv_blob_golden_empty() { + let bytes = read_fixture("empty.bin"); + let dv = DeleteVector::deserialize_blob(&bytes, Some(0)).unwrap(); + assert_eq!(dv.len(), 0); + } + + #[test] + fn dv_blob_golden_single_position() { + let bytes = read_fixture("single-position.bin"); + let dv = DeleteVector::deserialize_blob(&bytes, Some(1)).unwrap(); + assert_positions(&dv, &[0]); + } + + #[test] + fn dv_blob_golden_small_bitmap() { + let bytes = read_fixture("small-bitmap.bin"); + let dv = DeleteVector::deserialize_blob(&bytes, Some(4)).unwrap(); + assert_positions(&dv, &[0, 1, 100, 1000]); + } + + #[test] + fn dv_blob_golden_spanning_keys() { + let bytes = read_fixture("spanning-keys.bin"); + let dv = DeleteVector::deserialize_blob(&bytes, Some(4)).unwrap(); + assert_positions(&dv, &[0, 1u64 << 33, (1u64 << 33) + 5, 1u64 << 34]); + } + + #[test] + fn dv_blob_golden_dense_range() { + let bytes = read_fixture("dense-range.bin"); + let dv = DeleteVector::deserialize_blob(&bytes, Some(10_000)).unwrap(); + assert_eq!(dv.len(), 10_000); + assert!(dv.iter().any(|p| p == 0)); + assert!(dv.iter().any(|p| p == 9_999)); + } + + /// Regenerate the committed golden fixtures (see testdata README). + #[test] + #[ignore] + fn dv_blob_regenerate_golden_fixtures() { + let dir = fixture_path(""); + std::fs::create_dir_all(&dir).unwrap(); + let cases: Vec<(&str, DeleteVector)> = vec![ + ("empty.bin", DeleteVector::default()), + ("single-position.bin", dv_from([0])), + ("small-bitmap.bin", dv_from([0, 1, 100, 1000])), + ( + "spanning-keys.bin", + dv_from([0, 1u64 << 33, (1u64 << 33) + 5, 1u64 << 34]), + ), + ("dense-range.bin", dv_from(0..10_000)), + ]; + for (name, dv) in cases { + let bytes = serialize_blob_for_test(&dv); + std::fs::write(dir.join(name), bytes).unwrap(); + } + } } diff --git a/crates/iceberg/testdata/deletes/README.md b/crates/iceberg/testdata/deletes/README.md new file mode 100644 index 0000000000..ca82443c89 --- /dev/null +++ b/crates/iceberg/testdata/deletes/README.md @@ -0,0 +1,39 @@ + + +# `deletion-vector-v1` golden fixtures + +Inputs to the `dv_blob_golden_*` tests in +[`crates/iceberg/src/delete_vector.rs`](../../src/delete_vector.rs). Each +file is a complete Puffin `deletion-vector-v1` blob payload +(`[length][magic][roaring][crc32]`). + +| File | Positions | +|---|---| +| `empty.bin` | _(none)_ | +| `single-position.bin` | `{0}` | +| `small-bitmap.bin` | `{0, 1, 100, 1000}` | +| `spanning-keys.bin` | `{0, 1<<33, (1<<33)+5, 1<<34}` | +| `dense-range.bin` | `0..10_000` (bitmap-container path; Java's RLE produces ~30 B for the same input) | + +Regenerate with: + +```bash +cargo test -p iceberg dv_blob_regenerate_golden_fixtures -- --ignored --exact +``` diff --git a/crates/iceberg/testdata/deletes/dense-range.bin b/crates/iceberg/testdata/deletes/dense-range.bin new file mode 100644 index 0000000000..7af4301958 Binary files /dev/null and b/crates/iceberg/testdata/deletes/dense-range.bin differ diff --git a/crates/iceberg/testdata/deletes/empty.bin b/crates/iceberg/testdata/deletes/empty.bin new file mode 100644 index 0000000000..8bbc1265dc Binary files /dev/null and b/crates/iceberg/testdata/deletes/empty.bin differ diff --git a/crates/iceberg/testdata/deletes/single-position.bin b/crates/iceberg/testdata/deletes/single-position.bin new file mode 100644 index 0000000000..81fcd1b0bd Binary files /dev/null and b/crates/iceberg/testdata/deletes/single-position.bin differ diff --git a/crates/iceberg/testdata/deletes/small-bitmap.bin b/crates/iceberg/testdata/deletes/small-bitmap.bin new file mode 100644 index 0000000000..3dec072856 Binary files /dev/null and b/crates/iceberg/testdata/deletes/small-bitmap.bin differ diff --git a/crates/iceberg/testdata/deletes/spanning-keys.bin b/crates/iceberg/testdata/deletes/spanning-keys.bin new file mode 100644 index 0000000000..ddf261efe4 Binary files /dev/null and b/crates/iceberg/testdata/deletes/spanning-keys.bin differ