diff --git a/CHANGELOG-PENDING.md b/CHANGELOG-PENDING.md index b293e4aba66..2c969017bae 100644 --- a/CHANGELOG-PENDING.md +++ b/CHANGELOG-PENDING.md @@ -12,11 +12,12 @@ Month, DD, YYYY - [indexer] [Implement block and transaction indexing, enable TxSearch RPC endpoint #202](https://github.com/celestiaorg/optimint/pull/202) [@mattdf](https://github.com/mattdf) - [rpc] [Tendermint URI RPC #224](https://github.com/celestiaorg/optimint/pull/224) [@tzdybal](https://github.com/tzdybal/) +- [rpc] [Subscription methods #252](https://github.com/celestiaorg/optimint/pull/252) [@tzdybal](https://github.com/tzdybal/) ### IMPROVEMENTS - [ci] [Add more linters #219](https://github.com/celestiaorg/optimint/pull/219) [@tzdybal](https://github.com/tzdybal/) -- [deps] [Update dependencies: grpc, cors, cobra, viper, tm-db #245](https://github.com/celestiaorg/optimint/pull/245) [@tzdybal](https://github.com/tzdybal/) +- [deps] [Update dependencies: grpc, cors, cobra, viper, tm-db #245](https://github.com/celestiaorg/optimint/pull/245) [@tzdybal](https://github.com/tzdybal/) - [rpc] [Implement NumUnconfirmedTxs #255](https://github.com/celestiaorg/optimint/pull/255) [@tzdybal](https://github.com/tzdybal/) - [rpc] [Implement BlockByHash #256](https://github.com/celestiaorg/optimint/pull/256) [@mauriceLC92](https://github.com/mauriceLC92) - [rpc] [Implement BlockResults #263](https://github.com/celestiaorg/optimint/pull/263) [@tzdybal](https://github.com/tzdybal/) diff --git a/rpc/json/handler.go b/rpc/json/handler.go index 8b03b1314f2..82cc801b9c6 100644 --- a/rpc/json/handler.go +++ b/rpc/json/handler.go @@ -10,43 +10,50 @@ import ( "reflect" "strconv" + "github.com/celestiaorg/optimint/log" "github.com/gorilla/rpc/v2" "github.com/gorilla/rpc/v2/json2" - - "github.com/celestiaorg/optimint/log" ) type handler struct { - s *service - m *http.ServeMux - c rpc.Codec - l log.Logger + srv *service + mux *http.ServeMux + codec rpc.Codec + logger log.Logger } func newHandler(s *service, codec rpc.Codec, logger log.Logger) *handler { mux := http.NewServeMux() h := &handler{ - m: mux, - s: s, - c: codec, - l: logger, + srv: s, + mux: mux, + codec: codec, + logger: logger, } + mux.HandleFunc("/", h.serveJSONRPC) + mux.HandleFunc("/websocket", h.wsHandler) for name, method := range s.methods { logger.Debug("registering method", "name", name) mux.HandleFunc("/"+name, h.newHandler(method)) } + return h } func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { - h.m.ServeHTTP(w, r) + h.mux.ServeHTTP(w, r) } // serveJSONRPC serves HTTP request -// implementation is highly inspired by Gorilla RPC v2 (but simplified a lot) func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) { - // Create a new c request. - codecReq := h.c.NewRequest(r) + h.serveJSONRPCforWS(w, r, nil) +} + +// serveJSONRPC serves HTTP request +// implementation is highly inspired by Gorilla RPC v2 (but simplified a lot) +func (h *handler) serveJSONRPCforWS(w http.ResponseWriter, r *http.Request, wsConn *wsConn) { + // Create a new codec request. + codecReq := h.codec.NewRequest(r) // Get service method to be called. method, err := codecReq.Method() if err != nil { @@ -57,7 +64,7 @@ func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) { codecReq.WriteError(w, http.StatusBadRequest, err) return } - methodSpec, ok := h.s.methods[method] + methodSpec, ok := h.srv.methods[method] if !ok { codecReq.WriteError(w, int(json2.E_NO_METHOD), err) return @@ -70,10 +77,14 @@ func (h *handler) serveJSONRPC(w http.ResponseWriter, r *http.Request) { return } - rets := methodSpec.m.Call([]reflect.Value{ + callArgs := []reflect.Value{ reflect.ValueOf(r), args, - }) + } + if methodSpec.ws { + callArgs = append(callArgs, reflect.ValueOf(wsConn)) + } + rets := methodSpec.m.Call(callArgs) // Extract the result to error if needed. var errResult error @@ -171,7 +182,7 @@ func (h *handler) encodeAndWriteResponse(w http.ResponseWriter, result interface encoder := json.NewEncoder(w) err := encoder.Encode(resp) if err != nil { - h.l.Error("failed to encode RPC response", "error", err) + h.logger.Error("failed to encode RPC response", "error", err) } } diff --git a/rpc/json/service.go b/rpc/json/service.go index 0bef0797268..8c1f3748588 100644 --- a/rpc/json/service.go +++ b/rpc/json/service.go @@ -1,11 +1,16 @@ package json import ( - "errors" + "context" + "encoding/json" + "fmt" "net/http" "reflect" + "time" "github.com/gorilla/rpc/v2/json2" + "github.com/tendermint/tendermint/libs/pubsub" + tmquery "github.com/tendermint/tendermint/libs/pubsub/query" rpcclient "github.com/tendermint/tendermint/rpc/client" ctypes "github.com/tendermint/tendermint/rpc/core/types" @@ -14,13 +19,14 @@ import ( ) func GetHttpHandler(l *client.Client, logger log.Logger) (http.Handler, error) { - return newHandler(newService(l), json2.NewCodec(), logger), nil + return newHandler(newService(l, logger), json2.NewCodec(), logger), nil } type method struct { m reflect.Value argsType reflect.Type returnType reflect.Type + ws bool } func newMethod(m interface{}) *method { @@ -30,17 +36,20 @@ func newMethod(m interface{}) *method { m: reflect.ValueOf(m), argsType: mType.In(1).Elem(), returnType: mType.Out(0).Elem(), + ws: mType.NumIn() == 3, } } type service struct { client *client.Client methods map[string]*method + logger log.Logger } -func newService(c *client.Client) *service { +func newService(c *client.Client, l log.Logger) *service { s := service{ client: c, + logger: l, } s.methods = map[string]*method{ "subscribe": newMethod(s.Subscribe), @@ -76,16 +85,78 @@ func newService(c *client.Client) *service { return &s } -func (s *service) Subscribe(req *http.Request, args *SubscribeArgs) (*ctypes.ResultSubscribe, error) { - return nil, errors.New("not implemented") +func (s *service) Subscribe(req *http.Request, args *SubscribeArgs, wsConn *wsConn) (*ctypes.ResultSubscribe, error) { + addr := req.RemoteAddr + + // TODO(tzdybal): pass config and check subscriptions limits + + q, err := tmquery.New(args.Query) + if err != nil { + return nil, fmt.Errorf("failed to parse query: %w", err) + } + + s.logger.Debug("subscribe to query", "remote", addr, "query", args.Query) + + // TODO(tzdybal): extract consts or configs + const SubscribeTimeout = 5 * time.Second + const subBufferSize = 100 + ctx, cancel := context.WithTimeout(req.Context(), SubscribeTimeout) + defer cancel() + + sub, err := s.client.EventBus.Subscribe(ctx, addr, q, subBufferSize) + if err != nil { + return nil, fmt.Errorf("failed to subscribe: %w", err) + } + + go func() { + for { + select { + case msg := <-sub.Out(): + data, err := json.Marshal(msg.Data()) + if err != nil { + s.logger.Error("failed to marshal response data", "error", err) + continue + } + if wsConn != nil { + wsConn.queue <- data + } + case <-sub.Cancelled(): + if sub.Err() != pubsub.ErrUnsubscribed { + var reason string + if sub.Err() == nil { + reason = "unknown failure" + } else { + reason = sub.Err().Error() + } + s.logger.Error("subscription was cancelled", "reason", reason) + } + if wsConn != nil { + close(wsConn.queue) + } + return + } + } + }() + + return &ctypes.ResultSubscribe{}, nil } func (s *service) Unsubscribe(req *http.Request, args *UnsubscribeArgs) (*EmptyResult, error) { - return nil, errors.New("not implemented") + s.logger.Debug("unsubscribe from query", "remote", req.RemoteAddr, "query", args.Query) + err := s.client.Unsubscribe(context.Background(), req.RemoteAddr, args.Query) + if err != nil { + return nil, fmt.Errorf("failed to unsubscribe: %w", err) + } + return &EmptyResult{}, nil } func (s *service) UnsubscribeAll(req *http.Request, args *UnsubscribeAllArgs) (*EmptyResult, error) { - return nil, errors.New("not implemented") + s.logger.Debug("unsubscribe from all queries", "remote", req.RemoteAddr) + err := s.client.UnsubscribeAll(context.Background(), req.RemoteAddr) + if err != nil { + return nil, fmt.Errorf("failed to unsubscribe all: %w", err) + } + return &EmptyResult{}, nil } // info API diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go index afdb0034ae6..16cc9d5a990 100644 --- a/rpc/json/service_test.go +++ b/rpc/json/service_test.go @@ -10,6 +10,7 @@ import ( "net/url" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -43,7 +44,7 @@ func TestHandlerMapping(t *testing.T) { resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) - assert.Equal(200, resp.Code) + assert.Equal(http.StatusOK, resp.Code) } func TestREST(t *testing.T) { @@ -65,20 +66,20 @@ func TestREST(t *testing.T) { bodyContains string }{ - {"invalid/malformed request", "/block?so{}wrong!", 200, int(json2.E_INVALID_REQ), ``}, - {"invalid/missing param", "/block", 200, int(json2.E_INVALID_REQ), `missing param 'height'`}, - {"valid/no params", "/abci_info", 200, -1, `"last_block_height":345`}, + {"invalid/malformed request", "/block?so{}wrong!", http.StatusOK, int(json2.E_INVALID_REQ), ``}, + {"invalid/missing param", "/block", http.StatusOK, int(json2.E_INVALID_REQ), `missing param 'height'`}, + {"valid/no params", "/abci_info", http.StatusOK, -1, `"last_block_height":345`}, // to keep test simple, allow returning application error in following case - {"valid/int param", "/block?height=321", 200, int(json2.E_INTERNAL), `"key not found"`}, - {"invalid/int param", "/block?height=foo", 200, int(json2.E_PARSE), "failed to parse param 'height'"}, + {"valid/int param", "/block?height=321", http.StatusOK, int(json2.E_INTERNAL), `"key not found"`}, + {"invalid/int param", "/block?height=foo", http.StatusOK, int(json2.E_PARSE), "failed to parse param 'height'"}, {"valid/bool int string params", "/tx_search?" + txSearchParams.Encode(), - 200, -1, `"total_count":0`}, + http.StatusOK, -1, `"total_count":0`}, {"invalid/bool int string params", "/tx_search?" + strings.Replace(txSearchParams.Encode(), "true", "blue", 1), - 200, int(json2.E_PARSE), "failed to parse param 'prove'"}, - {"valid/hex param", "/check_tx?tx=DEADBEEF", 200, -1, `"gas_used":"1000"`}, - {"invalid/hex param", "/check_tx?tx=QWERTY", 200, int(json2.E_PARSE), "failed to parse param 'tx'"}, + http.StatusOK, int(json2.E_PARSE), "failed to parse param 'prove'"}, + {"valid/hex param", "/check_tx?tx=DEADBEEF", http.StatusOK, -1, `"gas_used":"1000"`}, + {"invalid/hex param", "/check_tx?tx=QWERTY", http.StatusOK, int(json2.E_PARSE), "failed to parse param 'tx'"}, } _, local := getRPC(t) @@ -119,7 +120,7 @@ func TestEmptyRequest(t *testing.T) { resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) - assert.Equal(200, resp.Code) + assert.Equal(http.StatusOK, resp.Code) } func TestStringyRequest(t *testing.T) { @@ -139,9 +140,130 @@ func TestStringyRequest(t *testing.T) { resp := httptest.NewRecorder() handler.ServeHTTP(resp, req) - assert.Equal(200, resp.Code) + assert.Equal(http.StatusOK, resp.Code) assert.Equal(respJson, resp.Body.String()) +} + +func TestSubscription(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + const ( + query = "message.sender='cosmos1njr26e02fjcq3schxstv458a3w5szp678h23dh'" + query2 = "message.sender!='cosmos1njr26e02fjcq3schxstv458a3w5szp678h23dh'" + invalidQuery = "message.sender='broken" + ) + subscribeReq, err := json2.EncodeClientRequest("subscribe", &SubscribeArgs{ + Query: query, + }) + require.NoError(err) + require.NotEmpty(subscribeReq) + + subscribeReq2, err := json2.EncodeClientRequest("subscribe", &SubscribeArgs{ + Query: query2, + }) + require.NoError(err) + require.NotEmpty(subscribeReq2) + + invalidSubscribeReq, err := json2.EncodeClientRequest("subscribe", &SubscribeArgs{ + Query: invalidQuery, + }) + require.NoError(err) + require.NotEmpty(invalidSubscribeReq) + + unsubscribeReq, err := json2.EncodeClientRequest("unsubscribe", &UnsubscribeArgs{ + Query: query, + }) + require.NoError(err) + require.NotEmpty(unsubscribeReq) + + unsubscribeAllReq, err := json2.EncodeClientRequest("unsubscribe_all", &UnsubscribeAllArgs{}) + require.NoError(err) + require.NotEmpty(unsubscribeAllReq) + + _, local := getRPC(t) + handler, err := GetHttpHandler(local, log.TestingLogger()) + require.NoError(err) + + var ( + jsonResp response + ) + + // test valid subscription + req := httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(subscribeReq)) + resp := httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + assert.Nil(jsonResp.Error) + + // test valid subscription with second query + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(subscribeReq2)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + assert.Nil(jsonResp.Error) + + // test subscription with invalid query + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(invalidSubscribeReq)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + require.NotNil(jsonResp.Error) + assert.Contains(jsonResp.Error.Message, "failed to parse query") + + // test valid, but duplicate subscription + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(subscribeReq)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + require.NotNil(jsonResp.Error) + assert.Contains(jsonResp.Error.Message, "already subscribed") + + // test unsubscribing + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(unsubscribeReq)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + assert.Nil(jsonResp.Error) + + // test unsubscribing again + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(unsubscribeReq)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + require.NotNil(jsonResp.Error) + assert.Contains(jsonResp.Error.Message, "subscription not found") + + // test unsubscribe all + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(unsubscribeAllReq)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + assert.Nil(jsonResp.Error) + + // test unsubscribing all again + req = httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(unsubscribeAllReq)) + resp = httptest.NewRecorder() + handler.ServeHTTP(resp, req) + assert.Equal(http.StatusOK, resp.Code) + jsonResp = response{} + assert.NoError(json.Unmarshal(resp.Body.Bytes(), &jsonResp)) + require.NotNil(jsonResp.Error) + assert.Contains(jsonResp.Error.Message, "subscription not found") } // copied from rpc @@ -150,6 +272,9 @@ func getRPC(t *testing.T) (*mocks.Application, *client.Client) { require := require.New(t) app := &mocks.Application{} app.On("InitChain", mock.Anything).Return(abci.ResponseInitChain{}) + app.On("BeginBlock", mock.Anything).Return(abci.ResponseBeginBlock{}) + app.On("EndBlock", mock.Anything).Return(abci.ResponseEndBlock{}) + app.On("Commit", mock.Anything).Return(abci.ResponseCommit{}) app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{ GasWanted: 1000, GasUsed: 1000, @@ -162,10 +287,13 @@ func getRPC(t *testing.T) (*mocks.Application, *client.Client) { LastBlockAppHash: nil, }) key, _, _ := crypto.GenerateEd25519Key(rand.Reader) - node, err := node.NewNode(context.Background(), config.NodeConfig{DALayer: "mock"}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) + node, err := node.NewNode(context.Background(), config.NodeConfig{Aggregator: true, DALayer: "mock", BlockManagerConfig: config.BlockManagerConfig{BlockTime: 1 * time.Second}}, key, proxy.NewLocalClientCreator(app), &types.GenesisDoc{ChainID: "test"}, log.TestingLogger()) require.NoError(err) require.NotNil(node) + err = node.Start() + require.NoError(err) + local := client.NewClient(node) require.NotNil(local) diff --git a/rpc/json/types.go b/rpc/json/types.go index f5ec544c4b1..5f8c5c0f884 100644 --- a/rpc/json/types.go +++ b/rpc/json/types.go @@ -11,8 +11,10 @@ import ( ) type SubscribeArgs struct { + Query string `json:"query"` } type UnsubscribeArgs struct { + Query string `json:"query"` } type UnsubscribeAllArgs struct { } diff --git a/rpc/json/ws.go b/rpc/json/ws.go new file mode 100644 index 00000000000..ab737c41b33 --- /dev/null +++ b/rpc/json/ws.go @@ -0,0 +1,107 @@ +package json + +import ( + "bytes" + "io" + "net/http" + + "github.com/gorilla/websocket" + + "github.com/celestiaorg/optimint/log" +) + +type wsConn struct { + conn *websocket.Conn + queue chan []byte + logger log.Logger +} + +func (wsc *wsConn) sendLoop() { + for msg := range wsc.queue { + writer, err := wsc.conn.NextWriter(websocket.TextMessage) + if err != nil { + wsc.logger.Error("failed to create writer", "error", err) + continue + } + _, err = writer.Write(msg) + if err != nil { + wsc.logger.Error("failed to write message", "error", err) + } + if err = writer.Close(); err != nil { + wsc.logger.Error("failed to close writer", "error", err) + } + } +} + +func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) { + // TODO(tzdybal): configuration options + upgrader := websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + wsc, err := upgrader.Upgrade(w, r, nil) + remoteAddr := wsc.RemoteAddr().String() + if err != nil { + h.logger.Error("failed to update to WebSocket connection", "error", err, "address", remoteAddr) + } + defer func() { + err := wsc.Close() + if err != nil { + h.logger.Error("failed to close WebSocket connection", "err") + } + }() + + ws := &wsConn{ + conn: wsc, + queue: make(chan []byte), + } + go ws.sendLoop() + + for { + mt, r, err := wsc.NextReader() + if err != nil { + h.logger.Debug("failed to read next WebSocket message", "error", err) + } + + if mt != websocket.TextMessage { + h.logger.Error("expected text message") + continue + } + req, err := http.NewRequest(http.MethodGet, "", r) + req.RemoteAddr = remoteAddr + if err != nil { + h.logger.Error("failed to create request", "error", err) + continue + } + + writer := new(bytes.Buffer) + h.serveJSONRPCforWS(newResponseWriter(writer), req, ws) + ws.queue <- writer.Bytes() + } + +} + +func newResponseWriter(w io.Writer) http.ResponseWriter { + return &wsResponse{w} +} + +// wsResponse is a simple implementation of http.ResponseWriter +type wsResponse struct { + w io.Writer +} + +var _ http.ResponseWriter = wsResponse{} + +// Write use underlying writer to write response to WebSocket +func (w wsResponse) Write(bytes []byte) (int, error) { + return w.w.Write(bytes) +} + +func (w wsResponse) Header() http.Header { + return http.Header{} + +} + +func (w wsResponse) WriteHeader(statusCode int) { +} diff --git a/rpc/json/ws_test.go b/rpc/json/ws_test.go new file mode 100644 index 00000000000..53b890bf369 --- /dev/null +++ b/rpc/json/ws_test.go @@ -0,0 +1,83 @@ +package json + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/rpc/v2/json2" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tendermint/tendermint/libs/log" + tmtypes "github.com/tendermint/tendermint/types" +) + +func TestWebSockets(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + + _, local := getRPC(t) + handler, err := GetHttpHandler(local, log.TestingLogger()) + require.NoError(err) + + srv := httptest.NewServer(handler) + + conn, resp, err := websocket.DefaultDialer.Dial(strings.Replace(srv.URL, "http://", "ws://", 1)+"/websocket", nil) + require.NoError(err) + require.NotNil(resp) + require.NotNil(conn) + defer conn.Close() + + assert.Equal(http.StatusSwitchingProtocols, resp.StatusCode) + + err = conn.WriteMessage(websocket.TextMessage, []byte(` +{ + "jsonrpc": "2.0", + "method": "subscribe", + "id": 7, + "params": { + "query": "tm.event='NewBlock'" + } +} +`)) + assert.NoError(err) + + err = conn.SetReadDeadline(time.Now().Add(1 * time.Second)) + assert.NoError(err) + typ, msg, err := conn.ReadMessage() + assert.NoError(err) + assert.Equal(websocket.TextMessage, typ) + assert.NotEmpty(msg) + + // wait for new block event + err = conn.SetReadDeadline(time.Now().Add(3 * time.Second)) + assert.NoError(err) + typ, msg, err = conn.ReadMessage() + assert.NoError(err) + assert.Equal(websocket.TextMessage, typ) + assert.NotEmpty(msg) + var payload tmtypes.EventDataNewBlock + err = json.Unmarshal(msg, &payload) + assert.NoError(err) + assert.NotNil(payload.ResultBeginBlock) + assert.NotNil(payload.Block) + assert.GreaterOrEqual(payload.Block.Height, int64(1)) + assert.NotNil(payload.ResultEndBlock) + + unsubscribeAllReq, err := json2.EncodeClientRequest("unsubscribe_all", &UnsubscribeAllArgs{}) + require.NoError(err) + require.NotEmpty(unsubscribeAllReq) + req := httptest.NewRequest(http.MethodGet, "/", bytes.NewReader(unsubscribeAllReq)) + req.RemoteAddr = conn.LocalAddr().String() + rsp := httptest.NewRecorder() + handler.ServeHTTP(rsp, req) + assert.Equal(http.StatusOK, rsp.Code) + jsonResp := response{} + assert.NoError(json.Unmarshal(rsp.Body.Bytes(), &jsonResp)) + assert.Nil(jsonResp.Error) +}