使用 Node.js 中的流获取数百万行数据

2025-05-28

使用 Node.js 中的流获取数百万行数据

您是否曾经面临过这样的挑战:从数据库中提取几百万行包含大量列的数据并将其显示在网络上?

嗯,最近我不得不这么做。虽然我不确定这是否有意义,但我还是不得不这么做。

无论如何,这是解决方案。

但首先,技术栈:Node.js、Sequelize 和 MariaDB。客户端并不重要,因为有时数据量接近 4 GB,所以 Chrome 最终还是崩溃了。

Sequelize 是第一个大问题。

获取如此大数据结果的解决方案是流式传输。通过一次大调用接收这些数据会导致 Node 崩溃。因此,流式传输才是解决方案,而 Sequelize 当时还不支持流式传输。

好吧,我可以手动将流添加到 Sequelize,但服务调用最终需要很长时间。

以下是通常的 Sequelize 调用的片段:



await sequelize.authenticate();
const result = await sequelize.query(sql, { type: sequelize.QueryTypes.SELECT });
res.send(result);


Enter fullscreen mode Exit fullscreen mode

就是这样。当然,还有一些部分缺失。比如完整的数据库配置和调用的具体定义get()(例如,它res来自哪里?)。但我想你已经明白了。

运行这段代码,结果很简单:Node 崩溃了。--max-old-space-size=8000例如,你可以使用 为 Node 分配更多内存,但这并非真正的解决方案

如前所述,你可以强制Sequelize流式传输数据。现在,它看起来怎么样?



var Readable = stream.Readable;
var i = 1;
var s = new Readable({
    async read(size) {
        const result = await sequelize.query(
            sql + ` LIMIT 1000000 OFFSET ${(i - 1) * 1000000}`, { type: sequelize.QueryTypes.SELECT });
        this.push(JSON.stringify(result));
        i++;
        if (i === 5) {
            this.push(null);
        }
    }
});
s.pipe(res);


Enter fullscreen mode Exit fullscreen mode

在这个例子中,我知道从数据库返回的行数,因此有一行if (i === 5)。这只是一个测试。您必须发送null才能结束流。当然,您可以count先获取整个结果的,然后相应地修改代码。

其背后的核心思想是减少数据库调用,并借助流返回数据块。这种方法有效,Node 不会崩溃,但仍然耗时较长——3.5 GB 的数据需要近 10 分钟。

使用 Sequelize 进行流式传输

还有什么选择?

MariaDB Node.js 连接

通常的查询是这样的:



const mariadb = require('mariadb');
const pool = mariadb.createPool({ host: "HOST", user: "USER", password: "PASSWORD", port: 3308, database: "DATABASE", connectionLimit: 5 });
let conn = await pool.getConnection();
const result = await conn.query(sql);
res.send(result);


Enter fullscreen mode Exit fullscreen mode

它确实快多了。不过我还是直接跳到流代码吧:



let conn = await pool.getConnection();
const queryStream = conn.queryStream(sql);
const ps = new stream.PassThrough();
const transformStream = new stream.Transform({
    objectMode: true,
    transform: function transformer(chunk, encoding, callback) {
        callback(null, JSON.stringify(chunk));
    }
});
stream.pipeline(
    queryStream,
    transformStream,
    ps,
    (err) => {
        if (err) {
            console.log(err)
            return res.sendStatus(400);
        }
    })
ps.pipe(res);


Enter fullscreen mode Exit fullscreen mode

这看起来可能有点神秘,但实际上,你创建了一个管道来传递数据。首先,queryStream是数据库查询的结果。然后,是transformStream发送字符串化的块(这里只允许字符串和缓冲区,因此需要将对象字符串化)。最后是PassThrough函数和一个用于处理错误情况的函数。

ps.pipe(res)结果传输给客户端。

结果如下:
使用 MariaDB Node.js 连接器进行流式传输

对于相同的数据,只需 4 分钟,您甚至不会注意到 Node 需要一点 RAM。

因此,如果您面临类似的任务,请考虑流式传输数据。

或者你说服你的客户,这种要求对于网络来说是不现实的。

PS:分页功能不行。我们需要一次性获取全部数据。

图片由 brgfx 在freepik.com上创建。


但请稍等,还有更多!

文章来源:https://dev.to/_patrickgod/fetching-millions-of-rows-with-streams-in-node-js-487e
PREV
早晨例行活动的奇迹
NEXT
101 个 React 技巧和窍门,适合初学者和专家✨