Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

check cluster id and fix data inconsistency #1475

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2309,6 +2309,14 @@ func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.Che
if err != nil {
return errors.Trace(err)
}
clusterId, err := local.g.GetSQLExecutor().ObtainStringWithLog(
ctx,
"select substring(type,8) from METRICS_SCHEMA.PD_CLUSTER_METADATA limit 1;",
"check TiDB Cluster ID",
log.L())
if err != nil {
return errors.Trace(err)
}
if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil {
return err
}
Expand All @@ -2318,6 +2326,9 @@ func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.Che
if err := tikv.CheckTiKVVersion(ctx, local.tls, local.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil {
return err
}
if err := tikv.CheckTiDBDestination(ctx, local.tls, local.pdAddr, clusterId); err != nil {
return err
}

tidbVersion, _ := version.ExtractTiDBVersion(versionStr)
return checkTiFlashVersion(ctx, local.g, checkCtx, *tidbVersion)
Expand Down
12 changes: 12 additions & 0 deletions pkg/lightning/tikv/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,18 @@ func CheckPDVersion(ctx context.Context, tls *common.TLS, pdAddr string, require
return version.CheckVersion("PD", *ver, requiredMinVersion, requiredMaxVersion)
}

func CheckTiDBDestination(ctx context.Context, tls *common.TLS, pdAddr string, clusterId string) error {
id, err := pdutil.FetchClusterID(ctx, tls, pdAddr)
if err != nil {
return errors.Trace(err)
}

if id != clusterId {
return errors.Errorf("Failed to match the cluster ID, Please check whether status-port is correct")
}
return nil
}

func CheckTiKVVersion(ctx context.Context, tls *common.TLS, pdAddr string, requiredMinVersion, requiredMaxVersion semver.Version) error {
return ForAllStores(
ctx,
Expand Down
19 changes: 19 additions & 0 deletions pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"math"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -710,3 +711,21 @@ func FetchPDVersion(ctx context.Context, tls *common.TLS, pdAddr string) (*semve

return parseVersion([]byte(rawVersion.Version)), nil
}

// FetchClusterID get Cluster ID
func FetchClusterID(ctx context.Context, tls *common.TLS, pdAddr string) (string, error) {
// An example of PD Cluster ID API.
// curl http://pd_address/pd/api/v1/cluster
// {
// "id": 7125154571691814555
// }
var rawClusterID struct {
Id int `json:"id"`
}
err := tls.WithHost(pdAddr).GetJSON(ctx, "/pd/api/v1/cluster", &rawClusterID)
if err != nil {
return strconv.Itoa(rawClusterID.Id), errors.Trace(err)
}

return strings.TrimSpace(strconv.Itoa(rawClusterID.Id)), nil
}