从 Promise.race 到并发任务调度器:实现可控的文件上传

在实现文件上传功能时,很多时候会使用 Promise.race 来控制并发:

const pool = new Set();

for (const file of files) {
const task = upload(file);
pool.add(task);
task.finally(() => pool.delete(task));

if (pool.size >= limit) {
await Promise.race(pool);
}
}

这段代码确实能限制同时上传几个任务,但它本质上只是一个”信号灯”——它只知道”有一个任务完成了”,却不知道是哪 一个,也不知道当前队列的真实状态。

当我们需要实现暂停、恢复、取消、状态追踪等功能时,Promise.race 就力不从心了。

1.Promise.race 的局限性

功能 Promise.race 任务调度系统
并发数量控制 支持 支持
任务管理(增删改查) 不支持 支持
状态管理 不支持 支持
暂停/恢复 不支持 支持
失败重试策略 不支持 支持
优先级调度 不支持 支持

核心问题:Promise.race 无法追踪任务状态,无法干预任务执行

2.任务调度系统的设计

2.1 系统架构

核心流程

  1. 任务进入等待队列
  2. 调度器从队列取出任务
  3. 固定数量的 Worker 消费任务
  4. 任务完成后,Worker 空闲,继续消费队列

2.2 任务状态定义

// 定义任务的所有可能状态
const TASK_STATUS = {
WAITING: 'waiting', // 等待中
UPLOADING: 'uploading', // 上传中
SUCCESS: 'success', // 上传成功
ERROR: 'error', // 上传失败
PAUSED: 'paused', // 已暂停
};

// 上传任务类
class UploadTask {
constructor(file) {
this.id = crypto.randomUUID(); // 唯一标识
this.file = file; // 文件对象
this.status = TASK_STATUS.WAITING; // 当前状态
this.progress = 0; // 进度 0-1
this.retryCount = 0; // 重试次数
this.maxRetry = 3; // 最大重试次数
this.controller = new AbortController(); // 取消控制器
}
}

2.3 调度器核心实现

class UploadScheduler {
constructor(concurrency = 3) {
this.concurrency = concurrency; // 最大并发数
this.queue = []; // 等待队列
this.running = new Map(); // 正在运行的任务 Map<id, task>
this.paused = false; // 全局暂停标志
}

// 添加任务
addTask(file) {
const task = new UploadTask(file);
this.queue.push(task);
this.schedule();
return task;
}

// 核心调度逻辑
schedule() {
// 暂停时不调度
if (this.paused) return;

// 只要还有空闲 worker 且队列还有任务
while (
this.running.size < this.concurrency &&
this.queue.length > 0
) {
const task = this.queue.shift();
this.runTask(task);
}
}

// 执行任务
async runTask(task) {
// 放入运行池
this.running.set(task.id, task);
task.status = TASK_STATUS.UPLOADING;

try {
await this.uploadFile(task);
task.status = TASK_STATUS.SUCCESS;
console.log(`${task.file.name} 上传成功`);
// 成功时从 running 删除
this.running.delete(task.id);
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
// 用户主动暂停,不删除!保持 running 中的引用
task.status = TASK_STATUS.PAUSED;
} else {
console.error(`${task.file.name} 上传失败`, err);
// 真实错误,触发重试逻辑
if (task.retryCount < task.maxRetry) {
task.retryCount++;
console.log(`${task.file.name} 重试第 ${task.retryCount} 次`);
task.status = TASK_STATUS.WAITING;
this.running.delete(task.id); // 重试入队,从 running 删除
this.queue.push(task); // 重新入队
} else {
task.status = TASK_STATUS.ERROR;
this.running.delete(task.id); // 失败耗尽,从 running 删除
}
}
}

// 继续调度
this.schedule();
}

// 上传文件
async uploadFile(task) {
const formData = new FormData();
formData.append('file', task.file);

const response = await fetch('/upload', {
method: 'POST',
body: formData,
signal: task.controller.signal, // 关联取消控制器
});

return await response.json();
}
}

代码解读

  1. 任务池:使用 running Map 追踪正在运行的任务
  2. 调度循环while 循环保证只要有空闲 worker 就会继续调度
  3. 自动释放:finally 块确保任务结束后 worker 会被释放
  4. 重试机制:失败任务重新入队,而不是直接标记失败

2.4 暂停与恢复

// 暂停全部
pause() {
this.paused = true;

// 中止所有正在运行的请求
for (const task of this.running.values()) {
task.controller.abort();
// 注意:status 会在 runTask 的 catch 中被设为 PAUSED
}
}

// 恢复全部
resume() {
this.paused = false;

// 将暂停的任务重新入队
for (const task of this.running.values()) {
if (task.status === TASK_STATUS.PAUSED) {
// 创建新的 AbortController(一个只能 abort 一次)
task.controller = new AbortController();
task.status = TASK_STATUS.WAITING;
this.queue.push(task);
}
}

this.schedule();
}

// 暂停单个任务
pauseTask(taskId) {
const task = this.running.get(taskId);
if (task) {
task.controller.abort();
// status 会在 runTask 的 catch 中被设为 PAUSED
}
}

// 恢复单个任务
resumeTask(taskId) {
const task = this.running.get(taskId);
if (task && task.status === TASK_STATUS.PAUSED) {
// 创建新的 AbortController(一个只能 abort 一次)
task.controller = new AbortController();
task.status = TASK_STATUS.WAITING;
this.queue.push(task);
this.schedule();
}
}

2.5 取消任务

// 取消指定任务
cancel(taskId) {
const task = this.running.get(taskId);

if (!task) return;

// 终止请求
task.controller.abort();
// 注意:status 会在 runTask 的 catch 中被设为 ERROR

this.running.delete(taskId);
this.schedule();
}

2.6 任务状态流转

任务调度系统本质上是一个状态机,不同状态之间会根据任务执行结果发生流转。

状态流转说明

  • WAITING:任务进入等待队列,尚未开始上传
  • UPLOADING:任务被调度器分配给 Worker,正在上传
  • PAUSED:用户主动暂停,通过 AbortController 中断请求
  • ERROR:上传失败,若未超过最大重试次数会重新进入等待队列
  • SUCCESS:任务上传完成,生命周期结束

3. 结合分片上传的任务调度

在实际的文件上传场景中,每个大文件会被分成多个 chunk 上传。我们需要在任务调度的基础上,记录每个 chunk 的状态。

3.1 支持分片的任务

class UploadTask {
constructor(file) {
this.id = crypto.randomUUID();
this.file = file;
this.status = TASK_STATUS.WAITING;
this.progress = 0;

// 分片相关
this.chunks = []; // 分片数组
this.chunkStates = []; // 每个分片的状态
this.totalChunks = 0;
this.doneChunks = 0;

// 取消控制器
this.controller = new AbortController();
}
}

3.2 分片上传逻辑

async uploadChunks(task) {
// 生成分片
const chunks = this.buildChunks(task.file);
task.chunks = chunks;
task.totalChunks = chunks.length;
task.chunkStates = new Array(task.totalChunks).fill('pending');

// 逐个上传分片
for (let i = 0; i < task.totalChunks; i++) {
// 检查是否被暂停/取消
if (task.controller.signal.aborted) {
return; // 用户暂停,直接返回,不抛错误
}

// 跳过已完成的分片(支持恢复)
if (task.chunkStates[i] === 'done') {
continue;
}

task.chunkStates[i] = 'uploading';

try {
await this.uploadChunk(task, i);
task.chunkStates[i] = 'done';
task.doneChunks++;
task.progress = task.doneChunks / task.totalChunks;
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
return; // 用户暂停,不抛错误,不触发外层重试
}
// 真实错误
task.chunkStates[i] = 'error';
throw err; // 抛出给外层处理
}
}

// 合并分片
await this.mergeChunks(task);
}

// 生成分片
buildChunks(file) {
const chunks = [];
for (let i = 0; i < file.size; i += CHUNK_SIZE) {
chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size)));
}
return chunks;
}

3.3 支持恢复的分片上传

恢复上传的关键是:跳过已完成的分片

async resumeUpload(task) {
// 创建新的 AbortController
task.controller = new AbortController();
task.status = TASK_STATUS.UPLOADING;

// 从上次中断的地方继续
for (let i = 0; i < task.totalChunks; i++) {
if (task.controller.signal.aborted) return;

// 跳过已完成和失败的分片
if (task.chunkStates[i] !== 'done') {
if (task.chunkStates[i] === 'error') {
// 重试失败的 chunk
task.chunkStates[i] = 'uploading';
try {
await this.uploadChunk(task, i);
task.chunkStates[i] = 'done';
task.doneChunks++;
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
return; // 用户暂停,不抛错误
}
task.chunkStates[i] = 'error';
throw err; // 真实错误抛给外层
}
}
}

task.progress = task.doneChunks / task.totalChunks;
}

await this.mergeChunks(task);
}

3.4 完整示例

class UploadScheduler {
constructor(concurrency = 3) {
this.concurrency = concurrency;
this.queue = [];
this.running = new Map();
this.paused = false;
}

addTask(file) {
const task = new UploadTask(file);
this.queue.push(task);
this.schedule();
return task;
}

schedule() {
if (this.paused) return;

while (
this.running.size < this.concurrency &&
this.queue.length > 0
) {
const task = this.queue.shift();
this.runTask(task);
}
}

async runTask(task) {
this.running.set(task.id, task);
task.status = TASK_STATUS.UPLOADING;

try {
await this.uploadChunks(task);
task.status = TASK_STATUS.SUCCESS;
// 成功或重试入队后,才从 running 删除
this.running.delete(task.id);
} catch (err) {
if (err.name === 'AbortError') {
// 用户暂停:不删除!保持 running 中的引用
task.status = TASK_STATUS.PAUSED;
// 注意:不在这里删除 task,让它在 running 中保留
// resume 时可以找到并重新入队
} else {
// 真实错误
if (task.retryCount < task.maxRetry) {
task.retryCount++;
task.status = TASK_STATUS.WAITING;
this.running.delete(task.id); // 重试入队,从 running 删除
this.queue.push(task);
} else {
task.status = TASK_STATUS.ERROR;
this.running.delete(task.id); // 失败耗尽,从 running 删除
}
}
}

this.schedule();
}

async uploadChunks(task) {
const chunks = this.buildChunks(task.file);
task.chunks = chunks;
task.totalChunks = chunks.length;
task.chunkStates = new Array(task.totalChunks).fill('pending');
task.doneChunks = 0;

for (let i = 0; i < task.totalChunks; i++) {
if (task.controller.signal.aborted) return;

if (task.chunkStates[i] === 'done') continue;

task.chunkStates[i] = 'uploading';

try {
await this.uploadChunk(task, i);
task.chunkStates[i] = 'done';
task.doneChunks++;
task.progress = task.doneChunks / task.totalChunks;
} catch (err) {
// 关键:区分是用户暂停还是真实错误
if (err.name === 'AbortError') {
return;
}
task.chunkStates[i] = 'error';
throw err;
}
}

await this.mergeChunks(task);
}

buildChunks(file) {
const chunks = [];
for (let i = 0; i < file.size; i += CHUNK_SIZE) {
chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size)));
}
return chunks;
}

async uploadChunk(task, chunkIndex) {
const formData = new FormData();
formData.append('chunk', task.chunks[chunkIndex]);
formData.append('hash', task.hash);

const response = await fetch('/upload', {
method: 'POST',
body: formData,
signal: task.controller.signal,
});

if (!response.ok) throw new Error(`HTTP ${response.status}`);
}

async mergeChunks(task) {
if (DEMO_MODE) return;
const response = await fetch('/upload/merge', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ hash: task.hash, name: task.file.name, totalChunks: task.totalChunks }),
signal: task.controller.signal,
});
if (!response.ok) throw new Error('Merge failed');
}

pause() {
this.paused = true;
for (const task of this.running.values()) {
task.controller.abort();
// status 由 runTask 的 catch 设置为 PAUSED
}
}

resume() {
this.paused = false;
for (const task of this.running.values()) {
if (task.status === TASK_STATUS.PAUSED) {
task.controller = new AbortController();
task.status = TASK_STATUS.WAITING;
this.queue.push(task);
}
}
this.schedule();
}

cancel(taskId) {
const task = this.running.get(taskId);
if (task) {
task.controller.abort();
// status 由 runTask 的 catch 设置为 ERROR(因为不是 AbortError)
}
}

getStats() {
return {
queueLength: this.queue.length,
runningCount: this.running.size,
paused: this.paused,
tasks: Array.from(this.running.values()).map(t => ({
id: t.id,
name: t.file.name,
status: t.status,
progress: t.progress,
})),
};
}
}

4. 总结

任务调度系统的核心优势

特性 实现方式
并发控制 running.size < concurrency 条件判断
任务追踪 Map<id, task> 维护所有任务
状态管理 任务状态机 + 状态流转图
暂停恢复 AbortController + 重新入队
失败重试 失败任务重新入队 + 重试计数器
统一调度 schedule() 方法集中处理

关键设计思想

  1. 池化思想:Worker 池复用,而不是每次创建新 worker
  2. 状态驱动:任务状态决定行为,而不是流程控制
  3. 可中断:所有异步操作都关联 AbortController
  4. 自动调度finally 块确保 worker 始终被释放

相比 Promise.race 的”只管放行,不管回收”,任务调度系统提供了完整的生命周期管理和干预能力。