diff --git a/go.mod b/go.mod index f14164372..4f0ef02c3 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.16 require ( dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220508105316-b27ec53b7bab github.com/BurntSushi/toml v1.1.0 // indirect + github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/apache/dubbo-getty v1.4.8 github.com/dubbogo/gost v1.12.5 diff --git a/go.sum b/go.sum index d28d015c4..a2dee49ee 100644 --- a/go.sum +++ b/go.sum @@ -56,6 +56,7 @@ github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbi github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/CloudyKit/fastprinter v0.0.0-20170127035650-74b38d55f37a/go.mod h1:EFZQ978U7x8IRnstaskI3IysnWY5Ao3QgZUKOXlsAdw= github.com/CloudyKit/jet v2.1.3-0.20180809161101-62edd43e4f88+incompatible/go.mod h1:HPYO+50pSWkPoj9Q/eq0aRGByCL6ScRlUmiEX5Zgm+w= +github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= diff --git a/pkg/common/error/error.go b/pkg/common/error/error.go deleted file mode 100644 index ceccdd7f5..000000000 --- a/pkg/common/error/error.go +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package error - -import "github.com/pkg/errors" - -var ( - ErrorTooManySessions = errors.New("too many seeessions") - ErrorHeartBeatTimeOut = errors.New("heart beat time out") -) - -type TransactionExceptionCode byte - -const ( - /** - * Unknown transaction exception code. - */ - TransactionExceptionCodeUnknown = TransactionExceptionCode(0) - - /** - * BeginFailed - */ - TransactionExceptionCodeBeginFailed = TransactionExceptionCode(1) - - /** - * Lock key conflict transaction exception code. - */ - TransactionExceptionCodeLockKeyConflict = TransactionExceptionCode(2) - - /** - * Io transaction exception code. - */ - IO = TransactionExceptionCode(3) - /** - * Branch rollback failed retriable transaction exception code. - */ - TransactionExceptionCodeBranchRollbackFailedRetriable = TransactionExceptionCode(4) - - /** - * Branch rollback failed unretriable transaction exception code. - */ - TransactionExceptionCodeBranchRollbackFailedUnretriable = TransactionExceptionCode(5) - - /** - * Branch register failed transaction exception code. - */ - TransactionExceptionCodeBranchRegisterFailed = TransactionExceptionCode(6) - - /** - * Branch report failed transaction exception code. - */ - TransactionExceptionCodeBranchReportFailed = TransactionExceptionCode(7) - - /** - * Lockable check failed transaction exception code. - */ - TransactionExceptionCodeLockableCheckFailed = TransactionExceptionCode(8) - - /** - * Branch transaction not exist transaction exception code. - */ - TransactionExceptionCodeBranchTransactionNotExist = TransactionExceptionCode(9) - - /** - * Global transaction not exist transaction exception code. - */ - TransactionExceptionCodeGlobalTransactionNotExist = TransactionExceptionCode(10) - - /** - * Global transaction not active transaction exception code. - */ - TransactionExceptionCodeGlobalTransactionNotActive = TransactionExceptionCode(11) - - /** - * Global transaction status invalid transaction exception code. - */ - TransactionExceptionCodeGlobalTransactionStatusInvalid = TransactionExceptionCode(12) - - /** - * Failed to send branch commit request transaction exception code. - */ - TransactionExceptionCodeFailedToSendBranchCommitRequest = TransactionExceptionCode(13) - - /** - * Failed to send branch rollback request transaction exception code. - */ - TransactionExceptionCodeFailedToSendBranchRollbackRequest = TransactionExceptionCode(14) - - /** - * Failed to add branch transaction exception code. - */ - TransactionExceptionCodeFailedToAddBranch = TransactionExceptionCode(15) - - /** - * Failed to lock global transaction exception code. - */ - TransactionExceptionCodeFailedLockGlobalTranscation = TransactionExceptionCode(16) - - /** - * FailedWriteSession - */ - TransactionExceptionCodeFailedWriteSession = TransactionExceptionCode(17) - - /** - * Failed to holder exception code - */ - FailedStore = TransactionExceptionCode(18) - - /** - * Lock key conflict fail fast transaction exception code. - */ - LockKeyConflictFailFast = TransactionExceptionCode(19) -) - -type TransactionException struct { - Code TransactionExceptionCode - Message string -} - -func (e TransactionException) Error() string { - return "TransactionException: " + e.Message -} diff --git a/pkg/common/errors/error.go b/pkg/common/errors/error.go new file mode 100644 index 000000000..f1b6efd06 --- /dev/null +++ b/pkg/common/errors/error.go @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package errors + +import ( + "fmt" + + "github.com/pkg/errors" +) + +var ( + ErrorTooManySessions = errors.New("too many sessions") + ErrorHeartBeatTimeOut = errors.New("heart beat time out") +) + +type TransactionError struct { + Code byte + Message string +} + +func (e TransactionError) Error() string { + return fmt.Sprintf("TransactionError code %d, msg %s", e.Code, e.Message) +} + +type TccFenceError struct { + Code TransactionErrorCode + Message string + Parent error +} + +func (e TccFenceError) Error() string { + return fmt.Sprintf("TccFenceError code %d, msg %s, parent msg is %s", e.Code, e.Message, e.Parent) +} + +func NewTccFenceError(code TransactionErrorCode, msg string, parent error) *TccFenceError { + return &TccFenceError{ + code, + msg, + parent, + } +} diff --git a/pkg/common/errors/error_code.go b/pkg/common/errors/error_code.go new file mode 100644 index 000000000..edf4628a7 --- /dev/null +++ b/pkg/common/errors/error_code.go @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package errors + +type TransactionErrorCode int32 + +const ( + // TransactionErrorCodeUnknown Unknown transaction errors code. + TransactionErrorCodeUnknown = TransactionErrorCode(0) + + // TransactionErrorCodeBeginFailed BeginFailed + TransactionErrorCodeBeginFailed = TransactionErrorCode(1) + + // TransactionErrorCodeLockKeyConflict Lock key conflict transaction errors code. + TransactionErrorCodeLockKeyConflict = TransactionErrorCode(2) + + // Io transaction errors code. + IO = TransactionErrorCode(3) + + // TransactionErrorCodeBranchRollbackFailedRetriable Branch rollback failed retriable transaction errors code. + TransactionErrorCodeBranchRollbackFailedRetriable = TransactionErrorCode(4) + + // TransactionErrorCodeBranchRollbackFailedUnretriable Branch rollback failed unretriable transaction errors code. + TransactionErrorCodeBranchRollbackFailedUnretriable = TransactionErrorCode(5) + + // TransactionErrorCodeBranchRegisterFailed Branch register failed transaction errors code. + TransactionErrorCodeBranchRegisterFailed = TransactionErrorCode(6) + + // TransactionErrorCodeBranchReportFailed Branch report failed transaction errors code. + TransactionErrorCodeBranchReportFailed = TransactionErrorCode(7) + + // TransactionErrorCodeLockableCheckFailed Lockable check failed transaction errors code. + TransactionErrorCodeLockableCheckFailed = TransactionErrorCode(8) + + // TransactionErrorCodeBranchTransactionNotExist Branch transaction not exist transaction errors code. + TransactionErrorCodeBranchTransactionNotExist = TransactionErrorCode(9) + + // TransactionErrorCodeGlobalTransactionNotExist Global transaction not exist transaction errors code. + TransactionErrorCodeGlobalTransactionNotExist = TransactionErrorCode(10) + + // TransactionErrorCodeGlobalTransactionNotActive Global transaction not active transaction errors code. + TransactionErrorCodeGlobalTransactionNotActive = TransactionErrorCode(11) + + // TransactionErrorCodeGlobalTransactionStatusInvalid Global transaction status invalid transaction errors code. + TransactionErrorCodeGlobalTransactionStatusInvalid = TransactionErrorCode(12) + + // TransactionErrorCodeFailedToSendBranchCommitRequest Failed to send branch commit request transaction errors code. + TransactionErrorCodeFailedToSendBranchCommitRequest = TransactionErrorCode(13) + + // TransactionErrorCodeFailedToSendBranchRollbackRequest Failed to send branch rollback request transaction errors code. + TransactionErrorCodeFailedToSendBranchRollbackRequest = TransactionErrorCode(14) + + // TransactionErrorCodeFailedToAddBranch Failed to add branch transaction errors code. + TransactionErrorCodeFailedToAddBranch = TransactionErrorCode(15) + + // TransactionErrorCodeFailedLockGlobalTranscation Failed to lock global transaction errors code. + TransactionErrorCodeFailedLockGlobalTranscation = TransactionErrorCode(16) + + // TransactionErrorCodeFailedWriteSession FailedWriteSession + TransactionErrorCodeFailedWriteSession = TransactionErrorCode(17) + + // FailedStore Failed to holder errors code + FailedStore = TransactionErrorCode(18) + + // LockKeyConflictFailFast Lock key conflict fail fast transaction exception code. + LockKeyConflictFailFast = TransactionErrorCode(19) + + // TccFenceDbDuplicateKeyError Insert tcc fence record duplicate key errors + TccFenceDbDuplicateKeyError = TransactionErrorCode(20) + + // RollbackFenceError rollback tcc fence error + RollbackFenceError = TransactionErrorCode(21) + + // CommitFenceError commit tcc fence error + CommitFenceError = TransactionErrorCode(22) + + // TccFenceDbError query tcc fence prepare sql failed + TccFenceDbError = TransactionErrorCode(23) + + // PrepareFenceError prepare tcc fence error + PrepareFenceError = TransactionErrorCode(24) + + // FenceBusinessError callback business method maybe return this error type + FenceBusinessError = TransactionErrorCode(26) + + // FencePhaseError have fence phase but is not illegal value + FencePhaseError = TransactionErrorCode(27) +) diff --git a/pkg/protocol/codec/branch_commit_response_codec.go b/pkg/protocol/codec/branch_commit_response_codec.go index 734a131b4..aa4220443 100644 --- a/pkg/protocol/codec/branch_commit_response_codec.go +++ b/pkg/protocol/codec/branch_commit_response_codec.go @@ -20,8 +20,9 @@ package codec import ( "math" + serror "github.com/seata/seata-go/pkg/common/errors" + "github.com/seata/seata-go/pkg/common/bytes" - serror "github.com/seata/seata-go/pkg/common/error" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -41,7 +42,7 @@ func (g *BranchCommitResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString8Length(buf) } - data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf)) data.Xid = bytes.ReadString16Length(buf) data.BranchId = int64(bytes.ReadUInt64(buf)) data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf)) @@ -61,7 +62,7 @@ func (g *BranchCommitResponseCodec) Encode(in interface{}) []byte { } bytes.WriteString8Length(msg, buf) } - buf.WriteByte(byte(data.TransactionExceptionCode)) + buf.WriteByte(byte(data.TransactionErrorCode)) bytes.WriteString16Length(data.Xid, buf) buf.WriteInt64(data.BranchId) buf.WriteByte(byte(data.BranchStatus)) diff --git a/pkg/protocol/codec/branch_commit_response_codec_test.go b/pkg/protocol/codec/branch_commit_response_codec_test.go index 6e5b0a615..51193434a 100644 --- a/pkg/protocol/codec/branch_commit_response_codec_test.go +++ b/pkg/protocol/codec/branch_commit_response_codec_test.go @@ -20,12 +20,11 @@ package codec import ( "testing" - serror "github.com/seata/seata-go/pkg/common/error" + "github.com/stretchr/testify/assert" + serror "github.com/seata/seata-go/pkg/common/errors" model2 "github.com/seata/seata-go/pkg/protocol/branch" - "github.com/seata/seata-go/pkg/protocol/message" - "github.com/stretchr/testify/assert" ) func TestBranchCommitResponseCodec(t *testing.T) { @@ -35,7 +34,7 @@ func TestBranchCommitResponseCodec(t *testing.T) { BranchId: 56678, BranchStatus: model2.BranchStatusPhaseoneFailed, AbstractTransactionResponse: message.AbstractTransactionResponse{ - TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed, + TransactionErrorCode: serror.TransactionErrorCodeBeginFailed, AbstractResultMessage: message.AbstractResultMessage{ ResultCode: message.ResultCodeFailed, Msg: "FAILED", diff --git a/pkg/protocol/codec/branch_register_response_codec.go b/pkg/protocol/codec/branch_register_response_codec.go index cb12db1c5..dff976e63 100644 --- a/pkg/protocol/codec/branch_register_response_codec.go +++ b/pkg/protocol/codec/branch_register_response_codec.go @@ -21,7 +21,7 @@ import ( "math" "github.com/seata/seata-go/pkg/common/bytes" - serror "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/errors" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -40,7 +40,7 @@ func (g *BranchRegisterResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString16Length(buf) } - data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf)) data.BranchId = int64(bytes.ReadUInt64(buf)) return data @@ -58,7 +58,7 @@ func (c *BranchRegisterResponseCodec) Encode(in interface{}) []byte { } bytes.WriteString16Length(msg, buf) } - buf.WriteByte(byte(data.TransactionExceptionCode)) + buf.WriteByte(byte(data.TransactionErrorCode)) buf.WriteInt64(data.BranchId) return buf.Bytes() diff --git a/pkg/protocol/codec/branch_register_response_codec_test.go b/pkg/protocol/codec/branch_register_response_codec_test.go index 8a961cacb..1643f7151 100644 --- a/pkg/protocol/codec/branch_register_response_codec_test.go +++ b/pkg/protocol/codec/branch_register_response_codec_test.go @@ -20,7 +20,7 @@ package codec import ( "testing" - serror "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/errors" "github.com/seata/seata-go/pkg/protocol/message" "github.com/stretchr/testify/assert" @@ -33,7 +33,7 @@ func TestBranchRegisterResponseCodec(t *testing.T) { ResultCode: message.ResultCodeFailed, Msg: "FAILED", }, - TransactionExceptionCode: serror.TransactionExceptionCodeUnknown, + TransactionErrorCode: serror.TransactionErrorCodeUnknown, }, BranchId: 124356567, } diff --git a/pkg/protocol/codec/branch_rollback_response_codec.go b/pkg/protocol/codec/branch_rollback_response_codec.go index e4cb9ee6c..a8bf12d75 100644 --- a/pkg/protocol/codec/branch_rollback_response_codec.go +++ b/pkg/protocol/codec/branch_rollback_response_codec.go @@ -21,7 +21,7 @@ import ( "math" "github.com/seata/seata-go/pkg/common/bytes" - serror "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/errors" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -41,7 +41,7 @@ func (g *BranchRollbackResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString8Length(buf) } - data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf)) data.Xid = bytes.ReadString16Length(buf) data.BranchId = int64(bytes.ReadUInt64(buf)) data.BranchStatus = branch.BranchStatus(bytes.ReadByte(buf)) @@ -61,7 +61,7 @@ func (g *BranchRollbackResponseCodec) Encode(in interface{}) []byte { } bytes.WriteString8Length(msg, buf) } - buf.WriteByte(byte(data.TransactionExceptionCode)) + buf.WriteByte(byte(data.TransactionErrorCode)) bytes.WriteString16Length(data.Xid, buf) buf.WriteInt64(data.BranchId) buf.WriteByte(byte(data.BranchStatus)) diff --git a/pkg/protocol/codec/branch_rollback_response_codec_test.go b/pkg/protocol/codec/branch_rollback_response_codec_test.go index 0059ca691..620cb5c30 100644 --- a/pkg/protocol/codec/branch_rollback_response_codec_test.go +++ b/pkg/protocol/codec/branch_rollback_response_codec_test.go @@ -20,7 +20,7 @@ package codec import ( "testing" - serror "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/errors" model2 "github.com/seata/seata-go/pkg/protocol/branch" @@ -35,7 +35,7 @@ func TestBranchRollbackResponseCodec(t *testing.T) { BranchId: 56678, BranchStatus: model2.BranchStatusPhaseoneFailed, AbstractTransactionResponse: message.AbstractTransactionResponse{ - TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed, + TransactionErrorCode: serror.TransactionErrorCodeBeginFailed, AbstractResultMessage: message.AbstractResultMessage{ ResultCode: message.ResultCodeFailed, Msg: "FAILED", diff --git a/pkg/protocol/codec/common_global_end_response_codec.go b/pkg/protocol/codec/common_global_end_response_codec.go index cb9f521c4..4542d6b8b 100644 --- a/pkg/protocol/codec/common_global_end_response_codec.go +++ b/pkg/protocol/codec/common_global_end_response_codec.go @@ -21,7 +21,7 @@ import ( "math" "github.com/seata/seata-go/pkg/common/bytes" - serror "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/errors" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -40,7 +40,7 @@ func (c *CommonGlobalEndResponseCodec) Encode(in interface{}) []byte { } bytes.WriteString16Length(msg, buf) } - buf.WriteByte(byte(data.TransactionExceptionCode)) + buf.WriteByte(byte(data.TransactionErrorCode)) buf.WriteByte(byte(data.GlobalStatus)) return buf.Bytes() @@ -54,7 +54,7 @@ func (c *CommonGlobalEndResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString16Length(buf) } - data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf)) data.GlobalStatus = message.GlobalStatus(bytes.ReadByte(buf)) return data diff --git a/pkg/protocol/codec/global_begin_response_codec.go b/pkg/protocol/codec/global_begin_response_codec.go index 52eb66ce2..4dd808797 100644 --- a/pkg/protocol/codec/global_begin_response_codec.go +++ b/pkg/protocol/codec/global_begin_response_codec.go @@ -21,7 +21,7 @@ import ( "math" "github.com/seata/seata-go/pkg/common/bytes" - serror "github.com/seata/seata-go/pkg/common/error" + serror "github.com/seata/seata-go/pkg/common/errors" "github.com/seata/seata-go/pkg/protocol/message" ) @@ -44,7 +44,7 @@ func (c *GlobalBeginResponseCodec) Encode(in interface{}) []byte { } bytes.WriteString16Length(msg, buf) } - buf.WriteByte(byte(data.TransactionExceptionCode)) + buf.WriteByte(byte(data.TransactionErrorCode)) bytes.WriteString16Length(data.Xid, buf) bytes.WriteString16Length(string(data.ExtraData), buf) @@ -59,7 +59,7 @@ func (g *GlobalBeginResponseCodec) Decode(in []byte) interface{} { if data.ResultCode == message.ResultCodeFailed { data.Msg = bytes.ReadString16Length(buf) } - data.TransactionExceptionCode = serror.TransactionExceptionCode(bytes.ReadByte(buf)) + data.TransactionErrorCode = serror.TransactionErrorCode(bytes.ReadByte(buf)) data.Xid = bytes.ReadString16Length(buf) data.ExtraData = []byte(bytes.ReadString16Length(buf)) diff --git a/pkg/protocol/codec/global_begin_response_codec_test.go b/pkg/protocol/codec/global_begin_response_codec_test.go index 2b645fbac..28c262f43 100644 --- a/pkg/protocol/codec/global_begin_response_codec_test.go +++ b/pkg/protocol/codec/global_begin_response_codec_test.go @@ -20,10 +20,10 @@ package codec import ( "testing" - serror "github.com/seata/seata-go/pkg/common/error" + "github.com/stretchr/testify/assert" + serror "github.com/seata/seata-go/pkg/common/errors" "github.com/seata/seata-go/pkg/protocol/message" - "github.com/stretchr/testify/assert" ) func TestGlobalBeginResponseCodec(t *testing.T) { @@ -33,7 +33,7 @@ func TestGlobalBeginResponseCodec(t *testing.T) { ResultCode: message.ResultCodeFailed, Msg: "FAILED", }, - TransactionExceptionCode: serror.TransactionExceptionCodeBeginFailed, + TransactionErrorCode: serror.TransactionErrorCodeBeginFailed, }, Xid: "test-transaction-id", diff --git a/pkg/protocol/message/response_message.go b/pkg/protocol/message/response_message.go index fc23c34a8..988d4c3c4 100644 --- a/pkg/protocol/message/response_message.go +++ b/pkg/protocol/message/response_message.go @@ -18,13 +18,13 @@ package message import ( - serror "github.com/seata/seata-go/pkg/common/error" + "github.com/seata/seata-go/pkg/common/errors" model2 "github.com/seata/seata-go/pkg/protocol/branch" ) type AbstractTransactionResponse struct { AbstractResultMessage - TransactionExceptionCode serror.TransactionExceptionCode + TransactionErrorCode errors.TransactionErrorCode } type AbstractBranchEndResponse struct { diff --git a/pkg/remoting/processor/client/rm_branch_commit_processor.go b/pkg/remoting/processor/client/rm_branch_commit_processor.go index f92b72eff..08749d5c5 100644 --- a/pkg/remoting/processor/client/rm_branch_commit_processor.go +++ b/pkg/remoting/processor/client/rm_branch_commit_processor.go @@ -52,7 +52,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, branchResource) if err != nil { - log.Infof("branch commit error: %s", err.Error()) + log.Errorf("branch commit error: %s", err.Error()) return err } log.Infof("branch commit success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) @@ -69,7 +69,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag } // reply commit response to tc server - // todo add TransactionExceptionCode + // todo add TransactionErrorCode response := message.BranchCommitResponse{ AbstractBranchEndResponse: message.AbstractBranchEndResponse{ AbstractTransactionResponse: message.AbstractTransactionResponse{ diff --git a/pkg/remoting/processor/client/rm_branch_rollback_processor.go b/pkg/remoting/processor/client/rm_branch_rollback_processor.go index 4d230edd1..6d3fda422 100644 --- a/pkg/remoting/processor/client/rm_branch_rollback_processor.go +++ b/pkg/remoting/processor/client/rm_branch_rollback_processor.go @@ -52,7 +52,7 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess } status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, branchResource) if err != nil { - log.Infof("branch rollback error: %s", err.Error()) + log.Errorf("branch rollback error: %s", err.Error()) return err } log.Infof("branch rollback success: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData) diff --git a/pkg/rm/tcc/fence/config/tcc_fence_config.go b/pkg/rm/tcc/fence/config/tcc_fence_config.go new file mode 100644 index 000000000..d66f167e0 --- /dev/null +++ b/pkg/rm/tcc/fence/config/tcc_fence_config.go @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "go.uber.org/atomic" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/handler" +) + +type TccFenceConfig struct { + Initialized atomic.Bool `default:"false"` + LogTableName string `default:"tcc_fence_log"` +} + +func InitFence() { + // todo implement +} + +func InitCleanTask() { + handler.GetFenceHandler().InitLogCleanChannel() +} + +func Destroy() { + handler.GetFenceHandler().DestroyLogCleanChannel() +} diff --git a/pkg/rm/tcc/fence/enum/fence_phase.go b/pkg/rm/tcc/fence/enum/fence_phase.go new file mode 100644 index 000000000..c056100df --- /dev/null +++ b/pkg/rm/tcc/fence/enum/fence_phase.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package enum + +// FencePhase used to mark the stage of a tcc transaction +type FencePhase byte + +const ( + // FencePhaseNotExist fence phase not exist + FencePhaseNotExist = FencePhase(0) + + // FencePhasePrepare prepare fence phase + FencePhasePrepare = FencePhase(1) + + // FencePhaseCommit commit fence phase + FencePhaseCommit = FencePhase(2) + + // FencePhaseRollback rollback fence phase + FencePhaseRollback = FencePhase(3) +) diff --git a/pkg/rm/tcc/fence/enum/fence_status.go b/pkg/rm/tcc/fence/enum/fence_status.go new file mode 100644 index 000000000..a19f8cb28 --- /dev/null +++ b/pkg/rm/tcc/fence/enum/fence_status.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package enum + +// FenceStatus Used to mark the state of a branch transaction +type FenceStatus byte + +const ( + // StatusTried phase 1: the commit tried. + StatusTried = FenceStatus(1) + + // StatusCommitted phase 2: the committed. + StatusCommitted = FenceStatus(2) + + // StatusRollbacked phase 2: the rollbacked. + StatusRollbacked = FenceStatus(3) + + // StatusSuspended suspended status. + StatusSuspended = FenceStatus(4) +) diff --git a/pkg/rm/tcc/fence/fence_api.go b/pkg/rm/tcc/fence/fence_api.go new file mode 100644 index 000000000..74a58416a --- /dev/null +++ b/pkg/rm/tcc/fence/fence_api.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fence + +import ( + "context" + "database/sql" + "fmt" + + "github.com/seata/seata-go/pkg/common/errors" + "github.com/seata/seata-go/pkg/common/log" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/rm/tcc/fence/handler" + "github.com/seata/seata-go/pkg/tm" +) + +// WithFence This method is a suspended API interface that asserts the phase timing of a transaction +// and performs corresponding database operations to ensure transaction consistency +// case 1: if fencePhase is FencePhaseNotExist, will return a fence not found error. +// case 2: if fencePhase is FencePhasePrepare, will do prepare fence operation. +// case 3: if fencePhase is FencePhaseCommit, will do commit fence operation. +// case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation. +// case 5: if fencePhase not in above case, will return a fence phase illegal error. +func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) { + fp := tm.GetFencePhase(ctx) + h := handler.GetFenceHandler() + + switch { + case fp == enum.FencePhaseNotExist: + err = errors.NewTccFenceError( + errors.FencePhaseError, + fmt.Sprintf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)), + nil, + ) + case fp == enum.FencePhasePrepare: + err = h.PrepareFence(ctx, tx, callback) + case fp == enum.FencePhaseCommit: + err = h.CommitFence(ctx, tx, callback) + case fp == enum.FencePhaseRollback: + err = h.RollbackFence(ctx, tx, callback) + default: + err = errors.NewTccFenceError( + errors.FencePhaseError, + fmt.Sprintf("fence phase: %v illegal", fp), + nil, + ) + } + + if err != nil { + log.Error(err) + } + + return +} diff --git a/pkg/rm/tcc/fence/fence_api_test.go b/pkg/rm/tcc/fence/fence_api_test.go new file mode 100644 index 000000000..8255fda33 --- /dev/null +++ b/pkg/rm/tcc/fence/fence_api_test.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fence + +import ( + "context" + "database/sql" + "fmt" + "testing" + + "github.com/seata/seata-go/pkg/common/errors" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/assert" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/tm" +) + +func TestWithFence(t *testing.T) { + tests := []struct { + xid string + txName string + fencePhase enum.FencePhase + bac tm.BusinessActionContext + callback func() error + wantErr bool + errStr string + wantCommit bool + wantRollback bool + }{ + { + xid: "123", + txName: "test", + callback: func() error { + return nil + }, + wantErr: true, + errStr: errors.NewTccFenceError( + errors.FencePhaseError, + fmt.Sprintf("xid 123, tx name test, fence phase not exist"), + nil, + ).Error(), + }, + } + + for _, v := range tests { + db, mock, _ := sqlmock.New() + mock.ExpectBegin() + if v.wantCommit { + mock.ExpectCommit() + } + if v.wantRollback { + mock.ExpectRollback() + } + ctx := context.Background() + ctx = tm.InitSeataContext(ctx) + tm.SetXID(ctx, v.xid) + tm.SetTxName(ctx, v.txName) + tm.SetFencePhase(ctx, v.fencePhase) + tm.SetBusinessActionContext(ctx, &v.bac) + tx, _ := db.BeginTx(ctx, &sql.TxOptions{}) + + if v.wantErr { + assert.Equal(t, v.errStr, WithFence(ctx, tx, v.callback).Error()) + } else { + assert.Nil(t, WithFence(ctx, tx, v.callback)) + } + } +} diff --git a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go new file mode 100644 index 000000000..5ed7e5334 --- /dev/null +++ b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package handler + +import ( + "container/list" + "context" + "database/sql" + "fmt" + "sync" + "time" + + seataErrors "github.com/seata/seata-go/pkg/common/errors" + "github.com/seata/seata-go/pkg/common/log" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/dao" + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model" + "github.com/seata/seata-go/pkg/tm" +) + +type tccFenceWrapperHandler struct { + tccFenceDao dao.TCCFenceStore + logQueue chan *FenceLogIdentity + logCache list.List + logQueueOnce sync.Once + logQueueCloseOnce sync.Once +} + +type FenceLogIdentity struct { + xid string + branchId int64 +} + +const ( + maxQueueSize = 500 +) + +var ( + fenceHandler *tccFenceWrapperHandler + fenceOnce sync.Once +) + +func GetFenceHandler() *tccFenceWrapperHandler { + if fenceHandler == nil { + fenceOnce.Do(func() { + fenceHandler = &tccFenceWrapperHandler{ + tccFenceDao: dao.GetTccFenceStoreDatabaseMapper(), + } + }) + } + return fenceHandler +} + +func (handler *tccFenceWrapperHandler) PrepareFence(ctx context.Context, tx *sql.Tx, callback func() error) error { + xid := tm.GetBusinessActionContext(ctx).Xid + branchId := tm.GetBusinessActionContext(ctx).BranchId + actionName := tm.GetBusinessActionContext(ctx).ActionName + + err := handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusTried) + if err != nil { + dbError, ok := err.(seataErrors.TccFenceError) + if ok && dbError.Code == seataErrors.TccFenceDbDuplicateKeyError { + // todo add clean command to channel. + handler.pushCleanChannel(xid, branchId) + } + + return seataErrors.NewTccFenceError( + seataErrors.PrepareFenceError, + fmt.Sprintf("insert tcc fence record errors, prepare fence failed. xid= %s, branchId= %d", xid, branchId), + err, + ) + } + + log.Info("the phase 1 callback method will be called.") + err = callback() + if err != nil { + return seataErrors.NewTccFenceError( + seataErrors.FenceBusinessError, + fmt.Sprintf("the business method error msg of: %p", callback), + err, + ) + } + + return nil +} + +func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql.Tx, callback func() error) error { + xid := tm.GetBusinessActionContext(ctx).Xid + branchId := tm.GetBusinessActionContext(ctx).BranchId + + fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) + if err != nil { + return seataErrors.NewTccFenceError(seataErrors.CommitFenceError, + fmt.Sprintf(" commit fence method failed. xid= %s, branchId= %d ", xid, branchId), + err, + ) + } + if fenceDo == nil { + return seataErrors.NewTccFenceError(seataErrors.CommitFenceError, + fmt.Sprintf("tcc fence record not exists, commit fence method failed. xid= %s, branchId= %d ", xid, branchId), + err, + ) + } + + if fenceDo.Status == enum.StatusCommitted { + log.Infof("branch transaction has already committed before. idempotency rejected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + return nil + } + if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { + // enable warn level + log.Warnf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) + return seataErrors.NewTccFenceError(seataErrors.CommitFenceError, + fmt.Sprintf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status), + nil, + ) + } + + return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusCommitted) +} + +func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx, callback func() error) error { + xid := tm.GetBusinessActionContext(ctx).Xid + branchId := tm.GetBusinessActionContext(ctx).BranchId + actionName := tm.GetBusinessActionContext(ctx).ActionName + fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) + + if err != nil { + return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError, + fmt.Sprintf(" rollback fence method failed. xid= %s, branchId= %d ", xid, branchId), + err, + ) + } + + // record is null, mean the need suspend + if fenceDo == nil { + err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended) + if err != nil { + return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError, + fmt.Sprintf("insert tcc fence suspend record error, rollback fence method failed. xid= %s, branchId= %d", xid, branchId), + err, + ) + } + log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId) + return nil + } + + // have rollbacked or suspended + if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { + // enable warn level + log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) + return nil + } + if fenceDo.Status == enum.StatusCommitted { + log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + return seataErrors.NewTccFenceError(seataErrors.RollbackFenceError, + fmt.Sprintf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status), + err, + ) + } + + return handler.updateFenceStatusAndInvokeCallback(tx, callback, xid, branchId, enum.StatusRollbacked) +} + +func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error { + tccFenceDo := model.TCCFenceDO{ + Xid: xid, + BranchId: branchId, + ActionName: actionName, + Status: status, + } + return handler.tccFenceDao.InsertTCCFenceDO(tx, &tccFenceDo) +} + +func (handler *tccFenceWrapperHandler) updateFenceStatusAndInvokeCallback(tx *sql.Tx, callback func() error, xid string, branchId int64, status enum.FenceStatus) error { + if err := handler.tccFenceDao.UpdateTCCFenceDO(tx, xid, branchId, enum.StatusTried, status); err != nil { + return err + } + + log.Infof("the phase %d callback method will be called", status) + if err := callback(); err != nil { + return seataErrors.NewTccFenceError( + seataErrors.FenceBusinessError, + fmt.Sprintf("the business method error msg of: %p", callback), + err, + ) + } + + return nil +} + +func (handler *tccFenceWrapperHandler) InitLogCleanChannel() { + handler.logQueueOnce.Do(func() { + go handler.traversalCleanChannel() + }) +} + +func (handler *tccFenceWrapperHandler) DestroyLogCleanChannel() { + handler.logQueueCloseOnce.Do(func() { + close(handler.logQueue) + }) +} + +func (handler *tccFenceWrapperHandler) deleteFence(xid string, id int64) error { + // todo implement + return nil +} + +func (handler *tccFenceWrapperHandler) deleteFenceByDate(datetime time.Time) int32 { + // todo implement + return 0 +} + +func (handler *tccFenceWrapperHandler) pushCleanChannel(xid string, branchId int64) { + // todo implement + fli := &FenceLogIdentity{ + xid: xid, + branchId: branchId, + } + select { + case handler.logQueue <- fli: + // todo add batch delete from log cache. + default: + handler.logCache.PushBack(fli) + } + log.Infof("add one log to clean queue: %v ", fli) +} + +func (handler *tccFenceWrapperHandler) traversalCleanChannel() { + handler.logQueue = make(chan *FenceLogIdentity, maxQueueSize) + for li := range handler.logQueue { + if err := handler.deleteFence(li.xid, li.branchId); err != nil { + log.Errorf("delete fence log failed, xid: %s, branchId: &s", li.xid, li.branchId) + } + } +} diff --git a/pkg/rm/tcc/fence/store/db/dao/store_api.go b/pkg/rm/tcc/fence/store/db/dao/store_api.go new file mode 100644 index 000000000..8dd2a37f9 --- /dev/null +++ b/pkg/rm/tcc/fence/store/db/dao/store_api.go @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dao + +import ( + "database/sql" + "time" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model" +) + +// The TCC Fence Store +type TCCFenceStore interface { + + // QueryTCCFenceDO tcc fence do. + // param tx the tx will bind with user business method + // param xid the global transaction id + // param branchId the branch transaction id + // return the tcc fence do and error msg + QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error) + + // InsertTCCFenceDO tcc fence do boolean. + // param tx the tx will bind with user business method + // param tccFenceDO the tcc fence do + // return the error msg + InsertTCCFenceDO(tx *sql.Tx, tccFenceDo *model.TCCFenceDO) error + + // UpdateTCCFenceDO tcc fence do boolean. + // param tx the tx will bind with user business method + // param xid the global transaction id + // param branchId the branch transaction id + // param newStatus the new status + // return the error msg + UpdateTCCFenceDO(tx *sql.Tx, xid string, branchId int64, oldStatus enum.FenceStatus, newStatus enum.FenceStatus) error + + // DeleteTCCFenceDO tcc fence do boolean. + // param tx the tx will bind with user business method + // param xid the global transaction id + // param branchId the branch transaction id + // return the error msg + DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error + + // DeleteTCCFenceDOByMdfDate tcc fence by datetime. + // param tx the tx will bind with user business method + // param datetime modify time + // return the error msg + DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error + + // SetLogTableName LogTable Name + // param logTableName logTableName + SetLogTableName(logTable string) +} diff --git a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go new file mode 100644 index 000000000..da1a9aee4 --- /dev/null +++ b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dao + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + "github.com/go-sql-driver/mysql" + + "github.com/seata/seata-go/pkg/common/errors" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model" + sql2 "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/sql" +) + +var ( + once sync.Once + tccFenceStoreDatabaseMapper *TccFenceStoreDatabaseMapper +) + +func GetTccFenceStoreDatabaseMapper() *TccFenceStoreDatabaseMapper { + if tccFenceStoreDatabaseMapper == nil { + once.Do(func() { + tccFenceStoreDatabaseMapper = &TccFenceStoreDatabaseMapper{} + tccFenceStoreDatabaseMapper.InitLogTableName() + }) + } + return tccFenceStoreDatabaseMapper + +} + +func (t *TccFenceStoreDatabaseMapper) InitLogTableName() { + // todo get log table name from config + // set log table name + // default name is tcc_fence_log + t.logTableName = "tcc_fence_log" +} + +type TccFenceStoreDatabaseMapper struct { + logTableName string +} + +func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, branchId int64) (*model.TCCFenceDO, error) { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetQuerySQLByBranchIdAndXid(t.logTableName)) + if err != nil { + return nil, errors.NewTccFenceError(errors.TccFenceDbError, "query tcc fence prepare sql failed", err) + } + defer prepareStmt.Close() + + result := prepareStmt.QueryRow(xid, branchId) + var ( + actionName string + status enum.FenceStatus + gmtCreate time.Time + gmtModify time.Time + ) + + if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil { + // will return error, if rows is empty + if err.Error() == "sql: no rows in result set" { + return nil, errors.NewTccFenceError(errors.TccFenceDbError, "query tcc fence get scan row,no rows in result set", err) + } else { + return nil, errors.NewTccFenceError(errors.TccFenceDbError, "query tcc fence get scan row failed", err) + } + } + + tccFenceDo := &model.TCCFenceDO{ + Xid: xid, + BranchId: branchId, + ActionName: actionName, + Status: status, + GmtModified: gmtModify, + GmtCreate: gmtCreate, + } + return tccFenceDo, nil +} + +func (t *TccFenceStoreDatabaseMapper) InsertTCCFenceDO(tx *sql.Tx, tccFenceDo *model.TCCFenceDO) error { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetInsertLocalTCCLogSQL(t.logTableName)) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "insert tcc fence prepare sql failed", err) + } + defer prepareStmt.Close() + + timeNow := time.Now() + result, err := prepareStmt.Exec(tccFenceDo.Xid, tccFenceDo.BranchId, tccFenceDo.ActionName, tccFenceDo.Status, timeNow, timeNow) + if err != nil { + if mysqlError, ok := err.(*mysql.MySQLError); ok && mysqlError.Number == 1062 { + return errors.NewTccFenceError(errors.TccFenceDbDuplicateKeyError, + fmt.Sprintf("Insert tcc fence record duplicate key exception. xid= %s, branchId= %d", tccFenceDo.Xid, tccFenceDo.BranchId), + err) + } else { + return errors.NewTccFenceError(errors.TccFenceDbError, "insert tcc fence exec sql failed", err) + } + } + + affected, err := result.RowsAffected() + if err != nil || affected == 0 { + return errors.NewTccFenceError(errors.TccFenceDbError, "insert tcc fence get row affect failed", err) + } + + return nil +} + +func (t *TccFenceStoreDatabaseMapper) UpdateTCCFenceDO(tx *sql.Tx, xid string, branchId int64, oldStatus enum.FenceStatus, newStatus enum.FenceStatus) error { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetUpdateStatusSQLByBranchIdAndXid(t.logTableName)) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "update tcc fence prepare sql failed", err) + } + defer prepareStmt.Close() + + result, err := prepareStmt.Exec(newStatus, time.Now(), xid, branchId, oldStatus) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "update tcc fence exec sql failed", err) + } + + affected, err := result.RowsAffected() + if err != nil || affected == 0 { + return errors.NewTccFenceError(errors.TccFenceDbError, "update tcc fence get row affect failed", err) + } + + return nil +} + +func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDO(tx *sql.Tx, xid string, branchId int64) error { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByBranchIdAndXid(t.logTableName)) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence prepare sql failed ", err) + } + defer prepareStmt.Close() + + result, err := prepareStmt.Exec(xid, branchId) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence execute sql failed", err) + } + + affected, err := result.RowsAffected() + if err != nil || affected == 0 { + return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence get rows affected failed", err) + } + + return nil +} + +func (t *TccFenceStoreDatabaseMapper) DeleteTCCFenceDOByMdfDate(tx *sql.Tx, datetime time.Time) error { + prepareStmt, err := tx.PrepareContext(context.Background(), sql2.GetDeleteSQLByMdfDateAndStatus(t.logTableName)) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence prepare sql failed", err) + } + defer prepareStmt.Close() + + result, err := prepareStmt.Exec(datetime) + if err != nil { + return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence exec sql failed", err) + } + + affected, err := result.RowsAffected() + if err != nil || affected == 0 { + return errors.NewTccFenceError(errors.TccFenceDbError, "delete tcc fence get rows affected failed", err) + } + + return nil +} + +func (t *TccFenceStoreDatabaseMapper) SetLogTableName(logTable string) { + t.logTableName = logTable +} diff --git a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go new file mode 100644 index 000000000..e205a8ebe --- /dev/null +++ b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db_test.go @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package dao + +import ( + "context" + "database/sql" + "database/sql/driver" + "math" + "reflect" + "testing" + "time" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + _ "github.com/go-sql-driver/mysql" + "github.com/stretchr/testify/assert" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" + "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/model" + sql2 "github.com/seata/seata-go/pkg/rm/tcc/fence/store/db/sql" +) + +func TestTccFenceStoreDatabaseMapper_SetLogTableName(t *testing.T) { + GetTccFenceStoreDatabaseMapper().SetLogTableName("tcc_fence_log") + value := reflect.ValueOf(GetTccFenceStoreDatabaseMapper()) + assert.Equal(t, "tcc_fence_log", value.Elem().FieldByName("logTableName").String()) +} + +func TestTccFenceStoreDatabaseMapper_InsertTCCFenceDO(t *testing.T) { + tccFenceDo := &model.TCCFenceDO{ + Xid: "123123124124", + BranchId: 12312312312, + ActionName: "fence_test", + Status: enum.StatusSuspended, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GetInsertLocalTCCLogSQL("tcc_fence_log")). + ExpectExec(). + WithArgs(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId), driver.Value(tccFenceDo.ActionName), + driver.Value(tccFenceDo.Status), sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + + err = GetTccFenceStoreDatabaseMapper().InsertTCCFenceDO(tx, tccFenceDo) + tx.Commit() + assert.Equal(t, nil, err) +} + +func TestTccFenceStoreDatabaseMapper_QueryTCCFenceDO(t *testing.T) { + now := time.Now() + tccFenceDo := &model.TCCFenceDO{ + Xid: "123123124124", + BranchId: 12312312312, + ActionName: "fence_test", + Status: enum.StatusTried, + GmtCreate: now, + GmtModified: now, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GetQuerySQLByBranchIdAndXid("tcc_fence_log")). + ExpectQuery(). + WithArgs(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId)). + WillReturnRows(sqlmock.NewRows([]string{"xid", "branch_id", "action_name", "status", "gmt_create", "gmt_modified"}). + AddRow(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId), driver.Value(tccFenceDo.ActionName), + driver.Value(tccFenceDo.Status), driver.Value(now), driver.Value(now))) + mock.ExpectCommit() + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + + actualFenceDo, err := GetTccFenceStoreDatabaseMapper().QueryTCCFenceDO(tx, tccFenceDo.Xid, tccFenceDo.BranchId) + tx.Commit() + assert.Equal(t, tccFenceDo.Xid, actualFenceDo.Xid) + assert.Equal(t, tccFenceDo.BranchId, actualFenceDo.BranchId) + assert.Equal(t, tccFenceDo.Status, actualFenceDo.Status) + assert.Equal(t, tccFenceDo.ActionName, actualFenceDo.ActionName) + assert.NotEmpty(t, actualFenceDo.GmtModified) + assert.NotEmpty(t, actualFenceDo.GmtCreate) + assert.Nil(t, err) +} + +func TestTccFenceStoreDatabaseMapper_UpdateTCCFenceDO(t *testing.T) { + now := time.Now() + tccFenceDo := &model.TCCFenceDO{ + Xid: "123123124124", + BranchId: 12312312312, + ActionName: "fence_test", + Status: enum.StatusTried, + GmtCreate: now, + GmtModified: now, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GetUpdateStatusSQLByBranchIdAndXid("tcc_fence_log")). + ExpectExec(). + WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + + err = GetTccFenceStoreDatabaseMapper(). + UpdateTCCFenceDO(tx, tccFenceDo.Xid, tccFenceDo.BranchId, tccFenceDo.Status, enum.StatusCommitted) + tx.Commit() + assert.Equal(t, nil, err) +} + +func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDO(t *testing.T) { + now := time.Now() + tccFenceDo := &model.TCCFenceDO{ + Xid: "123123124124", + BranchId: 12312312312, + ActionName: "fence_test", + Status: enum.StatusTried, + GmtCreate: now, + GmtModified: now, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GetDeleteSQLByBranchIdAndXid("tcc_fence_log")). + ExpectExec(). + WithArgs(driver.Value(tccFenceDo.Xid), driver.Value(tccFenceDo.BranchId)). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + + err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDO(tx, tccFenceDo.Xid, tccFenceDo.BranchId) + tx.Commit() + assert.Equal(t, nil, err) +} + +func TestTccFenceStoreDatabaseMapper_DeleteTCCFenceDOByMdfDate(t *testing.T) { + now := time.Now() + tccFenceDo := &model.TCCFenceDO{ + GmtCreate: now, + } + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + if err != nil { + t.Fatalf("open db failed msg: %v", err) + } + defer db.Close() + mock.ExpectBegin() + mock.ExpectPrepare(sql2.GetDeleteSQLByMdfDateAndStatus("tcc_fence_log")). + ExpectExec(). + WithArgs(driver.Value(tccFenceDo.GmtModified.Add(math.MaxInt32))). + WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectCommit() + + tx, err := db.BeginTx(context.Background(), &sql.TxOptions{}) + if err != nil { + t.Fatalf("open conn failed msg :%v", err) + } + err = GetTccFenceStoreDatabaseMapper().DeleteTCCFenceDOByMdfDate(tx, tccFenceDo.GmtModified.Add(math.MaxInt32)) + tx.Commit() + assert.Equal(t, nil, err) +} diff --git a/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go b/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go new file mode 100644 index 000000000..9347a1c7d --- /dev/null +++ b/pkg/rm/tcc/fence/store/db/model/tcc_fence_do.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +import ( + "time" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" +) + +type TCCFenceDO struct { + + // Xid the global transaction id + Xid string + + // BranchId the branch transaction id + BranchId int64 + + // ActionName the action name + ActionName string + + // Status the tcc fence status + // tried: 1; committed: 2; rollbacked: 3; suspended: 4 + Status enum.FenceStatus + + // GmtCreate create time + GmtCreate time.Time + + // GmtModified update time + GmtModified time.Time +} diff --git a/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go b/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go new file mode 100644 index 000000000..4052fb57d --- /dev/null +++ b/pkg/rm/tcc/fence/store/db/sql/tcc_fence_store_sql.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sql + +import ( + "fmt" + "strconv" + + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" +) + +var ( + // localTccLogPlaced The enum LocalTccLogPlaced + localTccLogPlaced = " %s " + + // insertLocalTccLog The enum InsertLocalTccLog + insertLocalTccLog = "insert into " + localTccLogPlaced + " (xid, branch_id, action_name, status, gmt_create, gmt_modified) values ( ?,?,?,?,?,?)" + + // queryByBranchIdAndXid The enum QueryByBranchIdAndXid + queryByBranchIdAndXid = "select xid, branch_id, action_name, status, gmt_create, gmt_modified from " + localTccLogPlaced + " where xid = ? and branch_id = ? for update" + + // updateStatusByBranchIdAndXid The enum UpdateStatusByBranchIdAndXid + updateStatusByBranchIdAndXid = "update " + localTccLogPlaced + " set status = ?, gmt_modified = ? where xid = ? and branch_id = ? and status = ? " + + // deleteByBranchIdAndXid The enum DeleteByBranchIdAndXid + deleteByBranchIdAndXid = "delete from " + localTccLogPlaced + " where xid = ? and branch_id = ? " + + // deleteByDateAndStatus The enum DeleteByDateAndStatus + deleteByDateAndStatus = "delete from " + localTccLogPlaced + " where gmt_modified < ? and status in (" + strconv.Itoa(int(enum.StatusCommitted)) + " , " + strconv.Itoa(int(enum.StatusRollbacked)) + " , " + strconv.Itoa(int(enum.StatusSuspended)) + ")" +) + +func GetInsertLocalTCCLogSQL(localTccTable string) string { + return fmt.Sprintf(insertLocalTccLog, localTccTable) + +} + +func GetQuerySQLByBranchIdAndXid(localTccTable string) string { + return fmt.Sprintf(queryByBranchIdAndXid, localTccTable) +} + +func GetUpdateStatusSQLByBranchIdAndXid(localTccTable string) string { + return fmt.Sprintf(updateStatusByBranchIdAndXid, localTccTable) +} + +func GetDeleteSQLByBranchIdAndXid(localTccTable string) string { + return fmt.Sprintf(deleteByBranchIdAndXid, localTccTable) + +} + +func GetDeleteSQLByMdfDateAndStatus(localTccTable string) string { + return fmt.Sprintf(deleteByDateAndStatus, localTccTable) +} diff --git a/pkg/rm/tcc/tcc_resource.go b/pkg/rm/tcc/tcc_resource.go index 2a7f070fa..40b6dc6b2 100644 --- a/pkg/rm/tcc/tcc_resource.go +++ b/pkg/rm/tcc/tcc_resource.go @@ -27,6 +27,7 @@ import ( "github.com/seata/seata-go/pkg/common/log" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/rm" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" "github.com/seata/seata-go/pkg/tm" ) @@ -132,7 +133,15 @@ func (t *TCCResourceManager) BranchCommit(ctx context.Context, branchResource rm tccResource, _ = resource.(*TCCResource) } - _, err := tccResource.TwoPhaseAction.Commit(ctx, t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData)) + businessActionContext := t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData) + + // to set up the fence phase + ctx = tm.InitSeataContext(ctx) + tm.SetXID(ctx, branchResource.Xid) + tm.SetFencePhase(ctx, enum.FencePhaseCommit) + tm.SetBusinessActionContext(ctx, businessActionContext) + + _, err := tccResource.TwoPhaseAction.Commit(ctx, businessActionContext) if err != nil { return branch.BranchStatusPhasetwoCommitFailedRetryable, err } @@ -169,7 +178,15 @@ func (t *TCCResourceManager) BranchRollback(ctx context.Context, branchResource tccResource, _ = resource.(*TCCResource) } - _, err := tccResource.TwoPhaseAction.Rollback(ctx, t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData)) + businessActionContext := t.getBusinessActionContext(branchResource.Xid, branchResource.BranchId, branchResource.ResourceId, branchResource.ApplicationData) + + // to set up the fence phase + ctx = tm.InitSeataContext(ctx) + tm.SetXID(ctx, branchResource.Xid) + tm.SetFencePhase(ctx, enum.FencePhaseRollback) + tm.SetBusinessActionContext(ctx, businessActionContext) + + _, err := tccResource.TwoPhaseAction.Rollback(ctx, businessActionContext) if err != nil { return branch.BranchStatusPhasetwoRollbackFailedRetryable, err } diff --git a/pkg/rm/tcc/tcc_service.go b/pkg/rm/tcc/tcc_service.go index 0d0dcf1dd..af40d6242 100644 --- a/pkg/rm/tcc/tcc_service.go +++ b/pkg/rm/tcc/tcc_service.go @@ -25,12 +25,14 @@ import ( "time" "github.com/pkg/errors" + "github.com/seata/seata-go/pkg/common" "github.com/seata/seata-go/pkg/common/log" "github.com/seata/seata-go/pkg/common/net" "github.com/seata/seata-go/pkg/common/types" "github.com/seata/seata-go/pkg/protocol/branch" "github.com/seata/seata-go/pkg/rm" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" "github.com/seata/seata-go/pkg/tm" ) @@ -81,6 +83,9 @@ func (t *TCCServiceProxy) Prepare(ctx context.Context, params interface{}) (inte return nil, err } } + + // to set up the fence phase + tm.SetFencePhase(ctx, enum.FencePhasePrepare) return t.TCCResource.Prepare(ctx, params) } @@ -176,7 +181,7 @@ func (t *TCCServiceProxy) initBusinessActionContext(ctx context.Context, params // 1. null: create new BusinessActionContext // 2. tm.BusinessActionContext: return it // 3. *tm.BusinessActionContext: if nil then create new BusinessActionContext, else return it -// 4. Struct: if there is an attribute of businessactioncontext type and it is not nil, return it +// 4. Struct: if there is an attribute of businessactioncontext enum and it is not nil, return it // 5. else: create new BusinessActionContext func (t *TCCServiceProxy) getOrCreateBusinessActionContext(params interface{}) *tm.BusinessActionContext { if params == nil { @@ -222,7 +227,7 @@ func (t *TCCServiceProxy) getOrCreateBusinessActionContext(params interface{}) * return &tm.BusinessActionContext{} } -// obtainStructValueType check o is struct or pointer type +// obtainStructValueType check o is struct or pointer enum func obtainStructValueType(o interface{}) (bool, reflect.Value, reflect.Type) { v := reflect.ValueOf(o) t := reflect.TypeOf(o) diff --git a/pkg/tm/context.go b/pkg/tm/context.go index 3d8053ddd..9c14eb6cc 100644 --- a/pkg/tm/context.go +++ b/pkg/tm/context.go @@ -21,6 +21,7 @@ import ( "context" "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" ) type ContextParam string @@ -42,6 +43,7 @@ type ContextVariable struct { TxName string Xid string XidCopy string + FencePhase enum.FencePhase TxRole *GlobalTransactionRole BusinessActionContext *BusinessActionContext TxStatus *message.GlobalStatus @@ -157,3 +159,18 @@ func UnbindXid(ctx context.Context) { variable.(*ContextVariable).XidCopy = "" } } + +func SetFencePhase(ctx context.Context, phase enum.FencePhase) { + variable := ctx.Value(seataContextVariable) + if variable != nil { + variable.(*ContextVariable).FencePhase = phase + } +} + +func GetFencePhase(ctx context.Context) enum.FencePhase { + variable := ctx.Value(seataContextVariable) + if variable != nil { + return variable.(*ContextVariable).FencePhase + } + return enum.FencePhaseNotExist +} diff --git a/pkg/tm/context_test.go b/pkg/tm/context_test.go index f280f3b1d..adc6776db 100644 --- a/pkg/tm/context_test.go +++ b/pkg/tm/context_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/seata/seata-go/pkg/protocol/message" + "github.com/seata/seata-go/pkg/rm/tcc/fence/enum" ) func TestInitSeataContext(t *testing.T) { @@ -134,3 +135,19 @@ func TestUnbindXid(t *testing.T) { UnbindXid(ctx) assert.Empty(t, GetXID(ctx)) } + +func TestSetFencePhase(t *testing.T) { + ctx := InitSeataContext(context.Background()) + phase := enum.FencePhaseCommit + SetFencePhase(ctx, phase) + assert.Equal(t, phase, + ctx.Value(seataContextVariable).(*ContextVariable).FencePhase) +} + +func TestGetFencePhase(t *testing.T) { + ctx := InitSeataContext(context.Background()) + phase := enum.FencePhaseCommit + SetFencePhase(ctx, phase) + assert.Equal(t, phase, + GetFencePhase(ctx)) +} diff --git a/sample/tcc/fence/README_ZH.md b/sample/tcc/fence/README_ZH.md new file mode 100644 index 000000000..c38502d15 --- /dev/null +++ b/sample/tcc/fence/README_ZH.md @@ -0,0 +1,9 @@ +## 用例介绍 +此用例介绍如何在tcc本地模式下使用防悬挂功能 + +## 使用步骤 + +- 在您的数据库中使用``./sample/tcc/fence/script/mysql.sql``脚本创建防悬挂所需的日志记录表,如果您使用的是其他数据库则运行对应数据库的脚本文件。 +- 在``./sample/tcc/fence/service/service.go``中修改数据库驱动名为对应数据库类型并引入相关驱动包,mysql无需修改。此外需要注意用户名和密码是否正确。 +- 启动``seata tc server`` +- 使用以下命令运行用例``go run ./sample/tcc/fence/cmd/main.go`` \ No newline at end of file diff --git a/sample/tcc/fence/cmd/main.go b/sample/tcc/fence/cmd/main.go new file mode 100644 index 000000000..1f5d478c4 --- /dev/null +++ b/sample/tcc/fence/cmd/main.go @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + + _ "github.com/go-sql-driver/mysql" + + "github.com/seata/seata-go/pkg/client" + "github.com/seata/seata-go/pkg/common/log" + "github.com/seata/seata-go/pkg/tm" + "github.com/seata/seata-go/sample/tcc/fence/service" +) + +func main() { + client.Init() + var err error + ctx := tm.Begin(context.Background(), "TestTCCServiceBusiness") + defer func() { + resp := tm.CommitOrRollback(ctx, err == nil) + log.Infof("tx result %v", resp) + <-make(chan struct{}) + }() + + tccService := service.NewTestTCCServiceBusinessProxy() + tccService2 := service.NewTestTCCServiceBusiness2Proxy() + + _, err = tccService.Prepare(ctx, 1) + if err != nil { + log.Errorf("TestTCCServiceBusiness prepare error, %v", err.Error()) + return + } + _, err = tccService2.Prepare(ctx, 3) + if err != nil { + log.Errorf("TestTCCServiceBusiness2 prepare error, %v", err.Error()) + return + } +} diff --git a/sample/tcc/fence/script/mysql.sql b/sample/tcc/fence/script/mysql.sql new file mode 100644 index 000000000..863834867 --- /dev/null +++ b/sample/tcc/fence/script/mysql.sql @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- -------------------------------- The script use tcc fence -------------------------------- +CREATE TABLE IF NOT EXISTS `tcc_fence_log` +( + `xid` VARCHAR(128) NOT NULL COMMENT 'global id', + `branch_id` BIGINT NOT NULL COMMENT 'branch id', + `action_name` VARCHAR(64) NOT NULL COMMENT 'action name', + `status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)', + `gmt_create` DATETIME(3) NOT NULL COMMENT 'create time', + `gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time', + PRIMARY KEY (`xid`, `branch_id`), + KEY `idx_gmt_modified` (`gmt_modified`), + KEY `idx_status` (`status`) +) ENGINE = InnoDB +DEFAULT CHARSET = utf8mb4; \ No newline at end of file diff --git a/sample/tcc/fence/script/oracle.sql b/sample/tcc/fence/script/oracle.sql new file mode 100644 index 000000000..7157874cd --- /dev/null +++ b/sample/tcc/fence/script/oracle.sql @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- -------------------------------- The script used for tcc fence -------------------------------- +CREATE TABLE tcc_fence_log +( + xid VARCHAR2(128) NOT NULL, + branch_id NUMBER(19) NOT NULL, + action_name VARCHAR2(64) NOT NULL, + status NUMBER(3) NOT NULL, + gmt_create TIMESTAMP(3) NOT NULL, + gmt_modified TIMESTAMP(3) NOT NULL, + PRIMARY KEY (xid, branch_id) +); +CREATE INDEX idx_gmt_modified ON tcc_fence_log (gmt_modified); +CREATE INDEX idx_status ON tcc_fence_log (status); \ No newline at end of file diff --git a/sample/tcc/fence/script/postgresql.sql b/sample/tcc/fence/script/postgresql.sql new file mode 100644 index 000000000..88137c6cf --- /dev/null +++ b/sample/tcc/fence/script/postgresql.sql @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- -------------------------------- The script used for tcc fence -------------------------------- +CREATE TABLE IF NOT EXISTS public.tcc_fence_log +( + xid VARCHAR(128) NOT NULL, + branch_id BIGINT NOT NULL, + action_name VARCHAR(64) NOT NULL, + status SMALLINT NOT NULL, + gmt_create TIMESTAMP(3) NOT NULL, + gmt_modified TIMESTAMP(3) NOT NULL, + CONSTRAINT pk_tcc_fence_log PRIMARY KEY (xid, branch_id) +); +CREATE INDEX idx_gmt_modified ON public.tcc_fence_log (gmt_modified); +CREATE INDEX idx_status ON public.tcc_fence_log (status); \ No newline at end of file diff --git a/sample/tcc/fence/service/service.go b/sample/tcc/fence/service/service.go new file mode 100644 index 000000000..360412882 --- /dev/null +++ b/sample/tcc/fence/service/service.go @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +import ( + "context" + "database/sql" + "fmt" + "sync" + + "github.com/seata/seata-go/pkg/common/log" + "github.com/seata/seata-go/pkg/rm/tcc" + "github.com/seata/seata-go/pkg/rm/tcc/fence" + "github.com/seata/seata-go/pkg/tm" +) + +const ( + DriverName = "mysql" + Url = "root:root@tcp(127.0.0.1:3306)/seata?charset=utf8&parseTime=True" +) + +var ( + tccService *tcc.TCCServiceProxy + tccServiceOnce sync.Once + + tccService2 *tcc.TCCServiceProxy + tccService2Once sync.Once + + commitTimes int + commitFenceTimes int + + rollbackTimes int + rollbackFenceTimes int +) + +type TestTCCServiceBusiness struct { +} + +func NewTestTCCServiceBusinessProxy() *tcc.TCCServiceProxy { + if tccService != nil { + return tccService + } + tccServiceOnce.Do(func() { + var err error + tccService, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness{}) + if err != nil { + panic(fmt.Errorf("get TestTCCServiceBusiness tcc service proxy error, %v", err.Error())) + } + }) + return tccService +} + +func (T TestTCCServiceBusiness) Prepare(ctx context.Context, params interface{}) (b bool, err error) { + db, err := sql.Open(DriverName, Url) + if err != nil { + return false, fmt.Errorf("database connect failed, msg :%s", err.Error()) + } + defer db.Close() + tx, err := db.Begin() + if err != nil { + return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error()) + } + + defer func() { + if err != nil { + err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback()) + return + } + b, err = true, tx.Commit() + }() + + err = fence.WithFence(ctx, tx, func() error { + log.Infof("TestTCCServiceBusiness Prepare, param %v", params) + return nil + }) + + return +} + +func (T TestTCCServiceBusiness) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) { + db, err := sql.Open(DriverName, Url) + if err != nil { + return false, fmt.Errorf("database connect failed, msg :%s", err.Error()) + } + defer db.Close() + tx, err := db.Begin() + if err != nil { + return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error()) + } + + defer func() { + if err != nil { + err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback()) + return + } + b, err = true, tx.Commit() + }() + + err = fence.WithFence(ctx, tx, func() error { + log.Infof("TestTCCServiceBusiness Commit, param %v", businessActionContext) + return nil + }) + + return +} + +func (T TestTCCServiceBusiness) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) { + db, err := sql.Open(DriverName, Url) + if err != nil { + return false, fmt.Errorf("database connect failed, msg :%s", err.Error()) + } + defer db.Close() + tx, err := db.Begin() + if err != nil { + return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error()) + } + + defer func() { + if err != nil { + err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback()) + return + } + b, err = true, tx.Commit() + }() + + err = fence.WithFence(ctx, tx, func() error { + log.Infof("TestTCCServiceBusiness Rollback, param %v", businessActionContext) + return nil + }) + + return +} + +func (T TestTCCServiceBusiness) GetActionName() string { + return "TestTCCServiceBusiness" +} + +type TestTCCServiceBusiness2 struct { +} + +func NewTestTCCServiceBusiness2Proxy() *tcc.TCCServiceProxy { + if tccService2 != nil { + return tccService2 + } + tccService2Once.Do(func() { + var err error + tccService2, err = tcc.NewTCCServiceProxy(&TestTCCServiceBusiness2{}) + if err != nil { + panic(fmt.Errorf("TestTCCServiceBusiness2 get tcc service proxy error, %v", err.Error())) + } + if err != nil { + panic(fmt.Errorf("TestTCCServiceBusiness2 register resource error, %v", err.Error())) + } + }) + return tccService2 +} + +func (T TestTCCServiceBusiness2) Prepare(ctx context.Context, params interface{}) (b bool, err error) { + db, err := sql.Open(DriverName, Url) + if err != nil { + return false, fmt.Errorf("database connect failed, msg :%s", err.Error()) + } + defer db.Close() + tx, err := db.Begin() + if err != nil { + return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error()) + } + + defer func() { + if err != nil { + err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback()) + return + } + b, err = true, tx.Commit() + }() + + err = fence.WithFence(ctx, tx, func() error { + log.Infof("TestTCCServiceBusiness2 Prepare, param %v", params) + return nil + }) + + return +} + +func (T TestTCCServiceBusiness2) Commit(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) { + db, err := sql.Open(DriverName, Url) + if err != nil { + return false, fmt.Errorf("database connect failed, msg :%s", err.Error()) + } + defer db.Close() + tx, err := db.Begin() + if err != nil { + return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error()) + } + + defer func() { + if err != nil { + err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback()) + return + } + b, err = true, tx.Commit() + }() + + err = fence.WithFence(ctx, tx, func() error { + log.Infof("TestTCCServiceBusiness2 Commit, param %v", businessActionContext) + return nil + }) + + return +} + +func (T TestTCCServiceBusiness2) Rollback(ctx context.Context, businessActionContext *tm.BusinessActionContext) (b bool, err error) { + db, err := sql.Open(DriverName, Url) + if err != nil { + return false, fmt.Errorf("database connect failed, msg :%s", err.Error()) + } + defer db.Close() + tx, err := db.Begin() + if err != nil { + return false, fmt.Errorf("transaction begin failed, msg :%s", err.Error()) + } + + defer func() { + if err != nil { + err = fmt.Errorf("business method throw error: %s, rollback result: %s", err, tx.Rollback()) + return + } + b, err = true, tx.Commit() + }() + + err = fence.WithFence(ctx, tx, func() error { + log.Infof("TestTCCServiceBusiness2 Rollback, param %v", businessActionContext) + return nil + }) + + return +} + +func (T TestTCCServiceBusiness2) GetActionName() string { + return "TestTCCServiceBusiness2" +}