Skip to content

Commit

Permalink
Implement GRPC executor
Browse files Browse the repository at this point in the history
Signed-off-by: Paulo Moura <itsme@paulomoura.com.pt>
  • Loading branch information
Espina2 committed Jan 22, 2022
1 parent 2dc2177 commit 043c59e
Show file tree
Hide file tree
Showing 10 changed files with 555 additions and 1 deletion.
7 changes: 6 additions & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ builds:
id: dkron-executor-gcppubsub
binary: dkron-executor-gcppubsub

- <<: *xbuild
main: ./builtin/bins/dkron-executor-grpc/
id: dkron-executor-grpc
binary: dkron-executor-grpc

- <<: *xbuild
main: ./builtin/bins/dkron-processor-files/
id: dkron-processor-files
Expand All @@ -63,7 +68,7 @@ builds:
main: ./builtin/bins/dkron-processor-syslog/
id: dkron-processor-syslog
binary: dkron-processor-syslog

- <<: *xbuild
main: ./builtin/bins/dkron-processor-fluent/
id: dkron-processor-fluent
Expand Down
126 changes: 126 additions & 0 deletions builtin/bins/dkron-executor-grpc/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package main

import (
"bytes"
"context"
"fmt"
"github.com/armon/circbuf"
dkplugin "github.com/distribworks/dkron/v3/plugin"
dktypes "github.com/distribworks/dkron/v3/plugin/types"
"github.com/fullstorydev/grpcurl"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"strconv"
"strings"
"time"
)

const (
defaultTimeout = 30
// maxBufSize limits how much data we collect from a handler.
// This is to prevent Serf's memory from growing to an enormous
// amount due to a faulty handler.
maxBufSize = 256000
)

type GRPC struct{}

// Execute Process method of the plugin
// "executor": "grpc",
// "executor_config": {
// "url": "127.0.0.1:9000/demo.DemoService/Demo", // Request url
// "body": "", // POST body
// "timeout": "30", // Request timeout, unit seconds
// "expectCode": "0", // Expect response code, any of the described here https://grpc.github.io/grpc/core/md_doc_statuscodes.html
// }
func (g *GRPC) Execute(args *dktypes.ExecuteRequest, _ dkplugin.StatusHelper) (*dktypes.ExecuteResponse, error) {
out, err := g.ExecuteImpl(args)
resp := &dktypes.ExecuteResponse{Output: out}
if err != nil {
resp.Error = err.Error()
}
return resp, nil
}

// ExecuteImpl do grpc request
func (g *GRPC) ExecuteImpl(args *dktypes.ExecuteRequest) ([]byte, error) {
output, _ := circbuf.NewBuffer(maxBufSize)

if args.Config["url"] == "" {
return output.Bytes(), errors.New("url is empty")
}

segments := strings.Split(args.Config["url"], "/")
if len(segments) < 2 {
return output.Bytes(), errors.New("we require at least a url and a path to do a proto request")
}

timeout := defaultTimeout
if args.Config["timeout"] != "" {
t, convErr := strconv.Atoi(args.Config["timeout"])
if convErr != nil {
return output.Bytes(), errors.New("Invalid timeout")
}

timeout = t
}

expectedStatusCode := codes.OK
if args.Config["expectCode"] != "" {
t, convErr := strconv.Atoi(args.Config["expectCode"])
if convErr != nil {
return output.Bytes(), errors.New("Invalid timeout")
}

expectedStatusCode = codes.Code(t)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
defer cancel()

body := strings.NewReader(args.Config["body"])

var descSource grpcurl.DescriptorSource
var refClient *grpcreflect.Client
var cc *grpc.ClientConn
var opts []grpc.DialOption
var creds credentials.TransportCredentials

cc, grpcDialErr := grpcurl.BlockingDial(ctx, "tcp", segments[0], creds, opts...)
if grpcDialErr != nil {
return output.Bytes(), grpcDialErr
}
md := grpcurl.MetadataFromHeaders([]string{})
refCtx := metadata.NewOutgoingContext(ctx, md)
refClient = grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(cc))
descSource = grpcurl.DescriptorSourceFromServer(ctx, refClient)

rf, formatter, refErr := grpcurl.RequestParserAndFormatter(grpcurl.FormatJSON, descSource, body, grpcurl.FormatOptions{})
if refErr != nil {
return output.Bytes(), errors.Wrap(refErr, "Failed querying reflection server")
}

var b []byte
out := bytes.NewBuffer(b)

h := &grpcurl.DefaultEventHandler{
Out: out,
Formatter: formatter,
}

rpcCallErr := grpcurl.InvokeRPC(ctx, descSource, cc, strings.Join(segments[1:], "/"), []string{}, h, rf.Next)
if rpcCallErr != nil {
return output.Bytes(), errors.Wrap(rpcCallErr, "Failed querying reflection server")
}

if h.Status.Code() != expectedStatusCode {
return output.Bytes(), fmt.Errorf("server returned %v code, expected %v", h.Status.Code(), expectedStatusCode)
}

return output.Bytes(), nil
}
91 changes: 91 additions & 0 deletions builtin/bins/dkron-executor-grpc/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package main

import (
"context"
"github.com/distribworks/dkron/v3/builtin/bins/dkron-executor-grpc/test"
dktypes "github.com/distribworks/dkron/v3/plugin/types"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
"net"
"testing"
)

type DemoServer struct {
test.UnimplementedTestServiceServer
}

func (d DemoServer) Test(_ context.Context, request *test.TestRequest) (*test.TestRequest, error) {
return request, nil
}

func serverSetup() *grpc.Server {
lis, _ := net.Listen("tcp", ":9000")
grpcServer := grpc.NewServer()

d := &DemoServer{}

test.RegisterTestServiceServer(grpcServer, d)
reflection.Register(grpcServer)
go func() {
grpcServer.Serve(lis)
}()

return grpcServer
}

func TestGRPC_ExecuteImpl(t *testing.T) {
type args struct {
config map[string]string
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "It passes if correct data is provided",
args: args{
config: map[string]string{
"url": "127.0.0.1:9000/test.TestService/Test",
"body": `{"body": "test"}`,
},
},
wantErr: false,
},
{
name: "it fails if bad address is provided",
args: args{
config: map[string]string{
"url": "127.0.0.1:9000",
"body": `{"body": "test"}`,
},
},
wantErr: true,
},
{
name: "it fails if service didn't returned expected code",
args: args{
config: map[string]string{
"url": "127.0.0.1:9000/test.TestService/Test",
"body": `{"body": "test"}`,
"expectCode": "1",
},
},
wantErr: true,
},
}

srv := serverSetup()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := &GRPC{}
_, err := g.ExecuteImpl(&dktypes.ExecuteRequest{Config: tt.args.config})
if (err != nil) != tt.wantErr {
t.Errorf("ExecuteImpl() error = %v, wantErr %v", err, tt.wantErr)
return
}
})
}

srv.Stop()
}
18 changes: 18 additions & 0 deletions builtin/bins/dkron-executor-grpc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package main

import (
dkplugin "github.com/distribworks/dkron/v3/plugin"
"github.com/hashicorp/go-plugin"
)

func main() {
plugin.Serve(&plugin.ServeConfig{
HandshakeConfig: dkplugin.Handshake,
Plugins: map[string]plugin.Plugin{
"executor": &dkplugin.ExecutorPlugin{Executor: &GRPC{}},
},

// A non-nil value here enables gRPC serving for this plugin...
GRPCServer: plugin.DefaultGRPCServer,
})
}
Loading

0 comments on commit 043c59e

Please sign in to comment.