Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[es] Add remote read clusters option for cross-cluster querying #2874

Merged
10 changes: 10 additions & 0 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string `mapstructure:"server_urls"`
RemoteReadClusters []string `mapstructure:"remote_read_clusters"`
Username string `mapstructure:"username"`
Password string `mapstructure:"password" json:"-"`
TokenFilePath string `mapstructure:"token_file"`
Expand Down Expand Up @@ -89,6 +90,7 @@ type TagsAsFields struct {
// ClientBuilder creates new es.Client
type ClientBuilder interface {
NewClient(logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)
GetRemoteReadClusters() []string
GetNumShards() int64
GetNumReplicas() int64
GetMaxSpanAge() time.Duration
Expand Down Expand Up @@ -193,6 +195,9 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac

// ApplyDefaults copies settings from source unless its own value is non-zero.
func (c *Configuration) ApplyDefaults(source *Configuration) {
if len(c.RemoteReadClusters) == 0 {
c.RemoteReadClusters = source.RemoteReadClusters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this line ever be hit? I believe c.RemoteReadClusters defaults to []string{""} because strings.Split() should result in a non-empty slice overwriting the initial empty slice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @albertteoh - I didn't realize this nuance with strings.Split() but after looking at the docs, this makes sense. I didn't see a great (easy) way around this since the cli takes in strings, so I updated options.go to check the length and only split when necessary (otherwise default to []string{}). I added 2 tests cases around options as well to cover this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the test cases to cover the empty string. I think there's also the following case: --es.remote-read-clusters=cluster_one,,cluster_two, where the second item is empty but I think we can consider this a user error.

}
if c.Username == "" {
c.Username = source.Username
}
Expand Down Expand Up @@ -246,6 +251,11 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
}
}

// GetRemoteReadClusters returns list of remote read clusters
func (c *Configuration) GetRemoteReadClusters() []string {
return c.RemoteReadClusters
}

// GetNumShards returns number of shards from Configuration
func (c *Configuration) GetNumShards() int64 {
return c.NumShards
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ func createSpanReader(
TagDotReplacement: cfg.GetTagDotReplacement(),
UseReadWriteAliases: cfg.GetUseReadWriteAliases(),
Archive: archive,
RemoteReadClusters: cfg.GetRemoteReadClusters(),
}), nil
}

Expand Down
17 changes: 15 additions & 2 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const (
suffixSnifferTLSEnabled = ".sniffer-tls-enabled"
suffixTokenPath = ".token-file"
suffixServerURLs = ".server-urls"
suffixRemoteReadClusters = ".remote-read-clusters"
suffixMaxSpanAge = ".max-span-age"
suffixNumShards = ".num-shards"
suffixNumReplicas = ".num-replicas"
Expand All @@ -59,8 +60,9 @@ const (
suffixLogLevel = ".log-level"
// default number of documents to return from a query (elasticsearch allowed limit)
// see search.max_buckets and index.max_result_window
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultMaxDocCount = 10_000
defaultServerURL = "http://127.0.0.1:9200"
defaultRemoteReadClusters = ""
// default separator for Elasticsearch index date layout.
defaultIndexDateSeparator = "-"
)
Expand Down Expand Up @@ -102,6 +104,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
CreateIndexTemplates: true,
Version: 0,
Servers: []string{defaultServerURL},
RemoteReadClusters: []string{},
MaxDocCount: defaultMaxDocCount,
LogLevel: "error",
}
Expand Down Expand Up @@ -162,6 +165,11 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixServerURLs,
defaultServerURL,
"The comma-separated list of Elasticsearch servers, must be full url i.e. http://localhost:9200")
flagSet.String(
nsConfig.namespace+suffixRemoteReadClusters,
defaultRemoteReadClusters,
"Comma-separated list of Elasticsearch remote cluster names for cross-cluster querying."+
"See Elasticsearch remote clusters and cross-cluster query api.")
flagSet.Duration(
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
Expand Down Expand Up @@ -304,6 +312,11 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
cfg.TLS = cfg.getTLSFlagsConfig().InitFromViper(v)

remoteReadClusters := stripWhiteSpace(v.GetString(cfg.namespace + suffixRemoteReadClusters))
if len(remoteReadClusters) > 0 {
cfg.RemoteReadClusters = strings.Split(remoteReadClusters, ",")
}
}

// GetPrimary returns primary configuration.
Expand Down
16 changes: 16 additions & 0 deletions plugin/storage/es/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestOptions(t *testing.T) {
assert.Empty(t, primary.Username)
assert.Empty(t, primary.Password)
assert.NotEmpty(t, primary.Servers)
assert.Empty(t, primary.RemoteReadClusters)
assert.Equal(t, int64(5), primary.NumShards)
assert.Equal(t, int64(1), primary.NumReplicas)
assert.Equal(t, 72*time.Hour, primary.MaxSpanAge)
Expand Down Expand Up @@ -58,6 +59,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--es.num-replicas=10",
"--es.index-date-separator=",
// a couple overrides
"--es.remote-read-clusters=cluster_one,cluster_two",
"--es.aux.server-urls=3.3.3.3, 4.4.4.4",
"--es.aux.max-span-age=24h",
"--es.aux.num-replicas=10",
Expand All @@ -77,6 +79,7 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, "hello", primary.Username)
assert.Equal(t, "/foo/bar", primary.TokenFilePath)
assert.Equal(t, []string{"1.1.1.1", "2.2.2.2"}, primary.Servers)
assert.Equal(t, []string{"cluster_one", "cluster_two"}, primary.RemoteReadClusters)
assert.Equal(t, 48*time.Hour, primary.MaxSpanAge)
assert.True(t, primary.Sniffer)
assert.True(t, primary.SnifferTLSEnabled)
Expand All @@ -103,6 +106,19 @@ func TestOptionsWithFlags(t *testing.T) {
assert.True(t, primary.UseILM)
}

func TestEmptyRemoteReadClusters(t *testing.T) {
opts := NewOptions("es", "es.aux")
v, command := config.Viperize(opts.AddFlags)
err := command.ParseFlags([]string{
"--es.remote-read-clusters=",
})
require.NoError(t, err)
opts.InitFromViper(v)

primary := opts.GetPrimary()
assert.Equal(t, []string{}, primary.RemoteReadClusters)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the following would be more expressive/readable: assert.Empty(t, primary.RemoteReadClusters)

}

func TestMaxSpanAgeSetErrorInArchiveMode(t *testing.T) {
opts := NewOptions("es", archiveNamespace)
_, command := config.Viperize(opts.AddFlags)
Expand Down
37 changes: 29 additions & 8 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type SpanReaderParams struct {
TagDotReplacement string
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
}

// NewSpanReader returns a new SpanReader with a metrics.
Expand All @@ -129,7 +130,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex),
indexDateLayout: p.IndexDateLayout,
spanConverter: dbmodel.NewToDomain(p.TagDotReplacement),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases),
timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases, p.RemoteReadClusters),
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
}
Expand All @@ -139,24 +140,44 @@ type timeRangeIndexFn func(indexName string, indexDateLayout string, startTime t

type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource

func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn {
func getTimeRangeIndexFn(archive, useReadWriteAliases bool, remoteReadClusters []string) timeRangeIndexFn {
if archive {
var archiveSuffix string
if useReadWriteAliases {
archiveSuffix = archiveReadIndexSuffix
} else {
archiveSuffix = archiveIndexSuffix
}
return func(indexName, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{archiveIndex(indexName, archiveSuffix)}
}
return addRemoteReadClusters(func(indexPrefix, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{archiveIndex(indexPrefix, archiveSuffix)}
}, remoteReadClusters)
}
if useReadWriteAliases {
return func(indices string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{indices + "read"}
return addRemoteReadClusters(func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
return []string{indexPrefix + "read"}
}, remoteReadClusters)
}
return addRemoteReadClusters(timeRangeIndices, remoteReadClusters)
}

// Add a remote cluster prefix for each cluster and for each index and add it to the list of original indices.
// Elasticsearch cross cluster api example GET /twitter,cluster_one:twitter,cluster_two:twitter/_search.
func addRemoteReadClusters(fn timeRangeIndexFn, remoteReadClusters []string) timeRangeIndexFn {
return func(indexPrefix string, indexDateLayout string, startTime time.Time, endTime time.Time) []string {
jaegerIndices := fn(indexPrefix, indexDateLayout, startTime, endTime)
if len(remoteReadClusters) == 0 {
return jaegerIndices
}

for _, jaegerIndex := range jaegerIndices {
for _, remoteCluster := range remoteReadClusters {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related to the earlier comment, if remoteReadClusters defaults to []string{""}, the returned indices count would be doubled with the new indices looking like :<index-name>. Is that expected?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be avoided now with the changes to plugin/storage/es/options.go

remoteIndex := remoteCluster + ":" + jaegerIndex
jaegerIndices = append(jaegerIndices, remoteIndex)
}
}

return jaegerIndices
}
return timeRangeIndices
}

func getSourceFn(archive bool, maxDocCount int) sourceFn {
Expand Down
44 changes: 34 additions & 10 deletions plugin/storage/es/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,35 +149,59 @@ func TestSpanReaderIndices(t *testing.T) {
date := time.Date(2019, 10, 10, 5, 0, 0, 0, time.UTC)
dateFormat := date.UTC().Format("2006-01-02")
testCases := []struct {
index string
params SpanReaderParams
indices []string
params SpanReaderParams
}{
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false},
index: spanIndex + dateFormat},
indices: []string{spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", UseReadWriteAliases: true},
index: spanIndex + "read"},
indices: []string{spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: false},
index: "foo:" + indexPrefixSeparator + spanIndex + dateFormat},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", UseReadWriteAliases: true},
index: "foo:-" + spanIndex + "read"},
indices: []string{"foo:-" + spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true},
index: spanIndex + archiveIndexSuffix},
indices: []string{spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true},
index: "foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "foo:", Archive: true, UseReadWriteAliases: true},
index: "foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix},
indices: []string{"foo:" + indexPrefixSeparator + spanIndex + archiveReadIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
albertteoh marked this conversation as resolved.
Show resolved Hide resolved
indices: []string{
spanIndex + dateFormat,
"cluster_one:" + spanIndex + dateFormat,
"cluster_two:" + spanIndex + dateFormat}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + archiveIndexSuffix,
"cluster_one:" + spanIndex + archiveIndexSuffix,
"cluster_two:" + spanIndex + archiveIndexSuffix}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: false, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + "read",
"cluster_one:" + spanIndex + "read",
"cluster_two:" + spanIndex + "read"}},
{params: SpanReaderParams{Client: client, Logger: logger, MetricsFactory: metricsFactory,
IndexPrefix: "", Archive: true, UseReadWriteAliases: true, RemoteReadClusters: []string{"cluster_one", "cluster_two"}},
indices: []string{
spanIndex + archiveReadIndexSuffix,
"cluster_one:" + spanIndex + archiveReadIndexSuffix,
"cluster_two:" + spanIndex + archiveReadIndexSuffix}},
}
for _, testCase := range testCases {
r := NewSpanReader(testCase.params)
actual := r.timeRangeIndices(r.spanIndexPrefix, "2006-01-02", date, date)
assert.Equal(t, []string{testCase.index}, actual)
assert.Equal(t, testCase.indices, actual)
}
}

Expand Down