多个用户实时使用同一表单。Nx、NestJs 和 Angular

2025-06-04

多个用户实时使用同一表单。Nx、NestJs 和 Angular

最终产品的视觉呈现

如果你没有时间阅读本教程,这里是代码的最终版本

在本文中,我想探讨一下我曾多次被要求针对不同用例构建的功能。对于分布式和远程团队来说,实时协作是成功的关键。每当我们提到实时应用程序时,我们总会想到同一个例子:聊天。虽然聊天很酷也很重要,但还有一个更简单的东西可以帮助团队最大限度地提高协作:可由多个用户同时编辑的表单。

这看起来很有挑战性,当然,根据具体用例,它可能会更难,成本也更高。它之所以成本高昂,仅仅是因为它意味着需要来回发送更多数据。如果你的应用程序运行在 VPS 或专用服务器上,你或许无需任何额外费用即可实现这一点,但如果你采用无服务器架构,这意味着你月底会花费更多。

在传统的表单实现中,每个客户端都有自己的状态,并且仅在表单提交时发送请求。在这种情况下,事情会变得更加复杂,每次一个客户端更新表单时,所有其他客户端都应该收到此信息。如果您计划在只有少数用户的应用中使用此功能,那么还可以,但如果您计划让 1,000 个用户同时更改表单,则必须考虑到每次更改都会将数据发送给所有 1,000 个用户。

在这种情况下,我将专注于做一个非常简单的实现来帮助您入门,这绝不是一个可以投入生产的应用程序。

问题

假设你有多个用户需要共同努力实现一个目标,你希望尽可能减少摩擦。建立一个实时协作完成同一任务的机制会非常有用。

解决方案

应该有一个服务负责跟踪任务的当前状态并向所有连接的客户端发送更新。客户端使用的 Web 客户端应该显示连接的客户端以及一个可以通过用户交互或来自服务的更新进行更改的表单。

由于并发的可能性很大,我们必须选择一种能够帮助我们实现并发的策略。我个人非常喜欢 Redux,所以我的实现基于它,并根据自己的需求进行了调整。由于这是一个非常小的应用程序,我使用纯 RxJs 来实现状态管理。可以执行的操作包括:

  • Init:设置 Web 客户端的初始状态,在每个客户端加载时触发。
  • ClientConnected:每次客户端连接到服务时,所有客户端都会收到当前连接的客户端的更新列表。
  • 数据:每当客户端连接时,服务都会以当前表单状态进行响应。
  • PatchValue:当客户端通过直接与表单交互来更新表单时,它会将更改发送给服务。
  • ValuePatched:当服务接收到状态变化时,它会将其广播给所有其他客户端。

对于此示例,表单数据非常简单,它仅包含标题和描述,两者均为字符串类型。

执行

首先要选择我们想要使用的技术。我是一名 Angular 开发者,因此我选择使用 Angular 作为 Web 客户端。由于 NestJs 非常酷,我决定用它来实现同步服务。最后,由于 Web 客户端和服务需要实时通信,Nx 可以非常有效地减少重复,并使用共享接口确保传递的消息类型安全。

注意:对于 Web 客户端,您可以使用任何 JS 框架,甚至是纯 JavaScript。服务端也一样,只要您有 Socket.IO 实现,就可以使用 Node 或任何您想要的框架。我使用了 Nx,只是因为我喜欢它,但您也可以跳过这部分。

我们将从生成 Nx 工作区开始。

  • 运行命令npx create-nx-workspace@latest realtime-form
  • angular-nest在提示选项中选择工作区
  • 输入web-client应用程序名称
  • 选择您喜欢的样式表格式(我总是使用 SASS)
  • 进入realtime-form目录

将 Nx 与 NestJs 和 Angular 结合使用的一大优势是它们之间可以共享内容。让我们利用这一点,创建FormData接口和ActionTypes枚举。

转到/libs/api-interfaces/src/lib/api-interfaces.ts并将其内容更改为:



export enum ActionTypes {
  Data = '[Socket] Data',
  ClientConnected = '[Socket] Client Connected',
  ValuePatched = '[Socket] Value Patched',
  PatchValue = '[Form] Patch Value',
  Init = '[Init] Init'
}

export interface FormData {
  title: string;
  description: string;
}


Enter fullscreen mode Exit fullscreen mode

现在我们可以从服务和 Web 客户端使用它们,因为它们是共享的,所以它可以作为两者之间的契约。

我们将从服务开始:

  • 跑步npm i --save @nestjs/websockets @nestjs/platform-socket.io
  • 跑步npm i --save-dev @types/socket.io
  • 进入目录/apps/api/src/app
  • 创建一个名为的新目录events并移动到该目录
  • 创建名为events.gateway.ts
  • 创建名为events.module.ts

接下来您只需写入新文件的内容。

前往/apps/api/src/app/events/events.gateway.ts



import {
  SubscribeMessage,
  WebSocketGateway,
  WebSocketServer
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';

import { ActionTypes, FormData } from '@realtime-form/api-interfaces';

@WebSocketGateway()
export class EventsGateway {
  connectedClients = [];
  data = {};
  @WebSocketServer()
  server: Server;
  private logger: Logger = new Logger('EventsGateway');

  handleConnection(client: Socket) {
    this.connectedClients = [...this.connectedClients, client.id];
    this.logger.log(
      `Client connected: ${client.id} - ${this.connectedClients.length} connected clients.`
    );
    this.server.emit(ActionTypes.ClientConnected, this.connectedClients);
    client.emit(ActionTypes.Data, this.data);
  }

  handleDisconnect(client: Socket) {
    this.connectedClients = this.connectedClients.filter(
      connectedClient => connectedClient !== client.id
    );
    this.logger.log(
      `Client disconnected: ${client.id} - ${this.connectedClients.length} connected clients.`
    );
    this.server.emit(ActionTypes.ClientConnected, this.connectedClients);
  }

  @SubscribeMessage(ActionTypes.PatchValue)
  patchValue(client: Socket, payload: Partial<FormData>) {
    this.data = { ...this.data, ...payload };
    this.logger.log(`Patch value: ${JSON.stringify(payload)}.`);
    client.broadcast.emit(ActionTypes.ValuePatched, payload);
  }
}


Enter fullscreen mode Exit fullscreen mode

如果你对这段代码感到困惑,别担心,我们相信 NestJs 会完成所有繁重的工作。你可以将每个方法视为对事件的响应;连接、断开连接和补丁值。

  • 连接:更新已连接客户端的列表,将发生的事件记录到服务中,向所有当前连接的客户端发出新的connectedClients列表,并向客户端发出表单的当前状态。
  • 断开连接:更新已连接客户端列表,将发生的事件记录到服务中,向所有当前连接的客户端发出新的connectedClients列表。
  • PatchValue:更新表单的当前状态,将发生的事件记录到服务,将新状态广播到所有当前连接的客户端。

注意:this.server.emit 和 client.broadcast.emit 之间的区别在于,第一个将消息发送给所有客户端,而第二个将消息发送给除发送方之外的所有客户端。

现在让我们更新/apps/api/src/app/events/events.module.ts文件:



import { Module } from '@nestjs/common';
import { EventsGateway } from './events.gateway';

@Module({
  providers: [EventsGateway]
})
export class EventsModule {}


Enter fullscreen mode Exit fullscreen mode

以及/apps/api/src/app/app.module.ts文件:



import { Module } from '@nestjs/common';
import { EventsModule } from './events/events.module';

@Module({
  imports: [EventsModule]
})
export class AppModule {}


Enter fullscreen mode Exit fullscreen mode

我还删除了AppControllerAppService文件。并apps/api/src/main.ts用以下内容更新了文件:



import { NestFactory } from '@nestjs/core';
import { AppModule } from './app/app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  const port = 3000;
  await app.listen(port, () => {
    console.log('Listening at http://localhost:' + port);
  });
}

bootstrap();


Enter fullscreen mode Exit fullscreen mode

现在是时候开始使用 Web 客户端了,请转到apps/web-client/src/app/app.component.html



<header>
  <h1>Realtime Form</h1>
</header>

<main>
  <form [formGroup]="form">
    <fieldset>
      <label class="form-control">
        <span>Title: </span>
        <input formControlName="title" />
      </label>

      <label class="form-control">
        <span>Description: </span>
        <textarea formControlName="description" rows="5"></textarea>
      </label>
    </fieldset>
  </form>

  <ng-container *ngIf="connectedClients$ | async as clients">
    <h2>Clients ({{ clients.length }})</h2>
    <ul>
      <li *ngFor="let client of clients">{{ client }}</li>
    </ul>
  </ng-container>
</main>


Enter fullscreen mode Exit fullscreen mode

为了确保它看起来就像我在开始时展示的那样,请转到/apps/web-client/src/app/app.component.scss并将其内容替换为以下内容:



form {
  width: 100%;
  padding: 0.5rem;
  max-width: 600px;

  .form-control {
    display: flex;
    margin-bottom: 1rem;

    & > span {
      flex-basis: 20%;
    }

    & > input,
    & > textarea {
      flex-grow: 1;
    }
  }
}


Enter fullscreen mode Exit fullscreen mode

使用以下命令安装 Angular 的 Socket IO 包npm install --save ngx-socket-io

不要忘记在 Web 客户端中注入ReactiveFormsModule。前往SocketIoModuleAppModule/apps/web-client/src/app/app.module.ts



import { BrowserModule } from '@angular/platform-browser';
import { NgModule } from '@angular/core';
import { ReactiveFormsModule } from '@angular/forms';

import { AppComponent } from './app.component';

import { SocketIoModule, SocketIoConfig } from 'ngx-socket-io';

const config: SocketIoConfig = {
  url: 'http://192.168.1.2:3000',
  options: {}
};

@NgModule({
  declarations: [AppComponent],
  imports: [BrowserModule, ReactiveFormsModule, SocketIoModule.forRoot(config)],
  providers: [],
  bootstrap: [AppComponent]
})
export class AppModule {}


Enter fullscreen mode Exit fullscreen mode

接下来前往apps/web-client/src/app/app.component.ts



import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, merge } from 'rxjs';
import { scan, map } from 'rxjs/operators';
import { FormBuilder } from '@angular/forms';
import { Socket } from 'ngx-socket-io';

import { ActionTypes, FormData } from '@realtime-form/api-interfaces';
import { State, reducer } from './core/state';
import {
  ClientConnected,
  Data,
  ValuePatched,
  Action,
  Init
} from './core/actions';
import {
  getPatchValueEffect,
  getValuePatchedEffect,
  getFormChangesEffect
} from './core/effects';

@Component({
  selector: 'realtime-form-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
  // 1: Action dispatcher
  private dispatcher = new BehaviorSubject<Action>(new Init());
  actions$ = this.dispatcher.asObservable();
  // 2: State stream
  store$ = this.actions$.pipe(
    scan((state: State, action: Action) => reducer(state, action))
  );
  // 3: Define all the selectors
  connectedClients$ = this.store$.pipe(
    map((state: State) => state.connectedClients)
  );
  data$ = this.store$.pipe(map((state: State) => state.data));
  title$ = this.data$.pipe(map((state: Partial<FormData>) => state.title));
  description$ = this.data$.pipe(
    map((state: Partial<FormData>) => state.description)
  );

  // 4: Initialize the form
  form = this.fb.group({
    title: [''],
    description: ['']
  });

  constructor(private socket: Socket, private fb: FormBuilder) {}

  ngOnInit() {
    // 5: Connect to all the socket events
    this.socket.on(ActionTypes.ClientConnected, (payload: string[]) => {
      this.dispatcher.next(new ClientConnected(payload));
    });

    this.socket.on(ActionTypes.Data, (payload: Partial<FormData>) => {
      this.dispatcher.next(new Data(payload));
    });

    this.socket.on(ActionTypes.ValuePatched, (payload: Partial<FormData>) => {
      this.dispatcher.next(new ValuePatched(payload));
    });

    // 6: Subscribe to all the effects
    merge(
      getPatchValueEffect(this.socket, this.actions$),
      getValuePatchedEffect(this.form, this.actions$),
      getFormChangesEffect(this.form, this.dispatcher)
    ).subscribe();
  }
}


Enter fullscreen mode Exit fullscreen mode

让我们回顾一下我刚才做的每件事:

1:动作调度器

我首先创建一个动作调度器和一个从动作流中可观察的对象,我使用 RxJs BehaviorSubject 和一个如下所示的初始动作:



// apps/web-client/src/app/core/actions/init.action.ts
import { ActionTypes } from '@realtime-form/api-interfaces';

export class Init {
  type = ActionTypes.Init;
  payload = null;
}


Enter fullscreen mode Exit fullscreen mode

我还在Action桶导入中创建了一个类型,以使其更易于使用:



// apps/web-client/src/app/core/actions/index.ts
import { Init } from './init.action';

export type Action = Init;
export { Init };


Enter fullscreen mode Exit fullscreen mode

2:状态流

通过使用 scan 运算符,我们可以获取可观察对象的每一次发射,并保存一个内部状态,该状态会通过其回调的返回进行更新。使用 reducer 函数,该函数接受状态和动作,并以不可变的方式返回状态,这样我们就可以更安全地获取当前状态的流。

我创建了一个如下所示的减速器:



// apps/web-client/src/app/core/state/state.reducer.ts
import { ActionTypes } from '@realtime-form/api-interfaces';
import { State } from './state.interface';
import { Action } from '../actions';
import { initialState } from './initial-state.const';

export const reducer = (state: State, action: Action): State => {
  switch (action.type) {
    case ActionTypes.Init:
      return { ...initialState };
    case ActionTypes.ClientConnected:
      return {
        ...state,
        connectedClients: action.payload
      };
    case ActionTypes.Data:
      return { ...state, data: action.payload };
    case ActionTypes.PatchValue:
      return { ...state, data: { ...state.data, ...action.payload } };
    default:
      return { ...state };
  }
};


Enter fullscreen mode Exit fullscreen mode

动作简要描述:

  • Init:将状态设置为initialStateconst。
  • ClientConnected:使用更新后的列表更新状态中的connectedClients。
  • 数据:将状态的数据设置为连接时返回的值。
  • PatchValue:使用有效载荷中的更改来修补数据。

界面State如下:



// apps/web-client/src/app/core/state/state.interface.ts
import { FormData } from '@realtime-form/api-interfaces';

export interface State {
  connectedClients: string[];
  data: Partial<FormData>;
}


Enter fullscreen mode Exit fullscreen mode

constinitialState看起来像这样:



// apps/web-client/src/app/core/state/initial-state.const.ts
import { State } from './state.interface';

export const initialState = {
  connectedClients: [],
  data: {}
} as State;


Enter fullscreen mode Exit fullscreen mode

我也在这里创建了一个桶导入,我有点喜欢它们。



export { initialState } from './initial-state.const';
export { State } from './state.interface';
export { reducer } from './state.reducer';


Enter fullscreen mode Exit fullscreen mode

3:定义所有选择器

为了方便访问商店中的值,我创建了一组额外的可观察对象,它们基本上将状态映射到子状态,其工作方式类似于投影。

4:初始化表单

我刚刚使用 ReactiveForms 创建了一个非常非常简单的表单,如果您想了解更多信息,可以查看我的 ReactiveForms 系列。

5:连接所有套接字事件

正如我们刚才看到的,我们的服务可以发出三个事件,在这一步中,我们将监听这些事件并做出相应的响应。为了使其更清晰,我创建了一些动作创建器类。



// apps/web-client/src/app/core/actions/client-connected.action.ts
import { ActionTypes } from '@realtime-form/api-interfaces';

export class ClientConnected {
  type = ActionTypes.ClientConnected;

  constructor(public payload: string[]) {}
}


Enter fullscreen mode Exit fullscreen mode


// apps/web-client/src/app/core/actions/data.action.ts
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';

export class Data {
  type = ActionTypes.Data;

  constructor(public payload: Partial<FormData>) {}
}


Enter fullscreen mode Exit fullscreen mode


// apps/web-client/src/app/core/actions/value-patched.action.ts
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';

export class ValuePatched {
  type = ActionTypes.ValuePatched;

  constructor(public payload: Partial<FormData>) {}
}


Enter fullscreen mode Exit fullscreen mode

并且不要忘记更新桶导入



// apps/web-client/src/app/core/actions/index.ts
import { Init } from './init.action';
import { Data } from './data.action';
import { ClientConnected } from './client-connected.action';
import { ValuePatched } from './value-patched.action';

export type Action = Init | Data | ClientConnected | ValuePatched;
export { Init, Data, ClientConnected, ValuePatched };


Enter fullscreen mode Exit fullscreen mode

6:订阅所有效果

剩下的就是副作用了。让我们逐一看看:

当用户更新表单时,这些更改必须广播到所有其他客户端,为此我们需要将其发送到服务。我们可以这样做:



// apps/web-client/src/app/core/effects/patch-value.effect.ts
import { Action } from '../actions';
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn, filter, tap } from 'rxjs/operators';
import { ActionTypes } from '@realtime-form/api-interfaces';
import { Socket } from 'ngx-socket-io';

export const getPatchValueEffect = (
  socket: Socket,
  actions: Observable<Action>
) => {
  return actions.pipe(
    observeOn(asyncScheduler),
    filter(action => action.type === ActionTypes.PatchValue),
    tap(action => socket.emit(ActionTypes.PatchValue, action.payload))
  );
};


Enter fullscreen mode Exit fullscreen mode

注意:我使用asyncScheduleronly 因为我想确保减速器始终是第一个。

当服务发出值已更改的消息,或在连接后发送当前表单状态时,我们必须做出相应的响应。在这两种情况下,我们已经将套接字事件映射到一个动作,现在我们只需要一个效果来为每个客户端本地更新表单。



// apps/web-client/src/app/core/effects/value-patched.effect.ts
import { Action } from '../actions';
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn, filter, tap } from 'rxjs/operators';
import { ActionTypes } from '@realtime-form/api-interfaces';
import { FormGroup } from '@angular/forms';

export const getValuePatchedEffect = (
  form: FormGroup,
  actions: Observable<Action>
) => {
  return actions.pipe(
    observeOn(asyncScheduler),
    filter(
      action =>
        action.type === ActionTypes.ValuePatched ||
        action.type === ActionTypes.Data
    ),
    tap(action => form.patchValue(action.payload, { emitEvent: false }))
  );
};


Enter fullscreen mode Exit fullscreen mode

最后,每当客户端与表单交互时,我们都希望向服务发出一条消息,将该更改传播到所有连接的客户端。



// apps/web-client/src/app/core/effects/form-changes.effect.ts
import { Action, PatchValue } from '../actions';
import { merge, BehaviorSubject } from 'rxjs';
import { debounceTime, map, tap } from 'rxjs/operators';
import { FormGroup } from '@angular/forms';
import { FormData } from '@realtime-form/api-interfaces';

export const getFormChangesEffect = (
  form: FormGroup,
  dispatcher: BehaviorSubject<Action>
) => {
  const title$ = form
    .get('title')
    .valueChanges.pipe(map((title: string) => ({ title })));

  const description$ = form
    .get('description')
    .valueChanges.pipe(map((description: string) => ({ description })));

  return merge(title$, description$).pipe(
    debounceTime(300),
    tap((payload: Partial<FormData>) =>
      dispatcher.next(new PatchValue(payload))
    )
  );
};


Enter fullscreen mode Exit fullscreen mode

您可能注意到了一个新的PatchValue动作,所以让我们创建它:



// apps/web-client/src/app/core/actions/patch-value.action.ts
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';

export class PatchValue {
  type = ActionTypes.PatchValue;

  constructor(public payload: Partial<FormData>) {}
}


Enter fullscreen mode Exit fullscreen mode

并更新桶的导入:



// apps/web-client/src/app/core/actions/index.ts
import { Init } from './init.action';
import { Data } from './data.action';
import { ClientConnected } from './client-connected.action';
import { ValuePatched } from './value-patched.action';
import { PatchValue } from './patch-value.action';

export type Action = Init | Data | ClientConnected | ValuePatched | PatchValue;
export { Init, Data, ClientConnected, ValuePatched, PatchValue };


Enter fullscreen mode Exit fullscreen mode

因为我喜欢桶装进口,所以我创建了另一个桶装进口以实现效果:



// apps/web-client/src/app/core/effects/index.ts
export { getFormChangesEffect } from './form-changes.effect';
export { getPatchValueEffect } from './patch-value.effect';
export { getValuePatchedEffect } from './value-patched.effect';


Enter fullscreen mode Exit fullscreen mode

现在您只需要在应用程序主目录中的不同终端中运行每个服务:

  • 运行命令ng serve
  • 运行命令ng serve api

结论

就是这样。我第一次做这件事真的很有挑战性,所以我尽量把每个步骤都讲得清晰易懂,希望你不会感到困惑。正如我之前提到的,这并非一个可以投入生产的实现,但确实是一个很好的起点。既然你已经知道如何解决这个问题了,别忘了有时解决方案可能更糟糕,在某些情况下,这可能会增加基础设施成本。

Flaticon的itim2101制作的图标

文章来源:https://dev.to/danmt/multiple-users-using-the-same-form-in-real-time-nx-nestjs-and-angular-2ck9
PREV
每个开发人员都会有“天哪,我明白了”的时刻。
NEXT
Kubernetes 图解:主题公园类比