-
Notifications
You must be signed in to change notification settings - Fork 24.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Rollup] Add more diagnostic stats to job #35471
Changes from 1 commit
2b6e52d
6505b73
6b25535
1856fbe
98e90ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,15 +26,15 @@ | |
import org.elasticsearch.common.xcontent.XContentParser; | ||
|
||
import java.io.IOException; | ||
import java.util.Objects; | ||
import java.util.List; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; | ||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; | ||
import static java.util.Collections.unmodifiableList; | ||
import static java.util.stream.Collectors.joining; | ||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg; | ||
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg; | ||
|
||
/** | ||
* Response from rollup's get jobs api. | ||
|
@@ -51,6 +51,15 @@ public class GetRollupJobResponse { | |
static final ParseField STATE = new ParseField("job_state"); | ||
static final ParseField CURRENT_POSITION = new ParseField("current_position"); | ||
static final ParseField UPGRADED_DOC_ID = new ParseField("upgraded_doc_id"); | ||
static final ParseField BULK_LATENCY = new ParseField("bulk_latency_in_ms"); | ||
static final ParseField SEARCH_LATENCY = new ParseField("search_latency_in_ms"); | ||
static final ParseField SEARCH_FAILURES = new ParseField("search_failures"); | ||
static final ParseField BULK_FAILURES = new ParseField("bulk_failures"); | ||
static final ParseField MIN = new ParseField("min"); | ||
static final ParseField MAX = new ParseField("max"); | ||
static final ParseField AVG = new ParseField("avg"); | ||
static final ParseField COUNT = new ParseField("count"); | ||
static final ParseField TOTAL = new ParseField("total"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be consistent with the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @polyfractal Are There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right you are! |
||
|
||
private List<JobWrapper> jobs; | ||
|
||
|
@@ -181,12 +190,22 @@ public static class RollupIndexerJobStats { | |
private final long numInputDocuments; | ||
private final long numOuputDocuments; | ||
private final long numInvocations; | ||
|
||
RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations) { | ||
private StatsAccumulator bulkLatency; | ||
private StatsAccumulator searchLatency; | ||
private long bulkFailures; | ||
private long searchFailures; | ||
|
||
RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations, | ||
StatsAccumulator bulkLatency, StatsAccumulator searchLatency, long bulkFailures, | ||
long searchFailures) { | ||
this.numPages = numPages; | ||
this.numInputDocuments = numInputDocuments; | ||
this.numOuputDocuments = numOuputDocuments; | ||
this.numInvocations = numInvocations; | ||
this.bulkLatency = bulkLatency; | ||
this.searchLatency = searchLatency; | ||
this.bulkFailures = bulkFailures; | ||
this.searchFailures = searchFailures; | ||
} | ||
|
||
/** | ||
|
@@ -217,15 +236,50 @@ public long getOutputDocuments() { | |
return numOuputDocuments; | ||
} | ||
|
||
/** | ||
* Number of failures that have occurred during the bulk indexing phase of Rollup | ||
*/ | ||
public long getBulkFailures() { | ||
return bulkFailures; | ||
} | ||
|
||
/** | ||
* Number of failures that have occurred during the search phase of Rollup | ||
*/ | ||
public long getSearchFailures() { | ||
return searchFailures; | ||
} | ||
|
||
/** | ||
* Returns an object which contains latency stats (min/max/avg/count) for the bulk | ||
* indexing phase of Rollup | ||
*/ | ||
public StatsAccumulator getBulkLatency() { | ||
return bulkLatency; | ||
} | ||
|
||
/** | ||
* Returns an object which contains latency stats (min/max/avg/count) for the | ||
* search phase of Rollup | ||
*/ | ||
public StatsAccumulator getSearchLatency() { | ||
return searchLatency; | ||
} | ||
|
||
private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>( | ||
STATS.getPreferredName(), | ||
true, | ||
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); | ||
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3], | ||
(StatsAccumulator) args[4], (StatsAccumulator) args[5], (long) args[6], (long) args[7])); | ||
static { | ||
PARSER.declareLong(constructorArg(), NUM_PAGES); | ||
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS); | ||
PARSER.declareLong(constructorArg(), NUM_OUTPUT_DOCUMENTS); | ||
PARSER.declareLong(constructorArg(), NUM_INVOCATIONS); | ||
PARSER.declareObject(constructorArg(), StatsAccumulator.PARSER::apply, BULK_LATENCY); | ||
PARSER.declareObject(constructorArg(), StatsAccumulator.PARSER::apply, SEARCH_LATENCY); | ||
PARSER.declareLong(constructorArg(), BULK_FAILURES); | ||
PARSER.declareLong(constructorArg(), SEARCH_FAILURES); | ||
} | ||
|
||
@Override | ||
|
@@ -236,20 +290,29 @@ public boolean equals(Object other) { | |
return Objects.equals(this.numPages, that.numPages) | ||
&& Objects.equals(this.numInputDocuments, that.numInputDocuments) | ||
&& Objects.equals(this.numOuputDocuments, that.numOuputDocuments) | ||
&& Objects.equals(this.numInvocations, that.numInvocations); | ||
&& Objects.equals(this.numInvocations, that.numInvocations) | ||
&& Objects.equals(this.bulkLatency, that.bulkLatency) | ||
&& Objects.equals(this.searchLatency, that.searchLatency) | ||
&& Objects.equals(this.bulkFailures, that.bulkFailures) | ||
&& Objects.equals(this.searchFailures, that.searchFailures); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations); | ||
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations, | ||
bulkLatency, searchLatency, bulkFailures, searchFailures); | ||
} | ||
|
||
@Override | ||
public final String toString() { | ||
return "{pages=" + numPages | ||
+ ", input_docs=" + numInputDocuments | ||
+ ", output_docs=" + numOuputDocuments | ||
+ ", invocations=" + numInvocations + "}"; | ||
+ ", invocations=" + numInvocations | ||
+ ", bulk_failures=" + bulkFailures | ||
+ ", search_failures=" + searchFailures | ||
+ ", bulk_latency=" + bulkLatency | ||
+ ", search_latency=" + searchLatency + "}"; | ||
} | ||
} | ||
|
||
|
@@ -371,4 +434,89 @@ String value() { | |
return name().toLowerCase(Locale.ROOT); | ||
} | ||
} | ||
|
||
public static class StatsAccumulator { | ||
|
||
private static final String NAME = "stats_accumulator"; | ||
private static final ParseField MIN = new ParseField("min"); | ||
private static final ParseField MAX = new ParseField("max"); | ||
private static final ParseField AVG = new ParseField("avg"); | ||
private static final ParseField COUNT = new ParseField("count"); | ||
private static final ParseField TOTAL = new ParseField("total"); | ||
|
||
public static final ConstructingObjectParser<StatsAccumulator, Void> PARSER = | ||
new ConstructingObjectParser<>(NAME, true, | ||
args -> new StatsAccumulator((long) args[0], (long) args[1], (long) args[2], (long) args[3])); | ||
|
||
static { | ||
PARSER.declareLong(constructorArg(), COUNT); | ||
PARSER.declareLong(constructorArg(), TOTAL); | ||
PARSER.declareLong(constructorArg(), MIN); | ||
PARSER.declareLong(constructorArg(), MAX); | ||
PARSER.declareLong(constructorArg(), AVG); // We parse but don't actually use the avg | ||
} | ||
|
||
private long count; | ||
private long total; | ||
private long min; | ||
private long max; | ||
|
||
StatsAccumulator(long count, long total, long min, long max) { | ||
this.count = count; | ||
this.total = total; | ||
this.min = min; | ||
this.max = max; | ||
} | ||
|
||
public long getCount() { | ||
return count; | ||
} | ||
|
||
public long getMin() { | ||
return count == 0 ? 0 : min; | ||
} | ||
|
||
public long getMax() { | ||
return count == 0 ? 0 : max; | ||
} | ||
|
||
public double getAvg() { | ||
return count == 0 ? 0.0 : (double) total / (double) count; | ||
} | ||
|
||
public long getTotal() { | ||
return total; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(count, total, min, max); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (obj == null) { | ||
return false; | ||
} | ||
|
||
if (getClass() != obj.getClass()) { | ||
return false; | ||
} | ||
|
||
StatsAccumulator other = (StatsAccumulator) obj; | ||
return Objects.equals(count, other.count) | ||
&& Objects.equals(total, other.total) | ||
&& Objects.equals(min, other.min) | ||
&& Objects.equals(max, other.max); | ||
} | ||
|
||
@Override | ||
public final String toString() { | ||
return "{count=" + getCount() | ||
+ ", total=" + getTotal() | ||
+ ", min=" + getMin() | ||
+ ", max=" + getMax() | ||
+ ", avg=" + getAvg() + "}"; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,7 +101,23 @@ Which will yield the following response: | |
"pages_processed" : 0, | ||
"documents_processed" : 0, | ||
"rollups_indexed" : 0, | ||
"trigger_count" : 0 | ||
"trigger_count" : 0, | ||
"bulk_failures": 0, | ||
"bulk_latency_in_ms": { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we simplify this to:
? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, I can simplify these. :) |
||
"count": 0, | ||
"total": 0, | ||
"min": 0, | ||
"max": 0, | ||
"avg": 0.0 | ||
}, | ||
"search_failures": 0, | ||
"search_latency_in_ms": { | ||
"count": 0, | ||
"total": 0, | ||
"min": 0, | ||
"max": 0, | ||
"avg": 0.0 | ||
} | ||
} | ||
} | ||
] | ||
|
@@ -221,7 +237,23 @@ Which will yield the following response: | |
"pages_processed" : 0, | ||
"documents_processed" : 0, | ||
"rollups_indexed" : 0, | ||
"trigger_count" : 0 | ||
"trigger_count" : 0, | ||
"bulk_failures": 0, | ||
"bulk_latency": { | ||
"count": 0, | ||
"total": 0.0, | ||
"min": 0.0, | ||
"max": 0.0, | ||
"avg": 0.0 | ||
}, | ||
"search_failures": 0, | ||
"search_latency": { | ||
"count": 0, | ||
"total": 0.0, | ||
"min": 0.0, | ||
"max": 0.0, | ||
"avg": 0.0 | ||
} | ||
} | ||
}, | ||
{ | ||
|
@@ -270,7 +302,23 @@ Which will yield the following response: | |
"pages_processed" : 0, | ||
"documents_processed" : 0, | ||
"rollups_indexed" : 0, | ||
"trigger_count" : 0 | ||
"trigger_count" : 0, | ||
"bulk_failures": 0, | ||
"bulk_latency": { | ||
"count": 0, | ||
"total": 0.0, | ||
"min": 0.0, | ||
"max": 0.0, | ||
"avg": 0.0 | ||
}, | ||
"search_failures": 0, | ||
"search_latency": { | ||
"count": 0, | ||
"total": 0.0, | ||
"min": 0.0, | ||
"max": 0.0, | ||
"avg": 0.0 | ||
} | ||
} | ||
} | ||
] | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think it would be nicer to call it
INDEX_FAILURES
,BULK
is an implementation detail about how indexing is internally implemented.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++