在 Elixir 中实现点对点网络 - 第一部分:服务器
这篇文章最初发布在opencode.space 博客上。
过去一周左右,我一直在用 Elixir 实现一个点对点网络,这是我正在进行的一个项目的一部分。我之所以想写这个系列文章,是因为我在网上找不到太多关于这类案例的资源,其次也想记录整个过程,以供将来参考。
即使您不想实现点对点网络,这仍然是一个很好的练习,可以学习更多知识并试验 OTP 应用程序和概念。
我们将实现一个点对点网络,允许点对点之间发送文本消息,并接收相同的消息。很简单,对吧?
本系列分为三个部分:
- 在 Elixir 中实现点对点网络 - 第一部分:服务器(当前)
-
在 Elixir 中实现点对点网络 - 第二部分:客户端 -
在 Elixir 中实现点对点网络 - 第 3 部分:增强功能
在本文中,我们将介绍点对点网络服务器端的逻辑。最终,你将拥有一个可以正常工作的 TCP 服务器,它可以监听并接受连接,并回显收到的每条消息。
完成后,您将能够使用 Telnet 连接并进行测试。
创建项目
使用Mix创建一个带有该--sup
标志的新项目,以生成包含监督树和application
回调设置的 OTP 应用程序框架。为了简单起见,我将该项目命名为network
:
$ mix new network --sup
设置依赖项
对于这个项目,我们唯一需要的依赖项是 Ranch 1。更新mix.exs以包含它:
defp deps do
[
{:ranch, "~> 1.4"}
]
end
完成后,获取依赖项:
$ mix deps.get
监听连接
为了让用户连接到我们的服务器,我们必须在他们到达时监听并接受他们的请求。这时 Ranch 1库就派上用场了。
创建lib/network/server.ex:
defmodule Network.Server do
@moduledoc """
A simple TCP server.
"""
use GenServer
alias Network.Handler
require Logger
@doc """
Starts the server.
"""
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
@doc """
Initiates the listener (pool of acceptors).
"""
def init(port: port) do
opts = [{:port, port}]
{:ok, pid} = :ranch.start_listener(:network, :ranch_tcp, opts, Handler, [])
Logger.info(fn ->
"Listening for connections on port #{port}"
end)
{:ok, pid}
end
end
函数中init/1
发生了神奇的事情。我们使用该:ranch.start_listener/5
函数创建一个接受器进程池,该进程将接受传入的连接,并在连接完成后,生成一个新进程,使用指定的协议(Network.Handler
)来处理它。
需要的五个参数:ranch.start_listener/5
是:
:network
--- 标识监听器的唯一名称:ranch_tcp
--- 运输[{:port, port}]
--- 运输选项Network.Handler
--- 协议处理程序[]
--- 处理程序的选项
处理连接
因为 Ranch 1让我们将协议处理抽象到它自己的模块中 --- 这非常有用,因为它可以最大限度地降低代码复杂性 --- 这就是我们现在要做的。
创建lib/network/handler.ex:
defmodule Network.Handler do
@moduledoc """
A simple TCP protocol handler that echoes all messages received.
"""
use GenServer
require Logger
# Client
@doc """
Starts the handler with `:proc_lib.spawn_link/3`.
"""
def start_link(ref, socket, transport, _opts) do
pid = :proc_lib.spawn_link(__MODULE__, :init, [ref, socket, transport])
{:ok, pid}
end
@doc """
Initiates the handler, acknowledging the connection was accepted.
Finally it makes the existing process into a `:gen_server` process and
enters the `:gen_server` receive loop with `:gen_server.enter_loop/3`.
"""
def init(ref, socket, transport) do
peername = stringify_peername(socket)
Logger.info(fn ->
"Peer #{peername} connecting"
end)
:ok = :ranch.accept_ack(ref)
:ok = transport.setopts(socket, [{:active, true}])
:gen_server.enter_loop(__MODULE__, [], %{
socket: socket,
transport: transport,
peername: peername
})
end
# Server callbacks
def handle_info(
{:tcp, _, message},
%{socket: socket, transport: transport, peername: peername} = state
) do
Logger.info(fn ->
"Received new message from peer #{peername}: #{inspect(message)}. Echoing it back"
end)
# Sends the message back
transport.send(socket, message)
{:noreply, state}
end
def handle_info({:tcp_closed, _}, %{peername: peername} = state) do
Logger.info(fn ->
"Peer #{peername} disconnected"
end)
{:stop, :normal, state}
end
def handle_info({:tcp_error, _, reason}, %{peername: peername} = state) do
Logger.info(fn ->
"Error with peer #{peername}: #{inspect(reason)}"
end)
{:stop, :normal, state}
end
# Helpers
defp stringify_peername(socket) do
{:ok, {addr, port}} = :inet.peername(socket)
address =
addr
|> :inet_parse.ntoa()
|> to_string()
"#{address}:#{port}"
end
end
这个模块有一些非常有趣的特性。首先,你可能已经注意到,我们GenServer
通过定义的函数和回调实现了行为,尽管我们没有使用GenServer.start_link/3
函数,而是使用了:proc_lib.spawn_link/3
。
在深入了解之前,我们先来看看这个init/3
函数。乍一看,一切都很清晰:我们确认与 的连接:ranch.accept_ack/1
,将连接设置为活动状态,然后……我们进入了一个循环?
当然!我们需要循环等待来自连接的新消息,收到消息后,进行必要的处理,然后进入循环并再次等待新消息。
当我们实现行为时,GenServer
我们必须使用:gen_server.enter_loop/3
将我们的流程转变为:gen_server
流程并进入:gen_server
流程接收循环的行为。
现在回过头来说,为什么:proc_lib.spawn_link/3
?如果你了解这个GenServer
行为,你就知道必须定义一个start/3
或start_link/3
函数来启动服务器,并且一旦服务器启动,它就会调用init
回调函数。到目前为止一切顺利。
这个问题的发生是因为这种行为的运作方式。根据GenServer.start_link/3
文档:
为了确保启动过程同步,该函数直到
c:init/1
已返回才会返回。
当我们需要进入循环时,这会带来一个大问题,因为一旦进入循环,它就永远不会返回,除非发生意外并返回错误。这就是为什么我们使用:proc.spawn_link/3
,因为它不是同步地生成进程,而是异步地生成进程,这样就不会出现任何问题。
实际上,唯一可以使用的进程:gen_server.enter_loop/3
是那些使用此特定功能启动的进程。
回调handle_info/2
将接收每个 TCP 事件。这些事件可以是:
{:tcp, socket, message}
--- 客户端发送的正常消息{:tcp_error, socket, reason}
--- 连接时发生的任何错误{:tcp_closed, socket}
--- 当连接关闭时
我们还定义了一个stringify_peername/1
辅助函数,为给定的连接提供一个友好名称。它使用该:inet.peername/1
函数检索连接的地址和端口
,并返回一个由这两个值组成的字符串。
启动服务器
更新config/config.exs以包含服务器配置:
use Mix.Config
config :network, :server,
port: String.to_integer(System.get_env("PORT") || "5555")
更新lib/network/application.ex以包含Network.Server
在应用程序的监督树中:
defmodule Network.Application do
@moduledoc false
use Application
def start(_type, _args) do
# Get configuration
config = Application.get_env(:network, :server)
children = [
# Add it to supervison tree
{Network.Server, config}
]
opts = [strategy: :one_for_one, name: Network.Supervisor]
Supervisor.start_link(children, opts)
end
end
测试
打开终端并启动应用程序:
$ mix run --no-halt
00:00:00.000 [info] Accepting connections on port 5555
打开另一个终端并使用 Telnet 连接:
$ telnet 127.0.0.1 5555
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^['.
很好,我们已成功连接到服务器。在应用程序运行的终端上,我们还应该看到一条消息,告知我们:
00:00:00.000 [info] Peer 127.0.0.1:00000 connecting
现在尝试通过 telnet 会话发送任何消息,它都会被回显。在应用程序的终端上,您将看到(例如):
00:00:00.000 [info] Received new message from 127.0.0.1:00000: "Hello, opencode.space!\r\n". Echoing it back
如果您关闭运行 Telnet 的终端,我们的应用程序也会收到通知:
00:00:00.000 [info] Peer 127.0.0.1:00000 disconnected
结论
仅此而已,~153 LOC
我们就成功实现了一个 TCP 服务器,它可以回显收到的每条消息。是不是很棒?
在本系列的下一部分中,我们将介绍如何实现客户端连接到服务器以实现对等网络。
请继续关注更新!
笔记
文章来源:https://dev.to/cerqueira/implementing-a-peer-to-peer-network-in-elixir---part-1-the-server-59p6