Worker Pool(续1):残余请求问题

本节及 下一节上一节 Worker Pool (Isolate 池) 的深度延伸。本节我们将探讨并发编程中一个棘手的边缘场景:残余请求——即那些在系统关闭瞬间仍处于传输或执行状态的残留任务。

残余请求问题

在 WorkerPool 关闭时,残余请求通常以两种形式存在:

  • 在途请求(In-flight):任务已离开队列(_taskQueue),正处在发往 Worker 的途中;
  • 执行中请求(In-progress):Worker 已经接收并正在执行计算任务。

我们不妨将它们分别标记为 Q1Q2

Q1. ”在途请求“要求我们在关闭 _resultPort之前,先关闭所有workerReceivePort,以防止worker处理完任务后访问已关闭的资源(resultPort)。记住:永远先关闭‘输入端’,后关闭‘输出端’。

Q2. “执行中请求”要求我们考虑这样一个问题:在关闭 WorkerPool时,所有的Woker都必须处于空闲状态吗(即没有任务在执行中)? 这就是 WorkerPool.close()方法中 ensureExecutingTaskCompleted参数存在的原因。

解决方案

我们定义如下 WorkerPool 优雅关闭协议

sequenceDiagram
    autonumber
    participant M as WorkerPool (Main Isolate)
    participant W as Worker (Worker Isolate)

    Note over M, W: --- 阶段 1: 拦截 Q1 (断源) ---
    M->>M: 设置 _closed = true
    
    par 临界竞争
        M->>W: 发送 shutdown 指令
        M->>W: 在途任务 (Q1)
    end

    Note right of W: Worker 优先处理 #shutdown
    W->>W: 自行关闭 ReceivePort
    Note right of W: 后续抵达的 Q1 任务被自动忽略

    Note over M, W: --- 阶段 2: 消化 Q2 (排空) ---
    loop 轮询直到 busy == false
        W->>W: 继续处理执行中的 Q2 任务
        W-->>M: 回传 Q2 结果
    end

    Note over M, W: --- 阶段 3: 终结 (清理) ---
    M->>M: 关闭 _resultPort
    M->>W: 执行 isolate.kill()
    

Q1(在途请求)

在关闭 _resultPort 之前,先关闭所有workerReceivePort,该如何实现这一点呢?Worker要想收到 close 指令,必须由 WorkerPool 发送给它,我们可以将这个close指令称为shutdown消息。

首先定义_shutdownSignal常量,它表示 shutdown消息:

const _shutdownSignal = #shutdown;

然后修改 workerEntryPoint,让其处理 _shutdownSignal

void workerEntryPoint((int workerId, SendPort resultPort) message) {
  // ... (omitted for brevity)

  workerReceivePort.listen((data) async {
    if (data case TaskData taskData) {
      // ... (omitted for brevity)
    } else if (data == _shutdownSignal) {
      workerReceivePort.close();
      print('${Isolate.current.debugName} closed');
    }
  });
}

最后修改WorkerPoo.close()方法,给Worker发送_shutdownSignal:

Future close({
    bool ensureExecutingTaskCompleted = true,
    bool ensureTaskQueueEmpty = true,
    Duration timeout = const Duration(seconds: 5),
  }) async {
    if (_closed) return;
    _closed = true;

    // Notify workers to shutdown
    for (var w in _workers.values) {
      w.sendPort.send(_shutdownSignal);
    }

    // ... (omitted for brevity)
}

Q2(执行中请求)

WorkerPool.close()方法中 ensureExecutingTaskCompleted参数的默认值为true,表示WorkerPool要等到当前所有待执行任务完成后,才会去执行close逻辑(目前的代码已实现)。 而当ensureExecutingTaskCompleted 值为false时,表示强制结束当前所有的待执行任务,其参考实现如下。

  Future close({
    bool ensureExecutingTaskCompleted = true,
    bool ensureTaskQueueEmpty = true,
    Duration timeout = const Duration(seconds: 5),
  }) async {
    if (_closed) return;
    _closed = true;

    // Complete the futures
    if (!ensureExecutingTaskCompleted) {
      for (var w in _workers.values) {
        if (w.completer != null && !w.completer!.isCompleted) {
          w.completer!.completeError('worker pool force closed');
        }
      }
    }

    // ... (omitted for brevity)
  }  

结束语

本节讨论了Dart并发编程中的残余请求问题及解决方案(含代码示例),下一节将讨论 WorkerPool的最后一块拼图 —— 重启意外崩溃的 Isolate。