diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index 9bfd6d9c4b9..2b8b24c2213 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p if len(r.requestFactories) == 1 { finalResps = append(finalResps, httpResp) p := newPublisher(trCtx, publisher, true, r.log) - r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, true, p) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p) n = p.eventCount() continue } @@ -120,7 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p } // we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values p := newPublisher(trCtx, publisher, false, r.log) - r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, false, p) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p) n = p.eventCount() } else { if len(ids) == 0 { @@ -191,9 +191,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) if rf.isChain { - rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p) + rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p) } else { - r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, resps, true, p) + r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p) } n += p.eventCount() } @@ -544,7 +544,7 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int { // we start from 0, and skip the 1st event since we have already processed it p := newChainProcessor(r, trCtx, publisher, chainIndex) - r.responseProcessors[0].startProcessingSeq(stdCtx, trCtx, initialResp, true, p) + r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p) return p.eventCount() } @@ -703,7 +703,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx * resps = intermediateResps } p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log) - rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p) + rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p) n += p.eventCount() } diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go index e744069d272..56ea94e4461 100644 --- a/x-pack/filebeat/input/httpjson/response.go +++ b/x-pack/filebeat/input/httpjson/response.go @@ -209,84 +209,6 @@ func (s stream) close() { func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { trCtx.clearIntervalData() - go func() { - defer ch.close() - var npages int64 - - for i, httpResp := range resps { - iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails) - for { - pageStartTime := time.Now() - page, hasNext, err := iter.next() - if err != nil { - ch.fail(err) - return - } - - if !hasNext { - if i+1 != len(resps) { - break - } - return - } - - respTrs := page.asTransformables(rp.log) - - if len(respTrs) == 0 { - return - } - - // last_response context object is updated here organically - trCtx.updateLastResponse(*page) - npages = page.page - - rp.log.Debugf("last received page: %#v", trCtx.lastResponse) - - for _, tr := range respTrs { - for _, t := range rp.transforms { - tr, err = t.run(trCtx, tr) - if err != nil { - ch.fail(err) - return - } - } - - if rp.split == nil { - ch.event(stdCtx, tr.body()) - rp.log.Debug("no split found: continuing") - continue - } - - if err := rp.split.run(trCtx, tr, ch); err != nil { - switch err { //nolint:errorlint // run never returns a wrapped error. - case errEmptyField: - // nothing else to send for this page - rp.log.Debug("split operation finished") - case errEmptyRootField: - // root field not found, most likely the response is empty - rp.log.Debug(err) - default: - rp.log.Debug("split operation failed") - ch.fail(err) - return - } - } - } - - rp.metrics.updatePageExecutionTime(pageStartTime) - - if !paginate { - break - } - } - } - rp.metrics.updatePagesPerInterval(npages) - }() -} - -func (rp *responseProcessor) startProcessingSeq(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) { - trCtx.clearIntervalData() - defer ch.close() var npages int64