Task and gen_tcp

本章是Mix和OTP指南的一部分,它取决于本指南的前几章。有关更多信息,请阅读简介指南或查看边栏中的章节索引。

在本章中,我们将学习如何使用Erlang的:gen_tcp模块来处理请求。这为探索Elixir的Task模块提供了一个很好的机会。在将来的章节中,我们将扩展我们的服务器,以便它可以真正服务于这些命令。

回声服务器

我们将首先实现一个echo服务器来启动我们的TCP服务器。它将发送一个响应,并在请求中收到它收到的文本。我们将慢慢改进我们的服务器,直到它受到监督并准备处理多个连接。

一个TCP服务器,广泛的笔触,执行以下步骤:

  1. 侦听端口,直到端口可用,并获取套接字

2. 在该端口上等待客户端连接并接受它

3. 读取客户端请求并写回响应

让我们执行这些步骤。移至apps/kv_server应用程序,打开lib/kv_server.ex并添加以下功能:

require Logger

def accept(port) do
  # The options below mean:
  #
  # 1. `:binary` - receives data as binaries (instead of lists)
  # 2. `packet: :line` - receives data line by line
  # 3. `active: false` - blocks on `:gen_tcp.recv/2` until data is available
  # 4. `reuseaddr: true` - allows us to reuse the address if the listener crashes
  #
  {:ok, socket} = :gen_tcp.listen(port,
                    [:binary, packet: :line, active: false, reuseaddr: true])
  Logger.info "Accepting connections on port #{port}"
  loop_acceptor(socket)
end

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

defp serve(socket) do
  socket
  |> read_line()
  |> write_line(socket)

  serve(socket)
end

defp read_line(socket) do
  {:ok, data} = :gen_tcp.recv(socket, 0)
  data
end

defp write_line(line, socket) do
  :gen_tcp.send(socket, line)
end

我们打算通过调用来启动我们的服务器KVServer.accept(4040),其中4040是端口。第一步accept/1是监听端口,直到套接字变为可用,然后调用loop_acceptor/1loop_acceptor/1是一个接受客户端连接的循环。对于每一个接受的连接,我们调用serve/1

serve/1是另一个从套接字读取一行并将这些行写回套接字的循环。请注意,该serve/1函数使用管道运算符|>来表示这种操作流程。管道运算符评估左侧并将其结果作为第一个参数传递给右侧的函数。上面的例子:

socket |> read_line() |> write_line(socket)

相当于:

write_line(read_line(socket), socket)

read_line/1执行使用从套接字接收数据:gen_tcp.recv/2write_line/2写入使用套接字:gen_tcp.send/2

请注意,这serve/1是一个内部依次调用的无限循环loop_acceptor/1,所以尾部调用loop_acceptor/1永远不会到达并且可以避免。但是,正如我们将看到的,我们需要serve/1在一个单独的过程中执行,因此我们将很快需要尾部呼叫。

这几乎是我们实现我们的echo服务器所需要的。试一试吧!

kv_server应用程序在应用程序内启动一个IEx会话iex -S mix。在IEx内部,运行:

iex> KVServer.accept(4040)

服务器现在正在运行,您甚至会注意到控制台已被阻止。让我们用一个telnet客户端来访问我们的服务器。大多数操作系统上都有可用的客户端,它们的命令行通常是相似的:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello
is it me
is it me
you are looking for?
you are looking for?

输入“hello”,按回车键,你会收到“hello”。优秀!

我的特定telnet客户端可以通过键入ctrl + ],键入quit和按下来退出<Enter>,但您的客户端可能需要不同的步骤。

一旦退出telnet客户端,您可能会在IEx会话中看到错误:

** (MatchError) no match of right hand side value: {:error, :closed}
    (kv_server) lib/kv_server.ex:45: KVServer.read_line/1
    (kv_server) lib/kv_server.ex:37: KVServer.serve/1
    (kv_server) lib/kv_server.ex:30: KVServer.loop_acceptor/1

这是因为我们期望来自:gen_tcp.recv/2客户端的数据关闭连接。我们需要在今后的服务器修订版中更好地处理这些情况。

目前,我们需要修复一个更重要的错误:如果我们的TCP接收器崩溃会发生什么?由于没有监督,服务器死亡,我们将无法提供更多请求,因为它不会重新启动。这就是为什么我们必须将我们的服务器移到监督树上。

任务

我们了解了代理商,通用服务器和主管。它们都是为了处理多个消息或管理状态。但是当我们只需要执行一些任务时,我们使用了什么?就是这样?

任务模块完全提供了此功能。例如,它有一个start_link/3接收模块,函数和参数的函数,允许我们运行给定的函数作为监督树的一部分。

试一试吧。打开lib/kv_server/application.ex,让我们把这个start/2功能中的主管更改为以下内容:

  def start(_type, _args) do
    children = [
      {Task, fn -> KVServer.accept(4040) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

有了这个改变,我们说我们想要KVServer.accept(4040)作为一项任务运行。我们现在对端口进行了硬编码,但这可以通过几种方式进行更改,例如,在启动应用程序时通过从系统环境中读取端口:

port = String.to_integer(System.get_env("PORT") || raise "missing $PORT environment variable")
# ...
{Task, fn -> KVServer.accept(port) end}

现在服务器是监督树的一部分,它应该在我们运行应用程序时自动启动。键入mix run --no-halt终端,并再次使用telnet客户端来确保一切仍然有效:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
say you
say you
say me
say me

是的,它行得通!但是,它的规模

尝试同时连接两个telnet客户端。当你这样做时,你会注意到第二个客户端没有回应:

$ telnet 127.0.0.1 4040
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
hello?
HELLOOOOOO?

它似乎并没有行得通。这是因为我们在接受连接的同一过程中提供请求。当一个客户连接时,我们不能接受另一个客户。

任务监控器

为了让我们的服务器处理同时连接,我们需要有一个进程作为接受者工作,以产生其他进程来处理请求。一种解决方案是改变:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  serve(client)
  loop_acceptor(socket)
end

使用Task.start_link/1,这是类似的Task.start_link/3,但它接收一个匿名函数,而不是模块,函数和参数:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  Task.start_link(fn -> serve(client) end)
  loop_acceptor(socket)
end

我们直接从接受者流程开始一个链接任务。但是我们已经犯了一次这个错误。你还记得吗?

这与我们KV.Bucket.start_link/1直接从注册表中调用时所犯的错误类似。这意味着任何存储桶中的失败都会导致整个注册表失效。

上面的代码也会有相同的缺陷:如果我们将serve(client)任务链接到接受者,发送请求时的崩溃会使接受者以及所有其他连接失效。

我们通过为一位主管使用简单的问题来解决注册表问题。我们将在这里使用相同的策略,不同之处在于这种模式对于Task已经带有解决方案的任务非常常见:对于一个主管来说,开始执行临时任务是监督树的一部分。

让我们再次改变start/2,为我们的树添加一个主管:

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      {Task, fn -> KVServer.accept(4040) end}
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

我们现在将开始一个Task.Supervisor名称的过程KVServer.TaskSupervisor。请记住,由于接受者的任务取决于该主管,所以主管必须先启动。

现在我们需要更改loop_acceptor/1以用于Task.Supervisor为每个请求提供服务:

defp loop_acceptor(socket) do
  {:ok, client} = :gen_tcp.accept(socket)
  {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
  :ok = :gen_tcp.controlling_process(client, pid)
  loop_acceptor(socket)
end

您可能会注意到我们添加了一行,:ok = :gen_tcp.controlling_process(client, pid)。这使得子进程处理client套接字的“控制进程” 。如果我们没有这样做,那么接受者会在所有客户端崩溃的情况下关闭所有客户端,因为套接字将与接受它们的进程绑定(这是默认行为)。

用新的服务器启动PORT=4040 mix run --no-halt,我们现在可以打开许多并发的远程登录客户端。您还会注意到,退出客户并不会使接受者失望。优秀!

以下是完整的回声服务器实现:

defmodule KVServer do
  require Logger

  @doc """
  Starts accepting connections on the given `port`.
  """
  def accept(port) do
    {:ok, socket} = :gen_tcp.listen(port,
                      [:binary, packet: :line, active: false, reuseaddr: true])
    Logger.info "Accepting connections on port #{port}"
    loop_acceptor(socket)
  end

  defp loop_acceptor(socket) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, pid} = Task.Supervisor.start_child(KVServer.TaskSupervisor, fn -> serve(client) end)
    :ok = :gen_tcp.controlling_process(client, pid)
    loop_acceptor(socket)
  end

  defp serve(socket) do
    socket
    |> read_line()
    |> write_line(socket)

    serve(socket)
  end

  defp read_line(socket) do
    {:ok, data} = :gen_tcp.recv(socket, 0)
    data
  end

  defp write_line(line, socket) do
    :gen_tcp.send(socket, line)
  end
end

由于我们改变了主管规格,我们需要问:我们的监管策略是否仍然正确?

在这种情况下,答案是肯定的:如果接受者崩溃,则不需要使现有连接崩溃。另一方面,如果任务主管崩溃,则不需要让接受者崩溃。

但是,仍然有一个问题需要解决,那就是重启战略。默认情况下,任务的:restart值设置为:temporary,这意味着它们不会重新启动。这对于通过连接启动的连接来说是一个很好的默认设置Task.Supervisor,因为重启失败的连接没有任何意义,但对于接受方来说这是一个不好的选择。如果接受者崩溃,我们想让接受者重新开始运行。

我们可以通过定义自己的模块调用解决这一问题use Task, restart: :permanent并调用一个start_link负责重新启动任务,非常相似的功能AgentGenServer。但是,我们在这里采取不同的方法。与其他人的图书馆进行整合时,我们将无法更改其代理,任务和服务器的定义方式。相反,我们需要能够动态地定制他们的子规格。这可以通过使用Supervisor.child_spec/2我们在前几章中已经知道的函数来完成。让我们重写start/2KVServer.Application一次:

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: KVServer.TaskSupervisor},
      Supervisor.child_spec({Task, fn -> KVServer.accept(4040) end}, restart: :permanent)
    ]

    opts = [strategy: :one_for_one, name: KVServer.Supervisor]
    Supervisor.start_link(children, opts)
  end

Supervisor.child_spec/2能够从给定的模块和/或元组构建一个子规范,并且它也接受覆盖子规范的值。现在我们有一个始终运行的接受器,它在始终运行的任务管理器下启动临时任务进程。

在下一章中,我们将开始解析客户端请求并发送响应,完成我们的服务器。