Worker Pool (Continued, Part 2): Restarting Isolates That Crash Unexpectedly
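Because each Isolate has its own memory heap, a crash in a child Isolate does not by default bring down the main Isolate. It does, however, leave the main Isolate waiting forever for a result. Handling crashes therefore combines active interception (catching errors inside the worker) with passive listening (monitoring from outside).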
Active interception
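In a multi-Isolate environment, exceptions are not shared automatically. If a child Isolate crashes and the error is not caught, the main Isolate simply never receives a result, and unless it is listening for errors or exits it gets no signal at all. This is a "silent failure" that is extremely hard to track down. By wrapping the business logic in entryPoint with try-catch, we make sure that every exception is turned into an "error message" and sent back to the main Isolate.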
In WorkerPool, the business message that a Worker Isolate sends back is a Record of the following form:
(int workerId, bool ok, dynamic result)
i.e. (workerId, success flag, result or exception).
When the business logic fails, example ex771 simply sends the caught exception back to the WorkerPool (the Main Isolate).
void workerEntryPoint((int workerId, SendPort resultPort) message) {
// ... (omitted for brevity)
workerReceivePort.listen((data) async {
if (data case TaskData taskData) {
try {
var result = taskData.task.call(taskData.data);
resultPort.send((workerId, true, result));
} catch (e) {
resultPort.send((workerId, false, e));
}
}
});
}
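In practice, we can consider the following strategies for capturing exceptions:
- Full pass-through: send the exception together with its StackTrace back to the WorkerPool;
- Secondary wrapping (a protocol-level error response): wrap the exception and/or its stack trace in an agreed format before sending it back to the WorkerPool;
- Logging (for observability): print an error log that includes the exception and its stack trace.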
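The second and third strategies combine naturally. The following is a minimal sketch, not the ex771 code: it assumes a record-based error payload and uses the hypothetical names `WorkerError`, `encodeError` and `runGuarded`. Converting the exception and stack trace to Strings guarantees the payload can cross the isolate boundary even if the original exception object holds state that `SendPort.send` cannot transfer.

```dart
// Hypothetical wire format for a failed task: (workerId, false, WorkerError).
// Only Strings go over the port, so the payload is always sendable.
typedef WorkerError = ({String type, String message, String stack});

WorkerError encodeError(Object error, StackTrace stack) => (
      type: error.runtimeType.toString(),
      message: error.toString(),
      stack: stack.toString(),
    );

/// Runs [task] on [data] and always returns a message record in the
/// article's (workerId, ok, result) shape; it never throws.
(int, bool, Object?) runGuarded(
  int workerId,
  Object? Function(Object?) task,
  Object? data,
) {
  try {
    return (workerId, true, task(data));
  } catch (e, s) {
    // Observability: log before replying (swap print for a real logger).
    print('worker $workerId failed: $e');
    return (workerId, false, encodeError(e, s));
  }
}
```

On the WorkerPool side, a failed result can then be recognized by matching the third field against `WorkerError` before calling `completeError`.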
Passive listening
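Sometimes the crash happens beyond the reach of that try-catch (for example out-of-memory or an initialization error), and the Worker never gets a chance to execute resultPort.send(). In that case we use Isolate.addErrorListener() to install a listener on the Isolate when it is created, so that when the Isolate crashes the WorkerPool is notified and can react accordingly (e.g. by restarting the Isolate).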
sequenceDiagram
autonumber
participant M as WorkerPool
participant W as Worker Isolate
participant EP as ErrorPort
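W->>W: Fatal error (e.g. OOM)
W-->>EP: Error signal sent automatically
EP->>M: ErrorListener callback fires
M->>M: Mark the Worker as dead
alt Task in flight and its execution limit reached
M->>M: Completer.completeError
end
M->>W: isolate.kill()
M->>W: Spawn a new Isolate
M->>M: Add the new Worker to the pool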
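A minimal, self-contained sketch of passive listening, assuming nothing beyond `dart:isolate`; `_crashingEntry` and `observeCrash` are hypothetical names. The isolate is spawned with `paused: true` so the error and exit listeners are installed before the entry point runs; otherwise a very early crash could happen before anyone is listening. The error port receives a two-element list holding the `toString()` of the error and of its stack trace, which is why the Worker's listener destructures it as `final [ex, stack] = message;`.

```dart
import 'dart:isolate';

// Entry point that dies with an uncaught error
// (errorsAreFatal defaults to true, so the isolate terminates).
void _crashingEntry(Object? message) {
  throw StateError('boom');
}

/// Spawns a paused isolate, installs error/exit listeners, then resumes it.
/// Returns the error text as delivered to the error port.
Future<String> observeCrash() async {
  final errorPort = ReceivePort();
  final exitPort = ReceivePort();
  final isolate = await Isolate.spawn(_crashingEntry, null, paused: true);
  isolate.addErrorListener(errorPort.sendPort);
  isolate.addOnExitListener(exitPort.sendPort);
  isolate.resume(isolate.pauseCapability!);

  // `first` cancels each subscription, which also closes each ReceivePort.
  final results = await Future.wait([errorPort.first, exitPort.first]);
  final errorMessage = results[0] as List; // [error text, stack text]
  return errorMessage[0] as String;
}
```

`await observeCrash()` should return a string containing `boom`, the message of the StateError thrown in the entry point.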
Listening to the Isolate
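Isolate.addErrorListener() is usually paired with Isolate.addOnExitListener(); the latter listens for the Isolate's termination. Since every Worker Isolate must be monitored, we refactor the Worker class to start listening in its constructor, and add the fields this requires.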
Add the following fields to the Worker class:
final int id;
final String name;
final errorPort = ReceivePort();
final exitPort = ReceivePort();
final OnCrashCallback onCrash;
Here OnCrashCallback is the callback that handles an Isolate crash:
typedef OnCrashCallback = Future Function(Worker, dynamic ex, dynamic stack);
Starting the listeners
Worker(this.id, this.name, this.isolate, this.sendPort, this.onCrash) {
_addListeners();
}
void _addListeners() {
isolate.addErrorListener(errorPort.sendPort);
isolate.addOnExitListener(exitPort.sendPort);
errorPort.listen((message) {
final [ex, stack] = message;
myprint('$name crash: $ex');
onCrash(this, ex, stack);
});
exitPort.listen((_) {
myprint('$name exit');
errorPort.close();
exitPort.close();
});
}
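The Worker above gives every worker its own errorPort and exitPort. For exit notifications there is an alternative worth knowing: `addOnExitListener` accepts an optional `response` value that is sent instead of `null`, so one shared port can serve the whole pool and still tell workers apart. The sketch below shows that variant; it is an assumption, not the article's design, and `collectExitIds`/`_shortLivedEntry` are hypothetical. `addErrorListener` has no such parameter, so per-worker error ports remain the straightforward way to attribute crashes.

```dart
import 'dart:isolate';

// A worker that returns immediately, so its isolate exits right away.
void _shortLivedEntry(int workerId) {}

/// Spawns [count] short-lived workers that all report their exit to ONE
/// shared port; each exit message carries the worker id as its response.
Future<Set<int>> collectExitIds(int count) async {
  final exitPort = ReceivePort();
  for (var id = 0; id < count; id++) {
    // Spawn paused so the listener is in place before the isolate can exit.
    final isolate = await Isolate.spawn(_shortLivedEntry, id, paused: true);
    isolate.addOnExitListener(exitPort.sendPort, response: id);
    isolate.resume(isolate.pauseCapability!);
  }
  final exited = <int>{};
  await for (final message in exitPort) {
    exited.add(message as int);
    if (exited.length == count) break; // break cancels and closes the port
  }
  return exited;
}
```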
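Restarting the crashed Isolate
When a Worker Isolate is found to have crashed, the WorkerPool must be able to "self-heal". This is the job of the onCrash function:
- Handle the leftover task: complete the in-flight Completer with an error, or hand its task over to a new Worker, so that the caller is not left waiting forever.
- Replenish the pool: immediately terminate the old Isolate (forcing its resources to be released) and create a new Isolate to replace the crashed one.
To hand the in-flight task over to a new Worker we need to record its TaskData; and to prevent endless hand-overs, we add a maxComputation field to TaskData (default 1), the maximum number of times the task may be executed. The corresponding code: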
class TaskData {
final Function task;
final Record data;
final int maxComputation;
TaskData(this.task, this.data) : maxComputation = 1;
TaskData.maxComputations(this.task, this.data, this.maxComputation);
}
class Worker {
final Isolate isolate;
final SendPort sendPort;
bool idle = true;
bool died = false;
Completer? completer;
TaskData? taskData;
int computations = 0;
// ... (omitted for brevity)
}
The died field marks that Worker.isolate has terminated, and computations records how many times the current task has been executed.
Now let's implement the onCrash logic. We add an instance method WorkerPool._handleWorkerCrash, which is passed as the onCrash argument of the Worker constructor.
Future _handleWorkerCrash(Worker worker, ex, stack) async {
// Fast fail
if (worker.taskData != null &&
worker.computations >= worker.taskData!.maxComputation) {
worker.complete(ex, ok: false);
}
worker.die();
// Reborn Isolate
if (_closed) return;
myprint('reborn worker ${worker.name} ...');
_isolates[worker.id] = await _spawn(worker.id, _resultPort!);
}
Worker.complete() is a utility method that keeps the code concise:
void complete(result, {required bool ok}) {
if (completer != null && !completer!.isCompleted) {
if (ok) {
completer!.complete(result);
} else {
completer!.completeError(result);
}
}
_resetTask();
}
void _resetTask() {
taskData = null;
completer = null;
computations = 0;
}
_isolates and _spawn() are refactored out of code that used to live in WorkerPool._init().
class WorkerPool {
// ...
final _isolates = <int, Isolate>{};
Future<Isolate> _spawn(int workerId, ReceivePort resultPort) {
return Isolate.spawn(workerEntryPoint, (
workerId,
resultPort.sendPort,
), debugName: _workerName(workerId));
}
// ...
}
worker.die() forcibly terminates the isolate and marks it as terminated.
class Worker {
// ... (omitted for brevity)
void die() {
idle = false; // To be reborn
died = true;
isolate.kill(priority: Isolate.immediate);
errorPort.close();
exitPort.close();
}
// ... (omitted for brevity)
}
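Some readers may ask: since the isolate has already crashed, is it still necessary to call isolate.kill()? The answer is yes. This is defensive programming: although the Isolate has "crashed" at the logical level (the errorListener fired), a crash does not always mean that its underlying memory and resources have been fully released (crashing is not the same as dying). The diagram below helps illustrate this.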
sequenceDiagram
participant P as WorkerPool
participant W as Old Isolate
participant N as New Isolate
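W->>P: Error occurs (ErrorListener)
Note over W: Error reported, but not fully exited (zombie state)
rect rgb(255, 200, 200)
Note right of P: If kill is not called
P->>N: Isolate.spawn (start a new instance)
Note over W, N: Two copies of the Isolate now exist in the system
end
Note over P: Resource pressure grows and may cause cascading crashes
P->>W: isolate.kill(immediate)
Note over W: Resources forcibly released
The new Isolate then takes over the old task.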
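The "crashing is not dying" case can be reproduced directly. In this sketch (hypothetical names `_zombieEntry` and `crashIsNotDeath`), the isolate is spawned with `errorsAreFatal: false`, so an uncaught error is reported to the error listener while the isolate keeps running, held alive by an open ReceivePort. Only the defensive `kill(priority: Isolate.immediate)` makes the exit listener fire. With the default `errorsAreFatal: true`, an uncaught error does terminate the isolate, so treat this as an illustration of the reasoning behind the defensive kill, not of the pool's default configuration.

```dart
import 'dart:async';
import 'dart:isolate';

// Reports an uncaught error but stays alive: errors are non-fatal here,
// and the never-closed ReceivePort keeps the isolate running.
void _zombieEntry(Object? message) {
  ReceivePort().listen((_) {});
  Timer.run(() => throw StateError('boom'));
}

/// Returns (stillAliveAfterError, exitedAfterKill).
Future<(bool, bool)> crashIsNotDeath() async {
  final errorPort = ReceivePort();
  final exitPort = ReceivePort();
  final exited = Completer<void>();
  exitPort.listen((_) {
    if (!exited.isCompleted) exited.complete();
  });

  final isolate = await Isolate.spawn(_zombieEntry, null,
      paused: true, errorsAreFatal: false);
  isolate.addErrorListener(errorPort.sendPort);
  isolate.addOnExitListener(exitPort.sendPort);
  isolate.resume(isolate.pauseCapability!);

  await errorPort.first; // the error listener fired...
  await Future<void>.delayed(const Duration(milliseconds: 200));
  final aliveAfterError = !exited.isCompleted; // ...but there is no exit yet

  isolate.kill(priority: Isolate.immediate); // the defensive kill
  await exited.future.timeout(const Duration(seconds: 5));
  exitPort.close();
  return (aliveAfterError, exited.isCompleted);
}
```

The 200ms wait is only a heuristic window for observing that no exit happened; it is not a guarantee about isolate lifetimes.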
The diagram below shows the initialization logic of WorkerPool (the _init() method).
sequenceDiagram
autonumber
participant Main as Main Isolate (WorkerPool)
participant RP as ReceivePort (resultPort)
participant W1 as Worker Isolate 1
participant W2 as Worker Isolate N
Note over Main: _init() starts
Main->>RP: Create ReceivePort
rect rgb(240, 240, 240)
Note over Main, W2: Spawn loop (Spawn)
Main->>W1: Isolate.spawn(workerEntryPoint, resultPort.sendPort)
Main->>W2: Isolate.spawn(workerEntryPoint, resultPort.sendPort)
end
Note over Main, RP: Start listening with resultPort.listen
rect rgb(230, 255, 230)
Note over W1, Main: Phase 1 (Handshake)
W1->>RP: Send (workerId, workerSendPort)
RP-->>Main: listen callback fires
Main->>Main: Create Worker instance
Main->>Main: worker.takeOverTask (try to take over the old task)
alt No old task (taskData == null)
Main->>Main: Add to the idle controller (_idleWorkerController)
else Old task exists (recompute)
Main->>W1: _execute (resend the task)
end
Note over Main: 🤝 Handshake succeeded
end
rect rgb(230, 240, 255)
Note over W1, Main: Phase 2 (Result)
W1->>RP: Send (workerId, ok, result)
RP-->>Main: listen callback fires
Main->>Main: _handleResult(workerId, ok, result)
end
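resultPort does more than receive results: it also carries "control flow". When a Worker Isolate has just started, it sends its own SendPort back through this port. This "self-introduction" is what makes two-way communication possible. The moment the handshake succeeds, the code immediately calls takeOverTask(). This is crucial when restarting: the new Worker "transparently" replaces the crashed one at the logical level and immediately checks whether an unfinished task needs to be continued. Only when there is no task to take over is the Worker put into _idleWorkerController, which prevents a Worker from being assigned a new task while it is taking over an old one.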
The implementation is as follows:
Future<void> _init() async {
final resultPort = ReceivePort();
for (int workerId = 0; workerId < size; workerId++) {
_isolates[workerId] = await _spawn(workerId, resultPort);
}
resultPort.listen((message) {
if (message case (int workerId, bool ok, dynamic result)) {
_handleResult(workerId, ok, result);
} else if (message case (int workerId, SendPort workerPort)) {
final workerName = _workerName(workerId);
final worker = Worker(
workerId,
workerName,
_isolates[workerId]!,
workerPort,
_handleWorkerCrash,
);
worker.takeOverTask(_workers[workerId]);
_workers[workerId] = worker;
if (worker.taskData == null) {
_idleWorkerController.sink.add(worker);
} else {
// Recompute
_execute(worker, worker.taskData!, worker.completer!);
}
myprint('main: 🤝 $workerName 握手成功');
}
});
_resultPort = resultPort;
}
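To see the two-phase protocol on resultPort in isolation, here is a stripped-down sketch with one worker and one task. It follows the article's message shapes, `(workerId, SendPort)` for the handshake and `(workerId, ok, result)` for results, but `_echoWorker` and `handshakeAndAdd` are hypothetical, and the task is hard-coded to integer addition.

```dart
import 'dart:async';
import 'dart:isolate';

void _echoWorker((int, SendPort) message) {
  final (workerId, resultPort) = message;
  final inbox = ReceivePort();
  // Phase 1 (handshake): the worker "introduces itself" with its SendPort.
  resultPort.send((workerId, inbox.sendPort));
  inbox.listen((data) {
    if (data case (int a, int b)) {
      // Phase 2 (result): reply in the (workerId, ok, result) shape.
      resultPort.send((workerId, true, a + b));
      inbox.close(); // one task only: closing the last port lets it exit
    }
  });
}

Future<int> handshakeAndAdd(int a, int b) async {
  final resultPort = ReceivePort();
  final done = Completer<int>();
  // Listen before spawning so no message can arrive unobserved.
  resultPort.listen((message) {
    if (message case (int _, SendPort workerPort)) {
      workerPort.send((a, b)); // handshake received: dispatch the task
    } else if (message case (int _, bool ok, Object? result)) {
      if (ok) {
        done.complete(result as int);
      } else {
        done.completeError(result ?? 'unknown error');
      }
      resultPort.close();
    }
  });
  await Isolate.spawn(_echoWorker, (7, resultPort.sendPort)); // 7: any id
  return done.future;
}
```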
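The new _execute() method records the TaskData and computations (how many times the task has been executed), supporting both task takeover (takeOverTask) and fast fail (when the execution count has reached its limit):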
void _execute(Worker worker, TaskData taskData, Completer completer) {
worker.idle = false;
worker.completer = completer;
worker.taskData = taskData;
worker.computations++;
worker.sendPort.send(taskData);
}
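When a crash occurs, the old Worker's taskData, completer and computations are handed over smoothly to the new Worker, so the business layer is almost "unaware" of the crash underneath.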
void takeOverTask(Worker? worker) {
if (worker == null) return;
taskData = worker.taskData;
completer = worker.completer;
computations = worker.computations;
}
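The bookkeeping spread across _execute, _handleWorkerCrash and takeOverTask can be modeled without any isolates. The sketch below is a single-isolate simulation under stated assumptions: a crash is a scripted boolean, `Slot` stands in for the task-related Worker fields, and `executeAttempt`, `handleCrash` and `simulate` are hypothetical names. Because takeOverTask copies `computations`, the counter keeps growing across reborn workers until maxComputation is reached and the task fails fast.

```dart
import 'dart:async';

/// Stand-in for the task-related Worker fields (hypothetical).
class Slot {
  Slot(this.maxComputation);
  final int maxComputation;
  final Completer<int> completer = Completer<int>();
  int computations = 0; // carried over to reborn workers, like takeOverTask
}

/// Mirrors _execute: count the attempt before dispatching it.
void executeAttempt(Slot slot) {
  slot.computations++;
}

/// Mirrors the fast-fail branch of _handleWorkerCrash.
/// Returns true when the task should be handed to the reborn worker.
bool handleCrash(Slot slot, Object error) {
  if (slot.computations >= slot.maxComputation) {
    slot.completer.completeError(error);
    return false;
  }
  return true;
}

/// Drives one task through scripted outcomes (true = that attempt crashed).
/// The script must end in a success or reach the limit; otherwise the
/// returned future never completes, like a task stuck in the pool.
Future<int> simulate(List<bool> crashes, int maxComputation, int value) {
  final slot = Slot(maxComputation);
  for (final crashed in crashes) {
    executeAttempt(slot);
    if (!crashed) {
      slot.completer.complete(value);
      break;
    }
    if (!handleCrash(slot, 'CRASH')) break; // limit reached: fail fast
    // Otherwise the reborn worker takes over the same slot and retries.
  }
  return slot.completer.future;
}
```

With maxComputation = 3, two crashes followed by a success still deliver the result, while three crashes fail the task, which is consistent with the `-1` for x = 5 in the DEMO output.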
Using Exponential Backoff
The last line of WorkerPool._handleWorkerCrash:
_isolates[worker.id] = await _spawn(worker.id, _resultPort!);
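If _spawn throws (e.g. an IsolateSpawnException caused by running out of memory), the Isolate restart fails. So we wrap this line in try-catch and retry when an exception occurs. It is best to use exponential backoff to determine the retry delay: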
$$delay = base \times 2^{n}$$
where
- base: the initial wait time (e.g. 200ms);
- n: the number of consecutive failures (i.e. the n-th retry).
To keep the delay from growing without bound, an upper limit (max_delay) is usually set:
$$delay = \min(base \times 2^{n}, \text{max\_delay})$$
This article uses exponential backoff with random jitter:
$$delay = \min(base \times 2^{n}, \text{max\_delay}) + \text{jitter}$$
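As a worked example, with base = 200ms and max_delay = 60000ms (the values used in the implementation), the delay before jitter doubles on each failure and reaches the cap at n = 9; the jitter then adds 0 to 49ms on top:

$$
\begin{aligned}
n = 0:&\quad \min(200 \times 2^{0}, 60000) = 200\ \text{ms}\\
n = 1:&\quad \min(200 \times 2^{1}, 60000) = 400\ \text{ms}\\
n = 2:&\quad \min(200 \times 2^{2}, 60000) = 800\ \text{ms}\\
&\quad \vdots\\
n = 8:&\quad \min(200 \times 2^{8}, 60000) = 51200\ \text{ms}\\
n = 9:&\quad \min(200 \times 2^{9}, 60000) = \min(102400, 60000) = 60000\ \text{ms}
\end{aligned}
$$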
The implementation:
final _retryCounts = <int, int>{}; // worker reborn counts
final Duration _baseDelay = Duration(milliseconds: 200); // reborn base delay
final Duration _maxDelay = Duration(seconds: 60); // reborn max delay
/// Reborn Isolate (Exponential Backoff)
void _reborn(Worker worker) {
if (_closed) return;
final id = worker.id;
final currentRetry = _retryCounts[id] ?? 0;
_retryCounts[id] = currentRetry + 1;
var delay = _baseDelay * pow(2, currentRetry);
if (delay > _maxDelay) delay = _maxDelay;
delay += Duration(milliseconds: Random().nextInt(50)); // +jitter
myprint(
'reborn worker ${worker.name} ($currentRetry)'
' in ${delay.inMilliseconds}ms ...',
);
Future.delayed(delay, () async {
if (_closed) return;
try {
_isolates[id] = await _spawn(worker.id, _resultPort!);
} catch (e) {
myprint('failed to reborn worker ${worker.name} ($currentRetry): $e');
if (!_closed && delay < _maxDelay - Duration(milliseconds: 100)) {
_reborn(worker);
}
}
});
}
When a Worker Isolate's handshake succeeds, reset _retryCounts:
_retryCounts[workerId] = 0; // reset reborn count
Key points:
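a. Avoiding the thundering herd: the random jitter in the code, jitter = Random().nextInt(50) ms, looks tiny, but in multi-Isolate or distributed systems it acts as a "flood barrier". It keeps multiple Isolates from restarting at the same instant and causing a CPU spike (the thundering-herd effect), because the restart requests are scattered randomly over time instead of landing together.
b. Recursive restart: in the catch block, _reborn is called again as long as delay < _maxDelay (minus a 100ms margin), forming an automatic retry loop. While the delay is below the cap and the WorkerPool is still open, the system keeps trying to recover; the attempt made at the capped delay is the last one.
c. _closed is checked twice, once on entering _reborn and again at the start of the Future.delayed callback right before _spawn. This ensures that no "ghost Isolate" is created while the WorkerPool is shutting down.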
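The delay rule inside _reborn can also be factored into a pure function, which makes the schedule easy to inspect and test. This is a sketch with hypothetical names (`backoffDelay`, `baseDelay`, `maxDelay`, `jitterMs`); unlike the article's code it also clamps the exponent, so an unexpectedly large retry count cannot overflow the multiplication. Passing a seeded `Random` makes the jitter reproducible.

```dart
import 'dart:math';

/// delay = min(base * 2^n, max_delay) + jitter, with jitter in [0, jitterMs).
Duration backoffDelay(
  int retry, {
  Duration baseDelay = const Duration(milliseconds: 200),
  Duration maxDelay = const Duration(seconds: 60),
  int jitterMs = 50,
  Random? random,
}) {
  // base * 2^n, with the exponent clamped so the product cannot overflow.
  var delay = baseDelay * pow(2, min(retry, 30));
  if (delay > maxDelay) delay = maxDelay; // min(..., max_delay)
  if (jitterMs > 0) {
    // + jitter, drawn uniformly from [0, jitterMs) milliseconds.
    delay += Duration(milliseconds: (random ?? Random()).nextInt(jitterMs));
  }
  return delay;
}
```

For example, `backoffDelay(9, jitterMs: 0)` returns 60 seconds, the capped value from the formula.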
Other details
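When WorkerPool.close() checks for in-flight tasks, it must consider both Worker.idle and Worker.died: die() sets idle = false, so without the died check a dead Worker would be mistaken for one that still has a task in flight.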
The code snippet:
final busy =
ensureExecutingTaskCompleted &&
_workers.values.any((w) => !w.idle && !w.died);
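Why both flags matter can be shown with a tiny sketch. `WorkerState` and `hasInFlightTasks` are hypothetical stand-ins for the Worker fields and the `busy` expression; the point is that a Worker after die() has `idle == false`, so checking `!idle` alone would count it as busy.

```dart
// Hypothetical minimal Worker state: only the two flags close() looks at.
class WorkerState {
  WorkerState({required this.idle, required this.died});
  bool idle;
  bool died;
}

/// A worker has a task in flight only if it is not idle AND still alive.
bool hasInFlightTasks(
  Iterable<WorkerState> workers, {
  bool ensureExecutingTaskCompleted = true,
}) =>
    ensureExecutingTaskCompleted && workers.any((w) => !w.idle && !w.died);
```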
DEMO
Finally, we modify workerEntryPoint to fail at random, update main, and run the program.
void workerEntryPoint((int workerId, SendPort resultPort) message) {
// ... (omitted for brevity)
workerReceivePort.listen((data) async {
if (data case TaskData taskData) {
// Simulate an uncaught exception
if (Random().nextBool()) {
throw 'CRASH';
}
// ... (omitted for brevity)
} else if (data == _shutdownSignal) {
workerReceivePort.close();
print('${Isolate.current.debugName} closed');
}
});
}
void main() async {
var pool = WorkerPool(3);
await Future.delayed(Duration(seconds: 1));
const n = 9, maxCount = 3;
var results = await Future.wait([
for (var x = 1; x <= n; x++)
pool
.submit(TaskData.maxComputations(add, (x, x), maxCount))
.onError((e, s) => -1),
]);
myprint(results);
await pool.close();
}
/* An example output:
out0 0ms: main: 🤝 pool-worker-0 握手成功
out1 13ms: main: 🤝 pool-worker-1 握手成功
out2 18ms: main: 🤝 pool-worker-2 握手成功
out3 984ms: pool-worker-2 returns 6. computations=1
out4 989ms: pool-worker-1 crash: CRASH
out5 1007ms: reborn worker pool-worker-1 (0) in 200ms ...
out6 1008ms: pool-worker-0 crash: CRASH
out7 1008ms: reborn worker pool-worker-0 (0) in 210ms ...
out8 1009ms: pool-worker-2 crash: CRASH
out9 1009ms: reborn worker pool-worker-2 (0) in 247ms ...
out10 1237ms: main: 🤝 pool-worker-1 握手成功
out11 1237ms: pool-worker-1 crash: CRASH
out12 1237ms: reborn worker pool-worker-1 (0) in 201ms ...
out13 1241ms: main: 🤝 pool-worker-0 握手成功
out14 1242ms: pool-worker-0 crash: CRASH
out15 1242ms: reborn worker pool-worker-0 (0) in 243ms ...
out16 1293ms: main: 🤝 pool-worker-2 握手成功
out17 1294ms: pool-worker-2 returns 8. computations=2
out18 1295ms: pool-worker-2 crash: CRASH
out19 1295ms: reborn worker pool-worker-2 (0) in 244ms ...
out20 1555ms: main: 🤝 pool-worker-1 握手成功
out21 1556ms: pool-worker-1 returns 4. computations=3
out22 1556ms: pool-worker-1 crash: CRASH
out23 1556ms: reborn worker pool-worker-1 (0) in 219ms ...
out24 1571ms: main: 🤝 pool-worker-0 握手成功
out25 1571ms: pool-worker-0 returns 2. computations=3
out26 1572ms: pool-worker-0 crash: CRASH
out27 1572ms: reborn worker pool-worker-0 (0) in 200ms ...
out28 1594ms: main: 🤝 pool-worker-2 握手成功
out29 1595ms: pool-worker-2 crash: CRASH
out30 1595ms: reborn worker pool-worker-2 (0) in 206ms ...
out31 1809ms: main: 🤝 pool-worker-0 握手成功
out32 1810ms: pool-worker-0 crash: CRASH
out33 1810ms: reborn worker pool-worker-0 (0) in 228ms ...
out34 1823ms: main: 🤝 pool-worker-1 握手成功
out35 1823ms: pool-worker-1 returns 12. computations=2
out36 1824ms: pool-worker-1 crash: CRASH
out37 1824ms: reborn worker pool-worker-1 (0) in 241ms ...
out38 1838ms: main: 🤝 pool-worker-2 握手成功
out39 1838ms: pool-worker-2 crash: CRASH
out40 1839ms: reborn worker pool-worker-2 (0) in 226ms ...
out41 2063ms: main: 🤝 pool-worker-0 握手成功
out42 2063ms: pool-worker-0 returns 14. computations=3
out43 2063ms: pool-worker-0 returns 18. computations=1
out44 2091ms: main: 🤝 pool-worker-2 握手成功
out45 2097ms: main: 🤝 pool-worker-1 握手成功
out46 2097ms: pool-worker-1 crash: CRASH
out47 2098ms: reborn worker pool-worker-1 (0) in 211ms ...
out48 2336ms: main: 🤝 pool-worker-1 握手成功
out49 2336ms: pool-worker-1 returns 16. computations=3
out50 2339ms: [2, 4, 6, 8, -1, 12, 14, 16, 18]
pool-worker-2 closed
pool-worker-1 closed
pool-worker-0 closed
out51 2363ms: pool-worker-0 exit
out52 2365ms: pool-worker-2 exit
out53 2368ms: pool-worker-1 exit
out54 2451ms: pool closed
*/
Closing remarks
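There is a well-known saying in software engineering: "Writing concurrent programs is hard; writing concurrent programs that handle failures correctly is even harder."
Every improvement we have made (the _shutdownSignal that handles leftover requests, the 50ms of random jitter, the seemingly redundant isolate.kill(), the handshake protocol that takes over tasks) serves one goal: under real production pressure, network fluctuations and hardware limits, our system should not merely "stay alive" but run gracefully and robustly.
I hope this study of WorkerPool helps you build a mental model of defensive concurrent programming.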