diff --git a/.gitignore b/.gitignore index a2149e1..0b5cd23 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,6 @@ surrealix-*.tar # SurrealDB shell history history.txt + +# SurrealDB data +data.db/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 220944b..ca81269 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v0.1.5 (2023-11-12) + +- graceful reconnection handing +- ability to register on-connect callbacks +- ability to configure max backoff / step duration for connection retries + ## v0.1.4 (2023-11-11) - much simpler handling for live query callbacks (https://github.com/maxohq/surrealix/commit/c87fe9b3853d090cb622a2478595b99a213d7fa9) diff --git a/README.md b/README.md index 0af87c7..921f34a 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,37 @@ end) Surrealix.all_live_queries(pid) ``` +## Handling reconnection + +To properly deal with connection drops, provide an `on_auth`-callback when starting a Surrealix Socket. `on_auth` callbacks should include logic to authenticate the connection and select a namespace / database. + +This callback is called in a non-blocking fashion, so it is important to wait until the `on_auth`-callback is finished. This is done via `Surrealix.wait_until_auth_ready(pid)` function, that checks auth status via busy-waiting. + +Live queries that were setup via `Surrealix.live_query(pid, sql, callback)` function are registed on SocketState and will be re-established after a successful reconnection. + +```elixir +{:ok, pid} = + Surrealix.start( + on_auth: fn pid, _state -> + IO.puts("PID: #{inspect(pid)}") + Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) + Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) + end + ) + +# blocks until the `on_auth` callback is executed +Surrealix.wait_until_auth_ready(pid) + +# now we can execute queries, that require auth +Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") +end) + +Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") +end) +``` + ## Telemetry Currently library publishes only 3 events: ```elixir @@ -70,8 +101,9 @@ Surrealix.Telemetry.Logger.setup() ```elixir ## in config.exs / runtime.exs file -# default 5000 -config :surrealix, timeout: :infinity +config :surrealix, backoff_max: 2000 +config :surrealix, backoff_step: 50 +config :surrealix, timeout: :infinity # default 5000 config :surrealix, :conn, hostname: "0.0.0.0", port: 8000 diff --git a/bin/sur-server.sh b/bin/sur-server.sh index adfc743..e21a4a1 100755 --- a/bin/sur-server.sh +++ b/bin/sur-server.sh @@ -14,5 +14,4 @@ surreal start \ --allow-funcs \ --allow-net \ --bind 0.0.0.0:8000 \ - memory - ## file:mydatabase.db + file:data.db diff --git a/config/config.exs b/config/config.exs index 1dfbf39..9ded11e 100644 --- a/config/config.exs +++ b/config/config.exs @@ -8,6 +8,8 @@ config :logger, :console, level: :warning +config :surrealix, :timeout, 5000 + config :surrealix, :conn, hostname: "0.0.0.0", port: 8000 diff --git a/gen/src/ApiGenerator.ts b/gen/src/ApiGenerator.ts index d0e6abd..6ad53ca 100644 --- a/gen/src/ApiGenerator.ts +++ b/gen/src/ApiGenerator.ts @@ -79,7 +79,7 @@ export class ApiGenerator extends GenBase { Show all currently registered live queries (SQL) """ def all_live_queries(pid) do - :sys.get_state(pid) |> SocketState.all_lq() + :sys.get_state(pid) |> SocketState.all_live_queries() end @doc """ @@ -88,16 +88,14 @@ export class ApiGenerator extends GenBase { Params: sql: string vars: map with variables to interpolate into SQL - callback: fn (event, data, config) + callback: fn (data, live_query_id) """ - @spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok + @spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok def live_query(pid, sql, vars \\\\ %{}, callback) do with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)}, - {:ok, res} <- query(pid, sql, vars), - %{"result" => [%{"result" => lq_id}]} <- res do - event = [:live_query, lq_id] - :ok = Surrealix.Dispatch.attach("#{lq_id}_main", event, callback) - :ok = WebSockex.cast(pid, {:register_lq, sql, lq_id}) + {:ok, res} <- query(pid, sql, vars), + %{"result" => [%{"result" => lq_id}]} <- res do + :ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback}) {:ok, res} else {:sql_live_check, false} -> {:error, "Not a live query: \`#{sql}\`!"} diff --git a/lib/sand.ex b/lib/sand.ex new file mode 100644 index 0000000..dda7084 --- /dev/null +++ b/lib/sand.ex @@ -0,0 +1,24 @@ +defmodule Sand do + def run do + {:ok, pid} = + Surrealix.start( + on_auth: fn pid, _state -> + IO.puts("PID: #{inspect(pid)}") + Surrealix.signin(pid, %{user: "root", pass: "root"}) |> IO.inspect(label: :signin) + Surrealix.use(pid, "test", "test") |> IO.inspect(label: :use) + end + ) + + # blocks until the `on_auth` callback is executed + Surrealix.wait_until_auth_ready(pid) + + # now we can execute normal "CRUD" queries + Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") + end) + + Surrealix.live_query(pid, "LIVE SELECT * FROM person;", fn data, query_id -> + IO.inspect({data, query_id}, label: "callback") + end) + end +end diff --git a/lib/surrealix.ex b/lib/surrealix.ex index dab8779..5adf909 100644 --- a/lib/surrealix.ex +++ b/lib/surrealix.ex @@ -16,6 +16,9 @@ defmodule Surrealix do Show all currently registered live queries (SQL) """ defdelegate all_live_queries(pid), to: Api + defdelegate reset_live_queries(pid), to: Socket + defdelegate set_auth_ready(pid, value), to: Socket + defdelegate wait_until_auth_ready(pid), to: Socket @doc """ Convenience method, that combines sending an query (live_query) and registering a callback diff --git a/lib/surrealix/api.ex b/lib/surrealix/api.ex index cb4c09b..279acf8 100644 --- a/lib/surrealix/api.ex +++ b/lib/surrealix/api.ex @@ -18,7 +18,7 @@ defmodule Surrealix.Api do Show all currently registered live queries (SQL) """ def all_live_queries(pid) do - :sys.get_state(pid) |> SocketState.all_lq() + :sys.get_state(pid) |> SocketState.all_live_queries() end @doc """ @@ -27,14 +27,14 @@ defmodule Surrealix.Api do Params: sql: string vars: map with variables to interpolate into SQL - callback: fn (event, data, config) + callback: fn (data, live_query_id) """ - @spec live_query(pid(), String.t(), map(), (any, any, list() -> any)) :: :ok + @spec live_query(pid(), String.t(), map(), (any, String.t() -> any)) :: :ok def live_query(pid, sql, vars \\ %{}, callback) do with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)}, {:ok, res} <- query(pid, sql, vars), %{"result" => [%{"result" => lq_id}]} <- res do - :ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback}) + :ok = WebSockex.cast(pid, {:register_live_query, sql, lq_id, callback}) {:ok, res} else {:sql_live_check, false} -> {:error, "Not a live query: `#{sql}`!"} diff --git a/lib/surrealix/application.ex b/lib/surrealix/application.ex index c0b830a..0117f16 100644 --- a/lib/surrealix/application.ex +++ b/lib/surrealix/application.ex @@ -5,7 +5,9 @@ defmodule Surrealix.Application do @impl true def start(_type, _args) do - children = [] + children = [ + {Surrealix.RescueProcess, []} + ] opts = [strategy: :one_for_one, name: Surrealix.Supervisor] Supervisor.start_link(children, opts) diff --git a/lib/surrealix/config.ex b/lib/surrealix/config.ex index 587bb29..e17bdbb 100644 --- a/lib/surrealix/config.ex +++ b/lib/surrealix/config.ex @@ -20,5 +20,12 @@ defmodule Surrealix.Config do def base_conn_opts, do: @base_connection_opts @timeout Application.compile_env(:surrealix, :timeout, 5000) + + def default_timeout, do: @timeout def task_opts_default, do: [timeout: @timeout] + + @backoff_max Application.compile_env(:surrealix, :backoff_max, 2000) + @backoff_step Application.compile_env(:surrealix, :backoff_step, 50) + def backoff_max, do: @backoff_max + def backoff_step, do: @backoff_step end diff --git a/lib/surrealix/patiently.ex b/lib/surrealix/patiently.ex new file mode 100644 index 0000000..5e21984 --- /dev/null +++ b/lib/surrealix/patiently.ex @@ -0,0 +1,134 @@ +defmodule Surrealix.Patiently do + alias Surrealix.Patiently + ## https://github.com/dantswain/patiently/blob/main/lib/patiently.ex + @moduledoc false + @type iteration :: (-> term) + @type reducer :: (term -> term) + @type predicate :: (term -> boolean) + @type condition :: (-> boolean) + @type opt :: {:dwell, pos_integer} | {:max_tries, pos_integer} + @type opts :: [opt] + + defmodule GaveUp do + @moduledoc """ + Exception raised by Patiently when a condition fails to converge + """ + + defexception message: nil + @type t :: %__MODULE__{__exception__: true} + + @doc false + @spec exception({pos_integer, pos_integer}) :: t + def exception({dwell, max_tries}) do + message = + "Gave up waiting for condition after #{max_tries} " <> + "iterations waiting #{dwell} msec between tries." + + %Patiently.GaveUp{message: message} + end + end + + @default_dwell 100 + @default_tries 10 + + @spec wait_for(condition, opts) :: :ok | :error + def wait_for(condition, opts \\ []) do + wait_while(condition, & &1, opts) + end + + @spec wait_for!(condition, opts) :: :ok | no_return + def wait_for!(condition, opts \\ []) do + ok_or_raise(wait_for(condition, opts), opts) + end + + @spec wait_for(iteration, predicate, opts) :: :ok | :error + def wait_for(iteration, condition, opts) do + wait_while(iteration, condition, opts) + end + + @spec wait_for!(iteration, predicate, opts) :: :ok | no_return + def wait_for!(iteration, condition, opts) do + ok_or_raise(wait_for(iteration, condition, opts), opts) + end + + @spec wait_reduce(reducer, predicate, term, opts) :: {:ok, term} | {:error, term} + def wait_reduce(reducer, predicate, acc0, opts) do + wait_reduce_loop(reducer, predicate, acc0, 0, opts) + end + + @spec wait_reduce!(reducer, predicate, term, opts) :: {:ok, term} | no_return + def wait_reduce!(reducer, predicate, acc0, opts) do + ok_or_raise(wait_reduce_loop(reducer, predicate, acc0, 0, opts), opts) + end + + @spec wait_flatten(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | {:error, [term]} + def wait_flatten(iteration, predicate, opts \\ []) + + def wait_flatten(iteration, min_length, opts) when is_integer(min_length) and min_length > 0 do + wait_flatten(iteration, fn acc -> length(acc) >= min_length end, opts) + end + + def wait_flatten(iteration, predicate, opts) when is_function(predicate, 1) do + reducer = fn acc -> List.flatten([iteration.() | acc]) end + wait_reduce_loop(reducer, predicate, [], 0, opts) + end + + @spec wait_flatten!(iteration, predicate | pos_integer, opts) :: {:ok, [term]} | no_return + def wait_flatten!(iteration, predicate_or_min_length, opts) do + ok_or_raise(wait_flatten(iteration, predicate_or_min_length, opts), opts) + end + + @spec wait_for_death(pid, opts) :: :ok | :error + def wait_for_death(pid, opts \\ []) do + wait_for(fn -> !Process.alive?(pid) end, opts) + end + + @spec wait_for_death!(pid, opts) :: :ok | no_return + def wait_for_death!(pid, opts \\ []) do + ok_or_raise(wait_for_death(pid, opts), opts) + end + + defp ok_or_raise(:ok, _), do: :ok + defp ok_or_raise({:ok, acc}, _), do: {:ok, acc} + + defp ok_or_raise(:error, opts) do + raise Patiently.GaveUp, {dwell(opts), max_tries(opts)} + end + + defp ok_or_raise({:error, _}, opts) do + raise Patiently.GaveUp, {dwell(opts), max_tries(opts)} + end + + defp just_status({:ok, _}), do: :ok + defp just_status({:error, _}), do: :error + + defp wait_while(poller, condition, opts) do + reducer = fn acc -> [poller.() | acc] end + predicate = fn [most_recent | _] -> condition.(most_recent) end + ok_or_err = wait_reduce_loop(reducer, predicate, [], 0, opts) + just_status(ok_or_err) + end + + defp wait_reduce_loop(reducer, predicate, acc, tries, opts) do + acc_out = reducer.(acc) + + if predicate.(acc_out) do + {:ok, acc_out} + else + if tries >= max_tries(opts) do + {:error, acc_out} + else + :timer.sleep(dwell(opts)) + wait_reduce_loop(reducer, predicate, acc_out, tries + 1, opts) + end + end + end + + defp dwell(opts) do + Keyword.get(opts, :dwell, @default_dwell) + end + + defp max_tries(opts) do + Keyword.get(opts, :max_tries, @default_tries) + end +end diff --git a/lib/surrealix/rescue_process.ex b/lib/surrealix/rescue_process.ex new file mode 100644 index 0000000..11da77c --- /dev/null +++ b/lib/surrealix/rescue_process.ex @@ -0,0 +1,54 @@ +defmodule Surrealix.RescueProcess do + @moduledoc """ + This module is responsible to execute callbacks for on_auth hooks, that happen after a connection is established. + This can not be done direcly in the `handle_connect` callback, since then it blocks the execution of the WebSockex process. + + To workaround this issue, we delegate this responsibility to a `RescueProcess`, that executes those callbacks out-of-band. + Also we need to use `GenServer.cast`, so that the Socket can properly continue and not be deadlocked. + """ + use GenServer + alias Surrealix.SocketState + + def start_link([]) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def execute_callback({socket_pid, state = %SocketState{}}) do + GenServer.cast(__MODULE__, {:execute, socket_pid, state}) + end + + ################# + # Callbacks + ################# + + @impl true + def init([]) do + {:ok, []} + end + + @impl true + def handle_cast({:execute, socket_pid, socket_state = %SocketState{}}, _state) do + if(!is_nil(socket_state.on_auth)) do + # set AUTH status to false, so that the busy-waiting does not trigger + Surrealix.set_auth_ready(socket_pid, false) + + # on_auth callback used to login / and pick NS / DB + socket_state.on_auth.(socket_pid, socket_state) + + # now reconnect live queries + queries = SocketState.all_live_queries(socket_state) + # we need to reset the current state of live queries, since the connection was dead anyways + Surrealix.reset_live_queries(socket_pid) + + # now we re-establish all live queries, that we very listening to before the connection drop. + for {sql, callback} <- queries do + Surrealix.live_query(socket_pid, sql, callback) + end + + # set AUTH status to true, so `Surrealix.wait_until_auth_ready(pid)` unblocks and allows further queries on the authenticated socket. + Surrealix.set_auth_ready(socket_pid, true) + end + + {:noreply, []} + end +end diff --git a/lib/surrealix/socket.ex b/lib/surrealix/socket.ex index db4fade..56fd4ea 100644 --- a/lib/surrealix/socket.ex +++ b/lib/surrealix/socket.ex @@ -7,6 +7,8 @@ defmodule Surrealix.Socket do alias Surrealix.Api alias Surrealix.Config + alias Surrealix.Patiently + alias Surrealix.RescueProcess alias Surrealix.SocketState alias Surrealix.Telemetry alias Surrealix.Util @@ -14,7 +16,6 @@ defmodule Surrealix.Socket do require Logger @type base_connection_opts :: Config.socket_opts() - @type on_start :: {:ok, pid} | {:error, term} @spec start(Config.socket_opts()) :: on_start() @@ -29,16 +30,13 @@ defmodule Surrealix.Socket do defp generic_start(opts, fun_name) when fun_name in [:start, :start_link] do opts = Keyword.merge(Config.base_conn_opts(), opts) - - hostname = Keyword.get(opts, :hostname) port = Keyword.get(opts, :port) + hostname = Keyword.get(opts, :hostname) + on_auth = Keyword.get(opts, :on_auth) - apply(WebSockex, fun_name, [ - "ws://#{hostname}:#{port}/rpc", - __MODULE__, - SocketState.new(), - opts - ]) + state = SocketState.new(on_auth) + url = "ws://#{hostname}:#{port}/rpc" + apply(WebSockex, fun_name, [url, __MODULE__, state, opts]) end @spec stop(pid()) :: :ok @@ -47,44 +45,21 @@ defmodule Surrealix.Socket do :ok end - def terminate(reason, state) do - Logger.debug("Socket terminating:\n#{inspect(reason)}\n\n#{inspect(state)}\n") - exit(:normal) + def wait_until_auth_ready(pid) do + Patiently.wait_for(fn -> SocketState.is_auth_ready(:sys.get_state(pid)) end) end - def handle_cast({:register_lq, sql, query_id, callback}, state) do - state = SocketState.add_lq(state, sql, query_id, callback) - {:ok, state} + def set_auth_ready(pid, value) do + WebSockex.cast(pid, {:set_auth_ready, value}) end - def handle_cast({method, args, id, task}, state) do - Logger.debug("[surrealix] [handle_cast] #{inspect(state)}") - payload = Api.build_cast_payload(method, args, id) - state = SocketState.add_task(state, id, task) - frame = {:text, payload} - {:reply, frame, state} + def reset_live_queries(pid) do + WebSockex.cast(pid, {:reset_live_queries}) end - def handle_frame({_type, msg}, state) do - json = Jason.decode!(msg) - id = Map.get(json, "id") - task = SocketState.get_task(state, id) - - if is_nil(task) do - # No registered task for this ID, must be a live query update - lq_id = get_in(json, ["result", "id"]) - lq_item = SocketState.get_lq(state, lq_id) - - if(!is_nil(lq_item)) do - lq_item.callback.(json, lq_id) - end - else - if Process.alive?(task.pid) do - Process.send(task.pid, {:ok, json, id}, []) - end - end - - {:ok, SocketState.delete_task(state, id)} + def terminate(reason, state) do + debug("terminate", reason: reason, state: state) + exit(:normal) end def exec_method(pid, {method, args, task}, opts \\ []) do @@ -112,9 +87,79 @@ defmodule Surrealix.Socket do WebSockex.cast(pid, {method, args, id, task}) - task_timeout = Keyword.get(opts, :timeout, :infinity) + task_timeout = Keyword.get(opts, :timeout, Config.default_timeout()) res = Task.await(task, task_timeout) Telemetry.stop(:exec_method, start_time, meta) res end + + #################################### + # CALLBACKS + #################################### + + def handle_connect(conn, state = %SocketState{}) do + debug("handle_connect", state: state, conn: conn) + + if(state.on_auth) do + RescueProcess.execute_callback({self(), state}) + end + + {:ok, state} + end + + def handle_disconnect(connection_status_map, state) do + attempt_number = connection_status_map.attempt_number + to_sleep = min(Config.backoff_max(), attempt_number * Config.backoff_step()) + + debug("handle_disconnect", status: connection_status_map) + debug("handle_disconnect", "******** SLEEPING FOR #{to_sleep}ms...") + Process.sleep(to_sleep) + + {:reconnect, state} + end + + def handle_cast({:register_live_query, sql, query_id, callback}, state) do + {:ok, SocketState.register_live_query(state, sql, query_id, callback)} + end + + def handle_cast({:reset_live_queries}, state) do + {:ok, SocketState.reset_live_queries(state)} + end + + def handle_cast({:set_auth_ready, value}, state) do + {:ok, SocketState.set_auth_ready(state, value)} + end + + def handle_cast({method, args, id, task}, state) do + debug("handle_cast", state) + payload = Api.build_cast_payload(method, args, id) + + {:reply, {:text, payload}, SocketState.register_task(state, id, task)} + end + + def handle_frame({_type, msg}, state) do + json = Jason.decode!(msg) + id = Map.get(json, "id") + task = SocketState.get_task(state, id) + + if is_nil(task) do + # No registered task for this ID, must be a live query update + lq_id = get_in(json, ["result", "id"]) + lq_item = SocketState.get_live_query(state, lq_id) + + if(!is_nil(lq_item)) do + lq_item.callback.(json, lq_id) + end + else + if Process.alive?(task.pid) do + Process.send(task.pid, {:ok, json, id}, []) + end + end + + {:ok, SocketState.delete_task(state, id)} + end + + defp debug(area, data) do + Logger.debug("[surrealix] [#{area}] #{inspect(data)}") + end end diff --git a/lib/surrealix/socket_state.ex b/lib/surrealix/socket_state.ex index 3e446d3..284c718 100644 --- a/lib/surrealix/socket_state.ex +++ b/lib/surrealix/socket_state.ex @@ -8,18 +8,29 @@ defmodule Surrealix.SocketState do @doc """ - `pending`: Pending requests map: id => task - `lq_running`: live_queries map: id => %{sql: sql} - - `lq_sql`: live_queries SET with queries to register after re-connection + - `lq_sql`: live_queries MapSet with queries to register after re-connection """ defstruct pending: %{}, lq_running: %{}, - lq_sql: MapSet.new() + lq_sql: MapSet.new(), + auth_ready: false, + on_auth: nil def new(), do: %SocketState{} + def new(on_auth), do: %SocketState{on_auth: on_auth} + + def set_auth_ready(state = %SocketState{}, value) do + put_in(state, [:auth_ready], value) + end + + def is_auth_ready(state = %SocketState{}) do + state.auth_ready == true + end @doc """ Register a task for a particular request ID """ - def add_task(state = %SocketState{}, id, task) do + def register_task(state = %SocketState{}, id, task) do put_in(state, [:pending, id], task) end @@ -41,7 +52,7 @@ defmodule Surrealix.SocketState do @doc """ Register a SQL statement for a particular LiveQuery ID """ - def add_lq(state = %SocketState{}, sql, query_id, callback) do + def register_live_query(state = %SocketState{}, sql, query_id, callback) do lq_sql = MapSet.put(state.lq_sql, {sql, callback}) item = %{sql: sql, query_id: query_id, callback: callback} @@ -53,7 +64,7 @@ defmodule Surrealix.SocketState do @doc """ Get map that describes a particular LiveQuery ID (SQL / ID / etc) """ - def get_lq(state = %SocketState{}, query_id) do + def get_live_query(state = %SocketState{}, query_id) do state |> get_in([:lq_running, query_id]) end @@ -61,7 +72,7 @@ defmodule Surrealix.SocketState do @doc """ Remove a LiveQuery by ID """ - def delete_lq_by_id(state = %SocketState{}, query_id) do + def delete_live_query_by_id(state = %SocketState{}, query_id) do {item, state} = pop_in(state, [:lq_running, query_id]) if item do @@ -77,7 +88,7 @@ defmodule Surrealix.SocketState do @doc """ Remove a LiveQuery by SQL """ - def delete_lq_by_sql(state = %SocketState{}, sql) do + def delete_live_query_by_sql(state = %SocketState{}, sql) do found = Enum.find(state.lq_running, fn {_id, value} -> Map.get(value, :sql) == sql end) if !is_nil(found) do @@ -90,10 +101,14 @@ defmodule Surrealix.SocketState do end end + def reset_live_queries(state = %SocketState{}) do + Map.put(state, :lq_running, %{}) |> Map.put(:lq_sql, MapSet.new()) + end + @doc """ Currently registered LiveQueries """ - def all_lq(state = %SocketState{}) do + def all_live_queries(state = %SocketState{}) do state.lq_sql |> MapSet.to_list() end end diff --git a/lib/surrealix/socket_state_test.exs b/lib/surrealix/socket_state_test.exs index f692917..bd37e07 100644 --- a/lib/surrealix/socket_state_test.exs +++ b/lib/surrealix/socket_state_test.exs @@ -6,9 +6,9 @@ defmodule Surrealix.SocketStateTest do def dummy_callback(), do: fn -> nil end describe "tasks" do - test "add_task / get_task / delete_task" do + test "register_task / get_task / delete_task" do state = SocketState.new() - state = state |> SocketState.add_task("111", :task1) + state = SocketState.register_task(state, "111", :task1) assert SocketState.get_task(state, "111") == :task1 state = SocketState.delete_task(state, "111") @@ -17,69 +17,69 @@ defmodule Surrealix.SocketStateTest do test "delete accepts non-existing ids" do state = SocketState.new() - state = state |> SocketState.add_task("111", :task1) + state = SocketState.register_task(state, "111", :task1) state = SocketState.delete_task(state, "1111") assert SocketState.get_task(state, "111") == :task1 end end describe "lq" do - test "add_lq / get_lq" do + test "register_live_query / get_live_query" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) - assert SocketState.get_lq(state, "11-22") == %{ + assert SocketState.get_live_query(state, "11-22") == %{ callback: cb, query_id: "11-22", sql: "select * from person" } end - test "all_lq" do + test "all_live_queries" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) - state = state |> SocketState.add_lq("select * from user", "11-23", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from user", "11-23", cb) - assert SocketState.all_lq(state) == [ + assert SocketState.all_live_queries(state) == [ {"select * from person", cb}, {"select * from user", cb} ] end - test "delete_lq_by_id" do + test "delete_live_query_by_id" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) - state = state |> SocketState.add_lq("select * from user", "11-23", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from user", "11-23", cb) - assert SocketState.all_lq(state) == [ + assert SocketState.all_live_queries(state) == [ {"select * from person", cb}, {"select * from user", cb} ] - state = state |> SocketState.delete_lq_by_id("11-23") - assert SocketState.all_lq(state) == [{"select * from person", cb}] - assert SocketState.get_lq(state, "11-23") == nil + state = SocketState.delete_live_query_by_id(state, "11-23") + assert SocketState.all_live_queries(state) == [{"select * from person", cb}] + assert SocketState.get_live_query(state, "11-23") == nil end - test "delete_lq_by_sql" do + test "delete_live_query_by_sql" do state = SocketState.new() cb = dummy_callback() - state = state |> SocketState.add_lq("select * from person", "11-22", cb) - state = state |> SocketState.add_lq("select * from user", "11-23", cb) + state = SocketState.register_live_query(state, "select * from person", "11-22", cb) + state = SocketState.register_live_query(state, "select * from user", "11-23", cb) - assert SocketState.all_lq(state) == [ + assert SocketState.all_live_queries(state) == [ {"select * from person", cb}, {"select * from user", cb} ] - state = SocketState.delete_lq_by_sql(state, "select * from person") - assert SocketState.all_lq(state) == [{"select * from user", cb}] - assert SocketState.get_lq(state, "11-22") == nil + state = SocketState.delete_live_query_by_sql(state, "select * from person") + assert SocketState.all_live_queries(state) == [{"select * from user", cb}] + assert SocketState.get_live_query(state, "11-22") == nil - assert SocketState.get_lq(state, "11-23") == %{ + assert SocketState.get_live_query(state, "11-23") == %{ query_id: "11-23", sql: "select * from user", callback: cb