From 74f9e2f885144b197010884d52d677e8d78d5c5c Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 28 Apr 2026 22:12:08 +0100 Subject: [PATCH 01/16] em --- crates/iceberg/src/encryption/crypto.rs | 9 + crates/iceberg/src/encryption/encrypted_io.rs | 250 +++++++ .../src/encryption/encryption_manager.rs | 645 ++++++++++++++++++ crates/iceberg/src/encryption/key_metadata.rs | 39 ++ crates/iceberg/src/encryption/mod.rs | 8 +- 5 files changed, 950 insertions(+), 1 deletion(-) create mode 100644 crates/iceberg/src/encryption/encrypted_io.rs create mode 100644 crates/iceberg/src/encryption/encryption_manager.rs diff --git a/crates/iceberg/src/encryption/crypto.rs b/crates/iceberg/src/encryption/crypto.rs index 0f6a9eff43..02d085ebf8 100644 --- a/crates/iceberg/src/encryption/crypto.rs +++ b/crates/iceberg/src/encryption/crypto.rs @@ -177,6 +177,15 @@ impl SecureKey { } } +impl TryFrom for SecureKey { + type Error = Error; + + fn try_from(key: SensitiveBytes) -> Result { + let key_size = AesKeySize::from_key_length(key.len())?; + Ok(Self { key, key_size }) + } +} + /// AES-GCM cipher for encrypting and decrypting data. pub struct AesGcmCipher { key: SensitiveBytes, diff --git a/crates/iceberg/src/encryption/encrypted_io.rs b/crates/iceberg/src/encryption/encrypted_io.rs new file mode 100644 index 0000000000..214f2e98be --- /dev/null +++ b/crates/iceberg/src/encryption/encrypted_io.rs @@ -0,0 +1,250 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encrypted file wrappers for InputFile / OutputFile. + +use std::sync::Arc; + +use bytes::Bytes; + +use super::file_decryptor::AesGcmFileDecryptor; +use super::file_encryptor::AesGcmFileEncryptor; +use super::key_metadata::NativeKeyMaterial; +use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; + +/// An AGS1 stream-encrypted input file wrapping a plain [`InputFile`]. +/// +/// Transparently decrypts on read. +pub struct EncryptedInputFile { + inner: InputFile, + decryptor: Arc, +} + +impl EncryptedInputFile { + /// Creates a new encrypted input file. + pub fn new(inner: InputFile, decryptor: Arc) -> Self { + Self { inner, decryptor } + } + + /// Absolute path of the file. + pub fn location(&self) -> &str { + self.inner.location() + } + + /// Check if file exists. + pub async fn exists(&self) -> crate::Result { + self.inner.exists().await + } + + /// Fetch and returns metadata of file. + /// + /// The returned size is the **plaintext** size. + pub async fn metadata(&self) -> crate::Result { + let raw_meta = self.inner.metadata().await?; + let plaintext_size = self.decryptor.plaintext_length(raw_meta.size)?; + Ok(FileMetadata { + size: plaintext_size, + }) + } + + /// Read and returns whole content of file (decrypted plaintext). + pub async fn read(&self) -> crate::Result { + let meta = self.metadata().await?; + let reader = self.reader().await?; + reader.read(0..meta.size).await + } + + /// Creates a reader that transparently decrypts on each read. + pub async fn reader(&self) -> crate::Result> { + let raw_meta = self.inner.metadata().await?; + let raw_reader = self.inner.reader().await?; + self.decryptor.wrap_reader(raw_reader, raw_meta.size) + } + + /// Consumes self and returns the underlying plain input file. + pub fn into_inner(self) -> InputFile { + self.inner + } +} + +impl std::fmt::Debug for EncryptedInputFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EncryptedInputFile") + .field("path", &self.inner.location()) + .finish_non_exhaustive() + } +} + +/// A Parquet Modular Encryption (PME) input file wrapping a plain [`InputFile`]. +/// +/// The Parquet reader handles decryption at the column/page level using the +/// key material carried by this struct. +pub struct NativeEncryptedInputFile { + inner: InputFile, + key_material: NativeKeyMaterial, +} + +impl NativeEncryptedInputFile { + /// Creates a new native-encrypted input file. + pub fn new(inner: InputFile, key_material: NativeKeyMaterial) -> Self { + Self { + inner, + key_material, + } + } + + /// Absolute path of the file. + pub fn location(&self) -> &str { + self.inner.location() + } + + /// Returns the native key material for PME decryption. + pub fn key_material(&self) -> &NativeKeyMaterial { + &self.key_material + } + + /// Consumes self and returns the underlying plain input file. + pub fn into_inner(self) -> InputFile { + self.inner + } +} + +impl std::fmt::Debug for NativeEncryptedInputFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NativeEncryptedInputFile") + .field("path", &self.inner.location()) + .finish_non_exhaustive() + } +} + +/// An AGS1 stream-encrypted output file wrapping a plain [`OutputFile`]. +/// +/// Transparently encrypts on write. +pub struct EncryptedOutputFile { + inner: OutputFile, + key_metadata: Box<[u8]>, + encryptor: Arc, +} + +impl EncryptedOutputFile { + /// Creates a new encrypted output file. + pub fn new( + inner: OutputFile, + key_metadata: Box<[u8]>, + encryptor: Arc, + ) -> Self { + Self { + inner, + key_metadata, + encryptor, + } + } + + /// Returns the key metadata bytes (for storage in manifest/data files). + pub fn key_metadata(&self) -> &[u8] { + &self.key_metadata + } + + /// Absolute path of the file. + pub fn location(&self) -> &str { + self.inner.location() + } + + /// Creates a writer that transparently encrypts on each write. + pub async fn writer(&self) -> crate::Result> { + let raw_writer = self.inner.writer().await?; + Ok(self.encryptor.wrap_writer(raw_writer)) + } + + /// Write bytes to file (transparently encrypted). + pub async fn write(&self, bs: Bytes) -> crate::Result<()> { + let mut writer = self.writer().await?; + writer.write(bs).await?; + writer.close().await + } + + /// Deletes the underlying file. + pub async fn delete(&self) -> crate::Result<()> { + self.inner.delete().await + } + + /// Consumes self and returns the underlying plain output file. + pub fn into_inner(self) -> OutputFile { + self.inner + } +} + +impl std::fmt::Debug for EncryptedOutputFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EncryptedOutputFile") + .field("path", &self.inner.location()) + .finish_non_exhaustive() + } +} + +/// A Parquet Modular Encryption (PME) output file wrapping a plain [`OutputFile`]. +/// +/// The Parquet writer handles encryption at the column/page level using the +/// key material carried by this struct. +pub struct NativeEncryptedOutputFile { + inner: OutputFile, + key_metadata: Box<[u8]>, + key_material: NativeKeyMaterial, +} + +impl NativeEncryptedOutputFile { + /// Creates a new native-encrypted output file. + pub fn new( + inner: OutputFile, + key_metadata: Box<[u8]>, + key_material: NativeKeyMaterial, + ) -> Self { + Self { + inner, + key_metadata, + key_material, + } + } + + /// Returns the key metadata bytes (for storage in manifest/data files). + pub fn key_metadata(&self) -> &[u8] { + &self.key_metadata + } + + /// Absolute path of the file. + pub fn location(&self) -> &str { + self.inner.location() + } + + /// Returns the native key material for PME encryption. + pub fn key_material(&self) -> &NativeKeyMaterial { + &self.key_material + } + + /// Consumes self and returns the underlying plain output file. + pub fn into_inner(self) -> OutputFile { + self.inner + } +} + +impl std::fmt::Debug for NativeEncryptedOutputFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NativeEncryptedOutputFile") + .field("path", &self.inner.location()) + .finish_non_exhaustive() + } +} diff --git a/crates/iceberg/src/encryption/encryption_manager.rs b/crates/iceberg/src/encryption/encryption_manager.rs new file mode 100644 index 0000000000..383db27961 --- /dev/null +++ b/crates/iceberg/src/encryption/encryption_manager.rs @@ -0,0 +1,645 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption manager for file-level encryption and two-layer envelope key management. +//! +//! [`EncryptionManager`] provides file-level `decrypt` / `encrypt` +//! operations matching Java's `org.apache.iceberg.encryption.EncryptionManager`, +//! using envelope encryption: +//! - A master key (in KMS) wraps a Key Encryption Key (KEK) +//! - The KEK wraps Data Encryption Keys (DEKs) locally + +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +use aes_gcm::aead::OsRng; +use aes_gcm::aead::rand_core::RngCore; +use chrono::Utc; +use uuid::Uuid; + +const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; + +use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +use super::encrypted_io::{ + EncryptedInputFile, EncryptedOutputFile, NativeEncryptedInputFile, NativeEncryptedOutputFile, +}; +use super::file_decryptor::AesGcmFileDecryptor; +use super::file_encryptor::AesGcmFileEncryptor; +use super::key_metadata::{NativeKeyMaterial, StandardKeyMetadata}; +use super::kms::KeyManagementClient; +use crate::io::{InputFile, OutputFile}; +use crate::spec::EncryptedKey; +use crate::{Error, ErrorKind, Result}; + +/// Property key for the KEK creation timestamp (milliseconds since epoch). +/// Matches Java's `StandardEncryptionManager.KEY_TIMESTAMP`. +pub const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP"; + +/// Default KEK lifespan in days, per NIST SP 800-57. +const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730; + +/// Default cache TTL for unwrapped KEKs. +const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600); + +/// Default AAD prefix length in bytes. +/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`. +const AAD_PREFIX_LENGTH: usize = 16; + +/// File-level encryption manager using two-layer envelope encryption. +/// +/// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls. +#[derive(typed_builder::TypedBuilder)] +pub struct EncryptionManager { + kms_client: Arc, + #[builder( + default = moka::future::Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(), + setter(skip) + )] + kek_cache: moka::future::Cache, + /// AES key size for DEK generation. Defaults to 128-bit. + #[builder(default = AesKeySize::default())] + key_size: AesKeySize, + /// Master key ID from table property `encryption.key-id`. + #[builder(setter(into))] + table_key_id: String, + /// All encryption keys from table metadata (KEKs and wrapped key metadata entries). + #[builder(default)] + encryption_keys: HashMap, +} + +impl fmt::Debug for EncryptionManager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EncryptionManager") + .field("key_size", &self.key_size) + .field("table_key_id", &self.table_key_id) + .finish_non_exhaustive() + } +} + +impl EncryptionManager { + /// Encrypt a file with AGS1 stream encryption. + /// + /// Returns an [`EncryptedOutputFile`] that transparently encrypts on + /// write, along with key metadata for later decryption. + pub fn encrypt(&self, raw_output: OutputFile) -> Result { + let dek = SecureKey::generate(self.key_size); + let aad_prefix = Self::generate_aad_prefix(); + + let key_metadata_bytes = StandardKeyMetadata::new(dek.as_bytes()) + .with_aad_prefix(&aad_prefix) + .encode()?; + + let encryptor = Arc::new(AesGcmFileEncryptor::new(dek.as_bytes(), aad_prefix)?); + + Ok(EncryptedOutputFile::new( + raw_output, + key_metadata_bytes, + encryptor, + )) + } + + /// Decrypt an encrypted input file, returning an [`EncryptedInputFile`] + /// that transparently decrypts on read. + pub fn decrypt(&self, input: InputFile, key_metadata: &[u8]) -> Result { + let metadata = StandardKeyMetadata::decode(key_metadata)?; + + let decryptor = Arc::new(AesGcmFileDecryptor::new( + metadata.encryption_key().as_bytes(), + metadata.aad_prefix().unwrap_or_default(), + )?); + + Ok(EncryptedInputFile::new(input, decryptor)) + } + + /// Encrypt an output file for Parquet Modular Encryption (PME). + /// + /// Returns a [`NativeEncryptedOutputFile`] whose key material is available + /// for the Parquet writer to configure `FileEncryptionProperties`. + pub fn encrypt_native(&self, raw_output: OutputFile) -> Result { + let dek = SecureKey::generate(self.key_size); + let aad_prefix = Self::generate_aad_prefix(); + + let key_metadata_bytes = StandardKeyMetadata::new(dek.as_bytes()) + .with_aad_prefix(&aad_prefix) + .encode()?; + + Ok(NativeEncryptedOutputFile::new( + raw_output, + key_metadata_bytes, + NativeKeyMaterial::new(SensitiveBytes::new(dek.as_bytes()), aad_prefix), + )) + } + + /// Decrypt key metadata for a Parquet Modular Encryption (PME) file. + /// + /// Returns a [`NativeEncryptedInputFile`] carrying the plaintext DEK + /// and AAD prefix for the Parquet reader to configure + /// `FileDecryptionProperties`. + pub fn decrypt_native( + &self, + raw_input: InputFile, + key_metadata: &[u8], + ) -> Result { + let metadata = StandardKeyMetadata::decode(key_metadata)?; + Ok(NativeEncryptedInputFile::new( + raw_input, + NativeKeyMaterial::new( + metadata.encryption_key().clone(), + metadata.aad_prefix().unwrap_or_default().into(), + ), + )) + } + + /// Wrap key metadata bytes with a KEK for storage in table metadata. + /// + /// Returns `(wrapped_entry, optional_new_kek)`. The wrapped entry + /// contains the key metadata encrypted by the KEK, and should be stored + /// in `TableMetadata.encryption_keys`. The optional second element is a + /// newly created KEK — present only when no active KEK existed (first + /// write) or the existing KEK expired (rotation). When `Some`, the + /// caller must also persist this KEK in table metadata so that future + /// `unwrap_key_metadata` calls can find it. + pub async fn wrap_key_metadata( + &self, + key_metadata: &[u8], + ) -> Result<(EncryptedKey, Option)> { + let (kek, new_kek) = match self.find_active_kek(&self.encryption_keys) { + Some(existing) => (existing.clone(), None), + None => { + let created = self.create_kek().await?; + let cloned = created.clone(); + (created, Some(cloned)) + } + }; + + let kek_bytes = self.unwrap_kek(&kek).await?; + + // Use the KEK timestamp as AAD to prevent timestamp tampering attacks. + let kek_timestamp = kek.properties().get(KEK_CREATED_AT_PROPERTY); + let aad = kek_timestamp.map(|ts| ts.as_bytes()); + let wrapped_metadata = self.wrap_dek_with_kek(key_metadata, &kek_bytes, aad)?; + + let wrapped_key = EncryptedKey::builder() + .key_id(Uuid::new_v4().to_string()) + .encrypted_key_metadata(wrapped_metadata) + .encrypted_by_id(kek.key_id()) + .build(); + + Ok((wrapped_key, new_kek)) + } + + /// Unwrap key metadata that was KEK-wrapped and stored in table metadata. + /// + /// Given an `EncryptedKey` entry (from a manifest list or snapshot) and + /// the full map of encryption keys from `TableMetadata`, returns the + /// unwrapped key metadata bytes (e.g. serialized `StandardKeyMetadata`). + pub async fn unwrap_key_metadata( + &self, + encrypted_key: &EncryptedKey, + encryption_keys: &HashMap, + ) -> Result> { + let kek_key_id = encrypted_key.encrypted_by_id().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "EncryptedKey '{}' has no encrypted_by_id", + encrypted_key.key_id() + ), + ) + })?; + + self.decrypt_dek( + kek_key_id, + encrypted_key.encrypted_key_metadata(), + encryption_keys, + ) + .await + } + + /// Create a new KEK, wrapped by the table's master key. + async fn create_kek(&self) -> Result { + let (plaintext_kek, wrapped_kek) = if self.kms_client.supports_key_generation() { + let result = self.kms_client.generate_key(&self.table_key_id).await?; + (result.key().clone(), result.wrapped_key().to_vec()) + } else { + let plaintext_key = SecureKey::generate(self.key_size); + let wrapped = self + .kms_client + .wrap_key(plaintext_key.as_bytes(), &self.table_key_id) + .await?; + + (SensitiveBytes::new(plaintext_key.as_bytes()), wrapped) + }; + + let key_id = Uuid::new_v4().to_string(); + let now_ms = Utc::now().timestamp_millis(); + + let mut properties = HashMap::new(); + properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), now_ms.to_string()); + + self.kek_cache.insert(key_id.clone(), plaintext_kek).await; + + Ok(EncryptedKey::builder() + .key_id(key_id) + .encrypted_key_metadata(wrapped_kek) + .encrypted_by_id(&self.table_key_id) + .properties(properties) + .build()) + } + + /// Check whether a KEK has exceeded its configured lifespan (730 days per NIST SP 800-57). + fn is_kek_expired(&self, kek: &EncryptedKey) -> bool { + let created_at_ms = match kek + .properties() + .get(KEK_CREATED_AT_PROPERTY) + .and_then(|ts| ts.parse::().ok()) + { + Some(ts) => ts, + None => return true, // No timestamp -> treat as expired + }; + + let now_ms = Utc::now().timestamp_millis(); + let lifespan_ms = DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY; + (now_ms - created_at_ms) >= lifespan_ms + } + + /// Find the latest non-expired KEK for the table's master key. + fn find_active_kek<'a>( + &self, + encryption_keys: &'a HashMap, + ) -> Option<&'a EncryptedKey> { + encryption_keys + .values() + .filter(|kek| { + kek.encrypted_by_id() + .map(|id| id == self.table_key_id) + .unwrap_or(false) + && !self.is_kek_expired(kek) + }) + .max_by_key(|kek| { + kek.properties() + .get(KEK_CREATED_AT_PROPERTY) + .and_then(|ts| ts.parse::().ok()) + .unwrap_or(0) + }) + } + + /// Unwrap a KEK using the KMS, with caching to avoid repeated calls. + async fn unwrap_kek(&self, kek: &EncryptedKey) -> Result { + let cache_key = kek.key_id().to_string(); + + if let Some(cached) = self.kek_cache.get(&cache_key).await { + return Ok(cached); + } + + let master_key_id = kek.encrypted_by_id().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("KEK '{}' has no encrypted_by_id", kek.key_id()), + ) + })?; + + let plaintext = self + .kms_client + .unwrap_key(kek.encrypted_key_metadata(), master_key_id) + .await?; + + self.kek_cache.insert(cache_key, plaintext.clone()).await; + + Ok(plaintext) + } + + /// Decrypt a wrapped DEK using the KEK identified by `kek_key_id`. + async fn decrypt_dek( + &self, + kek_key_id: &str, + wrapped_dek: &[u8], + encryption_keys: &HashMap, + ) -> Result> { + let kek = encryption_keys.get(kek_key_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("KEK not found in encryption keys: {kek_key_id}"), + ) + })?; + + // The Iceberg Spec uses the KEK timestamp as AAD when wrapping key metadata, to + // prevent timestamp tampering attacks. + let aad = kek + .properties() + .get(KEK_CREATED_AT_PROPERTY) + .map(|ts| ts.as_bytes()); + + let kek_bytes = self.unwrap_kek(kek).await?; + self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, aad) + .map_err(|e| { + Error::new( + e.kind(), + format!("Failed to unwrap key metadata with KEK '{kek_key_id}'"), + ) + .with_source(e) + }) + } + + /// Generate a random AAD prefix for file encryption. + fn generate_aad_prefix() -> Box<[u8]> { + let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; + OsRng.fill_bytes(&mut prefix); + prefix.into_boxed_slice() + } + + /// Wrap a DEK with a KEK using local AES-GCM. + fn wrap_dek_with_kek( + &self, + dek: &[u8], + kek: &SensitiveBytes, + aad: Option<&[u8]>, + ) -> Result> { + let key = SecureKey::try_from(kek.clone())?; + let cipher = AesGcmCipher::new(key); + cipher.encrypt(dek, aad) + } + + /// Unwrap a DEK with a KEK using local AES-GCM. + fn unwrap_dek_with_kek( + &self, + wrapped_dek: &[u8], + kek: &SensitiveBytes, + aad: Option<&[u8]>, + ) -> Result> { + let key = SecureKey::try_from(kek.clone())?; + let cipher = AesGcmCipher::new(key); + cipher.decrypt(wrapped_dek, aad) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::encryption::kms::MemoryKeyManagementClient; + + fn create_test_kms() -> Arc { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + Arc::new(kms) + } + + fn create_test_manager() -> EncryptionManager { + EncryptionManager::builder() + .kms_client(create_test_kms()) + .table_key_id("master-1") + .build() + } + + #[tokio::test] + async fn test_create_kek() { + let mgr = create_test_manager(); + let kek = mgr.create_kek().await.unwrap(); + + assert!(!kek.key_id().is_empty()); + assert!(!kek.encrypted_key_metadata().is_empty()); + assert_eq!(kek.encrypted_by_id(), Some("master-1")); + assert!(kek.properties().contains_key(KEK_CREATED_AT_PROPERTY)); + } + + #[tokio::test] + async fn test_wrap_unwrap_key_metadata_roundtrip() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + let plaintext = b"some-key-metadata"; + let (entry, new_kek) = mgr.wrap_key_metadata(plaintext).await.unwrap(); + + // First wrap should create a new KEK + assert!(new_kek.is_some()); + let kek = new_kek.unwrap(); + + // Build a manager with the KEK so we can unwrap + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek); + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .encryption_keys(encryption_keys.clone()) + .build(); + + let decrypted = mgr + .unwrap_key_metadata(&entry, &encryption_keys) + .await + .unwrap(); + assert_eq!(decrypted, plaintext); + } + + #[tokio::test] + async fn test_kek_reuse_when_not_expired() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // First wrap creates a new KEK + let (_, new_kek) = mgr.wrap_key_metadata(b"data-1").await.unwrap(); + let kek = new_kek.unwrap(); + + // Build manager with the active KEK (same KMS to unwrap) + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek.clone()); + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .encryption_keys(encryption_keys) + .build(); + + // Second wrap should reuse the existing KEK (no new KEK) + let (entry, new_kek2) = mgr.wrap_key_metadata(b"data-2").await.unwrap(); + assert!(new_kek2.is_none()); + assert_eq!(entry.encrypted_by_id(), Some(kek.key_id())); + } + + #[tokio::test] + async fn test_kek_rotation_when_expired() { + let kms = create_test_kms(); + + // Create a KEK with a timestamp 3 years in the past (exceeds 730-day lifespan) + let three_years_ago_ms = Utc::now().timestamp_millis() - (3 * 365 * MILLIS_IN_DAY); + let mut properties = HashMap::new(); + properties.insert( + KEK_CREATED_AT_PROPERTY.to_string(), + three_years_ago_ms.to_string(), + ); + + // Wrap a real KEK so unwrap works if needed + let kek_key = SecureKey::generate(AesKeySize::Bits128); + let wrapped = kms.wrap_key(kek_key.as_bytes(), "master-1").await.unwrap(); + + let old_kek = EncryptedKey::builder() + .key_id("expired-kek") + .encrypted_key_metadata(wrapped) + .encrypted_by_id("master-1") + .properties(properties) + .build(); + + // Build manager with the expired KEK + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(old_kek.key_id().to_string(), old_kek.clone()); + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .encryption_keys(encryption_keys) + .build(); + + // Wrap should rotate to a new KEK since the existing one is expired + let (_, new_kek) = mgr.wrap_key_metadata(b"data").await.unwrap(); + assert!(new_kek.is_some()); + assert_ne!(new_kek.unwrap().key_id(), old_kek.key_id()); + } + + #[tokio::test] + async fn test_is_kek_expired_no_timestamp() { + let mgr = create_test_manager(); + + // KEK without a created-at timestamp -> treated as expired + let kek = EncryptedKey::builder() + .key_id("no-ts") + .encrypted_key_metadata(vec![0u8; 32]) + .build(); + + assert!(mgr.is_kek_expired(&kek)); + } + + #[tokio::test] + async fn test_decrypt_dek_with_unknown_kek() { + let mgr = create_test_manager(); + + let encryption_keys: HashMap = HashMap::new(); + let result = mgr + .decrypt_dek("nonexistent-kek", &[1, 2, 3], &encryption_keys) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_kek_cache_hit() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // Create KEK (caches the plaintext KEK) + let kek = mgr.create_kek().await.unwrap(); + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek.clone()); + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .encryption_keys(encryption_keys.clone()) + .build(); + + // Wrap key metadata (unwraps KEK -- should hit cache from create_kek) + let (entry, _) = mgr.wrap_key_metadata(b"test-data").await.unwrap(); + + // Unwrap key metadata (unwraps KEK again -- should hit cache) + let decrypted = mgr + .unwrap_key_metadata(&entry, &encryption_keys) + .await + .unwrap(); + assert_eq!(decrypted, b"test-data"); + } + + #[tokio::test] + async fn test_decrypt_with_known_key() { + use crate::io::FileIO; + + let io = FileIO::new_with_memory(); + let path = "memory:///test/encrypted.bin"; + + // Encrypt data using AesGcmFileEncryptor directly (independent of EncryptionManager) + let dek = b"0123456789abcdef"; + let aad_prefix = b"test-aad-prefix!"; + let plaintext = b"Hello, encrypted Iceberg!"; + + let encryptor = AesGcmFileEncryptor::new(dek.as_slice(), aad_prefix.as_slice()).unwrap(); + let output = io.new_output(path).unwrap(); + let mut writer = encryptor.wrap_writer(output.writer().await.unwrap()); + writer + .write(bytes::Bytes::from(plaintext.to_vec())) + .await + .unwrap(); + writer.close().await.unwrap(); + + // Build key metadata with the known DEK + let key_metadata_bytes = StandardKeyMetadata::new(dek.as_slice()) + .with_aad_prefix(aad_prefix) + .encode() + .unwrap(); + + // Decrypt via EncryptionManager + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc) + .table_key_id("master-1") + .build(); + + let input_file = io.new_input(path).unwrap(); + let decrypted_file = mgr.decrypt(input_file, &key_metadata_bytes).unwrap(); + + let content = decrypted_file.read().await.unwrap(); + assert_eq!(&content[..], plaintext); + + let meta = decrypted_file.metadata().await.unwrap(); + assert_eq!(meta.size, plaintext.len() as u64); + } + + #[tokio::test] + async fn test_encrypt_decrypt_roundtrip() { + use crate::io::FileIO; + + let io = FileIO::new_with_memory(); + let path = "memory:///test/encrypt_roundtrip.bin"; + + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc) + .table_key_id("master-1") + .build(); + + let output = io.new_output(path).unwrap(); + let encrypted_output = mgr.encrypt(output).unwrap(); + + let plaintext = b"Hello, encrypted Iceberg round-trip!"; + encrypted_output + .write(bytes::Bytes::from(plaintext.to_vec())) + .await + .unwrap(); + + let input = io.new_input(path).unwrap(); + let decrypted_file = mgr.decrypt(input, encrypted_output.key_metadata()).unwrap(); + + let content = decrypted_file.read().await.unwrap(); + assert_eq!(&content[..], plaintext); + } +} diff --git a/crates/iceberg/src/encryption/key_metadata.rs b/crates/iceberg/src/encryption/key_metadata.rs index 4ef66ce394..58b8fa0121 100644 --- a/crates/iceberg/src/encryption/key_metadata.rs +++ b/crates/iceberg/src/encryption/key_metadata.rs @@ -224,6 +224,45 @@ mod _serde { } } +/// Plaintext key material for Parquet Modular Encryption (PME). +/// +/// Carries the DEK and AAD prefix needed by a Parquet reader/writer to +/// configure `FileEncryptionProperties` / `FileDecryptionProperties`. +/// +/// Rust equivalent of Java's `NativeEncryptionKeyMetadata` interface. +pub struct NativeKeyMaterial { + dek: SensitiveBytes, + /// AAD prefix is not secret — it is stored in plaintext in file metadata + /// and used for integrity (authenticated data), not confidentiality. + aad_prefix: Box<[u8]>, +} + +impl NativeKeyMaterial { + /// Creates a new `NativeKeyMaterial`. + pub fn new(dek: SensitiveBytes, aad_prefix: Box<[u8]>) -> Self { + Self { dek, aad_prefix } + } + + /// Returns the plaintext DEK. + pub fn dek(&self) -> &SensitiveBytes { + &self.dek + } + + /// Returns the AAD prefix. + pub fn aad_prefix(&self) -> &[u8] { + &self.aad_prefix + } +} + +impl std::fmt::Debug for NativeKeyMaterial { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NativeKeyMaterial") + .field("dek", &self.dek) + .field("aad_prefix_len", &self.aad_prefix.len()) + .finish() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 773d781d6d..4f321a38e3 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,6 +21,8 @@ //! for encrypting and decrypting data in Iceberg tables. mod crypto; +pub(crate) mod encrypted_io; +mod encryption_manager; mod file_decryptor; mod file_encryptor; pub(crate) mod key_metadata; @@ -28,8 +30,12 @@ pub mod kms; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +pub use encrypted_io::{ + EncryptedInputFile, EncryptedOutputFile, NativeEncryptedInputFile, NativeEncryptedOutputFile, +}; +pub use encryption_manager::EncryptionManager; pub use file_decryptor::AesGcmFileDecryptor; pub use file_encryptor::AesGcmFileEncryptor; -pub use key_metadata::StandardKeyMetadata; +pub use key_metadata::{NativeKeyMaterial, StandardKeyMetadata}; pub use kms::{GeneratedKey, KeyManagementClient}; pub use stream::{AesGcmFileRead, AesGcmFileWrite}; From 0351241ef86e6987bd6b94ec2c1bc588270797b2 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 29 Apr 2026 10:12:45 +0100 Subject: [PATCH 02/16] move native key material --- crates/iceberg/src/encryption/encrypted_io.rs | 41 ++++++++++++++++++- .../src/encryption/encryption_manager.rs | 3 +- crates/iceberg/src/encryption/key_metadata.rs | 39 ------------------ crates/iceberg/src/encryption/mod.rs | 3 +- 4 files changed, 44 insertions(+), 42 deletions(-) diff --git a/crates/iceberg/src/encryption/encrypted_io.rs b/crates/iceberg/src/encryption/encrypted_io.rs index 214f2e98be..a80158f11a 100644 --- a/crates/iceberg/src/encryption/encrypted_io.rs +++ b/crates/iceberg/src/encryption/encrypted_io.rs @@ -21,9 +21,9 @@ use std::sync::Arc; use bytes::Bytes; +use super::crypto::SensitiveBytes; use super::file_decryptor::AesGcmFileDecryptor; use super::file_encryptor::AesGcmFileEncryptor; -use super::key_metadata::NativeKeyMaterial; use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; /// An AGS1 stream-encrypted input file wrapping a plain [`InputFile`]. @@ -248,3 +248,42 @@ impl std::fmt::Debug for NativeEncryptedOutputFile { .finish_non_exhaustive() } } + +/// Plaintext key material for Parquet Modular Encryption (PME). +/// +/// Carries the DEK and AAD prefix needed by a Parquet reader/writer to +/// configure `FileEncryptionProperties` / `FileDecryptionProperties`. +/// +/// Rust equivalent of Java's `NativeEncryptionKeyMetadata` interface. +pub struct NativeKeyMaterial { + dek: SensitiveBytes, + /// AAD prefix is not secret — it is stored in plaintext in file metadata + /// and used for integrity (authenticated data), not confidentiality. + aad_prefix: Box<[u8]>, +} + +impl NativeKeyMaterial { + /// Creates a new `NativeKeyMaterial`. + pub fn new(dek: SensitiveBytes, aad_prefix: Box<[u8]>) -> Self { + Self { dek, aad_prefix } + } + + /// Returns the plaintext DEK. + pub fn dek(&self) -> &SensitiveBytes { + &self.dek + } + + /// Returns the AAD prefix. + pub fn aad_prefix(&self) -> &[u8] { + &self.aad_prefix + } +} + +impl std::fmt::Debug for NativeKeyMaterial { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("NativeKeyMaterial") + .field("dek", &self.dek) + .field("aad_prefix_len", &self.aad_prefix.len()) + .finish() + } +} diff --git a/crates/iceberg/src/encryption/encryption_manager.rs b/crates/iceberg/src/encryption/encryption_manager.rs index 383db27961..d7e79afc5f 100644 --- a/crates/iceberg/src/encryption/encryption_manager.rs +++ b/crates/iceberg/src/encryption/encryption_manager.rs @@ -38,10 +38,11 @@ const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; use super::encrypted_io::{ EncryptedInputFile, EncryptedOutputFile, NativeEncryptedInputFile, NativeEncryptedOutputFile, + NativeKeyMaterial, }; use super::file_decryptor::AesGcmFileDecryptor; use super::file_encryptor::AesGcmFileEncryptor; -use super::key_metadata::{NativeKeyMaterial, StandardKeyMetadata}; +use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; use crate::io::{InputFile, OutputFile}; use crate::spec::EncryptedKey; diff --git a/crates/iceberg/src/encryption/key_metadata.rs b/crates/iceberg/src/encryption/key_metadata.rs index 58b8fa0121..4ef66ce394 100644 --- a/crates/iceberg/src/encryption/key_metadata.rs +++ b/crates/iceberg/src/encryption/key_metadata.rs @@ -224,45 +224,6 @@ mod _serde { } } -/// Plaintext key material for Parquet Modular Encryption (PME). -/// -/// Carries the DEK and AAD prefix needed by a Parquet reader/writer to -/// configure `FileEncryptionProperties` / `FileDecryptionProperties`. -/// -/// Rust equivalent of Java's `NativeEncryptionKeyMetadata` interface. -pub struct NativeKeyMaterial { - dek: SensitiveBytes, - /// AAD prefix is not secret — it is stored in plaintext in file metadata - /// and used for integrity (authenticated data), not confidentiality. - aad_prefix: Box<[u8]>, -} - -impl NativeKeyMaterial { - /// Creates a new `NativeKeyMaterial`. - pub fn new(dek: SensitiveBytes, aad_prefix: Box<[u8]>) -> Self { - Self { dek, aad_prefix } - } - - /// Returns the plaintext DEK. - pub fn dek(&self) -> &SensitiveBytes { - &self.dek - } - - /// Returns the AAD prefix. - pub fn aad_prefix(&self) -> &[u8] { - &self.aad_prefix - } -} - -impl std::fmt::Debug for NativeKeyMaterial { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NativeKeyMaterial") - .field("dek", &self.dek) - .field("aad_prefix_len", &self.aad_prefix.len()) - .finish() - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 4f321a38e3..c264492108 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -32,10 +32,11 @@ mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; pub use encrypted_io::{ EncryptedInputFile, EncryptedOutputFile, NativeEncryptedInputFile, NativeEncryptedOutputFile, + NativeKeyMaterial, }; pub use encryption_manager::EncryptionManager; pub use file_decryptor::AesGcmFileDecryptor; pub use file_encryptor::AesGcmFileEncryptor; -pub use key_metadata::{NativeKeyMaterial, StandardKeyMetadata}; +pub use key_metadata::StandardKeyMetadata; pub use kms::{GeneratedKey, KeyManagementClient}; pub use stream::{AesGcmFileRead, AesGcmFileWrite}; From 1ab67ce976cd6deec89a8fdc3cf6cac64f9b6b8a Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 29 Apr 2026 22:03:40 +0100 Subject: [PATCH 03/16] remove native io types --- crates/iceberg/src/encryption/encrypted_io.rs | 135 ------------------ .../src/encryption/encryption_manager.rs | 45 +----- crates/iceberg/src/encryption/mod.rs | 5 +- 3 files changed, 8 insertions(+), 177 deletions(-) diff --git a/crates/iceberg/src/encryption/encrypted_io.rs b/crates/iceberg/src/encryption/encrypted_io.rs index a80158f11a..a7e3fa9a23 100644 --- a/crates/iceberg/src/encryption/encrypted_io.rs +++ b/crates/iceberg/src/encryption/encrypted_io.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use bytes::Bytes; -use super::crypto::SensitiveBytes; use super::file_decryptor::AesGcmFileDecryptor; use super::file_encryptor::AesGcmFileEncryptor; use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; @@ -89,48 +88,6 @@ impl std::fmt::Debug for EncryptedInputFile { } } -/// A Parquet Modular Encryption (PME) input file wrapping a plain [`InputFile`]. -/// -/// The Parquet reader handles decryption at the column/page level using the -/// key material carried by this struct. -pub struct NativeEncryptedInputFile { - inner: InputFile, - key_material: NativeKeyMaterial, -} - -impl NativeEncryptedInputFile { - /// Creates a new native-encrypted input file. - pub fn new(inner: InputFile, key_material: NativeKeyMaterial) -> Self { - Self { - inner, - key_material, - } - } - - /// Absolute path of the file. - pub fn location(&self) -> &str { - self.inner.location() - } - - /// Returns the native key material for PME decryption. - pub fn key_material(&self) -> &NativeKeyMaterial { - &self.key_material - } - - /// Consumes self and returns the underlying plain input file. - pub fn into_inner(self) -> InputFile { - self.inner - } -} - -impl std::fmt::Debug for NativeEncryptedInputFile { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NativeEncryptedInputFile") - .field("path", &self.inner.location()) - .finish_non_exhaustive() - } -} - /// An AGS1 stream-encrypted output file wrapping a plain [`OutputFile`]. /// /// Transparently encrypts on write. @@ -195,95 +152,3 @@ impl std::fmt::Debug for EncryptedOutputFile { .finish_non_exhaustive() } } - -/// A Parquet Modular Encryption (PME) output file wrapping a plain [`OutputFile`]. -/// -/// The Parquet writer handles encryption at the column/page level using the -/// key material carried by this struct. -pub struct NativeEncryptedOutputFile { - inner: OutputFile, - key_metadata: Box<[u8]>, - key_material: NativeKeyMaterial, -} - -impl NativeEncryptedOutputFile { - /// Creates a new native-encrypted output file. - pub fn new( - inner: OutputFile, - key_metadata: Box<[u8]>, - key_material: NativeKeyMaterial, - ) -> Self { - Self { - inner, - key_metadata, - key_material, - } - } - - /// Returns the key metadata bytes (for storage in manifest/data files). - pub fn key_metadata(&self) -> &[u8] { - &self.key_metadata - } - - /// Absolute path of the file. - pub fn location(&self) -> &str { - self.inner.location() - } - - /// Returns the native key material for PME encryption. - pub fn key_material(&self) -> &NativeKeyMaterial { - &self.key_material - } - - /// Consumes self and returns the underlying plain output file. - pub fn into_inner(self) -> OutputFile { - self.inner - } -} - -impl std::fmt::Debug for NativeEncryptedOutputFile { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NativeEncryptedOutputFile") - .field("path", &self.inner.location()) - .finish_non_exhaustive() - } -} - -/// Plaintext key material for Parquet Modular Encryption (PME). -/// -/// Carries the DEK and AAD prefix needed by a Parquet reader/writer to -/// configure `FileEncryptionProperties` / `FileDecryptionProperties`. -/// -/// Rust equivalent of Java's `NativeEncryptionKeyMetadata` interface. -pub struct NativeKeyMaterial { - dek: SensitiveBytes, - /// AAD prefix is not secret — it is stored in plaintext in file metadata - /// and used for integrity (authenticated data), not confidentiality. - aad_prefix: Box<[u8]>, -} - -impl NativeKeyMaterial { - /// Creates a new `NativeKeyMaterial`. - pub fn new(dek: SensitiveBytes, aad_prefix: Box<[u8]>) -> Self { - Self { dek, aad_prefix } - } - - /// Returns the plaintext DEK. - pub fn dek(&self) -> &SensitiveBytes { - &self.dek - } - - /// Returns the AAD prefix. - pub fn aad_prefix(&self) -> &[u8] { - &self.aad_prefix - } -} - -impl std::fmt::Debug for NativeKeyMaterial { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("NativeKeyMaterial") - .field("dek", &self.dek) - .field("aad_prefix_len", &self.aad_prefix.len()) - .finish() - } -} diff --git a/crates/iceberg/src/encryption/encryption_manager.rs b/crates/iceberg/src/encryption/encryption_manager.rs index d7e79afc5f..8d79c95d5d 100644 --- a/crates/iceberg/src/encryption/encryption_manager.rs +++ b/crates/iceberg/src/encryption/encryption_manager.rs @@ -36,10 +36,7 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -use super::encrypted_io::{ - EncryptedInputFile, EncryptedOutputFile, NativeEncryptedInputFile, NativeEncryptedOutputFile, - NativeKeyMaterial, -}; +use super::encrypted_io::{EncryptedInputFile, EncryptedOutputFile}; use super::file_decryptor::AesGcmFileDecryptor; use super::file_encryptor::AesGcmFileEncryptor; use super::key_metadata::StandardKeyMetadata; @@ -128,43 +125,15 @@ impl EncryptionManager { Ok(EncryptedInputFile::new(input, decryptor)) } - /// Encrypt an output file for Parquet Modular Encryption (PME). + /// Generate key material for Parquet Modular Encryption (PME). /// - /// Returns a [`NativeEncryptedOutputFile`] whose key material is available - /// for the Parquet writer to configure `FileEncryptionProperties`. - pub fn encrypt_native(&self, raw_output: OutputFile) -> Result { + /// Returns a [`StandardKeyMetadata`] containing a fresh DEK and AAD prefix. + /// The caller should pass this to the Parquet writer to configure + /// `FileEncryptionProperties`, and serialize it for storage in the manifest. + pub fn generate_native_key_metadata(&self) -> Result { let dek = SecureKey::generate(self.key_size); let aad_prefix = Self::generate_aad_prefix(); - - let key_metadata_bytes = StandardKeyMetadata::new(dek.as_bytes()) - .with_aad_prefix(&aad_prefix) - .encode()?; - - Ok(NativeEncryptedOutputFile::new( - raw_output, - key_metadata_bytes, - NativeKeyMaterial::new(SensitiveBytes::new(dek.as_bytes()), aad_prefix), - )) - } - - /// Decrypt key metadata for a Parquet Modular Encryption (PME) file. - /// - /// Returns a [`NativeEncryptedInputFile`] carrying the plaintext DEK - /// and AAD prefix for the Parquet reader to configure - /// `FileDecryptionProperties`. - pub fn decrypt_native( - &self, - raw_input: InputFile, - key_metadata: &[u8], - ) -> Result { - let metadata = StandardKeyMetadata::decode(key_metadata)?; - Ok(NativeEncryptedInputFile::new( - raw_input, - NativeKeyMaterial::new( - metadata.encryption_key().clone(), - metadata.aad_prefix().unwrap_or_default().into(), - ), - )) + Ok(StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix)) } /// Wrap key metadata bytes with a KEK for storage in table metadata. diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index c264492108..5d72f8c98a 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -30,10 +30,7 @@ pub mod kms; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -pub use encrypted_io::{ - EncryptedInputFile, EncryptedOutputFile, NativeEncryptedInputFile, NativeEncryptedOutputFile, - NativeKeyMaterial, -}; +pub use encrypted_io::{EncryptedInputFile, EncryptedOutputFile}; pub use encryption_manager::EncryptionManager; pub use file_decryptor::AesGcmFileDecryptor; pub use file_encryptor::AesGcmFileEncryptor; From 7f0ffcc9cc83e5d23084b582754d47f7a2db25ac Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 5 May 2026 21:22:55 +0100 Subject: [PATCH 04/16] timestamp is not optional --- .../src/encryption/encryption_manager.rs | 108 ++++++++++++++++-- 1 file changed, 98 insertions(+), 10 deletions(-) diff --git a/crates/iceberg/src/encryption/encryption_manager.rs b/crates/iceberg/src/encryption/encryption_manager.rs index 8d79c95d5d..ecda273648 100644 --- a/crates/iceberg/src/encryption/encryption_manager.rs +++ b/crates/iceberg/src/encryption/encryption_manager.rs @@ -161,9 +161,8 @@ impl EncryptionManager { let kek_bytes = self.unwrap_kek(&kek).await?; // Use the KEK timestamp as AAD to prevent timestamp tampering attacks. - let kek_timestamp = kek.properties().get(KEK_CREATED_AT_PROPERTY); - let aad = kek_timestamp.map(|ts| ts.as_bytes()); - let wrapped_metadata = self.wrap_dek_with_kek(key_metadata, &kek_bytes, aad)?; + let aad = Self::kek_timestamp_aad(&kek)?; + let wrapped_metadata = self.wrap_dek_with_kek(key_metadata, &kek_bytes, Some(aad))?; let wrapped_key = EncryptedKey::builder() .key_id(Uuid::new_v4().to_string()) @@ -309,15 +308,11 @@ impl EncryptionManager { ) })?; - // The Iceberg Spec uses the KEK timestamp as AAD when wrapping key metadata, to - // prevent timestamp tampering attacks. - let aad = kek - .properties() - .get(KEK_CREATED_AT_PROPERTY) - .map(|ts| ts.as_bytes()); + // KEK timestamp as AAD prevents timestamp tampering. + let aad = Self::kek_timestamp_aad(kek)?; let kek_bytes = self.unwrap_kek(kek).await?; - self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, aad) + self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, Some(aad)) .map_err(|e| { Error::new( e.kind(), @@ -327,6 +322,23 @@ impl EncryptionManager { }) } + /// Extract the KEK timestamp for use as AAD. Returns an error if missing. + fn kek_timestamp_aad(kek: &EncryptedKey) -> Result<&[u8]> { + kek.properties() + .get(KEK_CREATED_AT_PROPERTY) + .map(|ts| ts.as_bytes()) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "KEK '{}' is missing required '{}' property", + kek.key_id(), + KEK_CREATED_AT_PROPERTY + ), + ) + }) + } + /// Generate a random AAD prefix for file encryption. fn generate_aad_prefix() -> Box<[u8]> { let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; @@ -583,6 +595,82 @@ mod tests { assert_eq!(meta.size, plaintext.len() as u64); } + #[tokio::test] + async fn test_unwrap_fails_when_kek_missing_timestamp() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // Wrap some metadata to get a valid encrypted entry + let (entry, new_kek) = mgr.wrap_key_metadata(b"secret").await.unwrap(); + let kek = new_kek.unwrap(); + + // Re-create the KEK without its KEY_TIMESTAMP property + let kek_no_ts = EncryptedKey::builder() + .key_id(kek.key_id()) + .encrypted_key_metadata(kek.encrypted_key_metadata()) + .encrypted_by_id(kek.encrypted_by_id().unwrap()) + .build(); + + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek_no_ts.key_id().to_string(), kek_no_ts); + + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .encryption_keys(encryption_keys.clone()) + .build(); + + let result = mgr.unwrap_key_metadata(&entry, &encryption_keys).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.to_string().contains(KEK_CREATED_AT_PROPERTY), + "error should mention the missing property: {}", + err + ); + } + + #[tokio::test] + async fn test_unwrap_fails_when_kek_timestamp_tampered() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // Wrap metadata normally + let (entry, new_kek) = mgr.wrap_key_metadata(b"secret").await.unwrap(); + let kek = new_kek.unwrap(); + + // Tamper with the KEK timestamp (change the AAD) + let mut tampered_properties = kek.properties().clone(); + tampered_properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), "9999999".to_string()); + + let tampered_kek = EncryptedKey::builder() + .key_id(kek.key_id()) + .encrypted_key_metadata(kek.encrypted_key_metadata()) + .encrypted_by_id(kek.encrypted_by_id().unwrap()) + .properties(tampered_properties) + .build(); + + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(tampered_kek.key_id().to_string(), tampered_kek); + + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .encryption_keys(encryption_keys.clone()) + .build(); + + // Unwrap should fail because the AAD (timestamp) doesn't match what was used to wrap + let result = mgr.unwrap_key_metadata(&entry, &encryption_keys).await; + assert!(result.is_err(), "tampered timestamp should cause decryption failure"); + } + #[tokio::test] async fn test_encrypt_decrypt_roundtrip() { use crate::io::FileIO; From 864d3ff690b0d3cfd64530b1e22f76591c2d36b6 Mon Sep 17 00:00:00 2001 From: Xander Date: Tue, 5 May 2026 22:02:04 +0100 Subject: [PATCH 05/16] fmt --- crates/iceberg/src/encryption/encryption_manager.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/encryption/encryption_manager.rs b/crates/iceberg/src/encryption/encryption_manager.rs index ecda273648..3361eb031c 100644 --- a/crates/iceberg/src/encryption/encryption_manager.rs +++ b/crates/iceberg/src/encryption/encryption_manager.rs @@ -668,7 +668,10 @@ mod tests { // Unwrap should fail because the AAD (timestamp) doesn't match what was used to wrap let result = mgr.unwrap_key_metadata(&entry, &encryption_keys).await; - assert!(result.is_err(), "tampered timestamp should cause decryption failure"); + assert!( + result.is_err(), + "tampered timestamp should cause decryption failure" + ); } #[tokio::test] From d83b7c107c0cad761cf01ab208a993b21c93ad48 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 6 May 2026 10:46:40 +0100 Subject: [PATCH 06/16] rename --- crates/iceberg/src/encryption/{encrypted_io.rs => io.rs} | 0 .../src/encryption/{encryption_manager.rs => manager.rs} | 2 +- crates/iceberg/src/encryption/mod.rs | 8 ++++---- 3 files changed, 5 insertions(+), 5 deletions(-) rename crates/iceberg/src/encryption/{encrypted_io.rs => io.rs} (100%) rename crates/iceberg/src/encryption/{encryption_manager.rs => manager.rs} (99%) diff --git a/crates/iceberg/src/encryption/encrypted_io.rs b/crates/iceberg/src/encryption/io.rs similarity index 100% rename from crates/iceberg/src/encryption/encrypted_io.rs rename to crates/iceberg/src/encryption/io.rs diff --git a/crates/iceberg/src/encryption/encryption_manager.rs b/crates/iceberg/src/encryption/manager.rs similarity index 99% rename from crates/iceberg/src/encryption/encryption_manager.rs rename to crates/iceberg/src/encryption/manager.rs index 3361eb031c..d9a8217953 100644 --- a/crates/iceberg/src/encryption/encryption_manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -36,7 +36,7 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -use super::encrypted_io::{EncryptedInputFile, EncryptedOutputFile}; +use super::io::{EncryptedInputFile, EncryptedOutputFile}; use super::file_decryptor::AesGcmFileDecryptor; use super::file_encryptor::AesGcmFileEncryptor; use super::key_metadata::StandardKeyMetadata; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 5d72f8c98a..b1470e0f94 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,8 +21,8 @@ //! for encrypting and decrypting data in Iceberg tables. mod crypto; -pub(crate) mod encrypted_io; -mod encryption_manager; +pub(crate) mod io; +mod manager; mod file_decryptor; mod file_encryptor; pub(crate) mod key_metadata; @@ -30,8 +30,8 @@ pub mod kms; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -pub use encrypted_io::{EncryptedInputFile, EncryptedOutputFile}; -pub use encryption_manager::EncryptionManager; +pub use io::{EncryptedInputFile, EncryptedOutputFile}; +pub use manager::EncryptionManager; pub use file_decryptor::AesGcmFileDecryptor; pub use file_encryptor::AesGcmFileEncryptor; pub use key_metadata::StandardKeyMetadata; From 32fdd3ad36efb2b60dba4e7080360ea398324cd6 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 6 May 2026 10:48:25 +0100 Subject: [PATCH 07/16] import moke cache --- crates/iceberg/src/encryption/manager.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index d9a8217953..65dfb8138a 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -31,6 +31,7 @@ use std::time::Duration; use aes_gcm::aead::OsRng; use aes_gcm::aead::rand_core::RngCore; use chrono::Utc; +use moka::future::Cache; use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; @@ -66,10 +67,10 @@ const AAD_PREFIX_LENGTH: usize = 16; pub struct EncryptionManager { kms_client: Arc, #[builder( - default = moka::future::Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(), + default = Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(), setter(skip) )] - kek_cache: moka::future::Cache, + kek_cache: Cache, /// AES key size for DEK generation. Defaults to 128-bit. #[builder(default = AesKeySize::default())] key_size: AesKeySize, From a5bc5f66945347dc85cbcadfe18e0e968f427fd8 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 6 May 2026 11:19:05 +0100 Subject: [PATCH 08/16] fmt --- crates/iceberg/src/encryption/manager.rs | 2 +- crates/iceberg/src/encryption/mod.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index 65dfb8138a..9e63f3b782 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -37,9 +37,9 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -use super::io::{EncryptedInputFile, EncryptedOutputFile}; use super::file_decryptor::AesGcmFileDecryptor; use super::file_encryptor::AesGcmFileEncryptor; +use super::io::{EncryptedInputFile, EncryptedOutputFile}; use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; use crate::io::{InputFile, OutputFile}; diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index b1470e0f94..4dd1b53759 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,19 +21,19 @@ //! for encrypting and decrypting data in Iceberg tables. mod crypto; -pub(crate) mod io; -mod manager; mod file_decryptor; mod file_encryptor; +pub(crate) mod io; pub(crate) mod key_metadata; pub mod kms; +mod manager; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -pub use io::{EncryptedInputFile, EncryptedOutputFile}; -pub use manager::EncryptionManager; pub use file_decryptor::AesGcmFileDecryptor; pub use file_encryptor::AesGcmFileEncryptor; +pub use io::{EncryptedInputFile, EncryptedOutputFile}; pub use key_metadata::StandardKeyMetadata; pub use kms::{GeneratedKey, KeyManagementClient}; +pub use manager::EncryptionManager; pub use stream::{AesGcmFileRead, AesGcmFileWrite}; From 78be2acdbf5b34410e40761b974345e3dbf4b4fc Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 6 May 2026 11:19:21 +0100 Subject: [PATCH 09/16] builder method --- crates/iceberg/src/encryption/manager.rs | 35 ++++++++++++++---------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index 9e63f3b782..6913acbb60 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -64,6 +64,16 @@ const AAD_PREFIX_LENGTH: usize = 16; /// /// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls. #[derive(typed_builder::TypedBuilder)] +#[builder(mutators( + /// Add an encryption key (KEK or wrapped key metadata entry). + pub fn add_encryption_key(&mut self, key: EncryptedKey) { + self.encryption_keys.insert(key.key_id().to_string(), key); + } + /// Set all encryption keys from table metadata. + pub fn encryption_keys(&mut self, keys: HashMap) { + self.encryption_keys = keys; + } +))] pub struct EncryptionManager { kms_client: Arc, #[builder( @@ -78,7 +88,7 @@ pub struct EncryptionManager { #[builder(setter(into))] table_key_id: String, /// All encryption keys from table metadata (KEKs and wrapped key metadata entries). - #[builder(default)] + #[builder(default, via_mutators)] encryption_keys: HashMap, } @@ -417,14 +427,14 @@ mod tests { let kek = new_kek.unwrap(); // Build a manager with the KEK so we can unwrap - let mut encryption_keys = HashMap::new(); - encryption_keys.insert(kek.key_id().to_string(), kek); let mgr = EncryptionManager::builder() .kms_client(kms) .table_key_id("master-1") - .encryption_keys(encryption_keys.clone()) + .add_encryption_key(kek.clone()) .build(); + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek); let decrypted = mgr .unwrap_key_metadata(&entry, &encryption_keys) .await @@ -445,12 +455,10 @@ mod tests { let kek = new_kek.unwrap(); // Build manager with the active KEK (same KMS to unwrap) - let mut encryption_keys = HashMap::new(); - encryption_keys.insert(kek.key_id().to_string(), kek.clone()); let mgr = EncryptionManager::builder() .kms_client(kms) .table_key_id("master-1") - .encryption_keys(encryption_keys) + .add_encryption_key(kek.clone()) .build(); // Second wrap should reuse the existing KEK (no new KEK) @@ -483,12 +491,10 @@ mod tests { .build(); // Build manager with the expired KEK - let mut encryption_keys = HashMap::new(); - encryption_keys.insert(old_kek.key_id().to_string(), old_kek.clone()); let mgr = EncryptionManager::builder() .kms_client(kms) .table_key_id("master-1") - .encryption_keys(encryption_keys) + .add_encryption_key(old_kek.clone()) .build(); // Wrap should rotate to a new KEK since the existing one is expired @@ -532,14 +538,15 @@ mod tests { // Create KEK (caches the plaintext KEK) let kek = mgr.create_kek().await.unwrap(); - let mut encryption_keys = HashMap::new(); - encryption_keys.insert(kek.key_id().to_string(), kek.clone()); let mgr = EncryptionManager::builder() .kms_client(kms) .table_key_id("master-1") - .encryption_keys(encryption_keys.clone()) + .add_encryption_key(kek.clone()) .build(); + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek); + // Wrap key metadata (unwraps KEK -- should hit cache from create_kek) let (entry, _) = mgr.wrap_key_metadata(b"test-data").await.unwrap(); @@ -621,7 +628,6 @@ mod tests { let mgr = EncryptionManager::builder() .kms_client(kms) .table_key_id("master-1") - .encryption_keys(encryption_keys.clone()) .build(); let result = mgr.unwrap_key_metadata(&entry, &encryption_keys).await; @@ -664,7 +670,6 @@ mod tests { let mgr = EncryptionManager::builder() .kms_client(kms) .table_key_id("master-1") - .encryption_keys(encryption_keys.clone()) .build(); // Unwrap should fail because the AAD (timestamp) doesn't match what was used to wrap From 6309d0b1571f3f01187daba55c3a2f1ac4f96092 Mon Sep 17 00:00:00 2001 From: Xander Date: Wed, 6 May 2026 11:28:21 +0100 Subject: [PATCH 10/16] generate_native_key_metadata crate pub --- crates/iceberg/src/encryption/manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index 6913acbb60..4a3dc9a353 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -141,7 +141,8 @@ impl EncryptionManager { /// Returns a [`StandardKeyMetadata`] containing a fresh DEK and AAD prefix. /// The caller should pass this to the Parquet writer to configure /// `FileEncryptionProperties`, and serialize it for storage in the manifest. - pub fn generate_native_key_metadata(&self) -> Result { + #[allow(dead_code)] + pub(crate) fn generate_native_key_metadata(&self) -> Result { let dek = SecureKey::generate(self.key_size); let aad_prefix = Self::generate_aad_prefix(); Ok(StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix)) From 5107145803930aa85ce250fe13a492cae6dfe476 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 8 May 2026 13:02:22 +0100 Subject: [PATCH 11/16] remove generate keys for now --- crates/iceberg/src/encryption/manager.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index 4a3dc9a353..ac10548cac 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -136,18 +136,6 @@ impl EncryptionManager { Ok(EncryptedInputFile::new(input, decryptor)) } - /// Generate key material for Parquet Modular Encryption (PME). - /// - /// Returns a [`StandardKeyMetadata`] containing a fresh DEK and AAD prefix. - /// The caller should pass this to the Parquet writer to configure - /// `FileEncryptionProperties`, and serialize it for storage in the manifest. - #[allow(dead_code)] - pub(crate) fn generate_native_key_metadata(&self) -> Result { - let dek = SecureKey::generate(self.key_size); - let aad_prefix = Self::generate_aad_prefix(); - Ok(StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix)) - } - /// Wrap key metadata bytes with a KEK for storage in table metadata. /// /// Returns `(wrapped_entry, optional_new_kek)`. The wrapped entry From a277cbfe3d321eb8baa1a8ba4d2b7f8a5e360370 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 8 May 2026 13:20:11 +0100 Subject: [PATCH 12/16] refactor encrypted io --- .../iceberg/src/encryption/file_decryptor.rs | 156 ------------------ .../iceberg/src/encryption/file_encryptor.rs | 138 ---------------- crates/iceberg/src/encryption/io.rs | 66 +++++--- crates/iceberg/src/encryption/manager.rs | 47 ++---- crates/iceberg/src/encryption/mod.rs | 4 - 5 files changed, 58 insertions(+), 353 deletions(-) delete mode 100644 crates/iceberg/src/encryption/file_decryptor.rs delete mode 100644 crates/iceberg/src/encryption/file_encryptor.rs diff --git a/crates/iceberg/src/encryption/file_decryptor.rs b/crates/iceberg/src/encryption/file_decryptor.rs deleted file mode 100644 index e44c0e1d78..0000000000 --- a/crates/iceberg/src/encryption/file_decryptor.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! File-level decryption helper for AGS1 stream-encrypted files. - -use std::fmt; -use std::sync::Arc; - -use super::crypto::{AesGcmCipher, SecureKey}; -use super::stream::AesGcmFileRead; -use crate::Result; -use crate::io::FileRead; - -/// Holds the decryption material for a single encrypted file. -/// -/// Created from a plaintext DEK and AAD prefix, then used to wrap -/// an encrypted file reader for transparent decryption on read. -pub struct AesGcmFileDecryptor { - cipher: Arc, - aad_prefix: Box<[u8]>, -} - -impl AesGcmFileDecryptor { - /// Creates a new `AesGcmFileDecryptor` from a plaintext DEK and AAD prefix. - pub fn new(dek: &[u8], aad_prefix: impl Into>) -> Result { - let key = SecureKey::new(dek)?; - let cipher = Arc::new(AesGcmCipher::new(key)); - Ok(Self { - cipher, - aad_prefix: aad_prefix.into(), - }) - } - - /// Wraps a raw encrypted-file reader in a decrypting [`AesGcmFileRead`]. - pub fn wrap_reader( - &self, - reader: Box, - encrypted_file_length: u64, - ) -> Result> { - let decrypting = AesGcmFileRead::new( - reader, - Arc::clone(&self.cipher), - self.aad_prefix.clone(), - encrypted_file_length, - )?; - Ok(Box::new(decrypting)) - } - - /// Calculates the plaintext length from an encrypted file's total length. - pub fn plaintext_length(&self, encrypted_file_length: u64) -> Result { - AesGcmFileRead::calculate_plaintext_length(encrypted_file_length) - } -} - -impl fmt::Debug for AesGcmFileDecryptor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AesGcmFileDecryptor") - .field("aad_prefix_len", &self.aad_prefix.len()) - .finish_non_exhaustive() - } -} - -#[cfg(test)] -mod tests { - use std::ops::Range; - - use bytes::Bytes; - - use super::*; - use crate::encryption::AesGcmFileEncryptor; - use crate::io::FileWrite; - - struct MemoryFileRead(Bytes); - - #[async_trait::async_trait] - impl FileRead for MemoryFileRead { - async fn read(&self, range: Range) -> Result { - Ok(self.0.slice(range.start as usize..range.end as usize)) - } - } - - struct MemoryFileWrite { - buffer: std::sync::Arc>>, - } - - #[async_trait::async_trait] - impl FileWrite for MemoryFileWrite { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.buffer.lock().unwrap().extend_from_slice(&bs); - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_wrap_reader_roundtrip() { - let key = b"0123456789abcdef"; - let aad_prefix = b"test-aad-prefix!"; - let plaintext = b"Hello from file decryptor!"; - - // Encrypt via the encryptor wrapper - let encryptor = AesGcmFileEncryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); - let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite { - buffer: buffer.clone(), - })); - writer.write(Bytes::from(plaintext.to_vec())).await.unwrap(); - writer.close().await.unwrap(); - let encrypted = buffer.lock().unwrap().clone(); - let encrypted_len = encrypted.len() as u64; - - // Decrypt via the decryptor wrapper - let decryptor = AesGcmFileDecryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let reader = decryptor - .wrap_reader( - Box::new(MemoryFileRead(Bytes::from(encrypted))), - encrypted_len, - ) - .unwrap(); - - let result = reader.read(0..plaintext.len() as u64).await.unwrap(); - assert_eq!(&result[..], plaintext); - } - - #[tokio::test] - async fn test_invalid_key_length() { - let result = AesGcmFileDecryptor::new(b"too-short", b"aad".as_slice()); - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_plaintext_length() { - let decryptor = AesGcmFileDecryptor::new(b"0123456789abcdef", b"aad".as_slice()).unwrap(); - // header(8) + nonce(12) + 10 bytes ciphertext + tag(16) = 46 - let encrypted_len = 8 + 12 + 10 + 16; - let plain_len = decryptor.plaintext_length(encrypted_len).unwrap(); - assert_eq!(plain_len, 10); - } -} diff --git a/crates/iceberg/src/encryption/file_encryptor.rs b/crates/iceberg/src/encryption/file_encryptor.rs deleted file mode 100644 index 773438ad80..0000000000 --- a/crates/iceberg/src/encryption/file_encryptor.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! File-level encryption helper for AGS1 stream-encrypted files. - -use std::fmt; -use std::sync::Arc; - -use super::crypto::{AesGcmCipher, SecureKey}; -use super::stream::AesGcmFileWrite; -use crate::Result; -use crate::io::FileWrite; - -/// Holds the encryption material for a single encrypted file. -/// -/// This is the write-side counterpart to -/// [`AesGcmFileDecryptor`](super::AesGcmFileDecryptor). Created from -/// a plaintext DEK and AAD prefix, then used to wrap an output writer -/// for transparent encryption on write. -pub struct AesGcmFileEncryptor { - cipher: Arc, - aad_prefix: Box<[u8]>, -} - -impl AesGcmFileEncryptor { - /// Creates a new `AesGcmFileEncryptor` from a plaintext DEK and AAD prefix. - pub fn new(dek: &[u8], aad_prefix: impl Into>) -> Result { - let key = SecureKey::new(dek)?; - let cipher = Arc::new(AesGcmCipher::new(key)); - Ok(Self { - cipher, - aad_prefix: aad_prefix.into(), - }) - } - - /// Wraps a raw output writer in an encrypting [`AesGcmFileWrite`]. - pub fn wrap_writer(&self, writer: Box) -> Box { - Box::new(AesGcmFileWrite::new( - writer, - Arc::clone(&self.cipher), - self.aad_prefix.clone(), - )) - } -} - -impl fmt::Debug for AesGcmFileEncryptor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AesGcmFileEncryptor") - .field("aad_prefix_len", &self.aad_prefix.len()) - .finish_non_exhaustive() - } -} - -#[cfg(test)] -mod tests { - use std::ops::Range; - - use bytes::Bytes; - - use super::*; - use crate::encryption::AesGcmFileDecryptor; - use crate::io::FileRead; - - struct MemoryFileRead(Bytes); - - #[async_trait::async_trait] - impl FileRead for MemoryFileRead { - async fn read(&self, range: Range) -> Result { - Ok(self.0.slice(range.start as usize..range.end as usize)) - } - } - - struct MemoryFileWrite { - buffer: std::sync::Arc>>, - } - - #[async_trait::async_trait] - impl FileWrite for MemoryFileWrite { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.buffer.lock().unwrap().extend_from_slice(&bs); - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_wrap_writer_roundtrip() { - let key = b"0123456789abcdef"; - let aad_prefix = b"test-aad-prefix!"; - let plaintext = b"Hello from file encryptor!"; - - // Encrypt via the encryptor wrapper - let encryptor = AesGcmFileEncryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); - let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite { - buffer: buffer.clone(), - })); - writer.write(Bytes::from(plaintext.to_vec())).await.unwrap(); - writer.close().await.unwrap(); - let encrypted = buffer.lock().unwrap().clone(); - let encrypted_len = encrypted.len() as u64; - - // Decrypt via the decryptor wrapper - let decryptor = AesGcmFileDecryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let reader = decryptor - .wrap_reader( - Box::new(MemoryFileRead(Bytes::from(encrypted))), - encrypted_len, - ) - .unwrap(); - - let result = reader.read(0..plaintext.len() as u64).await.unwrap(); - assert_eq!(&result[..], plaintext); - } - - #[tokio::test] - async fn test_invalid_key_length() { - let result = AesGcmFileEncryptor::new(b"bad-key", b"aad".as_slice()); - assert!(result.is_err()); - } -} diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs index a7e3fa9a23..ccc5ffa209 100644 --- a/crates/iceberg/src/encryption/io.rs +++ b/crates/iceberg/src/encryption/io.rs @@ -21,8 +21,10 @@ use std::sync::Arc; use bytes::Bytes; -use super::file_decryptor::AesGcmFileDecryptor; -use super::file_encryptor::AesGcmFileEncryptor; +use super::crypto::{AesGcmCipher, SecureKey}; +use super::key_metadata::StandardKeyMetadata; +use super::stream::{AesGcmFileRead, AesGcmFileWrite}; +use crate::Result; use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; /// An AGS1 stream-encrypted input file wrapping a plain [`InputFile`]. @@ -30,13 +32,16 @@ use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; /// Transparently decrypts on read. pub struct EncryptedInputFile { inner: InputFile, - decryptor: Arc, + key_metadata: StandardKeyMetadata, } impl EncryptedInputFile { /// Creates a new encrypted input file. - pub fn new(inner: InputFile, decryptor: Arc) -> Self { - Self { inner, decryptor } + pub fn new(inner: InputFile, key_metadata: StandardKeyMetadata) -> Self { + Self { + inner, + key_metadata, + } } /// Absolute path of the file. @@ -45,33 +50,41 @@ impl EncryptedInputFile { } /// Check if file exists. - pub async fn exists(&self) -> crate::Result { + pub async fn exists(&self) -> Result { self.inner.exists().await } /// Fetch and returns metadata of file. /// /// The returned size is the **plaintext** size. - pub async fn metadata(&self) -> crate::Result { + pub async fn metadata(&self) -> Result { let raw_meta = self.inner.metadata().await?; - let plaintext_size = self.decryptor.plaintext_length(raw_meta.size)?; + let plaintext_size = AesGcmFileRead::calculate_plaintext_length(raw_meta.size)?; Ok(FileMetadata { size: plaintext_size, }) } /// Read and returns whole content of file (decrypted plaintext). - pub async fn read(&self) -> crate::Result { + pub async fn read(&self) -> Result { let meta = self.metadata().await?; let reader = self.reader().await?; reader.read(0..meta.size).await } /// Creates a reader that transparently decrypts on each read. - pub async fn reader(&self) -> crate::Result> { + pub async fn reader(&self) -> Result> { let raw_meta = self.inner.metadata().await?; let raw_reader = self.inner.reader().await?; - self.decryptor.wrap_reader(raw_reader, raw_meta.size) + let cipher = build_cipher(&self.key_metadata)?; + let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into(); + let decrypting = AesGcmFileRead::new(raw_reader, cipher, aad_prefix, raw_meta.size)?; + Ok(Box::new(decrypting)) + } + + /// Returns a reference to the file's key metadata. + pub fn key_metadata(&self) -> &StandardKeyMetadata { + &self.key_metadata } /// Consumes self and returns the underlying plain input file. @@ -93,26 +106,20 @@ impl std::fmt::Debug for EncryptedInputFile { /// Transparently encrypts on write. pub struct EncryptedOutputFile { inner: OutputFile, - key_metadata: Box<[u8]>, - encryptor: Arc, + key_metadata: StandardKeyMetadata, } impl EncryptedOutputFile { /// Creates a new encrypted output file. - pub fn new( - inner: OutputFile, - key_metadata: Box<[u8]>, - encryptor: Arc, - ) -> Self { + pub fn new(inner: OutputFile, key_metadata: StandardKeyMetadata) -> Self { Self { inner, key_metadata, - encryptor, } } - /// Returns the key metadata bytes (for storage in manifest/data files). - pub fn key_metadata(&self) -> &[u8] { + /// Returns a reference to the file's key metadata. + pub fn key_metadata(&self) -> &StandardKeyMetadata { &self.key_metadata } @@ -122,20 +129,24 @@ impl EncryptedOutputFile { } /// Creates a writer that transparently encrypts on each write. - pub async fn writer(&self) -> crate::Result> { + pub async fn writer(&self) -> Result> { let raw_writer = self.inner.writer().await?; - Ok(self.encryptor.wrap_writer(raw_writer)) + let cipher = build_cipher(&self.key_metadata)?; + let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into(); + Ok(Box::new(AesGcmFileWrite::new( + raw_writer, cipher, aad_prefix, + ))) } /// Write bytes to file (transparently encrypted). - pub async fn write(&self, bs: Bytes) -> crate::Result<()> { + pub async fn write(&self, bs: Bytes) -> Result<()> { let mut writer = self.writer().await?; writer.write(bs).await?; writer.close().await } /// Deletes the underlying file. - pub async fn delete(&self) -> crate::Result<()> { + pub async fn delete(&self) -> Result<()> { self.inner.delete().await } @@ -152,3 +163,8 @@ impl std::fmt::Debug for EncryptedOutputFile { .finish_non_exhaustive() } } + +fn build_cipher(metadata: &StandardKeyMetadata) -> Result> { + let key = SecureKey::new(metadata.encryption_key().as_bytes())?; + Ok(Arc::new(AesGcmCipher::new(key))) +} diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index ac10548cac..48b1f65089 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -37,8 +37,6 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -use super::file_decryptor::AesGcmFileDecryptor; -use super::file_encryptor::AesGcmFileEncryptor; use super::io::{EncryptedInputFile, EncryptedOutputFile}; use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; @@ -106,34 +104,18 @@ impl EncryptionManager { /// /// Returns an [`EncryptedOutputFile`] that transparently encrypts on /// write, along with key metadata for later decryption. - pub fn encrypt(&self, raw_output: OutputFile) -> Result { + pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile { let dek = SecureKey::generate(self.key_size); let aad_prefix = Self::generate_aad_prefix(); - - let key_metadata_bytes = StandardKeyMetadata::new(dek.as_bytes()) - .with_aad_prefix(&aad_prefix) - .encode()?; - - let encryptor = Arc::new(AesGcmFileEncryptor::new(dek.as_bytes(), aad_prefix)?); - - Ok(EncryptedOutputFile::new( - raw_output, - key_metadata_bytes, - encryptor, - )) + let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix); + EncryptedOutputFile::new(raw_output, metadata) } /// Decrypt an encrypted input file, returning an [`EncryptedInputFile`] /// that transparently decrypts on read. pub fn decrypt(&self, input: InputFile, key_metadata: &[u8]) -> Result { let metadata = StandardKeyMetadata::decode(key_metadata)?; - - let decryptor = Arc::new(AesGcmFileDecryptor::new( - metadata.encryption_key().as_bytes(), - metadata.aad_prefix().unwrap_or_default(), - )?); - - Ok(EncryptedInputFile::new(input, decryptor)) + Ok(EncryptedInputFile::new(input, metadata)) } /// Wrap key metadata bytes with a KEK for storage in table metadata. @@ -549,19 +531,23 @@ mod tests { #[tokio::test] async fn test_decrypt_with_known_key() { - use crate::io::FileIO; + use crate::io::{FileIO, FileWrite}; let io = FileIO::new_with_memory(); let path = "memory:///test/encrypted.bin"; - // Encrypt data using AesGcmFileEncryptor directly (independent of EncryptionManager) + // Encrypt data using AesGcmFileWrite directly (independent of EncryptionManager) let dek = b"0123456789abcdef"; - let aad_prefix = b"test-aad-prefix!"; + let aad_prefix: Box<[u8]> = b"test-aad-prefix!".as_slice().into(); let plaintext = b"Hello, encrypted Iceberg!"; - let encryptor = AesGcmFileEncryptor::new(dek.as_slice(), aad_prefix.as_slice()).unwrap(); + let cipher = Arc::new(AesGcmCipher::new(SecureKey::new(dek.as_slice()).unwrap())); let output = io.new_output(path).unwrap(); - let mut writer = encryptor.wrap_writer(output.writer().await.unwrap()); + let mut writer = crate::encryption::AesGcmFileWrite::new( + output.writer().await.unwrap(), + cipher, + aad_prefix.clone(), + ); writer .write(bytes::Bytes::from(plaintext.to_vec())) .await @@ -570,7 +556,7 @@ mod tests { // Build key metadata with the known DEK let key_metadata_bytes = StandardKeyMetadata::new(dek.as_slice()) - .with_aad_prefix(aad_prefix) + .with_aad_prefix(&aad_prefix) .encode() .unwrap(); @@ -684,16 +670,17 @@ mod tests { .build(); let output = io.new_output(path).unwrap(); - let encrypted_output = mgr.encrypt(output).unwrap(); + let encrypted_output = mgr.encrypt(output); let plaintext = b"Hello, encrypted Iceberg round-trip!"; + let serialized_metadata = encrypted_output.key_metadata().encode().unwrap(); encrypted_output .write(bytes::Bytes::from(plaintext.to_vec())) .await .unwrap(); let input = io.new_input(path).unwrap(); - let decrypted_file = mgr.decrypt(input, encrypted_output.key_metadata()).unwrap(); + let decrypted_file = mgr.decrypt(input, &serialized_metadata).unwrap(); let content = decrypted_file.read().await.unwrap(); assert_eq!(&content[..], plaintext); diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 4dd1b53759..12ee76e5e0 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,8 +21,6 @@ //! for encrypting and decrypting data in Iceberg tables. mod crypto; -mod file_decryptor; -mod file_encryptor; pub(crate) mod io; pub(crate) mod key_metadata; pub mod kms; @@ -30,8 +28,6 @@ mod manager; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -pub use file_decryptor::AesGcmFileDecryptor; -pub use file_encryptor::AesGcmFileEncryptor; pub use io::{EncryptedInputFile, EncryptedOutputFile}; pub use key_metadata::StandardKeyMetadata; pub use kms::{GeneratedKey, KeyManagementClient}; From 3dc68694ebdfb894687caff89b7ad5388bd04f52 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 8 May 2026 13:28:04 +0100 Subject: [PATCH 13/16] remove decrypt --- crates/iceberg/src/encryption/manager.rs | 61 ++---------------------- 1 file changed, 3 insertions(+), 58 deletions(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index 48b1f65089..3ed200d8b7 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -40,7 +40,7 @@ use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; use super::io::{EncryptedInputFile, EncryptedOutputFile}; use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; -use crate::io::{InputFile, OutputFile}; +use crate::io::OutputFile; use crate::spec::EncryptedKey; use crate::{Error, ErrorKind, Result}; @@ -111,13 +111,6 @@ impl EncryptionManager { EncryptedOutputFile::new(raw_output, metadata) } - /// Decrypt an encrypted input file, returning an [`EncryptedInputFile`] - /// that transparently decrypts on read. - pub fn decrypt(&self, input: InputFile, key_metadata: &[u8]) -> Result { - let metadata = StandardKeyMetadata::decode(key_metadata)?; - Ok(EncryptedInputFile::new(input, metadata)) - } - /// Wrap key metadata bytes with a KEK for storage in table metadata. /// /// Returns `(wrapped_entry, optional_new_kek)`. The wrapped entry @@ -529,55 +522,6 @@ mod tests { assert_eq!(decrypted, b"test-data"); } - #[tokio::test] - async fn test_decrypt_with_known_key() { - use crate::io::{FileIO, FileWrite}; - - let io = FileIO::new_with_memory(); - let path = "memory:///test/encrypted.bin"; - - // Encrypt data using AesGcmFileWrite directly (independent of EncryptionManager) - let dek = b"0123456789abcdef"; - let aad_prefix: Box<[u8]> = b"test-aad-prefix!".as_slice().into(); - let plaintext = b"Hello, encrypted Iceberg!"; - - let cipher = Arc::new(AesGcmCipher::new(SecureKey::new(dek.as_slice()).unwrap())); - let output = io.new_output(path).unwrap(); - let mut writer = crate::encryption::AesGcmFileWrite::new( - output.writer().await.unwrap(), - cipher, - aad_prefix.clone(), - ); - writer - .write(bytes::Bytes::from(plaintext.to_vec())) - .await - .unwrap(); - writer.close().await.unwrap(); - - // Build key metadata with the known DEK - let key_metadata_bytes = StandardKeyMetadata::new(dek.as_slice()) - .with_aad_prefix(&aad_prefix) - .encode() - .unwrap(); - - // Decrypt via EncryptionManager - let kms = MemoryKeyManagementClient::new(); - kms.add_master_key("master-1").unwrap(); - let mgr = EncryptionManager::builder() - .kms_client(Arc::new(kms) as Arc) - .table_key_id("master-1") - .build(); - - let input_file = io.new_input(path).unwrap(); - let decrypted_file = mgr.decrypt(input_file, &key_metadata_bytes).unwrap(); - - let content = decrypted_file.read().await.unwrap(); - assert_eq!(&content[..], plaintext); - - let meta = decrypted_file.metadata().await.unwrap(); - assert_eq!(meta.size, plaintext.len() as u64); - } - #[tokio::test] async fn test_unwrap_fails_when_kek_missing_timestamp() { let kms = create_test_kms(); @@ -680,7 +624,8 @@ mod tests { .unwrap(); let input = io.new_input(path).unwrap(); - let decrypted_file = mgr.decrypt(input, &serialized_metadata).unwrap(); + let parsed_metadata = StandardKeyMetadata::decode(&serialized_metadata).unwrap(); + let decrypted_file = EncryptedInputFile::new(input, parsed_metadata); let content = decrypted_file.read().await.unwrap(); assert_eq!(&content[..], plaintext); From 5a14a3ebc0c91708219c92d236d0dc61d67d4272 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 8 May 2026 13:42:08 +0100 Subject: [PATCH 14/16] fix --- crates/iceberg/src/encryption/manager.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs index 3ed200d8b7..b70ce1cc71 100644 --- a/crates/iceberg/src/encryption/manager.rs +++ b/crates/iceberg/src/encryption/manager.rs @@ -37,7 +37,7 @@ use uuid::Uuid; const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -use super::io::{EncryptedInputFile, EncryptedOutputFile}; +use super::io::EncryptedOutputFile; use super::key_metadata::StandardKeyMetadata; use super::kms::KeyManagementClient; use crate::io::OutputFile; @@ -349,6 +349,7 @@ impl EncryptionManager { #[cfg(test)] mod tests { use super::*; + use crate::encryption::EncryptedInputFile; use crate::encryption::kms::MemoryKeyManagementClient; fn create_test_kms() -> Arc { From f5fe2f8f61801a54090bcc42565005a1e7f0cdc2 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 8 May 2026 13:57:18 +0100 Subject: [PATCH 15/16] io test --- crates/iceberg/src/encryption/io.rs | 44 +++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs index ccc5ffa209..bb4449b310 100644 --- a/crates/iceberg/src/encryption/io.rs +++ b/crates/iceberg/src/encryption/io.rs @@ -168,3 +168,47 @@ fn build_cipher(metadata: &StandardKeyMetadata) -> Result> { let key = SecureKey::new(metadata.encryption_key().as_bytes())?; Ok(Arc::new(AesGcmCipher::new(key))) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIO; + + fn key_metadata() -> StandardKeyMetadata { + StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!") + } + + #[tokio::test] + async fn test_write_read_roundtrip() { + let fileio = FileIO::new_with_memory(); + let path = "memory:///test/io_roundtrip.bin"; + let plaintext = b"Hello from EncryptedInputFile/EncryptedOutputFile!"; + + let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata()); + output.write(Bytes::from(plaintext.to_vec())).await.unwrap(); + + let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), key_metadata()); + let content = input.read().await.unwrap(); + assert_eq!(&content[..], plaintext); + } + + #[tokio::test] + async fn test_metadata_returns_plaintext_size() { + let fileio = FileIO::new_with_memory(); + let path = "memory:///test/io_metadata.bin"; + let plaintext = b"some bytes to measure"; + + let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata()); + output.write(Bytes::from(plaintext.to_vec())).await.unwrap(); + + let raw_size = fileio.new_input(path).unwrap().metadata().await.unwrap().size; + assert!( + raw_size > plaintext.len() as u64, + "encrypted file should be larger than plaintext (header + nonce + tag)" + ); + + let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), key_metadata()); + let meta = input.metadata().await.unwrap(); + assert_eq!(meta.size, plaintext.len() as u64); + } +} From 9ec0e376e2f1ae7f8a4d6626157816eea62a4fe2 Mon Sep 17 00:00:00 2001 From: Xander Date: Fri, 8 May 2026 13:57:27 +0100 Subject: [PATCH 16/16] fmt --- crates/iceberg/src/encryption/io.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs index bb4449b310..c3d81dd850 100644 --- a/crates/iceberg/src/encryption/io.rs +++ b/crates/iceberg/src/encryption/io.rs @@ -201,7 +201,13 @@ mod tests { let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata()); output.write(Bytes::from(plaintext.to_vec())).await.unwrap(); - let raw_size = fileio.new_input(path).unwrap().metadata().await.unwrap().size; + let raw_size = fileio + .new_input(path) + .unwrap() + .metadata() + .await + .unwrap() + .size; assert!( raw_size > plaintext.len() as u64, "encrypted file should be larger than plaintext (header + nonce + tag)"