Custom queue makes Honeydew crash with a timeout #106
Hey @sezaru, can you give me a bit more information about what your use case is? I just want to make sure this isn't an XY problem. :) I try to steer clear of
Sure. So, I created a fork of the built-in Mnesia queue to do two things: one is to add a priority parameter to the key so the queue would handle prioritized jobs before others, and I also changed the Mnesia calls to be

The main issue here is probably the

If

Or maybe some way to allow the user to specify the timeout he desires; this is probably the best solution IMO, but I didn't see an easy way to do that in every part of the code that contains a

config :honeydew, timeout: 10_000

For completeness, I'm pasting my custom Mnesia queue below (note that I didn't add the

defmodule Notification.Queues.PriorityMnesia do
alias Notification.Queues.PriorityMnesia.{Tables, State, WrappedJob}
alias Honeydew.Queue
require Logger
@behaviour Queue
@poll_interval 1_000
@impl Queue
defdelegate validate_args!(opts), to: Queue.Mnesia
@impl Queue
def init(queue_name, table_opts) do
Tables.all(queue_name)
|> Enum.each(fn {name, opts} ->
opts = Keyword.merge(table_opts, opts)
create_table!(name, opts)
end)
state = State.new!(queue_name)
reset_after_crash(state)
time_warp_mode_warning()
poll()
{:ok, state}
end
@impl Queue
def enqueue(job, %{table: table, counter_table: counter_table} = state) do
id = :mnesia.dirty_update_counter(counter_table, :counter, 1)
record = WrappedJob.new_record(job, id)
:mnesia.activity(:sync_transaction, fn -> :mnesia.write(table, record, :write) end)
{state, WrappedJob.job(record)}
end
@impl Queue
def reserve(%{table: table} = state) do
match_spec = WrappedJob.Helper.reserve_match_spec()
:mnesia.activity(:sync_transaction, fn -> :mnesia.select(table, match_spec, 1, :read) end)
|> case do
:"$end_of_table" ->
{:empty, state}
{[record], _} ->
move_to_in_progress_table(record, state)
{WrappedJob.job(record), state}
end
end
@impl Queue
def ack(%{private: id}, state) do
%{in_progress_table: in_progress_table} = state
pattern = WrappedJob.Helper.id_pattern(id)
:mnesia.activity(:sync_transaction, fn ->
[wrapped_job] = :mnesia.match_object(in_progress_table, pattern, :read)
:mnesia.delete_object(in_progress_table, wrapped_job, :write)
end)
state
end
@impl Queue
def nack(job, state) do
%{private: id, failure_private: failure_private, delay_secs: delay_secs} = job
move_to_pending_table(id, %{failure_private: failure_private, delay_secs: delay_secs}, state)
state
end
@impl Queue
def status(%{table: table, in_progress_table: in_progress_table}) do
info = %{
table => :mnesia.table_info(table, :all),
in_progress_table => :mnesia.table_info(in_progress_table, :all)
}
%{mnesia: info, count: info[table][:size], in_progress: info[in_progress_table][:size]}
end
@impl Queue
def filter(%{table: table}, map) when is_map(map) do
:mnesia.activity(:sync_transaction, fn ->
pattern = WrappedJob.Helper.filter_pattern(map)
table |> :mnesia.match_object(pattern, :read) |> Enum.map(&WrappedJob.job/1)
end)
end
def filter(%{table: table}, fun) when is_function(fun) do
:mnesia.activity(:sync_transaction, fn ->
:mnesia.foldl(
fn record, list ->
job = WrappedJob.job(record)
case fun.(job) do
true -> [job | list]
false -> list
end
end,
[],
table
)
end)
end
@impl Queue
def cancel(%{private: id}, state) do
%{table: table, in_progress_table: in_progress_table} = state
pattern = WrappedJob.Helper.id_pattern(id)
reply =
:mnesia.activity(:sync_transaction, fn ->
table
|> :mnesia.match_object(pattern, :read)
|> case do
[wrapped_job] ->
:mnesia.delete_object(table, wrapped_job, :write)
[] ->
in_progress_table
|> :mnesia.match_object(pattern, :read)
|> case do
[_] -> {:error, :in_progress}
[] -> {:error, :not_found}
end
end
end)
{reply, state}
end
@impl Queue
def handle_info(:__poll__, queue_state) do
poll()
{:noreply, Queue.dispatch(queue_state)}
end
defp create_table!(name, opts) do
{:atomic, :ok} =
with {:aborted, {:already_exists, ^name}} <- :mnesia.create_table(name, opts) do
{:atomic, :ok}
end
:ok = :mnesia.wait_for_tables([name], 15_000)
end
defp poll, do: {:ok, _} = :timer.send_after(@poll_interval, :__poll__)
defp reset_after_crash(%{in_progress_table: in_progress_table} = state) do
:mnesia.activity(:sync_transaction, fn -> :mnesia.first(in_progress_table) end)
|> case do
:"$end_of_table" ->
:ok
key ->
key |> WrappedJob.id_from_key() |> move_to_pending_table(%{}, state)
reset_after_crash(state)
end
end
defp move_to_in_progress_table(record, state) do
%{table: table, in_progress_table: in_progress_table} = state
:mnesia.activity(:sync_transaction, fn ->
:mnesia.delete(table, WrappedJob.key(record), :write)
:mnesia.write(in_progress_table, record, :write)
end)
end
defp move_to_pending_table(id, updates, state) do
%{table: table, in_progress_table: in_progress_table} = state
pattern = WrappedJob.Helper.id_pattern(id)
:mnesia.activity(:sync_transaction, fn ->
record = in_progress_table |> :mnesia.match_object(pattern, :read) |> List.first()
:mnesia.delete_object(in_progress_table, record, :write)
record = record |> WrappedJob.job() |> struct(updates) |> WrappedJob.new_record(id)
:mnesia.write(table, record, :write)
end)
end
defp time_warp_mode_warning do
if :erlang.system_info(:time_warp_mode) != :multi_time_warp do
Logger.warn(
"[Honeydew] It's recommended to use the Mnesia queue with the 'multi_time_warp' time correction mode to minimize monotonic clock freezes, see http://erlang.org/doc/apps/erts/time_correction.html#multi-time-warp-mode."
)
end
end
end

defmodule Notification.Queues.PriorityMnesia.State do
alias Notification.Queues.PriorityMnesia.Tables
use TypedStruct
typedstruct enforce: true do
field :table, reference
field :counter_table, reference
field :in_progress_table, reference
end
@spec new!(atom) :: t
def new!(queue_name) do
struct!(__MODULE__,
table: Tables.table_name(queue_name),
counter_table: Tables.counter_table_name(queue_name),
in_progress_table: Tables.in_progress_table_name(queue_name)
)
end
end

defmodule Notification.Queues.PriorityMnesia.Tables do
@default attributes: [:key, :job], record_name: :wrapped_job
def all(queue_name) do
%{
table_name(queue_name) => Keyword.merge(@default, type: :ordered_set),
in_progress_table_name(queue_name) => Keyword.merge(@default, type: :set),
counter_table_name(queue_name) => [attributes: [:type, :id], type: :set]
}
end
def table_name(queue_name),
do: ["honeydew", inspect(queue_name)] |> Enum.join("_") |> String.to_atom()
def in_progress_table_name(queue_name),
do: ["honeydew", inspect(queue_name), "in_progress"] |> Enum.join("_") |> String.to_atom()
def counter_table_name(queue_name),
do: ["honeydew", inspect(queue_name), "counter"] |> Enum.join("_") |> String.to_atom()
end

defmodule Notification.Queues.PriorityMnesia.WrappedJob do
alias Notification.Queues.PriorityMnesia.WrappedJob.{Priorities, Helper}
alias Honeydew.Job
use TypedStruct
typedstruct enforce: true do
field :priority, integer | :_
field :run_at, integer | :_
field :id, integer | :_
field :job, Job.t() | :_
end
def new(job, id) do
%{delay_secs: delay_secs} = job
run_at = Helper.now() + delay_secs
priority = Priorities.priority!(job)
job = %{job | private: id}
new(priority, run_at, id, job)
end
def new(priority, run_at, id, job),
do: struct!(__MODULE__, priority: priority, run_at: run_at, id: id, job: job)
def new_record(job, id), do: new(job, id) |> to_record()
def new_record(priority, run_at, id, job), do: new(priority, run_at, id, job) |> to_record()
def from_record({:wrapped_job, {priority, run_at, id}, job}),
do: new(priority, run_at, id, job)
def to_record(%{priority: priority, run_at: run_at, id: id, job: job}),
do: {:wrapped_job, key(priority, run_at, id), job}
def key(priority, run_at, id), do: {priority, run_at, id}
def key({:wrapped_job, key, _}), do: key
def job({:wrapped_job, _, job}), do: job
def id_from_key({_, _, id}), do: id
end

defmodule Notification.Queues.PriorityMnesia.WrappedJob.Helper do
alias Notification.Queues.PriorityMnesia.WrappedJob
alias Honeydew.Job
@job_filter %Job{}
|> Map.from_struct()
|> Enum.map(fn {k, _} -> {k, :_} end)
|> (&struct(Job, &1)).()
def id_pattern(id), do: WrappedJob.new_record(:_, :_, id, :_)
def filter_pattern(map) do
job = struct(@job_filter, map)
WrappedJob.new_record(:_, :_, :_, job)
end
def reserve_match_spec do
pattern = WrappedJob.new_record(:_, :"$1", :_, :_)
[{pattern, [{:"=<", :"$1", now()}], [:"$_"]}]
end
def now, do: :erlang.monotonic_time(:second)
end

PS. Totally off-topic, but I noticed that in the built-in Mnesia queue you use
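For reference, the priority scheme above works because Mnesia `ordered_set` tables iterate keys in Erlang term order, so a `{priority, run_at, id}` key naturally sorts by priority first. A minimal sketch of that ordering, using `:ets` (which shares `ordered_set` semantics with Mnesia); the table name and job atoms are made up for illustration:

```elixir
# Sketch: why a {priority, run_at, id} key yields priority scheduling.
# ordered_set tables iterate keys in Erlang term order, so lower
# priority numbers come first, with run_at breaking ties.
table = :ets.new(:demo_queue, [:ordered_set])

:ets.insert(table, {{5, 100, 1}, :low_priority_job})
:ets.insert(table, {{1, 300, 2}, :high_priority_but_later})
:ets.insert(table, {{1, 100, 3}, :high_priority_early})

first_key = :ets.first(table)
IO.inspect(first_key)
# {1, 100, 3} — priority 1 sorts before 5, then the earlier run_at wins
```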
Are you running a multi-node Mnesia cluster? I ask because ensuring that a job is written to disk on a single-node cluster isn't a lot of assurance that you'll have continuity of service if that disk dies. I'm guessing that you're using a single node, since Honeydew will automatically use the

If that's the case, I believe that

I'd really prefer not to mess with the messaging timeouts; five seconds is a really long time as it is. If you're waiting five seconds on a single job write, then you're probably doing so much traffic that you're going to be backing up the queue's mailbox into memory. Having that backpressure is really important: it prevents overloads from becoming systemic catastrophes. How big are your job parameters? Are you enqueuing very large binaries?

At the moment, the Mnesia queue is only intended to be run in a single process throughout the entire cluster, so that usage of

I quite like your priority implementation, I'll have to steal that at some point. :)
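The mailbox point is easy to see directly: a process that handles each message slowly accumulates the rest in its message queue, in memory. A small illustration (the `SlowServer` module is hypothetical, just a stand-in for a slow queue process):

```elixir
# Illustration of a queue process backing up its mailbox under load.
defmodule SlowServer do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_cast(:work, state) do
    Process.sleep(50)  # simulate a slow enqueue/write
    {:noreply, state}
  end
end

{:ok, pid} = SlowServer.start_link()
for _ <- 1..100, do: GenServer.cast(pid, :work)

# The unprocessed messages pile up in the server's mailbox, in memory.
{:message_queue_len, backlog} = Process.info(pid, :message_queue_len)
IO.puts("mailbox backlog: #{backlog}")
```

Synchronous calls with a timeout cap how far ahead of the queue a producer can get, which is the backpressure being described.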
Sorry for taking a while to reply.
No, I'm running just a single node for mnesia.
Oh! I didn't know that! I guessed that the dirty functions didn't guarantee disk write. Nice!
Nah, the jobs are very small: it's a job that sends notifications to my app via FCM, so the payload always needs to be under 4 KB anyway. I think the issue is that other parts of the system are loading and thrashing the disk so much that sometimes everything hangs for a while (my guess is that I fill my SSD's cache, so it needs to flush to disk, which gives very bad disk performance for a time).

The bigger issue I see is that I will receive the timeout error, but the call will still finish correctly. For my case, this means the job will be added to the queue, I will receive a timeout, and the system will retry adding the job, creating a duplicate. Since my jobs are notifications to users of my app, they would receive duplicated notifications, which is bad. That's why, for my case, I prefer to block and wait rather than simply time out without knowing whether the message went through.
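This timeout-versus-completion race is easy to reproduce: `GenServer.call` gives up after its timeout and exits the caller, but the server keeps running the handler, so the write still lands. A small sketch (the `SlowQueue` module is hypothetical):

```elixir
# Sketch: a caller's timeout does not abort the server's work, which is
# why a timed-out enqueue can still insert the job, and a retry then
# duplicates it.
defmodule SlowQueue do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  @impl true
  def init(jobs), do: {:ok, jobs}

  @impl true
  def handle_call({:enqueue, job}, _from, jobs) do
    Process.sleep(200)  # slower than the caller is willing to wait
    {:reply, :ok, [job | jobs]}
  end

  @impl true
  def handle_call(:jobs, _from, jobs), do: {:reply, jobs, jobs}
end

{:ok, pid} = SlowQueue.start_link()

# The call times out after 50 ms and exits the caller...
result =
  try do
    GenServer.call(pid, {:enqueue, :notification}, 50)
  catch
    :exit, _ -> :timed_out
  end

Process.sleep(300)  # ...but the server finishes the insert anyway.
IO.inspect({result, GenServer.call(pid, :jobs)})
# {:timed_out, [:notification]}
```

The caller cannot tell a lost write from a slow one, which is exactly the duplicate-notification hazard described above.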
haha thanks.
Hello, I created a custom Mnesia queue that does some things specific to my use case.

The issue is that my queue is slower than the built-in Mnesia queue, so sometimes it takes more than 5 seconds to enqueue a new job, making

GenServer.call(queue_process, {:enqueue, job})

time out and crash. The obvious solution is to bump the timeout value, but there is no way to configure that inside Honeydew.

I tried creating a fork to see if there is an easy way to pass the timeout value to all GenServer calls, but there doesn't seem to be an easy way to do that, since these functions are called in so many places.

For me, an easy fix would be to simply change the default of all the calls to use :infinity as the timeout, since it is pretty safe and only creates a problem if you have a deadlock, but I'm not sure if that is OK for you.

Do you have any suggestions?