diff --git a/crates/async-compression/Cargo.toml b/crates/async-compression/Cargo.toml index a0d8837f..6e279269 100644 --- a/crates/async-compression/Cargo.toml +++ b/crates/async-compression/Cargo.toml @@ -30,6 +30,7 @@ all-algorithms = [ "xz-parallel", "zlib", "zstd", + "snappy", ] # algorithms @@ -46,6 +47,7 @@ xz2 = ["compression-codecs/xz2", "xz"] zlib = ["compression-codecs/zlib"] zstd = ["compression-codecs/zstd"] zstdmt = ["compression-codecs/zstdmt", "zstd"] +snappy = ["compression-codecs/snappy"] [dependencies] @@ -72,6 +74,7 @@ tokio = { version = "1.38.2", default-features = false, features = [ "macros", "rt-multi-thread", "io-std", + "fs" ] } tokio-util = { version = "0.7", default-features = false, features = ["io"] } @@ -83,6 +86,7 @@ lz4 = "1.28.1" liblzma = "0.4.2" zstd-safe = { version = "7", default-features = false } deflate64 = "0.1.5" +snap = "1" [lints] workspace = true @@ -131,6 +135,10 @@ required-features = ["zstd", "tokio"] name = "zstd-window-size" required-features = ["zstd", "tokio"] +[[test]] +name = "snappy" +required-features = ["snappy"] + [[example]] name = "zlib_tokio_write" required-features = ["zlib", "tokio"] diff --git a/crates/async-compression/src/macros.rs b/crates/async-compression/src/macros.rs index 75af23cc..9eb72ea6 100644 --- a/crates/async-compression/src/macros.rs +++ b/crates/async-compression/src/macros.rs @@ -375,5 +375,19 @@ macro_rules! 
algos { { @dec } ); + algos!(@algo snappy ["snappy"] SnappyDecoder SnappyEncoder <$inner> + { @enc + + pub fn with_quality(inner: $inner, _level: crate::core::Level) -> Self { + Self { + inner: crate::$($mod::)+generic::Encoder::new( + inner, + crate::codecs::SnappyEncoder::new() + ), + } + } + } + { @dec } + ); } } diff --git a/crates/async-compression/tests/proptest.rs b/crates/async-compression/tests/proptest.rs index ba66b1f7..215fc02b 100644 --- a/crates/async-compression/tests/proptest.rs +++ b/crates/async-compression/tests/proptest.rs @@ -142,4 +142,7 @@ mod proptest { #[cfg(feature = "zstd")] tests!(zstd); + + #[cfg(feature = "snappy")] + tests!(snappy); } diff --git a/crates/async-compression/tests/snappy.rs b/crates/async-compression/tests/snappy.rs new file mode 100644 index 00000000..a0d9b8d0 --- /dev/null +++ b/crates/async-compression/tests/snappy.rs @@ -0,0 +1,4 @@ +#[macro_use] +mod utils; + +test_cases!(snappy); diff --git a/crates/async-compression/tests/utils/algos.rs b/crates/async-compression/tests/utils/algos.rs index 634c47dd..4c4f0345 100644 --- a/crates/async-compression/tests/utils/algos.rs +++ b/crates/async-compression/tests/utils/algos.rs @@ -229,6 +229,33 @@ algos! { } } } + + pub mod snappy("snappy", SnappyEncoder, SnappyDecoder) { + pub mod sync { + pub use crate::utils::impls::sync::to_vec; + + pub fn compress(bytes: &[u8]) -> Vec<u8> { + if bytes.is_empty() { + return vec![0xff, 0x06, 0x00, 0x00, b's', b'N', b'a', b'P', b'p', b'Y']; + } + + use std::io::Write; + use snap::write::FrameEncoder; + + let mut output = Vec::new(); + { + let mut encoder = FrameEncoder::new(&mut output); + encoder.write_all(bytes).unwrap(); + } + output + } + + pub fn decompress(bytes: &[u8]) -> Vec<u8> { + use snap::read::FrameDecoder; + to_vec(FrameDecoder::new(bytes)) + } + } + } } macro_rules!
io_algo_parallel { diff --git a/crates/async-compression/tests/utils/test_cases.rs b/crates/async-compression/tests/utils/test_cases.rs index af79c4b1..071f367f 100644 --- a/crates/async-compression/tests/utils/test_cases.rs +++ b/crates/async-compression/tests/utils/test_cases.rs @@ -176,6 +176,7 @@ macro_rules! io_test_cases { #[test] #[ntest::timeout(1000)] + #[cfg(not(feature = "snappy"))] fn trailer() { let mut compressed = sync::compress(&[1, 2, 3, 4, 5, 6]); diff --git a/crates/compression-codecs/Cargo.toml b/crates/compression-codecs/Cargo.toml index bd01d5ba..3638e7ce 100644 --- a/crates/compression-codecs/Cargo.toml +++ b/crates/compression-codecs/Cargo.toml @@ -28,6 +28,7 @@ all-algorithms = [ "zlib", "zstd", "deflate64", + "snappy" ] # algorithms @@ -42,6 +43,7 @@ zlib = ["flate2"] zstd = ["libzstd", "zstd-safe"] zstdmt = ["zstd", "zstd-safe/zstdmt"] deflate64 = ["dep:deflate64"] +snappy = ["snap", "crc32c"] [dependencies] # Workspace dependencies. @@ -56,6 +58,8 @@ lz4 = { version = "1.28.1", optional = true } liblzma = { version = "0.4.5", optional = true } memchr = { version = "2", optional = true } zstd-safe = { version = "7", optional = true, default-features = false } +snap = { version = "1", optional = true, default-features = false } +crc32c = { version = "0.6.8", optional = true, default-features = false } [lints] workspace = true diff --git a/crates/compression-codecs/src/lib.rs b/crates/compression-codecs/src/lib.rs index 32c186a9..ba234bbb 100644 --- a/crates/compression-codecs/src/lib.rs +++ b/crates/compression-codecs/src/lib.rs @@ -22,6 +22,8 @@ pub mod gzip; pub mod lz4; #[cfg(feature = "lzma")] pub mod lzma; +#[cfg(feature = "snappy")] +pub mod snappy; #[cfg(feature = "xz")] pub mod xz; #[cfg(feature = "lzma")] @@ -49,6 +51,8 @@ pub use self::gzip::{GzipDecoder, GzipEncoder}; pub use self::lz4::{Lz4Decoder, Lz4Encoder}; #[cfg(feature = "lzma")] pub use self::lzma::{LzmaDecoder, LzmaEncoder}; +#[cfg(feature = "snappy")] +pub use 
self::snappy::{SnappyDecoder, SnappyEncoder}; #[cfg(feature = "xz")] pub use self::xz::{XzDecoder, XzEncoder}; #[cfg(feature = "lzma")] diff --git a/crates/compression-codecs/src/snappy/decoder.rs b/crates/compression-codecs/src/snappy/decoder.rs new file mode 100644 index 00000000..85cc5b94 --- /dev/null +++ b/crates/compression-codecs/src/snappy/decoder.rs @@ -0,0 +1,270 @@ +use crate::snappy::{ + crc32c_masked, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE, STREAM_DATA_FRAME_SIZE, +}; +use crate::DecodeV2; +use compression_core::util::{PartialBuffer, WriteBuffer}; +use std::io; +use std::num::NonZeroUsize; + +#[derive(Debug)] +pub struct SnappyDecoder { + state: State, + in_buf: PartialBuffer<Vec<u8>>, + out_buf: PartialBuffer<Vec<u8>>, +} + +impl Default for SnappyDecoder { + fn default() -> Self { + Self { + state: State::default(), + in_buf: PartialBuffer::new(Vec::with_capacity(MAX_FRAME_SIZE)), + out_buf: PartialBuffer::new(Vec::with_capacity(MAX_BLOCK_SIZE)), + } + } +} + +impl SnappyDecoder { + pub fn new() -> Self { + Self::default() + } + + fn decode_chunk(&mut self, chunk_type: ChunkType) -> std::io::Result<()> { + let mut expected_sum: PartialBuffer<[u8; 4]> = PartialBuffer::default(); + expected_sum.copy_unwritten_from(&mut self.in_buf); + let expected_sum = u32::from_le_bytes(expected_sum.into_inner()); + + let data = self.in_buf.unwritten(); + + self.out_buf.reset(); + let out_buf = self.out_buf.get_mut(); + out_buf.clear(); + let got_sum = match chunk_type { + ChunkType::Compressed => { + let uncompress_length = snap::raw::decompress_len(data)?; + out_buf.resize(uncompress_length, 0); + let mut decoder = snap::raw::Decoder::new(); + decoder.decompress(data, out_buf)?; + self.state = State::CompressedCopy; + crc32c_masked(out_buf) + } + ChunkType::Uncompressed => { + // Data is uncompressed, so we just need to reset the partial buffer and advance + // past the header + self.in_buf.reset(); + self.in_buf.advance(4); + self.state = State::UncompressedCopy;
+ crc32c_masked(self.in_buf.unwritten()) + } + _ => unreachable!( + "can only decode compressed or uncompressed chunks, not {:?}", + chunk_type + ), + }; + + if expected_sum != got_sum { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "checksum mismatch", + )); + } + + Ok(()) + } +} + +#[derive(Debug)] +enum State { + StreamIdentifier(PartialBuffer<[u8; 4]>), + ChunkHeader(PartialBuffer<[u8; 4]>), + Skipping(NonZeroUsize), + Buffering { + remaining: NonZeroUsize, + chunk_type: ChunkType, + }, + UncompressedCopy, + CompressedCopy, +} + +impl Default for State { + fn default() -> Self { + State::StreamIdentifier(PartialBuffer::new([0; 4])) + } +} + +impl DecodeV2 for SnappyDecoder { + fn reinit(&mut self) -> std::io::Result<()> { + self.state = State::default(); + self.in_buf.get_mut().clear(); + self.in_buf.reset(); + self.out_buf.reset(); + Ok(()) + } + + fn decode( + &mut self, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + ) -> std::io::Result<bool> { + loop { + match &mut self.state { + State::StreamIdentifier(header) => { + header.copy_unwritten_from(input); + if !header.unwritten().is_empty() { + return Ok(false); + } + + let header = FrameHeader::parse(header.written())?; + if matches!(header.chunk_type, ChunkType::Stream) { + let data_frame_length = header.data_frame_length as usize; + if data_frame_length != STREAM_DATA_FRAME_SIZE { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid stream frame data length, expected {}, got {}", + STREAM_DATA_FRAME_SIZE, data_frame_length + ), + )); + } + // We checked above that the stream data frame length is valid and non-zero + self.state = State::Skipping(NonZeroUsize::new(data_frame_length).unwrap()) + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Invalid chunk type, expected Stream, got: {:?}", + header.chunk_type + ), + )); + } + } + State::ChunkHeader(header) => { + header.copy_unwritten_from(input);
+ if !header.unwritten().is_empty() { + return Ok(false); + } + + let header = FrameHeader::parse(header.written())?; + + let Some(data_frame_length) = + NonZeroUsize::new(header.data_frame_length as usize) + else { + self.state = State::ChunkHeader([0u8; 4].into()); + continue; + }; + + match header.chunk_type { + ChunkType::Stream + | ChunkType::ReservedSkippable(_) + | ChunkType::Padding => self.state = State::Skipping(data_frame_length), + ChunkType::Compressed | ChunkType::Uncompressed => { + let in_buf = &mut self.in_buf; + in_buf.get_mut().clear(); + in_buf.reset(); + self.state = State::Buffering { + remaining: data_frame_length, + chunk_type: header.chunk_type, + } + } + ChunkType::ReservedUnskippable(chunk_type) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format!( + "Reserved unskippable chunk type encountered: {}", + chunk_type + ), + )) + } + } + } + State::Skipping(n) => { + let input_len = input.unwritten().len(); + + let n = n.get(); + if input_len < n { + input.advance(input_len); + if let Some(n) = NonZeroUsize::new(n - input_len) { + self.state = State::Skipping(n); + return Ok(false); + } + } + input.advance(n); + self.state = State::ChunkHeader([0u8; 4].into()) + } + State::Buffering { + remaining, + chunk_type, + } => { + let mut rem = remaining.get(); + let input_buf = input.unwritten(); + let boundary = rem.min(input_buf.len()); + let input_buf = &input_buf[..boundary]; + + rem -= input_buf.len(); + + self.in_buf.get_mut().extend_from_slice(input_buf); + input.advance(input_buf.len()); + + if let Some(rem) = NonZeroUsize::new(rem) { + *remaining = rem; + return Ok(false); + } + + // We're done buffering, so let's decode the chunk + let chunk_type = *chunk_type; + self.decode_chunk(chunk_type)?; + } + State::UncompressedCopy => { + let buffer = &mut self.in_buf; + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()) + } else { + return 
Ok(false); + } + } + State::CompressedCopy => { + let buffer = &mut self.out_buf; + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()) + } else { + return Ok(false); + } + } + } + } + } + + fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result<bool> { + match &mut self.state { + State::UncompressedCopy => { + let buffer = &mut self.in_buf; + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()); + Ok(true) + } else { + Ok(false) + } + } + State::CompressedCopy => { + let buffer = &mut self.out_buf; + output.copy_unwritten_from(buffer); + if buffer.unwritten().is_empty() { + self.state = State::ChunkHeader([0u8; 4].into()); + Ok(true) + } else { + Ok(false) + } + } + _ => Ok(true), + } + } + + fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> std::io::Result<bool> { + match &mut self.state { + State::ChunkHeader(header) if header.unwritten().len() == 4 => Ok(true), + _ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)), + } + } +} diff --git a/crates/compression-codecs/src/snappy/encoder.rs b/crates/compression-codecs/src/snappy/encoder.rs new file mode 100644 index 00000000..00ed0c4a --- /dev/null +++ b/crates/compression-codecs/src/snappy/encoder.rs @@ -0,0 +1,180 @@ +use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, MAX_FRAME_SIZE, STREAM_FRAME}; +use crate::EncodeV2; +use compression_core::util::{PartialBuffer, WriteBuffer}; + +const MAX_BLOCK_SIZE: usize = 1 << 16; + +#[derive(Debug)] +pub struct SnappyEncoder { + state: State, + in_buf: PartialBuffer<Vec<u8>>, + out_buf: PartialBuffer<Vec<u8>>, +} + +impl Default for SnappyEncoder { + fn default() -> Self { + Self { + state: State::InitStream(PartialBuffer::new(STREAM_FRAME)), + in_buf: PartialBuffer::new(Vec::with_capacity(MAX_BLOCK_SIZE)), + out_buf: PartialBuffer::new(Vec::with_capacity(MAX_FRAME_SIZE)), + } + } +} + +impl SnappyEncoder { + pub fn new() -> Self {
+ Self::default() + } + + fn compress_frame(&mut self) -> std::io::Result<()> { + let in_buffer = &self.in_buf.unwritten(); + let checksum = crc32c_masked(in_buffer); + + self.out_buf.reset(); + let out_buf = self.out_buf.get_mut(); + out_buf.clear(); + let max_compress_size = snap::raw::max_compress_len(in_buffer.len()); + out_buf.resize(max_compress_size + 8, 0); + + let mut encoder = snap::raw::Encoder::new(); + let compress_data = encoder.compress(in_buffer, &mut out_buf[8..])?; + + let (chunk_type, chunk_len) = if compress_data >= in_buffer.len() - (in_buffer.len() / 8) { + (ChunkType::Uncompressed, in_buffer.len()) + } else { + out_buf.truncate(compress_data); + (ChunkType::Compressed, out_buf.len()) + }; + + // We add 4 because the length includes the 4 bytes of the checksum. + let chunk_len = chunk_len + 4; + let header = FrameHeader { + chunk_type, + data_frame_length: chunk_len as u64, + }; + + let mut raw_chunk_header = [0u8; 8]; + let raw_frame_header: [u8; 4] = header.into(); + let raw_checksum: [u8; 4] = checksum.to_le_bytes(); + + raw_chunk_header[0..4].copy_from_slice(&raw_frame_header); + raw_chunk_header[4..8].copy_from_slice(&raw_checksum); + + match chunk_type { + ChunkType::Compressed => self.state = State::CompressCopy(raw_chunk_header.into()), + ChunkType::Uncompressed => self.state = State::UncompressCopy(raw_chunk_header.into()), + _ => unreachable!(), + } + + Ok(()) + } +} + +fn write( + header: &mut PartialBuffer<[u8; 8]>, + input: &mut PartialBuffer<Vec<u8>>, + output: &mut WriteBuffer<'_>, +) -> bool { + if !header.unwritten().is_empty() { + output.copy_unwritten_from(header); + if output.has_no_spare_space() { + return false; + } + } + + if !input.unwritten().is_empty() { + output.copy_unwritten_from(input); + false + } else { + true + } +} + +#[derive(Debug)] +enum State { + InitStream(PartialBuffer<&'static [u8]>), + Buffering, + UncompressCopy(PartialBuffer<[u8; 8]>), + CompressCopy(PartialBuffer<[u8; 8]>), +} + +impl EncodeV2 for
SnappyEncoder { + fn encode( + &mut self, + input: &mut PartialBuffer<&[u8]>, + output: &mut WriteBuffer<'_>, + ) -> std::io::Result<()> { + loop { + match &mut self.state { + State::InitStream(buffer) => { + if !buffer.unwritten().is_empty() { + output.copy_unwritten_from(buffer); + if output.has_no_spare_space() { + return Ok(()); + } + } + self.state = State::Buffering + } + State::Buffering => { + let buffer = self.in_buf.get_mut(); + let input_buf = input.unwritten(); + let available = MAX_BLOCK_SIZE - buffer.len(); + let boundary = available.min(input_buf.len()); + let input_buf = &input_buf[..boundary]; + + buffer.extend_from_slice(input_buf); + input.advance(input_buf.len()); + + if buffer.len() < MAX_BLOCK_SIZE { + return Ok(()); + } + + self.compress_frame()?; + } + State::UncompressCopy(header) => { + if !write(header, &mut self.in_buf, output) { + return Ok(()); + } + self.in_buf.get_mut().clear(); + self.in_buf.reset(); + self.state = State::Buffering + } + State::CompressCopy(header) => { + if !write(header, &mut self.out_buf, output) { + return Ok(()); + } + self.in_buf.get_mut().clear(); + self.in_buf.reset(); + self.state = State::Buffering + } + } + } + } + + fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result<bool> { + loop { + match &mut self.state { + State::InitStream(buffer) => { + if !buffer.unwritten().is_empty() { + output.copy_unwritten_from(buffer); + if output.has_no_spare_space() { + return Ok(false); + } + } + self.state = State::Buffering + } + State::Buffering => { + self.compress_frame()?; + } + State::UncompressCopy(header) => { + return Ok(write(header, &mut self.in_buf, output)) + } + State::CompressCopy(header) => return Ok(write(header, &mut self.out_buf, output)), + } + } + } + + fn finish(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result<bool> { + self.flush(output) + } +} diff --git a/crates/compression-codecs/src/snappy/mod.rs b/crates/compression-codecs/src/snappy/mod.rs new file mode 100644 index
00000000..4e63b5f4 --- /dev/null +++ b/crates/compression-codecs/src/snappy/mod.rs @@ -0,0 +1,105 @@ +mod decoder; +mod encoder; + +pub use self::decoder::SnappyDecoder; +pub use self::encoder::SnappyEncoder; + +use std::io; + +const STREAM_FRAME: &[u8] = b"\xFF\x06\x00\x00sNaPpY"; +const STREAM_DATA_FRAME_SIZE: usize = 6; +const CHUNK_HEADER_SIZE: usize = 4; +const CRC_SIZE: usize = 4; +const MAX_COMPRESSED_SIZE: usize = 76490; + +const MAX_FRAME_SIZE: usize = + STREAM_FRAME.len() + CHUNK_HEADER_SIZE + CRC_SIZE + MAX_COMPRESSED_SIZE; + +const MAX_BLOCK_SIZE: usize = 65536; + +#[derive(Debug, Copy, Clone)] +struct FrameHeader { + chunk_type: ChunkType, + data_frame_length: u64, +} + +#[derive(Debug, Copy, Clone)] +enum ChunkType { + Stream, + Compressed, + Uncompressed, + Padding, + ReservedUnskippable(u8), + ReservedSkippable(u8), +} + +impl From<u8> for ChunkType { + fn from(value: u8) -> Self { + match value { + 0xFF => Self::Stream, + 0x00 => Self::Compressed, + 0x01 => Self::Uncompressed, + 0xFE => Self::Padding, + 0x02..=0x7F => Self::ReservedUnskippable(value), + 0x80..=0xFD => Self::ReservedSkippable(value), + } + } +} + +impl From<ChunkType> for u8 { + fn from(value: ChunkType) -> Self { + match value { + ChunkType::Stream => 0xFF, + ChunkType::Compressed => 0x00, + ChunkType::Uncompressed => 0x01, + ChunkType::Padding => 0xFE, + ChunkType::ReservedUnskippable(chunk_type) => chunk_type, + ChunkType::ReservedSkippable(chunk_type) => chunk_type, + } + } +} + +impl FrameHeader { + fn parse(input: &[u8]) -> io::Result<Self> { + let (header_part, _): (&[u8; 4], _) = input.split_first_chunk().ok_or(io::Error::new( + io::ErrorKind::UnexpectedEof, + "Need a least 4 bytes to parse the frame's header", + ))?; + + let chunk_type = ChunkType::from(header_part[0]); + // header_part is guaranteed to have at least 4 bytes due to split_first_chunk + let length_part: &[u8; 3] = header_part[1..].first_chunk().unwrap(); + + let length = read_u24_le(length_part) as u64; + + Ok(Self { + 
chunk_type, + data_frame_length: length, + }) + } +} + +impl From<FrameHeader> for [u8; 4] { + fn from(value: FrameHeader) -> Self { + let frame_length = value.data_frame_length as u32; + + let mut header = [0u8; 4]; + header[0] = u8::from(value.chunk_type); + // We're writing a little endian u24 from an u32 by removing the latest significant byte + header[1..4].copy_from_slice(&frame_length.to_le_bytes()[..3]); + header + } +} + +fn read_u24_le(slice: &[u8; 3]) -> u32 { + slice[0] as u32 | (slice[1] as u32) << 8 | (slice[2] as u32) << 16 +} + +fn crc32c_masked(input: &[u8]) -> u32 { + let sum = crc32c::crc32c(input); + mask_crc(sum) +} + +fn mask_crc(crc: u32) -> u32 { + (crc.wrapping_shr(15) | crc.wrapping_shl(17)).wrapping_add(0xA282EAD8) +}