Skip to content

Commit

Permalink
optimize at commit branch (#346)
Browse files Browse the repository at this point in the history
* optimize at
  • Loading branch information
luky116 authored Nov 18, 2022
1 parent 902cd25 commit a16a38b
Show file tree
Hide file tree
Showing 17 changed files with 137 additions and 37 deletions.
1 change: 0 additions & 1 deletion pkg/datasource/sql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type Conn struct {
autoCommit bool
dbName string
dbType types.DBType
superConn *Conn
}

// ResetSession is called prior to executing a query on the connection
Expand Down
3 changes: 1 addition & 2 deletions pkg/datasource/sql/conn_at.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (c *ATConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,
c.txCtx.TxOpt = opts

if tm.IsGlobalTx(ctx) {
c.txCtx.XaID = tm.GetXID(ctx)
c.txCtx.XID = tm.GetXID(ctx)
c.txCtx.TransType = types.ATMode
}

Expand All @@ -145,7 +145,6 @@ func (c *ATConn) createOnceTxContext(ctx context.Context) bool {
if onceTx {
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.XaID = tm.GetXID(ctx)
c.txCtx.XID = tm.GetXID(ctx)
c.txCtx.TransType = types.ATMode
c.txCtx.GlobalLockRequire = true
Expand Down
12 changes: 6 additions & 6 deletions pkg/datasource/sql/conn_at_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ func TestATConn_ExecContext(t *testing.T) {
t.Logf("set xid=%s", tm.GetXID(ctx))

beforeHook := func(_ context.Context, execCtx *types.ExecContext) {
t.Logf("on exec xid=%s", execCtx.TxCtx.XaID)
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XaID)
t.Logf("on exec xid=%s", execCtx.TxCtx.XID)
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XID)
assert.Equal(t, types.ATMode, execCtx.TxCtx.TransType)
}
mi.before = beforeHook
Expand All @@ -111,7 +111,7 @@ func TestATConn_ExecContext(t *testing.T) {

t.Run("not xid", func(t *testing.T) {
mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, "", execCtx.TxCtx.XaID)
assert.Equal(t, "", execCtx.TxCtx.XID)
assert.Equal(t, types.Local, execCtx.TxCtx.TransType)
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func TestATConn_BeginTx(t *testing.T) {
assert.NoError(t, err)

mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, "", execCtx.TxCtx.XaID)
assert.Equal(t, "", execCtx.TxCtx.XID)
assert.Equal(t, types.Local, execCtx.TxCtx.TransType)
}

Expand All @@ -174,7 +174,7 @@ func TestATConn_BeginTx(t *testing.T) {
assert.NoError(t, err)

mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, "", execCtx.TxCtx.XaID)
assert.Equal(t, "", execCtx.TxCtx.XID)
assert.Equal(t, types.Local, execCtx.TxCtx.TransType)
}

Expand Down Expand Up @@ -202,7 +202,7 @@ func TestATConn_BeginTx(t *testing.T) {
assert.NoError(t, err)

mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XaID)
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XID)
assert.Equal(t, types.ATMode, execCtx.TxCtx.TransType)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/conn_xa.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (c *XAConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx,

if tm.IsGlobalTx(ctx) {
c.txCtx.TransType = types.XAMode
c.txCtx.XaID = tm.GetXID(ctx)
c.txCtx.XID = tm.GetXID(ctx)
}

tx, err := c.Conn.BeginTx(ctx, opts)
Expand All @@ -91,7 +91,7 @@ func (c *XAConn) createOnceTxContext(ctx context.Context) bool {
if onceTx {
c.txCtx = types.NewTxCtx()
c.txCtx.DBType = c.res.dbType
c.txCtx.XaID = tm.GetXID(ctx)
c.txCtx.XID = tm.GetXID(ctx)
c.txCtx.TransType = types.XAMode
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/datasource/sql/conn_xa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func TestXAConn_ExecContext(t *testing.T) {
t.Logf("set xid=%s", tm.GetXID(ctx))

before := func(_ context.Context, execCtx *types.ExecContext) {
t.Logf("on exec xid=%s", execCtx.TxCtx.XaID)
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XaID)
t.Logf("on exec xid=%s", execCtx.TxCtx.XID)
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XID)
assert.Equal(t, types.XAMode, execCtx.TxCtx.TransType)
}
mi.before = before
Expand All @@ -163,7 +163,7 @@ func TestXAConn_ExecContext(t *testing.T) {

t.Run("not xid", func(t *testing.T) {
before := func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, "", execCtx.TxCtx.XaID)
assert.Equal(t, "", execCtx.TxCtx.XID)
assert.Equal(t, types.Local, execCtx.TxCtx.TransType)
}
mi.before = before
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestXAConn_BeginTx(t *testing.T) {
assert.NoError(t, err)

mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, "", execCtx.TxCtx.XaID)
assert.Equal(t, "", execCtx.TxCtx.XID)
assert.Equal(t, types.Local, execCtx.TxCtx.TransType)
}

Expand All @@ -228,7 +228,7 @@ func TestXAConn_BeginTx(t *testing.T) {
assert.NoError(t, err)

mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, "", execCtx.TxCtx.XaID)
assert.Equal(t, "", execCtx.TxCtx.XID)
assert.Equal(t, types.Local, execCtx.TxCtx.TransType)
}

Expand Down Expand Up @@ -256,7 +256,7 @@ func TestXAConn_BeginTx(t *testing.T) {
assert.NoError(t, err)

mi.before = func(_ context.Context, execCtx *types.ExecContext) {
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XaID)
assert.Equal(t, tm.GetXID(ctx), execCtx.TxCtx.XID)
assert.Equal(t, types.XAMode, execCtx.TxCtx.TransType)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/hook/logger_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (h *loggerSQLHook) Before(ctx context.Context, execCtx *types.ExecContext)
}
fields := []zap.Field{
zap.String("tx-id", txID),
zap.String("xid", execCtx.TxCtx.XaID),
zap.String("xid", execCtx.TxCtx.XID),
zap.String("sql", execCtx.Query),
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/datasource/sql/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (tx *Tx) register(ctx *types.TransactionContext) error {
lockKey += k + ";"
}
request := rm.BranchRegisterParam{
Xid: ctx.XaID,
Xid: ctx.XID,
BranchType: ctx.TransType.GetBranchType(),
ResourceId: ctx.ResourceID,
LockKeys: lockKey,
Expand All @@ -176,7 +176,7 @@ func (tx *Tx) report(success bool) error {
}
status := getStatus(success)
request := message.BranchReportRequest{
Xid: tx.tranCtx.XaID,
Xid: tx.tranCtx.XID,
BranchId: int64(tx.tranCtx.BranchID),
ResourceId: tx.tranCtx.ResourceID,
Status: status,
Expand All @@ -187,7 +187,7 @@ func (tx *Tx) report(success bool) error {
err := dataSourceManager.BranchReport(context.Background(), request)
if err != nil {
retry--
log.Infof("Failed to report [%s / %s] commit done [%s] Retry Countdown: %s", tx.tranCtx.BranchID, tx.tranCtx.XaID, success, retry)
log.Infof("Failed to report [%s / %s] commit done [%s] Retry Countdown: %s", tx.tranCtx.BranchID, tx.tranCtx.XID, success, retry)
if retry == 0 {
log.Infof("Failed to report branch status: %s", err.Error())
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/types/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type RecordImage struct {
// Rows data row
Rows []RowImage `json:"rows"`
// TableMeta table information schema
TableMeta TableMeta
TableMeta TableMeta `json:"-"`
}

// RowImage Mirror data information information
Expand Down
13 changes: 12 additions & 1 deletion pkg/datasource/sql/types/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ func (m TableMeta) IsEmpty() bool {
return m.TableName == ""
}

func (m TableMeta) GetPrimaryKeyMap() map[string]ColumnMeta {
pk := make(map[string]ColumnMeta)
for _, index := range m.Indexs {
if index.IType == IndexTypePrimaryKey {
for _, column := range index.Columns {
pk[column.ColumnName] = column
}
}
}
return pk
}

func (m TableMeta) GetPrimaryKeyOnlyName() []string {
keys := make([]string, 0)
for _, index := range m.Indexs {
Expand All @@ -104,6 +116,5 @@ func (m TableMeta) GetPrimaryKeyOnlyName() []string {
}
}
}

return keys
}
66 changes: 66 additions & 0 deletions pkg/datasource/sql/types/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,69 @@ const (
SQLTypeDropIndex
SQLTypeMulti
)

func (s SQLType) MarshalText() (text []byte, err error) {
switch s {
case SQLTypeSelect:
return []byte("SELECT"), nil
case SQLTypeInsert:
return []byte("INSERT"), nil
case SQLTypeUpdate:
return []byte("UPDATE"), nil
case SQLTypeDelete:
return []byte("DELETE"), nil
case SQLTypeSelectForUpdate:
return []byte("SELECT_FOR_UPDATE"), nil
case SQLTypeReplace:
return []byte("REPLACE"), nil
case SQLTypeTruncate:
return []byte("TRUNCATE"), nil
case SQLTypeCreate:
return []byte("CREATE"), nil
case SQLTypeDrop:
return []byte("DROP"), nil
case SQLTypeLoad:
return []byte("LOAD"), nil
case SQLTypeMerge:
return []byte("MERGE"), nil
case SQLTypeShow:
return []byte("SHOW"), nil
case SQLTypeAlter:
return []byte("ALTER"), nil
case SQLTypeRename:
return []byte("RENAME"), nil
case SQLTypeDump:
return []byte("DUMP"), nil
case SQLTypeDebug:
return []byte("DEBUG"), nil
case SQLTypeExplain:
return []byte("EXPLAIN"), nil
case SQLTypeDesc:
return []byte("DESC"), nil
case SQLTypeSet:
return []byte("SET"), nil
case SQLTypeReload:
return []byte("RELOAD"), nil
case SQLTypeSelectUnion:
return []byte("SELECT_UNION"), nil
case SQLTypeCreateTable:
return []byte("CREATE_TABLE"), nil
case SQLTypeDropTable:
return []byte("DROP_TABLE"), nil
case SQLTypeAlterTable:
return []byte("ALTER_TABLE"), nil
case SQLTypeSelectFromUpdate:
return []byte("SELECT_FROM_UPDATE"), nil
case SQLTypeMultiDelete:
return []byte("MULTI_DELETE"), nil
case SQLTypeMultiUpdate:
return []byte("MULTI_UPDATE"), nil
case SQLTypeCreateIndex:
return []byte("CREATE_INDEX"), nil
case SQLTypeDropIndex:
return []byte("DROP_INDEX"), nil
case SQLTypeMulti:
return []byte("MULTI"), nil
}
return []byte("INVALID_SQLTYPE"), nil
}
24 changes: 22 additions & 2 deletions pkg/datasource/sql/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package types

import (
"database/sql/driver"
"fmt"
"strings"

"github.com/seata/seata-go/pkg/protocol/branch"
Expand All @@ -42,6 +43,27 @@ const (
IndexTypePrimaryKey IndexType = 1
)

func (i IndexType) MarshalText() (text []byte, err error) {
switch i {
case IndexTypePrimaryKey:
return []byte("PRIMARY_KEY"), nil
}
return []byte("NULL"), nil
}

func (i *IndexType) UnmarshalText(text []byte) error {
switch string(text) {
case "PRIMARY_KEY":
*i = IndexTypePrimaryKey
return nil
case "NULL":
*i = IndexTypeNull
return nil
default:
return fmt.Errorf("invalid index type")
}
}

const (
_ DBType = iota
DBTypeUnknown
Expand Down Expand Up @@ -109,8 +131,6 @@ type TransactionContext struct {
ResourceID string
// BranchID transaction branch unique id
BranchID uint64
// XaID XA id
XaID string // todo delete
// XID global transaction id
XID string
// GlobalLockRequire
Expand Down
8 changes: 4 additions & 4 deletions pkg/datasource/sql/undo/base/undo.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
ErrorDeleteUndoLogParamsFault = errors.New("xid or branch_id can't nil")
)

const (
var (
checkUndoLogTableExistSql = "SELECT 1 FROM " + constant.UndoLogTableName + " LIMIT 1"
insertUndoLogSql = "INSERT INTO " + constant.UndoLogTableName + "(branch_id,xid,context,rollback_info,log_status,log_created,log_modified) VALUES (?, ?, ?, ?, ?, now(6), now(6))"
)
Expand Down Expand Up @@ -89,7 +89,7 @@ func (m *BaseUndoLogManager) InsertUndoLog(record undo.UndologRecord, conn drive
if err != nil {
return err
}
_, err = stmt.Exec([]driver.Value{record.BranchID, record.XID, record.Context, record.RollbackInfo, record.LogStatus})
_, err = stmt.Exec([]driver.Value{record.BranchID, record.XID, record.Context, record.RollbackInfo, int64(record.LogStatus)})
if err != nil {
return err
}
Expand Down Expand Up @@ -187,15 +187,15 @@ func (m *BaseUndoLogManager) FlushUndoLog(tranCtx *types.TransactionContext, con
}

// use defalut encode
undoLogContent, err := json.Marshal(branchUndoLog)
rollbackInfo, err := json.Marshal(branchUndoLog)
if err != nil {
return err
}

parseContext := make(map[string]string, 0)
parseContext[SerializerKey] = "jackson"
parseContext[CompressorTypeKey] = "NONE"
rollbackInfo, err := json.Marshal(parseContext)
undoLogContent, err := json.Marshal(parseContext)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/undo/builder/basic_undo_log_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ func (b *BasicUndoLogBuilder) buildRecordImages(rowsi driver.Rows, tableMetaData
columnMeta := tableMetaData.Columns[name]

keyType := types.IndexTypeNull
if data, ok := tableMetaData.Indexs[name]; ok {
keyType = data.IType
if _, ok := tableMetaData.GetPrimaryKeyMap()[name]; ok {
keyType = types.IndexTypePrimaryKey
}
jdbcType := types.GetJDBCTypeByTypeName(columnMeta.ColumnTypeInfo.DatabaseTypeName())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (u *MySQLMultiUpdateUndoLogBuilder) BeforeImage(ctx context.Context, execCt
return nil, err
}

image.SQLType = execCtx.ParseContext.SQLType

return []*types.RecordImage{image}, nil
}

Expand Down Expand Up @@ -130,6 +132,7 @@ func (u *MySQLMultiUpdateUndoLogBuilder) AfterImage(ctx context.Context, execCtx
return nil, err
}

image.SQLType = execCtx.ParseContext.SQLType
return []*types.RecordImage{image}, nil
}

Expand Down
Loading

0 comments on commit a16a38b

Please sign in to comment.