使用 NodeJS Stream API 和 Typescript 从 AWS S3 流式传输文件
查看新的 npm 包!
这个简单的解决方案现在变得更简单了!我还添加了一些升级功能,比如可以调整中途范围的大小!现在你可以随意加速或减速了!
https://www.npmjs.com/package/s3-readstream
您可以使用此包作为替代品AWS.S3.getObject().createReadStream()
!
原始博客
AWS S3 SDK 和 NodeJS 的读写流可以轻松地从 AWS 存储桶下载文件。但是,如果您想以流式传输文件,该怎么办?
在我们开始之前
我假设您已经使用 AWS S3 SDK 成功下载文件,现在想要将该功能转换为合适的流。因此,我将省略 AWS 的具体实现,而是通过一个简单的示例来说明如何以及在何处实例化这个“智能流”类。
我还假设您对 NodeJS 和 NodeJS 读/写流有(基本的)了解。
白日梦
我知道这个双关语很糟糕,但这是文章中唯一的双关语,所以请跟我一起努力。
在实现流时,您可能遇到的第一个解决方案(也是我决定写这篇文章的原因)是简单地从 S3 实例中取出创建的读取流,然后将其插入到需要的位置。流媒体大爆发!
...没那么快。
连接到 AWS S3 实例的超时设置为 120000 毫秒(2 分钟)。除非文件非常小,否则这不足以进行流式传输。
一种选择是直接增加超时时间,但应该增加多少呢?由于超时时间指的是连接能够持续的总时间,因此你要么将超时时间设置得离谱,要么猜测文件流式传输需要多长时间,并相应地更新超时时间。这还没有考虑由于 HTTP(S) 自身的超时原因导致的流关闭。
逐字节
对不起。它就在那儿……我也一样!
超时并非唯一会造成问题的原因,延迟也同样如此。你无法确定你的直播在进行过程中是否会变得非常缓慢,而且每个人都讨厌等待缓冲(如果你选择进行视频直播的话)。虽然这个问题无法彻底解决,但你可以尝试让自己轻松很多。
与其简单地连接软管并喂饱这头野兽,不如使用“智能流”,在单个请求中获取一系列数据。按需获取数据可以帮助您避免延迟,同时还能避免令人讨厌的超时。
智能流媒体
我们的想法是创建一个流,利用 AWS S3 的强大功能,
通过单个请求抓取一系列数据。然后,我们可以通过新的请求抓取另一段数据,以此类推。当缓冲区已满时,此流将暂停,仅在需要时才请求新数据。这样,我们就可以充分利用处理数据所需的所有时间(或者在视频播放过程中暂停,去洗手间)。处理完成后(洗手完毕),它会从上次中断的地方继续播放,节目继续播放。
戴上你最深色的眼镜,你就进来了!
我们可以利用 NodeJS Stream API 并创建我们自己的自定义可读流,而不必进行猜测和对抗随机错误。
我们将从创建“智能流”类开始:
import {Readable, ReadableOptions} from 'stream';
import type {S3} from 'aws-sdk';
export class SmartStream extends Readable {
_currentCursorPosition = 0; // Holds the current starting position for our range queries
_s3DataRange = 64 * 1024; // Amount of bytes to grab
_maxContentLength: number; // Total number of bites in the file
_s3: S3; // AWS.S3 instance
_s3StreamParams: S3.GetObjectRequest; // Parameters passed into s3.getObject method
constructor(
parameters: S3.GetObjectRequest,
s3: S3,
maxLength: number,
// You can pass any ReadableStream options to the NodeJS Readable super class here
// For this example we wont use this, however I left it in to be more robust
nodeReadableStreamOptions?: ReadableOptions
) {
super(nodeReadableStreamOptions);
this._maxContentLength = maxLength;
this._s3 = s3;
this._s3StreamParams = parameters;
}
_read() {
if (this._currentCursorPosition > this._maxContentLength) {
// If the current position is greater than the amount of bytes in the file
// We push null into the buffer, NodeJS ReadableStream will see this as the end of file (EOF) and emit the 'end' event
this.push(null);
} else {
// Calculate the range of bytes we want to grab
const range = this._currentCursorPosition + this._s3DataRange;
// If the range is greater than the total number of bytes in the file
// We adjust the range to grab the remaining bytes of data
const adjustedRange = range < this._maxContentLength ? range : this._maxContentLength;
// Set the Range property on our s3 stream parameters
this._s3StreamParams.Range = `bytes=${this._currentCursorPosition}-${adjustedRange}`;
// Update the current range beginning for the next go
this._currentCursorPosition = adjustedRange + 1;
// Grab the range of bytes from the file
this._s3.getObject(this._s3StreamParams, (error, data) => {
if (error) {
// If we encounter an error grabbing the bytes
// We destroy the stream, NodeJS ReadableStream will emit the 'error' event
this.destroy(error);
} else {
// We push the data into the stream buffer
this.push(data.Body);
}
});
}
}
}
让我们稍微分析一下
我们正在扩展 NodeJS Stream API 中的 Readable 类,以添加一些实现“智能流”所需的功能。我在一些属性前添加了下划线 (_),以区分我们自定义的实现和 Readable 超类中开箱即用的功能。
Readable 类有一个缓冲区,我们可以将数据推送到其中。一旦这个缓冲区满了,我们就不再向 AWS S3 实例请求更多数据,而是将数据推送到另一个流(或者任何我们希望数据到达的地方)。当缓冲区有空间时,我们会再次发出请求来获取一定范围的数据。重复此操作,直到读取完整个文件。
这个简单实现的优点在于,你可以访问 NodeJS readStream 的所有事件监听器和功能。你甚至可以将此流导入到“gzip”文件中,从而实现压缩文件的流式传输!
现在我们已经编写了 SmartStream 类的代码,我们准备将其连接到程序中。
使用 AWS S3 实现
对于下一部分,我假设您了解 AWS s3 SDK,我将简单地提供一个如何建立流的示例。
import {SmartStream} from <Path to SmartStream file>;
export async function createAWSStream(): Promise<SmartStream> {
return new Promise((resolve, reject) => {
const bucketParams = {
Bucket: <Your Bucket>,
Key: <Your Key>
}
try {
const s3 = resolveS3Instance();
s3.headObject(bucketParams, (error, data) => {
if (error) {
throw error;
}
// After getting the data we want from the call to s3.headObject
// We have everything we need to instantiate our SmartStream class
// If you want to pass ReadableOptions to the Readable class, you pass the object as the fourth parameter
const stream = new SmartStream(bucketParams, s3, data.ContentLength);
resolve(stream);
});
} catch (error) {
reject(error)
}
});
}
您可以在我的 github 上的高清视频流应用程序中查看这一点!
感谢您的阅读!如果您喜欢这篇博客,请在下方评论区留言!
进一步阅读
这只是使用 NodeJS 标准 Stream API 实现的众多精彩功能之一。更多内容,请参阅NodeJS Stream API 文档!
鏂囩珷鏉ユ簮锛�https://dev.to/about14sheep/streaming-data-from-aws-s3-using-nodejs-stream-api-and-typescript-3dj0