如何在 Golang 中构建容器化微服务:带有示例用例的分步指南
问题陈述
建筑学
操作和流程
入门
项目布局
介绍
我发现探索微服务和云原生应用领域非常有趣。近年来,微服务彻底改变了 Web 应用架构,提供了模块化、可独立部署的服务,重塑了我们构建和扩展应用程序的方式。此外,REST 和 gRPC 等传输协议也不断发展,效率和速度都得到了提升,使其成为微服务的理想选择。
我们的使命:使用 Go 构建云原生应用程序
在本篇博文中,我将指导您使用 Go 和微服务架构构建云原生应用程序。整个过程比您想象的要简单得多,我们将把它分解成几个易于管理的步骤。
步骤 1:构建微服务
1.1 创建微服务和容器化服务
在此步骤中,我们将创建微服务和容器化服务,每个服务都有与其逻辑组件密切相关的特定的、独立的任务。
1.2 利用 Go-Kit
我们将利用 Go-Kit,这是一个用于构建和开发每个服务组件的优秀框架。
1.3 开发 API
我们将使用 HTTP (REST) 和 Protobuf (gRPC) 作为传输机制来开发 API。我们还将使用 PostgreSQL 作为数据库,并探索在 Azure 上部署 API 管理和 CI/CD。
注意:虽然我们将讨论部署和 CI/CD,但在 Azure 或任何其他云平台上配置它们的具体细节超出了本博客的范围。
先决条件
在深入研究之前,请确保您具备必要的先决条件:
- 对 Web 服务、Rest API 和 gRPC 有基本的了解。
- GoLand 或 VS Code 作为您的开发环境。
- 已正确安装并配置 Go 环境。如果没有,请点击此链接获取指导。
- 在 GOPATH 下设置一个新的项目目录。
- 熟悉标准 Golang 项目结构,您可以在这里找到。
- 已安装 PostgreSQL 客户端。
- 我们将使用 Go Kit 来简化微服务开发。
问题陈述
我们的目标是开发一个 Web 应用程序来解决以下问题:
假设一家专门出版书籍和期刊的全球出版公司。他们需要一项服务来为文档添加水印。这些文档(无论是书籍还是期刊)都包含标题、作者和水印属性。水印可以处于以下三种状态之一:“已开始”、“进行中”或“已完成”。此外,只有特定用户才有权在文档上添加水印,并且一旦添加水印,该操作将一次性生效;文档永远无法再次添加水印。
如需更详细地了解此要求,请参阅此链接。
建筑学
在这个项目中,我们将建立三个微服务:身份验证服务、数据库服务和水印服务。这些服务将与 PostgreSQL 数据库服务器交互,并以 API 网关作为入口点。
现在,让我们深入了解每项服务的细节:
身份验证服务
此服务负责基于用户和角色的访问控制。它将根据用户角色对其进行身份验证,并返回相应的 HTTP 状态代码 - 授权用户返回 200,未授权用户返回 401。
蜜蜂:
- /user/access:对此端点的 GET 请求,用于用户身份验证,以用户名作为输入并返回其角色和相关权限。
- /authenticate:另一个安全的 GET 请求,这次带有用户和操作参数,用于对指定操作的用户进行身份验证。
- /healthz:一个安全的 GET 请求,用于检查服务的状态。
数据库服务
此服务管理应用程序的数据库,存储用户信息、用户角色、访问权限以及不带水印的文档。文档在创建时不能带有水印;只有当数据输入有效且数据库服务返回成功时,才算创建成功。
我们将为这两项服务使用两个独立的数据库,遵循微服务架构的“每个服务单数据库”原则。
蜜蜂:
- /get:根据特定过滤器检索文档的安全 GET 请求。
- /update:更新文档的安全 POST 请求。
- /add:一个安全的 POST 请求,用于添加新文档并返回其标题 ID。
- /remove:根据标题 ID 删除文档的安全 POST 请求。
- /healthz:一个安全的 GET 请求,用于检查服务的状态。
水印服务
此核心服务处理水印文档的请求。当用户需要为文档添加水印时,必须提供工单 ID 以及所需的水印。该服务将内部调用数据库的更新 API,返回水印处理状态,初始状态为“已启动”,然后转换为“进行中”,最终返回“已完成”(如果请求无效,则返回“错误”)。
蜜蜂:
- /get:根据特定过滤器检索文档的安全 GET 请求。
- /status:一个安全的 GET 请求,用于检查特定票证 ID 的水印状态。
- /addDocument:一个安全的 POST 请求,用于添加文档并返回标题 ID。
- /watermark:主要水印操作,接受标题和水印字符串的安全 POST 请求。
- /healthz:一个安全的 GET 请求,用于检查服务的状态。
操作和流程
水印服务 API 是用户请求水印或添加文档的网关。身份验证和数据库服务 API 是私有的,仅供其他服务内部调用。用户仅与 API 网关 URL 交互。
工作原理如下:
- 用户访问 API 网关 URL,提供其用户名、票证 ID 和水印。
- 用户不需要关心身份验证或数据库服务的复杂性。
- API 网关验证用户的请求和有效负载。
- 网关中明确定义的 API 转发规则将请求路由到相关服务。
- 对于水印请求,规则会将其转发到身份验证服务进行用户身份验证。身份验证服务会检查用户的授权,并返回相应的状态码。
- 授权服务检查用户的角色和权限,一旦获得授权,就将请求转发给水印服务。
- 水印服务执行必要的操作,包括添加水印或文档,并将结果转发给用户。
- 如果请求是添加文档,服务将返回 Ticket ID。如果请求是添加水印,则返回操作状态。
请注意,每个用户根据文档类型具有特定的角色,而不是特定的书籍或期刊名称。
入门
首先,在 $GOPATH 中为我们的应用程序创建一个文件夹。这将作为我们服务集合的根目录。
项目布局
我们的项目将遵循标准的 Golang 项目布局。
- api:此目录存储了 gRPC 接口的 API swagger 文件和 protobuf 文件的版本。
- cmd:它包含所有服务的入口点(main.go)文件,以及任何其他容器镜像。
- docs:这包含项目的文档。
- config:在这里存储示例和特定的配置文件。
- 部署:包含用于部署应用程序的部署文件。
- internal: Go 编译器可以识别的常规内部包。它包含项目内共享的包。目录结构说明:
- pkg:在此目录中,您将找到所有服务的完整功能代码,每个服务都组织成不同的包。
- 测试:此目录包含服务的所有集成和端到端(E2E)测试。
- 供应商:在这里,您将发现本地存储的第三方依赖项存储库,以防止将来出现版本不一致。
水印服务:
- 在 Go kit 框架中,服务应该始终通过接口来定义。
在“pkg”目录中创建一个名为“watermark”的包。在该包中,创建一个名为“service.go”的新文件,作为我们服务的蓝图。
package watermark
import (
"context"
"github.com/velotiotech/watermark-service/internal"
)
type Service interface {
// Get the list of all documents
Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error)
Status(ctx context.Context, ticketID string) (internal.Status, error)
Watermark(ctx context.Context, ticketID, mark string) (int, error)
AddDocument(ctx context.Context, doc *internal.Document) (string, error)
ServiceStatus(ctx context.Context) (int, error)
}
-
我们需要五个端点来处理服务接口中定义的方法的请求。为了有效地处理多个并发请求,我们使用了 context 包,尽管在本篇博文中我们可能不会大量使用它。但这仍然是推荐的使用方式。
-
实施我们的服务:
package watermark
import (
"context"
"net/http"
"os"
"github.com/velotiotech/watermark-service/internal"
"github.com/go-kit/kit/log"
"github.com/lithammer/shortuuid/v3"
)
type watermarkService struct{}
func NewService() Service {
return &watermarkService{}
}
func (w *watermarkService) Get(_ context.Context, filters ...internal.Filter) ([]internal.Document, error) {
// Query the database using the filters and return the list of documents.
// Return an error if the filter (key) is invalid or if no item is found.
doc := internal.Document{
Content: "book",
Title: "Harry Potter and the Half-Blood Prince",
Author: "J.K. Rowling",
Topic: "Fiction and Magic",
}
return []internal.Document{doc}, nil
}
func (w *watermarkService) Status(_ context.Context, ticketID string) (internal.Status, error) {
// Query the database using the ticketID and return the document info.
// Return an error if the ticketID is invalid or no document exists for that ticketID.
return internal.InProgress, nil
}
func (w *watermarkService) Watermark(_ context.Context, ticketID, mark string) (int, error) {
// Update the database entry with the watermark field as non-empty.
// First, check if the watermark status is not already in InProgress, Started, or Finished state.
// If yes, then return an invalid request.
// Return an error if no item is found using the ticketID.
return http.StatusOK, nil
}
func (w *watermarkService) AddDocument(_ context.Context, doc *internal.Document) (string, error) {
// Add the document entry in the database by calling the database service.
// Return an error if the doc is invalid and/or if there's a database entry error.
newTicketID := shortuuid.New()
return newTicketID, nil
}
func (w *watermarkService) ServiceStatus(_ context.Context) (int, error) {
logger.Log("Checking the Service health...")
return http.StatusOK, nil
}
var logger log.Logger
func init() {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}
我们定义了“watermarkService”类型,它是一个空结构体,用于实现我们定义的服务接口。该结构体的实现对外界隐藏。
“NewService”是作为我们服务“对象”的构造函数创建的。这是此包之外唯一可用于实例化服务的函数。
- 现在,我们将创建“endpoints”包,它将包含两个文件:一个用于存储所有类型的请求和响应,另一个用于实现请求解析和调用适当的服务功能。
- 创建一个名为“reqJSONMap.go”的文件。在此文件中,定义所有请求和响应的结构体及其字段。在这些结构体中添加请求输入和响应输出所需的字段。
package endpoints
import "github.com/velotiotech/watermark-service/internal"
type GetRequest struct {
Filters []internal.Filter `json:"filters,omitempty"`
}
type GetResponse struct {
Documents []internal.Document `json:"documents"`
Err string `json:"err,omitempty"`
}
type StatusRequest struct {
TicketID string `json:"ticketID"`
}
type StatusResponse struct {
Status internal.Status `json:"status"`
Err string `json:"err,omitempty"`
}
type WatermarkRequest struct {
TicketID string `json:"ticketID"`
Mark string `json:"mark"`
}
type WatermarkResponse struct {
Code int `json:"code"`
Err string `json:"err"`
}
type AddDocumentRequest struct {
Document *internal.Document `json:"document"`
}
type AddDocumentResponse struct {
TicketID string `json:"ticketID"`
Err string `json:"err,omitempty"`
}
type ServiceStatusRequest struct{}
type ServiceStatusResponse struct {
Code int `json:"status"`
Err string `json:"err,omitempty"`
}
- 创建一个名为“endpoints.go”的文件。在此文件中,您将实现服务已实现函数的实际调用。
package endpoints
import (
"context"
"errors"
"os"
"github.com/aayushrangwala/watermark-service/internal"
"github.com/aayushrangwala/watermark-service/pkg/watermark"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
)
type Set struct {
GetEndpoint endpoint.Endpoint
AddDocumentEndpoint endpoint.Endpoint
StatusEndpoint endpoint.Endpoint
ServiceStatusEndpoint endpoint.Endpoint
WatermarkEndpoint endpoint.Endpoint
}
func NewEndpointSet(svc watermark.Service) Set {
return Set{
GetEndpoint: MakeGetEndpoint(svc),
AddDocumentEndpoint: MakeAddDocumentEndpoint(svc),
StatusEndpoint: MakeStatusEndpoint(svc),
ServiceStatusEndpoint: MakeServiceStatusEndpoint(svc),
WatermarkEndpoint: MakeWatermarkEndpoint(svc),
}
}
func MakeGetEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(GetRequest)
docs, err := svc.Get(ctx, req.Filters...)
if err != nil {
return GetResponse{docs, err.Error()}, nil
}
return GetResponse{docs, ""}, nil
}
}
func MakeStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(StatusRequest)
status, err := svc.Status(ctx, req.TicketID)
if err != nil {
return StatusResponse{Status: status, Err: err.Error()}, nil
}
return StatusResponse{Status: status, Err
: ""}, nil
}
}
func MakeAddDocumentEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(AddDocumentRequest)
ticketID, err := svc.AddDocument(ctx, req.Document)
if err != nil {
return AddDocumentResponse{TicketID: ticketID, Err: err.Error()}, nil
}
return AddDocumentResponse{TicketID: ticketID, Err: ""}, nil
}
}
func MakeWatermarkEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
req := request.(WatermarkRequest)
code, err := svc.Watermark(ctx, req.TicketID, req.Mark)
if err != nil {
return WatermarkResponse{Code: code, Err: err.Error()}, nil
}
return WatermarkResponse{Code: code, Err: ""}, nil
}
}
func MakeServiceStatusEndpoint(svc watermark.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
_ = request.(ServiceStatusRequest)
code, err := svc.ServiceStatus(ctx)
if err != nil {
return ServiceStatusResponse{Code: code, Err: err.Error()}, nil
}
return ServiceStatusResponse{Code: code, Err: ""}, nil
}
}
func (s *Set) Get(ctx context.Context, filters ...internal.Filter) ([]internal.Document, error) {
resp, err := s.GetEndpoint(ctx, GetRequest{Filters: filters})
if err != nil {
return []internal.Document{}, err
}
getResp := resp.(GetResponse)
if getResp.Err != "" {
return []internal.Document{}, errors.New(getResp.Err)
}
return getResp.Documents, nil
}
func (s *Set) ServiceStatus(ctx context.Context) (int, error) {
resp, err := s.ServiceStatusEndpoint(ctx, ServiceStatusRequest{})
svcStatusResp := resp.(ServiceStatusResponse)
if err != nil {
return svcStatusResp.Code, err
}
if svcStatusResp.Err != "" {
return svcStatusResp.Code, errors.New(svcStatusResp.Err)
}
return svcStatusResp.Code, nil
}
func (s *Set) AddDocument(ctx context.Context, doc *internal.Document) (string, error) {
resp, err := s.AddDocumentEndpoint(ctx, AddDocumentRequest{Document: doc})
if err != nil {
return "", err
}
adResp := resp.(AddDocumentResponse)
if adResp.Err != "" {
return "", errors.New(adResp.Err)
}
return adResp.TicketID, nil
}
func (s *Set) Status(ctx context.Context, ticketID string) (internal.Status, error) {
resp, err := s.StatusEndpoint(ctx, StatusRequest{TicketID: ticketID})
if err != nil {
return internal.Failed, err
}
stsResp := resp.(StatusResponse)
if stsResp.Err != "" {
return internal.Failed, errors.New(stsResp.Err)
}
return stsResp.Status, nil
}
func (s *Set) Watermark(ctx context.Context, ticketID, mark string) (int, error) {
resp, err := s.WatermarkEndpoint(ctx, WatermarkRequest{TicketID: ticketID, Mark: mark})
wmResp := resp.(WatermarkResponse)
if err != nil {
return wmResp.Code, err
}
if wmResp.Err != "" {
return wmResp.Code, errors.New(wmResp.Err)
}
return wmResp.Code, nil
}
var logger log.Logger
func init() {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}
在这个文件中,我们有一个“Set”结构体,它是所有端点的集合。我们还有一个“Set”的构造函数。该文件定义了一些内部构造函数,这些函数返回实现了 Go Kit 接口的通用端点的对象,例如“MakeGetEndpoint()”、“MakeStatusEndpoint()”等等。
为了公开 Get、Status、Watermark、ServiceStatus 和 AddDocument API,我们为每个 API 创建了端点。这些函数负责处理传入的请求并调用特定的服务方法。
- 添加“transports”方法来公开服务。我们的服务将支持 HTTP,并使用 REST API 和 Protobuf 以及 gRPC 进行公开。
在“watermark”目录中创建一个名为“transport”的单独包。此包将包含特定类型传输机制的所有处理程序、解码器和编码器。
步骤 6:创建 HTTP 传输函数和处理程序
首先创建一个名为 http.go 的文件,其中包含 HTTP 的传输函数和处理程序。这些函数负责定义 API 路由及其各自的处理程序函数。
package transport
import (
context"
"encoding/json"
"net/http"
"os"
"github.com/velotiotech/watermark-service/internal/util"
"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
"github.com/go-kit/kit/log"
httptransport "github.com/go-kit/kit/transport/http"
)
// NewHTTPHandler initializes an HTTP handler for API endpoints.
func NewHTTPHandler(ep endpoints.Set) http.Handler {
m := http.NewServeMux()
// Define routes and associated handler functions.
m.Handle("/healthz", httptransport.NewServer(
ep.ServiceStatusEndpoint,
decodeHTTPServiceStatusRequest,
encodeResponse,
))
m.Handle("/status", httptransport.NewServer(
ep.StatusEndpoint,
decodeHTTPStatusRequest,
encodeResponse,
))
m.Handle("/addDocument", httptransport.NewServer(
ep.AddDocumentEndpoint,
decodeHTTPAddDocumentRequest,
encodeResponse,
))
m.Handle("/get", httptransport.NewServer(
ep.GetEndpoint,
decodeHTTPGetRequest,
encodeResponse,
))
m.Handle("/watermark", httptransport.NewServer(
ep.WatermarkEndpoint,
decodeHTTPWatermarkRequest,
encodeResponse,
))
return m
}
// Functions for decoding HTTP requests.
func decodeHTTPGetRequest(_ context.Context, r *http.Request) (interface{}, error) {
var req endpoints.GetRequest
if r.ContentLength == 0 {
logger.Log("Get request with no body")
return req, nil
}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
return nil, err
}
return req, nil
}
// ... (Similar decode functions for other endpoints)
// Function for encoding HTTP responses.
func encodeResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error {
if e, ok := response.(error); ok && e != nil {
encodeError(ctx, e, w)
return nil
}
return json.NewEncoder(w).Encode(response)
}
// ... (Similar functions for handling errors)
func encodeError(_ context.Context, err error, w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
switch err {
case util.ErrUnknown:
w.WriteHeader(http.StatusNotFound)
case util.ErrInvalidArgument:
w.WriteHeader(http.StatusBadRequest)
default:
w.WriteHeader(http.StatusInternalServerError)
}
json.NewEncoder(w).Encode(map[string]interface{}{
"error": err.Error(),
})
}
var logger log.Logger
func init() {
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
}
步骤 7:创建 gRPC 传输函数和处理程序
在同一传输包中名为 grpc.go 的文件中,类似于 HTTP 传输,您将为 gRPC 创建传输函数和处理程序。此文件将 protobuf 有效负载映射到相应的请求和响应。
package transport
import (
context"
"github.com/velotiotech/watermark-service/api/v1/pb/watermark"
"github.com/velotiotech/watermark-service/internal"
"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
grpctransport "github.com/go-kit/kit/transport/grpc"
)
// grpcServer is a struct responsible for creating gRPC servers and handling endpoint mappings.
type grpcServer struct {
get grpctransport.Handler
status grpctransport.Handler
addDocument grpctransport.Handler
watermark grpctransport.Handler
serviceStatus grpctransport.Handler
}
// NewGRPCServer initializes a gRPC server for the provided endpoints.
func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
return &grpcServer{
get: grpctransport.NewServer(
ep.GetEndpoint,
decodeGRPCGetRequest,
decodeGRPCGetResponse,
),
status: grpctransport.NewServer(
ep.StatusEndpoint,
decodeGRPCStatusRequest,
decodeGRPCStatusResponse,
),
addDocument: grpctransport.NewServer(
ep.AddDocumentEndpoint,
decodeGRPCAddDocumentRequest,
decodeGRPCAddDocumentResponse,
),
watermark: grpctransport.NewServer(
ep.WatermarkEndpoint,
decodeGRPCWatermarkRequest,
decodeGRPCWatermarkResponse,
),
serviceStatus: grpctransport.NewServer(
ep.ServiceStatusEndpoint,
decodeGRPCServiceStatusRequest,
decodeGRPCServiceStatusResponse,
),
}
}
// Functions for implementing gRPC service endpoints.
func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.GetReply), nil
}
func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.ServiceStatusReply), nil
}
func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
_, rep, err := g.addDocument.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.AddDocumentReply), nil
}
func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
_, rep, err := g.status.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.StatusReply), nil
}
func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
_, rep, err := g.watermark.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.WatermarkReply), nil
}
func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.GetRequest)
var filters []internal.Filter
for _, f := range req.Filters {
filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
}
return endpoints.GetRequest{Filters: filters}, nil
}
func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.StatusRequest)
return endpoints.StatusRequest{TicketID: req.TicketID}, nil
}
func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.WatermarkRequest)
return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
}
func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.AddDocumentRequest)
doc := &internal.Document{
Content: req.Document.Content,
Title: req.Document.Title,
Author: req.Document.Author,
Topic: req.Document.Topic,
Watermark: req.Document.Watermark,
}
return endpoints.AddDocumentRequest{Document: doc}, nil
}
func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
return endpoints.ServiceStatusRequest{}, nil
}
func decodeGRPCGetResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.GetReply)
var docs []internal.Document
for _, d := range reply.Documents {
doc := internal.Document{
Content: d.Content,
Title: d.Title,
Author: d.Author,
Topic: d.Topic,
Watermark: d.Watermark,
}
docs = append(docs, doc)
}
return endpoints.GetResponse{Documents: docs, Err: reply.Err}, nil
}
func decodeGRPCStatusResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.StatusReply)
return endpoints.StatusResponse{Status: internal.Status(reply.Status), Err: reply.Err}, nil
}
func decodeGRPCWatermarkResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.WatermarkReply)
return endpoints.WatermarkResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
func decodeGRPCAddDocumentResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.AddDocumentReply)
return endpoints.AddDocumentResponse{TicketID: reply.TicketID, Err: reply.Err}, nil
}
func decodeGRPCServiceStatusResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.ServiceStatusReply)
return endpoints.ServiceStatusResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
步骤 8:为 gRPC 服务创建 Proto 文件
在包中创建一个 proto 文件 watermarksvc.proto api/v1/pb
。此 proto 文件用于定义服务接口和请求/响应结构。
syntax = "proto3";
package pb;
service Watermark {
rpc Get (GetRequest) returns (GetReply) {}
rpc Watermark (WatermarkRequest) returns (WatermarkReply) {}
rpc Status (StatusRequest) returns (StatusReply) {}
rpc AddDocument (AddDocumentRequest) returns (AddDocumentReply) {}
rpc ServiceStatus (ServiceStatusRequest) returns (ServiceStatusReply) {}
}
message Document {
string content = 1;
string title = 2;
string author = 3;
string topic = 4;
string watermark = 5;
}
message GetRequest {
message Filters {
string key = 1;
string value = 2;
}
repeated Filters filters = 1;
}
message GetReply {
repeated Document documents = 1;
string Err = 2;
}
message StatusRequest {
string ticketID = 1;
}
message StatusReply {
enum Status {
PENDING = 0;
STARTED = 1;
IN_PROGRESS = 2;
FINISHED = 3;
FAILED = 4;
}
Status status = 1;
string Err = 2;
}
message WatermarkRequest {
string ticketID = 1;
string mark = 2;
}
message WatermarkReply {
int64 code = 1;
string err = 2;
}
message AddDocumentRequest {
Document document = 1;
}
message AddDocumentReply {
string ticketID = 1;
string err = 2;
}
message ServiceStatusRequest {}
message ServiceStatusReply {
int64 code = 1;
string err = 2;
}
该proto文件定义了gRPC的服务接口和请求/响应消息结构。
最后可以.pb
使用工具从这个proto文件生成相应的文件protoc
,但是生成文件的过程.pb
不在本文档的介绍中。
实现请求和响应函数:
为了成功将协议缓冲区 (protobuf) 的请求和响应结构与我们的端点请求和响应结构集成,我们需要创建必要的解码和编码函数。这些函数有助于将 protobuf 的请求/响应结构映射到我们的端点的请求/响应结构。
让我们用一个例子来说明这个过程:考虑函数decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error)
。在这个函数中,我们从 中提取相关信息grpcReq
,该信息被断言为 类型pb.GetRequest
。然后,我们使用提取的字段来填充一个 类型的新结构体endpoints.GetRequest{}
。其他请求和响应也应该实现类似的解码和编码函数。
这是包中的代码transport
:
package transport
import (
// Imports...
grpctransport "github.com/go-kit/kit/transport/grpc"
)
type grpcServer struct {
get grpctransport.Handler
status grpctransport.Handler
addDocument grpctransport.Handler
watermark grpctransport.Handler
serviceStatus grpctransport.Handler
}
func NewGRPCServer(ep endpoints.Set) watermark.WatermarkServer {
return &grpcServer{
get: grpctransport.NewServer(
ep.GetEndpoint,
decodeGRPCGetRequest,
decodeGRPCGetResponse,
),
status: grpctransport.NewServer(
ep.StatusEndpoint,
decodeGRPCStatusRequest,
decodeGRPCStatusResponse,
),
addDocument: grpctransport.NewServer(
ep.AddDocumentEndpoint,
decodeGRPCAddDocumentRequest,
decodeGRPCAddDocumentResponse,
),
watermark: grpctransport.NewServer(
ep.WatermarkEndpoint,
decodeGRPCWatermarkRequest,
decodeGRPCWatermarkResponse,
),
serviceStatus: grpctransport.NewServer(
ep.ServiceStatusEndpoint,
decodeGRPCServiceStatusRequest,
decodeGRPCServiceStatusResponse,
),
}
}
func (g *grpcServer) Get(ctx context.Context, r *watermark.GetRequest) (*watermark.GetReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.GetReply), nil
}
func (g *grpcServer) ServiceStatus(ctx context.Context, r *watermark.ServiceStatusRequest) (*watermark.ServiceStatusReply, error) {
_, rep, err := g.get.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.ServiceStatusReply), nil
}
func (g *grpcServer) AddDocument(ctx context.Context, r *watermark.AddDocumentRequest) (*watermark.AddDocumentReply, error) {
_, rep, err := g.addDocument.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.AddDocumentReply), nil
}
func (g *grpcServer) Status(ctx context.Context, r *watermark.StatusRequest) (*watermark.StatusReply, error) {
_, rep, err := g.status.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.StatusReply), nil
}
func (g *grpcServer) Watermark(ctx context.Context, r *watermark.WatermarkRequest) (*watermark.WatermarkReply, error) {
_, rep, err := g.watermark.ServeGRPC(ctx, r)
if err != nil {
return nil, err
}
return rep.(*watermark.WatermarkReply), nil
}
func decodeGRPCGetRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.GetRequest)
var filters []internal.Filter
for _, f := range req.Filters {
filters = append(filters, internal.Filter{Key: f.Key, Value: f.Value})
}
return endpoints.GetRequest{Filters: filters}, nil
}
func decodeGRPCStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.StatusRequest)
return endpoints.StatusRequest{TicketID: req.TicketID}, nil
}
func decodeGRPCWatermarkRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.WatermarkRequest)
return endpoints.WatermarkRequest{TicketID: req.TicketID, Mark: req.Mark}, nil
}
func decodeGRPCAddDocumentRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
req := grpcReq.(*watermark.AddDocumentRequest)
doc := &internal.Document{
Content: req.Document.Content,
Title: req.Document.Title,
Author: req.Document.Author,
Topic: req.Document.Topic,
Watermark: req.Document.Watermark,
}
return endpoints.AddDocumentRequest{Document: doc}, nil
}
func decodeGRPCServiceStatusRequest(_ context.Context, grpcReq interface{}) (interface{}, error) {
return endpoints.ServiceStatusRequest{}, nil
}
func decodeGRPCGetResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.GetReply)
var docs []internal.Document
for _, d := range reply.Documents {
doc := internal.Document{
Content: d.Content,
Title: d.Title,
Author: d.Author,
Topic: d.Topic,
Watermark: d.Watermark,
}
docs = append(docs, doc)
}
return endpoints.GetResponse{Documents: docs, Err: reply.Err}, nil
}
func decodeGRPCStatusResponse(_ context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.StatusReply)
return endpoints.StatusResponse{Status: internal.Status(reply.Status), Err: reply.Err}, nil
}
func decodeGRPCWatermarkResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.WatermarkReply)
return endpoints.WatermarkResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
func decodeGRPCAddDocumentResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.AddDocumentReply)
return endpoints.AddDocumentResponse{TicketID: reply.TicketID, Err: reply.Err}, nil
}
func decodeGRPCServiceStatusResponse(ctx context.Context, grpcReply interface{}) (interface{}, error) {
reply := grpcReply.(*watermark.ServiceStatusReply)
return endpoints.ServiceStatusResponse{Code: int(reply.Code), Err: reply.Err}, nil
}
服务器入口点:
现在,下一步是在cmd
每个服务的目录中创建入口点文件(main)。我们已经通过调用服务函数将适当的路由映射到端点,并使用ServeGRPC()
函数将 Proto 服务服务器与端点集成,现在可以专注于构建和启动 HTTP 和 gRPC 服务器了。
为此,请在目录watermark
中创建一个包cmd
和一个watermark.go
文件。此文件将包含用于启动和停止服务的 HTTP 和 gRPC 服务器的代码。
这是该包的代码main
:
package main
import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
pb "github.com/velotiotech/watermark-service/api/v1/pb/watermark"
"github.com/velotiotech/watermark-service/pkg/watermark"
"github.com/velotiotech/watermark-service/pkg/watermark/endpoints"
"github.com/velotiotech/watermark-service/pkg/watermark/transport"
"github.com/go-kit/kit/log"
kitgrpc "github.com/go-kit/kit/transport/grpc"
"github.com/oklog/oklog/pkg/group"
"google.golang.org/grpc"
)
const (
defaultHTTPPort = "8081"
defaultGRPCPort = "8082"
)
func main() {
var (
logger log.Logger
httpAddr = net.JoinHostPort("localhost", envString("HTTP_PORT", defaultHTTPPort))
grpcAddr = net.JoinHostPort("localhost", envString("GRPC_PORT", defaultGRPCPort))
)
logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
var (
service = watermark.NewService()
eps = endpoints.NewEndpointSet(service)
httpHandler = transport.NewHTTPHandler(eps)
grpcServer = transport.NewGRPCServer(eps)
)
var g group.Group
{
// The HTTP listener mounts the Go kit HTTP handler we created.
httpListener, err := net.Listen("tcp", httpAddr)
if err != nil {
logger.Log("transport", "HTTP", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "HTTP", "addr", httpAddr)
return http.Serve(httpListener, httpHandler)
}, func(error) {
httpListener.Close()
})
}
{
// The gRPC listener mounts the Go kit gRPC server we created.
grpcListener, err := net.Listen("tcp", grpcAddr)
if err != nil {
logger.Log("transport", "gRPC", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "gRPC", "addr", grpcAddr)
// we add the Go Kit gRPC Interceptor to our gRPC service as it is used by
// the here demonstrated zipkin tracing middleware.
baseServer := grpc.NewServer(grpc.UnaryInterceptor(kitgrpc.Interceptor))
pb.RegisterWatermarkServer(baseServer, grpcServer)
return baseServer.Serve(grpcListener)
}, func(error) {
grpcListener.Close()
})
}
{
// This function just sits and waits for ctrl-C.
cancelInterrupt := make(chan struct{})
g.Add(func() error {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
select {
case sig := <-c:
return fmt.Errorf("received signal %s", sig)
case <-cancelInterrupt:
return nil
}
}, func(error) {
close(cancelInterrupt)
})
}
logger.Log("exit", g.Run())
}
func envString(env, fallback string) string {
e := os.Getenv(env)
if e == "" {
return fallback
}
return e
}
在这部分代码中,我们遵循以下步骤:
-
我们为服务器指定固定的监听端口:HTTP 服务器为 8081,gRPC 服务器为 8082。
-
我们继续创建 HTTP 和 gRPC 服务器,以及服务后端和服务本身的端点。
-
我们利用 来
oklog.Group
有效地管理多个 Goroutine。我们建立了三个 Goroutine:一个用于 HTTP 服务器,一个用于 gRPC 服务器,第三个用于监控取消中断。这确保了这些组件的高效并发运行。
总而言之,我们讨论了如何为 protobuf 结构设置解码和编码函数,以及如何为 HTTP 和 gRPC 服务器创建入口点文件,同时保持代码结构完整。服务器一旦启动,即可处理传入的请求。
要运行该服务,请执行以下命令:
go run ./cmd/watermark/watermark.go
此命令将启动水印服务,HTTP 和 gRPC 服务器均启动并运行,准备处理请求。
服务器已在本地启动。现在,您可以使用 Postman 或执行 curl 命令与其中一个可用端点进行交互来测试它。以下是示例:
➜ ~ curl http://localhost:8081/healthz
{"status":200}
此命令检查服务的状态,它应该返回“状态”为 200 的响应。
我们已经有效地开发了一项服务并执行了其端点。
此外:
我始终致力于确保项目全面,涵盖所有与维护相关的必要方面。这包括创建结构良好的 README 文件、设置正确的 .gitignore 和 .dockerignore 文件、编写 Makefile、Dockerfile、golang-ci-lint 配置文件以及配置 CI/CD 流水线等等。
对于这三个服务,我创建了位于 /images/ 路径下的不同 Dockerfile。我们使用一个多阶段 Dockerfile来创建并运行服务的二进制文件。这包括将相关代码目录复制到 Docker 镜像中,一步构建镜像,然后在同一个文件中创建新镜像,同时从上一个镜像中传输二进制文件。其他服务也创建了类似的 Dockerfile。在每个 Dockerfile 中,CMD
设置为go run watermark
,作为容器的入口点。
为了简化流程,我还构建了一个 Makefile,其中包含两个主要目标:build-image
和build-push
。前者用于构建 Docker 镜像,后者用于将其推送到存储库。
注意:鉴于本博客篇幅有限,我会尽量简洁。文章开头提供的代码仓库涵盖了与服务相关的大部分关键概念。我将继续致力于增强功能和新增功能,并持续提交代码。
现在,让我们深入研究部署过程:
我将演示如何使用 Kubernetes 等容器编排工具部署所有这些服务。我假设您已经具备 Kubernetes 的使用经验,至少是初学者水平。
在 deploy 目录中,您可以创建一个包含三个容器的示例部署auth
: 、watermark
和database
。由于每个容器的入口点命令已在 Dockerfile 中定义,因此无需在部署中指定其他参数或命令。
此外,您还需要一个服务来路由来自其他负载均衡器服务或 NodePort 类型服务的外部流量。为了实现这一点,您可能需要创建一个 NodePort 类型的服务来暴露watermark-service
并使其保持运行。
另一个关键且有趣的方面是 API 网关的部署。设置 API 网关需要对云提供商的堆栈有充分的了解。就我而言,我利用 Azure 堆栈,使用 Azure 平台中名为“API-Management”的资源部署了一个 API 网关。
最后,唯一未完成的任务是建立强大的CI/CD 设置,这是项目开发阶段结束后最关键的环节之一。虽然我很想更深入地探讨这些与部署相关的主题,但这超出了我本篇博客的讨论范围。或许,我会在另一篇博文中探讨这些主题。
继续阅读 https://dev.to/nikl/how-to-build-a-containerized-microservice-in-golang-a-step-by-step-guide-with-example-use-case-5ea8与此类似,我和其他热爱开源的开发者在 Slack 上运营一个以开发者为中心的社区。我们在这里讨论各种主题,包括实现、集成、一些爆料、奇葩聊天、线上聚会,为开源做贡献,以及一切有助于开发者保持理智的事情 ;) 毕竟,知识过量也可能带来危险。
我诚邀您加入我们的免费社区(我保证没有广告,而且我打算一直保持这种状态),参与讨论,分享您的精彩经验和专业知识。您可以填写这张表格,几天后您的 Slack 邀请函就会发送到您的邮箱。我们拥有来自一些优秀公司(Atlassian、Gong、Scaler)的优秀员工,您一定不想错过与他们互动的机会。邀请表格