高阶函数
:把函数参数作为参数,或作为返回值
偏函数
: 将传入参数作为判断或者其他逻辑条件
注意点
异常处理
异步I/O有两个处理阶段,中间有事件循环调度,异步方法在提交请求后就返回了,try/catch只能捕获当次事件循环内的异常,不能捕获callback的异常
1 2 3 4 5 6 7 8 9
| try { req.body = JSON.parse(buf, options.reviver); } catch (err) { err.body = buf; err.status = 400; return callback(err); } callback();
|
阻塞代码
这么写会持续占用CPU资源,破坏事件循环的调度,因为Node是单线程,会导致其余请求得不到响应
1 2 3 4 5 6
| var start = new Date(); while (new Date() - start < 1000) { }
|
异步编程解决方案
事件
Node中事件的发布通常是伴随事件循环而异步触发的
- 事件与侦听器是多对多的,设置过多的侦听器会导致过多的CPU占用
- 要给EventEmitter对象添加error事件和处理,若触发error事件不处理,会引起线程退出\
- 使用once()添加的侦听器只会执行一次,在执行后就会将相关的事件移除
使用事件的途径
1 2 3 4 5
| var events =require(‘events’); var emitter = new events.EventEmitter(); emitter.on('do',function(value){console.log(value)}); emitter.emit('do','doing');
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| var util = require('util') var events = require('events'); function Stream() { events.EventEmitter.call(this); } util.inherits(Stream, events.EventEmitter); Stream.prototype.test = function() { var self = this; try { throw 111; } catch (error) { self.emit('error') } } var stream = new Stream(); stream.on('error', function() { console.error('asdasd') }); stream.test();
|
利用Once()解决雪崩问题
- 在高访问量、大并发量的情况下缓存突然失效, 大量的请求同时涌入数据库中,数据库无法承受,影响网站速度
- 在缓存中无数据时,访问量大的话,同条SQL可能会被执行多次,此时可以将所有请求放入事件队列,利用once()来绑定,SQL在执行时,新到的相同调用只需在队列中等待数据,一旦查询结束,得到的结果可以被这些调用共同使用
1 2 3 4 5 6 7 8 9 10 11 12
| var proxy = new events.EventEmitter(); var status = "ready"; var select = function(callback) { proxy.once("selected", callback); if (status === "ready") { status = "pending"; db.select("SQL", function(results) { proxy.emit("selected", results); status = "ready"; }); } };
|
可能会因为侦听器过多引发警号,调用setMaxListeners(0)移除或设置大的阈值
EventProxy 处理多事件对一侦听器,适用于实例化事件
- all() 当每个事件都被触发了才会执行,只执行一次
1 2 3
| var proxy = new EventProxy(); proxy.all("template", "data", "resources", function(template, data, resources) { });
|
- tail() 首次也是需要每个事件都被触发,之后只要某个事件触发,就会用最新的数据执行
- after() 事件执行多少次后执行侦听器的单一事件组合订阅
1 2 3
| var proxy = new EventProxy(); proxy.after("data", 10, function(datas) { });
|
EventProxy对异常处理的优化
1 2 3 4 5 6 7 8 9 10 11 12
| ep.fail(callback); ep.fail(function(err) { callback(err); }); ep.bind('error', function(err) { ep.unbind(); callback(err); });
|
1 2 3 4 5 6 7 8 9
| ep.done('tpl'); function(err, content) { if (err) { return ep.emit('error', err); } ep.emit('tpl', content); }
|
done()接受函数参数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| ep.done(function(content) { ep.emit('tpl', content); }); function(err, content) { if (err) { return ep.emit('error', err); } (function(content) { ep.emit('tpl', content); }(content)); }
|
代码对比
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| exports.getContent = function(callback) { var ep = new EventProxy(); ep.all('tpl', function(tpl, data) { callback(null, { template: tpl, data: data }); }); ep.bind('error', function(err) { ep.unbind(); callback(err); }); fs.readFile('template.tpl', 'utf-8', function(err, content) { if (err) { return ep.emit('error', err); } ep.emit('tpl', content); }); }; exports.getContent = function(callback) { var ep = new EventProxy(); ep.all('tpl', function(tpl, data) { callback(null, { template: tpl, data: data }); }); ep.fail(callback); fs.readFile('template.tpl', 'utf-8', ep.done('tpl')); };
|
Promise/Deferred(应用参见:es6 Promise)
- 先执行异步调用,后传递处理方法
- then()方法只接受function对象,继续返回promise()对象,可选progress事件传入
- Promise是高级接口,依靠低级接口事件来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| var Promise = function() { EventEmitter.call(this); }; util.inherits(Promise, EventEmitter); Promise.prototype.then = function(fulfilledHandler, errorHandler, progressHandler) { if (typeof fulfilledHandler === 'function') { this.once('success', fulfilledHandler); } if (typeof errorHandler === 'function') { this.once('error', errorHandler); } if (typeof progressHandler === 'function') { this.on('progress', progressHandler); } return this; }; var Deferred = function() { this.state = 'unfulfilled'; this.promise = new Promise(); }; Deferred.prototype.resolve = function(obj) { this.state = 'fulfilled'; this.promise.emit('success', obj); }; Deferred.prototype.reject = function(err) { this.state = 'failed'; this.promise.emit('error', err); }; Deferred.prototype.progress = function(data) { this.promise.emit('progress', data); };
|
对res改造成promise
Deferred用于内部,维护异步模型的状态,Promise作用于外部,通过then()暴露给外部以添加自定义逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| var promisify = function(res) { var deferred = new Deferred(); var result = ''; res.on('data', function(chunk) { result += chunk; deferred.progress(chunk); }); res.on('end', function() { deferred.resolve(result); }); res.on('error', function(err) { deferred.reject(err); }); return deferred.promise; }; promisify(res).then(function() { }, function(err) { }, function(chunk) { console.log('BODY: ' + chunk); });
|
Promise多异步协作,all()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| Deferred.prototype.all = function(promises) { var count = promises.length; var that = this; var results = []; promises.forEach(function(promise, i) { promise.then(function(data) { count--; results[i] = data; if (count === 0) { that.resolve(results); } }, function(err) { that.reject(err); }); }); return this.promise; }; //all()返回resolve()结果集
|
Promise链式调用
- 前一个的调用的结果,作为下一个调用的开始,后一个then的回调函数会等待前一个promise的状态变化而运行
将API Promise化
// smooth(fs.readFile);
var smooth = function(method) {
return function() {
var deferred = new Deferred();
var args = Array.prototype.slice.call(arguments, 1); //跳过第一个参数
args.push(deferred.callback());
method.apply(null, args);
return deferred.promise;
};
};
1
| bluebrid的 promisify可以将api promise化
|
var Promise = require('bluebird')
fs.readFileAsync = Promise.promisify(fs.readFie, fs)
var Promise = require('bluebird')
Promise.promisifyAll(fs)
1 2 3
| #### async `串行执行`:series()
|
async.series([
function(callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function(callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function(err, results) {
// results => [file1.txt, file2.txt]
});
1 2 3
| 传入的callback不是由使用者指定,callback执行时将结果保存然后执行下一个调用,最终的回调函数执行时,保存的结果以数组传入,一旦异常结束所有调用,将异常传递给最终函数的第一个参数 `并行执行`:parallel
|
async.parallel([function(callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function(callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function(err, results) {
// results => [file1.txt, file2.txt]
});
//等价于
var counter = 2;
var results = [];
var done = function(index, value) {
results[index] = value;
counter--;
if (counter === 0) {
callback(null, results);
}
};
// 只传递第一个异常
var hasErr = false;
var fail = function(err) {
if (!hasErr) {
hasErr = true;
callback(err);
}
};
fs.readFile('file1.txt', 'utf-8', function(err, content) {
if (err) {
return fail(err);
}
done(0, content);
});
fs.readFile('file2.txt', 'utf-8', function(err, data) {
if (err) {
return fail(err);
}
done(1, data);
});
1 2 3
| 一旦异步调用异常,,就将异常作为第一个参数传给最终回调函数,结果为数组 `异步调用 依赖`:当前一个的结果是后一个调用的输入 waterfall()
|
async.waterfall([function(callback) {
fs.readFile('file1.txt', 'utf-8', function(err, content) {
callback(err, content);
});
},
function(arg1, callback) {
// arg1 => file2.txt
fs.readFile(arg1, 'utf-8', function(err, content) {
callback(err, content);
});
},
function(arg1, callback) {
// arg1 => file3.txt
fs.readFile(arg1, 'utf-8', function(err, content) {
callback(err, content);
});
}
], function(err, result) {
// result => file4.txt
});
{
readConfig: function() {}, //读取配置
connectMongoDB: function() {},//连接mongo
connectRedis: function() {},//连接redis
complieAsserts: function() {},//编译静态
uploadAsserts: function() {},//上传静态到cdn
startup: function() {}//启动
}
var deps = {
readConfig: function(callback) {
// read config file
callback();
},
connectMongoDB: ['readConfig', function(callback) {
// connect to mongodb
callback();
}],
connectRedis: ['readConfig', function(callback) {
// connect to redis callback();
图灵社区会员 Eric Liu(guangqiang.dev @gmail.com) 专享 尊重版权
}],
complieAsserts: function(callback) {
// complie asserts
callback();
},
uploadAsserts: ['complieAsserts', function(callback) {
// upload to assert 2 callback();
}],
startup: ['connectMongoDB', 'connectRedis', 'uploadAsserts', function(callback) {
// startup
}]
};
//auto根据依赖关系自动分析
async.auto(deps);
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this);
},
function readFile2(err, content) {
fs.readFile('file2.txt', 'utf-8', this);
},
function done(err, content) {
console.log(content);
}
);
1 2 3 4
| this是Step内部的一个next()方法,将调用结果作为下一个任务的参数并调用 `并行` parallel()
|
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this.parallel());
fs.readFile('file2.txt', 'utf-8', this.parallel());
},
function done(err, content1, content2) {
// content1 => file1
// content2 => file2
console.log(arguments);
});
如果异步方法传回结果为多个参数,step只取前两个。parallel()原理是每次执行时将内部计数器加1,