Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Passthrough OAuth bearer token supplied to Query service through to ES storage #1599

Merged
merged 1 commit into from
Jun 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ import (
)

const (
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryPort = "query.port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
queryTokenPropagation = "query.bearer-token-propagation"
)

// QueryOptions holds configuration for query service
Expand All @@ -39,6 +40,8 @@ type QueryOptions struct {
StaticAssets string
// UIConfig is the path to a configuration file for the UI
UIConfig string
// BearerTokenPropagation activate/deactivate bearer token propagation to storage
BearerTokenPropagation bool
}

// AddFlags adds flags for QueryOptions
Expand All @@ -47,6 +50,8 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
flagSet.String(queryUIConfig, "", "The path to the UI configuration file in JSON format")
flagSet.Bool(queryTokenPropagation, true, "Allow propagation of bearer token to be used by storage plugins")

}

// InitFromViper initializes QueryOptions with properties from viper
Expand All @@ -55,5 +60,6 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper) *QueryOptions {
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
return qOpts
}
11 changes: 7 additions & 4 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"strings"

"github.com/gorilla/handlers"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -80,11 +80,14 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,

apiHandler.RegisterRoutes(r)
RegisterStaticHandler(r, logger, queryOpts)
compressHandler := handlers.CompressHandler(r)
var handler http.Handler = r
if queryOpts.BearerTokenPropagation {
handler = bearerTokenPropagationHandler(logger, r)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(logger, true)

return &http.Server{
Handler: recoveryHandler(compressHandler),
Handler: recoveryHandler(handler),
}
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestServer(t *testing.T) {
querySvc := &querysvc.QueryService{}
tracer := opentracing.NoopTracer{}

server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP}, tracer)
server := NewServer(flagsSvc, querySvc, &QueryOptions{Port: ports.QueryAdminHTTP,
BearerTokenPropagation: true}, tracer)
assert.NoError(t, server.Start())

// TODO wait for servers to come up and test http and grpc endpoints
Expand Down
84 changes: 84 additions & 0 deletions cmd/query/app/token_propagation_hander_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"net/http"
"net/http/httptest"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)


func Test_bearTokenPropagationHandler(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look, I found another bear!

logger := zap.NewNop()
bearerToken := "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhZG1pbiIsIm5hbWUiOiJKb2huIERvZSIsImlhdCI"

validTokenHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, ok := spanstore.GetBearerToken(ctx)
assert.Equal(t, token, bearerToken)
assert.True(t, ok)
stop.Done()
})
}

emptyHandler := func(stop *sync.WaitGroup) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
token, _ := spanstore.GetBearerToken(ctx)
assert.Empty(t, token, bearerToken)
stop.Done()
})
}

testCases := []struct {
name string
sendHeader bool
header string
handler func(stop *sync.WaitGroup) http.HandlerFunc
}{
{ name:"Bearer token", sendHeader: true, header: "Bearer " + bearerToken, handler:validTokenHandler},
{ name:"Invalid header",sendHeader: true, header: bearerToken, handler:emptyHandler},
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
{ name:"No header", sendHeader: false, handler:emptyHandler},
{ name:"Basic Auth", sendHeader: true, header: "Basic " + bearerToken, handler:emptyHandler},
{ name:"X-Forwarded-Access-Token", sendHeader: true, header: "Bearer " + bearerToken, handler:validTokenHandler},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
stop := sync.WaitGroup{}
stop.Add(1)
r := bearerTokenPropagationHandler(logger, testCase.handler(&stop))
server := httptest.NewServer(r)
defer server.Close()
req , err := http.NewRequest("GET", server.URL, nil)
assert.Nil(t,err)
if testCase.sendHeader {
req.Header.Add("Authorization", testCase.header)
}
_, err = httpClient.Do(req)
assert.Nil(t, err)
stop.Wait()
})
}

}
51 changes: 51 additions & 0 deletions cmd/query/app/token_propagation_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"net/http"
"strings"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/storage/spanstore"
)

func bearerTokenPropagationHandler(logger *zap.Logger, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
authHeaderValue := r.Header.Get("Authorization")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need to be checking for X-Forwarded-Access-Token - as (for example) if using the OpenShift OAuthProxy - it will receive the bearer token from the UI, and if configured to do so, will pass it in the upstream request in the X-Forwarded-Access-Token header instead of Authorization.

@jpkrohling does this sound reasonable to you?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does. That's a common header set by reverse proxies.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put support for X-Forwarded-Access-Token, I tried to get token from there if Authorization header is not defined.

// If no Authorization header is present, try with X-Forwarded-Access-Token
if authHeaderValue == "" {
authHeaderValue = r.Header.Get("X-Forwarded-Access-Token")
}
if authHeaderValue != "" {
headerValue := strings.Split(authHeaderValue, " ")
token := ""
if len(headerValue) == 2 {
// Make sure we only capture bearer token , not other types like Basic auth.
if headerValue[0] == "Bearer" {
token = headerValue[1]
}
} else {
logger.Warn("Invalid authorization header, skipping bearer token propagation")
}
h.ServeHTTP(w, r.WithContext(spanstore.ContextWithBearerToken(ctx, token)))
} else {
h.ServeHTTP(w, r.WithContext(ctx))
}
})

}
6 changes: 4 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
istorage "github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

Expand Down Expand Up @@ -78,7 +79,9 @@ func main() {
}
defer closer.Close()
opentracing.SetGlobalTracer(tracer)

queryOpts := new(app.QueryOptions).InitFromViper(v)
// TODO: Need to figure out set enable/disable propagation on storage plugins.
v.Set(spanstore.StoragePropagationKey, queryOpts.BearerTokenPropagation)
storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
Expand All @@ -98,7 +101,6 @@ func main() {
dependencyReader,
*queryServiceOptions)

queryOpts := new(app.QueryOptions).InitFromViper(v)
server := app.NewServer(svc, queryService, queryOpts, tracer)

if err := server.Start(); err != nil {
Expand Down
85 changes: 55 additions & 30 deletions pkg/es/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,34 @@ import (

"github.com/jaegertracing/jaeger/pkg/es"
"github.com/jaegertracing/jaeger/pkg/es/wrapper"
"github.com/jaegertracing/jaeger/storage/spanstore"
storageMetrics "github.com/jaegertracing/jaeger/storage/spanstore/metrics"
)

// Configuration describes the configuration properties needed to connect to an ElasticSearch cluster
type Configuration struct {
Servers []string
Username string
Password string
TokenFilePath string
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
Servers []string
Username string
Password string
TokenFilePath string
AllowTokenFromContext bool
Sniffer bool // https://github.com/olivere/elastic/wiki/Sniffing
MaxNumSpans int // defines maximum number of spans to fetch from storage per query
MaxSpanAge time.Duration `yaml:"max_span_age"` // configures the maximum lookback on span reads
NumShards int64 `yaml:"shards"`
NumReplicas int64 `yaml:"replicas"`
Timeout time.Duration `validate:"min=500"`
BulkSize int
BulkWorkers int
BulkActions int
BulkFlushInterval time.Duration
IndexPrefix string
TagsFilePath string
AllTagsAsFields bool
TagDotReplacement string
Enabled bool
TLS TLSConfig
UseReadWriteAliases bool
}

// TLSConfig describes the configuration properties to connect tls enabled ElasticSearch cluster
Expand Down Expand Up @@ -90,7 +92,7 @@ func (c *Configuration) NewClient(logger *zap.Logger, metricsFactory metrics.Fac
if len(c.Servers) < 1 {
return nil, errors.New("No servers specified")
}
options, err := c.getConfigOptions()
options, err := c.getConfigOptions(logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -247,8 +249,13 @@ func (c *Configuration) IsEnabled() bool {
}

// getConfigOptions wraps the configs to feed to the ElasticSearch client init
func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer)}
func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOptionFunc, error) {

options := []elastic.ClientOptionFunc{elastic.SetURL(c.Servers...), elastic.SetSniff(c.Sniffer),
// Disable health check when token from context is allowed, this is because at this time
// we don' have a valid token to do the check ad if we don't disable the check the service that
// uses this won't start.
elastic.SetHealthcheck(!c.AllowTokenFromContext)}
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
httpClient := &http.Client{
Timeout: c.Timeout,
}
Expand All @@ -271,14 +278,24 @@ func (c *Configuration) getConfigOptions() ([]elastic.ClientOptionFunc, error) {
}
httpTransport.TLSClientConfig = &tls.Config{RootCAs: ca}
}

token := ""
if c.TokenFilePath != "" {
token, err := loadToken(c.TokenFilePath)
if c.AllowTokenFromContext {
logger.Warn("Token file and token propagation are both enabled, token from file won't be used")
}
tokenFromFile, err := loadToken(c.TokenFilePath)
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
token = tokenFromFile
}

if token != "" || c.AllowTokenFromContext {
httpClient.Transport = &tokenAuthTransport{
token: token,
wrapped: httpTransport,
token: token,
allowOverrideFromCtx: c.AllowTokenFromContext,
wrapped: httpTransport,
}
} else {
httpClient.Transport = httpTransport
Expand Down Expand Up @@ -329,12 +346,20 @@ func (tlsConfig *TLSConfig) loadPrivateKey() (*tls.Certificate, error) {

// TokenAuthTransport
type tokenAuthTransport struct {
token string
wrapped *http.Transport
token string
allowOverrideFromCtx bool
wrapped *http.Transport
}

func (tr *tokenAuthTransport) RoundTrip(r *http.Request) (*http.Response, error) {
r.Header.Set("Authorization", "Bearer "+tr.token)
token := tr.token
if tr.allowOverrideFromCtx {
headerToken, _ := spanstore.GetBearerToken(r.Context())
if headerToken != "" {
token = headerToken
}
}
r.Header.Set("Authorization", "Bearer "+token)
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
return tr.wrapped.RoundTrip(r)
}

Expand Down
3 changes: 3 additions & 0 deletions plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

const (
Expand Down Expand Up @@ -255,6 +256,8 @@ func initFromViper(cfg *namespaceConfig, v *viper.Viper) {
cfg.TagDotReplacement = v.GetString(cfg.namespace + suffixTagDeDotChar)
cfg.UseReadWriteAliases = v.GetBool(cfg.namespace + suffixReadAlias)
cfg.Enabled = v.GetBool(cfg.namespace + suffixEnabled)
// TODO: Need to figure out a better way for do this.
cfg.AllowTokenFromContext = v.GetBool(spanstore.StoragePropagationKey)
}

// GetPrimary returns primary configuration.
Expand Down
Loading