Skip to content

Commit

Permalink
Add gRPC server & reflection (#6463)
Browse files Browse the repository at this point in the history
* Add gRPC proxy

* Make GRPC disabled by default

* WIP on integration tests

* WIP on integration tests

* Start setting up in process tests

* Start setting up in process tests

* Make it compile

* Add start server to network util

* Add Println

* Use go routine

* Fix scopelint

* Move to proxy_test

* Add response type cache

* Remove proxy

* Tweaks

* Use channel to handle error

* Use error chan

* Update server/start.go

Co-authored-by: Federico Kunze <31522760+fedekunze@users.noreply.github.com>

* Use %w

* Add sdk.Context

* Add comments

* Fix lint

* Add header and tests

* Address comments

* Factorize some code

* Fix lint

* Add height and prove in req metadata

* Add reflection test

* Fix lint

* Put grpc test in server/grpc

* Update baseapp/grpcserver.go

* Update baseapp/grpcserver.go

* Remove proof header

Co-authored-by: Amaury Martiny <amaury.martiny@protonmail.com>
Co-authored-by: Federico Kunze <31522760+fedekunze@users.noreply.github.com>
Co-authored-by: Alexander Bezobchuk <alexanderbez@users.noreply.github.com>
Co-authored-by: SaReN <sahithnarahari@gmail.com>
  • Loading branch information
5 people committed Jul 27, 2020
1 parent 20488b4 commit e9534b0
Show file tree
Hide file tree
Showing 17 changed files with 384 additions and 70 deletions.
18 changes: 10 additions & 8 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (app *BaseApp) Query(req abci.RequestQuery) abci.ResponseQuery {
}

func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQuery) abci.ResponseQuery {
ctx, err := app.createQueryContext(req)
ctx, err := app.createQueryContext(req.Height, req.Prove)
if err != nil {
return sdkerrors.QueryResult(err)
}
Expand All @@ -364,26 +364,28 @@ func (app *BaseApp) handleQueryGRPC(handler GRPCQueryHandler, req abci.RequestQu
return res
}

func (app *BaseApp) createQueryContext(req abci.RequestQuery) (sdk.Context, error) {
// createQueryContext creates a new sdk.Context for a query, taking as args
// the block height and whether the query needs a proof or not.
func (app *BaseApp) createQueryContext(height int64, prove bool) (sdk.Context, error) {
// when a client did not provide a query height, manually inject the latest
if req.Height == 0 {
req.Height = app.LastBlockHeight()
if height == 0 {
height = app.LastBlockHeight()
}

if req.Height <= 1 && req.Prove {
if height <= 1 && prove {
return sdk.Context{},
sdkerrors.Wrap(
sdkerrors.ErrInvalidRequest,
"cannot query with proof when height <= 1; please provide a valid height",
)
}

cacheMS, err := app.cms.CacheMultiStoreWithVersion(req.Height)
cacheMS, err := app.cms.CacheMultiStoreWithVersion(height)
if err != nil {
return sdk.Context{},
sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"failed to load state at height %d; %s (latest height: %d)", req.Height, err, app.LastBlockHeight(),
"failed to load state at height %d; %s (latest height: %d)", height, err, app.LastBlockHeight(),
)
}

Expand Down Expand Up @@ -517,7 +519,7 @@ func handleQueryCustom(app *BaseApp, path []string, req abci.RequestQuery) abci.
return sdkerrors.QueryResult(sdkerrors.Wrapf(sdkerrors.ErrUnknownRequest, "no custom querier found for route %s", path[1]))
}

ctx, err := app.createQueryContext(req)
ctx, err := app.createQueryContext(req.Height, req.Prove)
if err != nil {
return sdkerrors.QueryResult(err)
}
Expand Down
11 changes: 4 additions & 7 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (app *BaseApp) MountStores(keys ...sdk.StoreKey) {
}
}

// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
// multistore.
// MountKVStores mounts all IAVL or DB stores to the provided keys in the
// BaseApp multistore.
func (app *BaseApp) MountKVStores(keys map[string]*sdk.KVStoreKey) {
for _, key := range keys {
if !app.fauxMerkleMode {
Expand All @@ -186,8 +186,8 @@ func (app *BaseApp) MountKVStores(keys map[string]*sdk.KVStoreKey) {
}
}

// MountStores mounts all IAVL or DB stores to the provided keys in the BaseApp
// multistore.
// MountTransientStores mounts all IAVL or DB stores to the provided keys in
// the BaseApp multistore.
func (app *BaseApp) MountTransientStores(keys map[string]*sdk.TransientStoreKey) {
for _, key := range keys {
app.MountStore(key, sdk.StoreTypeTransient)
Expand Down Expand Up @@ -297,9 +297,6 @@ func (app *BaseApp) Router() sdk.Router {
// QueryRouter returns the QueryRouter of a BaseApp.
func (app *BaseApp) QueryRouter() sdk.QueryRouter { return app.queryRouter }

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// Seal seals a BaseApp. It prohibits any further modifications to a BaseApp.
func (app *BaseApp) Seal() { app.sealed = true }

Expand Down
12 changes: 12 additions & 0 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ var protoCodec = encoding.GetCodec(proto.Name)
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
anyUnpacker types.AnyUnpacker
serviceData []serviceData
}

// serviceData represents a gRPC service, along with its handler.
type serviceData struct {
serviceDesc *grpc.ServiceDesc
handler interface{}
}

var _ gogogrpc.Server
Expand Down Expand Up @@ -83,6 +90,11 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
}, nil
}
}

qrt.serviceData = append(qrt.serviceData, serviceData{
serviceDesc: sd,
handler: handler,
})
}

// AnyUnpacker returns the AnyUnpacker for the router
Expand Down
86 changes: 86 additions & 0 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package baseapp

import (
"context"
"strconv"

gogogrpc "github.com/gogo/protobuf/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

servergrpc "github.com/cosmos/cosmos-sdk/server/grpc"
sdk "github.com/cosmos/cosmos-sdk/types"
)

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
}

// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(servergrpc.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, err
}
}

// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.createQueryContext(height, false)
if err != nil {
return nil, err
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)

// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
}
md = metadata.Pairs(servergrpc.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return handler(grpcCtx, req)
}

// Loop through all services and methods, add the interceptor, and register
// the service.
for _, data := range app.GRPCQueryRouter().serviceData {
desc := data.serviceDesc
newMethods := make([]grpc.MethodDesc, len(desc.Methods))

for i, method := range desc.Methods {
methodHandler := method.Handler
newMethods[i] = grpc.MethodDesc{
MethodName: method.MethodName,
Handler: func(srv interface{}, ctx context.Context, dec func(interface{}) error, _ grpc.UnaryServerInterceptor) (interface{}, error) {
return methodHandler(srv, ctx, dec, interceptor)
},
}
}

newDesc := &grpc.ServiceDesc{
ServiceName: desc.ServiceName,
HandlerType: desc.HandlerType,
Methods: newMethods,
Streams: desc.Streams,
Metadata: desc.Metadata,
}

server.RegisterService(newDesc, data.handler)
}
}
18 changes: 18 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,23 @@ type APIConfig struct {
// Ref: https://github.com/cosmos/cosmos-sdk/issues/6420
}

// GRPCConfig defines configuration for the gRPC server.
type GRPCConfig struct {
// Enable defines if the gRPC server should be enabled.
Enable bool `mapstructure:"enable"`

// Address defines the API server to listen on
Address string `mapstructure:"address"`
}

// Config defines the server's top level configuration
type Config struct {
BaseConfig `mapstructure:",squash"`

// Telemetry defines the application telemetry configuration
Telemetry telemetry.Config `mapstructure:"telemetry"`
API APIConfig `mapstructure:"api"`
GRPC GRPCConfig `mapstructure:"grpc"`
}

// SetMinGasPrices sets the validator's minimum gas prices.
Expand Down Expand Up @@ -134,6 +144,10 @@ func DefaultConfig() *Config {
RPCReadTimeout: 10,
RPCMaxBodyBytes: 1000000,
},
GRPC: GRPCConfig{
Enable: false,
Address: "0.0.0.0:9090",
},
}
}

Expand Down Expand Up @@ -177,5 +191,9 @@ func GetConfig(v *viper.Viper) Config {
RPCMaxBodyBytes: v.GetUint("api.rpc-max-body-bytes"),
EnableUnsafeCORS: v.GetBool("api.enabled-unsafe-cors"),
},
GRPC: GRPCConfig{
Enable: v.GetBool("grpc.enable"),
Address: v.GetString("grpc.address"),
},
}
}
35 changes: 0 additions & 35 deletions server/constructors.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,15 @@
package server

import (
"encoding/json"
"io"
"os"
"path/filepath"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/log"
tmtypes "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/server/api"
sdk "github.com/cosmos/cosmos-sdk/types"
)

type (
// AppOptions defines an interface that is passed into an application
// constructor, typically used to set BaseApp options that are either supplied
// via config file or through CLI arguments/flags. The underlying implementation
// is defined by the server package and is typically implemented via a Viper
// literal defined on the server Context. Note, casting Get calls may not yield
// the expected types and could result in type assertion errors. It is recommend
// to either use the cast package or perform manual conversion for safety.
AppOptions interface {
Get(string) interface{}
}

// Application defines an application interface that wraps abci.Application.
// The interface defines the necessary contracts to be implemented in order
// to fully bootstrap and start an application.
Application interface {
abci.Application

RegisterAPIRoutes(*api.Server)
}

// AppCreator is a function that allows us to lazily initialize an
// application using various configurations.
AppCreator func(log.Logger, dbm.DB, io.Writer, AppOptions) Application

// AppExporter is a function that dumps all app state to
// JSON-serializable structure and returns the current validator set.
AppExporter func(log.Logger, dbm.DB, io.Writer, int64, bool, []string) (json.RawMessage, []tmtypes.GenesisValidator, *abci.ConsensusParams, error)
)

func openDB(rootDir string) (dbm.DB, error) {
dataDir := filepath.Join(rootDir, "data")
db, err := sdk.NewLevelDB("application", dataDir)
Expand Down
3 changes: 2 additions & 1 deletion server/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/server/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

Expand All @@ -23,7 +24,7 @@ const (
)

// ExportCmd dumps app state to JSON.
func ExportCmd(appExporter AppExporter, defaultNodeHome string) *cobra.Command {
func ExportCmd(appExporter types.AppExporter, defaultNodeHome string) *cobra.Command {
cmd := &cobra.Command{
Use: "export",
Short: "Export state to JSON",
Expand Down
47 changes: 47 additions & 0 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package grpc

import (
"fmt"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/cosmos/cosmos-sdk/server/types"
)

const (
// GRPCBlockHeightHeader is the gRPC header for block height.
GRPCBlockHeightHeader = "x-cosmos-block-height"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(grpcSrv)

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
reflection.Register(grpcSrv)

listener, err := net.Listen("tcp", address)
if err != nil {
return nil, err
}

errCh := make(chan error)
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
}()

select {
case err := <-errCh:
return nil, err
case <-time.After(5 * time.Second): // assume server started successfully
return grpcSrv, nil
}
}
Loading

0 comments on commit e9534b0

Please sign in to comment.