Skip to content

Commit

Permalink
txn: introduce pipelined DML (#50389)
Browse files Browse the repository at this point in the history
ref #50215
  • Loading branch information
you06 committed Mar 8, 2024
1 parent 611dc83 commit cbe52e6
Show file tree
Hide file tree
Showing 21 changed files with 1,107 additions and 25 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7054,13 +7054,13 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "124b63a9e7a80fccf7f618cda8f376c2f07ae67c198cd8ee9af5c0dba840b1b4",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240305085506-3625fc04d0bd",
sha256 = "7b6accba538659d8c03705405b7b3249712751028586f679cc45dfdc87a54898",
strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20240308052415-af4f9a9b6e41",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240305085506-3625fc04d0bd.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240305085506-3625fc04d0bd.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240305085506-3625fc04d0bd.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240305085506-3625fc04d0bd.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240308052415-af4f9a9b6e41.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240308052415-af4f9a9b6e41.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240308052415-af4f9a9b6e41.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240308052415-af4f9a9b6e41.zip",
],
)
go_repository(
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/kv/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ func (*MemBuf) Staging() kv.StagingHandle {
// If the changes are not published by `Release`, they will be discarded.
func (*MemBuf) Cleanup(_ kv.StagingHandle) {}

// MayFlush implements the kv.MemBuffer interface.
func (*MemBuf) MayFlush() error {
return nil
}

// Size returns sum of keys and values length.
func (mb *MemBuf) Size() int {
return mb.size
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tidwall/btree v1.7.0
github.com/tikv/client-go/v2 v2.0.8-0.20240305085506-3625fc04d0bd
github.com/tikv/client-go/v2 v2.0.8-0.20240308052415-af4f9a9b6e41
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a
github.com/twmb/murmur3 v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -866,8 +866,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tikv/client-go/v2 v2.0.8-0.20240305085506-3625fc04d0bd h1:hB/ZHFHFtE5RlBkDLKmAzYiRKSSWI0vNuqHJEeufPtQ=
github.com/tikv/client-go/v2 v2.0.8-0.20240305085506-3625fc04d0bd/go.mod h1:a7O+2b8r8V5fyR+eNzUIsCDkX8YC3clhNUpsI1kYrKM=
github.com/tikv/client-go/v2 v2.0.8-0.20240308052415-af4f9a9b6e41 h1:ZFmKEJLxY2rOzX6zCGydAMxIsGqjtcmim7vzHROxg38=
github.com/tikv/client-go/v2 v2.0.8-0.20240308052415-af4f9a9b6e41/go.mod h1:a7O+2b8r8V5fyR+eNzUIsCDkX8YC3clhNUpsI1kYrKM=
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e h1:kHXMmskVCNyH53u43I73Y5cmZ6yqqder/jGOiI7ylxs=
github.com/tikv/pd/client v0.0.0-20240229065730-92a31c12238e/go.mod h1:Z/QAgOt29zvwBTd0H6pdx45VO6KRNc/O/DzGkVmSyZg=
github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo=
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (t *mockTxn) RetryFairLocking(_ context.Context) error { return nil }
func (t *mockTxn) CancelFairLocking(_ context.Context) error { return nil }
func (t *mockTxn) DoneFairLocking(_ context.Context) error { return nil }
func (t *mockTxn) IsInFairLockingMode() bool { return false }
func (t *mockTxn) IsPipelined() bool { return false }

// newMockTxn new a mockTxn.
func newMockTxn() Transaction {
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ type MemBuffer interface {

// RemoveFromBuffer removes the entry from the buffer. It's used for testing.
RemoveFromBuffer(Key)

// MayFlush will be called in pipelined txn
MayFlush() error
}

// FindKeysInStage returns all keys in the given stage that satisfies the given condition.
Expand Down Expand Up @@ -276,6 +279,8 @@ type Transaction interface {

// UpdateMemBufferFlags updates the flags of a node in the mem buffer.
UpdateMemBufferFlags(key []byte, flags ...FlagsOp)
// IsPipelined returns whether the transaction is used for pipelined DML.
IsPipelined() bool
}

// AssertionProto is an interface defined for the assertion protocol.
Expand Down
294 changes: 294 additions & 0 deletions pkg/metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -7318,6 +7318,300 @@
"align": false,
"alignLevel": null
}
},
{
"type": "heatmap",
"title": "Pipelined Flush Keys",
"gridPos": {
"x": 16,
"y": 74,
"w": 8,
"h": 7
},
"id": 333,
"targets": [
{
"expr": "sum(delta(tidb_tikvclient_pipelined_flush_len_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le)",
"format": "heatmap",
"intervalFactor": 2,
"legendFormat": "{{le}}",
"metric": "tidb_tikvclient_pipelined_flush_len_bucket",
"refId": "A",
"step": 4
}
],
"fieldConfig": {
"defaults": {},
"overrides": []
},
"legend": {
"show": false
},
"tooltip": {
"show": true,
"showHistogram": false
},
"yBucketBound": "upper",
"datasource": "${DS_TEST-CLUSTER}",
"heatmap": {},
"cards": {
"cardPadding": null,
"cardRound": null
},
"color": {
"cardColor": "#b4ff00",
"colorScale": "sqrt",
"colorScheme": "interpolateSpectral",
"exponent": 0.5,
"mode": "spectrum"
},
"dataFormat": "tsbuckets",
"xAxis": {
"show": true
},
"yAxis": {
"decimals": 0,
"format": "short",
"logBase": 1,
"max": null,
"min": null,
"show": true,
"splitFactor": null
},
"highlightCards": true,
"hideZeroBuckets": true,
"description": "The keys of pipelined flush.",
"links": [],
"reverseYBuckets": false,
"timeFrom": null,
"timeShift": null,
"xBucketNumber": null,
"xBucketSize": null,
"yBucketNumber": null,
"yBucketSize": null
},
{
"type": "heatmap",
"title": "Pipelined Flush Size",
"gridPos": {
"x": 0,
"y": 82,
"w": 8,
"h": 7
},
"id": 334,
"targets": [
{
"expr": "sum(delta(tidb_tikvclient_pipelined_flush_size_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le)",
"format": "heatmap",
"intervalFactor": 2,
"legendFormat": "{{le}}",
"metric": "tidb_tikvclient_pipelined_flush_size_bucket",
"refId": "A",
"step": 4
}
],
"fieldConfig": {
"defaults": {},
"overrides": []
},
"legend": {
"show": false
},
"tooltip": {
"show": true,
"showHistogram": false
},
"yBucketBound": "upper",
"datasource": "${DS_TEST-CLUSTER}",
"heatmap": {},
"cards": {
"cardPadding": null,
"cardRound": null
},
"color": {
"cardColor": "#b4ff00",
"colorScale": "sqrt",
"colorScheme": "interpolateSpectral",
"exponent": 0.5,
"mode": "spectrum"
},
"dataFormat": "tsbuckets",
"xAxis": {
"show": true
},
"yAxis": {
"decimals": 0,
"format": "decbytes",
"logBase": 1,
"max": null,
"min": null,
"show": true,
"splitFactor": null
},
"highlightCards": true,
"hideZeroBuckets": true,
"description": "The size of pipelined flush.",
"links": [],
"reverseYBuckets": false,
"timeFrom": null,
"timeShift": null,
"xBucketNumber": null,
"xBucketSize": null,
"yBucketNumber": null,
"yBucketSize": null
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The flush duration of each pipelined batch.",
"editable": true,
"error": false,
"fieldConfig": {
"defaults": {},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"grid": {},
"gridPos": {
"x": 8,
"y": 82,
"w": 8,
"h": 7
},
"hiddenSeries": false,
"id": 335,
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": true,
"min": false,
"rightSide": true,
"show": true,
"sort": "max",
"sortDesc": true,
"total": false,
"values": true
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null as zero",
"options": {
"alertThreshold": true
},
"percentage": false,
"pluginVersion": "7.5.11",
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"exemplar": true,
"expr": "histogram_quantile(0.999, sum(rate(tidb_tikvclient_pipelined_flush_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "999",
"refId": "A",
"step": 40
},
{
"exemplar": true,
"expr": "histogram_quantile(0.99, sum(rate(tidb_tikvclient_pipelined_flush_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "99",
"refId": "B",
"step": 40
},
{
"exemplar": true,
"expr": "histogram_quantile(0.95, sum(rate(tidb_tikvclient_pipelined_flush_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "95",
"refId": "C",
"step": 40
},
{
"exemplar": true,
"expr": "histogram_quantile(0.8, sum(rate(tidb_tikvclient_pipelined_flush_duration_bucket{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m])) by (le))",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "80",
"refId": "D",
"step": 40
},
{
"exemplar": true,
"expr": "sum(rate(tidb_tikvclient_pipelined_flush_duration_sum{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))/ sum(rate(tidb_tikvclient_pipelined_flush_duration_count{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", instance=~\"$instance\"}[1m]))",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "avg",
"refId": "E",
"step": 40
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Pipelined Flush Duration",
"tooltip": {
"msResolution": false,
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": null,
"format": "s",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
}
],
"repeat": null,
Expand Down
Loading

0 comments on commit cbe52e6

Please sign in to comment.