diff --git a/lib/ecto_foundationdb/layer/index_inventory.ex b/lib/ecto_foundationdb/layer/index_inventory.ex index 2c684ea..ab6c5da 100644 --- a/lib/ecto_foundationdb/layer/index_inventory.ex +++ b/lib/ecto_foundationdb/layer/index_inventory.ex @@ -289,17 +289,17 @@ defmodule EctoFoundationDB.Layer.IndexInventory do defp tx_idxs_get_wait(tenant, tx, _adapter_opts, source, max_version_future, claim_future) do {start_key, end_key} = idx_range(tenant, source) + idxs_future = :erlfdb.get_range(tx, start_key, end_key, wait: false) + + [max_version, claim, idxs] = + :erlfdb.wait_for_all_interleaving(tx, [max_version_future, claim_future, idxs_future]) + idxs = - tx - |> :erlfdb.get_range(start_key, end_key, wait: true) + idxs |> Enum.map(fn {_, fdb_value} -> Pack.from_fdb_value(fdb_value) end) # high priority first |> Enum.sort(&(Keyword.get(&1, :priority, 0) > Keyword.get(&2, :priority, 0))) - max_version = :erlfdb.wait(max_version_future) - - claim = :erlfdb.wait(claim_future) - case get_partial_idxs(claim, source) do {claim_active?, []} -> {MaxValue.decode(max_version), idxs, [], fn -> not claim_active? end} diff --git a/mix.exs b/mix.exs index 731150b..a3b0ce3 100644 --- a/mix.exs +++ b/mix.exs @@ -49,10 +49,13 @@ defmodule EctoFoundationdb.MixProject do # Run "mix help compile.app" to learn about applications. def application do [ - extra_applications: [:logger] + extra_applications: extra_applications(Mix.env()) ] end + defp extra_applications(:test), do: [:logger, :runtime_tools] + defp extra_applications(_), do: [:logger] + # Run "mix help deps" to learn about dependencies. defp deps do [ diff --git a/test/ecto/integration/crud_test.exs b/test/ecto/integration/crud_test.exs index d2bb0ea..87e62a4 100644 --- a/test/ecto/integration/crud_test.exs +++ b/test/ecto/integration/crud_test.exs @@ -174,7 +174,7 @@ defmodule Ecto.Integration.CrudTest do test "tx_insert", context do tenant = context[:tenant] - # Operations inside a FoundationDB Adapater Transaction have the tenant applied + # Operations inside a FoundationDB Adapter Transaction have the tenant applied # automatically. user = TestRepo.transaction( diff --git a/test/ecto/integration/fdb_api_counting_test.exs b/test/ecto/integration/fdb_api_counting_test.exs new file mode 100644 index 0000000..b6dd33c --- /dev/null +++ b/test/ecto/integration/fdb_api_counting_test.exs @@ -0,0 +1,347 @@ +defmodule Ecto.Integration.FdbApiCountingTest do + use Ecto.Integration.Case, async: false + + alias Ecto.Adapters.FoundationDB + alias Ecto.Integration.TestRepo + + alias EctoFoundationDB.Schemas.User + + # This module tracks all calls from modules that begin with EctoFoundationDB.* to the following + # list of :erlfdb exported functions. If there are new relevant functions to track, they must be + # added here. + # + # By counting and documenting FDB API function calls, we can ensure new calls don't creep in unnoticed, + # and we can provide some explanatory documentation about how the Layer works. + @traced_calls [ + {:erlfdb, :get, 2}, + {:erlfdb, :get_range, 4}, + {:erlfdb, :wait, 1}, + {:erlfdb, :wait_for_any, 1}, + {:erlfdb, :set, 3}, + {:erlfdb, :clear, 2}, + {:erlfdb, :clear_range, 3}, + {:erlfdb, :max, 3}, + {:erlfdb, :fold_range, 5}, + {:erlfdb, :get_mapped_range, 5}, + {:erlfdb, :wait_for_all, 1}, + {:erlfdb, :watch, 2}, + {:erlfdb, :wait_for_all_interleaving, 2} + ] + + def start_trace(target) do + tracer = spawn(fn -> trace_listener([]) end) + trace_flags = [:call, :arity] + match_spec = [{:_, [], [{:message, {{:cp, {:caller}}}}]}] + :erlang.trace_pattern(:on_load, match_spec, [:local]) + :erlang.trace_pattern({:erlfdb, :_, :_}, match_spec, [:local]) + :erlang.trace(target, true, [{:tracer, tracer} | trace_flags]) + tracer + end + + def stop_trace(tracer, target) do + :erlang.trace(target, false, [:all]) + :erlang.send(tracer, {:dump, self()}) + + ret = + receive do + {:acc, acc} -> + {:ok, Enum.reverse(acc)} + after + 5000 -> {:error, :timeout} + end + + Process.exit(tracer, :normal) + ret + end + + defp trace_listener(acc) do + receive do + {:dump, pid} -> + :erlang.send(pid, {:acc, acc}) + + {:trace, _pid, :call, {:erlfdb, fun, arity}, {:cp, {caller, _, _}}} -> + try do + case Module.split(caller) do + ["EctoFoundationDB" | _] -> + if Enum.member?(@traced_calls, {:erlfdb, fun, arity}) do + trace_listener([{caller, fun} | acc]) + else + trace_listener(acc) + end + + _ -> + trace_listener(acc) + end + rescue + _e in ArgumentError -> + trace_listener(acc) + end + + _term -> + trace_listener(acc) + end + end + + def with_erlfdb_calls(fun) do + tracer = start_trace(self()) + res = fun.() + {:ok, calls} = stop_trace(tracer, self()) + {calls, res} + end + + test "counting", context do + tenant = context[:tenant] + + # ================================================================= + # Insert (no index inventory cache) + # ================================================================= + + {calls, alice} = + with_erlfdb_calls(fn -> + {:ok, alice} = + %User{name: "Alice"} + |> FoundationDB.usetenant(tenant) + |> TestRepo.insert() + + alice + end) + + assert [ + # get max_version + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get source claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # there is no cache, so get idxs + {EctoFoundationDB.Layer.IndexInventory, :get_range}, + + # wait for max_version, claim_key, and idxs range + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all_interleaving}, + + # check for existence of primary write + {EctoFoundationDB.Layer.Tx, :get}, + + # wait for existence check + {EctoFoundationDB.Layer.Tx, :wait_for_any}, + + # primary write + {EctoFoundationDB.Layer.TxInsert, :set}, + + # index write + {EctoFoundationDB.Indexer.Default, :set} + ] == calls + + # ================================================================= + # Insert (with index inventory cache) + # ================================================================= + + {calls, _bob} = + with_erlfdb_calls(fn -> + {:ok, bob} = + %User{name: "Bob"} + |> FoundationDB.usetenant(tenant) + |> TestRepo.insert() + + bob + end) + + assert [ + # get max_version and claim_key. We have cached the inventory, so + # these waits are deferred optimistically. + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # check for existence of primary write + {EctoFoundationDB.Layer.Tx, :get}, + {EctoFoundationDB.Layer.Tx, :wait_for_any}, + + # primary write, index write + {EctoFoundationDB.Layer.TxInsert, :set}, + {EctoFoundationDB.Indexer.Default, :set}, + + # wait for max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all} + ] == calls + + # ================================================================= + # Get using primary id + # ================================================================= + + {calls, _} = + with_erlfdb_calls(fn -> + TestRepo.get(User, alice.id, prefix: tenant) + end) + + assert [ + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get and wait primary write key + {EctoFoundationDB.Layer.Query, :get}, + {EctoFoundationDB.Future, :wait_for_all_interleaving}, + + # wait for max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all} + ] = calls + + # ================================================================= + # Update an indexed field + # ================================================================= + + {calls, _} = + with_erlfdb_calls(fn -> + changeset = User.changeset(alice, %{name: "Alicia"}) + {:ok, _} = TestRepo.update(changeset) + end) + + assert [ + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get and wait for existing data from primary write + {EctoFoundationDB.Layer.Tx, :get}, + {EctoFoundationDB.Layer.Tx, :wait_for_any}, + + # set data in primary write + {EctoFoundationDB.Layer.Tx, :set}, + + # clear and set default index. :name has changed + {EctoFoundationDB.Indexer.Default, :clear}, + {EctoFoundationDB.Indexer.Default, :set}, + + # wait for max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all} + ] == calls + + # ================================================================= + # Update an non-indexed field + # ================================================================= + + {calls, _} = + with_erlfdb_calls(fn -> + changeset = User.changeset(alice, %{notes: "Hello world"}) + {:ok, _} = TestRepo.update(changeset) + end) + + assert [ + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get and wait for existing data from primary write + {EctoFoundationDB.Layer.Tx, :get}, + {EctoFoundationDB.Layer.Tx, :wait_for_any}, + + # set data in primary write + {EctoFoundationDB.Layer.Tx, :set}, + + # clear and set default index. @todo, no index has changed, this is write amplification (#25) + {EctoFoundationDB.Indexer.Default, :clear}, + {EctoFoundationDB.Indexer.Default, :set}, + + # wait for max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all} + ] == calls + + # ================================================================= + # Get by indexed field + # ================================================================= + + {calls, _} = + with_erlfdb_calls(fn -> + TestRepo.get_by(User, [name: "Alicia"], prefix: tenant) + end) + + assert [ + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get_mapped_range for :name index. The call to :get_range shown here + # is a tail call from get_mapped_range, so it's expected and harmless + {EctoFoundationDB.Layer.Query, :get_mapped_range}, + {EctoFoundationDB.Layer.Query, :get_range}, + + # wait for get_mapped_range result + {EctoFoundationDB.Future, :wait_for_all_interleaving}, + + # wait for max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all} + ] == calls + + # ================================================================= + # Async get by indexed field + # ================================================================= + + {calls, _} = + with_erlfdb_calls(fn -> + TestRepo.transaction( + fn -> + f1 = TestRepo.async_get_by(User, name: "Alicia") + f2 = TestRepo.async_get_by(User, name: "Bob") + TestRepo.await([f1, f2]) + end, + prefix: tenant + ) + end) + + assert [ + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get_mapped_range for :name index + {EctoFoundationDB.Layer.Query, :get_mapped_range}, + {EctoFoundationDB.Layer.Query, :get_range}, + + # wait for max_version and claim_key, @todo ideally this happens at the very end (#26) + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all}, + + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # get_mapped_range for :name index + {EctoFoundationDB.Layer.Query, :get_mapped_range}, + {EctoFoundationDB.Layer.Query, :get_range}, + + # max_version and claim_key again. FDB implements a RWY-transaction, so + # this trivially reads from local memory + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all}, + + # wait for results of both get_mapped_range calls + {EctoFoundationDB.Future, :wait_for_all_interleaving} + ] == calls + + # ================================================================= + # Delete + # ================================================================= + + {calls, _} = + with_erlfdb_calls(fn -> + {:ok, _} = TestRepo.delete(alice) + end) + + assert [ + # get max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :get}, + {EctoFoundationDB.Layer.IndexInventory, :get}, + + # check for existence + {EctoFoundationDB.Layer.Tx, :get}, + {EctoFoundationDB.Layer.Tx, :wait_for_any}, + + # clear primary write + {EctoFoundationDB.Layer.Tx, :clear}, + + # clear :name index + {EctoFoundationDB.Indexer.Default, :clear}, + + # wait for max_version and claim_key + {EctoFoundationDB.Layer.IndexInventory, :wait_for_all} + ] == calls + end +end diff --git a/test/support/schemas/user.ex b/test/support/schemas/user.ex index da3148f..05503e6 100644 --- a/test/support/schemas/user.ex +++ b/test/support/schemas/user.ex @@ -9,13 +9,14 @@ defmodule EctoFoundationDB.Schemas.User do schema "users" do field(:name, :string) + field(:notes, :string) timestamps() end def changeset(struct, attrs) do struct - |> cast(attrs, [:name]) + |> cast(attrs, [:name, :notes]) |> validate_required([:name]) end end