守护进程有什么用_从 pm2 看 node 多进程管理(一):进程创建
pm2 是 node 社区中多进程管理工具的佼佼者,本系列文章在参考 pm2 v4.5.0 源码的基础上,着重分析:pm2 模块结构进程创建和启动进程中止进程重启进程通信为更好理解 pm2 多进程管理,本文对 pm2 源码的分析侧重功能层面,对其他实现细节不关注,本篇文章将分析 pm2 模块结构和进程创建流程。pm2 源码模块结构pm2 模块结构pm2 核心模块包括:Client.js: 建立工作
pm2 是 node 社区中多进程管理工具的佼佼者,本系列文章在参考 pm2 v4.5.0 源码的基础上,着重分析:
- pm2 模块结构
- 进程创建和启动
- 进程中止
- 进程重启
- 进程通信
为更好理解 pm2 多进程管理,本文对 pm2 源码的分析侧重功能层面,对其他实现细节不关注,本篇文章将分析 pm2 模块结构和进程创建流程。
pm2 源码模块结构
pm2 核心模块包括:
- Client.js: 建立工作进程与 rpc 服务的连接,同时负责与
Daemon
的 IPC 通信。 - Daemon.js: 守护进程,负责创建
God
以及 rpc 服务。 - God.js: 存储每个进程的相关信息,接收并执行
Client
与Daemon
发送的指令(包括监听、创建、重启、停止等)。 - API.js : pm2 暴露的所有 api 接口,包括
start
、stop
、reload
等方法。
pm2 输出的主要模块是 API
类的实例:
var API = require('./lib/API.js')
const pm2 = new API
module.exports = pm2
API
初始化主要是设置内部默认配置和用户传入的配置以及生成一个 Client
实例:
class API {
// https://github.com/Unitech/pm2/blob/4.5.0/lib/API.js#L57
constructor(opts){
if (!opts) opts = {};
var that = this;
// 标记是否以守护进程方式启动 Daemon
this.daemon_mode = typeof(opts.daemon_mode) == 'undefined' ? true : opts.daemon_mode;
// pm2 输出目录(日志、缓存等)
this.pm2_home = conf.PM2_ROOT_PATH;
...
this.Client = new Client({
pm2_home: this.pm2_home,
daemon_mode: this.daemon_mode
...
});
}
}
进程的创建和启动
pm2.start({
script: './app.js', // 要执行的应用脚本
exec_mode: 'cluster', // cluster 模式
instances: 4, // 启动的进程数
})
假定当前系统中没有由 pm2 创建的守护进程或其他进程,用 cluster
模式执行上述脚本后,pm2 会做如下几件事:
确定进程创建入口 => 创建守护进程 => 建立与 rpc 服务的连接 => 启动工作进程、执行应用
1.确定进程创建入口
pm2 创建进程入口有两种:脚本文件和 json 配置。pm2 会根据开发者输入自动选取创建方式。
class API {
// https://github.com/Unitech/pm2/blob/4.5.0/lib/API.js#L317
start(cmd, opts, cb) {
...
if (Common.isConfigFile(cmd) || (typeof(cmd) === 'object')) {
that._startJson(cmd, opts, 'restartProcessId', (err, procs) => {
return cb ? cb(err, procs) : this.speedList()
})
}
else {
that._startScript(cmd, opts, (err, procs) => {
return cb ? cb(err, procs) : this.speedList(0)
})
}
}
// https://github.com/Unitech/pm2/blob/4.5.0/lib/API.js#L907
_startJson (file, opts, action, pipe, cb){
// 设置应用配置
...
// 获取当前存活进程信息
that.Client.executeRemote('getMonitorData', {}, function(err, raw_proc_list) {
raw_proc_list.forEach(function(proc) {
proc_list[proc.name] = proc;
});
eachLimit(
Object.keys(proc_list),
conf.CONCURRENT_ACTIONS,
// 根据 action 执行对存活进程执行相应操作
function(proc_name, next) {
that._operate(action, proc_name, env, function(err, ret) {
...
// 启动时传入的 action='restartProcessId'
that.Client.notifyGod(action, proc_name);
});
},
function(err) {
...
// 创建并启动配置的应用进程
return startApps(apps_name, function(err, apps) {
apps_info = apps_info.concat(apps);
return cb ? cb(err, apps_info) : that.speedList(err ? 1 : 0);
});
})
})
function startApps(app_name_to_start, cb) {
...
that.Client.executeRemote('prepare', resolved_paths, function(err, data) {...})
}
}
}
执行上面脚本后,pm2 会使用 _startJson
方法作为进程启动入口,_startJson
会为要启动的应用初始化配置并且启动应用进程。
2.创建守护进程
在创建应用进程前,_startJson
首先会调用 Client.executeRemote('getMonitorData')
方法去获取当前系统中存活的进程信息。
- 如果要创建的应用进程已存在,则会根据
'restartProcessId'
这个动作去更新该进程的配置信息并重启它。 - 否则,调用
startApps
方法创建并启动应用进程。
但是在启动应用进程之前,需先建立与守护进程的连接,建立连接前会判断此时是否有守护进程,没有的话会优先创建守护进程。
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L506
Client.prototype.executeRemote = function executeRemote(method, app_conf, fn) {
...
// 未建立连接
if (!this.client || !this.client.call) {
// 去建立与守护进程的连接
this.start(function(error) {
...
if (self.client) {
return self.client.call(method, app_conf, fn);
}
});
return false;
}
}
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L48
Client.prototype.start = function(cb) {
// 判断是否有存活的 Daemon
this.pingDaemon(function(daemonAlive) {
// 如果守护进程存活则直接启动 rpc 连接
if (daemonAlive === true)
return that.launchRPC(function(err, meta) {...});
// 否则,先创建 Daemon
// 如果配置的是‘非守护’模式,则直接启动 Daemon
if (that.daemon_mode === false) {
var Daemon = require('./Daemon.js');
var daemon = new Daemon(...);
console.log('Launching in no daemon mode');
...
daemon.innerStart(function() {
that.launchRPC(function(err, meta) {...})
})
return false
}
// 如果是‘守护’模式,则会以守护进程的方式启动 Daemon
that.launchDaemon(function(err, child) {
that.launchRPC(function(err, meta) {...})
})
})
}
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L309
Client.prototype.pingDaemon = function pingDaemon(cb) {
var req = axon.socket('req');
var client = new rpc.Client(req);
// 无法与 Daemon 建立 rpc 连接
client.sock.once('reconnect attempt', function() {
client.sock.close();
debug('Daemon not launched');
process.nextTick(function() {
return cb(false);
});
});
...
// 成功与 Daemon 建立 rpc 连接
client.sock.once('connect', function() {
client.sock.once('close', function() {
return cb(true);
});
client.sock.close();
debug('Daemon alive');
});
req.connect(this.rpc_socket_file);
}
Client
通过创建 rpc.Client
实例并发起与 rpc.Server
的连接来判断 Daemon
模块是否已启动,事实上,rpc.Server
会在 Daemon
初始化时启动。
启动 Daemon
时,pm2 会根据传入的参数 daemon_mode
判断是直接启动,还是作为守护进程启动,本例中以守护进程模式启动 Daemon
:
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L209
Client.prototype.launchDaemon = function(opts, cb) {
...
var ClientJS = path.resolve(path.dirname(module.filename), 'Daemon.js');
...
var node_args = [];
node_args.push(ClientJS);
...
var interpreter = 'node';
var child = require('child_process').spawn(interpreter, node_args, {...});
...
/* 监听 Daemon 启动成功消息, msg 格式为
{ online:true,
success:true,
pid:进程 id,
pm2_version: pm2 版本 }
*/
child.once('message', function(msg) {
debug('PM2 daemon launched with return message: ', msg);
...
// 与 Daemon 断开连接
child.disconnect();
})
}
pm2 通过 child_process.spawn
来生成 Daemon
进程,Daemon
启动后会向当前父进程发送 Daemon
相关信息 。
// Daemon.js
var God = require('./God');
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Daemon.js#L34
Daemon.prototype.start = function() {
var d = domain.create();
...
// 启动同时监听异常
d.run(function() {
that.innerStart();
});
}
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Daemon.js#L66
Daemon.prototype.innerStart = function(cb) {
// 绑定 pub socket
this.pub = axon.socket('pub-emitter');
this.pub_socket = this.pub.bind(this.pub_socket_file);
this.pub_socket.once('bind', function() {
...
// pub socket 绑定成功后,向 Client 报告启动成功的消息
that.sendReady(cb);
})
...
// 绑定 rep socket
this.rep = axon.socket('rep');
var server = new rpc.Server(this.rep);
this.rpc_socket = this.rep.bind(this.rpc_socket_file);
this.rpc_socket.once('bind', function() {
...
// req socket 绑定成功后,向 Client 报告启动成功的消息
that.rpc_socket_ready = true;
that.sendReady(cb);
})
...
// 定义 rpc 服务回调
server.expose({
prepare : God.prepare, // 创建新进程
getMonitorData : God.getMonitorData, // 获取当前存活进程信息
startProcessId : God.startProcessId,
stopProcessId : God.stopProcessId,
restartProcessId : God.restartProcessId,
deleteProcessId : God.deleteProcessId,
...
});
}
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Daemon.js#L306
Daemon.prototype.sendReady = function(cb) {
// Send ready message to Client
if (this.rpc_socket_ready == true && this.pub_socket_ready == true) {
...
// child.once('message') 将会收到消息
process.send({
online : true,
success : true,
pid : process.pid,
pm2_version : pkg.version
});
};
}
if (require.main === module) {
var daemon = new Daemon();
daemon.start();
}
启动 Daemon
过程中,会同时绑定两种 socket: PubEmitter
和 RepSocket
,前者负责与 Client
通信,后者负责与 God
通信(RepSocket
server 暴露的响应回调全是 God
模块函数,包括创建、重启、停止进程等方法)。两个 socket 一旦有一个绑定就会向 Client
发送 Daemon
创建成功的消息,通知 Client
去创建与 RepSocket
server 的连接。 God
存储着当前所有进程的信息,在接收到 Client
或 Daemon
的指令后会对进程去执行相关操作。
// https://github.com/Unitech/pm2/blob/4.5.0/lib/God.js#L49
var God = module.exports = {
next_id : 0, // 唯一标识
clusters_db : {}, // 进程集合
configuration: {}, // 配置
bus : new EventEmitter2({...}) // God 内部事件订阅模块
};
// https://github.com/Unitech/pm2/blob/4.5.0/lib/God/ActionMethods.js#L40
God.getMonitorData = function getMonitorData(env, cb) {
// 进程信息
var processes = God.getFormatedProcesses();
...
}
3.建立 Client
与 rpc 连接
启动 Daemon 以及 rpc 服务后,下一步就是建立 Client 与 rpc 的连接。
// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L360
Client.prototype.launchRPC = function launchRPC(cb) {
var req = axon.socket('req');
this.client = new rpc.Client(req);
...
var connectHandler = function() {
debug('RPC Connected to Daemon');
...
};
...
this.client.sock.once('connect', connectHandler);
this.client_sock = req.connect(this.rpc_socket_file);
}
相比于创建 Daemon
的复杂流程,建立 Client
与 rpc 服务的连接比较简单,建立个 ReqSocket
发起连接即可。
4.启动应用进程
启动 Daemon
并建立好 rpc 连接后, 回到 Client.executeRemote('getMonitorData')
回调,API
会继续调用 Client.executeRemote('prepare')
去创建新进程,通过 rpc 服务, Client.executeRemote
相当于直接调用 God
的方法。
// https://github.com/Unitech/pm2/blob/4.5.0/lib/God.js#L113
God.prepare = function prepare (env, cb) {
...
return God.executeApp(env, function (err, clu) {...});
}
// https://github.com/Unitech/pm2/blob/4.5.0/lib/God.js#L178
God.executeApp = function executeApp(env, cb) {
var readyCb = function ready(proc) {
...
console.log(`App [${proc.pm2_env.name}:${proc.pm2_env.pm_id}] online`);
if (cb) cb(null, proc);
}
...
// 集群模式
if (env_copy.exec_mode === 'cluster_mode') {
God.nodeApp(env_copy, function nodeApp(err, clu) {
...
God.clusters_db[clu.pm2_env.pm_id] = clu;
...
return clu.once('online', function () {
if (!clu.pm2_env.wait_ready)
return readyCb(clu);
// 监听应用进程是否已准备好启动
var listener = function (packet) {
if (packet.raw === 'ready' && ...) {
...
return readyCb(clu)
}
}
God.bus.on('process:msg', listener);
})
})
} else {
// fork 模式
God.forkMode(env_copy, function nodeApp(err, clu) {
...
God.clusters_db[clu.pm2_env.pm_id] = clu;
...
if (!clu.pm2_env.wait_ready)
return readyCb(clu);
// 监听应用进程是否已准备好启动
var listener = function (packet) {
if (packet.raw === 'ready' && ...) {
...
return readyCb(clu)
}
}
God.bus.on('process:msg', listener);
})
}
}
pm2 支持两种启动应用进程的模式:cluster
和 fork
模式,分别对应 ClusterMode.js 和 ForkMode.js 文件,具体实现细节不再赘述,两种模式的区别是:
fork | cluster | |
---|---|---|
创建进程实例数量 | 单个 | 多个 |
底层创建进程方式 | child_process.spawn | cluster.fork |
支持语言框架 | nodejs、python、php 等 | 仅支持 nodejs |
值得注意的是,很多时候在启动应用进程时候,应用本身依赖的环境需要一定的启动时间(如数据库连接,服务器启动等),pm2 允许在应用依赖的环境准备成功后启动应用进程,称之为’优雅启动‘,只需要在应用准备好后向 pm2 发起 'ready' 消息即可。
var http = require('http');
var server = http.createServer().listen(8000, function() {
setTimeout(function() {
// 通知 pm2 已准备就绪
process.send('ready');
}, 2000);
});
至此,可以总结出 pm2 从头创建进程的一般过程,如下图:
更多推荐
所有评论(0)