Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
Iunyasha committed Dec 16, 2022
2 parents 4b563fe + 382cb5c commit a7aee56
Show file tree
Hide file tree
Showing 11 changed files with 791 additions and 3 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
google.golang.org/grpc v1.49.0
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
gotest.tools v2.2.0+incompatible
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
)

Expand Down Expand Up @@ -73,6 +74,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1425,6 +1425,7 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
2 changes: 1 addition & 1 deletion pkg/datasource/sql/conn_at_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func initAtConnTestResource(t *testing.T) (*gomock.Controller, *sql.DB, *mockSQL
mockConn := mock.NewMockTestDriverConn(ctrl)
mockConn.EXPECT().Begin().AnyTimes().Return(mockTx, nil)
mockConn.EXPECT().BeginTx(gomock.Any(), gomock.Any()).AnyTimes().Return(mockTx, nil)
baseMoclConn(mockConn)
baseMockConn(mockConn)

connector := mock.NewMockTestDriverConnector(ctrl)
connector.EXPECT().Connect(gomock.Any()).AnyTimes().Return(mockConn, nil)
Expand Down
4 changes: 2 additions & 2 deletions pkg/datasource/sql/conn_xa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (mi *mockTxHook) BeforeRollback(tx *Tx) {
}
}

func baseMoclConn(mockConn *mock.MockTestDriverConn) {
func baseMockConn(mockConn *mock.MockTestDriverConn) {
mockConn.EXPECT().ExecContext(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(&driver.ResultNoRows, nil)
mockConn.EXPECT().Exec(gomock.Any(), gomock.Any()).AnyTimes().Return(&driver.ResultNoRows, nil)
mockConn.EXPECT().ResetSession(gomock.Any()).AnyTimes().Return(nil)
Expand All @@ -103,7 +103,7 @@ func initXAConnTestResource(t *testing.T) (*gomock.Controller, *sql.DB, *mockSQL
mockConn := mock.NewMockTestDriverConn(ctrl)
mockConn.EXPECT().Begin().AnyTimes().Return(mockTx, nil)
mockConn.EXPECT().BeginTx(gomock.Any(), gomock.Any()).AnyTimes().Return(mockTx, nil)
baseMoclConn(mockConn)
baseMockConn(mockConn)

connector := mock.NewMockTestDriverConnector(ctrl)
connector.EXPECT().Connect(gomock.Any()).AnyTimes().Return(mockConn, nil)
Expand Down
171 changes: 171 additions & 0 deletions pkg/datasource/sql/exec/mysql_xa_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package exec

import (
"context"
"database/sql/driver"
"fmt"
"io"
"strings"
"time"

"github.com/pkg/errors"
)

type MysqlXAConn struct {
driver.Conn
}

func (c *MysqlXAConn) Commit(xid string, onePhase bool) error {
var sb strings.Builder
sb.WriteString("XA COMMIT ")
sb.WriteString(xid)
if onePhase {
sb.WriteString(" ONE PHASE")
}

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(context.TODO(), sb.String(), nil)
return err
}

func (c *MysqlXAConn) End(xid string, flags int) error {
var sb strings.Builder
sb.WriteString("XA END ")
sb.WriteString(xid)

switch flags {
case TMSUCCESS:
break
case TMSUSPEND:
sb.WriteString(" SUSPEND")
break
case TMFAIL:
break
default:
return errors.New("invalid arguments")
}

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(context.TODO(), sb.String(), nil)
return err
}

func (c *MysqlXAConn) Forget(xid string) error {
// mysql doesn't support this
return errors.New("mysql doesn't support this")
}

func (c *MysqlXAConn) GetTransactionTimeout() time.Duration {
return 0
}

// IsSameRM is called to determine if the resource manager instance represented by the target object
// is the same as the resource manager instance represented by the parameter xares.
func (c *MysqlXAConn) IsSameRM(xares XAResource) bool {
// todo: the fn depends on the driver.Conn, but it doesn't support
return false
}

func (c *MysqlXAConn) XAPrepare(xid string) error {
var sb strings.Builder
sb.WriteString("XA PREPARE ")
sb.WriteString(xid)

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(context.TODO(), sb.String(), nil)
return err
}

// Recover Obtains a list of prepared transaction branches from a resource manager.
// The transaction manager calls this method during recovery to obtain the list of transaction branches
// that are currently in prepared or heuristically completed states.
func (c *MysqlXAConn) Recover(flag int) (xids []string, err error) {
startRscan := (flag & TMSTARTRSCAN) > 0
endRscan := (flag & TMENDRSCAN) > 0

if !startRscan && !endRscan && flag != TMNOFLAGS {
return nil, errors.New("invalid arguments")
}

if !startRscan {
return nil, nil
}

conn := c.Conn.(driver.QueryerContext)
res, err := conn.QueryContext(context.TODO(), "XA RECOVER", nil)
if err != nil {
return nil, err
}

dest := make([]driver.Value, 4)
for true {
if err = res.Next(dest); err != nil {
if err == io.EOF {
return xids, nil
}
return nil, err
}
gtridAndbqual, ok := dest[3].(string)
if !ok {
return nil, errors.New("the protocol of XA RECOVER statement is error")
}
fmt.Printf("gtr: %v", gtridAndbqual)

xids = append(xids, string(gtridAndbqual))
}
return xids, err
}

func (c *MysqlXAConn) Rollback(xid string) error {
var sb strings.Builder
sb.WriteString("XA ROLLBACK ")
sb.WriteString(xid)

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(context.TODO(), sb.String(), nil)
return err
}

func (c *MysqlXAConn) SetTransactionTimeout(duration time.Duration) bool {
return false
}

func (c *MysqlXAConn) Start(xid string, flags int) error {
var sb strings.Builder
sb.WriteString("XA START")
sb.WriteString(xid)

switch flags {
case TMJOIN:
sb.WriteString(" JOIN")
break
case TMRESUME:
sb.WriteString(" RESUME")
break
case TMNOFLAGS:
break
default:
return errors.New("invalid arguments")
}

conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(context.TODO(), sb.String(), nil)
return err
}
Loading

0 comments on commit a7aee56

Please sign in to comment.