Skip to content

Commit

Permalink
GH-3049: Fix Listener exceptions for observations
Browse files Browse the repository at this point in the history
Fixes: #3049

Listener exceptions are not saved to the observation.

* Embedded the (original) exception into the observation, allowing downstream tracing code to handle it.
* Add unit test for observation `Error` and `RuntimeException`.
* Unify the `runtimeExceptionTemplate` and `errorTemplate` into a `throwableTemplate`.

(cherry picked from commit 61016db)
  • Loading branch information
Wzy19930507 authored and spring-builds committed Feb 29, 2024
1 parent 5be4711 commit 157ea43
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2768,37 +2768,37 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
this.observationRegistry);
return observation.observe(() -> {
try {
try {
observation.observe(() -> {
invokeOnMessage(cRecord);
successTimer(sample, cRecord);
recordInterceptAfter(cRecord, null);
});
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
catch (RuntimeException e) {
failureTimer(sample, cRecord);
recordInterceptAfter(cRecord, e);
if (this.commonErrorHandler == null) {
throw e;
}
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
try {
invokeErrorHandler(cRecord, iterator, e);
commitOffsetsIfNeededAfterHandlingError(cRecord);
}
return null;
});
catch (KafkaException ke) {
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
return ke;
}
catch (RuntimeException ee) {
this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
return ee;
}
catch (Error er) { // NOSONAR
this.logger.error(er, "Error handler threw an error");
throw er;
}
}
return null;
}

private void commitOffsetsIfNeededAfterHandlingError(final ConsumerRecord<K, V> cRecord) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2022-2023 the original author or authors.
* Copyright 2022-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,6 +35,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.junit.jupiter.api.Test;

Expand All @@ -54,7 +55,6 @@
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
Expand Down Expand Up @@ -85,14 +85,20 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Wang Zhiyang
*
* @since 3.0
*/
@SpringJUnitConfig
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "ObservationTests.testT3" })
@EmbeddedKafka(topics = { "observation.testT1", "observation.testT2", "observation.testT3",
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR})
@DirtiesContext
public class ObservationTests {

public final static String OBSERVATION_RUNTIME_EXCEPTION = "observation.runtime-exception";

public final static String OBSERVATION_ERROR = "observation.error";

@Test
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
Expand All @@ -106,8 +112,8 @@ void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, St
assertThat(listener.latch1.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.record).isNotNull();
Headers headers = listener.record.headers();
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(4);
SimpleSpan span = spans.poll();
Expand Down Expand Up @@ -148,14 +154,15 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
}

});

rler.getListenerContainer("obs1").stop();
rler.getListenerContainer("obs1").start();
template.send("observation.testT1", "test").get(10, TimeUnit.SECONDS);
assertThat(listener.latch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(listener.record).isNotNull();
headers = listener.record.headers();
assertThat(headers.lastHeader("foo")).extracting(hdr -> hdr.value()).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(hdr -> hdr.value()).isEqualTo("some bar value".getBytes());
assertThat(headers.lastHeader("foo")).extracting(Header::value).isEqualTo("some foo value".getBytes());
assertThat(headers.lastHeader("bar")).extracting(Header::value).isEqualTo("some bar value".getBytes());
assertThat(spans).hasSize(4);
span = spans.poll();
assertThat(span.getTags()).containsEntry("spring.kafka.template.name", "template");
Expand Down Expand Up @@ -230,6 +237,48 @@ public KeyValues getLowCardinalityKeyValues(KafkaRecordReceiverContext context)
.doesNotHaveMeterWithNameAndTags("spring.kafka.template", KeyValues.of("error", "KafkaException"));
}

@Test
void observationRuntimeException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> runtimeExceptionTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

runtimeExceptionTemplate.send(OBSERVATION_RUNTIME_EXCEPTION, "testRuntimeException").get(10, TimeUnit.SECONDS);
assertThat(listener.latch4.await(10, TimeUnit.SECONDS)).isTrue();
endpointRegistry.getListenerContainer("obs4").stop();

Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(2);
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs4-0");
assertThat(span.getError().getCause())
.isInstanceOf(IllegalStateException.class)
.hasMessage("obs4 run time exception");
}

@Test
void observationErrorException(@Autowired ExceptionListener listener, @Autowired SimpleTracer tracer,
@Autowired @Qualifier("throwableTemplate") KafkaTemplate<Integer, String> errorTemplate,
@Autowired KafkaListenerEndpointRegistry endpointRegistry)
throws ExecutionException, InterruptedException, TimeoutException {

errorTemplate.send(OBSERVATION_ERROR, "testError").get(10, TimeUnit.SECONDS);
assertThat(listener.latch5.await(10, TimeUnit.SECONDS)).isTrue();
endpointRegistry.getListenerContainer("obs5").stop();

Deque<SimpleSpan> spans = tracer.getSpans();
assertThat(spans).hasSize(2);
SimpleSpan span = spans.poll();
assertThat(span.getTags().get("spring.kafka.template.name")).isEqualTo("throwableTemplate");
span = spans.poll();
assertThat(span.getTags().get("spring.kafka.listener.id")).isEqualTo("obs5-0");
assertThat(span.getError())
.isInstanceOf(Error.class)
.hasMessage("obs5 error");
}

@Configuration
@EnableKafka
public static class Config {
Expand Down Expand Up @@ -276,6 +325,13 @@ KafkaTemplate<Integer, String> customTemplate(ProducerFactory<Integer, String> p
return template;
}

@Bean
KafkaTemplate<Integer, String> throwableTemplate(ProducerFactory<Integer, String> pf) {
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setObservationEnabled(true);
return template;
}

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
ConsumerFactory<Integer, String> cf) {
Expand All @@ -286,7 +342,7 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
factory.getContainerProperties().setObservationEnabled(true);
factory.setContainerCustomizer(container -> {
if (container.getListenerId().equals("obs3")) {
((AbstractMessageListenerContainer<Integer, String>) container).setKafkaAdmin(this.mockAdmin);
container.setKafkaAdmin(this.mockAdmin);
}
});
return factory;
Expand Down Expand Up @@ -352,6 +408,11 @@ Listener listener(KafkaTemplate<Integer, String> template) {
return new Listener(template);
}

@Bean
ExceptionListener exceptionListener() {
return new ExceptionListener();
}

}

public static class Listener {
Expand Down Expand Up @@ -387,4 +448,24 @@ void listen3(ConsumerRecord<Integer, String> in) {

}

public static class ExceptionListener {

final CountDownLatch latch4 = new CountDownLatch(1);

final CountDownLatch latch5 = new CountDownLatch(1);

@KafkaListener(id = "obs4", topics = OBSERVATION_RUNTIME_EXCEPTION)
void listenRuntimeException(ConsumerRecord<Integer, String> in) {
this.latch4.countDown();
throw new IllegalStateException("obs4 run time exception");
}

@KafkaListener(id = "obs5", topics = OBSERVATION_ERROR)
void listenError(ConsumerRecord<Integer, String> in) {
this.latch5.countDown();
throw new Error("obs5 error");
}

}

}

0 comments on commit 157ea43

Please sign in to comment.