Skip to content

Commit

Permalink
Implement heartbeat check-ins
Browse files Browse the repository at this point in the history
Add an `Appsignal::CheckIn.heartbeat` helper that emits a single
heartbeat for the check-in identifier given.

When given `forever: true` as the second argument, it spawns a
separate thread that emits a heartbeat every thirty seconds. This
is a convenience method for the use case where the heartbeat is
only meant as a check that the process is alive.

Split functions that deal with different event kinds out of the
scheduler and test them independently.
  • Loading branch information
unflxw committed Sep 12, 2024
1 parent 1971bbe commit 2abbd5e
Show file tree
Hide file tree
Showing 8 changed files with 455 additions and 40 deletions.
26 changes: 26 additions & 0 deletions .changesets/add-heartbeat-check-ins.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
---
bump: minor
type: add
---

Add support for heartbeat check-ins.

Use the `Appsignal::CheckIn.heartbeat` method to send a single heartbeat check-in event from your application. This can be used, for example, in your application's main loop:

```ruby
loop do
Appsignal::CheckIn.heartbeat("job_processor")
process_job
end
```

Heartbeats are deduplicated and sent asynchronously, without blocking the current thread. Regardless of how often the `.heartbeat` method is called, at most one heartbeat with the same identifier will be sent every ten seconds.

Pass `forever: true` as the second argument to send heartbeats continuously during the entire lifetime of the current process. This can be used, for example, after your application has finished its boot process:

```ruby
def main
start_app
Appsignal::CheckIn.heartbeat("my_app", forever: true)
end
```
40 changes: 39 additions & 1 deletion lib/appsignal/check_in.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,18 @@

module Appsignal
module CheckIn
HEARTBEAT_FOREVER_INTERVAL_SECONDS = 30
class << self
@@forever_heartbeats = [] # rubocop:disable Style/ClassVars

# @api private
def kill_forever_heartbeats
@@forever_heartbeats.each(&:kill)
end

# Track cron check-ins.
#
# Track the execution of certain processes by sending a cron check-in.
# Track the execution of scheduled processes by sending a cron check-in.
#
# To track the duration of a piece of code, pass a block to {.cron}
# to report both when the process starts, and when it finishes.
Expand Down Expand Up @@ -40,6 +48,35 @@ def cron(identifier)
output
end

# Track heartbeat check-ins.
#
# Track the execution of long-lived processes by sending a heartbeat
# check-in.
#
# @example Send a heartbeat check-in
# Appsignal::CheckIn.heartbeat("main_loop")
#
# @param identifier [String] identifier of the heartbeat check-in to report.
# @yield the block to monitor.
# @return [void]
# @since 4.1.0
# @see https://docs.appsignal.com/check-ins/heartbeat
def heartbeat(identifier, forever: false)
if forever
@@forever_heartbeats << Thread.new do
loop do
heartbeat(identifier)
sleep HEARTBEAT_FOREVER_INTERVAL_SECONDS
end
end

return
end

event = Event.heartbeat(:identifier => identifier)
scheduler.schedule(event)
end

# @api private
def transmitter
@transmitter ||= Transmitter.new(
Expand All @@ -60,5 +97,6 @@ def stop
end
end

require "appsignal/check_in/event"
require "appsignal/check_in/scheduler"
require "appsignal/check_in/cron"
8 changes: 3 additions & 5 deletions lib/appsignal/check_in/cron.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ def finish
private

def event(kind)
{
Event.cron(
:identifier => @identifier,
:digest => @digest,
:kind => kind,
:timestamp => Time.now.utc.to_i,
:check_in_type => "cron"
}
:kind => kind
)
end
end
end
Expand Down
70 changes: 70 additions & 0 deletions lib/appsignal/check_in/event.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
module Appsignal
module CheckIn
# @api private
class Event
class << self
def new(check_in_type:, identifier:, digest: nil, kind: nil)
{
:identifier => identifier,
:digest => digest,
:kind => kind,
:timestamp => Time.now.utc.to_i,
:check_in_type => check_in_type
}.compact
end

def cron(identifier:, digest:, kind:)
new(
:check_in_type => "cron",
:identifier => identifier,
:digest => digest,
:kind => kind
)
end

def heartbeat(identifier:)
new(
:check_in_type => "heartbeat",
:identifier => identifier
)
end

def redundant?(event, other)
return false if
other[:check_in_type] != event[:check_in_type] ||
other[:identifier] != event[:identifier]

return false if event[:check_in_type] == "cron" && (
other[:digest] != event[:digest] ||
other[:kind] != event[:kind]
)

return false if
event[:check_in_type] != "cron" &&
event[:check_in_type] != "heartbeat"

true
end

def describe(events)
if events.empty?
# This shouldn't happen.
"no check-in events"
elsif events.length > 1
"#{events.length} check-in events"
else
event = events.first
if event[:check_in_type] == "cron"
"cron check-in `#{event[:identifier] || "unknown"}` " \
"#{event[:kind] || "unknown"} event (digest #{event[:digest] || "unknown"})"
elsif event[:check_in_type] == "heartbeat"
"heartbeat check-in `#{event[:identifier] || "unknown"}` event"
else
"unknown check-in event"
end
end
end
end
end
end
end
44 changes: 10 additions & 34 deletions lib/appsignal/check_in/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ def initialize
def schedule(event)
unless Appsignal.active?
Appsignal.internal_logger.debug(
"Cannot transmit #{describe([event])}: AppSignal is not active"
"Cannot transmit #{Event.describe([event])}: AppSignal is not active"
)
return
end

@mutex.synchronize do
if @queue.closed?
Appsignal.internal_logger.debug(
"Cannot transmit #{describe([event])}: AppSignal is stopped"
"Cannot transmit #{Event.describe([event])}: AppSignal is stopped"
)
return
end
Expand All @@ -48,7 +48,7 @@ def schedule(event)
start_waker(INITIAL_DEBOUNCE_SECONDS) if @waker.nil?

Appsignal.internal_logger.debug(
"Scheduling #{describe([event])} to be transmitted"
"Scheduling #{Event.describe([event])} to be transmitted"
)

# Make sure to start the thread after an event has been added.
Expand Down Expand Up @@ -92,7 +92,7 @@ def run
end

def transmit(events)
description = describe(events)
description = Event.describe(events)

begin
response = CheckIn.transmitter.transmit(events, :format => :ndjson)
Expand All @@ -110,42 +110,18 @@ def transmit(events)
end
end

def describe(events)
if events.empty?
# This shouldn't happen.
"no check-in events"
elsif events.length > 1
"#{events.length} check-in events"
else
event = events.first
if event[:check_in_type] == "cron"
"cron check-in `#{event[:identifier] || "unknown"}` " \
"#{event[:kind] || "unknown"} event (digest #{event[:digest] || "unknown"})" \
else
"unknown check-in event"
end
end
end

# Must be called from within a `@mutex.synchronize` block.
def add_event(event)
# Remove redundant events, keeping the newly added one, which
# should be the one with the most recent timestamp.
if event[:check_in_type] == "cron"
# Remove any existing cron check-in event with the same identifier,
# digest and kind as the one we're adding.
@events.reject! do |existing_event|
next unless existing_event[:identifier] == event[:identifier] &&
existing_event[:digest] == event[:digest] &&
existing_event[:kind] == event[:kind] &&
existing_event[:check_in_type] == "cron"
@events.reject! do |existing_event|
next unless Event.redundant?(event, existing_event)

Appsignal.internal_logger.debug(
"Replacing previously scheduled #{describe([existing_event])}"
)
Appsignal.internal_logger.debug(
"Replacing previously scheduled #{Event.describe([existing_event])}"
)

true
end
true
end

@events << event
Expand Down
Loading

0 comments on commit 2abbd5e

Please sign in to comment.