Node.js硬实战 - 子进程

杨旭 bio photo By 杨旭

子进程:利用Node整合外部应用


第八章 进程:利用Node整合外部应用


在Node的世界里,child_process模块是允许我们在node程序中执行外部程序

Node允许使用四种方式执行外部应用:

  • execFile - 执行外部程序,并且需要提供一组参数,以及一个在子进程退出后的缓冲输出的回调
  • spawn - 执行外部程序,并且需要提供一组阐述,以及一个在子进程退出后的输入输出和事件的数据流接口
  • exec - 在一个命令行窗口中执行一个或多个命令,以及一个在进程退出后缓冲输出的回调
  • fork - 在一个独立的进程中执行一个Node模块,并且需要提供一组参数,以及一个类似spawn方法里的数据流和事件接口,同时设置好父进程和子进程之间的进程间通信

执行外部应用程序

执行一个外部程序获得相应的输出

const cp = require('child_process')

cp.execFile('echo', ['hello', 'world'], (err, stdout, stdin) => {
    if (err) console.error(err)
    console.log('stdout', stdout)
    console.log('stdin', stdin)
})

路径和Path的环境变量

  • PATH包含一组可执行程序的执行目录列表
  • 在Node中,当后台运行execvp时,如果没有提供绝对或相对路径,就会基于PATH搜索相关程序
  • 通过process.env.PATH查看环境变量
  • 也可以在执行execFile之前通过process.env.PATH设置环境变量
process.env.PATH += `:a/new/path/to/executable`

执行外部程序时出现的异常

  • 找不到程序时,遇到ENOENT的错误
  • Node无权限访问程序,遇到EACCES或者EPERM的错误
  • 当外部程序返回非0状态时
const cp = require('child_process')

cp.execFile('ls', ['file-not-exist'], (err, stdout, stdin) => {
    if (err) console.error(err)
    console.log('stdout', stdout)
    console.log('stdin', stdin)
})

流和外部应用程序

  • 预期会有大数据量传输的时候
  • 希望提高响应效率的时候
const cp = require('child_process')

const child = cp.spawn('echo', ['hello', 'world'])

child.on('error', console.error)
child.stdout.pipe(process.stdout)
child.stderr.pipe(process.stderr)

外部程序的串联调用

  • UNIX系统的一个重要哲理:每个程序专注做好一件事,各程序之间通过通用的接口(纯文本)进行相互通信
const cp = require('child_process')

const child = cp.spawn('echo', ['hello', 'world'])

child.on('error', console.error)
child.stdout.pipe(process.stdout)
child.stderr.pipe(process.stderr)

在shell中执行命令

  • 使用exec执行一些系统已有的子命令
  • exec实际上是调用/bin/sh或cmd.exe来执行命令
  • 如果使用用户输入来拼接命令,很有可能出现非法的注入 ```js const cp = require(‘child_process’)

cp.exec(‘cat messy.txt | sort | uniq’, (err, stdout, stdin) => { console.log(stdout) })


### 分离子进程

通过Node打开一个外部应用程序,让他独立运行

- 分离一个衍生的子进程
- 正常情况下父进程终止时,子进程会终结;使用spawn方法可以做到进程分离

```js
const child = cp.spawn('./longrun', [], { detached: true })

父进程和子进程之间的I/O处理

  • stdio选项定义了子进程的I/O连接到一个具体的地方
  • 值可以是字符串或者数组
  • 默认情况下,使用字符串pipe - stdio: 'pipe'
  • 数组模式下,0-2号文件解析器,分别对应stdin、stdout、stderr
  • 这样依赖stdio的值就连接了父子进程(通过文件)
  • 如果想销毁这一连接,可以使用destroy()方法,或者设置为ignore
const cp = require('child_process')
const fs = require('fs')

const outFd = fs.openSync('./longrun.out', 'a')
const errFd = fs.openSync('./longrun.err', 'a')

const child = cp.spawn('./longrun', [], {
    detached: true,
    stdio: ['ignore', outFd, errFd],
})

引用计数和子进程

  • 分离之后,父进程会保留对子进程的引用
  • 通过child.unref()来取消计数

执行Node程序

Forking Node模块

  • 在处理大数据量计算的时候,将大批量的任务压力分解到一个独立的进程中,通过进程间通信来处理问题

  • Node为其他的Node程序之间通信,提供了更好的方式:stdio: [0, 1, 2, ‘ipc’],表示所有的输入和输出都是继承父进程
  • 如果希望提供一种和spawn具有一样的默认的I/O配置,可以使用silent: true选项

和forked的Node模块进行通信

  • 使用fork方法会开辟一个IPC通道
  • 子进程这边会暴露process.on(‘message’)process.send()
  • 父进程这边使用的是child.on(‘message’)child.send()
  • 使用child.disconnect()来断开连接

运行作业

  • 创建一个工作池
  • 父进程发送一个消息给子进程的时候,期望一个确切的结果
// worker.js
#! /usr/bin/env node

process.on('message', (msg) => {
    process.send(parseInt(msg) * 2) // 收到数字,乘2之后返回
})

// worker-pool.js
const cp = require('child_process')
const cpus = require('os').cpus().length // 获取cpu数量,创建尽可能多的fork

module.exports = function (workModule) {
    const awaiting = [] // 当前处于排队的任务
    const readyPool = [] // 当前可用的worker
    let poolSize = 0 // 工作池的大小

    return function doWork(job, cb) {

        console.log(`poolSize: ${poolSize} readyPool: ${readyPool.length} awaiting: ${awaiting.length}`)

        if (!readyPool.length && poolSize > cpus) {
            return awaiting.push([doWork, job, cb]) // 如果没有空闲工作者或者数量超过池限制,加入排队
        }


        const child = readyPool.length
            ? readyPool.shift() // 如果有空闲资源,直接使用
            : (poolSize++, cp.fork(workModule)) // 如果还未超限,创建一个新的fork

        let cbTriggered = false // 记录回调是否被执行

        child
            .removeAllListeners()
            .once('error', (err) => {
            if (!cbTriggered) { // 避免多次执行回调
                cb(err)
                cbTriggered = true
            }
            child.kill() // 杀死子进程
        }).once('exit', (code, signal) => {
            if (!cbTriggered) {
                cb(new Error(`Child exited with code: ${code}`))
            }
            poolSize--
            const childIdx = readyPool.indexOf(child) // 如果子进程因为其他原因退出,已经不可用,在进程池中进行清理
            if (childIdx > -1) {
                readyPool.slice(childIdx, 1)
            }
        }).once('message', (result) => {
            cb(null, result) // 收到结果后执行回调
            cbTriggered = true
            readyPool.push(child) // 执行任务结束后,当前子进程作为空闲工作者
            if (awaiting.length) {
                setImmediate.apply(null, awaiting.shift()) // 如果还有排队任务,在下个周期执行
            }
        }).send(job)

    }
}

使用工作池:

const workerPool = require('./worker-pool')


const runJob = workerPool('./worker.js')  // 初始化工作池

    runJob(1000, (err, data) => {
        if (err) console.error(err)
        console.log(data)
    })

同步运行

使用execFileSync, spawnSync, execFile同步执行子进程

const cp = require('child_process')

const stdout = cp.execFileSync('echo', ['hello']).toString()
console.log(stdout)

const ps = cp.spawnSync('ps', ['aux'])
const grep = cp.spawnSync('grep', ['node'], {
    input: ps.stdout,
    encoding: 'utf8',
})

console.log(grep)

const execSyncOut = cp.execSync('ps aux | grep node').toString()
console.log(execSyncOut)

同步子进程中的异常处理

  • 如果在同步执行的方法中返回非零状态,会抛出异常
  • 使用try/cache捕获异常信息

try {
    cp.execFileSync('cat', ['not-exist-file']).toString()
} catch (e) {
    console.error(e.status)
    console.error(e.stderr)
}