Skip to content
8 changes: 8 additions & 0 deletions crates/async-compression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ all-algorithms = [
"xz-parallel",
"zlib",
"zstd",
"snappy",
]

# algorithms
Expand All @@ -46,6 +47,7 @@ xz2 = ["compression-codecs/xz2", "xz"]
zlib = ["compression-codecs/zlib"]
zstd = ["compression-codecs/zstd"]
zstdmt = ["compression-codecs/zstdmt", "zstd"]
snappy = ["compression-codecs/snappy"]


[dependencies]
Expand All @@ -72,6 +74,7 @@ tokio = { version = "1.38.2", default-features = false, features = [
"macros",
"rt-multi-thread",
"io-std",
"fs"
] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }

Expand All @@ -83,6 +86,7 @@ lz4 = "1.28.1"
liblzma = "0.4.2"
zstd-safe = { version = "7", default-features = false }
deflate64 = "0.1.5"
snap = "1"

[lints]
workspace = true
Expand Down Expand Up @@ -131,6 +135,10 @@ required-features = ["zstd", "tokio"]
name = "zstd-window-size"
required-features = ["zstd", "tokio"]

[[test]]
name = "snappy"
required-features = ["snappy"]

[[example]]
name = "zlib_tokio_write"
required-features = ["zlib", "tokio"]
Expand Down
14 changes: 14 additions & 0 deletions crates/async-compression/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,5 +375,19 @@ macro_rules! algos {
{ @dec }
);

algos!(@algo snappy ["snappy"] SnappyDecoder SnappyEncoder <$inner>
{ @enc

pub fn with_quality(inner: $inner, _level: crate::core::Level) -> Self {
Self {
inner: crate::$($mod::)+generic::Encoder::new(
inner,
crate::codecs::SnappyEncoder::new()
),
}
}
}
{ @dec }
);
}
}
3 changes: 3 additions & 0 deletions crates/async-compression/tests/proptest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ mod proptest {

#[cfg(feature = "zstd")]
tests!(zstd);

#[cfg(feature = "snappy")]
tests!(snappy);
}
4 changes: 4 additions & 0 deletions crates/async-compression/tests/snappy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[macro_use]
mod utils;

test_cases!(snappy);
27 changes: 27 additions & 0 deletions crates/async-compression/tests/utils/algos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@ algos! {
}
}
}

pub mod snappy("snappy", SnappyEncoder, SnappyDecoder) {
pub mod sync {
pub use crate::utils::impls::sync::to_vec;

pub fn compress(bytes: &[u8]) -> Vec<u8> {
if bytes.is_empty() {
return vec![0xff, 0x06, 0x00, 0x00, b's', b'N', b'a', b'P', b'p', b'Y'];
}

use std::io::Write;
use snap::write::FrameEncoder;

let mut output = Vec::new();
{
let mut encoder = FrameEncoder::new(&mut output);
encoder.write_all(bytes).unwrap();
}
output
}

pub fn decompress(bytes: &[u8]) -> Vec<u8> {
use snap::read::FrameDecoder;
to_vec(FrameDecoder::new(bytes))
}
}
}
}

macro_rules! io_algo_parallel {
Expand Down
1 change: 1 addition & 0 deletions crates/async-compression/tests/utils/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ macro_rules! io_test_cases {

#[test]
#[ntest::timeout(1000)]
#[cfg(not(feature = "snappy"))]
fn trailer() {
let mut compressed = sync::compress(&[1, 2, 3, 4, 5, 6]);

Expand Down
4 changes: 4 additions & 0 deletions crates/compression-codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ all-algorithms = [
"zlib",
"zstd",
"deflate64",
"snappy"
]

# algorithms
Expand All @@ -42,6 +43,7 @@ zlib = ["flate2"]
zstd = ["libzstd", "zstd-safe"]
zstdmt = ["zstd", "zstd-safe/zstdmt"]
deflate64 = ["dep:deflate64"]
snappy = ["snap", "crc32c"]

[dependencies]
# Workspace dependencies.
Expand All @@ -56,6 +58,8 @@ lz4 = { version = "1.28.1", optional = true }
liblzma = { version = "0.4.5", optional = true }
memchr = { version = "2", optional = true }
zstd-safe = { version = "7", optional = true, default-features = false }
snap = { version = "1", optional = true, default-features = false }
crc32c = { version = "0.6.8", optional = true, default-features = false }

[lints]
workspace = true
4 changes: 4 additions & 0 deletions crates/compression-codecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub mod gzip;
pub mod lz4;
#[cfg(feature = "lzma")]
pub mod lzma;
#[cfg(feature = "snappy")]
pub mod snappy;
#[cfg(feature = "xz")]
pub mod xz;
#[cfg(feature = "lzma")]
Expand Down Expand Up @@ -49,6 +51,8 @@ pub use self::gzip::{GzipDecoder, GzipEncoder};
pub use self::lz4::{Lz4Decoder, Lz4Encoder};
#[cfg(feature = "lzma")]
pub use self::lzma::{LzmaDecoder, LzmaEncoder};
#[cfg(feature = "snappy")]
pub use self::snappy::{SnappyDecoder, SnappyEncoder};
#[cfg(feature = "xz")]
pub use self::xz::{XzDecoder, XzEncoder};
#[cfg(feature = "lzma")]
Expand Down
216 changes: 216 additions & 0 deletions crates/compression-codecs/src/snappy/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
use crate::snappy::{crc32c_masked, ChunkType, FrameHeader, MAX_BLOCK_SIZE, MAX_FRAME_SIZE};
use crate::DecodeV2;
use compression_core::util::{PartialBuffer, WriteBuffer};
use std::convert::TryInto;
use std::io;

#[derive(Debug)]
pub struct SnappyDecoder {
state: State,
in_buf: Vec<u8>,
out_buf: PartialBuffer<Vec<u8>>,
Comment thread
NobodyXu marked this conversation as resolved.
}

impl Default for SnappyDecoder {
fn default() -> Self {
Self {
state: State::default(),
in_buf: Vec::with_capacity(MAX_FRAME_SIZE),
out_buf: PartialBuffer::new(Vec::with_capacity(MAX_BLOCK_SIZE)),
}
}
}

impl SnappyDecoder {
pub fn new() -> Self {
Self::default()
}

fn decode_chunk(&mut self, chunk_type: ChunkType) -> std::io::Result<()> {
let (expected_sum, data) = self.in_buf.split_at(4);

let expected_sum: [u8; 4] = expected_sum
.try_into()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checksum length"))?;
let expected_sum = u32::from_le_bytes(expected_sum);

self.out_buf.reset();
let out_buf = self.out_buf.get_mut();
out_buf.clear();
match chunk_type {
ChunkType::Compressed => {
let uncompress_length = snap::raw::decompress_len(data)?;
out_buf.resize(uncompress_length, 0);
let mut decoder = snap::raw::Decoder::new();
decoder.decompress(data, out_buf)?;
Comment on lines +44 to +46
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a shame that snap does not support writing to uninitialised buffer, zeroing the whole vec would indeed take quite some time

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to support it. It could be a good challenge to tackle :).

}
ChunkType::Uncompressed => out_buf.extend_from_slice(data),
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
_ => unreachable!(
"can only decode compressed or uncompressed chunks, not {:?}",
chunk_type
),
};

let got_sum = crc32c_masked(out_buf);
if expected_sum != got_sum {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"checksum mismatch",
));
}

Ok(())
}
}

#[derive(Debug)]
enum State {
StreamIdentifier(PartialBuffer<[u8; 4]>),
ChunkHeader(PartialBuffer<[u8; 4]>),
Skipping(usize),
Buffering {
remaining: usize,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Skipping(usize),
Buffering {
remaining: usize,
Skipping(NonZeroUize),
Buffering {
remaining: NonZeroUsize,

Using NonZeroUsize would make it clear the invariant of the state

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I can do that because remaining and skipping can be 0.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the neat part: if it's 0, then we just skip that stage and go to the next stage

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! That's better that way.

chunk_type: ChunkType,
},
Sending,
}

impl Default for State {
fn default() -> Self {
State::StreamIdentifier(PartialBuffer::new([0; 4]))
}
}

impl DecodeV2 for SnappyDecoder {
fn reinit(&mut self) -> std::io::Result<()> {
*self = Self::new();
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
Ok(())
}

fn decode(
&mut self,
input: &mut PartialBuffer<&[u8]>,
output: &mut WriteBuffer<'_>,
) -> std::io::Result<bool> {
loop {
match &mut self.state {
State::StreamIdentifier(header) => {
header.copy_unwritten_from(input);
if !header.unwritten().is_empty() {
return Ok(false);
}

let header = FrameHeader::parse(header.written())?;
if let ChunkType::Stream = header.chunk_type {
self.state = State::Skipping(header.data_frame_length as usize)
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid chunk type, expected Stream, got: {:?}",
header.chunk_type
),
));
}
}
State::ChunkHeader(header) => {
header.copy_unwritten_from(input);
if !header.unwritten().is_empty() {
return Ok(false);
}

let header = FrameHeader::parse(header.written())?;

let data_frame_length = header.data_frame_length as usize;

match header.chunk_type {
ChunkType::Stream
| ChunkType::ReservedSkippable(_)
| ChunkType::Padding => self.state = State::Skipping(data_frame_length),
ChunkType::Compressed | ChunkType::Uncompressed => {
let in_buf = &mut self.in_buf;
in_buf.clear();
self.state = State::Buffering {
remaining: data_frame_length,
chunk_type: header.chunk_type,
}
}
ChunkType::ReservedUnskippable(chunk_type) => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Reserved unskippable chunk type encountered: {}",
chunk_type
),
))
}
}
}
State::Skipping(n) => {
let input_len = input.unwritten().len();
if input_len < *n {
input.advance(input_len);
*n -= input_len;
return Ok(false);
}
input.advance(*n);
self.state = State::ChunkHeader([0u8; 4].into())
}
State::Buffering {
remaining,
chunk_type,
} => {
let input_buf = input.unwritten();
let boundary = (*remaining).min(input_buf.len());
let input_buf = &input_buf[..boundary];

*remaining -= input_buf.len();

self.in_buf.extend_from_slice(input_buf);
input.advance(input_buf.len());

if *remaining != 0 {
return Ok(false);
}

// We're done buffering, so let's decode the chunk
let chunk_type = *chunk_type;
self.decode_chunk(chunk_type)?;
self.state = State::Sending
}
State::Sending => {
let buffer = &mut self.out_buf;
output.copy_unwritten_from(buffer);
if buffer.unwritten().is_empty() {
self.state = State::ChunkHeader([0u8; 4].into())
} else {
return Ok(false);
}
}
}
}
}

fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result<bool> {
match &mut self.state {
State::Sending => {
let buffer = &mut self.out_buf;
output.copy_unwritten_from(buffer);
if buffer.unwritten().is_empty() {
self.state = State::ChunkHeader([0u8; 4].into());
Ok(true)
} else {
Ok(false)
}
}
_ => Ok(true),
}
}

fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> std::io::Result<bool> {
match &mut self.state {
State::ChunkHeader(header) if header.unwritten().len() == 4 => Ok(true),
_ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
}
}
}
Loading