Skip to content

Commit

Permalink
refine history job code
Browse files Browse the repository at this point in the history
  • Loading branch information
xiongjiwei committed Jul 5, 2022
1 parent 114c922 commit 31acd05
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 72 deletions.
28 changes: 22 additions & 6 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,15 +1145,16 @@ const MaxHistoryJobs = 10
// DefNumHistoryJobs is default value of the default number of history job
const DefNumHistoryJobs = 10

// GetHistoryDDLJobs returns the DDL history jobs and an error.
const batchNumHistoryJobs = 128

// GetLastNHistoryDDLJobs returns the DDL history jobs and an error.
// The maximum count of history jobs is num.
func GetHistoryDDLJobs(txn kv.Transaction, maxNumJobs int) ([]*model.Job, error) {
t := meta.NewMeta(txn)
jobs, err := t.GetLastNHistoryDDLJobs(maxNumJobs)
func GetLastNHistoryDDLJobs(t *meta.Meta, maxNumJobs int) ([]*model.Job, error) {
iterator, err := t.GetLastHistoryDDLJobsIterator()
if err != nil {
return nil, errors.Trace(err)
}
return jobs, nil
return iterator.GetLastJobs(maxNumJobs, nil)
}

// IterHistoryDDLJobs iterates history DDL jobs until the `finishFn` return true or error.
Expand Down Expand Up @@ -1193,7 +1194,22 @@ func IterAllDDLJobs(txn kv.Transaction, finishFn func([]*model.Job) (bool, error

// GetAllHistoryDDLJobs get all the done DDL jobs.
func GetAllHistoryDDLJobs(m *meta.Meta) ([]*model.Job, error) {
return m.GetAllHistoryDDLJobs()
iterator, err := m.GetLastHistoryDDLJobsIterator()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0, batchNumHistoryJobs)
for {
jobs, err := iterator.GetLastJobs(batchNumHistoryJobs, nil)
if err != nil {
return nil, errors.Trace(err)
}
if len(jobs) == 0 {
break
}
allJobs = append(allJobs, jobs...)
}
return allJobs, nil
}

// GetHistoryJobByID return history DDL job by ID.
Expand Down
4 changes: 2 additions & 2 deletions ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ func TestGetHistoryDDLJobs(t *testing.T) {
err = AddHistoryDDLJob(m, jobs[i], true)
require.NoError(t, err)

historyJobs, err := GetHistoryDDLJobs(txn, DefNumHistoryJobs)
historyJobs, err := GetLastNHistoryDDLJobs(m, DefNumHistoryJobs)
require.NoError(t, err)

if i+1 > MaxHistoryJobs {
Expand All @@ -963,7 +963,7 @@ func TestGetHistoryDDLJobs(t *testing.T) {
}

delta := cnt - MaxHistoryJobs
historyJobs, err := GetHistoryDDLJobs(txn, DefNumHistoryJobs)
historyJobs, err := GetLastNHistoryDDLJobs(m, DefNumHistoryJobs)
require.NoError(t, err)
require.Len(t, historyJobs, MaxHistoryJobs)

Expand Down
5 changes: 3 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,11 +597,12 @@ func (e *ShowDDLJobQueriesExec) Open(ctx context.Context) error {
if err != nil {
return err
}
jobs, err := ddl.GetAllDDLJobs(meta.NewMeta(txn))
m := meta.NewMeta(txn)
jobs, err := ddl.GetAllDDLJobs(m)
if err != nil {
return err
}
historyJobs, err := ddl.GetHistoryDDLJobs(txn, ddl.DefNumHistoryJobs)
historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, ddl.DefNumHistoryJobs)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5653,7 +5653,7 @@ func TestAdmin(t *testing.T) {
require.Equal(t, 12, row.Len())
txn, err = store.Begin()
require.NoError(t, err)
historyJobs, err := ddl.GetHistoryDDLJobs(txn, ddl.DefNumHistoryJobs)
historyJobs, err := ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), ddl.DefNumHistoryJobs)
require.Greater(t, len(historyJobs), 1)
require.Greater(t, len(row.GetString(1)), 0)
require.NoError(t, err)
Expand All @@ -5678,7 +5678,7 @@ func TestAdmin(t *testing.T) {
result.Check(testkit.Rows())
result = tk.MustQuery(`admin show ddl job queries 1, 2, 3, 4`)
result.Check(testkit.Rows())
historyJobs, err = ddl.GetHistoryDDLJobs(txn, ddl.DefNumHistoryJobs)
historyJobs, err = ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), ddl.DefNumHistoryJobs)
result = tk.MustQuery(fmt.Sprintf("admin show ddl job queries %d", historyJobs[0].ID))
result.Check(testkit.Rows(historyJobs[0].Query))
require.NoError(t, err)
Expand Down Expand Up @@ -5742,7 +5742,7 @@ func TestAdmin(t *testing.T) {
// Test for reverse scan get history ddl jobs when ddl history jobs queue has multiple regions.
txn, err = store.Begin()
require.NoError(t, err)
historyJobs, err = ddl.GetHistoryDDLJobs(txn, 20)
historyJobs, err = ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), 20)
require.NoError(t, err)

// Split region for history ddl job queues.
Expand All @@ -5751,7 +5751,7 @@ func TestAdmin(t *testing.T) {
endKey := meta.DDLJobHistoryKey(m, historyJobs[0].ID)
cluster.SplitKeys(startKey, endKey, int(historyJobs[0].ID/5))

historyJobs2, err := ddl.GetHistoryDDLJobs(txn, 20)
historyJobs2, err := ddl.GetLastNHistoryDDLJobs(meta.NewMeta(txn), 20)
require.NoError(t, err)
require.Equal(t, historyJobs2, historyJobs)
}
Expand Down
56 changes: 0 additions & 56 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/json"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -1016,36 +1015,11 @@ func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) {
return job, errors.Trace(err)
}

// GetAllHistoryDDLJobs gets all history DDL jobs.
func (m *Meta) GetAllHistoryDDLJobs() ([]*model.Job, error) {
pairs, err := m.txn.HGetAll(mDDLJobHistoryKey)
if err != nil {
return nil, errors.Trace(err)
}
jobs, err := decodeJob(pairs)
if err != nil {
return nil, errors.Trace(err)
}
// sort job.
sorter := &jobsSorter{jobs: jobs}
sort.Sort(sorter)
return jobs, nil
}

// GetHistoryDDLCount the count of all history DDL jobs.
func (m *Meta) GetHistoryDDLCount() (uint64, error) {
return m.txn.HGetLen(mDDLJobHistoryKey)
}

// GetLastNHistoryDDLJobs gets latest N history ddl jobs.
func (m *Meta) GetLastNHistoryDDLJobs(num int) ([]*model.Job, error) {
pairs, err := m.txn.HGetLastN(mDDLJobHistoryKey, num)
if err != nil {
return nil, errors.Trace(err)
}
return decodeJob(pairs)
}

// LastJobIterator is the iterator for gets latest history.
type LastJobIterator interface {
GetLastJobs(num int, jobs []*model.Job) ([]*model.Job, error)
Expand Down Expand Up @@ -1089,36 +1063,6 @@ func (i *HLastJobIterator) GetLastJobs(num int, jobs []*model.Job) ([]*model.Job
return jobs, nil
}

func decodeJob(jobPairs []structure.HashPair) ([]*model.Job, error) {
jobs := make([]*model.Job, 0, len(jobPairs))
for _, pair := range jobPairs {
job := &model.Job{}
err := job.Decode(pair.Value)
if err != nil {
return nil, errors.Trace(err)
}
jobs = append(jobs, job)
}
return jobs, nil
}

// jobsSorter implements the sort.Interface interface.
type jobsSorter struct {
jobs []*model.Job
}

func (s *jobsSorter) Swap(i, j int) {
s.jobs[i], s.jobs[j] = s.jobs[j], s.jobs[i]
}

func (s *jobsSorter) Len() int {
return len(s.jobs)
}

func (s *jobsSorter) Less(i, j int) bool {
return s.jobs[i].ID < s.jobs[j].ID
}

// GetBootstrapVersion returns the version of the server which bootstrap the store.
// If the store is not bootstraped, the version will be zero.
func (m *Meta) GetBootstrapVersion() (int64, error) {
Expand Down
5 changes: 3 additions & 2 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -564,7 +565,7 @@ func TestDDL(t *testing.T) {
historyJob2.Args = append(job.Args, arg)
err = m.AddHistoryDDLJob(historyJob2, false)
require.NoError(t, err)
all, err := m.GetAllHistoryDDLJobs()
all, err := ddl.GetAllHistoryDDLJobs(m)
require.NoError(t, err)
var lastID int64
for _, job := range all {
Expand All @@ -581,7 +582,7 @@ func TestDDL(t *testing.T) {
}

// Test for get last N history ddl jobs.
historyJobs, err := m.GetLastNHistoryDDLJobs(2)
historyJobs, err := ddl.GetLastNHistoryDDLJobs(m, 2)
require.NoError(t, err)
require.Len(t, historyJobs, 2)
require.Equal(t, int64(1234), historyJobs[0].ID)
Expand Down

0 comments on commit 31acd05

Please sign in to comment.