并行处理

传统回调的解决方式

对于统计三个文件信息,我们想要通过并行处理提升性能,使用传统的回调解决策略,会是这样的:

let info = {};

function print() {
    keys = Object.keys(info);
    if (keys.length === 3 ) {
        console.dir(info);
    }
}

fs.stat('file1.md', function(err, stat) {
    if(!err) {
        info['file1'] = stat.size;
        print();
    }
});

fs.stat('file2.md', function(err, stat) {
    if(!err) {
        info['file2'] = stat.size;
        print();
    }
});

fs.stat('file3.md', function(err, stat) {
    if(!err) {
        info['file3'] = stat.size;
        print();
    }    
})

传统回调组织并行任务时,乍看起来还好,但是我们发现,其免不了的是需要对任务完成情况(即并行任务的是否结束)进行重复性判断,比如上例中,我们要判断三个文件是否都统计完毕,就需要在各个异步任务中重复执行 print 方法中的判断逻辑:

if (keys.length === 3 ) {
    console.dir(info);
}

利用 generator 并行处理任务

除了重复,上述判断逻辑无疑也是强侵入性的,而如果使用 generator,我们希望最终能通过一个数组就组织并行任务:

function *main(){
    let sizeInfo = {};
    let sizes = yield [
        size('file1.md'),
        size('file2.md'),
        size('file3.md')
    ];
    sizeInfo['file1'] = sizes[0];
    sizeInfo['file2'] = sizes[1];
    sizeInfo['file3'] = sizes[2];
}

实现这个目标之前,需要考虑如下几点:

  • 异步任务结果的组织顺序能与声明顺序保持一致,而不是与完成顺序一致,因为任务的完成顺序是不定的
  • 并行任务中的任意一个失败,不再继续后续任务,而是抛出错误,交给开发者俘获处理

为此,我们需要考虑修改运行器,当 generator 遇到数组进入暂态时,额外进行处理:

function runGenerator(gen) {
    // 先获得迭代器
    const it = gen();
    // 驱动generator运行
    next();

    function next(err, res) {
        if(err) {
            return it.throw(err);
        }

        const { value, done } = it.next(res);
        if(done) {
            return;
        }

        if (Array.isArray(value)) {
            // 存放异步任务结果
            const results = [];
            // 等待完成的任务数
            let pending = value.length;
            // 当任务队列里任意一个任务发生错误时,终止所有任务的继续
            let finished = false;
            value.forEach(function(func, index) {
                func.call(this, function(err, res) {
                    if(err) {
                        finished = true;
                        next(err);
                    } else {
                        // 保证结果的存放顺序
                        results[index] = res;
                        // 直到所有任务执行完毕
                        if(--pending === 0) {
                            next(null, results);
                        }
                    }
                })
            })
        }
        if(typeof value === 'function') {
            value.call(this, function(err, res) {
                if(err) {
                    next(err, null);
                } else {
                    next(null, res);
                }
            });
        }
    }
}

results matching ""

    No results matching ""