Stream(流)

如果说 Future 是“一次性买卖”,那么 Stream 就是“订阅服务”。Stream 是一系列异步事件的序列。它会随着时间的推移发出多个值,直到任务完成或发生错误。

graph LR
    A((生成数据)) --> B@{ shape: das, label: "Stream" }
    B -->|数据事件 Data| C([监听器 Listener])
    B -->|错误事件 Error| C
    B -->|完成事件 Done| C
    C --> D{处理数据}

Stream 是异步版的Iterable,借助下表我们可以快速掌握其核心差异。

单个数据多个数据
同步T (普通变量)Iterable<T>
异步Future<T>Stream<T>

Iterable 与 Stream 的对称性:

  • 数据结构对称性:Iterable 允许你遍历一组已经存在的数据;Stream 允许你“遍历”一组未来才会到达的数据。
  • 处理逻辑对称性:在 Iterable 上用的 mapwhereexpand,在 Stream 中几乎完全通用。
  • 语法对称性:Iterable 对应 sync* / yield,Stream 对应 async* / yield

示例

下面的示例演示了如何Stream API创建和监听Stream。

// ex731.dart
import 'dart:async';
import 'dart:math';

void main() async {
  final watch = Stopwatch()..start();
  var count = 0;
  void myprint(o) =>
      print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');

  randomStream(5).listen(myprint, onDone: () => myprint('done')); // 5

  /** An example output:
out0 119ms: 1
out1 212ms: 4
out2 312ms: 2
out3 415ms: 2
out4 514ms: 4
out5 518ms: done
   */
}

// 1
Stream<int> randomStream(int max) {
  var controller = StreamController<int>(); // 2
  // Generate random int asynchronously
  final rand = Random();
  for (var i = 1; i <= max; i++) {
    Future.delayed(Duration(milliseconds: 100 * i), () {
      controller.sink.add(rand.nextInt(max)); // 3
      if (i == max) {
        controller.close(); // 4
      }
    });
  }
  return controller.stream; // 2a
}
  1. randomStream(max) 是数据的生产者,它每间隔100毫秒产生一个[0, max)区间内的随机整数(异步执行);
  2. 使用StreamController来控制数据的生成,在函数的最后返回 controller.stream
  3. 调用 controller.sink.add(event)(等同于controller.add(event))将数据放进controller.stream,由于该操作时延迟且异步进行的,当randomStream函数返回之后才会执行;
  4. 生成max个数据后关闭Stream;
  5. 调用 Stream.listen(onData,{...})方法监听Stream并消费(处理)数据。

Iterable 生成器( sync*-yield

在介绍异步生成器 async*-yield 之前,先来看下Iterable生成器(使用 sync*-yield 创建 Iterable 对象),请对两者进行对比。

// ex732.dart
void main() {
  print(countList(5)); // 1 Output: (1, 2, 3, 4, 5)
  print(countList2(5)); // 2 Output: [1, 2, 3, 4, 5]
}

Iterable<int> countList(int max) sync* {
  for (int n = 1; n <= max; n++) {
    yield n;
  }
}

Iterable<int> countList2(int max) => [for (int n = 1; n <= max; n++) n];

async* / await for

与 Future 的 async / await 语法对应,Stream提供了async* / await for语法;使用该语法改写示例ex731,我们得到了一个更为现代与简洁的版本:

// ex733.dart
import 'dart:math';

void main() async { // 4
  final watch = Stopwatch()..start();
  var count = 0;
  void myprint(o) =>
      print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');

  // 3
  await for (var n in randomStream(5)) {
    myprint(n);
  }
  myprint('done');
}

// 1
Stream<int> randomStream(int max) async* {
  final rand = Random();
  for (var i = 0; i < max; i++) {
    await Future.delayed(Duration(milliseconds: 100));
    yield rand.nextInt(max); // 2
  }
}
  1. async* 写在函数签名的最后,用以标记该函数是一个异步生成器(返回一个 Stream 对象);
  2. 使用 yield 将生成的数据放进Stream,此类函数不用return
  3. 使用 await for...in... 监听并消费Stream里的数据;
  4. 使用 await 关键字的外围函数必须带 async 标记;

广播 (Broadcast)

以上介绍的是 Stream 的“单订阅”(Single-subscription) 模式。然而,Stream 还有另一种更为灵活的形态,即广播模式(Broadcast)。我们先来看下两种模式的区别。

特性单订阅广播
监听数量只能被 listen 一次,第二次报错可以有无限个监听者同时监听
缓存机制监听前的事件可能会在缓冲区等待事件是即时的,错过监听时间点就拿不到旧数据
典型用途文件读取、网络请求(端对端数据流)状态管理、传感器数据、UI 事件通知

请看下面的示例:

// ex734.dart
import 'dart:async';

void main() {
  final watch = Stopwatch()..start();
  var count = 0;
  void myprint(o) =>
      print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');

  var datas = countStream(5);
  datas.listen((d) => myprint('sub1 $d')); // 1

  // 0.2s later
  Future.delayed(
    Duration(milliseconds: 200),
    () => datas.listen((d) => myprint('sub2 $d')), // 2
  );

  /* An example output:
out0 128ms: sub1 1
out1 210ms: sub1 2
out2 323ms: sub1 3
out3 323ms: sub2 3
out4 415ms: sub1 4
out5 415ms: sub2 4
out6 510ms: sub1 5
out7 510ms: sub2 5
   */
}

Stream<int> countStream(int max) {
  var controller = StreamController<int>.broadcast(); // 1
  for (var n = 1; n <= max; n++) {
    Future.delayed(Duration(milliseconds: 100 * n), () => controller.add(n));
  }
  return controller.stream;
}
  1. StreamController.broadcast 构造函数返回一个广播Stream;
  2. 第一个消费者(sub1)监听并消费数据;
  3. 0.2秒之后,第二个消费者(sub2)监听并消费数据;注意观察程序输出,对比两个消费者各自监听到的数据。

可以用 async*-yield 语法改写此示例的 countStream 函数如下:

Stream<int> countStream(int max) async* {
  for (var n = 1; n <= max; n++) {
    await Future.delayed(Duration(milliseconds: 100));
    yield n;
  }
}

改写后的countStream 函数更加简洁。相应地 var datas = countStream(5); 一行需改写为:

    var datas = countStream(5).asBroadcastStream(); // 1 

Stream 的常用方法

订阅与生命周期

  • listen(): 最核心的方法。手动订阅流,处理 onData, onError, onDone 事件。
  • await for: 异步迭代器。像遍历 List 一样消费流(最简洁的消费方式)。
  • asBroadcastStream(): 将单订阅流转换为广播流(多订阅流)。
  • timeout(duration): 设置超时时间,若规定时间内无事件则触发错误或执行回调。

转换

这些方法返回一个新的 Stream,且大多是惰性的。

  • map<T>(convert): 将流中的每个元素同步转换为另一种类型。
  • asyncMap<T>(convert): 异步转换。等待 Future 完成后再发射下一个元素。
  • expand(convert): 将一个元素转换为多个元素,并平铺(Flatten)到流中。
  • asyncExpand(convert): 将每个元素转换为一个新的 Stream 并按顺序连接。
  • transform(transformer): 使用 StreamTransformer 进行自定义的复杂转换(如解密、编码)。

过滤

  • where(test): 筛选掉不满足条件的元素。
  • distinct([equals]): 过滤掉与前一个元素连续重复的元素。
  • take(count): 只获取前 count 个元素。
  • takeWhile(test): 持续获取,直到条件不再满足。
  • skip(count): 跳过前 count 个元素。
  • skipWhile(test): 跳过直到条件不再满足,之后开始获取。
  • handleError(onError): 捕获并处理流中的错误,防止流中断。

聚合与检索

  • toList(): 将流中所有元素收集到一个 List 中。
  • toSet(): 将流中所有元素收集到一个 Set 中。
  • fold<T>(initial, combine): 提供初始值,将元素累加合并为单一结果。
  • reduce(combine): 无初始值,两两合并流中的元素(要求非空)。
  • join([separator]): 将元素转为字符串并用分隔符连接。
  • first / last / single: 获取流中第一个、最后一个或唯一一个元素。
  • any(test) / every(test): 检查是否至少一个/全部元素满足条件。
  • contains(element): 检查流中是否包含特定元素。

一个简单示例:

// ex736.dart
void main() async {
  var nums = Stream.fromIterable([for (var i = 0; i < 10; i++) i]);
  print(await nums.join("-")); // Output: 0-1-2-3-4-5-6-7-8-9
  print(await nums.map((i) => i * i).toList()); // 1*1,2*2,...
  print(await nums.where((i) => i % 2 == 0).toList()); // even numbers
  print(await nums.fold<int>(0, (old, i) => old + i)); // 1+2+...
  print(await nums.contains(1)); // Output: true

  print(
    await nums
        .take(5)
        .map((i) => i * 3 + 1)
        .expand((i) => [i, i * i, i * i * i])
        .where((i) => i < 200)
        .toList(),
  );
  // Output: [1, 1, 1, 4, 16, 64, 7, 49, 10, 100, 13, 169]
}

yield*(yield-each)

yield* (yield-each)是Dart 非常强大的异步生成器。如果说 yield 是插入一个元素,那么 yield* 就是插入一个 Stream 中的所有元素。yield* 后接 一个 Stream,其作用是暂停当前的生成器,转而监听后面的 Stream,并将其产生的所有事件逐一转发,直到那个 Stream 结束,才继续执行当前生成器接下来的代码。

先来看一个使用 yield* 的例子:

// ex737.dart
void main() async {
  var nums = combinedStream(Stream.fromIterable([1, 2, 3]));
  print(await nums.toList()); // Output: [1, 2, 3, 0]
}

Stream<int> combinedStream(Stream<int> prev) async* {
  // 1
  await for (final n in prev) {
    yield n; 
  }
  yield 0; // 2
}

combinedStream 函数使用 await forprev 中的元素一个个写进目标stream后,然后追加一条数据(0)。用 yield* 改写该函数如下:

// ex738.dart
Stream<int> combinedStream(Stream<int> prev) async* {
  yield* prev; // 1
  yield 0; // 2
}

改写后的函数看起来简洁多了。

异步递归

文件夹是一个典型的递归结构(文件夹内包含文件夹)。使用 yield* 可以让你像写同步递归一样,源源不断地产出深度嵌套的文件。

// ex739.dart
import 'dart:io';

void main() {
  final dir = Directory('.');
  traversal(dir).listen((f) => print(f.path));
}

Stream<File> traversal(Directory dir) async* {
  final entities = await dir.list().toList(); // 1
  for (var entity in entities) {
    if (entity is File) {
      yield entity; // 2
    } else if (entity is Directory) {
      yield* traversal(entity); // 3
    }
  }
}
  1. 获取 dir 的子目录及文件;
  2. 如果 entity 是文件(File),就直接产出(yield);
  3. 递归调用并将子目录流的所有结果“委派”给当前流(yield*)。

如果不用 yield*,就必须使用 await for 循环去解包子流,代码会变得冗长且难以维护。 在处理树形结构的数据时,yield* 是实现异步递归的唯一标准方式。

yieldyield* 对比

特性yieldyield*
右侧对象单个数据(如 int, String一个 Stream<T>
行为产出一个值后继续代理另一个Stream,直到其结束才继续
性能逐个处理更加高效(Dart 引擎对其有内部优化)

关键特性与陷阱

执行顺序

yield* 会挂起(suspend)当前生成器的执行。只有当 yield* 后面的Stream关闭(close)后,当前函数才会执行下一行。

错误传递

如果 yield* 监听的Stream抛出了异常,这个异常会直接传递到当前Stream中,除非你在外层用了 try-catch

广播流

如果 yield* 后面是一个广播流,那么它只会转发从“订阅那一刻”开始的数据。