From 5965013bb944871f3ca8cd87b8aeffb4e6eb5d09 Mon Sep 17 00:00:00 2001 From: Saverio Date: Wed, 8 Feb 2023 12:29:47 +0100 Subject: [PATCH 1/4] Porpagate OpenTelemetry context in tasks --- guides/telemetry.md | 26 ++++++++++++++++++++++++++ lib/dataloader.ex | 20 ++++++++++++++++++-- lib/dataloader/ecto.ex | 12 +++++++++++- mix.exs | 1 + mix.lock | 2 ++ test/dataloader/kv_test.exs | 16 ++++++++++++++++ 6 files changed, 74 insertions(+), 3 deletions(-) diff --git a/guides/telemetry.md b/guides/telemetry.md index 8a502cd..a1981ed 100644 --- a/guides/telemetry.md +++ b/guides/telemetry.md @@ -50,3 +50,29 @@ After a query is executed, you'll see something like: } } ``` + +## Opentelemetry + +When using Opentelemetry, one usually wants to correlate spans that are created +in spawned tasks with the main trace. For example, you might have a trace started +in a Phoenix endpoint, and then have spans around database access. + +One can correlate manually by attaching the OTel context the task function: + +```elixir +ctx = OpenTelemetry.Ctx.get_current() + +Task.async(fn -> + OpenTelemetry.Ctx.attach(ctx) + + # do stuff that might create spans +end) +``` + +When using Dataloader, the tasks are spawned by the loader itself, so you can't +attach the context manually. + +Instead, you can add the `:opentelemetry_process_propagator` package to your +dependencies, which has suitable wrappers that will attach the context +automatically. If the package is installed, Dataloader will use it in place +of the default `Task.async/1` and `Task.async_stream/3`. diff --git a/lib/dataloader.ex b/lib/dataloader.ex index bdb15b8..f4dd03d 100644 --- a/lib/dataloader.ex +++ b/lib/dataloader.ex @@ -304,7 +304,7 @@ defmodule Dataloader do # if the current process is linked to something, and then that something # dies in the middle of us loading stuff. task = - Task.async(fn -> + async(fn -> # The purpose of `:trap_exit` here is so that we can ensure that any failures # within the tasks do not kill the current process. We want to get results # back no matter what. @@ -359,7 +359,7 @@ defmodule Dataloader do results = if Keyword.get(opts, :async?, true) do items - |> Task.async_stream(fun, task_opts) + |> async_stream(fun, task_opts) |> Enum.map(fn {:ok, result} -> {:ok, result} {:exit, reason} -> {:error, reason} @@ -390,4 +390,20 @@ defmodule Dataloader do def pmap(items, fun, opts \\ []) do async_safely(__MODULE__, :run_tasks, [items, fun, opts]) end + + # Optionally use `async/1` and `async_stream/3` functions from + # `opentelemetry_process_propagator` if available + if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do + @spec async((() -> any)) :: Task.t() + defdelegate async(fun), to: OpentelemetryProcessPropagator.Task + + @spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t() + defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task + else + @spec async((() -> any)) :: Task.t() + defdelegate async(fun), to: Task + + @spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t() + defdelegate async_stream(items, fun, opts), to: Task + end end diff --git a/lib/dataloader/ecto.ex b/lib/dataloader/ecto.ex index 9ec6e57..1454b03 100644 --- a/lib/dataloader/ecto.ex +++ b/lib/dataloader/ecto.ex @@ -638,7 +638,7 @@ if Code.ensure_loaded?(Ecto) do end defp maybe_async_stream(batches, fun, options, true) do - Task.async_stream(batches, fun, options) + async_stream(batches, fun, options) end defp maybe_async_stream(batches, fun, _options, _) do @@ -964,6 +964,16 @@ if Code.ensure_loaded?(Ecto) do other end end + + # Optionally use `async_stream/3` function from + # `opentelemetry_process_propagator` if available + if Code.ensure_loaded?(OpentelemetryProcessPropagator.Task) do + @spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t() + defdelegate async_stream(items, fun, opts), to: OpentelemetryProcessPropagator.Task + else + @spec async_stream(Enumerable.t(), (term -> term), keyword) :: Enumerable.t() + defdelegate async_stream(items, fun, opts), to: Task + end end end end diff --git a/mix.exs b/mix.exs index ffe96c6..26a5727 100644 --- a/mix.exs +++ b/mix.exs @@ -62,6 +62,7 @@ defmodule Dataloader.Mixfile do [ {:telemetry, "~> 1.0 or ~> 0.4"}, {:ecto, ">= 3.4.3 and < 4.0.0", optional: true}, + {:opentelemetry_process_propagator, "~> 0.2.1", optional: true}, {:ecto_sql, "~> 3.0", optional: true, only: :test}, {:postgrex, "~> 0.14", only: :test, runtime: false}, {:dialyxir, "~> 1.0.0", only: [:dev, :test], runtime: false}, diff --git a/mix.lock b/mix.lock index 652ae53..8024000 100644 --- a/mix.lock +++ b/mix.lock @@ -12,6 +12,8 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.15.1", "b5888c880d17d1cc3e598f05cdb5b5a91b7b17ac4eaf5f297cb697663a1094dd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "db68c173234b07ab2a07f645a5acdc117b9f99d69ebf521821d89690ae6c6ec8"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, "nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.2.0", "454a35655b4c1924405ef1f3587f2c6f141bf73366b2c5e8a38dcc619b53eaa0", [:mix, :rebar3], [], "hexpm", "9e677c68243de0f70538798072e66e1fb1d4a2ca8888a6eb493c0a41e5480c35"}, + "opentelemetry_process_propagator": {:hex, :opentelemetry_process_propagator, "0.2.1", "20ac37648faf7175cade16fda8d58e6f1ff1b7f2a50a8ef9d70a032c41aba315", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "f317237e39636d4f6140afa5d419e85ed3dc9e9a57072e7cd442df42af7b8aac"}, "postgrex": {:hex, :postgrex, "0.15.10", "2809dee1b1d76f7cbabe570b2a9285c2e7b41be60cf792f5f2804a54b838a067", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm", "1560ca427542f6b213f8e281633ae1a3b31cdbcd84ebd7f50628765b8f6132be"}, "telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"}, } diff --git a/test/dataloader/kv_test.exs b/test/dataloader/kv_test.exs index 858d21a..3c8581e 100644 --- a/test/dataloader/kv_test.exs +++ b/test/dataloader/kv_test.exs @@ -205,6 +205,18 @@ defmodule Dataloader.KVTest do assert not_found_users == [nil] end + test "propagates the OTel context", %{loader: loader} do + OpenTelemetry.Ctx.set_value("stored_value", "some_value") + + context_value = + loader + |> Dataloader.load(Test, :otel_context, "stored_value") + |> Dataloader.run() + |> Dataloader.get(Test, :otel_context, "stored_value") + + assert context_value == "some_value" + end + defp query(batch_key, ids, test_pid) do send(test_pid, :querying) @@ -216,6 +228,10 @@ defmodule Dataloader.KVTest do defp query(_batch_key, "something_that_errors"), do: raise("Failed when fetching key 'something_that_errors'") + defp query(:otel_context, key) do + {key, OpenTelemetry.Ctx.get_value(key, nil)} + end + defp query(batch_key, id) do item = @data[batch_key] From 218464f99fd70e7e01289d61111478b4c5d5f16b Mon Sep 17 00:00:00 2001 From: Saverio Date: Thu, 9 Feb 2023 14:01:34 +0100 Subject: [PATCH 2/4] Run dialyzer taking ito account all dependencies --- mix.exs | 1 + 1 file changed, 1 insertion(+) diff --git a/mix.exs b/mix.exs index 26a5727..098be78 100644 --- a/mix.exs +++ b/mix.exs @@ -20,6 +20,7 @@ defmodule Dataloader.Mixfile do ], dialyzer: [ plt_core_path: "priv/plts", + plt_add_deps: :transitive, plt_add_apps: [:mix, :ecto, :ecto_sql] ] ] From d37df3ff10b80c7524a4cbb0cd772893565117f9 Mon Sep 17 00:00:00 2001 From: Saverio Date: Thu, 9 Feb 2023 14:56:27 +0100 Subject: [PATCH 3/4] Run dialyzer taking ito account all dependencies --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 098be78..8b225a1 100644 --- a/mix.exs +++ b/mix.exs @@ -20,7 +20,7 @@ defmodule Dataloader.Mixfile do ], dialyzer: [ plt_core_path: "priv/plts", - plt_add_deps: :transitive, + plt_add_deps: true, plt_add_apps: [:mix, :ecto, :ecto_sql] ] ] From 2ef689bd99e95ae3453b34061615e4832280e418 Mon Sep 17 00:00:00 2001 From: Saverio Date: Fri, 10 Feb 2023 16:03:46 +0100 Subject: [PATCH 4/4] Explicitly add optional dep to generated PLTs --- mix.exs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mix.exs b/mix.exs index 8b225a1..7114c0e 100644 --- a/mix.exs +++ b/mix.exs @@ -20,8 +20,7 @@ defmodule Dataloader.Mixfile do ], dialyzer: [ plt_core_path: "priv/plts", - plt_add_deps: true, - plt_add_apps: [:mix, :ecto, :ecto_sql] + plt_add_apps: [:mix, :ecto, :ecto_sql, :opentelemetry_process_propagator] ] ] end