使

使用 Publish/Sub 在 Go 中构建事件驱动系统

2025-06-05

使用 Publish/Sub 在 Go 中构建事件驱动系统

发布者和订阅者 (Pub/Sub) 是后端应用程序中强大的构建块。它允许您使用事件驱动架构构建通过异步广播事件进行通信的系统。这是一种很好的服务解耦方法,可以提高可靠性和响应速度。

🧠在本指南中,我们将比较 Pub/Sub 方法与更传统的纯 API 方案。我们还将介绍Encore,它是一种高效构建事件驱动应用程序的方法,无需手动处理基础设施。

✨Pub/Sub 的好处

  • Pub/Sub 可用于通过减少故障组件和瓶颈的爆炸半径来提高应用程序的可靠性。
  • 可以使用Pub/Sub来提高对用户的响应速度。
  • Pub/Sub 甚至可以通过反转服务之间的依赖关系来帮助减少开发人员的认知开销。

🤔 它与普通的 API 实现有何不同?

🖼 让我们看一下用户注册服务中的示例 API。

我们想要实现的行为是,在用户注册后,向用户发送一封欢迎邮件,并在分析系统中创建注册记录。现在,让我们看看如何仅使用 API 来实现这一点,并与 Pub/Sub 的实现方式进行比较。

📞 仅使用 API 的方法

使用服务之间的 API 调用,我们可以设计一个在用户注册时如下所示的系统:

  1. user服务启动数据库事务并在其数据库中记录用户。
  2. user服务调用该email服务来发送欢迎电子邮件。
  3. 然后,该email服务会调用电子邮件提供商来实际发送电子邮件。
  4. 成功后,email服务会回复该user服务请求已被处理。
  5. user然后该服务调用该analytics服务来记录注册。
  6. analytics服务写入数据仓库来记录信息。
  7. 然后该analytics服务会回复该user服务,告知该请求已被处理。
  8. 服务user提交数据库事务。
  9. 然后服务user可以回复用户注册成功。

👎 仅使用 API 方法的缺点

请注意,我们必须等待所有操作完成之后才能回复用户,告知他们我们已经注册了。

  • 这意味着,如果我们的电子邮件提供商需要 3 秒钟来发送电子邮件,那么我们现在也需要 3 秒钟来响应用户,而实际上,一旦用户被写入数据库,我们就可以立即响应用户以确认注册。
  • 这种方法的另一个缺点是,如果我们的数据仓库目前出现故障并报告错误,那么每当有人尝试注册时,我们的系统也会报告错误!既然分析纯粹是内部的,不会影响用户,那么分析系统故障又怎么会影响用户注册呢?

👉 比较:Pub/Sub 方法

一个更好的解决方案是,我们可以将向用户发送电子邮件的行为和记录我们的分析分离,这样用户服务只需要在自己的数据库中记录用户并让用户知道他们已经注册 - 而不必担心下游的影响。

值得庆幸的是,这正是Pub/Sub 主题允许我们做的事情。

在此示例中,当用户注册时会发生以下情况:

  1. user服务启动数据库事务并在其数据库中记录用户。
  2. 向主题发布注册事件signups
  3. 提交交易并回复用户注册成功。

此时,用户可以自由地继续与应用程序交互,并且我们已将注册行为
与应用程序的其余部分隔离。

同时,emailanalytics服务将接收来自主题的注册事件signups,然后执行各自的任务。如果任一服务返回错误,该事件将自动回退并重试,
直到服务能够成功处理该事件,或达到最大尝试次数并被放入
死信队列 (DLQ)。

👍 此版本的好处:

  • 其他两项服​​务的处理时间不会影响最终用户,事实上该user服务甚至没有意识到emailanalytics服务。
  • 可以将需要了解新用户注册的新系统添加到应用程序中,而无需更改user服务或影响其性能。

🔨 使用 Go 和 Pub/Sub 构建事件驱动的应用程序

既然我们已经展示了使用 Pub/Sub 的优势,现在是时候构建我们​​自己的事件驱动的 Go 后端了!我们当然会使用 Pub/Sub,并且会使用Encore(一个后端开发平台)来自动化基础设施配置。它既可以在本地环境运行,也可以在您自己的 AWS/GCP 云环境中运行。

👉 让我们看一下如何在 Go 中构建完全类型安全的事件驱动后端,并以实现正常运行时间监控系统为例。

🚀 我们将做什么:

  • 安装 Encore
  • 从启动分支创建你的应用程序
  • 在本地运行以尝试前端
  • 构建后端
  • 部署到 Encore 的免费开发云

现在不想构建?那就观看视频吧:
用 Go 构建事件驱动的后端应用程序

✨最终结果:

正常运行时间监控器

演示应用程序:试用该应用程序

完成后,我们将拥有一个具有这种类型安全的事件驱动架构的后端:

正常运行时间监控器架构
在此图表(由 Encore 自动生成)中,您可以将各个服务视为白框,将 Pub/Sub 主题视为黑框。

🏁 出发啦!

为了让您更容易跟随,我们布置了一系列羊角面包来引导您的路线。

每当您看到🥐时,就意味着您有事要做!

💽 安装 Encore

安装 Encore CLI 来运行您的本地环境:

  • macOS: brew install encoredev/tap/encore
  • Linux: curl -L https://encore.dev/install.sh | bash
  • 视窗: iwr https://encore.dev/install.ps1 | iex

创建您的 Encore 应用程序

🥐 从此启动分支创建您的新应用程序,并使用可立即使用的前端:



encore app create uptime --example=github.com/encoredev/example-app-uptime/tree/starting-point


Enter fullscreen mode Exit fullscreen mode

💻 在本地运行你的应用

🥐 通过在本地运行您的应用程序来检查您的前端是否正常工作。



cd uptime
encore run


Enter fullscreen mode Exit fullscreen mode

您应该看到以下内容:
再来一次跑步这意味着 Encore 已启动您的本地环境并为 Pub/Sub 和数据库创建了本地基础设施。

然后访问http://localhost:4000/frontend/查看前端。
由于我们尚未构建后端,因此该功能目前无法使用。

–我们现在就这么做!

🔨创建监控服务

让我们首先创建检查网站当前是否正常运行或关闭的功能。

稍后我们会将此结果存储在数据库中,以便我们能够检测状态何时发生变化并
发送警报。

🥐 创建一个名为 的服务,monitor其中包含一个名为 的文件ping.go。使用 Encore,您可以通过创建一个 Go 包来实现:



mkdir monitor
touch monitor/ping.go


Enter fullscreen mode Exit fullscreen mode

🥐 添加一个名为的 API 端点,Ping该端点以 URL 作为输入并返回指示站点是启动还是关闭的响应。

使用 Encore,您可以通过创建一个函数并向//encore:api其添加注释来实现此目的。

将其粘贴到ping.go文件中:



package monitor

import (
    "context"
    "net/http"
    "strings"
)

// PingResponse is the response from the Ping endpoint.
type PingResponse struct {
    Up bool `json:"up"`
}

// Ping pings a specific site and determines whether it's up or down right now.
//
//encore:api public path=/ping/*url
func Ping(ctx context.Context, url string) (*PingResponse, error) {
    // If the url does not start with "http:" or "https:", default to "https:".
    if !strings.HasPrefix(url, "http:") && !strings.HasPrefix(url, "https:") {
        url = "https://" + url
    }

    // Make an HTTP request to check if it's up.
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return &PingResponse{Up: false}, nil
    }
    resp.Body.Close()

    // 2xx and 3xx status codes are considered up
    up := resp.StatusCode < 400
    return &PingResponse{Up: up}, nil
}


Enter fullscreen mode Exit fullscreen mode

🥐 快来试试吧!确保你已经安装并运行了Docker,然后encore run在终端中运行,你就会看到服务启动了。

🥐 现在打开在http://localhost:9400运行的本地开发仪表板并尝试调用
端点,并将其作为 URLmonitor.Ping传入。google.com

如果您更喜欢使用终端,curl http://localhost:4000/ping/google.com请在新的终端中运行。无论哪种方式,您都应该看到响应:



{"up": true}


Enter fullscreen mode Exit fullscreen mode

您也可以尝试使用httpstat.us/400some-non-existing-url.com,它应该会响应{"up": false}
(测试负面情况也总是一个好主意。)

🧪 添加测试

🥐 让我们编写一个自动化测试,这样我们就不会随着时间的推移破坏这个端点。创建文件monitor/ping_test.go并添加以下代码:



package monitor

import (
    "context"
    "testing"
)

func TestPing(t *testing.T) {
    ctx := context.Background()
    tests := []struct {
        URL string
        Up  bool
    }{
        {"encore.dev", true},
        {"google.com", true},
        // Test both with and without "https://"
        {"httpbin.org/status/200", true},
        {"https://httpbin.org/status/200", true},

        // 4xx and 5xx should considered down.
        {"httpbin.org/status/400", false},
        {"https://httpbin.org/status/500", false},
        // Invalid URLs should be considered down.
        {"invalid://scheme", false},
    }

    for _, test := range tests {
        resp, err := Ping(ctx, test.URL)
        if err != nil {
            t.Errorf("url %s: unexpected error: %v", test.URL, err)
        } else if resp.Up != test.Up {
            t.Errorf("url %s: got up=%v, want %v", test.URL, resp.Up, test.Up)
        }
    }
}


Enter fullscreen mode Exit fullscreen mode

🥐 运行encore test ./...并检查一切是否按预期运行。你应该看到类似这样的内容:



$ encore test ./...
9:38AM INF starting request endpoint=Ping service=monitor test=TestPing
9:38AM INF request completed code=ok duration=71.861792 endpoint=Ping http_code=200 service=monitor test=TestPing
[... lots more lines ...]
PASS
ok      encore.app/monitor      1.660


Enter fullscreen mode Exit fullscreen mode

🎉 成功了!干得好!

🔨创建站点服务

接下来,我们要跟踪要监控的网站列表。

由于大多数 API 都是简单的 CRUD(创建/读取/更新/删除)端点,因此让我们使用GORM来构建此服务,GORM 是一个 ORM 库,可以非常简单地构建 CRUD 端点。

🥐 创建一个名为 SQL 数据库的新服务site。为此,请site在应用程序根目录中创建一个新目录,并migrations在该文件夹中创建一个文件夹:



mkdir site
mkdir site/migrations


Enter fullscreen mode Exit fullscreen mode

🥐 在该文件夹中添加一个名为 的数据库迁移文件1_create_tables.up.sql。文件名很重要(它必须类似于1_<name>.up.sql),因为 Encore 会使用文件名自动运行迁移。

添加以下内容:



CREATE TABLE sites (
    id BIGSERIAL PRIMARY KEY,
    url TEXT NOT NULL
);


Enter fullscreen mode Exit fullscreen mode

🥐 接下来,安装 GORM 库和 PostgreSQL 驱动程序:



go get -u gorm.io/gorm gorm.io/driver/postgres


Enter fullscreen mode Exit fullscreen mode

现在让我们创建site服务本身。为此,我们将使用 Encore 的依赖注入支持来注入 GORM 数据库连接。

🥐 创建site/service.go并添加此代码:



package site

import (
    "encore.dev/storage/sqldb"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
)

//encore:service
type Service struct {
    db *gorm.DB
}

var siteDB = sqldb.Named("site").Stdlib()

// initService initializes the site service.
// It is automatically called by Encore on service startup.
func initService() (*Service, error) {
    db, err := gorm.Open(postgres.New(postgres.Config{
        Conn: siteDB,
    }))
    if err != nil {
        return nil, err
    }
    return &Service{db: db}, nil
}


Enter fullscreen mode Exit fullscreen mode

🥐 这样,我们现在就可以创建 CRUD 端点了。
创建以下文件:

site/get.go



package site

import "context"

// Site describes a monitored site.
type Site struct {
    // ID is a unique ID for the site.
    ID int `json:"id"`
    // URL is the site's URL.
    URL string `json:"url"`
}

// Get gets a site by id.
//
//encore:api public method=GET path=/site/:siteID
func (s *Service) Get(ctx context.Context, siteID int) (*Site, error) {
    var site Site
    if err := s.db.Where("id = $1", siteID).First(&site).Error; err != nil {
        return nil, err
    }
    return &site, nil
}


Enter fullscreen mode Exit fullscreen mode

site/add.go



package site

import "context"

// AddParams are the parameters for adding a site to be monitored.
type AddParams struct {
    // URL is the URL of the site. If it doesn't contain a scheme
    // (like "http:" or "https:") it defaults to "https:".
    URL string `json:"url"`
}

// Add adds a new site to the list of monitored websites.
//
//encore:api public method=POST path=/site
func (s *Service) Add(ctx context.Context, p *AddParams) (*Site, error) {
    site := &Site{URL: p.URL}
    if err := s.db.Create(site).Error; err != nil {
        return nil, err
    }
    return site, nil
}


Enter fullscreen mode Exit fullscreen mode

site/list.go



package site

import "context"

type ListResponse struct {
    // Sites is the list of monitored sites.
    Sites []*Site `json:"sites"`
}

// List lists the monitored websites.
//
//encore:api public method=GET path=/site
func (s *Service) List(ctx context.Context) (*ListResponse, error) {
    var sites []*Site
    if err := s.db.Find(&sites).Error; err != nil {
        return nil, err
    }
    return &ListResponse{Sites: sites}, nil
}


Enter fullscreen mode Exit fullscreen mode

site/delete.go



package site

import "context"

// Delete deletes a site by id.
//
//encore:api public method=DELETE path=/site/:siteID
func (s *Service) Delete(ctx context.Context, siteID int) error {
    return s.db.Delete(&Site{ID: siteID}).Error
}


Enter fullscreen mode Exit fullscreen mode

🥐 重新启动encore runsite创建数据库,然后调用site.Add端点:



curl -X POST 'http://localhost:4000/site' -d '{"url": "https://encore.dev"}'
{
  "id": 1,
  "url": "https://encore.dev"
}


Enter fullscreen mode Exit fullscreen mode

📝 记录正常运行时间检查

为了在网站关闭或恢复时发出通知,我们需要跟踪它之前的状态。

为此,我们monitor还向服务添加一个数据库。

🥐 创建目录monitor/migrations和文件monitor/migrations/1_create_tables.up.sql



CREATE TABLE checks (
    id BIGSERIAL PRIMARY KEY,
    site_id BIGINT NOT NULL,
    up BOOLEAN NOT NULL,
    checked_at TIMESTAMP WITH TIME ZONE NOT NULL
);


Enter fullscreen mode Exit fullscreen mode

每次检查站点是否启动时,我们都会插入一行数据库。

Check🥐向服务添加一个新端点monitor,该端点接收站点 ID、ping 站点并在表中插入数据库行checks

对于这项服务,我们将使用 Encore 的sqldb而不是 GORM(为了展示两种方法)。

将其添加到monitor/check.go




package monitor

import (
    "context"

    "encore.app/site"
    "encore.dev/storage/sqldb"
)

// Check checks a single site.
//
//encore:api public method=POST path=/check/:siteID
func Check(ctx context.Context, siteID int) error {
    site, err := site.Get(ctx, siteID)
    if err != nil {
        return err
    }
    result, err := Ping(ctx, site.URL)
    if err != nil {
        return err
    }
    _, err = sqldb.Exec(ctx, `
        INSERT INTO checks (site_id, up, checked_at)
        VALUES ($1, $2, NOW())
    `, site.ID, result.Up)
    return err
}


Enter fullscreen mode Exit fullscreen mode

🥐 重新启动encore runmonitor创建数据库,然后调用新的monitor.Check端点:



curl -X POST 'http://localhost:4000/check/1'


Enter fullscreen mode Exit fullscreen mode

🥐 检查数据库以确保一切正常:



encore db shell monitor


Enter fullscreen mode Exit fullscreen mode

你应该看到这个:




psql (14.4, server 14.2)
Type "help" for help.

monitor=> SELECT * FROM checks;
 id | site_id | up |          checked_at
----+---------+----+-------------------------------
  1 |       1 | t  | 2022-10-21 09:58:30.674265+00


Enter fullscreen mode Exit fullscreen mode

如果您看到的是这样的,那么一切都很顺利!🎉

⏰ 添加一个 cron 任务来检查所有网站

我们现在希望定期检查所有被跟踪的网站,以便
在任何一个网站出现故障时我们能够立即做出反应。

我们将CheckAll在服务中创建一个新的 API 端点monitor,它将列出所有跟踪的站点并检查所有站点。

🥐 让我们将为端点编写的一些功能提取
Check到一个单独的函数中。

monitor/check.go看起来应该是这样的:



// Check checks a single site.
//
//encore:api public method=POST path=/check/:siteID
func Check(ctx context.Context, siteID int) error {
    site, err := site.Get(ctx, siteID)
    if err != nil {
        return err
    }
    return check(ctx, site)
}

func check(ctx context.Context, site *site.Site) error {
    result, err := Ping(ctx, site.URL)
    if err != nil {
        return err
    }
    _, err = sqldb.Exec(ctx, `
        INSERT INTO checks (site_id, up, checked_at)
        VALUES ($1, $2, NOW())
    `, site.ID, result.Up)
    return err
}


Enter fullscreen mode Exit fullscreen mode

现在我们准备创建新的CheckAll端点。

🥐CheckAll在里面创建新的端点monitor/check.go



import "golang.org/x/sync/errgroup"

// CheckAll checks all sites.
//
//encore:api public method=POST path=/checkall
func CheckAll(ctx context.Context) error {
    // Get all the tracked sites.
    resp, err := site.List(ctx)
    if err != nil {
        return err
    }

    // Check up to 8 sites concurrently.
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(8)
    for _, site := range resp.Sites {
        site := site // capture for closure
        g.Go(func() error {
            return check(ctx, site)
        })
    }
    return g.Wait()
}


Enter fullscreen mode Exit fullscreen mode

它使用错误组 (errgroup)同时检查最多 8 个站点,如果遇到任何错误,则会提前终止。(请注意,网站宕机不被视为错误。)

🥐 运行go get golang.org/x/sync/errgroup以安装该依赖项。

🥐 现在我们有了一个CheckAll端点,定义一个cron 作业以每 5 分钟自动调用一次。

将其添加到monitor/check.go



import "encore.dev/cron"

// Check all tracked sites every 5 minutes.
var _ = cron.NewJob("check-all", cron.JobConfig{
    Title:    "Check all sites",
    Endpoint: CheckAll,
    Every:    5 * cron.Minute,
})


Enter fullscreen mode Exit fullscreen mode

注意:为了便于开发,cron 作业在本地运行应用程序时不会触发,但在将应用程序部署到云时会工作。

🚀 部署到 Encore 的免费开发云

为了真正试用您的正常运行时间监视器,让我们将其部署到 Encore 的开发云。

Encore 内置 CI/CD,部署过程非常简单git push encore

(您还可以与 GitHub 集成以激活每个 Pull 请求预览环境,在CI/CD 文档中了解更多信息。)

🥐 通过运行以下命令部署您的应用:



git add -A .
git commit -m 'Initial commit'
git push encore


Enter fullscreen mode Exit fullscreen mode

Encore 现在将构建和测试您的应用程序、提供所需的基础设施并将您的应用程序部署到云端。

触发部署后,您将看到一个 URL,您可以在 Encore 的云仪表板中查看其进度。它看起来像这样:https://app.encore.dev/$APP_ID/deploys/...

从那里您还可以查看指标、跟踪,将您的应用链接到 GitHub 存储库以在新的提交上自动部署,并连接您自己的 AWS 或 GCP 帐户以用于生产部署。

🥐 部署完成后,您可以通过以下方式试用正常运行时间监视器:
https://staging-$APP_ID.encr.app/frontend

现在,您已经在云端运行了正常运行时间监视器,做得好!✨

当网站瘫痪时发布 Pub/Sub 事件


如果正常运行时间监控系统在网站瘫痪时无法真正通知您,那么它就没有多大用处。

为此,让我们添加一个Pub/Sub 主题,
每当站点从正常运行转为宕机或从宕机转为正常运行时,我们都会在该主题上发布一条消息。

🔬 类型安全基础设施:实际示例

通常,发布/订阅机制对其处理的消息的数据结构视而不见。这会导致一些难以捕捉的错误,调试起来可能非常困难。

然而,得益于 Encore 的基础设施 SDK,您将获得完全类型安全的基础设施!现在,您可以从消息发布到交付的整个过程中实现端到端的类型安全。这不仅消除了那些令人烦恼且难以调试的错误,还为我们开发人员节省了大量时间。

— 现在让我们真正实现它!👇

🥐 在新文件中使用 Encore 的 Pub/Sub 包定义主题monitor/alerts.go



package monitor

import "encore.dev/pubsub"

// TransitionEvent describes a transition of a monitored site
// from up->down or from down->up.
type TransitionEvent struct {
    // Site is the monitored site in question.
    Site *site.Site `json:"site"`
    // Up specifies whether the site is now up or down (the new value).
    Up bool `json:"up"`
}

// TransitionTopic is a pubsub topic with transition events for when a monitored site
// transitions from up->down or from down->up.
var TransitionTopic = pubsub.NewTopic[*TransitionEvent]("uptime-transition", pubsub.TopicConfig{
    DeliveryGuarantee: pubsub.AtLeastOnce,
})


Enter fullscreen mode Exit fullscreen mode

TransitionTopic现在,如果站点的启动/关闭状态与之前的测量结果不同,我们就发布一条消息。

🥐 创建一个getPreviousMeasurement函数alerts.go来报告最后的启动/关闭状态:



import "encore.dev/storage/sqldb"

// getPreviousMeasurement reports whether the given site was
// up or down in the previous measurement.
func getPreviousMeasurement(ctx context.Context, siteID int) (up bool, err error) {
    err = sqldb.QueryRow(ctx, `
        SELECT up FROM checks
        WHERE site_id = $1
        ORDER BY checked_at DESC
        LIMIT 1
    `, siteID).Scan(&up)

    if errors.Is(err, sqldb.ErrNoRows) {
        // There was no previous ping; treat this as if the site was up before
        return true, nil
    } else if err != nil {
        return false, err
    }
    return up, nil
}


Enter fullscreen mode Exit fullscreen mode

🥐 现在添加一个函数,alerts.go当 up/down 状态不同时有条件地发布消息:



import "encore.app/site"

func publishOnTransition(ctx context.Context, site *site.Site, isUp bool) error {
    wasUp, err := getPreviousMeasurement(ctx, site.ID)
    if err != nil {
        return err
    }
    if isUp == wasUp {
        // Nothing to do
        return nil
    }
    _, err = TransitionTopic.Publish(ctx, &TransitionEvent{
        Site: site,
        Up:   isUp,
    })
    return err
}


Enter fullscreen mode Exit fullscreen mode

🥐 最后修改check中的函数check.go来调用该publishOnTransition函数:



func check(ctx context.Context, site *site.Site) error {
    result, err := Ping(ctx, site.URL)
    if err != nil {
        return err
    }

    // Publish a Pub/Sub message if the site transitions
    // from up->down or from down->up.
    if err := publishOnTransition(ctx, site, result.Up); err != nil {
        return err
    }

    _, err = sqldb.Exec(ctx, `
        INSERT INTO checks (site_id, up, checked_at)
        VALUES ($1, $2, NOW())
    `, site.ID, result.Up)
    return err
}


Enter fullscreen mode Exit fullscreen mode

TransitionTopic现在,每当受监控站点从启动->停止或从停止->启动时,监控系统都会发布消息。

然而,它并不知道也不关心到底是谁在监听这些消息。事实上,现在根本没人监听。所以,我们来解决这个问题,添加一个 Pub/Sub 订阅者,将这些事件发布到 Slack。

当网站瘫痪时发送 Slack 通知

🥐 首先创建一个slack/slack.go包含以下内容的 Slack 服务:



package slack

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "net/http"
)

type NotifyParams struct {
    // Text is the Slack message text to send.
    Text string `json:"text"`
}

// Notify sends a Slack message to a pre-configured channel using a
// Slack Incoming Webhook (see https://api.slack.com/messaging/webhooks).
//
//encore:api private
func Notify(ctx context.Context, p *NotifyParams) error {
    reqBody, err := json.Marshal(p)
    if err != nil {
        return err
    }
    req, err := http.NewRequestWithContext(ctx, "POST", secrets.SlackWebhookURL, bytes.NewReader(reqBody))
    if err != nil {
        return err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode >= 400 {
        body, _ := io.ReadAll(resp.Body)
        return fmt.Errorf("notify slack: %s: %s", resp.Status, body)
    }
    return nil
}

var secrets struct {
    // SlackWebhookURL defines the Slack webhook URL to send
    // uptime notifications to.
    SlackWebhookURL string
}


Enter fullscreen mode Exit fullscreen mode

🥐 现在,前往您选择的 Slack 社区(您有权在该社区创建新的 Incoming Webhook)。如果您还没有,请加入Encore Slack并提出问题#help,我们很乐意为您提供帮助。

🥐 获得 Webhook URL 后,使用 Encore 的内置机密管理器将其保存为机密:



encore secret set --local,dev,prod SlackWebhookURL


Enter fullscreen mode Exit fullscreen mode

🥐slack.Notify通过 cURL 调用来测试端点:



curl 'http://localhost:4000/slack.Notify' -d '{"Text": "Testing Slack webhook"}'


Enter fullscreen mode Exit fullscreen mode

您应该会看到测试 Slack webhook消息出现在您为 webhook 指定的 Slack 频道中。

🥐 现在是时候添加一个 Pub/Sub 订阅者了,这样当受监控的站点启动或关闭时,它就会自动通知 Slack。将以下内容添加到slack/slack.go



import (
    "encore.dev/pubsub"
    "encore.app/monitor"
)

var _ = pubsub.NewSubscription(monitor.TransitionTopic, "slack-notification", pubsub.SubscriptionConfig[*monitor.TransitionEvent]{
    Handler: func(ctx context.Context, event *monitor.TransitionEvent) error {
        // Compose our message.
        msg := fmt.Sprintf("*%s is down!*", event.Site.URL)
        if event.Up {
            msg = fmt.Sprintf("*%s is back up.*", event.Site.URL)
        }

        // Send the Slack notification.
        return Notify(ctx, &NotifyParams{Text: msg})
    },
})


Enter fullscreen mode Exit fullscreen mode

🎉 部署完成的 Uptime Monitor

现在,您可以部署完成的 Uptime Monitor,并完成 Slack 集成!

🥐 和以前一样,将您的应用部署到云端就像运行以下命令一样简单:



git add -A .
git commit -m 'Add slack integration'
git push encore


Enter fullscreen mode Exit fullscreen mode

现在,您已拥有一个功能齐全、可立即投入生产的云端正常运行时间监控系统。做得好!✨

🤯 总结:所有这些代码只用了 300 多行

现在,您已经构建了一个功能齐全的正常运行时间监控系统,并且只用很少的代码就完成了大量的工作:

  • 您已构建了三种不同的服务(sitemonitorslack
  • 您已添加两个数据库(到sitemonitor服务)用于跟踪受监控的站点和监控结果
  • 您已添加 cron 作业,用于每 5 分钟自动检查一次网站
  • 您已经设置了一个完全类型安全的 Pub/Sub 实现,以将监控系统与 Slack 通知分离
  • 您已添加 Slack 集成,使用 secrets 安全地存储 webhook URL,并监听 Pub/Sub 订阅以获取上/下转换事件

所有这些仅用了 300 多行代码!🤯

🎉 干得好 - 你完成了!

继续使用这些开源应用程序模板进行构建。👈

如果您有任何疑问或想分享您的工作,请加入Encore 社区 Slack中的开发者聚会。👈

文章来源:https://dev.to/encore/building-an-event-driven-system-in-go-using-pubsub-4l0h
PREV
从单体到微服务:架构入门
NEXT
使用 TypeScript 构建生产就绪的 SaaS 应用程序