  当初使用 cluster 时,一直好奇它是怎么做到多个子进程监听同一个端口而不冲突的,比如下面这段代码:
  const cluster = require('cluster')
  const net = require('net')
  const cpus = require('os').cpus()
  if (cluster.isPrimary) {
    for (let i = 0; i < cpus.length; i++) {
  } else {
      .createServer(function (socket) {
        socket.on('data', function (data) {
          socket.write(`Reply from ${process.pid}: ` + data.toString())
        socket.on('end', function () {
  该段代码通过父进程 fork 出了多个子进程,且这些子进程都监听了 9999 这个端口并能正常提供服务,这是如何做到的呢?我们来研究一下。
  学习 Node.js 官方提供的库,最好的方式当然是调试一下,所以,我们先来准备一下环境。注:本文的操作系统为 macOS Big Sur 11.6.6,其他系统请自行准备相应环境。
  编译 Node.js
  下载 Node.js 源码
  git clone https://github.com/nodejs/node.git
  // lib/internal/cluster/primary.js
  function queryServer(worker, message) {
    // Stop processing if worker already disconnecting
    if (worker.exitedAfterDisconnect) return;
  // lib/internal/cluster/child.js
  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
    if (handle) {
      // Shared listen socket
      shared(reply, {handle, indexesKey, index}, cb)
    } else {
      // Round-robin.
      rr(reply, {indexesKey, index}, cb)
  ./configure --debug
  make -j4
  之后会生成 out/Debug/node
  准备 IDE 环境
  使用 vscode 调试,配置好 launch.json 就可以了(其他 IDE 类似,请自行解决):
    "version": "0.2.0",
    "configurations": [
        "name": "Debug C++",
        "type": "cppdbg",
        "program": "/Users/youxingzhi/ayou/node/out/Debug/node",
        "request": "launch",
        "args": ["/Users/youxingzhi/ayou/node/index.js"],
        "stopAtEntry": false,
        "cwd": "${workspaceFolder}",
        "environment": [],
        "externalConsole": false,
        "MIMode": "lldb"
        "name": "Debug Node",
        "type": "node",
        "runtimeExecutable": "/Users/youxingzhi/ayou/node/out/Debug/node",
        "request": "launch",
        "args": ["--expose-internals", "--nolazy"],
        "skipFiles": [],
        "program": "${workspaceFolder}/index.js"
  其中第一个是用于调试 C++ 代码(需要安装 C/C++ 插件),第二个用于调试 JS 代码。接下来就可以开始调试了,我们暂时用调试 JS 代码的那个配置就好了。
  Cluster 源码调试
  const cluster = require('cluster')
  const net = require('net')
  if (cluster.isPrimary) {
  } else {
    const server = net.createServer(function (socket) {
      socket.on('data', function (data) {
        socket.write(`Reply from ${process.pid}: ` + data.toString())
      socket.on('end', function () {
  执行 require('cluster') 时,会进入 lib/cluster.js 这个文件:
  const childOrPrimary = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'primary'
  module.exports = require(`internal/cluster/${childOrPrimary}`)
  会根据当前 process.env 上是否有 NODE_UNIQUE_ID 来引入不同的模块,此时是没有的,所以会引入 internal/cluster/primary.js 这个模块:
  const cluster = new EventEmitter();
  module.exports = cluster
  const handles = new SafeMap()
  cluster.isWorker = false
  cluster.isMaster = true // Deprecated alias. Must be same as isPrimary.
  cluster.isPrimary = true
  cluster.Worker = Worker
  cluster.workers = {}
  cluster.settings = {}
  cluster.SCHED_NONE = SCHED_NONE // Leave it to the operating system.
  cluster.SCHED_RR = SCHED_RR // Primary distributes connections.
  cluster.schedulingPolicy = schedulingPolicy
  cluster.setupPrimary = function (options) {
  // Deprecated alias must be same as setupPrimary
  cluster.setupMaster = cluster.setupPrimary
  function setupSettingsNT(settings) {
  function createWorkerProcess(id, env) {
  function removeWorker(worker) {
  function removeHandlesForWorker(worker) {
  cluster.fork = function (env) {
  该模块主要是在 cluster 对象上挂载了一些属性和方法,并导出,这些后面回过头再看,我们继续往下调试。往下调试会进入 if (cluster.isPrimary) 分支,代码很简单,仅仅是 fork 出了一个新的子进程而已:
  // lib/internal/cluster/primary.js
  cluster.fork = function (env) {
    const id = ++ids
    const workerProcess = createWorkerProcess(id, env)
    const worker = new Worker({
      id: id,
      process: workerProcess,
    worker.process.on('internalMessage', internal(worker, onmessage))
    process.nextTick(emitForkNT, worker)
    cluster.workers[worker.id] = worker
    return worker
  createWorkerProcess(id, env):
  // lib/internal/cluster/primary.js
  function createWorkerProcess(id, env) {
    const workerEnv = {...process.env, ...env, NODE_UNIQUE_ID: `${id}`}
    const execArgv = [...cluster.settings.execArgv]
    return fork(cluster.settings.exec, cluster.settings.args, {
      cwd: cluster.settings.cwd,
      env: workerEnv,
      serialization: cluster.settings.serialization,
      silent: cluster.settings.silent,
      windowsHide: cluster.settings.windowsHide,
      execArgv: execArgv,
      stdio: cluster.settings.stdio,
      gid: cluster.settings.gid,
      uid: cluster.settings.uid,
  可以看到,该方法主要是通过 fork 启动了一个子进程来执行我们的 index.js,且启动子进程的时候设置了环境变量 NODE_UNIQUE_ID,这样 index.js 中 require('cluster') 的时候,引入的就是 internal/cluster/child.js 模块了。
  worker.process.on('internalMessage', internal(worker, onmessage)):监听子进程传递过来的消息并处理。
  前面说了,此时引入的是 internal/cluster/child.js 模块,我们先跳过,继续往下,执行 server.listen(9999) 时实际上是调用了 Server 上的方法:
  // lib/net.js
  Server.prototype.listen = function (...args) {
          options.port | 0,
  可以看到,最终是调用了 listenInCluster:
  // lib/net.js
  function listenInCluster(
  ) {
    exclusive = !!exclusive
    if (cluster === undefined) cluster = require('cluster')
    if (cluster.isPrimary || exclusive) {
      // Will create a new handle
      // _listen2 sets up the listened handle, it is still named like this
      // to avoid breaking code that wraps this method
      server._listen2(address, port, addressType, backlog, fd, flags)
    const serverQuery = {
      address: address,
      port: port,
      addressType: addressType,
      fd: fd,
    // Get the primary's server handle, and listen on it
    cluster._getServer(server, serverQuery, listenOnPrimaryHandle)
    function listenOnPrimaryHandle(err, handle) {
      err = checkBindError(err, port, handle)
      if (err) {
        const ex = exceptionWithHostPort(err, 'bind', address, port)
        return server.emit('error', ex)
      // Reuse primary's server handle
      server._handle = handle
      // _listen2 sets up the listened handle, it is still named like this
      // to avoid breaking code that wraps this method
      server._listen2(address, port, addressType, backlog, fd, flags)
  由于是在子进程中执行,所以最后会调用 cluster._getServer(server, serverQuery, listenOnPrimaryHandle):
  // lib/internal/cluster/child.js
  // 这里的 cb 就是上面的 listenOnPrimaryHandle
  cluster._getServer = function (obj, options, cb) {
    send(message, (reply, handle) => {
      if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
      if (handle) {
        // Shared listen socket
        shared(reply, {handle, indexesKey, index}, cb)
      } else {
        // Round-robin.
        rr(reply, {indexesKey, index}, cb)
  该函数最终会向父进程发送 queryServer 的消息,父进程处理完后会调用回调函数,回调函数中会调用 cb 即 listenOnPrimaryHandle。看来,listen 的逻辑是在父进程中进行的了。
  父进程收到 queryServer 的消息后,最终会调用 queryServer 这个方法:
  // lib/internal/cluster/primary.js
  function queryServer(worker, message) {
    // Stop processing if worker already disconnecting
    if (worker.exitedAfterDisconnect) return
    const key =
      `${message.address}:${message.port}:${message.addressType}:` +
    let handle = handles.get(key)
    if (handle === undefined) {
      let address = message.address
      // Find shortest path for unix sockets because of the ~100 byte limit
      if (
        message.port < 0 &&
        typeof address === 'string' &&
        process.platform !== 'win32'
      ) {
        address = path.relative(process.cwd(), address)
        if (message.address.length < address.length) address = message.address
      // UDP is exempt from round-robin connection balancing for what should
      // be obvious reasons: it's connectionless. There is nothing to send to
      // the workers except raw datagrams and that's pointless.
      if (
        schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6'
      ) {
        handle = new SharedHandle(key, address, message)
      } else {
        handle = new RoundRobinHandle(key, address, message)
      handles.set(key, handle)
  可以看到,这里主要是对 handle 的处理,这里的 handle 指的是调度策略,分为 SharedHandle 和 RoundRobinHandle,分别对应抢占式和轮询两种策略(文章最后补充部分有关于两者对比的例子)。
  Node.js 中除 Windows 外默认都是 RoundRobinHandle 策略(Windows 上默认为 SharedHandle),可通过环境变量 NODE_CLUSTER_SCHED_POLICY 来修改,取值可以为 none(SharedHandle) 或 rr(RoundRobinHandle)。
  首先,我们来看一下 SharedHandle,由于我们这里是 TCP 协议,所以最后会通过 net._createServerHandle 创建一个 TCP 对象挂载在 handle 属性上(注意这里又有一个 handle,别搞混了):
  // lib/internal/cluster/shared_handle.js
  function SharedHandle(key, address, {port, addressType, fd, flags}) {
    this.key = key
    this.workers = new SafeMap()
    this.handle = null
    this.errno = 0
    let rval
    if (addressType === 'udp4' || addressType === 'udp6')
      rval = dgram._createSocketHandle(address, port, addressType, fd, flags)
    else rval = net._createServerHandle(address, port, addressType, fd, flags)
    if (typeof rval === 'number') this.errno = rval
    else this.handle = rval
  在 createServerHandle 中除了创建 TCP 对象外,还绑定了端口和地址:
  // lib/net.js
  function createServerHandle(address, port, addressType, fd, flags) {
    } else {
      handle = new TCP(TCPConstants.SERVER);
      isTCP = true;
    if (address || port || isTCP) {
        err = handle.bind6(address, port, flags);
      } else {
        err = handle.bind(address, port);
    return handle;
  然后,queryServer 中继续执行,会调用 add 方法,最终会将 handle 也就是 TCP 对象传递给子进程:
  // lib/internal/cluster/primary.js
  function queryServer(worker, message) {
    if (!handle.data) handle.data = message.data
    // Set custom server data
    handle.add(worker, (errno, reply, handle) => {
      const {data} = handles.get(key)
      if (errno) handles.delete(key) // Gives other workers a chance to retry.
          ack: message.seq,
        handle // TCP 对象
  子进程收到父进程对于 queryServer 的回复后,会调用 shared:
  // lib/internal/cluster/child.js
  // `obj` is a net#Server or a dgram#Socket object.
  cluster._getServer = function (obj, options, cb) {
    send(message, (reply, handle) => {
      if (typeof obj._setServerData === 'function') obj._setServerData(reply.data)
      if (handle) {
        // Shared listen socket
        shared(reply, {handle, indexesKey, index}, cb)
      } else {
        // Round-robin.
        rr(reply, {indexesKey, index}, cb) // cb 是 listenOnPrimaryHandle
  shared 中最后会调用 cb 也就是 listenOnPrimaryHandle:
  // lib/net.js
  function listenOnPrimaryHandle(err, handle) {
    err = checkBindError(err, port, handle)
    if (err) {
      const ex = exceptionWithHostPort(err, 'bind', address, port)
      return server.emit('error', ex)
    // Reuse primary's server handle 这里的 server 是 index.js 中 net.createServer 返回的那个对象
    server._handle = handle
    // _listen2 sets up the listened handle, it is still named like this
    // to avoid breaking code that wraps this method
    server._listen2(address, port, addressType, backlog, fd, flags)
  这里会把 handle 赋值给 server._handle,这里的 server 是 index.js 中 net.createServer 返回的那个对象,并调用 server._listen2,也就是 setupListenHandle:
  // lib/net.js
  function setupListenHandle(address, port, addressType, backlog, fd, flags) {
    debug('setupListenHandle', address, port, addressType, backlog, fd)
    // If there is not yet a handle, we need to create one and bind.
    // In the case of a server sent via IPC, we don't need to do this.
    if (this._handle) {
      debug('setupListenHandle: have a handle already')
    } else {
    this[async_id_symbol] = getNewAsyncId(this._handle)
    this._handle.onconnection = onconnection
    this._handle[owner_symbol] = this
    // Use a backlog of 512 entries. We pass 511 to the listen() call because
    // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
    // which will thus give us a backlog of 512 entries.
    const err = this._handle.listen(backlog || 511)
    if (err) {
      const ex = uvExceptionWithHostPort(err, 'listen', address, port)
      this._handle = null
  首先会执行 this._handle.onconnection = onconnection。客户端请求过来时会调用 this._handle(也就是 TCP 对象)上的 onconnection 方法,也就是执行 lib/net.js 中的 onconnection 函数来建立连接,之后就可以通信了。为了控制篇幅,该方法就不继续往下了。
  然后调用 listen 监听,注意这里参数 backlog 跟之前不同,不是表示端口,而是表示在拒绝连接之前,操作系统可以挂起的最大连接数量,也就是连接请求的排队数量。我们平时遇到的 listen EADDRINUSE: address already in use 错误就是因为这行代码返回了非 0 的错误码。
  如果还有其他子进程,也会同样走一遍上述的步骤,不同之处是在主进程中 queryServer 时,由于已经有 handle 了,不需要再重新创建了:
  function queryServer(worker, message) {
    // Stop processing if worker already disconnecting
    if (worker.exitedAfterDisconnect) return;
    const key =
      `${message.address}:${message.port}:${message.addressType}:` +
    let handle = handles.get(key);


