Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: refine history job code #35967

Merged
merged 4 commits into from
Jul 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,15 +1145,16 @@ const MaxHistoryJobs = 10
// DefNumHistoryJobs is default value of the default number of history job
const DefNumHistoryJobs = 10

// GetHistoryDDLJobs returns the DDL history jobs and an error.
const batchNumHistoryJobs = 128

// GetLastNHistoryDDLJobs returns the DDL history jobs and an error.
// The maximum count of history jobs is num.
func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) {
t := meta.NewMeta(txn)
jobs, err := t.GetLastNHistoryDDLJobs(maxNumJobs)
func GetLastNHistoryDDLJobs(t *meta.Meta, maxNumJobs int) ([]*model.Job, error) {
iterator, err := t.GetLastHistoryDDLJobsIterator()
if err != nil {
return nil, errors.Trace(err)
}
return jobs, nil
return iterator.GetLastJobs(maxNumJobs, nil)
}

// IterHistoryDDLJobs iterates history DDL jobs until the `finishFn` return true or error.
Expand Down Expand Up @@ -1193,7 +1194,22 @@ func IterAllDDLJobs(txn kv.Transaction, finishFn func([]*model.Job) (bool, error

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
return m.GetAllHistoryDDLJobs()
iterator, err := m.GetLastHistoryDDLJobsIterator()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0, batchNumHistoryJobs)
for {
jobs, err := iterator.GetLastJobs(batchNumHistoryJobs, nil)
if err != nil {
return nil, errors.Trace(err)
}
if len(jobs) == 0 {
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
break
}
allJobs = append(allJobs, jobs...)
}
return allJobs, nil
}

// GetHistoryJobByID return history DDL job by ID.
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func TestGetHistoryDDLJobs(t *testing.T) {
err = AddHistoryDDLJob(m, jobs[i], true)
require.NoError(t, err)

historyJobs, err := GetHistoryDDLJobs(txn, DefNumHistoryJobs)
historyJobs, err := GetLastNHistoryDDLJobs(m, DefNumHistoryJobs)
require.NoError(t, err)

if i+1 > MaxHistoryJobs {
Expand All @@ -963,7 +963,7 @@ func TestGetHistoryDDLJobs(t *testing.T) {
}

delta := cnt - MaxHistoryJobs
historyJobs, err := GetHistoryDDLJobs(txn, DefNumHistoryJobs)
historyJobs, err := GetLastNHistoryDDLJobs(m, DefNumHistoryJobs)
require.NoError(t, err)
require.Len(t, historyJobs, MaxHistoryJobs)

Expand Down
5 changes: 3 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,12 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error {
if err != nil {
return err
}
jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
m := meta.NewMeta(txn)
jobs, err := ddl.GetAllDDLJobs(m)
if err != nil {
return err
}
historyJobs, err := ddl.GetHistoryDDLJobs(txn, ddl.DefNumHistoryJobs)
historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, ddl.DefNumHistoryJobs)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5653,7 +5653,7 @@ func TestAdmin(t *testing.T) {
require.Equal(t, 12, row.Len())
txn, err = store.Begin()
require.NoError(t, err)
historyJobs, err := ddl.GetHistoryDDLJobs(txn, ddl.DefNumHistoryJobs)
historyJobs, err := ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), ddl.DefNumHistoryJobs)
require.Greater(t, len(historyJobs), 1)
require.Greater(t, len(row.GetString(1)), 0)
require.NoError(t, err)
Expand All @@ -5678,7 +5678,7 @@ func TestAdmin(t *testing.T) {
result.Check(testkit.Rows())
result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`)
result.Check(testkit.Rows())
historyJobs, err = ddl.GetHistoryDDLJobs(txn, ddl.DefNumHistoryJobs)
historyJobs, err = ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), ddl.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID))
result.Check(testkit.Rows(historyJobs[0].Query))
require.NoError(t, err)
Expand Down Expand Up @@ -5742,7 +5742,7 @@ func TestAdmin(t *testing.T) {
// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = store.Begin()
require.NoError(t, err)
historyJobs, err = ddl.GetHistoryDDLJobs(txn, 20)
historyJobs, err = ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), 20)
require.NoError(t, err)

// Split region for history ddl job queues.
Expand All @@ -5751,7 +5751,7 @@ func TestAdmin(t *testing.T) {
endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID)
cluster.SplitKeys(startKey, endKey, int(historyJobs[0].ID/5))

historyJobs2, err := ddl.GetHistoryDDLJobs(txn, 20)
historyJobs2, err := ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), 20)
require.NoError(t, err)
require.Equal(t, historyJobs2, historyJobs)
}
Expand Down
56 changes: 0 additions & 56 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1016,36 +1015,11 @@ func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) {
return job, errors.Trace(err)
}

// GetAllHistoryDDLJobs gets all history DDL jobs.
func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
pairs, err := m.txn.HGetAll(mDDLJobHistoryKey)
if err != nil {
return nil, errors.Trace(err)
}
jobs, err := decodeJob(pairs)
if err != nil {
return nil, errors.Trace(err)
}
// sort job.
sorter := &jobsSorter{jobs: jobs}
sort.Sort(sorter)
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
return jobs, nil
}

// GetHistoryDDLCount the count of all history DDL jobs.
func (m *Meta) GetHistoryDDLCount() (uint64, error) {
return m.txn.HGetLen(mDDLJobHistoryKey)
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
return decodeJob(pairs)
}

// LastJobIterator is the iterator for gets latest history.
type LastJobIterator interface {
GetLastJobs(num int, jobs []*model.Job) ([]*model.Job, error)
Expand Down Expand Up @@ -1089,36 +1063,6 @@ func (i *HLastJobIterator) GetLastJobs(num int, jobs []*model.Job) ([]*model.Job
return jobs, nil
}

func decodeJob(jobPairs []structure.HashPair) ([]*model.Job, error) {
jobs := make([]*model.Job, 0, len(jobPairs))
for _, pair := range jobPairs {
job := &model.Job{}
err := job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}
return jobs, nil
}

// jobsSorter implements the sort.Interface interface.
type jobsSorter struct {
jobs []*model.Job
}

func (s *jobsSorter) Swap(i, j int) {
s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i]
}

func (s *jobsSorter) Len() int {
return len(s.jobs)
}

func (s *jobsSorter) Less(i, j int) bool {
return s.jobs[i].ID < s.jobs[j].ID
}

// GetBootstrapVersion returns the version of the server which bootstrap the store.
// If the store is not bootstraped, the version will be zero.
func (m *Meta) GetBootstrapVersion() (int64, error) {
Expand Down
5 changes: 3 additions & 2 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -564,7 +565,7 @@ func TestDDL(t *testing.T) {
historyJob2.Args = append(job.Args, arg)
err = m.AddHistoryDDLJob(historyJob2, false)
require.NoError(t, err)
all, err := m.GetAllHistoryDDLJobs()
all, err := ddl.GetAllHistoryDDLJobs(m)
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
require.NoError(t, err)
var lastID int64
for _, job := range all {
Expand All @@ -581,7 +582,7 @@ func TestDDL(t *testing.T) {
}

// Test for get last N history ddl jobs.
historyJobs, err := m.GetLastNHistoryDDLJobs(2)
historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, 2)
require.NoError(t, err)
require.Len(t, historyJobs, 2)
require.Equal(t, int64(1234), historyJobs[0].ID)
Expand Down