Skip to content

Commit

Permalink
Merge pull request #460 from answerbook/mdeltito/LOG-19534
Browse files Browse the repository at this point in the history
fix(s3-source): expose errors in user logs
  • Loading branch information
mdeltito committed May 8, 2024
2 parents 548a853 + acf5400 commit 00c1add
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 7 deletions.
31 changes: 26 additions & 5 deletions src/internal_events/aws_sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ mod s3 {
pub should_log: bool,
}

impl SqsMessageProcessingError<'_> {
pub const MESSAGE: &'static str = "Failed to process SQS message.";
}

impl<'a> InternalEvent for SqsMessageProcessingError<'a> {
fn emit(self) {
if self.should_log {
error!(
message = "Failed to process SQS message.",
message = Self::MESSAGE,
message_id = %self.message_id,
error = %self.error,
error_code = "failed_processing_sqs_message",
Expand Down Expand Up @@ -68,11 +72,15 @@ mod s3 {
pub should_log: bool,
}

impl SqsMessageDeletePartialError {
pub const MESSAGE: &'static str = "Deletion of SQS message(s) failed.";
}

impl InternalEvent for SqsMessageDeletePartialError {
fn emit(self) {
if self.should_log {
error!(
message = "Deletion of SQS message(s) failed.",
message = Self::MESSAGE,
message_ids = %self.entries.iter()
.map(|x| format!("{}/{}", x.id.clone().unwrap_or_default(), x.code.clone().unwrap_or_default()))
.collect::<Vec<_>>()
Expand Down Expand Up @@ -100,11 +108,15 @@ mod s3 {
pub should_log: bool,
}

impl<E> SqsMessageDeleteBatchError<E> {
pub const MESSAGE: &'static str = "Deletion of SQS message(s) failed.";
}

impl<E: std::fmt::Display> InternalEvent for SqsMessageDeleteBatchError<E> {
fn emit(self) {
if self.should_log {
error!(
message = "Deletion of SQS message(s) failed.",
message = Self::MESSAGE,
message_ids = %self.entries.iter()
.map(|x| x.id.clone().unwrap_or_default())
.collect::<Vec<_>>()
Expand Down Expand Up @@ -132,10 +144,15 @@ pub struct SqsMessageReceiveError<'a, E> {
pub error: &'a E,
}

impl<'a, E> SqsMessageReceiveError<'a, E> {
pub const MESSAGE: &'static str =
"Failed to fetch SQS events, please check your credentials and queue URL.";
}

impl<'a, E: std::fmt::Display> InternalEvent for SqsMessageReceiveError<'a, E> {
fn emit(self) {
error!(
message = "Failed to fetch SQS events.",
message = Self::MESSAGE,
error = %self.error,
error_code = "failed_fetching_sqs_events",
error_type = error_type::REQUEST_FAILED,
Expand Down Expand Up @@ -184,11 +201,15 @@ pub struct SqsMessageDeleteError<'a, E> {
pub error: &'a E,
}

impl<E> SqsMessageDeleteError<'_, E> {
pub const MESSAGE: &'static str = "Failed to delete SQS events.";
}

#[cfg(feature = "sources-aws_sqs")]
impl<'a, E: std::fmt::Display> InternalEvent for SqsMessageDeleteError<'a, E> {
fn emit(self) {
error!(
message = "Failed to delete SQS events.",
message = Self::MESSAGE,
error = %self.error,
error_type = error_type::WRITER_FAILED,
stage = error_stage::PROCESSING,
Expand Down
24 changes: 22 additions & 2 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
};
use vrl::core::Value;

use crate::codecs::Decoder;
use crate::event::{Event, LogEvent};
use crate::mezmo::{user_trace::MezmoUserLog, MezmoContext};
use crate::sources::util::backoff::LogBackoff;
use crate::tls::TlsConfig;
use crate::{
Expand All @@ -44,7 +46,7 @@ use crate::{
shutdown::ShutdownSignal,
sources::aws_s3::AwsS3Config,
sources::util::backoff::Backoff,
SourceSender,
user_log_error, SourceSender,
};
use vector_lib::config::{log_schema, LegacyKey, LogNamespace};
use vector_lib::event::MaybeAsLogMut;
Expand Down Expand Up @@ -185,7 +187,7 @@ pub enum ProcessingError {
},
#[snafu(display("Unsupported S3 event version: {}.", version,))]
UnsupportedS3EventVersion { version: semver::Version },
#[snafu(display("Sink reported an error sending events"))]
#[snafu(display("Destination reported an error sending events"))]
ErrorAcknowledgement,
}

Expand Down Expand Up @@ -258,6 +260,7 @@ impl Ingestor {
cx.shutdown.clone(),
log_namespace,
acknowledgements,
cx.mezmo_ctx.clone(),
);
let fut = process.run();
let handle = tokio::spawn(fut.in_current_span());
Expand Down Expand Up @@ -288,6 +291,7 @@ pub struct IngestorProcess {
log_namespace: LogNamespace,
bytes_received: Registered<BytesReceived>,
events_received: Registered<EventsReceived>,
mezmo_ctx: Option<MezmoContext>,
}

impl IngestorProcess {
Expand All @@ -297,6 +301,7 @@ impl IngestorProcess {
shutdown: ShutdownSignal,
log_namespace: LogNamespace,
acknowledgements: bool,
mezmo_ctx: Option<MezmoContext>,
) -> Self {
Self {
state,
Expand All @@ -308,6 +313,7 @@ impl IngestorProcess {
log_namespace,
bytes_received: register!(BytesReceived::from(Protocol::HTTP)),
events_received: register!(EventsReceived),
mezmo_ctx,
}
}

Expand All @@ -328,6 +334,10 @@ impl IngestorProcess {

if let Err(ref e) = messages {
emit!(SqsMessageReceiveError { error: e });
user_log_error!(
self.mezmo_ctx,
Value::from(SqsMessageReceiveError::<SdkError<ReceiveMessageError>>::MESSAGE)
);
// Sleep for a while before returning
sleep(self.backoff.next()).await;
return;
Expand Down Expand Up @@ -381,6 +391,8 @@ impl IngestorProcess {
error: &err,
should_log: self.log.should_log(),
});

user_log_error!(self.mezmo_ctx, Value::from(err.to_string()));
}
}
}
Expand All @@ -406,6 +418,10 @@ impl IngestorProcess {
entries: result.failed.unwrap_or_default(),
should_log: self.log.should_log(),
});
user_log_error!(
self.mezmo_ctx,
Value::from(SqsMessageDeletePartialError::MESSAGE)
);
}
}
}
Expand All @@ -415,6 +431,10 @@ impl IngestorProcess {
error: err,
should_log: self.log.should_log(),
});
user_log_error!(
self.mezmo_ctx,
Value::from(SqsMessageDeleteBatchError::<SdkError<DeleteMessageBatchError>>::MESSAGE)
);
}
}
}
Expand Down

0 comments on commit 00c1add

Please sign in to comment.