
[Rollup] Add more diagnostic stats to job #35471

Merged — 5 commits, Nov 27, 2018
Changes from 1 commit
@@ -26,15 +26,15 @@
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.Objects;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static java.util.Collections.unmodifiableList;
import static java.util.stream.Collectors.joining;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;

/**
* Response from rollup's get jobs api.
@@ -51,6 +51,15 @@ public class GetRollupJobResponse {
static final ParseField STATE = new ParseField("job_state");
static final ParseField CURRENT_POSITION = new ParseField("current_position");
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id");
static final ParseField BULK_LATENCY = new ParseField("bulk_latency_in_ms");
static final ParseField SEARCH_LATENCY = new ParseField("search_latency_in_ms");
static final ParseField SEARCH_FAILURES = new ParseField("search_failures");
static final ParseField BULK_FAILURES = new ParseField("bulk_failures");
Contributor:
Nit: I think it would be nicer to call it INDEX_FAILURES; BULK is an implementation detail of how indexing is internally implemented.

Contributor Author:
++

static final ParseField MIN = new ParseField("min");
static final ParseField MAX = new ParseField("max");
static final ParseField AVG = new ParseField("avg");
static final ParseField COUNT = new ParseField("count");
static final ParseField TOTAL = new ParseField("total");
Contributor:
To be consistent with the _stats API, can we call these bulk_time_in_millis and query_time_in_millis? I am also not sure we need the min, the max, and the avg. It should be enough to have the total time spent in these operations and the number of calls per action?

Contributor (@hendrikmuhs, Nov 22, 2018):
@polyfractal Are MIN, ..., ..., TOTAL leftovers from previous iterations? They look unused to me.

Contributor Author:
Right you are!

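The reviewers' point that the avg need not be kept at all can be made concrete: it is always derivable from total and count, so it never has to be stored or serialized. A minimal stand-alone sketch of that derivation (a hypothetical class for illustration, not code from the Elasticsearch codebase):

```java
// Hypothetical sketch: avg is redundant given total and count.
public class LatencyStats {
    private long count;
    private long total;

    // Record one observed latency, in milliseconds.
    public void add(long millis) {
        count++;
        total += millis;
    }

    // Derived on read; guards against division by zero for empty stats.
    public double getAvg() {
        return count == 0 ? 0.0 : (double) total / count;
    }

    public static void main(String[] args) {
        LatencyStats stats = new LatencyStats();
        stats.add(10);
        stats.add(30);
        System.out.println(stats.getAvg()); // prints 20.0
    }
}
```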

private List<JobWrapper> jobs;

@@ -181,12 +190,22 @@ public static class RollupIndexerJobStats {
private final long numInputDocuments;
private final long numOuputDocuments;
private final long numInvocations;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) {
private StatsAccumulator bulkLatency;
private StatsAccumulator searchLatency;
private long bulkFailures;
private long searchFailures;

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
StatsAccumulator bulkLatency, StatsAccumulator searchLatency, long bulkFailures,
long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOuputDocuments;
this.numInvocations = numInvocations;
this.bulkLatency = bulkLatency;
this.searchLatency = searchLatency;
this.bulkFailures = bulkFailures;
this.searchFailures = searchFailures;
}

/**
@@ -217,15 +236,50 @@ public long getOutputDocuments() {
return numOuputDocuments;
}

/**
* Number of failures that have occurred during the bulk indexing phase of Rollup
*/
public long getBulkFailures() {
return bulkFailures;
}

/**
* Number of failures that have occurred during the search phase of Rollup
*/
public long getSearchFailures() {
return searchFailures;
}

/**
* Returns an object which contains latency stats (min/max/avg/count) for the bulk
* indexing phase of Rollup
*/
public StatsAccumulator getBulkLatency() {
return bulkLatency;
}

/**
* Returns an object which contains latency stats (min/max/avg/count) for the
* search phase of Rollup
*/
public StatsAccumulator getSearchLatency() {
return searchLatency;
}

private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
STATS.getPreferredName(),
true,
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3]));
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(StatsAccumulator) args[4], (StatsAccumulator) args[5], (long) args[6], (long) args[7]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS);
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
PARSER.declareObject(constructorArg(), StatsAccumulator.PARSER::apply, BULK_LATENCY);
PARSER.declareObject(constructorArg(), StatsAccumulator.PARSER::apply, SEARCH_LATENCY);
PARSER.declareLong(constructorArg(), BULK_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

@Override
@@ -236,20 +290,29 @@ public boolean equals(Object other) {
return Objects.equals(this.numPages, that.numPages)
&& Objects.equals(this.numInputDocuments, that.numInputDocuments)
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments)
&& Objects.equals(this.numInvocations, that.numInvocations);
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.bulkLatency, that.bulkLatency)
&& Objects.equals(this.searchLatency, that.searchLatency)
&& Objects.equals(this.bulkFailures, that.bulkFailures)
&& Objects.equals(this.searchFailures, that.searchFailures);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations);
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
bulkLatency, searchLatency, bulkFailures, searchFailures);
}

@Override
public final String toString() {
return "{pages=" + numPages
+ ", input_docs=" + numInputDocuments
+ ", output_docs=" + numOuputDocuments
+ ", invocations=" + numInvocations + "}";
+ ", invocations=" + numInvocations
+ ", bulk_failures=" + bulkFailures
+ ", search_failures=" + searchFailures
+ ", bulk_latency=" + bulkLatency
+ ", search_latency=" + searchLatency + "}";
}
}

@@ -371,4 +434,89 @@ String value() {
return name().toLowerCase(Locale.ROOT);
}
}

public static class StatsAccumulator {

private static final String NAME = "stats_accumulator";
private static final ParseField MIN = new ParseField("min");
private static final ParseField MAX = new ParseField("max");
private static final ParseField AVG = new ParseField("avg");
private static final ParseField COUNT = new ParseField("count");
private static final ParseField TOTAL = new ParseField("total");

public static final ConstructingObjectParser<StatsAccumulator, Void> PARSER =
new ConstructingObjectParser<>(NAME, true,
args -> new StatsAccumulator((long) args[0], (long) args[1], (long) args[2], (long) args[3]));

static {
PARSER.declareLong(constructorArg(), COUNT);
PARSER.declareLong(constructorArg(), TOTAL);
PARSER.declareLong(constructorArg(), MIN);
PARSER.declareLong(constructorArg(), MAX);
PARSER.declareLong(constructorArg(), AVG); // We parse but don't actually use the avg
}

private long count;
private long total;
private long min;
private long max;

StatsAccumulator(long count, long total, long min, long max) {
this.count = count;
this.total = total;
this.min = min;
this.max = max;
}

public long getCount() {
return count;
}

public long getMin() {
return count == 0 ? 0 : min;
}

public long getMax() {
return count == 0 ? 0 : max;
}

public double getAvg() {
return count == 0 ? 0.0 : (double) total / (double) count;
}

public long getTotal() {
return total;
}

@Override
public int hashCode() {
return Objects.hash(count, total, min, max);
}

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}

if (getClass() != obj.getClass()) {
return false;
}

StatsAccumulator other = (StatsAccumulator) obj;
return Objects.equals(count, other.count)
&& Objects.equals(total, other.total)
&& Objects.equals(min, other.min)
&& Objects.equals(max, other.max);
}

@Override
public final String toString() {
return "{count=" + getCount()
+ ", total=" + getTotal()
+ ", min=" + getMin()
+ ", max=" + getMax()
+ ", avg=" + getAvg() + "}";
}
}
}
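The PARSER above only reconstructs a StatsAccumulator that was computed elsewhere; the accumulation side is not part of this client class. A hedged sketch of what such an accumulator plausibly looks like (the class name and add method are illustrative assumptions, not code from this PR), mirroring the client-side getters including their count == 0 guards:

```java
// Hypothetical accumulation counterpart to the client-side StatsAccumulator
// parsed above; names here are illustrative, not taken from this PR.
public class AccumulatingStats {
    private long count;
    private long total;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    public void add(long value) {
        count++;
        total += value;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    // Empty stats report 0 rather than the MAX_VALUE/MIN_VALUE sentinels,
    // matching the client-side getters.
    public long getMin()   { return count == 0 ? 0 : min; }
    public long getMax()   { return count == 0 ? 0 : max; }
    public double getAvg() { return count == 0 ? 0.0 : (double) total / count; }
    public long getCount() { return count; }
    public long getTotal() { return total; }

    public static void main(String[] args) {
        AccumulatingStats s = new AccumulatingStats();
        s.add(5); s.add(15); s.add(10);
        System.out.println("{count=" + s.getCount() + ", total=" + s.getTotal()
            + ", min=" + s.getMin() + ", max=" + s.getMax()
            + ", avg=" + s.getAvg() + "}");
        // prints {count=3, total=30, min=5, max=15, avg=10.0}
    }
}
```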
@@ -62,7 +62,9 @@ private GetRollupJobResponse createTestInstance() {
}

private RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomLong(), randomLong(), randomLong(), randomLong());
return new RollupIndexerJobStats(randomLong(), randomLong(), randomLong(), randomLong(),
new GetRollupJobResponse.StatsAccumulator(0, 0, 0, 0),
new GetRollupJobResponse.StatsAccumulator(0, 0, 0, 0), 0, 0);
}

private RollupJobStatus randomStatus() {
@@ -115,6 +117,23 @@ public void toXContent(RollupIndexerJobStats stats, XContentBuilder builder, ToX
builder.field(GetRollupJobResponse.NUM_INPUT_DOCUMENTS.getPreferredName(), stats.getNumDocuments());
builder.field(GetRollupJobResponse.NUM_OUTPUT_DOCUMENTS.getPreferredName(), stats.getOutputDocuments());
builder.field(GetRollupJobResponse.NUM_INVOCATIONS.getPreferredName(), stats.getNumInvocations());
builder.field(GetRollupJobResponse.BULK_LATENCY.getPreferredName());
toXContent(stats.getBulkLatency(), builder, params);
builder.field(GetRollupJobResponse.SEARCH_LATENCY.getPreferredName());
toXContent(stats.getSearchLatency(), builder, params);
builder.field(GetRollupJobResponse.BULK_FAILURES.getPreferredName(), stats.getBulkFailures());
builder.field(GetRollupJobResponse.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.endObject();
}

public void toXContent(GetRollupJobResponse.StatsAccumulator stats, XContentBuilder builder,
ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(GetRollupJobResponse.MIN.getPreferredName(), stats.getMin());
builder.field(GetRollupJobResponse.MAX.getPreferredName(), stats.getMax());
builder.field(GetRollupJobResponse.AVG.getPreferredName(), stats.getAvg());
builder.field(GetRollupJobResponse.TOTAL.getPreferredName(), stats.getTotal());
builder.field(GetRollupJobResponse.COUNT.getPreferredName(), stats.getCount());
builder.endObject();
}
}
54 changes: 51 additions & 3 deletions docs/reference/rollup/apis/get-job.asciidoc
@@ -101,7 +101,23 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"bulk_failures": 0,
"bulk_latency_in_ms": {
Contributor:
Can we simplify this to:

    "bulk_time_in_ms": 0,
    "bulk_total": 0,
    "search_time_in_ms": 0,
    "search_total": 0

? I don't think we need more than the total time and the number of invocations.

Contributor Author:
Sure, I can simplify these. :)
"count": 0,
"total": 0,
"min": 0,
"max": 0,
"avg": 0.0
},
"search_failures": 0,
"search_latency_in_ms": {
"count": 0,
"total": 0,
"min": 0,
"max": 0,
"avg": 0.0
}
}
}
]
@@ -221,7 +237,23 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"bulk_failures": 0,
"bulk_latency": {
"count": 0,
"total": 0.0,
"min": 0.0,
"max": 0.0,
"avg": 0.0
},
"search_failures": 0,
"search_latency": {
"count": 0,
"total": 0.0,
"min": 0.0,
"max": 0.0,
"avg": 0.0
}
}
},
{
@@ -270,7 +302,23 @@ Which will yield the following response:
"pages_processed" : 0,
"documents_processed" : 0,
"rollups_indexed" : 0,
"trigger_count" : 0
"trigger_count" : 0,
"bulk_failures": 0,
"bulk_latency": {
"count": 0,
"total": 0.0,
"min": 0.0,
"max": 0.0,
"avg": 0.0
},
"search_failures": 0,
"search_latency": {
"count": 0,
"total": 0.0,
"min": 0.0,
"max": 0.0,
"avg": 0.0
}
}
}
]