如何在 Golang 中构建容器化微服务:包含示例用例问题陈述架构操作和流程入门项目布局的分步指南

2025-06-10

如何在 Golang 中构建容器化微服务:带有示例用例的分步指南

问题陈述

建筑学

操作和流程

入门

项目布局

介绍

我发现探索微服务和云原生应用领域非常有趣。近年来,微服务彻底改变了 Web 应用架构,提供了模块化、可独立部署的服务,重塑了我们构建和扩展应用程序的方式。此外,REST 和 gRPC 等传输协议也不断发展,效率和速度都得到了提升,使其成为微服务的理想选择。

我们的使命:使用 Go 构建云原生应用程序

在本篇博文中,我将指导您使用 Go 和微服务架构构建云原生应用程序。整个过程比您想象的要简单得多,我们将把它分解成几个易于管理的步骤。

步骤 1:构建微服务

1.1 创建微服务和容器化服务

在此步骤中,我们将创建微服务和容器化服务,每个服务都有与其逻辑组件密切相关的特定的、独立的任务。

1.2 利用 Go-Kit

我们将利用 Go-Kit,这是一个用于构建和开发每个服务组件的优秀框架。

1.3 开发 API

我们将使用 HTTP (REST) 和 Protobuf (gRPC) 作为传输机制来开发 API。我们还将使用 PostgreSQL 作为数据库,并探索在 Azure 上部署 API 管理和 CI/CD。

注意:虽然我们将讨论部署和 CI/CD,但在 Azure 或任何其他云平台上配置它们的具体细节超出了本博客的范围。

先决条件

在深入研究之前,请确保您具备必要的先决条件:

  • 对 Web 服务、Rest API 和 gRPC 有基本的了解。
  • GoLand 或 VS Code 作为您的开发环境。
  • 已正确安装并配置 Go 环境。如果没有,请点击此链接获取指导。
  • 在 GOPATH 下设置一个新的项目目录。
  • 熟悉标准 Golang 项目结构,您可以在这里找到。
  • 已安装 PostgreSQL 客户端。
  • 我们将使用 Go Kit 来简化微服务开发。

问题陈述

我们的目标是开发一个 Web 应用程序来解决以下问题:

假设一家专门出版书籍和期刊的全球出版公司。他们需要一项服务来为文档添加水印。这些文档(无论是书籍还是期刊)都包含标题、作者和水印属性。水印可以处于以下三种状态之一:“已开始”、“进行中”或“已完成”。此外,只有特定用户才有权在文档上添加水印,并且一旦添加水印,该操作将一次性生效;文档永远无法再次添加水印。

如需更详细地了解此要求,请参阅此链接

建筑学

在这个项目中,我们将建立三个微服务:身份验证服务、数据库服务和水印服务。这些服务将与 PostgreSQL 数据库服务器交互,并以 API 网关作为入口点。

图片描述

现在,让我们深入了解每项服务的细节:


身份验证服务

此服务负责基于用户和角色的访问控制。它将根据用户角色对其进行身份验证,并返回相应的 HTTP 状态代码 - 授权用户返回 200,未授权用户返回 401。

蜜蜂:

  • /user/access:对此端点的 GET 请求,用于用户身份验证,以用户名作为输入并返回其角色和相关权限。
  • /authenticate:另一个安全的 GET 请求,这次带有用户和操作参数,用于对指定操作的用户进行身份验证。
  • /healthz:一个安全的 GET 请求,用于检查服务的状态。

数据库服务

此服务管理应用程序的数据库,存储用户信息、用户角色、访问权限以及不带水印的文档。文档在创建时不能带有水印;只有当数据输入有效且数据库服务返回成功时,才算创建成功。

我们将为这两项服务使用两个独立的数据库,遵循微服务架构的“每个服务单数据库”原则。

蜜蜂:

  • /get:根据特定过滤器检索文档的安全 GET 请求。
  • /update:更新文档的安全 POST 请求。
  • /add:一个安全的 POST 请求,用于添加新文档并返回其标题 ID。
  • /remove:根据标题 ID 删除文档的安全 POST 请求。
  • /healthz:一个安全的 GET 请求,用于检查服务的状态。

水印服务

此核心服务处理水印文档的请求。当用户需要为文档添加水印时,必须提供工单 ID 以及所需的水印。该服务将内部调用数据库的更新 API,返回水印处理状态,初始状态为“已启动”,然后转换为“进行中”,最终返回“已完成”(如果请求无效,则返回“错误”)。

蜜蜂:

  • /get:根据特定过滤器检索文档的安全 GET 请求。
  • /status:一个安全的 GET 请求,用于检查特定票证 ID 的水印状态。
  • /addDocument:一个安全的 POST 请求,用于添加文档并返回标题 ID。
  • /watermark:主要水印操作,接受标题和水印字符串的安全 POST 请求。
  • /healthz:一个安全的 GET 请求,用于检查服务的状态。

操作和流程

水印服务 API 是用户请求水印或添加文档的网关。身份验证和数据库服务 API 是私有的,仅供其他服务内部调用。用户仅与 API 网关 URL 交互。

工作原理如下:

  • 用户访问 API 网关 URL,提供其用户名、票证 ID 和水印。
  • 用户不需要关心身份验证或数据库服务的复杂性。
  • API 网关验证用户的请求和有效负载。
  • 网关中明确定义的 API 转发规则将请求路由到相关服务。
  • 对于水印请求,规则会将其转发到身份验证服务进行用户身份验证。身份验证服务会检查用户的授权,并返回相应的状态码。
  • 授权服务检查用户的角色和权限,一旦获得授权,就将请求转发给水印服务。
  • 水印服务执行必要的操作,包括添加水印或文档,并将结果转发给用户。
  • 如果请求是添加文档,服务将返回 Ticket ID。如果请求是添加水印,则返回操作状态。

请注意,每个用户根据文档类型具有特定的角色,而不是特定的书籍或期刊名称。


入门

首先,在 $GOPATH 中为我们的应用程序创建一个文件夹。这将作为我们服务集合的根目录。

项目布局

我们的项目将遵循标准的 Golang 项目布局。

  • api:此目录存储了 gRPC 接口的 API swagger 文件和 protobuf 文件的版本。
  • cmd:它包含所有服务的入口点(main.go)文件,以及任何其他容器镜像。
  • docs:这包含项目的文档。
  • config:在这里存储示例和特定的配置文件。
  • 部署:包含用于部署应用程序的部署文件。
  • internal: Go 编译器可以识别的常规内部包。它包含项目内共享的包。目录结构说明:
  • pkg:在此目录中,您将找到所有服务的完整功能代码,每个服务都组织成不同的包。
  • 测试:此目录包含服务的所有集成和端到端(E2E)测试。
  • 供应商:在这里,您将发现本地存储的第三方依赖项存储库,以防止将来出现版本不一致。

水印服务:

  1. 在 Go kit 框架中,服务应该始终通过接口来定义。

在“pkg”目录中创建一个名为“watermark”的包。在该包中,创建一个名为“service.go”的新文件,作为我们服务的蓝图。



package watermark

import (
    "context"
    "github.com/velotiotech/watermark-service/internal"
)

type Service interface {
    // Get the list of all documents
    Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error)
    Status(ctx context.Context, ticketID string) (internal.Status, error)
    Watermark(ctx context.Context, ticketID, mark string) (int, error)
    AddDocument(ctx context.Context, doc *internal.Document) (string, error)
    ServiceStatus(ctx context.Context) (int, error)
}


Enter fullscreen mode Exit fullscreen mode
  1. 我们需要五个端点来处理服务接口中定义的方法的请求。为了有效地处理多个并发请求,我们使用了 context 包,尽管在本篇博文中我们可能不会大量使用它。但这仍然是推荐的使用方式。

  2. 实施我们的服务:



package watermark

import (
    "context"
    "net/http"
    "os"

    "github.com/velotiotech/watermark-service/internal"
    "github.com/go-kit/kit/log"
    "github.com/lithammer/shortuuid/v3"
)

type watermarkService struct{}

func NewService() Service {
    return &watermarkService{}
}

func (w *watermarkService) Get(_ context.Context, filters ...internal.Filter) ([]internal.Document, error) {
    // Query the database using the filters and return the list of documents.
    // Return an error if the filter (key) is invalid or if no item is found.
    doc := internal.Document{
        Content: "book",
        Title:   "Harry Potter and the Half-Blood Prince",
        Author:  "J.K. Rowling",
        Topic:   "Fiction and Magic",
    }
    return []internal.Document{doc}, nil
}

func (w *watermarkService) Status(_ context.Context, ticketID string) (internal.Status, error) {
    // Query the database using the ticketID and return the document info.
    // Return an error if the ticketID is invalid or no document exists for that ticketID.
    return internal.InProgress, nil
}

func (w *watermarkService) Watermark(_ context.Context, ticketID, mark string) (int, error) {
    // Update the database entry with the watermark field as non-empty.
    // First, check if the watermark status is not already in InProgress, Started, or Finished state.
    // If yes, then return an invalid request.
    // Return an error if no item is found using the ticketID.
    return http.StatusOK, nil
}

func (w *watermarkService) AddDocument(_ context.Context, doc *internal.Document) (string, error) {
    // Add the document entry in the database by calling the database service.
    // Return an error if the doc is invalid and/or if there's a database entry error.
    newTicketID := shortuuid.New()
    return newTicketID, nil
}

func (w *watermarkService) ServiceStatus(_ context.Context) (int, error) {
    logger.Log("Checking the Service health...")
    return http.StatusOK, nil
}

var logger log.Logger

func init() {
    logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}


Enter fullscreen mode Exit fullscreen mode

我们定义了“watermarkService”类型,它是一个空结构体,用于实现我们定义的服务接口。该结构体的实现对外界隐藏。

“NewService”是作为我们服务“对象”的构造函数创建的。这是此包之外唯一可用于实例化服务的函数。

  1. 现在,我们将创建“endpoints”包,它将包含两个文件:一个用于存储所有类型的请求和响应,另一个用于实现请求解析和调用适当的服务功能。
  • 创建一个名为“reqJSONMap.go”的文件。在此文件中,定义所有请求和响应的结构体及其字段。在这些结构体中添加请求输入和响应输出所需的字段。


package endpoints

import "github.com/velotiotech/watermark-service/internal"

type GetRequest struct {
    Filters []internal.Filter `json:"filters,omitempty"`
}

type GetResponse struct {
    Documents []internal.Document `json:"documents"`
    Err       string              `json:"err,omitempty"`
}

type StatusRequest struct {
    TicketID string `json:"ticketID"`
}

type StatusResponse struct {
    Status internal.Status `json:"status"`
    Err    string          `json:"err,omitempty"`
}

type WatermarkRequest struct {
    TicketID string `json:"ticketID"`
    Mark     string `json:"mark"`
}

type WatermarkResponse struct {
    Code int    `json:"code"`
    Err  string `json:"err"`
}

type AddDocumentRequest struct {
    Document *internal.Document `json:"document"`
}

type AddDocumentResponse struct {
    TicketID string `json:"ticketID"`
    Err      string `json:"err,omitempty"`
}

type ServiceStatusRequest struct{}

type ServiceStatusResponse struct {
    Code int    `json:"status"`
    Err  string `json:"err,omitempty"`
}


Enter fullscreen mode Exit fullscreen mode
  • 创建一个名为“endpoints.go”的文件。在此文件中,您将实现服务已实现函数的实际调用。


package endpoints

import (
    "context"
    "errors"
    "os"

    "github.com/aayushrangwala/watermark-service/internal"
    "github.com/aayushrangwala/watermark-service/pkg/watermark"

    "github.com/go-kit/kit/endpoint"
    "github.com/go-kit/kit/log"
)

type Set struct {
    GetEndpoint           endpoint.Endpoint
    AddDocumentEndpoint   endpoint.Endpoint
    StatusEndpoint        endpoint.Endpoint
    ServiceStatusEndpoint endpoint.Endpoint
    WatermarkEndpoint     endpoint.Endpoint
}

func NewEndpointSet(svc watermark.Service) Set {
    return Set{
        GetEndpoint:           MakeGetEndpoint(svc),
        AddDocumentEndpoint:   MakeAddDocumentEndpoint(svc),
        StatusEndpoint:        MakeStatusEndpoint(svc),
        ServiceStatusEndpoint: MakeServiceStatusEndpoint(svc),
        WatermarkEndpoint:     MakeWatermarkEndpoint(svc),
    }
}

func MakeGetEndpoint(svc watermark.Service) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(GetRequest)
        docs, err := svc.Get(ctx, req.Filters...)
        if err != nil {
            return GetResponse{docs, err.Error()}, nil
        }
        return GetResponse{docs, ""}, nil
    }
}

func MakeStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(StatusRequest)
        status, err := svc.Status(ctx, req.TicketID)
        if err != nil {
            return StatusResponse{Status: status, Err: err.Error()}, nil
        }
        return StatusResponse{Status: status, Err

: ""}, nil
    }
}

func MakeAddDocumentEndpoint(svc watermark.Service) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(AddDocumentRequest)
        ticketID, err := svc.AddDocument(ctx, req.Document)
        if err != nil {
            return AddDocumentResponse{TicketID: ticketID, Err: err.Error()}, nil
        }
        return AddDocumentResponse{TicketID: ticketID, Err: ""}, nil
    }
}

func MakeWatermarkEndpoint(svc watermark.Service) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        req := request.(WatermarkRequest)
        code, err := svc.Watermark(ctx, req.TicketID, req.Mark)
        if err != nil {
            return WatermarkResponse{Code: code, Err: err.Error()}, nil
        }
        return WatermarkResponse{Code: code, Err: ""}, nil
    }
}

func MakeServiceStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
    return func(ctx context.Context, request interface{}) (interface{}, error) {
        _ = request.(ServiceStatusRequest)
        code, err := svc.ServiceStatus(ctx)
        if err != nil {
            return ServiceStatusResponse{Code: code, Err: err.Error()}, nil
        }
        return ServiceStatusResponse{Code: code, Err: ""}, nil
    }
}

func (s *Set) Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error) {
    resp, err := s.GetEndpoint(ctx, GetRequest{Filters: filters})
    if err != nil {
        return []internal.Document{}, err
    }
    getResp := resp.(GetResponse)
    if getResp.Err != "" {
        return []internal.Document{}, errors.New(getResp.Err)
    }
    return getResp.Documents, nil
}

func (s *Set) ServiceStatus(ctx context.Context) (int, error) {
    resp, err := s.ServiceStatusEndpoint(ctx, ServiceStatusRequest{})
    svcStatusResp := resp.(ServiceStatusResponse)
    if err != nil {
        return svcStatusResp.Code, err
    }
    if svcStatusResp.Err != "" {
        return svcStatusResp.Code, errors.New(svcStatusResp.Err)
    }
    return svcStatusResp.Code, nil
}

func (s *Set) AddDocument(ctx context.Context, doc *internal.Document) (string, error) {
    resp, err := s.AddDocumentEndpoint(ctx, AddDocumentRequest{Document: doc})
    if err != nil {
        return "", err
    }
    adResp := resp.(AddDocumentResponse)
    if adResp.Err != "" {
        return "", errors.New(adResp.Err)
    }
    return adResp.TicketID, nil
}

func (s *Set) Status(ctx context.Context, ticketID string) (internal.Status, error) {
    resp, err := s.StatusEndpoint(ctx, StatusRequest{TicketID: ticketID})
    if err != nil {
        return internal.Failed, err
    }
    stsResp := resp.(StatusResponse)
    if stsResp.Err != "" {
        return internal.Failed, errors.New(stsResp.Err)
    }
    return stsResp.Status, nil
}

func (s *Set) Watermark(ctx context.Context, ticketID, mark string) (int, error) {
    resp, err := s.WatermarkEndpoint(ctx, WatermarkRequest{TicketID: ticketID, Mark: mark})
    wmResp := resp.(WatermarkResponse)
    if err != nil {
        return wmResp.Code, err
    }
    if wmResp.Err != "" {
        return wmResp.Code, errors.New(wmResp.Err)
    }
    return wmResp.Code, nil
}

var logger log.Logger

func init() {
    logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}


Enter fullscreen mode Exit fullscreen mode

在这个文件中,我们有一个“Set”结构体,它是所有端点的集合。我们还有一个“Set”的构造函数。该文件定义了一些内部构造函数,这些函数返回实现了 Go Kit 接口的通用端点的对象,例如“MakeGetEndpoint()”、“MakeStatusEndpoint()”等等。

为了公开 Get、Status、Watermark、ServiceStatus 和 AddDocument API,我们为每个 API 创建了端点。这些函数负责处理传入的请求并调用特定的服务方法。

  1. 添加“transports”方法来公开服务。我们的服务将支持 HTTP,并使用 REST API 和 Protobuf 以及 gRPC 进行公开。

在“watermark”目录中创建一个名为“transport”的单独包。此包将包含特定类型传输机制的所有处理程序、解码器和编码器。

步骤 6:创建 HTTP 传输函数和处理程序

首先创建一个名为 http.go 的文件,其中包含 HTTP 的传输函数和处理程序。这些函数负责定义 API 路由及其各自的处理程序函数。



package transport

import (
    context"
    "encoding/json"
    "net/http"
    "os"

    "github.com/velotiotech/watermark-service/internal/util"
    "github.com/velotiotech/watermark-service/pkg/watermark/endpoints"

    "github.com/go-kit/kit/log"
    httptransport "github.com/go-kit/kit/transport/http"
)

// NewHTTPHandler initializes an HTTP handler for API endpoints.
func NewHTTPHandler(ep endpoints.Set) http.Handler {
    m := http.NewServeMux()

    // Define routes and associated handler functions.
    m.Handle("/healthz", httptransport.NewServer(
        ep.ServiceStatusEndpoint,
        decodeHTTPServiceStatusRequest,
        encodeResponse,
    ))
    m.Handle("/status", httptransport.NewServer(
        ep.StatusEndpoint,
        decodeHTTPStatusRequest,
        encodeResponse,
    ))
    m.Handle("/addDocument", httptransport.NewServer(
        ep.AddDocumentEndpoint,
        decodeHTTPAddDocumentRequest,
        encodeResponse,
    ))
    m.Handle("/get", httptransport.NewServer(
        ep.GetEndpoint,
        decodeHTTPGetRequest,
        encodeResponse,
    ))
    m.Handle("/watermark", httptransport.NewServer(
        ep.WatermarkEndpoint,
        decodeHTTPWatermarkRequest,
        encodeResponse,
    ))

    return m
}

// Functions for decoding HTTP requests.

func decodeHTTPGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
    var req endpoints.GetRequest
    if r.ContentLength == 0 {
        logger.Log("Get request with no body")
        return req, nil
    }
    err := json.NewDecoder(r.Body).Decode(&req)
    if err != nil {
        return nil, err
    }
    return req, nil
}

// ... (Similar decode functions for other endpoints)

// Function for encoding HTTP responses.
func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
    if e, ok := response.(error); ok && e != nil {
        encodeError(ctx, e, w)
        return nil
    }
    return json.NewEncoder(w).Encode(response)
}

// ... (Similar functions for handling errors)
func encodeError(_ context.Context, err error, w http.ResponseWriter) {
    w.Header().Set("Content-Type", "application/json; charset=utf-8")
    switch err {
    case util.ErrUnknown:
        w.WriteHeader(http.StatusNotFound)
    case util.ErrInvalidArgument:
        w.WriteHeader(http.StatusBadRequest)
    default:
        w.WriteHeader(http.StatusInternalServerError)
    }
    json.NewEncoder(w).Encode(map[string]interface{}{
        "error": err.Error(),
    })
}

var logger log.Logger

func init() {
    logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}


Enter fullscreen mode Exit fullscreen mode

步骤 7:创建 gRPC 传输函数和处理程序

在同一传输包中名为 grpc.go 的文件中,类似于 HTTP 传输,您将为 gRPC 创建传输函数和处理程序。此文件将 protobuf 有效负载映射到相应的请求和响应。



package transport

import (
    context"

    "github.com/velotiotech/watermark-service/api/v1/pb/watermark"

    "github.com/velotiotech/watermark-service/internal"
    "github.com/velotiotech/watermark-service/pkg/watermark/endpoints"

    grpctransport "github.com/go-kit/kit/transport/grpc"
)

// grpcServer is a struct responsible for creating gRPC servers and handling endpoint mappings.
type grpcServer struct {
    get           grpctransport.Handler
    status        grpctransport.Handler
    addDocument   grpctransport.Handler
    watermark     grpctransport.Handler
    serviceStatus grpctransport.Handler
}

// NewGRPCServer initializes a gRPC server for the provided endpoints.
func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
    return &grpcServer{
        get: grpctransport.NewServer(
            ep.GetEndpoint,
            decodeGRPCGetRequest,
            decodeGRPCGetResponse,
        ),
        status: grpctransport.NewServer(
            ep.StatusEndpoint,
            decodeGRPCStatusRequest,
            decodeGRPCStatusResponse,
        ),
        addDocument: grpctransport.NewServer(
            ep.AddDocumentEndpoint,
            decodeGRPCAddDocumentRequest,
            decodeGRPCAddDocumentResponse,
        ),
        watermark: grpctransport.NewServer(
            ep.WatermarkEndpoint,
            decodeGRPCWatermarkRequest,
            decodeGRPCWatermarkResponse,
        ),
        serviceStatus: grpctransport.NewServer(
            ep.ServiceStatusEndpoint,
            decodeGRPCServiceStatusRequest,
            decodeGRPCServiceStatusResponse,
        ),
    }
}

// Functions for implementing gRPC service endpoints.

func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
    _, rep, err := g.get.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.GetReply), nil
}

func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
    _, rep, err := g.get.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.ServiceStatusReply), nil
}

func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
    _, rep, err := g.addDocument.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.AddDocumentReply), nil
}

func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
    _, rep, err := g.status.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.StatusReply), nil
}

func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
    _, rep, err := g.watermark.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.WatermarkReply), nil
}


func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.GetRequest)
    var filters []internal.Filter
    for _, f := range req.Filters {
        filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
    }
    return endpoints.GetRequest{Filters: filters}, nil
}

func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.StatusRequest)
    return endpoints.StatusRequest{TicketID: req.TicketID}, nil
}

func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.WatermarkRequest)
    return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
}

func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.AddDocumentRequest)
    doc := &internal.Document{
        Content:   req.Document.Content,
        Title:     req.Document.Title,
        Author:    req.Document.Author,
        Topic:     req.Document.Topic,
        Watermark: req.Document.Watermark,
    }
    return endpoints.AddDocumentRequest{Document: doc}, nil
}

func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    return endpoints.ServiceStatusRequest{}, nil
}

func decodeGRPCGetResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.GetReply)
    var docs []internal.Document
    for _, d := range reply.Documents {
        doc := internal.Document{
            Content:   d.Content,
            Title:     d.Title,
            Author:    d.Author,
            Topic:     d.Topic,
            Watermark: d.Watermark,
        }
        docs = append(docs, doc)
    }
    return endpoints.GetResponse{Documents: docs, Err: reply.Err}, nil
}

func decodeGRPCStatusResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.StatusReply)
    return endpoints.StatusResponse{Status: internal.Status(reply.Status), Err: reply.Err}, nil
}

func decodeGRPCWatermarkResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.WatermarkReply)
    return endpoints.WatermarkResponse{Code: int(reply.Code), Err: reply.Err}, nil
}

func decodeGRPCAddDocumentResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.AddDocumentReply)
    return endpoints.AddDocumentResponse{TicketID: reply.TicketID, Err: reply.Err}, nil
}

func decodeGRPCServiceStatusResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.ServiceStatusReply)
    return endpoints.ServiceStatusResponse{Code: int(reply.Code), Err: reply.Err}, nil
}


Enter fullscreen mode Exit fullscreen mode

步骤 8:为 gRPC 服务创建 Proto 文件

在包中创建一个 proto 文件 watermarksvc.proto api/v1/pb。此 proto 文件用于定义服务接口和请求/响应结构。



syntax = "proto3";

package pb;

service Watermark {
    rpc Get (GetRequest) returns (GetReply) {}

    rpc Watermark (WatermarkRequest) returns (WatermarkReply) {}

    rpc Status (StatusRequest) returns (StatusReply) {}

    rpc AddDocument (AddDocumentRequest) returns (AddDocumentReply) {}

    rpc ServiceStatus (ServiceStatusRequest) returns (ServiceStatusReply) {}
}

message Document {
    string content = 1;
    string title = 2;
    string author = 3;
    string topic = 4;
    string watermark = 5;
}

message GetRequest {
    message Filters {
        string key = 1;
        string value = 2;
    }
    repeated Filters filters = 1;
}

message GetReply {
    repeated Document documents = 1;
    string Err = 2;
}

message StatusRequest {
    string ticketID = 1;
}

message StatusReply {
    enum Status {
        PENDING = 0;
        STARTED = 1;
        IN_PROGRESS = 2;
        FINISHED = 3;
        FAILED = 4;
    }
    Status status = 1;
    string Err = 2;
}

message WatermarkRequest {
    string ticketID = 1;
    string mark = 2;
}

message WatermarkReply {
    int64 code = 1;
    string err = 2;
}

message AddDocumentRequest {
    Document document = 1;
}

message AddDocumentReply {
    string ticketID = 1;
    string err = 2;
}

message ServiceStatusRequest {}

message ServiceStatusReply {
    int64 code = 1;
    string err = 2;
}


Enter fullscreen mode Exit fullscreen mode

该proto文件定义了gRPC的服务接口和请求/响应消息结构。

最后可以.pb使用工具从这个proto文件生成相应的文件protoc,但是生成文件的过程.pb不在本文档的介绍中。

实现请求和响应函数:

为了成功将协议缓冲区 (protobuf) 的请求和响应结构与我们的端点请求和响应结构集成,我们需要创建必要的解码和编码函数。这些函数有助于将 protobuf 的请求/响应结构映射到我们的端点的请求/响应结构。

让我们用一个例子来说明这个过程:考虑函数decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error)。在这个函数中,我们从 中提取相关信息grpcReq,该信息被断言为 类型pb.GetRequest。然后,我们使用提取的字段来填充一个 类型的新结构体endpoints.GetRequest{}。其他请求和响应也应该实现类似的解码和编码函数。

这是包中的代码transport



package transport

import (
    // Imports...

    grpctransport "github.com/go-kit/kit/transport/grpc"
)

type grpcServer struct {
    get           grpctransport.Handler
    status        grpctransport.Handler
    addDocument   grpctransport.Handler
    watermark     grpctransport.Handler
    serviceStatus grpctransport.Handler
}

func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
    return &grpcServer{
        get: grpctransport.NewServer(
            ep.GetEndpoint,
            decodeGRPCGetRequest,
            decodeGRPCGetResponse,
        ),
        status: grpctransport.NewServer(
            ep.StatusEndpoint,
            decodeGRPCStatusRequest,
            decodeGRPCStatusResponse,
        ),
        addDocument: grpctransport.NewServer(
            ep.AddDocumentEndpoint,
            decodeGRPCAddDocumentRequest,
            decodeGRPCAddDocumentResponse,
        ),
        watermark: grpctransport.NewServer(
            ep.WatermarkEndpoint,
            decodeGRPCWatermarkRequest,
            decodeGRPCWatermarkResponse,
        ),
        serviceStatus: grpctransport.NewServer(
            ep.ServiceStatusEndpoint,
            decodeGRPCServiceStatusRequest,
            decodeGRPCServiceStatusResponse,
        ),
    }
}

func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
    _, rep, err := g.get.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.GetReply), nil
}

func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
    _, rep, err := g.get.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.ServiceStatusReply), nil
}

func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
    _, rep, err := g.addDocument.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.AddDocumentReply), nil
}

func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
    _, rep, err := g.status.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.StatusReply), nil
}

func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
    _, rep, err := g.watermark.ServeGRPC(ctx, r)
    if err != nil {
        return nil, err
    }
    return rep.(*watermark.WatermarkReply), nil
}

func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.GetRequest)
    var filters []internal.Filter
    for _, f := range req.Filters {
        filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
    }
    return endpoints.GetRequest{Filters: filters}, nil
}

func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.StatusRequest)
    return endpoints.StatusRequest{TicketID: req.TicketID}, nil
}

func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.WatermarkRequest)
    return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
}

func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    req := grpcReq.(*watermark.AddDocumentRequest)
    doc := &internal.Document{
        Content:   req.Document.Content,
        Title:     req.Document.Title,
        Author:    req.Document.Author,
        Topic:     req.Document.Topic,
        Watermark: req.Document.Watermark,
    }
    return endpoints.AddDocumentRequest{Document: doc}, nil
}

func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
    return endpoints.ServiceStatusRequest{}, nil
}

func decodeGRPCGetResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.GetReply)
    var docs []internal.Document
    for _, d := range reply.Documents {
        doc := internal.Document{
            Content:   d.Content,
            Title:     d.Title,
            Author:    d.Author,
            Topic:     d.Topic,
            Watermark: d.Watermark,
        }
        docs = append(docs, doc)
    }
    return endpoints.GetResponse{Documents: docs, Err: reply.Err}, nil
}

func decodeGRPCStatusResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.StatusReply)
    return endpoints.StatusResponse{Status: internal.Status(reply.Status), Err: reply.Err}, nil
}

func decodeGRPCWatermarkResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.WatermarkReply)
    return endpoints.WatermarkResponse{Code: int(reply.Code), Err: reply.Err}, nil
}

func decodeGRPCAddDocumentResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.AddDocumentReply)
    return endpoints.AddDocumentResponse{TicketID: reply.TicketID, Err: reply.Err}, nil
}

func decodeGRPCServiceStatusResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
    reply := grpcReply.(*watermark.ServiceStatusReply)
    return endpoints.ServiceStatusResponse{Code: int(reply.Code), Err: reply.Err}, nil
}


Enter fullscreen mode Exit fullscreen mode

服务器入口点:

现在,下一步是在cmd每个服务的目录中创建入口点文件(main)。我们已经通过调用服务函数将适当的路由映射到端点,并使用ServeGRPC()函数将 Proto 服务服务器与端点集成,现在可以专注于构建和启动 HTTP 和 gRPC 服务器了。

为此,请在目录watermark中创建一个包cmd和一个watermark.go文件。此文件将包含用于启动和停止服务的 HTTP 和 gRPC 服务器的代码。

这是该包的代码main




package main

import (
    "fmt"
    "net"
    "net/http"
    "os"
    "os/signal"
    "syscall"

    pb "github.com/velotiotech/watermark-service/api/v1/pb/watermark"
    "github.com/velotiotech/watermark-service/pkg/watermark"
    "github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
    "github.com/velotiotech/watermark-service/pkg/watermark/transport"

    "github.com/go-kit/kit/log"
    kitgrpc "github.com/go-kit/kit/transport/grpc"
    "github.com/oklog/oklog/pkg/group"
    "google.golang.org/grpc"
)

const (
    defaultHTTPPort = "8081"
    defaultGRPCPort = "8082"
)

func main() {
    var (
        logger   log.Logger
        httpAddr = net.JoinHostPort("localhost", envString("HTTP_PORT", defaultHTTPPort))
        grpcAddr = net.JoinHostPort("localhost", envString("GRPC_PORT", defaultGRPCPort))
    )

    logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
    logger = log.With(logger, "ts", log.DefaultTimestampUTC)

    var (
        service     = watermark.NewService()
        eps         = endpoints.NewEndpointSet(service)
        httpHandler = transport.NewHTTPHandler(eps)
        grpcServer  = transport.NewGRPCServer(eps)
    )

    var g group.Group
    {
        // The HTTP listener mounts the Go kit HTTP handler we created.
        httpListener, err := net.Listen("tcp", httpAddr)
        if err != nil {
            logger.Log("transport", "HTTP", "during", "Listen", "err", err)
            os.Exit(1)
        }
        g.Add(func() error {
            logger.Log("transport", "HTTP", "addr", httpAddr)
            return http.Serve(httpListener, httpHandler)
        }, func(error) {
            httpListener.Close()
        })
    }
    {
        // The gRPC listener mounts the Go kit gRPC server we created.
        grpcListener, err := net.Listen("tcp", grpcAddr)
        if err != nil {
            logger.Log("transport", "gRPC", "during", "Listen", "err", err)
            os.Exit(1)
        }
        g.Add(func() error {
            logger.Log("transport", "gRPC", "addr", grpcAddr)
            // we add the Go Kit gRPC Interceptor to our gRPC service as it is used by
            // the here demonstrated zipkin tracing middleware.
            baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
            pb.RegisterWatermarkServer(baseServer, grpcServer)
            return baseServer.Serve(grpcListener)
        }, func(error) {
            grpcListener.Close()
        })
    }
    {
        // This function just sits and waits for ctrl-C.
        cancelInterrupt := make(chan struct{})
        g.Add(func() error {
            c := make(chan os.Signal, 1)
            signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
            select {
            case sig := <-c:
                return fmt.Errorf("received signal %s", sig)
            case <-cancelInterrupt:
                return nil
            }
        }, func(error) {
            close(cancelInterrupt)
        })
    }
    logger.Log("exit", g.Run())
}

func envString(env, fallback string) string {
    e := os.Getenv(env)
    if e == "" {
        return fallback
    }
    return e
}


Enter fullscreen mode Exit fullscreen mode

在这部分代码中,我们遵循以下步骤:

  • 我们为服务器指定固定的监听端口:HTTP 服务器为 8081,gRPC 服务器为 8082。

  • 我们继续创建 HTTP 和 gRPC 服务器,以及服务后端和服务本身的端点。

  • 我们利用 来oklog.Group有效地管理多个 Goroutine。我们建立了三个 Goroutine:一个用于 HTTP 服务器,一个用于 gRPC 服务器,第三个用于监控取消中断。这确保了这些组件的高效并发运行。

总而言之,我们讨论了如何为 protobuf 结构设置解码和编码函数,以及如何为 HTTP 和 gRPC 服务器创建入口点文件,同时保持代码结构完整。服务器一旦启动,即可处理传入的请求。

要运行该服务,请执行以下命令:



go run ./cmd/watermark/watermark.go


Enter fullscreen mode Exit fullscreen mode

此命令将启动水印服务,HTTP 和 gRPC 服务器均启动并运行,准备处理请求。

服务器已在本地启动。现在,您可以使用 Postman 或执行 curl 命令与其中一个可用端点进行交互来测试它。以下是示例:



➜ ~ curl http://localhost:8081/healthz
{"status":200}


Enter fullscreen mode Exit fullscreen mode

此命令检查服务的状态,它应该返回“状态”为 200 的响应。

我们已经有效地开发了一项服务并执行了其端点。

此外:

我始终致力于确保项目全面,涵盖所有与维护相关的必要方面。这包括创建结构良好的 README 文件、设置正确的 .gitignore 和 .dockerignore 文件、编写 Makefile、Dockerfile、golang-ci-lint 配置文件以及配置 CI/CD 流水线等等。

对于这三个服务,我创建了位于 /images/ 路径下的不同 Dockerfile。我们使用一个多阶段 Dockerfile来创建并运行服务的二进制文件。这包括将相关代码目录复制到 Docker 镜像中,一步构建镜像,然后在同一个文件中创建新镜像,同时从上一个镜像中传输二进制文件。其他服务也创建了类似的 Dockerfile。在每个 Dockerfile 中,CMD设置为go run watermark,作为容器的入口点。

为了简化流程,我还构建了一个 Makefile,其中包含两个主要目标:build-imagebuild-push。前者用于构建 Docker 镜像,后者用于将其推送到存储库。

注意:鉴于本博客篇幅有限,我会尽量简洁。文章开头提供的代码仓库涵盖了与服务相关的大部分关键概念。我将继续致力于增强功能和新增功能,并持续提交代码。

现在,让我们深入研究部署过程:

我将演示如何使用 Kubernetes 等容器编排工具部署所有这些服务。我假设您已经具备 Kubernetes 的使用经验,至少是初学者水平。

在 deploy 目录中,您可以创建一个包含三个容器的示例部署auth: 、watermarkdatabase。由于每个容器的入口点命令已在 Dockerfile 中定义,因此无需在部署中指定其他参数或命令。

此外,您还需要一个服务来路由来自其他负载均衡器服务或 NodePort 类型服务的外部流量。为了实现这一点,您可能需要创建一个 NodePort 类型的服务来暴露watermark-service并使其保持运行。

另一个关键且有趣的方面是 API 网关的部署。设置 API 网关需要对云提供商的堆栈有充分的了解。就我而言,我利用 Azure 堆栈,使用 Azure 平台中名为“API-Management”的资源部署了一个 API 网关。

最后,唯一未完成的任务是建立强大的CI/CD 设置,这是项目开发阶段结束后最关键的环节之一。虽然我很想更深入地探讨这些与部署相关的主题,但这超出了我本篇博客的讨论范围。或许,我会在另一篇博文中探讨这些主题。


与此类似,我和其他热爱开源的开发者在 Slack 上运营一个以开发者为中心的社区。我们在这里讨论各种主题,包括实现、集成、一些爆料、奇葩聊天、线上聚会,为开源做贡献,以及一切有助于开发者保持理智的事情 ;) 毕竟,知识过量也可能带来危险。

我诚邀您加入我们的免费社区(我保证没有广告,而且我打算一直保持这种状态),参与讨论,分享您的精彩经验和专业知识。您可以填写这张表格,几天后您的 Slack 邀请函就会发送到您的邮箱。我们拥有来自一些优秀公司(Atlassian、Gong、Scaler)的优秀员工,您一定不想错过与他们互动的机会。邀请表格

继续阅读 https://dev.to/nikl/how-to-build-a-containerized-microservice-in-golang-a-step-by-step-guide-with-example-use-case-5ea8
PREV
Laravel 路线图
NEXT
给 Web 开发者新手的 9 个实用技巧 GenAI LIVE! | 2025 年 6 月 4 日