Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc, cmd: allow customized changefeed id #727

Merged
merged 7 commits into from
Jul 13, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.etcd.io/etcd/clientv3/concurrency"
)
Expand Down Expand Up @@ -132,7 +131,7 @@ func (s *Server) handleRebalanceTrigger(w http.ResponseWriter, req *http.Request
return
}
changefeedID := req.Form.Get(APIOpVarChangefeedID)
if !util.IsValidUUIDv4(changefeedID) {
if err := model.ValidateChangefeedID(changefeedID); err != nil {
writeError(w, http.StatusBadRequest, errors.Errorf("invalid changefeed id: %s", changefeedID))
return
}
Expand All @@ -156,12 +155,12 @@ func (s *Server) handleMoveTable(w http.ResponseWriter, req *http.Request) {
return
}
changefeedID := req.Form.Get(APIOpVarChangefeedID)
if !util.IsValidUUIDv4(changefeedID) {
if err := model.ValidateChangefeedID(changefeedID); err != nil {
writeError(w, http.StatusBadRequest, errors.Errorf("invalid changefeed id: %s", changefeedID))
return
}
to := req.Form.Get(APIOpVarTargetCaptureID)
if !util.IsValidUUIDv4(to) {
if err := model.ValidateChangefeedID(to); err != nil {
writeError(w, http.StatusBadRequest, errors.Errorf("invalid target capture id: %s", to))
return
}
Expand Down Expand Up @@ -190,7 +189,7 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
return
}
changefeedID := req.Form.Get(APIOpVarChangefeedID)
if !util.IsValidUUIDv4(changefeedID) {
if err := model.ValidateChangefeedID(changefeedID); err != nil {
writeError(w, http.StatusBadRequest, errors.Errorf("invalid changefeed id: %s", changefeedID))
return
}
Expand Down
24 changes: 24 additions & 0 deletions cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/embed"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -197,6 +198,29 @@ func (c CDCEtcdClient) GetCaptures(ctx context.Context) (int64, []*model.Capture
return revision, infos, nil
}

// CreateChangefeedInfo creates a change feed info into etcd and fails if it is already exists.
func (c CDCEtcdClient) CreateChangefeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error {
if err := model.ValidateChangefeedID(changeFeedID); err != nil {
return err
}
key := GetEtcdKeyChangeFeedInfo(changeFeedID)
value, err := info.Marshal()
if err != nil {
return errors.Trace(err)
}
resp, err := c.Client.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", 0),
).Then(
clientv3.OpPut(key, value),
).Commit()
if !resp.Succeeded {
zier-one marked this conversation as resolved.
Show resolved Hide resolved
log.Warn("changefeed already exists, ignore create changefeed",
zap.String("changefeedid", changeFeedID))
return errors.Annotatef(model.ErrChangeFeedAlreadtExists, "key: %s", key)
}
return errors.Trace(err)
}

// SaveChangeFeedInfo stores change feed info into etcd
// TODO: this should be called from outer system, such as from a TiDB client
func (c CDCEtcdClient) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID string) error {
Expand Down
16 changes: 16 additions & 0 deletions cdc/kv/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,3 +338,19 @@ func (s *etcdSuite) TestGetAllTaskWorkload(c *check.C) {
c.Assert(workloads, check.DeepEquals, expected[i])
}
}

func (s *etcdSuite) TestCreateChangefeed(c *check.C) {
ctx := context.Background()
detail := &model.ChangeFeedInfo{
SinkURI: "root@tcp(127.0.0.1:3306)/mysql",
}

err := s.client.CreateChangefeedInfo(ctx, detail, "bad.id👻")
c.Assert(err, check.ErrorMatches, "bad changefeed id.*")

err = s.client.CreateChangefeedInfo(ctx, detail, "test-id")
c.Assert(err, check.IsNil)

err = s.client.CreateChangefeedInfo(ctx, detail, "test-id")
c.Assert(errors.Cause(err), check.Equals, model.ErrChangeFeedAlreadtExists)
}
12 changes: 12 additions & 0 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package model
import (
"encoding/json"
"math"
"regexp"
"sort"
"time"

Expand Down Expand Up @@ -78,6 +79,17 @@ type ChangeFeedInfo struct {
Error *RunningError `json:"error"`
}

var changeFeedIDRe *regexp.Regexp = regexp.MustCompile(`^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`)

// ValidateChangefeedID returns true if the changefeed ID matches
// the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$", eg, "simple-changefeed-task".
func ValidateChangefeedID(changefeedID string) error {
if !changeFeedIDRe.MatchString(changefeedID) {
return errors.Errorf(`bad changefeed id, please match the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$", eg, "simple-changefeed-task"`)
overvenus marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

// GetStartTs returns StartTs if it's specified or using the CreateTime of changefeed.
func (info *ChangeFeedInfo) GetStartTs() uint64 {
if info.StartTs > 0 {
Expand Down
19 changes: 10 additions & 9 deletions cdc/model/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ import (
// common errors
// use a language builtin error type without error trace stack
var (
ErrWriteTsConflict = errors.New("write ts conflict")
ErrChangeFeedNotExists = errors.New("changefeed not exists")
ErrTaskStatusNotExists = errors.New("task status not exists")
ErrTaskPositionNotExists = errors.New("task position not exists")
ErrDecodeFailed = errors.New("decode failed")
ErrAdminStopProcessor = errors.New("stop processor by admin command")
ErrExecDDLFailed = errors.New("exec DDL failed")
ErrCaptureNotExist = errors.New("capture not exists")
ErrUnresolved = errors.New("unresolved")
ErrWriteTsConflict = errors.New("write ts conflict")
ErrChangeFeedNotExists = errors.New("changefeed not exists")
ErrChangeFeedAlreadtExists = errors.New("changefeed already exists")
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
ErrTaskStatusNotExists = errors.New("task status not exists")
ErrTaskPositionNotExists = errors.New("task position not exists")
ErrDecodeFailed = errors.New("decode failed")
ErrAdminStopProcessor = errors.New("stop processor by admin command")
ErrExecDDLFailed = errors.New("exec DDL failed")
ErrCaptureNotExist = errors.New("capture not exists")
ErrUnresolved = errors.New("unresolved")
)

// RunningError represents some running error from cdc components, such as processor.
Expand Down
8 changes: 6 additions & 2 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ func newCreateChangefeedCommand() *cobra.Command {
Long: ``,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := defaultContext
id := uuid.New().String()
id := changefeedID
if id == "" {
id = uuid.New().String()
}

info, err := verifyChangefeedParamers(ctx, cmd, true /* isCreate */)
if err != nil {
Expand All @@ -312,7 +315,7 @@ func newCreateChangefeedCommand() *cobra.Command {
if err != nil {
return err
}
err = cdcEtcdCli.SaveChangeFeedInfo(ctx, info, id)
err = cdcEtcdCli.CreateChangefeedInfo(ctx, info, id)
if err != nil {
return err
}
Expand All @@ -322,6 +325,7 @@ func newCreateChangefeedCommand() *cobra.Command {
}
changefeedConfigVariables(command)
command.PersistentFlags().BoolVar(&noConfirm, "no-confirm", false, "Don't ask user whether to ignore ineligible table")
command.PersistentFlags().StringVarP(&changefeedID, "changefeed-id", "c", "", "Replication task (changefeed) ID")

return command
}
Expand Down
9 changes: 0 additions & 9 deletions pkg/util/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,3 @@ func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) er
}
return nil
}

// IsValidUUIDv4 returns true if the uuid is a valid uuid
func IsValidUUIDv4(uuid string) bool {
if len(uuid) != 36 {
return false
}
match, _ := regexp.Match("[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}", []byte(uuid))
return match
}
6 changes: 0 additions & 6 deletions pkg/util/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,3 @@ func (s *checkSuite) TestCompareVersion(c *check.C) {
c.Assert(semver.New(removeVAndHash("v2.1.0-rc.1-7-g38c939f-dirty")).
Compare(*semver.New("2.1.0-rc.1")), check.Equals, 0)
}

func (s *checkSuite) TestIsValidUUIDv4(c *check.C) {
c.Assert(IsValidUUIDv4(""), check.IsFalse)
c.Assert(IsValidUUIDv4("697b2430-8c80-46ca-9b61-553f0173a214"), check.IsTrue)
c.Assert(IsValidUUIDv4("697b2430-8c80-46ca-9b61-553f011173a214"), check.IsFalse)
}
17 changes: 13 additions & 4 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ SINK_TYPE=$1
function check_changefeed_state() {
changefeedid=$1
expected=$2
state=$(cdc cli changefeed query --simple --changefeed-id $changefeedid --pd=http://$UP_PD_HOST:$UP_PD_PORT 2>&1|grep -oE "\"state\": \"[a-z]+\""|tr -d '" '|awk -F':' '{print $(NF)}')
output=$(cdc cli changefeed query --simple --changefeed-id $changefeedid --pd=http://$UP_PD_HOST:$UP_PD_PORT 2>&1)
state=$(echo $output | grep -oE "\"state\": \"[a-z]+\""|tr -d '" '|awk -F':' '{print $(NF)}')
if [ "$state" != "$expected" ]; then
echo "unexpected state $state, expected $expected"
echo "unexpected state $output, expected $expected"
exit 1
fi
}
Expand All @@ -35,7 +36,9 @@ function run() {
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4";;
*) SINK_URI="mysql://root@127.0.0.1:3306/";;
esac
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai"

uuid="custom-changefeed-name"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4"
fi
Expand All @@ -47,9 +50,15 @@ function run() {
check_table_exists tidb_cdc.repl_mark_test_simple ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_table_exists tidb_cdc."\`repl_mark_test_simple-dash\`" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

uuid=$(run_cdc_cli changefeed list 2>&1 | grep -oE "[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}")
check_changefeed_state $uuid "normal"

# Make sure changefeed can not be created if the name is already exists.
exists=$(run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="$uuid" 2>&1 | grep -oE 'already exists')
if [[ -z $exists ]]; then
echo "[$(date)] <<<<< unexpect output got ${exists} >>>>>"
exit 1
fi

# Pause changefeed
run_cdc_cli changefeed --changefeed-id $uuid pause && sleep 3
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
Expand Down