在实现文件上传功能时,很多时候会使用 Promise.race 来控制并发:
const pool = new Set();
for (const file of files) { const task = upload(file); pool.add(task); task.finally(() => pool.delete(task));
if (pool.size >= limit) { await Promise.race(pool); } }
|
这段代码确实能限制同时上传几个任务,但它本质上只是一个”信号灯”——它只知道”有一个任务完成了”,却不知道是哪 一个,也不知道当前队列的真实状态。
当我们需要实现暂停、恢复、取消、状态追踪等功能时,Promise.race 就力不从心了。
1.Promise.race 的局限性
| 功能 |
Promise.race |
任务调度系统 |
| 并发数量控制 |
支持 |
支持 |
| 任务管理(增删改查) |
不支持 |
支持 |
| 状态管理 |
不支持 |
支持 |
| 暂停/恢复 |
不支持 |
支持 |
| 失败重试策略 |
不支持 |
支持 |
| 优先级调度 |
不支持 |
支持 |
核心问题:Promise.race 无法追踪任务状态,无法干预任务执行。
2.任务调度系统的设计
2.1 系统架构

核心流程:
- 任务进入等待队列
- 调度器从队列取出任务
- 固定数量的 Worker 消费任务
- 任务完成后,Worker 空闲,继续消费队列
2.2 任务状态定义
const TASK_STATUS = { WAITING: 'waiting', UPLOADING: 'uploading', SUCCESS: 'success', ERROR: 'error', PAUSED: 'paused', };
class UploadTask { constructor(file) { this.id = crypto.randomUUID(); this.file = file; this.status = TASK_STATUS.WAITING; this.progress = 0; this.retryCount = 0; this.maxRetry = 3; this.controller = new AbortController(); } }
|
2.3 调度器核心实现
class UploadScheduler { constructor(concurrency = 3) { this.concurrency = concurrency; this.queue = []; this.running = new Map(); this.paused = false; }
addTask(file) { const task = new UploadTask(file); this.queue.push(task); this.schedule(); return task; }
schedule() { if (this.paused) return;
while ( this.running.size < this.concurrency && this.queue.length > 0 ) { const task = this.queue.shift(); this.runTask(task); } }
async runTask(task) { this.running.set(task.id, task); task.status = TASK_STATUS.UPLOADING;
try { await this.uploadFile(task); task.status = TASK_STATUS.SUCCESS; console.log(`${task.file.name} 上传成功`); this.running.delete(task.id); } catch (err) { if (err.name === 'AbortError') { task.status = TASK_STATUS.PAUSED; } else { console.error(`${task.file.name} 上传失败`, err); if (task.retryCount < task.maxRetry) { task.retryCount++; console.log(`${task.file.name} 重试第 ${task.retryCount} 次`); task.status = TASK_STATUS.WAITING; this.running.delete(task.id); this.queue.push(task); } else { task.status = TASK_STATUS.ERROR; this.running.delete(task.id); } } }
this.schedule(); }
async uploadFile(task) { const formData = new FormData(); formData.append('file', task.file);
const response = await fetch('/upload', { method: 'POST', body: formData, signal: task.controller.signal, });
return await response.json(); } }
|
代码解读:
- 任务池:使用
running Map 追踪正在运行的任务
- 调度循环:
while 循环保证只要有空闲 worker 就会继续调度
- 自动释放:finally 块确保任务结束后 worker 会被释放
- 重试机制:失败任务重新入队,而不是直接标记失败
2.4 暂停与恢复
pause() { this.paused = true;
for (const task of this.running.values()) { task.controller.abort(); } }
resume() { this.paused = false;
for (const task of this.running.values()) { if (task.status === TASK_STATUS.PAUSED) { task.controller = new AbortController(); task.status = TASK_STATUS.WAITING; this.queue.push(task); } }
this.schedule(); }
pauseTask(taskId) { const task = this.running.get(taskId); if (task) { task.controller.abort(); } }
resumeTask(taskId) { const task = this.running.get(taskId); if (task && task.status === TASK_STATUS.PAUSED) { task.controller = new AbortController(); task.status = TASK_STATUS.WAITING; this.queue.push(task); this.schedule(); } }
|
2.5 取消任务
cancel(taskId) { const task = this.running.get(taskId);
if (!task) return;
task.controller.abort();
this.running.delete(taskId); this.schedule(); }
|
2.6 任务状态流转
任务调度系统本质上是一个状态机,不同状态之间会根据任务执行结果发生流转。

状态流转说明
WAITING:任务进入等待队列,尚未开始上传
UPLOADING:任务被调度器分配给 Worker,正在上传
PAUSED:用户主动暂停,通过 AbortController 中断请求
ERROR:上传失败,若未超过最大重试次数会重新进入等待队列
SUCCESS:任务上传完成,生命周期结束
3. 结合分片上传的任务调度
在实际的文件上传场景中,每个大文件会被分成多个 chunk 上传。我们需要在任务调度的基础上,记录每个 chunk 的状态。
3.1 支持分片的任务
class UploadTask { constructor(file) { this.id = crypto.randomUUID(); this.file = file; this.status = TASK_STATUS.WAITING; this.progress = 0;
this.chunks = []; this.chunkStates = []; this.totalChunks = 0; this.doneChunks = 0;
this.controller = new AbortController(); } }
|
3.2 分片上传逻辑
async uploadChunks(task) { const chunks = this.buildChunks(task.file); task.chunks = chunks; task.totalChunks = chunks.length; task.chunkStates = new Array(task.totalChunks).fill('pending');
for (let i = 0; i < task.totalChunks; i++) { if (task.controller.signal.aborted) { return; }
if (task.chunkStates[i] === 'done') { continue; }
task.chunkStates[i] = 'uploading';
try { await this.uploadChunk(task, i); task.chunkStates[i] = 'done'; task.doneChunks++; task.progress = task.doneChunks / task.totalChunks; } catch (err) { if (err.name === 'AbortError') { return; } task.chunkStates[i] = 'error'; throw err; } }
await this.mergeChunks(task); }
buildChunks(file) { const chunks = []; for (let i = 0; i < file.size; i += CHUNK_SIZE) { chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size))); } return chunks; }
|
3.3 支持恢复的分片上传
恢复上传的关键是:跳过已完成的分片。
async resumeUpload(task) { task.controller = new AbortController(); task.status = TASK_STATUS.UPLOADING;
for (let i = 0; i < task.totalChunks; i++) { if (task.controller.signal.aborted) return;
if (task.chunkStates[i] !== 'done') { if (task.chunkStates[i] === 'error') { task.chunkStates[i] = 'uploading'; try { await this.uploadChunk(task, i); task.chunkStates[i] = 'done'; task.doneChunks++; } catch (err) { if (err.name === 'AbortError') { return; } task.chunkStates[i] = 'error'; throw err; } } }
task.progress = task.doneChunks / task.totalChunks; }
await this.mergeChunks(task); }
|
3.4 完整示例
class UploadScheduler { constructor(concurrency = 3) { this.concurrency = concurrency; this.queue = []; this.running = new Map(); this.paused = false; }
addTask(file) { const task = new UploadTask(file); this.queue.push(task); this.schedule(); return task; }
schedule() { if (this.paused) return;
while ( this.running.size < this.concurrency && this.queue.length > 0 ) { const task = this.queue.shift(); this.runTask(task); } }
async runTask(task) { this.running.set(task.id, task); task.status = TASK_STATUS.UPLOADING;
try { await this.uploadChunks(task); task.status = TASK_STATUS.SUCCESS; this.running.delete(task.id); } catch (err) { if (err.name === 'AbortError') { task.status = TASK_STATUS.PAUSED; } else { if (task.retryCount < task.maxRetry) { task.retryCount++; task.status = TASK_STATUS.WAITING; this.running.delete(task.id); this.queue.push(task); } else { task.status = TASK_STATUS.ERROR; this.running.delete(task.id); } } }
this.schedule(); }
async uploadChunks(task) { const chunks = this.buildChunks(task.file); task.chunks = chunks; task.totalChunks = chunks.length; task.chunkStates = new Array(task.totalChunks).fill('pending'); task.doneChunks = 0;
for (let i = 0; i < task.totalChunks; i++) { if (task.controller.signal.aborted) return;
if (task.chunkStates[i] === 'done') continue;
task.chunkStates[i] = 'uploading';
try { await this.uploadChunk(task, i); task.chunkStates[i] = 'done'; task.doneChunks++; task.progress = task.doneChunks / task.totalChunks; } catch (err) { if (err.name === 'AbortError') { return; } task.chunkStates[i] = 'error'; throw err; } }
await this.mergeChunks(task); }
buildChunks(file) { const chunks = []; for (let i = 0; i < file.size; i += CHUNK_SIZE) { chunks.push(file.slice(i, Math.min(i + CHUNK_SIZE, file.size))); } return chunks; }
async uploadChunk(task, chunkIndex) { const formData = new FormData(); formData.append('chunk', task.chunks[chunkIndex]); formData.append('hash', task.hash);
const response = await fetch('/upload', { method: 'POST', body: formData, signal: task.controller.signal, });
if (!response.ok) throw new Error(`HTTP ${response.status}`); }
async mergeChunks(task) { if (DEMO_MODE) return; const response = await fetch('/upload/merge', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ hash: task.hash, name: task.file.name, totalChunks: task.totalChunks }), signal: task.controller.signal, }); if (!response.ok) throw new Error('Merge failed'); }
pause() { this.paused = true; for (const task of this.running.values()) { task.controller.abort(); } }
resume() { this.paused = false; for (const task of this.running.values()) { if (task.status === TASK_STATUS.PAUSED) { task.controller = new AbortController(); task.status = TASK_STATUS.WAITING; this.queue.push(task); } } this.schedule(); }
cancel(taskId) { const task = this.running.get(taskId); if (task) { task.controller.abort(); } }
getStats() { return { queueLength: this.queue.length, runningCount: this.running.size, paused: this.paused, tasks: Array.from(this.running.values()).map(t => ({ id: t.id, name: t.file.name, status: t.status, progress: t.progress, })), }; } }
|
4. 总结
任务调度系统的核心优势
| 特性 |
实现方式 |
| 并发控制 |
running.size < concurrency 条件判断 |
| 任务追踪 |
Map<id, task> 维护所有任务 |
| 状态管理 |
任务状态机 + 状态流转图 |
| 暂停恢复 |
AbortController + 重新入队 |
| 失败重试 |
失败任务重新入队 + 重试计数器 |
| 统一调度 |
schedule() 方法集中处理 |
关键设计思想
- 池化思想:Worker 池复用,而不是每次创建新 worker
- 状态驱动:任务状态决定行为,而不是流程控制
- 可中断:所有异步操作都关联
AbortController
- 自动调度:
finally 块确保 worker 始终被释放
相比 Promise.race 的”只管放行,不管回收”,任务调度系统提供了完整的生命周期管理和干预能力。