diff --git a/actix-http/CHANGES.md b/actix-http/CHANGES.md index bcc13abe091..6e4f8ec32d7 100644 --- a/actix-http/CHANGES.md +++ b/actix-http/CHANGES.md @@ -3,9 +3,11 @@ ## Unreleased - When configured, gracefully close HTTP/1 connections after early responses to unread request bodies. [#3967] +- Wake HTTP/1 payload receivers with an incomplete-payload error when the sender is dropped before EOF. [#3100] - Update `foldhash` dependency to `0.2`. [#3967]: https://github.com/actix/actix-web/issues/3967 +[#3100]: https://github.com/actix/actix-web/issues/3100 ## 3.12.1 diff --git a/actix-http/src/h1/payload.rs b/actix-http/src/h1/payload.rs index e12c8780636..3a1293bb8d4 100644 --- a/actix-http/src/h1/payload.rs +++ b/actix-http/src/h1/payload.rs @@ -140,11 +140,20 @@ impl PayloadSender { } } +impl Drop for PayloadSender { + fn drop(&mut self) { + if let Some(shared) = self.inner.upgrade() { + shared.borrow_mut().close_sender(); + } + } +} + #[derive(Debug)] struct Inner { len: usize, eof: bool, err: Option, + sender_closed: bool, need_read: bool, items: VecDeque, task: Option, @@ -157,6 +166,7 @@ impl Inner { eof, len: 0, err: None, + sender_closed: eof, items: VecDeque::new(), need_read: true, task: None, @@ -200,12 +210,21 @@ impl Inner { #[inline] fn set_error(&mut self, err: PayloadError) { + self.sender_closed = true; self.err = Some(err); self.wake(); } + fn close_sender(&mut self) { + if !self.sender_closed { + self.sender_closed = true; + self.set_error(PayloadError::Incomplete(None)); + } + } + #[inline] fn feed_eof(&mut self) { + self.sender_closed = true; self.eof = true; self.wake(); } @@ -332,6 +351,16 @@ mod tests { timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap(); } + #[actix_rt::test] + async fn wake_on_sender_drop() { + let (sender, payload) = Payload::create(false); + let (rx, handle) = prepare_waking_test(payload, Some(Err(()))); + + rx.await.unwrap(); + drop(sender); + timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap(); + } + #[actix_rt::test] async fn test_unread_data() { let (_, mut payload) = Payload::create(false); diff --git a/actix-multipart/src/multipart.rs b/actix-multipart/src/multipart.rs index bde7d122fdc..6b53df181ab 100644 --- a/actix-multipart/src/multipart.rs +++ b/actix-multipart/src/multipart.rs @@ -1013,7 +1013,7 @@ mod tests { #[actix_rt::test] async fn test_multipart_payload_consumption() { // with sample payload and HttpRequest with no headers - let (_, inner_payload) = h1::Payload::create(false); + let (_sender, inner_payload) = h1::Payload::create(false); let mut payload = actix_web::dev::Payload::from(inner_payload); let req = TestRequest::default().to_http_request(); diff --git a/actix-multipart/src/payload.rs b/actix-multipart/src/payload.rs index 4c9929aedfe..5fd0aa790ff 100644 --- a/actix-multipart/src/payload.rs +++ b/actix-multipart/src/payload.rs @@ -231,7 +231,7 @@ mod tests { #[actix_rt::test] async fn basic() { - let (_, payload) = h1::Payload::create(false); + let (_sender, payload) = h1::Payload::create(false); let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT); assert_eq!(payload.buf.len(), 0); diff --git a/actix-web/CHANGES.md b/actix-web/CHANGES.md index e0dffae8e41..2c5c0529004 100644 --- a/actix-web/CHANGES.md +++ b/actix-web/CHANGES.md @@ -7,6 +7,7 @@ - Panic when calling `Route::to()` or `Route::service()` after `Route::wrap()` to prevent silently dropping route middleware. [#3944] - Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346] - Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded. +- Fix app data being retained after graceful shutdown with in-flight slow request bodies. [#3100] - Update `foldhash` dependency to `0.2`. - Update `rand` dependency to `0.10`. - Add `HttpServer::h1_write_buffer_size()`. @@ -14,6 +15,7 @@ [#3944]: https://github.com/actix/actix-web/pull/3944 [#3346]: https://github.com/actix/actix-web/issues/3346 [#3542]: https://github.com/actix/actix-web/issues/3542 +[#3100]: https://github.com/actix/actix-web/issues/3100 ## 4.13.0 diff --git a/actix-web/src/app_service.rs b/actix-web/src/app_service.rs index fadcf825b3b..1f99d20956b 100644 --- a/actix-web/src/app_service.rs +++ b/actix-web/src/app_service.rs @@ -255,7 +255,7 @@ where T: Service, Error = Error>, { fn drop(&mut self) { - self.app_state.pool().clear(); + self.app_state.pool().disable(); } } diff --git a/actix-web/src/request.rs b/actix-web/src/request.rs index 08be38ca6cf..d4430bd513d 100644 --- a/actix-web/src/request.rs +++ b/actix-web/src/request.rs @@ -1,5 +1,5 @@ use std::{ - cell::{Ref, RefCell, RefMut}, + cell::{Cell, Ref, RefCell, RefMut}, collections::HashMap, fmt, hash::{BuildHasher, Hash}, @@ -669,6 +669,7 @@ impl fmt::Debug for HttpRequest { /// The pool's default capacity is 128 items. pub(crate) struct HttpRequestPool { inner: RefCell>>, + enabled: Cell, cap: usize, } @@ -682,6 +683,7 @@ impl HttpRequestPool { pub(crate) fn with_capacity(cap: usize) -> Self { HttpRequestPool { inner: RefCell::new(Vec::with_capacity(cap)), + enabled: Cell::new(true), cap, } } @@ -698,7 +700,7 @@ impl HttpRequestPool { /// Check if the pool still has capacity for request storage. #[inline] pub(crate) fn is_available(&self) -> bool { - self.inner.borrow_mut().len() < self.cap + self.enabled.get() && self.inner.borrow().len() < self.cap } /// Push a request to pool. @@ -707,15 +709,16 @@ impl HttpRequestPool { self.inner.borrow_mut().push(req); } - /// Clears all allocated HttpRequest objects. - pub(crate) fn clear(&self) { - self.inner.borrow_mut().clear() + /// Prevents future requests from being returned to the pool and clears existing entries. + pub(crate) fn disable(&self) { + self.enabled.set(false); + self.inner.borrow_mut().clear(); } } #[cfg(test)] mod tests { - use std::collections::HashMap; + use std::{collections::HashMap, sync::Arc}; use bytes::Bytes; @@ -993,6 +996,41 @@ mod tests { assert_eq!(resp.headers().get("pool_cap").unwrap(), "128"); } + #[actix_rt::test] + async fn test_request_dropped_after_service_does_not_reenter_pool() { + struct State { + _data: Arc, + } + + let (weak_data, app_data) = { + let data = Arc::new("data".to_owned()); + (Arc::downgrade(&data), web::Data::new(State { _data: data })) + }; + + let held_req = Rc::new(RefCell::new(None)); + + { + let held_req = Rc::clone(&held_req); + let srv = init_service(App::new().app_data(app_data).service(web::resource("/").to( + move |req: HttpRequest| { + *held_req.borrow_mut() = Some(req.clone()); + HttpResponse::Ok() + }, + ))) + .await; + + let resp = call_service(&srv, TestRequest::default().to_request()).await; + assert_eq!(resp.status(), StatusCode::OK); + + drop(resp); + drop(srv); + } + + assert!(weak_data.upgrade().is_some()); + drop(held_req.borrow_mut().take()); + assert!(weak_data.upgrade().is_none()); + } + #[actix_rt::test] async fn test_data() { let srv = init_service(App::new().app_data(10usize).service(web::resource("/").to( diff --git a/actix-web/tests/test_httpserver.rs b/actix-web/tests/test_httpserver.rs index 44283ebd478..403b66d83ad 100644 --- a/actix-web/tests/test_httpserver.rs +++ b/actix-web/tests/test_httpserver.rs @@ -1,9 +1,16 @@ #[cfg(feature = "openssl")] extern crate tls_openssl as openssl; -use std::{sync::mpsc, thread, time::Duration}; +use std::{ + convert::Infallible, + sync::{mpsc, Arc}, + thread, + time::Duration, +}; -use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer}; +use actix_web::{rt::time::sleep, web, App, HttpRequest, HttpResponse, HttpServer}; +use bytes::Bytes; +use futures_util::stream; #[actix_rt::test] async fn test_start() { @@ -74,6 +81,74 @@ async fn test_start() { srv.stop(false).await; } +#[actix_rt::test] +async fn test_app_data_dropped_after_graceful_shutdown_with_slow_request() { + struct State { + _data: Arc, + } + + async fn echo(_body: web::Json) -> HttpResponse { + HttpResponse::Ok().finish() + } + + let (weak_data, app_data) = { + let data = Arc::new("data".to_owned()); + (Arc::downgrade(&data), web::Data::new(State { _data: data })) + }; + + let server = HttpServer::new(move || { + App::new() + .app_data(app_data.clone()) + .service(web::resource("/echo").route(web::post().to(echo))) + }) + .workers(1) + .shutdown_timeout(1) + .bind(("127.0.0.1", 0)) + .unwrap(); + + let addr = server.addrs()[0]; + let server = server.run(); + let server_handle = server.handle(); + + let send_request = async move { + sleep(Duration::from_millis(100)).await; + + let slow_body = stream::unfold(0, |idx| async move { + if idx < 8 { + sleep(Duration::from_millis(200)).await; + Some((Ok::<_, Infallible>(Bytes::from_static(b" ")), idx + 1)) + } else { + None + } + }); + + let client = awc::Client::default(); + let _ = client + .post(format!("http://{addr}/echo")) + .insert_header(("content-type", "application/json")) + .send_stream(slow_body) + .await; + }; + + let graceful_stop = async move { + sleep(Duration::from_millis(300)).await; + server_handle.stop(true).await; + }; + + let (server_res, (), ()) = tokio::join!(server, send_request, graceful_stop); + server_res.unwrap(); + + for _ in 0..20 { + sleep(Duration::from_millis(100)).await; + + if weak_data.upgrade().is_none() { + return; + } + } + + panic!("app data still referenced after graceful shutdown"); +} + #[cfg(feature = "openssl")] fn ssl_acceptor() -> openssl::ssl::SslAcceptorBuilder { use openssl::{