diff --git a/crates/iceberg/src/encryption/crypto.rs b/crates/iceberg/src/encryption/crypto.rs index 0f6a9eff43..02d085ebf8 100644 --- a/crates/iceberg/src/encryption/crypto.rs +++ b/crates/iceberg/src/encryption/crypto.rs @@ -177,6 +177,15 @@ impl SecureKey { } } +impl TryFrom for SecureKey { + type Error = Error; + + fn try_from(key: SensitiveBytes) -> Result { + let key_size = AesKeySize::from_key_length(key.len())?; + Ok(Self { key, key_size }) + } +} + /// AES-GCM cipher for encrypting and decrypting data. pub struct AesGcmCipher { key: SensitiveBytes, diff --git a/crates/iceberg/src/encryption/file_decryptor.rs b/crates/iceberg/src/encryption/file_decryptor.rs deleted file mode 100644 index e44c0e1d78..0000000000 --- a/crates/iceberg/src/encryption/file_decryptor.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! File-level decryption helper for AGS1 stream-encrypted files. - -use std::fmt; -use std::sync::Arc; - -use super::crypto::{AesGcmCipher, SecureKey}; -use super::stream::AesGcmFileRead; -use crate::Result; -use crate::io::FileRead; - -/// Holds the decryption material for a single encrypted file. -/// -/// Created from a plaintext DEK and AAD prefix, then used to wrap -/// an encrypted file reader for transparent decryption on read. -pub struct AesGcmFileDecryptor { - cipher: Arc, - aad_prefix: Box<[u8]>, -} - -impl AesGcmFileDecryptor { - /// Creates a new `AesGcmFileDecryptor` from a plaintext DEK and AAD prefix. - pub fn new(dek: &[u8], aad_prefix: impl Into>) -> Result { - let key = SecureKey::new(dek)?; - let cipher = Arc::new(AesGcmCipher::new(key)); - Ok(Self { - cipher, - aad_prefix: aad_prefix.into(), - }) - } - - /// Wraps a raw encrypted-file reader in a decrypting [`AesGcmFileRead`]. - pub fn wrap_reader( - &self, - reader: Box, - encrypted_file_length: u64, - ) -> Result> { - let decrypting = AesGcmFileRead::new( - reader, - Arc::clone(&self.cipher), - self.aad_prefix.clone(), - encrypted_file_length, - )?; - Ok(Box::new(decrypting)) - } - - /// Calculates the plaintext length from an encrypted file's total length. - pub fn plaintext_length(&self, encrypted_file_length: u64) -> Result { - AesGcmFileRead::calculate_plaintext_length(encrypted_file_length) - } -} - -impl fmt::Debug for AesGcmFileDecryptor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AesGcmFileDecryptor") - .field("aad_prefix_len", &self.aad_prefix.len()) - .finish_non_exhaustive() - } -} - -#[cfg(test)] -mod tests { - use std::ops::Range; - - use bytes::Bytes; - - use super::*; - use crate::encryption::AesGcmFileEncryptor; - use crate::io::FileWrite; - - struct MemoryFileRead(Bytes); - - #[async_trait::async_trait] - impl FileRead for MemoryFileRead { - async fn read(&self, range: Range) -> Result { - Ok(self.0.slice(range.start as usize..range.end as usize)) - } - } - - struct MemoryFileWrite { - buffer: std::sync::Arc>>, - } - - #[async_trait::async_trait] - impl FileWrite for MemoryFileWrite { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.buffer.lock().unwrap().extend_from_slice(&bs); - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_wrap_reader_roundtrip() { - let key = b"0123456789abcdef"; - let aad_prefix = b"test-aad-prefix!"; - let plaintext = b"Hello from file decryptor!"; - - // Encrypt via the encryptor wrapper - let encryptor = AesGcmFileEncryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); - let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite { - buffer: buffer.clone(), - })); - writer.write(Bytes::from(plaintext.to_vec())).await.unwrap(); - writer.close().await.unwrap(); - let encrypted = buffer.lock().unwrap().clone(); - let encrypted_len = encrypted.len() as u64; - - // Decrypt via the decryptor wrapper - let decryptor = AesGcmFileDecryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let reader = decryptor - .wrap_reader( - Box::new(MemoryFileRead(Bytes::from(encrypted))), - encrypted_len, - ) - .unwrap(); - - let result = reader.read(0..plaintext.len() as u64).await.unwrap(); - assert_eq!(&result[..], plaintext); - } - - #[tokio::test] - async fn test_invalid_key_length() { - let result = AesGcmFileDecryptor::new(b"too-short", b"aad".as_slice()); - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_plaintext_length() { - let decryptor = AesGcmFileDecryptor::new(b"0123456789abcdef", b"aad".as_slice()).unwrap(); - // header(8) + nonce(12) + 10 bytes ciphertext + tag(16) = 46 - let encrypted_len = 8 + 12 + 10 + 16; - let plain_len = decryptor.plaintext_length(encrypted_len).unwrap(); - assert_eq!(plain_len, 10); - } -} diff --git a/crates/iceberg/src/encryption/file_encryptor.rs b/crates/iceberg/src/encryption/file_encryptor.rs deleted file mode 100644 index 773438ad80..0000000000 --- a/crates/iceberg/src/encryption/file_encryptor.rs +++ /dev/null @@ -1,138 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! File-level encryption helper for AGS1 stream-encrypted files. - -use std::fmt; -use std::sync::Arc; - -use super::crypto::{AesGcmCipher, SecureKey}; -use super::stream::AesGcmFileWrite; -use crate::Result; -use crate::io::FileWrite; - -/// Holds the encryption material for a single encrypted file. -/// -/// This is the write-side counterpart to -/// [`AesGcmFileDecryptor`](super::AesGcmFileDecryptor). Created from -/// a plaintext DEK and AAD prefix, then used to wrap an output writer -/// for transparent encryption on write. -pub struct AesGcmFileEncryptor { - cipher: Arc, - aad_prefix: Box<[u8]>, -} - -impl AesGcmFileEncryptor { - /// Creates a new `AesGcmFileEncryptor` from a plaintext DEK and AAD prefix. - pub fn new(dek: &[u8], aad_prefix: impl Into>) -> Result { - let key = SecureKey::new(dek)?; - let cipher = Arc::new(AesGcmCipher::new(key)); - Ok(Self { - cipher, - aad_prefix: aad_prefix.into(), - }) - } - - /// Wraps a raw output writer in an encrypting [`AesGcmFileWrite`]. - pub fn wrap_writer(&self, writer: Box) -> Box { - Box::new(AesGcmFileWrite::new( - writer, - Arc::clone(&self.cipher), - self.aad_prefix.clone(), - )) - } -} - -impl fmt::Debug for AesGcmFileEncryptor { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("AesGcmFileEncryptor") - .field("aad_prefix_len", &self.aad_prefix.len()) - .finish_non_exhaustive() - } -} - -#[cfg(test)] -mod tests { - use std::ops::Range; - - use bytes::Bytes; - - use super::*; - use crate::encryption::AesGcmFileDecryptor; - use crate::io::FileRead; - - struct MemoryFileRead(Bytes); - - #[async_trait::async_trait] - impl FileRead for MemoryFileRead { - async fn read(&self, range: Range) -> Result { - Ok(self.0.slice(range.start as usize..range.end as usize)) - } - } - - struct MemoryFileWrite { - buffer: std::sync::Arc>>, - } - - #[async_trait::async_trait] - impl FileWrite for MemoryFileWrite { - async fn write(&mut self, bs: Bytes) -> Result<()> { - self.buffer.lock().unwrap().extend_from_slice(&bs); - Ok(()) - } - - async fn close(&mut self) -> Result<()> { - Ok(()) - } - } - - #[tokio::test] - async fn test_wrap_writer_roundtrip() { - let key = b"0123456789abcdef"; - let aad_prefix = b"test-aad-prefix!"; - let plaintext = b"Hello from file encryptor!"; - - // Encrypt via the encryptor wrapper - let encryptor = AesGcmFileEncryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let buffer = std::sync::Arc::new(std::sync::Mutex::new(Vec::new())); - let mut writer = encryptor.wrap_writer(Box::new(MemoryFileWrite { - buffer: buffer.clone(), - })); - writer.write(Bytes::from(plaintext.to_vec())).await.unwrap(); - writer.close().await.unwrap(); - let encrypted = buffer.lock().unwrap().clone(); - let encrypted_len = encrypted.len() as u64; - - // Decrypt via the decryptor wrapper - let decryptor = AesGcmFileDecryptor::new(key.as_slice(), aad_prefix.as_slice()).unwrap(); - let reader = decryptor - .wrap_reader( - Box::new(MemoryFileRead(Bytes::from(encrypted))), - encrypted_len, - ) - .unwrap(); - - let result = reader.read(0..plaintext.len() as u64).await.unwrap(); - assert_eq!(&result[..], plaintext); - } - - #[tokio::test] - async fn test_invalid_key_length() { - let result = AesGcmFileEncryptor::new(b"bad-key", b"aad".as_slice()); - assert!(result.is_err()); - } -} diff --git a/crates/iceberg/src/encryption/io.rs b/crates/iceberg/src/encryption/io.rs new file mode 100644 index 0000000000..c3d81dd850 --- /dev/null +++ b/crates/iceberg/src/encryption/io.rs @@ -0,0 +1,220 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encrypted file wrappers for InputFile / OutputFile. + +use std::sync::Arc; + +use bytes::Bytes; + +use super::crypto::{AesGcmCipher, SecureKey}; +use super::key_metadata::StandardKeyMetadata; +use super::stream::{AesGcmFileRead, AesGcmFileWrite}; +use crate::Result; +use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; + +/// An AGS1 stream-encrypted input file wrapping a plain [`InputFile`]. +/// +/// Transparently decrypts on read. +pub struct EncryptedInputFile { + inner: InputFile, + key_metadata: StandardKeyMetadata, +} + +impl EncryptedInputFile { + /// Creates a new encrypted input file. + pub fn new(inner: InputFile, key_metadata: StandardKeyMetadata) -> Self { + Self { + inner, + key_metadata, + } + } + + /// Absolute path of the file. + pub fn location(&self) -> &str { + self.inner.location() + } + + /// Check if file exists. + pub async fn exists(&self) -> Result { + self.inner.exists().await + } + + /// Fetch and returns metadata of file. + /// + /// The returned size is the **plaintext** size. + pub async fn metadata(&self) -> Result { + let raw_meta = self.inner.metadata().await?; + let plaintext_size = AesGcmFileRead::calculate_plaintext_length(raw_meta.size)?; + Ok(FileMetadata { + size: plaintext_size, + }) + } + + /// Read and returns whole content of file (decrypted plaintext). + pub async fn read(&self) -> Result { + let meta = self.metadata().await?; + let reader = self.reader().await?; + reader.read(0..meta.size).await + } + + /// Creates a reader that transparently decrypts on each read. + pub async fn reader(&self) -> Result> { + let raw_meta = self.inner.metadata().await?; + let raw_reader = self.inner.reader().await?; + let cipher = build_cipher(&self.key_metadata)?; + let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into(); + let decrypting = AesGcmFileRead::new(raw_reader, cipher, aad_prefix, raw_meta.size)?; + Ok(Box::new(decrypting)) + } + + /// Returns a reference to the file's key metadata. + pub fn key_metadata(&self) -> &StandardKeyMetadata { + &self.key_metadata + } + + /// Consumes self and returns the underlying plain input file. + pub fn into_inner(self) -> InputFile { + self.inner + } +} + +impl std::fmt::Debug for EncryptedInputFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EncryptedInputFile") + .field("path", &self.inner.location()) + .finish_non_exhaustive() + } +} + +/// An AGS1 stream-encrypted output file wrapping a plain [`OutputFile`]. +/// +/// Transparently encrypts on write. +pub struct EncryptedOutputFile { + inner: OutputFile, + key_metadata: StandardKeyMetadata, +} + +impl EncryptedOutputFile { + /// Creates a new encrypted output file. + pub fn new(inner: OutputFile, key_metadata: StandardKeyMetadata) -> Self { + Self { + inner, + key_metadata, + } + } + + /// Returns a reference to the file's key metadata. + pub fn key_metadata(&self) -> &StandardKeyMetadata { + &self.key_metadata + } + + /// Absolute path of the file. + pub fn location(&self) -> &str { + self.inner.location() + } + + /// Creates a writer that transparently encrypts on each write. + pub async fn writer(&self) -> Result> { + let raw_writer = self.inner.writer().await?; + let cipher = build_cipher(&self.key_metadata)?; + let aad_prefix: Box<[u8]> = self.key_metadata.aad_prefix().unwrap_or_default().into(); + Ok(Box::new(AesGcmFileWrite::new( + raw_writer, cipher, aad_prefix, + ))) + } + + /// Write bytes to file (transparently encrypted). + pub async fn write(&self, bs: Bytes) -> Result<()> { + let mut writer = self.writer().await?; + writer.write(bs).await?; + writer.close().await + } + + /// Deletes the underlying file. + pub async fn delete(&self) -> Result<()> { + self.inner.delete().await + } + + /// Consumes self and returns the underlying plain output file. + pub fn into_inner(self) -> OutputFile { + self.inner + } +} + +impl std::fmt::Debug for EncryptedOutputFile { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("EncryptedOutputFile") + .field("path", &self.inner.location()) + .finish_non_exhaustive() + } +} + +fn build_cipher(metadata: &StandardKeyMetadata) -> Result> { + let key = SecureKey::new(metadata.encryption_key().as_bytes())?; + Ok(Arc::new(AesGcmCipher::new(key))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIO; + + fn key_metadata() -> StandardKeyMetadata { + StandardKeyMetadata::new(b"0123456789abcdef").with_aad_prefix(b"test-aad-prefix!") + } + + #[tokio::test] + async fn test_write_read_roundtrip() { + let fileio = FileIO::new_with_memory(); + let path = "memory:///test/io_roundtrip.bin"; + let plaintext = b"Hello from EncryptedInputFile/EncryptedOutputFile!"; + + let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata()); + output.write(Bytes::from(plaintext.to_vec())).await.unwrap(); + + let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), key_metadata()); + let content = input.read().await.unwrap(); + assert_eq!(&content[..], plaintext); + } + + #[tokio::test] + async fn test_metadata_returns_plaintext_size() { + let fileio = FileIO::new_with_memory(); + let path = "memory:///test/io_metadata.bin"; + let plaintext = b"some bytes to measure"; + + let output = EncryptedOutputFile::new(fileio.new_output(path).unwrap(), key_metadata()); + output.write(Bytes::from(plaintext.to_vec())).await.unwrap(); + + let raw_size = fileio + .new_input(path) + .unwrap() + .metadata() + .await + .unwrap() + .size; + assert!( + raw_size > plaintext.len() as u64, + "encrypted file should be larger than plaintext (header + nonce + tag)" + ); + + let input = EncryptedInputFile::new(fileio.new_input(path).unwrap(), key_metadata()); + let meta = input.metadata().await.unwrap(); + assert_eq!(meta.size, plaintext.len() as u64); + } +} diff --git a/crates/iceberg/src/encryption/manager.rs b/crates/iceberg/src/encryption/manager.rs new file mode 100644 index 0000000000..b70ce1cc71 --- /dev/null +++ b/crates/iceberg/src/encryption/manager.rs @@ -0,0 +1,634 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Encryption manager for file-level encryption and two-layer envelope key management. +//! +//! [`EncryptionManager`] provides file-level `decrypt` / `encrypt` +//! operations matching Java's `org.apache.iceberg.encryption.EncryptionManager`, +//! using envelope encryption: +//! - A master key (in KMS) wraps a Key Encryption Key (KEK) +//! - The KEK wraps Data Encryption Keys (DEKs) locally + +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use std::time::Duration; + +use aes_gcm::aead::OsRng; +use aes_gcm::aead::rand_core::RngCore; +use chrono::Utc; +use moka::future::Cache; +use uuid::Uuid; + +const MILLIS_IN_DAY: i64 = 24 * 60 * 60 * 1000; + +use super::crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; +use super::io::EncryptedOutputFile; +use super::key_metadata::StandardKeyMetadata; +use super::kms::KeyManagementClient; +use crate::io::OutputFile; +use crate::spec::EncryptedKey; +use crate::{Error, ErrorKind, Result}; + +/// Property key for the KEK creation timestamp (milliseconds since epoch). +/// Matches Java's `StandardEncryptionManager.KEY_TIMESTAMP`. +pub const KEK_CREATED_AT_PROPERTY: &str = "KEY_TIMESTAMP"; + +/// Default KEK lifespan in days, per NIST SP 800-57. +const DEFAULT_KEK_LIFESPAN_DAYS: i64 = 730; + +/// Default cache TTL for unwrapped KEKs. +const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(3600); + +/// Default AAD prefix length in bytes. +/// Matches Java's `TableProperties.ENCRYPTION_AAD_LENGTH_DEFAULT`. +const AAD_PREFIX_LENGTH: usize = 16; + +/// File-level encryption manager using two-layer envelope encryption. +/// +/// Uses an async cache for unwrapped KEK bytes to avoid repeated KMS calls. +#[derive(typed_builder::TypedBuilder)] +#[builder(mutators( + /// Add an encryption key (KEK or wrapped key metadata entry). + pub fn add_encryption_key(&mut self, key: EncryptedKey) { + self.encryption_keys.insert(key.key_id().to_string(), key); + } + /// Set all encryption keys from table metadata. + pub fn encryption_keys(&mut self, keys: HashMap) { + self.encryption_keys = keys; + } +))] +pub struct EncryptionManager { + kms_client: Arc, + #[builder( + default = Cache::builder().time_to_live(DEFAULT_CACHE_TTL).build(), + setter(skip) + )] + kek_cache: Cache, + /// AES key size for DEK generation. Defaults to 128-bit. + #[builder(default = AesKeySize::default())] + key_size: AesKeySize, + /// Master key ID from table property `encryption.key-id`. + #[builder(setter(into))] + table_key_id: String, + /// All encryption keys from table metadata (KEKs and wrapped key metadata entries). + #[builder(default, via_mutators)] + encryption_keys: HashMap, +} + +impl fmt::Debug for EncryptionManager { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EncryptionManager") + .field("key_size", &self.key_size) + .field("table_key_id", &self.table_key_id) + .finish_non_exhaustive() + } +} + +impl EncryptionManager { + /// Encrypt a file with AGS1 stream encryption. + /// + /// Returns an [`EncryptedOutputFile`] that transparently encrypts on + /// write, along with key metadata for later decryption. + pub fn encrypt(&self, raw_output: OutputFile) -> EncryptedOutputFile { + let dek = SecureKey::generate(self.key_size); + let aad_prefix = Self::generate_aad_prefix(); + let metadata = StandardKeyMetadata::new(dek.as_bytes()).with_aad_prefix(&aad_prefix); + EncryptedOutputFile::new(raw_output, metadata) + } + + /// Wrap key metadata bytes with a KEK for storage in table metadata. + /// + /// Returns `(wrapped_entry, optional_new_kek)`. The wrapped entry + /// contains the key metadata encrypted by the KEK, and should be stored + /// in `TableMetadata.encryption_keys`. The optional second element is a + /// newly created KEK — present only when no active KEK existed (first + /// write) or the existing KEK expired (rotation). When `Some`, the + /// caller must also persist this KEK in table metadata so that future + /// `unwrap_key_metadata` calls can find it. + pub async fn wrap_key_metadata( + &self, + key_metadata: &[u8], + ) -> Result<(EncryptedKey, Option)> { + let (kek, new_kek) = match self.find_active_kek(&self.encryption_keys) { + Some(existing) => (existing.clone(), None), + None => { + let created = self.create_kek().await?; + let cloned = created.clone(); + (created, Some(cloned)) + } + }; + + let kek_bytes = self.unwrap_kek(&kek).await?; + + // Use the KEK timestamp as AAD to prevent timestamp tampering attacks. + let aad = Self::kek_timestamp_aad(&kek)?; + let wrapped_metadata = self.wrap_dek_with_kek(key_metadata, &kek_bytes, Some(aad))?; + + let wrapped_key = EncryptedKey::builder() + .key_id(Uuid::new_v4().to_string()) + .encrypted_key_metadata(wrapped_metadata) + .encrypted_by_id(kek.key_id()) + .build(); + + Ok((wrapped_key, new_kek)) + } + + /// Unwrap key metadata that was KEK-wrapped and stored in table metadata. + /// + /// Given an `EncryptedKey` entry (from a manifest list or snapshot) and + /// the full map of encryption keys from `TableMetadata`, returns the + /// unwrapped key metadata bytes (e.g. serialized `StandardKeyMetadata`). + pub async fn unwrap_key_metadata( + &self, + encrypted_key: &EncryptedKey, + encryption_keys: &HashMap, + ) -> Result> { + let kek_key_id = encrypted_key.encrypted_by_id().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "EncryptedKey '{}' has no encrypted_by_id", + encrypted_key.key_id() + ), + ) + })?; + + self.decrypt_dek( + kek_key_id, + encrypted_key.encrypted_key_metadata(), + encryption_keys, + ) + .await + } + + /// Create a new KEK, wrapped by the table's master key. + async fn create_kek(&self) -> Result { + let (plaintext_kek, wrapped_kek) = if self.kms_client.supports_key_generation() { + let result = self.kms_client.generate_key(&self.table_key_id).await?; + (result.key().clone(), result.wrapped_key().to_vec()) + } else { + let plaintext_key = SecureKey::generate(self.key_size); + let wrapped = self + .kms_client + .wrap_key(plaintext_key.as_bytes(), &self.table_key_id) + .await?; + + (SensitiveBytes::new(plaintext_key.as_bytes()), wrapped) + }; + + let key_id = Uuid::new_v4().to_string(); + let now_ms = Utc::now().timestamp_millis(); + + let mut properties = HashMap::new(); + properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), now_ms.to_string()); + + self.kek_cache.insert(key_id.clone(), plaintext_kek).await; + + Ok(EncryptedKey::builder() + .key_id(key_id) + .encrypted_key_metadata(wrapped_kek) + .encrypted_by_id(&self.table_key_id) + .properties(properties) + .build()) + } + + /// Check whether a KEK has exceeded its configured lifespan (730 days per NIST SP 800-57). + fn is_kek_expired(&self, kek: &EncryptedKey) -> bool { + let created_at_ms = match kek + .properties() + .get(KEK_CREATED_AT_PROPERTY) + .and_then(|ts| ts.parse::().ok()) + { + Some(ts) => ts, + None => return true, // No timestamp -> treat as expired + }; + + let now_ms = Utc::now().timestamp_millis(); + let lifespan_ms = DEFAULT_KEK_LIFESPAN_DAYS * MILLIS_IN_DAY; + (now_ms - created_at_ms) >= lifespan_ms + } + + /// Find the latest non-expired KEK for the table's master key. + fn find_active_kek<'a>( + &self, + encryption_keys: &'a HashMap, + ) -> Option<&'a EncryptedKey> { + encryption_keys + .values() + .filter(|kek| { + kek.encrypted_by_id() + .map(|id| id == self.table_key_id) + .unwrap_or(false) + && !self.is_kek_expired(kek) + }) + .max_by_key(|kek| { + kek.properties() + .get(KEK_CREATED_AT_PROPERTY) + .and_then(|ts| ts.parse::().ok()) + .unwrap_or(0) + }) + } + + /// Unwrap a KEK using the KMS, with caching to avoid repeated calls. + async fn unwrap_kek(&self, kek: &EncryptedKey) -> Result { + let cache_key = kek.key_id().to_string(); + + if let Some(cached) = self.kek_cache.get(&cache_key).await { + return Ok(cached); + } + + let master_key_id = kek.encrypted_by_id().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("KEK '{}' has no encrypted_by_id", kek.key_id()), + ) + })?; + + let plaintext = self + .kms_client + .unwrap_key(kek.encrypted_key_metadata(), master_key_id) + .await?; + + self.kek_cache.insert(cache_key, plaintext.clone()).await; + + Ok(plaintext) + } + + /// Decrypt a wrapped DEK using the KEK identified by `kek_key_id`. + async fn decrypt_dek( + &self, + kek_key_id: &str, + wrapped_dek: &[u8], + encryption_keys: &HashMap, + ) -> Result> { + let kek = encryption_keys.get(kek_key_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("KEK not found in encryption keys: {kek_key_id}"), + ) + })?; + + // KEK timestamp as AAD prevents timestamp tampering. + let aad = Self::kek_timestamp_aad(kek)?; + + let kek_bytes = self.unwrap_kek(kek).await?; + self.unwrap_dek_with_kek(wrapped_dek, &kek_bytes, Some(aad)) + .map_err(|e| { + Error::new( + e.kind(), + format!("Failed to unwrap key metadata with KEK '{kek_key_id}'"), + ) + .with_source(e) + }) + } + + /// Extract the KEK timestamp for use as AAD. Returns an error if missing. + fn kek_timestamp_aad(kek: &EncryptedKey) -> Result<&[u8]> { + kek.properties() + .get(KEK_CREATED_AT_PROPERTY) + .map(|ts| ts.as_bytes()) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "KEK '{}' is missing required '{}' property", + kek.key_id(), + KEK_CREATED_AT_PROPERTY + ), + ) + }) + } + + /// Generate a random AAD prefix for file encryption. + fn generate_aad_prefix() -> Box<[u8]> { + let mut prefix = vec![0u8; AAD_PREFIX_LENGTH]; + OsRng.fill_bytes(&mut prefix); + prefix.into_boxed_slice() + } + + /// Wrap a DEK with a KEK using local AES-GCM. + fn wrap_dek_with_kek( + &self, + dek: &[u8], + kek: &SensitiveBytes, + aad: Option<&[u8]>, + ) -> Result> { + let key = SecureKey::try_from(kek.clone())?; + let cipher = AesGcmCipher::new(key); + cipher.encrypt(dek, aad) + } + + /// Unwrap a DEK with a KEK using local AES-GCM. + fn unwrap_dek_with_kek( + &self, + wrapped_dek: &[u8], + kek: &SensitiveBytes, + aad: Option<&[u8]>, + ) -> Result> { + let key = SecureKey::try_from(kek.clone())?; + let cipher = AesGcmCipher::new(key); + cipher.decrypt(wrapped_dek, aad) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::encryption::EncryptedInputFile; + use crate::encryption::kms::MemoryKeyManagementClient; + + fn create_test_kms() -> Arc { + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + Arc::new(kms) + } + + fn create_test_manager() -> EncryptionManager { + EncryptionManager::builder() + .kms_client(create_test_kms()) + .table_key_id("master-1") + .build() + } + + #[tokio::test] + async fn test_create_kek() { + let mgr = create_test_manager(); + let kek = mgr.create_kek().await.unwrap(); + + assert!(!kek.key_id().is_empty()); + assert!(!kek.encrypted_key_metadata().is_empty()); + assert_eq!(kek.encrypted_by_id(), Some("master-1")); + assert!(kek.properties().contains_key(KEK_CREATED_AT_PROPERTY)); + } + + #[tokio::test] + async fn test_wrap_unwrap_key_metadata_roundtrip() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + let plaintext = b"some-key-metadata"; + let (entry, new_kek) = mgr.wrap_key_metadata(plaintext).await.unwrap(); + + // First wrap should create a new KEK + assert!(new_kek.is_some()); + let kek = new_kek.unwrap(); + + // Build a manager with the KEK so we can unwrap + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .add_encryption_key(kek.clone()) + .build(); + + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek); + let decrypted = mgr + .unwrap_key_metadata(&entry, &encryption_keys) + .await + .unwrap(); + assert_eq!(decrypted, plaintext); + } + + #[tokio::test] + async fn test_kek_reuse_when_not_expired() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // First wrap creates a new KEK + let (_, new_kek) = mgr.wrap_key_metadata(b"data-1").await.unwrap(); + let kek = new_kek.unwrap(); + + // Build manager with the active KEK (same KMS to unwrap) + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .add_encryption_key(kek.clone()) + .build(); + + // Second wrap should reuse the existing KEK (no new KEK) + let (entry, new_kek2) = mgr.wrap_key_metadata(b"data-2").await.unwrap(); + assert!(new_kek2.is_none()); + assert_eq!(entry.encrypted_by_id(), Some(kek.key_id())); + } + + #[tokio::test] + async fn test_kek_rotation_when_expired() { + let kms = create_test_kms(); + + // Create a KEK with a timestamp 3 years in the past (exceeds 730-day lifespan) + let three_years_ago_ms = Utc::now().timestamp_millis() - (3 * 365 * MILLIS_IN_DAY); + let mut properties = HashMap::new(); + properties.insert( + KEK_CREATED_AT_PROPERTY.to_string(), + three_years_ago_ms.to_string(), + ); + + // Wrap a real KEK so unwrap works if needed + let kek_key = SecureKey::generate(AesKeySize::Bits128); + let wrapped = kms.wrap_key(kek_key.as_bytes(), "master-1").await.unwrap(); + + let old_kek = EncryptedKey::builder() + .key_id("expired-kek") + .encrypted_key_metadata(wrapped) + .encrypted_by_id("master-1") + .properties(properties) + .build(); + + // Build manager with the expired KEK + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .add_encryption_key(old_kek.clone()) + .build(); + + // Wrap should rotate to a new KEK since the existing one is expired + let (_, new_kek) = mgr.wrap_key_metadata(b"data").await.unwrap(); + assert!(new_kek.is_some()); + assert_ne!(new_kek.unwrap().key_id(), old_kek.key_id()); + } + + #[tokio::test] + async fn test_is_kek_expired_no_timestamp() { + let mgr = create_test_manager(); + + // KEK without a created-at timestamp -> treated as expired + let kek = EncryptedKey::builder() + .key_id("no-ts") + .encrypted_key_metadata(vec![0u8; 32]) + .build(); + + assert!(mgr.is_kek_expired(&kek)); + } + + #[tokio::test] + async fn test_decrypt_dek_with_unknown_kek() { + let mgr = create_test_manager(); + + let encryption_keys: HashMap = HashMap::new(); + let result = mgr + .decrypt_dek("nonexistent-kek", &[1, 2, 3], &encryption_keys) + .await; + + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_kek_cache_hit() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // Create KEK (caches the plaintext KEK) + let kek = mgr.create_kek().await.unwrap(); + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .add_encryption_key(kek.clone()) + .build(); + + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek.key_id().to_string(), kek); + + // Wrap key metadata (unwraps KEK -- should hit cache from create_kek) + let (entry, _) = mgr.wrap_key_metadata(b"test-data").await.unwrap(); + + // Unwrap key metadata (unwraps KEK again -- should hit cache) + let decrypted = mgr + .unwrap_key_metadata(&entry, &encryption_keys) + .await + .unwrap(); + assert_eq!(decrypted, b"test-data"); + } + + #[tokio::test] + async fn test_unwrap_fails_when_kek_missing_timestamp() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // Wrap some metadata to get a valid encrypted entry + let (entry, new_kek) = mgr.wrap_key_metadata(b"secret").await.unwrap(); + let kek = new_kek.unwrap(); + + // Re-create the KEK without its KEY_TIMESTAMP property + let kek_no_ts = EncryptedKey::builder() + .key_id(kek.key_id()) + .encrypted_key_metadata(kek.encrypted_key_metadata()) + .encrypted_by_id(kek.encrypted_by_id().unwrap()) + .build(); + + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(kek_no_ts.key_id().to_string(), kek_no_ts); + + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .build(); + + let result = mgr.unwrap_key_metadata(&entry, &encryption_keys).await; + assert!(result.is_err()); + let err = result.unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.to_string().contains(KEK_CREATED_AT_PROPERTY), + "error should mention the missing property: {}", + err + ); + } + + #[tokio::test] + async fn test_unwrap_fails_when_kek_timestamp_tampered() { + let kms = create_test_kms(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::clone(&kms)) + .table_key_id("master-1") + .build(); + + // Wrap metadata normally + let (entry, new_kek) = mgr.wrap_key_metadata(b"secret").await.unwrap(); + let kek = new_kek.unwrap(); + + // Tamper with the KEK timestamp (change the AAD) + let mut tampered_properties = kek.properties().clone(); + tampered_properties.insert(KEK_CREATED_AT_PROPERTY.to_string(), "9999999".to_string()); + + let tampered_kek = EncryptedKey::builder() + .key_id(kek.key_id()) + .encrypted_key_metadata(kek.encrypted_key_metadata()) + .encrypted_by_id(kek.encrypted_by_id().unwrap()) + .properties(tampered_properties) + .build(); + + let mut encryption_keys = HashMap::new(); + encryption_keys.insert(tampered_kek.key_id().to_string(), tampered_kek); + + let mgr = EncryptionManager::builder() + .kms_client(kms) + .table_key_id("master-1") + .build(); + + // Unwrap should fail because the AAD (timestamp) doesn't match what was used to wrap + let result = mgr.unwrap_key_metadata(&entry, &encryption_keys).await; + assert!( + result.is_err(), + "tampered timestamp should cause decryption failure" + ); + } + + #[tokio::test] + async fn test_encrypt_decrypt_roundtrip() { + use crate::io::FileIO; + + let io = FileIO::new_with_memory(); + let path = "memory:///test/encrypt_roundtrip.bin"; + + let kms = MemoryKeyManagementClient::new(); + kms.add_master_key("master-1").unwrap(); + let mgr = EncryptionManager::builder() + .kms_client(Arc::new(kms) as Arc) + .table_key_id("master-1") + .build(); + + let output = io.new_output(path).unwrap(); + let encrypted_output = mgr.encrypt(output); + + let plaintext = b"Hello, encrypted Iceberg round-trip!"; + let serialized_metadata = encrypted_output.key_metadata().encode().unwrap(); + encrypted_output + .write(bytes::Bytes::from(plaintext.to_vec())) + .await + .unwrap(); + + let input = io.new_input(path).unwrap(); + let parsed_metadata = StandardKeyMetadata::decode(&serialized_metadata).unwrap(); + let decrypted_file = EncryptedInputFile::new(input, parsed_metadata); + + let content = decrypted_file.read().await.unwrap(); + assert_eq!(&content[..], plaintext); + } +} diff --git a/crates/iceberg/src/encryption/mod.rs b/crates/iceberg/src/encryption/mod.rs index 773d781d6d..12ee76e5e0 100644 --- a/crates/iceberg/src/encryption/mod.rs +++ b/crates/iceberg/src/encryption/mod.rs @@ -21,15 +21,15 @@ //! for encrypting and decrypting data in Iceberg tables. mod crypto; -mod file_decryptor; -mod file_encryptor; +pub(crate) mod io; pub(crate) mod key_metadata; pub mod kms; +mod manager; mod stream; pub use crypto::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; -pub use file_decryptor::AesGcmFileDecryptor; -pub use file_encryptor::AesGcmFileEncryptor; +pub use io::{EncryptedInputFile, EncryptedOutputFile}; pub use key_metadata::StandardKeyMetadata; pub use kms::{GeneratedKey, KeyManagementClient}; +pub use manager::EncryptionManager; pub use stream::{AesGcmFileRead, AesGcmFileWrite};