Skip to content

Commit

Permalink
[Heartbeat] Fix missing url.* fields on TCP NXDOMAIN (#10787)
Browse files Browse the repository at this point in the history
TCP checks are not adding URL fields on NXDOMAIN endpoints. This fixes that issue.

It does so by ensuring that URL metadata is added before executing the check, and not during, as done previously.

A side effect of this is that we now perform DNS lookups once per `{hostname,port}` instead of once per `{hostname}`. The extra lookups are worth it for the increased simplicity, however: the code would be quite convoluted otherwise, which would put us at risk of more bugs.

Related (but different) 6.x issue: #10777

Fix import formatting
  • Loading branch information
andrewvc committed Mar 11, 2019
1 parent 7ee2049 commit 821baec
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 93 deletions.
9 changes: 8 additions & 1 deletion heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,17 @@ func TLSChecks(chainIndex, certIndex int, certificate *x509.Certificate) mapval.

// BaseChecks creates a mapval.Validator that represents the "monitor" field present
// in all heartbeat events.
// If IP is set to "" this will check that the field is not present
func BaseChecks(ip string, status string, typ string) mapval.Validator {
var ipCheck mapval.IsDef
if len(ip) > 0 {
ipCheck = mapval.IsEqual(ip)
} else {
ipCheck = mapval.Optional(mapval.IsEqual(ip))
}
return mapval.MustCompile(mapval.Map{
"monitor": mapval.Map{
"ip": ip,
"ip": ipCheck,
"duration.us": mapval.IsDuration,
"status": status,
"id": mapval.IsNonEmptyString,
Expand Down
57 changes: 30 additions & 27 deletions heartbeat/monitors/active/dialchain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package dialchain

import (
"fmt"
"net"
"strconv"
"net/url"
"time"

"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/transport"
)
Expand Down Expand Up @@ -136,59 +138,60 @@ func MakeDialerJobs(
) ([]jobs.Job, error) {
var jobs []jobs.Job
for _, endpoint := range endpoints {
endpointJobs, err := makeEndpointJobs(b, scheme, endpoint, mode, fn)
if err != nil {
return nil, err
for _, port := range endpoint.Ports {
endpointURL, err := url.Parse(fmt.Sprintf("%s://%s:%d", scheme, endpoint.Host, port))
if err != nil {
return nil, err
}
endpointJob, err := makeEndpointJob(b, endpointURL, mode, fn)
if err != nil {
return nil, err
}
jobs = append(jobs, wrappers.WithURLField(endpointURL, endpointJob))
}
jobs = append(jobs, endpointJobs...)

}

return jobs, nil
}

func makeEndpointJobs(
func makeEndpointJob(
b *Builder,
scheme string,
endpoint Endpoint,
endpointURL *url.URL,
mode monitors.IPSettings,
fn func(*beat.Event, transport.Dialer, string) error,
) ([]jobs.Job, error) {
) (jobs.Job, error) {

// Check if SOCKS5 is configured, with relying on the socks5 proxy
// in resolving the actual IP.
// Create one job for every port number configured.
if b.resolveViaSocks5 {
js := make([]jobs.Job, len(endpoint.Ports))
for i, port := range endpoint.Ports {
address := net.JoinHostPort(endpoint.Host, strconv.Itoa(int(port)))
js[i] = jobs.MakeSimpleJob(func(event *beat.Event) error {
return b.Run(event, address, func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, address)
return wrappers.WithURLField(endpointURL,
jobs.MakeSimpleJob(func(event *beat.Event) error {
hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port())
return b.Run(event, hostPort, func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, hostPort)
})
})
}
return js, nil
})), nil
}

// Create job that first resolves one or multiple IP (depending on
// config.Mode) in order to create one continuation Task per IP.
settings := monitors.MakeHostJobSettings(endpoint.Host, mode)
settings := monitors.MakeHostJobSettings(endpointURL.Hostname(), mode)

job, err := monitors.MakeByHostJob(settings,
monitors.MakePingAllIPPortFactory(endpoint.Ports,
func(event *beat.Event, ip *net.IPAddr, port uint16) error {
monitors.MakePingIPFactory(
func(event *beat.Event, ip *net.IPAddr) error {
// use address from resolved IP
portStr := strconv.Itoa(int(port))
ipAddr := net.JoinHostPort(ip.String(), portStr)
hostAddr := net.JoinHostPort(endpoint.Host, portStr)
ipPort := net.JoinHostPort(ip.String(), endpointURL.Port())
cb := func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, hostAddr)
return fn(event, dialer, ipPort)
}
err := b.Run(event, ipAddr, cb)
err := b.Run(event, ipPort, cb)
return err
}))
if err != nil {
return nil, err
}
return []jobs.Job{job}, nil
return job, nil
}
9 changes: 0 additions & 9 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"strconv"
"strings"

"github.com/elastic/beats/heartbeat/eventext"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/monitors/active/dialchain"
"github.com/elastic/beats/heartbeat/monitors/jobs"
"github.com/elastic/beats/heartbeat/monitors/wrappers"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
Expand Down Expand Up @@ -91,13 +89,6 @@ func create(

epJobs, err := dialchain.MakeDialerJobs(db, scheme, eps, config.Mode,
func(event *beat.Event, dialer transport.Dialer, addr string) error {
u, err := url.Parse(fmt.Sprintf("%s://%s", scheme, addr))
if err != nil {
return err
}

eventext.MergeEventFields(event, common.MapStr{"url": wrappers.URLFields(u)})

return pingHost(event, dialer, addr, timeout, validator)
})
if err != nil {
Expand Down
28 changes: 20 additions & 8 deletions heartbeat/monitors/active/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ func setupServer(t *testing.T, serverCreator func(http.Handler) *httptest.Server
return server, port
}

// tcpMonitorChecks returns the validator for the base "monitor" fields of a
// TCP check event by delegating to hbtest.BaseChecks with the monitor type
// fixed to "tcp". Only ip and status are forwarded; host and port are accepted
// but not used by this helper.
func tcpMonitorChecks(host string, ip string, port uint16, status string) mapval.Validator {
	const monitorType = "tcp"
	baseValidator := hbtest.BaseChecks(ip, status, monitorType)
	return baseValidator
}

func TestUpEndpointJob(t *testing.T) {
server, port := setupServer(t, httptest.NewServer)
defer server.Close()
Expand Down Expand Up @@ -174,7 +170,7 @@ func TestConnectionRefusedEndpointJob(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(ip, ip, port, "down"),
hbtest.BaseChecks(ip, "down", "tcp"),
hbtest.SummaryChecks(0, 1),
hbtest.SimpleURLChecks(t, "tcp", ip, port),
hbtest.ErrorChecks(dialErr, "io"),
Expand All @@ -192,7 +188,7 @@ func TestUnreachableEndpointJob(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(ip, ip, port, "down"),
hbtest.BaseChecks(ip, "down", "tcp"),
hbtest.SummaryChecks(0, 1),
hbtest.SimpleURLChecks(t, "tcp", ip, port),
hbtest.ErrorChecks(dialErr, "io"),
Expand All @@ -219,7 +215,7 @@ func TestCheckUp(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(host, ip, port, "up"),
hbtest.BaseChecks(ip, "up", "tcp"),
hbtest.RespondingTCPChecks(),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.SummaryChecks(1, 0),
Expand Down Expand Up @@ -255,7 +251,7 @@ func TestCheckDown(t *testing.T) {
mapval.Test(
t,
mapval.Strict(mapval.Compose(
tcpMonitorChecks(host, ip, port, "down"),
hbtest.BaseChecks(ip, "down", "tcp"),
hbtest.RespondingTCPChecks(),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.SummaryChecks(0, 1),
Expand All @@ -272,6 +268,22 @@ func TestCheckDown(t *testing.T) {
"message": "received string mismatch",
},
}),
)), event.Fields)
}

func TestNXDomainJob(t *testing.T) {
host := "notadomainatallforsure.notadomain.notatldreally"
port := uint16(1234)
event := testTCPCheck(t, host, port)

dialErr := fmt.Sprintf("lookup %s", host)
mapval.Test(
t,
mapval.Strict(mapval.Compose(
hbtest.BaseChecks("", "down", "tcp"),
hbtest.SummaryChecks(0, 1),
hbtest.SimpleURLChecks(t, "tcp", host, port),
hbtest.ErrorChecks(dialErr, "io"),
)),
event.Fields,
)
Expand Down
48 changes: 0 additions & 48 deletions heartbeat/monitors/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,54 +100,6 @@ func MakePingIPFactory(
}
}

// MakePingAllIPFactory turns f, a function producing a list of event
// callbacks for an IP, into a factory that builds one jobs.Job per IP.
//
// The job returned for a given IP depends on how many callbacks f yields:
//   - none: emptyTask is returned;
//   - exactly one: the callback is wrapped directly with MakeSimpleCont;
//   - several: each callback is wrapped with MakeSimpleCont, and the returned
//     job hands all of them back as its continuation jobs with a nil error.
func MakePingAllIPFactory(
	f func(*net.IPAddr) []func(*beat.Event) error,
) func(*net.IPAddr) jobs.Job {
	return func(ip *net.IPAddr) jobs.Job {
		callbacks := f(ip)
		if len(callbacks) == 0 {
			return emptyTask
		}
		if len(callbacks) == 1 {
			return MakeSimpleCont(callbacks[0])
		}

		// Wrap every callback up front; the same slice of jobs is handed back
		// each time the returned job runs.
		continuations := make([]jobs.Job, 0, len(callbacks))
		for _, cb := range callbacks {
			continuations = append(continuations, MakeSimpleCont(cb))
		}
		return func(*beat.Event) ([]jobs.Job, error) {
			return continuations, nil
		}
	}
}

// MakePingAllIPPortFactory returns a per-IP job factory that invokes f for
// every port in ports, passing the event, the IP and the port.
//
// When exactly one port is given, the factory is built with MakePingIPFactory
// so that f is called directly for that port. For any other number of ports,
// MakePingAllIPFactory is used with one callback per port.
func MakePingAllIPPortFactory(
	ports []uint16,
	f func(*beat.Event, *net.IPAddr, uint16) error,
) func(*net.IPAddr) jobs.Job {
	if len(ports) != 1 {
		return MakePingAllIPFactory(func(addr *net.IPAddr) []func(*beat.Event) error {
			perPort := make([]func(*beat.Event) error, 0, len(ports))
			for _, p := range ports {
				p := p // give each closure its own copy of the port
				perPort = append(perPort, func(ev *beat.Event) error {
					return f(ev, addr, p)
				})
			}
			return perPort
		})
	}

	only := ports[0]
	return MakePingIPFactory(func(ev *beat.Event, addr *net.IPAddr) error {
		return f(ev, addr, only)
	})
}

// MakeByIPJob builds a new Job based on already known IP. Similar to
// MakeByHostJob, the pingFactory will be used to build the tasks run by the job.
//
Expand Down

0 comments on commit 821baec

Please sign in to comment.