nodejs多进程

多进程架构

依靠node提供的child_process模块,创建工作线程,实现多核cpu的利用

  • 主从模式

主进程和工作进程,在分布式架构中用于并行处理业务的模式,主进程负责调度和管理工作进程,工作进程负责业务处理

创建worker.js:

1
2
3
4
5
6
7
var http = require('http');
http.createServer(function(req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain'
});
res.end('Hello World\n');
}).listen(Math.round((1 + Math.random()) * 1000), '127.0.0.1');

创建master.js

1
2
3
4
5
var fork = require('child_process').fork;
var cpus = require('os').cpus();
for (var i = 0; i < cpus.length; i++) {
fork('./worker.js');
}

fork()复制的进程都是独立的进程,有独立的v8,但fork()过程比较长、内存占用多(至少10mb),多进程目的是充分使用cpu资源

创建工作线程

child_process模块提供了4个方法创建工作线程

  1. spawn() 启动一个工作线程执行命令
  2. exec() 启动一个工作线程并且包含回调函数 可以设置超时时间
  3. execFile() 启动一个工作线程来执行可执行文件 可以设置超时时间
  4. fork() 指定js文件来创建工作线程

后面三个方法都是spawn()的延伸

1
2
3
4
5
6
7
8
9
var cp = require('child_process');
cp.spawn('node', ['worker.js']);
cp.exec('node worker.js', function(err, stdout, stderr) {
// some code
});
cp.execFile('worker.js', function(err, stdout, stderr) {
// some code
});
cp.fork('./worker.js');

Smaller icon

通过execFile()执行的js文件,需要在首行加入: #!/usr/bin/env node

进程通信

  • IPC Inter-Process Communication 进程间通信
  • node使用libuv提供的管道(pipe)实现IPC,在window中使用named pipe,*nix使用Unix Domain Socket实现
  • 文件描述符:内核(kernel)利用文件描述符(file descriptor)来访问文件。文件描述符是非负整数。打开现存文件或新建文件时,内核会返回一个文件描述符,读写文件也需要使用文件描述符来指定待读写的文件。它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表

在创建工作线程之前会先创建IPC通道并监听,然后通过环境变量(NODE_CHANNEL_FD)告诉工作线程这个IPC通道的文件描述符,工作线程会根据它连接IPC通道完成进程连接。

IPC通道类似socket属于双向通信,直接在系统内核中完成更高效,IPC被node抽象为stream对象,在调用send()时(类似write())发送数据,接收到消息会通过message事件(类似data)触发

句柄传递

  • 句柄是一种可以用来标识资源的引用,包含了指向对象的文件描述符,可以表示一个服务端socket对象,客户端socket对象等

主进程接受到socket请求后,将socket直接发送给工作进程,不用重新与工作进程建立新的socket连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// parent.js
var cp = require('child_process');
var child1 = cp.fork('child.js');
var child2 = cp.fork('child.js');
// Open up the server object and send the handle
var server = require('net').createServer();
server.listen(1337, function() { //主进程发送完句柄后关闭了监听
child1.send('server', server);
child2.send('server', server);
server.close();
});
// child.js
var http = require('http');
var server = http.createServer(function(req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain'
});
res.end('handled by child, pid is ' + process.pid + '\n');
});
process.on('message', function(m, tcp) {
if (m === 'server') {
tcp.on('connection', function(socket) {
server.emit('connection', socket);
});
}
});
//多个工作线程同时监听相同端口

child_process().fork.send()可以发送的句柄有:

  1. net.Socket tcp套接字
  2. net.Server tcp服务器 建立在tcp服务上的应用层服务都可以
  3. net.Native C++层面的tcp套接字或IPC管道
  4. dgarm.Socket UDP套接字
  5. dgram.Native c++层面的UDP套接字

send()方法在将消息发送到IPC前,会组装成两个对象 一个是handle 一个是message:
{
cmd: ‘NODE_HANDLE’,
type: ‘net.Server’,
msg: message
}

Smaller icon

message会先JSON.stringify()序列化再写入IPC管道(IPC通道只接受字符串),子线程再解析还原为对象,然后触发message事件,这其中还要进行过滤,如果message.cmd的值以NODE_为前缀,就会响应internalMessage事件,例如为NODE_HANDLE,就会取出message.type和得到的文件描述符一起还原

1
2
3
4
5
6
7
8
9
10
//此处为接受到tcp服务器句柄后工作线程的还原过程
function(message, handle, emit) {
var self = this;
var server = new net.Server();
server.listen(handle, function() {
emit(server);
});
}
// 子进程根据message.type创建对应TCP服务对象,然后监听到文件描述符

node进程之间只有消息传递,没有实例对象传递

端口共同监听
  • 多进程使用相同的文件描述符

node底层对每个端口监听都设置了SO_REUSEADDR,可以让不同进程对相同的网卡和端口监听,即服务端套接字可以被多进程服用

setsockopt(tcp->io_watcher.fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));

当独立启动进程时,多个tcp服务器socket套接字的文件描述符互不相同,所以在监听相同端口时会报错。但send()发送句柄再启动的服务,它们引用了相同的文件描述符,并且文件描述符同一时间只能被一个进程使用,所以当网络请求来时,会出现抢占式服务

集群稳定

进程事件

  • error 当工作线程无法被复制创建、杀死、无法发送消息时触发
  • exit 工作线程退出时触发 ,若正常退出,则第一个参数为退出码,若是kill()的,则第二个参数为杀死进程信号(第一个为null)
  • close 工作线程的标准输入输出流中止时触发 参数与exit相同
  • disconnect 父进程、工作线程调用disconnect()方法时触发 将关闭监听IPC

父进程也可以用kill()发送信号 Node提供了kill -l列出的信号对应的事件

1
2
3
4
5
6
7
process.on('SIGTERM', function() {
console.log('Got a SIGTERM, exiting...');
process.exit(1);
});
console.log('sever running with PID:',process.pid);
process.kill(process.pid,'SIGTERM');

自动重启

  • 当工作线程退出时 启动新的工作进程来服务

给 process 对象添加 uncaughtException 事件绑定能够避免发生异常时进程直接退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// master.js
var fork = require('child_process').fork;
var cpus = require('os').cpus();
var server = require('net').createServer();
server.listen(1337);
var workers = {};
var createWorker = function() {
var worker = fork(__dirname + '/worker.js');
// 退出时重新启动新的进程
worker.on('exit', function() {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
createWorker();
});
// 句柄转发
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};
for (var i = 0; i < cpus.length; i++) {
createWorker();
}
//主进程自己退出时,让所有工作进程退出
process.on('exit', function() {
for (var pid in workers) {
workers[pid].kill();
}
});

当工作线程出现未捕获的异常,就立刻停止接受新的连接,在所有连接断开后退出,主进程帧听到exit后启动新的进程服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// worker.js
var http = require('http');
var server = http.createServer(function(req, res) {
res.writeHead(200, {
'Content-Type': 'text/plain'
});
res.end('handled by child, pid is ' + process.pid + '\n');
});
var worker;
process.on('message', function(m, tcp) {
if (m === 'server') {
worker = tcp;
worker.on('connection', function(socket) {
server.emit('connection', socket);
});
}
});
process.on('uncaughtException', function() {
//停止接收新的连接
worker.close(function() {
// 所有已有的连接断开后,退出进程
process.exit(1);
});
});
自杀信号
  • 当子工作线程因为意外要退出时 先发送自杀信号 再开始退出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// worker.js
process.on('uncaughtException', function(err) {
//记录日志
logger.error(err);
// 发送自杀信号
process.send({
act: 'suicide'
});
// 停止接收新的连接
worker.close(function() {
// 所有已有连接断开后,退出进程
process.exit(1);
});
// 5秒后退出进程 此处不graceful
setTimeout(function() {
process.exit(1);
}, 5000);
});
//原本在exit重启新进程的,改为在message事件中
var createWorker = function() {
var worker = fork(__dirname + '/worker.js');
// 启动新的进程
worker.on('message', function(message) {
if (message.act === 'suicide') {
createWorker();
}
});
worker.on('exit', function() {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
});
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};
限量重启
  • 避免因为在启动过程就出现错误 导致进程一直被重启

限制单位时间内重启次数,超过限制就触发giveup事件,并且对于此事件要着重加日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 重启次数
var limit = 10;
// 时间单位
var during = 60000;
var restart = [];
var isTooFrequently = function() {
// 记录重启时间
var time = Date.now();
var length = restart.push(time);
if (length > limit) {
// 取出最后10次记录
restart = restart.slice(limit * -1);
}
// 最后一次重启到前10次重启之间的时间间隔
return restart.length >= limit && restart[restart.length - 1] - restart[0] < during;
};
var workers = {};
var createWorker = function() {
// 检查是否太过频繁
if (isTooFrequently()) {
// 触发giveup事件后,不再重启
process.emit('giveup', length, during);
return;
}
var worker = fork(__dirname + '/worker.js');
worker.on('exit', function() {
console.log('Worker ' + worker.pid + ' exited.');
delete workers[worker.pid];
});
// 重新启动新的进程
worker.on('message', function(message) {
if (message.act === 'suicide') {
createWorker();
}
});
// 句柄转发
worker.send('server', server);
workers[worker.pid] = worker;
console.log('Create worker. pid: ' + worker.pid);
};

负载均衡

  • Round-Robin 轮叫调度

对于Node而言,它繁忙部分有CPU和I/O,影响抢占的是CPU的繁忙度,所以使用抢占式服务可能会导致负载不均衡。

多线程/多进程等待同一个 socket 事件,当这个事件发生时,这些线程/进程被同时唤醒,就是惊群。可以想见,效率很低下,许多进程被内核重新调度唤醒,同时去响应这一个事件,当然只有一个进程能处理事件成功,其他的进程在处理该事件失败后重新休眠(也有其他选择)。这种性能浪费现象就是惊群。

轮叫调度是由主线程接受连接,将它依次分发给工作线程,分发策略为:在N个工作线程中,每次选择第i=(i+1)mod n个进程来发送连接

可以在cluster模块中启用:

1
2
3
4
5
6
7
8
// 启用Round-Robin
cluster.schedulingPolicy = cluster.SCHED_RR
// 不启用Round-Robin
cluster.schedulingPolicy = cluster.SCHED_NONE
或者者在环境变量中设置NODE_CLUSTER_SCHED_POLICY的值
export NODE_CLUSTER_SCHED_POLICY=rr
export NODE_CLUSTER_SCHED_POLICY=none

Round-Robin也可以通过代理服务器实现

状态共享

  • 在多个进程中共享数据

将数据放到数据库、文件、缓存,在进程启动时读取进内存。但在数据改变时,需要通知各个工作线程,使它们及时同步状态

定时轮询各个工作线程向第三方定时轮询。但轮询时间设定比较麻烦,过短会有并发处理,且增加性能损耗,过长则会导致数据更新不及时

主动通知 新建通知进程(服务),由它来轮询获取数据更新,并通知工作线程.为了能跨服务器使用,可以使用TCP,UDP来通信。进程在启动时,主动将进程信息注册到通知服务,通知服务依次来通知

Smaller icon

Cluster模块

  • child_process和net模块的组合
  • 提供完善的API,但没有child_process灵活
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', function(worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
}

在进程中判断是主进程还是工作进程:主要通过环境变量的NODE_UNIQUE_ID

1
2
3
cluster.isWorker = ('NODE_UNIQUE_ID' in process.env);
cluster.isMaster = (cluster.isWorker === false);
//使用cluster.setupMaster()设置主进程

Cluster会在内部启动TCP服务器,在fork()工作线程时,将tcp服务器的socket文件描述符发送给工作线程,并行该工作线程的环境变量里存在NODE_UNIQUE_ID。如果工作线程有listen()帧听端口,就会拿到文件描述符,通过SO_REUSEADDR设置实现端口共享。

使用cluster模块后,无法主动指定要共享的socket文件描述符,所以主进程只能管理一组工作进程。而使用child_process则可以创建多个tcp服务器,共享多个socket

Cluster事件

  • fork 复制一个工作线程后触发
  • online 复制工作线程完成后,工作线程主动发送online消息给主进程,主进程收到消息后触发
  • listening 工作线程调用listen()后,会发送listening消息给主进程,主进程收到消息后触发
  • disconnect 主进程和工作线程的IPC通道断开后触发
  • exit 工作线程退出时触发
  • setup cluster.setupMaster()执行后触发

上面的事件大都和child_process相关,是在进程间消息传递的基础上封装完成的

graceful

  • 优雅退出进程

在调用http.createServer().close()关闭server时,它会停止接收新的连接,但对于keepalive的连接,它依然会继续接受,导致close()无法完成

解决方案:

  • 记录所有连接 在关闭时全部销毁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
function enableDestroy(server) {
var connections = {}
server.on('connection', function(conn) {
var key = conn.remoteAddress + ':' + conn.remotePort;
connections[key] = conn;
conn.on('close', function() {
delete connections[key];
});
});
server.destroy = function(cb) {
server.close(cb);
for (var key in connections)
connections[key].destroy();
};
}

但是太浪费内存了

req.socket.destroy()