From 642f0e919b0d8b420e68c339fbb58a894fae2d54 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Thu, 26 Sep 2024 10:19:36 +0800 Subject: [PATCH] client: Support parallel TSO RPC requests on single dispatcher loop (#8633) close tikv/pd#8432 client: Support parallel TSO RPC requests on single dispatcher loop This commit supports handling multiple TSO RPC concurrently in one single dispatcher loop to reduce the expected time that each GetTS call spent on waiting the next batch. Signed-off-by: MyonKeminta Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- client/client.go | 6 + client/metrics.go | 10 ++ client/option.go | 15 ++ client/tso_batch_controller.go | 42 +++++ client/tso_dispatcher.go | 226 +++++++++++++++++++++++--- client/tso_dispatcher_test.go | 281 ++++++++++++++++++++++++++++++++- client/tso_request.go | 16 +- client/tso_stream.go | 106 ++++++++++++- client/tso_stream_test.go | 176 +++++++++++++++++++-- 9 files changed, 828 insertions(+), 50 deletions(-) diff --git a/client/client.go b/client/client.go index d5cf7cf28d0..ba4741e4ed9 100644 --- a/client/client.go +++ b/client/client.go @@ -797,6 +797,12 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") } c.option.setEnableFollowerHandle(enable) + case TSOClientRPCConcurrency: + value, ok := value.(int) + if !ok { + return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int") + } + c.option.setTSOClientRPCConcurrency(value) default: return errors.New("[pd] unsupported client option") } diff --git a/client/metrics.go b/client/metrics.go index a83b4a36407..d1b375aea8a 100644 --- a/client/metrics.go +++ b/client/metrics.go @@ -47,6 +47,7 @@ var ( tsoBatchSendLatency prometheus.Histogram requestForwarded *prometheus.GaugeVec ongoingRequestCountGauge *prometheus.GaugeVec + estimateTSOLatencyGauge *prometheus.GaugeVec ) func initMetrics(constLabels prometheus.Labels) { @@ -127,6 +128,14 @@ func initMetrics(constLabels prometheus.Labels) { Help: "Current count of ongoing batch tso requests", ConstLabels: constLabels, }, []string{"stream"}) + estimateTSOLatencyGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "pd_client", + Subsystem: "request", + Name: "estimate_tso_latency", + Help: "Estimated latency of an RTT of getting TSO", + ConstLabels: constLabels, + }, []string{"stream"}) } var ( @@ -236,4 +245,5 @@ func registerMetrics() { prometheus.MustRegister(tsoBatchSize) prometheus.MustRegister(tsoBatchSendLatency) prometheus.MustRegister(requestForwarded) + prometheus.MustRegister(estimateTSOLatencyGauge) } diff --git a/client/option.go b/client/option.go index 0109bfc4ed0..3f2b7119b52 100644 --- a/client/option.go +++ b/client/option.go @@ -29,6 +29,7 @@ const ( defaultMaxTSOBatchWaitInterval time.Duration = 0 defaultEnableTSOFollowerProxy = false defaultEnableFollowerHandle = false + defaultTSOClientRPCConcurrency = 1 ) // DynamicOption is used to distinguish the dynamic option type. @@ -43,6 +44,8 @@ const ( EnableTSOFollowerProxy // EnableFollowerHandle is the follower handle option. EnableFollowerHandle + // TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client. + TSOClientRPCConcurrency dynamicOptionCount ) @@ -77,6 +80,7 @@ func newOption() *option { co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval) co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy) co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle) + co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency) return co } @@ -127,3 +131,14 @@ func (o *option) setEnableTSOFollowerProxy(enable bool) { func (o *option) getEnableTSOFollowerProxy() bool { return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool) } + +func (o *option) setTSOClientRPCConcurrency(value int) { + old := o.getTSOClientRPCConcurrency() + if value != old { + o.dynamicOptions[TSOClientRPCConcurrency].Store(value) + } +} + +func (o *option) getTSOClientRPCConcurrency() int { + return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) +} diff --git a/client/tso_batch_controller.go b/client/tso_batch_controller.go index 32191889160..b810e108667 100644 --- a/client/tso_batch_controller.go +++ b/client/tso_batch_controller.go @@ -64,6 +64,17 @@ func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequ // TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here. tbc.collectedRequestCount = 0 for { + // If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more + // requests, and return when token is ready. + if tbc.collectedRequestCount >= tbc.maxBatchSize && !tokenAcquired { + select { + case <-ctx.Done(): + return ctx.Err() + case <-tokenCh: + return nil + } + } + select { case <-ctx.Done(): return ctx.Err() @@ -146,6 +157,37 @@ fetchPendingRequestsLoop: return nil } +// fetchRequestsWithTimer tries to fetch requests until the given timer ticks. The caller must set the timer properly +// before calling this function. +func (tbc *tsoBatchController) fetchRequestsWithTimer(ctx context.Context, tsoRequestCh <-chan *tsoRequest, timer *time.Timer) error { +batchingLoop: + for tbc.collectedRequestCount < tbc.maxBatchSize { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-tsoRequestCh: + tbc.pushRequest(req) + case <-timer.C: + break batchingLoop + } + } + + // Try to collect more requests in non-blocking way. +nonWaitingBatchLoop: + for tbc.collectedRequestCount < tbc.maxBatchSize { + select { + case <-ctx.Done(): + return ctx.Err() + case req := <-tsoRequestCh: + tbc.pushRequest(req) + default: + break nonWaitingBatchLoop + } + } + + return nil +} + func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) { tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq tbc.collectedRequestCount++ diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index a1e0b03a1fa..7febf194f3c 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -17,9 +17,11 @@ package pd import ( "context" "fmt" + "math" "math/rand" "runtime/trace" "sync" + "sync/atomic" "time" "github.com/opentracing/opentracing-go" @@ -61,6 +63,7 @@ type tsoInfo struct { respReceivedAt time.Time physical int64 logical int64 + sourceStreamID string } type tsoServiceProvider interface { @@ -69,6 +72,8 @@ type tsoServiceProvider interface { updateConnectionCtxs(ctx context.Context, dc string, connectionCtxs *sync.Map) bool } +const dispatcherCheckRPCConcurrencyInterval = time.Second * 5 + type tsoDispatcher struct { ctx context.Context cancel context.CancelFunc @@ -79,7 +84,7 @@ type tsoDispatcher struct { connectionCtxs *sync.Map tsoRequestCh chan *tsoRequest tsDeadlineCh chan *deadline - lastTSOInfo *tsoInfo + latestTSOInfo atomic.Pointer[tsoInfo] // For reusing tsoBatchController objects batchBufferPool *sync.Pool @@ -87,7 +92,10 @@ type tsoDispatcher struct { // A token must be acquired here before sending an RPC request, and the token must be put back after finishing the // RPC. This is used like a semaphore, but we don't use semaphore directly here as it cannot be selected with // other channels. - tokenCh chan struct{} + tokenCh chan struct{} + lastCheckConcurrencyTime time.Time + tokenCount int + rpcConcurrency int updateConnectionCtxsCh chan struct{} } @@ -115,7 +123,7 @@ func newTSODispatcher( provider: provider, connectionCtxs: &sync.Map{}, tsoRequestCh: tsoRequestCh, - tsDeadlineCh: make(chan *deadline, 1), + tsDeadlineCh: make(chan *deadline, tokenChCapacity), batchBufferPool: &sync.Pool{ New: func() any { return newTSOBatchController(maxBatchSize * 2) @@ -187,9 +195,6 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { batchController *tsoBatchController ) - // Currently only 1 concurrency is supported. Put one token in. - td.tokenCh <- struct{}{} - log.Info("[tso] tso dispatcher created", zap.String("dc-location", dc)) // Clean up the connectionCtxs when the dispatcher exits. defer func() { @@ -200,6 +205,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { return true }) if batchController != nil && batchController.collectedRequestCount != 0 { + // If you encounter this failure, please check the stack in the logs to see if it's a panic. log.Fatal("batched tso requests not cleared when exiting the tso dispatcher loop", zap.Any("panic", recover())) } tsoErr := errors.WithStack(errClosing) @@ -219,6 +225,12 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(option.timeout) defer streamLoopTimer.Stop() + + // Create a not-started-timer to be used for collecting batches for concurrent RPC. + batchingTimer := time.NewTimer(0) + <-batchingTimer.C + defer batchingTimer.Stop() + bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime) tsoBatchLoop: for { @@ -233,8 +245,18 @@ tsoBatchLoop: batchController = td.batchBufferPool.Get().(*tsoBatchController) } - // Start to collect the TSO requests. maxBatchWaitInterval := option.getMaxTSOBatchWaitInterval() + + currentBatchStartTime := time.Now() + // Update concurrency settings if needed. + if err = td.checkTSORPCConcurrency(ctx, maxBatchWaitInterval, currentBatchStartTime); err != nil { + // checkTSORPCConcurrency can only fail due to `ctx` being invalidated. + log.Info("[tso] stop checking tso rpc concurrency configurations due to context canceled", + zap.String("dc-location", dc), zap.Error(err)) + return + } + + // Start to collect the TSO requests. // Once the TSO requests are collected, must make sure they could be finished or revoked eventually, // otherwise the upper caller may get blocked on waiting for the results. if err = batchController.fetchPendingRequests(ctx, td.tsoRequestCh, td.tokenCh, maxBatchWaitInterval); err != nil { @@ -318,6 +340,57 @@ tsoBatchLoop: break streamChoosingLoop } + + noDelay := false + failpoint.Inject("tsoDispatcherConcurrentModeNoDelay", func() { + noDelay = true + }) + + // If concurrent RPC is enabled, the time for collecting each request batch is expected to be + // estimatedRPCDuration / concurrency. Note the time mentioned here is counted from starting trying to collect + // the batch, instead of the time when the first request arrives. + // Here, if the elapsed time since starting collecting this batch didn't reach the expected batch time, then + // continue collecting. + if td.isConcurrentRPCEnabled() { + estimatedLatency := stream.EstimatedRPCLatency() + goalBatchTime := estimatedLatency / time.Duration(td.rpcConcurrency) + + failpoint.Inject("tsoDispatcherConcurrentModeAssertDelayDuration", func(val failpoint.Value) { + if s, ok := val.(string); ok { + expected, err := time.ParseDuration(s) + if err != nil { + panic(err) + } + if math.Abs(expected.Seconds()-goalBatchTime.Seconds()) > 1e-6 { + log.Fatal("tsoDispatcher: trying to delay for unexpected duration for the batch", zap.Duration("goalBatchTime", goalBatchTime), zap.Duration("expectedBatchTime", expected)) + } + } else { + panic("invalid value for failpoint tsoDispatcherConcurrentModeAssertDelayDuration: expected string") + } + }) + + waitTimerStart := time.Now() + remainingBatchTime := goalBatchTime - waitTimerStart.Sub(currentBatchStartTime) + if remainingBatchTime > 0 && !noDelay { + if !batchingTimer.Stop() { + select { + case <-batchingTimer.C: + default: + } + } + batchingTimer.Reset(remainingBatchTime) + + err = batchController.fetchRequestsWithTimer(ctx, td.tsoRequestCh, batchingTimer) + if err != nil { + // There should not be other kinds of errors. + log.Info("[tso] stop fetching the pending tso requests due to context canceled", + zap.String("dc-location", dc), zap.Error(err)) + td.cancelCollectedRequests(batchController, invalidStreamID, errors.WithStack(ctx.Err())) + return + } + } + } + done := make(chan struct{}) dl := newTSDeadline(option.timeout, done, cancel) select { @@ -495,6 +568,9 @@ func (td *tsoDispatcher) processRequests( reqKeyspaceGroupID = svcDiscovery.GetKeyspaceGroupID() ) + // Load latest allocated ts for monotonicity assertion. + tsoInfoBeforeReq := td.latestTSOInfo.Load() + cb := func(result tsoRequestResult, reqKeyspaceGroupID uint32, err error) { // As golang doesn't allow double-closing a channel, here is implicitly a check that the callback // is never called twice or called while it's also being cancelled elsewhere. @@ -513,10 +589,12 @@ func (td *tsoDispatcher) processRequests( respReceivedAt: time.Now(), physical: result.physical, logical: result.logical, + sourceStreamID: stream.streamID, } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. firstLogical := tsoutil.AddLogical(result.logical, -int64(result.count)+1, result.suffixBits) - td.compareAndSwapTS(curTSOInfo, firstLogical) + // Do the check before releasing the token. + td.checkMonotonicity(tsoInfoBeforeReq, curTSOInfo, firstLogical) td.doneCollectedRequests(tbc, result.physical, firstLogical, result.suffixBits, stream.streamID) } @@ -542,32 +620,35 @@ func (td *tsoDispatcher) doneCollectedRequests(tbc *tsoBatchController, physical tbc.finishCollectedRequests(physical, firstLogical, suffixBits, streamID, nil) } -func (td *tsoDispatcher) compareAndSwapTS( - curTSOInfo *tsoInfo, firstLogical int64, +// checkMonotonicity checks whether the monotonicity of the TSO allocation is violated. +// It asserts (curTSOInfo, firstLogical) must be larger than lastTSOInfo, and updates td.latestTSOInfo if it grows. +// +// Note that when concurrent RPC is enabled, the lastTSOInfo may not be the latest value stored in td.latestTSOInfo +// field. Instead, it's the value that was loaded just before the current RPC request's beginning. The reason is, +// if two requests processing time has overlap, they don't have a strong order, and the later-finished one may be +// allocated later (with larger value) than another. We only need to guarantee request A returns larger ts than B +// if request A *starts* after request B *finishes*. +func (td *tsoDispatcher) checkMonotonicity( + lastTSOInfo *tsoInfo, curTSOInfo *tsoInfo, firstLogical int64, ) { - if td.lastTSOInfo != nil { - var ( - lastTSOInfo = td.lastTSOInfo - dc = td.dc - physical = curTSOInfo.physical - keyspaceID = td.provider.getServiceDiscovery().GetKeyspaceID() - ) - if td.lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { + keyspaceID := td.provider.getServiceDiscovery().GetKeyspaceID() + if lastTSOInfo != nil { + if lastTSOInfo.respKeyspaceGroupID != curTSOInfo.respKeyspaceGroupID { log.Info("[tso] keyspace group changed", - zap.String("dc-location", dc), + zap.String("dc-location", td.dc), zap.Uint32("old-group-id", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("new-group-id", curTSOInfo.respKeyspaceGroupID)) } // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then - // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned + // all TSOs we get will be [6, 7, 8, 9, 10]. latestTSOInfo.logical stores the logical part of the largest ts returned // last time. - if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + if tsoutil.TSLessEqual(curTSOInfo.physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { log.Panic("[tso] timestamp fallback", - zap.String("dc-location", dc), + zap.String("dc-location", td.dc), zap.Uint32("keyspace", keyspaceID), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), - zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", curTSOInfo.physical, firstLogical)), zap.String("last-tso-server", lastTSOInfo.tsoServer), zap.String("cur-tso-server", curTSOInfo.tsoServer), zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), @@ -575,8 +656,103 @@ func (td *tsoDispatcher) compareAndSwapTS( zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), + zap.String("last-stream-id", lastTSOInfo.sourceStreamID), + zap.String("cur-stream-id", curTSOInfo.sourceStreamID)) + } + } + + if td.latestTSOInfo.CompareAndSwap(nil, curTSOInfo) { + // If latestTSOInfo is missing, simply store it and exit. + return + } + + // Replace if we are holding a larger ts than that has been recorded. + for { + old := td.latestTSOInfo.Load() + if tsoutil.TSLessEqual(curTSOInfo.physical, curTSOInfo.logical, old.physical, old.logical) { + // The current one is large enough. Skip. + break + } + if td.latestTSOInfo.CompareAndSwap(old, curTSOInfo) { + // Successfully replaced. + break + } + } +} + +// checkTSORPCConcurrency checks configurations about TSO RPC concurrency, and adjust the token count if needed. +// Some other options (EnableTSOFollowerProxy and MaxTSOBatchWaitInterval) may affect the availability of concurrent +// RPC requests. As the dispatcher loop loads MaxTSOBatchWaitInterval in each single circle, pass it directly to this +// function. Other configurations will be loaded within this function when needed. +// +// Behavior of the function: +// - As concurrent TSO RPC requests is an optimization aiming on the opposite purpose to that of EnableTSOFollowerProxy +// and MaxTSOBatchWaitInterval, so once either EnableTSOFollowerProxy and MaxTSOBatchWaitInterval is enabled, the +// concurrency will always be set to 1 no matter how the user configured it. +// - Normally, this function takes effect in a limited frequency controlled by dispatcherCheckRPCConcurrencyInterval. +// However, if the RPC concurrency is set to more than 1, and MaxTSOBatchWaitInterval is changed from disabled into +// enabled (0 -> positive), this function takes effect immediately to disable concurrent RPC requests. +// - After this function takes effect, the final decision of concurrency and token count will be set to +// td.rpcConcurrency and td.tokenCount; and tokens available in td.tokenCh will also be adjusted. +func (td *tsoDispatcher) checkTSORPCConcurrency(ctx context.Context, maxBatchWaitInterval time.Duration, now time.Time) error { + // If we currently enabled concurrent TSO RPC requests, but `maxBatchWaitInterval` is a positive value, it must + // because that MaxTSOBatchWaitInterval is just enabled. In this case, disable concurrent TSO RPC requests + // immediately, because MaxTSOBatchWaitInterval and concurrent RPC requests has opposite purpose. + immediatelyUpdate := td.rpcConcurrency > 1 && maxBatchWaitInterval > 0 + + // Allow always updating for test purpose. + failpoint.Inject("tsoDispatcherAlwaysCheckConcurrency", func() { + immediatelyUpdate = true + }) + + if !immediatelyUpdate && now.Sub(td.lastCheckConcurrencyTime) < dispatcherCheckRPCConcurrencyInterval { + return nil + } + td.lastCheckConcurrencyTime = now + + newConcurrency := td.provider.getOption().getTSOClientRPCConcurrency() + if maxBatchWaitInterval > 0 || td.provider.getOption().getEnableTSOFollowerProxy() { + newConcurrency = 1 + } + + if newConcurrency == td.rpcConcurrency { + return nil + } + + log.Info("[tso] switching tso rpc concurrency", zap.Int("old", td.rpcConcurrency), zap.Int("new", newConcurrency)) + td.rpcConcurrency = newConcurrency + + // Find a proper token count. + // When the concurrency is set to 1, there's only 1 token, which means only 1 RPC request can run at the same + // time. + // When the concurrency is set to more than 1, the time interval between sending two batches of requests is + // controlled by an estimation of an average RPC duration. But as the duration of an RPC may jitter in the network, + // and an RPC request may finish earlier or later. So we allow there to be the actual number of concurrent ongoing + // request to be fluctuating. So in this case, the token count will be set to 2 times the expected concurrency. + newTokenCount := newConcurrency + if newConcurrency > 1 { + newTokenCount = newConcurrency * 2 + } + + if newTokenCount > td.tokenCount { + for td.tokenCount < newTokenCount { + td.tokenCh <- struct{}{} + td.tokenCount++ + } + } else if newTokenCount < td.tokenCount { + for td.tokenCount > newTokenCount { + select { + case <-ctx.Done(): + return ctx.Err() + case <-td.tokenCh: + } + td.tokenCount-- } } - td.lastTSOInfo = curTSOInfo + return nil +} + +func (td *tsoDispatcher) isConcurrentRPCEnabled() bool { + return td.rpcConcurrency > 1 } diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go index b8f0fcef208..bf038e7b7f3 100644 --- a/client/tso_dispatcher_test.go +++ b/client/tso_dispatcher_test.go @@ -18,20 +18,27 @@ import ( "context" "fmt" "sync" + "sync/atomic" "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "go.uber.org/zap/zapcore" ) type mockTSOServiceProvider struct { - option *option + option *option + createStream func(ctx context.Context) *tsoStream + updateConnMu sync.Mutex } -func newMockTSOServiceProvider(option *option) *mockTSOServiceProvider { +func newMockTSOServiceProvider(option *option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider { return &mockTSOServiceProvider{ - option: option, + option: option, + createStream: createStream, } } @@ -43,17 +50,279 @@ func (*mockTSOServiceProvider) getServiceDiscovery() ServiceDiscovery { return NewMockPDServiceDiscovery([]string{mockStreamURL}, nil) } -func (*mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { +func (m *mockTSOServiceProvider) updateConnectionCtxs(ctx context.Context, _dc string, connectionCtxs *sync.Map) bool { + // Avoid concurrent updating in the background updating goroutine and active updating in the dispatcher loop when + // stream is missing. + m.updateConnMu.Lock() + defer m.updateConnMu.Unlock() + _, ok := connectionCtxs.Load(mockStreamURL) if ok { return true } ctx, cancel := context.WithCancel(ctx) - stream := newTSOStream(ctx, mockStreamURL, newMockTSOStreamImpl(ctx, true)) + var stream *tsoStream + if m.createStream == nil { + stream = newTSOStream(ctx, mockStreamURL, newMockTSOStreamImpl(ctx, resultModeGenerated)) + } else { + stream = m.createStream(ctx) + } connectionCtxs.LoadOrStore(mockStreamURL, &tsoConnectionContext{ctx, cancel, mockStreamURL, stream}) return true } +type testTSODispatcherSuite struct { + suite.Suite + re *require.Assertions + + streamInner *mockTSOStreamImpl + stream *tsoStream + dispatcher *tsoDispatcher + dispatcherWg sync.WaitGroup + option *option + + reqPool *sync.Pool +} + +func (s *testTSODispatcherSuite) SetupTest() { + s.re = require.New(s.T()) + s.option = newOption() + s.option.timeout = time.Hour + // As the internal logic of the tsoDispatcher allows it to create streams multiple times, but our tests needs + // single stable access to the inner stream, we do not allow it to create it more than once in these tests. + creating := new(atomic.Bool) + // To avoid data race on reading `stream` and `streamInner` fields. + created := new(atomic.Bool) + createStream := func(ctx context.Context) *tsoStream { + if !creating.CompareAndSwap(false, true) { + s.re.FailNow("testTSODispatcherSuite: trying to create stream more than once, which is unsupported in this tests") + } + s.streamInner = newMockTSOStreamImpl(ctx, resultModeGenerateOnSignal) + s.stream = newTSOStream(ctx, mockStreamURL, s.streamInner) + created.Store(true) + return s.stream + } + s.dispatcher = newTSODispatcher(context.Background(), globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(s.option, createStream)) + s.reqPool = &sync.Pool{ + New: func() any { + return &tsoRequest{ + done: make(chan error, 1), + physical: 0, + logical: 0, + dcLocation: globalDCLocation, + } + }, + } + + s.dispatcherWg.Add(1) + go s.dispatcher.handleDispatcher(&s.dispatcherWg) + + // Perform a request to ensure the stream must be created. + + { + ctx := context.Background() + req := s.sendReq(ctx) + s.reqMustNotReady(req) + // Wait until created + for !created.Load() { + time.Sleep(time.Millisecond) + } + s.streamInner.generateNext() + s.reqMustReady(req) + } + s.re.True(created.Load()) + s.re.NotNil(s.stream) +} + +func (s *testTSODispatcherSuite) TearDownTest() { + s.dispatcher.close() + s.streamInner.stop() + s.dispatcherWg.Wait() + s.stream.WaitForClosed() + s.streamInner = nil + s.stream = nil + s.dispatcher = nil + s.reqPool = nil +} + +func (s *testTSODispatcherSuite) getReq(ctx context.Context) *tsoRequest { + req := s.reqPool.Get().(*tsoRequest) + req.clientCtx = context.Background() + req.requestCtx = ctx + req.physical = 0 + req.logical = 0 + req.start = time.Now() + req.pool = s.reqPool + return req +} + +func (s *testTSODispatcherSuite) sendReq(ctx context.Context) *tsoRequest { + req := s.getReq(ctx) + s.dispatcher.push(req) + return req +} + +func (s *testTSODispatcherSuite) reqMustNotReady(req *tsoRequest) { + _, _, err := req.waitTimeout(time.Millisecond * 50) + s.re.Error(err) + s.re.ErrorIs(err, context.DeadlineExceeded) +} + +func (s *testTSODispatcherSuite) reqMustReady(req *tsoRequest) (physical int64, logical int64) { + physical, logical, err := req.waitTimeout(time.Second) + s.re.NoError(err) + return physical, logical +} + +func TestTSODispatcherTestSuite(t *testing.T) { + suite.Run(t, new(testTSODispatcherSuite)) +} + +func (s *testTSODispatcherSuite) TestBasic() { + ctx := context.Background() + req := s.sendReq(ctx) + s.reqMustNotReady(req) + s.streamInner.generateNext() + s.reqMustReady(req) +} + +func (s *testTSODispatcherSuite) checkIdleTokenCount(expectedTotal int) { + // When the tsoDispatcher is idle, the dispatcher loop will acquire a token and wait for requests. Therefore + // there should be N-1 free tokens remaining. + spinStart := time.Now() + for time.Since(spinStart) < time.Second { + if s.dispatcher.tokenCount != expectedTotal { + continue + } + if len(s.dispatcher.tokenCh) == expectedTotal-1 { + break + } + } + s.re.Equal(expectedTotal, s.dispatcher.tokenCount) + s.re.Len(s.dispatcher.tokenCh, expectedTotal-1) +} + +func (s *testTSODispatcherSuite) testStaticConcurrencyImpl(concurrency int) { + ctx := context.Background() + s.option.setTSOClientRPCConcurrency(concurrency) + + // Make sure the state of the mock stream is clear. Unexpected batching may make the requests sent to the stream + // less than expected, causing there are more `generateNext` signals or generated results. + s.re.Empty(s.streamInner.resultCh) + + // The dispatcher may block on fetching requests, which is after checking concurrency option. Perform a request + // to make sure the concurrency setting takes effect. + req := s.sendReq(ctx) + s.reqMustNotReady(req) + s.streamInner.generateNext() + s.reqMustReady(req) + + // For concurrent mode, the actual token count is twice the concurrency. + // Note that the concurrency is a hint, and it's allowed to have more than `concurrency` requests running. + tokenCount := concurrency + if concurrency > 1 { + tokenCount = concurrency * 2 + } + s.checkIdleTokenCount(tokenCount) + + // As the failpoint `tsoDispatcherConcurrentModeNoDelay` is set, tsoDispatcher won't collect requests in blocking + // way. And as `reqMustNotReady` delays for a while, requests shouldn't be batched as long as there are free tokens. + // The first N requests (N=tokenCount) will each be a single batch, occupying a token. The last 3 are blocked, + // and will be batched together once there is a free token. + reqs := make([]*tsoRequest, 0, tokenCount+3) + + for i := 0; i < tokenCount+3; i++ { + req := s.sendReq(ctx) + s.reqMustNotReady(req) + reqs = append(reqs, req) + } + + // The dispatcher won't process more request batches if tokens are used up. + // Note that `reqMustNotReady` contains a delay, which makes it nearly impossible that dispatcher is processing the + // second batch but not finished yet. + // Also note that in current implementation, the tsoStream tries to receive the next result before checking + // the `tsoStream.pendingRequests` queue. Changing this behavior may need to update this test. + for i := 0; i < tokenCount+3; i++ { + expectedPending := tokenCount + 1 - i + if expectedPending > tokenCount { + expectedPending = tokenCount + } + if expectedPending < 0 { + expectedPending = 0 + } + + // Spin for a while as the dispatcher loop may have not finished sending next batch to pendingRequests + spinStart := time.Now() + for time.Since(spinStart) < time.Second { + if expectedPending == len(s.stream.pendingRequests) { + break + } + } + s.re.Len(s.stream.pendingRequests, expectedPending) + + req := reqs[i] + // The last 3 requests should be in a single batch. Don't need to generate new results for the last 2. + if i <= tokenCount { + s.reqMustNotReady(req) + s.streamInner.generateNext() + } + s.reqMustReady(req) + } +} + +func (s *testTSODispatcherSuite) TestConcurrentRPC() { + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return")) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency", "return")) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay")) + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherAlwaysCheckConcurrency")) + }() + + s.testStaticConcurrencyImpl(1) + s.testStaticConcurrencyImpl(2) + s.testStaticConcurrencyImpl(4) + s.testStaticConcurrencyImpl(16) +} + +func (s *testTSODispatcherSuite) TestBatchDelaying() { + ctx := context.Background() + s.option.setTSOClientRPCConcurrency(2) + + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return")) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`)) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay")) + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency")) + }() + + // Make sure concurrency option takes effect. + req := s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + // Trigger the check. + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("6ms")`)) + defer func() { + s.re.NoError(failpoint.Disable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration")) + }() + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + // Try other concurrency. + s.option.setTSOClientRPCConcurrency(3) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`)) + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) + + s.option.setTSOClientRPCConcurrency(4) + s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`)) + req = s.sendReq(ctx) + s.streamInner.generateNext() + s.reqMustReady(req) +} + func BenchmarkTSODispatcherHandleRequests(b *testing.B) { log.SetLevel(zapcore.FatalLevel) @@ -80,7 +349,7 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) { return req } - dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption())) + dispatcher := newTSODispatcher(ctx, globalDCLocation, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption(), nil)) var wg sync.WaitGroup wg.Add(1) diff --git a/client/tso_request.go b/client/tso_request.go index fb2ae2bb92e..5c959673a8b 100644 --- a/client/tso_request.go +++ b/client/tso_request.go @@ -60,6 +60,11 @@ func (req *tsoRequest) tryDone(err error) { // Wait will block until the TSO result is ready. func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { + return req.waitCtx(req.requestCtx) +} + +// waitCtx waits for the TSO result with specified ctx, while not using req.requestCtx. +func (req *tsoRequest) waitCtx(ctx context.Context) (physical int64, logical int64, err error) { // If tso command duration is observed very high, the reason could be it // takes too long for Wait() be called. start := time.Now() @@ -78,13 +83,20 @@ func (req *tsoRequest) Wait() (physical int64, logical int64, err error) { cmdDurationWait.Observe(now.Sub(start).Seconds()) cmdDurationTSO.Observe(now.Sub(req.start).Seconds()) return - case <-req.requestCtx.Done(): - return 0, 0, errors.WithStack(req.requestCtx.Err()) + case <-ctx.Done(): + return 0, 0, errors.WithStack(ctx.Err()) case <-req.clientCtx.Done(): return 0, 0, errors.WithStack(req.clientCtx.Err()) } } +// waitTimeout waits for the TSO result for limited time. Currently only for test purposes. +func (req *tsoRequest) waitTimeout(timeout time.Duration) (physical int64, logical int64, err error) { + ctx, cancel := context.WithTimeout(req.requestCtx, timeout) + defer cancel() + return req.waitCtx(ctx) +} + type tsoRequestFastFail struct { err error } diff --git a/client/tso_stream.go b/client/tso_stream.go index 479beff2c6a..142ad71c6b9 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -18,11 +18,13 @@ import ( "context" "fmt" "io" + "math" "sync" "sync/atomic" "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" @@ -214,6 +216,8 @@ type tsoStream struct { state atomic.Int32 stoppedWithErr atomic.Pointer[error] + estimatedLatencyMicros atomic.Uint64 + ongoingRequestCountGauge prometheus.Gauge ongoingRequests atomic.Int32 } @@ -226,7 +230,10 @@ const ( var streamIDAlloc atomic.Int32 -const invalidStreamID = "" +const ( + invalidStreamID = "" + maxPendingRequestsInTSOStream = 64 +) func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAdapter) *tsoStream { streamID := fmt.Sprintf("%s-%d", serverURL, streamIDAlloc.Add(1)) @@ -238,7 +245,7 @@ func newTSOStream(ctx context.Context, serverURL string, stream grpcTSOStreamAda stream: stream, streamID: streamID, - pendingRequests: make(chan batchedRequests, 64), + pendingRequests: make(chan batchedRequests, maxPendingRequestsInTSOStream), cancel: cancel, @@ -363,6 +370,27 @@ func (s *tsoStream) recvLoop(ctx context.Context) { s.ongoingRequestCountGauge.Set(0) }() + // For calculating the estimated RPC latency. + const ( + filterCutoffFreq float64 = 1.0 + filterNewSampleWeightUpperbound float64 = 0.2 + ) + // The filter applies on logarithm of the latency of each TSO RPC in microseconds. + filter := newRCFilter(filterCutoffFreq, filterNewSampleWeightUpperbound) + + updateEstimatedLatency := func(sampleTime time.Time, latency time.Duration) { + if latency < 0 { + // Unreachable + return + } + currentSample := math.Log(float64(latency.Microseconds())) + filteredValue := filter.update(sampleTime, currentSample) + micros := math.Exp(filteredValue) + s.estimatedLatencyMicros.Store(uint64(micros)) + // Update the metrics in seconds. + estimateTSOLatencyGauge.WithLabelValues(s.streamID).Set(micros * 1e-6) + } + recvLoop: for { select { @@ -383,14 +411,15 @@ recvLoop: hasReq = false } - durationSeconds := time.Since(currentReq.startTime).Seconds() + latency := time.Since(currentReq.startTime) + latencySeconds := latency.Seconds() if err != nil { // If a request is pending and error occurs, observe the duration it has cost. // Note that it's also possible that the stream is broken due to network without being requested. In this // case, `Recv` may return an error while no request is pending. if hasReq { - requestFailedDurationTSO.Observe(durationSeconds) + requestFailedDurationTSO.Observe(latencySeconds) } if err == io.EOF { finishWithErr = errors.WithStack(errs.ErrClientTSOStreamClosed) @@ -403,9 +432,9 @@ recvLoop: break recvLoop } - latencySeconds := durationSeconds requestDurationTSO.Observe(latencySeconds) tsoBatchSize.Observe(float64(res.count)) + updateEstimatedLatency(currentReq.startTime, latency) if res.count != uint32(currentReq.count) { finishWithErr = errors.WithStack(errTSOLength) @@ -421,6 +450,28 @@ recvLoop: } } +// EstimatedRPCLatency returns an estimation of the duration of each TSO RPC. If the stream has never handled any RPC, +// this function returns 0. +func (s *tsoStream) EstimatedRPCLatency() time.Duration { + failpoint.Inject("tsoStreamSimulateEstimatedRPCLatency", func(val failpoint.Value) { + if s, ok := val.(string); ok { + duration, err := time.ParseDuration(s) + if err != nil { + panic(err) + } + failpoint.Return(duration) + } else { + panic("invalid failpoint value for `tsoStreamSimulateEstimatedRPCLatency`: expected string") + } + }) + latencyUs := s.estimatedLatencyMicros.Load() + // Limit it at least 100us + if latencyUs < 100 { + latencyUs = 100 + } + return time.Microsecond * time.Duration(latencyUs) +} + // GetRecvError returns the error (if any) that has been encountered when receiving response asynchronously. func (s *tsoStream) GetRecvError() error { perr := s.stoppedWithErr.Load() @@ -434,3 +485,48 @@ func (s *tsoStream) GetRecvError() error { func (s *tsoStream) WaitForClosed() { s.wg.Wait() } + +// rcFilter is a simple implementation of a discrete-time low-pass filter. +// Ref: https://en.wikipedia.org/wiki/Low-pass_filter#Simple_infinite_impulse_response_filter +// There are some differences between this implementation and the wikipedia one: +// - Time-interval between each two samples is not necessarily a constant. We allow non-even sample interval by simply +// calculating the alpha (which is calculated by `dt / (rc + dt)`) dynamically for each sample, at the expense of +// losing some mathematical strictness. +// - Support specifying the upperbound of the new sample when updating. This can be an approach to avoid the output +// jumps drastically when the samples come in a low frequency. +type rcFilter struct { + rc float64 + newSampleWeightUpperBound float64 + value float64 + lastSampleTime time.Time + firstSampleArrived bool +} + +// newRCFilter initializes an rcFilter. `cutoff` is the cutoff frequency in Hertz. `newSampleWeightUpperbound` controls +// the upper limit of the weight of each incoming sample (pass 1 for unlimited). +func newRCFilter(cutoff float64, newSampleWeightUpperBound float64) rcFilter { + rc := 1.0 / (2.0 * math.Pi * cutoff) + return rcFilter{ + rc: rc, + newSampleWeightUpperBound: newSampleWeightUpperBound, + } +} + +func (f *rcFilter) update(sampleTime time.Time, newSample float64) float64 { + // Handle the first sample + if !f.firstSampleArrived { + f.firstSampleArrived = true + f.lastSampleTime = sampleTime + f.value = newSample + return newSample + } + + // Delta time. + dt := sampleTime.Sub(f.lastSampleTime).Seconds() + // `alpha` is the weight of the new sample, limited with `newSampleWeightUpperBound`. + alpha := math.Min(dt/(f.rc+dt), f.newSampleWeightUpperBound) + f.value = (1-alpha)*f.value + alpha*newSample + + f.lastSampleTime = sampleTime + return f.value +} diff --git a/client/tso_stream_test.go b/client/tso_stream_test.go index b09c54baf3a..ab6f2786ff3 100644 --- a/client/tso_stream_test.go +++ b/client/tso_stream_test.go @@ -17,6 +17,7 @@ package pd import ( "context" "io" + "math" "testing" "time" @@ -42,6 +43,14 @@ type resultMsg struct { breakStream bool } +type resultMode int + +const ( + resultModeManual resultMode = iota + resultModeGenerated + resultModeGenerateOnSignal +) + type mockTSOStreamImpl struct { ctx context.Context requestCh chan requestMsg @@ -49,21 +58,21 @@ type mockTSOStreamImpl struct { keyspaceID uint32 errorState error - autoGenerateResult bool + resultMode resultMode // Current progress of generating TSO results resGenPhysical, resGenLogical int64 } -func newMockTSOStreamImpl(ctx context.Context, autoGenerateResult bool) *mockTSOStreamImpl { +func newMockTSOStreamImpl(ctx context.Context, resultMode resultMode) *mockTSOStreamImpl { return &mockTSOStreamImpl{ ctx: ctx, requestCh: make(chan requestMsg, 64), resultCh: make(chan resultMsg, 64), keyspaceID: 0, - autoGenerateResult: autoGenerateResult, - resGenPhysical: 10000, - resGenLogical: 0, + resultMode: resultMode, + resGenPhysical: 10000, + resGenLogical: 0, } } @@ -82,6 +91,17 @@ func (s *mockTSOStreamImpl) Send(clusterID uint64, _keyspaceID, keyspaceGroupID } func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { + var needGenerateResult, needResultSignal bool + switch s.resultMode { + case resultModeManual: + needResultSignal = true + case resultModeGenerated: + needGenerateResult = true + case resultModeGenerateOnSignal: + needResultSignal = true + needGenerateResult = true + } + // This stream have ever receive an error, it returns the error forever. if s.errorState != nil { return tsoRequestResult{}, s.errorState @@ -130,12 +150,12 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } s.errorState = res.err return tsoRequestResult{}, s.errorState - } else if s.autoGenerateResult { + } else if !needResultSignal { // Do not allow manually assigning result. panic("trying manually specifying result for mockTSOStreamImpl when it's auto-generating mode") } - } else if s.autoGenerateResult { - res = s.autoGenResult(req.count) + } else if !needResultSignal { + // Mark hasRes as true to skip receiving from resultCh. The actual value of the result will be generated later. hasRes = true } @@ -160,6 +180,10 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } } + if needGenerateResult { + res = s.autoGenResult(req.count) + } + // Both res and req should be ready here. if res.err != nil { s.errorState = res.err @@ -168,11 +192,14 @@ func (s *mockTSOStreamImpl) Recv() (tsoRequestResult, error) { } func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { + if count >= (1 << 18) { + panic("requested count too large") + } physical := s.resGenPhysical logical := s.resGenLogical + count if logical >= (1 << 18) { - physical += logical >> 18 - logical &= (1 << 18) - 1 + physical += 1 + logical = count } s.resGenPhysical = physical @@ -190,6 +217,9 @@ func (s *mockTSOStreamImpl) autoGenResult(count int64) resultMsg { } func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count uint32) { + if s.resultMode != resultModeManual { + panic("trying to manually specifying tso result on generating mode") + } s.resultCh <- resultMsg{ r: tsoRequestResult{ physical: physical, @@ -201,6 +231,13 @@ func (s *mockTSOStreamImpl) returnResult(physical int64, logical int64, count ui } } +func (s *mockTSOStreamImpl) generateNext() { + if s.resultMode != resultModeGenerateOnSignal { + panic("trying to signal generation when the stream is not generate-on-signal mode") + } + s.resultCh <- resultMsg{} +} + func (s *mockTSOStreamImpl) returnError(err error) { s.resultCh <- resultMsg{ err: err, @@ -233,7 +270,7 @@ type testTSOStreamSuite struct { func (s *testTSOStreamSuite) SetupTest() { s.re = require.New(s.T()) - s.inner = newMockTSOStreamImpl(context.Background(), false) + s.inner = newMockTSOStreamImpl(context.Background(), resultModeManual) s.stream = newTSOStream(context.Background(), mockStreamURL, s.inner) } @@ -454,10 +491,125 @@ func (s *testTSOStreamSuite) TestTSOStreamConcurrentRunning() { } } +func (s *testTSOStreamSuite) TestEstimatedLatency() { + s.inner.returnResult(100, 0, 1) + res := s.getResult(s.mustProcessRequestWithResultCh(1)) + s.re.NoError(res.err) + s.re.Equal(int64(100), res.result.physical) + s.re.Equal(int64(0), res.result.logical) + estimation := s.stream.EstimatedRPCLatency().Seconds() + s.re.Greater(estimation, 0.0) + s.re.InDelta(0.0, estimation, 0.01) + + // For each began request, record its startTime and send it to the result returning goroutine. + reqStartTimeCh := make(chan time.Time, maxPendingRequestsInTSOStream) + // Limit concurrent requests to be less than the capacity of tsoStream.pendingRequests. + tokenCh := make(chan struct{}, maxPendingRequestsInTSOStream-1) + for i := 0; i < 40; i++ { + tokenCh <- struct{}{} + } + // Return a result after 50ms delay for each requests + const delay = time.Millisecond * 50 + // The goroutine to delay and return the result. + go func() { + allocated := int64(1) + for reqStartTime := range reqStartTimeCh { + now := time.Now() + elapsed := now.Sub(reqStartTime) + if elapsed < delay { + time.Sleep(delay - elapsed) + } + s.inner.returnResult(100, allocated, 1) + allocated++ + } + }() + + // Limit the test time within 1s + startTime := time.Now() + resCh := make(chan (<-chan callbackInvocation), 100) + // The sending goroutine + go func() { + for time.Since(startTime) < time.Second { + <-tokenCh + reqStartTimeCh <- time.Now() + r := s.mustProcessRequestWithResultCh(1) + resCh <- r + } + close(reqStartTimeCh) + close(resCh) + }() + // Check the result + index := 0 + for r := range resCh { + // The first is 1 + index++ + res := s.getResult(r) + tokenCh <- struct{}{} + s.re.NoError(res.err) + s.re.Equal(int64(100), res.result.physical) + s.re.Equal(int64(index), res.result.logical) + } + + s.re.Greater(s.stream.EstimatedRPCLatency(), time.Duration(int64(0.9*float64(delay)))) + s.re.Less(s.stream.EstimatedRPCLatency(), time.Duration(math.Floor(1.1*float64(delay)))) +} + +func TestRCFilter(t *testing.T) { + re := require.New(t) + // Test basic calculation with frequency 1 + f := newRCFilter(1, 1) + now := time.Now() + // The first sample initializes the value. + re.Equal(10.0, f.update(now, 10)) + now = now.Add(time.Second) + expectedValue := 10 / (2*math.Pi + 1) + re.InEpsilon(expectedValue, f.update(now, 0), 1e-8) + expectedValue = expectedValue*(1/(2*math.Pi))/(1/(2*math.Pi)+2) + 100*2/(1/(2*math.Pi)+2) + now = now.Add(time.Second * 2) + re.InEpsilon(expectedValue, f.update(now, 100), 1e-8) + + // Test newSampleWeightUpperBound + f = newRCFilter(10, 0.5) + now = time.Now() + re.Equal(0.0, f.update(now, 0)) + now = now.Add(time.Second) + re.InEpsilon(1.0, f.update(now, 2), 1e-8) + now = now.Add(time.Second * 2) + re.InEpsilon(3.0, f.update(now, 5), 1e-8) + + // Test another cutoff frequency and weight upperbound. + f = newRCFilter(1/(2*math.Pi), 0.9) + now = time.Now() + re.Equal(1.0, f.update(now, 1)) + now = now.Add(time.Second) + re.InEpsilon(2.0, f.update(now, 3), 1e-8) + now = now.Add(time.Second * 2) + re.InEpsilon(6.0, f.update(now, 8), 1e-8) + now = now.Add(time.Minute) + re.InEpsilon(15.0, f.update(now, 16), 1e-8) + + // Test with dense samples + f = newRCFilter(1/(2*math.Pi), 0.9) + now = time.Now() + re.Equal(0.0, f.update(now, 0)) + lastOutput := 0.0 + // 10000 even samples in 1 second. + for i := 0; i < 10000; i++ { + now = now.Add(time.Microsecond * 100) + output := f.update(now, 1.0) + re.Greater(output, lastOutput) + re.Less(output, 1.0) + lastOutput = output + } + // Regarding the above samples as being close enough to a continuous function, the output after 1 second + // should be 1 - exp(-RC*t) = 1 - exp(-t). Here RC = 1/(2*pi*cutoff) = 1. + re.InDelta(0.63, lastOutput, 0.02) +} + func BenchmarkTSOStreamSendRecv(b *testing.B) { log.SetLevel(zapcore.FatalLevel) - streamInner := newMockTSOStreamImpl(context.Background(), true) + streamInner := newMockTSOStreamImpl(context.Background(), resultModeGenerated) stream := newTSOStream(context.Background(), mockStreamURL, streamInner) defer func() { streamInner.stop()