Skip to content
8 changes: 8 additions & 0 deletions crates/async-compression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ all-algorithms = [
"xz-parallel",
"zlib",
"zstd",
"snappy",
]

# algorithms
Expand All @@ -46,6 +47,7 @@ xz2 = ["compression-codecs/xz2", "xz"]
zlib = ["compression-codecs/zlib"]
zstd = ["compression-codecs/zstd"]
zstdmt = ["compression-codecs/zstdmt", "zstd"]
snappy = ["compression-codecs/snappy"]


[dependencies]
Expand All @@ -72,6 +74,7 @@ tokio = { version = "1.38.2", default-features = false, features = [
"macros",
"rt-multi-thread",
"io-std",
"fs"
] }
tokio-util = { version = "0.7", default-features = false, features = ["io"] }

Expand All @@ -83,6 +86,7 @@ lz4 = "1.28.1"
liblzma = "0.4.2"
zstd-safe = { version = "7", default-features = false }
deflate64 = "0.1.5"
snap = "1"

[lints]
workspace = true
Expand Down Expand Up @@ -131,6 +135,10 @@ required-features = ["zstd", "tokio"]
name = "zstd-window-size"
required-features = ["zstd", "tokio"]

[[test]]
name = "snappy"
required-features = ["snappy"]

[[example]]
name = "zlib_tokio_write"
required-features = ["zlib", "tokio"]
Expand Down
14 changes: 14 additions & 0 deletions crates/async-compression/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,5 +375,19 @@ macro_rules! algos {
{ @dec }
);

algos!(@algo snappy ["snappy"] SnappyDecoder SnappyEncoder <$inner>
{ @enc

pub fn with_quality(inner: $inner, _level: crate::core::Level) -> Self {
Self {
inner: crate::$($mod::)+generic::Encoder::new(
inner,
crate::codecs::SnappyEncoder::new()
),
}
}
}
{ @dec }
);
}
}
3 changes: 3 additions & 0 deletions crates/async-compression/tests/proptest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,7 @@ mod proptest {

#[cfg(feature = "zstd")]
tests!(zstd);

#[cfg(feature = "snappy")]
tests!(snappy);
}
4 changes: 4 additions & 0 deletions crates/async-compression/tests/snappy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[macro_use]
mod utils;

test_cases!(snappy);
27 changes: 27 additions & 0 deletions crates/async-compression/tests/utils/algos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,33 @@ algos! {
}
}
}

pub mod snappy("snappy", SnappyEncoder, SnappyDecoder) {
pub mod sync {
pub use crate::utils::impls::sync::to_vec;

pub fn compress(bytes: &[u8]) -> Vec<u8> {
if bytes.is_empty() {
return vec![0xff, 0x06, 0x00, 0x00, b's', b'N', b'a', b'P', b'p', b'Y'];
}

use std::io::Write;
use snap::write::FrameEncoder;

let mut output = Vec::new();
{
let mut encoder = FrameEncoder::new(&mut output);
encoder.write_all(bytes).unwrap();
}
output
}

pub fn decompress(bytes: &[u8]) -> Vec<u8> {
use snap::read::FrameDecoder;
to_vec(FrameDecoder::new(bytes))
}
}
}
}

macro_rules! io_algo_parallel {
Expand Down
4 changes: 4 additions & 0 deletions crates/compression-codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ all-algorithms = [
"zlib",
"zstd",
"deflate64",
"snappy"
]

# algorithms
Expand All @@ -42,6 +43,7 @@ zlib = ["flate2"]
zstd = ["libzstd", "zstd-safe"]
zstdmt = ["zstd", "zstd-safe/zstdmt"]
deflate64 = ["dep:deflate64"]
snappy = ["snap", "crc32c"]

[dependencies]
# Workspace dependencies.
Expand All @@ -56,6 +58,8 @@ lz4 = { version = "1.28.1", optional = true }
liblzma = { version = "0.4.5", optional = true }
memchr = { version = "2", optional = true }
zstd-safe = { version = "7", optional = true, default-features = false }
snap = { version = "1", optional = true, default-features = false }
crc32c = { version = "0.6.8", optional = true, default-features = false }

[lints]
workspace = true
4 changes: 4 additions & 0 deletions crates/compression-codecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ pub mod gzip;
pub mod lz4;
#[cfg(feature = "lzma")]
pub mod lzma;
#[cfg(feature = "snappy")]
pub mod snappy;
#[cfg(feature = "xz")]
pub mod xz;
#[cfg(feature = "lzma")]
Expand Down Expand Up @@ -49,6 +51,8 @@ pub use self::gzip::{GzipDecoder, GzipEncoder};
pub use self::lz4::{Lz4Decoder, Lz4Encoder};
#[cfg(feature = "lzma")]
pub use self::lzma::{LzmaDecoder, LzmaEncoder};
#[cfg(feature = "snappy")]
pub use self::snappy::{SnappyDecoder, SnappyEncoder};
#[cfg(feature = "xz")]
pub use self::xz::{XzDecoder, XzEncoder};
#[cfg(feature = "lzma")]
Expand Down
203 changes: 203 additions & 0 deletions crates/compression-codecs/src/snappy/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
use crate::snappy::{mask_crc, ChunkType, FrameHeader};
use crate::DecodeV2;
use compression_core::util::{PartialBuffer, WriteBuffer};
use std::convert::TryInto;
use std::{io, mem};

#[derive(Debug, Default)]
pub struct SnappyDecoder {
state: State,
}

impl SnappyDecoder {
pub fn new() -> Self {
Self::default()
}
}

fn decode_chunk(chunk_type: ChunkType, mut buffer: Vec<u8>) -> std::io::Result<Vec<u8>> {
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
let data = buffer.split_off(4);
Comment thread
NobodyXu marked this conversation as resolved.
Outdated

let expected_sum: [u8; 4] = buffer
.try_into()
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "invalid checksum length"))?;
let expected_sum = u32::from_le_bytes(expected_sum);

let output = match chunk_type {
ChunkType::Compressed => {
let uncompress_length = snap::raw::decompress_len(&data)?;
let mut out_buf = vec![0; uncompress_length];
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
let mut decoder = snap::raw::Decoder::new();
decoder.decompress(&data, &mut out_buf)?;
out_buf
}
ChunkType::Uncompressed => data,
_ => unreachable!(
"can only decode compressed or uncompressed chunks, not {:?}",
chunk_type
),
};

let got_sum = crc32c::crc32c(&output);
let got_sum = mask_crc(got_sum);
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
if expected_sum != got_sum {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"checksum mismatch",
));
}

Ok(output)
}

#[derive(Debug)]
enum State {
StreamIdentifier(PartialBuffer<[u8; 4]>),
ChunkHeader(PartialBuffer<[u8; 4]>),
Skipping(usize),
Buffering {
remaining: usize,
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Skipping(usize),
Buffering {
remaining: usize,
Skipping(NonZeroUize),
Buffering {
remaining: NonZeroUsize,

Using NonZeroUsize would make it clear the invariant of the state

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I can do that because remaining and skipping can be 0.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the neat part: if it's 0, then we just skip that stage and go to the next stage

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed! That's better that way.

chunk_type: ChunkType,
buffer: Vec<u8>,
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
},
Sending(PartialBuffer<Vec<u8>>),
}

impl Default for State {
fn default() -> Self {
State::StreamIdentifier(PartialBuffer::new([0; 4]))
}
}

impl DecodeV2 for SnappyDecoder {
fn reinit(&mut self) -> std::io::Result<()> {
*self = Self::new();
Comment thread
NobodyXu marked this conversation as resolved.
Outdated
Ok(())
}

fn decode(
&mut self,
input: &mut PartialBuffer<&[u8]>,
output: &mut WriteBuffer<'_>,
) -> std::io::Result<bool> {
loop {
match &mut self.state {
State::StreamIdentifier(header) => {
header.copy_unwritten_from(input);
if !header.unwritten().is_empty() {
return Ok(false);
}

let header = FrameHeader::parse(header.written())?;
if let ChunkType::Stream = header.chunk_type {
self.state = State::Skipping(header.data_frame_length as usize)
} else {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Invalid chunk type, expected Stream, got: {:?}",
header.chunk_type
),
));
}
}
State::ChunkHeader(header) => {
header.copy_unwritten_from(input);
if !header.unwritten().is_empty() {
return Ok(false);
}

let header = FrameHeader::parse(header.written())?;

let data_frame_length = header.data_frame_length as usize;

match header.chunk_type {
ChunkType::Stream
| ChunkType::ReservedSkippable(_)
| ChunkType::Padding => self.state = State::Skipping(data_frame_length),
ChunkType::Compressed | ChunkType::Uncompressed => {
self.state = State::Buffering {
remaining: data_frame_length,
chunk_type: header.chunk_type,
buffer: Vec::with_capacity(data_frame_length),
}
}
ChunkType::ReservedUnskippable(chunk_type) => {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!(
"Reserved unskippable chunk type encountered: {}",
chunk_type
),
))
}
}
}
State::Skipping(n) => {
let input_len = input.unwritten().len();
if input_len < *n {
input.advance(input_len);
*n -= input_len;
return Ok(false);
}
input.advance(*n);
self.state = State::ChunkHeader([0u8; 4].into())
}
State::Buffering {
remaining,
chunk_type,
buffer,
} => {
let input_buf = input.unwritten();
let boundary = (*remaining).min(input_buf.len());
let input_buf = &input_buf[..boundary];

*remaining -= input_buf.len();

buffer.extend_from_slice(input_buf);
input.advance(input_buf.len());

if *remaining != 0 {
return Ok(false);
}

// We're done buffering, so let's decode the chunk
let chunk_type = *chunk_type;
let buffer = mem::take(buffer);
let output = decode_chunk(chunk_type, buffer)?;
self.state = State::Sending(PartialBuffer::new(output))
}
State::Sending(buffer) => {
output.copy_unwritten_from(buffer);
if buffer.unwritten().is_empty() {
self.state = State::ChunkHeader([0u8; 4].into())
} else {
return Ok(false);
}
}
}
}
}

fn flush(&mut self, output: &mut WriteBuffer<'_>) -> std::io::Result<bool> {
match &mut self.state {
State::Sending(buffer) => {
output.copy_unwritten_from(buffer);
if buffer.unwritten().is_empty() {
self.state = State::ChunkHeader([0u8; 4].into());
Ok(true)
} else {
Ok(false)
}
}
_ => Ok(true),
}
}

fn finish(&mut self, _output: &mut WriteBuffer<'_>) -> std::io::Result<bool> {
match &mut self.state {
State::ChunkHeader(header) if header.unwritten().len() == 4 => Ok(true),
_ => Err(io::Error::from(io::ErrorKind::UnexpectedEof)),
}
}
}
Loading
Loading