可观察的 Web Workers,深入探讨实际用例
在上一篇文章中,我们了解了observable-webworker
允许我们使用熟悉的可观察模式来构建和管理 Web Worker 以及线程间通信的库。在本文中,我们将开发一个应用程序,首先不使用 Web Worker,然后将其重构为使用 Web Worker,以演示 Web Worker 的强大功能和实用性。如果您还没有阅读上一篇文章,我建议您先阅读一下,因为它详细介绍了启动和运行 Web Worker 的所有先决条件以及 Web Worker 的背景知识,我将在此略过。
本文是一篇面向高级开发者的深度文章,所以请先喝点饮料,静下心来阅读。我假设读者对 Typescript 和 RxJS 有中高级的了解,所以如果你是初学者,这篇文章可能不太适合你。这里有一个目录,方便你随时回来阅读。
简短的
我们要构建的应用程序有几个要求,我将把它们列为验收标准,以供以后参考。
作为用户,我希望能够搜索从古腾堡计划获取的各种小说文本。我的搜索结果应该包含我的搜索词,并在匹配段落的上下文中高亮显示。即使我输入错误或拼写错误,我仍然应该获得预期的结果。
为了好玩,我将在BDD Gherkin 语法中详细说明标准
Feature: Find phrases in books by searching for them
Background:
Given I'm a user on the home page
And I have selected the book "Alice in Wonderland"
And I see a free text field in which I can type my search phrase
# basic search
Scenario
When I type "we’re all mad here"
Then I should see the first search result containing the text "‘Oh, you can’t help that,’ said the Cat: ‘we’re all mad here. I’m mad. You’re mad.’"
# multiple result search
Scenario
When I type "mad"
Then I should see search results with "mad" highlighted at least "10" times
# search with typos
Scenario
When I type "were all madd her"
Then I should see the first search result containing the text "‘Oh, you can’t help that,’ said the Cat: ‘we’re all mad here. I’m mad. You’re mad.’"
除了这些用户旅程要求之外,我们还有以下性能要求:
- 用户界面必须始终保持响应
- 用户每次击键都可以实时获得搜索结果
- 用户可以更换书籍,搜索立即开始返回他们上次搜索的结果
需求中需要特别注意的难点在于处理错别字和拼写错误的能力。这大大增加了问题的复杂性,因为它并不像查找段落中的子字符串那么简单;我们需要对候选子字符串进行评分,以找到最佳匹配。
数据流计划
首先,我们将整理一个简单的 typescript 文件来证明可观察的流程,然后将其构建到应用程序中。
// playground/observable-flow.ts
import { from, Observable, of } from 'rxjs';
import { scan, shareReplay, switchMap } from 'rxjs/operators';
/**
* First off, we create a quick enum of the books we will use as a demo. Later
* we will update the urls to be correct
*/
enum BookChoice {
ALICE_IN_WONDERLAND = 'http://some-url-to-alice-in-wonderland-text',
SHERLOCK_HOLMES = 'http://some-url-to-sherlock-holmes-text',
}
/**
* This observable will be something like a regular dropdown, emitted whenever
* the user changes their selection.
*/
const userBookSelection$ = of(BookChoice.ALICE_IN_WONDERLAND);
/**
* This observable represents the stream of search terms the user will enter
* into a text box. We `shareReplay(1)` so that subsequent subscribers will get
* the latest value
*/
const userSearchTerm$ = of(`we’re all mad here`).pipe(shareReplay(1));
/**
* This function will be responsible for fetching the content of the book given
* the enum. We're cheating a little by making the enum value the url to fetch.
* For now we will just pretend the url is the content of the book.
* @todo implement
*/
function getBookText(bookChoice: BookChoice): Observable<string> {
return of(bookChoice);
}
/**
* This function will be responsible for taking the search term and returning
* the stream of paragraphs found, as soon as they are found.
*
* For the purposes of quick testing, we've hardcoded as if two search results
* were found
*
* We will also likely extend this return type in future to handle highlighting
* the search phrase, but for now just the paragraph is sufficient for testing
* @todo implement
*/
function getSearchResults(
searchTerm: string,
bookText: string,
): Observable<string> {
return from([
searchTerm + ' (this will be the first result)',
searchTerm + ' (this will be the second result)',
]);
}
/**
* Here we take the user selected book stream and pipe it via switchMap to fetch
* the content of the book. We use switchMap() because we want to cancel the
* download of the book if the user switches to a different book to search
* before the book has finished downloading.
*
* Next we again switchMap() the result of the book content to the user search
* term observable so that if the user has changed books, once it is loaded we
* will cancel the processing of the current search term.
*
* Next we pass that stream of search terms to getSearchResults() which will
* itself be returning a stream of search results for that search string.
*
* Finally, we use a scan() operator to collate the stream of search results
* into an array so that we can present all results to the user, not just the
* most recent one
*/
const searchResults$ = userBookSelection$.pipe(
switchMap(selection => getBookText(selection)),
switchMap(bookText => {
return userSearchTerm$.pipe(
switchMap(searchTerm =>
getSearchResults(searchTerm, bookText).pipe(
scan((searchResults: string[], searchResult) => {
return [...searchResults, searchResult];
}, []),
),
),
);
}),
);
/**
* Last but not least, to check our logic, we subscribe to the observable and
* bind the console so we see output.
*/
searchResults$.subscribe(console.log);
好了,我们已经对要构建的内容有了概念,那么运行它会得到什么呢?我喜欢用它来进行ts-node
这种快速测试,所以直接运行就行了npx ts-node --skip-project playground/observable-flow.ts
(提示:--skip-project
之所以出现这种情况,是因为我当前所在的工作目录中有一个 tsconfig.json 文件,而该文件与运行普通的 nodejs 脚本不兼容。)
我们对上述文件的输出如下:
[ 'we’re all mad here (this will be the first result)' ]
[
'we’re all mad here (this will be the first result)',
'we’re all mad here (this will be the second result)'
]
好的,这是预料之中的——我们首先将一个结果放入数组中,然后将两个结果都放入数组中。成功了吗?嗯,有点成功,但我们真的希望在这个脚本中加入一些现实元素,这样我们才能看到可观察对象的行为是正确的。
因此,我们现在将编辑刚刚编写的脚本,添加一些实际的延迟和一些日志记录,以查看实际发生的情况:
// playground/observable-flow-test.ts
import { from, Observable, of } from 'rxjs';
import {
concatMap,
delay,
map,
pairwise,
scan,
shareReplay,
startWith,
switchMap,
timestamp,
} from 'rxjs/operators';
enum BookChoice {
ALICE_IN_WONDERLAND = 'http://alice.text',
SHERLOCK_HOLMES = 'http://sherlock.text',
}
/**
* This is a nice little custom operator that spaces out observables by a certain amount, this is super handy for
* emulating user events (humans are slooow!)
*/
function separateEmissions<T>(delayTime: number) {
return (obs$: Observable<T>): Observable<T> => {
return obs$.pipe(
concatMap((v, i) => (i === 0 ? of(v) : of(v).pipe(delay(delayTime)))),
);
};
}
/**
* For the book selection, we've piped to separateEmissions() with 4000ms
* defined, this means when subscribed the observable will immediately emit
* Alice in Wonderland content, then 4 seconds later emit Sherlock Holmes content.
*/
const userBookSelection$ = from([
BookChoice.ALICE_IN_WONDERLAND,
BookChoice.SHERLOCK_HOLMES,
]).pipe(separateEmissions(4000));
/**
* Slightly different strategy for this one - we're
* 1. Piping delayed user book selection to vary the search phrase depending on
* which book is selected
* 2. creating a streams of individual characters
* 3. spacing out the emissions by 100ms (this is the inter-keystroke time)
* 4. using scan to combine the previous characters
* The result is a pretty good simulation of the user typing the phrase at 10
* keys per second
*/
const userSearchTerm$ = userBookSelection$.pipe(
delay(200),
switchMap(book => {
const searchPhrase =
book === BookChoice.ALICE_IN_WONDERLAND
? `we’re all mad here`
: `nothing more deceptive than an obvious fact`;
return from(searchPhrase).pipe(
separateEmissions(100),
scan((out, char) => out + char, ''),
);
}),
shareReplay(1),
);
/**
* Here, we're guessing it will take about 200ms to download the book. We've
* also put in a console.log so we can make sure we're not going to try download
* the book on every keystroke!
* @param bookChoice
*/
function getBookText(bookChoice: BookChoice): Observable<string> {
console.log(`getBookText called (${bookChoice})`);
return of(bookChoice).pipe(delay(200));
}
/**
* With this function we're saying that the search takes (20 milliseconds * the
* length of the search string)
* This is actually totally unrealistic, but the linear variability will help
* when understanding the logs
*/
function getSearchResults(
searchTerm: string,
bookText: string,
): Observable<string> {
return from([' (first search result)', ' (second search result)']).pipe(
map(result => `${bookText} : ${searchTerm} : ${result}`),
delay(20 * searchTerm.length),
separateEmissions(200),
);
}
/**
* This is unchanged from before
*/
const searchResults$ = userBookSelection$.pipe(
switchMap(selection => getBookText(selection)),
switchMap(bookText => {
return userSearchTerm$.pipe(
switchMap(searchTerm =>
getSearchResults(searchTerm, bookText).pipe(
scan((searchResults: string[], searchResult) => {
return [...searchResults, searchResult];
}, []),
),
),
);
}),
);
/**
* Lastly we'd doing a few tricks to make the output express what happened
* better.
* The combination of timestamp and pairwise gives us a stream of when the
* emission happened and bundles it with the previous one so we can compare
* times to get a time taken value. The startWith(null) just gives us the
* startup time as a baseline.
* Lastly we use our old friend map() to output the data in a nice format for
* the logger.
*/
searchResults$
.pipe(
startWith(null),
timestamp(),
pairwise(),
map(([before, tsResult], i) => {
const timeSinceLast = (tsResult.timestamp - before.timestamp) / 1000;
return `${i} : Search Result: [${tsResult.value.join(
', ',
)}] (+${timeSinceLast} seconds)`;
}),
)
.subscribe(console.log);
getBookText called (http://alice.text)
0 : Search Result: [http://alice.text : w : (first search result)] (+0.431 seconds)
1 : Search Result: [http://alice.text : we : (first search result)] (+0.123 seconds)
2 : Search Result: [http://alice.text : we’ : (first search result)] (+0.128 seconds)
3 : Search Result: [http://alice.text : we’r : (first search result)] (+0.121 seconds)
4 : Search Result: [http://alice.text : we’re : (first search result)] (+0.121 seconds)
5 : Search Result: [http://alice.text : we’re all mad here : (first search result)] (+1.6 seconds)
6 : Search Result: [http://alice.text : we’re all mad here : (first search result), http://alice.text : we’re all mad here : (second search result)] (+0.204 seconds)
getBookText called (http://sherlock.text)
7 : Search Result: [http://sherlock.text : n : (first search result)] (+1.704 seconds)
8 : Search Result: [http://sherlock.text : no : (first search result)] (+0.125 seconds)
9 : Search Result: [http://sherlock.text : not : (first search result)] (+0.124 seconds)
10 : Search Result: [http://sherlock.text : noth : (first search result)] (+0.125 seconds)
11 : Search Result: [http://sherlock.text : nothi : (first search result)] (+0.122 seconds)
12 : Search Result: [http://sherlock.text : nothing more deceptive than an obvious fact : (first search result)] (+4.68 seconds)
13 : Search Result: [http://sherlock.text : nothing more deceptive than an obvious fact : (first search result), http://sherlock.text : nothing more deceptive than an obvious fact : (second search result)] (+0.201 seconds)
好吧,让我们深入研究一下这个问题。
我们马上就能看到,我们立即获取了《爱丽丝梦游仙境》这本书,而且再也不会获取了——这太完美了。
接下来,当开始输入短语时,我们首先得到一个结果,然后将另一个结果附加到结果中,很好。
稍后(以 开头的行5
),我们可以看到搜索结果速度变慢,这意味着我们获得结果的频率降低了,而且这些结果针对的是较长的搜索短语,而不仅仅是下一个字符——这正是我们所期望的,因为这意味着正在取消订阅搜索处理器功能,因为需要处理不同的数据。这是一个很好的机会来比较、和switchMap
的不同行为:switchMap()
mergeMap()
exhaustMap()
-
如果我们选择
mergeMap()
,我们会看到每个按键对应的所有搜索结果,但这些结果很可能相互重叠,令人困惑。此外,假设 CPU 在处理搜索时已饱和,总时间也会更长。 -
如果我们选择了
exhaustMap()
,我们将按照正确的顺序获得详尽的结果集(因此得名!),但是由于我们必须按顺序等待,因此总时间会更长。
在这种情况下,我认为switchMap()
这是正确的行为,因为用户在完成输入之前对中间搜索结果不感兴趣,并且我们通过立即取消不相关搜索结果的计算来提高效率。
回到输出分析,在标记的那一行,7
我们看到我们已切换到使用新的搜索词来获取 Sherlock Holmes。成功!
好的,我们现在对一般数据流非常有信心,让我们介绍一下用于评分段落的算法,然后再开始构建应用程序。
算法
考虑到我们的搜索词可能包含拼写错误,找到最佳段落并呈现给用户的算法并非易事。市面上有许多不同的模糊字符串匹配算法,每种算法都有各自的优缺点。鉴于我们的要求是拼写错误,而不是词序错误,计算编辑距离听起来是个不错的选择。
编辑距离是指将一个字符串更改为另一个字符串所需的修改次数。修改可以是插入、删除或替换。该距离只是修改的最小次数。
例如,和之间的距离hello
为helo
,1
因为需要插入一次。两个相同的字符串之间的距离为0
。
现在,我们的输入不太适合这个算法,因为我们正在比较一个短的搜索短语和一个长的段落——我们的距离评分主要体现的是搜索词和段落的长度差异。相反,我们将修改算法,忽略搜索字符串的起始和结束,这样我们就可以计算搜索字符串在段落中任何位置时的最小编辑距离。
最后,我们将根据搜索字符串的长度对编辑距离进行归一化,以计算相似度得分。这将用于按最佳匹配对段落进行排序。
为了避免这篇文章太长,我将跳过实现部分,但你可以自己深入研究它:https: //github.com/zakhenry/blog-posts/tree/master/posts/observable-workers-deep-dive/src/app/book-search/common/fuzzy-substring.ts
重要的是要知道有一个功能fuzzySubstringSimilarity
:
// src/app/book-search/common/fuzzy-substring.ts#L70-L73
export function fuzzySubstringSimilarity(
needle: string,
haystack: string,
): FuzzyMatchSimilarity {
返回FuzzyMatchSimilarity
:
// src/app/book-search/common/fuzzy-substring.ts#L1-L9
export interface FuzzyMatch {
substringDistance: number;
startIndex: number;
endIndex: number;
}
export interface FuzzyMatchSimilarity extends FuzzyMatch {
similarityScore: number;
}
我们将使用这种返回类型来对小说的段落进行评分,并返回匹配的开始和结束索引,以便我们可以在输出中突出显示它。
让我们根据上面定义的用户故事测试该算法:
import { fuzzySubstringSimilarity } from '../src/app/book-search/common/fuzzy-substring';
const similarity = fuzzySubstringSimilarity(
'were all madd her',
'‘Oh, you can’t help that,’ said the Cat: ‘we’re all mad here. I’m mad. You’re mad.’',
);
console.log(`Similarity: `, similarity);
我们在这里所做的就是将needle定义为搜索字符串,将haystack定义为预期的匹配句子。运行此文件,我们将得到以下内容:
Similarity: {
substringDistance: 2,
startIndex: 42,
endIndex: 59,
similarityScore: 0.8823529411764706
}
正如预期,我们看到的 substringDistance 为2
(为了得到预期的子字符串,我们需要添加一个'
并删除一个d
,两次更改,距离为2
)。我们还可以看到similarityScore
非常高(范围是 0-1),因此我们可以预期此段落的搜索得分会高于其他段落。
好了,我们现在有了排序好的算法,以及管理它的数据流。让我们开始构建一个应用程序吧!
应用
到目前为止,我们只是使用单个 typescript 文件来实现我们的想法,但现在我们将开始使用框架,因为这对于我们希望通过理解本文解决的现实世界问题来说更加现实。
我将使用 Angular,但如果您更喜欢其他优秀的框架,也不用担心,因为本文与框架无关。如果您已经读到这里,可以肯定地说,您已经掌握了框架开发的基础知识,所以我就略过这一点。
ng new observable-workers-deep-dive
ng generate component book-search
好的,我们已经搭建了一个基本的框架框架,考虑到我们试图测试直接使用可观察对象和将可观察对象与 Web Worker 结合使用的区别,在构建应用程序时,我们将尝试将通用功能提取到一个地方。这样,在性能测试期间,尽可能多的变量都可以控制,因为 Worker 和主线程策略将使用完全相同的核心算法代码。
常用功能
首先,我们将构建一个实用函数来处理模糊匹配评分段落流:
// src/app/book-search/common/book-search.utils.ts#L8-L25
interface SearchMatch {
paragraph: string;
paragraphNumber: number;
searchMatch: FuzzyMatchSimilarity;
}
function getSearchResults(
searchTerm: string,
paragraphs: string[],
): Observable<SearchMatch> {
return from(paragraphs).pipe(
observeOn(asyncScheduler),
map((paragraph, index) => {
const searchMatch = fuzzySubstringSimilarity(searchTerm, paragraph);
return { searchMatch, paragraph, paragraphNumber: index };
}),
);
}
这里有几点非常重要的事项需要注意。我们将段落数组转换为可观察的独立段落流from()
。
之后我们使用observeOn(asyncScheduler)
- 这对于应用程序的响应速度至关重要。本质上,它的作用是将from()
可观察对象的发射从同步重新安排为异步。这使得我们流的订阅者可以断开与段落流的连接,而无需计算整本书的模糊子字符串分数。当用户输入更多字符导致搜索字符串无效时,这将允许我们丢弃部分搜索结果。
最后,我们map()
进入计算函数,并将段落索引号附加到输出 - 我们稍后将使用它来计算计算的百分比。
接下来我们的常用函数是累积函数,它将
- 获取我们的搜索结果流
- 将它们整理成数组
- 按分数对结果数组进行排序
- 取前十名结果
- 将结果段落切分为不匹配的文本和匹配的文本
// src/app/book-search/common/book-search.utils.ts#L27-L77
export interface MatchingParagraph {
before: string;
match: string;
after: string;
score: number;
}
export interface SearchResults {
paragraphs: MatchingParagraph[];
searchedParagraphCount: number;
paragraphCount: number;
}
function accumulateResults(paragraphCount: number) {
return (obs$: Observable<SearchMatch>): Observable<SearchResults> => {
return obs$.pipe(
scan((searchResults: SearchMatch[], searchResult: SearchMatch) => {
searchResults.push(searchResult);
return searchResults;
}, []),
startWith([]),
map(
(searchMatches: SearchMatch[]): SearchResults => {
const last = searchMatches[searchMatches.length - 1];
return {
searchedParagraphCount: last ? last.paragraphNumber : 0,
paragraphCount,
paragraphs: searchMatches
.sort(
(a, b) =>
b.searchMatch.similarityScore - a.searchMatch.similarityScore,
)
.slice(0, 10)
.map(({ searchMatch, paragraph }) => {
return {
score: searchMatch.similarityScore,
match: paragraph.substring(
searchMatch.startIndex,
searchMatch.endIndex,
),
before: paragraph.substring(0, searchMatch.startIndex),
after: paragraph.substring(searchMatch.endIndex),
};
}),
};
},
),
);
};
}
尽管行数很多,但这里没有什么特别复杂的东西,但是有几点需要注意:
- 使用运算
scan()
符是为了让我们在结果传入时获得流式传输的结果,如果我们想等待搜索完成,我们将使用reduce()
(rxjs 运算符,而不是Array.prototype.reduce
) - 我们使用
startWith([])
以便订阅者在订阅时立即获得空结果集
最后,我们将这两个函数合并为一个函数,该函数接受一个搜索词和一本书,并返回累积的搜索结果流。
// src/app/book-search/common/book-search.utils.ts#L79-L87
export function getAccumulatedSearchResults(
searchTerm: string,
bookText: string,
): Observable<SearchResults> {
const paragraphs = bookText.split('\n\n');
return getSearchResults(searchTerm, paragraphs).pipe(
accumulateResults(paragraphs.length),
);
}
服务
好了,所有完成繁重工作的基本功能都整理好了,我们差不多快进入尾声了!让我们创建一个服务来包装这些逻辑,稍后我们会在组件中使用它。
// src/app/book-search/main-thread/book-search.service.ts#L11-L44
@Injectable({
providedIn: 'root',
})
export class BookSearchService {
constructor(private http: HttpClient) {}
public search(
bookSelection$: Observable<BookChoice>,
searchTerm$: Observable<string>,
): Observable<SearchResults> {
return this.processSearch(bookSelection$, searchTerm$).pipe(
auditTime(1000 / 60), // emit results at a maximum of 60fps
share(),
);
}
protected processSearch(
url$: Observable<string>,
search$: Observable<string>,
): Observable<SearchResults> {
const sharedSearchTerm$ = search$.pipe(shareReplay(1));
return url$.pipe(
switchMap(url => this.http.get(url, { responseType: 'text' })),
switchMap(bookText => {
return sharedSearchTerm$.pipe(
switchMap(searchTerm => {
return getAccumulatedSearchResults(searchTerm, bookText);
}),
);
}),
);
}
}
通过这项服务,我们HttpClient
在构造函数中注入一个;这使我们能够获取书籍内容。接下来是一个我们将在组件中使用的公共方法。它需要两个流:
- 第一个是
BookChoice
。回想一下,这是一个字符串枚举,其值是书籍的 URL - 第二个是搜索短语本身,用于与书中的段落进行模糊匹配
我们调用一个本地processSearch
方法,该方法只需获取 url 并将其通过管道传输到switchMap()
获取内容的方法,然后依次通过管道传输到搜索词可观察对象,最后我们打开该可观察对象来调用getAccumulatedSearchResults
我们之前构建的方法。
回到search()
方法中,我们将结果通过管道传输processSearch()
,并首先使用auditTime()
运算符。这将数据输出速率限制为每秒 60 帧,否则我们可能会得到每秒数万帧的数据SearchResults
,当我们尝试在 DOM 中显示结果时,这将完全超出框架的变化检测策略。
最后,我们在搜索结果中添加一个share()
,因为我们不希望多个订阅者多次触发搜索结果的计算。
接下来让我们定义组件逻辑以将此服务连接到用户输入。
成分
我们的组件要求非常简单——它需要为书籍选择和搜索词提供表单控件,然后输出搜索结果。
// src/app/book-search/main-thread/book-search.component.ts#L13-L59
@Component({
selector: 'app-book-search',
templateUrl: './book-search.component.html',
styleUrls: ['./book-search.component.css'],
changeDetection: ChangeDetectionStrategy.OnPush,
})
export class BookSearchComponent {
public componentName = 'Main thread search';
public bookChoices = [
{
url: BookChoice.ALICE_IN_WONDERLAND,
name: 'Alice in Wonderland',
},
{
url: BookChoice.SHERLOCK_HOLMES,
name: 'Sherlock Holmes',
},
];
public bookSelectionFormControl = new FormControl(null);
public userBookSelection$: Observable<BookChoice> = this
.bookSelectionFormControl.valueChanges;
public searchTermFormControl = new FormControl(null);
public userSearchTerm$: Observable<string> = this.searchTermFormControl
.valueChanges;
private searchResults$: Observable<
SearchResults
> = this.bookSearchHandler.search(
this.userBookSelection$,
this.userSearchTerm$,
);
public searchResultParagraphs$: Observable<
MatchingParagraph[]
> = this.searchResults$.pipe(map(result => result.paragraphs));
public searchResultProgress$: Observable<
[number, number]
> = this.searchResults$.pipe(
map(result => [result.searchedParagraphCount, result.paragraphCount]),
);
constructor(private bookSearchHandler: BookSearchService) {}
}
此组件的兴趣点:
- 我们
FormControl
为图书选择构建一个新的,然后立即设置一个可观察对象来观察valueChanges
该控件 - 我们对搜索词也做了同样的事情
- 接下来,我们使用前两个可观察对象来构造
private searchResults$: Observable<SearchResults>
,它调用我们之前定义的服务 - 最后两个公共成员观察
searchResults$
,第一个提取匹配的段落,第二个提取搜索进度信息
接下来我们看看这将如何在模板中显示:
<!-- src/app/book-search/main-thread/book-search.component.html -->
<h2>{{ componentName }}</h2>
<select [formControl]="bookSelectionFormControl">
<option *ngFor="let book of bookChoices" [value]="book.url">
{{ book.name }}
</option>
</select>
<input type="text" [formControl]="searchTermFormControl" />
<span *ngIf="searchResultProgress$ | async as searchResultProgress">
Progress: {{ searchResultProgress[0] }} / {{ searchResultProgress[1] }} ({{
searchResultProgress[0] / searchResultProgress[1] | percent
}})
</span>
<ng-container *ngFor="let matchingParagraph of searchResultParagraphs$ | async">
<hr />
<blockquote>
<span>{{ matchingParagraph.before }}</span>
<strong>{{ matchingParagraph.match }}</strong>
<span>{{ matchingParagraph.after }}</span>
<footer>Score: {{ matchingParagraph.score }}</footer>
</blockquote>
</ng-container>
漂亮简单的模板,我们有
<select>
管理图书选择的基础- 然后
<input>
管理用户搜索 - 然后
<span>
输出搜索结果进度 - 最后重复
<blockquote>
输出搜索结果
请注意,我们正在使用AsyncPipe
来管理所有订阅 - 这极大地简化了组件逻辑,并允许我们使用OnPush
变更检测策略作为AsyncPipe
管理标记来检查组件。
终于,我们到了,让我们测试一下。
主线程测试
好的,运行应用后,我们会看到两个控件。我们选择“爱丽丝梦游仙境”,然后输入那个拼写错误的字符串。"were all madd her"
好的!很好——我们成功获取了搜索结果,第一个结果就是预期的匹配:
were all madd her
Progress: 900 / 901 (100%)
‘Oh, you can’t help that,’ said the Cat: ‘we’re all mad here. I’m mad. You’re mad.’
Score: 0.8823529411764706
然而,应用程序的响应速度非常糟糕——那个动图并没有卡顿,只是在输入第一个字符后就卡住了,然后所有结果在几秒钟后才显示出来。
幸好我们有一个神奇的工具——Web Workers,我们可以用它来释放这个过度饱和的主线程,让它只处理 UI 操作,并将昂贵的搜索结果计算放在一个单独的线程中。
工人
那么,让我们创建一个工人
ng generate web-worker book-search
这会将book-search.worker.ts
文件放入我们的应用程序中:
/// <reference lib="webworker" />
addEventListener('message', ({ data }) => {
const response = `worker response to ${data}`;
postMessage(response);
});
我们不想要任何样板,因此我们将删除它并实现processSearch
我们之前在服务中创建的方法的自己的工作版本:
// src/app/book-search/worker-thread/book-search.worker.ts
import { DoWork, ObservableWorker } from 'observable-webworker';
import { Observable } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import {
distinctUntilChanged,
map,
shareReplay,
switchMap,
} from 'rxjs/operators';
import {
getAccumulatedSearchResults,
SearchResults,
WorkerInput,
} from '../common/book-search.utils';
@ObservableWorker()
export class BookSearchWorker implements DoWork<WorkerInput, SearchResults> {
public work(input$: Observable<WorkerInput>): Observable<SearchResults> {
const url$ = input$.pipe(
map(({ url }) => url),
distinctUntilChanged(),
);
const searchTerm$ = input$.pipe(
map(({ searchTerm }) => searchTerm),
distinctUntilChanged(),
shareReplay(1),
);
return url$.pipe(
switchMap(url => ajax({ url, responseType: 'text' })),
map(result => result.response),
switchMap(bookText => {
return searchTerm$.pipe(
switchMap(searchTerm =>
getAccumulatedSearchResults(searchTerm, bookText),
),
);
}),
);
}
}
这看起来应该很熟悉,因为它实际上只是包中processSearch
一个@ObservableWorker()
- 装饰类中的一个函数observable-webworker
。唯一的区别在于,该方法的输入work()
是一个单独的流,Observable<WorkerInput>
我们将其拆分并分别观察。
从那里开始它本质上是相同的,尽管我们确实使用 rxjsajax()
方法而不是HttpClient
因为Injector
在工作者上下文中不可用。
现在由主线程类来管理这个工作线程:
// src/app/book-search/worker-thread/book-search-worker.service.ts
import { Injectable } from '@angular/core';
import { fromWorker } from 'observable-webworker';
import { combineLatest, Observable } from 'rxjs';
import { map } from 'rxjs/operators';
import { BookSearchService } from '../main-thread/book-search.service';
import { SearchResults, WorkerInput } from '../common/book-search.utils';
@Injectable({
providedIn: 'root',
})
export class BookSearchWorkerService extends BookSearchService {
protected processSearch(
url$: Observable<string>,
search$: Observable<string>,
): Observable<SearchResults> {
const input$: Observable<WorkerInput> = combineLatest(url$, search$).pipe(
map(([url, searchTerm]) => ({ searchTerm, url })),
);
return fromWorker(
() => new Worker('./book-search.worker', { type: 'module' }),
input$,
);
}
}
因为我们已经实现了search()
逻辑,所以我们只需扩展该类并在新类中BookSearchService
覆盖实现。processSearch()
BookSearchWorkerService
在实际情况下,您可能只会替换processSearch
方法,但在本文中我们将提供两种实现,以便我们稍后可以并排比较它们。
我们的实现processSearch
非常简单,我们将combineLatest
两个输入流转换为 Worker 所需的格式。然后使用fromWorker()
from 方法'observable-webworker'
包装 a 的构造new Worker
,并将输入流作为第二个参数传递。
现在,当我们试图保留旧的行为时,我们需要一个新组件来展示这个闪亮的新工人支持的服务:
// src/app/book-search/worker-thread/book-search-worker.component.ts
import { ChangeDetectionStrategy, Component } from '@angular/core';
import { BookSearchWorkerService } from './book-search-worker.service';
import { BookSearchComponent } from '../main-thread/book-search.component';
import { BookSearchService } from '../main-thread/book-search.service';
@Component({
selector: 'app-book-search-worker',
templateUrl: '../main-thread/book-search.component.html',
styleUrls: ['../main-thread/book-search.component.css'],
changeDetection: ChangeDetectionStrategy.OnPush,
providers: [
{ provide: BookSearchService, useClass: BookSearchWorkerService },
],
})
export class BookSearchWorkerComponent extends BookSearchComponent {
public componentName = 'Worker thread search';
}
这里没什么事!我们所做的就是扩展BookSearchComponent
,重用它的模板和样式,并提供新的BookSearchWorkerService
而不是默认的BookSearchService
。
现在app.component.html
我们可以在模板中插入这个新组件:
<!-- src/app/app.component.html -->
<app-book-search></app-book-search>
<app-book-search-worker></app-book-search-worker>
所以现在我们有两种图书搜索实现 - 一个在主线程中,另一个由工作线程服务支持。
工作线程测试
让我们测试新的工作组件!
哇!性能怎么样?!虽然我不太会说这是最好的用户体验,但你确实能从中看到 Web Worker 的强大之处。
此外,在这个演示中,您可以真正体会到我们不得不处理搜索结果的原因auditTime()
,因为这里的 DOM 更新频率非常高。事实上,如果您注释掉该函数,您会发现 Web Worker 实现的性能会显著下降,因为它以如此高的频率发出搜索结果,以至于对 Angular 的变更检测周期造成了背压,从而导致整体性能下降。这对我来说是一个真正的教训,希望您也一样——
Web Workers 可以使您的计算输出非常快,以至于在 DOM 中显示结果可能会成为主要的整体瓶颈
视觉比较固然很好,但让我们好好看看这两种策略的表现有何不同
表现
为了测试相对性能,我会"There is nothing more deceptive than an obvious fact"
在《福尔摩斯》这本书里搜索这个短语。我会先使用常规主线程策略执行三次,每次都清除输入,然后再使用 Web Worker 策略执行同样的操作。
从这些图中可以清楚地看出使用 Web Worker 带来的差异——主线程(图中黄色部分)在整整 15 秒内都保持在 100% 的利用率,在此期间重绘会受到很大影响。而使用 Web Worker 策略时,主线程的利用率峰值几乎不会达到 50%,实际上在此期间大部分时间都处于空闲状态。
值得注意的是,此时整体持续时间大致相同,并且在某些情况下,预计工作策略的整体速度可能会略慢一些。这是因为该策略需要启动一个新线程,而线程间数据的结构化复制并非零成本。
另一个需要考虑的问题是,我们没有充分利用 Web Worker 的强大功能——我们正在运行的计算任务可以很容易地分解成多个Worker 执行的较小任务,并使用线程池策略进行处理。这是另一篇文章的主题,我可能会在下一篇文章中讨论。
包起来
你已经到达终点了!这恐怕有点像一场马拉松,但要讲的内容实在太多了。总而言之,我们概述了一个真实场景:用户希望对一本小说进行稳健的全文搜索;然后,我们提出了一个数据流策略,接着提出了一个算法,并将其构建成一个应用程序;之后,我们进行了重构,使用了可观察的 Web Worker 策略;最后,我们做了一些性能指标来证明 Web Worker 的实用性。呼!
所有代码都在 Github 上,欢迎随时 clone 并试用。如果您发现任何错误或有改进空间,请提交 issue 或 pull request,我也在学习!
在本系列的下一篇中,我将演示如何使用线程池进一步提升此应用程序的性能,或者实现一个新的实际用例,用于执行密集的图像处理,这将演示在线程之间传递大量数据的高性能技术。如果您有偏好,请告诉我!
再次感谢您在这里关注我,我希望这篇文章对您有所帮助。
照片由 Annie Spratt 在 Unsplash 上拍摄
链接已发布 https://dev.to/zakhenry/observable-web-workers-a-deep-dive-into-a-realistic-use-case-4042