多进程架构
依靠node提供的child_process模块,创建工作线程,实现多核cpu的利用
主进程和工作进程,在分布式架构中用于并行处理业务的模式,主进程负责调度和管理工作进程,工作进程负责业务处理
创建worker.js:
1 2 3 4 5 6 7
| var http = require('http'); http.createServer(function(req, res) { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('Hello World\n'); }).listen(Math.round((1 + Math.random()) * 1000), '127.0.0.1');
|
创建master.js
1 2 3 4 5
| var fork = require('child_process').fork; var cpus = require('os').cpus(); for (var i = 0; i < cpus.length; i++) { fork('./worker.js'); }
|
fork()复制的进程都是独立的进程,有独立的v8,但fork()过程比较长、内存占用多(至少10mb),多进程目的是充分使用cpu资源
创建工作线程
child_process模块提供了4个方法创建工作线程
- spawn() 启动一个工作线程执行命令
- exec() 启动一个工作线程并且包含回调函数 可以设置超时时间
- execFile() 启动一个工作线程来执行可执行文件 可以设置超时时间
- fork() 指定js文件来创建工作线程
后面三个方法都是spawn()的延伸
1 2 3 4 5 6 7 8 9
| var cp = require('child_process'); cp.spawn('node', ['worker.js']); cp.exec('node worker.js', function(err, stdout, stderr) { }); cp.execFile('worker.js', function(err, stdout, stderr) { }); cp.fork('./worker.js');
|

通过execFile()执行的js文件,需要在首行加入: #!/usr/bin/env node
进程通信
- IPC Inter-Process Communication 进程间通信
- node使用libuv提供的管道(pipe)实现IPC,在window中使用named pipe,*nix使用Unix Domain Socket实现
- 文件描述符:内核(kernel)利用文件描述符(file descriptor)来访问文件。文件描述符是非负整数。打开现存文件或新建文件时,内核会返回一个文件描述符,读写文件也需要使用文件描述符来指定待读写的文件。它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表
在创建工作线程之前会先创建IPC通道并监听,然后通过环境变量(NODE_CHANNEL_FD)告诉工作线程这个IPC通道的文件描述符,工作线程会根据它连接IPC通道完成进程连接。
IPC通道类似socket属于双向通信,直接在系统内核中完成更高效,IPC被node抽象为stream对象,在调用send()时(类似write())发送数据,接收到消息会通过message事件(类似data)触发
句柄传递
- 句柄是一种可以用来标识资源的引用,包含了指向对象的文件描述符,可以表示一个服务端socket对象,客户端socket对象等
主进程接受到socket请求后,将socket直接发送给工作进程,不用重新与工作进程建立新的socket连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| var cp = require('child_process'); var child1 = cp.fork('child.js'); var child2 = cp.fork('child.js'); var server = require('net').createServer(); server.listen(1337, function() { child1.send('server', server); child2.send('server', server); server.close(); }); var http = require('http'); var server = http.createServer(function(req, res) { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('handled by child, pid is ' + process.pid + '\n'); }); process.on('message', function(m, tcp) { if (m === 'server') { tcp.on('connection', function(socket) { server.emit('connection', socket); }); } });
|
child_process().fork.send()可以发送的句柄有:
- net.Socket tcp套接字
- net.Server tcp服务器 建立在tcp服务上的应用层服务都可以
- net.Native C++层面的tcp套接字或IPC管道
- dgarm.Socket UDP套接字
- dgram.Native c++层面的UDP套接字
send()方法在将消息发送到IPC前,会组装成两个对象 一个是handle 一个是message:
{
cmd: ‘NODE_HANDLE’,
type: ‘net.Server’,
msg: message
}

message会先JSON.stringify()序列化再写入IPC管道(IPC通道只接受字符串),子线程再解析还原为对象,然后触发message事件,这其中还要进行过滤,如果message.cmd的值以NODE_为前缀,就会响应internalMessage事件,例如为NODE_HANDLE,就会取出message.type和得到的文件描述符一起还原
1 2 3 4 5 6 7 8 9 10
| function(message, handle, emit) { var self = this; var server = new net.Server(); server.listen(handle, function() { emit(server); }); } 上
|
node进程之间只有消息传递,没有实例对象传递
端口共同监听
node底层对每个端口监听都设置了SO_REUSEADDR,可以让不同进程对相同的网卡和端口监听,即服务端套接字可以被多进程服用
setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
当独立启动进程时,多个tcp服务器socket套接字的文件描述符互不相同,所以在监听相同端口时会报错。但send()发送句柄再启动的服务,它们引用了相同的文件描述符,并且文件描述符同一时间只能被一个进程使用,所以当网络请求来时,会出现抢占式服务
集群稳定
进程事件
- error 当工作线程无法被复制创建、杀死、无法发送消息时触发
- exit 工作线程退出时触发 ,若正常退出,则第一个参数为退出码,若是kill()的,则第二个参数为杀死进程信号(第一个为null)
- close 工作线程的标准输入输出流中止时触发 参数与exit相同
- disconnect 父进程、工作线程调用disconnect()方法时触发 将关闭监听IPC
父进程也可以用kill()发送信号 Node提供了kill -l列出的信号对应的事件
1 2 3 4 5 6 7
| process.on('SIGTERM', function() { console.log('Got a SIGTERM, exiting...'); process.exit(1); }); console.log('sever running with PID:',process.pid); process.kill(process.pid,'SIGTERM');
|
自动重启
给 process 对象添加 uncaughtException 事件绑定能够避免发生异常时进程直接退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| var fork = require('child_process').fork; var cpus = require('os').cpus(); var server = require('net').createServer(); server.listen(1337); var workers = {}; var createWorker = function() { var worker = fork(__dirname + '/worker.js'); worker.on('exit', function() { console.log('Worker ' + worker.pid + ' exited.'); delete workers[worker.pid]; createWorker(); }); worker.send('server', server); workers[worker.pid] = worker; console.log('Create worker. pid: ' + worker.pid); }; for (var i = 0; i < cpus.length; i++) { createWorker(); } process.on('exit', function() { for (var pid in workers) { workers[pid].kill(); } });
|
当工作线程出现未捕获的异常,就立刻停止接受新的连接,在所有连接断开后退出,主进程帧听到exit后启动新的进程服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| var http = require('http'); var server = http.createServer(function(req, res) { res.writeHead(200, { 'Content-Type': 'text/plain' }); res.end('handled by child, pid is ' + process.pid + '\n'); }); var worker; process.on('message', function(m, tcp) { if (m === 'server') { worker = tcp; worker.on('connection', function(socket) { server.emit('connection', socket); }); } }); process.on('uncaughtException', function() { worker.close(function() { process.exit(1); }); });
|
自杀信号
- 当子工作线程因为意外要退出时 先发送自杀信号 再开始退出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| // worker.js process.on('uncaughtException', function(err) { //记录日志 logger.error(err); // 发送自杀信号 process.send({ act: 'suicide' }); // 停止接收新的连接 worker.close(function() { // 所有已有连接断开后,退出进程 process.exit(1); }); // 5秒后退出进程 此处不graceful setTimeout(function() { process.exit(1); }, 5000); }); //原本在exit重启新进程的,改为在message事件中 var createWorker = function() { var worker = fork(__dirname + '/worker.js'); // 启动新的进程 worker.on('message', function(message) { if (message.act === 'suicide') { createWorker(); } }); worker.on('exit', function() { console.log('Worker ' + worker.pid + ' exited.'); delete workers[worker.pid]; }); worker.send('server', server); workers[worker.pid] = worker; console.log('Create worker. pid: ' + worker.pid); };
|
限量重启
限制单位时间内重启次数,超过限制就触发giveup事件,并且对于此事件要着重加日志
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| var limit = 10; var during = 60000; var restart = []; var isTooFrequently = function() { var time = Date.now(); var length = restart.push(time); if (length > limit) { restart = restart.slice(limit * -1); } return restart.length >= limit && restart[restart.length - 1] - restart[0] < during; }; var workers = {}; var createWorker = function() { if (isTooFrequently()) { process.emit('giveup', length, during); return; } var worker = fork(__dirname + '/worker.js'); worker.on('exit', function() { console.log('Worker ' + worker.pid + ' exited.'); delete workers[worker.pid]; }); worker.on('message', function(message) { if (message.act === 'suicide') { createWorker(); } }); worker.send('server', server); workers[worker.pid] = worker; console.log('Create worker. pid: ' + worker.pid); };
|
负载均衡
对于Node而言,它繁忙部分有CPU和I/O,影响抢占的是CPU的繁忙度,所以使用抢占式服务可能会导致负载不均衡。
多线程/多进程等待同一个 socket 事件,当这个事件发生时,这些线程/进程被同时唤醒,就是惊群。可以想见,效率很低下,许多进程被内核重新调度唤醒,同时去响应这一个事件,当然只有一个进程能处理事件成功,其他的进程在处理该事件失败后重新休眠(也有其他选择)。这种性能浪费现象就是惊群。
轮叫调度是由主线程接受连接,将它依次分发给工作线程,分发策略为:在N个工作线程中,每次选择第i=(i+1)mod n个进程来发送连接
可以在cluster模块中启用:
1 2 3 4 5 6 7 8
| cluster.schedulingPolicy = cluster.SCHED_RR cluster.schedulingPolicy = cluster.SCHED_NONE 或者者在环境变量中设置NODE_CLUSTER_SCHED_POLICY的值 export NODE_CLUSTER_SCHED_POLICY=rr export NODE_CLUSTER_SCHED_POLICY=none
|
Round-Robin也可以通过代理服务器实现
状态共享
将数据放到数据库、文件、缓存,在进程启动时读取进内存。但在数据改变时,需要通知各个工作线程,使它们及时同步状态
定时轮询
各个工作线程向第三方定时轮询。但轮询时间设定比较麻烦,过短会有并发处理,且增加性能损耗,过长则会导致数据更新不及时
主动通知
新建通知进程(服务),由它来轮询获取数据更新,并通知工作线程.为了能跨服务器使用,可以使用TCP,UDP来通信。进程在启动时,主动将进程信息注册到通知服务,通知服务依次来通知

Cluster模块
- child_process和net模块的组合
- 提供完善的API,但没有child_process灵活
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| var cluster = require('cluster'); var http = require('http'); var numCPUs = require('os').cpus().length; if (cluster.isMaster) { for (var i = 0; i < numCPUs; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { console.log('worker ' + worker.process.pid + ' died'); }); } else { http.createServer(function(req, res) { res.writeHead(200); res.end("hello world\n"); }).listen(8000); }
|
在进程中判断是主进程还是工作进程:主要通过环境变量的NODE_UNIQUE_ID
1 2 3
| cluster.isWorker = ('NODE_UNIQUE_ID' in process.env); cluster.isMaster = (cluster.isWorker === false);
|
Cluster会在内部启动TCP服务器,在fork()工作线程时,将tcp服务器的socket文件描述符发送给工作线程,并行该工作线程的环境变量里存在NODE_UNIQUE_ID。如果工作线程有listen()帧听端口,就会拿到文件描述符,通过SO_REUSEADDR设置实现端口共享。
使用cluster模块后,无法主动指定要共享的socket文件描述符,所以主进程只能管理一组工作进程。而使用child_process则可以创建多个tcp服务器,共享多个socket
Cluster事件
- fork 复制一个工作线程后触发
- online 复制工作线程完成后,工作线程主动发送online消息给主进程,主进程收到消息后触发
- listening 工作线程调用listen()后,会发送listening消息给主进程,主进程收到消息后触发
- disconnect 主进程和工作线程的IPC通道断开后触发
- exit 工作线程退出时触发
- setup cluster.setupMaster()执行后触发
上面的事件大都和child_process相关,是在进程间消息传递的基础上封装完成的
graceful
在调用http.createServer().close()关闭server时,它会停止接收新的连接,但对于keepalive的连接,它依然会继续接受,导致close()无法完成
解决方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| function enableDestroy(server) { var connections = {} server.on('connection', function(conn) { var key = conn.remoteAddress + ':' + conn.remotePort; connections[key] = conn; conn.on('close', function() { delete connections[key]; }); }); server.destroy = function(cb) { server.close(cb); for (var key in connections) connections[key].destroy(); }; }
|
但是太浪费内存了
req.socket.destroy()