Skip to content

Commit

Permalink
Support configurable date separator for Elasticsearch index names (#2637
Browse files Browse the repository at this point in the history
)

* Add elasticsearch index date format configuration
Signed-off-by: Chen Zhengwei <chenzhengwei@inspur.com>

Signed-off-by: chen zhengwei <chenzhengwei@inspur.com>

* Set default separator to '-', remove 'none' config
Signed-off-by: Chen Zhengwei <chenzhengwei@inspur.com>

Signed-off-by: chen zhengwei <chenzhengwei@inspur.com>

* Update options_test.go and separator usage.
Signed-off-by: Chen Zhengwei <chenzhengwei@inspur.com>

Signed-off-by: chen zhengwei <chenzhengwei@inspur.com>

* Update options_test.go
Signed-off-by: Chen Zhengwei <chenzhengwei@inspur.com>

Signed-off-by: chen zhengwei <chenzhengwei@inspur.com>

* Remove separator's default value description.
Signed-off-by: Chen Zhengwei <chenzhengwei@inspur.com>

Signed-off-by: chen zhengwei <chenzhengwei@inspur.com>
  • Loading branch information
sniperking1234 committed Dec 3, 2020
1 parent ca53ba6 commit 6c2be45
Show file tree
Hide file tree
Showing 23 changed files with 199 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestLoadConfigAndFlags(t *testing.T) {
require.NoError(t, err)

v, c := jConfig.Viperize(DefaultOptions().AddFlags, flags.AddConfigFileFlag)
err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--config-file=./testdata/jaeger-config.yaml"})
err = c.ParseFlags([]string{"--es.server-urls=bar", "--es.index-prefix=staging", "--es.index-date-separator=-", "--config-file=./testdata/jaeger-config.yaml"})
require.NoError(t, err)

err = flags.TryLoadConfigFile(v)
Expand All @@ -74,6 +74,7 @@ func TestLoadConfigAndFlags(t *testing.T) {
assert.Equal(t, []string{"someUrl"}, esCfg.Servers)
assert.Equal(t, true, esCfg.CreateIndexTemplates)
assert.Equal(t, "staging", esCfg.IndexPrefix)
assert.Equal(t, "2006-01-02", esCfg.IndexDateLayout)
assert.Equal(t, int64(100), esCfg.NumShards)
assert.Equal(t, "user", esCfg.Username)
assert.Equal(t, "pass", esCfg.Password)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
esHostPort = host + ":" + esPort
esURL = "http://" + esHostPort
indexPrefix = "integration-test"
indexDateLayout = "2006-01-02"
tagKeyDeDotChar = "@"
maxSpanAge = time.Hour * 72
numShards = 5
Expand Down Expand Up @@ -91,8 +92,9 @@ func (s *IntegrationTest) esCleanUp(allTagsAsFields bool) error {

func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
cfg := config.Configuration{
Servers: []string{esURL},
IndexPrefix: indexPrefix,
Servers: []string{esURL},
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
Tags: config.TagsAsFields{
AllAsFields: allTagsAsFields,
},
Expand All @@ -118,14 +120,15 @@ func (s *IntegrationTest) initSpanstore(allTagsAsFields bool) error {
}
reader := esspanreader.NewEsSpanReader(elasticsearchClient, s.logger, esspanreader.Config{
IndexPrefix: indexPrefix,
IndexDateLayout: indexDateLayout,
TagDotReplacement: tagKeyDeDotChar,
MaxSpanAge: maxSpanAge,
MaxDocCount: defaultMaxDocCount,
})
s.SpanReader = reader

depMapping := es.GetDependenciesMappings(numShards, numReplicas, esVersion)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, defaultMaxDocCount)
depStore := esdependencyreader.NewDependencyStore(elasticsearchClient, s.logger, indexPrefix, indexDateLayout, defaultMaxDocCount)
if err := depStore.CreateTemplates(depMapping); err != nil {
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ func newEsSpanWriter(params config.Configuration, logger *zap.Logger, archive bo
logger: logger,
nameTag: tag.Insert(storagemetrics.TagExporterName(), name),
client: client,
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, alias, archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, alias, archive),
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, params.IndexPrefix, params.IndexDateLayout, alias, archive),
translator: esmodeltranslator.NewTranslator(params.Tags.AllAsFields, tagsKeysAsFields, params.GetTagDotReplacement()),
isArchive: archive,
serviceCache: cache.NewLRUWithOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func TestWriteSpans(t *testing.T) {
w := esSpanWriter{
logger: zap.NewNop(),
client: esClient,
spanIndexName: esutil.NewIndexNameProvider("span", "", esutil.AliasNone, false),
serviceIndexName: esutil.NewIndexNameProvider("service", "", esutil.AliasNone, false),
spanIndexName: esutil.NewIndexNameProvider("span", "", "2006-01-02", esutil.AliasNone, false),
serviceIndexName: esutil.NewIndexNameProvider("service", "", "2006-01-02", esutil.AliasNone, false),
serviceCache: cache.NewLRU(1),
nameTag: tag.Insert(storagemetrics.TagExporterName(), "name"),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func (s *StorageFactory) CreateSpanReader() (spanstore.Reader, error) {
Archive: false,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
Expand All @@ -101,7 +102,7 @@ func (s *StorageFactory) CreateDependencyReader() (dependencystore.Reader, error
if err != nil {
return nil, err
}
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetMaxDocCount()), nil
return esdependencyreader.NewDependencyStore(client, s.logger, cfg.GetIndexPrefix(), cfg.GetIndexDateLayout(), cfg.GetMaxDocCount()), nil
}

// CreateArchiveSpanReader creates archive spanstore.Reader
Expand All @@ -115,6 +116,7 @@ func (s *StorageFactory) CreateArchiveSpanReader() (spanstore.Reader, error) {
Archive: true,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
MaxSpanAge: cfg.GetMaxSpanAge(),
MaxDocCount: cfg.GetMaxDocCount(),
TagDotReplacement: cfg.GetTagDotReplacement(),
Expand Down
14 changes: 7 additions & 7 deletions cmd/opentelemetry/app/internal/esutil/index_name.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package esutil

import "time"

const indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20

// Alias is used to configure the kind of index alias
type Alias string

Expand All @@ -33,11 +31,12 @@ const (
// IndexNameProvider creates standard index names from dates
type IndexNameProvider struct {
index string
dateLayout string
useSingleIndex bool
}

// NewIndexNameProvider constructs a new IndexNameProvider
func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) IndexNameProvider {
func NewIndexNameProvider(index, prefix, layout string, alias Alias, archive bool) IndexNameProvider {
if prefix != "" {
index = prefix + "-" + index
}
Expand All @@ -53,6 +52,7 @@ func NewIndexNameProvider(index, prefix string, alias Alias, archive bool) Index
}
return IndexNameProvider{
index: index,
dateLayout: layout,
useSingleIndex: archive || (alias != AliasNone),
}
}
Expand All @@ -63,12 +63,12 @@ func (n IndexNameProvider) IndexNameRange(start, end time.Time) []string {
return []string{n.index}
}
var indices []string
firstIndex := n.index + start.UTC().Format(indexDateFormat)
currentIndex := n.index + end.UTC().Format(indexDateFormat)
firstIndex := n.index + start.UTC().Format(n.dateLayout)
currentIndex := n.index + end.UTC().Format(n.dateLayout)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
end = end.Add(-24 * time.Hour)
currentIndex = n.index + end.UTC().Format(indexDateFormat)
currentIndex = n.index + end.UTC().Format(n.dateLayout)
}
indices = append(indices, firstIndex)
return indices
Expand All @@ -79,6 +79,6 @@ func (n IndexNameProvider) IndexName(date time.Time) string {
if n.useSingleIndex {
return n.index
}
spanDate := date.UTC().Format(indexDateFormat)
spanDate := date.UTC().Format(n.dateLayout)
return n.index + spanDate
}
20 changes: 10 additions & 10 deletions cmd/opentelemetry/app/internal/esutil/index_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,33 @@ func TestIndexNames(t *testing.T) {
}{
{
name: "index prefix",
nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false),
indices: []string{"production-myindex-0001-01-01"},
},
{
name: "multiple dates",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false),
indices: []string{"myindex-2020-08-30", "myindex-2020-08-29", "myindex-2020-08-28"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
},
{
name: "use aliases",
nameProvider: NewIndexNameProvider("myindex", "", AliasRead, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, false),
indices: []string{"myindex-read"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true),
indices: []string{"myindex-archive"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive alias",
nameProvider: NewIndexNameProvider("myindex", "", AliasRead, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasRead, true),
indices: []string{"myindex-archive-read"},
start: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
end: time.Date(2020, 8, 30, 0, 0, 0, 0, time.UTC),
Expand All @@ -81,30 +81,30 @@ func TestIndexName(t *testing.T) {
}{
{
name: "index prefix",
nameProvider: NewIndexNameProvider("myindex", "production", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "production", "2006-01-02", AliasNone, false),
index: "production-myindex-0001-01-01",
},
{
name: "no prefix",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, false),
index: "myindex-2020-08-28",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use aliases",
nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, false),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, false),
index: "myindex-write",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive",
nameProvider: NewIndexNameProvider("myindex", "", AliasNone, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasNone, true),
index: "myindex-archive",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
{
name: "use archive alias",
nameProvider: NewIndexNameProvider("myindex", "", AliasWrite, true),
nameProvider: NewIndexNameProvider("myindex", "", "2006-01-02", AliasWrite, true),
index: "myindex-archive-write",
date: time.Date(2020, 8, 28, 0, 0, 0, 0, time.UTC),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,31 +35,31 @@ const (
dependencyIndexBaseName = "jaeger-dependencies"

timestampField = "timestamp"

indexDateFormat = "2006-01-02" // date format for index e.g. 2020-01-20
)

// DependencyStore defines Elasticsearch dependency store.
type DependencyStore struct {
client esclient.ElasticsearchClient
logger *zap.Logger
indexPrefix string
maxDocCount int
client esclient.ElasticsearchClient
logger *zap.Logger
indexPrefix string
indexDateLayout string
maxDocCount int
}

var _ dependencystore.Reader = (*DependencyStore)(nil)
var _ dependencystore.Writer = (*DependencyStore)(nil)

// NewDependencyStore creates dependency store.
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix string, maxDocCount int) *DependencyStore {
func NewDependencyStore(client esclient.ElasticsearchClient, logger *zap.Logger, indexPrefix, indexDateLayout string, maxDocCount int) *DependencyStore {
if indexPrefix != "" {
indexPrefix += "-"
}
return &DependencyStore{
client: client,
logger: logger,
indexPrefix: indexPrefix + dependencyIndexBaseName + "-",
maxDocCount: maxDocCount,
client: client,
logger: logger,
indexPrefix: indexPrefix + dependencyIndexBaseName + "-",
indexDateLayout: indexDateLayout,
maxDocCount: maxDocCount,
}
}

Expand All @@ -78,14 +78,14 @@ func (r *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
if err != nil {
return err
}
return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, ts), dependencyType)
return r.client.Index(context.Background(), bytes.NewReader(data), indexWithDate(r.indexPrefix, r.indexDateLayout, ts), dependencyType)
}

// GetDependencies implements dependencystore.Reader
func (r *DependencyStore) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
searchBody := getSearchBody(endTs, lookback, r.maxDocCount)

indices := dailyIndices(r.indexPrefix, endTs, lookback)
indices := dailyIndices(r.indexPrefix, r.indexDateLayout, endTs, lookback)
response, err := r.client.Search(ctx, searchBody, r.maxDocCount, indices...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -114,18 +114,18 @@ func getSearchBody(endTs time.Time, lookback time.Duration, maxDocCount int) esc
}
}

func indexWithDate(indexNamePrefix string, date time.Time) string {
return indexNamePrefix + date.UTC().Format(indexDateFormat)
func indexWithDate(indexNamePrefix, indexDateLayout string, date time.Time) string {
return indexNamePrefix + date.UTC().Format(indexDateLayout)
}

func dailyIndices(prefix string, ts time.Time, lookback time.Duration) []string {
func dailyIndices(prefix, format string, ts time.Time, lookback time.Duration) []string {
var indices []string
firstIndex := indexWithDate(prefix, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, ts)
firstIndex := indexWithDate(prefix, format, ts.Add(-lookback))
currentIndex := indexWithDate(prefix, format, ts)
for currentIndex != firstIndex {
indices = append(indices, currentIndex)
ts = ts.Add(-24 * time.Hour)
currentIndex = indexWithDate(prefix, ts)
currentIndex = indexWithDate(prefix, format, ts)
}
return append(indices, firstIndex)
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const defaultMaxDocCount = 10_000

func TestCreateTemplates(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
template := "template"
err := store.CreateTemplates(template)
require.NoError(t, err)
Expand All @@ -48,7 +48,7 @@ func TestCreateTemplates(t *testing.T) {

func TestWriteDependencies(t *testing.T) {
client := &mockClient{}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
dependencies := []model.DependencyLink{{Parent: "foo", Child: "bar", CallCount: 1}}
tsNow := time.Now()
err := store.WriteDependencies(tsNow, dependencies)
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestGetDependencies(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.NoError(t, err)
assert.Equal(t, timeDependencies, dbmodel.TimeDependencies{
Expand All @@ -109,7 +109,7 @@ func TestGetDependencies_err_unmarshall(t *testing.T) {
},
},
}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
require.Contains(t, err.Error(), "invalid character")
Expand All @@ -121,7 +121,7 @@ func TestGetDependencies_err_client(t *testing.T) {
client := &mockClient{
searchErr: searchErr,
}
store := NewDependencyStore(client, zap.NewNop(), "foo", defaultMaxDocCount)
store := NewDependencyStore(client, zap.NewNop(), "foo", "2006-01-02", defaultMaxDocCount)
tsNow := time.Now()
dependencies, err := store.GetDependencies(context.Background(), tsNow, time.Hour)
require.Error(t, err)
Expand Down Expand Up @@ -151,11 +151,12 @@ func TestSearchBody(t *testing.T) {
}

func TestIndexWithDate(t *testing.T) {
assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC)))
assert.Equal(t, "foo-2020-09-30", indexWithDate("foo-", "2006-01-02",
time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC)))
}

func TestDailyIndices(t *testing.T) {
indices := dailyIndices("foo-", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour)
indices := dailyIndices("foo-", "2006-01-02", time.Date(2020, 9, 30, 0, 0, 0, 0, time.UTC), time.Hour)
assert.Equal(t, []string{"foo-2020-09-30", "foo-2020-09-29"}, indices)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Config struct {
Archive bool
UseReadWriteAliases bool
IndexPrefix string
IndexDateLayout string
MaxSpanAge time.Duration
MaxDocCount int
TagDotReplacement string
Expand All @@ -89,8 +90,8 @@ func NewEsSpanReader(client esclient.ElasticsearchClient, logger *zap.Logger, co
maxSpanAge: config.MaxSpanAge,
maxDocCount: config.MaxDocCount,
converter: dbmodel.NewToDomain(config.TagDotReplacement),
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, alias, config.Archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, alias, config.Archive),
spanIndexName: esutil.NewIndexNameProvider(spanIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive),
serviceIndexName: esutil.NewIndexNameProvider(serviceIndexBaseName, config.IndexPrefix, config.IndexDateLayout, alias, config.Archive),
}
}

Expand Down
Loading

0 comments on commit 6c2be45

Please sign in to comment.