From d8e93c7fea6cb35200f373fd2b601075bab621b9 Mon Sep 17 00:00:00 2001 From: Vitaliy Filippov Date: Mon, 19 Oct 2020 15:14:47 +0300 Subject: [PATCH] etcdserver: Fix 64 KB websocket notification message limit This fixes etcd being unable to send any message longer than 64 KB as a notification over the websocket. This was because the older version of grpc-websocket-proxy was used and WithMaxRespBodyBufferSize option wasn't set. --- embed/serve.go | 1 + go.mod | 2 +- go.sum | 4 +- .../wsproxy/websocket_proxy.go | 108 +++++++++++++++++- 4 files changed, 106 insertions(+), 9 deletions(-) diff --git a/embed/serve.go b/embed/serve.go index a3b20c46c38f..adf0ee8beacb 100644 --- a/embed/serve.go +++ b/embed/serve.go @@ -286,6 +286,7 @@ func (sctx *serveCtx) createMux(gwmux *gw.ServeMux, handler http.Handler) *http. return outgoing }, ), + wsproxy.WithMaxRespBodyBufferSize(0x7fffffff), ), ) } diff --git a/go.mod b/go.mod index a08f171ad8f2..d6c0407abd98 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( github.com/soheilhy/cmux v0.1.4 github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.1 - github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 + github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 github.com/urfave/cli v1.20.0 github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 go.etcd.io/bbolt v1.3.3 diff --git a/go.sum b/go.sum index 6f5aacc95809..df6019580536 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,8 @@ github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE= -github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966 h1:j6JEOq5QWFker+d7mFQYOhjTZonQ7YkLTHm56dbn+yM= +github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= diff --git a/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go b/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go index 0fca05a008ae..03cc5c68de19 100644 --- a/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go +++ b/vendor/github.com/tmc/grpc-websocket-proxy/wsproxy/websocket_proxy.go @@ -2,9 +2,11 @@ package wsproxy import ( "bufio" + "fmt" "io" "net/http" "strings" + "time" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" @@ -26,11 +28,16 @@ type RequestMutatorFunc func(incoming *http.Request, outgoing *http.Request) *ht // Proxy provides websocket transport upgrade to compatible endpoints. type Proxy struct { - h http.Handler - logger Logger - methodOverrideParam string - tokenCookieName string - requestMutator RequestMutatorFunc + h http.Handler + logger Logger + maxRespBodyBufferBytes int + methodOverrideParam string + tokenCookieName string + requestMutator RequestMutatorFunc + headerForwarder func(header string) bool + pingInterval time.Duration + pingWait time.Duration + pongWait time.Duration } // Logger collects log messages. @@ -50,6 +57,15 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Option allows customization of the proxy. type Option func(*Proxy) +// WithMaxRespBodyBufferSize allows specification of a custom size for the +// buffer used while reading the response body. By default, the bufio.Scanner +// used to read the response body sets the maximum token size to MaxScanTokenSize. +func WithMaxRespBodyBufferSize(nBytes int) Option { + return func(p *Proxy) { + p.maxRespBodyBufferBytes = nBytes + } +} + // WithMethodParamOverride allows specification of the special http parameter that is used in the proxied streaming request. func WithMethodParamOverride(param string) Option { return func(p *Proxy) { @@ -71,6 +87,13 @@ func WithRequestMutator(fn RequestMutatorFunc) Option { } } +// WithForwardedHeaders allows controlling which headers are forwarded. +func WithForwardedHeaders(fn func(header string) bool) Option { + return func(p *Proxy) { + p.headerForwarder = fn + } +} + // WithLogger allows a custom FieldLogger to be supplied func WithLogger(logger Logger) Option { return func(p *Proxy) { @@ -78,6 +101,28 @@ func WithLogger(logger Logger) Option { } } +// WithPingControl allows specification of ping pong control. The interval +// parameter specifies the pingInterval between pings. The allowed wait time +// for a pong response is (pingInterval * 10) / 9. +func WithPingControl(interval time.Duration) Option { + return func(proxy *Proxy) { + proxy.pingInterval = interval + proxy.pongWait = (interval * 10) / 9 + proxy.pingWait = proxy.pongWait / 6 + } +} + +var defaultHeadersToForward = map[string]bool{ + "Origin": true, + "origin": true, + "Referer": true, + "referer": true, +} + +func defaultHeaderForwarder(header string) bool { + return defaultHeadersToForward[header] +} + // WebsocketProxy attempts to expose the underlying handler as a bidi websocket stream with newline-delimited // JSON as the content encoding. // @@ -96,6 +141,7 @@ func WebsocketProxy(h http.Handler, opts ...Option) http.Handler { logger: logrus.New(), methodOverrideParam: MethodOverrideParam, tokenCookieName: TokenCookieName, + headerForwarder: defaultHeaderForwarder, } for _, o := range opts { o(p) @@ -144,7 +190,12 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) { return } if swsp := r.Header.Get("Sec-WebSocket-Protocol"); swsp != "" { - request.Header.Set("Authorization", strings.Replace(swsp, "Bearer, ", "Bearer ", 1)) + request.Header.Set("Authorization", transformSubProtocolHeader(swsp)) + } + for header := range r.Header { + if p.headerForwarder(header) { + request.Header.Set(header, r.Header.Get(header)) + } } // If token cookie is present, populate Authorization header from the cookie instead. if cookie, err := r.Cookie(p.tokenCookieName); err == nil { @@ -175,6 +226,10 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) { // read loop -- take messages from websocket and write to http request go func() { + if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 { + conn.SetReadDeadline(time.Now().Add(p.pongWait)) + conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(p.pongWait)); return nil }) + } defer func() { cancelFn() }() @@ -206,8 +261,38 @@ func (p *Proxy) proxy(w http.ResponseWriter, r *http.Request) { } } }() + // ping write loop + if p.pingInterval > 0 && p.pingWait > 0 && p.pongWait > 0 { + go func() { + ticker := time.NewTicker(p.pingInterval) + defer func() { + ticker.Stop() + conn.Close() + }() + for { + select { + case <-ctx.Done(): + p.logger.Debugln("ping loop done") + return + case <-ticker.C: + conn.SetWriteDeadline(time.Now().Add(p.pingWait)) + if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } + }() + } // write loop -- take messages from response and write to websocket scanner := bufio.NewScanner(responseBodyR) + + // if maxRespBodyBufferSize has been specified, use custom buffer for scanner + var scannerBuf []byte + if p.maxRespBodyBufferBytes > 0 { + scannerBuf = make([]byte, 0, 64*1024) + scanner.Buffer(scannerBuf, p.maxRespBodyBufferBytes) + } + for scanner.Scan() { if len(scanner.Bytes()) == 0 { p.logger.Warnln("[write] empty scan", scanner.Err()) @@ -239,6 +324,17 @@ func newInMemoryResponseWriter(w io.Writer) *inMemoryResponseWriter { } } +// IE and Edge do not delimit Sec-WebSocket-Protocol strings with spaces +func transformSubProtocolHeader(header string) string { + tokens := strings.SplitN(header, "Bearer,", 2) + + if len(tokens) < 2 { + return "" + } + + return fmt.Sprintf("Bearer %v", strings.Trim(tokens[1], " ")) +} + func (w *inMemoryResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) }