使用 Golang 构建强大的 Webhook 服务:综合指南

2025-06-09

使用 Golang 构建强大的 Webhook 服务:综合指南

介绍

在软件工程中,某些概念(例如 Webhook)在实施之前需要谨慎的架构决策。无论您是否需要 Webhook,它都会给您的系统增加另一层复杂性,因此您必须谨慎处理它们,以确保其可靠性和价值。正因如此,许多企业更倾向于使用外部服务来处理向客户交付 Webhook 的服务。

如果您是一名软件工程师,了解 Webhook 的底层工作原理并自行构建一些 Webhook 将会是一项非常有价值的练习。在本文中,我们将探讨如何使用 Golang 为一个虚拟的支付网关构建一个 Webhook 服务,并提供 API 支持。

当有人在支付网关上发出付款请求时,我们会发送响应,但我们也会联系用 Golang 编写的 Webhook 服务来发送 Webhook 请求。我们将使用 Redis 通道在 Golang 服务与使用 Flask 构建的 API 之间进行通信。该通道将发送数据并指示 Golang 服务向有效负载中传递的 URL 发送请求(Webhook)。

在本综合指南中,我们将深入研究排队、webhook、goroutines 和指数退避等技术概念,并提供构建强大的 Golang webhook 服务器的示例和见解。

如何构建 webhook 服务?

在开始编码之前,我们需要讨论一下构建 webhook 服务的方法。首先,让我们了解一下什么是 webhook 以及它的工作原理。

什么是 webhook?

Webhook 是由特定事件触发的,从一个系统自动发送到另一个系统的消息。它们用于在事件发生时实时通知其他系统,而无需持续轮询。在支付网关中,当您发出支付请求时,可以使用交易 ID 或引用轮询到端点以获取更新状态。这种方法有其自身的缺点,因为您可能会受到请求数量的限制(节流),并且可能会出现服务器超时等情况。

下图说明了轮询的操作以及可能发生的情况,例如节流和服务器超时:

以下是对上图的描述:

  1. 客户端请求付款状态:客户端反复向支付网关服务器请求付款状态。

  2. 服务器响应付款状态:服务器响应当前付款状态。

  3. 达到节流限制:在达到一定数量的请求之后,服务器可能会限制客户端,限制进一步的请求。

  4. 服务器超时:如果服务器无法在特定时间段内响应,则可能导致超时错误。为了应对节流和服务器超时问题,企业为了提高可靠性和速度,结合允许客户端轮询和接收 Webhook。如果在特定时间后仍未收到 Webhook,客户端可以开始轮询以检索状态。这不仅为客户提供了多种获取付款更新的方式,也有助于企业保持更可靠、更稳健且更方便开发人员使用。

下图展示了首先使用 webhook,然后在 webhook 未送达时回退到轮询的过程:

以下是对上图的描述:

  1. 客户端请求付款:客户端向服务器发起付款请求。

  2. 服务器发起付款:服务器确认付款发起并开始处理。

  3. 服务器发送 Webhook:服务器尝试向客户端发送包含付款状态的 webhook。

  4. 未收到 Webhook:如果在一定时间后未将 Webhook 传送给客户端,则会记录此失败。

  5. 客户端开始轮询:客户端注意到 webhook 不存在,开始轮询服务器以获取付款状态。

  6. 服务器响应付款状态:服务器以付款状态响应轮询请求。

  7. 回退到轮询:整个过程演示了一种回退机制,如果未收到 webhook,客户端可以诉诸轮询。

这种方法确保客户端有多种方式获取付款更新,从而使系统更加可靠、强大且对开发人员更友好。

大多数情况下,Webhook 是向客户端提供的回调 URL 发出的 HTTP POST 请求。当系统 A 中发生指定事件时,它会触发向系统 B 中特定 URL 发出的 HTTP POST 请求。该请求通常包含一个包含事件相关信息的有效负载。然后,系统 B 的服务器会处理该请求并采取适当的措施。

现在我们对 webhook 有了更多的了解,让我们来讨论一下设计 webhook 服务所面临的挑战。

设计 Webhook 服务的困难

设计 webhook 服务是一项复杂的任务,需要仔细考虑各种因素,以确保可扩展性可靠性安全性。以下是对一些关键挑战的深入探讨:

  • 可扩展性:处理大量请求是 Webhook 服务面临的常见挑战。系统必须设计为能够水平扩展以适应负载,确保能够同时处理大量 Webhook 且性能不下降。实现并发对于同时管理多个 Webhook 至关重要。这需要一个能够高效处理并发请求的健壮并发模型。

  • 可靠性:管理 webhook 负载队列对于确保数据不丢失以及每个 webhook 都能及时处理至关重要。实施可靠的队列系统有助于维护 webhook 的顺序和完整性。然而,webhook 有时会无法投递。这种情况时有发生。必须建立强大的重试机制来处理失败的 webhook 投递。这通常需要实施指数退避策略,以确保接收系统不会因快速连续的重复尝试而不堪重负。

  • 安全性:验证有效负载可确保其符合预期格式且不包含恶意内容。实施严格的验证规则有助于防范潜在的安全风险。加密通过 Webhook 传输的数据可增加额外的安全保障,确保敏感信息在传输过程中得到保护。

  • 可维护性:全面的日志记录对于跟踪 Webhook 服务的行为、辅助调试以及洞察系统性能至关重要。实施详细的日志记录策略有助于监控系统,并且在识别和解决问题方面非常有价值。为了简化本文,我们将重点介绍应用程序的可扩展性和可靠性。安全性至关重要,这可以在另一篇文章中讨论。不过,您可以查看这篇关于如何为 Webhook 添加安全性的文章。

回到要开发的应用程序,我们将使用 Golang。但为什么呢?让我们讨论一下我们案例中的优缺点,以及为什么它是一种可靠的技术选择。

为什么要使用 Golang?

在构建 webhook 服务时,Golang 在并发性、可扩展性、性能和强类型方面的优势使其成为一个可靠的选择。Golang 最有趣的特性是 goroutine 和队列,它们能够帮助我们。让我们深入了解这些特性。

Goroutines:Golang 的并发能力增强版

Goroutine可以比作增强版的线程,但要轻量得多。它们允许函数与其他函数并发运行,从而实现高效的多任务处理。

为了更好地理解 Golang,这里有一个简单的类比。

想象一下,在一家大型餐厅的用餐高峰期。在传统的线程模型中,可能会有几名服务员(线程)负责接单、上菜和付款。如果餐厅客满,这些服务员可能会不堪重负,导致服务缓慢,顾客不满。现在,我们来思考一下这个餐厅场景中的 goroutine 模型。

你不再需要几个服务员,而是拥有一支庞大的敏捷助手(goroutine)团队,他们可以快速并发地处理多项任务。每个助手就像一个迷你服务员,可以接单、上菜或处理付款。他们可以同时工作,高效利用餐厅的可用空间和资源。

这些助手不仅数量众多,而且轻量级,这意味着它们可以在任务之间快速切换,且不会产生太多开销。如果一位助手暂时卡住或等待厨师,另一位助手可以接手,确保服务顺利进行。

餐厅管理器(Golang 的运行时调度程序)负责监督这些助手,确保它们有效地分布在可用的餐桌(CPU 核心)上,并且它们能够协作而不互相干扰。在这个比喻中,餐厅代表计算机资源,就餐任务代表待执行的函数,传统的服务员代表线程,而灵活的助手代表 goroutine。能够让多个这样的“助手”并发工作,可以实现高效且可扩展的服务,这使得 goroutine 成为 Golang 并发模型中一个强大的特性。

以下是 Goroutines 具有优势的原因:

  • 轻量级:Goroutine 比传统线程轻量级得多,仅占用几 KB 的堆栈空间。这使得您可以同时创建数千个甚至数百万个 Goroutine,而不会耗尽系统资源。

  • 简单的语法:启动一个 goroutine 非常简单,只需go在函数调用前添加关键字即可。例如,将作为 goroutinego myFunction()运行。myFunction

  • 高效调度:Golang 的运行时负责在可用的 CPU 核心上调度 Goroutine,从而确保最佳利用率。得益于高效的调度算法,一台拥有 4 个核心的机器可以并发运行数千个 Goroutine。以下是 Golang 中 Goroutine 的简单用法



func main() {
    for i := 0; i < 10000; i++ {
        go printNumber(i)
    }
}

func printNumber(number int) {
    fmt.Println(number)
}


Enter fullscreen mode Exit fullscreen mode

此代码将产生 10,000 个 goroutine 来同时打印数字。

Golang 中的队列

队列对于管理数据流至关重要,尤其是在必须处理大量请求的 Webhook 服务中。Golang 提供了两种主要的队列方式:

  1. 使用切片:您可以​​使用切片实现一个简单的队列,但这种方法缺乏并发控制并且可能导致竞争条件。

  2. 使用通道通道提供了一种在 Goroutine 之间通信的方式,可以充当队列。它们提供内置同步功能,确保数据在 Goroutine 之间安全传递。为了更好地理解通道,我们再举一个餐厅的比喻。

想象一下餐厅的厨房,订单就是在这里准备的。在传统的无排队系统中,服务员可能会直接把订单交给厨师,这在繁忙时段会导致混乱。如果一次性收到太多订单,厨师可能会不堪重负,导致延误和错误。

现在,我们来介绍一个使用通道的排队系统,类似于 Golang 的方法。在这个模型中,餐厅有一个组织良好的订单队列,用传送带(通道)表示。服务员接到顾客的订单后,会把订单放到传送带上,传送带会将订单系统有序地传送给厨师。

传送带确保订单按接收顺序处理(FIFO - 先进先出)。厨师可以一次从传送带上取下一个订单,准备菜肴,然后处理下一个订单。如果厨房繁忙,新订单只需在传送带上排队等候。这种排队系统使餐厅能够处理大量订单,而不会给厨师带来负担。

它还提供了灵活性。如果餐厅异常繁忙,他们可以增加更多厨师(goroutine)来处理传送带上的订单。如果是清淡的夜晚,他们可以减少厨师人数,传送带仍然能确保订单有序处理。

在 Golang 中,通道就像传送带一样,提供了一种在 goroutine 之间安全有序地发送和接收值的方式。它们允许你创建一个管道,让数据(订单)可以被多个厨师并发处理,而不会产生冲突或混乱。

下图是用 Golang 中的通道排队来类比餐厅的示意图:

在本文中,我们将使用通道,因为我们需要使用 goroutine 进行通信。现在我们已经更好地理解了 webhook 以及本文开发的解决方案背后的架构,让我们来构建我们的 webhook 服务。

项目设置

为了使事情更简单,我们将在这个项目中使用 Docker。我们将提供三项服务:

  • Redis:将运行用于在 Flask API 服务和 Webhook Golang 服务之间共享数据的 Redis 映像。

  • api:这是用 Flask 编写的服务名称。我们只需公开一个 GET 端点,该端点将生成随机有效负载,通过 Redis 将有效负载发送到 Webhook 服务,并返回生成的有效负载。无需实现复杂的端点,因为本文的重点是 Go 服务。

  • webhook:用 Golang 编写的服务名称。我们将构建一个应用程序,用于监听来自 Redis 通道的数据,然后处理负载并将其发送到负载中传入的 URL。由于我们正在处理 HTTP 请求,并且我们了解其他服务可能不可用,因此我们将实现重试机制(指数退避),并使用 Golang 队列使其更加健壮。设置过程需要一些时间,因此我建议使用以下命令克隆项目基础:



git clone --branch base https://github.com/koladev32/golang-wehook.git


Enter fullscreen mode Exit fullscreen mode

这将克隆base已经附带可运行的 Flask 项目和文件的项目分支docker-compose.yaml

让我们快速探索一下用 Python 编写的 Flask 服务的代码:



from datetime import datetime  
import json  
import os  
import random  
import uuid  
from flask import Flask  
import redis  


def get_payment():  
    return {  
        'url': os.getenv("WEBHOOK_ADDRESS", ""),  
        'webhookId': uuid.uuid4().hex,  
        'data': {  
            'id': uuid.uuid4().hex,  
            'payment': f"PY-{''.join((random.choice('abcdxyzpqr').capitalize() for i in range(5)))}",  
            'event': random.choice(["accepted", "completed", "canceled"]),  
            'created': datetime.now().strftime("%d/%m/%Y, %H:%M:%S"),  
        }  
    }  


redis_address = os.getenv("REDIS_ADDRESS", "")  
host, port = redis_address.split(":")  
port = int(port)  
# Create a connection to the Redis server  
redis_connection = redis.StrictRedis(host=host, port=port)  

app = Flask(__name__)  


@app.route('/payment')  
def payment():  
    webhook_payload_json = json.dumps(get_payment())  

    # Publish the JSON string to the "payments" channel in Redis  
    redis_connection.publish('payments', webhook_payload_json)  

    return webhook_payload_json  


if __name__ == '__main__':  
    app.run(host='0.0.0.0', port=8000)


Enter fullscreen mode Exit fullscreen mode

首先,我们创建一个名为 的函数,用于生成随机有效载荷get_payment。该函数将返回具有以下结构的随机有效载荷:



{
    "url": "http://example.com/webhook",
    "webhookId": "52d2fc2c7f25454c8d6f471a22bdfea9",
    "data": {
        "id": "97caab9b6f924f13a94b23a960b2fff2",
        "payment": "PY-QZPCQ",
        "event": "accepted",
        "date": "13/08/2023, 00:03:46"
    }
}


Enter fullscreen mode Exit fullscreen mode

之后,我们使用REDIS_ADDRESS环境变量初始化与 Redis 的连接。



...

redis_address = os.getenv("REDIS_ADDRESS", "")  
host, port = redis_address.split(":")  
port = int(port)  
# Create a connection to the Redis server  
redis_connection = redis.StrictRedis(host=host, port=port)  

...


Enter fullscreen mode Exit fullscreen mode

之所以redis_address被拆分,是因为REDIS_ADDRESS通常会像这样localhost:6379redis:6379(如果我们使用 Redis 容器)。之后,我们有一个路由处理函数,它通过名为 的 Redis 通道发送一个使用 方法格式化的payment随机负载,然后返回该随机负载。webhook_payload_jsonjson.dumpspayments

这是一个简单的支付 API 网关实现,或者简单地说是一个 Mock。现在我们了解了项目的基础,接下来让我们快速讨论一下该解决方案的架构,以及一些增强其健壮性的概念的实现。我们将在文章末尾讨论它们的缺点。

解决方案的架构

该解决方案的架构非常简单:

  • 我们有一个充当支付网关的 API。此 API 端点上的请求将返回一个有效负载,但该有效负载也会通过名为 的 Redis 通道发送payments。因此,所有监听此通道的服务都将收到发送的数据。

  • 然后,我们用 Golang 编写了一个 Webhook 服务。该服务监听paymentsRedis 通道。如果收到数据,则将有效负载格式化,发送到有效负载上指定的 URL。如果请求由于超时或其他错误而失败,则可以使用 Golang 通道排队和指数退避算法的重试机制来重试请求。

让我们编写 Golang 服务。

编写 Golang 服务

我们将在目录中使用 Golang 编写 webhook 服务的逻辑webhook。本节结束时我们将实现以下结构。



webhook
├── Dockerfile          # Defines the Docker container for the project
├── go.mod              # Module dependencies file for the Go project
├── go.sum              # Contains the expected cryptographic checksums of the content of specific module versions
├── main.go             # Main entry point for the application
├── queue
│   └── worker.go       # Contains the logic for queuing and processing tasks
├── redis
│   └── redis.go        # Handles the connection and interaction with Redis
├── sender
    └── webhook.go      # Responsible for sending the webhook requests


Enter fullscreen mode Exit fullscreen mode

让我们从创建 Go 项目开始。



go mod init .


Enter fullscreen mode Exit fullscreen mode

要创建一个 Go 项目,你可以使用。在我们的例子中,在命令末尾go mod init name-of-the-project添加点告诉 Go 使用目录名称作为模块名称。.

一旦创建了模块,让我们安装所需的依赖项,例如redis



 go get github.com/go-redis/redis/v8


Enter fullscreen mode Exit fullscreen mode

太棒了!我们现在可以开始写代码了。😈

添加 webhook 发送逻辑

稍微有点不走寻常路,我们先来编写 webhook 发送逻辑。由于我们使用的是 Golang 队列,为了简化开发流程,我们先添加系统的第一个依赖项:发送 webhook 的函数。

sender使用命令创建一个名为 的目录mkdir sender。然后在此目录中创建一个名为 的新文件webhook.go



mkdir sender && cd sender
touch webhook.go


Enter fullscreen mode Exit fullscreen mode

在新创建的文件中,让我们添加所需的命名、导入和结构。



package sender  

import (  
   "bytes"  
   "encoding/json"  
   "errors"  
   "io"  
   "log"  
   "net/http"  
)  

// Payload represents the structure of the data expected to be sent as a webhook  
type Payload struct {  
   Event   string  
   Date    string  
   Id      string  
   Payment string  
}


Enter fullscreen mode Exit fullscreen mode

接下来,让我们创建一个SendWebhook向 URL 发送 JSON POST 请求的函数。



// SendWebhook sends a JSON POST request to the specified URL and updates the event status in the database  
func SendWebhook(data interface{}, url string, webhookId string) error {  
   // Marshal the data into JSON  
   jsonBytes, err := json.Marshal(data)  
   if err != nil {  
      return err  
   }  

   // Prepare the webhook request  
   req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))  
   if err != nil {  
      return err  
   }  
   req.Header.Set("Content-Type", "application/json")  

   // Send the webhook request  
   client := &http.Client{}  
   resp, err := client.Do(req)  
   if err != nil {  
      return err  
   }  
   defer func(Body io.ReadCloser) {  
      if err := Body.Close(); err != nil {  
         log.Println("Error closing response body:", err)  
      }  
   }(resp.Body)  

   // Determine the status based on the response code  
   status := "failed"  
   if resp.StatusCode == http.StatusOK {  
      status = "delivered"  
   }  

   log.Println(status)  

   if status == "failed" {  
      return errors.New(status)  
   }  

   return nil  
}


Enter fullscreen mode Exit fullscreen mode

让我们解释一下这里发生了什么。

  1. 将数据编组为 JSON:传递给函数的数据将被编组为 JSON 字节数组。如果在此过程中出现错误,则会返回错误。


   jsonBytes, err := json.Marshal(data)
   if err != nil {
       return err
   }


Enter fullscreen mode Exit fullscreen mode
  • 准备 Webhook 请求:创建一个新的 HTTP POST 请求,以 JSON 数据作为正文。“Content-Type”标头设置为application/json


    req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBytes))
    if err != nil {
    return err
    }
    req.Header.Set("Content-Type", "application/json")


Enter fullscreen mode Exit fullscreen mode
  • 发送 Webhook 请求:HTTP 客户端发送已准备好的请求。如果发送请求时出错,则会返回错误。响应主体也会延迟到处理后关闭。


    client := &http.Client{}
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer func(Body io.ReadCloser) {
        if err := Body.Close(); err != nil {
            log.Println("Error closing response body:", err)
        }
    }(resp.Body)


Enter fullscreen mode Exit fullscreen mode
  • 根据响应代码确定状态:Webhook 的状态根据 HTTP 响应代码确定。如果状态代码为200 (OK),则状态设置为“已送达”;否则,状态设置为“失败”,并返回包含该状态代码的错误。


...
status := "failed"
if resp.StatusCode == http.StatusOK {
    status = "delivered"
}
log.Println(status)
if status == "failed" {
    return errors.New(status)
}
...


Enter fullscreen mode Exit fullscreen mode
  1. 返回成功:如果一切成功,函数返回nil,表示 webhook 发送成功。

这就是发送请求的逻辑。目前为止还不算太复杂,但我们即将进入最精彩的部分🤫。让我们添加一个包来处理对paymentsRedis 通道的监听。

监听 Redis 频道

Webhook 服务的另一个重要方面是它应该主动监听paymentsRedis 通道。这是 Redis 特性的概念pub/sub。Flask 服务将数据发布到通道中,然后订阅该通道的所有服务都会接收数据。

在 webhook 目录项目的根目录中,创建一个名为 的新目录redis。在此目录中创建了一个名为 的新文件redis.go。此文件将包含订阅和监听通道中传入数据的逻辑payments,同时还会格式化有效负载并将其发送到 Golang 队列通道。



package redis  

import (  
   "context"  
   "encoding/json"  
   "log"  

   "github.com/go-redis/redis/v8"  
)  

// WebhookPayload defines the structure of the data expected  
// to be received from Redis, including URL, Webhook ID, and relevant data.  
type WebhookPayload struct {  
   Url       string `json:"url"`  
   WebhookId string `json:"webhookId"`  
   Data      struct {  
      Id      string `json:"id"`  
      Payment string `json:"payment"`  
      Event   string `json:"event"`  
      Date    string `json:"created"`  
   } `json:"data"`  
}


Enter fullscreen mode Exit fullscreen mode

让我们编写该Subscribe函数。



func Subscribe(ctx context.Context, client *redis.Client, webhookQueue chan WebhookPayload) error {  
   // Subscribe to the "webhooks" channel in Redis  
   pubSub := client.Subscribe(ctx, "payments")  

   // Ensure that the PubSub connection is closed when the function exits  
   defer func(pubSub *redis.PubSub) {  
      if err := pubSub.Close(); err != nil {  
         log.Println("Error closing PubSub:", err)  
      }  
   }(pubSub)  

   var payload WebhookPayload  

   // Infinite loop to continuously receive messages from the "webhooks" channel  
   for {  
      // Receive a message from the channel  
      msg, err := pubSub.ReceiveMessage(ctx)  
      if err != nil {  
         return err // Return the error if there's an issue receiving the message  
      }  

      // Unmarshal the JSON payload into the WebhookPayload structure  
      err = json.Unmarshal([]byte(msg.Payload), &payload)  
      if err != nil {  
         log.Println("Error unmarshalling payload:", err)  
         continue // Continue with the next message if there's an error unmarshalling  
      }  

      webhookQueue <- payload // Sending the payload to the channel  
   }  
}


Enter fullscreen mode Exit fullscreen mode

这段代码定义了一个名为 的函数Subscribe,它订阅一个特定的 Redis 通道(“payments”),并持续监听该通道上的消息。收到消息后,它会处理该消息并将其发送到 Go 通道进行进一步处理。

  • 该代码使用提供的 Redis 客户端订阅 Redis 中的“payments”频道。任何发送到该频道的消息都将由该函数接收。


pubSub := client.Subscribe(ctx, "payments")


Enter fullscreen mode Exit fullscreen mode
  • 然后,我们确保函数退出时(无论是正常退出还是由于错误退出)与 Redis 的 PubSub(发布-订阅)连接都会关闭。这对于清理资源非常重要。


defer func(pubSub *redis.PubSub) {
    if err := pubSub.Close(); err != nil {
        log.Println("Error closing PubSub:", err)
    }
 }(pubSub)


Enter fullscreen mode Exit fullscreen mode
  • 这里的循环for无限运行,只要程序正在运行,函数就可以继续监听消息。


for {
    // ...
 }


Enter fullscreen mode Exit fullscreen mode

在循环内部,代码等待从 Redis 通道接收消息。如果接收消息时出错(通常是在有效负载的反序列化过程中),函数会记录错误,然后我们继续执行该函数。



err = json.Unmarshal([]byte(msg.Payload), &payload)
if err != nil {
    log.Println("Error unmarshalling payload:", err)
    continue // Continue with the next message if there's an error unmarshalling
}


Enter fullscreen mode Exit fullscreen mode

收到消息后,代码会尝试将消息负载从 JSON 转换为 Go 结构 ( WebhookPayload)。如果此过程中出现错误,它会记录错误并继续处理下一条消息。

  • 最后,代码将处理后的有效载荷发送到 Go 通道 ( webhookQueue)。此通道将在queue包中用于处理有效载荷。


webhookQueue <- payload // Sending the payload to the channel


Enter fullscreen mode Exit fullscreen mode

简单来说,这个函数就像一个调到特定电台(paymentsRedis 中的频道)的收音机。它不断监听消息(比如收音机里的歌曲)并进行处理(比如调整音质),然后将其传递给程序的另一部分(比如播放音乐的扬声器)。(好吧!我尽力用这些奇怪的比喻了!😤)

现在我们有了Subscribe方法,让我们添加排队逻辑。我们将在这里实现重试逻辑。

添加排队逻辑

队列是一种遵循先进先出 (FIFO) 原则的数据结构。想象一下在银行排队等候的人;队里第一个人会先得到服务,新的人会排在队尾。

在 Golang 中,您可以通过两种主要方式使用队列:

  • 切片:切片是 Go 中的动态大小数组。你可以使用它们来创建简单的队列,只需在末尾添加项目,并从开头删除项目即可。

  • 通道:通道更复杂,但提供了更多可能性。它们允许两个 Goroutine(并发函数)进行通信并同步执行。你可以将通道用作队列,其中一个 Goroutine 将数据发送到通道(入队),另一个 Goroutine 从通道接收数据(出队)。

在我们的具体案例中,我们将使用基于通道的排队。原因如下:

  • 并发:通道旨在处理并发操作,使其适用于多个功能需要通信或同步的场景。

  • 容量控制:您可以设置通道的容量,控制其一次可容纳的数据量。这有助于管理资源和流量控制。

  • 阻塞和非阻塞操作:通道可以以阻塞和非阻塞方式使用,让您控制发送和接收操作的行为方式。

我们将使用通道从Subscribe函数发送数据,然后在ProcessWebhooks接下来编写的函数中处理这些数据。通过使用通道,我们可以确保程序不同部分之间的顺畅通信,从而高效可靠地处理 Webhook。

编写排队逻辑

在 webhook 项目的根目录下,创建一个名为 的目录queue。在此目录中,添加一个名为 的文件worker.go。此文件将包含处理队列中接收的数据的逻辑。



mkdir queue && cd queue
touch worker.go


Enter fullscreen mode Exit fullscreen mode

像往常一样,我们首先从导入开始。



package queue  

import (  
   "context"  
   "log"  
   "time"  
   "webhook/sender"  

   redisClient "webhook/redis"  
)


Enter fullscreen mode Exit fullscreen mode

然后添加处理webhooks数据的函数,ProcessWebhooks



func ProcessWebhooks(ctx context.Context, webhookQueue chan redisClient.WebhookPayload) {  
   for payload := range webhookQueue {  
      go func(p redisClient.WebhookPayload) {  
         backoffTime := time.Second  // starting backoff time  
         maxBackoffTime := time.Hour // maximum backoff time  
         retries := 0  
         maxRetries := 5  

         for {  
            err := sender.SendWebhook(p.Data, p.Url, p.WebhookId)  
            if err == nil {  
               break  
            }  
            log.Println("Error sending webhook:", err)  

            retries++  
            if retries >= maxRetries {  
               log.Println("Max retries reached. Giving up on webhook:", p.WebhookId)  
               break  
            }  

            time.Sleep(backoffTime)  

            // Double the backoff time for the next iteration, capped at the max  
            backoffTime *= 2  
            log.Println(backoffTime)  
            if backoffTime > maxBackoffTime {  
               backoffTime = maxBackoffTime  
            }  
         }  
      }(payload)  
   }  
}


Enter fullscreen mode Exit fullscreen mode

让我们理解一下上面的代码。被调用的函数ProcessWebhooks接收一个包含 webhook 负载的 Go 通道并进行处理。如果发送 webhook 失败,它会使用指数退避策略重试。

  • 首先,我们循环遍历通道中的项目列表webhookQueue。只要列表中还有项目,我们就会继续处理数据。


for payload := range webhookQueue {
    // processing code
}


Enter fullscreen mode Exit fullscreen mode
  • 对于每个有效负载,都会启动一个新的 goroutine(轻量级线程)。这允许同时处理多个 webhook。


go func(p redisClient.WebhookPayload) {


Enter fullscreen mode Exit fullscreen mode
  • 接下来,我们初始化变量来控制重试逻辑。如果发送 webhook 失败,代码将等待 ( backoffTime) 后再重试。每次失败后,等待时间都会加倍,最长为 ( maxBackoffTime)。该过程最多会重试maxRetries次。


backoffTime := time.Second  // starting backoff time
maxBackoffTime := time.Hour // maximum backoff time
retries := 0
maxRetries := 5


Enter fullscreen mode Exit fullscreen mode
  • 在下一部分中,我们将尝试使用该SendWebhook函数发送 webhook。如果成功 ( err == nil),则循环中断,并转到下一个有效载荷。


err := sender.SendWebhook(p.Data, p.Url, p.WebhookId)
if err == nil {
    break
}
log.Println("Error sending webhook:", err)


Enter fullscreen mode Exit fullscreen mode
  • 如果发送 webhook 失败,代码会记录错误,增加重试次数,并等待backoffTime后再重试。每次失败后,退避时间都会加倍,但上限为maxBackoffTime


retries++
if retries >= maxRetries {
    log.Println("Max retries reached. Giving up on webhook:", p.WebhookId)
    break
}
time.Sleep(backoffTime)
backoffTime *= 2
if backoffTime > maxBackoffTime {
    backoffTime = maxBackoffTime
}


Enter fullscreen mode Exit fullscreen mode

ProcessWebhooks函数旨在处理 webhook 负载队列。它会尝试发送每个 webhook,如果失败,则使用指数退避策略重试。通过使用 goroutines,它可以并发处理多个 webhook,从而提高处理效率。

简单来说,这个函数就像邮局工作人员尝试投递包裹(webhook)。如果投递失败,工作人员每次都会等待一段时间再尝试,最多可以尝试一定次数。如果所有尝试都失败,工作人员就会继续投递下一个包裹。

我们已经编写了服务中最重要的部分。让我们将它们整合在一起。

把所有东西放在一起

现在是时候把我们编写的所有内容整合在一起了。在main.go文件中,我们将添加以下逻辑:创建 Redis 客户端来启动连接,创建用作队列的通道,然后启动所需的进程。



package main  

import (  
   "context"  
   "log"  
   "os"  

   redisClient "webhook/redis"  

   "webhook/queue"  

   "github.com/go-redis/redis/v8" // Make sure to use the correct version  
)  

func main() {  
   // Create a context  
   ctx, cancel := context.WithCancel(context.Background())  
   defer cancel()  

   // Initialize the Redis client  
   client := redis.NewClient(&redis.Options{  
      Addr:     os.Getenv("REDIS_ADDRESS"), // Use an environment variable to set the address  
      Password: "",                         // No password  
      DB:       0,                          // Default DB  
   })  

   // Create a channel to act as the queue  
   webhookQueue := make(chan redisClient.WebhookPayload, 100) // Buffer size 100  

   go queue.ProcessWebhooks(ctx, webhookQueue)  

   // Subscribe to the "transactions" channel  
   err := redisClient.Subscribe(ctx, client, webhookQueue)  

   if err != nil {  
      log.Println("Error:", err)  
   }  

   select {}  

}


Enter fullscreen mode Exit fullscreen mode

我们来解释一下上面的代码。

  • 我们首先从包声明和导入开始


package main

import (
    "context"
    "log"
    "os"
    redisClient "webhook/redis"
    "webhook/queue"
    "github.com/go-redis/redis/v8" // Make sure to use the correct version
)


Enter fullscreen mode Exit fullscreen mode

然后,我们声明将首先创建上下文的主函数。



ctx, cancel := context.WithCancel(context.Background())
defer cancel()


Enter fullscreen mode Exit fullscreen mode

创建上下文是为了管理程序不同部分之间的取消信号。这有助于在需要时优雅地关闭进程。

  • 然后,我们使用环境变量中的地址创建与 Redis 服务器的连接来初始化 Redis 客户端。


client := redis.NewClient(&redis.Options{
    Addr:     os.Getenv("REDIS_ADDRESS"), // Use an environment variable to set the address
    Password: "",                         // No password
    DB:       0,                          // Default DB
})


Enter fullscreen mode Exit fullscreen mode
  • 接下来的部分绝对重要,因为我们首先要创建一个作为 webhook 有效负载队列的通道。


webhookQueue := make(chan redisClient.WebhookPayload, 100) // Buffer size 100
go queue.ProcessWebhooks(ctx, webhookQueue)


Enter fullscreen mode Exit fullscreen mode

它的缓冲区大小为 100,这意味着它一次最多可以容纳 100 个项目。我们启动一个 goroutine 来处理来自webhookQueue通道的 webhook。处理逻辑定义在ProcessWebhooks函数中。

  • 然后,我们订阅一个名为 的 Redis 频道payments并监听消息。收到消息后,会将其添加到webhookQueue频道进行处理。


err := redisClient.Subscribe(ctx, client, webhookQueue)
if err != nil {
    log.Println("Error:", err)
}


Enter fullscreen mode Exit fullscreen mode

然后在函数的末尾,我们创建一个无限循环来保持程序运行。



select {}


Enter fullscreen mode Exit fullscreen mode

如果没有这个,程序启动后就会立即退出goroutines,并且它们没有机会运行。

简单来说,这段代码使用 Redis 搭建了一个简单的 webhook 处理系统。它初始化一个 Redis 连接,创建一个通道作为队列,启动一个 goroutine 来处理 webhook,并订阅一个 Redis 通道来接收新的 webhook 负载。之后,程序进入一个无限循环,让 goroutine 继续运行并处理收到的 webhook。

现在我们已经拥有了 webhook 服务运行所需的所有文件。我们可以将应用程序 docker 化,并启动 docker 容器。

运行项目

现在该运行项目了。让我们添加一个 Dockerfile 和所需的环境变量。在 webhook go 项目中,添加以下 Dockerfile。



# Start from a Debian-based Golang official image  
FROM golang:1.21-alpine as builder  

# Set the working directory inside the container  
WORKDIR /app  

# Copy the go mod and sum files  
COPY go.mod go.sum ./  

# Download all dependencies  
RUN go mod download  

# Copy the source code from your host to your image filesystem.  
COPY . .  

# Build the Go app  
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .  

# Use a minimal alpine image for the final stage  
FROM alpine:latest  

# Set the working directory inside the container  
WORKDIR /root/  

# Copy the binary from the builder stage  
COPY --from=builder /app/main .  

# Run the binary  
CMD ["./main"]


Enter fullscreen mode Exit fullscreen mode

Dockerfile 写好后,我们现在可以启动 Docker 容器了。但首先,你需要一个 webhook URL 来尝试这个项目。你可以在https://webhook.site轻松免费获取一个。完成后,在项目.env根目录下创建一个名为 的文件,该文件位于项目根目录下docker-compose.yaml。然后,确保内容与项目根目录下的文件类似。



REDIS_ADDRESS=redis:6379  
WEBHOOK_ADDRESS=<WEBHOOK_ADDRESS>


Enter fullscreen mode Exit fullscreen mode

替换为https://webhook.site<WEBHOOK_ADDRESS>提供的 webhook URL

然后,使用 构建并启动容器docker compose up -d --build

构建完成后,您可以使用docker compose logs -f命令跟踪webhook服务的日志。

访问http://127.0.0.1:8000/payment即可通过 Redis 向 webhook 服务发送数据。如果出现错误,日志内容如下。

如果成功则记录日志。

注意:要调整 webhook 接收错误的行为,您可以修改接收请求时发送的状态。在webhook.site仪表板上,点击Edit导航栏上的按钮。然后将 更改Default status code504以指示服务器超时。

恭喜!我们刚刚用 Golang 构建了一个 Webhook 服务。我们已经探索了 Redis 通道、Golang 中的通道队列、指数退避算法和 Goroutines。🔥

让我们快速讨论一下我们已经实施的解决方案的一些改进。

我们能做什么更好?

虽然我们实现的解决方案已经具备实用性,并演示了并发处理、指数退避和 Redis 集成等关键概念,但仍有几处地方可以改进。以下是一些缺陷的分析以及我们提出的解决方法:

可扩展性

我们当前的 webhook 服务实现利用了 Redis、队列和 goroutines。虽然这是一个坚实的基础,但对于处理大量并发 webhook 来说可能不够。

Goroutine 虽然轻量级,但会消耗系统资源。大量的并发 webhook 可能会耗尽这些资源,导致处理速度缓慢甚至系统崩溃。此外,Redis 虽然速度很快,但如果读写操作数量超出其容量,就会成为瓶颈,从而导致延迟。

我们的系统也存在单点故障。如果 Redis 服务器宕机,整个系统就无法运行,导致数据丢失和服务中断。

为了解决这些可扩展性挑战,我们可以考虑以下增强功能:

  1. 实现分布式队列系统:Kafka 或 RabbitMQ 等系统可以处理大量消息并将负载分布在多个服务器上。

  2. 利用工作池:使用工作池管理 goroutine 可以控制资源使用情况并防止系统过载。

  3. 水平扩展:通过添加更多 webhook 服务实例并使用负载均衡器,我们可以均匀分配负载并处理更多请求。

安全

安全性是另一个需要关注的关键方面。目前,我们的 Redis 服务器未使用密码,这使其容易受到未经授权的访问。任何能够连接到 Redis 服务器的人都可以读取或写入数据,这构成了潜在的数据泄露风险。

此外,如果 Redis 服务器宕机,系统可能会丢失已发送的有效负载,从而导致数据丢失。为了降低这种风险,我们可以:

  1. 使用 Redis 密码:这个简单的措施可以防止未经授权访问 Redis 服务器。

  2. 设置复制:Redis 复制可确保另一台服务器上有数据的副本,如果一台服务器出现故障,则可以提供后备。

  3. 启用持久性:定期将数据保存到磁盘可确保在服务器崩溃时可以恢复数据。

如果 Webhook 服务器本身宕机,有效负载将无法处理,数据可能会丢失。为了解决这个问题,我们可以:

  1. 实施重试机制:具有指数退避的强大重试机制可确保持续尝试处理有效载荷。

  2. 监控和警报:监控和警报可以通知管理员服务器停机时间,以便快速干预。

  3. 利用备份服务器:拥有备份或故障转移服务器可确保在一台服务器出现故障时能够持续提供服务。

结论

您构建的 Webhook 服务奠定了坚实的基础,这些增强功能可以将其提升到新的水平。通过关注可扩展性、可靠性、安全性和可维护性,您可以构建一个不仅满足当前需求,还能应对未来增长和变化的系统。请务必考虑应用程序的具体需求和限制,因为并非所有改进都对您的特定用例而言都是必要的或适用的。

如果您认为本文可以做得更好,请随意在下面发表评论。

另外,您可以在GitHub上查看本文项目的代码源

鏂囩珷鏉ユ簮锛�https://dev.to/koladev/building-a-robust-webhook-server-with-golang-a-compressive-guide-4oa0
PREV
如何与非开发人员交谈?
NEXT
Javascript 详解 | Sketch-notes 第一部分