Isolate 基础应用篇

按使用场景(是简单的计算还是长期的后台任务)的不同,Isolate有三种主流的创建方式:

  • Isolate.run()(单次轻量):Dart会自动处理 Isolate 的创建、参数传递、结果返回以及最后的销毁
  • Isolate.spawn()(常驻/双向通信):如果你需要一个常驻的后台服务
  • Flutter 特色:compute(),类似Isolate.run(),更加兼容 Flutter 环境

此外,Isolate.spawnUri()从指定的 URI(如本地路径或 HTTP 链接)读取脚本(源代码),并在一个全新的环境中运行它。

Isolate.run()

如下示例中的heavyTask模拟了一个CPU密集型任务,通过 Isolate.run() 启用一个新的 Isolate 对其进行计算。

// ex751.dart
import 'dart:isolate';

Future<void> main() async {
  final result = await Isolate.run(() => heavyTask(100_000_000));
  print('result: $result'); // Output:  99999999
}

int heavyTask(int count) {
  var sum = 0;
  for (int i = 0; i < count; i++) {
    sum += i % 3;
  }
  return sum;
}

Isolate.spawn()

Isolate 之间通过发送消息通信, ReceivePort 和 SendPort 是其通信端口。

  • ReceivePort(收信箱):它是私有的,只有创建它的 Isolate 才能从中读取消息。本质上它是一个 Stream。当消息掉进箱子时,Event Loop 齿轮转动,触发监听(listen)回调。
  • SendPort(投递地址): 它是公开的,可以被复制、传递给其他 Isolate。
graph LR
    subgraph Isolate_A [main Isolate]
        direction TB
        RP[ReceivePort]
        Handler([处理逻辑])
        RP -->|监听| Handler
    end

    subgraph Isolate_B [worker Isolate]
        direction TB
        SP[SendPort]
        Task([计算任务])
        Task -->|结果| SP
    end

    SP -.->|跨越物理边界| RP

    style Isolate_A fill:#f5f5f5,stroke:#333
    style Isolate_B fill:#f5f5f5,stroke:#333
    style RP fill:#FFF9C4,stroke:#FBC02D
    style SP fill:#E1F5FE,stroke:#0288D1

单向通信(“生产-消费”)

在单向通信(“生产-消费”)模式中,Main Isolate 充当“听众” (即数据的消费者),Worker Isolate 充当数据的“生产者”。

sequenceDiagram
	autonumber
    participant M as Main Isolate
    participant W as Worker Isolate
    
    M->>M: 创建 ReceivePort (RP)
    M->>W: Isolate.spawn(entryPoint, RP.sendPort)
    W->>W: 初始化自己的 Event Loop
    W->>M: 通过传入的 SendPort 发回数据
    M->>M: RP.listen 接收并处理结果
// ex752.dart
import 'dart:isolate';
import 'dart:async';

void main() async {
  final watch = Stopwatch()..start();
  var count = 0;
  void myprint(o) =>
      print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');

  myprint('main: ⚙️ 启动任务...');

  final receivePort = ReceivePort(); // 1
  await Isolate.spawn(entryPoint, receivePort.sendPort); // 2

  // 3
  receivePort.listen((message) {
    // 4
    if (message is int) {
      myprint('main: progress $message%');
    } // 4a
    else if (message is String) {
      myprint('main: $message');
      receivePort.close(); // 5
      myprint('main: 🏁 done');
    }
  });

  myprint('main: 🚀 我继续做其它事');

  /* An example output:
out0 0ms: main: ⚙️ 启动任务...
out1 52ms: main: 🚀 我继续做其他事
out2 1039ms: main: progress 20%
out3 2040ms: main: progress 40%
out4 3045ms: main: progress 60%
out5 4051ms: main: progress 80%
out6 5054ms: main: progress 100%
out7 5054ms: main: DONE
out8 5061ms: main: 🏁 done  
   */
}

/// worker 逻辑: 执行任务并不断向 main “汇报”
void entryPoint(SendPort sendPort) async {
  // 模拟耗时任务,每秒发送一次进度
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    sendPort.send(20 * i); // 发送进度
  }
  // 发送最终计算结果
  sendPort.send('DONE');
}
  1. 首先创建一个ReceivePort;
  2. 启动worker Isolate;Isolate.spawn()的第一个入参,是worker的入口函数(entryPoint,代表了worker的待执行任务),第二个入参是传递给 entryPoint 的初始消息(此处是一个SendPort);
  3. receivePort本质上是一个Stream,receivePort.listen()监听worker发来的消息;
  4. 处理worker发来的消息;
  5. receivePort.close() 释放work Isolate相关资源,最终work Isolate终止。

最佳实践Isolate.spawn() 必须指向一个顶级函数或静态方法(如此例中的 entryPoint()),这是为了满足跨线程(Isolate)序列化的要求。

双向通信

我们在理解了单向通信模式之后,就不难理解双向通信模式。main isolate要想发消息给work isolate,必须通过worker的SendPort,因此我们要设法将worker的SendPort传递给main isolate。在entrypoint 函数中,worker调用mainRP.send(...)即可实现这一点(mainRP即main isolate的ReceivePort)。

sequenceDiagram
    autonumber
    participant M as Main Isolate
    participant W as Worker Isolate
    
    Note over M: 准备阶段
    M->>M: 创建 ReceivePort (mainRP)
    M->>W: Isolate.spawn(entryPoint, mainRP.sendPort)
    
    Note over W: Worker 启动
    W->>W: 初始化自己的 Event Loop
    W->>W: 创建自己的 ReceivePort (workerRP)
    W->>M: 发回 workerRP.sendPort (握手)
    
    Note over M: 建立双向连接
    M->>M: 收到并保存 Worker 的 SendPort
    
    rect rgb(232, 245, 233)
    Note over M, W: 交互阶段
    M->>W: 发送任务数据 (Main 发送消息)
    W->>W: 执行密集计算/任务
    W->>M: 通过 mainRP 发回处理结果
    end
    
    M->>M: 后续处理

如下示例由示例ex752改写而成,它演示了两个Isolate间的双向通信,请读者自行分析。

// ex753.dart
import 'dart:isolate';
import 'dart:async';

void main() async {
  final watch = Stopwatch()..start();
  var count = 0;
  String myprintPrefix() => 'out${count++} ${watch.elapsed.inMilliseconds}ms:';
  void myprint(o) => print('${myprintPrefix()} $o');

  myprint('main: ⚙️ 启动任务...');

  final receivePort = ReceivePort(); // 1
  await Isolate.spawn(entryPoint, receivePort.sendPort); // 2

  SendPort? workerSendPort;

  // 3
  receivePort.listen((message) {
    // 4
    if (message case (int x, int y)) {
      myprint('main: f($x)=$y');
    } // 4a
    else if (message is SendPort) {
      workerSendPort = message;
      myprint('main: 🤝 握手成功');
    }
  });

  myprint('main: 🚀 我继续做其它事');

  for (var x = 1; x <= 5; x++) {
    Future.delayed(
      Duration(seconds: x),
      () => workerSendPort?.send((x, myprintPrefix())),
    );
  }

  Future.delayed(Duration(seconds: 6), () => receivePort.close());

  /* An example output:
out0 0ms: main: ⚙️ 启动任务...
out1 76ms: main: 🚀 我继续做其它事
out2 138ms: main: 🤝 握手成功
out3 1089ms: worker: x=1
out4 1195ms: main: f(1)=1
out5 2090ms: worker: x=2
out6 2196ms: main: f(2)=4
out7 3096ms: worker: x=3
out8 3203ms: main: f(3)=9
out9 4097ms: worker: x=4
out10 4201ms: main: f(4)=16
out11 5096ms: worker: x=5
out12 5202ms: main: f(5)=25
   */
}

void entryPoint(SendPort mainSendPort) async {
  final workerReceivePort = ReceivePort();
  mainSendPort.send(workerReceivePort.sendPort);

  workerReceivePort.listen((message) async {
    if (message case (int x, String prefix)) {
      print('$prefix worker: x=$x');
      // Simulate a time-consuming task
      await Future.delayed(Duration(milliseconds: 100));
      final y = x * x;
      mainSendPort.send((x, y));
    }
  });
}

Isolate.spawnUri()

Isolate.spawnUri()Isolate.spawn()类似,区别在于前者的 entryPoint 是由一个脚本文件提供。我们将 ex752 中的 entryPoint 函数稍作改变,然后移至一个文件中:

// A script file for ex754.dart
import 'dart:isolate';

void main(List<String> args, SendPort sendPort) async {
  print('worker: $args');
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    sendPort.send(20 * i); // 发送进度
  }
  // 发送最终计算结果
  sendPort.send('DONE');
}

请注意观察这里的函数及其签名:原来的entryPoint变成了main函数,第二个入参代表初始消息。然后 ex752.dart 剩余的部分也稍作变化:

// ex754.dart
import 'dart:isolate';

void main() async {
  // ... (omitted for brevity)
  
  final receivePort = ReceivePort(); // 1
  var uri = Uri.file('./ex754a.dart');
  await Isolate.spawnUri(uri, ['arg1', 'arg2'], receivePort.sendPort);

  // ... (omitted for brevity)
}

请尝试运行修改后得到的新示例(ex754)并观察程序输出。

常见约束与避坑指南

  • spawnUri() 会重新加载其依赖的所有库。如果外部脚本也依赖了 dart:math,它会在新 Isolate 中拥有一份独立的拷贝。
  • 在 Flutter 环境中,由于打包机制,spawnUri()通常不能直接指向磁盘文件
  • 安全性:虽然 Isolate 之间是隔离的,但在加载外部脚本时仍需确保来源可靠。

Isolate.spawn() VS Isolate.spawnUri()

特性Isolate.spawn()Isolate.spawnUri()
代码来源当前程序中的函数(共享现有代码)外部 URI 指向的文件(独立代码)
启动速度较快(不需要重新解析代码)较慢(需要加载、解析并运行新脚本)
参数限制可以传递大部分不可变对象参数必须是简单的类型(如字符串、列表、SendPort)
内存开销相对较小较大(整个脚本环境需要重新初始化)
典型用途处理当前应用的密集计算动态插件系统、运行不受信任的外部脚本

Flutter 特色:compute()

Flutter 的 compute() 函数在用法上与 Isolate.run() 有些类似。下面是示例 ex751的Flutter/compute 版本。

// ex755.dart
import 'package:flutter/foundation.dart'; // compute

Future<void> main() async {
  final result = await compute(heavyTask, 100_000_000);
  debugPrint('result: $result'); // Output:  99999999
}

int heavyTask(int count) {
  var sum = 0;
  for (int i = 0; i < count; i++) {
    sum += i % 3;
  }
  return sum;
}

compute() 函数第一个入参数是回调函数(callback),第二入参将传递给callback(作为callback的入参)。为保持接口简洁,这里的callback只能有一个入参,如果你要传递多个参数,可以将这些参数打包成Record或Map。

跨 Isolate 传递数据

在 Dart 中,跨 Isolate 传递数据的机制被称为消息传递(Message Passing)。为了保证内存隔离的安全,Dart 限制了可以通过 SendPort 发送的对象类型。

以下是根据 Dart 官方文档整理的跨 Isolate 传递数据类型表:

类别具体数据类型说明
基础类型Null, bool, int, double, String最常用的基础数据,直接支持。
集合类型List, Map, Set容器内的元素也必须是此表中的可传递类型。
二进制数据TypedData (如 Uint8List 等)效率极高,支持通过内存移交(Transfer)机制处理。
通信对象SendPort允许发送一个端口给另一个 Isolate,实现双向通信。
函数顶级函数静态方法必须是全局可见的,不能是捕获了上下文的闭包或实例方法。
系统对象Capability, RegExp, StackTraceDart 内部定义的特殊支持对象。
自定义对象仅限 Isolate.exit() 场景在普通 send 中不支持,但在退出 Isolate 时可移交内存所有权。

注意: 当你发送一个集合(如 List)时,Dart 实际上是在目标 Isolate 中创建了一个深拷贝。这意味着在子 Isolate 中修改该 List,不会影响主 Isolate 中的原始数据。唯一的例外是使用 Isolate.exit()

Reference