【Node.js】深入浅出 stream 模块
正好读到了相关的文章推送,正巧也把之前看到的和弄了一小半的资料整理一下。- - node 的学习又再此开启
让数据流动起来。数据从原来的 source 流向 dest,要像水一样,慢慢的一点一点通过一个管道流过去。stream 并不是 node.js 独有的概念,而是一个操作系统最基本的操作方式,只不过 node.js 有 API 支持这种操作方式。linux 命令的|就是 stream,因此所有 server 端语言都应该实现 stream 的 API。
为何要使用 stream
例子,在线播放视频。一点一点从服务端将视频流动到本地播放器,一边流动一边播放,直到播放完成。
var http = require('http')var fs = require('fs')var path = require('path')var server = http.createServer(function (req, res) { var fileName = path.resolve(__dirname, 'data.txt') fs.readFile(fileName, function (err, data) { res.end(data) })})server.listen(8000)这段 node.js 代码跑起来会读取文件,语法上没问题,但如果 data.txt 文件非常大,在响应大量用户的并发请求时,程序可能会消耗大量的内存,这样很可能会造成用户连接缓慢的问题。而且,如果并发请求过大,服务器内存开销也很大。
要解决该问题很简单,用 stream 改造一下。即不是把全部文件读取了再返回,而是一边读取一边返回,一点点地把数据流动到客户端。
var fs = require('fs')var http = require('http')var path = require('path')var server = http.createServer(function (req, res) { var fileName = path.resolve(__dirname, 'data.txt') var stream = fs.createReadStream(fileName) stream.pipe(res)})server.listen(8000)小结一下,之所以用 stream,是因为一次性读取、操作大文件,内存和网络是吃不消的,因此要让数据流动起来,一点一点地进行操作。这符合分而治之的思想。
stream 流转的过程
从管道换水的例子可看出,stream 包括 source, dest 还有中间的管道,下面将通过这三方面介绍 stream 的过程。其中比较关键的 api 有:
- data 事件,用来监听 stream 数据的输入
- end 事件,用来监听 stream 数据输入完成
- fs.createReadStream 方法,返回一个文件读取的 stream 对象
- fs.createWriteStream 方法, 返回一个文件读取的 stream 对象
- pipe 方法,用来做数据流转
source —— 从哪里来
stream 常见的来源主要有三种:
- 从控制台输入
- http 请求中的 request
- 读取文件
运行如下代码,然后从控制台输入任何内容,都会被 data 事件监听到,process.stdin 就是一个 stream 对象。注意data就是stream用来监听数据传入的一个自定义函数,后续会大量用到该方法。
process.stdin.on('data', function (chunk) { console.log('stream by stdin', chunk.toString())})http 请求中的 request 输入可以参考如下代码片段。即客户端发起 http 请求,服务端可以通过这种方式(用到了 data 事件监听)。这种 http 请求一般是一个 post 请求,上传数据。注意,end 用来监听 stream 数据传输完毕,一般和 data 共用。
req.on('data', function (chunk) { // 一点一点 接受内容 data += chunk.toString()})res.on('end', function () { // end表示数据接受完成})读取文件用 fs.createReadStream(…) 可以返回一个读取文件的 stream 对象,该对象可以监听 data 和 end 事件
var fs = require('fs')var readStream = fs.createReadStream('./file1.txt')var length = 0readStream.on('data', function (chunk) { length += chunk.toString().length})readStream.on('end', function () { console.log(length)})管道
以上 source 三种代码示例有一个共同特点,就是对 stream 对象可以监听 data 和 end 事件。nodejs 监听自定义事件要使用.on 方法,例如 process.stdin.on(‘data’, …) ,能很直观地监听到 stream 数据的传入和结束。
dest —— 到哪里去
stream 常见的输出方式主要有三种:
- 输出到控制台
- http 请求中的 response
- 写入文件
如果让控制台输入这个 source 直接通过管道连接到控制台输入,即让数据从输入直接流向输出,代码如下:
process.stdin.pipe(process.stdout)nodejs 处理 http 请求时会用到 req 和 res,其实这两者都是 stream 对象。其中,req 是 source,res 是 dest。所以 stream 方式读取文件然后直接返回 http 请求
var stream = fs.createReadStream(fileName)stream.pipe(res)读取文件可以用 stream,写入文件也可以用 stream,其中 fs.createWriteStream(…) 会返回一个写入文件的 stream 对象,即 dest。这段代码,就是将一个文件中的内容,一点一点地流动到另外的文件中,完成复制功能。
var fs = require('fs')var readStream = fs.createReadStream('./file1.txt')var writeStrea = fs.createWriteStream('./file2.txt')readStream.pipe(writeStream)stream 常见使用场景
stream 常见的使用场景是 http 请求和文件操作。 总结来看,http 请求和文件操作都属于 IO,即 stream 主要的应用场景是处理 IO,这又回到了 stream 的本质——由于一次性 IO 操作过大,硬件开销太多,影响软件运行效率,因此将 IO 分批分段操作,让数据一点一点地流动起来,直到操作完成。
小结
本章主要介绍了 stream 的基本概念和常用 API
- stream 的基本概念,即 source -> 管道 -> dest
- 为何要用 stream —— 一次性操作 IO,内存和网络开销过大
- source pipe dest 各部分常用 API
- stream 的常见应用场景——IO 操作
node.js 实现 http 请求
var http = require('http')var fs = require('fs')var path = require('path')
var server = http.createServer(function (req, res) { var fileName = path.resolve(__dirname, 'data.txt') fs.readFile(fileName, function (err, data) { res.end(data) })})server.listen(8000)get 请求和 response
通过 req.method 可获取请求方法
var http = require('http')var path = require('path')var fs = require('fs')
var server = http.createServer(function (req, res) { var method = req.method if (method === 'GET') { var fileName = path.resolve(__dirname, 'data.txt') fs.readFile(fileName, function (err, data) { res.end(data) }) }})server.listen(8000)response 和 stream
response 常用的 API 有 send、end 等,如上面代码中的res.end(data),但是 response 也是一个 stream 对象。大家再次回顾一开始的管道换水的图,以及 source.pipe(dest)模型,response 就是一个 dest
var http = require('http')var path = require('path')var fs = require('fs')
var server = http.createServer(function (req, res) { var method = req.method if (method === 'GET') { var fileName = path.resolve(__dirname, 'data.txt') var stream = fs.createReadStream(fileName) stream.pipe(res) }})
server.listen(8000)使用 stream 对性能的提升
略
### 实际应用
对 response 使用 stream 特性能提高性能。因此,在 nodejs 中如果要返回的数据是经过 IO 操作得来的,例如上面例子中读取文件内容,可以直接使用 stream.pipe(res)这种方式,而不再使用 res.end(data)了。
这种应用的实例很多,主要有两种场景:
- 使用 node.js 作为服务代理,即客户端通过 node.js 服务作为跳板去请求其他服务,返回请求的内容
- 使用 node.js 作为静态文件服务器,直接返回静态文件
总结
本节主要讲解了 node.js 如何处理 http 的 get 请求,以及如何对 response 使用 stream 特性,并做了压力测试证明可以提高性能。
在 http post 请求中使用 stream
var http = require('http')var path = require('path')var fs = require('fs')
var server = http.createServer(function (req, res) { var method = req.method if (method === 'POST') { req.on('data', function (chunk) { // 接受到部分数据 console.log('chunk', chunk.toString().length) }) req.on('end', function () { console.log('end') res.end('ok') }) }})server.listen(8000)post 请求发送数据量若很大, res.on(‘data’, …) 要分多次才能把数据接受完毕
小结一下,request 和 response 一样,本身也是一个 stream 对象,可以用 stream 的特性,那肯定也能提高性能。两者的区别在于,request 是 source 类型,是 stream 的源头,而 response 是 dest 类型,是 stream 的目的地。
再举个例子,如果要把 request 请求的数据直接 response,那么最快的方式就是 res.pipe(res)
使用 stream 对性能的提升
var http = require('http')var path = require('path')var fs = require('fs')
var server = http.createServer(function (req, res) { var method = req.method if (method === 'POST') { var dataStr = '' req.on('data', function (chunk) { var chunkStr = chunk.toString() dataStr += chunkStr }) res.on('end', function () { var fileName = path.resolve(__dirname, 'data.txt') fs.writeFile(fileName, dataStr) res.end('ok') }) }})
server.listen(8000)用 stream 改良后如下:
var server = http.createServer(function (req, res) { var method = req.method if (method === 'POST') { var dataStr = '' req.on('data', function (chunk) { var chunkStr = chunk.toString() dataStr += chunkStr }) res.on('end', function () { var fileName = path.resolve(__dirname, 'data.txt') var writeStream = fs.createWriteStream(fileName) res.pipe(writeStream) req.on('end', function () { res.end('ok') }) }) }})实际应用
和 get 请求使用 stream 场景类似,post 请求使用 stream 的场景,主要是用于将接受的数据直接进行 IO 操作,例如:
- 将接收的数据直接存储为文件
- 将接收的数据直接 post 给其他的 web server
小结
介绍了 stream 在 http 请求中的应用和性能提升,IO 操作不仅仅包括网络 IO,还包括文件 IO,下一节讲解 stream 在文件操作中的使用,以及性能提升。
node.js 读写文件
var fs = require('fs')var path = require('path')var fileName = path.resolve(__dirname, 'data.txt')// 读文件fs.readFile(fileName, function (err, data) { if (err) return console.log(data.toString())})// 写文件fs.writeFile(fileName, 'xxx', function (err) { if (err) return console.log('写入成功')})根据以上读写操作,可以做一个简单的文件拷贝程序,将 data.txt 中的内容拷贝到 data-bak.txt 中
var fs = require('fs')var path = require('path')
// 读文件var fileName1 = path.resolve(__dirname, 'data.txt')fs.readFile(fileName1, function (err, data) { if (err) return var dataStr = data.toString() // 写入文件 var fileName2 = path.resolve(__dirname, 'data-bak.txt') fs.writeFile(fileName2, dataStr, function (err) { if (err) return console.log('拷贝文件成功') })})使用 stream 读写文件
- 使用 fs.cretaeReadStream(filename) 来创建读取文件的 stream 对象
- 使用 fs.createWriteStream(filename) 来创建写入文件的 stream 对象
var fs = require('fs')var path = require('path')
var filename1 = path.resolve(__dirname, 'data.txt')var filename2 = path.resolve(__dirname, 'data-bak.txt')var readStream = fs.createReadStream(filename1)var writeStream = fs.createWriteStream(filename2)readStream.pipe(writeStream)readStream.on('end', function () { console.log('拷贝完成')})使用 stream 带来的性能提升
略
应用场景
所有执行文件操作的场景,都应该尝试使用 stream,例如文件的读写、拷贝、解压缩、格式转换等。除非是体积小且读写次数少,性能上可忽略。
原生的 stream 对“行”无能为力,它只是把文件当作一个数据流,简单粗暴地流动。很多文件格式都是分行的,例如 csv 文件、日志文件等
node.js 提供了按行读取 API——readline,它本质上也是 stream,只不过是以“行”作为数据流动的单位
readline 的使用
相比于 stream 的 data 和 end 自定义事件,readline 需要监听 line 和 close 两个自定义事件。readline 的基本使用示例如下:
var fs = require('fs')var path = requrie('path')var readline = require('readline')
var fileName = path.resolve(__dirname, 'readline-data.txt')// 创建读取文件的stream对象var readStream = fs.createReadStream(fileName)// 创建readline对象var rl = readline.createInterface({ // 输入,依赖于stream对象 input: readStream,})// 监听逐行读取的内容rl.on('line', function (lineData) { console.log('lineData: ', lineData)})// 监听读取完成rl.on('close', function () { console.log('readline end')})以上代码,需要先根据文件名,创建读取文件的 stream 对象,然后传入并生成一个 readline 对象,然后通过 line 事件监听逐行读取,通过 close 事件监听读取完成。
应用场景
对于处理按行为单位的文件,如日志文件,使用 readline 是最佳选择。下面是一个实际例子,用来记录访问数:
var num = 0// 监听逐行读取的内容rl.on('line', function (lineData) { if ( lineData.indexOf('2018-10-30 14:00') >= 0 && lineData.indexOf('user.html') >= 0 ) { num++ }})// 监听读取完成rl.on('close', function () { console.log('num: ', num)})最后,将这个示例所用的代码贴到下面,供学习参考:
var fs = require('fs')var path = require('path')var memeye = require('memeye')var readline = require('readline')
memeye()
function doReadLine() { var fileName = path.resolve(__dirname, 'readline-data.txt') var readStream = fs.createReadStream(fileName) var rl = readline.createInterface({ input: readStream, })
var num = 0 rl.on('line', function (lineData) { if ( lineData.indexOf('2018-10-30 14:00') >= 0 && lineData.indexOf('user.html') >= 0 ) { num++ } }) // 监听读取完成 rl.on('close', function () { console.log('num: ', num) })}
setTimeout(doReadLine, 5000)stream 就是数据一点一点地流动起来,那么每次流动的数据是什么呢?
var readStream = fs.createReadStream('./file.txt')readStream.on('data', function (chunk) { // ...})二进制
冯诺依曼结构,其核心内容之一就是:计算机使用二进制形式存储计算。
计算机内存由若干个存储单元组成,每个存储单元只能存储 0 或 1(因为内存是硬件,计算机硬件本质上就是一个一个的电子元件,只能识别充电和放电的状态,充电代表 1,放电代表 0),即二进制单元(bit)。但是这一个单元所能存储的信息太少,因此约定将 8 个二进制单元为一个基本存储单元,叫做字节(byte)。一个字节所能存储的最大整数就是(2^8 = 256),也正好是 16^2,因此也常常使用两位的 16 进制数代表一个字节。例如 css 常见的颜色值就是 6 位 16 进制数字,它占用 3 个字节的空间。
二进制是计算机最底层的数据格式,也是一种通用格式。计算机中的任何数据格式,字符串、数字、视频、音频、程序、网络包等,在最底层都是用二进制来进行存储的。这些高级格式和二进制之间,都可通过固定的编码格式进行相互转换。例如,C 语言中 int32 类型的十进制数,就占用 32bit 即 4byte。总之,计算机底层存储的数据都是二进制格式,各种高级类型都有对应的编码规则,和二进制进行相互转化。
nodejs 表示二进制
Buffer 就是 nodejs 中二进制的表述形式。
var str = '学习nodejs stream'var buf = Buffer.from(str, 'utf-8')以上代码,先通过 Buffer.from 将一段字符串转化为二进制形式,其中 utf-8 是一个编码规则。二进制打印出来之后是一个类数组的对象,每个元素都是两位的 16 进制数字,即代表一个 byte,打印出来的 buf 一共有 20byte。即根据 utf-8 的编码规则,这段字符串需要 20byte 进行存储,最后再通过 utf-8 规则将二进制转换为字符串并打印出来
流动的数据是二进制格式
var readStrem = fs.createReadStream('./file.txt')readStream.on('data', function (chunk) { console.log(chunk instanceof Buffer) console.log('chunk: ', chunk)})可以看到 stream 中流动的数据就是 Buffer 类型。因此,在使用 stream chunk 时,需要将这些二进制数据转换为相应的格式。例如之前讲解 post 请求是,从 request 中接收数据就是这样。再回归一下之前的代码,就能明白了。
var dataStr = ''req.on('data', function (chunk) { // 将二进制数据先转化为字符串 var chunkStr = chunk.toString() dataStr += chunkStr})stream 中为何要“流动”二进制格式的数据呢?
为了优化 IO 操作。无论是文件 IO 还是网络 IO,其中包含的数据格式是未知的,如字符串、音频、视频、网络包等。即便这些字符串,其编码规则也是未知的,如 ASC 编码、utf-8 编码。再这么多未可知的情况下,只能是以不变应万变,直接用最通过的二进制格式,谁都能认识,且二进制格式进行流动和传输,效率是最高的。
Buffer 带来的性能提升
略
总结
- 二进制和字节的基本认识
- node.js 中 Buffer 表示二进制
- stream 中的 chunk 是二进制格式,以及和字符串格式的转换
- 二进制格式在 http 请求中的性能提升
stream 常用类型总结
再次回顾这张图 source 通过一个管道流向了 dest,如下图:
这里的 source 可能是 http 请求中的 request,也可能是读取文件的 stream 对象,也可能是 process.stdin;这里的 dest 可能是请求中的 response,也可能是写入文件的 stream 对象,也可能是 process.stdout;这里的管道就是 pipe 函数。
先不管 pipe 函数。source 和 dest 完全就是两个不同的类型,一个是读取数据的,叫做 readable stream,一个是写入数据的,叫作 Writeable stream。除了这两种类型之外,还有一种类型叫作 duplex stream(双工流),即有读取又有写入能力。示例代码如下:
var fs = require('fs')var zlib = require('zlib')var readStream = fs.createReadStream('./file.txt')var writeStream = fs.createWriteStream('./file.txt')readStream.pipe(zlib.createGzip()).pipe(writeStream)readable stream
http 请求中的 request 和读取文件的 stream 对象都是 readable stream。它有两种常用操作方式,第一种是直接将数据 pipe 到一个 Writeable stream
var fileName = path.resolve(__dirname, 'data.txt')var stream = fs.createReadStream(fileName)//stream.pipe(res)第二种是通过监听 on end 自定义事件来获取数据再手动处理,例如之前讲解 post 请求时的代码示例
var dataStr = ''req.on('data', function (chunk) { // 接收到数据先存储起来 var chunkStr = chunk.toString() dataStr += chunkStr})req.on('end', function () { // 接收完数据将数据写入文件 var fileName = path.resolve(__dirname, 'post.txt') fs.writeFile(fileName, dataStr) res.end('OK')})以上说的两个例子,都是已经分装好的 readable stream,那么它本来的面目是怎样的?如下代码:
var Readable = require('stream').Readable// 构造一个readable stream并往里添数据var rs = new Readable()rs.push('learn')rs.push('nodejs')rs.push('stream')rs.push(null)// pipe到一个Writeable streamrs.pipe(process.stdout)从上代码可看出,nodejs 提供了 readable stream 的构造函数,可以 new 出一个新的 readable stream 对象。然后通过 push 函数往里灌入完成,即可输入了。最后 pipe 到了一个 Writeable stream
Writeable stream
根据之前的分析,http 请求中的 response 和写入文件的 stream 对象都是 Writeable stream,它可以作为参数传入 pipe 函数,以读取上游的数据。例如之前讲解文件操作时拷贝文件的代码示例。
var fileName = path.resolve(__dirname, 'post.txt')var writeStream = fs.createWriteStream(fileName)res.pipe(writeStream)以上代码中 writeStream 是已经封装好了的 Writeable stream ,下面再来看看它的真实面目。
var Writeable = require('stream').Writeable
var ws = Writeable()ws._write = function (chunk, enc, next) { // 输出流动的数据 console.log(chunk.toString()) // 继续监听下一次输出 next()}// 作为参数传递到pipe函数中process.stdin.pipe(ws)根据以上代码得知,nodejs 提供了 Writeable 构造函数可以 new 一个新的 Writeable stream。通过实现它的_write 方法即可监听到每次流动的数据,运行 next()可继续监听.
再谈 pipe
之前一直是用source.pipe(dest)这种模式来用 pipe 的,其实 pipe 可以链式调用。例如上文演示的 duplex stream 示例代码readStream.pipe(zlib.createGzip()).pipe(writeStream),还有之前讲解文件操作最后列举的 gulp 配置文件~
之前讲解source.pipe(dest)模式是为了方便理解和使用,现在我们更新一个更严谨的 pipe 用法:
- 调用 pipe 的对象必须是 readable stream 或者 duplex stream,即具有读取数据的功能,如 req.pipe(…)
- 传入 pipe 的参数必须是 writeable stream 或者 duplex stream, 即具有写入数据的功能,如 req.pipe(res)
- pipe 支持链式调用
更新了 pipe 的最新规则,再来看就不会有困惑了。
小结
这里主要讲解了 stream 的常用类型和 pipe 函数的规则:
- stream 的常见类型:readable stream 和 writeable stream
- readable stream 的本质和用法
- writeable stream 的本质和用法
- pipe 的新规则
支持与分享
如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!