Module core::stream[][src]

🔬 This is a nightly-only experimental API. (async_stream #79024)
Expand description

可组合的异步迭代。

如果 futures 是异步值,则流是异步迭代器。 如果您发现自己具有某种异步集合,并且需要对所述集合的元素执行操作,那么您会很快遇到 ‘streams’。 流在惯用的异步 Rust 代码中大量使用,因此值得熟悉它们。

在解释更多内容之前,让我们讨论一下该模块的结构:

Organization

该模块主要是按类型来组织的:

  • Traits 是核心部分: 这些 traits 定义了存在哪种类型的流以及可以对其进行处理。这些 traits 的方法值得投入一些额外的学习时间。
  • 函数提供了一些有用的方法来创建一些基本流。
  • 结构体通常是该模块的 traits 上各种方法的返回类型。通常,您将需要查看创建 struct 的方法,而不是 struct 本身。 有关原因的更多详细信息,请参见 实现流

就是这样! 让我们深入研究流。

Stream

该模块的核心是 Stream trait。Stream 的核心如下所示:

trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}
Run

Iterator 不同,Stream 区分了实现 Stream 时使用的 poll_next 方法和使用流时使用的 (to-be-implemented) next 方法。

Stream 的使用者只需要考虑 next,当调用 next 时,它会返回 future 并产生 Option<Stream::Item>

只要有元素,next 返回的 future 就会产生 Some(Item),一旦所有元素用完,就会产生 None 来指示迭代已完成。 如果我们正在等待异步处理,则 future 将等待,直到流准备再次屈服。

各个流可能选择恢复迭代,因此再次调用 next 可能会或可能最终不会在某个时候再次产生 Some(Item)

Stream 的完整定义还包括许多其他方法,但是它们是默认方法,基于 poll_next 构建,因此您可以免费获得它们。

实现流

创建自己的流涉及两个步骤: 创建一个 struct 来保存流的状态,然后为该 struct 实现 Stream

让我们创建一个名为 Counter 的流,它从 15 计数:

#![feature(async_stream)]

// 首先,结构体:

/// 从一数到五的流
struct Counter {
    count: usize,
}

// 我们希望计数从一开始,所以让我们添加一个 new() 方法来提供帮助。
// 这不是严格必要的,但很方便。
// 请注意,我们将 `count` 从零开始,我们将在下面的 `poll_next () ` 的实现中看到其原因。
impl Counter {
    fn new() -> Counter {
        Counter { count: 0 }
    }
}

// 然后,我们为 `Counter` 实现 `Stream`:

impl Stream for Counter {
    // 我们将使用 usize 进行计数
    type Item = usize;

    // poll_next() 是唯一需要的方法
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // 增加我们的数量。这就是为什么我们从零开始。
        self.count += 1;

        // 检查我们是否已经完成计数。
        if self.count < 6 {
            Poll::Ready(Some(self.count))
        } else {
            Poll::Ready(None)
        }
    }
}
Run

Laziness

流是懒惰的。这意味着仅仅创建一个流并不能做很多事情。除非您调用 next,否则什么都不会发生。 当仅出于其副作用创建流时,这有时会引起混乱。 编译器将警告我们这种行为:

warning: unused result that must be used: streams do nothing unless polled

Traits

StreamExperimental

用于处理异步迭代器的接口。