diff --git a/crates/factor-outbound-pg/src/client.rs b/crates/factor-outbound-pg/src/client.rs index c5620dac3..36faa57cc 100644 --- a/crates/factor-outbound-pg/src/client.rs +++ b/crates/factor-outbound-pg/src/client.rs @@ -283,31 +283,42 @@ impl Client for PooledTokioClient { tokio::spawn(async move { loop { - let Some(row) = rows.next().await else { - _ = err_tx.send(Ok(())); - return; - }; - match row { - Ok(row) => { - let byte_count = row.iter().map(|v| v.memory_size()).sum::(); - if byte_count > max_result_bytes { - _ = err_tx.send(Err(v4::Error::QueryFailed(v4::QueryError::Text( - format!("query result exceeds limit of {max_result_bytes} bytes"), - )))); - return; - } - - if let Err(e) = rows_tx.send(row).await { - _ = err_tx.send(Err(v4::Error::QueryFailed(v4::QueryError::Text( - format!("async error: {e}"), - )))); + tokio::select! { + biased; + // Exit immediately if the consumer (rows_rx) has been dropped, + // preventing this task from leaking. + _ = rows_tx.closed() => { + _ = err_tx.send(Ok(())); + return; + } + row = rows.next() => { + let Some(row) = row else { + _ = err_tx.send(Ok(())); return; + }; + match row { + Ok(row) => { + let byte_count = row.iter().map(|v| v.memory_size()).sum::(); + if byte_count > max_result_bytes { + _ = err_tx.send(Err(v4::Error::QueryFailed(v4::QueryError::Text( + format!("query result exceeds limit of {max_result_bytes} bytes"), + )))); + return; + } + + if let Err(e) = rows_tx.send(row).await { + _ = err_tx.send(Err(v4::Error::QueryFailed(v4::QueryError::Text( + format!("async error: {e}"), + )))); + return; + } + } + Err(e) => { + _ = err_tx.send(Err(e)); + return; + } } } - Err(e) => { - _ = err_tx.send(Err(e)); - return; - } } } });