Skip to content

Commit

Permalink
Implement heartbeat check-ins
Browse files Browse the repository at this point in the history
Add an `Appsignal.CheckIn.heartbeat` helper that emits a single
heartbeat for the check-in identifier given.

When called with `continuous: true` as the second argument, it
starts and links a separate Elixir process that emits a heartbeat
every thirty seconds.

Unlike the equivalent functionality in the Ruby integration, which
spawns a thread that will stay alive for the lifetime of the Ruby
process, the Elixir process is linked to the process that spawned
it, meaning it will be shut down when its parent process is shut
down. This allows it to be used to track the lifetime of individual
Elixir processes.

Additionally, it is also possible to add `Appsignal.CheckIn.Heartbeat`
as a child process to a supervisor, meaning its lifetime will be tied
to that of the other processes supervised by it.

Finally, the functionality seen in the Ruby integration could also be
achieved by manually calling `Appsignal.CheckIn.Heartbeat.start/1`,
keeping the process unlinked and therefore alive for the entirety of
the Elixir node's lifetime, though this is unlikely to be useful in
the Elixir process model.
  • Loading branch information
unflxw committed Sep 13, 2024
1 parent ab6d548 commit f476dfa
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 83 deletions.
39 changes: 39 additions & 0 deletions .changesets/add-heartbeat-check-ins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
---
bump: minor
type: add
---

Add support for heartbeat check-ins.

Use the `Appsignal.CheckIn.heartbeat` method to send a single heartbeat check-in event from your application. This can be used, for example, in a `GenServer`'s callback:

```elixir
@impl true
def handle_cast({:process_job, job}, jobs) do
Appsignal.CheckIn.heartbeat("job_processor")
{:noreply, [job | jobs], {:continue, :process_job}}
end
```

Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` method is called, at most one heartbeat with the same identifier will be sent every ten seconds.

Pass `continuous: true` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, during a `GenServer`'s initialisation:

```elixir
@impl true
def init(_arg) do
Appsignal.CheckIn.heartbeat("my_genserver", continuous: true)
{:ok, nil}
end
```

You can also use `Appsignal.CheckIn.Heartbeat` as a supervisor's child process, in order for heartbeats to be sent continuously during the lifetime of the supervisor. This can be used, for example, during an `Application`'s start:

```elixir
@impl true
def start(_type, _args) do
Supervisor.start_link([
{Appsignal.CheckIn.Heartbeat, "my_application"}
], strategy: :one_for_one, name: MyApplication.Supervisor)
end
```
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ if Mix.env() in [:bench, :test, :test_no_nif] do
config :appsignal, appsignal_span: Appsignal.Test.Span
config :appsignal, appsignal_tracer: Appsignal.Test.Tracer
config :appsignal, appsignal_tracer_nif: Appsignal.Test.Nif

config :appsignal, deletion_delay: 100
config :appsignal, appsignal_checkin_heartbeat_interval_milliseconds: 10

config :appsignal, :config,
otp_app: :appsignal,
Expand Down
21 changes: 21 additions & 0 deletions lib/appsignal/check_in/check_in.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
defmodule Appsignal.CheckIn do
alias Appsignal.CheckIn.Cron
alias Appsignal.CheckIn.Event

@scheduler Application.compile_env(
:appsignal,
:appsignal_checkin_scheduler,
Appsignal.CheckIn.Scheduler
)

@spec cron(String.t()) :: :ok
def cron(identifier) do
Expand All @@ -16,4 +23,18 @@ defmodule Appsignal.CheckIn do

output
end

@spec heartbeat(String.t()) :: :ok
@spec heartbeat(String.t(), continuous: boolean) :: :ok
def heartbeat(identifier) do
@scheduler.schedule(Event.heartbeat(identifier))
:ok
end

def heartbeat(identifier, continuous: true) do
Appsignal.CheckIn.Heartbeat.start_link(identifier)
:ok
end

def heartbeat(identifier, _), do: heartbeat(identifier)
end
35 changes: 3 additions & 32 deletions lib/appsignal/check_in/cron.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Appsignal.CheckIn.Cron do
alias __MODULE__
alias Appsignal.CheckIn.Cron.Event
alias Appsignal.CheckIn.Event

@scheduler Application.compile_env(
:appsignal,
Expand All @@ -25,40 +25,11 @@ defmodule Appsignal.CheckIn.Cron do

@spec start(Cron.t()) :: :ok
def start(cron) do
@scheduler.schedule(Event.new(cron, :start))
@scheduler.schedule(Event.cron(cron, :start))
end

@spec finish(Cron.t()) :: :ok
def finish(cron) do
@scheduler.schedule(Event.new(cron, :finish))
end
end

defmodule Appsignal.CheckIn.Cron.Event do
alias __MODULE__
alias Appsignal.CheckIn.Cron

@derive Jason.Encoder

@type kind :: :start | :finish
@type t :: %Event{
identifier: String.t(),
digest: String.t(),
kind: kind,
timestamp: integer,
check_in_type: :cron
}

defstruct [:identifier, :digest, :kind, :timestamp, :check_in_type]

@spec new(Cron.t(), kind) :: t
def new(%Cron{identifier: identifier, digest: digest}, kind) do
%Event{
identifier: identifier,
digest: digest,
kind: kind,
timestamp: System.system_time(:second),
check_in_type: :cron
}
@scheduler.schedule(Event.cron(cron, :finish))
end
end
92 changes: 92 additions & 0 deletions lib/appsignal/check_in/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
defmodule Appsignal.CheckIn.Event do
alias __MODULE__
alias Appsignal.CheckIn.Cron

@type kind :: :start | :finish
@type check_in_type :: :cron | :heartbeat
@type t :: %Event{
identifier: String.t(),
digest: String.t() | nil,
kind: kind | nil,
timestamp: integer,
check_in_type: check_in_type
}

defstruct [:identifier, :digest, :kind, :timestamp, :check_in_type]

@spec cron(Cron.t(), kind) :: t
def cron(%Cron{identifier: identifier, digest: digest}, kind) do
%Event{
identifier: identifier,
digest: digest,
kind: kind,
timestamp: System.system_time(:second),
check_in_type: :cron
}
end

@spec heartbeat(String.t()) :: t
def heartbeat(identifier) do
%Event{
identifier: identifier,
timestamp: System.system_time(:second),
check_in_type: :heartbeat
}
end

@spec describe([t]) :: String.t()
def describe([]) do
# This shouldn't happen.
"no check-in events"
end

def describe([%Event{check_in_type: :cron} = event]) do
"cron check-in `#{event.identifier || "unknown"}` " <>
"#{event.kind || "unknown"} event (digest #{event.digest || "unknown"})"
end

def describe([%Event{check_in_type: :heartbeat} = event]) do
"heartbeat check-in `#{event.identifier || "unknown"}` event"
end

def describe([_event]) do
# This shouldn't happen.
"unknown check-in event"
end

def describe(events) do
"#{Enum.count(events)} check-in events"
end

@spec redundant?(t, t) :: boolean
def redundant?(
%Event{check_in_type: :cron} = event,
%Event{check_in_type: :cron} = new_event
) do
# Consider any existing cron check-in event redundant if it has the
# same identifier, digest and kind as the one we're adding.
event.identifier == new_event.identifier &&
event.kind == new_event.kind &&
event.digest == new_event.digest
end

def redundant?(
%Event{check_in_type: :heartbeat} = event,
%Event{check_in_type: :heartbeat} = new_event
) do
# Consider any existing heartbeat check-in event redundant if it has
# the same identifier as the one we're adding.
event.identifier == new_event.identifier
end

def redundant?(_event, _new_event), do: false
end

defimpl Jason.Encoder, for: Appsignal.CheckIn.Event do
def encode(%Appsignal.CheckIn.Event{} = event, opts) do
event
|> Map.from_struct()
|> Map.reject(fn {k, v} -> is_nil(v) end)

Check warning on line 89 in lib/appsignal/check_in/event.ex

View workflow job for this annotation

GitHub Actions / test (1.11.x, 24.x)

variable "k" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 89 in lib/appsignal/check_in/event.ex

View workflow job for this annotation

GitHub Actions / test (1.12.x, 24.x)

variable "k" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 89 in lib/appsignal/check_in/event.ex

View workflow job for this annotation

GitHub Actions / test (1.12.x, 24.x)

Map.reject/2 is undefined or private

Check warning on line 89 in lib/appsignal/check_in/event.ex

View workflow job for this annotation

GitHub Actions / test (1.13.x, 24.x)

variable "k" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 89 in lib/appsignal/check_in/event.ex

View workflow job for this annotation

GitHub Actions / test (1.15.x, 26.x)

variable "k" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 89 in lib/appsignal/check_in/event.ex

View workflow job for this annotation

GitHub Actions / test (1.14.x, 25.x)

variable "k" is unused (if the variable is not meant to be used, prefix it with an underscore)
|> Jason.Encode.map(opts)
end
end
39 changes: 39 additions & 0 deletions lib/appsignal/check_in/heartbeat.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Appsignal.CheckIn.Heartbeat do
use GenServer, shutdown: :brutal_kill

@interval_milliseconds Application.compile_env(
:appsignal,
:appsignal_checkin_heartbeat_interval_milliseconds,
30_000
)

@impl true
def init(identifier) do
{:ok, identifier, {:continue, :heartbeat}}
end

def start(identifier) do
GenServer.start(__MODULE__, identifier)
end

def start_link(identifier) do
GenServer.start_link(__MODULE__, identifier)
end

def heartbeat(identifier) do
GenServer.cast(__MODULE__, {:heartbeat, identifier})
:ok
end

@impl true
def handle_continue(:heartbeat, identifier) do
Appsignal.CheckIn.heartbeat(identifier)
Process.send_after(self(), :heartbeat, @interval_milliseconds)
{:noreply, identifier}
end

@impl true
def handle_info(:heartbeat, identifier) do
{:noreply, identifier, {:continue, :heartbeat}}
end
end
59 changes: 16 additions & 43 deletions lib/appsignal/check_in/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ end
defmodule Appsignal.CheckIn.Scheduler do
use GenServer

alias Appsignal.CheckIn.Cron
alias Appsignal.CheckIn.Event

@debounce Application.compile_env(
:appsignal,
Expand Down Expand Up @@ -68,17 +68,15 @@ defmodule Appsignal.CheckIn.Scheduler do
if Appsignal.Config.active?() do
GenServer.cast(__MODULE__, {:schedule, event})
else
@integration_logger.debug(
"AppSignal not active, not scheduling #{describe_events([event])}"
)
@integration_logger.debug("AppSignal not active, not scheduling #{Event.describe([event])}")
end

:ok
end

@impl true
def handle_cast({:schedule, event}, state) do
@integration_logger.trace("Scheduling #{describe_events([event])} to be transmitted")
@integration_logger.trace("Scheduling #{Event.describe([event])} to be transmitted")

schedule_transmission(state)

Expand All @@ -95,7 +93,7 @@ defmodule Appsignal.CheckIn.Scheduler do

@impl true
def handle_continue({:transmit, events}, state) do
description = describe_events(events)
description = Event.describe(events)

config = Appsignal.Config.config()
endpoint = "#{config[:logging_endpoint]}/check_ins/json"
Expand Down Expand Up @@ -150,42 +148,17 @@ defmodule Appsignal.CheckIn.Scheduler do
defp add_event(events, event) do
# Remove redundant events, keeping the newly added one, which
# should be the one with the most recent timestamp.
[event | Enum.reject(events, &redundant_event?(&1, event))]
end

defp redundant_event?(%Cron.Event{} = event, %Cron.Event{} = new_event) do
# Consider any existing cron check-in event redundant if it has the
# same identifier, digest and kind as the one we're adding.
is_redundant =
event.identifier == new_event.identifier &&
event.kind == new_event.kind &&
event.digest == new_event.digest

if is_redundant do
@integration_logger.debug("Replacing previously scheduled #{describe_events([event])}")
end

is_redundant
end

defp redundant_event?(_event, _new_event), do: false

defp describe_events([]) do
# This shouldn't happen.
"no check-in events"
end

defp describe_events(events) when length(events) > 1 do
"#{Enum.count(events)} check-in events"
end

defp describe_events([%Cron.Event{} = event]) do
"cron check-in `#{event.identifier || "unknown"}` " <>
"#{event.kind || "unknown"} event (digest #{event.digest || "unknown"})"
end

defp describe_events([_event]) do
# This shouldn't happen.
"unknown check-in event"
[
event
| Enum.reject(events, fn existing_event ->
is_redundant = Event.redundant?(existing_event, event)

if is_redundant do
@integration_logger.debug("Replacing previously scheduled #{Event.describe([event])}")
end

is_redundant
end)
]
end
end
Loading

0 comments on commit f476dfa

Please sign in to comment.