Skip to content

Commit

Permalink
[es] Add index rollover mode that can choose day and hour (#2965)
Browse files Browse the repository at this point in the history
* add index rollover for ES storage

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* update that separate rollover of spans and services for the es index

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* fix the GetOperation issue and change IndexDateLayoutDependencies to defaultIndexRolloverFrequency

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* update explain for the es.index-rollover-frequency

Signed-off-by: WalkerWang731 <wxy1990731@hotmail.com>

* make fmt

Signed-off-by: albertteoh <albert.teoh@logz.io>

* Fix minor formatting issues in table tests

Signed-off-by: Albert Teoh <albert.teoh@logz.io>

Co-authored-by: Albert <26584478+albertteoh@users.noreply.github.com>
Co-authored-by: albertteoh <albert.teoh@logz.io>
  • Loading branch information
3 people committed May 17, 2021
1 parent d43af83 commit 19cbe35
Show file tree
Hide file tree
Showing 8 changed files with 344 additions and 185 deletions.
96 changes: 65 additions & 31 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,33 +44,37 @@ import (

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayout string `mapstructure:"index_date_layout"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
AllowTokenFromContext bool `mapstructure:"-"`
Sniffer bool `mapstructure:"sniffer"` // https://github.com/olivere/elastic/wiki/Sniffing
SnifferTLSEnabled bool `mapstructure:"sniffer_tls_enabled"`
MaxDocCount int `mapstructure:"-"` // Defines maximum number of results to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age" mapstructure:"-"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards" mapstructure:"num_shards"`
NumReplicas int64 `yaml:"replicas" mapstructure:"num_replicas"`
Timeout time.Duration `validate:"min=500" mapstructure:"-"`
BulkSize int `mapstructure:"-"`
BulkWorkers int `mapstructure:"-"`
BulkActions int `mapstructure:"-"`
BulkFlushInterval time.Duration `mapstructure:"-"`
IndexPrefix string `mapstructure:"index_prefix"`
IndexDateLayoutSpans string `mapstructure:"-"`
IndexDateLayoutServices string `mapstructure:"-"`
IndexDateLayoutDependencies string `mapstructure:"-"`
IndexRolloverFrequencySpans string `mapstructure:"-"`
IndexRolloverFrequencyServices string `mapstructure:"-"`
Tags TagsAsFields `mapstructure:"tags_as_fields"`
Enabled bool `mapstructure:"-"`
TLS tlscfg.Options `mapstructure:"tls"`
UseReadWriteAliases bool `mapstructure:"use_aliases"`
CreateIndexTemplates bool `mapstructure:"create_mappings"`
UseILM bool `mapstructure:"use_ilm"`
Version uint `mapstructure:"version"`
LogLevel string `mapstructure:"log_level"`
}

// TagsAsFields holds configuration for tag schema.
Expand All @@ -96,7 +100,11 @@ type ClientBuilder interface {
GetMaxSpanAge() time.Duration
GetMaxDocCount() int
GetIndexPrefix() string
GetIndexDateLayout() string
GetIndexDateLayoutSpans() string
GetIndexDateLayoutServices() string
GetIndexDateLayoutDependencies() string
GetIndexRolloverFrequencySpansDuration() time.Duration
GetIndexRolloverFrequencyServicesDuration() time.Duration
GetTagsFilePath() string
GetAllTagsAsFields() bool
GetTagDotReplacement() string
Expand Down Expand Up @@ -281,9 +289,35 @@ func (c *Configuration) GetIndexPrefix() string {
return c.IndexPrefix
}

// GetIndexDateLayout returns index date layout
func (c *Configuration) GetIndexDateLayout() string {
return c.IndexDateLayout
// GetIndexDateLayoutSpans returns jaeger-span index date layout
func (c *Configuration) GetIndexDateLayoutSpans() string {
return c.IndexDateLayoutSpans
}

// GetIndexDateLayoutServices returns jaeger-service index date layout
func (c *Configuration) GetIndexDateLayoutServices() string {
return c.IndexDateLayoutServices
}

// GetIndexDateLayoutDependencies returns jaeger-dependencies index date layout
func (c *Configuration) GetIndexDateLayoutDependencies() string {
return c.IndexDateLayoutDependencies
}

// GetIndexRolloverFrequencySpansDuration returns jaeger-span index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencySpansDuration() time.Duration {
if c.IndexRolloverFrequencySpans == "hour" {
return -1 * time.Hour
}
return -24 * time.Hour
}

// GetIndexRolloverFrequencyServicesDuration returns jaeger-service index rollover frequency duration
func (c *Configuration) GetIndexRolloverFrequencyServicesDuration() time.Duration {
if c.IndexRolloverFrequencyServices == "hour" {
return -1 * time.Hour
}
return -24 * time.Hour
}

// GetTagsFilePath returns a path to file containing tag keys
Expand Down
48 changes: 26 additions & 22 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(f.primaryClient, f.logger, f.primaryConfig.GetIndexPrefix(),
f.primaryConfig.GetIndexDateLayout(), f.primaryConfig.GetMaxDocCount())
f.primaryConfig.GetIndexDateLayoutDependencies(), f.primaryConfig.GetMaxDocCount())
return reader, nil
}

Expand Down Expand Up @@ -141,17 +141,20 @@ func createSpanReader(
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.GetMaxDocCount(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.GetMaxDocCount(),
MaxSpanAge: cfg.GetMaxSpanAge(),
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
SpanIndexRolloverFrequency: cfg.GetIndexRolloverFrequencySpansDuration(),
ServiceIndexRolloverFrequency: cfg.GetIndexRolloverFrequencyServicesDuration(),
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
}), nil
}

Expand Down Expand Up @@ -186,16 +189,17 @@ func createSpanWriter(
return nil, err
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
IndexDateLayout: cfg.GetIndexDateLayout(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.GetIndexPrefix(),
SpanIndexDateLayout: cfg.GetIndexDateLayoutSpans(),
ServiceIndexDateLayout: cfg.GetIndexDateLayoutServices(),
AllTagsAsFields: cfg.GetAllTagsAsFields(),
TagKeysAsFields: tags,
TagDotReplacement: cfg.GetTagDotReplacement(),
Archive: archive,
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
})
if cfg.IsCreateIndexTemplates() {
err := writer.CreateTemplates(spanMapping, serviceMapping, cfg.GetIndexPrefix())
Expand Down
95 changes: 61 additions & 34 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package es

import (
"flag"
"fmt"
"strings"
"time"

Expand All @@ -29,42 +28,46 @@ import (
)

const (
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
suffixUsername = ".username"
suffixPassword = ".password"
suffixSniffer = ".sniffer"
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
suffixBulkSize = ".bulk.size"
suffixBulkWorkers = ".bulk.workers"
suffixBulkActions = ".bulk.actions"
suffixBulkFlushInterval = ".bulk.flush-interval"
suffixTimeout = ".timeout"
suffixIndexPrefix = ".index-prefix"
suffixIndexDateSeparator = ".index-date-separator"
suffixIndexRolloverFrequencySpans = ".index-rollover-frequency-spans"
suffixIndexRolloverFrequencyServices = ".index-rollover-frequency-services"
suffixTagsAsFields = ".tags-as-fields"
suffixTagsAsFieldsAll = suffixTagsAsFields + ".all"
suffixTagsAsFieldsInclude = suffixTagsAsFields + ".include"
suffixTagsFile = suffixTagsAsFields + ".config-file"
suffixTagDeDotChar = suffixTagsAsFields + ".dot-replacement"
suffixReadAlias = ".use-aliases"
suffixUseILM = ".use-ilm"
suffixCreateIndexTemplate = ".create-index-templates"
suffixEnabled = ".enabled"
suffixVersion = ".version"
suffixMaxDocCount = ".max-doc-count"
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"

defaultIndexRolloverFrequency = "day"
)

// TODO this should be moved next to config.Configuration struct (maybe ./flags package)
Expand Down Expand Up @@ -205,7 +208,17 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
flagSet.String(
nsConfig.namespace+suffixIndexDateSeparator,
defaultIndexDateSeparator,
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20 \".")
"Optional date separator of Jaeger indices. For example \".\" creates \"jaeger-span-2020.11.20\".")
flagSet.String(
nsConfig.namespace+suffixIndexRolloverFrequencySpans,
defaultIndexRolloverFrequency,
"Rotates jaeger-span indices over the given period. For example \"day\" creates \"jaeger-span-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+
"This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover")
flagSet.String(
nsConfig.namespace+suffixIndexRolloverFrequencyServices,
defaultIndexRolloverFrequency,
"Rotates jaeger-service indices over the given period. For example \"day\" creates \"jaeger-service-yyyy-MM-dd\" every day after UTC 12AM. Valid options: [hour, day]. "+
"This does not delete old indices. For details on complete index management solutions supported by Jaeger, refer to: https://www.jaegertracing.io/docs/deployment/#elasticsearch-rollover")
flagSet.Bool(
nsConfig.namespace+suffixTagsAsFieldsAll,
nsConfig.Tags.AllAsFields,
Expand Down Expand Up @@ -295,7 +308,6 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.BulkFlushInterval = v.GetDuration(cfg.namespace + suffixBulkFlushInterval)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.IndexPrefix = v.GetString(cfg.namespace + suffixIndexPrefix)
cfg.IndexDateLayout = initDateLayout(v.GetString(cfg.namespace + suffixIndexDateSeparator))
cfg.Tags.AllAsFields = v.GetBool(cfg.namespace + suffixTagsAsFieldsAll)
cfg.Tags.Include = v.GetString(cfg.namespace + suffixTagsAsFieldsInclude)
cfg.Tags.File = v.GetString(cfg.namespace + suffixTagsFile)
Expand All @@ -317,6 +329,16 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
if len(remoteReadClusters) > 0 {
cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",")
}

cfg.IndexRolloverFrequencySpans = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencySpans))
cfg.IndexRolloverFrequencyServices = strings.ToLower(v.GetString(cfg.namespace + suffixIndexRolloverFrequencyServices))

separator := v.GetString(cfg.namespace + suffixIndexDateSeparator)
cfg.IndexDateLayoutSpans = initDateLayout(cfg.IndexRolloverFrequencySpans, separator)
cfg.IndexDateLayoutServices = initDateLayout(cfg.IndexRolloverFrequencyServices, separator)

// Dependencies calculation should be daily, and this index size is very small
cfg.IndexDateLayoutDependencies = initDateLayout(defaultIndexRolloverFrequency, separator)
}

// GetPrimary returns primary configuration.
Expand All @@ -343,6 +365,11 @@ func stripWhiteSpace(str string) string {
return strings.Replace(str, " ", "", -1)
}

func initDateLayout(separator string) string {
return fmt.Sprintf("2006%s01%s02", separator, separator)
func initDateLayout(rolloverFreq, sep string) string {
// default to daily format
indexLayout := "2006" + sep + "01" + sep + "02"
if rolloverFreq == "hour" {
indexLayout = indexLayout + sep + "15"
}
return indexLayout
}
Loading

0 comments on commit 19cbe35

Please sign in to comment.