Worker Pool (Isolate 池)
Dart Worker Pool (Isolate 池)类似于Java中的线程池。在 Dart 中,Spawn 一个 Isolate 比创建线程更贵(因为它要初始化独立的堆内存)。使用Worker Pool可以 避免冷启动开销 和 控制内存配额。
-
避免冷启动开销:如果你的应用需要频繁处理小任务(如每隔 100ms 滤镜一张图片),不断地 Isolate.run 会导致 CPU 在频繁的创建/销毁中空转。
-
控制内存配额:由于每个 Isolate 都有独立的堆内存,无节制地开启 Isolate 会迅速导致 OOM(内存溢出)。池化可以强制将并发数限制在 CPU 核心数左右。
在 Dart 的世界里,池化不只是为了复用线程,更是为了维持一组‘热就绪’的隔离空间。如果你在做音视频剪辑、大型游戏逻辑解析,Isolate 池就是你的后台工厂。
实现方案
Worer Pool 需要维持一组长期存活的后台 Isolate,并维护它们的 SendPort,以便向其分发任务。当 Worker Pool 收到一个任务(Task)时,它分发给空闲的Worker,如果此时所有的Worker都忙,就将任务放入待执行队列(TaskQueue)。Worker Pool的核心指责就是尽快撮合 Task 与 Worker,一旦Worker空闲,它就去TaskQueue里取出任务交给Worker去执行。
下面是该方案的 时序图 和 类图 。
sequenceDiagram
autonumber
participant Client as main(Task Submitter)
participant Pool as Worker Pool (Manager)
participant W as Worker (Isolate 0..N)
Note over Pool, W: --- 初始化 (建立长存连接) ---
Pool->>W: 创建并保持 Isolate 存活
W-->>Pool: 回传通信凭证 (SendPort)
Note over Pool: 维护 Worker 状态列表
Note over Client, W: --- 场景 A: 任务处理 (直接执行) ---
Client->>Pool: submit(任务 1)
Pool->>W: 寻找空闲工人并派发
W->>W: 执行计算
W-->>Pool: 返回结果 1
Pool->>Client: 交付结果 1 (Future 完成)
Note over Client, W: --- 场景 B: 负载均衡 (排队与复用) ---
Client->>Pool: submit(任务 2)
Client->>Pool: submit(任务 3)
Note over Pool: (此时工人忙碌,内部暂存任务)
W-->>Pool: (工人忙完任务 2) 返回结果 2
Pool->>Client: 交付结果 2
Pool->>W: 自动派发暂存的任务 3
W-->>Pool: 返回结果 3
Pool->>Client: 交付结果 3
classDiagram
class TaskData {
+Function task
+Record data
+TaskData(Function task, Record data)
}
class Worker {
+Isolate isolate
+SendPort sendPort
+bool idle
+Completer? completer
+Worker(Isolate isolate, SendPort sendPort)
}
class WorkerPool {
+int size
-Map~int, Worker~ _workers
-Queue~~ _taskQueue
-StreamController _idleWorkerController
-ReceivePort? _resultPort
-bool _closed
+WorkerPool(int size)
+Future~T~ submit<T>(TaskData taskData)
+Future~void~ close()
}
class GlobalFunctions {
<<Global>>
+void workerEntryPoint(Record message)
}
WorkerPool "1" *-- "0..*" Worker : 维护并调度
WorkerPool ..> TaskData : 处理任务请求(submit)
Worker ..> TaskData : 通过 SendPort 发送任务给 Worker Isolate
Worker ..> GlobalFunctions : Isolate 运行入口
该方案的示例代码如下。
// ex771.dart
import 'dart:async';
import 'dart:collection';
import 'dart:isolate';
int add((int, int) args) => args.$1 + args.$2; // 1
// 2
final watch = Stopwatch()..start();
var count = 0;
void myprint(o) => print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');
void main() async {
var pool = WorkerPool(3); // 3
await Future.delayed(Duration(seconds: 1)); // 3a
const n = 9; // 4
var results = await Future.wait([
for (var x = 1; x <= n; x++) pool.submit(TaskData(add, (x, x))),
]);
myprint(results); // 5
await pool.close(); // 6
/* An example output:
out0 0ms: main: 🤝 pool-worker-0 握手成功
out1 11ms: main: 🤝 pool-worker-1 握手成功
out2 18ms: main: 🤝 pool-worker-2 握手成功
out3 989ms: pool-worker-1 returns 4
out4 994ms: pool-worker-0 returns 2
out5 994ms: pool-worker-2 returns 6
out6 995ms: pool-worker-1 returns 8
out7 995ms: pool-worker-0 returns 10
out8 995ms: pool-worker-2 returns 12
out9 995ms: pool-worker-1 returns 14
out10 995ms: pool-worker-0 returns 16
out11 995ms: pool-worker-2 returns 18
out12 997ms: [2, 4, 6, 8, 10, 12, 14, 16, 18]
out13 1111ms: closed
*/
}
// 7
class TaskData {
final Function task;
final Record data;
TaskData(this.task, this.data);
}
class WorkerPool {
final String name;
final int size; // 8
final _workers = <int, Worker>{}; // 9
final _taskQueue = Queue<(TaskData, Completer)>(); // 10
final _idleWorkerController = StreamController<Worker>(); // 11
ReceivePort? _resultPort;
var _closed = false;
String _workerName(int id) => '$name-worker-$id';
WorkerPool(int size) : this.named(size, 'pool');
WorkerPool.named(this.size, this.name) {
_init();
_start();
}
Future<void> _init() async {
final resultPort = ReceivePort();
final isolates = <int, Isolate>{};
// 12
for (int workerId = 0; workerId < size; workerId++) {
isolates[workerId] = await Isolate.spawn(workerEntryPoint, (
workerId,
resultPort.sendPort,
), debugName: _workerName(workerId));
}
// 13
resultPort.listen((message) {
// 13b
if (message case (int workerId, bool ok, dynamic result)) {
_handleResult(workerId, ok, result);
} // 13a
else if (message case (int workerId, SendPort workerPort)) {
_workers[workerId] = Worker(isolates[workerId]!, workerPort);
myprint('main: 🤝 ${_workerName(workerId)} 握手成功');
}
});
_resultPort = resultPort;
}
// 14
void _handleResult(int workerId, bool ok, dynamic result) {
myprint('${_workerName(workerId)} returns $result');
final worker = _workers[workerId]!;
if (ok) {
worker.completer?.complete(result);
} else {
worker.completer?.completeError(result);
}
worker.completer = null;
_idleWorkerController.sink.add(worker);
}
void _start() {
// 15
_idleWorkerController.stream.listen((worker) {
if (_taskQueue.isNotEmpty) {
final (task, completer) = _taskQueue.removeFirst();
_execute(worker, task, completer);
} else {
worker.idle = true;
}
});
}
// 16
void _execute(Worker worker, TaskData taskData, Completer completer) {
worker.idle = false;
worker.completer = completer;
worker.sendPort.send(taskData);
}
// 17
Future<T> submit<T>(TaskData taskData) {
final completer = Completer<T>();
if (_closed) {
completer.completeError('worker pool closed');
} else {
var worker = _workers.values.where((w) => w.idle).firstOrNull;
if (worker == null) {
_taskQueue.add((taskData, completer));
} else {
_execute(worker, taskData, completer);
}
}
return completer.future;
}
// 18
Future close({
bool ensureExecutingTaskCompleted = true,
bool ensureTaskQueueEmpty = true,
}) async {
if (_closed) return;
_closed = true;
// Waiting to complete
while (true) {
final buzy =
ensureExecutingTaskCompleted &&
_workers.values.any((w) => !w.idle);
final awaitTaskQueue = ensureTaskQueueEmpty && _taskQueue.isNotEmpty;
if (_resultPort == null || buzy || awaitTaskQueue) {
await Future.delayed(Duration(milliseconds: 100));
continue;
}
break;
}
_resultPort!.close();
_idleWorkerController.close();
myprint('$name closed');
}
}
// 13a
class Worker {
final Isolate isolate;
final SendPort sendPort;
bool idle = true;
Completer? completer;
Worker(this.isolate, this.sendPort);
}
// 12a
void workerEntryPoint((int workerId, SendPort resultPort) message) {
final workerReceivePort = ReceivePort();
final workerId = message.$1;
final resultPort = message.$2;
resultPort.send((workerId, workerReceivePort.sendPort));
workerReceivePort.listen((data) async {
if (data case TaskData taskData) {
try {
var result = taskData.task.call(taskData.data);
resultPort.send((workerId, true, result));
} catch (e) {
resultPort.send((workerId, false, e));
}
}
});
}
add是一个全局函数,用来模拟一个计算任务。myprint是一个辅助函数,用来打印一些日志,方便分析(在生产环境中用logger包)。main函数启用了一个WorkerPool(pool,size=3表示将启用 3个Worker Isolate),随后等待1秒钟,让其有充分的时间初始化(方便演示)。main isolate向pool连续提交n(此处n=9)个计算任务,并等待结果。- 打印结果集。
- 关闭
pool,释放相关资源。 TaskData代表了待执行任务,此处简单地封装了一个函数task及其需要的参数data。size表示Worker Isolate的个数。_workers形式上是一个Map(workerId->Worker),是Worker isolate及其状态(闲/忙)的列表。_taskQueue待执行任务队列,其成员的数据类型是Record (TaskData, Completer)。_idleWorkerController是空闲worker的一个Stream,用来实现响应式(Reactive)任务调度(及时撮合task与worker)。- 启动
Worker Isolate,建立WorkerPool与Worker的双向通信,顶级函数workerEntryPoint是Worker的执行入口。 - 处理
Worker Isolate发来的消息,根据消息类型分别处理:a.(int workerId, SendPort workerPort),对应通信握手;b.(int workerId, bool ok, dynamic result),对应计算结果。 _handleResult做了两件事:首先满足了调用方的 Future,然后立即通过_idleWorkerController宣告 Worker 可用。这正是 Reactive 编程核心:数据流动驱动逻辑执行。_idleWorkerController.listen()监听空闲Worker事件,及时分发任务。- 分发任务:将任务交给
Worker Isolate去执行。 submit()向WorkerPool提交任务。- 关闭
WorkerPool,释放相关资源。
优雅关闭(释放资源)
现在让我们把视线聚焦在 WorkerPool.close() 方法。
// 18
Future close({
bool ensureExecutingTaskCompleted = true,
bool ensureTaskQueueEmpty = true,
}) async {
if (_closed) return;
_closed = true;
// Waiting to complete
while (true) {
final buzy =
ensureExecutingTaskCompleted &&
_workers.values.any((w) => !w.idle);
final awaitTaskQueue = ensureTaskQueueEmpty && _taskQueue.isNotEmpty;
if (_resultPort == null || buzy || awaitTaskQueue) {
await Future.delayed(Duration(milliseconds: 100));
continue;
}
break;
}
_resultPort!.close();
_idleWorkerController.close();
myprint('$name closed');
}
目前代码主要存在如下问题:
-
while(true)死循环:默认参数情况下,如果某个 Isolate 内部的任务永远不返回,close()也会永远卡住。考虑增加一个timeout参数,如果超过指定时间_taskQueue还没清空,强制执行后面的close逻辑(跳出while循环)。 -
Isolate 的销毁:目前只是关闭了
WorkerPool的ReceivePort(_resultPort),Worker Isolate进程其实还在运行(虽然没有新任务了)。在 close 的最后,应该调用worker.isolate.kill()来释放系统线程资源。 -
Completer 的善后:如果用户选择了
ensureTaskQueueEmpty: false,_taskQueue中剩余任务的Completer应该手动completeError(),否则调用方的await会永久挂起。
作为练习,请读者针对以上问题完善 WorkerPool.close() 。
Reference
- https://dart.dev/language/isolates
- https://api.dart.dev/dart-isolate/Isolate-class.html