Skip to content

Commit

Permalink
WebSocket support and subscription methods (cosmos#252)
Browse files Browse the repository at this point in the history
Resolves: cosmos#188 
Resolves: cosmos#244
  • Loading branch information
tzdybal committed Feb 2, 2022
1 parent 4a03ceb commit 39e4050
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 39 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG-PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ Month, DD, YYYY

- [indexer] [Implement block and transaction indexing, enable TxSearch RPC endpoint #202](https://github.com/celestiaorg/optimint/pull/202) [@mattdf](https://github.com/mattdf)
- [rpc] [Tendermint URI RPC #224](https://github.com/celestiaorg/optimint/pull/224) [@tzdybal](https://github.com/tzdybal/)
- [rpc] [Subscription methods #252](https://github.com/celestiaorg/optimint/pull/252) [@tzdybal](https://github.com/tzdybal/)

### IMPROVEMENTS

- [ci] [Add more linters #219](https://github.com/celestiaorg/optimint/pull/219) [@tzdybal](https://github.com/tzdybal/)
- [deps] [Update dependencies: grpc, cors, cobra, viper, tm-db #245](https://github.com/celestiaorg/optimint/pull/245) [@tzdybal](https://github.com/tzdybal/)
- [deps] [Update dependencies: grpc, cors, cobra, viper, tm-db #245](https://github.com/celestiaorg/optimint/pull/245) [@tzdybal](https://github.com/tzdybal/)
- [rpc] [Implement NumUnconfirmedTxs #255](https://github.com/celestiaorg/optimint/pull/255) [@tzdybal](https://github.com/tzdybal/)
- [rpc] [Implement BlockByHash #256](https://github.com/celestiaorg/optimint/pull/256) [@mauriceLC92](https://github.com/mauriceLC92)
- [rpc] [Implement BlockResults #263](https://github.com/celestiaorg/optimint/pull/263) [@tzdybal](https://github.com/tzdybal/)
Expand Down
47 changes: 29 additions & 18 deletions rpc/json/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,43 +10,50 @@ import (
"reflect"
"strconv"

"github.com/celestiaorg/optimint/log"
"github.com/gorilla/rpc/v2"
"github.com/gorilla/rpc/v2/json2"

"github.com/celestiaorg/optimint/log"
)

type handler struct {
s *service
m *http.ServeMux
c rpc.Codec
l log.Logger
srv *service
mux *http.ServeMux
codec rpc.Codec
logger log.Logger
}

func newHandler(s *service, codec rpc.Codec, logger log.Logger) *handler {
mux := http.NewServeMux()
h := &handler{
m: mux,
s: s,
c: codec,
l: logger,
srv: s,
mux: mux,
codec: codec,
logger: logger,
}

mux.HandleFunc("/", h.serveJSONRPC)
mux.HandleFunc("/websocket", h.wsHandler)
for name, method := range s.methods {
logger.Debug("registering method", "name", name)
mux.HandleFunc("/"+name, h.newHandler(method))
}

return h
}
func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.m.ServeHTTP(w, r)
h.mux.ServeHTTP(w, r)
}

// serveJSONRPC serves HTTP request
// implementation is highly inspired by Gorilla RPC v2 (but simplified a lot)
func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) {
// Create a new c request.
codecReq := h.c.NewRequest(r)
h.serveJSONRPCforWS(w, r, nil)
}

// serveJSONRPC serves HTTP request
// implementation is highly inspired by Gorilla RPC v2 (but simplified a lot)
func (h *handler) serveJSONRPCforWS(w http.ResponseWriter, r *http.Request, wsConn *wsConn) {
// Create a new codec request.
codecReq := h.codec.NewRequest(r)
// Get service method to be called.
method, err := codecReq.Method()
if err != nil {
Expand All @@ -57,7 +64,7 @@ func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) {
codecReq.WriteError(w, http.StatusBadRequest, err)
return
}
methodSpec, ok := h.s.methods[method]
methodSpec, ok := h.srv.methods[method]
if !ok {
codecReq.WriteError(w, int(json2.E_NO_METHOD), err)
return
Expand All @@ -70,10 +77,14 @@ func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) {
return
}

rets := methodSpec.m.Call([]reflect.Value{
callArgs := []reflect.Value{
reflect.ValueOf(r),
args,
})
}
if methodSpec.ws {
callArgs = append(callArgs, reflect.ValueOf(wsConn))
}
rets := methodSpec.m.Call(callArgs)

// Extract the result to error if needed.
var errResult error
Expand Down Expand Up @@ -171,7 +182,7 @@ func (h *handler) encodeAndWriteResponse(w http.ResponseWriter, result interface
encoder := json.NewEncoder(w)
err := encoder.Encode(resp)
if err != nil {
h.l.Error("failed to encode RPC response", "error", err)
h.logger.Error("failed to encode RPC response", "error", err)
}
}

Expand Down
85 changes: 78 additions & 7 deletions rpc/json/service.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package json

import (
"errors"
"context"
"encoding/json"
"fmt"
"net/http"
"reflect"
"time"

"github.com/gorilla/rpc/v2/json2"
"github.com/tendermint/tendermint/libs/pubsub"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
rpcclient "github.com/tendermint/tendermint/rpc/client"
ctypes "github.com/tendermint/tendermint/rpc/core/types"

Expand All @@ -14,13 +19,14 @@ import (
)

func GetHttpHandler(l *client.Client, logger log.Logger) (http.Handler, error) {
return newHandler(newService(l), json2.NewCodec(), logger), nil
return newHandler(newService(l, logger), json2.NewCodec(), logger), nil
}

type method struct {
m reflect.Value
argsType reflect.Type
returnType reflect.Type
ws bool
}

func newMethod(m interface{}) *method {
Expand All @@ -30,17 +36,20 @@ func newMethod(m interface{}) *method {
m: reflect.ValueOf(m),
argsType: mType.In(1).Elem(),
returnType: mType.Out(0).Elem(),
ws: mType.NumIn() == 3,
}
}

type service struct {
client *client.Client
methods map[string]*method
logger log.Logger
}

func newService(c *client.Client) *service {
func newService(c *client.Client, l log.Logger) *service {
s := service{
client: c,
logger: l,
}
s.methods = map[string]*method{
"subscribe": newMethod(s.Subscribe),
Expand Down Expand Up @@ -76,16 +85,78 @@ func newService(c *client.Client) *service {
return &s
}

func (s *service) Subscribe(req *http.Request, args *SubscribeArgs) (*ctypes.ResultSubscribe, error) {
return nil, errors.New("not implemented")
func (s *service) Subscribe(req *http.Request, args *SubscribeArgs, wsConn *wsConn) (*ctypes.ResultSubscribe, error) {
addr := req.RemoteAddr

// TODO(tzdybal): pass config and check subscriptions limits

q, err := tmquery.New(args.Query)
if err != nil {
return nil, fmt.Errorf("failed to parse query: %w", err)
}

s.logger.Debug("subscribe to query", "remote", addr, "query", args.Query)

// TODO(tzdybal): extract consts or configs
const SubscribeTimeout = 5 * time.Second
const subBufferSize = 100
ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout)
defer cancel()

sub, err := s.client.EventBus.Subscribe(ctx, addr, q, subBufferSize)
if err != nil {
return nil, fmt.Errorf("failed to subscribe: %w", err)
}

go func() {
for {
select {
case msg := <-sub.Out():
data, err := json.Marshal(msg.Data())
if err != nil {
s.logger.Error("failed to marshal response data", "error", err)
continue
}
if wsConn != nil {
wsConn.queue <- data
}
case <-sub.Cancelled():
if sub.Err() != pubsub.ErrUnsubscribed {
var reason string
if sub.Err() == nil {
reason = "unknown failure"
} else {
reason = sub.Err().Error()
}
s.logger.Error("subscription was cancelled", "reason", reason)
}
if wsConn != nil {
close(wsConn.queue)
}
return
}
}
}()

return &ctypes.ResultSubscribe{}, nil
}

func (s *service) Unsubscribe(req *http.Request, args *UnsubscribeArgs) (*EmptyResult, error) {
return nil, errors.New("not implemented")
s.logger.Debug("unsubscribe from query", "remote", req.RemoteAddr, "query", args.Query)
err := s.client.Unsubscribe(context.Background(), req.RemoteAddr, args.Query)
if err != nil {
return nil, fmt.Errorf("failed to unsubscribe: %w", err)
}
return &EmptyResult{}, nil
}

func (s *service) UnsubscribeAll(req *http.Request, args *UnsubscribeAllArgs) (*EmptyResult, error) {
return nil, errors.New("not implemented")
s.logger.Debug("unsubscribe from all queries", "remote", req.RemoteAddr)
err := s.client.UnsubscribeAll(context.Background(), req.RemoteAddr)
if err != nil {
return nil, fmt.Errorf("failed to unsubscribe all: %w", err)
}
return &EmptyResult{}, nil
}

// info API
Expand Down
Loading

0 comments on commit 39e4050

Please sign in to comment.