Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed May 20, 2024
1 parent d85c0f9 commit a8fa758
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 35 deletions.
68 changes: 42 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/kitsune-wasm-mrf/example-mrf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ crate-type = ["cdylib"]

[dependencies]
rand = "0.8.5"
wit-bindgen = "0.24.0"
wit-bindgen = "0.25.0"

[lints]
workspace = true
16 changes: 8 additions & 8 deletions lib/athena/src/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use either::Either;
use fred::{
clients::RedisPool,
interfaces::{SortedSetsInterface, StreamsInterface},
types::{FromRedis, RedisValue, XID},
types::{FromRedis, RedisValue, XReadResponse, XReadValue, XID},
};
use iso8601_timestamp::Timestamp;
use just_retry::{
Expand All @@ -18,7 +18,7 @@ use just_retry::{
};
use smol_str::SmolStr;
use speedy_uuid::Uuid;
use std::{collections::HashMap, mem, ops::ControlFlow, str::FromStr, time::SystemTime};
use std::{mem, ops::ControlFlow, str::FromStr, time::SystemTime};
use tokio::sync::OnceCell;
use triomphe::Arc;
use typed_builder::TypedBuilder;
Expand Down Expand Up @@ -103,7 +103,7 @@ where
client
.xadd(
self.queue_name.as_str(),
true,
false,
None,
XID::Auto,
vec![
Expand Down Expand Up @@ -151,7 +151,7 @@ where
async fn fetch_job_data(&self, max_jobs: usize) -> Result<Vec<JobData>> {
self.initialise_group().await?;

let (_start, claimed_ids): (_, Vec<(String, HashMap<String, RedisValue>)>) = self
let (_start, claimed_ids): (_, Vec<XReadValue<String, String, RedisValue>>) = self
.redis_pool
.xautoclaim_values(
self.queue_name.as_str(),
Expand All @@ -168,18 +168,18 @@ where
Either::Left(claimed_ids.into_iter())
} else {
let block_time = if claimed_ids.is_empty() {
None
0
} else {
Some(BLOCK_TIME.as_millis() as u64)
BLOCK_TIME.as_millis()
};

let read_reply: HashMap<String, Vec<(String, HashMap<String, RedisValue>)>> = self
let read_reply: XReadResponse<String, String, String, RedisValue> = self
.redis_pool
.xreadgroup_map(
self.consumer_group.as_str(),
self.consumer_name.as_str(),
Some((max_jobs - claimed_ids.len()) as u64),
block_time,
Some(block_time as u64),
false,
self.queue_name.as_str(),
XID::NewInGroup,
Expand Down

0 comments on commit a8fa758

Please sign in to comment.