多个用户实时使用同一表单。Nx、NestJs 和 Angular
如果你没有时间阅读本教程,这里是代码的最终版本
在本文中,我想探讨一下我曾多次被要求针对不同用例构建的功能。对于分布式和远程团队来说,实时协作是成功的关键。每当我们提到实时应用程序时,我们总会想到同一个例子:聊天。虽然聊天很酷也很重要,但还有一个更简单的东西可以帮助团队最大限度地提高协作:可由多个用户同时编辑的表单。
这看起来很有挑战性,当然,根据具体用例,它可能会更难,成本也更高。它之所以成本高昂,仅仅是因为它意味着需要来回发送更多数据。如果你的应用程序运行在 VPS 或专用服务器上,你或许无需任何额外费用即可实现这一点,但如果你采用无服务器架构,这意味着你月底会花费更多。
在传统的表单实现中,每个客户端都有自己的状态,并且仅在表单提交时发送请求。在这种情况下,事情会变得更加复杂,每次一个客户端更新表单时,所有其他客户端都应该收到此信息。如果您计划在只有少数用户的应用中使用此功能,那么还可以,但如果您计划让 1,000 个用户同时更改表单,则必须考虑到每次更改都会将数据发送给所有 1,000 个用户。
在这种情况下,我将专注于做一个非常简单的实现来帮助您入门,这绝不是一个可以投入生产的应用程序。
问题
假设你有多个用户需要共同努力实现一个目标,你希望尽可能减少摩擦。建立一个实时协作完成同一任务的机制会非常有用。
解决方案
应该有一个服务负责跟踪任务的当前状态并向所有连接的客户端发送更新。客户端使用的 Web 客户端应该显示连接的客户端以及一个可以通过用户交互或来自服务的更新进行更改的表单。
由于并发的可能性很大,我们必须选择一种能够帮助我们实现并发的策略。我个人非常喜欢 Redux,所以我的实现基于它,并根据自己的需求进行了调整。由于这是一个非常小的应用程序,我使用纯 RxJs 来实现状态管理。可以执行的操作包括:
- Init:设置 Web 客户端的初始状态,在每个客户端加载时触发。
- ClientConnected:每次客户端连接到服务时,所有客户端都会收到当前连接的客户端的更新列表。
- 数据:每当客户端连接时,服务都会以当前表单状态进行响应。
- PatchValue:当客户端通过直接与表单交互来更新表单时,它会将更改发送给服务。
- ValuePatched:当服务接收到状态变化时,它会将其广播给所有其他客户端。
对于此示例,表单数据非常简单,它仅包含标题和描述,两者均为字符串类型。
执行
首先要选择我们想要使用的技术。我是一名 Angular 开发者,因此我选择使用 Angular 作为 Web 客户端。由于 NestJs 非常酷,我决定用它来实现同步服务。最后,由于 Web 客户端和服务需要实时通信,Nx 可以非常有效地减少重复,并使用共享接口确保传递的消息类型安全。
注意:对于 Web 客户端,您可以使用任何 JS 框架,甚至是纯 JavaScript。服务端也一样,只要您有 Socket.IO 实现,就可以使用 Node 或任何您想要的框架。我使用了 Nx,只是因为我喜欢它,但您也可以跳过这部分。
我们将从生成 Nx 工作区开始。
- 运行命令
npx create-nx-workspace@latest realtime-form
angular-nest
在提示选项中选择工作区- 输入
web-client
应用程序名称 - 选择您喜欢的样式表格式(我总是使用 SASS)
- 进入
realtime-form
目录
将 Nx 与 NestJs 和 Angular 结合使用的一大优势是它们之间可以共享内容。让我们利用这一点,创建FormData
接口和ActionTypes
枚举。
转到/libs/api-interfaces/src/lib/api-interfaces.ts
并将其内容更改为:
export enum ActionTypes {
Data = '[Socket] Data',
ClientConnected = '[Socket] Client Connected',
ValuePatched = '[Socket] Value Patched',
PatchValue = '[Form] Patch Value',
Init = '[Init] Init'
}
export interface FormData {
title: string;
description: string;
}
现在我们可以从服务和 Web 客户端使用它们,因为它们是共享的,所以它可以作为两者之间的契约。
我们将从服务开始:
- 跑步
npm i --save @nestjs/websockets @nestjs/platform-socket.io
- 跑步
npm i --save-dev @types/socket.io
- 进入目录
/apps/api/src/app
- 创建一个名为的新目录
events
并移动到该目录 - 创建名为
events.gateway.ts
- 创建名为
events.module.ts
接下来您只需写入新文件的内容。
前往/apps/api/src/app/events/events.gateway.ts
:
import {
SubscribeMessage,
WebSocketGateway,
WebSocketServer
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
import { Logger } from '@nestjs/common';
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';
@WebSocketGateway()
export class EventsGateway {
connectedClients = [];
data = {};
@WebSocketServer()
server: Server;
private logger: Logger = new Logger('EventsGateway');
handleConnection(client: Socket) {
this.connectedClients = [...this.connectedClients, client.id];
this.logger.log(
`Client connected: ${client.id} - ${this.connectedClients.length} connected clients.`
);
this.server.emit(ActionTypes.ClientConnected, this.connectedClients);
client.emit(ActionTypes.Data, this.data);
}
handleDisconnect(client: Socket) {
this.connectedClients = this.connectedClients.filter(
connectedClient => connectedClient !== client.id
);
this.logger.log(
`Client disconnected: ${client.id} - ${this.connectedClients.length} connected clients.`
);
this.server.emit(ActionTypes.ClientConnected, this.connectedClients);
}
@SubscribeMessage(ActionTypes.PatchValue)
patchValue(client: Socket, payload: Partial<FormData>) {
this.data = { ...this.data, ...payload };
this.logger.log(`Patch value: ${JSON.stringify(payload)}.`);
client.broadcast.emit(ActionTypes.ValuePatched, payload);
}
}
如果你对这段代码感到困惑,别担心,我们相信 NestJs 会完成所有繁重的工作。你可以将每个方法视为对事件的响应;连接、断开连接和补丁值。
- 连接:更新已连接客户端的列表,将发生的事件记录到服务中,向所有当前连接的客户端发出新的connectedClients列表,并向客户端发出表单的当前状态。
- 断开连接:更新已连接客户端列表,将发生的事件记录到服务中,向所有当前连接的客户端发出新的connectedClients列表。
- PatchValue:更新表单的当前状态,将发生的事件记录到服务,将新状态广播到所有当前连接的客户端。
注意:this.server.emit 和 client.broadcast.emit 之间的区别在于,第一个将消息发送给所有客户端,而第二个将消息发送给除发送方之外的所有客户端。
现在让我们更新/apps/api/src/app/events/events.module.ts
文件:
import { Module } from '@nestjs/common';
import { EventsGateway } from './events.gateway';
@Module({
providers: [EventsGateway]
})
export class EventsModule {}
以及/apps/api/src/app/app.module.ts
文件:
import { Module } from '@nestjs/common';
import { EventsModule } from './events/events.module';
@Module({
imports: [EventsModule]
})
export class AppModule {}
我还删除了AppController
和AppService
文件。并apps/api/src/main.ts
用以下内容更新了文件:
import { NestFactory } from '@nestjs/core';
import { AppModule } from './app/app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const port = 3000;
await app.listen(port, () => {
console.log('Listening at http://localhost:' + port);
});
}
bootstrap();
现在是时候开始使用 Web 客户端了,请转到apps/web-client/src/app/app.component.html
:
<header>
<h1>Realtime Form</h1>
</header>
<main>
<form [formGroup]="form">
<fieldset>
<label class="form-control">
<span>Title: </span>
<input formControlName="title" />
</label>
<label class="form-control">
<span>Description: </span>
<textarea formControlName="description" rows="5"></textarea>
</label>
</fieldset>
</form>
<ng-container *ngIf="connectedClients$ | async as clients">
<h2>Clients ({{ clients.length }})</h2>
<ul>
<li *ngFor="let client of clients">{{ client }}</li>
</ul>
</ng-container>
</main>
为了确保它看起来就像我在开始时展示的那样,请转到/apps/web-client/src/app/app.component.scss
并将其内容替换为以下内容:
form {
width: 100%;
padding: 0.5rem;
max-width: 600px;
.form-control {
display: flex;
margin-bottom: 1rem;
& > span {
flex-basis: 20%;
}
& > input,
& > textarea {
flex-grow: 1;
}
}
}
使用以下命令安装 Angular 的 Socket IO 包npm install --save ngx-socket-io
不要忘记在 Web 客户端中注入ReactiveFormsModule
和。前往:SocketIoModule
AppModule
/apps/web-client/src/app/app.module.ts
import { BrowserModule } from '@angular/platform-browser';
import { NgModule } from '@angular/core';
import { ReactiveFormsModule } from '@angular/forms';
import { AppComponent } from './app.component';
import { SocketIoModule, SocketIoConfig } from 'ngx-socket-io';
const config: SocketIoConfig = {
url: 'http://192.168.1.2:3000',
options: {}
};
@NgModule({
declarations: [AppComponent],
imports: [BrowserModule, ReactiveFormsModule, SocketIoModule.forRoot(config)],
providers: [],
bootstrap: [AppComponent]
})
export class AppModule {}
接下来前往apps/web-client/src/app/app.component.ts
:
import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, merge } from 'rxjs';
import { scan, map } from 'rxjs/operators';
import { FormBuilder } from '@angular/forms';
import { Socket } from 'ngx-socket-io';
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';
import { State, reducer } from './core/state';
import {
ClientConnected,
Data,
ValuePatched,
Action,
Init
} from './core/actions';
import {
getPatchValueEffect,
getValuePatchedEffect,
getFormChangesEffect
} from './core/effects';
@Component({
selector: 'realtime-form-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.scss']
})
export class AppComponent implements OnInit {
// 1: Action dispatcher
private dispatcher = new BehaviorSubject<Action>(new Init());
actions$ = this.dispatcher.asObservable();
// 2: State stream
store$ = this.actions$.pipe(
scan((state: State, action: Action) => reducer(state, action))
);
// 3: Define all the selectors
connectedClients$ = this.store$.pipe(
map((state: State) => state.connectedClients)
);
data$ = this.store$.pipe(map((state: State) => state.data));
title$ = this.data$.pipe(map((state: Partial<FormData>) => state.title));
description$ = this.data$.pipe(
map((state: Partial<FormData>) => state.description)
);
// 4: Initialize the form
form = this.fb.group({
title: [''],
description: ['']
});
constructor(private socket: Socket, private fb: FormBuilder) {}
ngOnInit() {
// 5: Connect to all the socket events
this.socket.on(ActionTypes.ClientConnected, (payload: string[]) => {
this.dispatcher.next(new ClientConnected(payload));
});
this.socket.on(ActionTypes.Data, (payload: Partial<FormData>) => {
this.dispatcher.next(new Data(payload));
});
this.socket.on(ActionTypes.ValuePatched, (payload: Partial<FormData>) => {
this.dispatcher.next(new ValuePatched(payload));
});
// 6: Subscribe to all the effects
merge(
getPatchValueEffect(this.socket, this.actions$),
getValuePatchedEffect(this.form, this.actions$),
getFormChangesEffect(this.form, this.dispatcher)
).subscribe();
}
}
让我们回顾一下我刚才做的每件事:
1:动作调度器
我首先创建一个动作调度器和一个从动作流中可观察的对象,我使用 RxJs BehaviorSubject 和一个如下所示的初始动作:
// apps/web-client/src/app/core/actions/init.action.ts
import { ActionTypes } from '@realtime-form/api-interfaces';
export class Init {
type = ActionTypes.Init;
payload = null;
}
我还在Action
桶导入中创建了一个类型,以使其更易于使用:
// apps/web-client/src/app/core/actions/index.ts
import { Init } from './init.action';
export type Action = Init;
export { Init };
2:状态流
通过使用 scan 运算符,我们可以获取可观察对象的每一次发射,并保存一个内部状态,该状态会通过其回调的返回进行更新。使用 reducer 函数,该函数接受状态和动作,并以不可变的方式返回状态,这样我们就可以更安全地获取当前状态的流。
我创建了一个如下所示的减速器:
// apps/web-client/src/app/core/state/state.reducer.ts
import { ActionTypes } from '@realtime-form/api-interfaces';
import { State } from './state.interface';
import { Action } from '../actions';
import { initialState } from './initial-state.const';
export const reducer = (state: State, action: Action): State => {
switch (action.type) {
case ActionTypes.Init:
return { ...initialState };
case ActionTypes.ClientConnected:
return {
...state,
connectedClients: action.payload
};
case ActionTypes.Data:
return { ...state, data: action.payload };
case ActionTypes.PatchValue:
return { ...state, data: { ...state.data, ...action.payload } };
default:
return { ...state };
}
};
动作简要描述:
- Init:将状态设置为
initialState
const。 - ClientConnected:使用更新后的列表更新状态中的connectedClients。
- 数据:将状态的数据设置为连接时返回的值。
- PatchValue:使用有效载荷中的更改来修补数据。
界面State
如下:
// apps/web-client/src/app/core/state/state.interface.ts
import { FormData } from '@realtime-form/api-interfaces';
export interface State {
connectedClients: string[];
data: Partial<FormData>;
}
constinitialState
看起来像这样:
// apps/web-client/src/app/core/state/initial-state.const.ts
import { State } from './state.interface';
export const initialState = {
connectedClients: [],
data: {}
} as State;
我也在这里创建了一个桶导入,我有点喜欢它们。
export { initialState } from './initial-state.const';
export { State } from './state.interface';
export { reducer } from './state.reducer';
3:定义所有选择器
为了方便访问商店中的值,我创建了一组额外的可观察对象,它们基本上将状态映射到子状态,其工作方式类似于投影。
4:初始化表单
我刚刚使用 ReactiveForms 创建了一个非常非常简单的表单,如果您想了解更多信息,可以查看我的 ReactiveForms 系列。
5:连接所有套接字事件
正如我们刚才看到的,我们的服务可以发出三个事件,在这一步中,我们将监听这些事件并做出相应的响应。为了使其更清晰,我创建了一些动作创建器类。
// apps/web-client/src/app/core/actions/client-connected.action.ts
import { ActionTypes } from '@realtime-form/api-interfaces';
export class ClientConnected {
type = ActionTypes.ClientConnected;
constructor(public payload: string[]) {}
}
// apps/web-client/src/app/core/actions/data.action.ts
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';
export class Data {
type = ActionTypes.Data;
constructor(public payload: Partial<FormData>) {}
}
// apps/web-client/src/app/core/actions/value-patched.action.ts
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';
export class ValuePatched {
type = ActionTypes.ValuePatched;
constructor(public payload: Partial<FormData>) {}
}
并且不要忘记更新桶导入
// apps/web-client/src/app/core/actions/index.ts
import { Init } from './init.action';
import { Data } from './data.action';
import { ClientConnected } from './client-connected.action';
import { ValuePatched } from './value-patched.action';
export type Action = Init | Data | ClientConnected | ValuePatched;
export { Init, Data, ClientConnected, ValuePatched };
6:订阅所有效果
剩下的就是副作用了。让我们逐一看看:
当用户更新表单时,这些更改必须广播到所有其他客户端,为此我们需要将其发送到服务。我们可以这样做:
// apps/web-client/src/app/core/effects/patch-value.effect.ts
import { Action } from '../actions';
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn, filter, tap } from 'rxjs/operators';
import { ActionTypes } from '@realtime-form/api-interfaces';
import { Socket } from 'ngx-socket-io';
export const getPatchValueEffect = (
socket: Socket,
actions: Observable<Action>
) => {
return actions.pipe(
observeOn(asyncScheduler),
filter(action => action.type === ActionTypes.PatchValue),
tap(action => socket.emit(ActionTypes.PatchValue, action.payload))
);
};
注意:我使用
asyncScheduler
only 因为我想确保减速器始终是第一个。
当服务发出值已更改的消息,或在连接后发送当前表单状态时,我们必须做出相应的响应。在这两种情况下,我们已经将套接字事件映射到一个动作,现在我们只需要一个效果来为每个客户端本地更新表单。
// apps/web-client/src/app/core/effects/value-patched.effect.ts
import { Action } from '../actions';
import { Observable, asyncScheduler } from 'rxjs';
import { observeOn, filter, tap } from 'rxjs/operators';
import { ActionTypes } from '@realtime-form/api-interfaces';
import { FormGroup } from '@angular/forms';
export const getValuePatchedEffect = (
form: FormGroup,
actions: Observable<Action>
) => {
return actions.pipe(
observeOn(asyncScheduler),
filter(
action =>
action.type === ActionTypes.ValuePatched ||
action.type === ActionTypes.Data
),
tap(action => form.patchValue(action.payload, { emitEvent: false }))
);
};
最后,每当客户端与表单交互时,我们都希望向服务发出一条消息,将该更改传播到所有连接的客户端。
// apps/web-client/src/app/core/effects/form-changes.effect.ts
import { Action, PatchValue } from '../actions';
import { merge, BehaviorSubject } from 'rxjs';
import { debounceTime, map, tap } from 'rxjs/operators';
import { FormGroup } from '@angular/forms';
import { FormData } from '@realtime-form/api-interfaces';
export const getFormChangesEffect = (
form: FormGroup,
dispatcher: BehaviorSubject<Action>
) => {
const title$ = form
.get('title')
.valueChanges.pipe(map((title: string) => ({ title })));
const description$ = form
.get('description')
.valueChanges.pipe(map((description: string) => ({ description })));
return merge(title$, description$).pipe(
debounceTime(300),
tap((payload: Partial<FormData>) =>
dispatcher.next(new PatchValue(payload))
)
);
};
您可能注意到了一个新的PatchValue
动作,所以让我们创建它:
// apps/web-client/src/app/core/actions/patch-value.action.ts
import { ActionTypes, FormData } from '@realtime-form/api-interfaces';
export class PatchValue {
type = ActionTypes.PatchValue;
constructor(public payload: Partial<FormData>) {}
}
并更新桶的导入:
// apps/web-client/src/app/core/actions/index.ts
import { Init } from './init.action';
import { Data } from './data.action';
import { ClientConnected } from './client-connected.action';
import { ValuePatched } from './value-patched.action';
import { PatchValue } from './patch-value.action';
export type Action = Init | Data | ClientConnected | ValuePatched | PatchValue;
export { Init, Data, ClientConnected, ValuePatched, PatchValue };
因为我喜欢桶装进口,所以我创建了另一个桶装进口以实现效果:
// apps/web-client/src/app/core/effects/index.ts
export { getFormChangesEffect } from './form-changes.effect';
export { getPatchValueEffect } from './patch-value.effect';
export { getValuePatchedEffect } from './value-patched.effect';
现在您只需要在应用程序主目录中的不同终端中运行每个服务:
- 运行命令
ng serve
- 运行命令
ng serve api
结论
就是这样。我第一次做这件事真的很有挑战性,所以我尽量把每个步骤都讲得清晰易懂,希望你不会感到困惑。正如我之前提到的,这并非一个可以投入生产的实现,但确实是一个很好的起点。既然你已经知道如何解决这个问题了,别忘了有时解决方案可能更糟糕,在某些情况下,这可能会增加基础设施成本。
文章来源:https://dev.to/danmt/multiple-users-using-the-same-form-in-real-time-nx-nestjs-and-angular-2ck9