diff --git a/changelog.d/http_source_decompression_bomb.security.md b/changelog.d/http_source_decompression_bomb.security.md new file mode 100644 index 0000000000000..9b7b39ec750c0 --- /dev/null +++ b/changelog.d/http_source_decompression_bomb.security.md @@ -0,0 +1,3 @@ +HTTP-based sources (`http_server`, `prometheus_pushgateway`, `prometheus_remote_write`, `heroku_logs`, `opentelemetry`) now cap decompressed request bodies at 100 MiB. Previously, a single unauthenticated request carrying a compressed payload (e.g. a gzip bomb) could allocate unbounded memory and OOM-kill the Vector process. Decompressed payloads exceeding the cap are rejected with HTTP 413, as are requests whose declared `Content-Length` exceeds the same limit. + +authors: pront diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index bc588fb713c0c..337ed060b97e5 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -1328,6 +1328,41 @@ mod tests { } } + #[tokio::test] + async fn http_rejects_gzip_bomb_with_413() { + // A modestly-sized gzipped blob of zeros that would expand past the default + // 100 MiB cap if decompression were unbounded. + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + let chunk = [0u8; 8 * 1024]; + for _ in 0..(200 * 1024 * 1024 / chunk.len()) { + encoder.write_all(&chunk).unwrap(); + } + let body = encoder.finish().unwrap(); + + let mut headers = HeaderMap::new(); + headers.insert("Content-Encoding", "gzip".parse().unwrap()); + + components::init_test(); + let (_rx, addr) = source( + vec![], + vec![], + "http_path", + "remote_ip", + "/", + "POST", + StatusCode::OK, + None, + true, + EventStatus::Delivered, + true, + None, + None, + ) + .await; + + assert_eq!(413, send_bytes(addr, body, headers).await); + } + #[tokio::test] async fn http_path() { let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async { diff --git a/src/sources/opentelemetry/http.rs b/src/sources/opentelemetry/http.rs index 42be681f1cb90..4590481608652 100644 --- a/src/sources/opentelemetry/http.rs +++ b/src/sources/opentelemetry/http.rs @@ -39,7 +39,10 @@ use crate::{ sources::{ http_server::HttpConfigParamKind, opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES}, - util::{add_headers, decompress_body}, + util::{ + add_headers, decompress_body, + http::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, limited_body}, + }, }, tls::MaybeTlsSettings, }; @@ -191,6 +194,8 @@ where + 'static + Fn(Option, HeaderMap, Bytes) -> Result, ErrorMessage>, { + let body_filter = limited_body(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE); + warp::post() .and(warp::path("v1")) .and(warp::path(telemetry_type)) @@ -201,7 +206,7 @@ where )) .and(warp::header::optional::("content-encoding")) .and(warp::header::headers_cloned()) - .and(warp::body::bytes()) + .and(body_filter) .and_then( move |encoding_header: Option, headers: HeaderMap, body: Bytes| { let events = make_events(encoding_header, headers, body); diff --git a/src/sources/util/http/encoding.rs b/src/sources/util/http/encoding.rs index c71a1891d604e..ebaf8a5a4de32 100644 --- a/src/sources/util/http/encoding.rs +++ b/src/sources/util/http/encoding.rs @@ -1,41 +1,86 @@ use std::io::Read; -use bytes::{Buf, Bytes}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; use flate2::read::{MultiGzDecoder, ZlibDecoder}; +use futures_util::StreamExt; use snap::raw::Decoder as SnappyDecoder; -use warp::http::StatusCode; +use warp::{Filter, filters::BoxedFilter, http::StatusCode}; use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError}; +/// Default cap on the decompressed body size produced by [`decompress_body`]. +/// +/// Prevents a compressed "bomb" payload from causing unbounded memory growth. +pub(crate) const DEFAULT_MAX_DECOMPRESSED_BODY_SIZE: usize = 100 * 1024 * 1024; + +/// Collects a request body into [`Bytes`] while enforcing an in-memory size cap. +pub(crate) fn limited_body(max_body_size: usize) -> BoxedFilter<(Bytes,)> { + let max_body_size_header = u64::try_from(max_body_size).unwrap_or(u64::MAX); + + warp::header::optional::("content-length") + .and_then(move |declared: Option| async move { + if declared.is_some_and(|len| len > max_body_size_header) { + Err(warp::reject::custom(request_body_too_large_error( + max_body_size, + ))) + } else { + Ok(()) + } + }) + .untuple_one() + .and(warp::body::stream()) + .and_then(move |body| async move { + collect_body_with_limit(body, max_body_size) + .await + .map_err(warp::reject::custom) + }) + .boxed() +} + /// Decompresses the body based on the Content-Encoding header. /// /// Supports gzip, deflate, snappy, zstd, and identity (no compression). -pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result { +/// +/// Caps the decompressed output at 100 MiB to mitigate decompression-bomb DoS attacks. +pub fn decompress_body(header: Option<&str>, body: Bytes) -> Result { + decompress_body_with_limit(header, body, Some(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE)) +} + +/// Like [`decompress_body`], but allows the caller to control the decompressed size cap. +/// +/// `max_decompressed_size = None` disables the cap (not recommended for unauthenticated input). +pub(crate) fn decompress_body_with_limit( + header: Option<&str>, + mut body: Bytes, + max_decompressed_size: Option, +) -> Result { if let Some(encodings) = header { for encoding in encodings.rsplit(',').map(str::trim) { body = match encoding { "identity" => body, - "gzip" => { - let mut decoded = Vec::new(); - MultiGzDecoder::new(body.reader()) - .read_to_end(&mut decoded) + "gzip" => decompress_reader( + MultiGzDecoder::new(body.reader()), + encoding, + max_decompressed_size, + )?, + "deflate" => decompress_reader( + ZlibDecoder::new(body.reader()), + encoding, + max_decompressed_size, + )?, + "snappy" => decompress_snappy(&body, max_decompressed_size)?, + "zstd" => { + let mut decoder = zstd::stream::read::Decoder::new(body.reader()) .map_err(|error| emit_decompress_error(encoding, error))?; - decoded.into() + if let Some(max) = max_decompressed_size + && let Some(window_log_max) = zstd_window_log_max(max) + { + decoder + .window_log_max(window_log_max) + .map_err(|error| emit_decompress_error(encoding, error))?; + } + decompress_reader(decoder, encoding, max_decompressed_size)? } - "deflate" => { - let mut decoded = Vec::new(); - ZlibDecoder::new(body.reader()) - .read_to_end(&mut decoded) - .map_err(|error| emit_decompress_error(encoding, error))?; - decoded.into() - } - "snappy" => SnappyDecoder::new() - .decompress_vec(&body) - .map_err(|error| emit_decompress_error(encoding, error))? - .into(), - "zstd" => zstd::decode_all(body.reader()) - .map_err(|error| emit_decompress_error(encoding, error))? - .into(), encoding => { return Err(ErrorMessage::new( StatusCode::UNSUPPORTED_MEDIA_TYPE, @@ -46,9 +91,121 @@ pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result( + reader: R, + encoding: &str, + max_decompressed_size: Option, +) -> Result { + let mut decoded = Vec::new(); + match max_decompressed_size { + Some(max) => { + // Read one byte beyond the cap so we can detect overflow without ambiguity. + let limit = u64::try_from(max).unwrap_or(u64::MAX).saturating_add(1); + reader + .take(limit) + .read_to_end(&mut decoded) + .map_err(|error| emit_decompress_error(encoding, error))?; + if decoded.len() > max { + return Err(decompressed_too_large_error(encoding, max)); + } + } + None => { + let mut reader = reader; + reader + .read_to_end(&mut decoded) + .map_err(|error| emit_decompress_error(encoding, error))?; + } + } + Ok(decoded.into()) +} + +fn decompress_snappy( + body: &Bytes, + max_decompressed_size: Option, +) -> Result { + // Snappy stores the decompressed length in the frame header, so reject oversized + // payloads before allocating the output buffer. + if let Some(max) = max_decompressed_size { + let len = snap::raw::decompress_len(body) + .map_err(|error| emit_decompress_error("snappy", error))?; + if len > max { + return Err(decompressed_too_large_error("snappy", max)); + } + } + let decoded = SnappyDecoder::new() + .decompress_vec(body) + .map_err(|error| emit_decompress_error("snappy", error))?; + Ok(decoded.into()) +} + +async fn collect_body_with_limit(body: S, max_body_size: usize) -> Result +where + S: futures_util::Stream>, + B: Buf, +{ + futures_util::pin_mut!(body); + + let mut bytes = BytesMut::new(); + while let Some(chunk) = body.next().await { + let chunk = chunk.map_err(body_read_error)?; + if chunk.remaining() > max_body_size.saturating_sub(bytes.len()) { + return Err(request_body_too_large_error(max_body_size)); + } + bytes.put(chunk); + } + + Ok(bytes.freeze()) +} + +fn ensure_body_within_limit( + body: &Bytes, + encoding: &str, + max_decompressed_size: Option, +) -> Result<(), ErrorMessage> { + if let Some(max) = max_decompressed_size + && body.len() > max + { + return Err(decompressed_too_large_error(encoding, max)); + } + Ok(()) +} + +fn zstd_window_log_max(max_decompressed_size: usize) -> Option { + const MIN_ZSTD_WINDOW_LOG: u32 = 10; + const MAX_ZSTD_WINDOW_LOG: u32 = 31; + + // `window_log_max` is expressed as a power-of-two log. Use the smallest zstd + // window capable of representing the configured byte budget. + max_decompressed_size.checked_sub(1).map(|max_index| { + (usize::BITS - max_index.leading_zeros()).clamp(MIN_ZSTD_WINDOW_LOG, MAX_ZSTD_WINDOW_LOG) + }) +} + +fn request_body_too_large_error(max: usize) -> ErrorMessage { + ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Request body exceeds limit of {max} bytes."), + ) +} + +fn decompressed_too_large_error(encoding: &str, max: usize) -> ErrorMessage { + ErrorMessage::new( + StatusCode::PAYLOAD_TOO_LARGE, + format!("Decompressed {encoding} body exceeds limit of {max} bytes."), + ) +} + +fn body_read_error(error: warp::Error) -> ErrorMessage { + ErrorMessage::new( + StatusCode::BAD_REQUEST, + format!("Failed reading request body: {error}"), + ) +} + pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage { emit!(HttpDecompressError { encoding, @@ -59,3 +216,128 @@ pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> E format!("Failed decompressing payload with {encoding} decoder."), ) } + +#[cfg(test)] +mod tests { + use std::io::Write; + + use flate2::{Compression, write::GzEncoder}; + use futures_util::stream; + use zstd::stream::Encoder as ZstdEncoder; + + use super::*; + + fn gzip_payload(plaintext: &[u8]) -> Bytes { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(plaintext).unwrap(); + encoder.finish().unwrap().into() + } + + fn zstd_payload_with_window_log(plaintext: &[u8], window_log: u32) -> Bytes { + let mut encoder = ZstdEncoder::new(Vec::new(), 0).unwrap(); + encoder.window_log(window_log).unwrap(); + encoder.write_all(plaintext).unwrap(); + encoder.finish().unwrap().into() + } + + #[test] + fn gzip_within_limit_succeeds() { + let plaintext = vec![0u8; 10_000]; + let body = gzip_payload(&plaintext); + + let decoded = decompress_body_with_limit(Some("gzip"), body, Some(100_000)).unwrap(); + assert_eq!(decoded.len(), plaintext.len()); + } + + #[test] + fn gzip_exceeding_limit_returns_413() { + // Compress 1 MB of zeros, then cap at 1 KB. + let plaintext = vec![0u8; 1_000_000]; + let body = gzip_payload(&plaintext); + + let err = + decompress_body_with_limit(Some("gzip"), body, Some(1024)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn snappy_exceeding_limit_returns_413_before_allocating() { + // 2 MB of zeros. Snappy keeps the embedded length in the frame header. + let plaintext = vec![0u8; 2 * 1024 * 1024]; + let compressed = snap::raw::Encoder::new().compress_vec(&plaintext).unwrap(); + + let err = decompress_body_with_limit(Some("snappy"), compressed.into(), Some(1024)) + .expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn zstd_exceeding_limit_returns_413() { + let plaintext = vec![0u8; 10_000]; + let compressed = zstd_payload_with_window_log(plaintext.as_slice(), 10); + + let err = decompress_body_with_limit(Some("zstd"), compressed, Some(1024)) + .expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn identity_passes_through() { + let body: Bytes = Bytes::from_static(b"hello world"); + let decoded = decompress_body(Some("identity"), body.clone()).unwrap(); + assert_eq!(decoded, body); + } + + #[test] + fn identity_exceeding_limit_returns_413() { + let body = Bytes::from_static(b"hello world"); + + let err = + decompress_body_with_limit(Some("identity"), body, Some(5)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn missing_content_encoding_exceeding_limit_returns_413() { + let body = Bytes::from_static(b"hello world"); + + let err = decompress_body_with_limit(None, body, Some(5)).expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } + + #[test] + fn zstd_window_log_tracks_limit() { + assert_eq!(zstd_window_log_max(0), None); + assert_eq!(zstd_window_log_max(1), Some(10)); + assert_eq!(zstd_window_log_max(1024), Some(10)); + assert_eq!(zstd_window_log_max(1025), Some(11)); + assert_eq!( + zstd_window_log_max(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE), + Some(27) + ); + } + + #[tokio::test] + async fn collect_body_with_limit_succeeds_within_limit() { + let body = stream::iter([ + Ok::<_, warp::Error>(Bytes::from_static(b"hello")), + Ok::<_, warp::Error>(Bytes::from_static(b" world")), + ]); + + let collected = collect_body_with_limit(body, 11).await.unwrap(); + assert_eq!(collected, Bytes::from_static(b"hello world")); + } + + #[tokio::test] + async fn collect_body_with_limit_rejects_oversized_stream() { + let body = stream::iter([ + Ok::<_, warp::Error>(Bytes::from_static(b"hello")), + Ok::<_, warp::Error>(Bytes::from_static(b" world")), + ]); + + let err = collect_body_with_limit(body, 5) + .await + .expect_err("should reject"); + assert_eq!(err.status_code(), StatusCode::PAYLOAD_TOO_LARGE); + } +} diff --git a/src/sources/util/http/mod.rs b/src/sources/util/http/mod.rs index 22c75b925f14a..67734286e2f79 100644 --- a/src/sources/util/http/mod.rs +++ b/src/sources/util/http/mod.rs @@ -16,6 +16,8 @@ mod prelude; ))] mod query; +#[cfg(feature = "sources-opentelemetry")] +pub(crate) use encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, limited_body}; #[cfg(feature = "sources-utils-http-encoding")] pub use encoding::{decompress_body, emit_decompress_error}; #[cfg(feature = "sources-utils-http-headers")] diff --git a/src/sources/util/http/prelude.rs b/src/sources/util/http/prelude.rs index 9645b432ad82b..d00870ba940fd 100644 --- a/src/sources/util/http/prelude.rs +++ b/src/sources/util/http/prelude.rs @@ -22,7 +22,7 @@ use warp::{ reject::Rejection, }; -use super::encoding::decompress_body; +use super::encoding::{DEFAULT_MAX_DECOMPRESSED_BODY_SIZE, decompress_body, limited_body}; use crate::{ SourceSender, common::http::{ErrorMessage, server_auth::HttpServerAuthConfig}, @@ -112,6 +112,8 @@ pub trait HttpSource: Clone + Send + Sync + 'static { for s in path.split('/').filter(|&x| !x.is_empty()) { filter = filter.and(warp::path(s.to_string())).boxed() } + let body_filter = limited_body(DEFAULT_MAX_DECOMPRESSED_BODY_SIZE); + let svc = filter .and(warp::path::tail()) .and_then(move |tail: Tail| async move { @@ -131,7 +133,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static { .and(warp::path::full()) .and(warp::header::optional::("content-encoding")) .and(warp::header::headers_cloned()) - .and(warp::body::bytes()) + .and(body_filter) .and(warp::query::>()) .and(warp::filters::ext::optional()) .and_then(