Skip to content

Commit

Permalink
Merge pull request #2 from maxohq/feat/reconnect
Browse files Browse the repository at this point in the history
Feat: graceful reconnection handling
  • Loading branch information
mindreframer authored Nov 12, 2023
2 parents 5b6f024 + f4a9be2 commit 6e3df7d
Show file tree
Hide file tree
Showing 16 changed files with 416 additions and 92 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ surrealix-*.tar

# SurrealDB shell history
history.txt

# SurrealDB data
data.db/
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## v0.1.5 (2023-11-12)

- graceful reconnection handing
- ability to register on-connect callbacks
- ability to configure max backoff / step duration for connection retries

## v0.1.4 (2023-11-11)

- much simpler handling for live query callbacks (https://github.com/maxohq/surrealix/commit/c87fe9b3853d090cb622a2478595b99a213d7fa9)
Expand Down
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,37 @@ end)
Surrealix.all_live_queries(pid)
```

## Handling reconnection

To properly deal with connection drops, provide an `on_auth`-callback when starting a Surrealix Socket. `on_auth` callbacks should include logic to authenticate the connection and select a namespace / database.

This callback is called in a non-blocking fashion, so it is important to wait until the `on_auth`-callback is finished. This is done via `Surrealix.wait_until_auth_ready(pid)` function, that checks auth status via busy-waiting.

Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function are registed on SocketState and will be re-established after a successful reconnection.

```elixir
{:ok, pid} =
Surrealix.start(
on_auth: fn pid, _state ->
IO.puts("PID: #{inspect(pid)}")
Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin)
Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use)
end
)

# blocks until the `on_auth` callback is executed
Surrealix.wait_until_auth_ready(pid)

# now we can execute queries, that require auth
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)

Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)
```

## Telemetry
Currently library publishes only 3 events:
```elixir
Expand All @@ -70,8 +101,9 @@ Surrealix.Telemetry.Logger.setup()

```elixir
## in config.exs / runtime.exs file
# default 5000
config :surrealix, timeout: :infinity
config :surrealix, backoff_max: 2000
config :surrealix, backoff_step: 50
config :surrealix, timeout: :infinity # default 5000
config :surrealix, :conn,
hostname: "0.0.0.0",
port: 8000
Expand Down
3 changes: 1 addition & 2 deletions bin/sur-server.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,4 @@ surreal start \
--allow-funcs \
--allow-net \
--bind 0.0.0.0:8000 \
memory
## file:mydatabase.db
file:data.db
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ config :logger, :console,

level: :warning

config :surrealix, :timeout, 5000

config :surrealix, :conn,
hostname: "0.0.0.0",
port: 8000
14 changes: 6 additions & 8 deletions gen/src/ApiGenerator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class ApiGenerator extends GenBase {
Show all currently registered live queries (SQL)
"""
def all_live_queries(pid) do
:sys.get_state(pid) |> SocketState.all_lq()
:sys.get_state(pid) |> SocketState.all_live_queries()
end
@doc """
Expand All @@ -88,16 +88,14 @@ export class ApiGenerator extends GenBase {
Params:
sql: string
vars: map with variables to interpolate into SQL
callback: fn (event, data, config)
callback: fn (data, live_query_id)
"""
@spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok
@spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok
def live_query(pid, sql, vars \\\\ %{}, callback) do
with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)},
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
event = [:live_query, lq_id]
:ok = Surrealix.Dispatch.attach("#{lq_id}_main", event, callback)
:ok = WebSockex.cast(pid, {:register_lq, sql, lq_id})
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
:ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback})
{:ok, res}
else
{:sql_live_check, false} -> {:error, "Not a live query: \`#{sql}\`!"}
Expand Down
24 changes: 24 additions & 0 deletions lib/sand.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Sand do
def run do
{:ok, pid} =
Surrealix.start(
on_auth: fn pid, _state ->
IO.puts("PID: #{inspect(pid)}")
Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin)
Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use)
end
)

# blocks until the `on_auth` callback is executed
Surrealix.wait_until_auth_ready(pid)

# now we can execute normal "CRUD" queries
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)

Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)
end
end
3 changes: 3 additions & 0 deletions lib/surrealix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ defmodule Surrealix do
Show all currently registered live queries (SQL)
"""
defdelegate all_live_queries(pid), to: Api
defdelegate reset_live_queries(pid), to: Socket
defdelegate set_auth_ready(pid, value), to: Socket
defdelegate wait_until_auth_ready(pid), to: Socket

@doc """
Convenience method, that combines sending an query (live_query) and registering a callback
Expand Down
8 changes: 4 additions & 4 deletions lib/surrealix/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Surrealix.Api do
Show all currently registered live queries (SQL)
"""
def all_live_queries(pid) do
:sys.get_state(pid) |> SocketState.all_lq()
:sys.get_state(pid) |> SocketState.all_live_queries()
end

@doc """
Expand All @@ -27,14 +27,14 @@ defmodule Surrealix.Api do
Params:
sql: string
vars: map with variables to interpolate into SQL
callback: fn (event, data, config)
callback: fn (data, live_query_id)
"""
@spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok
@spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok
def live_query(pid, sql, vars \\ %{}, callback) do
with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)},
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
:ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback})
:ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback})
{:ok, res}
else
{:sql_live_check, false} -> {:error, "Not a live query: `#{sql}`!"}
Expand Down
4 changes: 3 additions & 1 deletion lib/surrealix/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ defmodule Surrealix.Application do

@impl true
def start(_type, _args) do
children = []
children = [
{Surrealix.RescueProcess, []}
]

opts = [strategy: :one_for_one, name: Surrealix.Supervisor]
Supervisor.start_link(children, opts)
Expand Down
7 changes: 7 additions & 0 deletions lib/surrealix/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,12 @@ defmodule Surrealix.Config do
def base_conn_opts, do: @base_connection_opts

@timeout Application.compile_env(:surrealix, :timeout, 5000)

def default_timeout, do: @timeout
def task_opts_default, do: [timeout: @timeout]

@backoff_max Application.compile_env(:surrealix, :backoff_max, 2000)
@backoff_step Application.compile_env(:surrealix, :backoff_step, 50)
def backoff_max, do: @backoff_max
def backoff_step, do: @backoff_step
end
134 changes: 134 additions & 0 deletions lib/surrealix/patiently.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
defmodule Surrealix.Patiently do
alias Surrealix.Patiently
## https://github.com/dantswain/patiently/blob/main/lib/patiently.ex
@moduledoc false
@type iteration :: (-> term)
@type reducer :: (term -> term)
@type predicate :: (term -> boolean)
@type condition :: (-> boolean)
@type opt :: {:dwell, pos_integer} | {:max_tries, pos_integer}
@type opts :: [opt]

defmodule GaveUp do
@moduledoc """
Exception raised by Patiently when a condition fails to converge
"""

defexception message: nil
@type t :: %__MODULE__{__exception__: true}

@doc false
@spec exception({pos_integer, pos_integer}) :: t
def exception({dwell, max_tries}) do
message =
"Gave up waiting for condition after #{max_tries} " <>
"iterations waiting #{dwell} msec between tries."

%Patiently.GaveUp{message: message}
end
end

@default_dwell 100
@default_tries 10

@spec wait_for(condition, opts) :: :ok | :error
def wait_for(condition, opts \\ []) do
wait_while(condition, & &1, opts)
end

@spec wait_for!(condition, opts) :: :ok | no_return
def wait_for!(condition, opts \\ []) do
ok_or_raise(wait_for(condition, opts), opts)
end

@spec wait_for(iteration, predicate, opts) :: :ok | :error
def wait_for(iteration, condition, opts) do
wait_while(iteration, condition, opts)
end

@spec wait_for!(iteration, predicate, opts) :: :ok | no_return
def wait_for!(iteration, condition, opts) do
ok_or_raise(wait_for(iteration, condition, opts), opts)
end

@spec wait_reduce(reducer, predicate, term, opts) :: {:ok, term} | {:error, term}
def wait_reduce(reducer, predicate, acc0, opts) do
wait_reduce_loop(reducer, predicate, acc0, 0, opts)
end

@spec wait_reduce!(reducer, predicate, term, opts) :: {:ok, term} | no_return
def wait_reduce!(reducer, predicate, acc0, opts) do
ok_or_raise(wait_reduce_loop(reducer, predicate, acc0, 0, opts), opts)
end

@spec wait_flatten(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | {:error, [term]}
def wait_flatten(iteration, predicate, opts \\ [])

def wait_flatten(iteration, min_length, opts) when is_integer(min_length) and min_length > 0 do
wait_flatten(iteration, fn acc -> length(acc) >= min_length end, opts)
end

def wait_flatten(iteration, predicate, opts) when is_function(predicate, 1) do
reducer = fn acc -> List.flatten([iteration.() | acc]) end
wait_reduce_loop(reducer, predicate, [], 0, opts)
end

@spec wait_flatten!(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | no_return
def wait_flatten!(iteration, predicate_or_min_length, opts) do
ok_or_raise(wait_flatten(iteration, predicate_or_min_length, opts), opts)
end

@spec wait_for_death(pid, opts) :: :ok | :error
def wait_for_death(pid, opts \\ []) do
wait_for(fn -> !Process.alive?(pid) end, opts)
end

@spec wait_for_death!(pid, opts) :: :ok | no_return
def wait_for_death!(pid, opts \\ []) do
ok_or_raise(wait_for_death(pid, opts), opts)
end

defp ok_or_raise(:ok, _), do: :ok
defp ok_or_raise({:ok, acc}, _), do: {:ok, acc}

defp ok_or_raise(:error, opts) do
raise Patiently.GaveUp, {dwell(opts), max_tries(opts)}
end

defp ok_or_raise({:error, _}, opts) do
raise Patiently.GaveUp, {dwell(opts), max_tries(opts)}
end

defp just_status({:ok, _}), do: :ok
defp just_status({:error, _}), do: :error

defp wait_while(poller, condition, opts) do
reducer = fn acc -> [poller.() | acc] end
predicate = fn [most_recent | _] -> condition.(most_recent) end
ok_or_err = wait_reduce_loop(reducer, predicate, [], 0, opts)
just_status(ok_or_err)
end

defp wait_reduce_loop(reducer, predicate, acc, tries, opts) do
acc_out = reducer.(acc)

if predicate.(acc_out) do
{:ok, acc_out}
else
if tries >= max_tries(opts) do
{:error, acc_out}
else
:timer.sleep(dwell(opts))
wait_reduce_loop(reducer, predicate, acc_out, tries + 1, opts)
end
end
end

defp dwell(opts) do
Keyword.get(opts, :dwell, @default_dwell)
end

defp max_tries(opts) do
Keyword.get(opts, :max_tries, @default_tries)
end
end
54 changes: 54 additions & 0 deletions lib/surrealix/rescue_process.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
defmodule Surrealix.RescueProcess do
@moduledoc """
This module is responsible to execute callbacks for on_auth hooks, that happen after a connection is established.
This can not be done direcly in the `handle_connect` callback, since then it blocks the execution of the WebSockex process.
To workaround this issue, we delegate this responsibility to a `RescueProcess`, that executes those callbacks out-of-band.
Also we need to use `GenServer.cast`, so that the Socket can properly continue and not be deadlocked.
"""
use GenServer
alias Surrealix.SocketState

def start_link([]) do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end

def execute_callback({socket_pid, state = %SocketState{}}) do
GenServer.cast(__MODULE__, {:execute, socket_pid, state})
end

#################
# Callbacks
#################

@impl true
def init([]) do
{:ok, []}
end

@impl true
def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do
if(!is_nil(socket_state.on_auth)) do
# set AUTH status to false, so that the busy-waiting does not trigger
Surrealix.set_auth_ready(socket_pid, false)

# on_auth callback used to login / and pick NS / DB
socket_state.on_auth.(socket_pid, socket_state)

# now reconnect live queries
queries = SocketState.all_live_queries(socket_state)
# we need to reset the current state of live queries, since the connection was dead anyways
Surrealix.reset_live_queries(socket_pid)

# now we re-establish all live queries, that we very listening to before the connection drop.
for {sql, callback} <- queries do
Surrealix.live_query(socket_pid, sql, callback)
end

# set AUTH status to true, so `Surrealix.wait_until_auth_ready(pid)` unblocks and allows further queries on the authenticated socket.
Surrealix.set_auth_ready(socket_pid, true)
end

{:noreply, []}
end
end
Loading

0 comments on commit 6e3df7d

Please sign in to comment.