探索异步 PHP
由于每个 Web 请求中需要执行更多操作的需求日益增加,异步编程已成为扩展 Web 应用程序的基础构建块。一个典型的例子就是在请求中发送电子邮件。
在许多 Web 应用程序中,当服务器上处理某些内容时,我们希望通过电子邮件通知人们,这通常是向第三方服务(如 SendGrid、Mailchimp 等)发出单独的 HTTP 请求。
当你需要一次性发送大量电子邮件时,这就会成为一个非常重要的例子。在 PHP 中,如果你想发送一封电子邮件,而 HTTP 处理需要 100 毫秒才能完成,那么发送数十封甚至数百封电子邮件会迅速增加请求的总时间。
当然,任何好的第三方电子邮件服务都会提供批量端点来否定这一点,但为了举例说明 - 假设您要发送 100 封电子邮件,并且每封都必须单独处理。
因此,我们需要做出一个决定:如何将电子邮件的处理移到一个单独的进程中,以免阻塞原始的 Web 请求?
这正是我们将在本文中探讨的内容,特别是探讨在 PHP 中解决这个问题的各种不同方法,无论是否需要新的基础架构。
使用 exec()
exec()是 PHP 中的一个原生函数,可用于执行外部程序并返回结果。在我们的例子中,它可以是一个发送电子邮件的脚本。此函数使用操作系统生成一个全新的(空白的,没有任何复制或共享内容的)进程,您可以将所需的任何状态传递给它。
我们来看一个例子。
<?php
// handle a web request
// record the start time of the web request
$start = microtime(true);
$path = __DIR__ . '/send_email.php';
// output to /dev/null & so we don't block to wait for the result
$command = 'php ' . $path . ' --email=%s > /dev/null &';
$emails = ['joe@blogs.com', 'jack@test.com'];
// for each of the emails, call exec to start a new script
foreach ($emails as $email) {
// Execute the command
exec(sprintf($command, $email));
}
// record the finish time of the web request
$finish = microtime(true);
$duration = round($finish - $start, 4);
// output duration of web request
echo "finished web request in $duration\n";
发送电子邮件.php
<?php
$email = explode('--email=', $argv[1])[1];
// this blocking sleep won't affect the web request duration
// (illustrative purposes only)
sleep(5);
// here we can send the email
echo "sending email to $email\n";
输出
$ php src/exec.php
finished web request in 0.0184
上述脚本显示,即使sleep
send_email.php 脚本中有一个阻塞函数调用,Web 请求仍然会在几毫秒内完成。
它不阻塞的原因是因为我们已经exec
通过在命令中包含来告知> /dev/null &
我们不想等待exec
命令完成以便我们可以得到结果,这意味着它可以在后台发生并且 Web 请求可以继续。
这样,Web 请求脚本仅负责运行脚本,而不是监视其执行和/或失败。
这是该解决方案的一个固有缺点,因为进程的监控落在了进程本身,而且无法重新启动。然而,这是一种在 PHP 应用程序中轻松实现异步行为的简单方法,无需太多努力。
exec
在服务器上运行命令,因此您必须谨慎处理脚本的执行方式,尤其是在涉及用户输入的情况下。使用 进行管理可能会很困难,exec
尤其是在您管理应用程序的扩展时,因为脚本很可能在处理外部 Web 请求的同一台机器上运行,因此,如果通过 生成数百或数千个新进程,最终可能会耗尽 CPU 和内存exec
。
pcntl_fork
pcntl_fork是一个低级函数,需要启用 PCNTL 扩展,它是一种在 PHP 中编写异步代码的强大但容易出错的方法。
pcntl_fork
将会 fork 或 clone 当前进程,并将其拆分为一个父进程和多个子进程(取决于调用次数)。通过检测进程 ID 或 PID,我们可以在父进程或子进程的上下文中运行不同的代码。
父进程将负责生成子进程,并等待生成的进程完成后才能完成。
在这种情况下,我们可以更好地控制进程如何退出,并且可以轻松编写一些逻辑来处理子进程失败时的重试。
现在,我们来看看以非阻塞方式发送电子邮件的用例示例代码。
<?php
function sendEmail($to, $subject, $message)
{
// Code to send email (replace with your email sending logic)
// This is just a mock implementation for demonstration purposes
sleep(3); // Simulating sending email by sleeping for 3 seconds
echo "Email sent to: $to\n";
}
$emails = [
[
'to' => 'john@example.com',
'subject' => 'Hello John',
'message' => 'This is a test email for John.',
],
[
'to' => 'jane@example.com',
'subject' => 'Hello Jane',
'message' => 'This is a test email for Jane.',
],
// Add more email entries as needed
];
$children = [];
foreach ($emails as $email) {
$pid = pcntl_fork();
if ($pid == -1) {
// Fork failed
die('Error: Unable to fork process.');
} elseif ($pid == 0) {
// Child process
sendEmail($email['to'], $email['subject'], $email['message']);
exit(); // Exit the child process
} else {
// Parent process
$children[] = $pid;
}
}
echo "running some other things in parent process\n";
sleep(3);
// Parent process waits for each child process to finish
foreach ($children as $pid) {
pcntl_waitpid($pid, $status);
$status = pcntl_wexitstatus($status);
echo "Child process $pid exited with status: $status\n";
}
echo 'All emails sent.';
在上面的例子中,pcntl_fork
我们可以 fork 当前进程,将父进程复制到新的子进程中,并等待执行完成。此外,在 fork 子进程发送电子邮件之后,父进程可以继续执行其他操作,最终确保子进程完成。
这是使用上一步,exec
我们的能力受到很大限制,因为脚本是完全独立的上下文,所以从整体角度来看无法进行监控。
由于每个子进程都在独立的内存空间中运行,并且不会影响其他进程,因此我们还实现了进程隔离。
通过跟踪进程 ID,我们可以有效地监控和管理执行流程。
以这种方式直接从 Web 请求(父进程)分叉请求的一个缺点是,通过等待子进程完成,这样做对原始请求的响应时间没有任何好处。
幸运的是,有一个解决方案,那就是将两者结合起来exec
,pcntl_fork
获得两全其美的效果,如下所示:
- Web 请求使用 exec() 来生成新的 PHP 进程
- 生成的进程将收到一批电子邮件列表
- 生成的进程成为父进程,因为它会分叉并单独发送每封电子邮件
这一切都可以在后台发生,而不是阻止原始请求。
让我们看一下如何实现这个功能:
<?php
$start = microtime(true);
$path = __DIR__ . '/pcntl_fork_send_email.php';
$emails = implode(',', ['joe@blogs.com', 'jack@test.com']);
$command = 'php ' . $path . ' --emails=%s > /dev/null &';
// Execute the command
echo "running exec\n";
exec(sprintf($command, $emails));
$finish = microtime(true);
$duration = round($finish - $start, 4);
echo "finished web request in $duration\n";
pctnl_fork_send_email.php
<?php
$param = explode('--emails=', $argv[1])[1];
$emails = explode(',', $param);
function sendEmail($to)
{
sleep(3); // Simulating sending email by sleeping for 3 seconds
echo "Email sent to: $to\n";
}
$children = [];
foreach ($emails as $email) {
$pid = pcntl_fork();
if ($pid == -1) {
// Fork failed
die('Error: Unable to fork process.');
} elseif ($pid == 0) {
// Child process
sendEmail($email);
exit(); // Exit the child process
} else {
// Parent process
$children[] = $pid;
}
}
echo "running some other things in parent process\n";
sleep(3);
// Parent process waits for each child process to finish
foreach ($children as $pid) {
pcntl_waitpid($pid, $status);
$status = pcntl_wexitstatus($status);
echo "Child process $pid exited with status: $status\n";
}
echo "All emails sent.\n";
尽管这个解决方案比较复杂,但它的优点在于,您可以设置一个单独的进程,其职责是运行和监视分叉进程,以便异步地完成工作。
AMPHP
amphp(异步多任务 PHP)是一个库的集合,允许您使用 PHP 构建快速、并发的应用程序。
2021 年 11 月发布的 PHP 8.1 提供了对Fibers的支持,Fibers实现了轻量级的合作并发模型。
现在我们对它的工作原理以及它对 PHP 程序的未来有何影响有了一点了解amphp
,让我们看一个例子:
<?php
require __DIR__ . '/../vendor/autoload.php'; // Include the autoload file for the amphp/amp library
use function Amp\delay;
use function Amp\async;
function sendEmail($to, $subject, $message)
{
delay(3000)->onResolve(function () use ($to) {
echo "Email sent to: $to\n";
});
}
$emails = [
[
'to' => 'john@example.com',
'subject' => 'Hello John',
'message' => 'This is a test email for John.',
],
[
'to' => 'jane@example.com',
'subject' => 'Hello Jane',
'message' => 'This is a test email for Jane.',
],
// Add more email entries as needed
];
foreach ($emails as $email) {
$future = async(static function () use ($email) {
$to = $email['to'];
$subject = $email['subject'];
$message = $email['message'];
sendEmail($to, $subject, $message);
});
// block current process by running $future->await();
}
echo "All emails sent.\n";
上面的脚本是一个非常简单的异步运行版本。它将使用给定的闭包异步创建一个新的fiber,并返回一个Future(对象)。
这是一个比您自己动手简单得多的版本,并且为您完成了繁重的工作,这对于构建应用程序至关重要,因为您不需要担心工作如何在内部排队 - 您只知道它是异步发生的。
队列和工作者
在 PHP 之外也存在针对此问题的解决方案,并且在 PHP 8.1 之前它可以被视为黄金标准,因为它独立于语言并且具有高度的可扩展性。
一段时间以来,使用Amazon SQS、RabbitMQ或Apache Kafka等队列已经成为被广泛接受的解决方案。
队列是一种基础设施,用于独立于应用程序运行工作线程,以异步方式处理任何工作。这并非没有风险或缺点,但经过了时间的考验。
让我们来看一个例子:
在此示例中,发送方通常是您现有的 Web 应用程序。
发送者.php
<?php
require 'vendor/autoload.php';
use Aws\Sqs\SqsClient;
// Initialize the SQS client
$client = new SqsClient([
'region' => 'us-east-1',
'version' => 'latest',
'credentials' => [
'key' => 'YOUR_AWS_ACCESS_KEY',
'secret' => 'YOUR_AWS_SECRET_ACCESS_KEY',
],
]);
// Define the message details
$message = [
'to' => 'john@example.com',
'subject' => 'Hello John',
'message' => 'This is a test email for John.',
];
// Send the message to SQS
$result = $client->sendMessage([
'QueueUrl' => 'YOUR_SQS_QUEUE_URL',
'MessageBody' => json_encode($message),
]);
echo "Message sent to SQS with MessageId: " . $result['MessageId'] . "\n";
工作者是运行代码来处理作业的额外部署。
worker.php
<?php
require 'vendor/autoload.php';
use Aws\Sqs\SqsClient;
// Initialize the SQS client
$client = new SqsClient([
'region' => 'us-east-1',
'version' => 'latest',
'credentials' => [
'key' => 'YOUR_AWS_ACCESS_KEY',
'secret' => 'YOUR_AWS_SECRET_ACCESS_KEY',
],
]);
// Receive and process messages from SQS
while (true) {
$result = $client->receiveMessage([
'QueueUrl' => 'YOUR_SQS_QUEUE_URL',
'MaxNumberOfMessages' => 1,
'WaitTimeSeconds' => 20,
]);
if (!empty($result['Messages'])) {
foreach ($result['Messages'] as $message) {
$body = json_decode($message['Body'], true);
// Process the message (send email in this case)
sendEmail($body['to'], $body['subject'], $body['message']);
// Delete the message from SQS
$client->deleteMessage([
'QueueUrl' => 'YOUR_SQS_QUEUE_URL',
'ReceiptHandle' => $message['ReceiptHandle'],
]);
}
}
}
function sendEmail($to, $subject, $message)
{
sleep(3); // Simulating sending email by sleeping for 3 seconds
echo "Email sent to: $to\n";
}
该解决方案由两部分组成:
- 发送者(将消息推送到 SQS 队列)
- 工作者(从队列接收消息并发送电子邮件)
它可以通过增加相对于任意数量的发送者发送的消息数量的工作者数量来扩展。
通过使用队列,工作者完全独立于发送者,并且可以用任何语言编写,因为发送者和工作者之间的通信是通过 JSON 消息进行的。
哪种解决方案最好?
在我们上面探讨的所有解决方案中,几乎不可能说出哪一个最适合您的应用程序,因为尽管它们都旨在解决使用 PHP 运行异步代码的问题,但实现方式却截然不同,并且具有不同的优点和缺点。
总结一下每个选项的几点:
执行()
- 异步运行 PHP 脚本最简单、最有效的方法
- 充满潜在的安全隐患,尤其是在用户输入方面
- 没有什么可以共享,既是祝福,也是诅咒
- 可能会导致现有服务器资源(CPU/内存)增加
pcntl_fork()
- 允许管理父/子进程以定制行为
- 可以抽象成更简单的 API 供您的应用程序使用
- 克隆当前进程可能会导致其他下游问题
AMPHP
- Fibers 用户需要 PHP 8.1
- 库已经抽象出了运行异步代码的“困难部分”
- 与其他更传统的方法相比,学习曲线更陡峭(理解 PHP 中的事件循环和多任务处理)
队列和工作者
- 独立于语言,可灵活适用于任何用例
- 引入分布式系统(从长远来看可能是好事也可能是坏事)
- 有许多解决方案和不同的队列提供商可以简化操作
结论
我想更深入地研究 PHP 中异步代码的所有不同可能性的主要原因是了解 PHP 8.1 中 Fibers 的引入如何(如果有的话)改变我们将来编写异步程序的方式。
有许多不需要 PHP 8.1 的解决方案已经过实践检验,但有趣的是看到 PHP 语言的发展方向与Golang和Elixir等语言竞争,它们都支持异步编程并且已经这样做了很多年。
最终,考虑到可扩展性和跨平台/跨语言支持,我可能仍然会采用队列/工作者方法 - 然而我认为随着时间的推移,我们可能会看到诸如AMPHP
功能更加丰富的库,并且使这个问题更容易解决,而无需引入新的基础设施。
要查看本博文中使用的代码示例,您可以在GitHub上找到它们。
文章来源:https://dev.to/jackmarchant/exploring-async-php-5b68