Skip to content

Commit

Permalink
libbeat/processors/cache: don't write cache states that have not been…
Browse files Browse the repository at this point in the history
… altered (#36696)

Add a dirty flag to memStore to be used by fileStore so that needless writes
are not performed.
  • Loading branch information
efd6 committed Sep 29, 2023
1 parent 9e31636 commit 4d74439
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686] {pull}36696[36696]

==== Deprecated

Expand Down
6 changes: 6 additions & 0 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (c *fileStore) readState() {
}
if e.Expires.Before(time.Now()) {
// Don't retain expired elements.
c.dirty = true // The cache now does not reflect the file.
continue
}
c.cache[e.Key] = &e
Expand Down Expand Up @@ -239,6 +240,9 @@ func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) {
// writeState writes the current cache state to the backing file.
// If final is true and the cache is empty, the file will be deleted.
func (c *fileStore) writeState(final bool) {
if !c.dirty {
return
}
if len(c.cache) == 0 && final {
err := os.Remove(c.path)
if err != nil {
Expand Down Expand Up @@ -291,4 +295,6 @@ func (c *fileStore) writeState(final bool) {
return
}
}
// Only mark as not dirty if we succeeded in the write.
c.dirty = false
}
7 changes: 7 additions & 0 deletions libbeat/processors/cache/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ var fileStoreTests = []struct {
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -221,6 +222,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -247,6 +249,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -267,6 +270,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -291,6 +295,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -315,6 +320,7 @@ var fileStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 1,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -333,6 +339,7 @@ var fileStoreTests = []struct {
cache: nil, // assistively nil-ed.
expiries: nil, // assistively nil-ed.
refs: 0,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand Down
5 changes: 5 additions & 0 deletions libbeat/processors/cache/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type memStore struct {
expiries expiryHeap
ttl time.Duration // ttl is the time entries are valid for in the cache.
refs int // refs is the number of processors referring to this store.
// dirty marks the cache as changed from the
// state in a backing file if it exists.
dirty bool

// id is the index into global cache store for the cache.
id string
Expand Down Expand Up @@ -176,6 +179,7 @@ func (c *memStore) Put(key string, val any) error {
}
c.cache[key] = e
heap.Push(&c.expiries, e)
c.dirty = true
return nil
}

Expand Down Expand Up @@ -212,6 +216,7 @@ func (c *memStore) Delete(key string) error {
}
heap.Remove(&c.expiries, v.index)
delete(c.cache, key)
c.dirty = true
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ var memStoreTests = []struct {
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -209,6 +210,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -235,6 +237,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -255,6 +258,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -279,6 +283,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -303,6 +308,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 1,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -321,6 +327,7 @@ var memStoreTests = []struct {
cache: nil, // assistively nil-ed.
expiries: nil, // assistively nil-ed.
refs: 0,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand Down

0 comments on commit 4d74439

Please sign in to comment.