client: Make tsoStream receive asynchronously (#8483)
ref #8432

client: Make tsoStream receive asynchronously. This makes it possible for the tsoDispatcher to send multiple requests and wait for their responses concurrently.

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
MyonKeminta and ti-chi-bot[bot] committed Sep 14, 2024
1 parent 098b802 commit 71f6f96
Showing 9 changed files with 1,084 additions and 150 deletions.
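For context on what "receive asynchronously" means here: the receiving side of the TSO stream no longer blocks the sender, so several requests can be in flight at once. The sketch below is an illustration only, with invented types and names rather than the client's actual tsoStream code; it shows the general pattern of matching responses to pending requests in FIFO order from a separate receiver goroutine.

```go
// Sketch only: a sender that does not wait for each response before sending
// the next request, paired with a receiver that completes pending requests
// in FIFO order. All types and names here are illustrative.
package main

import (
	"fmt"
	"sync"
)

type result struct {
	physical, logical int64
	err               error
}

type pending struct {
	done chan result
}

type asyncStream struct {
	mu    sync.Mutex
	queue []*pending // requests sent but not yet answered
}

// send enqueues a pending entry and returns a channel the caller can wait on.
// The real client would also write the request to the gRPC stream here.
func (s *asyncStream) send() <-chan result {
	p := &pending{done: make(chan result, 1)}
	s.mu.Lock()
	s.queue = append(s.queue, p)
	s.mu.Unlock()
	return p.done
}

// onRecv is called by the receiver goroutine for every response read from the
// stream; responses are matched to requests in the order they were sent.
func (s *asyncStream) onRecv(r result) {
	s.mu.Lock()
	p := s.queue[0]
	s.queue = s.queue[1:]
	s.mu.Unlock()
	p.done <- r
}

func main() {
	s := &asyncStream{}
	// Two requests are in flight at the same time.
	a, b := s.send(), s.send()
	go func() {
		s.onRecv(result{physical: 1, logical: 1})
		s.onRecv(result{physical: 1, logical: 2})
	}()
	fmt.Println(<-a, <-b)
}
```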
24 changes: 17 additions & 7 deletions client/metrics.go
@@ -39,13 +39,14 @@ func initAndRegisterMetrics(constLabels prometheus.Labels) {
}

var (
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
cmdDuration *prometheus.HistogramVec
cmdFailedDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
tsoBestBatchSize prometheus.Histogram
tsoBatchSize prometheus.Histogram
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
@@ -117,6 +118,15 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "The status to indicate if the request is forwarded",
ConstLabels: constLabels,
}, []string{"host", "delegate"})

ongoingRequestCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "ongoing_requests_count",
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
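The gauge above is only registered in this file. A hypothetical call site (the helper name, label value, and placement are assumptions for illustration, not code from this commit) would look roughly like this:

```go
// Hypothetical usage of the new gauge: track how many batch TSO requests are
// currently in flight on a given stream. Not part of this diff.
func sendBatchWithTracking(streamURL string, send func() error) error {
	inflight := ongoingRequestCountGauge.WithLabelValues(streamURL)
	inflight.Inc()       // a batch has been written to the stream
	defer inflight.Dec() // its response (or error) has been observed
	return send()
}
```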
93 changes: 55 additions & 38 deletions client/tso_batch_controller.go
@@ -19,53 +19,85 @@ import (
"runtime/trace"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/zap"
)

type tsoBatchController struct {
maxBatchSize int
// bestBatchSize is a dynamic size that changes based on the current batch effect.
bestBatchSize int

tsoRequestCh chan *tsoRequest
collectedRequests []*tsoRequest
collectedRequestCount int

batchStartTime time.Time
// The time after getting the first request and the token, and before performing extra batching.
extraBatchingStartTime time.Time
}

func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController {
func newTSOBatchController(maxBatchSize int) *tsoBatchController {
return &tsoBatchController{
maxBatchSize: maxBatchSize,
bestBatchSize: 8, /* Starting from a low value is necessary because we need to make sure it will converge to (current_batch_size - 4) */
tsoRequestCh: tsoRequestCh,
collectedRequests: make([]*tsoRequest, maxBatchSize+1),
collectedRequestCount: 0,
}
}

// fetchPendingRequests will start a new round of the batch collecting from the channel.
// It returns true if everything goes well, otherwise false which means we should stop the service.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, maxBatchWaitInterval time.Duration) error {
var firstRequest *tsoRequest
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest = <-tbc.tsoRequestCh:
}
// Start to batch when the first TSO request arrives.
tbc.batchStartTime = time.Now()
// It returns nil error if everything goes well, otherwise a non-nil error which means we should stop the service.
// It's guaranteed that if this function failed after collecting some requests, then these requests will be cancelled
// when the function returns, so the caller doesn't need to clear them manually.
func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequestCh <-chan *tsoRequest, tokenCh chan struct{}, maxBatchWaitInterval time.Duration) (errRet error) {
var tokenAcquired bool
defer func() {
if errRet != nil {
// Something went wrong when collecting a batch of requests. Release the token and cancel collected requests
// if any.
if tokenAcquired {
tokenCh <- struct{}{}
}
tbc.finishCollectedRequests(0, 0, 0, invalidStreamID, errRet)
}
}()

// Wait until BOTH the first request and the token have arrived.
// TODO: `tbc.collectedRequestCount` should never be non-zero here. Consider adding an assertion.
tbc.collectedRequestCount = 0
tbc.pushRequest(firstRequest)
for {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
// Start to batch when the first TSO request arrives.
tbc.pushRequest(req)
// A request arrives but the token is not ready yet. Continue waiting, and also allow collecting the next
// request if it arrives.
continue
case <-tokenCh:
tokenAcquired = true
}

// The token is ready. If the first request didn't arrive, wait for it.
if tbc.collectedRequestCount == 0 {
select {
case <-ctx.Done():
return ctx.Err()
case firstRequest := <-tsoRequestCh:
tbc.pushRequest(firstRequest)
}
}

// Both token and the first request have arrived.
break
}

tbc.extraBatchingStartTime = time.Now()

// This loop tries its best to collect more requests, so we use `tbc.maxBatchSize` here.
fetchPendingRequestsLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
@@ -88,7 +120,7 @@ fetchPendingRequestsLoop:
defer after.Stop()
for tbc.collectedRequestCount < tbc.bestBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
@@ -103,7 +135,7 @@
// we can adjust the `tbc.bestBatchSize` dynamically later.
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case tsoReq := <-tbc.tsoRequestCh:
case tsoReq := <-tsoRequestCh:
tbc.pushRequest(tsoReq)
case <-ctx.Done():
return ctx.Err()
@@ -136,31 +168,16 @@ func (tbc *tsoBatchController) adjustBestBatchSize() {
}
}

func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, err error) {
func (tbc *tsoBatchController) finishCollectedRequests(physical, firstLogical int64, suffixBits uint32, streamID string, err error) {
for i := 0; i < tbc.collectedRequestCount; i++ {
tsoReq := tbc.collectedRequests[i]
// Retrieve the request context before the request is done to trace without race.
requestCtx := tsoReq.requestCtx
tsoReq.physical, tsoReq.logical = physical, tsoutil.AddLogical(firstLogical, int64(i), suffixBits)
tsoReq.streamID = streamID
tsoReq.tryDone(err)
trace.StartRegion(requestCtx, "pdclient.tsoReqDequeue").End()
}
// Prevent the finished requests from being processed again.
tbc.collectedRequestCount = 0
}

func (tbc *tsoBatchController) revokePendingRequests(err error) {
for i := 0; i < len(tbc.tsoRequestCh); i++ {
req := <-tbc.tsoRequestCh
req.tryDone(err)
}
}

func (tbc *tsoBatchController) clear() {
log.Info("[pd] clear the tso batch controller",
zap.Int("max-batch-size", tbc.maxBatchSize), zap.Int("best-batch-size", tbc.bestBatchSize),
zap.Int("collected-request-count", tbc.collectedRequestCount), zap.Int("pending-request-count", len(tbc.tsoRequestCh)))
tsoErr := errors.WithStack(errClosing)
tbc.finishCollectedRequests(0, 0, 0, tsoErr)
tbc.revokePendingRequests(tsoErr)
}
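The `tokenCh` parameter introduced into `fetchPendingRequests` behaves as a counting semaphore: a batch may only start once a token has been taken from the channel, and the deferred cleanup above puts the token back when collection fails (the success path presumably returns it once the response is processed, which lies outside this diff). A standalone sketch of that pattern, using illustrative names and capacity:

```go
// Sketch of the token-channel pattern used to bound concurrent batches.
// Capacity and names are illustrative only, not taken from the client.
package main

import "fmt"

func main() {
	const maxInflight = 2
	tokenCh := make(chan struct{}, maxInflight)
	// Pre-fill the channel so that up to maxInflight batches can proceed.
	for i := 0; i < maxInflight; i++ {
		tokenCh <- struct{}{}
	}

	acquire := func(id int) bool {
		select {
		case <-tokenCh: // take a token before collecting a batch
			fmt.Println("batch", id, "may start collecting")
			return true
		default:
			fmt.Println("batch", id, "must wait: no token available")
			return false
		}
	}
	release := func(id int) {
		tokenCh <- struct{}{} // return the token once the batch is finished
		fmt.Println("batch", id, "released its token")
	}

	acquire(1)
	acquire(2)
	acquire(3) // fails: both tokens are held
	release(1)
	acquire(3) // succeeds now
}
```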
1 change: 1 addition & 0 deletions client/tso_client.go
@@ -203,6 +203,7 @@ func (c *tsoClient) getTSORequest(ctx context.Context, dcLocation string) *tsoRequest {
req.physical = 0
req.logical = 0
req.dcLocation = dcLocation
req.streamID = ""
return req
}

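The one-line addition above (`req.streamID = ""`) matters because TSO request objects are reused from a pool, which is why the surrounding code resets several fields before handing the request out. A generic, self-contained sketch of that pitfall, assuming a sync.Pool-backed pool (the type and fields below are illustrative, not the client's own):

```go
// Sketch: why every field of a pooled object must be reset on reuse.
// The request type and pool here are illustrative.
package main

import (
	"fmt"
	"sync"
)

type tsoRequestLike struct {
	physical, logical int64
	streamID          string
}

var reqPool = sync.Pool{New: func() any { return &tsoRequestLike{} }}

func getRequest() *tsoRequestLike {
	req := reqPool.Get().(*tsoRequestLike)
	// Reset every field; a stale streamID from a previous use would
	// otherwise leak into the new request.
	req.physical, req.logical, req.streamID = 0, 0, ""
	return req
}

func main() {
	r := getRequest()
	r.streamID = "stream-1"
	reqPool.Put(r)
	fmt.Println(getRequest().streamID == "") // always true, because fields are reset
}
```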
