Skip to content

Commit

Permalink
feat(topology): creating Topology object based on grouped vNodes
Browse files Browse the repository at this point in the history
  • Loading branch information
yahortsaryk committed Aug 15, 2023
1 parent e391485 commit 2dc24e5
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 27 deletions.
10 changes: 5 additions & 5 deletions contract/pkg/mock/ddc_bucket_contract_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ type (
}
)

func mapSourceToDestination(vNodes []NodeVNodes) []bucket.NodeVNodesInfo {
func mapNodesVNodes(nodes []NodeVNodes) []bucket.NodeVNodesInfo {
var nodesVNodes []bucket.NodeVNodesInfo
for _, item := range vNodes {
for _, node := range nodes {
nodeVNodes := bucket.NodeVNodesInfo{
NodeKey: item.NodeKey,
VNodes: item.VNodes,
NodeKey: node.NodeKey,
VNodes: node.VNodes,
}
nodesVNodes = append(nodesVNodes, nodeVNodes)
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func (d *ddcBucketContractMock) ClusterGet(clusterId uint32) (*bucket.ClusterSta
Revenues: types.NewU128(*big.NewInt(1)),
TotalRent: types.NewU128(*big.NewInt(1)),
},
VNodes: mapSourceToDestination(cluster.VNodes),
VNodes: mapNodesVNodes(cluster.VNodes),
}, nil
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/pkg/topology/sync/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type ring struct {
mutex sync.RWMutex
}

func NewTopology(nodeKeys []string, vNodes [][]uint64, replicaFactor uint) topology.Ring {
func NewTopology(nodes topology.NodesVNodes, replicaFactor uint) topology.Ring {
return &ring{
ring: topology.NewTopology(nodeKeys, vNodes, replicaFactor),
ring: topology.NewTopology(nodes, replicaFactor),
}
}

Expand Down
17 changes: 13 additions & 4 deletions core/pkg/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,25 @@ type (
}
)

func NewTopology(NodesKeys []string, vNodes [][]uint64, replicaFactor uint) Ring {
type (
NodeVNodes struct {
nodeKey string
vNodes []uint64
}

NodesVNodes = []NodeVNodes
)

func NewTopology(nodes NodesVNodes, replicaFactor uint) Ring {
if replicaFactor == 0 {
replicaFactor = 1
}

topologyVNodes := make([]VNode, 0)
for i, nodeKey := range NodesKeys {
for _, token := range vNodes[i] {
for _, node := range nodes {
for _, token := range node.vNodes {
topologyVNode := VNode{
nodeKey: nodeKey,
nodeKey: node.nodeKey,
token: token,
}

Expand Down
112 changes: 96 additions & 16 deletions core/pkg/topology/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,111 @@ var NodeKey7 = "be5ddb1579b72e84524fc29e78609e3caf42e85aa118ebfe0b0ad404b5bdd25f

var clusters = []struct {
name string
nodeKeys []string
vNodes [][]uint64
nodesVNodes NodesVNodes
replicaFactor uint
}{
{"size 2 replication 3", []string{NodeKey1, NodeKey2}, [][]uint64{{9223372036854775806, 3074457345618258602, 15372286728091293010}, {12297829382473034408, 6148914691236517204, 18446744073709551612}}, 2},
{"size 3 replication 3", []string{NodeKey1, NodeKey2, NodeKey3}, [][]uint64{{12297829382473034408, 3074457345618258602}, {6148914691236517204, 15372286728091293010}, {18446744073709551612, 9223372036854775806}}, 3},
{"size 4 replication 3", []string{NodeKey1, NodeKey2, NodeKey3, NodeKey4}, [][]uint64{{12297829382473034408, 3074457345618258602}, {6148914691236517204, 15372286728091293010}, {18446744073709551612, 9223372036854775806}, {4611686018427387903}}, 3},
{"size 2 replication 1", []string{NodeKey1, NodeKey2}, [][]uint64{{9223372036854775806, 3074457345618258602, 15372286728091293010}, {12297829382473034408, 6148914691236517204, 18446744073709551612}}, 1},
{"size 3 replication 1", []string{NodeKey1, NodeKey2, NodeKey3}, [][]uint64{{12297829382473034408, 3074457345618258602}, {6148914691236517204, 15372286728091293010}, {18446744073709551612, 9223372036854775806}}, 1},
{
"size 2 replication 3",
NodesVNodes{
NodeVNodes{
nodeKey: NodeKey1,
vNodes: []uint64{9223372036854775806, 3074457345618258602, 15372286728091293010},
},
NodeVNodes{
nodeKey: NodeKey2,
vNodes: []uint64{12297829382473034408, 6148914691236517204, 18446744073709551612},
},
},
2,
},
{
"size 3 replication 3",
NodesVNodes{
NodeVNodes{
nodeKey: NodeKey1,
vNodes: []uint64{12297829382473034408, 3074457345618258602},
},
NodeVNodes{
nodeKey: NodeKey2,
vNodes: []uint64{6148914691236517204, 15372286728091293010},
},
NodeVNodes{
nodeKey: NodeKey3,
vNodes: []uint64{18446744073709551612, 9223372036854775806},
},
},
3,
},
{
"size 4 replication 3",
NodesVNodes{
NodeVNodes{
nodeKey: NodeKey1,
vNodes: []uint64{12297829382473034408, 3074457345618258602},
},
NodeVNodes{
nodeKey: NodeKey2,
vNodes: []uint64{6148914691236517204, 15372286728091293010},
},
NodeVNodes{
nodeKey: NodeKey3,
vNodes: []uint64{18446744073709551612, 9223372036854775806},
},
NodeVNodes{
nodeKey: NodeKey4,
vNodes: []uint64{4611686018427387903},
},
},
3,
},
{
"size 2 replication 1",
NodesVNodes{
NodeVNodes{
nodeKey: NodeKey1,
vNodes: []uint64{9223372036854775806, 3074457345618258602, 15372286728091293010},
},
NodeVNodes{
nodeKey: NodeKey2,
vNodes: []uint64{12297829382473034408, 6148914691236517204, 18446744073709551612},
},
},
1,
},
{
"size 3 replication 1",
NodesVNodes{
NodeVNodes{
nodeKey: NodeKey1,
vNodes: []uint64{12297829382473034408, 3074457345618258602},
},
NodeVNodes{
nodeKey: NodeKey2,
vNodes: []uint64{6148914691236517204, 15372286728091293010},
},
NodeVNodes{
nodeKey: NodeKey3,
vNodes: []uint64{18446744073709551612, 9223372036854775806},
},
},
1,
},
}

func TestTokens(t *testing.T) {
for _, test := range clusters {
t.Run(test.name, func(t *testing.T) {
//given
testSubject := NewTopology(test.nodeKeys, test.vNodes, test.replicaFactor)
testSubject := NewTopology(test.nodesVNodes, test.replicaFactor)

for i, nodeKey := range test.nodeKeys {
for _, node := range test.nodesVNodes {
//when
tokens := testSubject.Tokens(nodeKey)
tokens := testSubject.Tokens(node.nodeKey)

//then
assert.True(t, sort.SliceIsSorted(tokens, func(i, j int) bool { return tokens[i] < tokens[j] }))

expected := test.vNodes[i]
expected := node.vNodes
assert.Len(t, tokens, len(expected))
for _, value := range expected {
assert.Contains(t, tokens, value)
Expand Down Expand Up @@ -70,7 +150,7 @@ func TestNeighbours(t *testing.T) {
cluster := clusters[test.clusterId]
t.Run(cluster.name, func(t *testing.T) {
//given
testSubject := NewTopology(cluster.nodeKeys, cluster.vNodes, cluster.replicaFactor)
testSubject := NewTopology(cluster.nodesVNodes, cluster.replicaFactor)

//when
prev, next := testSubject.Neighbours(test.token)
Expand Down Expand Up @@ -103,7 +183,7 @@ func TestReplicas(t *testing.T) {
cluster := clusters[test.clusterId]
t.Run(cluster.name, func(t *testing.T) {
//given
testSubject := NewTopology(cluster.nodeKeys, cluster.vNodes, cluster.replicaFactor)
testSubject := NewTopology(cluster.nodesVNodes, cluster.replicaFactor)

//when
replicas := testSubject.Replicas(test.token)
Expand Down Expand Up @@ -188,7 +268,7 @@ func TestPartitions(t *testing.T) {
cluster := clusters[test.clusterId]
t.Run(cluster.name, func(t *testing.T) {
//given
testSubject := NewTopology(cluster.nodeKeys, cluster.vNodes, cluster.replicaFactor)
testSubject := NewTopology(cluster.nodesVNodes, cluster.replicaFactor)

//when
partitions := testSubject.Partitions(test.nodeKey)
Expand Down Expand Up @@ -223,7 +303,7 @@ func TestExcessPartitions(t *testing.T) {
cluster := clusters[test.clusterId]
t.Run(cluster.name, func(t *testing.T) {
//given
testSubject := NewTopology(cluster.nodeKeys, cluster.vNodes, cluster.replicaFactor)
testSubject := NewTopology(cluster.nodesVNodes, cluster.replicaFactor)

//when
partitions := testSubject.ExcessPartitions(test.nodeKey)
Expand Down Expand Up @@ -253,7 +333,7 @@ func TestRemoveVNode(t *testing.T) {
cluster := clusters[test.clusterId]
t.Run(cluster.name, func(t *testing.T) {
//given
testSubject := NewTopology(cluster.nodeKeys, cluster.vNodes, cluster.replicaFactor)
testSubject := NewTopology(cluster.nodesVNodes, cluster.replicaFactor)

//when
ok := testSubject.RemoveVNode(test.token)
Expand Down

0 comments on commit 2dc24e5

Please sign in to comment.