GenServer

本章是Mix和OTP指南的一部分,它取决于本指南的前几章。有关更多信息,请阅读简介指南或查看边栏中的章节索引。

在前一章中,我们使用代理来表示我们的桶。在第一章中,我们指定了我们想要命名每个存储桶,以便我们可以执行以下操作:

CREATE shopping
OK

PUT shopping milk 1
OK

GET shopping milk
1
OK

在上面的会话中,我们与“shopping”桶进行了交互。

由于代理是进程,每个存储桶都有一个进程标识符(pid),但存储桶没有名称。回到过程章节,我们已经了解到,我们可以通过给他们原子名称来注册Elixir中的进程:

iex> Agent.start_link(fn -> %{} end, name: :shopping)
{:ok, #PID<0.43.0>}
iex> KV.Bucket.put(:shopping, "milk", 1)
:ok
iex> KV.Bucket.get(:shopping, "milk")
1

但是,用原子命名动态过程是一个可怕的主意!如果我们使用原子,我们需要将桶名称(通常从外部客户端接收)转换为原子,并且我们不应该将用户输入转换为原子。这是因为原子不是垃圾收集的。一旦创建了一个原子,它就永远不会被回收。从用户输入生成原子将意味着用户可以注入足够多的不同名称来耗尽我们的系统内存!

实际上,在内存不足的情况下,您很可能会达到Erlang VM的最大原子数限制,无论如何,这会使您的系统停机。

我们不会滥用内置的名称设施,而会创建我们自己的流程注册表,将存储桶名称与存储桶流程相关联。

注册表需要保证它始终处于最新状态。例如,如果其中一个存储桶进程由于错误而崩溃,则注册表必须注意到此更改并避免提供陈旧的条目。在Elixir中,我们说注册表需要监视每个桶。

我们将使用GenServer创建一个可以监视存储桶进程的注册表进程。GenServer为在Elixir和OTP中构建服务器提供了工业强度功能。

我们的第一台GenServer

GenServer分两部分实现:客户端API和服务器回调。您既可以将两个部件组合到一个模块中,也可以将它们分为客户端模块和服务器模块。客户端和服务器运行在不同的进程中,客户端在调用其功能时将消息传递给服务器。在这里,我们将为服务器回调和客户端API使用单个模块。

lib/kv/registry.ex用以下内容创建一个新文件:

defmodule KV.Registry do
  use GenServer

  ## Client API

  @doc """
  Starts the registry.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Looks up the bucket pid for `name` stored in `server`.

  Returns `{:ok, pid}` if the bucket exists, `:error` otherwise.
  """
  def lookup(server, name) do
    GenServer.call(server, {:lookup, name})
  end

  @doc """
  Ensures there is a bucket associated with the given `name` in `server`.
  """
  def create(server, name) do
    GenServer.cast(server, {:create, name})
  end

  ## Server Callbacks

  def init(:ok) do
    {:ok, %{}}
  end

  def handle_call({:lookup, name}, _from, names) do
    {:reply, Map.fetch(names, name), names}
  end

  def handle_cast({:create, name}, names) do
    if Map.has_key?(names, name) do
      {:noreply, names}
    else
      {:ok, bucket} = KV.Bucket.start_link([])
      {:noreply, Map.put(names, name, bucket)}
    end
  end
end

第一个函数是start_link/1,它启动一个新的GenServer传递三个参数:

  • 实现服务器回调的模块,在这种情况下__MODULE__意味着当前模块
  • 初始化参数,在这种情况下,原子 :ok

可用于指定诸如服务器名称之类的选项列表。现在,我们转发我们在start_link / 1上收到的选项列表,该列表默认为空列表。稍后我们将对其进行自定义。您可以向GenServer发送两种类型的请求:呼叫和强制转换。调用是同步的,并且服务器必须发回一个响应给这样的请求。强制转换是异步的,服务器不会发回响应。接下来的两个函数lookup / 2和create / 2负责将这些请求发送到服务器。在这种情况下,我们分别使用了{:lookup,name}和{:create,name}。为了在第一个参数槽中提供多个“参数”,请求通常被指定为元组。通常将被请求的动作指定为元组的第一个元素,并在其余元素中指定该动作的参数。请注意,这些请求必须与handle_call / 3或handle_cast / 2的第一个参数相匹配。那就是客户端API。在服务器端,我们可以实现各种回调以保证服务器初始化,终止和处理请求。这些回调是可选的,现在我们只实现了我们关心的回调。第一个是init / 1回调,它接收GenServer.start_link / 3给出的第二个参数并返回{:ok,state},其中状态是一张新地图。我们已经注意到GenServer API如何使客户端/服务器隔离更加明显。 start_link / 3发生在客户端,而init / 1是在服务器上运行的相应回调函数。对于调用/ 2请求,我们实现了一个handle_call / 3回调,它接收请求,我们从中接收请求的过程(_from )和当前服务器状态(名称)。 handle_call / 3回调以{:reply,reply,new_state}的格式返回一个元组。元组的第一个元素,即:reply,表示服务器应该将回复发送回客户端。第二个元素reply是发送给客户端的,而第三个元素new_state是新的服务器状态。对于cast / 2请求,我们实现一个handle_cast / 2回调,它接收请求和当前服务器状态(名称) 。 handle_cast / 2回调以{:noreply,new_state}的格式返回一个元组。请注意,在实际的应用程序中,我们可能会实现以下回调:使用同步调用创建而不是异步投射。我们正在通过这种方式来说明如何实现强制回调。还有其他元组格式handle_call / 3和handle_cast / 2回调可能会返回。还有其他的回调,比如我们可以实现的terminate / 2和code_change / 3。欢迎您访问完整的GenServer文档以了解更多有关这些内容的信息。现在,让我们编写一些测试以确保GenServer按预期工作。测试GenServer测试GenServer与测试代理没有多大区别。我们将在设置回调中产生服务器,并在整个测试中使用它。在test / kv / registry_test.exs中使用以下命令创建一个文件:

defmodule KV.RegistryTest do use ExUnit.Case, async: true setup do {:ok, registry} = start_supervised KV.Registry %{registry: registry} end test "spawns buckets", %{registry: registry} do assert KV.Registry.lookup(registry, "shopping") == :error KV.Registry.create(registry, "shopping") assert {:ok, bucket} = KV.Registry.lookup(registry, "shopping") KV.Bucket.put(bucket, "milk", 1) assert KV.Bucket.get(bucket, "milk") == 1 end

我们的测试应该立即通过!再一次,ExUnit会在每次测试后关闭注册表,因为我们使用start_supervised来启动它。 如果需要停止GenServer作为应用程序逻辑的一部分,可以使用GenServer.stop / 1函数:

:## Client API @doc """ Stops the registry. """ def stop(server) do GenServer.stop(server) end

监控我们的注册表的需求几乎完成。 剩下的唯一问题是,如果一个桶停止或崩溃,注册表可能会失效。 让我们添加一个测试给KV.RegistryTest,它揭示了这个bug:test“在出口处删除桶”,%{registry:registry} do

   KV.Registry.create(注册表,“购物”)

   {:ok,bucket} = KV.Registry.lookup(注册表,“购物”)

  Agent.stop(桶)

   断言KV.Registry.lookup(注册表,“购物”)==:错误

end

上面的测试将在最后一次断言时失败,因为即使在我们停止存储桶过程之后,存储桶名仍保留在注册表中。为了修复此错误,我们需要注册表来监视它产生的每个存储桶。 一旦我们建立了一个监视器,注册表会在每次存储桶过程结束时收到一个通知,这样我们就可以清理注册表了。我们先用监视器和iex -S混合启动一个新控制台:

:iex> {:ok, pid} = KV.Bucket.start_link([]) {:ok, #PID<0.66.0>} iex> Process.monitor(pid) #Reference<0.0.0.551> iex> Agent.stop(pid) :ok iex> flush() {:DOWN, #Reference<0.0.0.551>, :process, #PID<0.66.0>, :normal}

注意Process.monitor(pid)返回一个唯一引用,它允许我们将即将到来的消息与该监视引用进行匹配。 在我们停止代理之后,我们可以清除/ 0所有消息,并注意到:DOWN消息到达,并且监视器返回确切的引用,通知bucket进程退出原因:normal.Let重新实现服务器回调以修复错误并且 使测试通过。 首先,我们将GenServer状态修改为两个字典:一个包含名称 - > pid,另一个包含ref - >名称。 然后,我们需要监视handle_cast / 2上的存储桶并执行handle_info / 2回调来处理监视消息。 完整的服务器回调实现如下所示:

## Server callbacks def init(:ok) do names = %{} refs = %{} {:ok, {names, refs}} end def handle_call({:lookup, name}, _from, {names, _} = state) do {:reply, Map.fetch(names, name), state} end def handle_cast({:create, name}, {names, refs}) do if Map.has_key?(names, name) do {:noreply, {names, refs}} else {:ok, pid} = KV.Bucket.start_link([]) ref = Process.monitor(pid) refs = Map.put(refs, ref, name) names = Map.put(names, name, pid) {:noreply, {names, refs}} end end def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do {name, refs} = Map.pop(refs, ref) names = Map.delete(names, name) {:noreply, {names, refs}} end def handle_info(_msg, state) do {:noreply, state} end

注意到我们能够在不更改任何客户端API的情况下大幅改变服务器实现。 这是显式隔离服务器和客户端的好处之一。最后,与其他回调不同,我们为handle_info / 2定义了一个“全部捕获”子句,它丢弃任何未知消息。 为了理解为什么,让我们继续下一节:.call,cast或info?到目前为止,我们已经使用了三个回调:handle_call / 3,handle_cast / 2和handle_info / 2。 以下是我们在决定何时使用每种方法时应该考虑的事项:

  1. handle_call/3必须用于同步请求。这应该是默认选择,因为等待服务器回复是有用的反压机制。
  • 当你不关心回复时,handle_cast / 2必须用于异步请求。 演员甚至不保证服务器已收到该消息,因此应谨慎使用。 例如,我们在本章中定义的create / 2函数应该使用call / 2。 我们使用了cast / 2作为教学目的。
  • handle_info / 2必须用于服务器可能收到的所有其他消息,这些消息不是通过GenServer.call/2或GenServer.cast / 2发送的,包括使用send / 2发送的常规消息。 监控:DOWN消息就是这样一个例子。

由于任何消息(包括通过send / 2发送的消息)都会转到handle_info / 2,因此可能会有意外的消息到达服务器。 因此,如果我们没有定义catch-all子句,这些消息可能会导致我们的注册表崩溃,因为没有子句匹配。 尽管我们不需要担心handle_call / 3和handle_cast / 2的这种情况。 调用和强制转换只能通过GenServer API完成,因此一个未知消息很可能是开发人员的错误。

为了帮助开发人员记住呼叫,演员表和信息之间的区别,支持的返回值等,Benjamin Tan Wei Hao创建了一个优秀的GenServer备忘单

监视器或链接?

我们之前已经了解了流程章节中的链接。现在,随着注册表完成,您可能会想知道:我们应该在什么时候使用监视器,什么时候应该使用链接?

链接是双向的。如果你连接两个进程并且其中一个进程崩溃,另一边也会崩溃(除非它陷入退出)。监视器是单向的:只有监视过程会收到关于监视过程的通知。换句话说:当您想要链接崩溃时使用链接,并监控何时您只想知道崩溃,退出等等。

回到我们的handle_cast / 2实现中,您可以看到注册表既连接又监视存储桶:

{:ok, pid} = KV.Bucket.start_link([])
ref = Process.monitor(pid)

这是一个坏主意,因为我们不希望注册表崩溃时发生崩溃! 我们通常避免直接创建新流程,相反,我们将此职责委托给主管。 正如我们将在下一章中看到的,主管依赖链接,这解释了为什么基于链接的API(spawn_link,start_link等)在Elixir和OTP中非常流行。