NestJS:从 Promises 到 Observables

2025-06-10

NestJS:从 Promises 到 Observables

NestJS 是一个基于 Angular 核心的渐进式 Node.js 框架,用于后端应用程序。它使用 Typescript 构建。在本文中,我们将了解 Observables 如何在特定用例中成为 Promises 的有效替代方案。

理解 NestJS 中的 Promise

在 Javascript/Typescript 中,Promises 是执行异步任务的标准,例如数据库查询、文件操作、HTTP 请求……

// Example of Promise in NestJS

@Injectable()
export class UserService {
  async findById(id: string): Promise<User> {
    return this.userModel.findById(id).exec();
  }
}
Enter fullscreen mode Exit fullscreen mode

Promises 的主要特点是:

  • 简单的异步操作:当处理返回单个值或一次性操作的简单异步任务时,Promises 通常就足够了。
  • 兼容性: JavaScript 原生支持 Promise,并且是与使用基于 Promise 的 API 的库或 API 集成时的首选。
  • 错误处理: Promises 通过 .catch() 或 try-catch 块提供内置错误处理,使其适用于错误处理至关重要的场景。

什么是可观察对象?

Observables 是 RxJS 库中的一项功能,用于表示随时间变化的值流。它们提供了强大的功能来处理异步操作,例如转换组合取消

// Example of Observable in NestJS

@Injectable()
export class UserService {
  findAll(): Observable<User[]> {
    return from(this.userModel.find().exec());
  }
}
Enter fullscreen mode Exit fullscreen mode

使用 Observables,您可以编写对数据和事件的变化做出反应的代码,从而构建更具反应性的应用程序。

Observable 的主要特点是:

  • 复杂的异步工作流:可观察对象在涉及复杂异步工作流的场景中表现出色,例如事件流、实时数据更新或连续数据转换。
  • 数据流:处理数据流或事件序列时,Observables 提供了一种更具表现力和灵活性的方式来处理异步操作。
  • 操作符和管道:可观察对象提供了丰富的操作符,可实现强大的数据流转换、过滤和组合功能,使其成为需要高级数据操作的场景的理想选择。一些常见的操作符包括 map、filter、reduce、merge 和 debounce 等。

我为什么需要 Observables?

使用可观察对象的主要优势之一是它们支持组合。您可以组合多个可观察对象,对它们应用操作符,并创建新的可观察对象。这使得构建复杂的异步工作流和处理数据依赖关系变得容易。

此外,Observable 还支持处理错误和完成。当数据处理过程中出现问题时,Observable 可以发出错误通知,以便你通过补偿过程处理错误并从中恢复。当数据流结束时,它们还会发出通知,指示流式传输结束。

因此,Observables 和 RxJS 可帮助您在处理复杂的异步场景时编写更具响应性、声明性和高效的代码。

NestJS 中有哪些常见用例?

将 RxJS 与 NestJS 应用程序结合使用可以增强其处理异步操作和创建响应式管道的功能。以下是 NestJS 中一些常见的 RxJS 用例。

异步

任何应用程序都可能需要执行异步耗时操作。Observable 可以用来表示异步数据流,并应用一些操作符,例如from将 Promise 或回调函数转换为 Observable 并进行数据处理。

import { Injectable } from '@nestjs/common';
import { Observable, from } from 'rxjs';
import { AxiosResponse } from 'axios';

@Injectable()
export class DataService {
  fetchData(): Observable<any> {
    return from(getExternalDataFromAPI());
  }
}

async function getExternalDataFromAPI(): Promise<AxiosResponse> {
  // Simulate fetching data from an external API
  return axios.get('https://myapi.com/data');
}
Enter fullscreen mode Exit fullscreen mode

反应控制器

您可以创建响应端点来响应数据或事件的变化,这要归功于代表数据流的 Observables,然后将结果作为响应返回。

import { Controller, Get } from '@nestjs/common';
import { Observable, interval } from 'rxjs';
import { map } from 'rxjs/operators';

@Controller('data')
export class DataReactiveController {
  @Get('stream')
  streamData(): Observable<number> {
    // Simulate streaming data with an interval
    return interval(1000).pipe(
      map(() => Math.random()) // Transform the interval data
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

进程间通信

在微服务架构中,NestJS 应用程序可能需要与其他服务进行通信。RxJS 可以使用 Observables 作为服务间数据流传输的手段,从而促进这种通信。您可以使用switchMap或 之类的操作符mergeMap来处理数据依赖关系,并以响应式的方式进行多个服务调用。

import { Controller, Get, Inject } from '@nestjs/common';
import { ClientProxy, ClientProxyFactory, Transport } from '@nestjs/microservices';
import { Observable } from 'rxjs';
import { switchMap } from 'rxjs/operators';

@Controller('data')
export class DataController {
  private readonly client: ClientProxy;

  constructor() {
    this.client = ClientProxyFactory.create({
      transport: Transport.TCP,
      options: {
        host: 'localhost',
        port: 8888,
      },
    });
  }

  @Get()
  fetchData(): Observable<YourType> {
    return this.client.send<YourType>({ cmd: 'fetchData' }).pipe(
      switchMap(response => {
        // Perform additional operations with the received data
        // For example, make another service call based on the initial response
        return this.client.send<YourType>({ cmd: 'processData', data: response });
      })
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

有时,在复杂的沟通流程中,用图形来表示会很有帮助。这就是为什么你可以使用 Marbles。

在 RxJS 中,弹珠是一种可视化的表示形式,用于展示可观察序列、操作符和基于时间的事件的行为。这些图表由 -、|、^ 和 # 等字符组成,分别表示可观察流的不同方面,包括随时间发出的值、完成、错误和订阅点。您可以使用ThinkRx等工具来可视化您的流程。

中间件和管道

NestJS 提供了中间件和管道,用于拦截和修改传入的请求和传出的响应。您可以使用 RxJS 运算符来处理中间件或管道中的异步操作。例如,您可以使用map运算符来转换数据,或catchError使用运算符来处理错误。

import { Injectable, NestMiddleware } from '@nestjs/common';
import { Request, Response } from 'express';
import { Observable, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

@Injectable()
export class LoggingMiddleware implements NestMiddleware {
  use(req: Request, res: Response, next: () => void) {
    console.log('Logging middleware executing...');
    // Simulate an asynchronous operation
    this.asyncOperation().pipe(
      map(data => {
        // Transform data if needed
        return data.toUpperCase();
      }),
      catchError(error => {
        // Handle errors if any
        console.error('Error occurred in logging middleware:', error);
        return of('Error occurred in logging middleware');
      })
    ).subscribe(
      transformedData => {
        console.log('Transformed data:', transformedData);
        next();
      }
    );
  }

  asyncOperation(): Observable<string> {
    return new Observable<string>(observer => {
      setTimeout(() => {
        observer.next('Data from async operation');
        observer.complete();
      }, 1000);
    });
  }
}
Enter fullscreen mode Exit fullscreen mode

事件驱动编程

NestJS 应用程序可以从事件驱动编程中受益,其中组件会对事件做出响应并相应地触发操作。RxJS 提供了一组丰富的操作符来处理事件流。您可以使用主题或事件发射器作为可观察对象来表示事件,并使用诸如filter或 之类的操作符debounceTime来处理事件流转换。让我们用一个实时通知系统来说明这一点:

import { Injectable } from '@nestjs/common';
import { Subject, Observable } from 'rxjs';
import { filter, debounceTime } from 'rxjs/operators';

@Injectable()
export class EventService {
  private eventSubject = new Subject<string>();

  emitEvent(event: string): void {
    this.eventSubject.next(event);
  }

  getFilteredEvents(keyword: string): Observable<string> {
    return this.eventSubject.pipe(
      filter(event => event.includes(keyword))
    );
  }

  getDebouncedEvents(time: number): Observable<string> {
    return this.eventSubject.pipe(
      debounceTime(time)
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

测试

您还可以使用 RxJS 中的测试实用程序为 NestJS 应用程序编写测试。您可以使用toArray或 之类的运算符toPromise将 Observable 转换为数组或 Promise,以便在测试期间断言所发出的值。

让我们想象一个像这样的数据服务:

import { Injectable } from '@nestjs/common';
import { HttpClient } from '@nestjs/common';
import { Observable } from 'rxjs';
import { map } from 'rxjs/operators';

@Injectable()
export class DataService {
  constructor(private readonly http: HttpClient) {}

  fetchData(): Observable<YourType[]> {
    return this.http.get<YourType[]>('https://myapi.com/data').pipe(
      map(response => response.map(item => ({ id: item.id, name: item.name })))
    );
  }
}
Enter fullscreen mode Exit fullscreen mode

以下是我们借助 RxJS 构建的 NestJS 测试:

import { Test, TestingModule } from '@nestjs/testing';
import { DataService } from './data.service';
import { HttpClientTestingModule, HttpTestingController } from '@nestjs/common/testing';
import { of } from 'rxjs';

describe('DataService', () => {
  let service: DataService;
  let httpTestingController: HttpTestingController;

  beforeEach(async () => {
    const module: TestingModule = await Test.createTestingModule({
      imports: [HttpClientTestingModule],
      providers: [DataService],
    }).compile();

    service = module.get<DataService>(DataService);
    httpTestingController = module.get<HttpTestingController>(HttpTestingController);
  });

  afterEach(() => {
    httpTestingController.verify();
  });

  it('should be defined', () => {
    expect(service).toBeDefined();
  });

  it('should fetch data from the API and transform it', (done) => {
    const testData = [{ id: 1, name: 'Item 1' }, { id: 2, name: 'Item 2' }];
    const transformedData = [{ id: 1, name: 'Item 1' }, { id: 2, name: 'Item 2' }];

    service.fetchData().subscribe((data) => {
      expect(data).toEqual(transformedData);
      done();
    });

    const req = httpTestingController.expectOne('https://myapi.com/data');
    expect(req.request.method).toEqual('GET');

    req.flush(testData);
  });

  it('should handle errors', (done) => {
    const errorResponse = { status: 404, message: 'Not Found' };

    service.fetchData().subscribe(
      () => {},
      (error) => {
        expect(error).toEqual(errorResponse);
        done();
      }
    );

    const req = httpTestingController.expectOne('https://myapi.com/data');
    req.error(new ErrorEvent('Error'));
  });
});

Enter fullscreen mode Exit fullscreen mode

我们创建了一个 DataService,它使用来自 @nestjs/common 的 HttpClient 从外部 API 获取数据。fetchData 方法使用 map 运算符转换数据,然后将其作为 Observable 返回。

在测试中,我们使用 @nestjs/testing 中的 Test.createTestingModule 来设置测试模块。我们从 @nestjs/common/testing 导入 HttpClientTestingModule 来模拟 HttpClient。然后,我们通过订阅 Observable 并断言发出的值来测试 fetchData 方法的行为。我们还通过模拟 API 的错误响应来测试错误处理。

通过使用 RxJS 测试实用程序(例如来自 @nestjs/common/testing 的 of、toPromise 和 HttpTestingController),我们可以轻松地为使用 Observables 的 NestJS 应用程序编写测试,确保我们的服务按预期运行并优雅地处理错误。

结论

必要时,在 NestJS 中从 Promises 过渡到 Observables 和 RxJS 操作符,为处理复杂的异步工作流开辟了新的可能性。无论您是从外部 API 获取数据pipe、处理实时更新,还是管理事件流,它都为在 NestJS 中进行可持续的异步编程提供了强大的工具。

鏂囩珷鏉ユ簮锛�https://dev.to/onepoint/nestjs-from-promises-to-observables-2gl7
PREV
38 个很棒的 AP​​I 来启发你的下一个软件项目
NEXT
像绝地武士一样掌握 Web 性能