diff --git a/etcdctl/ctlv3/command/global.go b/etcdctl/ctlv3/command/global.go index 825e28391177..920635e2a7ea 100644 --- a/etcdctl/ctlv3/command/global.go +++ b/etcdctl/ctlv3/command/global.go @@ -41,6 +41,7 @@ type GlobalFlags struct { InsecureSkipVerify bool InsecureDiscovery bool Endpoints []string + RequireLeader bool DialTimeout time.Duration CommandTimeOut time.Duration KeepAliveTime time.Duration @@ -382,3 +383,11 @@ func endpointsFromFlagValue(cmd *cobra.Command) ([]string, error) { } return ret, err } + +func requireLeaderFromCmd(cmd *cobra.Command) bool { + req, err := cmd.Flags().GetBool("require-leader") + if err != nil { + ExitWithError(ExitBadArgs, err) + } + return req +} diff --git a/etcdctl/ctlv3/command/watch_command.go b/etcdctl/ctlv3/command/watch_command.go index 4adf547e9cdd..8069f4d72e00 100644 --- a/etcdctl/ctlv3/command/watch_command.go +++ b/etcdctl/ctlv3/command/watch_command.go @@ -57,7 +57,8 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { } c := mustClientFromCmd(cmd) - wc, err := getWatchChan(c, args) + reqLeader := requireLeaderFromCmd(cmd) + wc, err := getWatchChan(c, reqLeader, args) if err != nil { ExitWithError(ExitBadArgs, err) } @@ -71,6 +72,7 @@ func watchCommandFunc(cmd *cobra.Command, args []string) { func watchInteractiveFunc(cmd *cobra.Command, args []string) { c := mustClientFromCmd(cmd) + reqLeader := requireLeaderFromCmd(cmd) reader := bufio.NewReader(os.Stdin) @@ -98,7 +100,7 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err) continue } - ch, err := getWatchChan(c, flagset.Args()) + ch, err := getWatchChan(c, reqLeader, flagset.Args()) if err != nil { fmt.Fprintf(os.Stderr, "Invalid command %s (%v)\n", l, err) continue @@ -107,7 +109,7 @@ func watchInteractiveFunc(cmd *cobra.Command, args []string) { } } -func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) { +func getWatchChan(c *clientv3.Client, requireLeader bool, args []string) (clientv3.WatchChan, error) { if len(args) < 1 || len(args) > 2 { return nil, fmt.Errorf("bad number of arguments") } @@ -125,7 +127,11 @@ func getWatchChan(c *clientv3.Client, args []string) (clientv3.WatchChan, error) if watchPrevKey { opts = append(opts, clientv3.WithPrevKV()) } - return c.Watch(context.TODO(), key, opts...), nil + ctx := context.Background() + if requireLeader { + ctx = clientv3.WithRequireLeader(ctx) + } + return c.Watch(ctx, key, opts...), nil } func printWatchCh(ch clientv3.WatchChan) { diff --git a/etcdctl/ctlv3/ctl.go b/etcdctl/ctlv3/ctl.go index 8692084cfcdf..ad27c15e2a59 100644 --- a/etcdctl/ctlv3/ctl.go +++ b/etcdctl/ctlv3/ctl.go @@ -60,6 +60,7 @@ func init() { rootCmd.PersistentFlags().BoolVar(&globalFlags.Insecure, "insecure-transport", true, "disable transport security for client connections") rootCmd.PersistentFlags().BoolVar(&globalFlags.InsecureDiscovery, "insecure-discovery", true, "accept insecure SRV records describing cluster endpoints") rootCmd.PersistentFlags().BoolVar(&globalFlags.InsecureSkipVerify, "insecure-skip-tls-verify", false, "skip server certificate verification") + rootCmd.PersistentFlags().BoolVar(&globalFlags.RequireLeader, "require-leader", false, "require client requests to only succeed when the cluster has a leader (only available for watch)") rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CertFile, "cert", "", "identify secure client using this TLS certificate file") rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.KeyFile, "key", "", "identify secure client using this TLS key file") rootCmd.PersistentFlags().StringVar(&globalFlags.TLS.CAFile, "cacert", "", "verify certificates of TLS-enabled secure servers using this CA bundle")