使用 PHP 处理十亿行!
您可能听说过“十亿行挑战”(1brc),如果您没有听说过,请查看Gunnar Morlings 的 1brc repo。
我之所以被吸引,是因为我的两个同事都参加了比赛,并且名列前茅。
PHP 并不以速度著称,但当我正在使用 PHP 分析器时,我想尝试一下,看看它的速度有多快。
第一个天真的方法
我克隆了代码库,并在 中创建了十亿行数据集measurements.txt
。之后,我开始构建第一个可以解决这一挑战的简单实现:
<?php
$stations = [];
$fp = fopen('measurements.txt', 'r');
while ($data = fgetcsv($fp, null, ';')) {
if (!isset($stations[$data[0]])) {
$stations[$data[0]] = [
$data[1],
$data[1],
$data[1],
1
];
} else {
$stations[$data[0]][3] ++;
$stations[$data[0]][2] += $data[1];
if ($data[1] < $stations[$data[0]][0]) {
$stations[$data[0]][0] = $data[1];
}
if ($data[1] > $stations[$data[0]][1]) {
$stations[$data[0]][1] = $data[1];
}
}
}
ksort($stations);
echo '{';
foreach($stations as $k=>&$station) {
$station[2] = $station[2]/$station[3];
echo $k, '=', $station[0], '/', $station[2], '/', $station[1], ', ';
}
echo '}';
这没什么特别的,只需打开文件,fgetcsv()
读取数据即可。如果尚未找到该站点,则创建它;否则,增加计数器,计算温度总和,并判断当前温度是低于还是高于最小值或最大值,并进行相应的更新。
一旦我把所有东西都整理好,我就会ksort()
将$stations
数组按顺序排列,然后回显列表并计算平均温度(总和/计数)。
在我的笔记本电脑上运行这个简单的代码需要25 分钟🤯
是时候进行优化并查看分析器了:
时间线可视化帮助我看到,这显然是 CPU 密集型的,脚本开始时的文件编译可以忽略不计,并且没有垃圾收集事件。
火焰图视图也很有用,它显示了我花费了 46% 的 CPU 时间fgetcsv()
。
fgets()
而不是fgetcsv()
第一个优化是使用 来手动fgets()
获取一行并按;
字符分割,而不是依赖于fgetcsv()
。这是因为fgetcsv()
它的功能远远超出了我的需求。
// ...
while ($data = fgets($fp, 999)) {
$pos = strpos($data, ';');
$city = substr($data, 0, $pos);
$temp = substr($data, $pos+1, -1);
// ...
此外,我$data[0]
还重构了$city
所有地方。$data[1]
$temp
仅用这一处修改,再次运行脚本,运行时间就缩减到了19 分 49 秒。从绝对值来看,这仍然很多,但同时,也降低了 21%!
火焰图反映了这一变化,切换到按行显示 CPU 时间也揭示了根框架中发生的情况:
我在第 18 行和第 23 行花费了大约 38% 的 CPU 时间,它们是:
18 | $stations[$city][3] ++;
| // ...
23 | if ($temp > $stations[$city][1]) {
第 18 行是循环中第一次访问$stations
数组,否则它只是一个增量,而第 23 行是一个比较,乍一看似乎没有什么昂贵的,但让我们做更多的优化,你就会明白这里花费的时间。
尽可能使用参考
$station = &$stations[$city];
$station[3] ++;
$station[2] += $temp;
// instead of
$stations[$city][3] ++;
$stations[$city][2] += $data[1];
这应该有助于 PHP 在每次访问数组时不搜索$stations
数组中的键,而是将其视为访问数组中“当前”站的缓存。
而且它确实有帮助,运行它只需要17 分 48 秒,又减少了 10%!
仅一次比较
在查看代码时,我偶然发现了这一部分:
if ($temp < $station[0]) {
$station[0] = $temp;
}
if ($temp > $station[1]) {
$station[1] = $temp;
}
如果温度低于最低温度,它就不能再高于最高温度,所以我这样做elseif
也许可以节省一些 CPU 周期。
顺便说一句:我对温度的顺序一无所知measurements.txt
,但根据该顺序,如果我首先检查其中一个,可能会有所不同。
新版本耗时 17 分 30 秒,又提升了约 2%。比抖动问题好了一些,但提升不大。
添加类型转换
PHP 被称为动态语言,我刚开始写软件时非常看重它,因为它可以减少一个需要担心的问题。但另一方面,了解类型有助于引擎在运行代码时做出更好的决策。
$temp = (float)substr($data, $pos+1, -1);
你猜怎么着?这个简单的转换使得脚本仅用13 分 32 秒就运行完毕,性能提升了 21%!
18 | $station = &$stations[$city];
| // ...
23 | } elseif ($temp > $station[1]) {
第 18 行仍然显示有 11% 的 CPU 时间花费,这是对数组的访问(在哈希映射中查找键,哈希映射是 PHP 中关联数组使用的底层数据结构)。
第 23 行的 CPU 时间从约 32% 下降到约 15%。这是因为 PHP 不再进行类型转换。在类型转换之前,$temp
//$station[0]
是$station[1]
,strings
因此 PHP 必须将它们转换为 ,float
才能在每次比较时进行比较。
JIT 怎么样?
PHP 中的 OPCache 在 CLI 中默认处于禁用状态,需要将opcache.enable_cli
设置更改为on
。JIT(作为 OPCache 的一部分)默认启用,但由于缓冲区大小设置为0
,因此实际上已被禁用,因此我将 设置opcache.jit-buffer-size
为某个值,最终选择了10M
。应用这些更改后,我使用 JIT 重新运行了脚本,看到它在以下时间内完成:
7分19秒🚀
所花的时间减少了45.9% !!
还有什么?
我已经将运行时间从一开始的 25 分钟缩短到了大约 7 分钟。令我惊讶的是,fgets()
读取一个 13 GB 的文件需要分配大约 56 GiB/m 的内存。似乎有些不对劲,所以我检查了的实现fgets()
,发现只要省略 的len
参数,就能节省很多内存分配fgets()
:
while ($data = fgets($fp)) {
// instead of
while ($data = fgets($fp, 999)) {
比较更改前后的配置文件得出以下结果:
你可能会认为这会带来很大的性能提升,但实际上只有约 1% 左右。这是因为这些分配很小,ZendMM 可以在 bin 中处理,而且速度非常快。
我们可以让它变得更快吗?
是的,我们可以!到目前为止,我的方法是单线程的,这也是大多数 PHP 软件的本质,但 PHP 确实通过并行扩展支持用户空间中的线程。
正如性能分析器清晰显示的那样,在 PHP 中读取数据是一个瓶颈。切换到fgetcsv()
并fgets()
手动拆分会有所帮助,但这仍然会耗费大量时间。因此,我们可以使用线程来并行读取和处理数据,然后再合并来自工作线程的中间结果。
<?php
$file = 'measurements.txt';
$threads_cnt = 16;
/**
* Get the chunks that each thread needs to process with start and end position.
* These positions are aligned to \n chars because we use `fgets()` to read
* which itself reads till a \n character.
*
* @return array<int, array{0: int, 1: int}>
*/
function get_file_chunks(string $file, int $cpu_count): array {
$size = filesize($file);
if ($cpu_count == 1) {
$chunk_size = $size;
} else {
$chunk_size = (int) ($size / $cpu_count);
}
$fp = fopen($file, 'rb');
$chunks = [];
$chunk_start = 0;
while ($chunk_start < $size) {
$chunk_end = min($size, $chunk_start + $chunk_size);
if ($chunk_end < $size) {
fseek($fp, $chunk_end);
fgets($fp); // moves fp to next \n char
$chunk_end = ftell($fp);
}
$chunks[] = [
$chunk_start,
$chunk_end
];
$chunk_start = $chunk_end;
}
fclose($fp);
return $chunks;
}
/**
* This function will open the file passed in `$file` and read and process the
* data from `$chunk_start` to `$chunk_end`.
*
* The returned array has the name of the city as the key and an array as the
* value, containing the min temp in key 0, the max temp in key 1, the sum of
* all temperatures in key 2 and count of temperatures in key 3.
*
* @return array<string, array{0: float, 1: float, 2: float, 3: int}>
*/
$process_chunk = function (string $file, int $chunk_start, int $chunk_end): array {
$stations = [];
$fp = fopen($file, 'rb');
fseek($fp, $chunk_start);
while ($data = fgets($fp)) {
$chunk_start += strlen($data);
if ($chunk_start > $chunk_end) {
break;
}
$pos2 = strpos($data, ';');
$city = substr($data, 0, $pos2);
$temp = (float)substr($data, $pos2+1, -1);
if (isset($stations[$city])) {
$station = &$stations[$city];
$station[3] ++;
$station[2] += $temp;
if ($temp < $station[0]) {
$station[0] = $temp;
} elseif ($temp > $station[1]) {
$station[1] = $temp;
}
} else {
$stations[$city] = [
$temp,
$temp,
$temp,
1
];
}
}
return $stations;
};
$chunks = get_file_chunks($file, $threads_cnt);
$futures = [];
for ($i = 0; $i < $threads_cnt; $i++) {
$runtime = new \parallel\Runtime();
$futures[$i] = $runtime->run(
$process_chunk,
[
$file,
$chunks[$i][0],
$chunks[$i][1]
]
);
}
$results = [];
for ($i = 0; $i < $threads_cnt; $i++) {
// `value()` blocks until a result is available, so the main thread waits
// for the thread to finish
$chunk_result = $futures[$i]->value();
foreach ($chunk_result as $city => $measurement) {
if (isset($results[$city])) {
$result = &$results[$city];
$result[2] += $measurement[2];
$result[3] += $measurement[3];
if ($measurement[0] < $result[0]) {
$result[0] = $measurement[0];
}
if ($measurement[1] < $result[1]) {
$result[1] = $measurement[1];
}
} else {
$results[$city] = $measurement;
}
}
}
ksort($results);
echo '{', PHP_EOL;
foreach($results as $k=>&$station) {
echo "\t", $k, '=', $station[0], '/', ($station[2]/$station[3]), '/', $station[1], ',', PHP_EOL;
}
echo '}', PHP_EOL;
这段代码做了几件事。首先,我扫描文件并将其拆分成\n
对齐的块(以便稍后使用fgets()
)。块准备好后,我启动$threads_cnt
工作线程,所有工作线程打开同一个文件,并定位到它们指定的块的起始位置,读取并处理数据直到块的结束,返回一个中间结果,然后在主线程中对其进行合并、排序和打印。
这种多线程方法只需:
1分35秒🚀
这就结束了吗?
不,当然不是。这个解决方案至少还有两点:
- 我在 Apple Silicon 硬件上的 MacOS 上运行此代码,当在 PHP 的 ZTS 版本中使用 JIT 时会崩溃,因此 1 分 35 秒的结果是没有 JIT 的,如果我可以使用它,速度可能会更快
- 我意识到我正在运行的 PHP 版本是
CFLAGS="-g -O0 ..."
根据我日常工作中的需求编译的🤦
我应该在一开始就检查这一点,所以我重新编译了 PHP 8.3 CFLAGS="-Os ..."
,我的最终数字(有 16 个线程)是:
🚀 27.7 秒🚀
这个数字绝对无法与您在原始挑战的排行榜上看到的数字相比,这是因为我在完全不同的硬件上运行了此代码。
这是包含 10 个线程的时间线视图:
最底层的线程是主线程,它等待工作线程的结果。一旦这些工作线程返回了中间结果,我们就可以看到主线程正在对所有内容进行合并和排序。我们也可以清楚地看到,主线程绝不是瓶颈。如果你想进一步优化它,可以专注于工作线程。
在这一路上我学到了什么?
每个抽象层都只是用 CPU 周期或内存来换取可用性/集成度。fgetcsv()
它超级易用,而且隐藏了很多内容,但这是有代价的。即使fgets()
对我们隐藏了一些东西,也让我们能够非常方便地读取数据。
向代码中添加类型将有助于语言优化执行或停止类型杂耍,这是您看不到的,但仍然需要 CPU 周期的代价。
JIT 非常棒,特别是在处理 CPU 受限问题时!
这绝对不是大多数 PHP 软件的本质,但由于并行化(使用ext-parallel
),我们可以显著降低数字。
结束了吗?
希望您和我一样享受阅读本文的乐趣。如果您想进一步优化代码,请随时抓住这个链接并留言,告诉我们您完成了多少。
[于 2024 年 3 月 18 日编辑添加以下内容]
性能提升更多!
这篇博文发布后,评论中出现了一些关于如何提高速度的想法,@xjakub提出了三条建议
删除isset()
我们可能不需要检查isset()
,我们可以“直接”创建对车站的引用,当车站不存在时即可NULL
。这意味着:即使城市存在(大多数情况如此),也能节省一次数组访问。
# before
if (isset($stations[$city])) {
$station = &$stations[$city];
// ..
# after
$station = &$stations[$city];
if ($station !== NULL) {
// ..
这可节省约 2.5% 的挂机时间!
不检查fgets()
返回值
读取文件的主循环当前如下所示:
while ($data = fgets($fp)) {
$chunk_start += strlen($data);
if ($chunk_start > $chunk_end) {
break;
}
// ..
$chunk_start > $chunk_end
由于每个线程只处理文件的一部分,因此增加了对 的检查,这是转向并行化后新增的。现在@xjakub提到,不再需要检查fgets()
返回值了,因为只要我们位于 和 之间$chunk_start
,它就始终会返回一个字符串$chunk_end
,这意味着我们可以将其作为循环中的表达式,然后直接读取而不进行检查。
while ($chunk_start < $chunk_end) {
$data = fgets($fp);
$chunk_start += strlen($data);
// ..
这一变化从循环中删除了一个分支,并导致挂钟时间再次下降约 2.7%!
fgets()
对比stream_get_line()
实际读取和存储$city
如下$temp
:
$data = fgets($fp);
$chunk_start += strlen($data);
$pos2 = strpos($data, ';');
$city = substr($data, 0, $pos2);
$temp = (float)substr($data, $pos2+1, -1);
我从来没有听说过stream_get_line()
它的行为几乎与它相同,fgets()
除了它允许你指定行尾分隔符!
$city = stream_get_line($fp, 99, ';');
$temp = stream_get_line($fp, 99, "\n");
$chunk_start += strlen($city) + strlen($temp) + 2;
$temp = (float)$temp;
这一改变又推迟了约15%的挂机时间!
fgets()
这是为什么呢?和 的实现stream_get_line()
非常接近,都使用了 PHP 流层。主要的变化在于我们不再需要fgets()
使用 将字符串(来自 )拆分为子字符串substr()
。额外的strlen()
调用可以忽略不计,因为 PHP 中的变量实际上是 zval,它保存的是字符串的长度。
那么我们这个 PHP 脚本的实际执行时间是多少呢?
@codemunky 出现在评论中,并在 Hetzner 的同一台 AX161 上运行了基准测试,Java 人员曾用它在它上面运行他们的实现。
最终结果(目前为止):12.76 秒🎉
又结束了吗?
我不知道,也许这里还有一些需要优化的地方,但是在设法花费约 83% 的挂钟时间之后stream_get_line()
,看起来我们已经实现了 PHP 流层允许的目标。
我们要么找到另一个绕过 PHP 流层并提供更直接访问文件系统的函数,要么尝试优化该层本身。
祝您黑客愉快!
文章来源:https://dev.to/realflowcontrol/processing-one-billion-rows-in-php-3eg0