1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
#![allow(unused_imports, unused_macros)] // 由宏使用的项
use crate::cell::UnsafeCell;
use crate::future::{poll_fn, Future};
use crate::mem;
use crate::pin::Pin;
use crate::task::{ready, Context, Poll};
/// 同时轮询多个 futures,完成后,返回所有结果的元组。
///
/// 虽然 `join!(a, b).await` 类似于 `(a.await, b.await)`,但 `join!` 会同时轮询两个 futures,因此效率更高。
///
///
/// # Examples
///
/// ```
/// #![feature(future_join)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
///
/// # let _ = async {
/// let x = join!(one(), two()).await;
/// assert_eq!(x, (1, 2));
/// # };
/// ```
///
/// `join!` 的参数是可变的,因此您可以传递任意数量的 futures:
///
/// ```
/// #![feature(future_join)]
///
/// use std::future::join;
///
/// async fn one() -> usize { 1 }
/// async fn two() -> usize { 2 }
/// async fn three() -> usize { 3 }
///
/// # let _ = async {
/// let x = join!(one(), two(), three()).await;
/// assert_eq!(x, (1, 2, 3));
/// # };
/// ```
///
#[unstable(feature = "future_join", issue = "91642")]
pub macro join( $($fut:expr),+ $(,)? ) {
// 通过内部宏传递信息,不泄露实现细节。
join_internal! {
current_position: []
futures_and_positions: []
munching: [ $($fut)+ ]
}
}
// FIXME(danielhenrymantilla): 私有宏应该不需要稳定性保证。
#[unstable(feature = "future_join", issue = "91642")]
/// 为了能够*命名*元组中的第 `i` 个 future (假设我们想要第 .4 个),将使用以下技巧: `let (_, , , _, it, ..) = tuple;` 为了做到这一点,我们需要为每个第 `i` 个 fut 生成一个 `i` 长的重复 `_`。
/// 因此,递归 muncher 方法。
///
///
macro join_internal {
// 递归步骤:映射每个 future 及其 "position" (下划线计数)。
(
// 为每个已扩展的 future 累积一个 token: "_ _ _"。
current_position: [
$($underscores:tt)*
]
// 累积 Futures 及其在元组中的位置: `_0th () _1st ( _ )…`。
futures_and_positions: [
$($acc:tt)*
]
// Munch 一个 future。
munching: [
$current:tt
$($rest:tt)*
]
) => (
join_internal! {
current_position: [
$($underscores)*
_
]
futures_and_positions: [
$($acc)*
$current ( $($underscores)* )
]
munching: [
$($rest)*
]
}
),
// 递归结束:生成输出 future。
(
current_position: $_:tt
futures_and_positions: [
$(
$fut_expr:tt ( $($pos:tt)* )
)*
]
// 没有什么可 munch 的。
munching: []
) => (
match ( $( MaybeDone::Future($fut_expr), )* ) { futures => async {
let mut futures = futures;
// SAFETY: 这是 `pin_mut!`。
let mut futures = unsafe { Pin::new_unchecked(&mut futures) };
poll_fn(move |cx| {
let mut done = true;
// 对于每个 `fut`,将其固定并轮询它。
$(
// SAFETY: 固定推断 (pinning projection)
let fut = unsafe {
futures.as_mut().map_unchecked_mut(|it| {
let ( $($pos,)* fut, .. ) = it;
fut
})
};
// 尽管 `let () = ready!(fut.poll(cx));` 这样做有多么诱人,但这样做会破坏 `join!` 的要点: 开始急切地轮询所有 futures,以允许并行等待。
//
//
done &= fut.poll(cx).is_ready();
)*
if !done {
return Poll::Pending;
}
// 一切就绪; 是时候提取所有输出了。
// SAFETY: `.take_output()` 不会破坏该 `fut` 的 `Pin` 不变量。
let futures = unsafe {
futures.as_mut().get_unchecked_mut()
};
Poll::Ready(
($(
{
let ( $($pos,)* fut, .. ) = &mut *futures;
fut.take_output().unwrap()
}
),*) // <- 没有尾随逗号,因为我们不需要一元组。
)
}).await
}}
),
}
/// Future 使用 `join!` 存储它的输出,以供以后使用,并且在 ready 后轮询时不会发生 panic。
///
///
/// 这种类型在私有模块中是公共的,供宏使用。
#[allow(missing_debug_implementations)]
#[unstable(feature = "future_join", issue = "91642")]
pub enum MaybeDone<F: Future> {
Future(F),
Done(F::Output),
Taken,
}
#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> MaybeDone<F> {
pub fn take_output(&mut self) -> Option<F::Output> {
match *self {
MaybeDone::Done(_) => match mem::replace(self, Self::Taken) {
MaybeDone::Done(val) => Some(val),
_ => unreachable!(),
},
_ => None,
}
}
}
#[unstable(feature = "future_join", issue = "91642")]
impl<F: Future> Future for MaybeDone<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: `f` 的结构固定
unsafe {
// 不要将符合人体工程学的设计与不安全混为一谈。
match *self.as_mut().get_unchecked_mut() {
MaybeDone::Future(ref mut f) => {
let val = ready!(Pin::new_unchecked(f).poll(cx));
self.set(Self::Done(val));
}
MaybeDone::Done(_) => {}
MaybeDone::Taken => unreachable!(),
}
}
Poll::Ready(())
}
}