当前位置: 首页> 娱乐> 八卦 > 中装建设集团有限公司股票_品牌设计与策划_深圳网络推广建站_厦门人才网唯一官方网站

中装建设集团有限公司股票_品牌设计与策划_深圳网络推广建站_厦门人才网唯一官方网站

时间:2025/7/29 10:58:37来源:https://blog.csdn.net/weixin_43114209/article/details/145954480 浏览次数:0次
中装建设集团有限公司股票_品牌设计与策划_深圳网络推广建站_厦门人才网唯一官方网站

1. Streams:异步数据流

1.1 Streams 与 Iterator 的异同

Rust 的 Iterator 是同步的,通过 next() 方法逐个获取数据。而 Streamasync 版本的 Iterator,它使用 next().await 来获取数据项。

示例:将 Iterator 转换为 Stream

use trpl::{stream_from_iter, StreamExt};let numbers = vec![1, 2, 3, 4, 5];
let stream = stream_from_iter(numbers.into_iter());while let Some(value) = stream.next().await {println!("Received: {}", value);
}

此示例中:

  • stream_from_iter()Iterator 转换为 Stream
  • 通过 stream.next().await 按顺序异步获取数据项。

2. 组合 Streams

2.1 构建 Stream 处理异步消息

在实际应用中,我们经常需要从网络、数据库或消息队列中接收数据。这时,可以用 trpl::channel 创建 Stream 来异步处理数据。

use trpl::{channel, ReceiverStream};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = channel();spawn_task(async move {for letter in "abcdefghij".chars() {tx.send(letter.to_string()).await.unwrap();}});ReceiverStream::new(rx)
}while let Some(msg) = get_messages().next().await {println!("Message: {}", msg);
}
  • get_messages 返回一个 Stream,每次 next().await 便能获取新的数据项。
  • 通过 spawn_task 启动异步任务,定期向 Stream 发送数据。

3. 控制 Stream 速率与超时

3.1 timeout:为 Stream 设置超时

当处理外部数据时,我们可能希望对每个 Stream 数据项设定超时时间,以避免某个数据源长时间无响应。

use trpl::{StreamExt, sleep, Duration};let messages = get_messages().timeout(Duration::from_millis(200));while let Some(result) = messages.next().await {match result {Ok(msg) => println!("Message: {}", msg),Err(_) => println!("Timeout occurred!"),}
}
  • timeout() 方法为 Stream 每个数据项设置超时时间。
  • 当数据在 200ms 内到达时,正常输出,否则触发超时逻辑。

3.2 throttle:限制 Stream 处理速率

有时,我们希望 Stream 以固定的速率生成数据,而不是尽可能快地处理。

use trpl::StreamExt;let throttled_messages = get_messages().throttle(Duration::from_millis(100));
  • throttle() 方法限制 Stream 处理频率,每 100ms 处理一个数据项。
  • 避免 Stream 过快地填充下游处理逻辑。

4. 合并多个 Streams

4.1 merge:合并多个 Stream

在某些情况下,我们可能有多个 Stream 数据源,例如:

  • 一个 Stream 处理用户输入
  • 一个 Stream 处理传感器数据

可以使用 merge() 将它们合并到一个 Stream,以便统一处理:

let messages = get_messages().timeout(Duration::from_millis(200));
let intervals = get_intervals().map(|i| format!("Interval: {}", i));let merged = messages.merge(intervals);while let Some(event) = merged.next().await {println!("Received: {}", event);
}
  • messages 处理异步消息,带 200ms 超时。
  • intervals 生成时间间隔数据(Interval: 1, Interval: 2, …)。
  • merge() 方法合并两个 Stream,同时接收消息和时间间隔。

4.2 take:限制 Stream 处理的项数

有时,我们希望 Stream 只处理有限数量的数据项。例如,限制为 10 条:

let limited_stream = merged.take(10);

这样,merged 只会输出 10 条数据,然后 Stream 自动结束。

5. 处理 Stream 可能的错误

在异步系统中,消息通道的 send 操作可能会失败,例如 tx.send(msg).await.unwrap();

如果通道关闭,send 会返回 Err。因此,我们应当合理地处理这些错误,而不是 unwrap()

if let Err(e) = tx.send(msg).await {println!("Error sending message: {:?}", e);break;
}

在真实应用中,应当根据错误类型采取适当的恢复策略,而不是直接 break 退出。


6. 总结

  • Stream 适用于异步数据流,类似 Iterator,但支持 await
  • timeout 可为 Stream 每个数据项设置超时时间。
  • throttle 限制 Stream 生成数据的速率。
  • merge 将多个 Stream 合并,便于处理多个数据源。
  • take 限制 Stream 处理的最大数据项数。
  • 合理处理 send 失败,避免异步任务意外崩溃。

🚀 适用场景:

  • 处理 WebSocketKafka数据库监听流式数据
  • 限流 API 调用,避免发送太多请求。
  • 处理用户事件流,如 键盘输入、鼠标点击

通过 Stream 及其扩展方法,我们可以轻松构建高效的异步数据处理系统。Rust 提供了强大的 async 生态,让我们能更轻松地编写安全、高性能的并发代码!

关键字:中装建设集团有限公司股票_品牌设计与策划_深圳网络推广建站_厦门人才网唯一官方网站

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: