Skip to content

Commit

Permalink
feature: add transport config (apache#406)
Browse files Browse the repository at this point in the history
* add transport config
  • Loading branch information
Chovyyyyyy authored and georgehao committed Feb 3, 2023
1 parent 116950d commit d322c84
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 18 deletions.
2 changes: 1 addition & 1 deletion conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ seata:
heartbeat: true
# the client batch send request enable
enableClientBatchSendRequest: true
compressor: nome
compressor: none
service:


Expand Down
4 changes: 2 additions & 2 deletions conf/seata_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ seata:
# Allow batch sending of requests (RM)
enable-rm-client-batch-send-request: true
# RM send request timeout
rpc-rm-request-timeout: 3s
rpc-rm-request-timeout: 30s
# TM send request timeout
rpc-tm-request-timeout: 3s
rpc-tm-request-timeout: 30s
# Configuration Center
config:
type: file
Expand Down
8 changes: 5 additions & 3 deletions pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,17 @@ func (c *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
}

type Config struct {
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
TCCConfig tcc.Config `yaml:"tcc" json:"tcc" koanf:"tcc"`
ClientConfig ClientConfig `yaml:"client" json:"client" koanf:"client"`
GettyConfig getty.Config `yaml:"getty" json:"getty" koanf:"getty"`
TransportConfig getty.TransportConfig `yaml:"transport" json:"transport" koanf:"transport"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.TCCConfig.FenceConfig.RegisterFlagsWithPrefix("tcc", f)
c.ClientConfig.RegisterFlagsWithPrefix("client", f)
c.GettyConfig.RegisterFlagsWithPrefix("getty", f)
c.TransportConfig.RegisterFlagsWithPrefix("transport", f)
}

type loaderConf struct {
Expand Down
30 changes: 29 additions & 1 deletion pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,28 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName)
assert.Equal(t, time.Second, cfg.GettyConfig.SessionConfig.CronPeriod)

assert.NotNil(t, cfg.TransportConfig)
assert.NotNil(t, cfg.TransportConfig.ShutdownConfig)
assert.Equal(t, time.Second*3, cfg.TransportConfig.ShutdownConfig.Wait)
assert.Equal(t, "TCP", cfg.TransportConfig.Type)
assert.Equal(t, "NIO", cfg.TransportConfig.Server)
assert.Equal(t, true, cfg.TransportConfig.Heartbeat)
assert.Equal(t, "seata", cfg.TransportConfig.Serialization)
assert.Equal(t, "none", cfg.TransportConfig.Compressor)
assert.Equal(t, false, cfg.TransportConfig.EnableTmClientBatchSendRequest)
assert.Equal(t, true, cfg.TransportConfig.EnableRmClientBatchSendRequest)
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCRmRequestTimeout)
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCTmRequestTimeout)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}

func TestLoadJson(t *testing.T) {
confJson := `{"client":{"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}}}`
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},
"transport":{"shutdown":{"wait":"3s"}, "type":"TCP", "server":"NIO", "heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"}}`

cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)

Expand Down Expand Up @@ -106,6 +121,19 @@ func TestLoadJson(t *testing.T) {
assert.Equal(t, "client_test", cfg.GettyConfig.SessionConfig.SessionName)
assert.Equal(t, time.Second*2, cfg.GettyConfig.SessionConfig.CronPeriod)

assert.NotNil(t, cfg.TransportConfig)
assert.NotNil(t, cfg.TransportConfig.ShutdownConfig)
assert.Equal(t, time.Second*3, cfg.TransportConfig.ShutdownConfig.Wait)
assert.Equal(t, "TCP", cfg.TransportConfig.Type)
assert.Equal(t, "NIO", cfg.TransportConfig.Server)
assert.Equal(t, true, cfg.TransportConfig.Heartbeat)
assert.Equal(t, "seata", cfg.TransportConfig.Serialization)
assert.Equal(t, "none", cfg.TransportConfig.Compressor)
assert.Equal(t, false, cfg.TransportConfig.EnableTmClientBatchSendRequest)
assert.Equal(t, true, cfg.TransportConfig.EnableRmClientBatchSendRequest)
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCRmRequestTimeout)
assert.Equal(t, time.Second*30, cfg.TransportConfig.RPCTmRequestTimeout)

// reset flag.CommandLine
flag.CommandLine = flag.NewFlagSet(os.Args[0], flag.ExitOnError)
}
4 changes: 2 additions & 2 deletions pkg/config/seata_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestInit(t *testing.T) {
assert.Equal(t, DefaultSeataConf.Seata.Transport.Compressor, "none")
assert.Equal(t, DefaultSeataConf.Seata.Transport.EnableTmClientBatchSendRequest, false)
assert.Equal(t, DefaultSeataConf.Seata.Transport.EnableRmClientBatchSendRequest, true)
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCRmRequestTimeout, time.Duration(3_000_000_000))
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCTmRequestTimeout, time.Duration(3_000_000_000))
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCRmRequestTimeout, time.Duration(30_000_000_000))
assert.Equal(t, DefaultSeataConf.Seata.Transport.RPCTmRequestTimeout, time.Duration(30_000_000_000))

// config
assert.Equal(t, DefaultSeataConf.Seata.Config.Type, "file")
Expand Down
8 changes: 4 additions & 4 deletions pkg/datasource/sql/exec/at/delete_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type deleteExecutor struct {
execContent *types.ExecContext
}

//NewDeleteExecutor get delete executor
// NewDeleteExecutor get delete executor
func NewDeleteExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
return &deleteExecutor{parserCtx: parserCtx, execContent: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}

//ExecContext exec SQL, and generate before image and after image
// ExecContext exec SQL, and generate before image and after image
func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNamedValue) (types.ExecResult, error) {
d.beforeHooks(ctx, d.execContent)
defer func() {
Expand All @@ -72,7 +72,7 @@ func (d deleteExecutor) ExecContext(ctx context.Context, f exec.CallbackWithName
return res, nil
}

//beforeImage build before image
// beforeImage build before image
func (d *deleteExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
selectSQL, selectArgs, err := d.buildBeforeImageSQL(d.execContent.Query, d.execContent.NamedValues)
if err != nil {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (d *deleteExecutor) buildBeforeImageSQL(query string, args []driver.NamedVa
return sql, d.buildSelectArgs(&selStmt, args), nil
}

//afterImage build after image
// afterImage build after image
func (d *deleteExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {
tableName, _ := d.parserCtx.GteTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, d.execContent.DBName, tableName)
Expand Down
6 changes: 3 additions & 3 deletions pkg/datasource/sql/exec/at/insert_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type insertExecutor struct {
businesSQLResult types.ExecResult
}

//NewInsertExecutor get insert executor
// NewInsertExecutor get insert executor
func NewInsertExecutor(parserCtx *types.ParseContext, execContent *types.ExecContext, hooks []exec.SQLHook) executor {
return &insertExecutor{parserCtx: parserCtx, execContent: execContent, baseExecutor: baseExecutor{hooks: hooks}}
}
Expand Down Expand Up @@ -76,7 +76,7 @@ func (i *insertExecutor) ExecContext(ctx context.Context, f exec.CallbackWithNam
return res, nil
}

//beforeImage build before image
// beforeImage build before image
func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, error) {
tableName, _ := i.parserCtx.GteTableName()
metaData, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, i.execContent.DBName, tableName)
Expand All @@ -86,7 +86,7 @@ func (i *insertExecutor) beforeImage(ctx context.Context) (*types.RecordImage, e
return types.NewEmptyRecordImage(metaData, types.SQLTypeInsert), nil
}

//afterImage build after image
// afterImage build after image
func (i *insertExecutor) afterImage(ctx context.Context) (*types.RecordImage, error) {
if !i.isAstStmtValid() {
return nil, nil
Expand Down
34 changes: 34 additions & 0 deletions pkg/remoting/getty/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,44 @@ type Config struct {
SessionConfig SessionConfig `yaml:"session" json:"session" koanf:"session"`
}

type ShutdownConfig struct {
Wait time.Duration `yaml:"wait" json:"wait" konaf:"wait"`
}

type TransportConfig struct {
ShutdownConfig ShutdownConfig `yaml:"shutdown" json:"shutdown" koanf:"shutdown"`
Type string `yaml:"type" json:"type" koanf:"type"`
Server string `yaml:"server" json:"server" koanf:"server"`
Heartbeat bool `yaml:"heartbeat" json:"heartbeat" koanf:"heartbeat"`
Serialization string `yaml:"serialization" json:"serialization" koanf:"serialization"`
Compressor string `yaml:"compressor" json:"compressor" koanf:"compressor"`
EnableTmClientBatchSendRequest bool `yaml:"enable-tm-client-batch-send-request" json:"enable-tm-client-batch-send-request" koanf:"enable-tm-client-batch-send-request"`
EnableRmClientBatchSendRequest bool `yaml:"enable-rm-client-batch-send-request" json:"enable-rm-client-batch-send-request" koanf:"enable-rm-client-batch-send-request"`
RPCRmRequestTimeout time.Duration `yaml:"rpc-rm-request-timeout" json:"rpc-rm-request-timeout" koanf:"rpc-rm-request-timeout"`
RPCTmRequestTimeout time.Duration `yaml:"rpc-tm-request-timeout" json:"rpc-tm-request-timeout" koanf:"rpc-tm-request-timeout"`
}

// RegisterFlagsWithPrefix for Config.
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ReconnectInterval, prefix+".reconnect-interval", 0, "Reconnect interval.")
f.IntVar(&cfg.ConnectionNum, prefix+".connection-num", 16, "The getty_session pool.")
f.DurationVar(&cfg.HeartbeatPeriod, prefix+".heartbeat-period", 15*time.Second, "The heartbeat period.")
cfg.SessionConfig.RegisterFlagsWithPrefix(prefix+".session", f)
}

func (cfg *ShutdownConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.Wait, prefix+".wait", 3*time.Second, "Shutdown wait time.")
}

func (cfg *TransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
cfg.ShutdownConfig.RegisterFlagsWithPrefix(prefix+".shutdown", f)
f.StringVar(&cfg.Type, prefix+".type", "TCP", "Transport protocol type.")
f.StringVar(&cfg.Server, prefix+".server", "NIO", "Server type.")
f.BoolVar(&cfg.Heartbeat, prefix+".heartbeat", true, "Heartbeat.")
f.StringVar(&cfg.Serialization, prefix+".serialization", "seata", "Encoding and decoding mode.")
f.StringVar(&cfg.Compressor, prefix+".compressor", "none", "Message compression mode.")
f.BoolVar(&cfg.EnableTmClientBatchSendRequest, prefix+".enable-tm-client-batch-send-request", false, "Allow batch sending of requests (TM).")
f.BoolVar(&cfg.EnableRmClientBatchSendRequest, prefix+".enable-rm-client-batch-send-request", true, "Allow batch sending of requests (RM).")
f.DurationVar(&cfg.RPCRmRequestTimeout, prefix+".rpc-rm-request-timeout", 30*time.Second, "RM send request timeout.")
f.DurationVar(&cfg.RPCTmRequestTimeout, prefix+".rpc-tm-request-timeout", 30*time.Second, "TM send request timeout.")
}
4 changes: 2 additions & 2 deletions testdata/conf/seatago.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ seata:
# Allow batch sending of requests (RM)
enable-rm-client-batch-send-request: true
# RM send request timeout
rpc-rm-request-timeout: 3s
rpc-rm-request-timeout: 30s
# TM send request timeout
rpc-tm-request-timeout: 3s
rpc-tm-request-timeout: 30s
# Configuration Center
config:
type: file
Expand Down

0 comments on commit d322c84

Please sign in to comment.