Skip to content

Commit

Permalink
feat: add RM config (apache#412)
Browse files Browse the repository at this point in the history
* feat: add rm config
  • Loading branch information
liiibpm authored Dec 31, 2022
1 parent 857a35b commit 04d1523
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 4 deletions.
4 changes: 3 additions & 1 deletion pkg/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/seata/seata-go/pkg/remoting/getty"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/rm/tcc"
"github.com/seata/seata-go/pkg/tm"
"github.com/seata/seata-go/pkg/util/flagext"
Expand All @@ -51,13 +52,14 @@ const (

type ClientConfig struct {
TmConfig tm.TmConfig `yaml:"tm" json:"tm,omitempty" koanf:"tm"`
RmConfig rm.Config `yaml:"rm" json:"rm,omitempty" koanf:"rm"`
}

func (c *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
// TODO: RmConf RegisterFlagsWithPrefix
// TODO: Undo RegisterFlagsWithPrefix
// TODO: LoadBalance RegisterFlagsWithPrefix
c.TmConfig.RegisterFlagsWithPrefix(prefix+".tm", f)
c.RmConfig.RegisterFlagsWithPrefix(prefix+".rm", f)
}

type Config struct {
Expand Down
36 changes: 33 additions & 3 deletions pkg/client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,20 @@ func TestLoadPath(t *testing.T) {
assert.Equal(t, time.Second*10, cfg.ClientConfig.TmConfig.DegradeCheckAllowTimes)
assert.Equal(t, -2147482648, cfg.ClientConfig.TmConfig.InterceptorOrder)

assert.Equal(t, 10000, cfg.ClientConfig.RmConfig.AsyncCommitBufferLimit)
assert.Equal(t, 5, cfg.ClientConfig.RmConfig.ReportRetryCount)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.TableMetaCheckEnable)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.ReportSuccessEnable)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaBranchRegisterEnable)
assert.Equal(t, "fastjson", cfg.ClientConfig.RmConfig.SagaJsonParser)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaCompensatePersistModeUpdate)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaRetryPersistModeUpdate)
assert.Equal(t, -2147482648, cfg.ClientConfig.RmConfig.TccActionInterceptorOrder)
assert.Equal(t, "druid", cfg.ClientConfig.RmConfig.SqlParserType)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict)

assert.NotNil(t, cfg.GettyConfig)
assert.NotNil(t, cfg.GettyConfig.SessionConfig)
assert.Equal(t, 0, cfg.GettyConfig.ReconnectInterval)
Expand Down Expand Up @@ -81,13 +95,29 @@ func TestLoadPath(t *testing.T) {
}

func TestLoadJson(t *testing.T) {
confJson := `{"client":{"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},
"transport":{"shutdown":{"wait":"3s"}, "type":"TCP", "server":"NIO", "heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"}}`
confJson := `{"client":{"rm":{"async-commit-buffer-limit":10000,"report-retry-count":5,"table-meta-check-enable":false,"report-success-enable":false,"saga-branch-register-enable":false,"saga-json-parser":"fastjson","saga-retry-persist-mode-update":false,"saga-compensate-persist-mode-update":false,"tcc-action-interceptor-order":-2147482648,"sql-parser-type":"druid","lock":{"retry-interval":10,"retry-times":"30s","retry-policy-branch-rollback-on-conflict":true}},
"tm":{"commit-retry-count":5,"rollback-retry-count":5,"default-global-transaction-timeout":"60s","degrade-check":false,"degrade-check-period":2000,"degrade-check-allow-times":"10s","interceptor-order":-2147482648}},
"tcc":{"fence":{"log-table-name":"tcc_fence_log_test2","clean-period":80000000000}},
"getty":{"reconnect-interval":1,"connection-num":10,"heartbeat-period":"10s","session":{"compress-encoding":true,"tcp-no-delay":false,"tcp-keep-alive":false,"keep-alive-period":"120s","tcp-r-buf-size":261120,"tcp-w-buf-size":32768,"tcp-read-timeout":"2s","tcp-write-timeout":"8s","wait-timeout":"2s","max-msg-len":261120,"session-name":"client_test","cron-period":"2s"}},
"transport":{"shutdown":{"wait":"3s"},"type":"TCP","server":"NIO","heartbeat":true,"serialization":"seata","compressor":"none"," enable-tm-client-batch-send-request":false,"enable-rm-client-batch-send-request":true,"rpc-rm-request-timeout":"30s","rpc-tm-request-timeout":"30s"}}`

cfg := LoadJson([]byte(confJson))
assert.NotNil(t, cfg)

assert.Equal(t, 10000, cfg.ClientConfig.RmConfig.AsyncCommitBufferLimit)
assert.Equal(t, 5, cfg.ClientConfig.RmConfig.ReportRetryCount)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.TableMetaCheckEnable)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.ReportSuccessEnable)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaBranchRegisterEnable)
assert.Equal(t, "fastjson", cfg.ClientConfig.RmConfig.SagaJsonParser)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaCompensatePersistModeUpdate)
assert.Equal(t, false, cfg.ClientConfig.RmConfig.SagaRetryPersistModeUpdate)
assert.Equal(t, -2147482648, cfg.ClientConfig.RmConfig.TccActionInterceptorOrder)
assert.Equal(t, "druid", cfg.ClientConfig.RmConfig.SqlParserType)
assert.Equal(t, 10, cfg.ClientConfig.RmConfig.LockConfig.RetryInterval)
assert.Equal(t, time.Second*30, cfg.ClientConfig.RmConfig.LockConfig.RetryTimes)
assert.Equal(t, true, cfg.ClientConfig.RmConfig.LockConfig.RetryPolicyBranchRollbackOnConflict)

assert.NotNil(t, cfg.TCCConfig)
assert.NotNil(t, cfg.TCCConfig.FenceConfig)
assert.Equal(t, "tcc_fence_log_test2", cfg.TCCConfig.FenceConfig.LogTableName)
Expand Down
63 changes: 63 additions & 0 deletions pkg/rm/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rm

import (
"flag"
"time"
)

type Config struct {
AsyncCommitBufferLimit int `yaml:"async-commit-buffer-limit" json:"async-commit-buffer-limit,omitempty" koanf:"async-commit-buffer-limit"`
ReportRetryCount int `yaml:"report-retry-count" json:"report-retry-count,omitempty" koanf:"report-retry-count"`
TableMetaCheckEnable bool `yaml:"table-meta-check-enable" json:"table-meta-check-enable" koanf:"table-meta-check-enable"`
ReportSuccessEnable bool `yaml:"report-success-enable" json:"report-success-enable,omitempty" koanf:"report-success-enable"`
SagaBranchRegisterEnable bool `yaml:"saga-branch-register-enable" json:"saga-branch-register-enable,omitempty" koanf:"saga-branch-register-enable"`
SagaJsonParser string `yaml:"saga-json-parser" json:"saga-json-parser,omitempty" koanf:"saga-json-parser"`
SagaRetryPersistModeUpdate bool `yaml:"saga-retry-persist-mode-update" json:"saga-retry-persist-mode-update,omitempty" koanf:"saga-retry-persist-mode-update"`
SagaCompensatePersistModeUpdate bool `yaml:"saga-compensate-persist-mode-update" json:"saga-compensate-persist-mode-update,omitempty" koanf:"saga-compensate-persist-mode-update"`
TccActionInterceptorOrder int `yaml:"tcc-action-interceptor-order" json:"tcc-action-interceptor-order,omitempty" koanf:"tcc-action-interceptor-order"`
SqlParserType string `yaml:"sql-parser-type" json:"sql-parser-type,omitempty" koanf:"sql-parser-type"`
LockConfig LockConfig `yaml:"lock" json:"lock,omitempty" koanf:"lock"`
}

type LockConfig struct {
RetryInterval int `yaml:"retry-interval" json:"retry-interval,omitempty" koanf:"retry-interval"`
RetryTimes time.Duration `yaml:"retry-times" json:"retry-times,omitempty" koanf:"retry-times"`
RetryPolicyBranchRollbackOnConflict bool `yaml:"retry-policy-branch-rollback-on-conflict" json:"retry-policy-branch-rollback-on-conflict,omitempty" koanf:"retry-policy-branch-rollback-on-conflict"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.AsyncCommitBufferLimit, prefix+".async-commit-buffer-limit", 10000, "The Maximum cache length of asynchronous queue.")
f.IntVar(&cfg.ReportRetryCount, prefix+".report-retry-count", 5, "The maximum number of retries when report reports the status.")
f.BoolVar(&cfg.TableMetaCheckEnable, prefix+".table-meta-check-enable", false, "Whether to check the metadata of the db(AT).")
f.BoolVar(&cfg.ReportSuccessEnable, prefix+".report-success-enable", false, "Whether to report the status if the transaction is successfully executed(AT)")
f.BoolVar(&cfg.SagaBranchRegisterEnable, prefix+".saga-branch-register-enable", false, "Whether to allow regular check of db metadata(AT)")
f.StringVar(&cfg.SagaJsonParser, prefix+".saga-json-parser", "fastjson", "The saga JsonParser.")
f.BoolVar(&cfg.SagaRetryPersistModeUpdate, prefix+".saga-retry-persist-mode-update", false, "Whether to retry SagaRetryPersistModeUpdate")
f.BoolVar(&cfg.SagaCompensatePersistModeUpdate, prefix+".saga-compensate-persist-mode-update", false, "")
f.IntVar(&cfg.TccActionInterceptorOrder, prefix+".tcc-action-interceptor-order", -2147482648, "The order of tccActionInterceptor.")
f.StringVar(&cfg.SqlParserType, prefix+".sql-parser-type", "druid", "The type of sql parser.")
cfg.LockConfig.RegisterFlagsWithPrefix(prefix, f)
}

func (cfg *LockConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.RetryInterval, prefix+".retry-interval", 10, "The maximum number of retries when lock fail.")
f.DurationVar(&cfg.RetryTimes, prefix+".retry-times", 30*time.Second, "The duration allowed for lock retrying.")
f.BoolVar(&cfg.RetryPolicyBranchRollbackOnConflict, prefix+".retry-policy-branch-rollback-on-conflict", true, "The switch for lock conflict.")
}

0 comments on commit 04d1523

Please sign in to comment.