Isolate 基础应用篇
按使用场景(是简单的计算还是长期的后台任务)的不同,Isolate有三种主流的创建方式:
Isolate.run()(单次轻量):Dart会自动处理 Isolate 的创建、参数传递、结果返回以及最后的销毁Isolate.spawn()(常驻/双向通信):如果你需要一个常驻的后台服务- Flutter 特色:
compute(),类似Isolate.run(),更加兼容 Flutter 环境
此外,Isolate.spawnUri()从指定的 URI(如本地路径或 HTTP 链接)读取脚本(源代码),并在一个全新的环境中运行它。
Isolate.run()
如下示例中的heavyTask模拟了一个CPU密集型任务,通过 Isolate.run() 启用一个新的 Isolate 对其进行计算。
// ex751.dart
import 'dart:isolate';
Future<void> main() async {
final result = await Isolate.run(() => heavyTask(100_000_000));
print('result: $result'); // Output: 99999999
}
int heavyTask(int count) {
var sum = 0;
for (int i = 0; i < count; i++) {
sum += i % 3;
}
return sum;
}
Isolate.spawn()
Isolate 之间通过发送消息通信, ReceivePort 和 SendPort 是其通信端口。
- ReceivePort(收信箱):它是私有的,只有创建它的 Isolate 才能从中读取消息。本质上它是一个 Stream。当消息掉进箱子时,Event Loop 齿轮转动,触发监听(
listen)回调。 - SendPort(投递地址): 它是公开的,可以被复制、传递给其他 Isolate。
graph LR
subgraph Isolate_A [main Isolate]
direction TB
RP[ReceivePort]
Handler([处理逻辑])
RP -->|监听| Handler
end
subgraph Isolate_B [worker Isolate]
direction TB
SP[SendPort]
Task([计算任务])
Task -->|结果| SP
end
SP -.->|跨越物理边界| RP
style Isolate_A fill:#f5f5f5,stroke:#333
style Isolate_B fill:#f5f5f5,stroke:#333
style RP fill:#FFF9C4,stroke:#FBC02D
style SP fill:#E1F5FE,stroke:#0288D1
单向通信(“生产-消费”)
在单向通信(“生产-消费”)模式中,Main Isolate 充当“听众” (即数据的消费者),Worker Isolate 充当数据的“生产者”。
sequenceDiagram
autonumber
participant M as Main Isolate
participant W as Worker Isolate
M->>M: 创建 ReceivePort (RP)
M->>W: Isolate.spawn(entryPoint, RP.sendPort)
W->>W: 初始化自己的 Event Loop
W->>M: 通过传入的 SendPort 发回数据
M->>M: RP.listen 接收并处理结果
// ex752.dart
import 'dart:isolate';
import 'dart:async';
void main() async {
final watch = Stopwatch()..start();
var count = 0;
void myprint(o) =>
print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');
myprint('main: ⚙️ 启动任务...');
final receivePort = ReceivePort(); // 1
await Isolate.spawn(entryPoint, receivePort.sendPort); // 2
// 3
receivePort.listen((message) {
// 4
if (message is int) {
myprint('main: progress $message%');
} // 4a
else if (message is String) {
myprint('main: $message');
receivePort.close(); // 5
myprint('main: 🏁 done');
}
});
myprint('main: 🚀 我继续做其它事');
/* An example output:
out0 0ms: main: ⚙️ 启动任务...
out1 52ms: main: 🚀 我继续做其他事
out2 1039ms: main: progress 20%
out3 2040ms: main: progress 40%
out4 3045ms: main: progress 60%
out5 4051ms: main: progress 80%
out6 5054ms: main: progress 100%
out7 5054ms: main: DONE
out8 5061ms: main: 🏁 done
*/
}
/// worker 逻辑: 执行任务并不断向 main “汇报”
void entryPoint(SendPort sendPort) async {
// 模拟耗时任务,每秒发送一次进度
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
sendPort.send(20 * i); // 发送进度
}
// 发送最终计算结果
sendPort.send('DONE');
}
- 首先创建一个ReceivePort;
- 启动worker Isolate;
Isolate.spawn()的第一个入参,是worker的入口函数(entryPoint,代表了worker的待执行任务),第二个入参是传递给entryPoint的初始消息(此处是一个SendPort); receivePort本质上是一个Stream,receivePort.listen()监听worker发来的消息;- 处理worker发来的消息;
receivePort.close()释放work Isolate相关资源,最终work Isolate终止。
最佳实践: Isolate.spawn() 必须指向一个顶级函数或静态方法(如此例中的 entryPoint()),这是为了满足跨线程(Isolate)序列化的要求。
双向通信
我们在理解了单向通信模式之后,就不难理解双向通信模式。main isolate要想发消息给work isolate,必须通过worker的SendPort,因此我们要设法将worker的SendPort传递给main isolate。在entrypoint 函数中,worker调用mainRP.send(...)即可实现这一点(mainRP即main isolate的ReceivePort)。
sequenceDiagram
autonumber
participant M as Main Isolate
participant W as Worker Isolate
Note over M: 准备阶段
M->>M: 创建 ReceivePort (mainRP)
M->>W: Isolate.spawn(entryPoint, mainRP.sendPort)
Note over W: Worker 启动
W->>W: 初始化自己的 Event Loop
W->>W: 创建自己的 ReceivePort (workerRP)
W->>M: 发回 workerRP.sendPort (握手)
Note over M: 建立双向连接
M->>M: 收到并保存 Worker 的 SendPort
rect rgb(232, 245, 233)
Note over M, W: 交互阶段
M->>W: 发送任务数据 (Main 发送消息)
W->>W: 执行密集计算/任务
W->>M: 通过 mainRP 发回处理结果
end
M->>M: 后续处理
如下示例由示例ex752改写而成,它演示了两个Isolate间的双向通信,请读者自行分析。
// ex753.dart
import 'dart:isolate';
import 'dart:async';
void main() async {
final watch = Stopwatch()..start();
var count = 0;
String myprintPrefix() => 'out${count++} ${watch.elapsed.inMilliseconds}ms:';
void myprint(o) => print('${myprintPrefix()} $o');
myprint('main: ⚙️ 启动任务...');
final receivePort = ReceivePort(); // 1
await Isolate.spawn(entryPoint, receivePort.sendPort); // 2
SendPort? workerSendPort;
// 3
receivePort.listen((message) {
// 4
if (message case (int x, int y)) {
myprint('main: f($x)=$y');
} // 4a
else if (message is SendPort) {
workerSendPort = message;
myprint('main: 🤝 握手成功');
}
});
myprint('main: 🚀 我继续做其它事');
for (var x = 1; x <= 5; x++) {
Future.delayed(
Duration(seconds: x),
() => workerSendPort?.send((x, myprintPrefix())),
);
}
Future.delayed(Duration(seconds: 6), () => receivePort.close());
/* An example output:
out0 0ms: main: ⚙️ 启动任务...
out1 76ms: main: 🚀 我继续做其它事
out2 138ms: main: 🤝 握手成功
out3 1089ms: worker: x=1
out4 1195ms: main: f(1)=1
out5 2090ms: worker: x=2
out6 2196ms: main: f(2)=4
out7 3096ms: worker: x=3
out8 3203ms: main: f(3)=9
out9 4097ms: worker: x=4
out10 4201ms: main: f(4)=16
out11 5096ms: worker: x=5
out12 5202ms: main: f(5)=25
*/
}
void entryPoint(SendPort mainSendPort) async {
final workerReceivePort = ReceivePort();
mainSendPort.send(workerReceivePort.sendPort);
workerReceivePort.listen((message) async {
if (message case (int x, String prefix)) {
print('$prefix worker: x=$x');
// Simulate a time-consuming task
await Future.delayed(Duration(milliseconds: 100));
final y = x * x;
mainSendPort.send((x, y));
}
});
}
Isolate.spawnUri()
Isolate.spawnUri() 与 Isolate.spawn()类似,区别在于前者的 entryPoint 是由一个脚本文件提供。我们将 ex752 中的 entryPoint 函数稍作改变,然后移至一个文件中:
// A script file for ex754.dart
import 'dart:isolate';
void main(List<String> args, SendPort sendPort) async {
print('worker: $args');
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
sendPort.send(20 * i); // 发送进度
}
// 发送最终计算结果
sendPort.send('DONE');
}
请注意观察这里的函数及其签名:原来的entryPoint变成了main函数,第二个入参代表初始消息。然后 ex752.dart 剩余的部分也稍作变化:
// ex754.dart
import 'dart:isolate';
void main() async {
// ... (omitted for brevity)
final receivePort = ReceivePort(); // 1
var uri = Uri.file('./ex754a.dart');
await Isolate.spawnUri(uri, ['arg1', 'arg2'], receivePort.sendPort);
// ... (omitted for brevity)
}
请尝试运行修改后得到的新示例(ex754)并观察程序输出。
常见约束与避坑指南
spawnUri()会重新加载其依赖的所有库。如果外部脚本也依赖了dart:math,它会在新 Isolate 中拥有一份独立的拷贝。- 在 Flutter 环境中,由于打包机制,
spawnUri()通常不能直接指向磁盘文件 - 安全性:虽然 Isolate 之间是隔离的,但在加载外部脚本时仍需确保来源可靠。
Isolate.spawn() VS Isolate.spawnUri()
| 特性 | Isolate.spawn() | Isolate.spawnUri() |
|---|---|---|
| 代码来源 | 当前程序中的函数(共享现有代码) | 外部 URI 指向的文件(独立代码) |
| 启动速度 | 较快(不需要重新解析代码) | 较慢(需要加载、解析并运行新脚本) |
| 参数限制 | 可以传递大部分不可变对象 | 参数必须是简单的类型(如字符串、列表、SendPort) |
| 内存开销 | 相对较小 | 较大(整个脚本环境需要重新初始化) |
| 典型用途 | 处理当前应用的密集计算 | 动态插件系统、运行不受信任的外部脚本 |
Flutter 特色:compute()
Flutter 的 compute() 函数在用法上与 Isolate.run() 有些类似。下面是示例 ex751的Flutter/compute 版本。
// ex755.dart
import 'package:flutter/foundation.dart'; // compute
Future<void> main() async {
final result = await compute(heavyTask, 100_000_000);
debugPrint('result: $result'); // Output: 99999999
}
int heavyTask(int count) {
var sum = 0;
for (int i = 0; i < count; i++) {
sum += i % 3;
}
return sum;
}
compute() 函数第一个入参数是回调函数(callback),第二入参将传递给callback(作为callback的入参)。为保持接口简洁,这里的callback只能有一个入参,如果你要传递多个参数,可以将这些参数打包成Record或Map。
跨 Isolate 传递数据
在 Dart 中,跨 Isolate 传递数据的机制被称为消息传递(Message Passing)。为了保证内存隔离的安全,Dart 限制了可以通过 SendPort 发送的对象类型。
以下是根据 Dart 官方文档整理的跨 Isolate 传递数据类型表:
| 类别 | 具体数据类型 | 说明 |
|---|---|---|
| 基础类型 | Null, bool, int, double, String | 最常用的基础数据,直接支持。 |
| 集合类型 | List, Map, Set | 容器内的元素也必须是此表中的可传递类型。 |
| 二进制数据 | TypedData (如 Uint8List 等) | 效率极高,支持通过内存移交(Transfer)机制处理。 |
| 通信对象 | SendPort | 允许发送一个端口给另一个 Isolate,实现双向通信。 |
| 函数 | 顶级函数 或 静态方法 | 必须是全局可见的,不能是捕获了上下文的闭包或实例方法。 |
| 系统对象 | Capability, RegExp, StackTrace | Dart 内部定义的特殊支持对象。 |
| 自定义对象 | 仅限 Isolate.exit() 场景 | 在普通 send 中不支持,但在退出 Isolate 时可移交内存所有权。 |
注意: 当你发送一个集合(如 List)时,Dart 实际上是在目标 Isolate 中创建了一个深拷贝。这意味着在子 Isolate 中修改该 List,不会影响主 Isolate 中的原始数据。唯一的例外是使用 Isolate.exit() 。
Reference
- Dart asynchronous programming: Isolates and event loops
- https://dart.dev/language/isolates
- https://api.dart.dev/dart-isolate/Isolate-class.html