Module core::stream [−][src]
Expand description
可组合的异步迭代。
如果 futures 是异步值,则流是异步迭代器。 如果您发现自己具有某种异步集合,并且需要对所述集合的元素执行操作,那么您会很快遇到 ‘streams’。 流在惯用的异步 Rust 代码中大量使用,因此值得熟悉它们。
在解释更多内容之前,让我们讨论一下该模块的结构:
Organization
该模块主要是按类型来组织的:
- Traits 是核心部分: 这些 traits 定义了存在哪种类型的流以及可以对其进行处理。这些 traits 的方法值得投入一些额外的学习时间。
- 函数提供了一些有用的方法来创建一些基本流。
- 结构体通常是该模块的 traits 上各种方法的返回类型。通常,您将需要查看创建
struct
的方法,而不是struct
本身。 有关原因的更多详细信息,请参见 实现流。
就是这样! 让我们深入研究流。
Stream
该模块的核心是 Stream
trait。Stream
的核心如下所示:
trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }Run
与 Iterator
不同,Stream
区分了实现 Stream
时使用的 poll_next
方法和使用流时使用的 (to-be-implemented) next
方法。
Stream
的使用者只需要考虑 next
,当调用 next
时,它会返回 future 并产生 Option<Stream::Item>
。
只要有元素,next
返回的 future 就会产生 Some(Item)
,一旦所有元素用完,就会产生 None
来指示迭代已完成。
如果我们正在等待异步处理,则 future 将等待,直到流准备再次屈服。
各个流可能选择恢复迭代,因此再次调用 next
可能会或可能最终不会在某个时候再次产生 Some(Item)
。
Stream
的完整定义还包括许多其他方法,但是它们是默认方法,基于 poll_next
构建,因此您可以免费获得它们。
实现流
创建自己的流涉及两个步骤: 创建一个 struct
来保存流的状态,然后为该 struct
实现 Stream
。
让我们创建一个名为 Counter
的流,它从 1
到 5
计数:
#![feature(async_stream)] // 首先,结构体: /// 从一数到五的流 struct Counter { count: usize, } // 我们希望计数从一开始,所以让我们添加一个 new() 方法来提供帮助。 // 这不是严格必要的,但很方便。 // 请注意,我们将 `count` 从零开始,我们将在下面的 `poll_next () ` 的实现中看到其原因。 impl Counter { fn new() -> Counter { Counter { count: 0 } } } // 然后,我们为 `Counter` 实现 `Stream`: impl Stream for Counter { // 我们将使用 usize 进行计数 type Item = usize; // poll_next() 是唯一需要的方法 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { // 增加我们的数量。这就是为什么我们从零开始。 self.count += 1; // 检查我们是否已经完成计数。 if self.count < 6 { Poll::Ready(Some(self.count)) } else { Poll::Ready(None) } } }Run
Laziness
流是懒惰的。这意味着仅仅创建一个流并不能做很多事情。除非您调用 next
,否则什么都不会发生。
当仅出于其副作用创建流时,这有时会引起混乱。
编译器将警告我们这种行为:
warning: unused result that must be used: streams do nothing unless polled
Traits
Stream | Experimental 用于处理异步迭代器的接口。 |