第 1 部分:介绍和设置

2025-06-10

第 1 部分:介绍和设置

John 是 NestJS 核心团队的成员

介绍

本系列文章涵盖了为NestJS 微服务子系统构建自定义传输器的主题。这篇文章适合你吗?如果你属于以下两类读者之一,那么这篇文章就适合你:

  • 您正在尝试构建或修改 NestJS 微服务传输器。这一点可能很明显😄。
  • 您是一位 NestJS 应用程序开发者(即在通常称为“用户空间”的领域编写代码的人),并且想要更深入地了解 NestJS 微服务的工作原理。坦白说,这正是我的出发点,也是我进行研究并撰写本系列文章的主要动机。即使您从未打算构建自己的传输器,您仍然可以像我一样,从这种更深入的理解中受益。在此过程中,我将尝试分享一些见解,帮助您更好地使用 Nest 微服务。

推荐阅读

如果您还没有读过,请查看我的NestJS 微服务实践系列,其中我介绍了 NestJS 微服务子系统架构的许多基础知识,并建立了我在所有 Nest 微服务文章中使用的术语,包括回答“传输器到底是什么?”这个问题。

星际迷航运输机

很接近了,但是还没有成功!

无论如何,虽然没有严格要求,但本系列假设您对该文章系列中的材料有很好的理解。

为什么要定制运输车?

让我们从“为什么?”这个问题开始。Nest微服务子系统提供了一个通信层抽象,使应用程序(包括 Nest 和非 Nest)可以轻松地通过各种所谓的传输器进行通信。这提供了几个关键优势,在上一篇系列文章中已经完整介绍过。Nest 内置了各种传输器,包括 NATS、RabbitMQ、Kafka 等。但是,如果您想构建自己的传输器(例如 ZeroMQ、Google Cloud Pub/Sub、Amazon Kinesis 或 Apache ActiveMQ),该怎么办?如果这是您的愿望,那么您来对地方了!或者,如果您只是想进一步了解 Nest 基础设施这一关键部分的工作原理,请继续阅读!

一个相关的问题是“如何扩展现有 Nest 传输器的功能?”例如,您可能想将 MQTT 的QoS 功能MQTT 传输器一起使用。这是我将在另一系列文章(已在开发中💥)中讨论的主题,但本系列文章为满足该需求提供了基础。

文章系列概述

  • 第一部分(本文):为整个系列奠定基础。我们将首先研究简单的Faye 消息代理,为构建基于其的自定义 Nest 传输器奠定基础。完成第五部分后,我们将拥有一个功能齐全的 Faye 传输器!
  • 第二部分:着手构建我们自定义传输器的服务器端的初始版本。最终成果是一个可运行的版本,它将帮助您熟悉构建任何自定义传输器的服务器组件所需的主要概念。为了让我们专注于主线,我们将一些重要细节的实现推迟到下一篇文章。
  • 第 3 部分:清理一些松散的部分,深入研究框架的一些更先进和令人印象深刻的功能,并向您展示如何在自定义传输器中启用这些功能。
  • 第四部分:切换到客户端。与服务器端一样,本文将介绍Faye 的基本功能实现,并将一些更复杂的功能留到下一篇文章中介绍。
  • 第 5 部分:完成客户端旅程,为 Faye 打造一个功能齐全的定制运输车。
  • 第 6 部分:(即将推出!)调查一些内置的 NestJS 传输器,并将它们的实现与 Faye 实现进行比较,以阐明一些特定于代理的细微实现细节。

让我们开始吧!

获取代码

这些文章中的所有代码都可以在这里找到。与往常一样,如果您按照代码进行操作,这些教程的效果会更好。README涵盖了获取仓库、构建应用程序以及后续操作所需的所有步骤。很简单!我强烈建议您这样做。请注意,每篇文章在仓库中都有一个对应的分支。例如,本文(第 1 部分)就有一个名为的对应分支part1。继续阅读,或在此处获取有关使用仓库的更多详细信息。

Git 签出当前版本

对于本文,您应该使用git checkout分支part1。您可以在此处获取有关 git 存储库分支的更多信息。

构建此部分的应用程序

在本系列的每篇文章中,我们都会介绍一些新组件(有时甚至是全新的项目)。为了方便起见,在每篇文章的开头,您应该从顶层目录(克隆代码库的位置)运行以下命令:*

$ # from root directory of project (e.g., transporter-tutorial, or whatever you chose as the root)
$ sh build.sh
Enter fullscreen mode Exit fullscreen mode

*这是一个在类 Linux 系统上运行的 Shell 脚本。您可能需要针对非 Linux 系统进行一些调整。请注意,该脚本只是npm install && npm run build在每个顶级目录中运行的一个便利程序,因此如果您在使用脚本时遇到问题,可以随时回退到该技术。

第 1 部分概述

在本案例研究中,我们将构建一个自定义传输器,以便与Faye消息代理配合使用。Faye 是一个简单的开源 JavaScript 发布/订阅消息代理,可以在 Node.js 上流畅运行。它的启动和运行以及 API 理解都很简单,因此非常适合作为目标平台。同时,它提供了Nest构建传输器所需的所有功能。

Faye 使用非常简单(几乎是规范的)的发布/订阅协议,如下图 1 所示。

Faye消息协议

图1:Faye消息协议

构建基本的 Faye 客户端应用程序

正如我们在NestJS 微服务实战系列中所做的那样,我们将从构建简单的原生请求器和响应器应用开始。这些是 TypeScript 应用,可以直接使用 Faye 客户端 API。这将帮助我们确保理解 Faye 客户端 API,并为我们提供一个便捷的测试平台。

正如上一篇系列文章所述,Nest 微服务包面临的挑战之一是在发布/订阅语义之上分层请求/响应消息样式。换句话说,Nest 请求者需要能够运行如下代码,该代码发出请求并从远程微服务返回响应:

  @Get('customers')
  getCustomers(): Observable<any> {
    this.logger.log('client#send -> topic: "get-customers"');
    // the following request returns a response as an Observable
    return this.client.send('/get-customers', {});
  }
Enter fullscreen mode Exit fullscreen mode

支持请求/响应

由于 Faye(以及大多数代理)仅支持发布/订阅模式,因此我们需要遵循一个简单的方案来提供请求/响应语义。以下是通常如何实现此操作的通用模式的概述。

假设组件 A 希望从组件 B “获取客户”,而组件 B 可以访问客户数据库。组件 A 可以发布一条'/get-customers'消息,组件 B(假设它已经订阅了该主题)接收该消息,在客户数据库中查询客户列表,然后发送一条响应消息并返回给 A。处理响应消息就是奇迹发生的地方。为了让 B 响应 A,它们双方必须按照约定完成以下几件事:

  • A 选择回应主题(有时称为回复主题
  • A订阅响应主题
  • A发布请求,将响应主题作为消息的一部分传递
  • B发布响应,使用响应主题作为其消息的主题

💡 我们已经见过几次这种“订阅响应,然后发送请求”的模式了,以后还会再见到。这是一个至关重要的模式,值得永远铭刻在你的脑海里。为此,我们发明一个简单的助记符。这样,我们就能在任何需要的时候快速地在脑海中想起这个模式。我把它称为STRPTQ 。这些字母来自“订阅响应,然后发布请求Q为了容易记住请求不是响应,并且顺序要正确。

正如我们在开发 Faye 传输器时所看到的,Nest 会自动为您选择响应主题名称,从而使此过程更加轻松。实际上,Nest 会根据您声明的每个模式构建两个主题。例如,假设您定义一个如下所示的Nest 微服务响应器消息模式:

@MessagePattern('/get-customers')
Enter fullscreen mode Exit fullscreen mode

注意:Nest 使用术语“渠道”作为主题(某些代理也称为主题和渠道)的通用内部名称,并帮助消除它们与它们所衍生的模式的用户空间概念的歧义。)

在内部,Nest 根据上述模式构建了两个渠道:

  • '/get-customers_ack'- 这是我们将在 Faye 传输器中用于发布/订阅主题请求消息的物理通道名称'/get-customers'
  • '/get-customers_res'- 这是我们将在 Faye 传输器中用来发布/订阅请求响应消息的物理通道名称'/get-customers'

让我们构建之前提到的那些原生应用。注意:稍后我们将使用这些原生应用来测试 Nest 传输器,因此为了使它们与 Nest 无缝交互,我们将使用上面描述的 Nest 通道名称。我们将从响应器应用开始。

本机响应应用程序(客户服务)

这是响应入站消息(请求)并生成响应的应用程序。假设您正在跟踪此分支part1,请打开文件customerService/src/service.ts

getCustomers 以下是Faye 订阅处理程序的实现。这是我们注册的代码,用于在应用从频道接收到入站消息时运行'/get-customers_ack'(请参阅subscribe()文件下方的调用):

// customerService/src/service.ts
function getCustomers(packet): void {
  const message = parsePacket(packet);
  console.log(
    `\n========== <<< 'get-customers' message >>> ==========\n${JSON.stringify(
      message,
    )}\n=============================================\n`,
  );

  // filter customers list if there's a `customerId` param
  const customers =
    message.data && message.data.customerId
      ? customerList.filter(
          cust => cust.id === parseInt(message.data.customerId, 10),
        )
      : customerList;

  client.publish('/get-customers_res', getPayload({ customers }, message.id));
}
Enter fullscreen mode Exit fullscreen mode

逻辑应该很容易理解。请参阅完整清单,并确保您了解我们在这里如何使用'/get-customers_ack''/get-customers_res'通道。我们会看到很多这样的模式,所以请确保它对您来说有意义。

需要指出的一个细节是对 的调用getPayload()。我们来讨论一下。如上所述,我们正在构建这些原生应用以符合 Nest 协议。这包括使用 Nest 标准通道名称,以及匹配 Nest 内部消息格式(消息格式在NestJS 微服务实战系列中有详细介绍)。由于我们采用了这种方法,这些应用为 Nest 传输器提供了一个便捷的测试平台Server,而ClientProxy我们在本系列文章中也正在构建它。消息格式要求意味着我们需要将响应包装在一个标准对象中,我们可以像这样描述:

   {
     /**
      *  Request error message, if any
      */
     err: any,
     /**
      *  The response message payload (return value of a
      *  message pattern handler)
      */
     response: any
     /**
      *  Status of an Observable response. `true` once the final
      *  stream value has been retrieved.
      */
     isDisposed: boolean
     /**
      *  Unique identifier, corresponding to the id field received
      *  from the initial request
      */
     id: string
   }
Enter fullscreen mode Exit fullscreen mode

还有getPayload()一个辅助函数,它将有效负载包装在这个标准的 Nest 消息结构中。如果您仔细观察,会注意到getPayload()(这里没有显示——请查看service.ts源代码查看)将message.id字段从入站请求复制到出站消息。我们还没有讨论过这些id字段,但从第 5 部分开始深入研究传输器的“客户端”时,它们将变得非常重要。

本机请求者应用程序(customerApp)

这是面向用户的应用程序,用于发出出站请求。请求者位于customerApp/src/customer-app.ts。以下是该应用程序中函数的实现getCustomers()

虽然有一些样板代码,但主要逻辑应该很容易理解。它实现了我们几分钟前讲过的STRPTQ请求/响应协议请求端。

// customerApp/src/customer-app.ts
async function getCustomers(customerId) {
  // build Nest-shaped message
  const payload = getPayload('/get-customers', { customerId }, uuid());

  return new Promise((resolve, reject) => {
    // subscribe to the response message
    const subscription = client.subscribe('/get-customers_res', result => {
      console.log(
        `==> Receiving 'get-customers' reply: \n${JSON.stringify(
          result.response,
          null,
          2,
        )}\n`,
      );
    });

    // once response is subscribed, publish the request
    subscription.then(() => {
      console.log(
        `<== Sending 'get-customers' request with payload:\n${JSON.stringify(
          payload,
        )}\n`,
      );
      const pub = client.publish('/get-customers_ack', payload);
      pub.then(() => {
        // wait .5 second to ensure subscription handler executes
        // then unsubscribe and resolve
        setTimeout(() => {
          subscription.cancel();
          resolve();
        }, 500);
      });
    });
  });
}
Enter fullscreen mode Exit fullscreen mode

我们首先订阅'/get-customers_res'响应通道)。我们向其传递一个简单的Faye 订阅处理程序,当此通道上的消息返回时,该处理程序会打印出结果。此输出将成为对从命令行发出的最终用户命令(请求)的“最终响应”(通过文件底部的基本命令行处理逻辑接收)。

然后我们在 上发布请求'/get-customers_ack'。注意,对 的调用subscribe()返回了subscription一个 Promise,一旦 Faye 服务器确认订阅操作已完成,该 Promise 就会解析。

因为我们将其作为独立的 Node.js 命令行程序运行,因此需要在发送请求后退出(这样我们就可以将其用作命令处理器,方便地发送请求),所以我们在发布请求后等待 500 毫秒,然后取消订阅并退出。取消订阅是重要的安全措施——如果不取消,订阅就会一直存在(尽管 Faye 最终会检测到并清理它们)。这有点粗暴,我们希望在传输器客户端中进行更复杂的订阅管理,但目前来说已经足够了。

最后,你会注意到,与响应器应用非常相似customerService,我们有一个辅助getPayload()实用程序来将请求包装在 Nest 消息协议中。你还会注意到,我们正在生成一个id字段——与我们在上面的响应器应用中处理的字段相同id。现在,只需将该id字段视为使用原生 Nest 消息协议所需的内容。稍后,我们将更详细地探讨这一点。

为了完整起见,让我们快速看一下如何运行 Faye 服务器。虽然你可以把它当作一个黑匣子,但值得花点时间看看它的运行方式,并欣赏它原生的 node.js 特性和简洁性。打开。代码如下所示。你可以在这里faye-server/server.js阅读有关运行 Faye 服务器的更多信息,但我们基本上只是启动它,并且出于仪表目的,监听各种事件,我们可以将这些事件记录到控制台,以便轻松跟踪握手和消息交换。你根本不需要弄乱它,但如果你这样做了,那也是非常基础的东西。

// faye-server/server.js
const http = require('http');
const faye = require('faye');

const mountPath = '/faye';
const port = 8000;

const server = http.createServer();
const bayeux = new faye.NodeAdapter({ mount: mountPath, timeout: 45 });

bayeux.attach(server);
server.listen(port, () => {
  console.log(
    `listening on http://localhost:${port}${mountPath}\n========================================`,
  );
});

bayeux.on('handshake', clientId =>
  console.log('^^ client connect (#', clientId.substring(0, 4), ')'),
);

bayeux.on('disconnect', clientId =>
  console.log('vv client disconnect (#', clientId.substring(0, 4), ')'),
);

bayeux.on('publish', (clientId, channel, data) => {
  console.log(
    `<== New message from ${clientId.substring(0, 4)} on channel ${channel}`,
    `\n    ** Payload: ${JSON.stringify(data)}`,
  );
});

bayeux.on('subscribe', (clientId, channel) => {
  console.log(
    `++ New subscription from ${clientId.substring(0, 4)} on ${JSON.stringify(
      channel,
    )}`,
  );
});

bayeux.on('unsubscribe', (clientId, channel) => {
  console.log(
    `-- Unsubscribe by ${clientId.substring(0, 4)} on ${JSON.stringify(
      channel,
    )}`,
  );
});
Enter fullscreen mode Exit fullscreen mode

测试应用程序

现在,我们已经完成了请求器和响应器,可以对它们进行测试了。虽然我只回顾了'/get-customers'上面的消息流,但此分支中的代码也实现了该'/add-customer'消息。请花一点时间按照这些简单的说明运行代码。它看起来是这样的。在此视频循环中,Faye 代理在顶部窗格中运行,customerService应用程序在中间窗格中运行,customerApp应用程序在底部窗格中运行(如果您感兴趣,这是在tmux虚拟终端中运行的;如果您感兴趣,可以在这里获取我的设置)。

原生应用演示

屏幕截图:原生应用演示

下一步

在第二部分中,我们将编写并测试 Faye 传输器服务器组件的基础版本。该组件在微服务监听器的上下文中运行,使应用程序能够充当 Nest 响应器(这些术语和概念在NestJS 微服务实战系列中有详细讨论)。我们将在一个简单的 Nest 响应器(微服务)应用中使用它来测试该服务器组件。该 Nest 微服务应用将取代我们刚刚构建的原生应用customerApp,我们将确保它能够响应完全相同的客户相关消息。

欢迎在下方评论区提问、评论或建议,或者直接打个招呼。欢迎加入我们的Discord,一起愉快地讨论 NestJS。我在那里的用户名是Y Prospect

鏂囩珷鏉ユ簮锛�https://dev.to/nestjs/part-1-introduction-and-setup-1a2l
PREV
TypeScript 入门
NEXT
5 分钟内为 Knex.js(或其他基于资源的库)构建 NestJS 模块