Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
## Unreleased

- When configured, gracefully close HTTP/1 connections after early responses to unread request bodies. [#3967]
- Wake HTTP/1 payload receivers with an incomplete-payload error when the sender is dropped before EOF. [#3100]
- Update `foldhash` dependency to `0.2`.

[#3967]: https://github.com/actix/actix-web/issues/3967
[#3100]: https://github.com/actix/actix-web/issues/3100

## 3.12.1

Expand Down
29 changes: 29 additions & 0 deletions actix-http/src/h1/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,20 @@ impl PayloadSender {
}
}

impl Drop for PayloadSender {
fn drop(&mut self) {
if let Some(shared) = self.inner.upgrade() {
shared.borrow_mut().close_sender();
}
}
}

#[derive(Debug)]
struct Inner {
len: usize,
eof: bool,
err: Option<PayloadError>,
sender_closed: bool,
need_read: bool,
items: VecDeque<Bytes>,
task: Option<Waker>,
Expand All @@ -157,6 +166,7 @@ impl Inner {
eof,
len: 0,
err: None,
sender_closed: eof,
items: VecDeque::new(),
need_read: true,
task: None,
Expand Down Expand Up @@ -200,12 +210,21 @@ impl Inner {

#[inline]
fn set_error(&mut self, err: PayloadError) {
self.sender_closed = true;
self.err = Some(err);
self.wake();
}

fn close_sender(&mut self) {
if !self.sender_closed {
self.sender_closed = true;
self.set_error(PayloadError::Incomplete(None));
}
}

#[inline]
fn feed_eof(&mut self) {
self.sender_closed = true;
self.eof = true;
self.wake();
}
Expand Down Expand Up @@ -332,6 +351,16 @@ mod tests {
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}

#[actix_rt::test]
async fn wake_on_sender_drop() {
let (sender, payload) = Payload::create(false);
let (rx, handle) = prepare_waking_test(payload, Some(Err(())));

rx.await.unwrap();
drop(sender);
timeout(WAKE_TIMEOUT, handle).await.unwrap().unwrap();
}

#[actix_rt::test]
async fn test_unread_data() {
let (_, mut payload) = Payload::create(false);
Expand Down
2 changes: 1 addition & 1 deletion actix-multipart/src/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ mod tests {
#[actix_rt::test]
async fn test_multipart_payload_consumption() {
// with sample payload and HttpRequest with no headers
let (_, inner_payload) = h1::Payload::create(false);
let (_sender, inner_payload) = h1::Payload::create(false);
let mut payload = actix_web::dev::Payload::from(inner_payload);
let req = TestRequest::default().to_http_request();

Expand Down
2 changes: 1 addition & 1 deletion actix-multipart/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ mod tests {

#[actix_rt::test]
async fn basic() {
let (_, payload) = h1::Payload::create(false);
let (_sender, payload) = h1::Payload::create(false);
let mut payload = PayloadBuffer::new_with_limit(payload, DEFAULT_BUFFER_LIMIT);

assert_eq!(payload.buf.len(), 0);
Expand Down
2 changes: 2 additions & 0 deletions actix-web/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
- Panic when calling `Route::to()` or `Route::service()` after `Route::wrap()` to prevent silently dropping route middleware. [#3944]
- Fix `HttpRequest::{match_pattern,match_name}` reporting path-only matches when route guards disambiguate overlapping resources. [#3346]
- Fix `Readlines` handling of lines split across payload chunks so combined line limits are enforced and complete lines are yielded.
- Fix app data being retained after graceful shutdown with in-flight slow request bodies. [#3100]
- Update `foldhash` dependency to `0.2`.
- Update `rand` dependency to `0.10`.
- Add `HttpServer::h1_write_buffer_size()`.

[#3944]: https://github.com/actix/actix-web/pull/3944
[#3346]: https://github.com/actix/actix-web/issues/3346
[#3542]: https://github.com/actix/actix-web/issues/3542
[#3100]: https://github.com/actix/actix-web/issues/3100

## 4.13.0

Expand Down
2 changes: 1 addition & 1 deletion actix-web/src/app_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ where
T: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
{
fn drop(&mut self) {
self.app_state.pool().clear();
self.app_state.pool().disable();
}
}

Expand Down
50 changes: 44 additions & 6 deletions actix-web/src/request.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
cell::{Ref, RefCell, RefMut},
cell::{Cell, Ref, RefCell, RefMut},
collections::HashMap,
fmt,
hash::{BuildHasher, Hash},
Expand Down Expand Up @@ -669,6 +669,7 @@ impl fmt::Debug for HttpRequest {
/// The pool's default capacity is 128 items.
pub(crate) struct HttpRequestPool {
inner: RefCell<Vec<Rc<HttpRequestInner>>>,
enabled: Cell<bool>,
cap: usize,
}

Expand All @@ -682,6 +683,7 @@ impl HttpRequestPool {
pub(crate) fn with_capacity(cap: usize) -> Self {
HttpRequestPool {
inner: RefCell::new(Vec::with_capacity(cap)),
enabled: Cell::new(true),
cap,
}
}
Expand All @@ -698,7 +700,7 @@ impl HttpRequestPool {
/// Check if the pool still has capacity for request storage.
#[inline]
pub(crate) fn is_available(&self) -> bool {
self.inner.borrow_mut().len() < self.cap
self.enabled.get() && self.inner.borrow().len() < self.cap
}

/// Push a request to pool.
Expand All @@ -707,15 +709,16 @@ impl HttpRequestPool {
self.inner.borrow_mut().push(req);
}

/// Clears all allocated HttpRequest objects.
pub(crate) fn clear(&self) {
self.inner.borrow_mut().clear()
/// Prevents future requests from being returned to the pool and clears existing entries.
pub(crate) fn disable(&self) {
self.enabled.set(false);
self.inner.borrow_mut().clear();
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::{collections::HashMap, sync::Arc};

use bytes::Bytes;

Expand Down Expand Up @@ -993,6 +996,41 @@ mod tests {
assert_eq!(resp.headers().get("pool_cap").unwrap(), "128");
}

#[actix_rt::test]
async fn test_request_dropped_after_service_does_not_reenter_pool() {
struct State {
_data: Arc<String>,
}

let (weak_data, app_data) = {
let data = Arc::new("data".to_owned());
(Arc::downgrade(&data), web::Data::new(State { _data: data }))
};

let held_req = Rc::new(RefCell::new(None));

{
let held_req = Rc::clone(&held_req);
let srv = init_service(App::new().app_data(app_data).service(web::resource("/").to(
move |req: HttpRequest| {
*held_req.borrow_mut() = Some(req.clone());
HttpResponse::Ok()
},
)))
.await;

let resp = call_service(&srv, TestRequest::default().to_request()).await;
assert_eq!(resp.status(), StatusCode::OK);

drop(resp);
drop(srv);
}

assert!(weak_data.upgrade().is_some());
drop(held_req.borrow_mut().take());
assert!(weak_data.upgrade().is_none());
}

#[actix_rt::test]
async fn test_data() {
let srv = init_service(App::new().app_data(10usize).service(web::resource("/").to(
Expand Down
79 changes: 77 additions & 2 deletions actix-web/tests/test_httpserver.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
#[cfg(feature = "openssl")]
extern crate tls_openssl as openssl;

use std::{sync::mpsc, thread, time::Duration};
use std::{
convert::Infallible,
sync::{mpsc, Arc},
thread,
time::Duration,
};

use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};
use actix_web::{rt::time::sleep, web, App, HttpRequest, HttpResponse, HttpServer};
use bytes::Bytes;
use futures_util::stream;

#[actix_rt::test]
async fn test_start() {
Expand Down Expand Up @@ -74,6 +81,74 @@ async fn test_start() {
srv.stop(false).await;
}

#[actix_rt::test]
async fn test_app_data_dropped_after_graceful_shutdown_with_slow_request() {
struct State {
_data: Arc<String>,
}

async fn echo(_body: web::Json<String>) -> HttpResponse {
HttpResponse::Ok().finish()
}

let (weak_data, app_data) = {
let data = Arc::new("data".to_owned());
(Arc::downgrade(&data), web::Data::new(State { _data: data }))
};

let server = HttpServer::new(move || {
App::new()
.app_data(app_data.clone())
.service(web::resource("/echo").route(web::post().to(echo)))
})
.workers(1)
.shutdown_timeout(1)
.bind(("127.0.0.1", 0))
.unwrap();

let addr = server.addrs()[0];
let server = server.run();
let server_handle = server.handle();

let send_request = async move {
sleep(Duration::from_millis(100)).await;

let slow_body = stream::unfold(0, |idx| async move {
if idx < 8 {
sleep(Duration::from_millis(200)).await;
Some((Ok::<_, Infallible>(Bytes::from_static(b" ")), idx + 1))
} else {
None
}
});

let client = awc::Client::default();
let _ = client
.post(format!("http://{addr}/echo"))
.insert_header(("content-type", "application/json"))
.send_stream(slow_body)
.await;
};

let graceful_stop = async move {
sleep(Duration::from_millis(300)).await;
server_handle.stop(true).await;
};

let (server_res, (), ()) = tokio::join!(server, send_request, graceful_stop);
server_res.unwrap();

for _ in 0..20 {
sleep(Duration::from_millis(100)).await;

if weak_data.upgrade().is_none() {
return;
}
}

panic!("app data still referenced after graceful shutdown");
}

#[cfg(feature = "openssl")]
fn ssl_acceptor() -> openssl::ssl::SslAcceptorBuilder {
use openssl::{
Expand Down
Loading