Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add streamingSpanWriterPlugin #3640

Merged
merged 3 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions examples/memstore-plugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package main

import (
"flag"
"path"
"strings"

"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
Expand All @@ -42,15 +41,17 @@ func main() {
flag.StringVar(&configPath, "config", "", "A path to the plugin's configuration file")
flag.Parse()

if configPath != "" {
viper.SetConfigFile(path.Base(configPath))
viper.AddConfigPath(path.Dir(configPath))
}

v := viper.New()
v.AutomaticEnv()
v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_"))

if configPath != "" {
v.SetConfigFile(configPath)
if err := v.ReadInConfig(); err != nil {
panic(err)
}
}

opts := memory.Options{}
opts.InitFromViper(v)

Expand All @@ -71,10 +72,14 @@ func main() {
opentracing.SetGlobalTracer(tracer)

memStorePlugin := grpcMemory.NewStoragePlugin(memory.NewStore(), memory.NewStore())
grpc.ServeWithGRPCServer(&shared.PluginServices{
service := &shared.PluginServices{
Store: memStorePlugin,
ArchiveStore: memStorePlugin,
}, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
}
if v.GetBool("enable_streaming_writer") {
vuuihc marked this conversation as resolved.
Show resolved Hide resolved
service.StreamingSpanWriter = memStorePlugin
}
grpc.ServeWithGRPCServer(service, func(options []googleGRPC.ServerOption) *googleGRPC.Server {
return plugin.DefaultGRPCServer([]googleGRPC.ServerOption{
googleGRPC.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer)),
googleGRPC.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(tracer)),
Expand Down
2 changes: 2 additions & 0 deletions plugin/storage/grpc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ grpc.Serve(&shared.PluginServices{
})
```

To support writing span via client stream (which can enlarge the throughput), you must fill `StreamingSpanWriter` with the same plugin with `Store` field. Note that use streaming spanWriter may make the `save_by_svr` metric inaccurate, in which case users will need to pay attention to the metrics provided by plugin.

Running with a plugin
---------------------
A plugin can be run using the `all-in-one` application within the top level `cmd` package of the Jaeger project. To do this
Expand Down
10 changes: 8 additions & 2 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
streamingSpanWriterPlugin, ok := raw.(shared.StreamingSpanWriterPlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
Expand All @@ -171,8 +176,9 @@ func (c *Configuration) buildPlugin(logger *zap.Logger) (*ClientPluginServices,

return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
}, nil
Expand Down
13 changes: 10 additions & 3 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ type Factory struct {

builder config.PluginBuilder

store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
capabilities shared.PluginCapabilities
store shared.StoragePlugin
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities
vuuihc marked this conversation as resolved.
Show resolved Hide resolved
}

var _ io.Closer = (*Factory)(nil)
Expand Down Expand Up @@ -81,6 +82,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.store = services.Store
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand All @@ -92,6 +94,11 @@ func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
if f.capabilities != nil && f.streamingSpanWriter != nil {
if capabilities, err := f.capabilities.Capabilities(); err == nil && capabilities.StreamingSpanWriter {
return f.streamingSpanWriter.StreamingSpanWriter(), nil
}
}
return f.store.SpanWriter(), nil
}

Expand Down
109 changes: 93 additions & 16 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ import (
var _ storage.Factory = new(Factory)

type mockPluginBuilder struct {
plugin *mockPlugin
err error
plugin *mockPlugin
writerType string
err error
}

func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginServices, error) {
Expand All @@ -53,6 +54,9 @@ func (b *mockPluginBuilder) Build(logger *zap.Logger) (*grpcConfig.ClientPluginS
ArchiveStore: b.plugin,
},
}
if b.writerType == "streaming" {
services.PluginServices.StreamingSpanWriter = b.plugin
}
if b.plugin.capabilities != nil {
services.Capabilities = b.plugin
}
Expand All @@ -65,12 +69,13 @@ func (b *mockPluginBuilder) Close() error {
}

type mockPlugin struct {
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
spanReader spanstore.Reader
spanWriter spanstore.Writer
archiveReader spanstore.Reader
archiveWriter spanstore.Writer
streamingSpanWriter spanstore.Writer
capabilities shared.PluginCapabilities
dependencyReader dependencystore.Reader
}

func (mp *mockPlugin) Capabilities() (*shared.Capabilities, error) {
Expand All @@ -93,6 +98,10 @@ func (mp *mockPlugin) SpanWriter() spanstore.Writer {
return mp.spanWriter
}

func (mp *mockPlugin) StreamingSpanWriter() spanstore.Writer {
return mp.streamingSpanWriter
}

func (mp *mockPlugin) DependencyReader() dependencystore.Reader {
return mp.dependencyReader
}
Expand Down Expand Up @@ -143,16 +152,19 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
}, nil)
ArchiveSpanReader: true,
ArchiveSpanWriter: true,
StreamingSpanWriter: true,
}, nil).Times(3)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
streamingSpanWriter: new(spanStoreMocks.Writer),
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

Expand All @@ -163,6 +175,9 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.NoError(t, err)
assert.NotNil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.streamingSpanWriter.StreamingSpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
Expand All @@ -173,15 +188,17 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
capabilities := new(mocks.PluginCapabilities)
capabilities.On("Capabilities").
Return(&shared.Capabilities{
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
ArchiveSpanReader: false,
ArchiveSpanWriter: false,
StreamingSpanWriter: false,
}, nil)

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -193,6 +210,9 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, storage.ErrArchiveStorageNotSupported.Error())
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
Expand All @@ -210,6 +230,7 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
capabilities: capabilities,
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -221,6 +242,9 @@ func TestGRPCStorageFactory_CapabilitiesError(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.EqualError(t, err, customError.Error())
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
Expand All @@ -232,6 +256,7 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
Expand All @@ -243,6 +268,9 @@ func TestGRPCStorageFactory_CapabilitiesNil(t *testing.T) {
writer, err := f.CreateArchiveSpanWriter()
assert.Equal(t, err, storage.ErrArchiveStorageNotSupported)
assert.Nil(t, writer)
writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestWithConfiguration(t *testing.T) {
Expand Down Expand Up @@ -272,3 +300,52 @@ func TestInitFromOptions(t *testing.T) {
assert.Equal(t, o, f.options)
assert.Equal(t, &o.Configuration, f.builder)
}

func TestStreamingSpanWriterFactory_CapabilitiesNil(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v, zap.NewNop())

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer)
}

func TestStreamingSpanWriterFactory_Capabilities(t *testing.T) {
f := NewFactory()
v := viper.New()
f.InitFromViper(v, zap.NewNop())

capabilities := new(mocks.PluginCapabilities)
customError := errors.New("made-up error")
capabilities.On("Capabilities").
Return(nil, customError).Once().
On("Capabilities").Return(&shared.Capabilities{}, nil).Once()

f.builder = &mockPluginBuilder{
plugin: &mockPlugin{
archiveWriter: new(spanStoreMocks.Writer),
archiveReader: new(spanStoreMocks.Reader),
spanWriter: new(spanStoreMocks.Writer),
capabilities: capabilities,
},
writerType: "streaming",
}
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
writer, err := f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return error

writer, err = f.CreateSpanWriter()
assert.NoError(t, err)
assert.Equal(t, f.store.SpanWriter(), writer) // get unary writer when Capabilities return false
}
1 change: 1 addition & 0 deletions plugin/storage/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func ServeWithGRPCServer(services *shared.PluginServices, grpcServer func([]grpc
shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{
Impl: services.Store,
ArchiveImpl: services.ArchiveStore,
StreamImpl: services.StreamingSpanWriter,
},
},
},
Expand Down
4 changes: 4 additions & 0 deletions plugin/storage/grpc/memory/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (ns *storagePlugin) SpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) StreamingSpanWriter() spanstore.Writer {
return ns.store
}

func (ns *storagePlugin) ArchiveSpanReader() spanstore.Reader {
return ns.archiveStore
}
Expand Down
1 change: 1 addition & 0 deletions plugin/storage/grpc/memory/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestPluginUsesMemoryStorage(t *testing.T) {
assert.Equal(t, mainStorage, memStorePlugin.DependencyReader())
assert.Equal(t, mainStorage, memStorePlugin.SpanReader())
assert.Equal(t, mainStorage, memStorePlugin.SpanWriter())
assert.Equal(t, mainStorage, memStorePlugin.StreamingSpanWriter())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanReader())
assert.Equal(t, archiveStorage, memStorePlugin.ArchiveSpanWriter())

Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/grpc/proto/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ service SpanWriterPlugin {
rpc Close(CloseWriterRequest) returns (CloseWriterResponse);
}

service StreamingSpanWriterPlugin {
rpc WriteSpanStream(stream WriteSpanRequest) returns (WriteSpanResponse);
}

service SpanReaderPlugin {
// spanstore/Reader
rpc GetTrace(GetTraceRequest) returns (stream SpansResponseChunk);
Expand Down Expand Up @@ -178,6 +182,7 @@ message CapabilitiesRequest {
message CapabilitiesResponse {
bool archiveSpanReader = 1;
bool archiveSpanWriter = 2;
bool streamingSpanWriter = 3;
}

service PluginCapabilities {
Expand Down
Loading