Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plugin-server): Preserve distinct ID locality on overflow rerouting #20945

Merged
merged 6 commits into from
Apr 2, 2024

Conversation

tkaemming
Copy link
Contributor

@tkaemming tkaemming commented Mar 15, 2024

Problem

When messages are rerouted from the main topic to the overflow topic, we've historically stripped them of their key so that they are uniformly distributed over all partitions within the overflow topic, rather than being targeted at a single partition. This is helpful for distributing CPU-intensive workloads or other workloads that are not reliant on shared state, as it allows the independently parallelizable aspects of the processing loop to be performed independently, resulting in overall throughput improvements.

However, the hotspots in our workload these days seem to typically be person property updates, which require row level locks in Postgres to be held while performing row updates. This can cause problems when paired with messages that are uniformly distributed over all of the partitions in the overflow topic:

  1. When a distinct ID that is frequently seen in the main topic overflows and is rerouted, the number of processes that need to acquire the lock suddenly jumps from the single consumer for the associated partition in the main topic, to all consumers over all partitions in the overflow topic. This leads to lock contention and decreased throughput across the overflow consumers as they need to continually wait on each other to make forward progress. This can lead to slow batches and liveness check timeouts (which themselves can cause disruptive restarts/rebalances.)
  2. When the distinct ID stops overflowing in the main topic (either due to a decrease in ingress rate, or due to consumer group rebalances due to scale up/down, crashes, pod eviction, etc) but a backlog of messages to be processed remains in the overflow topic, this lock contention can leak back "upstream" to the main topic consumer, as now the main consumer also needs to acquire the heavily contended row locks, in addition to the overflow consumers. This typically causes a situation that requires manual intervention to resolve (generally forcing to overflow or dropping the distinct ID.)

Changes

Preserve the message key when publishing to the overflow topic, so that semantic routing/locality is preserved over distinct IDs.

This isn't a perfect solution, it does have it's tradeoffs:

Advantages

  • Improved throughput due to less lock contention when we've been slammed by one distinct ID
  • Decreased pressure reflecting back upstream to ingestion since, fewer overflow consumers will be contending for locks
  • Conducive to person property update batching (which doesn't exist yet, but hopefully will in the near future)

Disadvantages

  • Single partition backlog can grow much faster now in overflow (we'll need to be careful with retention.bytes to avoid losing messages to retention if one partition is extremely overloaded)
  • In the unfortunate event a slightly overflowing distinct ID is colocated with a heavily overflowing distinct ID, it will be more delayed than it would be if it was in a different partition (however still likely less delayed than had this change not been made at all)
  • Ingestion will be less resilient to slow plugins

Does this work well for both Cloud and self-hosted?

Yep!

How did you test this code?

Updated tests.

@tkaemming tkaemming marked this pull request as ready for review March 19, 2024 00:30
@tkaemming tkaemming requested a review from a team March 19, 2024 00:30
@@ -255,7 +255,7 @@ async function emitToOverflow(queue: IngestionConsumer, kafkaMessages: Message[]
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null, // No locality guarantees in overflow
key: message.key,
Copy link
Contributor

@xvello xvello Mar 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

message.key can be empty if capture-side overflow detection triggers.

As we want to evaluate impact and probably don't want to invest a lot for now, I'm fine with:

  • checking how much capture-side detection triggers (on both new and old capture) vs plugin-server-side
  • disable capture-side detection on both captures for now while we evalutate this

Last monday's incident has shown us that we can read & unmarshall really fast (1.6M/minute with 8 historical pods dropping on token), so capture-side might not be really necessary.

The alternative would be to re-compute a key if missing, but then that's a third copy of that code to maintain, I'd rather avoid it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to hold off on merging this for a bit — I had assumed there was already a method to turn off overflow routing wholesale on both captures but it doesn't look like that was a valid assumption for me to make. Seems like it'd make sense to take care of that first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be turned off in python, but you are right that it cannot be turned off in rust yet. I'd put very high thresholds in the rust config while working on a PR to add a boolean config to completely disable it.
BTW, this capture-side detection is kind of a scalability time bomb as its memory usage is O(active distinct_id), so it needs to eventually be phased out anyway.

Copy link
Contributor Author

@tkaemming tkaemming Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be turned off in python, but you are right that it cannot be turned off in rust yet.

Ah, I was thinking we'd want to bypass this entire conditional, but maybe that's overkill.

BTW, this capture-side detection is kind of a scalability time bomb as its memory usage is O(active distinct_id), so it needs to eventually be phased out anyway.

Good point, I hadn't really considered that — thanks for mentioning it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, we'd need to skip the check against LIKELY_ANONYMOUS_IDS too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ability to turn off random partitioning completely in old capture is here: #21168 (My initial thought was to keep that PR separate, but in retrospect, I suppose this could have been part of this change too.)

@bretthoerner
Copy link
Contributor

bretthoerner commented Mar 19, 2024

Well written description. Seems like a win to me. In the future, with more reliability visibility, we should (auto) blackhole extremely loud distinct_ids to alleviate the risk of single partition becoming a huge issue.

@tkaemming tkaemming requested a review from a team March 22, 2024 18:01
… reverse compatibility and no surprises during deploy
@tkaemming tkaemming merged commit 85ef237 into master Apr 2, 2024
77 checks passed
@tkaemming tkaemming deleted the overflow-preserve-locality branch April 2, 2024 15:56
tkaemming added a commit that referenced this pull request Apr 2, 2024
tkaemming added a commit that referenced this pull request Apr 2, 2024
ingestionOverflowingMessagesTotal.inc(kafkaMessages.length)
await Promise.all(
kafkaMessages.map((message) =>
queue.pluginsServer.kafkaProducer.produce({
topic: KAFKA_EVENTS_PLUGIN_INGESTION_OVERFLOW,
value: message.value,
key: null, // No locality guarantees in overflow
key: useRandomPartitioner ? undefined : message.key,
Copy link
Contributor

@tiina303 tiina303 Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's a change here where we set it to undefined instead of null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, this was the issue.

The produce call here eventually forwards to a HighLevelProducer instance, which does some extra stuff with the key in produce. If the key ends up being undefined it just quietly doesn't produce the message, and the callback is never called.

This is also the behavior that I saw testing this manually in the node REPL:

> const { HighLevelProducer } = require('node-rdkafka')
undefined
> const p = new HighLevelProducer({'bootstrap.servers': 'kafka:9092'})
undefined
> p.connect()
// truncated
> > p.produce('garbage-topic', null, Buffer.from('message'), 'key', undefined, (...a) => { console.log('callback:', a) })
undefined
> callback: [ null, 1 ]
> p.produce('garbage-topic', null, Buffer.from('message'), undefined, undefined, (...a) => { console.log('callback:', a) })
undefined
// nothing ever happens here

What I figure happened is that any message that should have been routed to overflow never resolved it's promise, and the consumers simply stopped making forward progress once they saw a batch containing one of those messages.

The key property is typed as MessageKey, which does include undefined, and the HighLevelProducer.produce signature accepts any, which explains why this wasn't caught by the type checker.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants