From fc6677331539c156edc6ccabce7df7a4d05dab86 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Mon, 11 Mar 2019 18:44:03 -0500 Subject: [PATCH] [Heartbeat] Fix missing url.* fields on TCP NXDOMAIN (#10787) TCP checks are not adding URL fields on NXDOMAIN endpoints. This fixes that issue. It does so by ensuring that URL metadata is added before executing the check, and not during, as done previously. A side effect of this is that we now perform DNS lookups once per `{hostname,port}` instead of once per `{hostname}`. This is worth the increased simplicity however, as the code would be quite convoluted otherwise, which would put us at risk for more bugs. Related (but different) 6.x issue: https://github.com/elastic/beats/pull/10777 Fix import formatting (cherry picked from commit 821baec2411eb070cc32bad3b01b87f1178ed220) --- heartbeat/hbtest/hbtestutil.go | 9 ++- .../monitors/active/dialchain/builder.go | 57 ++++++++++--------- heartbeat/monitors/active/tcp/tcp.go | 9 --- heartbeat/monitors/active/tcp/tcp_test.go | 28 ++++++--- heartbeat/monitors/util.go | 48 ---------------- 5 files changed, 58 insertions(+), 93 deletions(-) diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index 0173b6760e6..fca1ab267d3 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -97,10 +97,17 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) mapval. // BaseChecks creates a skima.Validator that represents the "monitor" field present // in all heartbeat events. +// If IP is set to "" this will check that the field is not present func BaseChecks(ip string, status string, typ string) mapval.Validator { + var ipCheck mapval.IsDef + if len(ip) > 0 { + ipCheck = mapval.IsEqual(ip) + } else { + ipCheck = mapval.Optional(mapval.IsEqual(ip)) + } return mapval.MustCompile(mapval.Map{ "monitor": mapval.Map{ - "ip": ip, + "ip": ipCheck, "duration.us": mapval.IsDuration, "status": status, "id": mapval.IsNonEmptyString, diff --git a/heartbeat/monitors/active/dialchain/builder.go b/heartbeat/monitors/active/dialchain/builder.go index b8d522bbe1e..301c932db57 100644 --- a/heartbeat/monitors/active/dialchain/builder.go +++ b/heartbeat/monitors/active/dialchain/builder.go @@ -18,12 +18,14 @@ package dialchain import ( + "fmt" "net" - "strconv" + "net/url" "time" "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/heartbeat/monitors/jobs" + "github.com/elastic/beats/heartbeat/monitors/wrappers" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/outputs/transport" ) @@ -136,59 +138,60 @@ func MakeDialerJobs( ) ([]jobs.Job, error) { var jobs []jobs.Job for _, endpoint := range endpoints { - endpointJobs, err := makeEndpointJobs(b, scheme, endpoint, mode, fn) - if err != nil { - return nil, err + for _, port := range endpoint.Ports { + endpointURL, err := url.Parse(fmt.Sprintf("%s://%s:%d", scheme, endpoint.Host, port)) + if err != nil { + return nil, err + } + endpointJob, err := makeEndpointJob(b, endpointURL, mode, fn) + if err != nil { + return nil, err + } + jobs = append(jobs, wrappers.WithURLField(endpointURL, endpointJob)) } - jobs = append(jobs, endpointJobs...) + } return jobs, nil } -func makeEndpointJobs( +func makeEndpointJob( b *Builder, - scheme string, - endpoint Endpoint, + endpointURL *url.URL, mode monitors.IPSettings, fn func(*beat.Event, transport.Dialer, string) error, -) ([]jobs.Job, error) { +) (jobs.Job, error) { // Check if SOCKS5 is configured, with relying on the socks5 proxy // in resolving the actual IP. // Create one job for every port number configured. if b.resolveViaSocks5 { - js := make([]jobs.Job, len(endpoint.Ports)) - for i, port := range endpoint.Ports { - address := net.JoinHostPort(endpoint.Host, strconv.Itoa(int(port))) - js[i] = jobs.MakeSimpleJob(func(event *beat.Event) error { - return b.Run(event, address, func(event *beat.Event, dialer transport.Dialer) error { - return fn(event, dialer, address) + return wrappers.WithURLField(endpointURL, + jobs.MakeSimpleJob(func(event *beat.Event) error { + hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port()) + return b.Run(event, hostPort, func(event *beat.Event, dialer transport.Dialer) error { + return fn(event, dialer, hostPort) }) - }) - } - return js, nil + })), nil } // Create job that first resolves one or multiple IP (depending on // config.Mode) in order to create one continuation Task per IP. - settings := monitors.MakeHostJobSettings(endpoint.Host, mode) + settings := monitors.MakeHostJobSettings(endpointURL.Hostname(), mode) job, err := monitors.MakeByHostJob(settings, - monitors.MakePingAllIPPortFactory(endpoint.Ports, - func(event *beat.Event, ip *net.IPAddr, port uint16) error { + monitors.MakePingIPFactory( + func(event *beat.Event, ip *net.IPAddr) error { // use address from resolved IP - portStr := strconv.Itoa(int(port)) - ipAddr := net.JoinHostPort(ip.String(), portStr) - hostAddr := net.JoinHostPort(endpoint.Host, portStr) + ipPort := net.JoinHostPort(ip.String(), endpointURL.Port()) cb := func(event *beat.Event, dialer transport.Dialer) error { - return fn(event, dialer, hostAddr) + return fn(event, dialer, ipPort) } - err := b.Run(event, ipAddr, cb) + err := b.Run(event, ipPort, cb) return err })) if err != nil { return nil, err } - return []jobs.Job{job}, nil + return job, nil } diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 827d7654a1c..0fcc4329b9c 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -23,11 +23,9 @@ import ( "strconv" "strings" - "github.com/elastic/beats/heartbeat/eventext" "github.com/elastic/beats/heartbeat/monitors" "github.com/elastic/beats/heartbeat/monitors/active/dialchain" "github.com/elastic/beats/heartbeat/monitors/jobs" - "github.com/elastic/beats/heartbeat/monitors/wrappers" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -91,13 +89,6 @@ func create( epJobs, err := dialchain.MakeDialerJobs(db, scheme, eps, config.Mode, func(event *beat.Event, dialer transport.Dialer, addr string) error { - u, err := url.Parse(fmt.Sprintf("%s://%s", scheme, addr)) - if err != nil { - return err - } - - eventext.MergeEventFields(event, common.MapStr{"url": wrappers.URLFields(u)}) - return pingHost(event, dialer, addr, timeout, validator) }) if err != nil { diff --git a/heartbeat/monitors/active/tcp/tcp_test.go b/heartbeat/monitors/active/tcp/tcp_test.go index 3b25685636a..af3026cbc00 100644 --- a/heartbeat/monitors/active/tcp/tcp_test.go +++ b/heartbeat/monitors/active/tcp/tcp_test.go @@ -97,10 +97,6 @@ func setupServer(t *testing.T, serverCreator func(http.Handler) *httptest.Server return server, port } -func tcpMonitorChecks(host string, ip string, port uint16, status string) mapval.Validator { - return hbtest.BaseChecks(ip, status, "tcp") -} - func TestUpEndpointJob(t *testing.T) { server, port := setupServer(t, httptest.NewServer) defer server.Close() @@ -174,7 +170,7 @@ func TestConnectionRefusedEndpointJob(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(ip, ip, port, "down"), + hbtest.BaseChecks(ip, "down", "tcp"), hbtest.SummaryChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", ip, port), hbtest.ErrorChecks(dialErr, "io"), @@ -192,7 +188,7 @@ func TestUnreachableEndpointJob(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(ip, ip, port, "down"), + hbtest.BaseChecks(ip, "down", "tcp"), hbtest.SummaryChecks(0, 1), hbtest.SimpleURLChecks(t, "tcp", ip, port), hbtest.ErrorChecks(dialErr, "io"), @@ -219,7 +215,7 @@ func TestCheckUp(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(host, ip, port, "up"), + hbtest.BaseChecks(ip, "up", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), hbtest.SummaryChecks(1, 0), @@ -255,7 +251,7 @@ func TestCheckDown(t *testing.T) { mapval.Test( t, mapval.Strict(mapval.Compose( - tcpMonitorChecks(host, ip, port, "down"), + hbtest.BaseChecks(ip, "down", "tcp"), hbtest.RespondingTCPChecks(), hbtest.SimpleURLChecks(t, "tcp", host, port), hbtest.SummaryChecks(0, 1), @@ -272,6 +268,22 @@ func TestCheckDown(t *testing.T) { "message": "received string mismatch", }, }), + )), event.Fields) +} + +func TestNXDomainJob(t *testing.T) { + host := "notadomainatallforsure.notadomain.notatldreally" + port := uint16(1234) + event := testTCPCheck(t, host, port) + + dialErr := fmt.Sprintf("lookup %s", host) + mapval.Test( + t, + mapval.Strict(mapval.Compose( + hbtest.BaseChecks("", "down", "tcp"), + hbtest.SummaryChecks(0, 1), + hbtest.SimpleURLChecks(t, "tcp", host, port), + hbtest.ErrorChecks(dialErr, "io"), )), event.Fields, ) diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 7f81ac8a455..fa48113ec72 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -100,54 +100,6 @@ func MakePingIPFactory( } } -// MakePingAllIPFactory wraps a function for building a recursive Task Runner from function callbacks. -func MakePingAllIPFactory( - f func(*net.IPAddr) []func(*beat.Event) error, -) func(*net.IPAddr) jobs.Job { - return func(ip *net.IPAddr) jobs.Job { - cont := f(ip) - switch len(cont) { - case 0: - return emptyTask - case 1: - return MakeSimpleCont(cont[0]) - } - - tasks := make([]jobs.Job, len(cont)) - for i, c := range cont { - tasks[i] = MakeSimpleCont(c) - } - return func(event *beat.Event) ([]jobs.Job, error) { - return tasks, nil - } - } -} - -// MakePingAllIPPortFactory builds a set of TaskRunner supporting a set of -// IP/port-pairs. -func MakePingAllIPPortFactory( - ports []uint16, - f func(*beat.Event, *net.IPAddr, uint16) error, -) func(*net.IPAddr) jobs.Job { - if len(ports) == 1 { - port := ports[0] - return MakePingIPFactory(func(event *beat.Event, ip *net.IPAddr) error { - return f(event, ip, port) - }) - } - - return MakePingAllIPFactory(func(ip *net.IPAddr) []func(event *beat.Event) error { - funcs := make([]func(*beat.Event) error, len(ports)) - for i := range ports { - port := ports[i] - funcs[i] = func(event *beat.Event) error { - return f(event, ip, port) - } - } - return funcs - }) -} - // MakeByIPJob builds a new Job based on already known IP. Similar to // MakeByHostJob, the pingFactory will be used to build the tasks run by the job. //