Skip to content

Commit

Permalink
Cherry-pick #19717 to 7.x: Convert cloudfoundry input to v2 (#19779)
Browse files Browse the repository at this point in the history
(cherry picked from commit b9cb9e4)
  • Loading branch information
Steffen Siering committed Jul 9, 2020
1 parent 099f318 commit 1f7cac3
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 199 deletions.
1 change: 0 additions & 1 deletion x-pack/filebeat/include/list.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 30 additions & 28 deletions x-pack/filebeat/input/cloudfoundry/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,51 @@ package cloudfoundry

import (
"fmt"
"time"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/feature"

"github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry"
)

func init() {
err := input.Register("cloudfoundry", NewInput)
if err != nil {
panic(err)
}
type cloudfoundryEvent interface {
Timestamp() time.Time
ToFields() common.MapStr
}

// NewInput creates a new udp input
func NewInput(
cfg *common.Config,
outlet channel.Connector,
context input.Context,
) (input.Input, error) {
cfgwarn.Beta("The cloudfoundry input is beta")

log := logp.NewLogger("cloudfoundry")

out, err := outlet.Connect(cfg)
if err != nil {
return nil, err
func Plugin() v2.Plugin {
return v2.Plugin{
Name: "cloudfoundry",
Stability: feature.Beta,
Deprecated: false,
Info: "collect logs from cloudfoundry loggregator",
Manager: stateless.NewInputManager(configure),
}
}

var conf cloudfoundry.Config
if err = cfg.Unpack(&conf); err != nil {
func configure(cfg *common.Config) (stateless.Input, error) {
config := cloudfoundry.Config{}
if err := cfg.Unpack(&config); err != nil {
return nil, err
}

switch conf.Version {
switch config.Version {
case cloudfoundry.ConsumerVersionV1:
return newInputV1(log, conf, out, context)
return configureV1(config)
case cloudfoundry.ConsumerVersionV2:
return newInputV2(log, conf, out, context)
return configureV2(config)
default:
return nil, fmt.Errorf("not supported consumer version: %s", conf.Version)
return nil, fmt.Errorf("not supported consumer version: %s", config.Version)
}
}

func createEvent(evt cloudfoundryEvent) beat.Event {
return beat.Event{
Timestamp: evt.Timestamp(),
Fields: evt.ToFields(),
}
}
78 changes: 33 additions & 45 deletions x-pack/filebeat/input/cloudfoundry/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ import (
"context"
"crypto/tls"
"net/http"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing"
cftest "github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry/test"
)

Expand All @@ -40,66 +41,53 @@ func testInput(t *testing.T, version string) {
config := common.MustNewConfigFrom(cftest.GetConfigFromEnv(t))
config.SetString("version", -1, version)

input, err := Plugin().Manager.Create(config)
require.NoError(t, err)

var wg sync.WaitGroup
defer wg.Wait()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Ensure that there is something happening in the firehose
apiAddress, err := config.String("api_address", -1)
require.NoError(t, err)

// Ensure that there is something happening in the firehose
go makeApiRequests(t, ctx, apiAddress)

events := make(chan beat.Event)
connector := channel.ConnectorFunc(func(*common.Config, beat.ClientConfig) (channel.Outleter, error) {
return newOutleter(events), nil
})

inputCtx := input.Context{Done: make(chan struct{})}
ch := make(chan beat.Event)
client := &pubtest.FakeClient{
PublishFunc: func(evt beat.Event) {
if ctx.Err() != nil {
return
}

select {
case ch <- evt:
case <-ctx.Done():
}
},
}

input, err := NewInput(config, connector, inputCtx)
require.NoError(t, err)
wg.Add(1)
go func() {
defer wg.Done()

go input.Run()
defer input.Stop()
inputCtx := v2.Context{
Logger: logp.NewLogger("test"),
Cancelation: ctx,
}
input.Run(inputCtx, pubtest.ConstClient(client))
}()

select {
case e := <-events:
case e := <-ch:
t.Logf("Event received: %+v", e)
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for events")
}
}

type outleter struct {
events chan<- beat.Event
done chan struct{}
}

func newOutleter(events chan<- beat.Event) *outleter {
return &outleter{
events: events,
done: make(chan struct{}),
}
}

func (o *outleter) Close() error {
close(o.done)
return nil
}

func (o *outleter) Done() <-chan struct{} {
return o.done
}

func (o *outleter) OnEvent(e beat.Event) bool {
select {
case o.events <- e:
return true
default:
return false
}
}

func makeApiRequests(t *testing.T, ctx context.Context, address string) {
client := &http.Client{
Transport: &http.Transport{
Expand Down
94 changes: 35 additions & 59 deletions x-pack/filebeat/input/cloudfoundry/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,83 +5,59 @@
package cloudfoundry

import (
"sync"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/harvester"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/x-pack/libbeat/common/cloudfoundry"
"github.com/elastic/go-concert/ctxtool"
)

// InputV1 defines a udp input to receive event on a specific host:port.
type InputV1 struct {
sync.Mutex
consumer *cloudfoundry.DopplerConsumer
started bool
log *logp.Logger
outlet channel.Outleter
// inputV1 defines a udp input to receive event on a specific host:port.
type inputV1 struct {
config cloudfoundry.Config
}

func configureV1(config cloudfoundry.Config) (*inputV1, error) {
return &inputV1{config: config}, nil
}

func newInputV1(log *logp.Logger, conf cloudfoundry.Config, out channel.Outleter, context input.Context) (*InputV1, error) {
hub := cloudfoundry.NewHub(&conf, "filebeat", log)
forwarder := harvester.NewForwarder(out)
func (i *inputV1) Name() string { return "cloudfoundry-v1" }

func (i *inputV1) Test(ctx v2.TestContext) error {
hub := cloudfoundry.NewHub(&i.config, "filebeat", ctx.Logger)
_, err := hub.Client()
return err
}

func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {
log := ctx.Logger
hub := cloudfoundry.NewHub(&i.config, "filebeat", log)

log.Info("Starting cloudfoundry input")
defer log.Info("Stopped cloudfoundry input")

callbacks := cloudfoundry.DopplerCallbacks{
Log: func(evt cloudfoundry.Event) {
forwarder.Send(beat.Event{
Timestamp: evt.Timestamp(),
Fields: evt.ToFields(),
})
publisher.Publish(createEvent(evt))
},
Error: func(evt cloudfoundry.EventError) {
forwarder.Send(beat.Event{
Timestamp: evt.Timestamp(),
Fields: evt.ToFields(),
})
publisher.Publish(createEvent(&evt))
},
}

consumer, err := hub.DopplerConsumer(callbacks)
if err != nil {
return nil, errors.Wrapf(err, "initializing doppler consumer")
return errors.Wrapf(err, "initializing doppler consumer")
}
return &InputV1{
outlet: out,
consumer: consumer,
started: false,
log: log,
}, nil
}

// Run starts the consumer of cloudfoundry events
func (p *InputV1) Run() {
p.Lock()
defer p.Unlock()

if !p.started {
p.log.Info("starting cloudfoundry input")
p.consumer.Run()
p.started = true
}
}

// Stop stops cloudfoundry doppler consumer
func (p *InputV1) Stop() {
defer p.outlet.Close()
p.Lock()
defer p.Unlock()

p.log.Info("stopping cloudfoundry input")
p.consumer.Stop()
p.started = false
}
stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
// wait stops the consumer and waits for all internal go-routines to be stopped.
consumer.Wait()
})
defer cancel()

// Wait waits for the input to finalize, and stops it
func (p *InputV1) Wait() {
p.Stop()
p.consumer.Wait()
consumer.Run()
<-stopCtx.Done()
return nil
}
Loading

0 comments on commit 1f7cac3

Please sign in to comment.