Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: remove concurrent processing
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 4, 2023
1 parent cc52cb8 commit eeaeabd
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 84 deletions.
12 changes: 6 additions & 6 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
p := newPublisher(trCtx, publisher, true, r.log)
r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, true, p)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
}
Expand Down Expand Up @@ -120,7 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
p := newPublisher(trCtx, publisher, false, r.log)
r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, false, p)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
if len(ids) == 0 {
Expand Down Expand Up @@ -191,9 +191,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
if rf.isChain {
rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
} else {
r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, resps, true, p)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
}
n += p.eventCount()
}
Expand Down Expand Up @@ -544,7 +544,7 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
p := newChainProcessor(r, trCtx, publisher, chainIndex)
r.responseProcessors[0].startProcessingSeq(stdCtx, trCtx, initialResp, true, p)
r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p)
return p.eventCount()
}

Expand Down Expand Up @@ -703,7 +703,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
resps = intermediateResps
}
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
n += p.eventCount()
}

Expand Down
78 changes: 0 additions & 78 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,84 +209,6 @@ func (s stream) close() {
func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
trCtx.clearIntervalData()

go func() {
defer ch.close()
var npages int64

for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
for {
pageStartTime := time.Now()
page, hasNext, err := iter.next()
if err != nil {
ch.fail(err)
return
}

if !hasNext {
if i+1 != len(resps) {
break
}
return
}

respTrs := page.asTransformables(rp.log)

if len(respTrs) == 0 {
return
}

// last_response context object is updated here organically
trCtx.updateLastResponse(*page)
npages = page.page

rp.log.Debugf("last received page: %#v", trCtx.lastResponse)

for _, tr := range respTrs {
for _, t := range rp.transforms {
tr, err = t.run(trCtx, tr)
if err != nil {
ch.fail(err)
return
}
}

if rp.split == nil {
ch.event(stdCtx, tr.body())
rp.log.Debug("no split found: continuing")
continue
}

if err := rp.split.run(trCtx, tr, ch); err != nil {
switch err { //nolint:errorlint // run never returns a wrapped error.
case errEmptyField:
// nothing else to send for this page
rp.log.Debug("split operation finished")
case errEmptyRootField:
// root field not found, most likely the response is empty
rp.log.Debug(err)
default:
rp.log.Debug("split operation failed")
ch.fail(err)
return
}
}
}

rp.metrics.updatePageExecutionTime(pageStartTime)

if !paginate {
break
}
}
}
rp.metrics.updatePagesPerInterval(npages)
}()
}

func (rp *responseProcessor) startProcessingSeq(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
trCtx.clearIntervalData()

defer ch.close()
var npages int64

Expand Down

0 comments on commit eeaeabd

Please sign in to comment.