Worker Pool (Continued 2): Restarting Unexpectedly Crashed Isolates

Because each Isolate has its own memory heap, a crash in a child Isolate does not, by default, bring down the main Isolate directly; it does, however, leave the main Isolate waiting forever for a result that will never arrive. Handling crashes requires combining proactive interception (catching errors inside the worker) with passive monitoring (observing the worker from outside).

Proactive interception

In a multi-Isolate environment, exceptions are not shared automatically. If a child Isolate crashes and nothing catches the error, the main Isolate can at best observe that a port was closed (and sometimes gets no hint at all), which results in "silent failures" that are extremely hard to track down. By wrapping the business logic inside entryPoint in a try-catch, we make sure every exception is turned into an "error message" that is sent back to the main Isolate.

In the WorkerPool, the business message a Worker Isolate sends back is a Record with the following shape:

(int workerId, bool ok, dynamic result)

It represents (workerId, success flag, computation result or error information).

When the business logic fails, example ex771 simply sends the caught exception back to the WorkerPool (the main Isolate).

void workerEntryPoint((int workerId, SendPort resultPort) message) {
  // ... (omitted for brevity)

  workerReceivePort.listen((data) async {
    if (data case TaskData taskData) {
      try {
        var result = taskData.task.call(taskData.data);
        resultPort.send((workerId, true, result)); 
      } catch (e) {
        resultPort.send((workerId, false, e));
      }
    }
  });
}

In practice, several exception-handling strategies are worth considering (see the sketch after this list):

  1. Full pass-through: send the exception together with its stack trace (StackTrace) back to the WorkerPool;
  2. Re-wrapping (a protocolized error response): wrap the exception and/or the stack trace in some structured format before sending it back to the WorkerPool;
  3. Logging (for observability): write an error log that includes the exception and its stack trace.
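
For example, strategies 1, 2 and 3 could be combined as in the following sketch; the WorkerError wrapper and the runTask helper are illustrative assumptions, not part of the WorkerPool code shown in this article:

import 'dart:isolate';

// Strategy 2: a protocolized error response. Both fields are plain Strings,
// which are always safe to send across Isolate boundaries.
class WorkerError {
  final String message;
  final String stackTrace;
  WorkerError(Object error, StackTrace stack)
      : message = error.toString(),
        stackTrace = stack.toString();

  @override
  String toString() => '$message\n$stackTrace';
}

void runTask(int workerId, SendPort resultPort, Function task, Record args) {
  try {
    var result = task(args);
    resultPort.send((workerId, true, result));
  } catch (e, stack) {
    // Strategy 3: log the error and stack trace for observability.
    print('worker $workerId failed: $e\n$stack');
    // Strategies 1/2: send the error (wrapped, including the stack trace) back to the pool.
    resultPort.send((workerId, false, WorkerError(e, stack)));
  }
}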

Passive monitoring

Sometimes a crash happens outside the event loop (for example, an out-of-memory error or an initialization failure), and the Worker never gets a chance to call resultPort.send(). For those cases we rely on Isolate.addErrorListener(): by installing a listener when the Isolate is created, the WorkerPool gets notified whenever the Isolate crashes and can react accordingly (for example, by restarting the Isolate).

sequenceDiagram
    autonumber
    participant M as WorkerPool
    participant W as Worker Isolate
    participant EP as ErrorPort

    W->>W: A fatal error occurs (e.g. OOM)
    W-->>EP: Error signal is sent automatically
    EP->>M: ErrorListener callback fires
    M->>M: Mark the Worker as dead
    alt the task has reached its execution limit
      M->>M: Call Completer.completeError
    end
    M->>W: isolate.kill()
    M->>W: Spawn a replacement Isolate
    M->>M: Add the new Worker to the pool

Listening to the Isolate

Isolate.addErrorListener() is usually used together with Isolate.addOnExitListener(); the latter listens for the Isolate's termination. Every Worker Isolate needs to be monitored, so we refactor the Worker class to start the listeners in its constructor and add the necessary new fields.

The Worker class gains the following fields:

  final int id;
  final String name;
  final errorPort = ReceivePort();
  final exitPort = ReceivePort();
  final OnCrashCallback onCrash;

Here OnCrashCallback is the callback type for handling an Isolate crash:

typedef OnCrashCallback = Future Function(Worker, dynamic ex, dynamic stack);

Starting the listeners

  Worker(this.id, this.name, this.isolate, this.sendPort, this.onCrash) {
    _addListeners();
  }

  void _addListeners() {
    isolate.addErrorListener(errorPort.sendPort);
    isolate.addOnExitListener(exitPort.sendPort);

    errorPort.listen((message) {
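      // addErrorListener delivers each error as a two-element list:
      // [error.toString(), stack trace as a String (or null)].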
      final [ex, stack] = message;
      myprint('$name crash: $ex');
      onCrash(this, ex, stack);
    });

    exitPort.listen((_) {
      myprint('$name exit');
      errorPort.close();
      exitPort.close();
    });
  }

Restarting a crashed Isolate

When a Worker Isolate crash is detected, the WorkerPool needs to be able to "heal itself" (in other words, this is what the onCrash callback has to accomplish):

  • Deal with the in-flight task: complete the pending Completer with an error, or hand the task over to a new Worker, so that the main isolate does not hang.
  • Replace the casualty: kill the old Isolate immediately (forcing its resources to be released) and spawn a new Isolate to take the crashed one's place.

To hand an in-flight task over to a new Worker, we need to keep its TaskData around; and to prevent a task from being handed over indefinitely, TaskData gains a new field, maxComputation (default 1), which is the maximum number of times the task may be executed. The corresponding code fragments:

class TaskData {
  final Function task;
  final Record data;
  final int maxComputation;

  TaskData(this.task, this.data) : maxComputation = 1;
  TaskData.maxComputations(this.task, this.data, this.maxComputation);
}

class Worker {
  final Isolate isolate;
  final SendPort sendPort;

  bool idle = true;
  bool died = false;

  Completer? completer;
  TaskData? taskData;
  int computations = 0;
  // ... (omitted for brevity)
}  

The died field marks that Worker.isolate has terminated, and computations records how many times the current task has been executed.

Now let's implement the onCrash logic: we add a new instance method, WorkerPool._handleWorkerCrash, which is passed as the onCrash argument of the Worker constructor.

  Future _handleWorkerCrash(Worker worker, ex, stack) async {
    // Fast fail
    if (worker.taskData != null &&
        worker.computations >= worker.taskData!.maxComputation) {
      worker.complete(ex, ok: false);
    }
    worker.die();

    // Reborn Isolate
    if (_closed) return;
    myprint('reborn worker ${worker.name} ...');
    _isolates[worker.id] = await _spawn(worker.id, _resultPort!);
  }

Worker.complete() is a small utility method that keeps the code concise:

  void complete(result, {required bool ok}) {
    if (completer != null && !completer!.isCompleted) {
      if (ok) {
        completer!.complete(result);
      } else {
        completer!.completeError(result);
      }
    }
    _resetTask();
  }

  void _resetTask() {
    taskData = null;
    completer = null;
    computations = 0;
  }

The _isolates map and the _spawn() method are refactored out of code that originally lived in WorkerPool._init().

class WorkerPool {
  // ...
  
  final _isolates = <int, Isolate>{};

  Future<Isolate> _spawn(int workerId, ReceivePort resultPort) {
    return Isolate.spawn(workerEntryPoint, (
      workerId,
      resultPort.sendPort,
    ), debugName: _workerName(workerId));
  }

  // ...
}    

worker.die() forcibly terminates the isolate and marks it as dead.

class Worker {
  // ... (omitted for brevity)

  void die() {
    idle = false; // not idle: this slot is waiting to be reborn
    died = true;
    isolate.kill(priority: Isolate.immediate);
    errorPort.close();
    exitPort.close();
  }

  // ... (omitted for brevity)
}

A reader might ask: if the isolate has already crashed, is it still necessary to call isolate.kill()? The answer is yes. This is defensive programming: although the Isolate has "crashed" at the logical level (the error listener fired), a crash does not always mean the underlying memory and resources have been fully released (crashing is not the same as being gone). The diagram below helps illustrate the problem.

sequenceDiagram
    participant P as WorkerPool
    participant W as Old Isolate
    participant N as New Isolate

    W->>P: Error occurs (ErrorListener)
    Note over W: Reported an error but has not fully exited (zombie state)

    rect rgb(255, 200, 200)
    Note right of P: If kill is not called
    P->>N: Isolate.spawn (start a new instance)
    Note over W, N: Two copies of the Isolate now exist in the system
    end

    Note over P: Resource pressure grows, possibly causing further crashes

    P->>W: isolate.kill(immediate)
    Note over W: Resources are forcibly released

The new Isolate takes over the old task

The diagram below shows the initialization logic of the WorkerPool (its _init() method).

sequenceDiagram
    autonumber
    participant Main as Main Isolate (WorkerPool)
    participant RP as ReceivePort (resultPort)
    participant W1 as Worker Isolate 1
    participant W2 as Worker Isolate N

    Note over Main: _init() starts
    Main->>RP: Create ReceivePort

    rect rgb(240, 240, 240)
    Note over Main, W2: Spawn loop
    Main->>W1: Isolate.spawn(workerEntryPoint, resultPort.sendPort)
    Main->>W2: Isolate.spawn(workerEntryPoint, resultPort.sendPort)
    end

    Note over Main, RP: Start listening via resultPort.listen

    rect rgb(230, 255, 230)
    Note over W1, Main: Phase 1: Handshake
    W1->>RP: Send (workerId, workerSendPort)
    RP-->>Main: listen callback fires
    Main->>Main: Create a Worker instance
    Main->>Main: worker.takeOverTask (try to take over an old task)
    alt no old task (taskData == null)
        Main->>Main: Add to the idle controller (_idleWorkerController)
    else old task present (needs recomputation)
        Main->>W1: _execute (resend the task)
    end
    Note over Main: 🤝 Handshake succeeded
    end

    rect rgb(230, 240, 255)
    Note over W1, Main: Phase 2: Result
    W1->>RP: Send (workerId, ok, result)
    RP-->>Main: listen callback fires
    Main->>Main: _handleResult(workerId, ok, result)
    end

resultPort is not only used to receive computation results; it also carries the "control flow". When a Worker Isolate has just started, it sends its own SendPort back through this port. This "self-introduction" mechanism is the foundation of the two-way channel.

The moment the handshake succeeds, the code immediately calls takeOverTask(). This matters most in the restart scenario: the new Worker transparently replaces the crashed one and immediately checks whether an unfinished task needs to be resumed. Only when there is nothing to take over is the Worker added to _idleWorkerController, which prevents a Worker from being assigned a new task while it is taking over an old one.
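
For reference, the worker-side half of this handshake (the part omitted from workerEntryPoint earlier) looks roughly like this; the sketch assumes only the signature and record shapes already shown in this article:

import 'dart:isolate';

void workerEntryPoint((int workerId, SendPort resultPort) message) {
  final (workerId, resultPort) = message;
  final workerReceivePort = ReceivePort();

  // Handshake: report this worker's id and SendPort back to the pool,
  // matching the (int workerId, SendPort workerPort) case in _init()'s listener.
  resultPort.send((workerId, workerReceivePort.sendPort));

  workerReceivePort.listen((data) async {
    // ... handle TaskData and the _shutdownSignal as shown elsewhere ...
  });
}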

The concrete implementation of _init():

  Future<void> _init() async {
    final resultPort = ReceivePort();

    for (int workerId = 0; workerId < size; workerId++) {
      _isolates[workerId] = await _spawn(workerId, resultPort);
    }

    resultPort.listen((message) {
      if (message case (int workerId, bool ok, dynamic result)) {
        _handleResult(workerId, ok, result);
      } else if (message case (int workerId, SendPort workerPort)) {
        final workerName = _workerName(workerId);
        final worker = Worker(
          workerId,
          workerName,
          _isolates[workerId]!,
          workerPort,
          _handleWorkerCrash,
        );
        worker.takeOverTask(_workers[workerId]);
        _workers[workerId] = worker;

        if (worker.taskData == null) {
          _idleWorkerController.sink.add(worker);
        } else {
          // Recompute
          _execute(worker, worker.taskData!, worker.completer!);
        }
        myprint('main: 🤝 $workerName 握手成功');
      }
    });

    _resultPort = resultPort;
  }

The new _execute() method records the TaskData and increments computations (the number of times the task has been executed), which enables task takeover (takeOverTask) and fast failure (when the execution count has reached its limit):

 void _execute(Worker worker, TaskData taskData, Completer completer) {
    worker.idle = false;
    worker.completer = completer;
    worker.taskData = taskData;
    worker.computations++;
    worker.sendPort.send(taskData);
  }

When a crash happens, the old Worker's taskData and completer are smoothly transferred to the new Worker, so the business layer barely notices the underlying crash.

 void takeOverTask(Worker? worker) {
    if (worker == null) return;
    taskData = worker.taskData;
    completer = worker.completer;
    computations = worker.computations;
  }

Using exponential backoff

Recall the last line of WorkerPool._handleWorkerCrash:

    _isolates[worker.id] = await _spawn(worker.id, _resultPort!);

If _spawn throws (for example an IsolateSpawnException caused by insufficient memory), restarting the Isolate fails. We therefore need to wrap this line in a try-catch and retry when an exception occurs; the retry delay is best determined with an exponential backoff algorithm.

$$delay = base \times 2^{n}$$

where

  • base: the initial wait time (for example 200ms)
  • n: the number of consecutive failures (i.e. this is the n-th retry)

To keep the delay from growing without bound, an upper limit (max_delay) is usually applied:

$$delay = \min(base \times 2^{n}, \text{max_delay})$$

This article uses exponential backoff with random jitter:

$$delay = \min(base \cdot 2^{n}, \text{max_delay}) + \text{jitter}$$
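
As a quick sanity check, with base = 200ms and max_delay = 60s (the values used in the implementation below), the successive delays are roughly 200ms, 400ms, 800ms, 1.6s, 3.2s, 6.4s, 12.8s, 25.6s, 51.2s, and from the tenth retry onwards they are capped at 60s, each with an extra 0 to 49ms of jitter.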

The concrete implementation:

  final _retryCounts = <int, int>{}; // worker reborn counts
  final Duration _baseDelay = Duration(milliseconds: 200); // reborn base delay
  final Duration _maxDelay = Duration(seconds: 60); // reborn max delay

  ///  Reborn Isolate (Exponential Backoff)
  void _reborn(Worker worker) {
    if (_closed) return;

    final id = worker.id;
    final currentRetry = _retryCounts[id] ?? 0;
    _retryCounts[id] = currentRetry + 1;

    var delay = _baseDelay * pow(2, currentRetry);
    if (delay > _maxDelay) delay = _maxDelay;
    delay += Duration(milliseconds: Random().nextInt(50)); // +jitter

    myprint(
      'reborn worker ${worker.name} ($currentRetry)'
      ' in ${delay.inMilliseconds}ms ...',
    );

    Future.delayed(delay, () async {
      if (_closed) return;
      try {
        _isolates[id] = await _spawn(worker.id, _resultPort!);
      } catch (e) {
        myprint('failed to reborn worker ${worker.name} ($currentRetry): $e');
        if (!_closed && delay < _maxDelay - Duration(milliseconds: 100)) {
          _reborn(worker);
        }
      }
    });
  }
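
With _reborn in place, the tail of _handleWorkerCrash presumably no longer awaits _spawn directly but delegates to _reborn instead. A sketch of the updated method (the fast-fail handling and worker.die() stay exactly as shown earlier):

  Future _handleWorkerCrash(Worker worker, ex, stack) async {
    // Fast fail and worker.die(), as before ...

    // Reborn Isolate, now with exponential backoff and retry
    _reborn(worker);
  }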

When a Worker Isolate handshake succeeds, _retryCounts must be reset:

        _retryCounts[workerId] = 0; // reset reborn count

Key points:

a. Avoiding the thundering herd: the random jitter in the code, jitter = Random().nextInt(50) ms, may look tiny, but it is the "flood barrier" of multi-Isolate and distributed systems: it prevents several Isolates from restarting at exactly the same moment and producing a CPU spike (the thundering-herd effect), because the restart requests get spread evenly along the timeline.

b. Recursive restart: inside the catch block, _reborn calls itself again as long as delay < _maxDelay, forming a closed retry loop. As long as the delay has not reached its ceiling and the WorkerPool has not been closed, the system keeps trying to recover.

c. The double check if (_closed) return, once at the top of _reborn and once again inside the Future.delayed callback right before _spawn, guarantees that no "ghost Isolate" is created while the WorkerPool is shutting down.

Other details

When WorkerPool.close() decides whether there are still in-flight tasks, it must take both Worker.idle and Worker.died into account. The relevant snippet:

      final busy =
          ensureExecutingTaskCompleted &&
          _workers.values.any((w) => !w.idle && !w.died);
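
For context, here is a minimal sketch of where this check could sit inside close(); the ensureExecutingTaskCompleted parameter comes from the snippet above, but the polling loop and the remaining shutdown steps are assumptions of this sketch:

  Future<void> close({bool ensureExecutingTaskCompleted = true}) async {
    _closed = true;

    // Wait for in-flight tasks to finish. A worker only counts as busy if it
    // is neither idle nor dead: a dead worker's task is handled by onCrash.
    while (true) {
      final busy =
          ensureExecutingTaskCompleted &&
          _workers.values.any((w) => !w.idle && !w.died);
      if (!busy) break;
      await Future.delayed(const Duration(milliseconds: 10));
    }

    // ... send _shutdownSignal to every worker, close _resultPort, etc. ...
  }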

DEMO

Finally, we modify workerEntryPoint to inject random failures, adjust the main function, and run the program.

void workerEntryPoint((int workerId, SendPort resultPort) message) {
  // ... (omitted for brevity)

  workerReceivePort.listen((data) async {
    if (data case TaskData taskData) {
      // Simulate an uncaught exception
      if (Random().nextBool()) {
        throw 'CRASH';
      }
      // ... (omitted for brevity)
    } else if (data == _shutdownSignal) {
      workerReceivePort.close();
      print('${Isolate.current.debugName} closed');
    }
  });
}

void main() async {
  var pool = WorkerPool(3);
  await Future.delayed(Duration(seconds: 1));

  const n = 9, maxCount = 3;
  var results = await Future.wait([
    for (var x = 1; x <= n; x++)
      pool
          .submit(TaskData.maxComputations(add, (x, x), maxCount))
          .onError((e, s) => -1),
  ]);
  myprint(results);

  await pool.close();
}

/* An example output:
out0 0ms: main: 🤝 pool-worker-0 握手成功
out1 13ms: main: 🤝 pool-worker-1 握手成功
out2 18ms: main: 🤝 pool-worker-2 握手成功
out3 984ms: pool-worker-2 returns 6. computations=1
out4 989ms: pool-worker-1 crash: CRASH
out5 1007ms: reborn worker pool-worker-1 (0) in 200ms ...
out6 1008ms: pool-worker-0 crash: CRASH
out7 1008ms: reborn worker pool-worker-0 (0) in 210ms ...
out8 1009ms: pool-worker-2 crash: CRASH
out9 1009ms: reborn worker pool-worker-2 (0) in 247ms ...
out10 1237ms: main: 🤝 pool-worker-1 握手成功
out11 1237ms: pool-worker-1 crash: CRASH
out12 1237ms: reborn worker pool-worker-1 (0) in 201ms ...
out13 1241ms: main: 🤝 pool-worker-0 握手成功
out14 1242ms: pool-worker-0 crash: CRASH
out15 1242ms: reborn worker pool-worker-0 (0) in 243ms ...
out16 1293ms: main: 🤝 pool-worker-2 握手成功
out17 1294ms: pool-worker-2 returns 8. computations=2
out18 1295ms: pool-worker-2 crash: CRASH
out19 1295ms: reborn worker pool-worker-2 (0) in 244ms ...
out20 1555ms: main: 🤝 pool-worker-1 握手成功
out21 1556ms: pool-worker-1 returns 4. computations=3
out22 1556ms: pool-worker-1 crash: CRASH
out23 1556ms: reborn worker pool-worker-1 (0) in 219ms ...
out24 1571ms: main: 🤝 pool-worker-0 握手成功
out25 1571ms: pool-worker-0 returns 2. computations=3
out26 1572ms: pool-worker-0 crash: CRASH
out27 1572ms: reborn worker pool-worker-0 (0) in 200ms ...
out28 1594ms: main: 🤝 pool-worker-2 握手成功
out29 1595ms: pool-worker-2 crash: CRASH
out30 1595ms: reborn worker pool-worker-2 (0) in 206ms ...
out31 1809ms: main: 🤝 pool-worker-0 握手成功
out32 1810ms: pool-worker-0 crash: CRASH
out33 1810ms: reborn worker pool-worker-0 (0) in 228ms ...
out34 1823ms: main: 🤝 pool-worker-1 握手成功
out35 1823ms: pool-worker-1 returns 12. computations=2
out36 1824ms: pool-worker-1 crash: CRASH
out37 1824ms: reborn worker pool-worker-1 (0) in 241ms ...
out38 1838ms: main: 🤝 pool-worker-2 握手成功
out39 1838ms: pool-worker-2 crash: CRASH
out40 1839ms: reborn worker pool-worker-2 (0) in 226ms ...
out41 2063ms: main: 🤝 pool-worker-0 握手成功
out42 2063ms: pool-worker-0 returns 14. computations=3
out43 2063ms: pool-worker-0 returns 18. computations=1
out44 2091ms: main: 🤝 pool-worker-2 握手成功
out45 2097ms: main: 🤝 pool-worker-1 握手成功
out46 2097ms: pool-worker-1 crash: CRASH
out47 2098ms: reborn worker pool-worker-1 (0) in 211ms ...
out48 2336ms: main: 🤝 pool-worker-1 握手成功
out49 2336ms: pool-worker-1 returns 16. computations=3
out50 2339ms: [2, 4, 6, 8, -1, 12, 14, 16, 18]
pool-worker-2 closed
pool-worker-1 closed
pool-worker-0 closed
out51 2363ms: pool-worker-0 exit
out52 2365ms: pool-worker-2 exit
out53 2368ms: pool-worker-1 exit
out54 2451ms: pool closed
*/

Closing thoughts

There is a well-known saying in software engineering: "Writing concurrent programs is hard; writing concurrent programs that handle failures correctly is even harder."

Every small improvement we made, from the _shutdownSignal that drains remaining requests, to those 50ms of random jitter, the seemingly redundant isolate.kill(), and the handshake protocol that takes over tasks, exists so that, under real production pressure, network turbulence and hardware limits, our system does not merely "stay alive" but keeps running gracefully and robustly.

I hope that studying this WorkerPool helps you build a mental model for defensive concurrent programming.

Reference