使用客户端流式 gRPC 分块上传文件 - Go
欢迎回到 gRPC 课程!
回顾一下,gRPC 有4 种类型。在前面的课程中,我们学习了如何在 Golang 中实现一元 RPC和服务器流式 RPC 。
今天我们将学习如何实现和测试 gRPC 的第三种类型,即客户端流式传输。具体来说,我们将构建一个 API,将图像文件分多个块上传到服务器。
这是Youtube 上完整 gRPC 课程播放列表的链接
Github 存储库:pcbook-go和pcbook-java
Gitlab 存储库:pcbook-go和pcbook-java
1. 在proto文件中定义客户端流式RPC
让我们在文件中定义新的 RPC laptop_service.proto
。
首先,我们需要一条UploadImageRequest
消息:
message UploadImageRequest {
oneof data {
ImageInfo info = 1;
bytes chunk_data = 2;
};
}
其思路是将图像文件分成多个块,并在每次请求消息中逐个发送给服务器。我oneof
在这里使用字段是因为第一个请求只包含元数据,也就是图像的一些基本信息。之后的请求将包含图像数据块。
该ImageInfo
消息将有 2 个字符串字段:笔记本电脑 ID 和图像类型,例如“.jpg”或“.png”。
message ImageInfo {
string laptop_id = 1;
string image_type = 2;
}
然后我们定义一条UploadImageResponse
消息,一旦服务器接收到图像的所有块,该消息将返回给客户端:
message UploadImageResponse {
string id = 1;
uint32 size = 2;
}
该消息包含服务器生成的图像的 ID,以及上传图像的总大小(以字节为单位)。
好的,现在我们定义UploadImage
RPC。这是一个客户端流式 RPC,因此它接受一个 流UploadImageRequest
作为输入,并返回一个UploadImageResponse
。
service LaptopService {
...
rpc UploadImage(stream UploadImageRequest) returns (UploadImageResponse) {};
}
好了,现在让我们运行make
gen 来生成代码。
代码成功生成后,我们会看到代码中存在一些错误,因为笔记本电脑服务器还没有实现接口UploadImage()
所需的方法LaptopServiceServer
。
我们实施服务器后将会修复这些错误。
2.实现服务器
打开laptop_server.go
文件,将UploadImage()
函数添加到结构体中LaptopServer
。我们可以在生成的文件中轻松找到它的签名laptop_service.pb.go
。只需将其复制粘贴到laptop_server.go
文件中即可:
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
return nil
}
现在我们先返回nil
。实现图像存储后,我们再回来讨论这个函数。
2.1 实现图像存储
存储的作用是将上传的图像文件保存在服务器或云端的某个位置。为了使其更加通用,并易于切换到不同类型的存储,我们将其定义ImageStore
为一个接口。
type ImageStore interface {
Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error)
}
它有一个函数用于保存笔记本电脑的图片,该函数接受三个输入参数:笔记本电脑 ID、图片类型以及由字节缓冲区提供的图像数据。它会返回已保存图片的 ID,否则返回错误。
接下来我们将用 实现这个接口DiskImageStore
,它将图像文件保存到磁盘,并将其信息存储在内存中。
type DiskImageStore struct {
mutex sync.RWMutex
imageFolder string
images map[string]*ImageInfo
}
与笔记本电脑存储类似,我们需要一个互斥锁来处理并发。然后,我们需要保存笔记本电脑图片的文件夹路径。最后,我们需要一个映射,其键是图片 ID,值是图片的一些信息。
ImageInfo 包含 3 个字段:笔记本电脑的 ID、图像的类型(或其文件扩展名:jpg/png)以及磁盘上图像文件的路径。
type ImageInfo struct {
LaptopID string
Type string
Path string
}
让我们编写一个函数来创建一个新的DiskImageStore
。它只有一个输入,即图像文件夹。在函数内部,我们只需要初始化images
地图:
func NewDiskImageStore(imageFolder string) *DiskImageStore {
return &DiskImageStore{
imageFolder: imageFolder,
images: make(map[string]*ImageInfo),
}
}
现在我们必须实现接口Save()
所需的功能ImageStore
。
首先,我们需要为图片生成一个新的随机 UUID。我们通过连接图片文件夹、图片 ID 和图片类型来创建图片的存储路径。
然后我们调用os.Create()
创建文件。然后我们调用imageData.WriteTo()
将图像数据写入创建的文件中
func (store *DiskImageStore) Save(laptopID string, imageType string, imageData bytes.Buffer) (string, error) {
imageID, err := uuid.NewRandom()
if err != nil {
return "", fmt.Errorf("cannot generate image id: %w", err)
}
imagePath := fmt.Sprintf("%s/%s%s", store.imageFolder, imageID, imageType)
file, err := os.Create(imagePath)
if err != nil {
return "", fmt.Errorf("cannot create image file: %w", err)
}
_, err = imageData.WriteTo(file)
if err != nil {
return "", fmt.Errorf("cannot write image to file: %w", err)
}
store.mutex.Lock()
defer store.mutex.Unlock()
store.images[imageID.String()] = &ImageInfo{
LaptopID: laptopID,
Type: imageType,
Path: imagePath,
}
return imageID.String(), nil
}
如果文件写入成功,我们需要将其信息保存到内存映射中。因此,我们必须获取存储的写锁。
然后我们将图片信息保存到map中,key是图片的ID,value包含笔记本电脑ID,图片类型,图片文件的路径。
最后,我们将图片 ID 返回给调用者。就这样,图片存储部分就完成了。现在,让我们返回到服务器。
2.2 实现 UploadImage RPC
我们需要将新的图像存储添加到LaptopServer
结构体中,并将其添加imageStore
为函数的第二个参数NewLaptopServer()
:
type LaptopServer struct {
laptopStore LaptopStore
imageStore ImageStore
}
func NewLaptopServer(laptopStore LaptopStore, imageStore ImageStore) *LaptopServer {
return &LaptopServer{laptopStore, imageStore}
}
然后在main.go
服务器的文件中,我们还需要将两个存储传递给NewLaptopServer
函数:一个是笔记本电脑存储,另一个是图像存储。假设我们将上传的图像保存到“img”文件夹中。
func main() {
...
laptopStore := service.NewInMemoryLaptopStore()
imageStore := service.NewDiskImageStore("img")
laptopServer := service.NewLaptopServer(laptopStore, imageStore)
...
}
现在让我们实现该UploadImage()
功能。
首先,我们调用stream.Recv()
来接收第一个请求,其中包含图片的元数据信息。如果发生错误,我们会写入日志并将状态码返回Unknown
给客户端。
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
req, err := stream.Recv()
if err != nil {
return logError(status.Errorf(codes.Unknown, "cannot receive image info"))
}
...
}
func logError(err error) error {
if err != nil {
log.Print(err)
}
return err
}
为了简洁起见,避免重复,我在这里定义了一个logError()
函数,用于在返回错误之前记录错误。只有当错误不为 nil 时,它才会打印日志,并且始终将错误返回给调用者。
接下来,我们可以从请求中获取笔记本电脑 ID 和图片类型。我们在这里写入一条日志,说明我们已收到包含该笔记本电脑 ID 和图片类型的上传图片请求。
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
laptopID := req.GetInfo().GetLaptopId()
imageType := req.GetInfo().GetImageType()
log.Printf("receive an upload-image request for laptop %s with image type %s", laptopID, imageType)
...
}
在保存笔记本电脑图像之前,我们必须确保笔记本电脑 ID 确实存在。因此,我们调用以下命令server.laptopStore.Find()
通过 ID 查找笔记本电脑。
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
laptop, err := server.laptopStore.Find(laptopID)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot find laptop: %v", err))
}
if laptop == nil {
return logError(status.Errorf(codes.InvalidArgument, "laptop id %s doesn't exist", laptopID))
}
...
}
如果出现错误,只需记录错误信息并返回Internal
状态码即可。否则,如果笔记本电脑为 nil(即未找到),我们将记录错误信息并返回错误状态码InvalidArgument
(或者,您也可以使用NotFound
更精确的 code 表示)。
现在,如果一切顺利,笔记本电脑找到了,我们就可以开始接收图像块数据了。因此,我们需要创建一个新的字节缓冲区来存储这些数据,并创建一个变量来跟踪图像的总大小。
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
imageData := bytes.Buffer{}
imageSize := 0
for {
log.Print("waiting to receive more data")
req, err := stream.Recv()
if err == io.EOF {
log.Print("no more data")
break
}
if err != nil {
return logError(status.Errorf(codes.Unknown, "cannot receive chunk data: %v", err))
}
...
}
...
}
由于我们将从流中接收许多请求,因此我在这里使用了一个 for 循环。在循环内部,我们调用stream.Recv()
来获取请求。
但这次,我们首先检查是否存在错误EOF
。如果存在,则意味着不会再发送任何数据,我们可以安全地中断循环。否则,如果错误仍然不为 nil,我们将错误连同Unknown
状态码一起返回给客户端。
否则,如果没有错误,我们可以从请求中获取块数据。我们使用该len()
函数获取其大小,并将此大小添加到图像总大小中。
假设我们不希望客户端发送太大的图像,因此我们检查图像大小是否大于最大大小,比如说常量定义的 1 MB maxImageSize
(1 MB = 2^20 字节 = 1 << 20 字节)。
const maxImageSize = 1 << 20
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
imageData := bytes.Buffer{}
imageSize := 0
for {
...
chunk := req.GetChunkData()
size := len(chunk)
log.Printf("received a chunk with size: %d", size)
imageSize += size
if imageSize > maxImageSize {
return logError(status.Errorf(codes.InvalidArgument, "image is too large: %d > %d", imageSize, maxImageSize))
}
_, err = imageData.Write(chunk)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
}
}
...
}
现在,如果图像大小大于最大图像大小,我们可以返回一个包含InvalidArgument
状态码的错误,以及一条提示图像过大的消息。否则,我们可以使用该函数将数据块附加到图像数据中。如果发生错误,Write()
还可以记录并返回状态码。Internal
经过 for 循环,我们已经将图像的所有数据收集到了缓冲区中。因此,我们可以调用imageStore.Save()
该方法将图像数据保存到存储区并返回图像 ID:
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
for {
...
}
imageID, err := server.imageStore.Save(laptopID, imageType, imageData)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot save image to the store: %v", err))
}
res := &pb.UploadImageResponse{
Id: imageID,
Size: uint32(imageSize),
}
err = stream.SendAndClose(res)
if err != nil {
return logError(status.Errorf(codes.Unknown, "cannot send response: %v", err))
}
log.Printf("saved image with id: %s, size: %d", imageID, imageSize)
return nil
}
如果图片保存成功,我们将创建一个包含图片 ID 和图片大小的响应对象。然后调用该函数stream.SendAndClose()
将响应发送给客户端。
最后,我们可以写一条日志,表明图像已成功使用此 ID 和大小保存。
这样我们就完成了服务器部分。现在我们来实现客户端。
3.实现客户端
首先,我将稍微重构一下我们在前几节课中编写的代码。让我们将 laptop 添加为该函数的参数createLaptop()
。
func createLaptop(laptopClient pb.LaptopServiceClient, laptop *pb.Laptop) {
...
}
然后创建单独的函数来测试创建笔记本电脑 RPC 和搜索笔记本电脑 RPC:
func testCreateLaptop(laptopClient pb.LaptopServiceClient) {
createLaptop(laptopClient, sample.NewLaptop())
}
func testSearchLaptop(laptopClient pb.LaptopServiceClient) {
...
}
现在我们将编写一个新函数来测试上传图像 RPC,并从主函数中调用它:
func testUploadImage(laptopClient pb.LaptopServiceClient) {
laptop := sample.NewLaptop()
createLaptop(laptopClient, laptop)
uploadImage(laptopClient, laptop.GetId(), "tmp/laptop.jpg")
}
func main() {
serverAddress := flag.String("address", "", "the server address")
flag.Parse()
log.Printf("dial server %s", *serverAddress)
conn, err := grpc.Dial(*serverAddress, grpc.WithInsecure())
if err != nil {
log.Fatal("cannot dial server: ", err)
}
laptopClient := pb.NewLaptopServiceClient(conn)
testUploadImage(laptopClient)
}
在testUploadImage()
函数中,我们首先生成一台随机的笔记本电脑,并调用createLaptop()
函数在服务器上创建它。
然后,我们将编写一个uploadImage()
函数,将该笔记本电脑的图像上传到服务器。该函数有 3 个输入参数:笔记本电脑客户端、笔记本电脑 ID 和笔记本电脑图像的路径。
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
file, err := os.Open(imagePath)
if err != nil {
log.Fatal("cannot open image file: ", err)
}
defer file.Close()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
stream, err := laptopClient.UploadImage(ctx)
if err != nil {
log.Fatal("cannot upload image: ", err)
}
...
}
首先,我们调用os.Open()
此函数打开镜像文件。如果出现错误,我们会写入致命错误日志。否则,我们defer()
随后会使用此函数关闭文件。
然后我们创建一个超时时间为 5 秒的上下文,并laptopClient.UploadImage()
使用该上下文调用该方法。它将返回一个流对象和一个错误。如果错误不为 nil,我们将写入一个致命日志。
否则,我们创建第一个请求,向服务器发送一些图像信息,其中包括笔记本电脑 ID、图像类型或图像文件的扩展名:
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
...
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_Info{
Info: &pb.ImageInfo{
LaptopId: laptopID,
ImageType: filepath.Ext(imagePath),
},
},
}
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send image info to server: ", err, stream.RecvMsg(nil))
}
...
}
之后,我们调用该方法stream.Send()
向服务器发送第一个请求。如果出现错误,则写入致命日志。
否则,我们将创建一个缓冲区读取器,以块为单位读取图像文件的内容。假设每个块为 1 KB,即 1024 字节。我们将在 for 循环中按顺序读取图像数据块:
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
...
reader := bufio.NewReader(file)
buffer := make([]byte, 1024)
for {
n, err := reader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
log.Fatal("cannot read chunk to buffer: ", err)
}
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_ChunkData{
ChunkData: buffer[:n],
},
}
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send chunk to server: ", err)
}
}
...
}
首先,我们调用reader.Read()
来将数据读取到缓冲区。它将返回读取的字节数和一个错误信息。如果错误信息为EOF
,则表示文件已结束,我们只需中断循环即可。否则,如果错误信息不为nil
,则写入一个致命错误日志。
否则,我们将使用块数据创建一个新的请求。请确保该块仅包含缓冲区的前 n 个字节,因为最后一个块可能小于 1024 个字节。
然后我们调用stream.Send()
它将其发送到服务器。如果发生错误,再次写入致命日志。
最后,在 for 循环之后,我们调用stream.CloseAndRecv()
以接收来自服务器的响应:
func uploadImage(laptopClient pb.LaptopServiceClient, laptopID string, imagePath string) {
...
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatal("cannot receive response: ", err)
}
log.Printf("image uploaded with id: %s, size: %d", res.GetId(), res.GetSize())
...
}
如果没有错误,我们会写一个日志说图片上传成功,并且服务器返回这个ID和大小。
就这样,客户端就完成了。让我们把laptop.jpg
文件放到文件夹里tmp
,然后运行服务器和客户端:
我们收到一个错误:无法将数据块发送到服务器:EOF。这个错误信息没什么用,因为它没有告诉我们确切的原因。
但我们知道该消息来自此日志:
...
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send chunk to server: ", err)
}
...
我们得到 EOF 的原因是因为当发生错误时,服务器将关闭流,因此客户端无法向其发送更多数据。
要获取包含 gRPC 状态码的真正错误,我们必须stream.RecvMsg()
使用 nil 参数进行调用。nil 参数的基本含义是,我们不希望收到任何消息,而只想获取函数返回的错误。
...
err = stream.Send(req)
if err != nil {
log.Fatal("cannot send chunk to server: ", err, stream.RecvMsg(nil))
}
...
现在,如果我们重新运行客户端,我们可以看到真正的错误是InvalidArgument
,笔记本电脑不存在。
这是因为笔记本电脑 ID 为空,正如我们在createLaptop()
上一节课的函数中设置的那样:
因此,让我们删除此行并重新运行客户端。
这次成功了。图片上传成功。打开 img 文件夹,可以看到笔记本电脑的图片保存在那里:
出色的!
好的,现在让我们看看如果发生超时会发生什么。假设服务器写入数据的速度非常慢。在这里,我在将数据块写入缓冲区之前休眠了 1 秒。
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
for {
...
// write slowly
time.Sleep(time.Second)
_, err = imageData.Write(chunk)
if err != nil {
return logError(status.Errorf(codes.Internal, "cannot write chunk data: %v", err))
}
}
...
}
我们来尝试一下。
5秒后,我们在服务器上看到了一条错误日志。然而,状态码是Unknown
,并且还包含其他DeadlineExceeded
错误,这不太好。
因此,让我们通过在流上调用接收之前检查服务器端的上下文错误来解决这个问题:
func (server *LaptopServer) UploadImage(stream pb.LaptopService_UploadImageServer) error {
...
for {
err := contextError(stream.Context())
if err != nil {
return err
}
log.Print("waiting to receive more data")
req, err := stream.Recv()
if err == io.EOF {
log.Print("no more data")
break
}
...
}
...
}
我已经从创建笔记本电脑 RPC 中提取了上下文错误检查块,并使其成为一个单独的函数:
func contextError(ctx context.Context) error {
switch ctx.Err() {
case context.Canceled:
return logError(status.Error(codes.Canceled, "request is canceled"))
case context.DeadlineExceeded:
return logError(status.Error(codes.DeadlineExceeded, "deadline is exceeded"))
default:
return nil
}
}
这里我们使用 switch case 使其更简洁,更易于阅读:
- 如果上下文错误
Canceled
,我们会记录它并返回带有状态代码的错误Canceled
。 - 在这种情况下
DeadlineExceeded
,我们做同样的事情,但使用DeadlineExceeded
状态代码。 - 对于默认情况下,我们只返回 nil。
好的,让我们重新运行服务器和客户端。
现在在服务器端,我们看到了更好的错误日志,其中包含了状态码DeadLineExceeded
。完美!
让我们尝试另一种上传图片大于最大允许大小的情况。我将最大文件大小限制从 1 MB 改为 1 KB。
const maxImageSize = 1 << 10
然后重新运行服务器和客户端。
这次我们得到了InvalidArgument
:图像太大。服务器端只接收了 2 个数据块,就打印出了同样的错误日志。所以它成功了!
现在让我们学习如何为这个客户端流式 RPC 编写测试。
4.编写单元测试
本次测试中,我将使用tmp
它来存储图片。首先,我们需要创建一个新的内存笔记本电脑存储,并使用 tmp 文件夹创建一个新的磁盘映像存储。
func TestClientUploadImage(t *testing.T) {
t.Parallel()
testImageFolder := "../tmp"
laptopStore := service.NewInMemoryLaptopStore()
imageStore := service.NewDiskImageStore(testImageFolder)
laptop := sample.NewLaptop()
err := laptopStore.Save(laptop)
require.NoError(t, err)
serverAddress := startTestLaptopServer(t, laptopStore, imageStore, nil)
laptopClient := newTestLaptopClient(t, serverAddress)
...
}
我们生成一台示例笔记本电脑,并将其保存到笔记本电脑商店。然后,我们启动测试服务器并创建一个新的笔记本电脑客户端。
我们要上传的图片是laptop.jpg
tmp 文件夹中的文件。因此,我们打开文件,检查是否有错误,然后延迟关闭它。然后我们调用laptopClient.UploadImage()
来获取流并确保没有错误发生。
func TestClientUploadImage(t *testing.T) {
...
imagePath := fmt.Sprintf("%s/laptop.jpg", testImageFolder)
file, err := os.Open(imagePath)
require.NoError(t, err)
defer file.Close()
stream, err := laptopClient.UploadImage(context.Background())
require.NoError(t, err)
...
}
实际上,测试的其余部分与我们在client/main.go
文件中所做的非常相似。我们发送的第一个请求仅包含笔记本电脑图像的元数据。
func TestClientUploadImage(t *testing.T) {
...
imageType := filepath.Ext(imagePath)
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_Info{
Info: &pb.ImageInfo{
LaptopId: laptop.GetId(),
ImageType: imageType,
},
},
}
err = stream.Send(req)
require.NoError(t, err)
...
}
然后我们使用 for 循环分块发送图像文件的内容:
func TestClientUploadImage(t *testing.T) {
...
reader := bufio.NewReader(file)
buffer := make([]byte, 1024)
size := 0
for {
n, err := reader.Read(buffer)
if err == io.EOF {
break
}
require.NoError(t, err)
size += n
req := &pb.UploadImageRequest{
Data: &pb.UploadImageRequest_ChunkData{
ChunkData: buffer[:n],
},
}
err = stream.Send(req)
require.NoError(t, err)
}
...
我们还想跟踪图像的总大小,所以我size
为此定义了一个变量。每次读取新的数据块时,我们都会将大小加 n。
最后一步,我们调用stream.CloseAndRecv()
以获取服务器的响应,并且我们返回的 ID 不应该是零值,并且返回的图像大小的值应该等于size
。
func TestClientUploadImage(t *testing.T) {
...
res, err := stream.CloseAndRecv()
require.NoError(t, err)
require.NotZero(t, res.GetId())
require.EqualValues(t, size, res.GetSize())
...
}
我们还想检查图片是否保存在服务器上正确的文件夹中。它应该位于测试图片文件夹中,文件名是图片 ID,文件扩展名是图片类型。我们可以使用require.FileExists()
函数来检查。
func TestClientUploadImage(t *testing.T) {
...
savedImagePath := fmt.Sprintf("%s/%s%s", testImageFolder, res.GetId(), imageType)
require.FileExists(t, savedImagePath)
require.NoError(t, os.Remove(savedImagePath))
}
最后我们需要在测试结束时删除该文件。
好的,让我们运行它。
过去了!
今天关于客户端流式 RPC 的讲座就到这里。感谢阅读,我们下篇文章再见!
如果您喜欢这篇文章,请订阅我们的 Youtube 频道并在 Twitter 上关注我们,以便将来获取更多教程。
如果你想加入我目前在 Voodoo 的优秀团队,请查看我们的职位空缺。你可以远程办公,也可以在巴黎/阿姆斯特丹/伦敦/柏林/巴塞罗那现场办公,但需获得签证担保。
鏂囩珷鏉ユ簮锛�https://dev.to/techschoolguru/upload-file-in-chunks-with-client-streaming-grpc-golang-4loc