Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Count :erlfdb function calls for layer correctness and documentation #27

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions lib/ecto_foundationdb/layer/index_inventory.ex
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,17 @@ defmodule EctoFoundationDB.Layer.IndexInventory do
defp tx_idxs_get_wait(tenant, tx, _adapter_opts, source, max_version_future, claim_future) do
{start_key, end_key} = idx_range(tenant, source)

idxs_future = :erlfdb.get_range(tx, start_key, end_key, wait: false)

[max_version, claim, idxs] =
:erlfdb.wait_for_all_interleaving(tx, [max_version_future, claim_future, idxs_future])

idxs =
tx
|> :erlfdb.get_range(start_key, end_key, wait: true)
idxs
|> Enum.map(fn {_, fdb_value} -> Pack.from_fdb_value(fdb_value) end)
# high priority first
|> Enum.sort(&(Keyword.get(&1, :priority, 0) > Keyword.get(&2, :priority, 0)))

max_version = :erlfdb.wait(max_version_future)

claim = :erlfdb.wait(claim_future)

case get_partial_idxs(claim, source) do
{claim_active?, []} ->
{MaxValue.decode(max_version), idxs, [], fn -> not claim_active? end}
Expand Down
5 changes: 4 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,13 @@ defmodule EctoFoundationdb.MixProject do
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger]
extra_applications: extra_applications(Mix.env())
]
end

defp extra_applications(:test), do: [:logger, :runtime_tools]
defp extra_applications(_), do: [:logger]

# Run "mix help deps" to learn about dependencies.
defp deps do
[
Expand Down
2 changes: 1 addition & 1 deletion test/ecto/integration/crud_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ defmodule Ecto.Integration.CrudTest do
test "tx_insert", context do
tenant = context[:tenant]

# Operations inside a FoundationDB Adapater Transaction have the tenant applied
# Operations inside a FoundationDB Adapter Transaction have the tenant applied
# automatically.
user =
TestRepo.transaction(
Expand Down
347 changes: 347 additions & 0 deletions test/ecto/integration/fdb_api_counting_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
defmodule Ecto.Integration.FdbApiCountingTest do
use Ecto.Integration.Case, async: false

alias Ecto.Adapters.FoundationDB
alias Ecto.Integration.TestRepo

alias EctoFoundationDB.Schemas.User

# This module tracks all calls from modules that begin with EctoFoundationDB.* to the following
# list of :erlfdb exported functions. If there are new relevant functions to track, they must be
# added here.
#
# By counting and documenting FDB API function calls, we can ensure new calls don't creep in unnoticed,
# and we can provide some explanatory documentation about how the Layer works.
@traced_calls [
{:erlfdb, :get, 2},
{:erlfdb, :get_range, 4},
{:erlfdb, :wait, 1},
{:erlfdb, :wait_for_any, 1},
{:erlfdb, :set, 3},
{:erlfdb, :clear, 2},
{:erlfdb, :clear_range, 3},
{:erlfdb, :max, 3},
{:erlfdb, :fold_range, 5},
{:erlfdb, :get_mapped_range, 5},
{:erlfdb, :wait_for_all, 1},
{:erlfdb, :watch, 2},
{:erlfdb, :wait_for_all_interleaving, 2}
]

def start_trace(target) do
tracer = spawn(fn -> trace_listener([]) end)
trace_flags = [:call, :arity]
match_spec = [{:_, [], [{:message, {{:cp, {:caller}}}}]}]
:erlang.trace_pattern(:on_load, match_spec, [:local])
:erlang.trace_pattern({:erlfdb, :_, :_}, match_spec, [:local])
:erlang.trace(target, true, [{:tracer, tracer} | trace_flags])
tracer
end

def stop_trace(tracer, target) do
:erlang.trace(target, false, [:all])
:erlang.send(tracer, {:dump, self()})

ret =
receive do
{:acc, acc} ->
{:ok, Enum.reverse(acc)}
after
5000 -> {:error, :timeout}
end

Process.exit(tracer, :normal)
ret
end

defp trace_listener(acc) do
receive do
{:dump, pid} ->
:erlang.send(pid, {:acc, acc})

{:trace, _pid, :call, {:erlfdb, fun, arity}, {:cp, {caller, _, _}}} ->
try do
case Module.split(caller) do
["EctoFoundationDB" | _] ->
if Enum.member?(@traced_calls, {:erlfdb, fun, arity}) do
trace_listener([{caller, fun} | acc])
else
trace_listener(acc)
end

_ ->
trace_listener(acc)
end
rescue
_e in ArgumentError ->
trace_listener(acc)
end

_term ->
trace_listener(acc)
end
end

def with_erlfdb_calls(fun) do
tracer = start_trace(self())
res = fun.()
{:ok, calls} = stop_trace(tracer, self())
{calls, res}
end

test "counting", context do
tenant = context[:tenant]

# =================================================================
# Insert (no index inventory cache)
# =================================================================

{calls, alice} =
with_erlfdb_calls(fn ->
{:ok, alice} =
%User{name: "Alice"}
|> FoundationDB.usetenant(tenant)
|> TestRepo.insert()

alice
end)

assert [
# get max_version
{EctoFoundationDB.Layer.IndexInventory, :get},

# get source claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},

# there is no cache, so get idxs
{EctoFoundationDB.Layer.IndexInventory, :get_range},

# wait for max_version, claim_key, and idxs range
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all_interleaving},

# check for existence of primary write
{EctoFoundationDB.Layer.Tx, :get},

# wait for existence check
{EctoFoundationDB.Layer.Tx, :wait_for_any},

# primary write
{EctoFoundationDB.Layer.TxInsert, :set},

# index write
{EctoFoundationDB.Indexer.Default, :set}
] == calls

# =================================================================
# Insert (with index inventory cache)
# =================================================================

{calls, _bob} =
with_erlfdb_calls(fn ->
{:ok, bob} =
%User{name: "Bob"}
|> FoundationDB.usetenant(tenant)
|> TestRepo.insert()

bob
end)

assert [
# get max_version and claim_key. We have cached the inventory, so
# these waits are deferred optimistically.
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# check for existence of primary write
{EctoFoundationDB.Layer.Tx, :get},
{EctoFoundationDB.Layer.Tx, :wait_for_any},

# primary write, index write
{EctoFoundationDB.Layer.TxInsert, :set},
{EctoFoundationDB.Indexer.Default, :set},

# wait for max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all}
] == calls

# =================================================================
# Get using primary id
# =================================================================

{calls, _} =
with_erlfdb_calls(fn ->
TestRepo.get(User, alice.id, prefix: tenant)
end)

assert [
# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# get and wait primary write key
{EctoFoundationDB.Layer.Query, :get},
{EctoFoundationDB.Future, :wait_for_all_interleaving},

# wait for max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all}
] = calls

# =================================================================
# Update an indexed field
# =================================================================

{calls, _} =
with_erlfdb_calls(fn ->
changeset = User.changeset(alice, %{name: "Alicia"})
{:ok, _} = TestRepo.update(changeset)
end)

assert [
# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# get and wait for existing data from primary write
{EctoFoundationDB.Layer.Tx, :get},
{EctoFoundationDB.Layer.Tx, :wait_for_any},

# set data in primary write
{EctoFoundationDB.Layer.Tx, :set},

# clear and set default index. :name has changed
{EctoFoundationDB.Indexer.Default, :clear},
{EctoFoundationDB.Indexer.Default, :set},

# wait for max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all}
] == calls

# =================================================================
# Update an non-indexed field
# =================================================================

{calls, _} =
with_erlfdb_calls(fn ->
changeset = User.changeset(alice, %{notes: "Hello world"})
{:ok, _} = TestRepo.update(changeset)
end)

assert [
# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# get and wait for existing data from primary write
{EctoFoundationDB.Layer.Tx, :get},
{EctoFoundationDB.Layer.Tx, :wait_for_any},

# set data in primary write
{EctoFoundationDB.Layer.Tx, :set},

# clear and set default index. @todo, no index has changed, this is write amplification (#25)
{EctoFoundationDB.Indexer.Default, :clear},
{EctoFoundationDB.Indexer.Default, :set},

# wait for max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all}
] == calls

# =================================================================
# Get by indexed field
# =================================================================

{calls, _} =
with_erlfdb_calls(fn ->
TestRepo.get_by(User, [name: "Alicia"], prefix: tenant)
end)

assert [
# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# get_mapped_range for :name index. The call to :get_range shown here
# is a tail call from get_mapped_range, so it's expected and harmless
{EctoFoundationDB.Layer.Query, :get_mapped_range},
{EctoFoundationDB.Layer.Query, :get_range},

# wait for get_mapped_range result
{EctoFoundationDB.Future, :wait_for_all_interleaving},

# wait for max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all}
] == calls

# =================================================================
# Async get by indexed field
# =================================================================

{calls, _} =
with_erlfdb_calls(fn ->
TestRepo.transaction(
fn ->
f1 = TestRepo.async_get_by(User, name: "Alicia")
f2 = TestRepo.async_get_by(User, name: "Bob")
TestRepo.await([f1, f2])
end,
prefix: tenant
)
end)

assert [
# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# get_mapped_range for :name index
{EctoFoundationDB.Layer.Query, :get_mapped_range},
{EctoFoundationDB.Layer.Query, :get_range},

# wait for max_version and claim_key, @todo ideally this happens at the very end (#26)
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all},

# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# get_mapped_range for :name index
{EctoFoundationDB.Layer.Query, :get_mapped_range},
{EctoFoundationDB.Layer.Query, :get_range},

# max_version and claim_key again. FDB implements a RWY-transaction, so
# this trivially reads from local memory
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all},

# wait for results of both get_mapped_range calls
{EctoFoundationDB.Future, :wait_for_all_interleaving}
] == calls

# =================================================================
# Delete
# =================================================================

{calls, _} =
with_erlfdb_calls(fn ->
{:ok, _} = TestRepo.delete(alice)
end)

assert [
# get max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :get},
{EctoFoundationDB.Layer.IndexInventory, :get},

# check for existence
{EctoFoundationDB.Layer.Tx, :get},
{EctoFoundationDB.Layer.Tx, :wait_for_any},

# clear primary write
{EctoFoundationDB.Layer.Tx, :clear},

# clear :name index
{EctoFoundationDB.Indexer.Default, :clear},

# wait for max_version and claim_key
{EctoFoundationDB.Layer.IndexInventory, :wait_for_all}
] == calls
end
end
Loading
Loading