Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate OpenTelemetry context in tasks #152

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions guides/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,29 @@ After a query is executed, you'll see something like:
}
}
```

## Opentelemetry

When using Opentelemetry, one usually wants to correlate spans that are created
in spawned tasks with the main trace. For example, you might have a trace started
in a Phoenix endpoint, and then have spans around database access.

One can correlate manually by attaching the OTel context the task function:

```elixir
ctx = OpenTelemetry.Ctx.get_current()

Task.async(fn ->
OpenTelemetry.Ctx.attach(ctx)

# do stuff that might create spans
end)
```

When using Dataloader, the tasks are spawned by the loader itself, so you can't
attach the context manually.

Instead, you can add the `:opentelemetry_process_propagator` package to your
dependencies, which has suitable wrappers that will attach the context
automatically. If the package is installed, Dataloader will use it in place
of the default `Task.async/1` and `Task.async_stream/3`.
20 changes: 18 additions & 2 deletions lib/dataloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ defmodule Dataloader do
# if the current process is linked to something, and then that something
# dies in the middle of us loading stuff.
task =
Task.async(fn ->
async(fn ->
# The purpose of `:trap_exit` here is so that we can ensure that any failures
# within the tasks do not kill the current process. We want to get results
# back no matter what.
Expand Down Expand Up @@ -359,7 +359,7 @@ defmodule Dataloader do
results =
if Keyword.get(opts, :async?, true) do
items
|> Task.async_stream(fun, task_opts)
|> async_stream(fun, task_opts)
|> Enum.map(fn
{:ok, result} -> {:ok, result}
{:exit, reason} -> {:error, reason}
Expand Down Expand Up @@ -390,4 +390,20 @@ defmodule Dataloader do
def pmap(items, fun, opts \\ []) do
async_safely(__MODULE__, :run_tasks, [items, fun, opts])
end

# Optionally use `async/1` and `async_stream/3` functions from
# `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: OpentelemetryProcessPropagator.Task

@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task
else
@spec async((() -> any)) :: Task.t()
defdelegate async(fun), to: Task

@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: Task
end
end
12 changes: 11 additions & 1 deletion lib/dataloader/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,7 @@ if Code.ensure_loaded?(Ecto) do
end

defp maybe_async_stream(batches, fun, options, true) do
Task.async_stream(batches, fun, options)
async_stream(batches, fun, options)
end

defp maybe_async_stream(batches, fun, _options, _) do
Expand Down Expand Up @@ -964,6 +964,16 @@ if Code.ensure_loaded?(Ecto) do
other
end
end

# Optionally use `async_stream/3` function from
# `opentelemetry_process_propagator` if available
if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do
@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task
else
@spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t()
defdelegate async_stream(items, fun, opts), to: Task
end
end
end
end
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Dataloader.Mixfile do
],
dialyzer: [
plt_core_path: "priv/plts",
plt_add_apps: [:mix, :ecto, :ecto_sql]
plt_add_apps: [:mix, :ecto, :ecto_sql, :opentelemetry_process_propagator]
]
]
end
Expand Down Expand Up @@ -62,6 +62,7 @@ defmodule Dataloader.Mixfile do
[
{:telemetry, "~> 1.0 or ~> 0.4"},
{:ecto, ">= 3.4.3 and < 4.0.0", optional: true},
{:opentelemetry_process_propagator, "~> 0.2.1", optional: true},
{:ecto_sql, "~> 3.0", optional: true, only: :test},
{:postgrex, "~> 0.14", only: :test, runtime: false},
{:dialyxir, "~> 1.0.0", only: [:dev, :test], runtime: false},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
"makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"opentelemetry_api": {:hex, :opentelemetry_api, "1.2.0", "454a35655b4c1924405ef1f3587f2c6f141bf73366b2c5e8a38dcc619b53eaa0", [:mix, :rebar3], [], "hexpm", "9e677c68243de0f70538798072e66e1fb1d4a2ca8888a6eb493c0a41e5480c35"},
"opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.2.1", "20ac37648faf7175cade16fda8d58e6f1ff1b7f2a50a8ef9d70a032c41aba315", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "f317237e39636d4f6140afa5d419e85ed3dc9e9a57072e7cd442df42af7b8aac"},
"postgrex": {:hex, :postgrex, "0.15.10", "2809dee1b1d76f7cbabe570b2a9285c2e7b41be60cf792f5f2804a54b838a067", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "1560ca427542f6b213f8e281633ae1a3b31cdbcd84ebd7f50628765b8f6132be"},
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"},
}
16 changes: 16 additions & 0 deletions test/dataloader/kv_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,18 @@ defmodule Dataloader.KVTest do
assert not_found_users == [nil]
end

test "propagates the OTel context", %{loader: loader} do
OpenTelemetry.Ctx.set_value("stored_value", "some_value")

context_value =
loader
|> Dataloader.load(Test, :otel_context, "stored_value")
|> Dataloader.run()
|> Dataloader.get(Test, :otel_context, "stored_value")

assert context_value == "some_value"
end

defp query(batch_key, ids, test_pid) do
send(test_pid, :querying)

Expand All @@ -216,6 +228,10 @@ defmodule Dataloader.KVTest do
defp query(_batch_key, "something_that_errors"),
do: raise("Failed when fetching key 'something_that_errors'")

defp query(:otel_context, key) do
{key, OpenTelemetry.Ctx.get_value(key, nil)}
end

defp query(batch_key, id) do
item =
@data[batch_key]
Expand Down