Worker Pool (Isolate 池)

Dart Worker Pool (Isolate 池)类似于Java中的线程池。在 Dart 中,Spawn 一个 Isolate 比创建线程更贵(因为它要初始化独立的堆内存)。使用Worker Pool可以 避免冷启动开销控制内存配额

  • 避免冷启动开销:如果你的应用需要频繁处理小任务(如每隔 100ms 滤镜一张图片),不断地 Isolate.run 会导致 CPU 在频繁的创建/销毁中空转。

  • 控制内存配额:由于每个 Isolate 都有独立的堆内存,无节制地开启 Isolate 会迅速导致 OOM(内存溢出)。池化可以强制将并发数限制在 CPU 核心数左右。

在 Dart 的世界里,池化不只是为了复用线程,更是为了维持一组‘热就绪’的隔离空间。如果你在做音视频剪辑、大型游戏逻辑解析,Isolate 池就是你的后台工厂。

实现方案

Worer Pool 需要维持一组长期存活的后台 Isolate,并维护它们的 SendPort,以便向其分发任务。当 Worker Pool 收到一个任务(Task)时,它分发给空闲的Worker,如果此时所有的Worker都忙,就将任务放入待执行队列(TaskQueue)。Worker Pool的核心指责就是尽快撮合 TaskWorker,一旦Worker空闲,它就去TaskQueue里取出任务交给Worker去执行。

下面是该方案的 时序图类图

sequenceDiagram
    autonumber
    participant Client as main(Task Submitter)
    participant Pool as Worker Pool (Manager)
    participant W as Worker (Isolate 0..N)

    Note over Pool, W: --- 初始化 (建立长存连接) ---
    Pool->>W: 创建并保持 Isolate 存活
    W-->>Pool: 回传通信凭证 (SendPort)
    Note over Pool: 维护 Worker 状态列表

    Note over Client, W: --- 场景 A: 任务处理 (直接执行) ---
    Client->>Pool: submit(任务 1)
    Pool->>W: 寻找空闲工人并派发
    W->>W: 执行计算
    W-->>Pool: 返回结果 1
    Pool->>Client: 交付结果 1 (Future 完成)

    Note over Client, W: --- 场景 B: 负载均衡 (排队与复用) ---
    Client->>Pool: submit(任务 2)
    Client->>Pool: submit(任务 3)
    Note over Pool: (此时工人忙碌,内部暂存任务)
    W-->>Pool: (工人忙完任务 2) 返回结果 2
    Pool->>Client: 交付结果 2
    Pool->>W: 自动派发暂存的任务 3
    W-->>Pool: 返回结果 3
    Pool->>Client: 交付结果 3
classDiagram
    class TaskData {
        +Function task
        +Record data
        +TaskData(Function task, Record data)
    }

    class Worker {
        +Isolate isolate
        +SendPort sendPort
        +bool idle
        +Completer? completer
        +Worker(Isolate isolate, SendPort sendPort)
    }

    class WorkerPool {
        +int size
        -Map~int, Worker~ _workers
        -Queue~~ _taskQueue
        -StreamController _idleWorkerController
        -ReceivePort? _resultPort
        -bool _closed
        
        +WorkerPool(int size)
        +Future~T~ submit<T>(TaskData taskData)
        +Future~void~ close()
    }

    class GlobalFunctions {
        <<Global>>
        +void workerEntryPoint(Record message)
    }

    WorkerPool "1" *-- "0..*" Worker : 维护并调度
    WorkerPool ..> TaskData : 处理任务请求(submit)
    Worker ..> TaskData : 通过 SendPort 发送任务给 Worker Isolate
    Worker ..> GlobalFunctions : Isolate 运行入口

该方案的示例代码如下。

// ex771.dart
import 'dart:async';
import 'dart:collection';
import 'dart:isolate';

int add((int, int) args) => args.$1 + args.$2; // 1

// 2
final watch = Stopwatch()..start();
var count = 0;
void myprint(o) => print('out${count++} ${watch.elapsed.inMilliseconds}ms: $o');

void main() async {
  var pool = WorkerPool(3); // 3
  await Future.delayed(Duration(seconds: 1)); // 3a

  const n = 9; // 4
  var results = await Future.wait([
    for (var x = 1; x <= n; x++) pool.submit(TaskData(add, (x, x))),
  ]);
  myprint(results); // 5

  await pool.close(); // 6

  /* An example output:
out0 0ms: main: 🤝 pool-worker-0 握手成功
out1 11ms: main: 🤝 pool-worker-1 握手成功
out2 18ms: main: 🤝 pool-worker-2 握手成功
out3 989ms: pool-worker-1 returns 4
out4 994ms: pool-worker-0 returns 2
out5 994ms: pool-worker-2 returns 6
out6 995ms: pool-worker-1 returns 8
out7 995ms: pool-worker-0 returns 10
out8 995ms: pool-worker-2 returns 12
out9 995ms: pool-worker-1 returns 14
out10 995ms: pool-worker-0 returns 16
out11 995ms: pool-worker-2 returns 18
out12 997ms: [2, 4, 6, 8, 10, 12, 14, 16, 18]
out13 1111ms: closed
 */
}

// 7
class TaskData {
  final Function task;
  final Record data;
  TaskData(this.task, this.data);
}

class WorkerPool {
  final String name;
  final int size; // 8
  final _workers = <int, Worker>{}; // 9
  final _taskQueue = Queue<(TaskData, Completer)>(); // 10
  final _idleWorkerController = StreamController<Worker>(); // 11

  ReceivePort? _resultPort;
  var _closed = false;

  String _workerName(int id) => '$name-worker-$id';

  WorkerPool(int size) : this.named(size, 'pool');

  WorkerPool.named(this.size, this.name) {
    _init();
    _start();
  }

  Future<void> _init() async {
    final resultPort = ReceivePort();
    final isolates = <int, Isolate>{};
    // 12
    for (int workerId = 0; workerId < size; workerId++) {
      isolates[workerId] = await Isolate.spawn(workerEntryPoint, (
        workerId,
        resultPort.sendPort,
      ), debugName: _workerName(workerId));
    }
    // 13
    resultPort.listen((message) {
      // 13b
      if (message case (int workerId, bool ok, dynamic result)) {
        _handleResult(workerId, ok, result);
      } // 13a
      else if (message case (int workerId, SendPort workerPort)) {
        _workers[workerId] = Worker(isolates[workerId]!, workerPort);
        myprint('main: 🤝 ${_workerName(workerId)} 握手成功');
      }
    });

    _resultPort = resultPort;
  }

  // 14
  void _handleResult(int workerId, bool ok, dynamic result) {
    myprint('${_workerName(workerId)} returns $result');
    final worker = _workers[workerId]!;
    if (ok) {
      worker.completer?.complete(result);
    } else {
      worker.completer?.completeError(result);
    }
    worker.completer = null;
    _idleWorkerController.sink.add(worker);
  }

  void _start() {
    // 15
    _idleWorkerController.stream.listen((worker) {
      if (_taskQueue.isNotEmpty) {
        final (task, completer) = _taskQueue.removeFirst();
        _execute(worker, task, completer);
      } else {
        worker.idle = true;
      }
    });
  }

  // 16
  void _execute(Worker worker, TaskData taskData, Completer completer) {
    worker.idle = false;
    worker.completer = completer;
    worker.sendPort.send(taskData);
  }

  // 17
  Future<T> submit<T>(TaskData taskData) {
    final completer = Completer<T>();
    if (_closed) {
      completer.completeError('worker pool closed');
    } else {
      var worker = _workers.values.where((w) => w.idle).firstOrNull;
      if (worker == null) {
        _taskQueue.add((taskData, completer));
      } else {
        _execute(worker, taskData, completer);
      }
    }
    return completer.future;
  }

  // 18
  Future close({
    bool ensureExecutingTaskCompleted = true,
    bool ensureTaskQueueEmpty = true,
  }) async {
    if (_closed) return;
    _closed = true;

    // Waiting to complete
    while (true) {
      final buzy =
          ensureExecutingTaskCompleted &&
          _workers.values.any((w) => !w.idle);
      final awaitTaskQueue = ensureTaskQueueEmpty && _taskQueue.isNotEmpty;
      if (_resultPort == null || buzy || awaitTaskQueue) {
        await Future.delayed(Duration(milliseconds: 100));
        continue;
      }
      break;
    }

    _resultPort!.close();
    _idleWorkerController.close();
    myprint('$name closed');
  }
}

// 13a
class Worker {
  final Isolate isolate;
  final SendPort sendPort;
  bool idle = true;
  Completer? completer;

  Worker(this.isolate, this.sendPort);
}

// 12a
void workerEntryPoint((int workerId, SendPort resultPort) message) {
  final workerReceivePort = ReceivePort();
  final workerId = message.$1;
  final resultPort = message.$2;
  resultPort.send((workerId, workerReceivePort.sendPort));

  workerReceivePort.listen((data) async {
    if (data case TaskData taskData) {
      try {
        var result = taskData.task.call(taskData.data);
        resultPort.send((workerId, true, result));
      } catch (e) {
        resultPort.send((workerId, false, e));
      }
    }
  });
}
  1. add 是一个全局函数,用来模拟一个计算任务。
  2. myprint 是一个辅助函数,用来打印一些日志,方便分析(在生产环境中用 logger 包)。
  3. main 函数启用了一个 WorkerPoolpoolsize=3表示将启用 3个Worker Isolate),随后等待1秒钟,让其有充分的时间初始化(方便演示)。
  4. main isolatepool 连续提交 n(此处n=9)个计算任务,并等待结果。
  5. 打印结果集。
  6. 关闭 pool,释放相关资源。
  7. TaskData 代表了待执行任务,此处简单地封装了一个函数task及其需要的参数data
  8. size 表示 Worker Isolate 的个数。
  9. _workers 形式上是一个Map(workerId->Worker),是Worker isolate及其状态(闲/忙)的列表。
  10. _taskQueue 待执行任务队列,其成员的数据类型是 Record (TaskData, Completer)
  11. _idleWorkerController 是空闲 worker 的一个 Stream,用来实现响应式(Reactive)任务调度(及时撮合taskworker)。
  12. 启动 Worker Isolate ,建立WorkerPoolWorker的双向通信,顶级函数 workerEntryPointWorker的执行入口。
  13. 处理 Worker Isolate 发来的消息,根据消息类型分别处理:a. (int workerId, SendPort workerPort) ,对应通信握手;b. (int workerId, bool ok, dynamic result),对应计算结果。
  14. _handleResult做了两件事:首先满足了调用方的 Future,然后立即通过 _idleWorkerController 宣告 Worker 可用。这正是 Reactive 编程核心:数据流动驱动逻辑执行。
  15. _idleWorkerController.listen() 监听空闲Worker事件,及时分发任务。
  16. 分发任务:将任务交给Worker Isolate去执行。
  17. submit()WorkerPool 提交任务。
  18. 关闭 WorkerPool,释放相关资源。

优雅关闭(释放资源)

现在让我们把视线聚焦在 WorkerPool.close() 方法。

  // 18
  Future close({
    bool ensureExecutingTaskCompleted = true,
    bool ensureTaskQueueEmpty = true,
  }) async {
    if (_closed) return;
    _closed = true;

    // Waiting to complete
    while (true) {
      final buzy =
          ensureExecutingTaskCompleted &&
          _workers.values.any((w) => !w.idle);
      final awaitTaskQueue = ensureTaskQueueEmpty && _taskQueue.isNotEmpty;
      if (_resultPort == null || buzy || awaitTaskQueue) {
        await Future.delayed(Duration(milliseconds: 100));
        continue;
      }
      break;
    }

    _resultPort!.close();
    _idleWorkerController.close();
    myprint('$name closed');
  }

目前代码主要存在如下问题:

  • while(true)死循环:默认参数情况下,如果某个 Isolate 内部的任务永远不返回,close() 也会永远卡住。考虑增加一个 timeout 参数,如果超过指定时间_taskQueue还没清空,强制执行后面的close逻辑(跳出 while 循环)。

  • Isolate 的销毁:目前只是关闭了WorkerPoolReceivePort_resultPort),Worker Isolate进程其实还在运行(虽然没有新任务了)。在 close 的最后,应该调用 worker.isolate.kill() 来释放系统线程资源。

  • Completer 的善后:如果用户选择了 ensureTaskQueueEmpty: false_taskQueue 中剩余任务的 Completer 应该手动 completeError(),否则调用方的 await 会永久挂起。

作为练习,请读者针对以上问题完善 WorkerPool.close()

Reference

  • https://dart.dev/language/isolates
  • https://api.dart.dev/dart-isolate/Isolate-class.html