diff --git a/pkg/flags/uint32.go b/pkg/flags/uint32.go new file mode 100644 index 00000000000..496730a4549 --- /dev/null +++ b/pkg/flags/uint32.go @@ -0,0 +1,45 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "strconv" +) + +type uint32Value uint32 + +// NewUint32Value creates an uint32 instance with the provided value. +func NewUint32Value(v uint32) *uint32Value { + val := new(uint32Value) + *val = uint32Value(v) + return val +} + +// Set parses a command line uint32 value. +// Implements "flag.Value" interface. +func (i *uint32Value) Set(s string) error { + v, err := strconv.ParseUint(s, 0, 32) + *i = uint32Value(v) + return err +} + +func (i *uint32Value) String() string { return strconv.FormatUint(uint64(*i), 10) } + +// Uint32FromFlag return the uint32 value of a flag with the given name +func Uint32FromFlag(fs *flag.FlagSet, name string) uint32 { + val := *fs.Lookup(name).Value.(*uint32Value) + return uint32(val) +} diff --git a/pkg/flags/uint32_test.go b/pkg/flags/uint32_test.go new file mode 100644 index 00000000000..aa7487a2320 --- /dev/null +++ b/pkg/flags/uint32_test.go @@ -0,0 +1,111 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package flags + +import ( + "flag" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUint32Value(t *testing.T) { + cases := []struct { + name string + s string + expectedVal uint32 + expectError bool + }{ + { + name: "normal uint32 value", + s: "200", + expectedVal: 200, + }, + { + name: "zero value", + s: "0", + expectedVal: 0, + }, + { + name: "negative int value", + s: "-200", + expectError: true, + }, + { + name: "invalid integer value", + s: "invalid", + expectError: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + var val uint32Value + err := val.Set(tc.s) + + if tc.expectError { + if err == nil { + t.Errorf("Expected failure on parsing uint32 value from %s", tc.s) + } + } else { + if err != nil { + t.Errorf("Unexpected error when parsing %s: %v", tc.s, err) + } + assert.Equal(t, uint32(val), tc.expectedVal) + } + }) + } +} + +func TestUint32FromFlag(t *testing.T) { + const flagName = "max-concurrent-streams" + + cases := []struct { + name string + defaultVal uint32 + arguments []string + expectedVal uint32 + }{ + { + name: "only default value", + defaultVal: 15, + arguments: []string{}, + expectedVal: 15, + }, + { + name: "argument has different value from the default one", + defaultVal: 16, + arguments: []string{"--max-concurrent-streams", "200"}, + expectedVal: 200, + }, + { + name: "argument has the same value from the default one", + defaultVal: 105, + arguments: []string{"--max-concurrent-streams", "105"}, + expectedVal: 105, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + fs := flag.NewFlagSet("etcd", flag.ContinueOnError) + fs.Var(NewUint32Value(tc.defaultVal), flagName, "Maximum concurrent streams that each client can open at a time.") + if err := fs.Parse(tc.arguments); err != nil { + t.Fatalf("Unexpected error: %v\n", err) + } + actualMaxStream := Uint32FromFlag(fs, flagName) + assert.Equal(t, actualMaxStream, tc.expectedVal) + }) + } +} diff --git a/server/config/config.go b/server/config/config.go index 75d7df6c4a6..9ecfc146336 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -129,6 +129,10 @@ type ServerConfig struct { // MaxRequestBytes is the maximum request size to send over raft. MaxRequestBytes uint + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. + MaxConcurrentStreams uint32 + WarningApplyDuration time.Duration WarningUnaryRequestDuration time.Duration diff --git a/server/embed/config.go b/server/embed/config.go index 2cacf5ea4f6..4e1f6a19c07 100644 --- a/server/embed/config.go +++ b/server/embed/config.go @@ -17,6 +17,7 @@ package embed import ( "errors" "fmt" + "math" "net" "net/http" "net/url" @@ -59,6 +60,7 @@ const ( DefaultWarningApplyDuration = 100 * time.Millisecond DefaultWarningUnaryRequestDuration = 300 * time.Millisecond DefaultMaxRequestBytes = 1.5 * 1024 * 1024 + DefaultMaxConcurrentStreams = math.MaxUint32 DefaultGRPCKeepAliveMinTime = 5 * time.Second DefaultGRPCKeepAliveInterval = 2 * time.Hour DefaultGRPCKeepAliveTimeout = 20 * time.Second @@ -205,6 +207,10 @@ type Config struct { MaxTxnOps uint `json:"max-txn-ops"` MaxRequestBytes uint `json:"max-request-bytes"` + // MaxConcurrentStreams specifies the maximum number of concurrent + // streams that each client can open at a time. + MaxConcurrentStreams uint32 `json:"max-concurrent-streams"` + LPUrls, LCUrls []url.URL APUrls, ACUrls []url.URL ClientTLSInfo transport.TLSInfo @@ -311,7 +317,7 @@ type Config struct { AuthToken string `json:"auth-token"` BcryptCost uint `json:"bcrypt-cost"` - //The AuthTokenTTL in seconds of the simple token + // AuthTokenTTL in seconds of the simple token AuthTokenTTL uint `json:"auth-token-ttl"` ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` @@ -462,6 +468,7 @@ func NewConfig() *Config { MaxTxnOps: DefaultMaxTxnOps, MaxRequestBytes: DefaultMaxRequestBytes, + MaxConcurrentStreams: DefaultMaxConcurrentStreams, ExperimentalWarningApplyDuration: DefaultWarningApplyDuration, ExperimentalWarningUnaryRequestDuration: DefaultWarningUnaryRequestDuration, diff --git a/server/embed/etcd.go b/server/embed/etcd.go index 766336ccb04..564ad5e7a2c 100644 --- a/server/embed/etcd.go +++ b/server/embed/etcd.go @@ -191,6 +191,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { BackendBatchInterval: cfg.BackendBatchInterval, MaxTxnOps: cfg.MaxTxnOps, MaxRequestBytes: cfg.MaxRequestBytes, + MaxConcurrentStreams: cfg.MaxConcurrentStreams, SocketOpts: cfg.SocketOpts, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, @@ -336,7 +337,10 @@ func print(lg *zap.Logger, ec Config, sc config.ServerConfig, memberInitialized zap.String("initial-cluster", sc.InitialPeerURLsMap.String()), zap.String("initial-cluster-state", ec.ClusterState), zap.String("initial-cluster-token", sc.InitialClusterToken), - zap.Int64("quota-size-bytes", quota), + zap.Int64("quota-backend-bytes", quota), + zap.Uint("max-request-bytes", sc.MaxRequestBytes), + zap.Uint32("max-concurrent-streams", sc.MaxConcurrentStreams), + zap.Bool("pre-vote", sc.PreVote), zap.Bool("initial-corrupt-check", sc.InitialCorruptCheck), zap.String("corrupt-check-time-interval", sc.CorruptCheckTime.String()), diff --git a/server/embed/serve.go b/server/embed/serve.go index cb3d1c72d9d..08a1dc841f2 100644 --- a/server/embed/serve.go +++ b/server/embed/serve.go @@ -30,6 +30,7 @@ import ( "go.etcd.io/etcd/client/v3/credentials" "go.etcd.io/etcd/pkg/v3/debugutil" "go.etcd.io/etcd/pkg/v3/httputil" + "go.etcd.io/etcd/server/v3/config" "go.etcd.io/etcd/server/v3/etcdserver" "go.etcd.io/etcd/server/v3/etcdserver/api/v3client" "go.etcd.io/etcd/server/v3/etcdserver/api/v3election" @@ -44,6 +45,7 @@ import ( "github.com/soheilhy/cmux" "github.com/tmc/grpc-websocket-proxy/wsproxy" "go.uber.org/zap" + "golang.org/x/net/http2" "golang.org/x/net/trace" "google.golang.org/grpc" ) @@ -148,6 +150,10 @@ func (sctx *serveCtx) serve( Handler: createAccessController(sctx.lg, s, httpmux), ErrorLog: logger, // do not log user error } + if err := configureHttpServer(srvhttp, s.Cfg); err != nil { + sctx.lg.Error("Configure http server failed", zap.Error(err)) + return err + } httpl := m.Match(cmux.HTTP1()) go func() { errHandler(srvhttp.Serve(httpl)) }() @@ -197,6 +203,10 @@ func (sctx *serveCtx) serve( TLSConfig: tlscfg, ErrorLog: logger, // do not log user error } + if err := configureHttpServer(srv, s.Cfg); err != nil { + sctx.lg.Error("Configure https server failed", zap.Error(err)) + return err + } go func() { errHandler(srv.Serve(tlsl)) }() sctx.serversC <- &servers{secure: true, grpc: gs, http: srv} @@ -209,6 +219,13 @@ func (sctx *serveCtx) serve( return m.Serve() } +func configureHttpServer(srv *http.Server, cfg config.ServerConfig) error { + // todo (ahrtr): should we support configuring other parameters in the future as well? + return http2.ConfigureServer(srv, &http2.Server{ + MaxConcurrentStreams: cfg.MaxConcurrentStreams, + }) +} + // grpcHandlerFunc returns an http.Handler that delegates to grpcServer on incoming gRPC // connections or otherHandler otherwise. Given in gRPC docs. func grpcHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Handler { diff --git a/server/etcdmain/config.go b/server/etcdmain/config.go index 8091589dfce..28f81e33eba 100644 --- a/server/etcdmain/config.go +++ b/server/etcdmain/config.go @@ -138,6 +138,8 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.SocketOpts.ReusePort, "socket-reuse-port", cfg.ec.SocketOpts.ReusePort, "Enable to set socket option SO_REUSEPORT on listeners allowing rebinding of a port already in use.") fs.BoolVar(&cfg.ec.SocketOpts.ReuseAddress, "socket-reuse-address", cfg.ec.SocketOpts.ReuseAddress, "Enable to set socket option SO_REUSEADDR on listeners allowing binding to an address in `TIME_WAIT` state.") + fs.Var(flags.NewUint32Value(cfg.ec.MaxConcurrentStreams), "max-concurrent-streams", "Maximum concurrent streams that each client can open at a time.") + // raft connection timeouts fs.DurationVar(&rafthttp.ConnReadTimeout, "raft-read-timeout", rafthttp.DefaultConnReadTimeout, "Read timeout set on each rafthttp connection") fs.DurationVar(&rafthttp.ConnWriteTimeout, "raft-write-timeout", rafthttp.DefaultConnWriteTimeout, "Write timeout set on each rafthttp connection") @@ -380,6 +382,8 @@ func (cfg *config) configFromCmdLine() error { cfg.ec.CipherSuites = flags.StringsFromFlag(cfg.cf.flagSet, "cipher-suites") + cfg.ec.MaxConcurrentStreams = flags.Uint32FromFlag(cfg.cf.flagSet, "max-concurrent-streams") + cfg.ec.LogOutputs = flags.UniqueStringsFromFlag(cfg.cf.flagSet, "log-outputs") cfg.ec.ClusterState = cfg.cf.clusterState.String() diff --git a/server/etcdmain/grpc_proxy.go b/server/etcdmain/grpc_proxy.go index 55cc96d0cf9..b13520695b7 100644 --- a/server/etcdmain/grpc_proxy.go +++ b/server/etcdmain/grpc_proxy.go @@ -47,6 +47,7 @@ import ( "github.com/soheilhy/cmux" "github.com/spf13/cobra" "go.uber.org/zap" + "golang.org/x/net/http2" "google.golang.org/grpc" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -95,6 +96,8 @@ var ( grpcKeepAliveMinTime time.Duration grpcKeepAliveTimeout time.Duration grpcKeepAliveInterval time.Duration + + maxConcurrentStreams uint32 ) const defaultGRPCMaxCallSendMsgSize = 1.5 * 1024 * 1024 @@ -159,6 +162,8 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().BoolVar(&grpcProxyDebug, "debug", false, "Enable debug-level logging for grpc-proxy.") + cmd.Flags().Uint32Var(&maxConcurrentStreams, "max-concurrent-streams", math.MaxUint32, "Maximum concurrent streams that each client can open at a time.") + return &cmd } @@ -212,6 +217,13 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { httpClient := mustNewHTTPClient(lg) srvhttp, httpl := mustHTTPListener(lg, m, tlsinfo, client, proxyClient) + + if err := http2.ConfigureServer(srvhttp, &http2.Server{ + MaxConcurrentStreams: maxConcurrentStreams, + }); err != nil { + lg.Fatal("Failed to configure the http server", zap.Error(err)) + } + errc := make(chan error, 3) go func() { errc <- newGRPCProxyServer(lg, client).Serve(grpcl) }() go func() { errc <- srvhttp.Serve(httpl) }() diff --git a/server/etcdmain/help.go b/server/etcdmain/help.go index efa2a77e8e0..a7f569a8e66 100644 --- a/server/etcdmain/help.go +++ b/server/etcdmain/help.go @@ -81,6 +81,8 @@ Member: Maximum number of operations permitted in a transaction. --max-request-bytes '1572864' Maximum client request size in bytes the server will accept. + --max-concurrent-streams 'math.MaxUint32' + Maximum concurrent streams that each client can open at a time. --grpc-keepalive-min-time '5s' Minimum duration interval that a client should wait before pinging server. --grpc-keepalive-interval '2h' diff --git a/server/etcdserver/api/v3rpc/grpc.go b/server/etcdserver/api/v3rpc/grpc.go index ea3dd75705f..349ebea4007 100644 --- a/server/etcdserver/api/v3rpc/grpc.go +++ b/server/etcdserver/api/v3rpc/grpc.go @@ -32,7 +32,6 @@ import ( const ( grpcOverheadBytes = 512 * 1024 - maxStreams = math.MaxUint32 maxSendBytes = math.MaxInt32 ) @@ -68,7 +67,7 @@ func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnarySer opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes))) opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes)) - opts = append(opts, grpc.MaxConcurrentStreams(maxStreams)) + opts = append(opts, grpc.MaxConcurrentStreams(s.Cfg.MaxConcurrentStreams)) grpcServer := grpc.NewServer(append(opts, gopts...)...) diff --git a/tests/e2e/ctl_v3_test.go b/tests/e2e/ctl_v3_test.go index 8b11cf6b516..20e4362f491 100644 --- a/tests/e2e/ctl_v3_test.go +++ b/tests/e2e/ctl_v3_test.go @@ -133,6 +133,7 @@ type ctlCtx struct { envMap map[string]string dialTimeout time.Duration + testTimeout time.Duration quorum bool // if true, set up 3-node cluster and linearizable read interactive bool @@ -166,6 +167,10 @@ func withDialTimeout(timeout time.Duration) ctlOption { return func(cx *ctlCtx) { cx.dialTimeout = timeout } } +func withTestTimeout(timeout time.Duration) ctlOption { + return func(cx *ctlCtx) { cx.testTimeout = timeout } +} + func withQuorum() ctlOption { return func(cx *ctlCtx) { cx.quorum = true } } @@ -198,6 +203,14 @@ func withFlagByEnv() ctlOption { return func(cx *ctlCtx) { cx.envMap = make(map[string]string) } } +// This function must be called after the `withCfg`, otherwise its value +// may be overwritten by `withCfg`. +func withMaxConcurrentStreams(streams uint32) ctlOption { + return func(cx *ctlCtx) { + cx.cfg.MaxConcurrentStreams = streams + } +} + func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { testCtlWithOffline(t, testFunc, nil, opts...) } @@ -262,10 +275,8 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx t.Log("---testFunc logic DONE") }() - timeout := 2*cx.dialTimeout + time.Second - if cx.dialTimeout == 0 { - timeout = 30 * time.Second - } + timeout := cx.getTestTimeout() + select { case <-time.After(timeout): testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) @@ -282,6 +293,17 @@ func runCtlTest(t *testing.T, testFunc func(ctlCtx), testOfflineFunc func(ctlCtx } } +func (cx *ctlCtx) getTestTimeout() time.Duration { + timeout := cx.testTimeout + if timeout == 0 { + timeout = 2*cx.dialTimeout + time.Second + if cx.dialTimeout == 0 { + timeout = 30 * time.Second + } + } + return timeout +} + func (cx *ctlCtx) prefixArgs(eps []string) []string { fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") diff --git a/tests/e2e/v3_curl_maxstream_test.go b/tests/e2e/v3_curl_maxstream_test.go new file mode 100644 index 00000000000..44dfd3dc1bd --- /dev/null +++ b/tests/e2e/v3_curl_maxstream_test.go @@ -0,0 +1,218 @@ +// Copyright 2022 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "encoding/json" + "fmt" + "sync" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/client/pkg/v3/testutil" + "go.etcd.io/etcd/tests/v3/framework/e2e" +) + +// NO TLS +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +/* +// There are lots of "device not configured" errors. I suspect it's an issue +// of the project `github.com/creack/pty`. I manually executed the test case +// with 1000 concurrent streams, and confirmed it's working as expected. +// TODO(ahrtr): investigate the test failure in the future. +func TestV3Curl_MaxStreamsNoTLS_BelowLimit_Large(t *testing.T) { + f, err := setRLimit(10240) + if err != nil { + t.Fatal(err) + } + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(1000), withTestTimeout(200*time.Second)) + f() +} */ + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_NoTLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigNoTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +// TLS +func TestV3Curl_MaxStreams_BelowLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_BelowLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, false, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Small(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(3)) +} + +func TestV3Curl_MaxStreams_ReachLimit_TLS_Medium(t *testing.T) { + testV3CurlMaxStream(t, true, withCfg(*e2e.NewConfigTLS()), withMaxConcurrentStreams(100), withTestTimeout(20*time.Second)) +} + +func testV3CurlMaxStream(t *testing.T, reachLimit bool, opts ...ctlOption) { + e2e.BeforeTest(t) + + // Step 1: generate configuration for creating cluster + t.Log("Generating configuration for creating cluster.") + cx := getDefaultCtlCtx(t) + cx.applyOpts(opts) + // We must set the `ClusterSize` to 1, otherwise different streams may + // connect to different members, accordingly it's difficult to test the + // behavior. + cx.cfg.ClusterSize = 1 + + // Step 2: create the cluster + t.Log("Creating an etcd cluster") + epc, err := e2e.NewEtcdProcessCluster(t, &cx.cfg) + if err != nil { + t.Fatalf("Failed to start etcd cluster: %v", err) + } + cx.epc = epc + cx.dataDir = epc.Procs[0].Config().DataDirPath + + // Step 3: run test + // (a) generate ${concurrentNumber} concurrent watch streams; + // (b) submit a range request. + var wg sync.WaitGroup + concurrentNumber := cx.cfg.MaxConcurrentStreams - 1 + expectedResponse := `"revision":"` + if reachLimit { + concurrentNumber = cx.cfg.MaxConcurrentStreams + expectedResponse = "Operation timed out" + } + wg.Add(int(concurrentNumber)) + t.Logf("Running the test, MaxConcurrentStreams: %d, concurrentNumber: %d, expectedResponse: %s\n", + cx.cfg.MaxConcurrentStreams, concurrentNumber, expectedResponse) + errCh := make(chan error, concurrentNumber) + submitConcurrentWatch(cx, int(concurrentNumber), &wg, errCh) + submitRangeAfterConcurrentWatch(cx, expectedResponse) + + // Step 4: check the watch errors. Note that we only check the watch error + // before closing cluster. Once we close the cluster, the watch must run + // into error, and we should ignore them by then. + t.Log("Checking watch error.") + select { + case werr := <-errCh: + t.Fatal(werr) + default: + } + + // Step 5: Close the cluster + t.Log("Closing test cluster...") + assert.NoError(t, epc.Close()) + t.Log("Closed test cluster") + + // Step 6: Waiting all watch goroutines to exit. + donec := make(chan struct{}) + go func() { + defer close(donec) + wg.Wait() + }() + + timeout := cx.getTestTimeout() + t.Logf("Waiting test case to finish, timeout: %s", timeout) + select { + case <-time.After(timeout): + testutil.FatalStack(t, fmt.Sprintf("test timed out after %v", timeout)) + case <-donec: + t.Log("All watch goroutines exited.") + } + + t.Log("testV3CurlMaxStream done!") +} + +func submitConcurrentWatch(cx ctlCtx, number int, wgDone *sync.WaitGroup, errCh chan<- error) { + watchData, err := json.Marshal(&pb.WatchRequest_CreateRequest{ + CreateRequest: &pb.WatchCreateRequest{ + Key: []byte("foo"), + }, + }) + if err != nil { + cx.t.Fatal(err) + } + + var wgSchedule sync.WaitGroup + wgSchedule.Add(number) + for i := 0; i < number; i++ { + go func(i int) { + wgSchedule.Done() + defer wgDone.Done() + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/watch", Value: string(watchData), Expected: `"revision":"`}); err != nil { + werr := fmt.Errorf("testV3CurlMaxStream watch failed: %d, error: %v", i, err) + cx.t.Error(werr) + errCh <- werr + } + }(i) + } + // make sure all goroutines have already been scheduled. + wgSchedule.Wait() + // We need to make sure all watch streams have already been created. + // For simplicity, we just sleep 3 second. We may consider improving + // it in the future. + time.Sleep(3 * time.Second) +} + +func submitRangeAfterConcurrentWatch(cx ctlCtx, expectedValue string) { + rangeData, err := json.Marshal(&pb.RangeRequest{ + Key: []byte("foo"), + }) + if err != nil { + cx.t.Fatal(err) + } + + cx.t.Log("Submitting range request...") + if err := e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: "/v3/kv/range", Value: string(rangeData), Expected: expectedValue, Timeout: 5}); err != nil { + cx.t.Fatalf("testV3CurlMaxStream get failed, error: %v", err) + } + cx.t.Log("range request done") +} + +// setRLimit sets the open file limitation, and return a function which +// is used to reset the limitation. +func setRLimit(nofile uint64) (func() error, error) { + var rLimit syscall.Rlimit + if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return nil, fmt.Errorf("failed to get open file limit, error: %v", err) + } + + var wLimit syscall.Rlimit + wLimit.Max = nofile + wLimit.Cur = nofile + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &wLimit); err != nil { + return nil, fmt.Errorf("failed to set max open file limit, %v", err) + } + + return func() error { + if err := syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { + return fmt.Errorf("failed reset max open file limit, %v", err) + } + return nil + }, nil +} diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 4b1daf93d78..fece5f5b00f 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -176,6 +176,8 @@ type EtcdProcessClusterConfig struct { DiscoveryEndpoints []string // v3 discovery DiscoveryToken string LogLevel string + + MaxConcurrentStreams uint32 // default is math.MaxUint32 } // NewEtcdProcessCluster launches a new cluster from etcd processes, returning @@ -341,6 +343,10 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []* args = append(args, "--log-level", cfg.LogLevel) } + if cfg.MaxConcurrentStreams != 0 { + args = append(args, "--max-concurrent-streams", fmt.Sprintf("%d", cfg.MaxConcurrentStreams)) + } + etcdCfgs[i] = &EtcdServerProcessConfig{ lg: lg, ExecPath: cfg.ExecPath,