Skip to content

Commit

Permalink
Fix template/policy is always overwritten
Browse files Browse the repository at this point in the history
The force flag for policy and template overwrites have been set to true
in the standard Elasticsearch callback. Due to this every reconnect will
result in the policy and template being overwritten.

This fix sets the flags to false and ensure that the template is only
overwritten if a new policy is created in Elasticsearch.
  • Loading branch information
urso committed Apr 5, 2019
1 parent ebdf66d commit 0d971ec
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 16 deletions.
2 changes: 1 addition & 1 deletion libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ func (b *Beat) registerESIndexManagement() error {
func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback {
return func(esClient *elasticsearch.Client) error {
m := b.index.Manager(esClient, idxmgmt.BeatsAssets(b.Fields))
return m.Setup(true, true)
return m.Setup(false, false)
}
}

Expand Down
2 changes: 1 addition & 1 deletion libbeat/idxmgmt/idxmgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type ESClient interface {
// Manager is used to initialize indices, ILM policies, and aliases within the
// Elastic Stack.
type Manager interface {
Setup(template, policy bool) error
Setup(forceTemplate, forcePolicy bool) error
}

// DefaultSupport initializes the default index management support used by most Beats.
Expand Down
8 changes: 7 additions & 1 deletion libbeat/idxmgmt/ilm/ilm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,14 @@ type Supporter interface {
// Manager uses an APIHandler to install a policy.
type Manager interface {
Enabled() (bool, error)

EnsureAlias() error
EnsurePolicy(overwrite bool) error

// EnsurePolicy installs a policy if it does not exit yet. The policy is always
// written if overwrite is set.
// The created flag is set to true only if a new policy is created. `created`
// is false if an existing policy gets overwritten.
EnsurePolicy(overwrite bool) (created bool, err error)
}

// APIHandler defines the interface between a remote service and the Manager.
Expand Down
6 changes: 5 additions & 1 deletion libbeat/idxmgmt/ilm/ilm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,11 @@ func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) {
calls []onCall
overwrite bool
cfg map[string]interface{}
create bool
fail error
}{
"create new policy": {
create: true,
calls: []onCall{
onHasILMPolicy(testPolicy.Name).Return(false, nil),
onCreateILMPolicy(testPolicy).Return(nil),
Expand Down Expand Up @@ -258,14 +260,16 @@ func TestDefaultSupport_Manager_EnsurePolicy(t *testing.T) {

h := newMockHandler(test.calls...)
m := createManager(t, h, test.cfg)
err := m.EnsurePolicy(test.overwrite)
created, err := m.EnsurePolicy(test.overwrite)

if test.fail == nil {
assert.Equal(t, test.create, created)
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Equal(t, test.fail, ErrReason(err))
}

h.AssertExpectations(t)
})
}
Expand Down
6 changes: 3 additions & 3 deletions libbeat/idxmgmt/ilm/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ func (*noopSupport) Alias() Alias { return Alias{} }
func (*noopSupport) Policy() Policy { return Policy{} }
func (*noopSupport) Manager(_ APIHandler) Manager { return (*noopManager)(nil) }

func (*noopManager) Enabled() (bool, error) { return false, nil }
func (*noopManager) EnsureAlias() error { return errOf(ErrOpNotAvailable) }
func (*noopManager) EnsurePolicy(_ bool) error { return errOf(ErrOpNotAvailable) }
func (*noopManager) Enabled() (bool, error) { return false, nil }
func (*noopManager) EnsureAlias() error { return errOf(ErrOpNotAvailable) }
func (*noopManager) EnsurePolicy(_ bool) (bool, error) { return false, errOf(ErrOpNotAvailable) }
8 changes: 4 additions & 4 deletions libbeat/idxmgmt/ilm/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,26 +114,26 @@ func (m *singlePolicyManager) EnsureAlias() error {
return m.client.CreateAlias(m.alias)
}

func (m *singlePolicyManager) EnsurePolicy(overwrite bool) error {
func (m *singlePolicyManager) EnsurePolicy(overwrite bool) (bool, error) {
log := m.log
overwrite = overwrite || m.overwrite

exists := true
if m.checkExists && !overwrite {
b, err := m.client.HasILMPolicy(m.policy.Name)
if err != nil {
return err
return false, err
}
exists = b
}

if !exists || overwrite {
return m.client.CreateILMPolicy(m.policy)
return !exists, m.client.CreateILMPolicy(m.policy)
}

log.Infof("do not generate ilm policy: exists=%v, overwrite=%v",
exists, overwrite)
return nil
return false, nil
}

func (c *infoCache) Valid() bool {
Expand Down
4 changes: 2 additions & 2 deletions libbeat/idxmgmt/mockilm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ func (m *mockILMSupport) EnsureAlias() error {
}

func onEnsurePolicy() onCall { return makeOnCall("EnsurePolicy") }
func (m *mockILMSupport) EnsurePolicy(overwrite bool) error {
func (m *mockILMSupport) EnsurePolicy(overwrite bool) (bool, error) {
args := m.Called()
return args.Error(0)
return args.Bool(0), args.Error(1)
}

func makeOnCall(name string, args ...interface{}) onCall {
Expand Down
12 changes: 9 additions & 3 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ func (s *indexSupport) BuildSelector(cfg *common.Config) (outputs.IndexSelector,
}, nil
}

func (m *indexManager) Setup(template, policy bool) error {
return m.load(template, policy)
func (m *indexManager) Setup(forceTemplate, forcePolicy bool) error {
return m.load(forceTemplate, forcePolicy)
}

func (m *indexManager) Load() error {
Expand Down Expand Up @@ -231,10 +231,16 @@ func (m *indexManager) load(forceTemplate, forcePolicy bool) error {

// install ilm policy
if withILM {
if err := m.ilm.EnsurePolicy(forcePolicy); err != nil {
policyCreated, err := m.ilm.EnsurePolicy(forcePolicy)
if err != nil {
return err
}
log.Info("ILM policy successfully loaded.")

// The template should be updated if a new policy is created.
if policyCreated {
forceTemplate = true
}
}

// create and install template
Expand Down

0 comments on commit 0d971ec

Please sign in to comment.