From d322c84d33cb44e1acf05119a41e02acc2d726ea Mon Sep 17 00:00:00 2001 From: Zihao Yu <81380056+Chovyyyyyy@users.noreply.github.com> Date: Sat, 24 Dec 2022 22:02:37 +0800 Subject: [PATCH] feature: add transport config (#406) * add transport config --- conf/config.yaml | 2 +- conf/seata_config.yml | 4 +-- pkg/client/config.go | 8 +++-- pkg/client/config_test.go | 30 +++++++++++++++- pkg/config/seata_config_test.go | 4 +-- pkg/datasource/sql/exec/at/delete_executor.go | 8 ++--- pkg/datasource/sql/exec/at/insert_executor.go | 6 ++-- pkg/remoting/getty/config.go | 34 +++++++++++++++++++ testdata/conf/seatago.yml | 4 +-- 9 files changed, 82 insertions(+), 18 deletions(-) diff --git a/conf/config.yaml b/conf/config.yaml index 696ee6960..9de1b52ad 100644 --- a/conf/config.yaml +++ b/conf/config.yaml @@ -26,7 +26,7 @@ seata: heartbeat: true # the client batch send request enable enableClientBatchSendRequest: true - compressor: nome + compressor: none service: diff --git a/conf/seata_config.yml b/conf/seata_config.yml index 98bbb210e..6e8fb4620 100644 --- a/conf/seata_config.yml +++ b/conf/seata_config.yml @@ -100,9 +100,9 @@ seata: # Allow batch sending of requests (RM) enable-rm-client-batch-send-request: true # RM send request timeout - rpc-rm-request-timeout: 3s + rpc-rm-request-timeout: 30s # TM send request timeout - rpc-tm-request-timeout: 3s + rpc-tm-request-timeout: 30s # Configuration Center config: type: file diff --git a/pkg/client/config.go b/pkg/client/config.go index 62ce9a2e7..1f69c301f 100644 --- a/pkg/client/config.go +++ b/pkg/client/config.go @@ -61,15 +61,17 @@ func (c *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { } type Config struct { - TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` - ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` - GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"` + TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"` + ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"` + GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"` + TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"` } func (c *Config) RegisterFlags(f *flag.FlagSet) { c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f) c.ClientConfig.RegisterFlagsWithPrefix("client", f) c.GettyConfig.RegisterFlagsWithPrefix("getty", f) + c.TransportConfig.RegisterFlagsWithPrefix("transport", f) } type loaderConf struct { diff --git a/pkg/client/config_test.go b/pkg/client/config_test.go index bb5d68e29..80fb50365 100644 --- a/pkg/client/config_test.go +++ b/pkg/client/config_test.go @@ -63,13 +63,28 @@ func TestLoadPath(t *testing.T) { assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName) assert.Equal(t, time.Second, cfg.GettyConfig.SessionConfig.CronPeriod) + assert.NotNil(t, cfg.TransportConfig) + assert.NotNil(t, cfg.TransportConfig.ShutdownConfig) + assert.Equal(t, time.Second*3, cfg.TransportConfig.ShutdownConfig.Wait) + assert.Equal(t, "TCP", cfg.TransportConfig.Type) + assert.Equal(t, "NIO", cfg.TransportConfig.Server) + assert.Equal(t, true, cfg.TransportConfig.Heartbeat) + assert.Equal(t, "seata", cfg.TransportConfig.Serialization) + assert.Equal(t, "none", cfg.TransportConfig.Compressor) + assert.Equal(t, false, cfg.TransportConfig.EnableTmClientBatchSendRequest) + assert.Equal(t, true, cfg.TransportConfig.EnableRmClientBatchSendRequest) + assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCRmRequestTimeout) + assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCTmRequestTimeout) + // reset flag.CommandLine flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) } func TestLoadJson(t *testing.T) { confJson := `{"client":{"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}}, - "getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}}` + "getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}, + "transport":{"shutdown":{"wait":"3s"}, "type":"TCP", "server":"NIO", "heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"}}` + cfg := LoadJson([]byte(confJson)) assert.NotNil(t, cfg) @@ -106,6 +121,19 @@ func TestLoadJson(t *testing.T) { assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName) assert.Equal(t, time.Second*2, cfg.GettyConfig.SessionConfig.CronPeriod) + assert.NotNil(t, cfg.TransportConfig) + assert.NotNil(t, cfg.TransportConfig.ShutdownConfig) + assert.Equal(t, time.Second*3, cfg.TransportConfig.ShutdownConfig.Wait) + assert.Equal(t, "TCP", cfg.TransportConfig.Type) + assert.Equal(t, "NIO", cfg.TransportConfig.Server) + assert.Equal(t, true, cfg.TransportConfig.Heartbeat) + assert.Equal(t, "seata", cfg.TransportConfig.Serialization) + assert.Equal(t, "none", cfg.TransportConfig.Compressor) + assert.Equal(t, false, cfg.TransportConfig.EnableTmClientBatchSendRequest) + assert.Equal(t, true, cfg.TransportConfig.EnableRmClientBatchSendRequest) + assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCRmRequestTimeout) + assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCTmRequestTimeout) + // reset flag.CommandLine flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError) } diff --git a/pkg/config/seata_config_test.go b/pkg/config/seata_config_test.go index 724cca7b5..547f8233b 100644 --- a/pkg/config/seata_config_test.go +++ b/pkg/config/seata_config_test.go @@ -84,8 +84,8 @@ func TestInit(t *testing.T) { assert.Equal(t, DefaultSeataConf.Seata.Transport.Compressor, "none") assert.Equal(t, DefaultSeataConf.Seata.Transport.EnableTmClientBatchSendRequest, false) assert.Equal(t, DefaultSeataConf.Seata.Transport.EnableRmClientBatchSendRequest, true) - assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCRmRequestTimeout, time.Duration(3_000_000_000)) - assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCTmRequestTimeout, time.Duration(3_000_000_000)) + assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCRmRequestTimeout, time.Duration(30_000_000_000)) + assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCTmRequestTimeout, time.Duration(30_000_000_000)) // config assert.Equal(t, DefaultSeataConf.Seata.Config.Type, "file") diff --git a/pkg/datasource/sql/exec/at/delete_executor.go b/pkg/datasource/sql/exec/at/delete_executor.go index c75f29819..589f8b5b9 100644 --- a/pkg/datasource/sql/exec/at/delete_executor.go +++ b/pkg/datasource/sql/exec/at/delete_executor.go @@ -40,12 +40,12 @@ type deleteExecutor struct { execContent *types.ExecContext } -//NewDeleteExecutor get delete executor +// NewDeleteExecutor get delete executor func NewDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor { return &deleteExecutor{parserCtx: parserCtx, execContent: execContent, baseExecutor: baseExecutor{hooks: hooks}} } -//ExecContext exec SQL, and generate before image and after image +// ExecContext exec SQL, and generate before image and after image func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) { d.beforeHooks(ctx, d.execContent) defer func() { @@ -72,7 +72,7 @@ func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithName return res, nil } -//beforeImage build before image +// beforeImage build before image func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) { selectSQL, selectArgs, err := d.buildBeforeImageSQL(d.execContent.Query, d.execContent.NamedValues) if err != nil { @@ -148,7 +148,7 @@ func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedVa return sql, d.buildSelectArgs(&selStmt, args), nil } -//afterImage build after image +// afterImage build after image func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) { tableName, _ := d.parserCtx.GteTableName() metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContent.DBName, tableName) diff --git a/pkg/datasource/sql/exec/at/insert_executor.go b/pkg/datasource/sql/exec/at/insert_executor.go index eb8ee78af..14d731356 100644 --- a/pkg/datasource/sql/exec/at/insert_executor.go +++ b/pkg/datasource/sql/exec/at/insert_executor.go @@ -45,7 +45,7 @@ type insertExecutor struct { businesSQLResult types.ExecResult } -//NewInsertExecutor get insert executor +// NewInsertExecutor get insert executor func NewInsertExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor { return &insertExecutor{parserCtx: parserCtx, execContent: execContent, baseExecutor: baseExecutor{hooks: hooks}} } @@ -76,7 +76,7 @@ func (i *insertExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNam return res, nil } -//beforeImage build before image +// beforeImage build before image func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) { tableName, _ := i.parserCtx.GteTableName() metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContent.DBName, tableName) @@ -86,7 +86,7 @@ func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e return types.NewEmptyRecordImage(metaData, types.SQLTypeInsert), nil } -//afterImage build after image +// afterImage build after image func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) { if !i.isAstStmtValid() { return nil, nil diff --git a/pkg/remoting/getty/config.go b/pkg/remoting/getty/config.go index 92589822e..e0762b2cf 100644 --- a/pkg/remoting/getty/config.go +++ b/pkg/remoting/getty/config.go @@ -29,6 +29,23 @@ type Config struct { SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"` } +type ShutdownConfig struct { + Wait time.Duration `yaml:"wait" json:"wait" konaf:"wait"` +} + +type TransportConfig struct { + ShutdownConfig ShutdownConfig `yaml:"shutdown" json:"shutdown" koanf:"shutdown"` + Type string `yaml:"type" json:"type" koanf:"type"` + Server string `yaml:"server" json:"server" koanf:"server"` + Heartbeat bool `yaml:"heartbeat" json:"heartbeat" koanf:"heartbeat"` + Serialization string `yaml:"serialization" json:"serialization" koanf:"serialization"` + Compressor string `yaml:"compressor" json:"compressor" koanf:"compressor"` + EnableTmClientBatchSendRequest bool `yaml:"enable-tm-client-batch-send-request" json:"enable-tm-client-batch-send-request" koanf:"enable-tm-client-batch-send-request"` + EnableRmClientBatchSendRequest bool `yaml:"enable-rm-client-batch-send-request" json:"enable-rm-client-batch-send-request" koanf:"enable-rm-client-batch-send-request"` + RPCRmRequestTimeout time.Duration `yaml:"rpc-rm-request-timeout" json:"rpc-rm-request-timeout" koanf:"rpc-rm-request-timeout"` + RPCTmRequestTimeout time.Duration `yaml:"rpc-tm-request-timeout" json:"rpc-tm-request-timeout" koanf:"rpc-tm-request-timeout"` +} + // RegisterFlagsWithPrefix for Config. func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.") @@ -36,3 +53,20 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.") cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f) } + +func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.") +} + +func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + cfg.ShutdownConfig.RegisterFlagsWithPrefix(prefix+".shutdown", f) + f.StringVar(&cfg.Type, prefix+".type", "TCP", "Transport protocol type.") + f.StringVar(&cfg.Server, prefix+".server", "NIO", "Server type.") + f.BoolVar(&cfg.Heartbeat, prefix+".heartbeat", true, "Heartbeat.") + f.StringVar(&cfg.Serialization, prefix+".serialization", "seata", "Encoding and decoding mode.") + f.StringVar(&cfg.Compressor, prefix+".compressor", "none", "Message compression mode.") + f.BoolVar(&cfg.EnableTmClientBatchSendRequest, prefix+".enable-tm-client-batch-send-request", false, "Allow batch sending of requests (TM).") + f.BoolVar(&cfg.EnableRmClientBatchSendRequest, prefix+".enable-rm-client-batch-send-request", true, "Allow batch sending of requests (RM).") + f.DurationVar(&cfg.RPCRmRequestTimeout, prefix+".rpc-rm-request-timeout", 30*time.Second, "RM send request timeout.") + f.DurationVar(&cfg.RPCTmRequestTimeout, prefix+".rpc-tm-request-timeout", 30*time.Second, "TM send request timeout.") +} diff --git a/testdata/conf/seatago.yml b/testdata/conf/seatago.yml index e3889a8d6..b3097441d 100644 --- a/testdata/conf/seatago.yml +++ b/testdata/conf/seatago.yml @@ -100,9 +100,9 @@ seata: # Allow batch sending of requests (RM) enable-rm-client-batch-send-request: true # RM send request timeout - rpc-rm-request-timeout: 3s + rpc-rm-request-timeout: 30s # TM send request timeout - rpc-tm-request-timeout: 3s + rpc-tm-request-timeout: 30s # Configuration Center config: type: file