diff --git a/.gitignore b/.gitignore index 78e17712..2483001e 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dev/pki/* !dev/pki/pki **/mitmproxy/ dev/logs/ +.zed/ diff --git a/broker/src/serve_sockets.rs b/broker/src/serve_sockets.rs index 381755e5..51caae93 100644 --- a/broker/src/serve_sockets.rs +++ b/broker/src/serve_sockets.rs @@ -100,6 +100,12 @@ async fn connect_socket( } } + let tm = state.task_manager.clone(); + // Drop the task if any side that connected to it loses interest (aka drops the connection) + let _guard = DropGuard::new(move || { + // We don't care if the task expired by now + _ = tm.remove(&task_id); + }); let Some(conn) = parts.extensions.remove::() else { warn!("Failed to upgrade connection: {:#?}", parts.headers); return Err(StatusCode::UPGRADE_REQUIRED); @@ -117,8 +123,6 @@ async fn connect_socket( debug!("Socket expired because nobody connected"); return Err(StatusCode::GONE); }; - // We don't care if the task expired by now - _ = state.task_manager.remove(&task_id); tokio::spawn(async move { let (socket1, socket2) = match tokio::try_join!(conn, other_con) { Ok(sockets) => sockets, @@ -139,3 +143,21 @@ async fn connect_socket( (header::CONNECTION, HeaderValue::from_static("upgrade")) ], StatusCode::SWITCHING_PROTOCOLS).into_response()) } + +struct DropGuard { + on_drop: Option, +} + +impl DropGuard { + fn new(on_drop: F) -> Self { + Self { on_drop: Some(on_drop) } + } +} + +impl Drop for DropGuard { + fn drop(&mut self) { + if let Some(f) = self.on_drop.take() { + f(); + } + } +} diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index fa16c583..d0a6bed7 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -1,7 +1,7 @@ version: "3.7" services: vault: - image: hashicorp/vault + image: hashicorp/vault:1.21 ports: - 127.0.0.1:8200:8200 environment: