From 334b4ee7e293ac1e9e23d0c7e1c16a6f9e162483 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 4 Sep 2023 15:00:19 +0930 Subject: [PATCH] x-pack/filebeat/input/httpjson: clean up scaffolding --- x-pack/filebeat/input/httpjson/request.go | 20 --------------- x-pack/filebeat/input/httpjson/response.go | 23 ----------------- x-pack/filebeat/input/httpjson/split_test.go | 26 ++++++++++++++------ 3 files changed, 18 insertions(+), 51 deletions(-) diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index d8952896a99..6a1d926ab40 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -610,8 +610,6 @@ func (p *chainProcessor) eventCount() int { return p.n } -func (*chainProcessor) close() {} - // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input // //nolint:bodyclose // response body is closed through drainBody method @@ -742,22 +740,6 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo } } -// processAndPublishEvents process and publish events based on event type -func (p *publisher) processAndPublishEvents(events stream) { - for maybeMsg := range events.ch { - p.processAndPublishEvent(maybeMsg) - } -} - -// processAndPublishEvent processes and publishes one events based on event type -func (p *publisher) processAndPublishEvent(evt maybeMsg) { - if evt.failed() { - p.fail(evt.err) - return - } - p.event(nil, evt.msg) -} - func (p *publisher) event(_ context.Context, msg mapstr.M) { if p.pub != nil { event, err := makeEvent(msg) @@ -788,8 +770,6 @@ func (p *publisher) eventCount() int { return p.n } -func (p *publisher) close() {} - const ( // This is generally updated with chain responses, if present, as they continue to occur // Otherwise this is always the last response of the root request w.r.t pagination diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index 56ea94e4461..2b8c5de5f4e 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -183,35 +183,12 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe type sendStream interface { event(context.Context, mapstr.M) fail(error) - close() -} - -type stream struct { - ch chan maybeMsg -} - -func newStream() stream { - return stream{make(chan maybeMsg)} -} - -func (s stream) event(_ context.Context, e mapstr.M) { - s.ch <- maybeMsg{msg: e} -} - -func (s stream) fail(err error) { - s.ch <- maybeMsg{err: err} -} - -func (s stream) close() { - close(s.ch) } func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { trCtx.clearIntervalData() - defer ch.close() var npages int64 - for i, httpResp := range resps { iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails) for { diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go index fa390061c9a..10f7a40567d 100644 --- a/x-pack/filebeat/input/httpjson/split_test.go +++ b/x-pack/filebeat/input/httpjson/split_test.go @@ -5,6 +5,7 @@ package httpjson import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -702,9 +703,8 @@ func TestSplit(t *testing.T) { } for _, tc := range cases { - tc := tc t.Run(tc.name, func(t *testing.T) { - events := stream{make(chan maybeMsg, len(tc.expectedMessages))} + events := &stream{t: t} split, err := newSplitResponse(tc.config, logp.NewLogger("")) assert.NoError(t, err) err = split.run(tc.ctx, tc.resp, events) @@ -713,13 +713,23 @@ func TestSplit(t *testing.T) { } else { assert.EqualError(t, err, tc.expectedErr.Error()) } - events.close() - assert.Equal(t, len(tc.expectedMessages), len(events.ch)) - for _, msg := range tc.expectedMessages { - e := <-events.ch - assert.NoError(t, e.err) - assert.Equal(t, msg.Flatten(), e.msg.Flatten()) + assert.Equal(t, len(tc.expectedMessages), len(events.collected)) + for i, msg := range tc.expectedMessages { + assert.Equal(t, msg.Flatten(), events.collected[i].Flatten()) } }) } } + +type stream struct { + collected []mapstr.M + t *testing.T +} + +func (s *stream) event(_ context.Context, msg mapstr.M) { + s.collected = append(s.collected, msg) +} + +func (s *stream) fail(err error) { + s.t.Errorf("fail: %v", err) +}