Skip to content

Commit

Permalink
Deduplicate Restore(Muted)Objects common codes
Browse files Browse the repository at this point in the history
  • Loading branch information
julianbrost committed Jun 25, 2024
1 parent a544cc0 commit edaa61c
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 58 deletions.
37 changes: 0 additions & 37 deletions internal/object/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/pkg/errors"
"regexp"
"sort"
"strings"
Expand Down Expand Up @@ -67,42 +66,6 @@ func ClearCache() {
cache = make(map[string]*Object)
}

// RestoreObjects restores all objects and their (extra)tags matching the given IDs from the database.
// Returns error on any database failures and panics when trying to cache an object that's already in the cache store.
func RestoreObjects(ctx context.Context, db *database.DB, ids []types.Binary) error {
objects := map[string]*Object{}
err := utils.ForEachRow[Object](ctx, db, "id", ids, func(o *Object) {
o.db = db
o.Tags = map[string]string{}
o.ExtraTags = map[string]string{}

objects[o.ID.String()] = o
})
if err != nil {
return errors.Wrap(err, "cannot restore objects")
}

// Restore object ID tags matching the given object ids
err = utils.ForEachRow[IdTagRow](ctx, db, "object_id", ids, func(ir *IdTagRow) {
objects[ir.ObjectId.String()].Tags[ir.Tag] = ir.Value
})
if err != nil {
return errors.Wrap(err, "cannot restore objects ID tags")
}

// Restore object extra tags matching the given object ids
err = utils.ForEachRow[ExtraTagRow](ctx, db, "object_id", ids, func(et *ExtraTagRow) {
objects[et.ObjectId.String()].ExtraTags[et.Tag] = et.Value
})
if err != nil {
return errors.Wrap(err, "cannot restore objects extra tags")
}

addObjectsToCache(objects)

return nil
}

// FromEvent creates an object from the provided event tags if it's not in the cache
// and syncs all object related types with the database.
// Returns error on any database failure
Expand Down
57 changes: 36 additions & 21 deletions internal/object/objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"sync"
Expand All @@ -30,14 +31,34 @@ func DeleteFromCache(id types.Binary) {
//
// Returns an error on any database failure and panics when trying to cache an object that's already in the cache store.
func RestoreMutedObjects(ctx context.Context, db *database.DB) error {
query := db.BuildSelectStmt(new(Object), new(Object)) + " WHERE mute_reason IS NOT NULL " +
"AND NOT EXISTS((SELECT 1 FROM incident WHERE object_id = object.id AND recovered_at IS NULL))"
return restoreObjectsFromQuery(ctx, db, query)
}

// RestoreObjects restores all objects and their (extra)tags matching the given IDs from the database.
// Returns error on any database failures and panics when trying to cache an object that's already in the cache store.
func RestoreObjects(ctx context.Context, db *database.DB, ids []types.Binary) error {
var obj *Object
query, args, err := sqlx.In(db.BuildSelectStmt(obj, obj)+" WHERE id IN (?)", ids)
if err != nil {
return errors.Wrapf(err, "cannot build placeholders for %q", query)
}

return restoreObjectsFromQuery(ctx, db, query, args...)
}

// restoreObjectsFromQuery takes a query that returns rows of the object table, executes it and loads the returned
// objects into the local cache.
//
// Returns an error on any database failure and panics when trying to cache an object that's already in the cache store.
func restoreObjectsFromQuery(ctx context.Context, db *database.DB, query string, args ...any) error {
objects := make(chan *Object)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
defer close(objects)

clause := `WHERE mute_reason IS NOT NULL AND NOT EXISTS((SELECT 1 FROM incident WHERE object_id = object.id AND recovered_at IS NULL))`
query := fmt.Sprintf("%s %s", db.BuildSelectStmt(new(Object), new(Object)), clause)
err := utils.ExecAndApply[Object](ctx, db, query, nil, func(o *Object) {
err := utils.ExecAndApply[Object](ctx, db, query, args, func(o *Object) {
o.db = db
o.Tags = map[string]string{}
o.ExtraTags = map[string]string{}
Expand All @@ -64,8 +85,8 @@ func RestoreMutedObjects(ctx context.Context, db *database.DB) error {
}

g.Go(func() error {
var ids []types.Binary
objectsMap := map[string]*Object{}
ids := make([]types.Binary, 0, len(bulk))
objectsMap := make(map[string]*Object, len(bulk))
for _, obj := range bulk {
objectsMap[obj.ID.String()] = obj
ids = append(ids, obj.ID)
Expand All @@ -87,7 +108,16 @@ func RestoreMutedObjects(ctx context.Context, db *database.DB) error {
return errors.Wrap(err, "cannot restore objects extra tags")
}

addObjectsToCache(objectsMap)
cacheMu.Lock()
defer cacheMu.Unlock()

for _, o := range objectsMap {
if obj, ok := cache[o.ID.String()]; ok {
panic(fmt.Sprintf("Object %q is already in the cache", obj.DisplayName()))
}

cache[o.ID.String()] = o
}

return nil
})
Expand All @@ -97,18 +127,3 @@ func RestoreMutedObjects(ctx context.Context, db *database.DB) error {

return g.Wait()
}

// addObjectsToCache adds the objects from the given map to the global object cache store.
// Panics when trying to cache an object that's already in the cache store.
func addObjectsToCache(objects map[string]*Object) {
cacheMu.Lock()
defer cacheMu.Unlock()

for _, o := range objects {
if obj, ok := cache[o.ID.String()]; ok {
panic(fmt.Sprintf("Object %q is already in the cache", obj.DisplayName()))
}

cache[o.ID.String()] = o
}
}

0 comments on commit edaa61c

Please sign in to comment.