b1f4837a93339fe48ba51e0e9918adc7.png

pm2 是 node 社区中多进程管理工具的佼佼者,本系列文章在参考 pm2 v4.5.0 源码的基础上,着重分析:

  • pm2 模块结构
  • 进程创建和启动
  • 进程中止
  • 进程重启
  • 进程通信
为更好理解 pm2 多进程管理,本文对 pm2 源码的分析侧重功能层面,对其他实现细节不关注,本篇文章将分析 pm2 模块结构和进程创建流程。

pm2 源码模块结构

cdf001b7c5f75d8983251a840f9ad5db.png
pm2 模块结构

pm2 核心模块包括:

  • Client.js: 建立工作进程与 rpc 服务的连接,同时负责与 Daemon 的 IPC 通信。
  • Daemon.js: 守护进程,负责创建 God以及 rpc 服务。
  • God.js: 存储每个进程的相关信息,接收并执行 ClientDaemon 发送的指令(包括监听、创建、重启、停止等)。
  • API.js : pm2 暴露的所有 api 接口,包括 startstopreload 等方法。

pm2 输出的主要模块是 API 类的实例:

var API = require('./lib/API.js')
const pm2 = new API
module.exports = pm2

API 初始化主要是设置内部默认配置和用户传入的配置以及生成一个 Client 实例:

class API {
  // https://github.com/Unitech/pm2/blob/4.5.0/lib/API.js#L57
  constructor(opts){
    if (!opts) opts = {};
    var that = this;

    // 标记是否以守护进程方式启动 Daemon
    this.daemon_mode = typeof(opts.daemon_mode) == 'undefined' ? true : opts.daemon_mode;
    // pm2 输出目录(日志、缓存等)
    this.pm2_home = conf.PM2_ROOT_PATH;

    ... 

    this.Client = new Client({
      pm2_home: this.pm2_home,
      daemon_mode: this.daemon_mode
      ...
    });
  } 
}

进程的创建和启动

pm2.start({
  script: './app.js', // 要执行的应用脚本
  exec_mode: 'cluster', // cluster 模式
  instances: 4, // 启动的进程数
})

假定当前系统中没有由 pm2 创建的守护进程或其他进程,用 cluster 模式执行上述脚本后,pm2 会做如下几件事:

确定进程创建入口 => 创建守护进程 => 建立与 rpc 服务的连接 => 启动工作进程、执行应用

1.确定进程创建入口

pm2 创建进程入口有两种:脚本文件和 json 配置。pm2 会根据开发者输入自动选取创建方式。

class API {
  // https://github.com/Unitech/pm2/blob/4.5.0/lib/API.js#L317
  start(cmd, opts, cb) {
    ...
    if (Common.isConfigFile(cmd) || (typeof(cmd) === 'object')) {
      that._startJson(cmd, opts, 'restartProcessId', (err, procs) => {
        return cb ? cb(err, procs) : this.speedList()
      })
    }
    else {
      that._startScript(cmd, opts, (err, procs) => {
        return cb ? cb(err, procs) : this.speedList(0)
      })
    }
  }

  // https://github.com/Unitech/pm2/blob/4.5.0/lib/API.js#L907
  _startJson (file, opts, action, pipe, cb){
    // 设置应用配置
    ...

    // 获取当前存活进程信息
    that.Client.executeRemote('getMonitorData', {}, function(err, raw_proc_list) {
      raw_proc_list.forEach(function(proc) {
        proc_list[proc.name] = proc;
      });

      eachLimit(
        Object.keys(proc_list), 
        conf.CONCURRENT_ACTIONS, 
        // 根据 action 执行对存活进程执行相应操作
        function(proc_name, next) {
          that._operate(action, proc_name, env, function(err, ret) {
            ...
            
            // 启动时传入的 action='restartProcessId'
            that.Client.notifyGod(action, proc_name);
          });
        },
        function(err) {
          ...

          // 创建并启动配置的应用进程
          return startApps(apps_name, function(err, apps) {
            apps_info = apps_info.concat(apps);
            return cb ? cb(err, apps_info) : that.speedList(err ? 1 : 0);
          });
        })
    })

    function startApps(app_name_to_start, cb) {
      ...
   
      that.Client.executeRemote('prepare', resolved_paths, function(err, data) {...})
    }
  }
}

执行上面脚本后,pm2 会使用 _startJson 方法作为进程启动入口,_startJson 会为要启动的应用初始化配置并且启动应用进程。

2.创建守护进程

在创建应用进程前,_startJson 首先会调用 Client.executeRemote('getMonitorData') 方法去获取当前系统中存活的进程信息。

  • 如果要创建的应用进程已存在,则会根据 'restartProcessId' 这个动作去更新该进程的配置信息并重启它。
  • 否则,调用 startApps 方法创建并启动应用进程。

但是在启动应用进程之前,需先建立与守护进程的连接,建立连接前会判断此时是否有守护进程,没有的话会优先创建守护进程。

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L506
Client.prototype.executeRemote = function executeRemote(method, app_conf, fn) {
  ...
  
  // 未建立连接
  if (!this.client || !this.client.call) {
    // 去建立与守护进程的连接
    this.start(function(error) {
      ...

      if (self.client) {
        return self.client.call(method, app_conf, fn);
      }
    });
    return false;
  }
}

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L48
Client.prototype.start = function(cb) {
  // 判断是否有存活的 Daemon
  this.pingDaemon(function(daemonAlive) {
    // 如果守护进程存活则直接启动 rpc 连接
    if (daemonAlive === true)
      return that.launchRPC(function(err, meta) {...});
    
    // 否则,先创建 Daemon
    // 如果配置的是‘非守护’模式,则直接启动 Daemon
    if (that.daemon_mode === false) {
      var Daemon         = require('./Daemon.js');
      var daemon = new Daemon(...);
      console.log('Launching in no daemon mode');
      ...

      daemon.innerStart(function() {
         that.launchRPC(function(err, meta) {...})
      })
      return false
    }
    
    // 如果是‘守护’模式,则会以守护进程的方式启动 Daemon
    that.launchDaemon(function(err, child) {
       that.launchRPC(function(err, meta) {...})
    })
  })
}

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L309
Client.prototype.pingDaemon = function pingDaemon(cb) {
  var req    = axon.socket('req');
  var client = new rpc.Client(req);
  
  // 无法与 Daemon 建立 rpc 连接
  client.sock.once('reconnect attempt', function() {
    client.sock.close();
    debug('Daemon not launched');
    process.nextTick(function() {
      return cb(false);
    });
  });
  ...
  
  // 成功与 Daemon 建立 rpc 连接
  client.sock.once('connect', function() {
    client.sock.once('close', function() {
      return cb(true);
    });
    client.sock.close();
    debug('Daemon alive');
  });
  
  req.connect(this.rpc_socket_file);
}

Client 通过创建 rpc.Client 实例并发起与 rpc.Server 的连接来判断 Daemon 模块是否已启动,事实上,rpc.Server 会在 Daemon 初始化时启动。

启动 Daemon 时,pm2 会根据传入的参数 daemon_mode 判断是直接启动,还是作为守护进程启动,本例中以守护进程模式启动 Daemon:

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L209
Client.prototype.launchDaemon = function(opts, cb) {
  ...

  var ClientJS = path.resolve(path.dirname(module.filename), 'Daemon.js');
  ...
  
  var node_args = [];
  node_args.push(ClientJS);
  ...

  var interpreter = 'node';
  var child = require('child_process').spawn(interpreter, node_args, {...});
  ...
  
  /* 监听 Daemon 启动成功消息, msg 格式为 
  {  online:true,
     success:true,
     pid:进程 id,
     pm2_version: pm2 版本 }
  */
  child.once('message', function(msg) {
    debug('PM2 daemon launched with return message: ', msg);
    ...
    
    // 与 Daemon 断开连接
    child.disconnect();
  })
}

pm2 通过 child_process.spawn 来生成 Daemon 进程,Daemon 启动后会向当前父进程发送 Daemon 相关信息 。

// Daemon.js
var God          = require('./God');

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Daemon.js#L34
Daemon.prototype.start = function() {
  var d = domain.create();
  ...
  
  // 启动同时监听异常
  d.run(function() {
    that.innerStart();
  });
}

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Daemon.js#L66
Daemon.prototype.innerStart = function(cb) {
  // 绑定 pub socket
  this.pub    = axon.socket('pub-emitter');
  this.pub_socket = this.pub.bind(this.pub_socket_file);
  this.pub_socket.once('bind', function() {
     ...
     // pub socket 绑定成功后,向 Client 报告启动成功的消息
     that.sendReady(cb);
  })
  ...

  // 绑定 rep socket
  this.rep    = axon.socket('rep');
  var server = new rpc.Server(this.rep);
  this.rpc_socket = this.rep.bind(this.rpc_socket_file);
  this.rpc_socket.once('bind', function() {
    ...
   
    // req socket 绑定成功后,向 Client 报告启动成功的消息
    that.rpc_socket_ready = true;
    that.sendReady(cb);
  })
  ...

  // 定义 rpc 服务回调
   server.expose({
    prepare                 : God.prepare, // 创建新进程
    getMonitorData          : God.getMonitorData, // 获取当前存活进程信息
    startProcessId          : God.startProcessId,
    stopProcessId           : God.stopProcessId,
    restartProcessId        : God.restartProcessId,
    deleteProcessId         : God.deleteProcessId,
    ...
  });
}

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Daemon.js#L306
Daemon.prototype.sendReady = function(cb) {
  // Send ready message to Client
  if (this.rpc_socket_ready == true && this.pub_socket_ready == true) {
    ...

    // child.once('message') 将会收到消息
    process.send({
      online      : true,
      success     : true,
      pid         : process.pid,
      pm2_version : pkg.version
    });
  };
}

if (require.main === module) {
  var daemon = new Daemon();

  daemon.start();
}

启动 Daemon 过程中,会同时绑定两种 socket: PubEmitterRepSocket,前者负责与 Client 通信,后者负责与 God 通信(RepSocket server 暴露的响应回调全是 God 模块函数,包括创建、重启、停止进程等方法)。两个 socket 一旦有一个绑定就会向 Client 发送 Daemon 创建成功的消息,通知 Client 去创建与 RepSocket server 的连接。 God 存储着当前所有进程的信息,在接收到 ClientDaemon 的指令后会对进程去执行相关操作。

// https://github.com/Unitech/pm2/blob/4.5.0/lib/God.js#L49
var God = module.exports = {
  next_id : 0, // 唯一标识
  clusters_db : {}, // 进程集合
  configuration: {}, // 配置
  bus : new EventEmitter2({...}) // God 内部事件订阅模块
};

// https://github.com/Unitech/pm2/blob/4.5.0/lib/God/ActionMethods.js#L40
God.getMonitorData = function getMonitorData(env, cb) {
  // 进程信息
  var processes = God.getFormatedProcesses();
  ...
}

3.建立 Client 与 rpc 连接

启动 Daemon 以及 rpc 服务后,下一步就是建立 Client 与 rpc 的连接。

// https://github.com/Unitech/pm2/blob/4.5.0/lib/Client.js#L360
Client.prototype.launchRPC = function launchRPC(cb) {
  var req      = axon.socket('req');
  this.client  = new rpc.Client(req);
  ...

  var connectHandler = function() {
    debug('RPC Connected to Daemon');
    ...
  };
  ...

  this.client.sock.once('connect', connectHandler);
  this.client_sock = req.connect(this.rpc_socket_file);
}

相比于创建 Daemon 的复杂流程,建立 Client 与 rpc 服务的连接比较简单,建立个 ReqSocket 发起连接即可。

4.启动应用进程

启动 Daemon 并建立好 rpc 连接后, 回到 Client.executeRemote('getMonitorData') 回调,API 会继续调用 Client.executeRemote('prepare') 去创建新进程,通过 rpc 服务, Client.executeRemote 相当于直接调用 God 的方法。

// https://github.com/Unitech/pm2/blob/4.5.0/lib/God.js#L113
God.prepare = function prepare (env, cb) {
  ...

  return God.executeApp(env, function (err, clu) {...});
}

// https://github.com/Unitech/pm2/blob/4.5.0/lib/God.js#L178
God.executeApp = function executeApp(env, cb) {
  var readyCb = function ready(proc) {
    ...

    console.log(`App [${proc.pm2_env.name}:${proc.pm2_env.pm_id}] online`);
    if (cb) cb(null, proc);
  }
  ...

  // 集群模式
  if (env_copy.exec_mode === 'cluster_mode') {
    God.nodeApp(env_copy, function nodeApp(err, clu) {
      ...

      God.clusters_db[clu.pm2_env.pm_id] = clu;
      ...

      return clu.once('online', function () {
        if (!clu.pm2_env.wait_ready)
          return readyCb(clu);
        // 监听应用进程是否已准备好启动
        var listener = function (packet) {
          if (packet.raw === 'ready' && ...) {
            ...

            return readyCb(clu)
          }
        }

        God.bus.on('process:msg', listener);
      })
    })
  } else {
    // fork 模式
    God.forkMode(env_copy, function nodeApp(err, clu) {
      ...

      God.clusters_db[clu.pm2_env.pm_id] = clu;
      ...
 
      if (!clu.pm2_env.wait_ready)
        return readyCb(clu);

      // 监听应用进程是否已准备好启动
      var listener = function (packet) {
          if (packet.raw === 'ready' && ...) {
            ...

            return readyCb(clu)
          }
        }

      God.bus.on('process:msg', listener);
    })
  }
}

pm2 支持两种启动应用进程的模式:clusterfork 模式,分别对应 ClusterMode.js 和 ForkMode.js 文件,具体实现细节不再赘述,两种模式的区别是:

forkcluster
创建进程实例数量单个多个
底层创建进程方式child_process.spawncluster.fork
支持语言框架nodejs、python、php 等仅支持 nodejs

值得注意的是,很多时候在启动应用进程时候,应用本身依赖的环境需要一定的启动时间(如数据库连接,服务器启动等),pm2 允许在应用依赖的环境准备成功后启动应用进程,称之为’优雅启动‘,只需要在应用准备好后向 pm2 发起 'ready' 消息即可。

var http = require('http');

var server = http.createServer().listen(8000, function() {
  setTimeout(function() {
    // 通知 pm2 已准备就绪
    process.send('ready');
  }, 2000);
});

至此,可以总结出 pm2 从头创建进程的一般过程,如下图:

1264d32346d5b0d62aa8cfa9449c162f.png
pm2 创建进程过程
Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐