node 异步编程

高阶函数:把函数参数作为参数,或作为返回值

偏函数: 将传入参数作为判断或者其他逻辑条件

注意点

异常处理

异步I/O有两个处理阶段,中间有事件循环调度,异步方法在提交请求后就返回了,try/catch只能捕获当次事件循环内的异常,不能捕获callback的异常

1
2
3
4
5
6
7
8
9
try {
req.body = JSON.parse(buf, options.reviver);
//callback()不能放在这,若callback异常,会导致它多次执行
} catch (err) {
err.body = buf;
err.status = 400;
return callback(err);
}
callback();

阻塞代码

这么写会持续占用CPU资源,破坏事件循环的调度,因为Node是单线程,会导致其余请求得不到响应

1
2
3
4
5
6
// TODO
var start = new Date();
while (new Date() - start < 1000) {
// TODO
}
// 需要阻塞的代码

异步编程解决方案

事件

Node中事件的发布通常是伴随事件循环而异步触发的

  • 事件与侦听器是多对多的,设置过多的侦听器会导致过多的CPU占用
  • 要给EventEmitter对象添加error事件和处理,若触发error事件不处理,会引起线程退出\
  • 使用once()添加的侦听器只会执行一次,在执行后就会将相关的事件移除

使用事件的途径

  • 实例化events
1
2
3
4
5
var events =require(‘events’);
var emitter = new events.EventEmitter();
emitter.on('do',function(value){console.log(value)});
emitter.emit('do','doing');
  • 继承events模块
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var util = require('util')
var events = require('events');
function Stream() {
events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
Stream.prototype.test = function() {
var self = this;
try {
throw 111;
} catch (error) {
self.emit('error')
}
}
var stream = new Stream();
stream.on('error', function() {
console.error('asdasd')
});
stream.test();
利用Once()解决雪崩问题
  • 在高访问量、大并发量的情况下缓存突然失效, 大量的请求同时涌入数据库中,数据库无法承受,影响网站速度
  • 在缓存中无数据时,访问量大的话,同条SQL可能会被执行多次,此时可以将所有请求放入事件队列,利用once()来绑定,SQL在执行时,新到的相同调用只需在队列中等待数据,一旦查询结束,得到的结果可以被这些调用共同使用
1
2
3
4
5
6
7
8
9
10
11
12
var proxy = new events.EventEmitter();
var status = "ready";
var select = function(callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function(results) {
proxy.emit("selected", results);
status = "ready";
});
}
};

可能会因为侦听器过多引发警号,调用setMaxListeners(0)移除或设置大的阈值

EventProxy 处理多事件对一侦听器,适用于实例化事件
  • all() 当每个事件都被触发了才会执行,只执行一次
1
2
3
var proxy = new EventProxy();
proxy.all("template", "data", "resources", function(template, data, resources) { // TODO
});
  • tail() 首次也是需要每个事件都被触发,之后只要某个事件触发,就会用最新的数据执行
  • after() 事件执行多少次后执行侦听器的单一事件组合订阅
1
2
3
var proxy = new EventProxy();
proxy.after("data", 10, function(datas) { // TODO
});
  • not()
  • any()

EventProxy对异常处理的优化

  • fail()
1
2
3
4
5
6
7
8
9
10
11
12
ep.fail(callback);
//等价于
ep.fail(function(err) {
callback(err);
});
//又等价于
ep.bind('error', function(err) {
// 卸载所有处理函数
ep.unbind();
// 异常回调
callback(err);
});
  • done()
1
2
3
4
5
6
7
8
9
ep.done('tpl');
//等价于
function(err, content) {
if (err) {
// 发生异常,交给error事件处理函数处理
return ep.emit('error', err);
}
ep.emit('tpl', content);
}

done()接受函数参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
ep.done(function(content) {
// TODO
// 无需考虑异常
ep.emit('tpl', content);
});
//等价于
function(err, content) {
if (err) {
// 发生异常 给error事件的处理函数处理
return ep.emit('error', err);
}
(function(content) {
// TODO
// 无需考虑异常
ep.emit('tpl', content);
}(content));
}

代码对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
exports.getContent = function(callback) {
var ep = new EventProxy();
ep.all('tpl', function(tpl, data) {
// 成功回调
callback(null, {
template: tpl,
data: data
});
});
// 帧听error事件
ep.bind('error', function(err) {
// 卸载 所有处理函数
ep.unbind();
// 异常回调
callback(err);
});
fs.readFile('template.tpl', 'utf-8', function(err, content) {
if (err) {
// 发生异常,给error事件的处理函数处理
return ep.emit('error', err);
}
ep.emit('tpl', content);
});
};
exports.getContent = function(callback) {
var ep = new EventProxy();
ep.all('tpl', function(tpl, data) {
// 成功回调
callback(null, {
template: tpl,
data: data
});
});
//绑定错误处理函数
ep.fail(callback);
fs.readFile('template.tpl', 'utf-8', ep.done('tpl'));
};

Promise/Deferred(应用参见:es6 Promise)

  • 先执行异步调用,后传递处理方法
  • then()方法只接受function对象,继续返回promise()对象,可选progress事件传入
  • Promise是高级接口,依靠低级接口事件来实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//then
var Promise = function() {
EventEmitter.call(this);
};
util.inherits(Promise, EventEmitter);
Promise.prototype.then = function(fulfilledHandler, errorHandler, progressHandler) {
if (typeof fulfilledHandler === 'function') {
// 用once()方法 保证成功回调只执行一次
this.once('success', fulfilledHandler);
}
if (typeof errorHandler === 'function') {
// 用once()方法 保证成功回调只执行一次
this.once('error', errorHandler);
}
if (typeof progressHandler === 'function') {
this.on('progress', progressHandler);
}
return this;
};
//实现这些功能的对象被称为Deferred,即延迟对象
var Deferred = function() {
this.state = 'unfulfilled';
this.promise = new Promise();
};
Deferred.prototype.resolve = function(obj) {
this.state = 'fulfilled';
this.promise.emit('success', obj);
};
Deferred.prototype.reject = function(err) {
this.state = 'failed';
this.promise.emit('error', err);
};
Deferred.prototype.progress = function(data) {
this.promise.emit('progress', data);
};

对res改造成promise

Deferred用于内部,维护异步模型的状态,Promise作用于外部,通过then()暴露给外部以添加自定义逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
var promisify = function(res) {
var deferred = new Deferred();
var result = '';
res.on('data', function(chunk) {
result += chunk;
deferred.progress(chunk);
});
res.on('end', function() {
deferred.resolve(result);
});
res.on('error', function(err) {
deferred.reject(err);
});
return deferred.promise;//更改内部状态的行为由定义者处理
};
//then调用的是promise
promisify(res).then(function() {
// Done
}, function(err) {
// Error
}, function(chunk) {
// progress
console.log('BODY: ' + chunk);
});

Promise多异步协作,all()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Deferred.prototype.all = function(promises) {
var count = promises.length;
var that = this;
var results = [];
promises.forEach(function(promise, i) {
promise.then(function(data) {
count--;
results[i] = data;
if (count === 0) {
that.resolve(results);
}
}, function(err) {
that.reject(err);
});
});
return this.promise;
};
//all()返回resolve()结果集

Promise链式调用

  • 前一个的调用的结果,作为下一个调用的开始,后一个then的回调函数会等待前一个promise的状态变化而运行

将API Promise化

// smooth(fs.readFile);
var smooth = function(method) {
    return function() {
        var deferred = new Deferred();
        var args = Array.prototype.slice.call(arguments, 1); //跳过第一个参数
        args.push(deferred.callback());
        method.apply(null, args);
        return deferred.promise;
    };
};

1
bluebrid的 promisify可以将api promise化
var Promise = require('bluebird') fs.readFileAsync = Promise.promisify(fs.readFie, fs) var Promise = require('bluebird') Promise.promisifyAll(fs)
1
2
3
#### async
`串行执行`:series()
async.series([ function(callback) { fs.readFile('file1.txt', 'utf-8', callback); }, function(callback) { fs.readFile('file2.txt', 'utf-8', callback); } ], function(err, results) { // results => [file1.txt, file2.txt] });
1
2
3
传入的callback()不是由使用者指定,callback()执行时将结果保存然后执行下一个调用,最终的回调函数执行时,保存的结果以数组传入,一旦异常结束所有调用,将异常传递给最终函数的第一个参数
`并行执行`:parallel()
async.parallel([function(callback) { fs.readFile('file1.txt', 'utf-8', callback); }, function(callback) { fs.readFile('file2.txt', 'utf-8', callback); } ], function(err, results) { // results => [file1.txt, file2.txt] }); //等价于 var counter = 2; var results = []; var done = function(index, value) { results[index] = value; counter--; if (counter === 0) { callback(null, results); } }; // 只传递第一个异常 var hasErr = false; var fail = function(err) { if (!hasErr) { hasErr = true; callback(err); } }; fs.readFile('file1.txt', 'utf-8', function(err, content) { if (err) { return fail(err); } done(0, content); }); fs.readFile('file2.txt', 'utf-8', function(err, data) { if (err) { return fail(err); } done(1, data); });
1
2
3
一旦异步调用异常,,就将异常作为第一个参数传给最终回调函数,结果为数组
`异步调用 依赖`:当前一个的结果是后一个调用的输入 waterfall()
async.waterfall([function(callback) { fs.readFile('file1.txt', 'utf-8', function(err, content) { callback(err, content); }); }, function(arg1, callback) { // arg1 => file2.txt fs.readFile(arg1, 'utf-8', function(err, content) { callback(err, content); }); }, function(arg1, callback) { // arg1 => file3.txt fs.readFile(arg1, 'utf-8', function(err, content) { callback(err, content); }); } ], function(err, result) { // result => file4.txt });
1
2
`自动依赖处理` auto()
{ readConfig: function() {}, //读取配置 connectMongoDB: function() {},//连接mongo connectRedis: function() {},//连接redis complieAsserts: function() {},//编译静态 uploadAsserts: function() {},//上传静态到cdn startup: function() {}//启动 } var deps = { readConfig: function(callback) { // read config file callback(); }, connectMongoDB: ['readConfig', function(callback) { // connect to mongodb callback(); }], connectRedis: ['readConfig', function(callback) { // connect to redis callback(); 图灵社区会员 Eric Liu(guangqiang.dev @gmail.com) 专享 尊重版权 }], complieAsserts: function(callback) { // complie asserts callback(); }, uploadAsserts: ['complieAsserts', function(callback) { // upload to assert 2 callback(); }], startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function(callback) { // startup }] }; //auto根据依赖关系自动分析 async.auto(deps);
1
2
3
#### Step
`串行`
Step( function readFile1() { fs.readFile('file1.txt', 'utf-8', this); }, function readFile2(err, content) { fs.readFile('file2.txt', 'utf-8', this); }, function done(err, content) { console.log(content); } );
1
2
3
4
thisStep内部的一个next()方法,将调用结果作为下一个任务的参数并调用
`并行` parallel()
Step( function readFile1() { fs.readFile('file1.txt', 'utf-8', this.parallel()); fs.readFile('file2.txt', 'utf-8', this.parallel()); }, function done(err, content1, content2) { // content1 => file1 // content2 => file2 console.log(arguments); });

如果异步方法传回结果为多个参数,step只取前两个。parallel()原理是每次执行时将内部计数器加1,