From 0e8b35c4d8f8ef934f9a53712889da521676cca5 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 14:00:26 +0100 Subject: [PATCH 01/19] Fix: correct the type specs for live_query + adjust code gen --- gen/src/ApiGenerator.ts | 12 +++++------- lib/surrealix/api.ex | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/gen/src/ApiGenerator.ts b/gen/src/ApiGenerator.ts index d0e6abd..79ad620 100644 --- a/gen/src/ApiGenerator.ts +++ b/gen/src/ApiGenerator.ts @@ -88,16 +88,14 @@ export class ApiGenerator extends GenBase { Params: sql: string vars: map with variables to interpolate into SQL - callback: fn (event, data, config) + callback: fn (data, live_query_id) """ - @spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok + @spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok def live_query(pid, sql, vars \\\\ %{}, callback) do with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)}, - {:ok, res} <- query(pid, sql, vars), - %{"result" => [%{"result" => lq_id}]} <- res do - event = [:live_query, lq_id] - :ok = Surrealix.Dispatch.attach("#{lq_id}_main", event, callback) - :ok = WebSockex.cast(pid, {:register_lq, sql, lq_id}) + {:ok, res} <- query(pid, sql, vars), + %{"result" => [%{"result" => lq_id}]} <- res do + :ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback}) {:ok, res} else {:sql_live_check, false} -> {:error, "Not a live query: \`#{sql}\`!"} diff --git a/lib/surrealix/api.ex b/lib/surrealix/api.ex index cb4c09b..d51d84a 100644 --- a/lib/surrealix/api.ex +++ b/lib/surrealix/api.ex @@ -27,9 +27,9 @@ defmodule Surrealix.Api do Params: sql: string vars: map with variables to interpolate into SQL - callback: fn (event, data, config) + callback: fn (data, live_query_id) """ - @spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok + @spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok def live_query(pid, sql, vars \\ %{}, callback) do with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)}, {:ok, res} <- query(pid, sql, vars), From e969af3fc018f11c80bbde2eb14c85fcd6d1ecab Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 14:00:58 +0100 Subject: [PATCH 02/19] Chore: changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 220944b..e7dfa0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## v0.1.5 (xxx) + +- graceful reconnection handing + ## v0.1.4 (2023-11-11) - much simpler handling for live query callbacks (https://github.com/maxohq/surrealix/commit/c87fe9b3853d090cb622a2478595b99a213d7fa9) From e5c0cbbd5c3f1526d49de1e3296ca67dbf2c95a4 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 17:02:31 +0100 Subject: [PATCH 03/19] WIP: reconnection attempts --- lib/sand.ex | 39 +++++++++++++++++++++++++++++++++++ lib/surrealix/socket.ex | 24 ++++++++++++++++++++- lib/surrealix/socket_state.ex | 4 +++- 3 files changed, 65 insertions(+), 2 deletions(-) create mode 100644 lib/sand.ex diff --git a/lib/sand.ex b/lib/sand.ex new file mode 100644 index 0000000..8b00b72 --- /dev/null +++ b/lib/sand.ex @@ -0,0 +1,39 @@ +# @callback handle_connect(conn :: WebSockex.Conn.t(), state :: term) :: {:ok, new_state :: term} +# @callback handle_disconnect(connection_status_map, state :: term) :: +# {:ok, new_state} +# | {:reconnect, new_state} +# | {:reconnect, new_conn :: WebSockex.Conn.t(), new_state} +# when new_state: term +# @callback terminate(close_reason, state :: term) :: any + +defmodule Sand do + def run do + # {:ok, pid} = Surrealix.start(namespace: "test", database: "test", debug: [:trace]) + {:ok, pid} = + Surrealix.start( + namespace: "test", + database: "test", + on_connect: fn pid, _state, _conn -> + IO.puts("GOT PID: #{inspect(pid)}") + # {:ok, _} = Surrealix.signin(pid, %{user: "root", pass: "root"}) + # {:ok, _} = Surrealix.use(pid, "test", "test") + end, + async: true, + debug: [:trace] + ) + + # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + # IO.inspect({data, query_id}, label: "callback") + # end) + + # Surrealix.on_connect(pid, fn -> + # Surrealix.signin(pid, %{user: "root", pass: "root"}) + # Surrealix.use(pid, "test", "test") + # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + # IO.inspect({data, query_id}, label: "callback") + # end) + # end) + + Surrealix.stop(pid) + end +end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index db4fade..706826b 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -32,11 +32,12 @@ defmodule Surrealix.Socket do hostname = Keyword.get(opts, :hostname) port = Keyword.get(opts, :port) + on_connect = Keyword.get(opts, :on_connect) apply(WebSockex, fun_name, [ "ws://#{hostname}:#{port}/rpc", __MODULE__, - SocketState.new(), + SocketState.new(on_connect), opts ]) end @@ -52,6 +53,27 @@ defmodule Surrealix.Socket do exit(:normal) end + def handle_disconnect(connection_status_map, state) do + IO.inspect(%{status: connection_status_map}, label: "DISCONNECT") + attempt_number = connection_status_map.attempt_number + to_sleep = attempt_number * 5 + IO.puts("******** SLEEPING FOR #{to_sleep}ms...") + + Process.sleep(to_sleep) + {:reconnect, state} + end + + def handle_connect(conn, state = %SocketState{}) do + IO.inspect(%{state: state, conn: conn}, label: "CONNECT") + + if(state.on_connect) do + IO.inspect(%{pid: self(), state: state, conn: conn}, label: "***** ON_CONNECT callback") + # state.on_connect.(self(), state, conn) + end + + {:ok, state} + end + def handle_cast({:register_lq, sql, query_id, callback}, state) do state = SocketState.add_lq(state, sql, query_id, callback) {:ok, state} diff --git a/lib/surrealix/socket_state.ex b/lib/surrealix/socket_state.ex index 3e446d3..9de522b 100644 --- a/lib/surrealix/socket_state.ex +++ b/lib/surrealix/socket_state.ex @@ -12,9 +12,11 @@ defmodule Surrealix.SocketState do """ defstruct pending: %{}, lq_running: %{}, - lq_sql: MapSet.new() + lq_sql: MapSet.new(), + on_connect: nil def new(), do: %SocketState{} + def new(on_connect), do: %SocketState{on_connect: on_connect} @doc """ Register a task for a particular request ID From 4abeb732526edfa0ed646d9a87a23b9398ece32c Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 22:55:04 +0100 Subject: [PATCH 04/19] Chore: configure timeout to 5000 ms --- config/config.exs | 2 ++ lib/surrealix/config.ex | 2 ++ lib/surrealix/socket.ex | 2 +- 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/config/config.exs b/config/config.exs index 1dfbf39..9ded11e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,6 +8,8 @@ config :logger, :console, level: :warning +config :surrealix, :timeout, 5000 + config :surrealix, :conn, hostname: "0.0.0.0", port: 8000 diff --git a/lib/surrealix/config.ex b/lib/surrealix/config.ex index 587bb29..14f7534 100644 --- a/lib/surrealix/config.ex +++ b/lib/surrealix/config.ex @@ -20,5 +20,7 @@ defmodule Surrealix.Config do def base_conn_opts, do: @base_connection_opts @timeout Application.compile_env(:surrealix, :timeout, 5000) + + def default_timeout, do: @timeout def task_opts_default, do: [timeout: @timeout] end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index 706826b..0193168 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -134,7 +134,7 @@ defmodule Surrealix.Socket do WebSockex.cast(pid, {method, args, id, task}) - task_timeout = Keyword.get(opts, :timeout, :infinity) + task_timeout = Keyword.get(opts, :timeout, Config.default_timeout()) res = Task.await(task, task_timeout) Telemetry.stop(:exec_method, start_time, meta) res From 94577abfe486036bade3bedb22366db803b3ae4b Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 23:01:08 +0100 Subject: [PATCH 05/19] Feat: introduce RescueProcess to handle callbacks in non-blocking fashion --- lib/sand.ex | 8 ++++--- lib/surrealix/application.ex | 4 +++- lib/surrealix/rescue_process.ex | 40 +++++++++++++++++++++++++++++++++ lib/surrealix/socket.ex | 14 +++++------- 4 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 lib/surrealix/rescue_process.ex diff --git a/lib/sand.ex b/lib/sand.ex index 8b00b72..99a09d6 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -13,10 +13,12 @@ defmodule Sand do Surrealix.start( namespace: "test", database: "test", - on_connect: fn pid, _state, _conn -> + on_connect: fn pid, _state -> IO.puts("GOT PID: #{inspect(pid)}") - # {:ok, _} = Surrealix.signin(pid, %{user: "root", pass: "root"}) - # {:ok, _} = Surrealix.use(pid, "test", "test") + IO.puts("SIGNIN...") + Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect() + IO.puts("USE...") + Surrealix.use(pid, "test", "test") |> IO.inspect() end, async: true, debug: [:trace] diff --git a/lib/surrealix/application.ex b/lib/surrealix/application.ex index c0b830a..0117f16 100644 --- a/lib/surrealix/application.ex +++ b/lib/surrealix/application.ex @@ -5,7 +5,9 @@ defmodule Surrealix.Application do @impl true def start(_type, _args) do - children = [] + children = [ + {Surrealix.RescueProcess, []} + ] opts = [strategy: :one_for_one, name: Surrealix.Supervisor] Supervisor.start_link(children, opts) diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex new file mode 100644 index 0000000..e315455 --- /dev/null +++ b/lib/surrealix/rescue_process.ex @@ -0,0 +1,40 @@ +defmodule Surrealix.RescueProcess do + @moduledoc """ + This module is reponsible to execute callbacks for on_connect / re-connect hooks. + + Since we usually like to prepare the websocket connections by executing some further commands on it, + this blocks the websockex loop if we try it directly in the `handle_connect` callback. + + To workaround this issue, we delegate this responsibility to a `RescueProcess`, that executes + the logic to handle connection/re-connection out-of-band. Also we need to use `GenServer.cast`, so that the + Socket can properly continue and not be deadlocked. + """ + use GenServer + alias Surrealix.SocketState + + def start_link([]) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def execute_callback({socket_pid, state = %SocketState{}}) do + GenServer.cast(__MODULE__, {:execute, socket_pid, state}) + end + + # Callbacks + + @impl true + def init([]) do + {:ok, %{}} + end + + @impl true + def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, state) do + IO.puts("******* handle_cast: execute") + + if(!is_nil(socket_state.on_connect)) do + socket_state.on_connect.(socket_pid, socket_state) + end + + {:noreply, state} + end +end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index 0193168..4678862 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -29,17 +29,13 @@ defmodule Surrealix.Socket do defp generic_start(opts, fun_name) when fun_name in [:start, :start_link] do opts = Keyword.merge(Config.base_conn_opts(), opts) - + on_connect = Keyword.get(opts, :on_connect) hostname = Keyword.get(opts, :hostname) port = Keyword.get(opts, :port) - on_connect = Keyword.get(opts, :on_connect) - apply(WebSockex, fun_name, [ - "ws://#{hostname}:#{port}/rpc", - __MODULE__, - SocketState.new(on_connect), - opts - ]) + state = SocketState.new(on_connect) + url = "ws://#{hostname}:#{port}/rpc" + apply(WebSockex, fun_name, [url, __MODULE__, state, opts]) end @spec stop(pid()) :: :ok @@ -68,7 +64,7 @@ defmodule Surrealix.Socket do if(state.on_connect) do IO.inspect(%{pid: self(), state: state, conn: conn}, label: "***** ON_CONNECT callback") - # state.on_connect.(self(), state, conn) + Surrealix.RescueProcess.execute_callback({self(), state}) end {:ok, state} From e96dbec62913664783df97c06b9280445db335f5 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 23:07:11 +0100 Subject: [PATCH 06/19] Chore: configure sur-server to use persistent data --- .gitignore | 3 +++ bin/sur-server.sh | 3 +-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index a2149e1..0b5cd23 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,6 @@ surrealix-*.tar # SurrealDB shell history history.txt + +# SurrealDB data +data.db/ diff --git a/bin/sur-server.sh b/bin/sur-server.sh index adfc743..e21a4a1 100755 --- a/bin/sur-server.sh +++ b/bin/sur-server.sh @@ -14,5 +14,4 @@ surreal start \ --allow-funcs \ --allow-net \ --bind 0.0.0.0:8000 \ - memory - ## file:mydatabase.db + file:data.db From 88bc2ef35c1e10557a0766113e3bc7057f3897d3 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 23:07:17 +0100 Subject: [PATCH 07/19] WIP --- lib/sand.ex | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/lib/sand.ex b/lib/sand.ex index 99a09d6..e70a270 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -20,22 +20,15 @@ defmodule Sand do IO.puts("USE...") Surrealix.use(pid, "test", "test") |> IO.inspect() end, - async: true, - debug: [:trace] + async: true ) + Surrealix.query(pid, "select * from user") + # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> # IO.inspect({data, query_id}, label: "callback") # end) - # Surrealix.on_connect(pid, fn -> - # Surrealix.signin(pid, %{user: "root", pass: "root"}) - # Surrealix.use(pid, "test", "test") - # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> - # IO.inspect({data, query_id}, label: "callback") - # end) - # end) - Surrealix.stop(pid) end end From 9aee5179e840156e65249d6292447ee61f63e72f Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 23:18:53 +0100 Subject: [PATCH 08/19] Less noise in logs --- lib/sand.ex | 2 +- lib/surrealix/rescue_process.ex | 2 -- lib/surrealix/socket.ex | 6 +++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/lib/sand.ex b/lib/sand.ex index e70a270..8e19ef4 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -29,6 +29,6 @@ defmodule Sand do # IO.inspect({data, query_id}, label: "callback") # end) - Surrealix.stop(pid) + # Surrealix.stop(pid) end end diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index e315455..1760809 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -29,8 +29,6 @@ defmodule Surrealix.RescueProcess do @impl true def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, state) do - IO.puts("******* handle_cast: execute") - if(!is_nil(socket_state.on_connect)) do socket_state.on_connect.(socket_pid, socket_state) end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index 4678862..d9e98e7 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -7,6 +7,7 @@ defmodule Surrealix.Socket do alias Surrealix.Api alias Surrealix.Config + alias Surrealix.RescueProcess alias Surrealix.SocketState alias Surrealix.Telemetry alias Surrealix.Util @@ -52,7 +53,7 @@ defmodule Surrealix.Socket do def handle_disconnect(connection_status_map, state) do IO.inspect(%{status: connection_status_map}, label: "DISCONNECT") attempt_number = connection_status_map.attempt_number - to_sleep = attempt_number * 5 + to_sleep = attempt_number * 20 IO.puts("******** SLEEPING FOR #{to_sleep}ms...") Process.sleep(to_sleep) @@ -63,8 +64,7 @@ defmodule Surrealix.Socket do IO.inspect(%{state: state, conn: conn}, label: "CONNECT") if(state.on_connect) do - IO.inspect(%{pid: self(), state: state, conn: conn}, label: "***** ON_CONNECT callback") - Surrealix.RescueProcess.execute_callback({self(), state}) + RescueProcess.execute_callback({self(), state}) end {:ok, state} From 93ab42b19eb3314607bad31a8950a36b6f7aec12 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sat, 11 Nov 2023 23:40:46 +0100 Subject: [PATCH 09/19] Kinda seems to work... needs live query re-establishment --- lib/sand.ex | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/lib/sand.ex b/lib/sand.ex index 8e19ef4..b217cde 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -16,14 +16,22 @@ defmodule Sand do on_connect: fn pid, _state -> IO.puts("GOT PID: #{inspect(pid)}") IO.puts("SIGNIN...") - Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect() + Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) IO.puts("USE...") - Surrealix.use(pid, "test", "test") |> IO.inspect() + Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) + + Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") + end) end, async: true ) - Surrealix.query(pid, "select * from user") + # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + # IO.inspect({data, query_id}, label: "callback") + # end) + + # Surrealix.query(pid, "select * from user") # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> # IO.inspect({data, query_id}, label: "callback") From 6fdf08cada65d6609aa5d09de5f3ae405c8e9d40 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 01:27:35 +0100 Subject: [PATCH 10/19] Feat: dirty, but working code to re-establish subscriptions on reconnect --- lib/sand.ex | 18 +++-- lib/surrealix.ex | 2 + lib/surrealix/patiently.ex | 134 ++++++++++++++++++++++++++++++++ lib/surrealix/rescue_process.ex | 10 +++ lib/surrealix/socket.ex | 18 +++++ lib/surrealix/socket_state.ex | 9 +++ 6 files changed, 185 insertions(+), 6 deletions(-) create mode 100644 lib/surrealix/patiently.ex diff --git a/lib/sand.ex b/lib/sand.ex index b217cde..f4347c4 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -14,19 +14,25 @@ defmodule Sand do namespace: "test", database: "test", on_connect: fn pid, _state -> + Surrealix.set_connected(pid, false) IO.puts("GOT PID: #{inspect(pid)}") IO.puts("SIGNIN...") Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) IO.puts("USE...") Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) - - Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> - IO.inspect({data, query_id}, label: "callback") - end) - end, - async: true + end ) + Surrealix.Patiently.wait_for(fn -> :sys.get_state(pid) |> Map.get(:connected) == true end) + + Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") + end) + + Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") + end) + # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> # IO.inspect({data, query_id}, label: "callback") # end) diff --git a/lib/surrealix.ex b/lib/surrealix.ex index dab8779..2dd13f4 100644 --- a/lib/surrealix.ex +++ b/lib/surrealix.ex @@ -16,6 +16,8 @@ defmodule Surrealix do Show all currently registered live queries (SQL) """ defdelegate all_live_queries(pid), to: Api + defdelegate reset_live_queries(pid), to: Socket + defdelegate set_connected(pid, value), to: Socket @doc """ Convenience method, that combines sending an query (live_query) and registering a callback diff --git a/lib/surrealix/patiently.ex b/lib/surrealix/patiently.ex new file mode 100644 index 0000000..5e21984 --- /dev/null +++ b/lib/surrealix/patiently.ex @@ -0,0 +1,134 @@ +defmodule Surrealix.Patiently do + alias Surrealix.Patiently + ## https://github.com/dantswain/patiently/blob/main/lib/patiently.ex + @moduledoc false + @type iteration :: (-> term) + @type reducer :: (term -> term) + @type predicate :: (term -> boolean) + @type condition :: (-> boolean) + @type opt :: {:dwell, pos_integer} | {:max_tries, pos_integer} + @type opts :: [opt] + + defmodule GaveUp do + @moduledoc """ + Exception raised by Patiently when a condition fails to converge + """ + + defexception message: nil + @type t :: %__MODULE__{__exception__: true} + + @doc false + @spec exception({pos_integer, pos_integer}) :: t + def exception({dwell, max_tries}) do + message = + "Gave up waiting for condition after #{max_tries} " <> + "iterations waiting #{dwell} msec between tries." + + %Patiently.GaveUp{message: message} + end + end + + @default_dwell 100 + @default_tries 10 + + @spec wait_for(condition, opts) :: :ok | :error + def wait_for(condition, opts \\ []) do + wait_while(condition, & &1, opts) + end + + @spec wait_for!(condition, opts) :: :ok | no_return + def wait_for!(condition, opts \\ []) do + ok_or_raise(wait_for(condition, opts), opts) + end + + @spec wait_for(iteration, predicate, opts) :: :ok | :error + def wait_for(iteration, condition, opts) do + wait_while(iteration, condition, opts) + end + + @spec wait_for!(iteration, predicate, opts) :: :ok | no_return + def wait_for!(iteration, condition, opts) do + ok_or_raise(wait_for(iteration, condition, opts), opts) + end + + @spec wait_reduce(reducer, predicate, term, opts) :: {:ok, term} | {:error, term} + def wait_reduce(reducer, predicate, acc0, opts) do + wait_reduce_loop(reducer, predicate, acc0, 0, opts) + end + + @spec wait_reduce!(reducer, predicate, term, opts) :: {:ok, term} | no_return + def wait_reduce!(reducer, predicate, acc0, opts) do + ok_or_raise(wait_reduce_loop(reducer, predicate, acc0, 0, opts), opts) + end + + @spec wait_flatten(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | {:error, [term]} + def wait_flatten(iteration, predicate, opts \\ []) + + def wait_flatten(iteration, min_length, opts) when is_integer(min_length) and min_length > 0 do + wait_flatten(iteration, fn acc -> length(acc) >= min_length end, opts) + end + + def wait_flatten(iteration, predicate, opts) when is_function(predicate, 1) do + reducer = fn acc -> List.flatten([iteration.() | acc]) end + wait_reduce_loop(reducer, predicate, [], 0, opts) + end + + @spec wait_flatten!(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | no_return + def wait_flatten!(iteration, predicate_or_min_length, opts) do + ok_or_raise(wait_flatten(iteration, predicate_or_min_length, opts), opts) + end + + @spec wait_for_death(pid, opts) :: :ok | :error + def wait_for_death(pid, opts \\ []) do + wait_for(fn -> !Process.alive?(pid) end, opts) + end + + @spec wait_for_death!(pid, opts) :: :ok | no_return + def wait_for_death!(pid, opts \\ []) do + ok_or_raise(wait_for_death(pid, opts), opts) + end + + defp ok_or_raise(:ok, _), do: :ok + defp ok_or_raise({:ok, acc}, _), do: {:ok, acc} + + defp ok_or_raise(:error, opts) do + raise Patiently.GaveUp, {dwell(opts), max_tries(opts)} + end + + defp ok_or_raise({:error, _}, opts) do + raise Patiently.GaveUp, {dwell(opts), max_tries(opts)} + end + + defp just_status({:ok, _}), do: :ok + defp just_status({:error, _}), do: :error + + defp wait_while(poller, condition, opts) do + reducer = fn acc -> [poller.() | acc] end + predicate = fn [most_recent | _] -> condition.(most_recent) end + ok_or_err = wait_reduce_loop(reducer, predicate, [], 0, opts) + just_status(ok_or_err) + end + + defp wait_reduce_loop(reducer, predicate, acc, tries, opts) do + acc_out = reducer.(acc) + + if predicate.(acc_out) do + {:ok, acc_out} + else + if tries >= max_tries(opts) do + {:error, acc_out} + else + :timer.sleep(dwell(opts)) + wait_reduce_loop(reducer, predicate, acc_out, tries + 1, opts) + end + end + end + + defp dwell(opts) do + Keyword.get(opts, :dwell, @default_dwell) + end + + defp max_tries(opts) do + Keyword.get(opts, :max_tries, @default_tries) + end +end diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index 1760809..e50b64a 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -30,7 +30,17 @@ defmodule Surrealix.RescueProcess do @impl true def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, state) do if(!is_nil(socket_state.on_connect)) do + ## this is to login / and pick ns&db socket_state.on_connect.(socket_pid, socket_state) + # now reconnect Live queries + queries = SocketState.all_lq(socket_state) + Surrealix.reset_live_queries(socket_pid) |> IO.inspect(label: :reset_live_queries) + + for {sql, callback} <- queries do + Surrealix.live_query(socket_pid, sql, callback) + end + + Surrealix.set_connected(socket_pid, true) end {:noreply, state} diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index d9e98e7..365f3e5 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -45,6 +45,14 @@ defmodule Surrealix.Socket do :ok end + def set_connected(pid, value) do + WebSockex.cast(pid, {:set_connected, value}) + end + + def reset_live_queries(pid) do + WebSockex.cast(pid, {:reset_live_queries}) + end + def terminate(reason, state) do Logger.debug("Socket terminating:\n#{inspect(reason)}\n\n#{inspect(state)}\n") exit(:normal) @@ -75,6 +83,16 @@ defmodule Surrealix.Socket do {:ok, state} end + def handle_cast({:reset_live_queries}, state) do + state = SocketState.reset_lq(state) + {:ok, state} + end + + def handle_cast({:set_connected, value}, state) do + state = SocketState.set_connected(state, value) + {:ok, state} + end + def handle_cast({method, args, id, task}, state) do Logger.debug("[surrealix] [handle_cast] #{inspect(state)}") payload = Api.build_cast_payload(method, args, id) diff --git a/lib/surrealix/socket_state.ex b/lib/surrealix/socket_state.ex index 9de522b..6d71f2b 100644 --- a/lib/surrealix/socket_state.ex +++ b/lib/surrealix/socket_state.ex @@ -13,11 +13,16 @@ defmodule Surrealix.SocketState do defstruct pending: %{}, lq_running: %{}, lq_sql: MapSet.new(), + connected: false, on_connect: nil def new(), do: %SocketState{} def new(on_connect), do: %SocketState{on_connect: on_connect} + def set_connected(state = %SocketState{}, value) do + put_in(state, [:connected], value) + end + @doc """ Register a task for a particular request ID """ @@ -92,6 +97,10 @@ defmodule Surrealix.SocketState do end end + def reset_lq(state = %SocketState{}) do + Map.put(state, :lq_running, %{}) |> Map.put(:lq_sql, MapSet.new()) + end + @doc """ Currently registered LiveQueries """ From d00a19e7979e8b1ad1b0d5beeadaed99890e4eaa Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 13:42:15 +0100 Subject: [PATCH 11/19] Feat: consistent debug logging for Socket module --- lib/surrealix/socket.ex | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index 365f3e5..b675ed1 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -54,22 +54,23 @@ defmodule Surrealix.Socket do end def terminate(reason, state) do - Logger.debug("Socket terminating:\n#{inspect(reason)}\n\n#{inspect(state)}\n") + debug("terminate", reason: reason, state: state) exit(:normal) end def handle_disconnect(connection_status_map, state) do - IO.inspect(%{status: connection_status_map}, label: "DISCONNECT") attempt_number = connection_status_map.attempt_number to_sleep = attempt_number * 20 - IO.puts("******** SLEEPING FOR #{to_sleep}ms...") + debug("handle_disconnect", status: connection_status_map) + debug("handle_disconnect", "******** SLEEPING FOR #{to_sleep}ms...") Process.sleep(to_sleep) + {:reconnect, state} end def handle_connect(conn, state = %SocketState{}) do - IO.inspect(%{state: state, conn: conn}, label: "CONNECT") + debug("handle_connect", state: state, conn: conn) if(state.on_connect) do RescueProcess.execute_callback({self(), state}) @@ -94,7 +95,7 @@ defmodule Surrealix.Socket do end def handle_cast({method, args, id, task}, state) do - Logger.debug("[surrealix] [handle_cast] #{inspect(state)}") + debug("handle_cast", state) payload = Api.build_cast_payload(method, args, id) state = SocketState.add_task(state, id, task) frame = {:text, payload} @@ -153,4 +154,8 @@ defmodule Surrealix.Socket do Telemetry.stop(:exec_method, start_time, meta) res end + + defp debug(area, data) do + Logger.debug("[surrealix] [#{area}] #{inspect(data)}") + end end From 6f769d757ed8e1dd51c37d431df256c7fd44b998 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 13:43:21 +0100 Subject: [PATCH 12/19] Chore: cleanup on_connect handling --- lib/sand.ex | 28 +--------------------------- lib/surrealix/rescue_process.ex | 3 ++- 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/lib/sand.ex b/lib/sand.ex index f4347c4..b542611 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -1,24 +1,10 @@ -# @callback handle_connect(conn :: WebSockex.Conn.t(), state :: term) :: {:ok, new_state :: term} -# @callback handle_disconnect(connection_status_map, state :: term) :: -# {:ok, new_state} -# | {:reconnect, new_state} -# | {:reconnect, new_conn :: WebSockex.Conn.t(), new_state} -# when new_state: term -# @callback terminate(close_reason, state :: term) :: any - defmodule Sand do def run do - # {:ok, pid} = Surrealix.start(namespace: "test", database: "test", debug: [:trace]) {:ok, pid} = Surrealix.start( - namespace: "test", - database: "test", on_connect: fn pid, _state -> - Surrealix.set_connected(pid, false) - IO.puts("GOT PID: #{inspect(pid)}") - IO.puts("SIGNIN...") + IO.puts("PID: #{inspect(pid)}") Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) - IO.puts("USE...") Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) end ) @@ -32,17 +18,5 @@ defmodule Sand do Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id -> IO.inspect({data, query_id}, label: "callback") end) - - # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> - # IO.inspect({data, query_id}, label: "callback") - # end) - - # Surrealix.query(pid, "select * from user") - - # Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> - # IO.inspect({data, query_id}, label: "callback") - # end) - - # Surrealix.stop(pid) end end diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index e50b64a..0373bb9 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -1,6 +1,6 @@ defmodule Surrealix.RescueProcess do @moduledoc """ - This module is reponsible to execute callbacks for on_connect / re-connect hooks. + This module is reponsible to execute callbacks for on-connect / re-connect hooks. Since we usually like to prepare the websocket connections by executing some further commands on it, this blocks the websockex loop if we try it directly in the `handle_connect` callback. @@ -30,6 +30,7 @@ defmodule Surrealix.RescueProcess do @impl true def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, state) do if(!is_nil(socket_state.on_connect)) do + Surrealix.set_connected(socket_pid, false) ## this is to login / and pick ns&db socket_state.on_connect.(socket_pid, socket_state) # now reconnect Live queries From d742b91fa32554b27ba07c0da6c8d408e40c56e3 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 14:13:16 +0100 Subject: [PATCH 13/19] Feat: cleanup naming / docs / formatting --- gen/src/ApiGenerator.ts | 4 +- lib/sand.ex | 4 +- lib/surrealix.ex | 3 +- lib/surrealix/api.ex | 4 +- lib/surrealix/rescue_process.ex | 19 ++--- lib/surrealix/socket.ex | 114 ++++++++++++++-------------- lib/surrealix/socket_state.ex | 26 ++++--- lib/surrealix/socket_state_test.exs | 50 ++++++------ 8 files changed, 118 insertions(+), 106 deletions(-) diff --git a/gen/src/ApiGenerator.ts b/gen/src/ApiGenerator.ts index 79ad620..6ad53ca 100644 --- a/gen/src/ApiGenerator.ts +++ b/gen/src/ApiGenerator.ts @@ -79,7 +79,7 @@ export class ApiGenerator extends GenBase { Show all currently registered live queries (SQL) """ def all_live_queries(pid) do - :sys.get_state(pid) |> SocketState.all_lq() + :sys.get_state(pid) |> SocketState.all_live_queries() end @doc """ @@ -95,7 +95,7 @@ export class ApiGenerator extends GenBase { with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)}, {:ok, res} <- query(pid, sql, vars), %{"result" => [%{"result" => lq_id}]} <- res do - :ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback}) + :ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback}) {:ok, res} else {:sql_live_check, false} -> {:error, "Not a live query: \`#{sql}\`!"} diff --git a/lib/sand.ex b/lib/sand.ex index b542611..2379396 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -9,8 +9,10 @@ defmodule Sand do end ) - Surrealix.Patiently.wait_for(fn -> :sys.get_state(pid) |> Map.get(:connected) == true end) + # blocks until the `on_connect` callback is executed + Surrealix.wait_until_crud_ready(pid) + # now we can execute normal "CRUD" queries Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> IO.inspect({data, query_id}, label: "callback") end) diff --git a/lib/surrealix.ex b/lib/surrealix.ex index 2dd13f4..5ea5d1c 100644 --- a/lib/surrealix.ex +++ b/lib/surrealix.ex @@ -17,7 +17,8 @@ defmodule Surrealix do """ defdelegate all_live_queries(pid), to: Api defdelegate reset_live_queries(pid), to: Socket - defdelegate set_connected(pid, value), to: Socket + defdelegate set_crud_ready(pid, value), to: Socket + defdelegate wait_until_crud_ready(pid), to: Socket @doc """ Convenience method, that combines sending an query (live_query) and registering a callback diff --git a/lib/surrealix/api.ex b/lib/surrealix/api.ex index d51d84a..279acf8 100644 --- a/lib/surrealix/api.ex +++ b/lib/surrealix/api.ex @@ -18,7 +18,7 @@ defmodule Surrealix.Api do Show all currently registered live queries (SQL) """ def all_live_queries(pid) do - :sys.get_state(pid) |> SocketState.all_lq() + :sys.get_state(pid) |> SocketState.all_live_queries() end @doc """ @@ -34,7 +34,7 @@ defmodule Surrealix.Api do with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)}, {:ok, res} <- query(pid, sql, vars), %{"result" => [%{"result" => lq_id}]} <- res do - :ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback}) + :ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback}) {:ok, res} else {:sql_live_check, false} -> {:error, "Not a live query: `#{sql}`!"} diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index 0373bb9..312c9a3 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -24,26 +24,27 @@ defmodule Surrealix.RescueProcess do @impl true def init([]) do - {:ok, %{}} + {:ok, []} end @impl true - def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, state) do + def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do if(!is_nil(socket_state.on_connect)) do - Surrealix.set_connected(socket_pid, false) - ## this is to login / and pick ns&db + # mark as not ready to execute normal CRUD queries + Surrealix.set_crud_ready(socket_pid, false) + # on_connect callback used to login / and pick NS / DB socket_state.on_connect.(socket_pid, socket_state) - # now reconnect Live queries - queries = SocketState.all_lq(socket_state) - Surrealix.reset_live_queries(socket_pid) |> IO.inspect(label: :reset_live_queries) + # now reconnect live queries + queries = SocketState.all_live_queries(socket_state) + Surrealix.reset_live_queries(socket_pid) for {sql, callback} <- queries do Surrealix.live_query(socket_pid, sql, callback) end - Surrealix.set_connected(socket_pid, true) + Surrealix.set_crud_ready(socket_pid, true) end - {:noreply, state} + {:noreply, []} end end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index b675ed1..360d4f6 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -7,6 +7,7 @@ defmodule Surrealix.Socket do alias Surrealix.Api alias Surrealix.Config + alias Surrealix.Patiently alias Surrealix.RescueProcess alias Surrealix.SocketState alias Surrealix.Telemetry @@ -15,7 +16,6 @@ defmodule Surrealix.Socket do require Logger @type base_connection_opts :: Config.socket_opts() - @type on_start :: {:ok, pid} | {:error, term} @spec start(Config.socket_opts()) :: on_start() @@ -30,9 +30,9 @@ defmodule Surrealix.Socket do defp generic_start(opts, fun_name) when fun_name in [:start, :start_link] do opts = Keyword.merge(Config.base_conn_opts(), opts) - on_connect = Keyword.get(opts, :on_connect) - hostname = Keyword.get(opts, :hostname) port = Keyword.get(opts, :port) + hostname = Keyword.get(opts, :hostname) + on_connect = Keyword.get(opts, :on_connect) state = SocketState.new(on_connect) url = "ws://#{hostname}:#{port}/rpc" @@ -45,8 +45,12 @@ defmodule Surrealix.Socket do :ok end - def set_connected(pid, value) do - WebSockex.cast(pid, {:set_connected, value}) + def wait_until_crud_ready(pid) do + Patiently.wait_for(fn -> SocketState.is_crud_ready(:sys.get_state(pid)) end) + end + + def set_crud_ready(pid, value) do + WebSockex.cast(pid, {:set_crud_ready, value}) end def reset_live_queries(pid) do @@ -58,17 +62,41 @@ defmodule Surrealix.Socket do exit(:normal) end - def handle_disconnect(connection_status_map, state) do - attempt_number = connection_status_map.attempt_number - to_sleep = attempt_number * 20 + def exec_method(pid, {method, args, task}, opts \\ []) do + start_time = System.monotonic_time() + meta = %{method: method, args: args} + Telemetry.start(:exec_method, meta) + id = Util.uuid(40) - debug("handle_disconnect", status: connection_status_map) - debug("handle_disconnect", "******** SLEEPING FOR #{to_sleep}ms...") - Process.sleep(to_sleep) + task = + if !is_nil(task), + do: task, + else: + Task.async(fn -> + receive do + {:ok, msg, ^id} -> + if is_map(msg) and Map.has_key?(msg, "error"), do: {:error, msg}, else: {:ok, msg} - {:reconnect, state} + {:error, reason} -> + {:error, reason} + + e -> + {:error, "Unknown Error #{inspect(e)}"} + end + end) + + WebSockex.cast(pid, {method, args, id, task}) + + task_timeout = Keyword.get(opts, :timeout, Config.default_timeout()) + res = Task.await(task, task_timeout) + Telemetry.stop(:exec_method, start_time, meta) + res end + #################################### + # CALLBACKS + #################################### + def handle_connect(conn, state = %SocketState{}) do debug("handle_connect", state: state, conn: conn) @@ -79,27 +107,34 @@ defmodule Surrealix.Socket do {:ok, state} end - def handle_cast({:register_lq, sql, query_id, callback}, state) do - state = SocketState.add_lq(state, sql, query_id, callback) - {:ok, state} + def handle_disconnect(connection_status_map, state) do + attempt_number = connection_status_map.attempt_number + to_sleep = attempt_number * 20 + + debug("handle_disconnect", status: connection_status_map) + debug("handle_disconnect", "******** SLEEPING FOR #{to_sleep}ms...") + Process.sleep(to_sleep) + + {:reconnect, state} + end + + def handle_cast({:register_live_query, sql, query_id, callback}, state) do + {:ok, SocketState.register_live_query(state, sql, query_id, callback)} end def handle_cast({:reset_live_queries}, state) do - state = SocketState.reset_lq(state) - {:ok, state} + {:ok, SocketState.reset_live_queries(state)} end - def handle_cast({:set_connected, value}, state) do - state = SocketState.set_connected(state, value) - {:ok, state} + def handle_cast({:set_crud_ready, value}, state) do + {:ok, SocketState.set_crud_ready(state, value)} end def handle_cast({method, args, id, task}, state) do debug("handle_cast", state) payload = Api.build_cast_payload(method, args, id) - state = SocketState.add_task(state, id, task) - frame = {:text, payload} - {:reply, frame, state} + + {:reply, {:text, payload}, SocketState.register_task(state, id, task)} end def handle_frame({_type, msg}, state) do @@ -110,7 +145,7 @@ defmodule Surrealix.Socket do if is_nil(task) do # No registered task for this ID, must be a live query update lq_id = get_in(json, ["result", "id"]) - lq_item = SocketState.get_lq(state, lq_id) + lq_item = SocketState.get_live_query(state, lq_id) if(!is_nil(lq_item)) do lq_item.callback.(json, lq_id) @@ -124,37 +159,6 @@ defmodule Surrealix.Socket do {:ok, SocketState.delete_task(state, id)} end - def exec_method(pid, {method, args, task}, opts \\ []) do - start_time = System.monotonic_time() - meta = %{method: method, args: args} - Telemetry.start(:exec_method, meta) - id = Util.uuid(40) - - task = - if !is_nil(task), - do: task, - else: - Task.async(fn -> - receive do - {:ok, msg, ^id} -> - if is_map(msg) and Map.has_key?(msg, "error"), do: {:error, msg}, else: {:ok, msg} - - {:error, reason} -> - {:error, reason} - - e -> - {:error, "Unknown Error #{inspect(e)}"} - end - end) - - WebSockex.cast(pid, {method, args, id, task}) - - task_timeout = Keyword.get(opts, :timeout, Config.default_timeout()) - res = Task.await(task, task_timeout) - Telemetry.stop(:exec_method, start_time, meta) - res - end - defp debug(area, data) do Logger.debug("[surrealix] [#{area}] #{inspect(data)}") end diff --git a/lib/surrealix/socket_state.ex b/lib/surrealix/socket_state.ex index 6d71f2b..32ca645 100644 --- a/lib/surrealix/socket_state.ex +++ b/lib/surrealix/socket_state.ex @@ -8,25 +8,29 @@ defmodule Surrealix.SocketState do @doc """ - `pending`: Pending requests map: id => task - `lq_running`: live_queries map: id => %{sql: sql} - - `lq_sql`: live_queries SET with queries to register after re-connection + - `lq_sql`: live_queries MapSet with queries to register after re-connection """ defstruct pending: %{}, lq_running: %{}, lq_sql: MapSet.new(), - connected: false, + crud_ready: false, on_connect: nil def new(), do: %SocketState{} def new(on_connect), do: %SocketState{on_connect: on_connect} - def set_connected(state = %SocketState{}, value) do - put_in(state, [:connected], value) + def set_crud_ready(state = %SocketState{}, value) do + put_in(state, [:crud_ready], value) + end + + def is_crud_ready(state = %SocketState{}) do + state.crud_ready == true end @doc """ Register a task for a particular request ID """ - def add_task(state = %SocketState{}, id, task) do + def register_task(state = %SocketState{}, id, task) do put_in(state, [:pending, id], task) end @@ -48,7 +52,7 @@ defmodule Surrealix.SocketState do @doc """ Register a SQL statement for a particular LiveQuery ID """ - def add_lq(state = %SocketState{}, sql, query_id, callback) do + def register_live_query(state = %SocketState{}, sql, query_id, callback) do lq_sql = MapSet.put(state.lq_sql, {sql, callback}) item = %{sql: sql, query_id: query_id, callback: callback} @@ -60,7 +64,7 @@ defmodule Surrealix.SocketState do @doc """ Get map that describes a particular LiveQuery ID (SQL / ID / etc) """ - def get_lq(state = %SocketState{}, query_id) do + def get_live_query(state = %SocketState{}, query_id) do state |> get_in([:lq_running, query_id]) end @@ -68,7 +72,7 @@ defmodule Surrealix.SocketState do @doc """ Remove a LiveQuery by ID """ - def delete_lq_by_id(state = %SocketState{}, query_id) do + def delete_live_query_by_id(state = %SocketState{}, query_id) do {item, state} = pop_in(state, [:lq_running, query_id]) if item do @@ -84,7 +88,7 @@ defmodule Surrealix.SocketState do @doc """ Remove a LiveQuery by SQL """ - def delete_lq_by_sql(state = %SocketState{}, sql) do + def delete_live_query_by_sql(state = %SocketState{}, sql) do found = Enum.find(state.lq_running, fn {_id, value} -> Map.get(value, :sql) == sql end) if !is_nil(found) do @@ -97,14 +101,14 @@ defmodule Surrealix.SocketState do end end - def reset_lq(state = %SocketState{}) do + def reset_live_queries(state = %SocketState{}) do Map.put(state, :lq_running, %{}) |> Map.put(:lq_sql, MapSet.new()) end @doc """ Currently registered LiveQueries """ - def all_lq(state = %SocketState{}) do + def all_live_queries(state = %SocketState{}) do state.lq_sql |> MapSet.to_list() end end diff --git a/lib/surrealix/socket_state_test.exs b/lib/surrealix/socket_state_test.exs index f692917..bd37e07 100644 --- a/lib/surrealix/socket_state_test.exs +++ b/lib/surrealix/socket_state_test.exs @@ -6,9 +6,9 @@ defmodule Surrealix.SocketStateTest do def dummy_callback(), do: fn -> nil end describe "tasks" do - test "add_task / get_task / delete_task" do + test "register_task / get_task / delete_task" do state = SocketState.new() - state = state |> SocketState.add_task("111", :task1) + state = SocketState.register_task(state, "111", :task1) assert SocketState.get_task(state, "111") == :task1 state = SocketState.delete_task(state, "111") @@ -17,69 +17,69 @@ defmodule Surrealix.SocketStateTest do test "delete accepts non-existing ids" do state = SocketState.new() - state = state |> SocketState.add_task("111", :task1) + state = SocketState.register_task(state, "111", :task1) state = SocketState.delete_task(state, "1111") assert SocketState.get_task(state, "111") == :task1 end end describe "lq" do - test "add_lq / get_lq" do + test "register_live_query / get_live_query" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) - assert SocketState.get_lq(state, "11-22") == %{ + assert SocketState.get_live_query(state, "11-22") == %{ callback: cb, query_id: "11-22", sql: "select * from person" } end - test "all_lq" do + test "all_live_queries" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) - state = state |> SocketState.add_lq("select * from user", "11-23", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from user", "11-23", cb) - assert SocketState.all_lq(state) == [ + assert SocketState.all_live_queries(state) == [ {"select * from person", cb}, {"select * from user", cb} ] end - test "delete_lq_by_id" do + test "delete_live_query_by_id" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) - state = state |> SocketState.add_lq("select * from user", "11-23", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from user", "11-23", cb) - assert SocketState.all_lq(state) == [ + assert SocketState.all_live_queries(state) == [ {"select * from person", cb}, {"select * from user", cb} ] - state = state |> SocketState.delete_lq_by_id("11-23") - assert SocketState.all_lq(state) == [{"select * from person", cb}] - assert SocketState.get_lq(state, "11-23") == nil + state = SocketState.delete_live_query_by_id(state, "11-23") + assert SocketState.all_live_queries(state) == [{"select * from person", cb}] + assert SocketState.get_live_query(state, "11-23") == nil end - test "delete_lq_by_sql" do + test "delete_live_query_by_sql" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) - state = state |> SocketState.add_lq("select * from user", "11-23", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from user", "11-23", cb) - assert SocketState.all_lq(state) == [ + assert SocketState.all_live_queries(state) == [ {"select * from person", cb}, {"select * from user", cb} ] - state = SocketState.delete_lq_by_sql(state, "select * from person") - assert SocketState.all_lq(state) == [{"select * from user", cb}] - assert SocketState.get_lq(state, "11-22") == nil + state = SocketState.delete_live_query_by_sql(state, "select * from person") + assert SocketState.all_live_queries(state) == [{"select * from user", cb}] + assert SocketState.get_live_query(state, "11-22") == nil - assert SocketState.get_lq(state, "11-23") == %{ + assert SocketState.get_live_query(state, "11-23") == %{ query_id: "11-23", sql: "select * from user", callback: cb From eb58368fbd92d307bdc578044b316bc05ba7e2f1 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 14:46:58 +0100 Subject: [PATCH 14/19] Chore: make connection backoff configurable --- CHANGELOG.md | 4 +++- README.md | 5 +++-- lib/surrealix/config.ex | 5 +++++ lib/surrealix/socket.ex | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e7dfa0c..ca81269 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ -## v0.1.5 (xxx) +## v0.1.5 (2023-11-12) - graceful reconnection handing +- ability to register on-connect callbacks +- ability to configure max backoff / step duration for connection retries ## v0.1.4 (2023-11-11) diff --git a/README.md b/README.md index 0af87c7..f135813 100644 --- a/README.md +++ b/README.md @@ -70,8 +70,9 @@ Surrealix.Telemetry.Logger.setup() ```elixir ## in config.exs / runtime.exs file -# default 5000 -config :surrealix, timeout: :infinity +config :surrealix, backoff_max: 2000 +config :surrealix, backoff_step: 50 +config :surrealix, timeout: :infinity # default 5000 config :surrealix, :conn, hostname: "0.0.0.0", port: 8000 diff --git a/lib/surrealix/config.ex b/lib/surrealix/config.ex index 14f7534..e17bdbb 100644 --- a/lib/surrealix/config.ex +++ b/lib/surrealix/config.ex @@ -23,4 +23,9 @@ defmodule Surrealix.Config do def default_timeout, do: @timeout def task_opts_default, do: [timeout: @timeout] + + @backoff_max Application.compile_env(:surrealix, :backoff_max, 2000) + @backoff_step Application.compile_env(:surrealix, :backoff_step, 50) + def backoff_max, do: @backoff_max + def backoff_step, do: @backoff_step end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index 360d4f6..1ba0a63 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -109,7 +109,7 @@ defmodule Surrealix.Socket do def handle_disconnect(connection_status_map, state) do attempt_number = connection_status_map.attempt_number - to_sleep = attempt_number * 20 + to_sleep = min(Config.backoff_max(), attempt_number * Config.backoff_step()) debug("handle_disconnect", status: connection_status_map) debug("handle_disconnect", "******** SLEEPING FOR #{to_sleep}ms...") From c92df93c806df235cfd51491ce02f3822b36f6d5 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 14:57:27 +0100 Subject: [PATCH 15/19] Feat: documentation for on_connect callbacks --- README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/README.md b/README.md index f135813..2cb2c99 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,37 @@ end) Surrealix.all_live_queries(pid) ``` +## Handling reconnection + +To properly deal with connection drops, provide an `on_connect`-callback when starting the a Surrealix Socket. Usually on_connect callbacks has logic to authenticate the connection and select a namespace / database. + +This callback is called in a non-blocking fashion, so it's important to wait until the connection is ready for further use. This is done via `Surrealix.wait_until_crud_ready(pid)` function, that implements busy-waiting intil auth for connection is finished. + +Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function, are registed on the SocketState and will be re-established after a successful reconnection. + +```elixir +{:ok, pid} = + Surrealix.start( + on_connect: fn pid, _state -> + IO.puts("PID: #{inspect(pid)}") + Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) + Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) + end + ) + +# blocks until the `on_connect` callback is executed +Surrealix.wait_until_crud_ready(pid) + +# now we can execute queries, that require auth +Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") +end) + +Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") +end) +``` + ## Telemetry Currently library publishes only 3 events: ```elixir From f1831b4d94914cd179fb7b5876928e25316734ec Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 14:58:51 +0100 Subject: [PATCH 16/19] Chore: rename `on_connect` to `on_auth` callback --- README.md | 6 +++--- lib/sand.ex | 4 ++-- lib/surrealix/rescue_process.ex | 6 +++--- lib/surrealix/socket.ex | 6 +++--- lib/surrealix/socket_state.ex | 4 ++-- 5 files changed, 13 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 2cb2c99..2676e73 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ Surrealix.all_live_queries(pid) ## Handling reconnection -To properly deal with connection drops, provide an `on_connect`-callback when starting the a Surrealix Socket. Usually on_connect callbacks has logic to authenticate the connection and select a namespace / database. +To properly deal with connection drops, provide an `on_auth`-callback when starting the a Surrealix Socket. Usually on_auth callbacks has logic to authenticate the connection and select a namespace / database. This callback is called in a non-blocking fashion, so it's important to wait until the connection is ready for further use. This is done via `Surrealix.wait_until_crud_ready(pid)` function, that implements busy-waiting intil auth for connection is finished. @@ -58,14 +58,14 @@ Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` func ```elixir {:ok, pid} = Surrealix.start( - on_connect: fn pid, _state -> + on_auth: fn pid, _state -> IO.puts("PID: #{inspect(pid)}") Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) end ) -# blocks until the `on_connect` callback is executed +# blocks until the `on_auth` callback is executed Surrealix.wait_until_crud_ready(pid) # now we can execute queries, that require auth diff --git a/lib/sand.ex b/lib/sand.ex index 2379396..10e4f23 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -2,14 +2,14 @@ defmodule Sand do def run do {:ok, pid} = Surrealix.start( - on_connect: fn pid, _state -> + on_auth: fn pid, _state -> IO.puts("PID: #{inspect(pid)}") Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) end ) - # blocks until the `on_connect` callback is executed + # blocks until the `on_auth` callback is executed Surrealix.wait_until_crud_ready(pid) # now we can execute normal "CRUD" queries diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index 312c9a3..272132b 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -29,11 +29,11 @@ defmodule Surrealix.RescueProcess do @impl true def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do - if(!is_nil(socket_state.on_connect)) do + if(!is_nil(socket_state.on_auth)) do # mark as not ready to execute normal CRUD queries Surrealix.set_crud_ready(socket_pid, false) - # on_connect callback used to login / and pick NS / DB - socket_state.on_connect.(socket_pid, socket_state) + # on_auth callback used to login / and pick NS / DB + socket_state.on_auth.(socket_pid, socket_state) # now reconnect live queries queries = SocketState.all_live_queries(socket_state) Surrealix.reset_live_queries(socket_pid) diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index 1ba0a63..bae3aab 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -32,9 +32,9 @@ defmodule Surrealix.Socket do opts = Keyword.merge(Config.base_conn_opts(), opts) port = Keyword.get(opts, :port) hostname = Keyword.get(opts, :hostname) - on_connect = Keyword.get(opts, :on_connect) + on_auth = Keyword.get(opts, :on_auth) - state = SocketState.new(on_connect) + state = SocketState.new(on_auth) url = "ws://#{hostname}:#{port}/rpc" apply(WebSockex, fun_name, [url, __MODULE__, state, opts]) end @@ -100,7 +100,7 @@ defmodule Surrealix.Socket do def handle_connect(conn, state = %SocketState{}) do debug("handle_connect", state: state, conn: conn) - if(state.on_connect) do + if(state.on_auth) do RescueProcess.execute_callback({self(), state}) end diff --git a/lib/surrealix/socket_state.ex b/lib/surrealix/socket_state.ex index 32ca645..e184d20 100644 --- a/lib/surrealix/socket_state.ex +++ b/lib/surrealix/socket_state.ex @@ -14,10 +14,10 @@ defmodule Surrealix.SocketState do lq_running: %{}, lq_sql: MapSet.new(), crud_ready: false, - on_connect: nil + on_auth: nil def new(), do: %SocketState{} - def new(on_connect), do: %SocketState{on_connect: on_connect} + def new(on_auth), do: %SocketState{on_auth: on_auth} def set_crud_ready(state = %SocketState{}, value) do put_in(state, [:crud_ready], value) From 359e471a48174f77873f637c7a39ecbdcfd0f105 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 15:18:12 +0100 Subject: [PATCH 17/19] Chore: rename is_crud_ready to is_auth_ready --- README.md | 4 ++-- lib/sand.ex | 2 +- lib/surrealix.ex | 4 ++-- lib/surrealix/rescue_process.ex | 4 ++-- lib/surrealix/socket.ex | 12 ++++++------ lib/surrealix/socket_state.ex | 10 +++++----- 6 files changed, 18 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 2676e73..5f8400a 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Surrealix.all_live_queries(pid) To properly deal with connection drops, provide an `on_auth`-callback when starting the a Surrealix Socket. Usually on_auth callbacks has logic to authenticate the connection and select a namespace / database. -This callback is called in a non-blocking fashion, so it's important to wait until the connection is ready for further use. This is done via `Surrealix.wait_until_crud_ready(pid)` function, that implements busy-waiting intil auth for connection is finished. +This callback is called in a non-blocking fashion, so it's important to wait until the connection is ready for further use. This is done via `Surrealix.wait_until_auth_ready(pid)` function, that implements busy-waiting intil auth for connection is finished. Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function, are registed on the SocketState and will be re-established after a successful reconnection. @@ -66,7 +66,7 @@ Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` func ) # blocks until the `on_auth` callback is executed -Surrealix.wait_until_crud_ready(pid) +Surrealix.wait_until_auth_ready(pid) # now we can execute queries, that require auth Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> diff --git a/lib/sand.ex b/lib/sand.ex index 10e4f23..dda7084 100644 --- a/lib/sand.ex +++ b/lib/sand.ex @@ -10,7 +10,7 @@ defmodule Sand do ) # blocks until the `on_auth` callback is executed - Surrealix.wait_until_crud_ready(pid) + Surrealix.wait_until_auth_ready(pid) # now we can execute normal "CRUD" queries Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> diff --git a/lib/surrealix.ex b/lib/surrealix.ex index 5ea5d1c..5adf909 100644 --- a/lib/surrealix.ex +++ b/lib/surrealix.ex @@ -17,8 +17,8 @@ defmodule Surrealix do """ defdelegate all_live_queries(pid), to: Api defdelegate reset_live_queries(pid), to: Socket - defdelegate set_crud_ready(pid, value), to: Socket - defdelegate wait_until_crud_ready(pid), to: Socket + defdelegate set_auth_ready(pid, value), to: Socket + defdelegate wait_until_auth_ready(pid), to: Socket @doc """ Convenience method, that combines sending an query (live_query) and registering a callback diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index 272132b..a05980e 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -31,7 +31,7 @@ defmodule Surrealix.RescueProcess do def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do if(!is_nil(socket_state.on_auth)) do # mark as not ready to execute normal CRUD queries - Surrealix.set_crud_ready(socket_pid, false) + Surrealix.set_auth_ready(socket_pid, false) # on_auth callback used to login / and pick NS / DB socket_state.on_auth.(socket_pid, socket_state) # now reconnect live queries @@ -42,7 +42,7 @@ defmodule Surrealix.RescueProcess do Surrealix.live_query(socket_pid, sql, callback) end - Surrealix.set_crud_ready(socket_pid, true) + Surrealix.set_auth_ready(socket_pid, true) end {:noreply, []} diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index bae3aab..56fd4ea 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -45,12 +45,12 @@ defmodule Surrealix.Socket do :ok end - def wait_until_crud_ready(pid) do - Patiently.wait_for(fn -> SocketState.is_crud_ready(:sys.get_state(pid)) end) + def wait_until_auth_ready(pid) do + Patiently.wait_for(fn -> SocketState.is_auth_ready(:sys.get_state(pid)) end) end - def set_crud_ready(pid, value) do - WebSockex.cast(pid, {:set_crud_ready, value}) + def set_auth_ready(pid, value) do + WebSockex.cast(pid, {:set_auth_ready, value}) end def reset_live_queries(pid) do @@ -126,8 +126,8 @@ defmodule Surrealix.Socket do {:ok, SocketState.reset_live_queries(state)} end - def handle_cast({:set_crud_ready, value}, state) do - {:ok, SocketState.set_crud_ready(state, value)} + def handle_cast({:set_auth_ready, value}, state) do + {:ok, SocketState.set_auth_ready(state, value)} end def handle_cast({method, args, id, task}, state) do diff --git a/lib/surrealix/socket_state.ex b/lib/surrealix/socket_state.ex index e184d20..284c718 100644 --- a/lib/surrealix/socket_state.ex +++ b/lib/surrealix/socket_state.ex @@ -13,18 +13,18 @@ defmodule Surrealix.SocketState do defstruct pending: %{}, lq_running: %{}, lq_sql: MapSet.new(), - crud_ready: false, + auth_ready: false, on_auth: nil def new(), do: %SocketState{} def new(on_auth), do: %SocketState{on_auth: on_auth} - def set_crud_ready(state = %SocketState{}, value) do - put_in(state, [:crud_ready], value) + def set_auth_ready(state = %SocketState{}, value) do + put_in(state, [:auth_ready], value) end - def is_crud_ready(state = %SocketState{}) do - state.crud_ready == true + def is_auth_ready(state = %SocketState{}) do + state.auth_ready == true end @doc """ From ef004d8ce71424f5c8c73c8fe8563a8cfaeabb38 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 15:22:58 +0100 Subject: [PATCH 18/19] Readme wording --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 5f8400a..921f34a 100644 --- a/README.md +++ b/README.md @@ -49,11 +49,11 @@ Surrealix.all_live_queries(pid) ## Handling reconnection -To properly deal with connection drops, provide an `on_auth`-callback when starting the a Surrealix Socket. Usually on_auth callbacks has logic to authenticate the connection and select a namespace / database. +To properly deal with connection drops, provide an `on_auth`-callback when starting a Surrealix Socket. `on_auth` callbacks should include logic to authenticate the connection and select a namespace / database. -This callback is called in a non-blocking fashion, so it's important to wait until the connection is ready for further use. This is done via `Surrealix.wait_until_auth_ready(pid)` function, that implements busy-waiting intil auth for connection is finished. +This callback is called in a non-blocking fashion, so it is important to wait until the `on_auth`-callback is finished. This is done via `Surrealix.wait_until_auth_ready(pid)` function, that checks auth status via busy-waiting. -Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function, are registed on the SocketState and will be re-established after a successful reconnection. +Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function are registed on SocketState and will be re-established after a successful reconnection. ```elixir {:ok, pid} = From f4a9be2d9968090740a947cef37ade59b0bd3b25 Mon Sep 17 00:00:00 2001 From: Roman Heinrich Date: Sun, 12 Nov 2023 15:34:54 +0100 Subject: [PATCH 19/19] Chore: better docs for RescueProcess --- lib/surrealix/rescue_process.ex | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex index a05980e..11da77c 100644 --- a/lib/surrealix/rescue_process.ex +++ b/lib/surrealix/rescue_process.ex @@ -1,13 +1,10 @@ defmodule Surrealix.RescueProcess do @moduledoc """ - This module is reponsible to execute callbacks for on-connect / re-connect hooks. + This module is responsible to execute callbacks for on_auth hooks, that happen after a connection is established. + This can not be done direcly in the `handle_connect` callback, since then it blocks the execution of the WebSockex process. - Since we usually like to prepare the websocket connections by executing some further commands on it, - this blocks the websockex loop if we try it directly in the `handle_connect` callback. - - To workaround this issue, we delegate this responsibility to a `RescueProcess`, that executes - the logic to handle connection/re-connection out-of-band. Also we need to use `GenServer.cast`, so that the - Socket can properly continue and not be deadlocked. + To workaround this issue, we delegate this responsibility to a `RescueProcess`, that executes those callbacks out-of-band. + Also we need to use `GenServer.cast`, so that the Socket can properly continue and not be deadlocked. """ use GenServer alias Surrealix.SocketState @@ -20,7 +17,9 @@ defmodule Surrealix.RescueProcess do GenServer.cast(__MODULE__, {:execute, socket_pid, state}) end + ################# # Callbacks + ################# @impl true def init([]) do @@ -30,18 +29,23 @@ defmodule Surrealix.RescueProcess do @impl true def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do if(!is_nil(socket_state.on_auth)) do - # mark as not ready to execute normal CRUD queries + # set AUTH status to false, so that the busy-waiting does not trigger Surrealix.set_auth_ready(socket_pid, false) + # on_auth callback used to login / and pick NS / DB socket_state.on_auth.(socket_pid, socket_state) + # now reconnect live queries queries = SocketState.all_live_queries(socket_state) + # we need to reset the current state of live queries, since the connection was dead anyways Surrealix.reset_live_queries(socket_pid) + # now we re-establish all live queries, that we very listening to before the connection drop. for {sql, callback} <- queries do Surrealix.live_query(socket_pid, sql, callback) end + # set AUTH status to true, so `Surrealix.wait_until_auth_ready(pid)` unblocks and allows further queries on the authenticated socket. Surrealix.set_auth_ready(socket_pid, true) end