Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 82 additions & 27 deletions futures-util/src/future/join_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use core::pin::Pin;
use core::task::{Context, Poll};

use super::{assert_future, MaybeDone};
use crate::stream::{Collect, FuturesOrdered, StreamExt};

fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
Expand All @@ -19,13 +20,30 @@ fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
}

/// Future for the [`join_all`] function.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct JoinAll<F>
where
F: Future,
{
elems: Pin<Box<[MaybeDone<F>]>>,
pin_project_lite::pin_project! {
/// Future for the [`join_all`] function.
pub struct JoinAll<F>
where
F: Future,
{
#[pin]
kind: JoinAllKind<F>,
}
}

const SMALL: usize = 30;

pin_project_lite::pin_project! {
#[project = JoinAllKindProj]
pub enum JoinAllKind<F>
where
F: Future,
{
Small { elems: Pin<Box<[MaybeDone<F>]>> },
#[cfg(not(futures_no_atomic_cas))]
Big { #[pin] fut: Collect<FuturesOrdered<F>, Vec<F::Output>> },
}
}
Comment thread
ibraheemdev marked this conversation as resolved.
Outdated

impl<F> fmt::Debug for JoinAll<F>
Expand All @@ -34,7 +52,13 @@ where
F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinAll").field("elems", &self.elems).finish()
match self.kind {
JoinAllKind::Small { ref elems } => {
f.debug_struct("JoinAll").field("elems", elems).finish()
}
#[cfg(not(futures_no_atomic_cas))]
JoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
}
}
}

Expand All @@ -50,10 +74,7 @@ where
///
/// # See Also
///
/// This is purposefully a very simple API for basic use-cases. In a lot of
/// cases you will want to use the more powerful
/// [`FuturesOrdered`][crate::stream::FuturesOrdered] APIs, or, if order does
/// not matter, [`FuturesUnordered`][crate::stream::FuturesUnordered].
/// `join_all` will switch to the more powerful [`FuturesOrdered`] if the number of futures is large for performance reasons. If the return order does not matter and you are polling many futures, you should look into [`FuturesUnordered`][crate::stream::FuturesUnordered].
///
/// Some examples for additional functionality provided by these are:
///
Expand All @@ -75,13 +96,40 @@ where
/// assert_eq!(join_all(futures).await, [1, 2, 3]);
/// # });
/// ```
pub fn join_all<I>(i: I) -> JoinAll<I::Item>
pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
where
I: IntoIterator,
I::Item: Future,
{
let elems: Box<[_]> = i.into_iter().map(MaybeDone::Future).collect();
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { elems: elems.into() })
let iter = iter.into_iter();
let kind = match iter.size_hint().1 {
None => join_all_big(iter),
Some(max) => {
if max <= SMALL {
let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
JoinAllKind::Small { elems }
} else {
join_all_big(iter)
}
}
};
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}

fn join_all_big<I>(iter: I) -> JoinAllKind<I::Item>
where
I: Iterator,
I::Item: Future,
{
#[cfg(not(futures_no_atomic_cas))]
{
return JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() };
}
#[cfg(futures_no_atomic_cas)]
{
let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
JoinAllKind::Small { elems }
}
}

impl<F> Future for JoinAll<F>
Expand All @@ -90,21 +138,28 @@ where
{
type Output = Vec<F::Output>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut all_done = true;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().kind.project() {
JoinAllKindProj::Small { elems } => {
let mut all_done = true;

for elem in iter_pin_mut(self.elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}
for elem in iter_pin_mut(elems.as_mut()) {
if elem.poll(cx).is_pending() {
all_done = false;
}
}

if all_done {
let mut elems = mem::replace(&mut self.elems, Box::pin([]));
let result = iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(result)
} else {
Poll::Pending
if all_done {
let mut elems = mem::replace(elems, Box::pin([]));
let result =
iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
Poll::Ready(result)
} else {
Poll::Pending
}
}
#[cfg(not(futures_no_atomic_cas))]
JoinAllKindProj::Big { fut } => fut.poll(cx),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub trait SinkExt<Item>: Sink<Item> {
/// This future will drive the stream to keep producing items until it is
/// exhausted, sending each item to the sink. It will complete once both the
/// stream is exhausted, the sink has received all items, and the sink has
/// been flushed. Note that the sink is **not** closed. If the stream produces
/// been flushed. Note that the sink is **not** closed. If the stream produces
/// an error, that error will be returned by this future without flushing the sink.
///
/// Doing `sink.send_all(stream)` is roughly equivalent to
Expand Down