Skip to content

Commit

Permalink
Unify heartbeat jobs and tasks for simplicity (#9258)
Browse files Browse the repository at this point in the history
Unify heartbeat jobs and tasks for simplicity

In heartbeat we have separate notions of jobs and tasks.

This can make following the code and sharing the code among monitors confusing. This patch unifies the two. There is a tiny amount of extra execution overhead.

Tasks only passed a single `common.MapStr`, where we now pass a `*beat.Event`, however, the benefit is that we have a more consistent and simple codebase.

Also makes events arguments to Jobs rather than return values.

This lets us reason about concurrency more easily since jobs are now explicitly
about mutation, and accidentally sharing data between tasks is made more difficult.
  • Loading branch information
andrewvc committed Dec 19, 2018
1 parent 5d0d938 commit 37a5b1f
Show file tree
Hide file tree
Showing 18 changed files with 450 additions and 397 deletions.
46 changes: 23 additions & 23 deletions heartbeat/monitors/active/dialchain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs/transport"
)
Expand Down Expand Up @@ -96,7 +97,7 @@ func (b *Builder) AddLayer(l Layer) {
// Build create a new dialer, that will always use the constant address, no matter
// which address is used to connect using the dialer.
// The dialer chain will add per layer information to the given event.
func (b *Builder) Build(addr string, event common.MapStr) (transport.Dialer, error) {
func (b *Builder) Build(addr string, event *beat.Event) (transport.Dialer, error) {
// clone template, as multiple instance of a dialer can exist at the same time
dchain := b.template.Clone()

Expand All @@ -110,18 +111,16 @@ func (b *Builder) Build(addr string, event common.MapStr) (transport.Dialer, err

// Run executes the given function with a new dialer instance.
func (b *Builder) Run(
event *beat.Event,
addr string,
fn func(transport.Dialer) (common.MapStr, error),
) (common.MapStr, error) {
event := common.MapStr{}
fn func(*beat.Event, transport.Dialer) error,
) error {
dialer, err := b.Build(addr, event)
if err != nil {
return nil, err
return err
}

results, err := fn(dialer)
event.DeepUpdate(results)
return event, err
return fn(event, dialer)
}

// MakeDialerJobs creates a set of monitoring jobs. The jobs behavior depends
Expand All @@ -134,7 +133,7 @@ func MakeDialerJobs(
typ, scheme string,
endpoints []Endpoint,
mode monitors.IPSettings,
fn func(dialer transport.Dialer, addr string) (common.MapStr, error),
fn func(event *beat.Event, dialer transport.Dialer, addr string) error,
) ([]monitors.Job, error) {
var jobs []monitors.Job
for _, endpoint := range endpoints {
Expand All @@ -153,7 +152,7 @@ func makeEndpointJobs(
typ, scheme string,
endpoint Endpoint,
mode monitors.IPSettings,
fn func(transport.Dialer, string) (common.MapStr, error),
fn func(*beat.Event, transport.Dialer, string) error,
) ([]monitors.Job, error) {

fields := common.MapStr{
Expand All @@ -169,12 +168,10 @@ func makeEndpointJobs(
if b.resolveViaSocks5 {
jobs := make([]monitors.Job, len(endpoint.Ports))
for i, port := range endpoint.Ports {
jobName := jobName(typ, scheme, endpoint.Host, []uint16{port})
address := net.JoinHostPort(endpoint.Host, strconv.Itoa(int(port)))
settings := monitors.MakeJobSetting(jobName).WithFields(fields)
jobs[i] = monitors.MakeSimpleJob(settings, func() (common.MapStr, error) {
return b.Run(address, func(dialer transport.Dialer) (common.MapStr, error) {
return fn(dialer, address)
jobs[i] = monitors.MakeSimpleJob(func(event *beat.Event) error {
return b.Run(event, address, func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, address)
})
})
}
Expand All @@ -183,26 +180,29 @@ func makeEndpointJobs(

// Create job that first resolves one or multiple IP (depending on
// config.Mode) in order to create one continuation Task per IP.
jobName := jobName(typ, scheme, endpoint.Host, endpoint.Ports)
settings := monitors.MakeHostJobSettings(jobName, endpoint.Host, mode).WithFields(fields)
jobID := jobID(typ, scheme, endpoint.Host, endpoint.Ports)
settings := monitors.MakeHostJobSettings(jobID, endpoint.Host, mode)

job, err := monitors.MakeByHostJob(settings,
monitors.MakePingAllIPPortFactory(endpoint.Ports,
func(ip *net.IPAddr, port uint16) (common.MapStr, error) {
func(event *beat.Event, ip *net.IPAddr, port uint16) error {
// use address from resolved IP
portStr := strconv.Itoa(int(port))
ipAddr := net.JoinHostPort(ip.String(), portStr)
hostAddr := net.JoinHostPort(endpoint.Host, portStr)
return b.Run(ipAddr, func(dialer transport.Dialer) (common.MapStr, error) {
return fn(dialer, hostAddr)
})
cb := func(event *beat.Event, dialer transport.Dialer) error {
return fn(event, dialer, hostAddr)
}
err := b.Run(event, ipAddr, cb)
return err
}))
if err != nil {
return nil, err
}
return []monitors.Job{job}, nil
return []monitors.Job{monitors.WithJobId(jobID, monitors.WithFields(fields, job))}, nil
}

func jobName(typ, jobType, host string, ports []uint16) string {
func jobID(typ, jobType, host string, ports []uint16) string {
var h string
if len(ports) == 1 {
h = fmt.Sprintf("%v:%v", host, ports[0])
Expand Down
14 changes: 7 additions & 7 deletions heartbeat/monitors/active/dialchain/dialchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package dialchain

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/transport"
)

Expand All @@ -37,10 +37,10 @@ type DialerChain struct {

// NetDialer provides the most low-level network layer for setting up a network
// connection. NetDialer objects do not support wrapping any lower network layers.
type NetDialer func(common.MapStr) (transport.Dialer, error)
type NetDialer func(*beat.Event) (transport.Dialer, error)

// Layer is a configured network layer, wrapping any lower-level network layers.
type Layer func(common.MapStr, transport.Dialer) (transport.Dialer, error)
type Layer func(*beat.Event, transport.Dialer) (transport.Dialer, error)

// Clone create a shallow copy of c.
func (c *DialerChain) Clone() *DialerChain {
Expand All @@ -53,7 +53,7 @@ func (c *DialerChain) Clone() *DialerChain {
}

// Build create a new transport.Dialer for use with other networking libraries.
func (c *DialerChain) Build(event common.MapStr) (d transport.Dialer, err error) {
func (c *DialerChain) Build(event *beat.Event) (d transport.Dialer, err error) {
d, err = c.Net.build(event)
if err != nil {
return
Expand All @@ -77,14 +77,14 @@ func (c *DialerChain) AddLayer(l Layer) {
// TestBuild tries to build the DialerChain and reports any error reported by
// one of the layers.
func (c *DialerChain) TestBuild() error {
_, err := c.Build(common.MapStr{})
_, err := c.Build(&beat.Event{})
return err
}

func (d NetDialer) build(event common.MapStr) (transport.Dialer, error) {
func (d NetDialer) build(event *beat.Event) (transport.Dialer, error) {
return d(event)
}

func (l Layer) build(event common.MapStr, next transport.Dialer) (transport.Dialer, error) {
func (l Layer) build(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
return l(event, next)
}
8 changes: 5 additions & 3 deletions heartbeat/monitors/active/dialchain/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"time"

"github.com/elastic/beats/heartbeat/look"
"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/transport"
Expand Down Expand Up @@ -64,7 +66,7 @@ func UDPDialer(to time.Duration) NetDialer {
}

func netDialer(timeout time.Duration) NetDialer {
return func(event common.MapStr) (transport.Dialer, error) {
return func(event *beat.Event) (transport.Dialer, error) {
return makeDialer(func(network, address string) (net.Conn, error) {
namespace := ""

Expand All @@ -86,7 +88,7 @@ func netDialer(timeout time.Duration) NetDialer {
if err != nil || portNum < 0 || portNum > (1<<16) {
return nil, fmt.Errorf("invalid port number '%v' used", port)
}
event.DeepUpdate(common.MapStr{
monitors.MergeEventFields(event, common.MapStr{
namespace: common.MapStr{
"port": uint16(portNum),
},
Expand All @@ -108,7 +110,7 @@ func netDialer(timeout time.Duration) NetDialer {
}

end := time.Now()
event.DeepUpdate(common.MapStr{
monitors.MergeEventFields(event, common.MapStr{
namespace: common.MapStr{
"rtt": common.MapStr{
"connect": look.RTT(end.Sub(start)),
Expand Down
6 changes: 3 additions & 3 deletions heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"net"

"github.com/elastic/beats/heartbeat/look"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/transport"
)

Expand All @@ -35,7 +35,7 @@ import (
// }
// }
func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer

dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next))
Expand All @@ -48,7 +48,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer {
// TODO: add proxy url to event?

timer.stop()
event.Put("socks5.rtt.connect", look.RTT(timer.duration()))
event.Fields.Put("socks5.rtt.connect", look.RTT(timer.duration()))
return conn, nil
}), nil
}
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/active/dialchain/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

"github.com/elastic/beats/heartbeat/look"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/transport"
)

Expand All @@ -38,7 +38,7 @@ import (
// }
// }
func TLSLayer(cfg *transport.TLSConfig, to time.Duration) Layer {
return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer

// Wrap next dialer so to start the timer when 'next' returns.
Expand All @@ -58,7 +58,7 @@ func TLSLayer(cfg *transport.TLSConfig, to time.Duration) Layer {

// TODO: extract TLS connection parameters from connection object.
timer.stop()
event.Put("tls.rtt.handshake", look.RTT(timer.duration()))
event.PutValue("tls.rtt.handshake", look.RTT(timer.duration()))

// Pointers because we need a nil value
var chainNotValidBefore *time.Time
Expand All @@ -80,8 +80,8 @@ func TLSLayer(cfg *transport.TLSConfig, to time.Duration) Layer {
}
}

event.Put("tls.certificate_not_valid_before", *chainNotValidBefore)
event.Put("tls.certificate_not_valid_after", *chainNotValidAfter)
event.PutValue("tls.certificate_not_valid_before", *chainNotValidBefore)
event.PutValue("tls.certificate_not_valid_after", *chainNotValidAfter)

return conn, nil
}), nil
Expand Down
10 changes: 5 additions & 5 deletions heartbeat/monitors/active/dialchain/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"net"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/outputs/transport"
)

Expand All @@ -34,7 +34,7 @@ func IDLayer() Layer {
return _idLayer
}

var _idLayer = Layer(func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) {
var _idLayer = Layer(func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
return next, nil
})

Expand All @@ -43,7 +43,7 @@ var _idLayer = Layer(func(event common.MapStr, next transport.Dialer) (transport
func ConstAddrLayer(address string) Layer {
build := constAddr(address)

return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
return build(next), nil
}
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func constAddr(addr string) func(transport.Dialer) transport.Dialer {
}

func withNetDialer(layer NetDialer, fn func(transport.Dialer) transport.Dialer) NetDialer {
return func(event common.MapStr) (transport.Dialer, error) {
return func(event *beat.Event) (transport.Dialer, error) {
origDialer, err := layer.build(event)
if err != nil {
return nil, err
Expand All @@ -118,7 +118,7 @@ func withNetDialer(layer NetDialer, fn func(transport.Dialer) transport.Dialer)
}

func withLayerDialer(layer Layer, fn func(transport.Dialer) transport.Dialer) Layer {
return func(event common.MapStr, next transport.Dialer) (transport.Dialer, error) {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
origDialer, err := layer.build(event, next)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func init() {

var debugf = logp.MakeDebug("http")

// Create makes a new HTTP monitor
func create(
name string,
cfg *common.Config,
Expand Down Expand Up @@ -98,7 +99,8 @@ func create(
}
}

return jobs, len(config.URLs), nil
errWrappedJobs := monitors.WrapAll(jobs, monitors.WithErrAsField)
return errWrappedJobs, len(config.URLs), nil
}

func newRoundTripper(config *Config, tls *transport.TLSConfig) (*http.Transport, error) {
Expand Down
12 changes: 7 additions & 5 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ import (
"github.com/elastic/beats/libbeat/testing/mapvaltest"
)

func testRequest(t *testing.T, testURL string) beat.Event {
func testRequest(t *testing.T, testURL string) *beat.Event {
return testTLSRequest(t, testURL, nil)
}

// testTLSRequest tests the given request. certPath is optional, if given
// an empty string no cert will be set.
func testTLSRequest(t *testing.T, testURL string, extraConfig map[string]interface{}) beat.Event {
func testTLSRequest(t *testing.T, testURL string, extraConfig map[string]interface{}) *beat.Event {
configSrc := map[string]interface{}{
"urls": testURL,
"timeout": "1s",
Expand All @@ -66,15 +66,16 @@ func testTLSRequest(t *testing.T, testURL string, extraConfig map[string]interfa

job := jobs[0]

event, _, err := job.Run()
event := &beat.Event{}
_, err = job.Run(event)
require.NoError(t, err)

require.Equal(t, 1, endpoints)

return event
}

func checkServer(t *testing.T, handlerFunc http.HandlerFunc) (*httptest.Server, beat.Event) {
func checkServer(t *testing.T, handlerFunc http.HandlerFunc) (*httptest.Server, *beat.Event) {
server := httptest.NewServer(handlerFunc)
defer server.Close()
event := testRequest(t, server.URL)
Expand Down Expand Up @@ -237,7 +238,8 @@ func TestLargeResponse(t *testing.T) {

job := jobs[0]

event, _, err := job.Run()
event := &beat.Event{}
_, err = job.Run(event)
require.NoError(t, err)

port, err := hbtest.ServerPort(server)
Expand Down
Loading

0 comments on commit 37a5b1f

Please sign in to comment.