使用 NodeJS Stream API 和 Typescript 从 AWS S3 流式传输文件

2025-06-08

使用 NodeJS Stream API 和 Typescript 从 AWS S3 流式传输文件

查看新的 npm 包!

这个简单的解决方案现在变得更简单了!我还添加了一些升级功能,比如可以调整中途范围的大小!现在你可以随意加速或减速了!

https://www.npmjs.com/package/s3-readstream

您可以使用此包作为替代品AWS.S3.getObject().createReadStream()

原始博客

AWS S3 SDK 和 NodeJS 的读写流可以轻松地从 AWS 存储桶下载文件。但是,如果您想以流式传输文件,该怎么办?

在我们开始之前

我假设您已经使用 AWS S3 SDK 成功下载文件,现在想要将该功能转换为合适的流。因此,我将省略 AWS 的具体实现,而是通过一个简单的示例来说明如何以及在何处实例化这个“智能流”类。

我还假设您对 NodeJS 和 NodeJS 读/写流有(基本的)了解。

白日梦

我知道这个双关语很糟糕,但这是文章中唯一的双关语,所以请跟我一起努力。

在实现流时,您可能遇到的第一个解决方案(也是我决定写这篇文章的原因)是简单地从 S3 实例中取出创建的读取流,然后将其插入到需要的位置。流媒体大爆发!

...没那么快。

连接到 AWS S3 实例的超时设置为 120000 毫秒(2 分钟)。除非文件非常小,否则这不足以进行流式传输。

一种选择是直接增加超时时间,但应该增加多少呢?由于超时时间指的是连接能够持续的总时间,因此你要么将超时时间设置得离谱,要么猜测文件流式传输需要多长时间,并相应地更新超时时间。这还没有考虑由于 HTTP(S) 自身的超时原因导致的流关闭。

逐字节

对不起。它就在那儿……我也一样!

超时并非唯一会造成问题的原因,延迟也同样如此。你无法确定你的直播在进行过程中是否会变得非常缓慢,而且每个人都讨厌等待缓冲(如果你选择进行视频直播的话)。虽然这个问题无法彻底解决,但你可以尝试让自己轻松很多。

与其简单地连接软管并喂饱这头野兽,不如使用“智能流”,在单个请求中获取一系列数据。按需获取数据可以帮助您避免延迟,同时还能避免令人讨厌的超时。

智能流媒体

我们的想法是创建一个流,利用 AWS S3 的强大功能,
通过单个请求抓取一系列数据。然后,我们可以通过新的请求抓取另一段数据,以此类推。当缓冲区已满时,此流将暂停,仅在需要时才请求新数据。这样,我们就可以充分利用处理数据所需的所有时间(或者在视频播放过程中暂停,去洗手间)。处理完成后(洗手完毕),它会从上次中断的地方继续播放,节目继续播放。

戴上你最深色的眼镜,你就进来了

我们可以利用 NodeJS Stream API 并创建我们自己的自定义可读流,而不必进行猜测和对抗随机错误。

我们将从创建“智能流”类开始:

import {Readable, ReadableOptions} from 'stream';
import type {S3} from 'aws-sdk';

export class SmartStream extends Readable {
    _currentCursorPosition = 0; // Holds the current starting position for our range queries
    _s3DataRange = 64 * 1024; // Amount of bytes to grab
    _maxContentLength: number; // Total number of bites in the file
    _s3: S3; // AWS.S3 instance
    _s3StreamParams: S3.GetObjectRequest; // Parameters passed into s3.getObject method

    constructor(
        parameters: S3.GetObjectRequest,
        s3: S3,
        maxLength: number,
        // You can pass any ReadableStream options to the NodeJS Readable super class here
        // For this example we wont use this, however I left it in to be more robust
        nodeReadableStreamOptions?: ReadableOptions
    ) {
        super(nodeReadableStreamOptions);
        this._maxContentLength = maxLength;
        this._s3 = s3;
        this._s3StreamParams = parameters;
    }

    _read() {
        if (this._currentCursorPosition > this._maxContentLength) {
            // If the current position is greater than the amount of bytes in the file
            // We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
            this.push(null);
        } else {
            // Calculate the range of bytes we want to grab
            const range = this._currentCursorPosition + this._s3DataRange;
            // If the range is greater than the total number of bytes in the file
            // We adjust the range to grab the remaining bytes of data
            const adjustedRange = range < this._maxContentLength ? range : this._maxContentLength;
            // Set the Range property on our s3 stream parameters
            this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
            // Update the current range beginning for the next go 
            this._currentCursorPosition = adjustedRange + 1;
            // Grab the range of bytes from the file
            this._s3.getObject(this._s3StreamParams, (error, data) => {
                if (error) {
                    // If we encounter an error grabbing the bytes
                    // We destroy the stream, NodeJS ReadableStream will emit the 'error' event
                    this.destroy(error);
                } else {
                    // We push the data into the stream buffer
                    this.push(data.Body);
                }
            });
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

让我们稍微分析一下

我们正在扩展 NodeJS Stream API 中的 Readable 类,以添加一些实现“智能流”所需的功能。我在一些属性前添加了下划线 (_),以区分我们自定义的实现和 Readable 超类中开箱即用的功能。

Readable 类有一个缓冲区,我们可以将数据推送到其中。一旦这个缓冲区满了,我们就不再向 AWS S3 实例请求更多数据,而是将数据推送到另一个流(或者任何我们希望数据到达的地方)。当缓冲区有空间时,我们会再次发出请求来获取一定范围的数据。重复此操作,直到读取完整个文件。

这个简单实现的优点在于,你可以访问 NodeJS readStream 的所有事件监听器和功能。你甚至可以将此流导入到“gzip”文件中,从而实现压缩文件的流式传输!

现在我们已经编写了 SmartStream 类的代码,我们准备将其连接到程序中。

使用 AWS S3 实现

对于下一部分,我假设您了解 AWS s3 SDK,我将简单地提供一个如何建立流的示例。

import {SmartStream} from <Path to SmartStream file>;

export async function createAWSStream(): Promise<SmartStream> {
    return new Promise((resolve, reject) => {
        const bucketParams = {
            Bucket: <Your Bucket>,
            Key: <Your Key>
        }

        try {
            const s3 = resolveS3Instance();

            s3.headObject(bucketParams, (error, data) => {
                if (error) {
                    throw error;
                }
                // After getting the data we want from the call to s3.headObject
                // We have everything we need to instantiate our SmartStream class
                // If you want to pass ReadableOptions to the Readable class, you pass the object as the fourth parameter
                const stream = new SmartStream(bucketParams, s3, data.ContentLength);

                resolve(stream);
            });
        } catch (error) {
            reject(error)
        }
    });
}
Enter fullscreen mode Exit fullscreen mode

您可以在我的 github 上的高清视频流应用程序中查看这一点!

最简单的高清视频流应用程序

感谢您的阅读!如果您喜欢这篇博客,请在下方评论区留言!

进一步阅读

这只是使用 NodeJS 标准 Stream API 实现的众多精彩功能之一。更多内容,请参阅NodeJS Stream API 文档

鏂囩珷鏉ユ簮锛�https://dev.to/about14sheep/streaming-data-from-aws-s3-using-nodejs-stream-api-and-typescript-3dj0
PREV
如何准备 DevOps 和 SRE 面试?
NEXT
ReactJS 滚动时导航栏粘性