Skip to content

Commit

Permalink
Passthrough OAuth bearer token supplied to Query service through to E…
Browse files Browse the repository at this point in the history
…S storage

Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
  • Loading branch information
rubenvp8510 committed Jun 26, 2019
1 parent 5b52726 commit 53aafca
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 55 deletions.
14 changes: 10 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
)

const (
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryTokenPropagation = "query.bearer-token-propagation"
)

// QueryOptions holds configuration for query service
Expand All @@ -39,6 +40,8 @@ type QueryOptions struct {
StaticAssets string
// UIConfig is the path to a configuration file for the UI
UIConfig string
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
}

// AddFlags adds flags for QueryOptions
Expand All @@ -47,6 +50,8 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format")
flagSet.Bool(queryTokenPropagation, true, "Allow propagation of bearer token to be used by storage plugins")

}

// InitFromViper initializes QueryOptions with properties from viper
Expand All @@ -55,5 +60,6 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper) *QueryOptions {
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
return qOpts
}
11 changes: 7 additions & 4 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"

"github.com/gorilla/handlers"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -80,11 +80,14 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,

apiHandler.RegisterRoutes(r)
RegisterStaticHandler(r, logger, queryOpts)
compressHandler := handlers.CompressHandler(r)
var handler http.Handler = r
if queryOpts.BearerTokenPropagation {
handler = bearerTokenPropagationHandler(logger, r)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

return &http.Server{
Handler: recoveryHandler(compressHandler),
Handler: recoveryHandler(handler),
}
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestServer(t *testing.T) {
querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}

server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP}, tracer)
server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP,
BearerTokenPropagation: true}, tracer)
assert.NoError(t, server.Start())

// TODO wait for servers to come up and test http and grpc endpoints
Expand Down
84 changes: 84 additions & 0 deletions cmd/query/app/token_propagation_hander_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)


func Test_bearTokenPropagationHandler(t *testing.T) {
logger := zap.NewNop()
bearerToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI"

validTokenHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, ok := spanstore.GetBearerToken(ctx)
assert.Equal(t, token, bearerToken)
assert.True(t, ok)
stop.Done()
})
}

emptyHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, _ := spanstore.GetBearerToken(ctx)
assert.Empty(t, token, bearerToken)
stop.Done()
})
}

testCases := []struct {
name string
sendHeader bool
header string
handler func(stop *sync.WaitGroup) http.HandlerFunc
}{
{ name:"Bearer token", sendHeader: true, header: "Bearer " + bearerToken, handler:validTokenHandler},
{ name:"Invalid header",sendHeader: true, header: bearerToken, handler:emptyHandler},
{ name:"No header", sendHeader: false, handler:emptyHandler},
{ name:"Basic Auth", sendHeader: true, header: "Basic " + bearerToken, handler:emptyHandler},
{ name:"X-Forwarded-Access-Token", sendHeader: true, header: "Bearer " + bearerToken, handler:validTokenHandler},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
stop := sync.WaitGroup{}
stop.Add(1)
r := bearerTokenPropagationHandler(logger, testCase.handler(&stop))
server := httptest.NewServer(r)
defer server.Close()
req , err := http.NewRequest("GET", server.URL, nil)
assert.Nil(t,err)
if testCase.sendHeader {
req.Header.Add("Authorization", testCase.header)
}
_, err = httpClient.Do(req)
assert.Nil(t, err)
stop.Wait()
})
}

}
51 changes: 51 additions & 0 deletions cmd/query/app/token_propagation_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"net/http"
"strings"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)

func bearerTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authHeaderValue := r.Header.Get("Authorization")
// If no Authorization header is present, try with X-Forwarded-Access-Token
if authHeaderValue == "" {
authHeaderValue = r.Header.Get("X-Forwarded-Access-Token")
}
if authHeaderValue != "" {
headerValue := strings.Split(authHeaderValue, " ")
token := ""
if len(headerValue) == 2 {
// Make sure we only capture bearer token , not other types like Basic auth.
if headerValue[0] == "Bearer" {
token = headerValue[1]
}
} else {
logger.Warn("Invalid authorization header, skipping bearer token propagation")
}
h.ServeHTTP(w, r.WithContext(spanstore.ContextWithBearerToken(ctx, token)))
} else {
h.ServeHTTP(w, r.WithContext(ctx))
}
})

}
6 changes: 4 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
istorage "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

Expand Down Expand Up @@ -78,7 +79,9 @@ func main() {
}
defer closer.Close()
opentracing.SetGlobalTracer(tracer)

queryOpts := new(app.QueryOptions).InitFromViper(v)
// TODO: Need to figure out set enable/disable propagation on storage plugins.
v.Set(spanstore.StoragePropagationKey, queryOpts.BearerTokenPropagation)
storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
Expand All @@ -98,7 +101,6 @@ func main() {
dependencyReader,
*queryServiceOptions)

queryOpts := new(app.QueryOptions).InitFromViper(v)
server := app.NewServer(svc, queryService, queryOpts, tracer)

if err := server.Start(); err != nil {
Expand Down
85 changes: 55 additions & 30 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,34 @@ import (

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
TokenFilePath string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
Servers []string
Username string
Password string
TokenFilePath string
AllowTokenFromContext bool
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
Expand Down Expand Up @@ -90,7 +92,7 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
options, err := c.getConfigOptions()
options, err := c.getConfigOptions(logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,8 +249,13 @@ func (c *Configuration) IsEnabled() bool {
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) {

options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
// Disable health check when token from context is allowed, this is because at this time
// we don' have a valid token to do the check ad if we don't disable the check the service that
// uses this won't start.
elastic.SetHealthcheck(!c.AllowTokenFromContext)}
httpClient := &http.Client{
Timeout: c.Timeout,
}
Expand All @@ -271,14 +278,24 @@ func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
}
httpTransport.TLSClientConfig = &tls.Config{RootCAs: ca}
}

token := ""
if c.TokenFilePath != "" {
token, err := loadToken(c.TokenFilePath)
if c.AllowTokenFromContext {
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
}
tokenFromFile, err := loadToken(c.TokenFilePath)
if err != nil {
return nil, err
}
token = tokenFromFile
}

if token != "" || c.AllowTokenFromContext {
httpClient.Transport = &tokenAuthTransport{
token: token,
wrapped: httpTransport,
token: token,
allowOverrideFromCtx: c.AllowTokenFromContext,
wrapped: httpTransport,
}
} else {
httpClient.Transport = httpTransport
Expand Down Expand Up @@ -329,12 +346,20 @@ func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {

// TokenAuthTransport
type tokenAuthTransport struct {
token string
wrapped *http.Transport
token string
allowOverrideFromCtx bool
wrapped *http.Transport
}

func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) {
r.Header.Set("Authorization", "Bearer "+tr.token)
token := tr.token
if tr.allowOverrideFromCtx {
headerToken, _ := spanstore.GetBearerToken(r.Context())
if headerToken != "" {
token = headerToken
}
}
r.Header.Set("Authorization", "Bearer "+token)
return tr.wrapped.RoundTrip(r)
}

Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand Down Expand Up @@ -255,6 +256,8 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
}

// GetPrimary returns primary configuration.
Expand Down
Loading

0 comments on commit 53aafca

Please sign in to comment.