Skip to content

Commit

Permalink
Do no use x-opaque-id for deduplicating elastic originating requests (#…
Browse files Browse the repository at this point in the history
…82855)

deprecated log messages originating from any elastic product requests should not be
deduplicated with the use of x-opaque-id.
If present, the value of X-elastic-product-origin will be used as part of the throttling key.

relates #82810
  • Loading branch information
pgomulka authored Jan 25, 2022
1 parent f412ce4 commit f416b67
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 14 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/82855.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 82855
summary: Do no use x-opaque-id for deduplicating elastic originating requests
area: Infra/Logging
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.message.Message;
import org.elasticsearch.common.Strings;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

import static org.elasticsearch.common.logging.DeprecatedMessage.ELASTIC_ORIGIN_FIELD_NAME;
import static org.elasticsearch.common.logging.DeprecatedMessage.KEY_FIELD_NAME;
import static org.elasticsearch.common.logging.DeprecatedMessage.X_OPAQUE_ID_FIELD_NAME;

Expand All @@ -34,13 +36,14 @@
* passed by a user on a HTTP header.
* This filter works by using a lruKeyCache - a set of keys which prevents a second message with the same key to be logged.
* The lruKeyCache has a size limited to 128, which when breached will remove the oldest entries.
*
* <p>
* It is possible to disable use of `x-opaque-id` as a key with {@link RateLimitingFilter#setUseXOpaqueId(boolean) }
*
* @see <a href="https://logging.apache.org/log4j/2.x/manual/filters.htmlf">Log4j2 Filters</a>
*/
@Plugin(name = "RateLimitingFilter", category = Node.CATEGORY, elementType = Filter.ELEMENT_TYPE)
public class RateLimitingFilter extends AbstractFilter {

// a flag to disable/enable use of xOpaqueId controlled by changing cluster setting
private volatile boolean useXOpaqueId = true;

private final Set<String> lruKeyCache = Collections.newSetFromMap(Collections.synchronizedMap(new LinkedHashMap<>() {
Expand Down Expand Up @@ -76,6 +79,10 @@ public Result filter(Message message) {

private String getKey(ESLogMessage esLogMessage) {
final String key = esLogMessage.get(KEY_FIELD_NAME);
final String productOrigin = esLogMessage.get(ELASTIC_ORIGIN_FIELD_NAME);
if (Strings.isNullOrEmpty(productOrigin) == false) {
return productOrigin + key;
}
if (useXOpaqueId) {
String xOpaqueId = esLogMessage.get(X_OPAQUE_ID_FIELD_NAME);
return xOpaqueId + key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,20 @@ public void testMessagesAreRateLimitedByXOpaqueId() {
public void testMessagesAreRateLimitedByKeyAndXOpaqueId() {
// Fill up the cache
for (int i = 0; i < 128; i++) {
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key " + i, "opaque-id " + i, "productName", "msg " + i);
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key " + i, "opaque-id " + i, null, "msg " + i);
assertThat("Expected key" + i + " to be accepted", filter.filter(message), equalTo(Result.ACCEPT));
}

// Should be rate-limited because it's still in the cache
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Filter a message with a previously unseen key, in order to evict key0 as it's the oldest
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 129", "opaque-id 129", "productName", "msg 129");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 129", "opaque-id 129", null, "msg 129");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// Should be allowed because key 0 was evicted from the cache
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}

Expand All @@ -106,18 +106,18 @@ public void testMessagesAreRateLimitedByKeyAndXOpaqueId() {
* independently and checking that a message is not filtered.
*/
public void testVariationsInKeyAndXOpaqueId() {
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
// Rejected because the "x-opaque-id" and "key" values are the same as above
assertThat(filter.filter(message), equalTo(Result.DENY));

message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 0", null, "msg 0");
// Accepted because the "key" value is different
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", null, "msg 0");
// Accepted because the "x-opaque-id" value is different
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}
Expand Down Expand Up @@ -154,19 +154,45 @@ public void testMessagesXOpaqueIsIgnoredWhenDisabled() {
filter.start();

// Should NOT be rate-limited because it's not in the cache
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// Should be rate-limited because it was just added to the cache
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 0", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Should be rate-limited because X-Opaque-Id is not used
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 0", "opaque-id 1", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Should NOT be rate-limited because "key 1" it not in the cache
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 1", "productName", "msg 0");
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key 1", "opaque-id 1", null, "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}

public void testXOpaqueIdNotBeingUsedFromElasticOriginatingRequests() {
RateLimitingFilter filter = new RateLimitingFilter();
filter.setUseXOpaqueId(true);
filter.start();

// Should NOT be rate-limited because it's not in the cache
Message message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 0", "kibana", "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// Should be rate-limited even though the x-opaque-id is unique because it originates from kibana
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 1", "kibana", "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// Should not be rate-limited - it is the first request from beats. (x-opaque-id ignored as it originates from elastic)
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 0", "beats", "msg 0");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));

// second request from beats (elastic originating), should be rate-limited
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key", "opaque-id 1", "beats", "msg 0");
assertThat(filter.filter(message), equalTo(Result.DENY));

// request from beats (elastic originating), but with a different key- should not be rate-limited
message = DeprecatedMessage.of(DeprecationCategory.OTHER, "key2", "opaque-id 1", "beats", "msg 1");
assertThat(filter.filter(message), equalTo(Result.ACCEPT));
}
}

0 comments on commit f416b67

Please sign in to comment.