diff --git a/command/agent/check.go b/command/agent/check.go index 02c3bd16d73e..cf38f30cd6df 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -6,7 +6,6 @@ import ( "github.com/hashicorp/consul/consul/structs" "log" "os/exec" - "runtime" "sync" "syscall" "time" @@ -106,18 +105,13 @@ func (c *CheckMonitor) run() { // check is invoked periodically to perform the script check func (c *CheckMonitor) check() { - // Determine the shell invocation based on OS - var shell, flag string - if runtime.GOOS == "windows" { - shell = "cmd" - flag = "/C" - } else { - shell = "/bin/sh" - flag = "-c" - } - // Create the command - cmd := exec.Command(shell, flag, c.Script) + cmd, err := ExecScript(c.Script) + if err != nil { + c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", c.Script, err) + c.Notify.UpdateCheck(c.CheckID, structs.HealthUnknown, err.Error()) + return + } // Collect the output output, _ := circbuf.NewBuffer(CheckBufSize) @@ -140,7 +134,7 @@ func (c *CheckMonitor) check() { time.Sleep(30 * time.Second) errCh <- fmt.Errorf("Timed out running check '%s'", c.Script) }() - err := <-errCh + err = <-errCh // Get the output, add a message about truncation outputStr := string(output.Bytes()) diff --git a/command/agent/command.go b/command/agent/command.go index 6410383eaf15..2a580eaf79ae 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -14,6 +14,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-syslog" "github.com/hashicorp/logutils" "github.com/mitchellh/cli" @@ -37,6 +38,7 @@ type Command struct { ShutdownCh <-chan struct{} args []string logFilter *logutils.LevelFilter + logOutput io.Writer agent *Agent rpcServer *AgentRPC httpServer *HTTPServer @@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config { return nil } + // Compile all the watches + for _, params := range config.Watches { + // Parse the watches, excluding the handler + wp, err := watch.ParseExempt(params, []string{"handler"}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err)) + return nil + } + + // Get the handler + if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err)) + return nil + } + + // Store the watch plan + config.WatchPlans = append(config.WatchPlans, wp) + } + // Warn if we are in expect mode if config.BootstrapExpect == 1 { c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.") @@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri } else { logOutput = io.MultiWriter(c.logFilter, logWriter) } + c.logOutput = logOutput return logGate, logWriter, logOutput } @@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int { } } + // Get the new client listener addr + httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + } + + // Register the watches + for _, wp := range config.WatchPlans { + go func() { + wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"]) + wp.LogOutput = c.logOutput + if err := wp.Run(httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + } + }() + } + // Let the agent know we've finished registration c.agent.StartSync() @@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config { } } + // Get the new client listener addr + httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + } + + // Deregister the old watches + for _, wp := range config.WatchPlans { + wp.Stop() + } + + // Register the new watches + for _, wp := range newConf.WatchPlans { + go func() { + wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"]) + wp.LogOutput = c.logOutput + if err := wp.Run(httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + } + }() + } + return newConf } diff --git a/command/agent/config.go b/command/agent/config.go index 87584e1a2b44..f91853cdded9 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/watch" "github.com/mitchellh/mapstructure" ) @@ -229,6 +230,11 @@ type Config struct { // this acts like deny. ACLDownPolicy string `mapstructure:"acl_down_policy"` + // Watches are used to monitor various endpoints and to invoke a + // handler to act appropriately. These are managed entirely in the + // agent layer using the standard APIs. + Watches []map[string]interface{} `mapstructure:"watches"` + // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' // representation of our state. Defaults to every 60s. @@ -251,6 +257,9 @@ type Config struct { // VersionPrerelease is a label for pre-release builds VersionPrerelease string `mapstructure:"-"` + + // WatchPlans contains the compiled watches + WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"` } type dirEnts []os.FileInfo @@ -302,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) { return &net.TCPAddr{IP: ip, Port: port}, nil } +// ClientListenerAddr is used to format an address for a +// port on a ClientAddr, handling the zero IP. +func (c *Config) ClientListenerAddr(port int) (string, error) { + addr, err := c.ClientListener(port) + if err != nil { + return "", err + } + if addr.IP.IsUnspecified() { + addr.IP = net.ParseIP("127.0.0.1") + } + return addr.String(), nil +} + // DecodeConfig reads the configuration from the given reader in JSON // format and decodes it into a proper Config structure. func DecodeConfig(r io.Reader) (*Config, error) { @@ -648,6 +670,12 @@ func MergeConfig(a, b *Config) *Config { if b.ACLDefaultPolicy != "" { result.ACLDefaultPolicy = b.ACLDefaultPolicy } + if len(b.Watches) != 0 { + result.Watches = append(result.Watches, b.Watches...) + } + if len(b.WatchPlans) != 0 { + result.WatchPlans = append(result.WatchPlans, b.WatchPlans...) + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 9bc67c69c0e2..75c0610a9e94 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -384,6 +384,27 @@ func TestDecodeConfig(t *testing.T) { if config.ACLDefaultPolicy != "deny" { t.Fatalf("bad: %#v", config) } + + // Watches + input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if len(config.Watches) != 1 { + t.Fatalf("bad: %#v", config) + } + + out := config.Watches[0] + exp := map[string]interface{}{ + "type": "keyprefix", + "prefix": "foo/", + "handler": "foobar", + } + if !reflect.DeepEqual(out, exp) { + t.Fatalf("bad: %#v", config) + } } func TestDecodeConfig_Service(t *testing.T) { @@ -538,6 +559,13 @@ func TestMergeConfig(t *testing.T) { ACLTTLRaw: "15s", ACLDownPolicy: "deny", ACLDefaultPolicy: "deny", + Watches: []map[string]interface{}{ + map[string]interface{}{ + "type": "keyprefix", + "prefix": "foo/", + "handler": "foobar", + }, + }, } c := MergeConfig(a, b) diff --git a/command/agent/util.go b/command/agent/util.go index 8f6103a8042f..16b3b0190749 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -3,6 +3,8 @@ package agent import ( "math" "math/rand" + "os/exec" + "runtime" "time" ) @@ -39,3 +41,17 @@ func strContains(l []string, s string) bool { } return false } + +// ExecScript returns a command to execute a script +func ExecScript(script string) (*exec.Cmd, error) { + var shell, flag string + if runtime.GOOS == "windows" { + shell = "cmd" + flag = "/C" + } else { + shell = "/bin/sh" + flag = "-c" + } + cmd := exec.Command(shell, flag, script) + return cmd, nil +} diff --git a/command/agent/watch_handler.go b/command/agent/watch_handler.go new file mode 100644 index 000000000000..afc4fb94d22c --- /dev/null +++ b/command/agent/watch_handler.go @@ -0,0 +1,80 @@ +package agent + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "os" + "strconv" + + "github.com/armon/circbuf" + "github.com/hashicorp/consul/watch" +) + +const ( + // Limit the size of a watch handlers's output to the + // last WatchBufSize. Prevents an enormous buffer + // from being captured + WatchBufSize = 4 * 1024 // 4KB +) + +// verifyWatchHandler does the pre-check for our handler configuration +func verifyWatchHandler(params interface{}) error { + if params == nil { + return fmt.Errorf("Must provide watch handler") + } + _, ok := params.(string) + if !ok { + return fmt.Errorf("Watch handler must be a string") + } + return nil +} + +// makeWatchHandler returns a handler for the given watch +func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc { + script := params.(string) + logger := log.New(logOutput, "", log.LstdFlags) + fn := func(idx uint64, data interface{}) { + // Create the command + cmd, err := ExecScript(script) + if err != nil { + logger.Printf("[ERR] agent: Failed to setup watch: %v", err) + return + } + cmd.Env = append(os.Environ(), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + ) + + // Collect the output + output, _ := circbuf.NewBuffer(WatchBufSize) + cmd.Stdout = output + cmd.Stderr = output + + // Setup the input + var inp bytes.Buffer + enc := json.NewEncoder(&inp) + if err := enc.Encode(data); err != nil { + logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err) + return + } + cmd.Stdin = &inp + + // Run the handler + if err := cmd.Run(); err != nil { + logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err) + } + + // Get the output, add a message about truncation + outputStr := string(output.Bytes()) + if output.TotalWritten() > output.Size() { + outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", + output.Size(), output.TotalWritten(), outputStr) + } + + // Log the output + logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr) + } + return fn +} diff --git a/command/agent/watch_handler_test.go b/command/agent/watch_handler_test.go new file mode 100644 index 000000000000..28f1e425f565 --- /dev/null +++ b/command/agent/watch_handler_test.go @@ -0,0 +1,44 @@ +package agent + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestVerifyWatchHandler(t *testing.T) { + if err := verifyWatchHandler(nil); err == nil { + t.Fatalf("should err") + } + if err := verifyWatchHandler(123); err == nil { + t.Fatalf("should err") + } + if err := verifyWatchHandler([]string{"foo"}); err == nil { + t.Fatalf("should err") + } + if err := verifyWatchHandler("foo"); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestMakeWatchHandler(t *testing.T) { + defer os.Remove("handler_out") + defer os.Remove("handler_index_out") + script := "echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out" + handler := makeWatchHandler(os.Stderr, script) + handler(100, []string{"foo", "bar", "baz"}) + raw, err := ioutil.ReadFile("handler_out") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(raw) != "[\"foo\",\"bar\",\"baz\"]\n" { + t.Fatalf("bad: %s", raw) + } + raw, err = ioutil.ReadFile("handler_index_out") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(raw) != "100\n" { + t.Fatalf("bad: %s", raw) + } +} diff --git a/command/rpc.go b/command/rpc.go index 7b4411b2a6d5..97a31d083c73 100644 --- a/command/rpc.go +++ b/command/rpc.go @@ -2,6 +2,7 @@ package command import ( "flag" + "github.com/armon/consul-api" "github.com/hashicorp/consul/command/agent" ) @@ -16,3 +17,17 @@ func RPCAddrFlag(f *flag.FlagSet) *string { func RPCClient(addr string) (*agent.RPCClient, error) { return agent.NewRPCClient(addr) } + +// HTTPAddrFlag returns a pointer to a string that will be populated +// when the given flagset is parsed with the HTTP address of the Consul. +func HTTPAddrFlag(f *flag.FlagSet) *string { + return f.String("http-addr", "127.0.0.1:8500", + "HTTP address of the Consul agent") +} + +// HTTPClient returns a new Consul HTTP client with the given address. +func HTTPClient(addr string) (*consulapi.Client, error) { + conf := consulapi.DefaultConfig() + conf.Address = addr + return consulapi.NewClient(conf) +} diff --git a/command/util_test.go b/command/util_test.go index 8d492b17338c..0366f760bb53 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -22,16 +22,19 @@ func init() { } type agentWrapper struct { - dir string - config *agent.Config - agent *agent.Agent - rpc *agent.AgentRPC - addr string + dir string + config *agent.Config + agent *agent.Agent + rpc *agent.AgentRPC + http *agent.HTTPServer + addr string + httpAddr string } func (a *agentWrapper) Shutdown() { a.rpc.Shutdown() a.agent.Shutdown() + a.http.Shutdown() os.RemoveAll(a.dir) } @@ -59,12 +62,22 @@ func testAgent(t *testing.T) *agentWrapper { } rpc := agent.NewAgentRPC(a, l, mult, lw) + + httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) + http, err := agent.NewHTTPServer(a, "", false, os.Stderr, httpAddr) + if err != nil { + os.RemoveAll(dir) + t.Fatalf(fmt.Sprintf("err: %v", err)) + } + return &agentWrapper{ - dir: dir, - config: conf, - agent: a, - rpc: rpc, - addr: l.Addr().String(), + dir: dir, + config: conf, + agent: a, + rpc: rpc, + http: http, + addr: l.Addr().String(), + httpAddr: httpAddr, } } diff --git a/command/watch.go b/command/watch.go new file mode 100644 index 000000000000..30bb57fab668 --- /dev/null +++ b/command/watch.go @@ -0,0 +1,211 @@ +package command + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "os" + "strconv" + "strings" + + "github.com/hashicorp/consul/command/agent" + "github.com/hashicorp/consul/watch" + "github.com/mitchellh/cli" +) + +// WatchCommand is a Command implementation that is used to setup +// a "watch" which uses a sub-process +type WatchCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui +} + +func (c *WatchCommand) Help() string { + helpText := ` +Usage: consul watch [options] [child...] + + Watches for changes in a given data view from Consul. If a child process + is specified, it will be invoked with the latest results on changes. Otherwise, + the latest values are dumped to stdout and the watch terminates. + + Providing the watch type is required, and other parameters may be required + or supported depending on the watch type. + +Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -datacenter="" Datacenter to query. Defaults to that of agent. + -token="" ACL token to use. Defaults to that of agent. + +Watch Specification: + + -key=val Specifies the key to watch. Only for 'key' type. + -passingonly=[true|false] Specifies if only hosts passing all checks are displayed. + Optional for 'service' type. Defaults false. + -prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type. + -service=val Specifies the service to watch. Required for 'service' type, + optional for 'checks' type. + -state=val Specifies the states to watch. Optional for 'checks' type. + -tag=val Specifies the service tag to filter on. Optional for 'service' + type. + -type=val Specifies the watch type. One of key, keyprefix + services, nodes, service, or checks. +` + return strings.TrimSpace(helpText) +} + +func (c *WatchCommand) Run(args []string) int { + var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state string + cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.StringVar(&watchType, "type", "", "") + cmdFlags.StringVar(&datacenter, "datacenter", "", "") + cmdFlags.StringVar(&token, "token", "", "") + cmdFlags.StringVar(&key, "key", "", "") + cmdFlags.StringVar(&prefix, "prefix", "", "") + cmdFlags.StringVar(&service, "service", "", "") + cmdFlags.StringVar(&tag, "tag", "", "") + cmdFlags.StringVar(&passingOnly, "passingonly", "", "") + cmdFlags.StringVar(&state, "state", "", "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + // Check for a type + if watchType == "" { + c.Ui.Error("Watch type must be specified") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + // Grab the script to execute if any + script := strings.Join(cmdFlags.Args(), " ") + + // Compile the watch parameters + params := make(map[string]interface{}) + if watchType != "" { + params["type"] = watchType + } + if datacenter != "" { + params["datacenter"] = datacenter + } + if token != "" { + params["token"] = token + } + if key != "" { + params["key"] = key + } + if prefix != "" { + params["prefix"] = prefix + } + if service != "" { + params["service"] = service + } + if tag != "" { + params["tag"] = tag + } + if state != "" { + params["state"] = state + } + if passingOnly != "" { + b, err := strconv.ParseBool(passingOnly) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse passingonly flag: %s", err)) + return 1 + } + params["passingonly"] = b + } + + // Create the watch + wp, err := watch.Parse(params) + if err != nil { + c.Ui.Error(fmt.Sprintf("%s", err)) + return 1 + } + + // Create and test the HTTP client + client, err := HTTPClient(*httpAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + _, err = client.Agent().NodeName() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + + // Setup handler + errExit := false + if script == "" { + wp.Handler = func(idx uint64, data interface{}) { + defer wp.Stop() + buf, err := json.MarshalIndent(data, "", " ") + if err != nil { + c.Ui.Error(fmt.Sprintf("Error encoding output: %s", err)) + errExit = true + } + c.Ui.Output(string(buf)) + } + } else { + wp.Handler = func(idx uint64, data interface{}) { + // Create the command + var buf bytes.Buffer + var err error + cmd, err := agent.ExecScript(script) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err)) + goto ERR + } + cmd.Env = append(os.Environ(), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + ) + + // Encode the input + if err = json.NewEncoder(&buf).Encode(data); err != nil { + c.Ui.Error(fmt.Sprintf("Error encoding output: %s", err)) + goto ERR + } + cmd.Stdin = &buf + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Run the handler + if err := cmd.Run(); err != nil { + c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err)) + goto ERR + } + return + ERR: + wp.Stop() + errExit = true + } + } + + // Watch for a shutdown + go func() { + <-c.ShutdownCh + wp.Stop() + os.Exit(0) + }() + + // Run the watch + if err := wp.Run(*httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + + // Handle an error exit + if errExit { + return 1 + } else { + return 0 + } +} + +func (c *WatchCommand) Synopsis() string { + return "Watch for changes in Consul" +} diff --git a/command/watch_test.go b/command/watch_test.go new file mode 100644 index 000000000000..eaa9376d5a02 --- /dev/null +++ b/command/watch_test.go @@ -0,0 +1,29 @@ +package command + +import ( + "github.com/mitchellh/cli" + "strings" + "testing" +) + +func TestWatchCommand_implements(t *testing.T) { + var _ cli.Command = &WatchCommand{} +} + +func TestWatchCommandRun(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + + ui := new(cli.MockUi) + c := &WatchCommand{Ui: ui} + args := []string{"-http-addr=" + a1.httpAddr, "-type=nodes"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), a1.config.NodeName) { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} diff --git a/commands.go b/commands.go index 1bb6af8941e7..1b98e013afd3 100644 --- a/commands.go +++ b/commands.go @@ -82,6 +82,13 @@ func init() { Ui: ui, }, nil }, + + "watch": func() (cli.Command, error) { + return &command.WatchCommand{ + ShutdownCh: makeShutdownCh(), + Ui: ui, + }, nil + }, } } diff --git a/watch/funcs.go b/watch/funcs.go new file mode 100644 index 000000000000..a6d1015964b9 --- /dev/null +++ b/watch/funcs.go @@ -0,0 +1,166 @@ +package watch + +import ( + "fmt" + + "github.com/armon/consul-api" +) + +// watchFactory is a function that can create a new WatchFunc +// from a parameter configuration +type watchFactory func(params map[string]interface{}) (WatchFunc, error) + +// watchFuncFactory maps each type to a factory function +var watchFuncFactory map[string]watchFactory + +func init() { + watchFuncFactory = map[string]watchFactory{ + "key": keyWatch, + "keyprefix": keyPrefixWatch, + "services": servicesWatch, + "nodes": nodesWatch, + "service": serviceWatch, + "checks": checksWatch, + } +} + +// keyWatch is used to return a key watching function +func keyWatch(params map[string]interface{}) (WatchFunc, error) { + var key string + if err := assignValue(params, "key", &key); err != nil { + return nil, err + } + if key == "" { + return nil, fmt.Errorf("Must specify a single key to watch") + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + kv := p.client.KV() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + pair, meta, err := kv.Get(key, &opts) + if err != nil { + return 0, nil, err + } + if pair == nil { + return meta.LastIndex, nil, err + } + return meta.LastIndex, pair, err + } + return fn, nil +} + +// keyPrefixWatch is used to return a key prefix watching function +func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) { + var prefix string + if err := assignValue(params, "prefix", &prefix); err != nil { + return nil, err + } + if prefix == "" { + return nil, fmt.Errorf("Must specify a single prefix to watch") + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + kv := p.client.KV() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + pairs, meta, err := kv.List(prefix, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, pairs, err + } + return fn, nil +} + +// servicesWatch is used to watch the list of available services +func servicesWatch(params map[string]interface{}) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + catalog := p.client.Catalog() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + services, meta, err := catalog.Services(&opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, services, err + } + return fn, nil +} + +// nodesWatch is used to watch the list of available nodes +func nodesWatch(params map[string]interface{}) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + catalog := p.client.Catalog() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + nodes, meta, err := catalog.Nodes(&opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, nodes, err + } + return fn, nil +} + +// serviceWatch is used to watch a specific service for changes +func serviceWatch(params map[string]interface{}) (WatchFunc, error) { + var service, tag string + if err := assignValue(params, "service", &service); err != nil { + return nil, err + } + if service == "" { + return nil, fmt.Errorf("Must specify a single service to watch") + } + + if err := assignValue(params, "tag", &tag); err != nil { + return nil, err + } + + passingOnly := false + if err := assignValueBool(params, "passingonly", &passingOnly); err != nil { + return nil, err + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + health := p.client.Health() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + nodes, meta, err := health.Service(service, tag, passingOnly, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, nodes, err + } + return fn, nil +} + +// checksWatch is used to watch a specific checks in a given state +func checksWatch(params map[string]interface{}) (WatchFunc, error) { + var service, state string + if err := assignValue(params, "service", &service); err != nil { + return nil, err + } + if err := assignValue(params, "state", &state); err != nil { + return nil, err + } + if service != "" && state != "" { + return nil, fmt.Errorf("Cannot specify service and state") + } + if service == "" && state == "" { + state = "any" + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + health := p.client.Health() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + var checks []*consulapi.HealthCheck + var meta *consulapi.QueryMeta + var err error + if state != "" { + checks, meta, err = health.State(state, &opts) + } else { + checks, meta, err = health.Checks(service, &opts) + } + if err != nil { + return 0, nil, err + } + return meta.LastIndex, checks, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go new file mode 100644 index 000000000000..d0dbb0c8f28b --- /dev/null +++ b/watch/funcs_test.go @@ -0,0 +1,394 @@ +package watch + +import ( + "os" + "testing" + "time" + + "github.com/armon/consul-api" +) + +var consulAddr string + +func init() { + consulAddr = os.Getenv("CONSUL_ADDR") +} + +func TestKeyWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil || string(v.Value) != "test" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar/baz", + Value: []byte("test"), + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + plan.Stop() + + // Delete the key + _, err = kv.Delete("foo/bar/baz", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestKeyPrefixWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(consulapi.KVPairs) + if ok && v == nil { + return + } + if !ok || v == nil || string(v[0].Key) != "foo/bar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar", + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + plan.Stop() + + // Delete the key + _, err = kv.Delete("foo/bar", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestServicesWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"services"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(map[string][]string) + if !ok || v["consul"] == nil { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + plan.Stop() + + agent := plan.client.Agent() + reg := &consulapi.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + } + agent.ServiceRegister(reg) + time.Sleep(20 * time.Millisecond) + agent.ServiceDeregister("foo") + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestNodesWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"nodes"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.Node) + if !ok || len(v) == 0 { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + plan.Stop() + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Register(reg, nil) + time.Sleep(20 * time.Millisecond) + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestServiceWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.ServiceEntry) + if ok && len(v) == 0 { + return + } + if !ok || v[0].Service.ID != "foo" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + agent := plan.client.Agent() + reg := &consulapi.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + Tags: []string{"bar"}, + } + agent.ServiceRegister(reg) + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + agent.ServiceDeregister("foo") + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestChecksWatch_State(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"checks", "state":"warning"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.HealthCheck) + if len(v) == 0 { + return + } + if !ok || v[0].CheckID != "foobar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + Check: &consulapi.AgentCheck{ + Node: "foobar", + CheckID: "foobar", + Name: "foobar", + Status: "warning", + }, + } + catalog.Register(reg, nil) + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestChecksWatch_Service(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.HealthCheck) + if len(v) == 0 { + return + } + if !ok || v[0].CheckID != "foobar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + Service: &consulapi.AgentService{ + ID: "foobar", + Service: "foobar", + }, + Check: &consulapi.AgentCheck{ + Node: "foobar", + CheckID: "foobar", + Name: "foobar", + Status: "passing", + ServiceID: "foobar", + }, + } + _, err := catalog.Register(reg, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} diff --git a/watch/plan.go b/watch/plan.go new file mode 100644 index 000000000000..f0c4e9ff7c41 --- /dev/null +++ b/watch/plan.go @@ -0,0 +1,116 @@ +package watch + +import ( + "fmt" + "log" + "os" + "reflect" + "time" + + "github.com/armon/consul-api" +) + +const ( + // retryInterval is the base retry value + retryInterval = 5 * time.Second + + // maximum back off time, this is to prevent + // exponential runaway + maxBackoffTime = 180 * time.Second +) + +// Run is used to run a watch plan +func (p *WatchPlan) Run(address string) error { + // Setup the client + p.address = address + conf := consulapi.DefaultConfig() + conf.Address = address + conf.Datacenter = p.Datacenter + conf.Token = p.Token + client, err := consulapi.NewClient(conf) + if err != nil { + return fmt.Errorf("Failed to connect to agent: %v", err) + } + p.client = client + + // Create the logger + output := p.LogOutput + if output == nil { + output = os.Stderr + } + logger := log.New(output, "", log.LstdFlags) + + // Loop until we are canceled + failures := 0 +OUTER: + for !p.shouldStop() { + // Invoke the handler + index, result, err := p.Func(p) + + // Check if we should terminate since the function + // could have blocked for a while + if p.shouldStop() { + break + } + + // Handle an error in the watch function + if err != nil { + // Perform an exponential backoff + failures++ + retry := retryInterval * time.Duration(failures*failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + logger.Printf("consul.watch: Watch (type: %s) errored: %v, retry in %v", + p.Type, err, retry) + select { + case <-time.After(retry): + continue OUTER + case <-p.stopCh: + return nil + } + } + + // Clear the failures + failures = 0 + + // If the index is unchanged do nothing + if index == p.lastIndex { + continue + } + + // Update the index, look for change + oldIndex := p.lastIndex + p.lastIndex = index + if oldIndex != 0 && reflect.DeepEqual(p.lastResult, result) { + continue + } + + // Handle the updated result + p.lastResult = result + if p.Handler != nil { + p.Handler(index, result) + } + } + return nil +} + +// Stop is used to stop running the watch plan +func (p *WatchPlan) Stop() { + p.stopLock.Lock() + defer p.stopLock.Unlock() + if p.stop { + return + } + p.stop = true + close(p.stopCh) +} + +func (p *WatchPlan) shouldStop() bool { + select { + case <-p.stopCh: + return true + default: + return false + } +} diff --git a/watch/plan_test.go b/watch/plan_test.go new file mode 100644 index 000000000000..e2dad6d79ba8 --- /dev/null +++ b/watch/plan_test.go @@ -0,0 +1,54 @@ +package watch + +import ( + "testing" + "time" +) + +func init() { + watchFuncFactory["noop"] = noopWatch +} + +func noopWatch(params map[string]interface{}) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + idx := p.lastIndex + 1 + return idx, idx, nil + } + return fn, nil +} + +func mustParse(t *testing.T, q string) *WatchPlan { + params := makeParams(t, q) + plan, err := Parse(params) + if err != nil { + t.Fatalf("err: %v", err) + } + return plan +} + +func TestRun_Stop(t *testing.T) { + plan := mustParse(t, `{"type":"noop"}`) + var expect uint64 = 1 + plan.Handler = func(idx uint64, val interface{}) { + if idx != expect { + t.Fatalf("Bad: %d %d", expect, idx) + } + if val != expect { + t.Fatalf("Bad: %d %d", expect, val) + } + expect++ + } + + time.AfterFunc(10*time.Millisecond, func() { + plan.Stop() + }) + + err := plan.Run("127.0.0.1:8500") + if err != nil { + t.Fatalf("err: %v", err) + } + + if expect == 1 { + t.Fatalf("Bad: %d", expect) + } +} diff --git a/watch/watch.go b/watch/watch.go new file mode 100644 index 000000000000..0b0a69a32ec0 --- /dev/null +++ b/watch/watch.go @@ -0,0 +1,129 @@ +package watch + +import ( + "fmt" + "io" + "sync" + + "github.com/armon/consul-api" +) + +// WatchPlan is the parsed version of a watch specification. A watch provides +// the details of a query, which generates a view into the Consul data store. +// This view is watched for changes and a handler is invoked to take any +// appropriate actions. +type WatchPlan struct { + Datacenter string + Token string + Type string + Exempt map[string]interface{} + + Func WatchFunc + Handler HandlerFunc + LogOutput io.Writer + + address string + client *consulapi.Client + lastIndex uint64 + lastResult interface{} + + stop bool + stopCh chan struct{} + stopLock sync.Mutex +} + +// WatchFunc is used to watch for a diff +type WatchFunc func(*WatchPlan) (uint64, interface{}, error) + +// HandlerFunc is used to handle new data +type HandlerFunc func(uint64, interface{}) + +// Parse takes a watch query and compiles it into a WatchPlan or an error +func Parse(params map[string]interface{}) (*WatchPlan, error) { + return ParseExempt(params, nil) +} + +// ParseExempt takes a watch query and compiles it into a WatchPlan or an error +// Any exempt parameters are stored in the Exempt map +func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) { + plan := &WatchPlan{ + stopCh: make(chan struct{}), + } + + // Parse the generic parameters + if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { + return nil, err + } + if err := assignValue(params, "token", &plan.Token); err != nil { + return nil, err + } + if err := assignValue(params, "type", &plan.Type); err != nil { + return nil, err + } + + // Ensure there is a watch type + if plan.Type == "" { + return nil, fmt.Errorf("Watch type must be specified") + } + + // Look for a factory function + factory := watchFuncFactory[plan.Type] + if factory == nil { + return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type) + } + + // Get the watch func + fn, err := factory(params) + if err != nil { + return nil, err + } + plan.Func = fn + + // Remove the exempt parameters + if len(exempt) > 0 { + plan.Exempt = make(map[string]interface{}) + for _, ex := range exempt { + val, ok := params[ex] + if ok { + plan.Exempt[ex] = val + delete(params, ex) + } + } + } + + // Ensure all parameters are consumed + if len(params) != 0 { + var bad []string + for key := range params { + bad = append(bad, key) + } + return nil, fmt.Errorf("Invalid parameters: %v", bad) + } + return plan, nil +} + +// assignValue is used to extract a value ensuring it is a string +func assignValue(params map[string]interface{}, name string, out *string) error { + if raw, ok := params[name]; ok { + val, ok := raw.(string) + if !ok { + return fmt.Errorf("Expecting %s to be a string") + } + *out = val + delete(params, name) + } + return nil +} + +// assignValueBool is used to extract a value ensuring it is a bool +func assignValueBool(params map[string]interface{}, name string, out *bool) error { + if raw, ok := params[name]; ok { + val, ok := raw.(bool) + if !ok { + return fmt.Errorf("Expecting %s to be a boolean") + } + *out = val + delete(params, name) + } + return nil +} diff --git a/watch/watch_test.go b/watch/watch_test.go new file mode 100644 index 000000000000..f4597b46f394 --- /dev/null +++ b/watch/watch_test.go @@ -0,0 +1,48 @@ +package watch + +import ( + "bytes" + "encoding/json" + "testing" +) + +func TestParseBasic(t *testing.T) { + params := makeParams(t, `{"type":"key", "datacenter":"dc2", "token":"12345", "key":"foo"}`) + p, err := Parse(params) + if err != nil { + t.Fatalf("err: %v", err) + } + if p.Datacenter != "dc2" { + t.Fatalf("Bad: %#v", p) + } + if p.Token != "12345" { + t.Fatalf("Bad: %#v", p) + } + if p.Type != "key" { + t.Fatalf("Bad: %#v", p) + } +} + +func TestParse_exempt(t *testing.T) { + params := makeParams(t, `{"type":"key", "key":"foo", "handler": "foobar"}`) + p, err := ParseExempt(params, []string{"handler"}) + if err != nil { + t.Fatalf("err: %v", err) + } + if p.Type != "key" { + t.Fatalf("Bad: %#v", p) + } + ex := p.Exempt["handler"] + if ex != "foobar" { + t.Fatalf("bad: %v", ex) + } +} + +func makeParams(t *testing.T, s string) map[string]interface{} { + var out map[string]interface{} + dec := json.NewDecoder(bytes.NewReader([]byte(s))) + if err := dec.Decode(&out); err != nil { + t.Fatalf("err: %v", err) + } + return out +} diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 4e6d66590bea..bc37837ccb82 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -145,7 +145,13 @@ definitions support being updated during a reload. "data_dir": "/opt/consul", "log_level": "INFO", "node_name": "foobar", - "server": true + "server": true, + "watches": [ + { + "type": "checks", + "handler": "/usr/bin/health-check-handler.sh" + } + ] } @@ -316,6 +322,11 @@ definitions support being updated during a reload. However, because the caches are not actively invalidated, ACL policy may be stale up to the TTL value. +* `watches` - Watches is a list of watch specifications. + These allow an external process to be automatically invoked when a particular + data view is updated. See the [watch documentation](/docs/agent/watches.html) for + more documentation. Watches can be modified when the configuration is reloaded. + ## Ports Used Consul requires up to 5 different ports to work properly, some requiring diff --git a/website/source/docs/agent/watches.html.markdown b/website/source/docs/agent/watches.html.markdown new file mode 100644 index 000000000000..a3b03157a6cc --- /dev/null +++ b/website/source/docs/agent/watches.html.markdown @@ -0,0 +1,286 @@ +--- +layout: "docs" +page_title: "Watches" +sidebar_current: "docs-agent-watches" +--- + +# Watches + +Watches are a way of specifying a view of data (list of nodes, KV pairs, +health checks, etc) which is monitored for any updates. When an update +is detected, an external handler is invoked. A handler can be any +executable. As an example, you could watch the status of health checks and +notify an external system when a check is critical. + +Watches are implemented using blocking queries in the [HTTP API](/docs/agent/http.html). +Agent's automatically make the proper API calls to watch for changes, +and inform a handler when the data view has updated. + +Watches can can be configured as part of the [agent's configuration](/docs/agent/options.html), +causing them to run once the agent is initialized. Reloading the agent configuration +allows for adding or removing watches dynamically. + +Alternatively, the [watch command](/docs/commands/watch.html) enables a watch to be +started outside of the agent. This can be used by an operator to inspect data in Consul, +or to easily pipe data into processes without being tied to the agent lifecycle. + +In either case, the `type` of the watch must be specified. Each type of watch +supports different parameters, both required and optional. These options are specified +in a JSON body when using agent configuration, or as CLI flags for the watch command. + +## Handlers + +The watch specifiation specifies the view of data to be monitored. +Once that view is updated the specified handler is invoked. The handler +can be any executable. + +A handler should read it's input from stdin, and expect to read +JSON formatted data. The format of the data depends on the type of the +watch. Each watch type documents the format type, and because they +map directly to an HTTP API, handlers should expect the input to +match the format of the API. + +Additionally, the `CONSUL_INDEX` environmental variable will be set. +This maps to the `X-Consul-Index` value from the [HTTP API](/docs/agent/http.html). + +## Global Parameters + +In addition to the parameters supported by each option type, there +are a few global parameters that all watches support: + +* `datacenter` - Can be provided to override the agent's default datacenter. +* `token` - Can be provided to override the agent's default ACL token. +* `handler` - The handler to invoke when the data view updates. + +## Watch Types + +The following types are supported, with more documentation below: + +* `key` - Watch a specific KV pair +* `keyprefix` - Watch a prefix in the KV store +* `services` - Watch the list of available services +* `nodes` - Watch the list of nodes +* `service`- Watch the instances of a service +* `checks` - Watch the value of health checks + + +### Type: key + +The "key" watch type is used to watch a specific key in the KV store. +It requires that the "key" parameter be specified. + +This maps to the `/v1/kv/` API internally. + +Here is an example configuration: + + { + "type": "key", + "key": "foo/bar/baz", + "handler": "/usr/bin/my-key-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type key -key foo/bar/baz /usr/bin/my-key-handler.sh + +An example of the output of this command: + + { + "Key": "foo/bar/baz", + "CreateIndex": 1793, + "ModifyIndex": 1793, + "LockIndex": 0, + "Flags": 0, + "Value": "aGV5", + "Session": "" + } + +### Type: keyprefix + +The "keyprefix" watch type is used to watch a prefix of keys in the KV store. +It requires that the "prefix" parameter be specified. + +This maps to the `/v1/kv/` API internally. + +Here is an example configuration: + + { + "type": "keyprefix", + "prefix": "foo/", + "handler": "/usr/bin/my-prefix-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type keyprefix -prefix foo/ /usr/bin/my-prefix-handler.sh + +An example of the output of this command: + + [ + { + "Key": "foo/bar", + "CreateIndex": 1796, + "ModifyIndex": 1796, + "LockIndex": 0, + "Flags": 0, + "Value": "TU9BUg==", + "Session": "" + }, + { + "Key": "foo/baz", + "CreateIndex": 1795, + "ModifyIndex": 1795, + "LockIndex": 0, + "Flags": 0, + "Value": "YXNkZg==", + "Session": "" + }, + { + "Key": "foo/test", + "CreateIndex": 1793, + "ModifyIndex": 1793, + "LockIndex": 0, + "Flags": 0, + "Value": "aGV5", + "Session": "" + } + ] + + +### Type: services + +The "services" watch type is used to watch the list of available +services. It has no parameters. + +This maps to the `/v1/catalog/services` API internally. + +An example of the output of this command: + + { + "consul": [], + "redis": [], + "web": [] + } + +### Type: nodes + +The "nodes" watch type is used to watch the list of available +nodes. It has no parameters. + +This maps to the `/v1/catalog/nodes` API internally. + +An example of the output of this command: + + [ + { + "Node": "nyc1-consul-1", + "Address": "192.241.159.115" + }, + { + "Node": "nyc1-consul-2", + "Address": "192.241.158.205" + }, + { + "Node": "nyc1-consul-3", + "Address": "198.199.77.133" + }, + { + "Node": "nyc1-worker-1", + "Address": "162.243.162.228" + }, + { + "Node": "nyc1-worker-2", + "Address": "162.243.162.226" + }, + { + "Node": "nyc1-worker-3", + "Address": "162.243.162.229" + } + ] + +### Type: service + +The "service" watch type is used to monitor the providers +of a single service. It requires the "service" parameter, +but optionally takes "tag" and "passingonly". The "tag" parameter +will filter by tag, and "passingonly" is a boolean that will +filter to only the instances passing all health checks. + +This maps to the `/v1/health/service` API internally. + +Here is an example configuration: + + { + "type": "service", + "key": "redis", + "handler": "/usr/bin/my-service-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type service -service redis /usr/bin/my-service-handler.sh + +An example of the output of this command: + + [ + { + "Node": { + "Node": "foobar", + "Address": "10.1.10.12" + }, + "Service": { + "ID": "redis", + "Service": "redis", + "Tags": null, + "Port": 8000 + }, + "Checks": [ + { + "Node": "foobar", + "CheckID": "service:redis", + "Name": "Service 'redis' check", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "redis", + "ServiceName": "redis" + }, + { + "Node": "foobar", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "", + "ServiceName": "" + } + ] + } + ] + +### Type: checks + +The "checks" watch type is used to monitor the checks of a given +service or in a specific state. It optionally takes the "service" +parameter to filter to a specific service, or "state" to filter +to a specific state. By default, it will watch all checks. + +This maps to the `/v1/health/state/` API if monitoring by state, +or `/v1/health/checks/` if monitoring by service. + +An example of the output of this command: + + [ + { + "Node": "foobar", + "CheckID": "service:redis", + "Name": "Service 'redis' check", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "redis", + "ServiceName": "redis" + } + ] + diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index afc3d621052a..3c43cac4703f 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -34,6 +34,7 @@ Available commands are: monitor Stream logs from a Consul agent reload Triggers the agent to reload configuration files version Prints the Consul version + watch Watch for changes in Consul ``` To get help for any specific command, pass the `-h` flag to the relevant diff --git a/website/source/docs/commands/watch.html.markdown b/website/source/docs/commands/watch.html.markdown new file mode 100644 index 000000000000..7f565a3594d8 --- /dev/null +++ b/website/source/docs/commands/watch.html.markdown @@ -0,0 +1,53 @@ +--- +layout: "docs" +page_title: "Commands: Watch" +sidebar_current: "docs-commands-watch" +--- + +# Consul Watch + +Command: `consul watch` + +The watch command provides a mechanism to watch for changes in a particular +data view (list of nodes, service members, key value, etc) and to invoke +a process with the latest values of the view. If no process is specified, +the current values are dumped to stdout which can be a useful way to inspect +data in Consul. + +There is more [documentation on watches here](/docs/agent/watches.html). + +## Usage + +Usage: `consul watch [options] [child...]` + +The only required option is `-type` which specifies the particular +data view. Depending on the type, various options may be required +or optionally provided. There is more documentation on watch +[specifications here](/docs/agent/watches.html). + +The list of available flags are: + +* `-http-addr` - Address to the HTTP server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8500" which is the default HTTP address of a Consul agent. + +* `-datacenter` - Datacenter to query. Defaults to that of agent. + +* `-token` - ACL token to use. Defaults to that of agent. + +* `-key` - Key to watch. Only for `key` type. + +* `-passingonly=[true|false]` - Should only passing entries be returned. Default false. + only for `service` type. + +* `-prefix` - Key prefix to watch. Only for `keyprefix` type. + +* `-service` - Service to watch. Required for `service` type, optional for `checks` type. + +* `-state` - Check state to filter on. Optional for `checks` type. + +* `-tag` - Service tag to filter on. Optional for `service` type. + +* `-type` - Watch type. Required, one of "key", "keyprefix", "services", + "nodes", "services", or "checks". + diff --git a/website/source/intro/getting-started/install.html.markdown b/website/source/intro/getting-started/install.html.markdown index 4b72e73ce83a..9ef7acaaa38f 100644 --- a/website/source/intro/getting-started/install.html.markdown +++ b/website/source/intro/getting-started/install.html.markdown @@ -57,6 +57,7 @@ Available commands are: members Lists the members of a Consul cluster monitor Stream logs from a Consul agent version Prints the Consul version + watch Watch for changes in Consul ``` If you get an error that `consul` could not be found, then your PATH diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index fcad0930ea62..427a7be74c20 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -89,6 +89,10 @@ > reload + + + > + watch @@ -130,6 +134,10 @@ > Telemetry + + + > + Watches