Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add async task wrapper for trace propagation. #757

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions lib/absinthe/async_task_wrapper.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule Absinthe.AsyncTaskWrapper do
@moduledoc """
Provides a way to extend Absinthe's asynchronous resolution.

Trace span propagation, Logger metadata, and other mechanisms use the process dictionary to
pass information down the call stack without needing every function to take it as an argument
and pass it down. Absinthe's asynchronous resolution disrupts this style of propagation. To
repair it, you can either:

* Replace all use of `Absinthe.Middleware.Async` with `Task.async/1` and the closure of your
choice, or

* Implement this behaviour and configure Absinthe to use it.

## Example

To propagate your app's custom `:myapp_span_context` value from the process dictionary of the
blueprint to any async resolver function, define a module with a `c:wrap/2` callback...

```elixir
defmodule MyApp.AsyncTaskWrapper do
@behaviour Absinthe.AsyncTaskWrapper
@impl true
def wrap(fun, _) do
ctx = Process.get(:myapp_span_context)
fn ->
Process.put(:myapp_span_context, ctx)
apply(fun, [])
end
end
end
```

... and configure Absinthe to use it:
```elixir
config :absinthe, async_task_wrapper: MyApp.AsyncTaskWrapper
```

See also:

* `Logger.metadata/0`
* `Logger.metadata/1`
* `Process.get/1`
* `Process.put/2`
* `Task.async/1`
* `Task.await/1`
"""

alias Absinthe.Blueprint.Execution
alias Absinthe.Resolution

@doc """
Wrap a function before its execution by `Task.async/1`.

Called with the original function and either:

* The `t:Absinthe.Blueprint.Execution.t/0` via `Absinthe.Middleware.Batch`, or
* The `t:Absinthe.Resolution.t/0` via `Absinthe.Middleware.Async`.

Your `c:wrap/2` [MUST] return a zero-arity anonymous function, as expected by `Task.async/1`.

[MUST]: https://tools.ietf.org/html/rfc2119#section-1
"""
@callback wrap(fun :: (() -> any()), exec :: Execution.t() | Resolution.t()) :: (() -> any())

@doc """
Starts a task that must be awaited on, after wrapping it as configured.

Intended for use by Absinthe and its plugins.
"""
def async(fun, res) when is_function(fun, 0) do
fun =
case Application.get_env(:absinthe, :async_task_wrapper) do
nil -> fun
module -> apply(module, :wrap, [fun, res])
end

Task.async(fun)
end
end
9 changes: 6 additions & 3 deletions lib/absinthe/middleware/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ defmodule Absinthe.Middleware.Async do
@moduledoc """
This plugin enables asynchronous execution of a field.

See also `Absinthe.Resolution.Helpers.async/1`
See also:

* `Absinthe.Resolution.Helpers.async/1`
* `Absinthe.AsyncTaskWrapper`

# Example Usage:

Expand All @@ -17,7 +20,7 @@ defmodule Absinthe.Middleware.Async do
end
```

Using the bare plugin API
Using the bare plugin API, disabling any configured `Absinthe.AsyncTaskWrapper`:
```elixir
field :time_consuming, :thing do
resolve fn _, _, _ ->
Expand Down Expand Up @@ -51,7 +54,7 @@ defmodule Absinthe.Middleware.Async do
# stack for this field. On the next resolution pass, we need to `Task.await` the
# task so we have actual data. Thus, we prepend this module to the middleware stack.
def call(%{state: :unresolved} = res, {fun, opts}) when is_function(fun),
do: call(res, {Task.async(fun), opts})
do: call(res, {Absinthe.AsyncTaskWrapper.async(fun, res), opts})

def call(%{state: :unresolved} = res, {task, opts}) do
task_data = {task, opts}
Expand Down
20 changes: 13 additions & 7 deletions lib/absinthe/middleware/batch.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,25 +134,31 @@ defmodule Absinthe.Middleware.Batch do
end

def after_resolution(exec) do
output = do_batching(exec.acc[__MODULE__][:input])
put_in(exec.acc[__MODULE__][:output], output)
exec |> do_batching() |> put_batching_output(exec)
end

defp do_batching(input) do
input
defp do_batching(exec) do
exec.acc[__MODULE__][:input]
|> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
|> Enum.map(fn {{batch_fun, batch_opts}, batch_data} ->
{batch_opts,
Task.async(fn ->
{batch_fun, call_batch_fun(batch_fun, batch_data)}
end)}
Absinthe.AsyncTaskWrapper.async(
fn ->
{batch_fun, call_batch_fun(batch_fun, batch_data)}
end,
exec
)}
end)
|> Map.new(fn {batch_opts, task} ->
timeout = Keyword.get(batch_opts, :timeout, 5_000)
Task.await(task, timeout)
end)
end

defp put_batching_output(output, exec) do
put_in(exec.acc[__MODULE__][:output], output)
end

defp call_batch_fun({module, fun}, batch_data) do
call_batch_fun({module, fun, []}, batch_data)
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule Elixir.Absinthe.Integration.Execution.TelemetryTracePropagationTest do
use Absinthe.Case, async: true
import ExUnit.Assertions

setup context do
:telemetry.attach_many(
context.test,
[
[:absinthe, :resolve, :field, :start],
[:absinthe, :resolve, :field],
[:absinthe, :execute, :operation, :start],
[:absinthe, :execute, :operation]
],
&__MODULE__.handle_event/4,
%{}
)

on_exit(fn ->
:telemetry.detach(context.test)
end)

:ok
end

def handle_event(event, measurements, metadata, config) do
send(self(), {event, measurements, metadata, config})
end

defmodule TestSchema do
use Absinthe.Schema

object :field_resolution_timing do
field :begin_ms, :integer
field :delay_ms, :integer
field :end_ms, :integer
field :label, :integer
end

query do
field :delay_sync, :field_resolution_timing do
arg :delay_ms, :integer

resolve fn _, %{delay_ms: delay_ms}, _ ->
{:ok, delay_and_report(:delay_sync, delay_ms)}
end
end

field :delay_async, :field_resolution_timing do
arg :delay_ms, :integer

resolve fn _, %{delay_ms: delay_ms}, _ ->
async(fn ->
{:ok, delay_and_report(:delay_async, delay_ms)}
end)
end
end
end

def delay_and_report(field_name, delay_ms)
when is_atom(field_name) and is_integer(delay_ms) do
begin_ms = :os.system_time(:milli_seconds)
:timer.sleep(delay_ms)
end_ms = :os.system_time(:milli_seconds)

%{
begin_ms: begin_ms,
delay_ms: delay_ms,
end_ms: end_ms,
label: label()
}
end

defp label do
case :seq_trace.get_token(:label) do
[] -> 0
{:label, n} -> n
end
end
end

test "Execute expected telemetry events" do
query = """
query AskForAsyncThenSync ($delay_ms_async: Int!, $delay_ms_sync: Int!) {
delayAsync(delay_ms: $delay_ms_async) {
begin_ms
delay_ms
end_ms
label
}
delaySync(delay_ms: $delay_ms_sync) {
begin_ms
delay_ms
end_ms
label
}
}
"""

:seq_trace.set_token(:label, 23)

delay_ms_async = 10
delay_ms_sync = 100

{:ok, %{data: data}} =
Absinthe.run(query, TestSchema,
variables: %{"delay_ms_async" => delay_ms_async, "delay_ms_sync" => delay_ms_sync}
)

:seq_trace.set_token([])

assert %{"delayAsync" => result_async, "delaySync" => result_sync} = data
assert_in_delta(duration_ms(result_async), delay_ms_async, delay_ms_async / 10)
assert_in_delta(duration_ms(result_sync), delay_ms_sync, delay_ms_sync / 10)

IO.inspect({result_async, result_sync}, label: "results")
IO.inspect(duration_ms(result_async), label: "result_async duration_ms")
IO.inspect(duration_ms(result_sync), label: "result_sync duration_ms")
IO.inspect(overlap?(result_async, result_sync), label: "overlap")

assert_receive {[:absinthe, :resolve, :field], measurements1, _, _}
assert_receive {[:absinthe, :resolve, :field], measurements2, _, _}
assert is_number(measurements1[:duration])
assert is_number(measurements2[:duration])

[slower, faster] =
[measurements1, measurements2] |> Enum.map(& &1.duration) |> Enum.map(&(&1 / 1_000_000))

IO.inspect([slower, faster], label: ":telemetry reported durations (ms)")
assert_in_delta(faster, delay_ms_async, delay_ms_async / 10)
assert_in_delta(slower, delay_ms_sync, delay_ms_sync / 10)
end

defp duration_ms(%{"begin_ms" => begin_ms, "end_ms" => end_ms}), do: end_ms - begin_ms

defp overlap?(%{"begin_ms" => begin_ms_a, "end_ms" => end_ms_a}, %{
"begin_ms" => begin_ms_b,
"end_ms" => end_ms_b
}),
do: not (end_ms_a <= begin_ms_b or begin_ms_a >= end_ms_b)
end
Loading