From 37d3e4b985e7355e27e19641307b8d850d660bcf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BA=8C=E6=89=8B=E6=8E=89=E5=8C=85=E5=B7=A5=E7=A8=8B?= =?UTF-8?q?=E5=B8=88?= Date: Sun, 1 Aug 2021 11:25:05 +0800 Subject: [PATCH] cmd(refactor): add server cmd and cleanup old server and test cmds (#2383) --- cmd/root.go | 6 + cmd/server.go | 233 -------------------- cmd/test.go | 73 ------- cmd/version.go | 33 --- pkg/cmd/cmd.go | 2 + pkg/cmd/factory/factory_impl.go | 7 + pkg/cmd/server/server.go | 288 +++++++++++++++++++++++++ {cmd => pkg/cmd/server}/server_test.go | 205 +++++++++++------- pkg/cmd/util/helper.go | 87 ++++++++ pkg/cmd/util/helper_test.go | 106 +++++++++ 10 files changed, 628 insertions(+), 412 deletions(-) delete mode 100644 cmd/server.go delete mode 100644 cmd/test.go delete mode 100644 cmd/version.go create mode 100644 pkg/cmd/server/server.go rename {cmd => pkg/cmd/server}/server_test.go (70%) diff --git a/cmd/root.go b/cmd/root.go index 22ee518f938..8804cba62ae 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -16,6 +16,8 @@ package cmd import ( "os" + "github.com/pingcap/ticdc/pkg/cmd/server" + "github.com/pingcap/ticdc/pkg/cmd/version" "github.com/spf13/cobra" ) @@ -29,6 +31,10 @@ var rootCmd = &cobra.Command{ func Execute() { // Outputs cmd.Print to stdout. rootCmd.SetOut(os.Stdout) + + rootCmd.AddCommand(server.NewCmdServer()) + rootCmd.AddCommand(version.NewCmdVersion()) + if err := rootCmd.Execute(); err != nil { rootCmd.Println(err) os.Exit(1) diff --git a/cmd/server.go b/cmd/server.go deleted file mode 100644 index 97eb2d3ef24..00000000000 --- a/cmd/server.go +++ /dev/null @@ -1,233 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "context" - "strings" - "time" - - "github.com/fatih/color" - "github.com/pingcap/errors" - "github.com/pingcap/failpoint" - "github.com/pingcap/log" - "github.com/pingcap/ticdc/cdc" - "github.com/pingcap/ticdc/cdc/puller/sorter" - "github.com/pingcap/ticdc/pkg/config" - cerror "github.com/pingcap/ticdc/pkg/errors" - "github.com/pingcap/ticdc/pkg/logutil" - "github.com/pingcap/ticdc/pkg/util" - "github.com/pingcap/ticdc/pkg/version" - ticonfig "github.com/pingcap/tidb/config" - "github.com/spf13/cobra" - "github.com/spf13/pflag" - "go.uber.org/zap" -) - -var ( - serverPdAddr string - serverConfigFilePath string - - serverConfig = config.GetDefaultServerConfig() - - serverCmd = &cobra.Command{ - Use: "server", - Short: "Start a TiCDC capture server", - RunE: runEServer, - } -) - -func patchTiDBConf() { - ticonfig.UpdateGlobal(func(conf *ticonfig.Config) { - // Disable kv client batch send loop introduced by tidb library, which is not used in TiCDC server - conf.TiKVClient.MaxBatchSize = 0 - }) -} - -func init() { - patchTiDBConf() - rootCmd.AddCommand(serverCmd) - initServerCmd(serverCmd) -} - -func initServerCmd(cmd *cobra.Command) { - defaultServerConfig := config.GetDefaultServerConfig() - cmd.Flags().StringVar(&serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use ',' to separate multiple PDs") - cmd.Flags().StringVar(&serverConfig.Addr, "addr", defaultServerConfig.Addr, "Set the listening address") - cmd.Flags().StringVar(&serverConfig.AdvertiseAddr, "advertise-addr", defaultServerConfig.AdvertiseAddr, "Set the advertise listening address for client communication") - cmd.Flags().StringVar(&serverConfig.TZ, "tz", defaultServerConfig.TZ, "Specify time zone of TiCDC cluster") - cmd.Flags().Int64Var(&serverConfig.GcTTL, "gc-ttl", defaultServerConfig.GcTTL, "CDC GC safepoint TTL duration, specified in seconds") - cmd.Flags().StringVar(&serverConfig.LogFile, "log-file", defaultServerConfig.LogFile, "log file path") - cmd.Flags().StringVar(&serverConfig.LogLevel, "log-level", defaultServerConfig.LogLevel, "log level (etc: debug|info|warn|error)") - cmd.Flags().StringVar(&serverConfig.DataDir, "data-dir", defaultServerConfig.DataDir, "the path to the directory used to store TiCDC-generated data") - cmd.Flags().DurationVar((*time.Duration)(&serverConfig.OwnerFlushInterval), "owner-flush-interval", time.Duration(defaultServerConfig.OwnerFlushInterval), "owner flushes changefeed status interval") - cmd.Flags().DurationVar((*time.Duration)(&serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(defaultServerConfig.ProcessorFlushInterval), "processor flushes task status interval") - - cmd.Flags().IntVar(&serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", defaultServerConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size") - cmd.Flags().IntVar(&serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", defaultServerConfig.Sorter.NumConcurrentWorker, "sorter concurrency level") - cmd.Flags().Uint64Var(&serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", defaultServerConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting") - // 80 is safe on most systems. - cmd.Flags().IntVar(&serverConfig.Sorter.MaxMemoryPressure, "sorter-max-memory-percentage", defaultServerConfig.Sorter.MaxMemoryPressure, "system memory usage threshold for forcing in-disk sort") - // We use 8GB as a safe default before we support local configuration file. - cmd.Flags().Uint64Var(&serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", defaultServerConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort") - cmd.Flags().StringVar(&serverConfig.Sorter.SortDir, "sort-dir", defaultServerConfig.Sorter.SortDir, "sorter's temporary file directory") - - addSecurityFlags(cmd.Flags(), true /* isServer */) - - cmd.Flags().StringVar(&serverConfigFilePath, "config", "", "Path of the configuration file") - _ = cmd.Flags().MarkHidden("sort-dir") //nolint:errcheck -} - -func runEServer(cmd *cobra.Command, args []string) error { - conf, err := loadAndVerifyServerConfig(cmd) - if err != nil { - return errors.Trace(err) - } - - cancel := initCmd(cmd, &logutil.Config{ - File: conf.LogFile, - Level: conf.LogLevel, - FileMaxSize: conf.Log.File.MaxSize, - FileMaxDays: conf.Log.File.MaxDays, - FileMaxBackups: conf.Log.File.MaxBackups, - }) - defer cancel() - tz, err := util.GetTimezone(conf.TZ) - if err != nil { - return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`") - } - config.StoreGlobalServerConfig(conf) - ctx := util.PutTimezoneInCtx(defaultContext, tz) - ctx = util.PutCaptureAddrInCtx(ctx, conf.AdvertiseAddr) - - version.LogVersionInfo() - if util.FailpointBuild { - for _, path := range failpoint.List() { - status, err := failpoint.Status(path) - if err != nil { - log.Error("fail to get failpoint status", zap.Error(err)) - } - log.Info("failpoint enabled", zap.String("path", path), zap.String("status", status)) - } - } - - logHTTPProxies() - server, err := cdc.NewServer(strings.Split(serverPdAddr, ",")) - if err != nil { - return errors.Annotate(err, "new server") - } - err = server.Run(ctx) - if err != nil && errors.Cause(err) != context.Canceled { - log.Error("run server", zap.String("error", errors.ErrorStack(err))) - return errors.Annotate(err, "run server") - } - server.Close() - sorter.UnifiedSorterCleanUp() - log.Info("cdc server exits successfully") - - return nil -} - -func loadAndVerifyServerConfig(cmd *cobra.Command) (*config.ServerConfig, error) { - serverConfig.Security = getCredential() - - conf := config.GetDefaultServerConfig() - if len(serverConfigFilePath) > 0 { - if err := strictDecodeFile(serverConfigFilePath, "TiCDC server", conf); err != nil { - return nil, err - } - // user specified sort-dir should not take effect, it's always `/tmp/sorter` - // if user try to set sort-dir by config file, warn it. - if conf.Sorter.SortDir != config.DefaultSortDir { - cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in server settings. " + - "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) - - conf.Sorter.SortDir = config.DefaultSortDir - } - } - cmd.Flags().Visit(func(flag *pflag.Flag) { - switch flag.Name { - case "addr": - conf.Addr = serverConfig.Addr - case "advertise-addr": - conf.AdvertiseAddr = serverConfig.AdvertiseAddr - case "tz": - conf.TZ = serverConfig.TZ - case "gc-ttl": - conf.GcTTL = serverConfig.GcTTL - case "log-file": - conf.LogFile = serverConfig.LogFile - case "log-level": - conf.LogLevel = serverConfig.LogLevel - case "data-dir": - conf.DataDir = serverConfig.DataDir - case "owner-flush-interval": - conf.OwnerFlushInterval = serverConfig.OwnerFlushInterval - case "processor-flush-interval": - conf.ProcessorFlushInterval = serverConfig.ProcessorFlushInterval - case "sorter-num-workerpool-goroutine": - conf.Sorter.NumWorkerPoolGoroutine = serverConfig.Sorter.NumWorkerPoolGoroutine - case "sorter-num-concurrent-worker": - conf.Sorter.NumConcurrentWorker = serverConfig.Sorter.NumConcurrentWorker - case "sorter-chunk-size-limit": - conf.Sorter.ChunkSizeLimit = serverConfig.Sorter.ChunkSizeLimit - case "sorter-max-memory-percentage": - conf.Sorter.MaxMemoryPressure = serverConfig.Sorter.MaxMemoryPressure - case "sorter-max-memory-consumption": - conf.Sorter.MaxMemoryConsumption = serverConfig.Sorter.MaxMemoryConsumption - case "ca": - conf.Security.CAPath = serverConfig.Security.CAPath - case "cert": - conf.Security.CertPath = serverConfig.Security.CertPath - case "key": - conf.Security.KeyPath = serverConfig.Security.KeyPath - case "cert-allowed-cn": - conf.Security.CertAllowedCN = serverConfig.Security.CertAllowedCN - case "sort-dir": - conf.Sorter.SortDir = serverConfig.Sorter.SortDir - case "pd", "config": - // do nothing - default: - log.Panic("unknown flag, please report a bug", zap.String("flagName", flag.Name)) - } - }) - - // user specified sorter dir should not take effect, it's always `/tmp/sorter` - // if user try to set sort-dir by flag, warn it. - if conf.Sorter.SortDir != config.DefaultSortDir { - cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in server settings. " + - "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) - - conf.Sorter.SortDir = config.DefaultSortDir - } - - if err := conf.ValidateAndAdjust(); err != nil { - return nil, errors.Trace(err) - } - if len(serverPdAddr) == 0 { - return nil, cerror.ErrInvalidServerOption.GenWithStack("empty PD address") - } - for _, ep := range strings.Split(serverPdAddr, ",") { - if err := verifyPdEndpoint(ep, conf.Security.IsTLSEnabled()); err != nil { - return nil, cerror.ErrInvalidServerOption.Wrap(err).GenWithStackByCause() - } - } - - if conf.DataDir == "" { - cmd.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. " + - "Please use `cdc server --data-dir` to start the cdc server if possible.\n")) - } - - return conf, nil -} diff --git a/cmd/test.go b/cmd/test.go deleted file mode 100644 index 636f9c0c92b..00000000000 --- a/cmd/test.go +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "fmt" - "os" - "strings" - - "github.com/pingcap/ticdc/cdc/kv" - "github.com/spf13/cobra" - "github.com/tikv/client-go/v2/tikv" - pd "github.com/tikv/pd/client" -) - -var testPdAddr string - -func init() { - rootCmd.AddCommand(testKVCmd) - - testKVCmd.Flags().StringVar(&testPdAddr, "pd", "http://127.0.0.1:2379", "address of PD") -} - -type testingT struct { -} - -// Errorf implements require.TestingT -func (t *testingT) Errorf(format string, args ...interface{}) { - fmt.Printf(format, args...) -} - -// FailNow implements require.TestingT -func (t *testingT) FailNow() { - os.Exit(-1) -} - -var testKVCmd = &cobra.Command{ - Hidden: true, - Use: "testkv", - Short: "test kv", - Long: ``, - Run: func(cmd *cobra.Command, args []string) { - addrs := strings.Split(testPdAddr, ",") - cli, err := pd.NewClient(addrs, getCredential().PDSecurityOption()) - if err != nil { - fmt.Println(err) - return - } - - storage, err := kv.CreateStorage(addrs[0]) - if err != nil { - fmt.Println(err) - return - } - - tikvStorage := storage.(tikv.Storage) // we know it is tikv. - - t := new(testingT) - kv.TestGetKVSimple(t, cli, tikvStorage, storage) - kv.TestSplit(t, cli, tikvStorage, storage) - }, -} diff --git a/cmd/version.go b/cmd/version.go deleted file mode 100644 index f257e054f8a..00000000000 --- a/cmd/version.go +++ /dev/null @@ -1,33 +0,0 @@ -// Copyright 2020 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "fmt" - - "github.com/pingcap/ticdc/pkg/version" - "github.com/spf13/cobra" -) - -func init() { - rootCmd.AddCommand(versionCmd) -} - -var versionCmd = &cobra.Command{ - Use: "version", - Short: "Output version information", - Run: func(cmd *cobra.Command, args []string) { - fmt.Println(version.GetRawInfo()) - }, -} diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go index 63e178569b8..fedc3cbc8e3 100644 --- a/pkg/cmd/cmd.go +++ b/pkg/cmd/cmd.go @@ -16,6 +16,7 @@ package cmd import ( "os" + "github.com/pingcap/ticdc/pkg/cmd/server" "github.com/pingcap/ticdc/pkg/cmd/version" "github.com/spf13/cobra" ) @@ -36,6 +37,7 @@ func Run() { // Outputs cmd.Print to stdout. cmd.SetOut(os.Stdout) + cmd.AddCommand(server.NewCmdServer()) cmd.AddCommand(version.NewCmdVersion()) if err := cmd.Execute(); err != nil { diff --git a/pkg/cmd/factory/factory_impl.go b/pkg/cmd/factory/factory_impl.go index 21b91ad42e3..b9f4014773e 100644 --- a/pkg/cmd/factory/factory_impl.go +++ b/pkg/cmd/factory/factory_impl.go @@ -47,26 +47,32 @@ func NewFactory(clientGetter ClientGetter) Factory { return f } +// ToTLSConfig returns the configuration of tls. func (f *factoryImpl) ToTLSConfig() (*tls.Config, error) { return f.clientGetter.ToTLSConfig() } +// ToGRPCDialOption returns the option of GRPC dial. func (f *factoryImpl) ToGRPCDialOption() (grpc.DialOption, error) { return f.clientGetter.ToGRPCDialOption() } +// GetPdAddr returns pd address. func (f *factoryImpl) GetPdAddr() string { return f.clientGetter.GetPdAddr() } +// GetLogLevel returns log level. func (f *factoryImpl) GetLogLevel() string { return f.clientGetter.GetLogLevel() } +// GetCredential returns security credentials. func (f *factoryImpl) GetCredential() *security.Credential { return f.clientGetter.GetCredential() } +// EtcdClient creates new cdc etcd client. func (f *factoryImpl) EtcdClient() (*kv.CDCEtcdClient, error) { ctx := cmdconetxt.GetDefaultContext() @@ -119,6 +125,7 @@ func (f *factoryImpl) EtcdClient() (*kv.CDCEtcdClient, error) { return &client, nil } +// PdClient creates new pd client. func (f factoryImpl) PdClient() (pd.Client, error) { ctx := cmdconetxt.GetDefaultContext() diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go new file mode 100644 index 00000000000..c5ad05d3867 --- /dev/null +++ b/pkg/cmd/server/server.go @@ -0,0 +1,288 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "strings" + "time" + + "github.com/fatih/color" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc" + "github.com/pingcap/ticdc/cdc/puller/sorter" + cmdcontext "github.com/pingcap/ticdc/pkg/cmd/context" + "github.com/pingcap/ticdc/pkg/cmd/util" + "github.com/pingcap/ticdc/pkg/config" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/logutil" + "github.com/pingcap/ticdc/pkg/security" + ticdcutil "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/ticdc/pkg/version" + ticonfig "github.com/pingcap/tidb/config" + "github.com/spf13/cobra" + "github.com/spf13/pflag" + "go.uber.org/zap" +) + +// options defines flags for the `server` command. +type options struct { + serverConfig *config.ServerConfig + serverPdAddr string + serverConfigFilePath string + + // TODO(hi-rustin): Consider using a client construction factory here. + caPath string + certPath string + keyPath string + allowedCertCN string +} + +// newOptions creates new options for the `server` command. +func newOptions() *options { + return &options{ + serverConfig: config.GetDefaultServerConfig(), + } +} + +// addFlags receives a *cobra.Command reference and binds +// flags related to template printing to it. +func (o *options) addFlags(cmd *cobra.Command) { + cmd.Flags().StringVar(&o.serverConfig.Addr, "addr", o.serverConfig.Addr, "Set the listening address") + cmd.Flags().StringVar(&o.serverConfig.AdvertiseAddr, "advertise-addr", o.serverConfig.AdvertiseAddr, "Set the advertise listening address for client communication") + cmd.Flags().StringVar(&o.serverConfig.TZ, "tz", o.serverConfig.TZ, "Specify time zone of TiCDC cluster") + cmd.Flags().Int64Var(&o.serverConfig.GcTTL, "gc-ttl", o.serverConfig.GcTTL, "CDC GC safepoint TTL duration, specified in seconds") + cmd.Flags().StringVar(&o.serverConfig.LogFile, "log-file", o.serverConfig.LogFile, "log file path") + cmd.Flags().StringVar(&o.serverConfig.LogLevel, "log-level", o.serverConfig.LogLevel, "log level (etc: debug|info|warn|error)") + cmd.Flags().StringVar(&o.serverConfig.DataDir, "data-dir", o.serverConfig.DataDir, "the path to the directory used to store TiCDC-generated data") + cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.OwnerFlushInterval), "owner-flush-interval", time.Duration(o.serverConfig.OwnerFlushInterval), "owner flushes changefeed status interval") + cmd.Flags().DurationVar((*time.Duration)(&o.serverConfig.ProcessorFlushInterval), "processor-flush-interval", time.Duration(o.serverConfig.ProcessorFlushInterval), "processor flushes task status interval") + cmd.Flags().IntVar(&o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter-num-workerpool-goroutine", o.serverConfig.Sorter.NumWorkerPoolGoroutine, "sorter workerpool size") + cmd.Flags().IntVar(&o.serverConfig.Sorter.NumConcurrentWorker, "sorter-num-concurrent-worker", o.serverConfig.Sorter.NumConcurrentWorker, "sorter concurrency level") + cmd.Flags().Uint64Var(&o.serverConfig.Sorter.ChunkSizeLimit, "sorter-chunk-size-limit", o.serverConfig.Sorter.ChunkSizeLimit, "size of heaps for sorting") + // 80 is safe on most systems. + cmd.Flags().IntVar(&o.serverConfig.Sorter.MaxMemoryPressure, "sorter-max-memory-percentage", o.serverConfig.Sorter.MaxMemoryPressure, "system memory usage threshold for forcing in-disk sort") + // We use 8GB as a safe default before we support local configuration file. + cmd.Flags().Uint64Var(&o.serverConfig.Sorter.MaxMemoryConsumption, "sorter-max-memory-consumption", o.serverConfig.Sorter.MaxMemoryConsumption, "maximum memory consumption of in-memory sort") + cmd.Flags().StringVar(&o.serverConfig.Sorter.SortDir, "sort-dir", o.serverConfig.Sorter.SortDir, "sorter's temporary file directory") + cmd.Flags().StringVar(&o.serverPdAddr, "pd", "http://127.0.0.1:2379", "Set the PD endpoints to use. Use ',' to separate multiple PDs") + cmd.Flags().StringVar(&o.serverConfigFilePath, "config", "", "Path of the configuration file") + + cmd.Flags().StringVar(&o.caPath, "ca", "", "CA certificate path for TLS connection") + cmd.Flags().StringVar(&o.certPath, "cert", "", "Certificate path for TLS connection") + cmd.Flags().StringVar(&o.keyPath, "key", "", "Private key path for TLS connection") + cmd.Flags().StringVar(&o.allowedCertCN, "cert-allowed-cn", "", "Verify caller's identity (cert Common Name). Use ',' to separate multiple CN") + _ = cmd.Flags().MarkHidden("sort-dir") +} + +// run runs the server cmd. +func (o *options) run(cmd *cobra.Command) error { + cancel := util.InitCmd(cmd, &logutil.Config{ + File: o.serverConfig.LogFile, + Level: o.serverConfig.LogLevel, + FileMaxSize: o.serverConfig.Log.File.MaxSize, + FileMaxDays: o.serverConfig.Log.File.MaxDays, + FileMaxBackups: o.serverConfig.Log.File.MaxBackups, + }) + defer cancel() + + tz, err := ticdcutil.GetTimezone(o.serverConfig.TZ) + if err != nil { + return errors.Annotate(err, "can not load timezone, Please specify the time zone through environment variable `TZ` or command line parameters `--tz`") + } + + config.StoreGlobalServerConfig(o.serverConfig) + ctx := ticdcutil.PutTimezoneInCtx(cmdcontext.GetDefaultContext(), tz) + ctx = ticdcutil.PutCaptureAddrInCtx(ctx, o.serverConfig.AdvertiseAddr) + + version.LogVersionInfo() + if ticdcutil.FailpointBuild { + for _, path := range failpoint.List() { + status, err := failpoint.Status(path) + if err != nil { + log.Error("fail to get failpoint status", zap.Error(err)) + } + log.Info("failpoint enabled", zap.String("path", path), zap.String("status", status)) + } + } + + util.LogHTTPProxies() + server, err := cdc.NewServer(strings.Split(o.serverPdAddr, ",")) + if err != nil { + return errors.Annotate(err, "new server") + } + err = server.Run(ctx) + if err != nil && errors.Cause(err) != context.Canceled { + log.Error("run server", zap.String("error", errors.ErrorStack(err))) + return errors.Annotate(err, "run server") + } + server.Close() + sorter.UnifiedSorterCleanUp() + log.Info("cdc server exits successfully") + + return nil +} + +// complete adapts from the command line args and config file to the data required. +func (o *options) complete(cmd *cobra.Command) error { + o.serverConfig.Security = o.getCredential() + + cfg := config.GetDefaultServerConfig() + + if len(o.serverConfigFilePath) > 0 { + if err := util.StrictDecodeFile(o.serverConfigFilePath, "TiCDC server", cfg); err != nil { + return err + } + + // User specified sort-dir should not take effect, it's always `/tmp/sorter` + // if user try to set sort-dir by config file, warn it. + if cfg.Sorter.SortDir != config.DefaultSortDir { + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in server settings. " + + "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) + + cfg.Sorter.SortDir = config.DefaultSortDir + } + } + + cmd.Flags().Visit(func(flag *pflag.Flag) { + switch flag.Name { + case "addr": + cfg.Addr = o.serverConfig.Addr + case "advertise-addr": + cfg.AdvertiseAddr = o.serverConfig.AdvertiseAddr + case "tz": + cfg.TZ = o.serverConfig.TZ + case "gc-ttl": + cfg.GcTTL = o.serverConfig.GcTTL + case "log-file": + cfg.LogFile = o.serverConfig.LogFile + case "log-level": + cfg.LogLevel = o.serverConfig.LogLevel + case "data-dir": + cfg.DataDir = o.serverConfig.DataDir + case "owner-flush-interval": + cfg.OwnerFlushInterval = o.serverConfig.OwnerFlushInterval + case "processor-flush-interval": + cfg.ProcessorFlushInterval = o.serverConfig.ProcessorFlushInterval + case "sorter-num-workerpool-goroutine": + cfg.Sorter.NumWorkerPoolGoroutine = o.serverConfig.Sorter.NumWorkerPoolGoroutine + case "sorter-num-concurrent-worker": + cfg.Sorter.NumConcurrentWorker = o.serverConfig.Sorter.NumConcurrentWorker + case "sorter-chunk-size-limit": + cfg.Sorter.ChunkSizeLimit = o.serverConfig.Sorter.ChunkSizeLimit + case "sorter-max-memory-percentage": + cfg.Sorter.MaxMemoryPressure = o.serverConfig.Sorter.MaxMemoryPressure + case "sorter-max-memory-consumption": + cfg.Sorter.MaxMemoryConsumption = o.serverConfig.Sorter.MaxMemoryConsumption + case "ca": + cfg.Security.CAPath = o.serverConfig.Security.CAPath + case "cert": + cfg.Security.CertPath = o.serverConfig.Security.CertPath + case "key": + cfg.Security.KeyPath = o.serverConfig.Security.KeyPath + case "cert-allowed-cn": + cfg.Security.CertAllowedCN = o.serverConfig.Security.CertAllowedCN + case "sort-dir": + // user specified sorter dir should not take effect, it's always `/tmp/sorter` + // if user try to set sort-dir by flag, warn it. + if o.serverConfig.Sorter.SortDir != config.DefaultSortDir { + cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in server settings. " + + "sort-dir will be set to `{data-dir}/tmp/sorter`. The sort-dir here will be no-op\n")) + } + cfg.Sorter.SortDir = config.DefaultSortDir + case "pd", "config": + // do nothing + default: + log.Panic("unknown flag, please report a bug", zap.String("flagName", flag.Name)) + } + }) + + if err := cfg.ValidateAndAdjust(); err != nil { + return errors.Trace(err) + } + + if cfg.DataDir == "" { + cmd.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. " + + "Please use `cdc server --data-dir` to start the cdc server if possible.\n")) + } + + o.serverConfig = cfg + + return nil +} + +// validate checks that the provided attach options are specified. +func (o *options) validate() error { + if len(o.serverPdAddr) == 0 { + return cerror.ErrInvalidServerOption.GenWithStack("empty PD address") + } + for _, ep := range strings.Split(o.serverPdAddr, ",") { + // NOTICE: The configuration used here is the one that has been completed, + // as it may be configured by the configuration file. + if err := util.VerifyPdEndpoint(ep, o.serverConfig.Security.IsTLSEnabled()); err != nil { + return cerror.ErrInvalidServerOption.Wrap(err).GenWithStackByCause() + } + } + return nil +} + +// getCredential returns security credential. +func (o *options) getCredential() *security.Credential { + var certAllowedCN []string + if len(o.allowedCertCN) != 0 { + certAllowedCN = strings.Split(o.allowedCertCN, ",") + } + + return &security.Credential{ + CAPath: o.caPath, + CertPath: o.certPath, + KeyPath: o.keyPath, + CertAllowedCN: certAllowedCN, + } +} + +// NewCmdServer creates the `server` command. +func NewCmdServer() *cobra.Command { + o := newOptions() + + command := &cobra.Command{ + Use: "server", + Short: "Start a TiCDC capture server", + RunE: func(cmd *cobra.Command, args []string) error { + err := o.complete(cmd) + if err != nil { + return err + } + err = o.validate() + if err != nil { + return err + } + return o.run(cmd) + }, + } + + patchTiDBConf() + o.addFlags(command) + + return command +} + +func patchTiDBConf() { + ticonfig.UpdateGlobal(func(conf *ticonfig.Config) { + // Disable kv client batch send loop introduced by tidb library, which is not used in TiCDC server + conf.TiKVClient.MaxBatchSize = 0 + }) +} diff --git a/cmd/server_test.go b/pkg/cmd/server/server_test.go similarity index 70% rename from cmd/server_test.go rename to pkg/cmd/server/server_test.go index c9aadc7930c..ee547cf600a 100644 --- a/cmd/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -11,12 +11,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cmd +package server import ( "fmt" "io/ioutil" "path/filepath" + "testing" "time" "github.com/pingcap/check" @@ -26,6 +27,8 @@ import ( "github.com/spf13/cobra" ) +func TestSuite(t *testing.T) { check.TestingT(t) } + type serverSuite struct{} var _ = check.Suite(&serverSuite{}) @@ -37,84 +40,90 @@ func (s *serverSuite) TestPatchTiDBConf(c *check.C) { c.Assert(cfg.TiKVClient.MaxBatchSize, check.Equals, uint(0)) } -func (s *serverSuite) TestDataDirServerConfig(c *check.C) { +func (s *serverSuite) TestValidateWithEmptyPdAddress(c *check.C) { defer testleak.AfterTest(c)() cmd := new(cobra.Command) - initServerCmd(cmd) - c.Assert(cmd.ParseFlags([]string{}), check.IsNil) - cfg, err := loadAndVerifyServerConfig(cmd) - c.Assert(err, check.IsNil) - c.Assert(cfg, check.NotNil) - // data dir default to "" - c.Assert(cfg.DataDir, check.Equals, "") - c.Assert(cfg.Sorter.SortDir, check.Equals, filepath.Join("", "/tmp/sorter")) + o := newOptions() + o.addFlags(cmd) - dataDir := c.MkDir() - cmd = new(cobra.Command) - initServerCmd(cmd) - c.Assert(cmd.ParseFlags([]string{"--data-dir=" + dataDir}), check.IsNil) - cfg, err = loadAndVerifyServerConfig(cmd) + c.Assert(cmd.ParseFlags([]string{"--pd="}), check.IsNil) + err := o.complete(cmd) c.Assert(err, check.IsNil) - c.Assert(cfg, check.NotNil) - c.Assert(cfg.DataDir, check.Equals, dataDir) - // sorter-dir is not set yet - c.Assert(cfg.Sorter.SortDir, check.Equals, "/tmp/sorter") + err = o.validate() + c.Assert(err, check.ErrorMatches, ".*empty PD address.*") } -func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { +func (s *serverSuite) TestValidateWithInvalidPdAddress(c *check.C) { defer testleak.AfterTest(c)() - // test default flag values cmd := new(cobra.Command) - initServerCmd(cmd) - c.Assert(cmd.ParseFlags([]string{}), check.IsNil) - cfg, err := loadAndVerifyServerConfig(cmd) - c.Assert(err, check.IsNil) - c.Assert(cfg, check.NotNil) - - defcfg := config.GetDefaultServerConfig() - c.Assert(defcfg.ValidateAndAdjust(), check.IsNil) - c.Assert(cfg, check.DeepEquals, defcfg) - c.Assert(serverPdAddr, check.Equals, "http://127.0.0.1:2379") - - // test empty PD address - cmd = new(cobra.Command) - initServerCmd(cmd) - c.Assert(cmd.ParseFlags([]string{"--pd="}), check.IsNil) - _, err = loadAndVerifyServerConfig(cmd) - c.Assert(err, check.ErrorMatches, ".*empty PD address.*") + o := newOptions() + o.addFlags(cmd) - // test invalid PD address - cmd = new(cobra.Command) - initServerCmd(cmd) c.Assert(cmd.ParseFlags([]string{"--pd=aa"}), check.IsNil) - _, err = loadAndVerifyServerConfig(cmd) + err := o.complete(cmd) + c.Assert(err, check.IsNil) + err = o.validate() c.Assert(err, check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*") +} + +func (s *serverSuite) TestValidateWithInvalidPdAddressWithoutHost(c *check.C) { + defer testleak.AfterTest(c)() + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) - // test invalid PD address(without host) - cmd = new(cobra.Command) - initServerCmd(cmd) c.Assert(cmd.ParseFlags([]string{"--pd=http://"}), check.IsNil) - _, err = loadAndVerifyServerConfig(cmd) + err := o.complete(cmd) + c.Assert(err, check.IsNil) + err = o.validate() c.Assert(err, check.ErrorMatches, ".*PD endpoint should be a valid http or https URL.*") +} + +func (s *serverSuite) TestValidateWithHttpsPdAddressWithoutCertificate(c *check.C) { + defer testleak.AfterTest(c)() + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) - // test missing certificate - cmd = new(cobra.Command) - initServerCmd(cmd) c.Assert(cmd.ParseFlags([]string{"--pd=https://aa"}), check.IsNil) - _, err = loadAndVerifyServerConfig(cmd) + err := o.complete(cmd) + c.Assert(err, check.IsNil) + err = o.validate() c.Assert(err, check.ErrorMatches, ".*PD endpoint scheme is https, please provide certificate.*") +} + +func (s *serverSuite) TestAddUnknownFlag(c *check.C) { + defer testleak.AfterTest(c)() + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) - // test undefined flag - cmd = new(cobra.Command) - initServerCmd(cmd) c.Assert(cmd.ParseFlags([]string{"--PD="}), check.ErrorMatches, ".*unknown flag: --PD.*") - _, err = loadAndVerifyServerConfig(cmd) +} + +func (s *serverSuite) TestDefaultCfg(c *check.C) { + defer testleak.AfterTest(c)() + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) + + c.Assert(cmd.ParseFlags([]string{}), check.IsNil) + err := o.complete(cmd) c.Assert(err, check.IsNil) - // test flags without config file + defaultCfg := config.GetDefaultServerConfig() + c.Assert(defaultCfg.ValidateAndAdjust(), check.IsNil) + c.Assert(o.serverConfig, check.DeepEquals, defaultCfg) + c.Assert(o.serverPdAddr, check.Equals, "http://127.0.0.1:2379") +} + +func (s *serverSuite) TestParseCfg(c *check.C) { + defer testleak.AfterTest(c)() dataDir := c.MkDir() - cmd = new(cobra.Command) - initServerCmd(cmd) + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) + c.Assert(cmd.ParseFlags([]string{ "--addr", "127.5.5.1:8833", "--advertise-addr", "127.5.5.1:7777", @@ -135,9 +144,12 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { "--sorter-num-workerpool-goroutine", "90", "--sort-dir", "/tmp/just_a_test", }), check.IsNil) - cfg, err = loadAndVerifyServerConfig(cmd) + + err := o.complete(cmd) c.Assert(err, check.IsNil) - c.Assert(cfg, check.DeepEquals, &config.ServerConfig{ + err = o.validate() + c.Assert(err, check.IsNil) + c.Assert(o.serverConfig, check.DeepEquals, &config.ServerConfig{ Addr: "127.5.5.1:8833", AdvertiseAddr: "127.5.5.1:7777", LogFile: "/root/cdc.log", @@ -175,9 +187,11 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { RegionScanLimit: 40, }, }) +} - // test decode config file - dataDir = c.MkDir() +func (s *serverSuite) TestDecodeCfg(c *check.C) { + defer testleak.AfterTest(c)() + dataDir := c.MkDir() tmpDir := c.MkDir() configPath := filepath.Join(tmpDir, "ticdc.toml") configContent := fmt.Sprintf(` @@ -208,14 +222,20 @@ num-concurrent-worker = 4 num-workerpool-goroutine = 5 sort-dir = "/tmp/just_a_test" `, dataDir) - err = ioutil.WriteFile(configPath, []byte(configContent), 0o644) + err := ioutil.WriteFile(configPath, []byte(configContent), 0o644) c.Assert(err, check.IsNil) - cmd = new(cobra.Command) - initServerCmd(cmd) + + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) + c.Assert(cmd.ParseFlags([]string{"--config", configPath}), check.IsNil) - cfg, err = loadAndVerifyServerConfig(cmd) + + err = o.complete(cmd) c.Assert(err, check.IsNil) - c.Assert(cfg, check.DeepEquals, &config.ServerConfig{ + err = o.validate() + c.Assert(err, check.IsNil) + c.Assert(o.serverConfig, check.DeepEquals, &config.ServerConfig{ Addr: "128.0.0.1:1234", AdvertiseAddr: "127.0.0.1:1111", LogFile: "/root/cdc1.log", @@ -249,18 +269,54 @@ sort-dir = "/tmp/just_a_test" RegionScanLimit: 40, }, }) +} + +func (s *serverSuite) TestDecodeCfgWithFlags(c *check.C) { + defer testleak.AfterTest(c)() + dataDir := c.MkDir() + tmpDir := c.MkDir() + configPath := filepath.Join(tmpDir, "ticdc.toml") + configContent := fmt.Sprintf(` +addr = "128.0.0.1:1234" +advertise-addr = "127.0.0.1:1111" + +log-file = "/root/cdc1.log" +log-level = "warn" + +data-dir = "%+v" +gc-ttl = 500 +tz = "US" +capture-session-ttl = 10 + +owner-flush-interval = "600ms" +processor-flush-interval = "600ms" + +[log.file] +max-size = 200 +max-days = 1 +max-backups = 1 + +[sorter] +chunk-size-limit = 10000000 +max-memory-consumption = 2000000 +max-memory-percentage = 3 +num-concurrent-worker = 4 +num-workerpool-goroutine = 5 +sort-dir = "/tmp/just_a_test" - configContent = configContent + ` [security] ca-path = "aa" cert-path = "bb" key-path = "cc" cert-allowed-cn = ["dd","ee"] -` - err = ioutil.WriteFile(configPath, []byte(configContent), 0o644) +`, dataDir) + err := ioutil.WriteFile(configPath, []byte(configContent), 0o644) c.Assert(err, check.IsNil) - cmd = new(cobra.Command) - initServerCmd(cmd) + + cmd := new(cobra.Command) + o := newOptions() + o.addFlags(cmd) + c.Assert(cmd.ParseFlags([]string{ "--addr", "127.5.5.1:8833", "--log-file", "/root/cdc.log", @@ -277,9 +333,12 @@ cert-allowed-cn = ["dd","ee"] "--sorter-num-concurrent-worker", "3", "--config", configPath, }), check.IsNil) - cfg, err = loadAndVerifyServerConfig(cmd) + + err = o.complete(cmd) + c.Assert(err, check.IsNil) + err = o.validate() c.Assert(err, check.IsNil) - c.Assert(cfg, check.DeepEquals, &config.ServerConfig{ + c.Assert(o.serverConfig, check.DeepEquals, &config.ServerConfig{ Addr: "127.5.5.1:8833", AdvertiseAddr: "127.0.0.1:1111", LogFile: "/root/cdc.log", diff --git a/pkg/cmd/util/helper.go b/pkg/cmd/util/helper.go index c3cee5429b4..4467cf29fae 100644 --- a/pkg/cmd/util/helper.go +++ b/pkg/cmd/util/helper.go @@ -14,9 +14,21 @@ package util import ( + "context" "net/url" + "os" + "os/signal" + "strings" + "syscall" + "github.com/BurntSushi/toml" "github.com/pingcap/errors" + "github.com/pingcap/log" + cmdconetxt "github.com/pingcap/ticdc/pkg/cmd/context" + "github.com/pingcap/ticdc/pkg/logutil" + "github.com/spf13/cobra" + "go.uber.org/zap" + "golang.org/x/net/http/httpproxy" ) // Endpoint schemes. @@ -25,6 +37,81 @@ const ( HTTPS = "https" ) +// InitCmd initializes the logger, the default context and returns its cancel function. +func InitCmd(cmd *cobra.Command, logCfg *logutil.Config) context.CancelFunc { + // Init log. + err := logutil.InitLogger(logCfg) + if err != nil { + cmd.Printf("init logger error %v\n", errors.ErrorStack(err)) + os.Exit(1) + } + log.Info("init log", zap.String("file", logCfg.File), zap.String("level", logCfg.Level)) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + sig := <-sc + log.Info("got signal to exit", zap.Stringer("signal", sig)) + cancel() + }() + + cmdconetxt.SetDefaultContext(ctx) + + return cancel +} + +// LogHTTPProxies logs HTTP proxy relative environment variables. +func LogHTTPProxies() { + fields := findProxyFields() + if len(fields) > 0 { + log.Info("using proxy config", fields...) + } +} + +func findProxyFields() []zap.Field { + proxyCfg := httpproxy.FromEnvironment() + fields := make([]zap.Field, 0, 3) + if proxyCfg.HTTPProxy != "" { + fields = append(fields, zap.String("http_proxy", proxyCfg.HTTPProxy)) + } + if proxyCfg.HTTPSProxy != "" { + fields = append(fields, zap.String("https_proxy", proxyCfg.HTTPSProxy)) + } + if proxyCfg.NoProxy != "" { + fields = append(fields, zap.String("no_proxy", proxyCfg.NoProxy)) + } + return fields +} + +// StrictDecodeFile decodes the toml file strictly. If any item in confFile file is not mapped +// into the Config struct, issue an error and stop the server from starting. +func StrictDecodeFile(path, component string, cfg interface{}) error { + metaData, err := toml.DecodeFile(path, cfg) + if err != nil { + return errors.Trace(err) + } + + if undecoded := metaData.Undecoded(); len(undecoded) > 0 { + var b strings.Builder + for i, item := range undecoded { + if i != 0 { + b.WriteString(", ") + } + b.WriteString(item.String()) + } + err = errors.Errorf("component %s's config file %s contained unknown configuration options: %s", + component, path, b.String()) + } + + return errors.Trace(err) +} + // VerifyPdEndpoint verifies whether the pd endpoint is a valid http or https URL. // The certificate is required when using https. func VerifyPdEndpoint(pdEndpoint string, useTLS bool) error { diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 1d4c43a4794..3ccf174cca0 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -14,9 +14,14 @@ package util import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" "testing" "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" ) @@ -26,6 +31,38 @@ type utilsSuite struct{} var _ = check.Suite(&utilsSuite{}) +func (s *utilsSuite) TestProxyFields(c *check.C) { + defer testleak.AfterTest(c)() + revIndex := map[string]int{ + "http_proxy": 0, + "https_proxy": 1, + "no_proxy": 2, + } + envs := []string{"http_proxy", "https_proxy", "no_proxy"} + envPreset := []string{"http://127.0.0.1:8080", "https://127.0.0.1:8443", "localhost,127.0.0.1"} + + // Exhaust all combinations of those environment variables' selection. + // Each bit of the mask decided whether this index of `envs` would be set. + for mask := 0; mask <= 0b111; mask++ { + for _, env := range envs { + c.Assert(os.Unsetenv(env), check.IsNil) + } + + for i := 0; i < 3; i++ { + if (1<