【Node.js】深入浅出 stream 模块

4710 字
24 分钟
【Node.js】深入浅出 stream 模块

正好读到了相关的文章推送,正巧也把之前看到的和弄了一小半的资料整理一下。- - node 的学习又再此开启

让数据流动起来。数据从原来的 source 流向 dest,要像水一样,慢慢的一点一点通过一个管道流过去。stream 并不是 node.js 独有的概念,而是一个操作系统最基本的操作方式,只不过 node.js 有 API 支持这种操作方式。linux 命令的|就是 stream,因此所有 server 端语言都应该实现 stream 的 API。

为何要使用 stream#

例子,在线播放视频。一点一点从服务端将视频流动到本地播放器,一边流动一边播放,直到播放完成。

var http = require('http')
var fs = require('fs')
var path = require('path')
var server = http.createServer(function (req, res) {
var fileName = path.resolve(__dirname, 'data.txt')
fs.readFile(fileName, function (err, data) {
res.end(data)
})
})
server.listen(8000)

这段 node.js 代码跑起来会读取文件,语法上没问题,但如果 data.txt 文件非常大,在响应大量用户的并发请求时,程序可能会消耗大量的内存,这样很可能会造成用户连接缓慢的问题。而且,如果并发请求过大,服务器内存开销也很大。

要解决该问题很简单,用 stream 改造一下。即不是把全部文件读取了再返回,而是一边读取一边返回,一点点地把数据流动到客户端。

var fs = require('fs')
var http = require('http')
var path = require('path')
var server = http.createServer(function (req, res) {
var fileName = path.resolve(__dirname, 'data.txt')
var stream = fs.createReadStream(fileName)
stream.pipe(res)
})
server.listen(8000)

小结一下,之所以用 stream,是因为一次性读取、操作大文件,内存和网络是吃不消的,因此要让数据流动起来,一点一点地进行操作。这符合分而治之的思想。

stream 流转的过程#

从管道换水的例子可看出,stream 包括 source, dest 还有中间的管道,下面将通过这三方面介绍 stream 的过程。其中比较关键的 api 有:

  • data 事件,用来监听 stream 数据的输入
  • end 事件,用来监听 stream 数据输入完成
  • fs.createReadStream 方法,返回一个文件读取的 stream 对象
  • fs.createWriteStream 方法, 返回一个文件读取的 stream 对象
  • pipe 方法,用来做数据流转

source —— 从哪里来#

stream 常见的来源主要有三种:

  • 从控制台输入
  • http 请求中的 request
  • 读取文件

运行如下代码,然后从控制台输入任何内容,都会被 data 事件监听到,process.stdin 就是一个 stream 对象。注意data就是stream用来监听数据传入的一个自定义函数,后续会大量用到该方法。

process.stdin.on('data', function (chunk) {
console.log('stream by stdin', chunk.toString())
})

http 请求中的 request 输入可以参考如下代码片段。即客户端发起 http 请求,服务端可以通过这种方式(用到了 data 事件监听)。这种 http 请求一般是一个 post 请求,上传数据。注意,end 用来监听 stream 数据传输完毕,一般和 data 共用。

req.on('data', function (chunk) {
// 一点一点 接受内容
data += chunk.toString()
})
res.on('end', function () {
// end表示数据接受完成
})

读取文件用 fs.createReadStream(…) 可以返回一个读取文件的 stream 对象,该对象可以监听 data 和 end 事件

var fs = require('fs')
var readStream = fs.createReadStream('./file1.txt')
var length = 0
readStream.on('data', function (chunk) {
length += chunk.toString().length
})
readStream.on('end', function () {
console.log(length)
})

管道#

以上 source 三种代码示例有一个共同特点,就是对 stream 对象可以监听 data 和 end 事件。nodejs 监听自定义事件要使用.on 方法,例如 process.stdin.on(‘data’, …) ,能很直观地监听到 stream 数据的传入和结束。

dest —— 到哪里去#

stream 常见的输出方式主要有三种:

  • 输出到控制台
  • http 请求中的 response
  • 写入文件

如果让控制台输入这个 source 直接通过管道连接到控制台输入,即让数据从输入直接流向输出,代码如下:

process.stdin.pipe(process.stdout)

nodejs 处理 http 请求时会用到 req 和 res,其实这两者都是 stream 对象。其中,req 是 source,res 是 dest。所以 stream 方式读取文件然后直接返回 http 请求

var stream = fs.createReadStream(fileName)
stream.pipe(res)

读取文件可以用 stream,写入文件也可以用 stream,其中 fs.createWriteStream(…) 会返回一个写入文件的 stream 对象,即 dest。这段代码,就是将一个文件中的内容,一点一点地流动到另外的文件中,完成复制功能。

var fs = require('fs')
var readStream = fs.createReadStream('./file1.txt')
var writeStrea = fs.createWriteStream('./file2.txt')
readStream.pipe(writeStream)

stream 常见使用场景#

stream 常见的使用场景是 http 请求和文件操作。 总结来看,http 请求和文件操作都属于 IO,即 stream 主要的应用场景是处理 IO,这又回到了 stream 的本质——由于一次性 IO 操作过大,硬件开销太多,影响软件运行效率,因此将 IO 分批分段操作,让数据一点一点地流动起来,直到操作完成。

小结#

本章主要介绍了 stream 的基本概念和常用 API

  • stream 的基本概念,即 source -> 管道 -> dest
  • 为何要用 stream —— 一次性操作 IO,内存和网络开销过大
  • source pipe dest 各部分常用 API
  • stream 的常见应用场景——IO 操作

node.js 实现 http 请求#

var http = require('http')
var fs = require('fs')
var path = require('path')
var server = http.createServer(function (req, res) {
var fileName = path.resolve(__dirname, 'data.txt')
fs.readFile(fileName, function (err, data) {
res.end(data)
})
})
server.listen(8000)

get 请求和 response#

通过 req.method 可获取请求方法

var http = require('http')
var path = require('path')
var fs = require('fs')
var server = http.createServer(function (req, res) {
var method = req.method
if (method === 'GET') {
var fileName = path.resolve(__dirname, 'data.txt')
fs.readFile(fileName, function (err, data) {
res.end(data)
})
}
})
server.listen(8000)

response 和 stream#

response 常用的 API 有 send、end 等,如上面代码中的res.end(data),但是 response 也是一个 stream 对象。大家再次回顾一开始的管道换水的图,以及 source.pipe(dest)模型,response 就是一个 dest

var http = require('http')
var path = require('path')
var fs = require('fs')
var server = http.createServer(function (req, res) {
var method = req.method
if (method === 'GET') {
var fileName = path.resolve(__dirname, 'data.txt')
var stream = fs.createReadStream(fileName)
stream.pipe(res)
}
})
server.listen(8000)

使用 stream 对性能的提升#

### 实际应用

对 response 使用 stream 特性能提高性能。因此,在 nodejs 中如果要返回的数据是经过 IO 操作得来的,例如上面例子中读取文件内容,可以直接使用 stream.pipe(res)这种方式,而不再使用 res.end(data)了。

这种应用的实例很多,主要有两种场景:

  • 使用 node.js 作为服务代理,即客户端通过 node.js 服务作为跳板去请求其他服务,返回请求的内容
  • 使用 node.js 作为静态文件服务器,直接返回静态文件

总结#

本节主要讲解了 node.js 如何处理 http 的 get 请求,以及如何对 response 使用 stream 特性,并做了压力测试证明可以提高性能。

在 http post 请求中使用 stream#

var http = require('http')
var path = require('path')
var fs = require('fs')
var server = http.createServer(function (req, res) {
var method = req.method
if (method === 'POST') {
req.on('data', function (chunk) {
// 接受到部分数据
console.log('chunk', chunk.toString().length)
})
req.on('end', function () {
console.log('end')
res.end('ok')
})
}
})
server.listen(8000)

post 请求发送数据量若很大, res.on(‘data’, …) 要分多次才能把数据接受完毕

小结一下,request 和 response 一样,本身也是一个 stream 对象,可以用 stream 的特性,那肯定也能提高性能。两者的区别在于,request 是 source 类型,是 stream 的源头,而 response 是 dest 类型,是 stream 的目的地。

再举个例子,如果要把 request 请求的数据直接 response,那么最快的方式就是 res.pipe(res)

使用 stream 对性能的提升#

var http = require('http')
var path = require('path')
var fs = require('fs')
var server = http.createServer(function (req, res) {
var method = req.method
if (method === 'POST') {
var dataStr = ''
req.on('data', function (chunk) {
var chunkStr = chunk.toString()
dataStr += chunkStr
})
res.on('end', function () {
var fileName = path.resolve(__dirname, 'data.txt')
fs.writeFile(fileName, dataStr)
res.end('ok')
})
}
})
server.listen(8000)

用 stream 改良后如下:

var server = http.createServer(function (req, res) {
var method = req.method
if (method === 'POST') {
var dataStr = ''
req.on('data', function (chunk) {
var chunkStr = chunk.toString()
dataStr += chunkStr
})
res.on('end', function () {
var fileName = path.resolve(__dirname, 'data.txt')
var writeStream = fs.createWriteStream(fileName)
res.pipe(writeStream)
req.on('end', function () {
res.end('ok')
})
})
}
})

实际应用#

和 get 请求使用 stream 场景类似,post 请求使用 stream 的场景,主要是用于将接受的数据直接进行 IO 操作,例如:

  • 将接收的数据直接存储为文件
  • 将接收的数据直接 post 给其他的 web server

小结#

介绍了 stream 在 http 请求中的应用和性能提升,IO 操作不仅仅包括网络 IO,还包括文件 IO,下一节讲解 stream 在文件操作中的使用,以及性能提升。


node.js 读写文件#

var fs = require('fs')
var path = require('path')
var fileName = path.resolve(__dirname, 'data.txt')
// 读文件
fs.readFile(fileName, function (err, data) {
if (err) return
console.log(data.toString())
})
// 写文件
fs.writeFile(fileName, 'xxx', function (err) {
if (err) return
console.log('写入成功')
})

根据以上读写操作,可以做一个简单的文件拷贝程序,将 data.txt 中的内容拷贝到 data-bak.txt 中

var fs = require('fs')
var path = require('path')
// 读文件
var fileName1 = path.resolve(__dirname, 'data.txt')
fs.readFile(fileName1, function (err, data) {
if (err) return
var dataStr = data.toString()
// 写入文件
var fileName2 = path.resolve(__dirname, 'data-bak.txt')
fs.writeFile(fileName2, dataStr, function (err) {
if (err) return
console.log('拷贝文件成功')
})
})

使用 stream 读写文件#

  • 使用 fs.cretaeReadStream(filename) 来创建读取文件的 stream 对象
  • 使用 fs.createWriteStream(filename) 来创建写入文件的 stream 对象
var fs = require('fs')
var path = require('path')
var filename1 = path.resolve(__dirname, 'data.txt')
var filename2 = path.resolve(__dirname, 'data-bak.txt')
var readStream = fs.createReadStream(filename1)
var writeStream = fs.createWriteStream(filename2)
readStream.pipe(writeStream)
readStream.on('end', function () {
console.log('拷贝完成')
})

使用 stream 带来的性能提升#

应用场景#

所有执行文件操作的场景,都应该尝试使用 stream,例如文件的读写、拷贝、解压缩、格式转换等。除非是体积小且读写次数少,性能上可忽略。


原生的 stream 对“行”无能为力,它只是把文件当作一个数据流,简单粗暴地流动。很多文件格式都是分行的,例如 csv 文件、日志文件等

node.js 提供了按行读取 API——readline,它本质上也是 stream,只不过是以“行”作为数据流动的单位

readline 的使用#

相比于 stream 的 data 和 end 自定义事件,readline 需要监听 line 和 close 两个自定义事件。readline 的基本使用示例如下:

var fs = require('fs')
var path = requrie('path')
var readline = require('readline')
var fileName = path.resolve(__dirname, 'readline-data.txt')
// 创建读取文件的stream对象
var readStream = fs.createReadStream(fileName)
// 创建readline对象
var rl = readline.createInterface({
// 输入,依赖于stream对象
input: readStream,
})
// 监听逐行读取的内容
rl.on('line', function (lineData) {
console.log('lineData: ', lineData)
})
// 监听读取完成
rl.on('close', function () {
console.log('readline end')
})

以上代码,需要先根据文件名,创建读取文件的 stream 对象,然后传入并生成一个 readline 对象,然后通过 line 事件监听逐行读取,通过 close 事件监听读取完成。

应用场景#

对于处理按行为单位的文件,如日志文件,使用 readline 是最佳选择。下面是一个实际例子,用来记录访问数:

var num = 0
// 监听逐行读取的内容
rl.on('line', function (lineData) {
if (
lineData.indexOf('2018-10-30 14:00') >= 0 &&
lineData.indexOf('user.html') >= 0
) {
num++
}
})
// 监听读取完成
rl.on('close', function () {
console.log('num: ', num)
})

最后,将这个示例所用的代码贴到下面,供学习参考:

var fs = require('fs')
var path = require('path')
var memeye = require('memeye')
var readline = require('readline')
memeye()
function doReadLine() {
var fileName = path.resolve(__dirname, 'readline-data.txt')
var readStream = fs.createReadStream(fileName)
var rl = readline.createInterface({
input: readStream,
})
var num = 0
rl.on('line', function (lineData) {
if (
lineData.indexOf('2018-10-30 14:00') >= 0 &&
lineData.indexOf('user.html') >= 0
) {
num++
}
})
// 监听读取完成
rl.on('close', function () {
console.log('num: ', num)
})
}
setTimeout(doReadLine, 5000)

stream 就是数据一点一点地流动起来,那么每次流动的数据是什么呢?

var readStream = fs.createReadStream('./file.txt')
readStream.on('data', function (chunk) {
// ...
})

二进制#

冯诺依曼结构,其核心内容之一就是:计算机使用二进制形式存储计算。

计算机内存由若干个存储单元组成,每个存储单元只能存储 0 或 1(因为内存是硬件,计算机硬件本质上就是一个一个的电子元件,只能识别充电和放电的状态,充电代表 1,放电代表 0),即二进制单元(bit)。但是这一个单元所能存储的信息太少,因此约定将 8 个二进制单元为一个基本存储单元,叫做字节(byte)。一个字节所能存储的最大整数就是(2^8 = 256),也正好是 16^2,因此也常常使用两位的 16 进制数代表一个字节。例如 css 常见的颜色值就是 6 位 16 进制数字,它占用 3 个字节的空间。

二进制是计算机最底层的数据格式,也是一种通用格式。计算机中的任何数据格式,字符串、数字、视频、音频、程序、网络包等,在最底层都是用二进制来进行存储的。这些高级格式和二进制之间,都可通过固定的编码格式进行相互转换。例如,C 语言中 int32 类型的十进制数,就占用 32bit 即 4byte。总之,计算机底层存储的数据都是二进制格式,各种高级类型都有对应的编码规则,和二进制进行相互转化。

nodejs 表示二进制#

Buffer 就是 nodejs 中二进制的表述形式。

var str = '学习nodejs stream'
var buf = Buffer.from(str, 'utf-8')

以上代码,先通过 Buffer.from 将一段字符串转化为二进制形式,其中 utf-8 是一个编码规则。二进制打印出来之后是一个类数组的对象,每个元素都是两位的 16 进制数字,即代表一个 byte,打印出来的 buf 一共有 20byte。即根据 utf-8 的编码规则,这段字符串需要 20byte 进行存储,最后再通过 utf-8 规则将二进制转换为字符串并打印出来

流动的数据是二进制格式#

var readStrem = fs.createReadStream('./file.txt')
readStream.on('data', function (chunk) {
console.log(chunk instanceof Buffer)
console.log('chunk: ', chunk)
})

可以看到 stream 中流动的数据就是 Buffer 类型。因此,在使用 stream chunk 时,需要将这些二进制数据转换为相应的格式。例如之前讲解 post 请求是,从 request 中接收数据就是这样。再回归一下之前的代码,就能明白了。

var dataStr = ''
req.on('data', function (chunk) {
// 将二进制数据先转化为字符串
var chunkStr = chunk.toString()
dataStr += chunkStr
})

stream 中为何要“流动”二进制格式的数据呢?

为了优化 IO 操作。无论是文件 IO 还是网络 IO,其中包含的数据格式是未知的,如字符串、音频、视频、网络包等。即便这些字符串,其编码规则也是未知的,如 ASC 编码、utf-8 编码。再这么多未可知的情况下,只能是以不变应万变,直接用最通过的二进制格式,谁都能认识,且二进制格式进行流动和传输,效率是最高的。

Buffer 带来的性能提升#

总结#

  • 二进制和字节的基本认识
  • node.js 中 Buffer 表示二进制
  • stream 中的 chunk 是二进制格式,以及和字符串格式的转换
  • 二进制格式在 http 请求中的性能提升

stream 常用类型总结#

再次回顾这张图 source 通过一个管道流向了 dest,如下图:

这里的 source 可能是 http 请求中的 request,也可能是读取文件的 stream 对象,也可能是 process.stdin;这里的 dest 可能是请求中的 response,也可能是写入文件的 stream 对象,也可能是 process.stdout;这里的管道就是 pipe 函数。

先不管 pipe 函数。source 和 dest 完全就是两个不同的类型,一个是读取数据的,叫做 readable stream,一个是写入数据的,叫作 Writeable stream。除了这两种类型之外,还有一种类型叫作 duplex stream(双工流),即有读取又有写入能力。示例代码如下:

var fs = require('fs')
var zlib = require('zlib')
var readStream = fs.createReadStream('./file.txt')
var writeStream = fs.createWriteStream('./file.txt')
readStream.pipe(zlib.createGzip()).pipe(writeStream)

readable stream#

http 请求中的 request 和读取文件的 stream 对象都是 readable stream。它有两种常用操作方式,第一种是直接将数据 pipe 到一个 Writeable stream

var fileName = path.resolve(__dirname, 'data.txt')
var stream = fs.createReadStream(fileName)
//
stream.pipe(res)

第二种是通过监听 on end 自定义事件来获取数据再手动处理,例如之前讲解 post 请求时的代码示例

var dataStr = ''
req.on('data', function (chunk) {
// 接收到数据先存储起来
var chunkStr = chunk.toString()
dataStr += chunkStr
})
req.on('end', function () {
// 接收完数据将数据写入文件
var fileName = path.resolve(__dirname, 'post.txt')
fs.writeFile(fileName, dataStr)
res.end('OK')
})

以上说的两个例子,都是已经分装好的 readable stream,那么它本来的面目是怎样的?如下代码:

var Readable = require('stream').Readable
// 构造一个readable stream并往里添数据
var rs = new Readable()
rs.push('learn')
rs.push('nodejs')
rs.push('stream')
rs.push(null)
// pipe到一个Writeable stream
rs.pipe(process.stdout)

从上代码可看出,nodejs 提供了 readable stream 的构造函数,可以 new 出一个新的 readable stream 对象。然后通过 push 函数往里灌入完成,即可输入了。最后 pipe 到了一个 Writeable stream

Writeable stream#

根据之前的分析,http 请求中的 response 和写入文件的 stream 对象都是 Writeable stream,它可以作为参数传入 pipe 函数,以读取上游的数据。例如之前讲解文件操作时拷贝文件的代码示例。

var fileName = path.resolve(__dirname, 'post.txt')
var writeStream = fs.createWriteStream(fileName)
res.pipe(writeStream)

以上代码中 writeStream 是已经封装好了的 Writeable stream ,下面再来看看它的真实面目。

var Writeable = require('stream').Writeable
var ws = Writeable()
ws._write = function (chunk, enc, next) {
// 输出流动的数据
console.log(chunk.toString())
// 继续监听下一次输出
next()
}
// 作为参数传递到pipe函数中
process.stdin.pipe(ws)

根据以上代码得知,nodejs 提供了 Writeable 构造函数可以 new 一个新的 Writeable stream。通过实现它的_write 方法即可监听到每次流动的数据,运行 next()可继续监听.

再谈 pipe#

之前一直是用source.pipe(dest)这种模式来用 pipe 的,其实 pipe 可以链式调用。例如上文演示的 duplex stream 示例代码readStream.pipe(zlib.createGzip()).pipe(writeStream),还有之前讲解文件操作最后列举的 gulp 配置文件~

之前讲解source.pipe(dest)模式是为了方便理解和使用,现在我们更新一个更严谨的 pipe 用法:

  • 调用 pipe 的对象必须是 readable stream 或者 duplex stream,即具有读取数据的功能,如 req.pipe(…)
  • 传入 pipe 的参数必须是 writeable stream 或者 duplex stream, 即具有写入数据的功能,如 req.pipe(res)
  • pipe 支持链式调用

更新了 pipe 的最新规则,再来看就不会有困惑了。

小结#

这里主要讲解了 stream 的常用类型和 pipe 函数的规则:

  • stream 的常见类型:readable stream 和 writeable stream
  • readable stream 的本质和用法
  • writeable stream 的本质和用法
  • pipe 的新规则

支持与分享

如果这篇文章对你有帮助,欢迎分享给更多人或赞助支持!

【Node.js】深入浅出 stream 模块
https://blog.fridolph.top/posts/2018-11-15__node-stream/
作者
Fridolph
发布于
2018-11-15
许可协议
CC BY-NC-SA 4.0

评论区

Profile Image of the Author
Fridolph
热爱 Coding、音乐和羽毛球的 90 后全栈工程师
公告
欢迎访问我的小站 ^_^ 我是昇哥,热爱Coding,喜爱音乐、羽毛球和摄影的 90后全栈工程师
分类
标签

文章目录