Skip to content

Commit

Permalink
Create ES index templates instead of indices
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Jun 25, 2019
1 parent b18386f commit f78242e
Show file tree
Hide file tree
Showing 15 changed files with 248 additions and 138 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ generate-zipkin-swagger: idl-submodule

.PHONY: install-mockery
install-mockery:
go get -u github.com/vektra/mockery
go get -u github.com/vektra/mockery/.../

.PHONY: generate-mocks
generate-mocks: install-mockery
Expand Down
7 changes: 7 additions & 0 deletions pkg/es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
type Client interface {
IndexExists(index string) IndicesExistsService
CreateIndex(index string) IndicesCreateService
CreateTemplate(id string) TemplateCreateService
Index() IndexService
Search(indices ...string) SearchService
MultiSearch() MultiSearchService
Expand All @@ -42,6 +43,12 @@ type IndicesCreateService interface {
Do(ctx context.Context) (*elastic.IndicesCreateResult, error)
}

// TemplateCreateService is an abstraction for creating a mapping
type TemplateCreateService interface {
Body(mapping string) TemplateCreateService
Do(ctx context.Context) (*elastic.IndicesPutTemplateResponse, error)
}

// IndexService is an abstraction for elastic BulkService
type IndexService interface {
Index(index string) IndexService
Expand Down
21 changes: 19 additions & 2 deletions pkg/es/mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/es/mocks/IndexService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/es/mocks/IndicesCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/es/mocks/IndicesExistsService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/es/mocks/MultiSearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pkg/es/mocks/SearchService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions pkg/es/mocks/TemplateCreateService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 25 additions & 0 deletions pkg/es/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (c ClientWrapper) CreateIndex(index string) es.IndicesCreateService {
return WrapESIndicesCreateService(c.client.CreateIndex(index))
}

// CreateTemplate calls this function to internal client.
func (c ClientWrapper) CreateTemplate(ttype string) es.TemplateCreateService {
return WrapESTemplateCreateService(c.client.IndexPutTemplate(ttype))
}

// Index calls this function to internal client.
func (c ClientWrapper) Index() es.IndexService {
r := elastic.NewBulkIndexRequest()
Expand Down Expand Up @@ -105,6 +110,26 @@ func (c IndicesCreateServiceWrapper) Do(ctx context.Context) (*elastic.IndicesCr
return c.indicesCreateService.Do(ctx)
}

// TemplateCreateServiceWrapper is a wrapper around elastic.IndicesPutTemplateService.
type TemplateCreateServiceWrapper struct {
mappingCreateService *elastic.IndicesPutTemplateService
}

// WrapESTemplateCreateService creates an TemplateCreateService out of *elastic.IndicesPutTemplateService.
func WrapESTemplateCreateService(mappingCreateService *elastic.IndicesPutTemplateService) TemplateCreateServiceWrapper {
return TemplateCreateServiceWrapper{mappingCreateService: mappingCreateService}
}

// Body calls this function to internal service.
func (c TemplateCreateServiceWrapper) Body(mapping string) es.TemplateCreateService {
return WrapESTemplateCreateService(c.mappingCreateService.BodyString(mapping))
}

// Do calls this function to internal service.
func (c TemplateCreateServiceWrapper) Do(ctx context.Context) (*elastic.IndicesPutTemplateResponse, error) {
return c.mappingCreateService.Do(ctx)
}

// ---

// IndexServiceWrapper is a wrapper around elastic.ESIndexService.
Expand Down
21 changes: 11 additions & 10 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,18 @@ func loadTagsFromFile(filePath string) ([]string, error) {

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
if !f.archiveConfig.IsEnabled() {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
cfg := f.Options.Get(archiveNamespace)
if !cfg.Enabled {
if !f.archiveConfig.IsEnabled() {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, cfg, true)
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
}

func createSpanReader(
Expand Down Expand Up @@ -179,7 +177,7 @@ func createSpanWriter(
}

spanMapping, serviceMapping := GetMappings(cfg.GetNumShards(), cfg.GetNumReplicas())
return esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
Expand All @@ -189,9 +187,12 @@ func createSpanWriter(
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
SpanMapping: spanMapping,
ServiceMapping: serviceMapping,
}), nil
})
err := writer.CreateTemplates(spanMapping, serviceMapping)
if err != nil {
return nil, err
}
return writer, nil
}

// GetMappings returns span and service mappings
Expand Down
26 changes: 22 additions & 4 deletions plugin/storage/es/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package es

import (
"context"
"errors"
"io/ioutil"
"os"
"strconv"
"strings"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics"
Expand All @@ -39,11 +41,17 @@ var _ storage.Factory = new(Factory)
type mockClientBuilder struct {
escfg.Configuration
err error
createTemplateError error
}

func (m *mockClientBuilder) NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error) {
if m.err == nil {
return &mocks.Client{}, nil
c := &mocks.Client{}
tService := &mocks.TemplateCreateService{}
tService.On("Body", mock.Anything).Return(tService)
tService.On("Do", context.Background()).Return(nil, m.createTemplateError)
c.On("CreateTemplate", mock.Anything).Return(tService)
return c, nil
}
return nil, m.err
}
Expand Down Expand Up @@ -149,9 +157,20 @@ func TestFactory_LoadMapping(t *testing.T) {
}
}

func TestCreateTemplateError(t *testing.T) {
f := NewFactory()
f.primaryConfig = &mockClientBuilder{createTemplateError: errors.New("template-error"), Configuration: escfg.Configuration{Enabled: true}}
f.archiveConfig = &mockClientBuilder{}
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
w, err := f.CreateSpanWriter()
assert.Nil(t, w)
assert.Error(t, err, "template-error")
}

func TestArchiveDisabled(t *testing.T) {
f := NewFactory()
f.Options.Get(archiveNamespace).Enabled = false
f.archiveConfig = &mockClientBuilder{Configuration: escfg.Configuration{Enabled: false}}
w, err := f.CreateArchiveSpanWriter()
assert.Nil(t, w)
assert.Nil(t, err)
Expand All @@ -163,10 +182,9 @@ func TestArchiveDisabled(t *testing.T) {
func TestArchiveEnabled(t *testing.T) {
f := NewFactory()
f.primaryConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{}
f.archiveConfig = &mockClientBuilder{Configuration: escfg.Configuration{Enabled: true}}
err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
f.Options.Get(archiveNamespace).Enabled = true
w, err := f.CreateArchiveSpanWriter()
require.NoError(t, err)
assert.NotNil(t, w)
Expand Down
Loading

0 comments on commit f78242e

Please sign in to comment.