Skip to content

Commit

Permalink
optimize named for the resource manager api and tcc resource, adjust … (
Browse files Browse the repository at this point in the history
apache#125)

update optimize the resource manager.
  • Loading branch information
106umao authored Jul 23, 2022
1 parent ed33bad commit d356756
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 136 deletions.
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ require (
dubbo.apache.org/dubbo-go/v3 v3.0.2-0.20220508105316-b27ec53b7bab
github.com/BurntSushi/toml v1.1.0 // indirect
github.com/apache/dubbo-getty v1.4.8
github.com/apache/dubbo-go-hessian2 v1.11.0
github.com/dubbogo/gost v1.12.5
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
golang.org/x/tools v0.1.11 // indirect
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
)
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zouyx/agollo/v3 v3.4.5 h1:7YCxzY9ZYaH9TuVUBvmI6Tk0mwMggikah+cfbYogcHQ=
Expand Down Expand Up @@ -841,7 +840,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 h1:0es+/5331RGQPcXlMfP+WrnIIS6dNnNRe0WB02W0F4M=
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
Expand Down Expand Up @@ -881,8 +879,6 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20180530234432-1e491301e022/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -927,7 +923,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210917221730-978cfadd31cf/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211105192438-b53810dc28af/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 h1:CIJ76btIcR3eFI5EgSo6k1qKw9KJexJuRLI9G7Hp5wE=
Expand Down Expand Up @@ -1017,7 +1012,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211106132015-ebca88c72f68/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211117180635-dee7805ff2e1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220111092808-5a964db01320/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -1099,8 +1093,6 @@ golang.org/x/tools v0.0.0-20201014170642-d1624618ad65/go.mod h1:z6u4i615ZeAfBE4X
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.11 h1:loJ25fNOEhSXfHrpoGj91eCUThwdNX6u24rO1xnNteY=
golang.org/x/tools v0.1.11/go.mod h1:SgwaegtQh8clINPpECJMqnxLv9I09HLqnW3RMqW0CA4=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *rmBranchCommitProcessor) Process(ctx context.Context, rpcMessage messag
applicationData := request.ApplicationData
log.Infof("Branch committing: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchCommit(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("branch commit error: %s", err.Error())
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestRmBranchCommitProcessor(t *testing.T) {
var ctx context.Context
var rbcProcessor rmBranchCommitProcessor

rm.GetResourceManagerInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())
rm.GetRmCacheInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())

// run tests
for _, tc := range tests {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *rmBranchRollbackProcessor) Process(ctx context.Context, rpcMessage mess
applicationData := request.ApplicationData
log.Infof("Branch rollback request: xid %s, branchID %s, resourceID %s, applicationData %s", xid, branchID, resourceID, applicationData)

status, err := rm.GetResourceManagerInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
status, err := rm.GetRmCacheInstance().GetResourceManager(request.BranchType).BranchRollback(ctx, request.BranchType, xid, branchID, resourceID, applicationData)
if err != nil {
log.Infof("branch rollback error: %s", err.Error())
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestRmBranchRollbackProcessor(t *testing.T) {
var ctx context.Context
var rbrProcessor rmBranchRollbackProcessor

rm.GetResourceManagerInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())
rm.GetRmCacheInstance().RegisterResourceManager(tcc.GetTCCResourceManagerInstance())

// run tests
for _, tc := range tests {
Expand Down
105 changes: 0 additions & 105 deletions pkg/rm/resource_manager.go

This file was deleted.

6 changes: 3 additions & 3 deletions pkg/protocol/resource/resource.go → pkg/rm/rm_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package resource
package rm

import (
"context"
Expand Down Expand Up @@ -59,11 +59,11 @@ type ResourceManager interface {
// Unregister a Resource from the Resource Manager
UnregisterResource(resource Resource) error
// Get all resources managed by this manager
GetManagedResources() *sync.Map
GetCachedResources() *sync.Map
// Get the BranchType
GetBranchType() branch.BranchType
}

type ResourceManagerGetter interface {
GetResourceManager() ResourceManager
GetResourceManager(branchType branch.BranchType) ResourceManager
}
57 changes: 57 additions & 0 deletions pkg/rm/rm_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rm

import (
"fmt"
"sync"

"github.com/seata/seata-go/pkg/protocol/branch"
)

var (
// singletone ResourceManagerCache
rmCacheInstance *ResourceManagerCache
onceRMFacade = &sync.Once{}
)

func GetRmCacheInstance() *ResourceManagerCache {
if rmCacheInstance == nil {
onceRMFacade.Do(func() {
rmCacheInstance = &ResourceManagerCache{}
})
}
return rmCacheInstance
}

type ResourceManagerCache struct {
// BranchType -> ResourceManagerCache
resourceManagerMap sync.Map
}

func (d *ResourceManagerCache) RegisterResourceManager(resourceManager ResourceManager) {
d.resourceManagerMap.Store(resourceManager.GetBranchType(), resourceManager)
}

func (d *ResourceManagerCache) GetResourceManager(branchType branch.BranchType) ResourceManager {
rm, ok := d.resourceManagerMap.Load(branchType)
if !ok {
panic(fmt.Sprintf("No ResourceManagerCache for BranchType: %v", branchType))
}
return rm.(ResourceManager)
}
4 changes: 1 addition & 3 deletions pkg/rm/rm_remoting.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package rm
import (
"sync"

"github.com/seata/seata-go/pkg/protocol/resource"

"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
Expand Down Expand Up @@ -84,7 +82,7 @@ func (RMRemoting) LockQuery(branchType branch.BranchType, resourceId, xid, lockK
return false, nil
}

func (r *RMRemoting) RegisterResource(resource resource.Resource) error {
func (r *RMRemoting) RegisterResource(resource Resource) error {
req := message.RegisterRMRequest{
AbstractIdentifyRequest: message.AbstractIdentifyRequest{
//todo replace with config
Expand Down
13 changes: 5 additions & 8 deletions pkg/rm/tcc/tcc_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,9 @@ import (

"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"

"github.com/seata/seata-go/pkg/protocol/resource"
"github.com/seata/seata-go/pkg/tm"

"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/tm"
)

var (
Expand Down Expand Up @@ -71,7 +68,7 @@ func (t *TCCResource) GetBranchType() branch.BranchType {
}

func init() {
rm.GetResourceManagerInstance().RegisterResourceManager(GetTCCResourceManagerInstance())
rm.GetRmCacheInstance().RegisterResourceManager(GetTCCResourceManagerInstance())
}

func GetTCCResourceManagerInstance() *TCCResourceManager {
Expand Down Expand Up @@ -107,20 +104,20 @@ func (t *TCCResourceManager) LockQuery(ctx context.Context, ranchType branch.Bra
panic("implement me")
}

func (t *TCCResourceManager) UnregisterResource(resource resource.Resource) error {
func (t *TCCResourceManager) UnregisterResource(resource rm.Resource) error {
//TODO implement me
panic("implement me")
}

func (t *TCCResourceManager) RegisterResource(resource resource.Resource) error {
func (t *TCCResourceManager) RegisterResource(resource rm.Resource) error {
if _, ok := resource.(*TCCResource); !ok {
panic(fmt.Sprintf("register tcc resource error, TCCResource is needed, param %v", resource))
}
t.resourceManagerMap.Store(resource.GetResourceId(), resource)
return t.rmRemoting.RegisterResource(resource)
}

func (t *TCCResourceManager) GetManagedResources() *sync.Map {
func (t *TCCResourceManager) GetCachedResources() *sync.Map {
return &t.resourceManagerMap
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/rm/tcc/tcc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@ import (

"github.com/seata/seata-go/pkg/common/types"

"github.com/seata/seata-go/pkg/tm"

"github.com/pkg/errors"
"github.com/seata/seata-go/pkg/common"
"github.com/seata/seata-go/pkg/common/log"
"github.com/seata/seata-go/pkg/common/net"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/rm"
"github.com/seata/seata-go/pkg/tm"
)

type TCCServiceProxy struct {
Expand All @@ -56,7 +55,7 @@ func NewTCCServiceProxy(service interface{}) (*TCCServiceProxy, error) {
func (t *TCCServiceProxy) RegisterResource() error {
var err error
t.registerResourceOnce.Do(func() {
err = rm.GetResourceManagerInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)
err = rm.GetRmCacheInstance().GetResourceManager(branch.BranchTypeTCC).RegisterResource(t.TCCResource)
if err != nil {
log.Errorf("NewTCCServiceProxy RegisterResource error: %#v", err.Error())
}
Expand Down

0 comments on commit d356756

Please sign in to comment.