Worker Pool(续1):残余请求问题
本节及 下一节 是 上一节 Worker Pool (Isolate 池) 的深度延伸。本节我们将探讨并发编程中一个棘手的边缘场景:残余请求——即那些在系统关闭瞬间仍处于传输或执行状态的残留任务。
残余请求问题
在 WorkerPool 关闭时,残余请求通常以两种形式存在:
- 在途请求(In-flight):任务已离开队列(
_taskQueue),正处在发往 Worker 的途中; - 执行中请求(In-progress):Worker 已经接收并正在执行计算任务。
我们不妨将它们分别标记为 Q1 和 Q2。
Q1. ”在途请求“要求我们在关闭 _resultPort之前,先关闭所有worker的 ReceivePort,以防止worker处理完任务后访问已关闭的资源(resultPort)。记住:永远先关闭‘输入端’,后关闭‘输出端’。
Q2. “执行中请求”要求我们考虑这样一个问题:在关闭 WorkerPool时,所有的Woker都必须处于空闲状态吗(即没有任务在执行中)? 这就是 WorkerPool.close()方法中 ensureExecutingTaskCompleted参数存在的原因。
解决方案
我们定义如下 WorkerPool 优雅关闭协议。
sequenceDiagram
autonumber
participant M as WorkerPool (Main Isolate)
participant W as Worker (Worker Isolate)
Note over M, W: --- 阶段 1: 拦截 Q1 (断源) ---
M->>M: 设置 _closed = true
par 临界竞争
M->>W: 发送 shutdown 指令
M->>W: 在途任务 (Q1)
end
Note right of W: Worker 优先处理 #shutdown
W->>W: 自行关闭 ReceivePort
Note right of W: 后续抵达的 Q1 任务被自动忽略
Note over M, W: --- 阶段 2: 消化 Q2 (排空) ---
loop 轮询直到 busy == false
W->>W: 继续处理执行中的 Q2 任务
W-->>M: 回传 Q2 结果
end
Note over M, W: --- 阶段 3: 终结 (清理) ---
M->>M: 关闭 _resultPort
M->>W: 执行 isolate.kill()
Q1(在途请求)
在关闭 _resultPort 之前,先关闭所有worker的 ReceivePort,该如何实现这一点呢?Worker要想收到 close 指令,必须由 WorkerPool 发送给它,我们可以将这个close指令称为shutdown消息。
首先定义_shutdownSignal常量,它表示 shutdown消息:
const _shutdownSignal = #shutdown;
然后修改 workerEntryPoint,让其处理 _shutdownSignal:
void workerEntryPoint((int workerId, SendPort resultPort) message) {
// ... (omitted for brevity)
workerReceivePort.listen((data) async {
if (data case TaskData taskData) {
// ... (omitted for brevity)
} else if (data == _shutdownSignal) {
workerReceivePort.close();
print('${Isolate.current.debugName} closed');
}
});
}
最后修改WorkerPoo.close()方法,给Worker发送_shutdownSignal:
Future close({
bool ensureExecutingTaskCompleted = true,
bool ensureTaskQueueEmpty = true,
Duration timeout = const Duration(seconds: 5),
}) async {
if (_closed) return;
_closed = true;
// Notify workers to shutdown
for (var w in _workers.values) {
w.sendPort.send(_shutdownSignal);
}
// ... (omitted for brevity)
}
Q2(执行中请求)
WorkerPool.close()方法中 ensureExecutingTaskCompleted参数的默认值为true,表示WorkerPool要等到当前所有待执行任务完成后,才会去执行close逻辑(目前的代码已实现)。
而当ensureExecutingTaskCompleted 值为false时,表示强制结束当前所有的待执行任务,其参考实现如下。
Future close({
bool ensureExecutingTaskCompleted = true,
bool ensureTaskQueueEmpty = true,
Duration timeout = const Duration(seconds: 5),
}) async {
if (_closed) return;
_closed = true;
// Complete the futures
if (!ensureExecutingTaskCompleted) {
for (var w in _workers.values) {
if (w.completer != null && !w.completer!.isCompleted) {
w.completer!.completeError('worker pool force closed');
}
}
}
// ... (omitted for brevity)
}
结束语
本节讨论了Dart并发编程中的残余请求问题及解决方案(含代码示例),下一节将讨论 WorkerPool的最后一块拼图 —— 重启意外崩溃的 Isolate。