Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/http_source_decompression_bomb.security.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
HTTP-based sources (`http_server`, `prometheus_pushgateway`, `prometheus_remote_write`, `heroku_logs`, `opentelemetry`) now cap decompressed request bodies at 100 MiB. Previously, a single unauthenticated request carrying a compressed payload (e.g. a gzip bomb) could allocate unbounded memory and OOM-kill the Vector process. Decompressed payloads exceeding the cap are rejected with HTTP 413, as are requests whose declared `Content-Length` exceeds the same limit.

authors: pront
33 changes: 33 additions & 0 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,39 @@ mod tests {
}
}

/// A gzip body that inflates past the default decompressed-size cap must be
/// answered with HTTP 413 instead of being decoded in full.
#[tokio::test]
async fn http_rejects_gzip_bomb_with_413() {
    // Inflates to 200 MiB of zeros, beyond the default 100 MiB cap.
    const BOMB_LEN: u64 = 200 * 1024 * 1024;

    // Stream the zeros into the encoder instead of first materializing a
    // 200 MiB plaintext buffer; only the compressed bytes are kept.
    let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
    std::io::copy(
        &mut std::io::Read::take(std::io::repeat(0), BOMB_LEN),
        &mut encoder,
    )
    .unwrap();
    let body = encoder.finish().unwrap();

    let mut headers = HeaderMap::new();
    headers.insert("Content-Encoding", "gzip".parse().unwrap());

    components::init_test();
    let (_rx, addr) = source(
        vec![],
        vec![],
        "http_path",
        "remote_ip",
        "/",
        "POST",
        StatusCode::OK,
        None,
        true,
        EventStatus::Delivered,
        true,
        None,
        None,
    )
    .await;

    assert_eq!(413, send_bytes(addr, body, headers).await);
}

#[tokio::test]
async fn http_path() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
Expand Down
249 changes: 229 additions & 20 deletions src/sources/util/http/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,55 @@ use warp::http::StatusCode;

use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};

/// Upper bound, in bytes, on the decompressed body that [`decompress_body`] will
/// produce: 100 MiB.
///
/// Bounding the output stops a compressed "bomb" payload from growing memory
/// without limit.
pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 << 20;

/// Decompresses the body based on the Content-Encoding header.
///
/// Supports gzip, deflate, snappy, zstd, and identity (no compression).
pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
///
/// Caps the decompressed output at 100 MiB to mitigate decompression-bomb DoS attacks.
pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
decompress_body_with_limit(header, body, Some(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE))
Comment thread
pront marked this conversation as resolved.
}

/// Like [`decompress_body`], but allows the caller to control the decompressed size cap.
///
/// `max_decompressed_size = None` disables the cap (not recommended for unauthenticated input).
pub(crate) fn decompress_body_with_limit(
header: Option<&str>,
mut body: Bytes,
max_decompressed_size: Option<usize>,
) -> Result<Bytes, ErrorMessage> {
if let Some(encodings) = header {
for encoding in encodings.rsplit(',').map(str::trim) {
body = match encoding {
"identity" => body,
"gzip" => {
let mut decoded = Vec::new();
MultiGzDecoder::new(body.reader())
.read_to_end(&mut decoded)
.map_err(|error| emit_decompress_error(encoding, error))?;
decoded.into()
}
"deflate" => {
let mut decoded = Vec::new();
ZlibDecoder::new(body.reader())
.read_to_end(&mut decoded)
"gzip" => decompress_reader(
MultiGzDecoder::new(body.reader()),
encoding,
max_decompressed_size,
)?,
"deflate" => decompress_reader(
ZlibDecoder::new(body.reader()),
encoding,
max_decompressed_size,
)?,
"snappy" => decompress_snappy(&body, max_decompressed_size)?,
"zstd" => {
let mut decoder = zstd::stream::read::Decoder::new(body.reader())
.map_err(|error| emit_decompress_error(encoding, error))?;
decoded.into()
if let Some(max) = max_decompressed_size
&& let Some(window_log_max) = zstd_window_log_max(max)
{
decoder
.window_log_max(window_log_max)
.map_err(|error| emit_decompress_error(encoding, error))?;
}
decompress_reader(decoder, encoding, max_decompressed_size)?
}
"snappy" => SnappyDecoder::new()
.decompress_vec(&body)
.map_err(|error| emit_decompress_error(encoding, error))?
.into(),
"zstd" => zstd::decode_all(body.reader())
.map_err(|error| emit_decompress_error(encoding, error))?
.into(),
encoding => {
return Err(ErrorMessage::new(
StatusCode::UNSUPPORTED_MEDIA_TYPE,
Expand All @@ -46,9 +66,88 @@ pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, E
}
}

ensure_body_within_limit(&body, "identity", max_decompressed_size)?;
Ok(body)
}

fn decompress_reader<R: Read>(
reader: R,
encoding: &str,
max_decompressed_size: Option<usize>,
) -> Result<Bytes, ErrorMessage> {
let mut decoded = Vec::new();
match max_decompressed_size {
Some(max) => {
// Read one byte beyond the cap so we can detect overflow without ambiguity.
let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1);
reader
.take(limit)
.read_to_end(&mut decoded)
.map_err(|error| emit_decompress_error(encoding, error))?;
if decoded.len() > max {
return Err(decompressed_too_large_error(encoding, max));
}
}
None => {
let mut reader = reader;
reader
.read_to_end(&mut decoded)
.map_err(|error| emit_decompress_error(encoding, error))?;
}
}
Ok(decoded.into())
}

fn decompress_snappy(
body: &Bytes,
max_decompressed_size: Option<usize>,
) -> Result<Bytes, ErrorMessage> {
// Snappy stores the decompressed length in the frame header, so reject oversized
// payloads before allocating the output buffer.
if let Some(max) = max_decompressed_size {
let len = snap::raw::decompress_len(body)
.map_err(|error| emit_decompress_error("snappy", error))?;
if len > max {
return Err(decompressed_too_large_error("snappy", max));
}
}
let decoded = SnappyDecoder::new()
.decompress_vec(body)
.map_err(|error| emit_decompress_error("snappy", error))?;
Ok(decoded.into())
}

/// Checks that `body` is no longer than the optional size cap.
///
/// Returns `Ok(())` when `max_decompressed_size` is `None` or `body.len()` does not
/// exceed it; otherwise returns the error built by `decompressed_too_large_error`
/// for `encoding`.
fn ensure_body_within_limit(
    body: &Bytes,
    encoding: &str,
    max_decompressed_size: Option<usize>,
) -> Result<(), ErrorMessage> {
    match max_decompressed_size {
        Some(cap) if body.len() > cap => Err(decompressed_too_large_error(encoding, cap)),
        _ => Ok(()),
    }
}

/// Maps a decompressed-size budget to the zstd `window_log_max` decoder setting.
///
/// Returns `None` for a budget of zero. Otherwise returns the smallest power-of-two
/// exponent whose window can hold `max_decompressed_size` bytes, clamped to the
/// range zstd accepts on the current target.
fn zstd_window_log_max(max_decompressed_size: usize) -> Option<u32> {
    // Lower bound zstd accepts for a window log.
    const MIN_ZSTD_WINDOW_LOG: u32 = 10;
    // Upper bound mirrors zstd's `ZSTD_WINDOWLOG_MAX`: 30 where `size_t` is 32 bits
    // wide, 31 otherwise. Requesting 31 on a 32-bit target would be out of range.
    const MAX_ZSTD_WINDOW_LOG: u32 = if cfg!(target_pointer_width = "32") {
        30
    } else {
        31
    };

    // A zero budget has no highest byte index, hence no meaningful window.
    let max_index = max_decompressed_size.checked_sub(1)?;
    // Number of bits needed to address the highest byte index, i.e. ceil(log2(budget)).
    let bits_needed = usize::BITS - max_index.leading_zeros();
    Some(bits_needed.clamp(MIN_ZSTD_WINDOW_LOG, MAX_ZSTD_WINDOW_LOG))
}

/// Builds the `413 Payload Too Large` error reported when a decoded body for
/// `encoding` is larger than `max` bytes.
fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage {
    let message = format!("Decompressed {encoding} body exceeds limit of {max} bytes.");
    ErrorMessage::new(StatusCode::PAYLOAD_TOO_LARGE, message)
}

pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
emit!(HttpDecompressError {
encoding,
Expand All @@ -59,3 +158,113 @@ pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> E
format!("Failed decompressing payload with {encoding} decoder."),
)
}

#[cfg(test)]
mod tests {
    use std::io::{self, Write};

    use flate2::{Compression, write::GzEncoder};
    use zstd::stream::Encoder as ZstdEncoder;

    use super::*;

    /// Gzip-compresses `plaintext` at the default compression level.
    fn gzip_payload(plaintext: &[u8]) -> Bytes {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(plaintext).unwrap();
        encoder.finish().unwrap().into()
    }

    /// Gzip-compresses `len` zero bytes, streaming them into the encoder so the
    /// plaintext is never held in memory as a whole.
    fn gzip_zeros(len: u64) -> Bytes {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        io::copy(&mut io::Read::take(io::repeat(0), len), &mut encoder).unwrap();
        encoder.finish().unwrap().into()
    }

    /// Zstd-compresses `plaintext` with an explicit encoder window log.
    fn zstd_payload_with_window_log(plaintext: &[u8], window_log: u32) -> Bytes {
        let mut encoder = ZstdEncoder::new(Vec::new(), 0).unwrap();
        encoder.window_log(window_log).unwrap();
        encoder.write_all(plaintext).unwrap();
        encoder.finish().unwrap().into()
    }

    #[test]
    fn gzip_within_limit_succeeds() {
        let plaintext = vec![0u8; 10_000];
        let body = gzip_payload(&plaintext);

        let decoded = decompress_body_with_limit(Some("gzip"), body, Some(100_000)).unwrap();
        assert_eq!(decoded.len(), plaintext.len());
    }

    #[test]
    fn gzip_at_exact_limit_succeeds() {
        // A body whose decoded size equals the cap is still accepted; only
        // strictly larger output is rejected.
        let plaintext = vec![0u8; 1024];
        let body = gzip_payload(&plaintext);

        let decoded = decompress_body_with_limit(Some("gzip"), body, Some(1024)).unwrap();
        assert_eq!(decoded.len(), plaintext.len());
    }

    #[test]
    fn gzip_exceeding_limit_returns_413() {
        // Compress 1 MB of zeros, then cap at 1 KB.
        let plaintext = vec![0u8; 1_000_000];
        let body = gzip_payload(&plaintext);

        let err =
            decompress_body_with_limit(Some("gzip"), body, Some(1024)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn default_limit_protects_against_decompression_bomb() {
        // A small compressed input that would expand to 200 MiB, past the default
        // 100 MiB cap, if decompression ran unbounded.
        const BOMB_LEN: u64 = 200 * 1024 * 1024;
        let body = gzip_zeros(BOMB_LEN);

        let err = decompress_body(Some("gzip"), body).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn snappy_exceeding_limit_returns_413_before_allocating() {
        // 2 MB of zeros. The snappy payload carries its decompressed length up front.
        let plaintext = vec![0u8; 2 * 1024 * 1024];
        let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap();

        let err = decompress_body_with_limit(Some("snappy"), compressed.into(), Some(1024))
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn zstd_exceeding_limit_returns_413() {
        let plaintext = vec![0u8; 10_000];
        let compressed = zstd_payload_with_window_log(plaintext.as_slice(), 10);

        let err = decompress_body_with_limit(Some("zstd"), compressed, Some(1024))
            .expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn identity_passes_through() {
        let body: Bytes = Bytes::from_static(b"hello world");
        let decoded = decompress_body(Some("identity"), body.clone()).unwrap();
        assert_eq!(decoded, body);
    }

    #[test]
    fn identity_exceeding_limit_returns_413() {
        let body = Bytes::from_static(b"hello world");

        let err =
            decompress_body_with_limit(Some("identity"), body, Some(5)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn missing_content_encoding_exceeding_limit_returns_413() {
        let body = Bytes::from_static(b"hello world");

        let err = decompress_body_with_limit(None, body, Some(5)).expect_err("should reject");
        assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE);
    }

    #[test]
    fn zstd_window_log_tracks_limit() {
        assert_eq!(zstd_window_log_max(0), None);
        assert_eq!(zstd_window_log_max(1), Some(10));
        assert_eq!(zstd_window_log_max(1024), Some(10));
        assert_eq!(zstd_window_log_max(1025), Some(11));
        assert_eq!(
            zstd_window_log_max(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE),
            Some(27)
        );
    }
}
26 changes: 24 additions & 2 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use warp::{
reject::Rejection,
};

use super::encoding::decompress_body;
use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, decompress_body};
use crate::{
SourceSender,
common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
Expand Down Expand Up @@ -99,6 +99,28 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
for s in path.split('/').filter(|&x| !x.is_empty()) {
filter = filter.and(warp::path(s.to_string())).boxed()
}
// Defense-in-depth: reject oversized requests up front based on the declared
// `Content-Length`, before reading or decompressing the body. Mirrors the
// decompressed-body cap applied in `decompress_body`. Requests that omit
// `Content-Length` are not rejected by this check.
const MAX_REQUEST_BODY_SIZE: u64 = DEFAULT_MAX_DECOMPRESSED_BODY_SIZE as u64;
let body_filter: BoxedFilter<(Bytes,)> = warp::header::optional::<u64>(
"content-length",
)
.and_then(|declared: Option<u64>| async move {
match declared {
Some(len) if len > MAX_REQUEST_BODY_SIZE => {
Err(warp::reject::custom(ErrorMessage::new(
StatusCode::PAYLOAD_TOO_LARGE,
format!("Request body exceeds limit of {MAX_REQUEST_BODY_SIZE} bytes."),
)))
}
_ => Ok(()),
}
})
.untuple_one()
.and(warp::body::bytes())
Comment thread
pront marked this conversation as resolved.
Outdated
.boxed();

let svc = filter
.and(warp::path::tail())
.and_then(move |tail: Tail| async move {
Expand All @@ -118,7 +140,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
.and(warp::path::full())
.and(warp::header::optional::<String>("content-encoding"))
.and(warp::header::headers_cloned())
.and(warp::body::bytes())
.and(body_filter)
.and(warp::query::<HashMap<String, String>>())
.and(warp::filters::ext::optional())
.and_then(
Expand Down
Loading