Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: graceful reconnection handling #2

Merged
merged 19 commits into from
Nov 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ surrealix-*.tar

# SurrealDB shell history
history.txt

# SurrealDB data
data.db/
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v0.1.5 (2023-11-12)

- graceful reconnection handing
- ability to register on-connect callbacks
- ability to configure max backoff / step duration for connection retries

## v0.1.4 (2023-11-11)

- much simpler handling for live query callbacks (https://github.com/maxohq/surrealix/commit/c87fe9b3853d090cb622a2478595b99a213d7fa9)
Expand Down
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ end)
Surrealix.all_live_queries(pid)
```

## Handling reconnection

To properly deal with connection drops, provide an `on_auth`-callback when starting a Surrealix Socket. `on_auth` callbacks should include logic to authenticate the connection and select a namespace / database.

This callback is called in a non-blocking fashion, so it is important to wait until the `on_auth`-callback is finished. This is done via `Surrealix.wait_until_auth_ready(pid)` function, that checks auth status via busy-waiting.

Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function are registed on SocketState and will be re-established after a successful reconnection.

```elixir
{:ok, pid} =
Surrealix.start(
on_auth: fn pid, _state ->
IO.puts("PID: #{inspect(pid)}")
Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin)
Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use)
end
)

# blocks until the `on_auth` callback is executed
Surrealix.wait_until_auth_ready(pid)

# now we can execute queries, that require auth
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)

Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)
```

## Telemetry
Currently library publishes only 3 events:
```elixir
Expand All @@ -70,8 +101,9 @@ Surrealix.Telemetry.Logger.setup()

```elixir
## in config.exs / runtime.exs file
# default 5000
config :surrealix, timeout: :infinity
config :surrealix, backoff_max: 2000
config :surrealix, backoff_step: 50
config :surrealix, timeout: :infinity # default 5000
config :surrealix, :conn,
hostname: "0.0.0.0",
port: 8000
Expand Down
3 changes: 1 addition & 2 deletions bin/sur-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,4 @@ surreal start \
--allow-funcs \
--allow-net \
--bind 0.0.0.0:8000 \
memory
## file:mydatabase.db
file:data.db
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ config :logger, :console,

level: :warning

config :surrealix, :timeout, 5000

config :surrealix, :conn,
hostname: "0.0.0.0",
port: 8000
14 changes: 6 additions & 8 deletions gen/src/ApiGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class ApiGenerator extends GenBase {
Show all currently registered live queries (SQL)
"""
def all_live_queries(pid) do
:sys.get_state(pid) |> SocketState.all_lq()
:sys.get_state(pid) |> SocketState.all_live_queries()
end

@doc """
Expand All @@ -88,16 +88,14 @@ export class ApiGenerator extends GenBase {
Params:
sql: string
vars: map with variables to interpolate into SQL
callback: fn (event, data, config)
callback: fn (data, live_query_id)
"""
@spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok
@spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok
def live_query(pid, sql, vars \\\\ %{}, callback) do
with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)},
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
event = [:live_query, lq_id]
:ok = Surrealix.Dispatch.attach("#{lq_id}_main", event, callback)
:ok = WebSockex.cast(pid, {:register_lq, sql, lq_id})
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
:ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback})
{:ok, res}
else
{:sql_live_check, false} -> {:error, "Not a live query: \`#{sql}\`!"}
Expand Down
24 changes: 24 additions & 0 deletions lib/sand.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Sand do
def run do
{:ok, pid} =
Surrealix.start(
on_auth: fn pid, _state ->
IO.puts("PID: #{inspect(pid)}")
Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin)
Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use)
end
)

# blocks until the `on_auth` callback is executed
Surrealix.wait_until_auth_ready(pid)

# now we can execute normal "CRUD" queries
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)

Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)
end
end
3 changes: 3 additions & 0 deletions lib/surrealix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ defmodule Surrealix do
Show all currently registered live queries (SQL)
"""
defdelegate all_live_queries(pid), to: Api
defdelegate reset_live_queries(pid), to: Socket
defdelegate set_auth_ready(pid, value), to: Socket
defdelegate wait_until_auth_ready(pid), to: Socket

@doc """
Convenience method, that combines sending an query (live_query) and registering a callback
Expand Down
8 changes: 4 additions & 4 deletions lib/surrealix/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Surrealix.Api do
Show all currently registered live queries (SQL)
"""
def all_live_queries(pid) do
:sys.get_state(pid) |> SocketState.all_lq()
:sys.get_state(pid) |> SocketState.all_live_queries()
end

@doc """
Expand All @@ -27,14 +27,14 @@ defmodule Surrealix.Api do
Params:
sql: string
vars: map with variables to interpolate into SQL
callback: fn (event, data, config)
callback: fn (data, live_query_id)
"""
@spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok
@spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok
def live_query(pid, sql, vars \\ %{}, callback) do
with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)},
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
:ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback})
:ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback})
{:ok, res}
else
{:sql_live_check, false} -> {:error, "Not a live query: `#{sql}`!"}
Expand Down
4 changes: 3 additions & 1 deletion lib/surrealix/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ defmodule Surrealix.Application do

@impl true
def start(_type, _args) do
children = []
children = [
{Surrealix.RescueProcess, []}
]

opts = [strategy: :one_for_one, name: Surrealix.Supervisor]
Supervisor.start_link(children, opts)
Expand Down
7 changes: 7 additions & 0 deletions lib/surrealix/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,12 @@ defmodule Surrealix.Config do
def base_conn_opts, do: @base_connection_opts

@timeout Application.compile_env(:surrealix, :timeout, 5000)

def default_timeout, do: @timeout
def task_opts_default, do: [timeout: @timeout]

@backoff_max Application.compile_env(:surrealix, :backoff_max, 2000)
@backoff_step Application.compile_env(:surrealix, :backoff_step, 50)
def backoff_max, do: @backoff_max
def backoff_step, do: @backoff_step
end
134 changes: 134 additions & 0 deletions lib/surrealix/patiently.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
defmodule Surrealix.Patiently do
alias Surrealix.Patiently
## https://github.com/dantswain/patiently/blob/main/lib/patiently.ex
@moduledoc false
@type iteration :: (-> term)
@type reducer :: (term -> term)
@type predicate :: (term -> boolean)
@type condition :: (-> boolean)
@type opt :: {:dwell, pos_integer} | {:max_tries, pos_integer}
@type opts :: [opt]

defmodule GaveUp do
@moduledoc """
Exception raised by Patiently when a condition fails to converge
"""

defexception message: nil
@type t :: %__MODULE__{__exception__: true}

@doc false
@spec exception({pos_integer, pos_integer}) :: t
def exception({dwell, max_tries}) do
message =
"Gave up waiting for condition after #{max_tries} " <>
"iterations waiting #{dwell} msec between tries."

%Patiently.GaveUp{message: message}
end
end

@default_dwell 100
@default_tries 10

@spec wait_for(condition, opts) :: :ok | :error
def wait_for(condition, opts \\ []) do
wait_while(condition, & &1, opts)
end

@spec wait_for!(condition, opts) :: :ok | no_return
def wait_for!(condition, opts \\ []) do
ok_or_raise(wait_for(condition, opts), opts)
end

@spec wait_for(iteration, predicate, opts) :: :ok | :error
def wait_for(iteration, condition, opts) do
wait_while(iteration, condition, opts)
end

@spec wait_for!(iteration, predicate, opts) :: :ok | no_return
def wait_for!(iteration, condition, opts) do
ok_or_raise(wait_for(iteration, condition, opts), opts)
end

@spec wait_reduce(reducer, predicate, term, opts) :: {:ok, term} | {:error, term}
def wait_reduce(reducer, predicate, acc0, opts) do
wait_reduce_loop(reducer, predicate, acc0, 0, opts)
end

@spec wait_reduce!(reducer, predicate, term, opts) :: {:ok, term} | no_return
def wait_reduce!(reducer, predicate, acc0, opts) do
ok_or_raise(wait_reduce_loop(reducer, predicate, acc0, 0, opts), opts)
end

@spec wait_flatten(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | {:error, [term]}
def wait_flatten(iteration, predicate, opts \\ [])

def wait_flatten(iteration, min_length, opts) when is_integer(min_length) and min_length > 0 do
wait_flatten(iteration, fn acc -> length(acc) >= min_length end, opts)
end

def wait_flatten(iteration, predicate, opts) when is_function(predicate, 1) do
reducer = fn acc -> List.flatten([iteration.() | acc]) end
wait_reduce_loop(reducer, predicate, [], 0, opts)
end

@spec wait_flatten!(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | no_return
def wait_flatten!(iteration, predicate_or_min_length, opts) do
ok_or_raise(wait_flatten(iteration, predicate_or_min_length, opts), opts)
end

@spec wait_for_death(pid, opts) :: :ok | :error
def wait_for_death(pid, opts \\ []) do
wait_for(fn -> !Process.alive?(pid) end, opts)
end

@spec wait_for_death!(pid, opts) :: :ok | no_return
def wait_for_death!(pid, opts \\ []) do
ok_or_raise(wait_for_death(pid, opts), opts)
end

defp ok_or_raise(:ok, _), do: :ok
defp ok_or_raise({:ok, acc}, _), do: {:ok, acc}

defp ok_or_raise(:error, opts) do
raise Patiently.GaveUp, {dwell(opts), max_tries(opts)}
end

defp ok_or_raise({:error, _}, opts) do
raise Patiently.GaveUp, {dwell(opts), max_tries(opts)}
end

defp just_status({:ok, _}), do: :ok
defp just_status({:error, _}), do: :error

defp wait_while(poller, condition, opts) do
reducer = fn acc -> [poller.() | acc] end
predicate = fn [most_recent | _] -> condition.(most_recent) end
ok_or_err = wait_reduce_loop(reducer, predicate, [], 0, opts)
just_status(ok_or_err)
end

defp wait_reduce_loop(reducer, predicate, acc, tries, opts) do
acc_out = reducer.(acc)

if predicate.(acc_out) do
{:ok, acc_out}
else
if tries >= max_tries(opts) do
{:error, acc_out}
else
:timer.sleep(dwell(opts))
wait_reduce_loop(reducer, predicate, acc_out, tries + 1, opts)
end
end
end

defp dwell(opts) do
Keyword.get(opts, :dwell, @default_dwell)
end

defp max_tries(opts) do
Keyword.get(opts, :max_tries, @default_tries)
end
end
54 changes: 54 additions & 0 deletions lib/surrealix/rescue_process.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Surrealix.RescueProcess do
@moduledoc """
This module is responsible to execute callbacks for on_auth hooks, that happen after a connection is established.
This can not be done direcly in the `handle_connect` callback, since then it blocks the execution of the WebSockex process.

To workaround this issue, we delegate this responsibility to a `RescueProcess`, that executes those callbacks out-of-band.
Also we need to use `GenServer.cast`, so that the Socket can properly continue and not be deadlocked.
"""
use GenServer
alias Surrealix.SocketState

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def execute_callback({socket_pid, state = %SocketState{}}) do
GenServer.cast(__MODULE__, {:execute, socket_pid, state})
end

#################
# Callbacks
#################

@impl true
def init([]) do
{:ok, []}
end

@impl true
def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do
if(!is_nil(socket_state.on_auth)) do
# set AUTH status to false, so that the busy-waiting does not trigger
Surrealix.set_auth_ready(socket_pid, false)

# on_auth callback used to login / and pick NS / DB
socket_state.on_auth.(socket_pid, socket_state)

# now reconnect live queries
queries = SocketState.all_live_queries(socket_state)
# we need to reset the current state of live queries, since the connection was dead anyways
Surrealix.reset_live_queries(socket_pid)

# now we re-establish all live queries, that we very listening to before the connection drop.
for {sql, callback} <- queries do
Surrealix.live_query(socket_pid, sql, callback)
end

# set AUTH status to true, so `Surrealix.wait_until_auth_ready(pid)` unblocks and allows further queries on the authenticated socket.
Surrealix.set_auth_ready(socket_pid, true)
end

{:noreply, []}
end
end
Loading