-
Notifications
You must be signed in to change notification settings - Fork 464
fix(puffin): implement LZ4 footer compression #2438
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ use std::io::{Read, Write}; | |
| use flate2::Compression; | ||
| use flate2::read::GzDecoder; | ||
| use flate2::write::GzEncoder; | ||
| use lz4_flex::frame::{FrameDecoder, FrameEncoder, FrameInfo}; | ||
| use serde::{Deserialize, Deserializer, Serialize, Serializer}; | ||
|
|
||
| use crate::{Error, ErrorKind, Result}; | ||
|
|
@@ -118,10 +119,12 @@ impl CompressionCodec { | |
| pub(crate) fn decompress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { | ||
| match self { | ||
| CompressionCodec::None => Ok(bytes), | ||
| CompressionCodec::Lz4 => Err(Error::new( | ||
| ErrorKind::FeatureUnsupported, | ||
| "LZ4 decompression is not supported currently", | ||
| )), | ||
| CompressionCodec::Lz4 => { | ||
| let mut decoder = FrameDecoder::new(&bytes[..]); | ||
| let mut decompressed = Vec::new(); | ||
| decoder.read_to_end(&mut decompressed)?; | ||
| Ok(decompressed) | ||
| } | ||
| CompressionCodec::Zstd(_) => Ok(zstd::stream::decode_all(&bytes[..])?), | ||
| CompressionCodec::Gzip(_) => { | ||
| let mut decoder = GzDecoder::new(&bytes[..]); | ||
|
|
@@ -139,10 +142,17 @@ impl CompressionCodec { | |
| pub(crate) fn compress(&self, bytes: Vec<u8>) -> Result<Vec<u8>> { | ||
| match self { | ||
| CompressionCodec::None => Ok(bytes), | ||
| CompressionCodec::Lz4 => Err(Error::new( | ||
| ErrorKind::FeatureUnsupported, | ||
| "LZ4 compression is not supported currently", | ||
| )), | ||
| CompressionCodec::Lz4 => { | ||
| // The Puffin spec requires "LZ4 single compression frame with content size | ||
| // present" for footer payloads, so we set content_size on the frame header. | ||
| // See https://iceberg.apache.org/puffin-spec/#footer-payload | ||
| let frame_info = FrameInfo::new().content_size(Some(bytes.len() as u64)); | ||
| let mut encoder = FrameEncoder::with_frame_info(frame_info, Vec::new()); | ||
| encoder.write_all(&bytes)?; | ||
| encoder.finish().map_err(|e| { | ||
| Error::new(ErrorKind::Unexpected, "Failed to finish LZ4 frame").with_source(e) | ||
| }) | ||
| } | ||
| CompressionCodec::Zstd(level) => { | ||
| let writer = Vec::<u8>::new(); | ||
| let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?; | ||
|
|
@@ -173,7 +183,10 @@ impl CompressionCodec { | |
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns an error for Lz4 and Zstd as they are not fully supported. | ||
| /// Returns an error for codecs without a canonical file-extension convention | ||
| /// (Lz4, Zstd, Snappy). LZ4 is fully supported for compression and decompression, | ||
| /// but is used in framed form (e.g., inside Puffin footers) where no separate | ||
| /// file suffix is required. | ||
| pub fn suffix(&self) -> Result<&'static str> { | ||
| match self { | ||
| CompressionCodec::None => Ok(""), | ||
|
|
@@ -208,6 +221,7 @@ mod tests { | |
| let bytes_vec = [0_u8; 100].to_vec(); | ||
|
|
||
| let compression_codecs = [ | ||
| CompressionCodec::Lz4, | ||
| CompressionCodec::zstd_default(), | ||
| CompressionCodec::gzip_default(), | ||
| ]; | ||
|
|
@@ -220,25 +234,46 @@ mod tests { | |
| } | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_compression_codec_lz4_roundtrip() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this test give us much over the |
||
| let codec = CompressionCodec::Lz4; | ||
|
|
||
| // Empty input must round-trip cleanly. | ||
| let empty: Vec<u8> = vec![]; | ||
| let compressed_empty = codec.compress(empty.clone()).unwrap(); | ||
| assert_eq!(codec.decompress(compressed_empty).unwrap(), empty); | ||
|
|
||
| // Mixed-byte payload (less compressible than all-zeros) round-trips. | ||
| let payload: Vec<u8> = (0..10_000).map(|i| (i % 251) as u8).collect(); | ||
| let compressed = codec.compress(payload.clone()).unwrap(); | ||
| assert_eq!(codec.decompress(compressed).unwrap(), payload); | ||
|
|
||
| // Frame must begin with the LZ4 frame magic number 0x184D2204 (little-endian) | ||
| // per https://github.com/lz4/lz4/blob/dev/doc/lz4_Frame_format.md. | ||
| let highly_compressible = vec![0u8; 10_000]; | ||
| let compressed = codec.compress(highly_compressible).unwrap(); | ||
| assert_eq!(&compressed[..4], &[0x04, 0x22, 0x4D, 0x18]); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_compression_codec_unsupported() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit: maybe just rename this test to something like |
||
| let unsupported_codecs = [ | ||
| (CompressionCodec::Lz4, "LZ4"), | ||
| (CompressionCodec::Snappy, "Snappy"), | ||
| ]; | ||
| let bytes_vec = [0_u8; 100].to_vec(); | ||
|
|
||
| for (codec, name) in unsupported_codecs { | ||
| assert_eq!( | ||
| codec.compress(bytes_vec.clone()).unwrap_err().to_string(), | ||
| format!("FeatureUnsupported => {name} compression is not supported currently"), | ||
| ); | ||
| assert_eq!( | ||
| CompressionCodec::Snappy | ||
| .compress(bytes_vec.clone()) | ||
| .unwrap_err() | ||
| .to_string(), | ||
| "FeatureUnsupported => Snappy compression is not supported currently", | ||
| ); | ||
|
|
||
| assert_eq!( | ||
| codec.decompress(bytes_vec.clone()).unwrap_err().to_string(), | ||
| format!("FeatureUnsupported => {name} decompression is not supported currently"), | ||
| ); | ||
| } | ||
| assert_eq!( | ||
| CompressionCodec::Snappy | ||
| .decompress(bytes_vec) | ||
| .unwrap_err() | ||
| .to_string(), | ||
| "FeatureUnsupported => Snappy decompression is not supported currently", | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -273,13 +273,65 @@ mod tests { | |
| let blobs = vec![blob_0(), blob_1()]; | ||
| let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4); | ||
|
|
||
| assert_eq!( | ||
| write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) | ||
| .await | ||
| .unwrap_err() | ||
| .to_string(), | ||
| "FeatureUnsupported => LZ4 compression is not supported currently" | ||
| ); | ||
| let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) | ||
| .await | ||
| .unwrap() | ||
| .to_input_file(); | ||
|
|
||
| // Blob round-trip must yield the original bytes after LZ4 framed decompression. | ||
| assert_eq!(read_all_blobs_from_puffin_file(input_file).await, blobs); | ||
| } | ||
|
|
||
| /// Regression for https://github.com/apache/iceberg-rust/issues/2419 — | ||
| /// the PuffinWriter previously claimed LZ4 compression on the footer (by setting | ||
| /// the FooterPayloadCompressed flag) but wrote the raw uncompressed JSON, which | ||
| /// produced unreadable files. The writer now actually LZ4-encodes the footer when | ||
| /// compress_footer=true, and the reader round-trips it. | ||
|
Comment on lines
+285
to
+289
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this actually true? Or did it just fail before? I also personally wouldn't describe this as a regression test, since this feature just wasn't supported before. WDYT? |
||
| #[tokio::test] | ||
| async fn test_compress_footer_lz4_round_trips() { | ||
| let temp_dir = TempDir::new().unwrap(); | ||
| let file_io = FileIO::new_with_fs(); | ||
| let path = temp_dir.path().join("compressed_footer.bin"); | ||
| let output_file = file_io.new_output(path.to_str().unwrap()).unwrap(); | ||
|
|
||
| // compress_footer=true sets the footer codec to LZ4. | ||
| let mut writer = PuffinWriter::new(&output_file, file_properties(), true) | ||
| .await | ||
| .unwrap(); | ||
| writer.add(blob_0(), CompressionCodec::None).await.unwrap(); | ||
| writer.close().await.unwrap(); | ||
|
|
||
| // Reader must be able to LZ4-decompress the footer and recover both the | ||
| // file metadata and the blob payload. | ||
| let input_file = output_file.to_input_file(); | ||
| let metadata = FileMetadata::read(&input_file).await.unwrap(); | ||
| assert_eq!(metadata.properties, file_properties()); | ||
| assert_eq!(metadata.blobs.len(), 1); | ||
| assert_eq!(read_all_blobs_from_puffin_file(input_file).await, vec![ | ||
| blob_0() | ||
| ]); | ||
| } | ||
|
|
||
| /// Direct adaptation of the reproducer from | ||
| /// https://github.com/apache/iceberg-rust/issues/2419 — close must succeed when | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looking at the rest of the codebase, we don't seem to typically reference issues. I think the test name is descriptive enough? WDYT? |
||
| /// compress_footer=true even with no blobs written. | ||
| #[tokio::test] | ||
| async fn test_compress_empty_footer_lz4_succeeds() { | ||
| let temp_dir = TempDir::new().unwrap(); | ||
| let file_io = FileIO::new_with_fs(); | ||
| let path = temp_dir.path().join("compressed_empty_footer.bin"); | ||
| let output_file = file_io.new_output(path.to_str().unwrap()).unwrap(); | ||
|
|
||
| let writer = PuffinWriter::new(&output_file, HashMap::new(), true) | ||
| .await | ||
| .unwrap(); | ||
| writer.close().await.unwrap(); | ||
|
|
||
| // The compressed empty footer must still parse back to an empty FileMetadata. | ||
| let input_file = output_file.to_input_file(); | ||
| let metadata = FileMetadata::read(&input_file).await.unwrap(); | ||
| assert!(metadata.blobs.is_empty()); | ||
| assert!(metadata.properties.is_empty()); | ||
| } | ||
|
|
||
| async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The lz4_flex crate is MIT licensed (https://github.com/PSeitz/lz4_flex/blob/main/LICENSE), which is good.