Skip to content

Commit

Permalink
Add event caching to address #121
Browse files Browse the repository at this point in the history
  • Loading branch information
0x2142 committed Sep 18, 2024
1 parent f4dba9e commit 1fb7864
Show file tree
Hide file tree
Showing 9 changed files with 212 additions and 32 deletions.
1 change: 1 addition & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## [v0.3.5](https://github.com/0x2142/frigate-notify/releases/tag/v0.3.5) - Upcoming Release
- Fixed issue where built-in alert templates were not being included in binary releases
- Fixed issue where a notification may not be sent if previous event update from Frigate did not contain a snapshot
- Added `from` & `ignoressl` config items to `smtp` notifier

## [v0.3.4](https://github.com/0x2142/frigate-notify/releases/tag/v0.3.4) - Aug 15 2024
Expand Down
88 changes: 88 additions & 0 deletions events/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package frigate

import (
"slices"
"strings"
"time"

"github.com/0x2142/frigate-notify/models"
"github.com/rs/zerolog/log"

"github.com/maypok86/otter"
)

var zoneCache otter.Cache[string, []string]

func InitZoneCache() {
var err error
log.Debug().Msg("Setting up zone cache...")
zoneCache, err = otter.MustBuilder[string, []string](500).WithTTL(1 * time.Hour).Build()
if err != nil {
log.Warn().
Err(err).
Msg("Error setting up zone cache")
}
log.Debug().Msg("Zone cache ready")
}

func CloseZoneCache() {
log.Debug().Msg("Cache tear down")
zoneCache.Close()
}

// Add zone to list of zones that have already generated notifications for specified event ID
func setZoneAlerted(event models.Event) {
// Get current list of zones by event ID, if it exists
alreadyAlerted, _ := zoneCache.Get(event.ID)
alreadyAlerted = append(alreadyAlerted, event.CurrentZones...)
// Remove duplicates
slices.Sort(alreadyAlerted)
alreadyAlerted = slices.Compact(alreadyAlerted)
// Update cache with new list
zoneCache.Set(event.ID, alreadyAlerted)
}

// Query cache to see if zone already generated alert
func zoneAlreadyAlerted(event models.Event) bool {
// Check if event already in cache & if so, get contents
alreadyAlerted, ok := zoneCache.Get(event.ID)
// If event not found, create cache entry & add zones
if !ok {
log.Debug().
Str("event_id", event.ID).
Str("camera", event.Camera).
Str("zones", strings.Join(event.CurrentZones, ",")).
Msg("Event not in cache, adding...")
setZoneAlerted(event)
return false
}
// If event found, check to see if there are any new zones to notify on
for _, zone := range event.CurrentZones {
if !slices.Contains(alreadyAlerted, zone) {
log.Debug().
Str("event_id", event.ID).
Str("camera", event.Camera).
Str("zones", strings.Join(event.CurrentZones, ",")).
Msg("Found new zone not in cache")
setZoneAlerted(event)
return false
}
}
// If no new zones, then assume all have been notified already
log.Debug().
Str("event_id", event.ID).
Str("camera", event.Camera).
Str("zones", strings.Join(event.CurrentZones, ",")).
Msg("All zones in event have already notified")
return true
}

// Remove zone alert cache for event ID
func delZoneAlerted(event models.Event) {
zoneCache.Delete(event.ID)
log.Debug().
Str("event_id", event.ID).
Str("camera", event.Camera).
Str("zones", strings.Join(event.CurrentZones, ",")).
Msg("Event removed from cache")
}
75 changes: 75 additions & 0 deletions events/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package frigate

import (
"testing"

"github.com/0x2142/frigate-notify/models"
)

func TestSetZoneAlerted(t *testing.T) {
// Setup
InitZoneCache()
defer CloseZoneCache()
event := models.Event{ID: "test-event-id", CurrentZones: []string{"test_zone", "test_zone"}}

setZoneAlerted(event)

expected := []string{"test_zone"}
result, ok := zoneCache.Get(event.ID)
if !ok {
t.Error("Could not find event ID")
}

// Check if zone added
if result[0] != expected[0] {
t.Errorf("Expected: %s, Got: %s", expected, result)
}

// Check if duplicates removed
if len(result) != 1 {
t.Errorf("Expected: %s, Got: %s", expected, result)
}
}

func TestZoneAlreadyAlerted(t *testing.T) {
// Setup
InitZoneCache()
defer CloseZoneCache()
event := models.Event{ID: "test-event-id", CurrentZones: []string{"test_zone", "test_zone"}}

// Test new event
result := zoneAlreadyAlerted(event)
if result != false {
t.Errorf("Expected: false, Got: %v", result)
}

// Test adding new zone to existing event
event.CurrentZones = append(event.CurrentZones, "another_zone")
result = zoneAlreadyAlerted(event)
if result != false {
t.Errorf("Expected: false, Got: %v", result)
}

// Test event that has already generated alert
result = zoneAlreadyAlerted(event)
if result != true {
t.Errorf("Expected: true, Got: %v", result)
}
}

func TestDelZoneAlerted(t *testing.T) {
// Setup
InitZoneCache()
defer CloseZoneCache()
event := models.Event{ID: "test-event-id", CurrentZones: []string{"test_zone", "test_zone"}}

// Create new cache entry
setZoneAlerted(event)

// Test delete
delZoneAlerted(event)
_, ok := zoneCache.Get(event.ID)
if ok {
t.Errorf("Cache entry not deleted")
}
}
16 changes: 16 additions & 0 deletions events/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,22 @@ import (

// checkEventFilters processes incoming event through configured filters to determine if it should generate a notification
func checkEventFilters(event models.Event) bool {
// Drop event if no snapshot or clip is available - Event is likely being filtered on Frigate side.
// For example, if a camera has `required_zones` set - then there may not be any clip or snap until
// object moves into required zone
if !event.HasClip && !event.HasSnapshot {
log.Info().
Str("event_id", event.ID).
Msg("Event dropped - No snapshot or clip available")
return false
}
// Drop event if no snapshot & skip_nosnap is true
if !event.HasSnapshot && strings.ToLower(config.ConfigData.Alerts.General.NoSnap) == "drop" {
log.Info().
Str("event_id", event.ID).
Msg("Event dropped - No snapshot available")
return false
}
// Check quiet hours
if isQuietHours() {
log.Info().
Expand Down
33 changes: 18 additions & 15 deletions events/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,28 +92,22 @@ func processEvent(client mqtt.Client, msg mqtt.Message) {
return
}

// Skip update events where zone didn't change
// Compares current detected zone to previous list of zones entered
zoneChanged := false
for _, zone := range event.After.CurrentZones {
if !slices.Contains(event.Before.EnteredZones, zone) {
zoneChanged = true
log.Debug().
Str("event_id", event.After.ID).
Str("camera", event.After.Camera).
Str("label", event.After.Label).
Str("zones", strings.Join(event.After.CurrentZones, ",")).
Msg("Object entered new zone")
}
}
if event.Type == "update" && !zoneChanged {
// Check if already notified on zones
if zoneAlreadyAlerted(event.After.Event) {
log.Info().
Str("event_id", event.After.ID).
Str("camera", event.After.Camera).
Str("label", event.After.Label).
Str("zones", strings.Join(event.After.CurrentZones, ",")).
Msg("Event dropped - Already notified on this zone")
return
} else {
log.Debug().
Str("event_id", event.After.ID).
Str("camera", event.After.Camera).
Str("label", event.After.Label).
Str("zones", strings.Join(event.After.CurrentZones, ",")).
Msg("Object entered new zone")
}

// If snapshot was collected, pull down image to send with alert
Expand All @@ -127,6 +121,15 @@ func processEvent(client mqtt.Client, msg mqtt.Message) {
// Send alert with snapshot
notifier.SendAlert(event.After.Event, snapshotURL, snapshot, event.After.ID)
}

// Clear event cache entry when event ends
if event.Type == "end" {
log.Debug().
Str("event_id", event.After.ID).
Msg("Event ended")
delZoneAlerted(event.After.Event)
return
}
}

// connectionLostHandler logs error message on MQTT connection loss
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ require (
require (
github.com/disgoorg/json v1.1.0 // indirect
github.com/disgoorg/snowflake/v2 v2.0.1 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/maypok86/otter v1.2.2 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ github.com/disgoorg/json v1.1.0 h1:7xigHvomlVA9PQw9bMGO02PHGJJPqvX5AnwlYg/Tnys=
github.com/disgoorg/json v1.1.0/go.mod h1:BHDwdde0rpQFDVsRLKhma6Y7fTbQKub/zdGO5O9NqqA=
github.com/disgoorg/snowflake/v2 v2.0.1 h1:CuUxGLwggUxEswZOmZ+mZ5i0xSumQdXW9tXW7uGqe+0=
github.com/disgoorg/snowflake/v2 v2.0.1/go.mod h1:SPU9c2CNn5DSyb86QcKtdZgix9osEtKrHLW4rMhfLCs=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0=
github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 h1:wG8n/XJQ07TmjbITcGiUaOtXxdrINDz1b0J1w0SzqDc=
github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1/go.mod h1:A2S0CWkNylc2phvKXWBBdD3K0iGnDBGbzRpISP2zBl8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand All @@ -33,6 +37,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/maypok86/otter v1.2.2 h1:jJi0y8ruR/ZcKmJ4FbQj3QQTqKwV+LNrSOo2S1zbF5M=
github.com/maypok86/otter v1.2.2/go.mod h1:mKLfoI7v1HOmQMwFgX4QkRk23mX6ge3RDvjdHOWG4R4=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pelletier/go-toml/v2 v2.1.1 h1:LWAJwfNvjQZCFIDKWYQaM62NcYeYViCmWIwmOStowAI=
Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/0x2142/frigate-notify/util"
)

var APP_VER = "v0.3.4"
var APP_VER = "v0.3.5-dev"
var debug, debugenv bool
var jsonlog, jsonlogenv bool
var nocolor, nocolorenv bool
Expand Down Expand Up @@ -88,6 +88,10 @@ func main() {
}
// Connect MQTT
if config.ConfigData.Frigate.MQTT.Enabled {
// Set up event cache
frigate.InitZoneCache()
defer frigate.CloseZoneCache()

log.Debug().Msg("Connecting to MQTT Server...")
frigate.SubscribeMQTT()
log.Info().Msg("App running. Press Ctrl-C to quit.")
Expand Down
16 changes: 0 additions & 16 deletions notifier/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,6 @@ var TemplateFiles embed.FS
func SendAlert(event models.Event, snapshotURL string, snapshot io.Reader, eventid string) {
// Add Frigate Major version metadata
event.Extra.FrigateMajorVersion = config.ConfigData.Frigate.Version
// Drop event if no snapshot or clip is available - Event is likely being filtered on Frigate side.
// For example, if a camera has `required_zones` set - then there may not be any clip or snap until
// object moves into required zone
if !event.HasClip && !event.HasSnapshot {
log.Info().
Str("event_id", event.ID).
Msg("Event dropped - No snapshot or clip available")
return
}
// Drop event if no snapshot & skip_nosnap is true
if !event.HasSnapshot && strings.ToLower(config.ConfigData.Alerts.General.NoSnap) == "drop" {
log.Info().
Str("event_id", event.ID).
Msg("Event dropped - No snapshot available")
return
}
// Create copy of snapshot for each alerting method
var snap []byte
if snapshot != nil {
Expand Down

0 comments on commit 1fb7864

Please sign in to comment.