Skip to content

Commit

Permalink
chore: Use backon to replace backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored and rtyler committed Sep 3, 2024
1 parent a6cb348 commit c4f4ee0
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 140 deletions.
2 changes: 1 addition & 1 deletion crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ tokio = { workspace = true }
regex = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
backoff = { version = "0.4", features = [ "tokio" ] }
backon = { version = "1",default-features = false, features = [ "tokio-sleep" ] }
hyper-tls = { version = "0.5", optional = true }

[dev-dependencies]
Expand Down
294 changes: 155 additions & 139 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod logstore;
mod native;
pub mod storage;
use aws_config::SdkConfig;
use aws_sdk_dynamodb::error::SdkError;
use aws_sdk_dynamodb::{
operation::{
create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
Expand Down Expand Up @@ -283,28 +284,28 @@ impl DynamoDbLockClient {
version: i64,
) -> Result<Option<CommitEntry>, LockClientError> {
let item = self
.retry(|| async {
match self
.dynamodb_client
.get_item()
.consistent_read(true)
.table_name(&self.config.lock_table_name)
.set_key(Some(self.get_primary_key(version, table_path)))
.send()
.await
{
Ok(x) => Ok(x),
Err(sdk_err) => match sdk_err.as_service_error() {
Some(GetItemError::ProvisionedThroughputExceededException(_)) => {
Err(backoff::Error::transient(
LockClientError::ProvisionedThroughputExceeded,
))
}
_ => Err(backoff::Error::permanent(sdk_err.into())),
},
.retry(
|| async {
self.dynamodb_client
.get_item()
.consistent_read(true)
.table_name(&self.config.lock_table_name)
.set_key(Some(self.get_primary_key(version, table_path)))
.send()
.await
},
|err| match err.as_service_error() {
Some(GetItemError::ProvisionedThroughputExceededException(_)) => true,
_ => false,
},
)
.await
.map_err(|err| match err.as_service_error() {
Some(GetItemError::ProvisionedThroughputExceededException(_)) => {
LockClientError::ProvisionedThroughputExceeded
}
})
.await?;
_ => err.into(),
})?;
item.item.as_ref().map(CommitEntry::try_from).transpose()
}

Expand All @@ -314,36 +315,38 @@ impl DynamoDbLockClient {
table_path: &str,
entry: &CommitEntry,
) -> Result<(), LockClientError> {
self.retry(|| async {
let item = create_value_map(entry, table_path);
match self
.dynamodb_client
.put_item()
.condition_expression(constants::CONDITION_EXPR_CREATE.as_str())
.table_name(self.get_lock_table_name())
.set_item(Some(item))
.send()
.await
{
Ok(_) => Ok(()),
Err(err) => match err.as_service_error() {
Some(PutItemError::ProvisionedThroughputExceededException(_)) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Some(PutItemError::ConditionalCheckFailedException(_)) => Err(
backoff::Error::permanent(LockClientError::VersionAlreadyExists {
table_path: table_path.to_owned(),
version: entry.version,
}),
),
Some(PutItemError::ResourceNotFoundException(_)) => Err(
backoff::Error::permanent(LockClientError::LockTableNotFound),
),
_ => Err(backoff::Error::permanent(err.into())),
},
self.retry(
|| async {
let item = create_value_map(entry, table_path);
let _ = self
.dynamodb_client
.put_item()
.condition_expression(constants::CONDITION_EXPR_CREATE.as_str())
.table_name(self.get_lock_table_name())
.set_item(Some(item))
.send()
.await?;
Ok(())
},
|err: &SdkError<_, _>| match err.as_service_error() {
Some(PutItemError::ProvisionedThroughputExceededException(_)) => true,
_ => false,
},
)
.await
.map_err(|err| match err.as_service_error() {
Some(PutItemError::ProvisionedThroughputExceededException(_)) => {
LockClientError::ProvisionedThroughputExceeded
}
Some(PutItemError::ConditionalCheckFailedException(_)) => {
LockClientError::VersionAlreadyExists {
table_path: table_path.to_owned(),
version: entry.version,
}
}
Some(PutItemError::ResourceNotFoundException(_)) => LockClientError::LockTableNotFound,
_ => err.into(),
})
.await
}

/// Get the latest entry (entry with highest version).
Expand All @@ -365,33 +368,33 @@ impl DynamoDbLockClient {
limit: i64,
) -> Result<Vec<CommitEntry>, LockClientError> {
let query_result = self
.retry(|| async {
match self
.dynamodb_client
.query()
.table_name(self.get_lock_table_name())
.consistent_read(true)
.limit(limit.try_into().unwrap_or(i32::MAX))
.scan_index_forward(false)
.key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH))
.set_expression_attribute_values(Some(
maplit::hashmap!(":tn".into() => string_attr(table_path)),
))
.send()
.await
{
Ok(result) => Ok(result),
Err(sdk_err) => match sdk_err.as_service_error() {
Some(QueryError::ProvisionedThroughputExceededException(_)) => {
Err(backoff::Error::transient(
LockClientError::ProvisionedThroughputExceeded,
))
}
_ => Err(backoff::Error::permanent(sdk_err.into())),
},
.retry(
|| async {
self.dynamodb_client
.query()
.table_name(self.get_lock_table_name())
.consistent_read(true)
.limit(limit.try_into().unwrap_or(i32::MAX))
.scan_index_forward(false)
.key_condition_expression(format!("{} = :tn", constants::ATTR_TABLE_PATH))
.set_expression_attribute_values(Some(
maplit::hashmap!(":tn".into() => string_attr(table_path)),
))
.send()
.await
},
|err: &SdkError<_, _>| match err.as_service_error() {
Some(QueryError::ProvisionedThroughputExceededException(_)) => true,
_ => false,
},
)
.await
.map_err(|err| match err.as_service_error() {
Some(QueryError::ProvisionedThroughputExceededException(_)) => {
LockClientError::ProvisionedThroughputExceeded
}
})
.await?;
_ => err.into(),
})?;

query_result
.items
Expand All @@ -412,35 +415,44 @@ impl DynamoDbLockClient {
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs();
self.retry(|| async {
match self
.dynamodb_client
.update_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.update_expression("SET complete = :c, expireTime = :e".to_owned())
.set_expression_attribute_values(Some(maplit::hashmap! {
":c".to_owned() => string_attr("true"),
":e".to_owned() => num_attr(seconds_since_epoch),
":f".into() => string_attr("false"),
}))
.condition_expression(constants::CONDITION_UPDATE_INCOMPLETE)
.send()
.await
{
Ok(_) => Ok(UpdateLogEntryResult::UpdatePerformed),
Err(err) => match err.as_service_error() {
Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Some(UpdateItemError::ConditionalCheckFailedException(_)) => {
Ok(UpdateLogEntryResult::AlreadyCompleted)
}
_ => Err(backoff::Error::permanent(err.into())),
let res = self
.retry(
|| async {
let _ = self
.dynamodb_client
.update_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.update_expression("SET complete = :c, expireTime = :e".to_owned())
.set_expression_attribute_values(Some(maplit::hashmap! {
":c".to_owned() => string_attr("true"),
":e".to_owned() => num_attr(seconds_since_epoch),
":f".into() => string_attr("false"),
}))
.condition_expression(constants::CONDITION_UPDATE_INCOMPLETE)
.send()
.await?;
Ok(())
},
}
})
.await
|err: &SdkError<_, _>| match err.as_service_error() {
Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => true,
_ => false,
},
)
.await;

match res {
Ok(()) => Ok(UpdateLogEntryResult::UpdatePerformed),
Err(err) => match err.as_service_error() {
Some(UpdateItemError::ProvisionedThroughputExceededException(_)) => {
Err(LockClientError::ProvisionedThroughputExceeded)
}
Some(UpdateItemError::ConditionalCheckFailedException(_)) => {
Ok(UpdateLogEntryResult::AlreadyCompleted)
}
_ => Err(err.into()),
},
}
}

/// Delete existing log entry if it is not already complete
Expand All @@ -449,48 +461,52 @@ impl DynamoDbLockClient {
version: i64,
table_path: &str,
) -> Result<(), LockClientError> {
self.retry(|| async {
match self
.dynamodb_client
.delete_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.set_expression_attribute_values(Some(maplit::hashmap! {
":f".into() => string_attr("false"),
}))
.condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str())
.send()
.await
{
Ok(_) => Ok(()),
Err(err) => match err.as_service_error() {
Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err(
backoff::Error::permanent(LockClientError::VersionAlreadyCompleted {
table_path: table_path.to_owned(),
version,
}),
),
_ => Err(backoff::Error::permanent(err.into())),
},
self.retry(
|| async {
let _ = self
.dynamodb_client
.delete_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.set_expression_attribute_values(Some(maplit::hashmap! {
":f".into() => string_attr("false"),
}))
.condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str())
.send()
.await?;
Ok(())
},
|err: &SdkError<_, _>| match err.as_service_error() {
Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => true,
_ => false,
},
)
.await
.map_err(|err| match err.as_service_error() {
Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => {
LockClientError::ProvisionedThroughputExceeded
}
Some(DeleteItemError::ConditionalCheckFailedException(_)) => {
LockClientError::VersionAlreadyCompleted {
table_path: table_path.to_owned(),
version,
}
}
_ => err.into(),
})
.await
}

async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
async fn retry<I, E, F, Fut, Wn>(&self, operation: F, when: Wn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<I, backoff::Error<E>>>,
F: FnMut() -> Fut,
Fut: std::future::Future<Output = Result<I, E>>,
Wn: Fn(&E) -> bool,
{
let backoff = backoff::ExponentialBackoffBuilder::new()
.with_multiplier(2.)
.with_max_interval(Duration::from_secs(15))
.with_max_elapsed_time(Some(self.config.max_elapsed_request_time))
.build();
backoff::future::retry(backoff, operation).await
use backon::Retryable;
let backoff = backon::ExponentialBuilder::default()
.with_factor(2.)
.with_max_delay(self.config.max_elapsed_request_time);
operation.retry(backoff).when(when).await
}
}

Expand Down

0 comments on commit c4f4ee0

Please sign in to comment.