From 528b87a2d8e1fc2fb0d995c82d634dad28f1d3a2 Mon Sep 17 00:00:00 2001 From: Oleg Baranov Date: Thu, 13 Jun 2024 12:45:17 +0400 Subject: [PATCH] StickyContextNextNodeBalanced method --- liteclient/integration_test.go | 6 ++++- liteclient/pool.go | 44 +++++++++++++++++++++++++++++++++- ton/api.go | 1 + ton/dns/integration_test.go | 2 +- ton/retrier.go | 6 ++++- ton/timeouter.go | 4 ++++ ton/waiter.go | 4 ++++ 7 files changed, 63 insertions(+), 4 deletions(-) diff --git a/liteclient/integration_test.go b/liteclient/integration_test.go index fb172f81..c03ff8e3 100644 --- a/liteclient/integration_test.go +++ b/liteclient/integration_test.go @@ -87,13 +87,17 @@ func Test_ConnSticky(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - ctx = client.StickyContext(ctx) err := client.AddConnectionsFromConfigUrl(ctx, "https://tonutils.com/global.config.json") if err != nil { t.Fatal("add connections err", err) } + ctx, err = client.StickyContextNextNodeBalanced(ctx) + if err != nil { + t.Fatal("next balanced err", err) + } + doReq := func(expErr error) { var resp tl.Serializable err := client.QueryLiteserver(ctx, GetMasterchainInf{}, &resp) diff --git a/liteclient/pool.go b/liteclient/pool.go index 09168372..67fd07d6 100644 --- a/liteclient/pool.go +++ b/liteclient/pool.go @@ -22,6 +22,7 @@ const _StickyCtxUsedNodesKey = "_ton_used_nodes_sticky" var ( ErrNoActiveConnections = errors.New("no active connections") ErrADNLReqTimeout = errors.New("adnl request timeout") + ErrNoNodesLeft = errors.New("no more active nodes left") ) type OnDisconnectCallback func(addr, key string) @@ -97,6 +98,7 @@ func (c *ConnectionPool) StickyContext(ctx context.Context) context.Context { return context.WithValue(ctx, _StickyCtxKey, id) } +// StickyContextNextNode - select next node in the available list (pseudo random) func (c *ConnectionPool) StickyContextNextNode(ctx context.Context) (context.Context, error) { nodeID, _ := ctx.Value(_StickyCtxKey).(uint32) usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) @@ -118,7 +120,47 @@ iter: return context.WithValue(context.WithValue(ctx, _StickyCtxKey, node.id), _StickyCtxUsedNodesKey, usedNodes), nil } - return ctx, fmt.Errorf("no more active nodes left") + return ctx, ErrNoNodesLeft +} + +// StickyContextNextNodeBalanced - select next node based on its weight and availability +func (c *ConnectionPool) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) { + nodeID, _ := ctx.Value(_StickyCtxKey).(uint32) + usedNodes, _ := ctx.Value(_StickyCtxUsedNodesKey).([]uint32) + if nodeID > 0 { + usedNodes = append(usedNodes, nodeID) + } + + c.nodesMx.RLock() + defer c.nodesMx.RUnlock() + + var reqNode *connection + +iter: + for _, node := range c.activeNodes { + for _, usedNode := range usedNodes { + if usedNode == node.id { + continue iter + } + } + + if reqNode == nil { + reqNode = node + continue + } + + // select best node on this moment + nw, old := atomic.LoadInt64(&node.weight), atomic.LoadInt64(&reqNode.weight) + if nw > old || (nw == old && atomic.LoadInt64(&node.lastRespTime) < atomic.LoadInt64(&reqNode.lastRespTime)) { + reqNode = node + } + } + + if reqNode != nil { + return context.WithValue(context.WithValue(ctx, _StickyCtxKey, reqNode.id), _StickyCtxUsedNodesKey, usedNodes), nil + } + + return ctx, ErrNoNodesLeft } func (c *ConnectionPool) StickyContextWithNodeID(ctx context.Context, nodeId uint32) context.Context { diff --git a/ton/api.go b/ton/api.go index e86c74af..8ded6d8a 100644 --- a/ton/api.go +++ b/ton/api.go @@ -34,6 +34,7 @@ type LiteClient interface { QueryLiteserver(ctx context.Context, payload tl.Serializable, result tl.Serializable) error StickyContext(ctx context.Context) context.Context StickyContextNextNode(ctx context.Context) (context.Context, error) + StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) StickyNodeID(ctx context.Context) uint32 } diff --git a/ton/dns/integration_test.go b/ton/dns/integration_test.go index eee7ce62..2778b805 100644 --- a/ton/dns/integration_test.go +++ b/ton/dns/integration_test.go @@ -33,7 +33,7 @@ func TestDNSClient_Resolve(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) defer cancel() - ctx = client.StickyContext(ctx) + ctx, _ = client.StickyContextNextNodeBalanced(ctx) d, err := cli.Resolve(ctx, "foundation.ton") if err != nil { diff --git a/ton/retrier.go b/ton/retrier.go index 94edba32..70e17be5 100644 --- a/ton/retrier.go +++ b/ton/retrier.go @@ -25,7 +25,7 @@ func (w *retryClient) QueryLiteserver(ctx context.Context, payload tl.Serializab tries++ if err != nil { - if !errors.Is(err, liteclient.ErrADNLReqTimeout) && !errors.Is(err, context.DeadlineExceeded){ + if !errors.Is(err, liteclient.ErrADNLReqTimeout) && !errors.Is(err, context.DeadlineExceeded) { return err } @@ -72,3 +72,7 @@ func (w *retryClient) StickyNodeID(ctx context.Context) uint32 { func (w *retryClient) StickyContextNextNode(ctx context.Context) (context.Context, error) { return w.original.StickyContextNextNode(ctx) } + +func (w *retryClient) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) { + return w.original.StickyContextNextNodeBalanced(ctx) +} diff --git a/ton/timeouter.go b/ton/timeouter.go index cd617437..af850743 100644 --- a/ton/timeouter.go +++ b/ton/timeouter.go @@ -30,3 +30,7 @@ func (c *timeoutClient) StickyNodeID(ctx context.Context) uint32 { func (c *timeoutClient) StickyContextNextNode(ctx context.Context) (context.Context, error) { return c.original.StickyContextNextNode(ctx) } + +func (c *timeoutClient) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) { + return c.original.StickyContextNextNodeBalanced(ctx) +} diff --git a/ton/waiter.go b/ton/waiter.go index ae36d584..3350cf33 100644 --- a/ton/waiter.go +++ b/ton/waiter.go @@ -49,3 +49,7 @@ func (w *waiterClient) StickyNodeID(ctx context.Context) uint32 { func (w *waiterClient) StickyContextNextNode(ctx context.Context) (context.Context, error) { return w.original.StickyContextNextNode(ctx) } + +func (w *waiterClient) StickyContextNextNodeBalanced(ctx context.Context) (context.Context, error) { + return w.original.StickyContextNextNodeBalanced(ctx) +}