Skip to content

Commit

Permalink
fix(kafka source): Fix vector log with incorrect pipeline ID (#449)
Browse files Browse the repository at this point in the history
Per the documentation for span, span::enter may
produce incorrect traces in async code. This is
fixed by using the instrument method.

See: https://docs.rs/tracing/latest/tracing/span/index.html\#entering-a-span

Ref: LOG-19224
  • Loading branch information
biblicalph committed Apr 15, 2024
1 parent 50e24d8 commit a3212f4
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tokio::{
time::Sleep,
};
use tokio_util::codec::FramedRead;
use tracing::{Instrument, Span};
use vector_lib::codecs::{
decoding::{DeserializerConfig, FramingConfig},
StreamDecodingError,
Expand Down Expand Up @@ -462,9 +463,8 @@ async fn kafka_source(
.drain_timeout_ms
.map_or(config.session_timeout_ms / 2, Duration::from_millis);
let consumer_state =
ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace);
ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
tokio::spawn(async move {
let _enter = span.enter();
coordinate_kafka_callbacks(
consumer,
callback_rx,
Expand Down Expand Up @@ -514,8 +514,10 @@ struct ConsumerStateInner<S> {
log_namespace: LogNamespace,
consumer_state: S,
}
struct Consuming;
struct Complete;
struct Consuming {
/// The source's tracing Span used to instrument metrics emitted by consumer tasks
span: Span,
}
struct Draining {
/// The rendezvous channel sender from the revoke or shutdown callback. Sending on this channel
/// indicates to the kafka client task that one or more partitions have been drained, while
Expand All @@ -534,19 +536,23 @@ struct Draining {
/// the `finish_drain` method will return a Complete state, otherwise
/// a Consuming state.
shutdown: bool,

/// The source's tracing Span used to instrument metrics emitted by consumer tasks
span: Span,
}
type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
enum ConsumerState {
Consuming(ConsumerStateInner<Consuming>),
Draining(ConsumerStateInner<Draining>),
Complete(ConsumerStateInner<Complete>),
Complete,
}
impl Draining {
fn new(signal: SyncSender<()>, shutdown: bool) -> Self {
fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
Self {
signal,
shutdown,
expect_drain: HashSet::new(),
span,
}
}

Expand All @@ -557,16 +563,7 @@ impl Draining {

impl<C> ConsumerStateInner<C> {
fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
(
None.into(),
ConsumerState::Complete(ConsumerStateInner {
config: self.config,
decoder: self.decoder,
out: self.out,
log_namespace: self.log_namespace,
consumer_state: Complete,
}),
)
(None.into(), ConsumerState::Complete)
}
}

Expand All @@ -576,13 +573,14 @@ impl ConsumerStateInner<Consuming> {
decoder: Decoder,
out: SourceSender,
log_namespace: LogNamespace,
span: Span,
) -> Self {
Self {
config,
decoder,
out,
log_namespace,
consumer_state: Consuming,
consumer_state: Consuming { span },
}
}

Expand Down Expand Up @@ -662,7 +660,7 @@ impl ConsumerStateInner<Consuming> {
)
}
(tp, status)
});
}.instrument(self.consumer_state.span.clone()));
(end_tx, handle)
}

Expand All @@ -681,7 +679,7 @@ impl ConsumerStateInner<Consuming> {
decoder: self.decoder,
out: self.out,
log_namespace: self.log_namespace,
consumer_state: Draining::new(sig, shutdown),
consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
};

(Some(deadline).into(), draining)
Expand Down Expand Up @@ -731,7 +729,9 @@ impl ConsumerStateInner<Draining> {
decoder: self.decoder,
out: self.out,
log_namespace: self.log_namespace,
consumer_state: Consuming,
consumer_state: Consuming {
span: self.consumer_state.span,
},
}),
)
}
Expand Down Expand Up @@ -777,7 +777,7 @@ async fn coordinate_kafka_callbacks(
abort_handles.remove(&finished_partition);

(drain_deadline, consumer_state) = match consumer_state {
ConsumerState::Complete(_) => unreachable!("Partition consumer finished after completion."),
ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
ConsumerState::Draining(mut state) => {
state.partition_drained(finished_partition);

Expand Down Expand Up @@ -810,7 +810,7 @@ async fn coordinate_kafka_callbacks(
},
Some(callback) = callbacks.recv() => match callback {
KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
ConsumerState::Complete(_) => unreachable!("Partition assignment received after completion."),
ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
ConsumerState::Consuming(ref consumer_state) => {
let acks = consumer.context().acknowledgements;
Expand All @@ -831,7 +831,7 @@ async fn coordinate_kafka_callbacks(
}
},
KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
ConsumerState::Complete(_) => unreachable!("Partitions revoked after completion."),
ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
ConsumerState::Draining(d) => {
// NB: This would only happen if the task driving the kafka client (i.e. rebalance handlers)
// is not handling shutdown signals, and a revoke happens during a shutdown drain; otherwise
Expand All @@ -855,7 +855,7 @@ async fn coordinate_kafka_callbacks(
}
},
KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
ConsumerState::Complete(_) => unreachable!("Shutdown received after completion."),
ConsumerState::Complete => unreachable!("Shutdown received after completion."),
// Shutting down is just like a full assignment revoke, but we also close the
// callback channels, since we don't expect additional assignments or rebalances
ConsumerState::Draining(state) => {
Expand Down Expand Up @@ -894,7 +894,7 @@ async fn coordinate_kafka_callbacks(
},

Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
ConsumerState::Complete(_) => unreachable!("Drain deadline received after completion."),
ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
ConsumerState::Consuming(state) => {
warn!("A drain deadline fired outside of draining mode.");
state.keep_consuming(None.into())
Expand Down

0 comments on commit a3212f4

Please sign in to comment.