Skip to content

Commit

Permalink
[Rollup] Add more diagnostic stats to job (#35471)
Browse files Browse the repository at this point in the history
This adds some new statistics to the job to help with debugging
performance issues:

- Total search and index time (in milliseconds) encounteed by the indexer
during runtime.  This time is the total service time including
transfer between nodes, not just the `took` time.

- Total count of search and index requests.  Together with the total
times, this can be used to determine average request time.

- Count of search/bulk failures encountered during
runtime.  This information is also in the log, but a runtime counter
will help expose problems faster
  • Loading branch information
polyfractal committed Nov 27, 2018
1 parent f2deb9a commit 6f2b2d9
Show file tree
Hide file tree
Showing 13 changed files with 332 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Response from rollup's get jobs api.
Expand All @@ -51,6 +51,12 @@ public class GetRollupJobResponse {
static final ParseField STATE = new ParseField("job_state");
static final ParseField CURRENT_POSITION = new ParseField("current_position");
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
static final ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
static final ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
static final ParseField INDEX_TOTAL = new ParseField("index_total");
static final ParseField SEARCH_TOTAL = new ParseField("search_total");
static final ParseField SEARCH_FAILURES = new ParseField("search_failures");
static final ParseField INDEX_FAILURES = new ParseField("index_failures");

private List<JobWrapper> jobs;

Expand Down Expand Up @@ -181,12 +187,25 @@ public static class RollupIndexerJobStats {
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
private long indexTime;
private long indexTotal;
private long searchTime;
private long searchTotal;
private long indexFailures;
private long searchFailures;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
this.indexTime = indexTime;
this.indexTotal = indexTotal;
this.searchTime = searchTime;
this.searchTotal = searchTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}

/**
Expand Down Expand Up @@ -217,15 +236,65 @@ public long getOutputDocuments() {
return numOuputDocuments;
}

/**
* Number of failures that have occurred during the bulk indexing phase of Rollup
*/
public long getIndexFailures() {
return indexFailures;
}

/**
* Number of failures that have occurred during the search phase of Rollup
*/
public long getSearchFailures() {
return searchFailures;
}

/**
* Returns the time spent indexing (cumulative) in milliseconds
*/
public long getIndexTime() {
return indexTime;
}

/**
* Returns the time spent searching (cumulative) in milliseconds
*/
public long getSearchTime() {
return searchTime;
}

/**
* Returns the total number of indexing requests that have been sent by the rollup job
* (Note: this is not the number of _documents_ that have been indexed)
*/
public long getIndexTotal() {
return indexTotal;
}

/**
* Returns the total number of search requests that have been sent by the rollup job
*/
public long getSearchTotal() {
return searchTotal;
}

private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
STATS.getPreferredName(),
true,
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

@Override
Expand All @@ -234,22 +303,35 @@ public boolean equals(Object other) {
if (other == null || getClass() != other.getClass()) return false;
RollupIndexerJobStats that = (RollupIndexerJobStats) other;
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.indexTotal, that.indexTotal);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
}

@Override
public final String toString() {
return "{pages=" + numPages
+ ", input_docs=" + numInputDocuments
+ ", output_docs=" + numOuputDocuments
+ ", invocations=" + numInvocations + "}";
+ ", invocations=" + numInvocations
+ ", index_failures=" + indexFailures
+ ", search_failures=" + searchFailures
+ ", index_time_in_ms=" + indexTime
+ ", index_total=" + indexTotal
+ ", search_time_in_ms=" + searchTime
+ ", search_total=" + searchTotal+ "}";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ private GetRollupJobResponse createTestInstance() {
}

private RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomLong(), randomLong(), randomLong(), randomLong());
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
}

private RollupJobStatus randomStatus() {
Expand Down Expand Up @@ -115,6 +117,13 @@ public void toXContent(RollupIndexerJobStats stats, XContentBuilder builder, ToX
builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
builder.field(GetRollupJobResponse.INDEX_TIME_IN_MS.getPreferredName(), stats.getIndexTime());
builder.field(GetRollupJobResponse.INDEX_TOTAL.getPreferredName(), stats.getIndexTotal());
builder.field(GetRollupJobResponse.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
builder.field(GetRollupJobResponse.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(GetRollupJobResponse.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(GetRollupJobResponse.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.endObject();
}

}
24 changes: 21 additions & 3 deletions docs/reference/rollup/apis/get-job.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
}
]
Expand Down Expand Up @@ -221,7 +227,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
},
{
Expand Down Expand Up @@ -270,7 +282,13 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"index_failures": 0,
"index_time_in_ms": 0,
"index_total": 0,
"search_failures": 0,
"search_time_in_ms": 0,
"search_total": 0
}
}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,10 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
// fire off the search. Note this is async, the method will return from here
executor.execute(() -> {
try {
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, exc -> finishWithFailure(exc)));
stats.markStartSearch();
doNextSearch(buildSearchRequest(), ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure));
} catch (Exception e) {
finishWithFailure(e);
finishWithSearchFailure(e);
}
});
logger.debug("Beginning to index [" + getJobId() + "], state: [" + currentState + "]");
Expand Down Expand Up @@ -256,7 +257,13 @@ public synchronized boolean maybeTriggerAsyncJob(long now) {
*/
protected abstract void onAbort();

private void finishWithFailure(Exception exc) {
private void finishWithSearchFailure(Exception exc) {
stats.incrementSearchFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
}

private void finishWithIndexingFailure(Exception exc) {
stats.incrementIndexingFailures();
doSaveState(finishAndSetState(), position.get(), () -> onFailure(exc));
}

Expand Down Expand Up @@ -291,6 +298,7 @@ private IndexerState finishAndSetState() {
}

private void onSearchResponse(SearchResponse searchResponse) {
stats.markEndSearch();
try {
if (checkState(getState()) == false) {
return;
Expand Down Expand Up @@ -320,6 +328,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
// TODO this might be a valid case, e.g. if implementation filters
assert bulkRequest.requests().size() > 0;

stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
// resume the failing buckets ?
Expand All @@ -335,24 +344,24 @@ private void onSearchResponse(SearchResponse searchResponse) {
position.set(newPosition);

onBulkResponse(bulkResponse, newPosition);
}, exc -> finishWithFailure(exc)));
}, this::finishWithIndexingFailure));
} catch (Exception e) {
finishWithFailure(e);
finishWithSearchFailure(e);
}
}

private void onBulkResponse(BulkResponse response, JobPosition position) {
stats.markEndIndexing();
try {

ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithFailure);
ActionListener<SearchResponse> listener = ActionListener.wrap(this::onSearchResponse, this::finishWithSearchFailure);
// TODO probably something more intelligent than every-50 is needed
if (stats.getNumPages() > 0 && stats.getNumPages() % 50 == 0) {
doSaveState(IndexerState.INDEXING, position, () -> doNextSearch(buildSearchRequest(), listener));
} else {
doNextSearch(buildSearchRequest(), listener);
}
} catch (Exception e) {
finishWithFailure(e);
finishWithIndexingFailure(e);
}
}

Expand Down
Loading

0 comments on commit 6f2b2d9

Please sign in to comment.