Skip to content

Commit

Permalink
OTP 24 support
Browse files Browse the repository at this point in the history
  • Loading branch information
koudelka committed May 17, 2021
1 parent 77dabeb commit bc23e3c
Show file tree
Hide file tree
Showing 23 changed files with 207 additions and 172 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
erlang 21.3.2
elixir 1.8.1
erlang 24.0
elixir 1.12.0-rc.1-otp-24
16 changes: 8 additions & 8 deletions examples/ecto_poll_queue/mix.postgres.lock
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
%{
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm"},
"db_connection": {:hex, :db_connection, "2.0.5", "ddb2ba6761a08b2bb9ca0e7d260e8f4dd39067426d835c24491a321b7f92a4da", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm"},
"ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm"},
"ecto_sql": {:hex, :ecto_sql, "3.0.5", "7e44172b4f7aca4469f38d7f6a3da394dbf43a1bcf0ca975e958cb957becd74e", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.6", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.3.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.1", "63247d4a5ad6b9de57a0bac5d807e1c32d41e39c04b8a4156a26c63bcd8a2e49", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"telemetry": {:hex, :telemetry, "0.3.0", "099a7f3ce31e4780f971b4630a3c22ec66d22208bc090fe33a2a3a6a67754a73", [:rebar3], [], "hexpm"},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},
"db_connection": {:hex, :db_connection, "2.0.5", "ddb2ba6761a08b2bb9ca0e7d260e8f4dd39067426d835c24491a321b7f92a4da", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm", "ced0780bed50430f770b74fcde870c4a50c815124ecf9fee20d67a465966eb4f"},
"decimal": {:hex, :decimal, "1.6.0", "bfd84d90ff966e1f5d4370bdd3943432d8f65f07d3bab48001aebd7030590dcc", [:mix], [], "hexpm", "bbd124e240e3ff40f407d50fced3736049e72a73d547f69201484d3a624ab569"},
"dialyxir": {:hex, :dialyxir, "0.5.1", "b331b091720fd93e878137add264bac4f644e1ddae07a70bf7062c7862c4b952", [:mix], [], "hexpm", "6c32a70ed5d452c6650916555b1f96c79af5fc4bf286997f8b15f213de786f73"},
"ecto": {:hex, :ecto, "3.0.7", "44dda84ac6b17bbbdeb8ac5dfef08b7da253b37a453c34ab1a98de7f7e5fec7f", [:mix], [{:decimal, "~> 1.6", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:poison, "~> 2.2 or ~> 3.0", [hex: :poison, repo: "hexpm", optional: true]}], "hexpm", "8acd54c5c92c7dbe5a9e76adc22ffb4e2e76e5298989eed2068a4f04bc8e6fef"},
"ecto_sql": {:hex, :ecto_sql, "3.0.5", "7e44172b4f7aca4469f38d7f6a3da394dbf43a1bcf0ca975e958cb957becd74e", [:mix], [{:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.0.6", [hex: :ecto, repo: "hexpm", optional: false]}, {:mariaex, "~> 0.9.1", [hex: :mariaex, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.14.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.3.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5bd47a499d27084afaa3c2154cfedb478ea2fcc926ef59fa515ee089701e390"},
"postgrex": {:hex, :postgrex, "0.14.1", "63247d4a5ad6b9de57a0bac5d807e1c32d41e39c04b8a4156a26c63bcd8a2e49", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "a20f189bdd5a219c484818fde18e09ace20cd15fe630a828fde70bd6efdeb23b"},
"telemetry": {:hex, :telemetry, "0.3.0", "099a7f3ce31e4780f971b4630a3c22ec66d22208bc090fe33a2a3a6a67754a73", [:rebar3], [], "hexpm", "63d9f37d319ff331a51f6221310deb5aac8ea3dcf5e0369d689121b5e52f72d4"},
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule EctoPollQueueExampleTest do
alias Honeydew.EctoSource.State
alias Honeydew.PollQueue.State, as: PollQueueState
alias Honeydew.Queue.State, as: QueueState
alias Honeydew.Processes

@moduletag :capture_log

Expand Down Expand Up @@ -109,7 +110,7 @@ defmodule EctoPollQueueExampleTest do
assert %{queue: %{stale: 1, ready: 0}} = Honeydew.status(User.notify_queue())

User.notify_queue()
|> Honeydew.get_queue
|> Processes.get_queue()
|> send(:__reset_stale__)

assert %{queue: %{stale: 0, ready: 1}} = Honeydew.status(User.notify_queue())
Expand Down Expand Up @@ -193,15 +194,15 @@ defmodule EctoPollQueueExampleTest do
defp get_source_state(queue) do
%QueueState{private: %PollQueueState{source: {EctoSource, state}}} =
queue
|> Honeydew.get_queue()
|> Processes.get_queue()
|> :sys.get_state

state
end

defp update_source_state(queue, state_fn) do
queue
|> Honeydew.get_queue()
|> Processes.get_queue()
|> :sys.replace_state(fn %QueueState{private: %PollQueueState{source: {EctoSource, state}} = poll_queue_state} = queue_state ->
%QueueState{queue_state |
private: %PollQueueState{poll_queue_state |
Expand Down
2 changes: 1 addition & 1 deletion examples/global/global.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ end
defmodule QueueApp do
def start do
nodes = [node()]
:ok = Honeydew.start_queue({:global, :my_queue}, queue: {Honeydew.Queue.Mnesia, [nodes, [disc_copies: nodes], []]})
:ok = Honeydew.start_queue({:global, :my_queue}, queue: {Honeydew.Queue.Mnesia, [disc_copies: nodes]})
end
end

Expand Down
128 changes: 23 additions & 105 deletions lib/honeydew.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ defmodule Honeydew do

alias Honeydew.Job
alias Honeydew.JobMonitor
alias Honeydew.Worker
alias Honeydew.WorkerStarter
alias Honeydew.WorkerGroupSupervisor
alias Honeydew.Processes
alias Honeydew.Queue
alias Honeydew.{Queues, Workers}
alias Honeydew.Queues
alias Honeydew.Worker
alias Honeydew.Workers

require Logger

@type mod_or_mod_args :: module | {module, args :: term}
Expand Down Expand Up @@ -125,8 +126,10 @@ defmodule Honeydew do
"""
@spec suspend(queue_name) :: :ok
def suspend(queue) do
Processes.start_process_group_scope(queue)

queue
|> get_all_members(Queues)
|> Processes.get_queues()
|> Enum.each(&Queue.suspend/1)
end

Expand All @@ -135,13 +138,13 @@ defmodule Honeydew do
"""
@spec resume(queue_name) :: :ok
def resume(queue) do
Processes.start_process_group_scope(queue)

queue
|> get_all_members(Queues)
|> Processes.get_queues()
|> Enum.each(&Queue.resume/1)
end



@doc """
Returns the currrent status of the queue and all attached workers.
Expand All @@ -153,9 +156,11 @@ defmodule Honeydew do
@type status_opt :: {:timeout, pos_integer}
@spec status(queue_name, [status_opt]) :: map()
def status(queue, opts \\ []) do
Processes.start_process_group_scope(queue)

queue_status =
queue
|> get_queue
|> Processes.get_queue()
|> Queue.status(opts)

busy_workers =
Expand All @@ -174,7 +179,7 @@ defmodule Honeydew do

workers =
queue
|> get_all_members(Workers)
|> Processes.get_workers()
|> Enum.map(&{&1, nil})
|> Enum.into(%{})
|> Map.merge(busy_workers)
Expand Down Expand Up @@ -213,7 +218,7 @@ defmodule Honeydew do
def filter(queue, filter) do
{:ok, jobs} =
queue
|> get_queue
|> Processes.get_queue()
|> Queue.filter(filter)

jobs
Expand All @@ -231,7 +236,7 @@ defmodule Honeydew do
@spec cancel(Job.t) :: :ok | {:error, :in_progress} | {:error, :not_found}
def cancel(%Job{queue: queue} = job) do
queue
|> get_queue
|> Processes.get_queue()
|> Queue.cancel(job)
end

Expand Down Expand Up @@ -278,11 +283,7 @@ defmodule Honeydew do
@doc false
def enqueue(%Job{queue: queue} = job) do
queue
|> get_queue
|> case do
nil -> raise RuntimeError, no_queues_running_error(job)
queue -> queue
end
|> Processes.get_queue()
|> Queue.enqueue(job)
end

Expand All @@ -302,13 +303,13 @@ defmodule Honeydew do
end

@doc false
def no_queues_running_error(%Job{queue: {:global, _} = queue} = job) do
"can't enqueue job because there aren't any queue processes running for the distributed queue `#{inspect queue}, are you connected to the cluster? #{inspect job} `"
def no_queues_running_error({:global, _} = queue) do
"can't enqueue job because there aren't any queue processes running for the distributed queue `#{inspect queue}`, are you connected to the cluster?"
end

@doc false
def no_queues_running_error(%Job{queue: queue} = job) do
"can't enqueue job #{inspect job} because there aren't any queue processes running for `#{inspect queue}`"
def no_queues_running_error(queue) do
"can't enqueue job because there aren't any queue processes running for `#{inspect queue}`"
end

@deprecated "Honeydew now supervises your queue processes, please use `Honeydew.start_queue/2 instead.`"
Expand Down Expand Up @@ -419,7 +420,7 @@ defmodule Honeydew do
- `Honeydew.start_workers({:global, "my_awesome_queue"}, MyJobModule, nodes: [:clientfacing@dax, :queue@dax])`
"""
defdelegate start_workers(name, module_and_args, opts \\ []), to: Honeydew.Workers
defdelegate start_workers(name, module_and_args, opts \\ []), to: Workers

@deprecated "Honeydew now supervises your worker processes, please use `Honeydew.start_workers/3 instead.`"
def worker_spec(_queue, _module_and_args, _opts) do
Expand All @@ -442,80 +443,6 @@ defmodule Honeydew do
Worker.module_init(self())
end

@groups [Workers, Queues]

Enum.each(@groups, fn group ->
@doc false
def group(queue, unquote(group)) do
name(queue, unquote(group))
end
end)

@processes [WorkerGroupSupervisor, WorkerStarter]

Enum.each(@processes, fn process ->
@doc false
def process(queue, unquote(process)) do
name(queue, unquote(process))
end
end)


@doc false
def create_groups(queue) do
Enum.each(@groups, fn name ->
queue |> group(name) |> :pg2.create
end)
end

@doc false
def delete_groups(queue) do
Enum.each(@groups, fn name ->
queue |> group(name) |> :pg2.delete
end)
end

@doc false
def get_all_members({:global, _} = queue, name) do
queue |> group(name) |> :pg2.get_members
end

@doc false
def get_all_members(queue, name) do
get_all_local_members(queue, name)
end

# we need to know local members to shut down local components
@doc false
def get_all_local_members(queue, name) do
queue |> group(name) |> :pg2.get_local_members
end


@doc false
def get_queue(queue) do
queue
|> get_all_queues
|> case do
{:error, {:no_such_group, _queue}} -> []
queues -> queues
end
|> List.first
end

@doc false
def get_all_queues({:global, _name} = queue) do
queue
|> group(Queues)
|> :pg2.get_members
end

@doc false
def get_all_queues(queue) do
queue
|> group(Queues)
|> :pg2.get_local_members
end

@doc false
def table_name({:global, queue}) do
Expand All @@ -527,14 +454,6 @@ defmodule Honeydew do
to_string(queue)
end

defp name({:global, queue}, component) do
name([:global, queue], component)
end

defp name(queue, component) do
[component, queue] |> List.flatten |> Enum.join(".") |> String.to_atom
end

@doc false
defmacro debug(ast) do
quote do
Expand All @@ -543,5 +462,4 @@ defmodule Honeydew do
end
end
end

end
2 changes: 2 additions & 0 deletions lib/honeydew/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ defmodule Honeydew.Application do

alias Honeydew.Queues
alias Honeydew.Workers
alias Honeydew.ProcessGroupScopeSupervisor

use Application

def start(_type, _args) do
children = [
{Queues, []},
{Workers, []},
{ProcessGroupScopeSupervisor, []}
]

opts = [strategy: :one_for_one, name: Honeydew.Supervisor]
Expand Down
3 changes: 2 additions & 1 deletion lib/honeydew/failure_mode/abandon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Honeydew.FailureMode.Abandon do
require Logger
alias Honeydew.Job
alias Honeydew.Queue
alias Honeydew.Processes

@behaviour Honeydew.FailureMode

Expand All @@ -25,7 +26,7 @@ defmodule Honeydew.FailureMode.Abandon do

# tell the queue that that job can be removed.
queue
|> Honeydew.get_queue
|> Processes.get_queue()
|> Queue.ack(job)

# send the error to the awaiting process, if necessary
Expand Down
3 changes: 2 additions & 1 deletion lib/honeydew/failure_mode/move.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Honeydew.FailureMode.Move do

alias Honeydew.Job
alias Honeydew.Queue
alias Honeydew.Processes

require Logger

Expand All @@ -30,7 +31,7 @@ defmodule Honeydew.FailureMode.Move do

# tell the queue that that job can be removed.
queue
|> Honeydew.get_queue
|> Processes.get_queue()
|> Queue.ack(job)

{:ok, job} =
Expand Down
3 changes: 2 additions & 1 deletion lib/honeydew/failure_mode/retry.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Honeydew.FailureMode.Retry do
alias Honeydew.Job
alias Honeydew.Queue
alias Honeydew.Processes
alias Honeydew.FailureMode.Abandon
alias Honeydew.FailureMode.Move

Expand Down Expand Up @@ -76,7 +77,7 @@ defmodule Honeydew.FailureMode.Retry do
job = %Job{job | failure_private: private, delay_secs: delay_secs, result: {:retrying, reason}}

queue
|> Honeydew.get_queue
|> Processes.get_queue()
|> Queue.nack(job)

# send the error to the awaiting process, if necessary
Expand Down
25 changes: 25 additions & 0 deletions lib/honeydew/process_group_scope_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# Dynamic supervision of :pg scopes (one per queue).
#
defmodule Honeydew.ProcessGroupScopeSupervisor do
@moduledoc false

use DynamicSupervisor

def start_link([]) do
DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end

def init(extra_args) do
DynamicSupervisor.init(strategy: :one_for_one, extra_arguments: extra_args)
end

def start_scope(name) do
child_spec = %{
id: name,
start: {:pg, :start_link, [name]}
}

DynamicSupervisor.start_child(__MODULE__, child_spec)
end
end
Loading

0 comments on commit bc23e3c

Please sign in to comment.