Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libbeat/processors/cache: don't write cache states that have not been altered #36696

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686] {pull}36696[36696]

==== Deprecated

Expand Down
6 changes: 6 additions & 0 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@
var e CacheEntry
err = dec.Decode(&e)
if err != nil {
if err != io.EOF {

Check failure on line 193 in libbeat/processors/cache/file_store.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
switch err := err.(type) {

Check failure on line 194 in libbeat/processors/cache/file_store.go

View workflow job for this annotation

GitHub Actions / lint (windows)

type switch on error will fail on wrapped errors. Use errors.As to check for specific errors (errorlint)
case *json.SyntaxError:
c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset)
default:
Expand All @@ -202,6 +202,7 @@
}
if e.Expires.Before(time.Now()) {
// Don't retain expired elements.
c.dirty = true // The cache now does not reflect the file.
continue
}
c.cache[e.Key] = &e
Expand Down Expand Up @@ -239,6 +240,9 @@
// writeState writes the current cache state to the backing file.
// If final is true and the cache is empty, the file will be deleted.
func (c *fileStore) writeState(final bool) {
if !c.dirty {
return
}
if len(c.cache) == 0 && final {
err := os.Remove(c.path)
if err != nil {
Expand Down Expand Up @@ -291,4 +295,6 @@
return
}
}
// Only mark as not dirty if we succeeded in the write.
c.dirty = false
}
7 changes: 7 additions & 0 deletions libbeat/processors/cache/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -221,6 +222,7 @@
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -247,6 +249,7 @@
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -267,6 +270,7 @@
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -291,6 +295,7 @@
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -315,6 +320,7 @@
{Key: "three", Value: int(3), index: 1},
},
refs: 1,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -333,6 +339,7 @@
cache: nil, // assistively nil-ed.
expiries: nil, // assistively nil-ed.
refs: 0,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand Down Expand Up @@ -403,7 +410,7 @@
var e CacheEntry
err = dec.Decode(&e)
if err != nil {
if err != io.EOF {

Check failure on line 413 in libbeat/processors/cache/file_store_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
t.Fatalf("unexpected error reading persisted cache data: %v", err)
}
break
Expand Down
5 changes: 5 additions & 0 deletions libbeat/processors/cache/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type memStore struct {
expiries expiryHeap
ttl time.Duration // ttl is the time entries are valid for in the cache.
refs int // refs is the number of processors referring to this store.
// dirty marks the cache as changed from the
// state in a backing file if it exists.
dirty bool

// id is the index into global cache store for the cache.
id string
Expand Down Expand Up @@ -176,6 +179,7 @@ func (c *memStore) Put(key string, val any) error {
}
c.cache[key] = e
heap.Push(&c.expiries, e)
c.dirty = true
return nil
}

Expand Down Expand Up @@ -212,6 +216,7 @@ func (c *memStore) Delete(key string) error {
}
heap.Remove(&c.expiries, v.index)
delete(c.cache, key)
c.dirty = true
return nil
}

Expand Down
7 changes: 7 additions & 0 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ var memStoreTests = []struct {
id: "test",
cache: map[string]*CacheEntry{},
refs: 2,
dirty: false,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -209,6 +210,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -235,6 +237,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 2},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -255,6 +258,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -279,6 +283,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 2,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -303,6 +308,7 @@ var memStoreTests = []struct {
{Key: "three", Value: int(3), index: 1},
},
refs: 1,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand All @@ -321,6 +327,7 @@ var memStoreTests = []struct {
cache: nil, // assistively nil-ed.
expiries: nil, // assistively nil-ed.
refs: 0,
dirty: true,
ttl: time.Second,
cap: 1000,
effort: 10,
Expand Down
Loading