React、GraphQL、gRPC 和 Reactive Microservices - Appwish 平台中的数据流解释
大家好!在这篇文章中,我将解释如何在Appwish 项目中使用 Vert.x、gRPC 和 GraphQL 为UI 应用程序提供 API。我们将以两个后端微服务graphqlapi和wishservice
为例。
如果你不熟悉 Appwish,它是我两周前在dev.to 帖子中启动的一个开源项目。它原本打算作为某种教程,但在Slack上超过 130 人加入后,我决定将重点转向构建一个开源社区,目标是发布一个完全开源、由社区驱动的真实平台。
要了解 Appwish 中的数据流,我们首先看一下图表:
该图显示了 Appwish 中组件之间的数据流和交互。
让我们讨论一下它是如何从 UI 到数据库的。
一切都始于前端
UI 使用 GraphQL 客户端发送一个 GraphQL 查询,该查询命中我们的第一个后端微服务 - GraphQL API。
下图中标记了此部分:
要发送 GraphQL 查询,客户端需要知道 GraphQL 模式,该模式在.graphqls 文件中声明。GraphQL 服务器解析这些文件并将其用作 UI 与后端通信的契约。
假设客户端发送了以下查询:
query {
wish(id: 123) {
title
content
rating
}
}
在这种情况下,GraphQL 服务器必须返回 UI 要求的数据(id=123 的愿望的标题、内容和评级)
它无法独自做到这一点
即使创建了模式,GraphQL 服务器也无法提供 UI 要求的数据。
它不知道怎么做!你需要将模式连接到一些能够获取所需数据或执行适当业务逻辑的可执行代码。
换句话说,GraphQL 模式中声明的所有操作都需要与可以传递
所需数据的代码相关联。
这是通过所谓的RuntimeWirings 和 DataFetchers实现的。RuntimeWiring 将 GraphQL 类型之一(例如“查询愿望”)与 DataFetcher(简单来说 - 一些可以获取愿望数据的代码)关联起来。
下面是在我们的 GraphQL 服务器应用程序中使用 DataFetchers 连接与愿望相关的 GraphQL 类型的示例。
在我们提供 RuntimeWirings 和 DataFetchers 之后,GraphQL 服务器将知道如何传递所需的数据。
让我们讨论一下如何实现 DataFetchers。
您可能已经知道,Appwish 使用Vert.x作为主要后端框架,并使用gRPC进行服务间通信。Vert.x 的核心组件之一是事件总线 (Event Bus)。事件总线可以用作应用程序各个部分之间的通信通道。它甚至可以用于与其他应用程序通信,但在 Appwish 中我们不使用此功能——我们使用 gRPC 与其他后端应用程序通信。事件总线仅用于应用程序内部通信。
要获取愿望数据,DataFetcher 必须与另一个服务——wishservice 进行通信。wishservice 负责
管理愿望数据和相关业务逻辑。
为此,它必须使用 gRPC 请求。但正如您在此处所见,
WishFetcher 不使用 gRPC。相反,它通过事件总线发送事件。“给我关于 wish id=123 的数据!”。
下图中红色圆圈处可以看到这一部分:
DataFetcher 在事件总线上发送事件,然后由 gRPC Verticle 处理。
Verticle是 Vert.x 的另一个重要概念。为了简单起见,您可以将其视为处理事件的事件循环。它在概念上与 Node.js 的事件循环非常相似——它也实现了Reactor 模式。
gRPC Verticle 监听 DataFetchers 发送的事件,并使用 gRPC 客户端从其他微服务获取所需的数据。
下图标记了gRPC 垂直使用 gRPC 服务存根(客户端)从其他微服务获取所需数据的部分:
这种 gRPC 通信是如何发生的?
实际上,gRPC 客户端是生成的“服务存根”。使用 gRPC 时,默认也会使用 protobuf。gRPC 使用 protobuf 作为底层消息交换格式以及其 IDL(接口定义语言)。
我们创建.proto 文件,然后使用它们来生成我们想要的任何编程语言的代码 - 在本例中是 Java。
例如:
message WishProto {
int64 id = 1;
int64 authorId = 2;
string title = 3;
string content = 4;
string url = 5;
string cover_image_url = 6;
}
用于生成一种数据传输对象类,该类保存我们需要的数据,并且可以非常高效地序列化
和反序列化以进行网络传输。
这部分:
service WishService {
rpc GetAllWish (AllWishQueryProto) returns (AllWishReplyProto) {
}
rpc GetWish (WishQueryProto) returns (WishReplyProto) {
}
rpc CreateWish (WishInputProto) returns (WishReplyProto) {
}
rpc UpdateWish (UpdateWishInputProto) returns (WishReplyProto) {
}
rpc DeleteWish (WishQueryProto) returns (WishDeleteReplyProto) {
}
}
定义与 wish 服务通信的接口。所有 RPC 方法都有名称、输入和输出。
使用此声明,我们可以生成 Java 代码,然后用于与愿望服务和其他
应用程序进行通信。
当我们从 .proto 文件中的服务声明生成源代码时,我们会同时获得客户端和服务器。希望服务使用生成的服务器来处理来自其他应用的请求,而其他应用则使用生成的客户端来发送请求。
在客户端,生成的客户端(服务存根)就是您与 wish 服务通信所需的全部内容。您无需实现任何功能(当然,除了处理从其他应用程序接收的数据的逻辑之外)。
您可以在此处看到生成的客户端的使用情况。
我将快速介绍一下存根的一个用法:
eventBus.<WishQueryProto>consumer(Address.WISH.get(), event -> {
stub.getWish(event.body(), grpc -> {
if (grpc.succeeded()) {
event.reply(grpc.result());
} else {
event.fail(FAILURE_CODE, WISH_SERVICE_COMMUNICATION_ERROR_MESSAGE);
}
});
});
gRPC 客户端服务注册一个事件消费者,接受“WishQueries”作为输入。
当事件发生时(当 DataFetchers 请求愿望数据时),上面的代码就会被执行。
调用 Stub.getWish() 。 Stub 是生成的 wish 服务 gRPC 客户端,getWish 对应我们在 .proto 文件中声明的内容:
rpc GetWish (WishQueryProto) returns (WishReplyProto) {
}
如您所见,我们使用 WishQueryProt 作为输入,调用 stub.GetWish() 方法,并等待 WishReply 输出。
它与我们在 .proto 文件服务定义中声明的内容完全匹配。
等待另一个应用程序的结果是异步的,因此等待结果的代码被声明为 lambda 回调,当结果到达时调用:
grpc -> {
if (grpc.succeeded()) {
event.reply(grpc.result());
} else {
event.fail(FAILURE_CODE, WISH_SERVICE_COMMUNICATION_ERROR_MESSAGE);
}
}
如果 gRPC 调用成功,我们会通过事件总线向 DataFetcher 发送一个包含 gRPC 请求结果的 REPLY 消息(WishReply)。
否则,我们会通知 DataFetcher 调用失败。
好吧,这有点难。我们从 UI 发送的 GraphQL 查询经过了 GraphQL 服务器、DataFetcher、gRPC 客户端,现在已经到达
wish 服务的 gRPC 服务器。
现在让我们讨论一下 gRPC 服务器如何提供 gRPC 调用
为了实现 gRPC 服务器,我们需要再次遵循我们在 .proto 文件中声明的内容:
service WishService {
rpc GetAllWish (AllWishQueryProto) returns (AllWishReplyProto) {
}
rpc GetWish (WishQueryProto) returns (WishReplyProto) {
}
rpc CreateWish (WishInputProto) returns (WishReplyProto) {
}
rpc UpdateWish (UpdateWishInputProto) returns (WishReplyProto) {
}
rpc DeleteWish (WishQueryProto) returns (WishDeleteReplyProto) {
}
}
生成的 gRPC 服务器有 5 种方法我们需要覆盖/实现 - GetAllWish、GetWish、CreateWish、UpdateWish 和 DeleteWish 以及它们相应的输入和输出(所有内容都根据 .proto 文件)。
我们所要做的就是为这些方法提供实现。
希望服务能在这里完成。
让我们看一下其中一个重写的方法:
/**
* This method gets invoked when other service (app, microservice) invokes stub.getWish(...)
*/
@Override
public void getWish(final WishQueryProto request, final Promise<WishReplyProto> response) {
eventBus.<Optional<Wish>>request(
Address.FIND_ONE_WISH.get(), converter.toDomain(WishQuery.class, request),
event -> {
if (event.succeeded() && event.result().body().isPresent()) {
response.complete(converter.toProtobuf(WishReplyProto.class, new WishReply(event.result().body().get())));
} else if (event.succeeded()) {
response.complete();
} else {
response.fail(event.cause());
}
});
}
如你所见,方法名与我们在 .proto 文件中声明的一致。我们还获得了一个 WishQuery 输入和一个 WishReply 的 Promise。
我们要做的是获取输入,与数据库对话以获取所需数据并解决 WishReply 承诺。
由于我们使用 Vert.x,它通过事件总线 (Event Bus) 实现。我们发送一个事件总线请求,然后等待某个处理程序返回我们需要的信息。
此部分在下图中标记:
gRPC 服务器使用事件总线请求愿望数据。在事件总线的另一侧,有一个数据库 Verticle,它接收这些请求,与数据库通信并返回结果:
数据库 Verticle 使用 DatabaseService 来注册监听愿望数据请求的处理程序。
这是DatabaseService 的实现。
如您所见,事件总线处理程序声明如下:
public void registerEventBusEventHandlers() {
eventBus.<AllWishQuery>consumer(Address.FIND_ALL_WISHES.get())
.handler(event -> wishRepository.findAll(event.body()).setHandler(findAllHandler(event)));
eventBus.<WishQuery>consumer(Address.FIND_ONE_WISH.get())
.handler(event -> wishRepository.findOne(event.body()).setHandler(findOneHandler(event)));
eventBus.<WishInput>consumer(Address.CREATE_ONE_WISH.get())
.handler(event -> wishRepository.addOne(event.body()).setHandler(addOneHandler(event)));
eventBus.<UpdateWishInput>consumer(Address.UPDATE_ONE_WISH.get())
.handler(event -> wishRepository.updateOne(event.body()).setHandler(updateOneHandler(event)));
eventBus.<WishQuery>consumer(Address.DELETE_ONE_WISH.get())
.handler(event -> wishRepository.deleteOne(event.body()).setHandler(deleteOneHandler(event)));
}
它们使用 WishQueries/Inputs 消费事件总线事件。当事件发生时,它们使用 wishRepository(实际包含数据库客户端和与 SQL 查询相关的逻辑的代码)与数据库进行交互。
由于Reactive Vert.x Postgres Client,数据库调用是异步的。
这就是我们为 wishRepository 方法的这些异步结果设置处理程序的原因:
// setHandler to react to async result
wishRepository.findOne(event.body()).setHandler(findOneHandler(event))
如果成功,处理程序只需通过事件总线向从数据库请求数据的 gRPC Verticle 发送回复,如果出现错误,则通知失败:
private Handler<AsyncResult<Optional<Wish>>> updateOneHandler(
final Message<UpdateWishInput> event) {
return query -> {
if (query.succeeded()) {
event.reply(query.result(), new DeliveryOptions().setCodecName(Codec.WISH.getCodecName()));
} else {
event.fail(1, "Error updating the wish in the database");
}
};
}
如果整个流程成功,数据将返回到发送初始 GraphQL 请求的 UI
结论
好了,这就是我在这篇文章中想讲的全部内容。我希望它能让你对Appwish 项目中的数据流有一个高层次的了解。由于涉及的概念太多,我无法一一详述——如果你想深入了解,可以使用我在文章中提供的链接、留言或在Slack上向我提问。
我还想邀请您为该项目做出贡献。
我们所做的一切都是 100% 开源且透明的。已有超过 130 人加入了我们的 Slack 团队。我们正在逐步启动实施工作。如果您想了解更多信息,请阅读我之前的帖子、发表评论或加入Slack频道,我会在那里定期更新项目状态。
链接:https://dev.to/pjeziorowski/react-graphql-grpc-and-reactive-microservices-the-dataflow-in-appwish-platform-explained-34ag