Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate Ports for GRPC and HTTP requests in Query Server #2387

Merged
merged 13 commits into from
Sep 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
queryHostPort = "query.host-port"
queryPort = "query.port"
queryPortWarning = "(deprecated, will be removed after 2020-08-31 or in release v1.20.0, whichever is later)"
queryHOSTPortWarning = "(deprecated, will be removed after 2020-12-31 or in release v1.21.0, whichever is later)"
queryHTTPHostPort = "query.http-server.host-port"
queryGRPCHostPort = "query.grpc-server.host-port"
queryBasePath = "query.base-path"
queryStaticFiles = "query.static-files"
queryUIConfig = "query.ui-config"
Expand All @@ -56,8 +59,12 @@ var tlsFlagsConfig = tlscfg.ServerFlagsConfig{

// QueryOptions holds configuration for query service
type QueryOptions struct {
// HostPort is the host:port address that the query service listens o n
// HostPort is the host:port address that the query service listens on
HostPort string
// HTTPHostPort is the host:port address that the query service listens in on for http requests
HTTPHostPort string
// GRPCHostPort is the host:port address that the query service listens in on for gRPC requests
GRPCHostPort string
// BasePath is the prefix for all UI and API HTTP routes
BasePath string
// StaticAssets is the path for the static assets for the UI (https://github.com/uber/jaeger-ui)
Expand All @@ -77,7 +84,9 @@ type QueryOptions struct {
// AddFlags adds flags for QueryOptions
func AddFlags(flagSet *flag.FlagSet) {
flagSet.Var(&config.StringSlice{}, queryAdditionalHeaders, `Additional HTTP response headers. Can be specified multiple times. Format: "Key: Value"`)
flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
flagSet.String(queryHostPort, ports.PortToHostPort(ports.QueryHTTP), queryHOSTPortWarning+" see --"+queryHTTPHostPort+" and --"+queryGRPCHostPort)
flagSet.String(queryHTTPHostPort, ports.PortToHostPort(ports.QueryHTTP), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's HTTP server")
flagSet.String(queryGRPCHostPort, ports.PortToHostPort(ports.QueryGRPC), "The host:port (e.g. 127.0.0.1:5555 or :5555) of the query's gRPC server")
flagSet.Int(queryPort, 0, queryPortWarning+" see --"+queryHostPort)
flagSet.String(queryBasePath, "/", "The base path for all HTTP routes, e.g. /jaeger; useful when running behind a reverse proxy")
flagSet.String(queryStaticFiles, "", "The directory path override for the static assets for the UI")
Expand All @@ -86,14 +95,33 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.Duration(queryMaxClockSkewAdjust, time.Second, "The maximum delta by which span timestamps may be adjusted in the UI due to clock skew; set to 0s to disable clock skew adjustments")
}

// InitPortsConfigFromViper initializes the port numbers and TLS configuration of ports
func (qOpts *QueryOptions) InitPortsConfigFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions {
qOpts.HTTPHostPort = v.GetString(queryHTTPHostPort)
qOpts.GRPCHostPort = v.GetString(queryGRPCHostPort)
qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort))

qOpts.TLS = tlsFlagsConfig.InitFromViper(v)

// query.host-port is not defined and at least one of query.grpc-server.host-port or query.http-server.host-port is defined.
// User intends to use separate GRPC and HTTP host:port flags
if !(v.IsSet(queryHostPort) || v.IsSet(queryPort)) && (v.IsSet(queryHTTPHostPort) || v.IsSet(queryGRPCHostPort)) {
return qOpts
}
logger.Warn(fmt.Sprintf("Use of %s and %s is deprecated. Use %s and %s instead", queryPort, queryHostPort, queryHTTPHostPort, queryGRPCHostPort))
qOpts.HTTPHostPort = qOpts.HostPort
qOpts.GRPCHostPort = qOpts.HostPort
return qOpts

}

// InitFromViper initializes QueryOptions with properties from viper
func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) *QueryOptions {
qOpts.HostPort = ports.GetAddressFromCLIOptions(v.GetInt(queryPort), v.GetString(queryHostPort))
qOpts = qOpts.InitPortsConfigFromViper(v, logger)
qOpts.BasePath = v.GetString(queryBasePath)
qOpts.StaticAssets = v.GetString(queryStaticFiles)
qOpts.UIConfig = v.GetString(queryUIConfig)
qOpts.BearerTokenPropagation = v.GetBool(queryTokenPropagation)
qOpts.TLS = tlsFlagsConfig.InitFromViper(v)
qOpts.MaxClockSkewAdjust = v.GetDuration(queryMaxClockSkewAdjust)

stringSlice := v.GetStringSlice(queryAdditionalHeaders)
Expand Down
21 changes: 21 additions & 0 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/mocks"
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
)
Expand Down Expand Up @@ -60,6 +61,26 @@ func TestQueryBuilderFlags(t *testing.T) {
assert.Equal(t, 10*time.Second, qOpts.MaxClockSkewAdjust)
}

func TestQueryBuilderFlagsSeparatePorts(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
"--query.http-server.host-port=127.0.0.1:8080",
})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())
assert.Equal(t, "127.0.0.1:8080", qOpts.HTTPHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryGRPC), qOpts.GRPCHostPort)
}

func TestQueryBuilderFlagsSeparateNoPorts(t *testing.T) {
rjs211 marked this conversation as resolved.
Show resolved Hide resolved
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
qOpts := new(QueryOptions).InitFromViper(v, zap.NewNop())

assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HTTPHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.GRPCHostPort)
assert.Equal(t, ports.PortToHostPort(ports.QueryHTTP), qOpts.HostPort)
}

func TestQueryBuilderBadHeadersFlags(t *testing.T) {
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
Expand Down
105 changes: 86 additions & 19 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,26 @@ type Server struct {
tracer opentracing.Tracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
grpcServer *grpc.Server
httpServer *http.Server
separatePorts bool
unavailableChannel chan healthcheck.Status
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) {

_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
return nil, err
}
_, grpcPort, err := net.SplitHostPort(options.GRPCHostPort)
if err != nil {
return nil, err
}

grpcServer, err := createGRPCServer(querySvc, options, logger, tracer)
if err != nil {
return nil, err
Expand All @@ -61,6 +74,7 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, options *Que
tracer: tracer,
grpcServer: grpcServer,
httpServer: createHTTPServer(querySvc, options, tracer, logger),
separatePorts: (grpcPort != httpPort),
unavailableChannel: make(chan healthcheck.Status),
}, nil
}
Expand Down Expand Up @@ -117,11 +131,27 @@ func createHTTPServer(querySvc *querysvc.QueryService, queryOpts *QueryOptions,
}
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
// initListener initialises listeners of the server
func (s *Server) initListener() (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPCHostPort)
if err != nil {
return nil, err
}

s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTPHostPort)
if err != nil {
return nil, err
}
s.logger.Info("Query server started")
return nil, nil
}

// old behavior using cmux
conn, err := net.Listen("tcp", s.queryOptions.HostPort)
if err != nil {
return err
return nil, err
}
s.conn = conn

Expand All @@ -138,16 +168,46 @@ func (s *Server) Start() error {
// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

grpcListener := cmuxServer.MatchWithWriters(
s.grpcConn = cmuxServer.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
httpListener := cmuxServer.Match(cmux.Any())
s.httpConn = cmuxServer.Match(cmux.Any())
s.queryOptions.HTTPHostPort = s.queryOptions.HostPort
s.queryOptions.GRPCHostPort = s.queryOptions.HostPort

return cmuxServer, nil

}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
cmuxServer, err := s.initListener()
if err != nil {
return err
}

var tcpPort int
if !s.separatePorts {
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
}
}

var httpPort int
if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil {
httpPort = port
}

var grpcPort int
if port, err := netutils.GetPort(s.grpcConn.Addr()); err == nil {
grpcPort = port
}

go func() {
s.logger.Info("Starting HTTP server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
s.logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTPHostPort))

switch err := s.httpServer.Serve(httpListener); err {
switch err := s.httpServer.Serve(s.httpConn); err {
case nil, http.ErrServerClosed, cmux.ErrListenerClosed:
// normal exit, nothing to do
default:
Expand All @@ -158,25 +218,27 @@ func (s *Server) Start() error {

// Start GRPC server concurrently
go func() {
s.logger.Info("Starting GRPC server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
s.logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPCHostPort))

if err := s.grpcServer.Serve(grpcListener); err != nil {
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.logger.Error("Could not start GRPC server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()

// Start cmux server concurrently.
go func() {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))
if !s.separatePorts {
go func() {
s.logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HostPort))

err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
err := cmuxServer.Serve()
// TODO: Remove string comparison when https://github.com/soheilhy/cmux/pull/69 is merged
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.logger.Error("Could not start multiplexed server", zap.Error(err))
}
s.unavailableChannel <- healthcheck.Unavailable
}()
}

return nil
}
Expand All @@ -186,6 +248,11 @@ func (s *Server) Close() error {
s.queryOptions.TLS.Close()
s.grpcServer.Stop()
s.httpServer.Close()
s.conn.Close()
if s.separatePorts {
s.httpConn.Close()
s.grpcConn.Close()
} else {
s.conn.Close()
}
return nil
}
Loading