Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit CS Update Task Description Size #79443

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.cluster;

import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;

import java.util.IdentityHashMap;
Expand Down Expand Up @@ -46,7 +47,12 @@ default void clusterStatePublished(ClusterStatePublicationEvent clusterStatePubl
* This allows groupd task description but the submitting source.
*/
default String describeTasks(List<T> tasks) {
DaveCTurner marked this conversation as resolved.
Show resolved Hide resolved
return String.join(", ", tasks.stream().map(t -> (CharSequence)t.toString()).filter(t -> t.length() > 0)::iterator);
final StringBuilder output = new StringBuilder();
Strings.collectionToDelimitedStringWithLimit(
(Iterable<String>) () -> tasks.stream().map(Object::toString).filter(s -> s.isEmpty() == false).iterator(),
", ", "", "", 1024, output
);
return output.toString();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.elasticsearch.cluster.service;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.core.TimeValue;
Expand Down Expand Up @@ -131,14 +132,28 @@ void runIfNotProcessed(BatchedTask updateTask) {
}

if (toExecute.isEmpty() == false) {
final String tasksSummary = processTasksBySource.entrySet().stream().map(entry -> {
run(updateTask.batchingKey, toExecute, buildTasksDescription(updateTask, toExecute, processTasksBySource));
}
}
}

private static final int MAX_TASK_DESCRIPTION_CHARS = 8 * 1024;

private String buildTasksDescription(BatchedTask updateTask,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here I think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here it's a little less fun to use the general string builder than in the other case because I think it'd be nice to have the overall task count?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused: collectionToDelimitedStringWithLimit does yield the overall count if it truncated the output. There's no filtering happening here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we have this weird setup here where we have the tasks grouped by source so the counting will only work out correctly if the source is different for each task? I kinda liked having that level of detail on the counts for debugging still.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see we use toExecute.size() rather than processTasksBySource.size().

How about either just appending (N tasks in total) if output.length() exceeds the limit we set, or else letting collectionToDelimitedStringWithLimit take some extra detail that it puts into the truncation summary?

List<BatchedTask> toExecute,
Map<String, List<BatchedTask>> processTasksBySource) {
final StringBuilder output = new StringBuilder();
Strings.collectionToDelimitedStringWithLimit(
(Iterable<String>) () -> processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");

run(updateTask.batchingKey, toExecute, tasksSummary);
}
}).filter(s -> s.isEmpty() == false).iterator(),
", ", "", "", MAX_TASK_DESCRIPTION_CHARS, output
);
if (output.length() > MAX_TASK_DESCRIPTION_CHARS) {
output.append(" (").append(toExecute.size()).append(" tasks in total)");
}
return output.toString();
}

/**
Expand Down