Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add grpc_resolver using external discovery service #1498

Merged
merged 11 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 23 additions & 26 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/resolver/manual"

"github.com/jaegertracing/jaeger/pkg/discovery"
"github.com/jaegertracing/jaeger/pkg/discovery/grpcresolver"
)

// ConnBuilder Struct to hold configurations
Expand All @@ -39,15 +40,9 @@ type ConnBuilder struct {
TLSCA string
TLSServerName string

notifier discovery.Notifier
}

// WithDiscoveryNotifier sets service discovery notifier
// TODO User should provide their own notifier so that notifier can push address updates to grpc resolver by invoking notifier.Notify(instances []string)
// We will add integation code with custom notifier and resolver in next PR
func (b *ConnBuilder) WithDiscoveryNotifier(n discovery.Notifier) *ConnBuilder {
b.notifier = n
return b
DiscoveryMinPeers int
Notifier discovery.Notifier
Discoverer discovery.Discoverer
}

// NewConnBuilder creates a new grpc connection builder.
Expand Down Expand Up @@ -81,26 +76,28 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger) (*grpc.ClientConn, er
dialOptions = append(dialOptions, grpc.WithInsecure())
}

if b.notifier != nil {
return nil, errors.New("not implemented")
}
if b.CollectorHostPorts == nil {
return nil, errors.New("at least one collector hostPort address is required when resolver is not available")
}
if len(b.CollectorHostPorts) > 1 {
r, _ := manual.GenerateAndRegisterManualResolver()
var resolvedAddrs []resolver.Address
for _, addr := range b.CollectorHostPorts {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: addr})
}
r.InitialState(resolver.State{Addresses: resolvedAddrs})
dialTarget = r.Scheme() + ":///round_robin"
logger.Info("Agent is connecting to a static list of collectors", zap.String("dialTarget", dialTarget), zap.String("collector hosts", strings.Join(b.CollectorHostPorts, ",")))
if b.Notifier != nil && b.Discoverer != nil {
logger.Info("Using external discovery service with roundrobin load balancer")
grpcResolver := grpcresolver.New(b.Notifier, b.Discoverer, logger, b.DiscoveryMinPeers)
dialTarget = grpcResolver.Scheme() + ":///round_robin"
} else {
dialTarget = b.CollectorHostPorts[0]
if b.CollectorHostPorts == nil {
return nil, errors.New("at least one collector hostPort address is required when resolver is not available")
}
if len(b.CollectorHostPorts) > 1 {
r, _ := manual.GenerateAndRegisterManualResolver()
var resolvedAddrs []resolver.Address
for _, addr := range b.CollectorHostPorts {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: addr})
}
r.InitialState(resolver.State{Addresses: resolvedAddrs})
dialTarget = r.Scheme() + ":///round_robin"
logger.Info("Agent is connecting to a static list of collectors", zap.String("dialTarget", dialTarget), zap.String("collector hosts", strings.Join(b.CollectorHostPorts, ",")))
} else {
dialTarget = b.CollectorHostPorts[0]
}
}
dialOptions = append(dialOptions, grpc.WithBalancerName(roundrobin.Name))

dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(grpc_retry.UnaryClientInterceptor(grpc_retry.WithMax(b.MaxRetry))))
return grpc.Dial(dialTarget, dialOptions...)
}
15 changes: 10 additions & 5 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestBuilderWithCollectors(t *testing.T) {
hostPorts []string
checkSuffixOnly bool
notifier discovery.Notifier
discoverer discovery.Discoverer
err error
}{
{
Expand All @@ -86,28 +87,31 @@ func TestBuilderWithCollectors(t *testing.T) {
hostPorts: []string{"127.0.0.1:9876", "127.0.0.1:9877", "127.0.0.1:9878"},
checkSuffixOnly: true,
notifier: nil,
discoverer: nil,
},
{
target: "127.0.0.1:9876",
name: "with single host",
hostPorts: []string{"127.0.0.1:9876"},
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
},
{
target: "dns://random_stuff",
name: "with custom resolver",
target: "///round_robin",
name: "with custom resolver and fixed discoverer",
hostPorts: []string{"dns://random_stuff"},
checkSuffixOnly: false,
checkSuffixOnly: true,
notifier: noopNotifier{},
err: errors.New("not implemented"),
discoverer: discovery.FixedDiscoverer{},
},
{
target: "",
name: "without collectorPorts and resolver",
hostPorts: nil,
checkSuffixOnly: false,
notifier: nil,
discoverer: nil,
err: errors.New("at least one collector hostPort address is required when resolver is not available"),
},
}
Expand All @@ -117,7 +121,8 @@ func TestBuilderWithCollectors(t *testing.T) {
// Use NewBuilder for code coverage consideration
cfg := NewConnBuilder()
cfg.CollectorHostPorts = test.hostPorts
cfg.WithDiscoveryNotifier(test.notifier)
cfg.Notifier = test.notifier
cfg.Discoverer = test.discoverer

conn, err := cfg.CreateConnection(zap.NewNop())
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/grpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
collectorTLS = gRPCPrefix + "tls"
collectorTLSCA = gRPCPrefix + "tls.ca"
collectorTLSServerName = gRPCPrefix + "tls.server-name"
discoveryMinPeers = gRPCPrefix + "discovery.min-peers"
)

// AddFlags adds flags for Options.
Expand All @@ -38,6 +39,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Bool(collectorTLS, false, "Enable TLS.")
flags.String(collectorTLSCA, "", "Path to a TLS CA file. (default use the systems truststore)")
flags.String(collectorTLSServerName, "", "Override the TLS server name.")
flags.Int(discoveryMinPeers, 3, "Max number of collectors to which the agent will try to connect at any given time")
}

// InitFromViper initializes Options with properties retrieved from Viper.
Expand All @@ -50,5 +52,6 @@ func (b *ConnBuilder) InitFromViper(v *viper.Viper) *ConnBuilder {
b.TLS = v.GetBool(collectorTLS)
b.TLSCA = v.GetString(collectorTLSCA)
b.TLSServerName = v.GetString(collectorTLSServerName)
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
return b
}
6 changes: 4 additions & 2 deletions cmd/agent/app/reporter/grpc/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ func TestBindFlags(t *testing.T) {
expected *ConnBuilder
}{
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111", "--reporter.grpc.retry.max=15"},
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111"}, MaxRetry: 15}},
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111"}, MaxRetry: 15, DiscoveryMinPeers: 3}},
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222"},
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry}},
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry, DiscoveryMinPeers: 3}},
{cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222", "--reporter.grpc.discovery.min-peers=5"},
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry, DiscoveryMinPeers: 5}},
}
for _, test := range tests {
v := viper.New()
Expand Down
6 changes: 3 additions & 3 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func main() {
Namespace(metrics.NSOptions{Name: "agent"})

rOpts := new(reporter.Options).InitFromViper(v)
tChanOpts := tchannel.NewBuilder().InitFromViper(v, logger)
grpcOpts := grpc.NewConnBuilder().InitFromViper(v)
cp, err := app.CreateCollectorProxy(rOpts, tChanOpts, grpcOpts, logger, mFactory)
tchanBuilder := tchannel.NewBuilder().InitFromViper(v, logger)
grpcBuilder := grpc.NewConnBuilder().InitFromViper(v)
cp, err := app.CreateCollectorProxy(rOpts, tchanBuilder, grpcBuilder, logger, mFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,12 @@ func main() {

aOpts := new(agentApp.Builder).InitFromViper(v)
repOpts := new(agentRep.Options).InitFromViper(v)
tchannelRepOpts := agentTchanRep.NewBuilder().InitFromViper(v, logger)
grpcRepOpts := agentGrpcRep.NewConnBuilder().InitFromViper(v)
tchanBuilder := agentTchanRep.NewBuilder().InitFromViper(v, logger)
grpcBuilder := agentGrpcRep.NewConnBuilder().InitFromViper(v)
cOpts := new(collector.CollectorOptions).InitFromViper(v)
qOpts := new(queryApp.QueryOptions).InitFromViper(v)

startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory)
startAgent(aOpts, repOpts, tchanBuilder, grpcBuilder, cOpts, logger, metricsFactory)
collectorSrv := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, svc.HC())
querySrv := startQuery(
svc, qOpts, archiveOptions(storageFactory, logger),
Expand Down
Loading