Buffer:使用比特、字节以及编码 Events:玩转EventEmitter 流:最强大和最容易误用的功能
第三章 Buffers 使用比特、字节以及编码
- 介绍Buffer数据类型
- 修改数据编码
- 二进制文件转化为JSON格式
- 创建自定义对二进制协议
-
介绍
历史上JavaScript对二进制支持欠佳,其自身没有提供一个良好对方式去处理原始对内存数据,出于对性能对考虑,Node关于原始数据对处理都在Buffer数据类型中。
Buffers是代表原始堆的分配额的数据类型,在JS中使用类数组对方式来使用。
修改数据编码
文件操作以及很多网络操作都会将数据作为Buffer返回
const fs = require('fs')
fs.readFile('./package.json', (err, buf) => {
console.log(Buffer.isBuffer(buf)) // true
})
转换为其他格式
const fs = require('fs')
fs.readFile('./package.json', (err, buf) => {
console.log(Buffer.isBuffer(buf))
console.log(buf)
console.log(buf.toString())
console.log(buf.toString('ascii'))
})
- toString方法默认将Buffer转换为utf8格式的字符串
- 如果知道内容只包含ascii字符,也可以指定编码类型
修改字符串编码
const encoded = new Buffer('yangxu: passwd').toString('base64')
// eWFuZ3h1OiBwYXNzd2Q=
使用Buffer创建png图片的data URIs
const fs = require('fs')
const mine = 'image/png'
const encoding = 'base64'
const data = fs.readFileSync('./images.png').toString(encoding)
const uri = `data:${mine};${encoding},${data}`
console.log(uri)
// ...
将dataUri写入文件
const fs = require('fs')
const uri = '...'
const data = uri.split(',')[1]
const buffer = new Buffer(data, 'base64')
fs.writeFileSync('./image_write.png', buffer)
二进制文件转换为JSON
头部信息
- 二进制文件利用头部信息来存储文件相关的基础信息
- Buffer获取索引为0的数据和JavaScript中数组相关操作类似,只是索引是指在内存中的字节位置
- buf[0]代表第0位的八位字节,或无符号的8比特整数,或8比特的正整数
头部信息中1-3位包含最后更新时间,YYMMDD格式
const fs = require('fs')
fs.readFile('./world.dbf', (error, buffer) => {
const header = {}
const date = new Date()
date.setUTCFullYear(buffer[1])
date.setUTCMonth(buffer[2])
date.setUTCDate(buffer[3])
header.lastUpdated = date.toUTCString()
console.log(header) // { lastUpdated: 'Fri, 20 Dec 109 16:31:44 GMT' }
})
4-7 记录数量,32-bit的数字
header.totalRecords = buffer.readUInt32LE(4)
8-9 16-bit数字 头部的字节数 10-11 16-bit数字 记录部分的字节数
header.bytesInHeader = buffer.readUInt16LE(8) header.bytesPerRecord = buffer.readUInt16LE(10)
32-n each 32byte 字段描述,可能包含多个字段的数据 n+1 1byte 字段分隔符
const fs = require('fs')
fs.readFile('./world.dbf', (error, buffer) => {
const header = {}
const date = new Date()
date.setUTCFullYear(buffer[1])
date.setUTCMonth(buffer[2])
date.setUTCDate(buffer[3])
header.lastUpdated = date.toUTCString()
header.totalRecords = buffer.readUInt32LE(4)
header.bytesInHeader = buffer.readUInt16LE(8)
header.bytesPerRecord = buffer.readUInt16LE(10)
const fields = []
let fieldOffset = 32
const fieldLength = 32
const fieldTerminator = 0x0D
const FIELD_TYPES = {
C: 'Character',
N: 'Numeric'
}
while(buffer[fieldOffset] !== fieldTerminator) {
const fieldBuf = buffer.slice(fieldOffset, fieldOffset + fieldLength) // 只是快照,并非拷贝
const field = {
name: fieldBuf.toString('ascii', 0, 11).replace(/\u0000/, ''),
type: FIELD_TYPES[fieldBuf.toString('ascii', 11, 12)],
length: fieldBuf[16]
}
fields.push(field)
fieldOffset += fieldLength
}
console.log(header)
console.log(fields)
})
{ lastUpdated: 'Fri, 20 Dec 109 16:45:03 GMT',
totalRecords: 2127,
bytesInHeader: 225,
bytesPerRecord: 74 }
[ { name: 'CODE\u0000\u0000\u0000\u0000\u0000\u0000',
type: 'Character',
length: 2 },
{ name: 'CNTRY_NAME', type: 'Character', length: 39 },
{ name: 'POP_CNTRY\u0000', type: 'Numeric', length: 10 },
{ name: 'CURR_TYPE\u0000', type: 'Character', length: 16 },
{ name: 'CURR_CODE\u0000', type: 'Character', length: 4 },
{ name: 'FIPS\u0000\u0000\u0000\u0000\u0000\u0000',
type: 'Character',
length: 2 } ]
创建自己的二进制协议
使用良好的二进制协议来传输数据是一种简洁高效的方法
- 首先要确定要传输哪些数据以及如何去表示,确定规范
- 使用掩码来确定数据存放在哪个数据库
- 数据保存以一个在0-255范围内的无符号正数(单字节)的键值标识
- 通过zlib压缩
0 1byte 数据库
1 1byte 数据库键
2-n 0-nbyte 数据
- 数值的二进制表示:
8..toString(2)
两个点用来区分小数
const database = [[],[],[],[],[],[],[],[]]
const bitmasks = [1, 2, 4, 8, 16, 32, 64, 128]
function store(buf) {
const db = buf[0]
const key = buf.readUInt8(1)
bitmasks.forEach((bitmask, index) => {
if ((db & bitmask) === bitmask) {
database[index][key] = 'some data'
}
})
}
- 掩码计算利用&运算判断属于哪个db
使用zlib解压缩
const zlib = require('zlib')
const database = [[],[],[],[],[],[],[],[]]
const bitmasks = [1, 2, 4, 8, 16, 32, 64, 128]
function store(buf) {
const db = buf[0]
const key = buf.readUInt8(1)
if (buf[2] === 0x78) { // 判读是否被压缩
zlib.inflate(buf.slice(2), (error, inflatedBuf) => { // 解压缩
if (error) return console.error(error)
const data = inflatedBuf.toString()
bitmasks.forEach((bitmask, index) => {
if ((db & bitmask) === bitmask) {
database[index][key] = 'some data'
}
})
})
}
}
使用zlib压缩并存储数据
const header = new Buffer(2)
header[0] = 8
header[1] = 0
zlib.deflate('my message', (error, deflateBuf) => {
if (error) return console.error(error)
const message = Buffer.concat([header, deflateBuf])
store(message)
})
第四章 玩转EventEmitter
- 使用Node的EventEmitter模块
- 异常管理
- 第三方模块中如何使用EventEmitter
- 如何使用domains模块的events
- EventEmitter的替代品
基础用法:从EventEmitter继承
const events = require('events')
class MusicPlayer extends events.EventEmitter { // 继承
constructor() {
super()
this.playing = false
}
}
const musicPlayer = new MusicPlayer()
musicPlayer.on('play', (track) => { // 监听播放事件
this.playing = true
})
musicPlayer.on('play', (track) => { // 使用多个监听器
console.log(`play ${track}`)
})
musicPlayer.on('stop', () => { // 监听停止事件
this.playing = false
console.log('stop')
})
musicPlayer.emit('play', 'The Roots - The Fire')
setTimeout(() => {
musicPlayer.emit('stop')
}, 1000)
- 移除监听器
musicPlayer.removeAllListeners()
function listener() {
}
musicPlayer.on('play', listener)
musicPlayer.removeListener('play', listener)
- 只触发一次的监听器
musicPlayer.once('play', listener)
混合EventEmitter
当给一个已有当类增加事件能力当时候,使用组合代替继承
class MusicPlayer { // 继承
constructor() {
this.playing = false
}
}
const musicPlayer = new MusicPlayer()
Object.assign(musicPlayer, events.EventEmitter.prototype) // mixin
异常处理
当一个EventEmitter实例发生错误时,通常会发出一个error事件,在Node中,error事件被当作特殊情况,如果没有监听,默认打印堆栈并退出程序
const events = require('events')
class MusicPlayer { // 继承
constructor() {
this.playing = false
}
}
const musicPlayer = new MusicPlayer()
Object.assign(musicPlayer, events.EventEmitter.prototype)
musicPlayer.on('play', (track) => { // 监听播放事件
this.playing = true
})
musicPlayer.on('error', (track) => { // 错误处理
console.log(`play ${track}`)
})
musicPlayer.on('stop', () => { // 监听停止事件
this.playing = false
console.log('stop')
})
musicPlayer.emit('error', 'The Roots - The Fire') // 触发错误
setTimeout(() => {
musicPlayer.emit('stop')
}, 1000)
通过domains管理异常
帮助处理多个EventEmitter实例的异常,例如多个非阻塞的API 能够集中处理多个异步操作,在处理多个相互依赖的I/O操作时非常有帮助
const events = require('events')
const domain = require('domain')
const audioDomain = domain.create() // 创建一个domain
class AudioDevice extends events.EventEmitter {
play() {
this.emit('error', 'not implemented yet')
}
}
const audioDevice = new AudioDevice()
audioDevice.on('play', () => {
audioDevice.play()
})
class MusicPlayer extends events.EventEmitter {
constructor() {
super()
this.audioDevice = audioDevice
}
play() {
this.audioDevice.emit('play')
}
}
audioDomain.on('error', (err) => { // 捕获异常
console.log('error')
})
audioDomain.run(() => { // domain之内执行的代码都会被捕获
const musicPlayer = new MusicPlayer()
musicPlayer.play()
})
高级模式
反射
动态的响应EventEmitter的变化,或者查询他的监听器 利用EventEmitter的特殊事件newListener
const events = require('events')
class MusicPlayer extends events.EventEmitter {
constructor() {
super()
this.playing = false
}
}
const musicPlayer = new MusicPlayer()
musicPlayer.on('newListener', (name, listener) => { // 监听`监听`事件
console.log(name, listener)
})
musicPlayer.on('play', (track) => { // 监听播放事件
this.playing = true
})
探索EventEmitter
大型项目中,模块之间的通信
- Express的app对象
- Redis客户端的RedisClient
组织事件名称
使用一个对象来存储所有事件名,在项目中创建一个统一的位置
MusicPlayer.events = {
play: 'play',
stop: 'stop'
}
第三方模块以及扩展 - EventEmitter的替代方案
- 发布/订阅模式 - 水平扩展分布式集群,借助AMQP、js-signals进行多个Node进程之间的通信
-
第五章 流:最强大和最容易误解的功能
-
流是基于事件的api,借助事件和非阻塞I/O库,流模块允许在可用的时候动态处理,在不用的时候释放
- 什么是流和如何使用
- 如何使用Node集成的流API
- 流API在Node0.8版本以下的使用
- 0.10版本以后的流的原始类
- 测试流的策略
-
流的简介
流适合做什么:
- 内置 - Nodejs的很多内置模块
- HTTP - 网络技术
- 解析器 - xml、json等解析模块
- 浏览器 - 被客户端共享等模块
- Audio - 声音模块
- RPC - 进程间通信
- 测试 - 友好的测试工具库
- 控制、元、状态 - 对流对更加抽象应用
当处理大文件压缩、归档、媒体文件和巨大的日志文件等的时候,内存的使用就成为问题,无法一次性加载所有内容。这时候就需要应用流,配合一个合适的缓冲区,每次只处理部分数据
新版本和老版本中的流
- 0.10版本对流对API进行了重大更新,老版本继续兼容
- 新版本API语法更加严格,但是功能更加灵活
- 新版本模型中,从一个可读流开始,到一个可写流结束
新版流中的可用类
- stream.Readable - 用于在IO上获取数据
- stream.Writable - 用于在输出的目标写入数据
- stream.Duplex - 一个可读和可写的流,例如网络连接
- stream.Transform - 一个会以某种方式修改数据的双工流,没有输入数据要匹配输出数据的限制
内置流
通过内置的流来实现静态Web服务器
第一个版本:
const fs = require('fs')
const http = require('http')
http.createServer((req, res) => {
fs.readFile(__dirname + '/index.html', (err, data) => {
if (err) {
res.statusCode = 500
res.end(String(err))
} else {
res.end(data)
}
})
}).listen(8000)
这个版本存在一个问题,readFile将内容读取到内存后,返回给前台请求;当文件过大时适应性无法保证
第二个版本,数据通过管道输出到res请求响应:
const fs = require('fs')
const http = require('http')
http.createServer((req, res) => {
fs.createReadStream(__dirname + '/index.html').pipe(res)
}).listen(8000)
第三个版本,对返回数据进行gzip压缩:
const fs = require('fs')
const http = require('http')
const zlib = require('zlib')
http.createServer((req, res) => {
res.writeHead(200, {'content-encoding': 'gzip'})
fs.createReadStream(__dirname + '/index.html')
.pipe(zlib.createGzip())
.pipe(res)
}).listen(8000)
流的错误处理
流继承于事件类,意味着提供了标准明晰的错误处理方式。
const fs = require('fs')
const stream = fs.createReadStream('not-exist')
stream.on('error', (err) => {
console.trace()
console.error(err)
})
使用第三方模块
在Express中使用流
一个简单的express应用如下:
const express = require('express')
const app = express()
app.get('/', (req, res) => {
res.send('Hello world')
})
app.listen(3000)
res对象实际上是一个response对象,继承自http.ServerResponse,express集成了一种方式让res.send方法能够使用流。
使用流的express应用:
const express = require('express')
const stream = require('stream')
const util = require('util')
const app = express()
class StatStream extends stream.Readable {
constructor(limit) {
super()
this.limit = limit
}
_read(size) {
if (this.limit === 0) {
this.push(null) // push()无法终止流
} else {
this.push(util.inspect(process.memoryUsage()))
this.push('n')
this.limit--
}
}
}
app.get('/', (req, res) => {
const statStream = new StatStream(10)
statStream.pipe(res)
})
app.listen(3000)
使用流基类
正确的选择基类
- Readable - 包装一个底层的IO数据源
- Writable - 发送数据或者从程序中获取输出到其他地方使用
- Transform - 解析数据并修改
- Duplex - 包装一个数据源,并且可以接收消息
- PassThrough - 从流中提取数据,并且不修改它
实现一个可读流
- 继承stream.Readable
- 实现_read方法
- 调用push(null)结束流
代码参考上文“在Express中使用流”
实现一个可写流
- 继承stream.Writable
- 实现_write方法
const stream = require('stream')
class GreenStream extends stream.Writable {
_write(chunk, encoding, callback) {
process.stdout.write('u001b[32m' + chunk + 'u001b[39m')
callback()
}
}
process.stdin.pipe(new GreenStream())
使用双工流转换和接收数据
- 继承Duplex实现 ```js const stream = require(‘stream’)
class HungryStream extends stream.Duplex {
constructor() {
super()
this.waiting = false
}
_write(chunk, encoding, callback) {
process.stdout.write('u001b[32m' + chunk + 'u001b[39m')
callback()
}
_read(size) {
if (!this.waiting) {
process.stdout.write('Feed me >')
this.waiting = true
}
} }
process.stdin.pipe(new HungryStream()).pipe(process.stdout)
- 双工流的优势是处理管道之间的关系
### 使用转换流解析数据
- 流被长期用来实现高效的转换器
```js
const stream = require('stream')
const fs = require('fs')
class CSVParser extends stream.Transform {
constructor() {
super()
this.value = ''
this.headers = []
this.values = []
this.line = 0
}
addValue() {
if (this.line === 0) {
this.headers.push(this.value)
} else {
this.values.push(this.value)
}
this.value = ''
}
toObject() { // 当前行转换为json格式
const obj = {}
for (let i = 0; i < this.headers.length; i++) {
obj[this.headers[i]] = this.values[i]
}
return obj
}
_transform(chunk, encoding, done) {
chunk = chunk.toString()
for (let i = 0; i < chunk.length; i++) {
const c = chunk.charAt(i)
if (c === ',') { // 读取单元格结束
this.addValue()
} else if (c === 'n') { // 读完一行
this.addValue()
if (this.line > 0) { // 内容行,转换为json
this.push(JSON.stringify(this.toObject())) // 使用push方法推送到输出
}
this.values = [] // 清空内容缓存
this.line++
} else { // 继续读取单元格
this.value += c
}
}
}
}
const parser = new CSVParser()
fs.createReadStream(__dirname + '/sample.csv')
.pipe(parser)
.pipe(process.stdout)
- 输入内容自动进入transform方法
- 使用push方法推送到输出
高级模块与优化
流基类接受各种选项定制他们的行为,而且其中一些可以用来调整性能
创建一个流的基准测试
const fs = require('fs')
const zlib = require('zlib')
function benchStream(inSize, outSize) {
const time = process.hrtime() // 纳秒级起始事件
let watermark = process.memoryUsage().rss
const input = fs.createReadStream('/usr/share/dict/words', {
bufferSize: inSize // 设置读入缓冲区大小
})
const gzip = zlib.createGzip({
chunkSize: outSize // 设置压缩缓冲区大小
})
const output = fs.createWriteStream('out.gz', {bufferSize: inSize})
const memoryCheck = setInterval(() => {
const rss = process.memoryUsage().rss
if (rss > watermark) {
watermark = rss // 记录内存峰值
}
}, 50)
input.on('end', () => {
clearInterval(memoryCheck)
const diff = process.hrtime(time) // 计算执行时间
console.log([
inSize, outSize, (diff[0] * 1e9 + diff[1]) / 1e6, watermark / 1024
].join(','))
})
input.pipe(gzip).pipe(output) // 读入文件经过压缩后,写入结果
return input
}
console.log('file size, gzip size, ms, RSS')
let fileSize = 128
let zipSize = 5024
function run(times) {
benchStream(fileSize, zipSize).on('end', () => {
times--
fileSize *= 2 // 每次执行完成后,扩大缓冲区
zipSize *= 2
if (times > 0) { // 递归调用
run(times)
}
})
}
run(10)
- 随着缓冲区的增大,时间并未出现明显变化,但是峰值内存持续增加
测试流
- 用一些合适的样本数据来驱动流
- 调用read()或write()方法来获取结果
- 对比结果和预期
通过module.exports = CSVParser
将之前的csv解析器作为模块
const assert = require('assert')
const fs = require('fs')
const CSVParser = require('./csv-parser')
const parser = new CSVParser()
const actual = []
fs.createReadStream(__dirname + '/sample.csv').pipe(parser)
process.on('exit', () => {
actual.push(parser.read())
const expected = [
{} // 期望的结果
]
assert.deepEqual(expected, actual) // 对比结果
})