Skip to content

Commit

Permalink
StickyContextNextNodeBalanced method
Browse files Browse the repository at this point in the history
  • Loading branch information
xssnick committed Jun 13, 2024
1 parent f0539e0 commit 528b87a
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 4 deletions.
6 changes: 5 additions & 1 deletion liteclient/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,17 @@ func Test_ConnSticky(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
ctx = client.StickyContext(ctx)

err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/global.config.json")
if err != nil {
t.Fatal("add connections err", err)
}

ctx, err = client.StickyContextNextNodeBalanced(ctx)
if err != nil {
t.Fatal("next balanced err", err)
}

doReq := func(expErr error) {
var resp tl.Serializable
err := client.QueryLiteserver(ctx, GetMasterchainInf{}, &resp)
Expand Down
44 changes: 43 additions & 1 deletion liteclient/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const _StickyCtxUsedNodesKey = "_ton_used_nodes_sticky"
var (
ErrNoActiveConnections = errors.New("no active connections")
ErrADNLReqTimeout = errors.New("adnl request timeout")
ErrNoNodesLeft = errors.New("no more active nodes left")
)

type OnDisconnectCallback func(addr, key string)
Expand Down Expand Up @@ -97,6 +98,7 @@ func (c *ConnectionPool) StickyContext(ctx context.Context) context.Context {
return context.WithValue(ctx, _StickyCtxKey, id)
}

// StickyContextNextNode - select next node in the available list (pseudo random)
func (c *ConnectionPool) StickyContextNextNode(ctx context.Context) (context.Context, error) {
nodeID, _ := ctx.Value(_StickyCtxKey).(uint32)
usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32)
Expand All @@ -118,7 +120,47 @@ iter:
return context.WithValue(context.WithValue(ctx, _StickyCtxKey, node.id), _StickyCtxUsedNodesKey, usedNodes), nil
}

return ctx, fmt.Errorf("no more active nodes left")
return ctx, ErrNoNodesLeft
}

// StickyContextNextNodeBalanced - select next node based on its weight and availability
func (c *ConnectionPool) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) {
nodeID, _ := ctx.Value(_StickyCtxKey).(uint32)
usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32)
if nodeID > 0 {
usedNodes = append(usedNodes, nodeID)
}

c.nodesMx.RLock()
defer c.nodesMx.RUnlock()

var reqNode *connection

iter:
for _, node := range c.activeNodes {
for _, usedNode := range usedNodes {
if usedNode == node.id {
continue iter
}
}

if reqNode == nil {
reqNode = node
continue
}

// select best node on this moment
nw, old := atomic.LoadInt64(&node.weight), atomic.LoadInt64(&reqNode.weight)
if nw > old || (nw == old && atomic.LoadInt64(&node.lastRespTime) < atomic.LoadInt64(&reqNode.lastRespTime)) {
reqNode = node
}
}

if reqNode != nil {
return context.WithValue(context.WithValue(ctx, _StickyCtxKey, reqNode.id), _StickyCtxUsedNodesKey, usedNodes), nil
}

return ctx, ErrNoNodesLeft
}

func (c *ConnectionPool) StickyContextWithNodeID(ctx context.Context, nodeId uint32) context.Context {
Expand Down
1 change: 1 addition & 0 deletions ton/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type LiteClient interface {
QueryLiteserver(ctx context.Context, payload tl.Serializable, result tl.Serializable) error
StickyContext(ctx context.Context) context.Context
StickyContextNextNode(ctx context.Context) (context.Context, error)
StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error)
StickyNodeID(ctx context.Context) uint32
}

Expand Down
2 changes: 1 addition & 1 deletion ton/dns/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestDNSClient_Resolve(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()

ctx = client.StickyContext(ctx)
ctx, _ = client.StickyContextNextNodeBalanced(ctx)

d, err := cli.Resolve(ctx, "foundation.ton")
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion ton/retrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (w *retryClient) QueryLiteserver(ctx context.Context, payload tl.Serializab
tries++

if err != nil {
if !errors.Is(err, liteclient.ErrADNLReqTimeout) && !errors.Is(err, context.DeadlineExceeded){
if !errors.Is(err, liteclient.ErrADNLReqTimeout) && !errors.Is(err, context.DeadlineExceeded) {
return err
}

Expand Down Expand Up @@ -72,3 +72,7 @@ func (w *retryClient) StickyNodeID(ctx context.Context) uint32 {
func (w *retryClient) StickyContextNextNode(ctx context.Context) (context.Context, error) {
return w.original.StickyContextNextNode(ctx)
}

func (w *retryClient) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) {
return w.original.StickyContextNextNodeBalanced(ctx)
}
4 changes: 4 additions & 0 deletions ton/timeouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,7 @@ func (c *timeoutClient) StickyNodeID(ctx context.Context) uint32 {
func (c *timeoutClient) StickyContextNextNode(ctx context.Context) (context.Context, error) {
return c.original.StickyContextNextNode(ctx)
}

func (c *timeoutClient) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) {
return c.original.StickyContextNextNodeBalanced(ctx)
}
4 changes: 4 additions & 0 deletions ton/waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,7 @@ func (w *waiterClient) StickyNodeID(ctx context.Context) uint32 {
func (w *waiterClient) StickyContextNextNode(ctx context.Context) (context.Context, error) {
return w.original.StickyContextNextNode(ctx)
}

func (w *waiterClient) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) {
return w.original.StickyContextNextNodeBalanced(ctx)
}

0 comments on commit 528b87a

Please sign in to comment.