
Commit

progress
aumetra committed May 13, 2024
1 parent 9ab2174 commit 15b93c7
Showing 5 changed files with 38 additions and 37 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/kitsune-oidc/Cargo.toml
@@ -7,6 +7,7 @@ license.workspace = true
 
 [dependencies]
 enum_dispatch = "0.3.13"
+fred = { workspace = true }
 http = "1.1.0"
 http-body-util = "0.1.1"
 kitsune-config = { path = "../kitsune-config" }
12 changes: 4 additions & 8 deletions crates/kitsune-oidc/src/lib.rs
@@ -2,10 +2,10 @@ use crate::state::{
     store::{InMemory as InMemoryStore, Redis as RedisStore},
     LoginState, OAuth2LoginState, Store,
 };
+use fred::{clients::RedisPool, types::RedisConfig};
 use kitsune_config::oidc::{Configuration, StoreConfiguration};
 use kitsune_derive::kitsune_service;
 use kitsune_error::{bail, kitsune_error, Result};
-use multiplex_pool::RoundRobinStrategy;
 use openidconnect::{
     core::{CoreAuthenticationFlow, CoreClient, CoreProviderMetadata},
     AccessTokenHash, AuthorizationCode, ClientId, ClientSecret, CsrfToken, IssuerUrl, Nonce,
@@ -92,13 +92,9 @@ impl OidcService {
         let login_state_store = match config.store {
             StoreConfiguration::InMemory => InMemoryStore::new(LOGIN_STATE_STORE_SIZE).into(),
             StoreConfiguration::Redis(ref redis_config) => {
-                let client = redis::Client::open(redis_config.url.as_str())?;
-                let pool = multiplex_pool::Pool::from_producer(
-                    || client.get_connection_manager(),
-                    10,
-                    RoundRobinStrategy::default(),
-                )
-                .await?;
+                let config = RedisConfig::from_url(redis_config.url.as_str())?;
+                // TODO: Make pool size configurable
+                let pool = RedisPool::new(config, None, None, None, 10)?;
 
                 RedisStore::new(pool).into()
             }
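The OIDC service now builds its Redis connection through fred's RedisPool instead of a redis::Client wrapped in multiplex_pool. A minimal sketch of the new construction path with the same hard-coded pool size, assuming fred's usual connect/wait_for_connect startup flow; the connection URL and helper name are illustrative, not part of this commit:

use fred::{clients::RedisPool, interfaces::ClientLike, types::RedisConfig};

async fn build_pool(url: &str) -> Result<RedisPool, fred::error::RedisError> {
    // Parse the connection URL into a fred config, mirroring the diff above.
    let config = RedisConfig::from_url(url)?;
    // Pool size is still hard-coded to 10 (see the TODO in the diff).
    let pool = RedisPool::new(config, None, None, None, 10)?;
    // Assumption: the caller connects the pool before first use.
    let _connect_task = pool.connect();
    pool.wait_for_connect().await?;
    Ok(pool)
}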
14 changes: 7 additions & 7 deletions crates/kitsune-oidc/src/state/store/redis.rs
@@ -1,17 +1,17 @@
 use super::Store;
 use crate::state::LoginState;
+use fred::{clients::RedisPool, interfaces::KeysInterface};
 use kitsune_error::Result;
-use redis::{aio::ConnectionManager, AsyncCommands};
 
 const REDIS_PREFIX: &str = "OIDC-LOGIN-STATE";
 
 #[derive(Clone)]
 pub struct Redis {
-    pool: multiplex_pool::Pool<ConnectionManager>,
+    pool: RedisPool,
 }
 
 impl Redis {
-    pub fn new(pool: multiplex_pool::Pool<ConnectionManager>) -> Self {
+    pub fn new(pool: RedisPool) -> Self {
         Self { pool }
     }

@@ -23,17 +23,17 @@ impl Redis {
 
 impl Store for Redis {
     async fn get_and_remove(&self, key: &str) -> Result<LoginState> {
-        let mut conn = self.pool.get();
-        let raw_value: String = conn.get_del(Self::format_key(key)).await?;
+        let raw_value: String = self.pool.getdel(Self::format_key(key)).await?;
 
         let mut raw_value = raw_value.into_bytes();
         Ok(simd_json::from_slice(&mut raw_value)?)
     }
 
     async fn set(&self, key: &str, value: LoginState) -> Result<()> {
         let raw_value = simd_json::to_string(&value)?;
-        let mut conn = self.pool.get();
-        conn.set(Self::format_key(key), raw_value).await?;
+        self.pool
+            .set(Self::format_key(key), raw_value, None, None, false)
+            .await?;
 
         Ok(())
     }
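fred folds these key commands into its KeysInterface trait: getdel replaces redis-rs's get_del, and set takes explicit expiration, SET-options, and GET-flag arguments instead of a builder. A rough standalone sketch of the same round trip, assuming an already-connected pool; the key and value are illustrative, and the unit annotation pins fred's generic reply type:

use fred::{clients::RedisPool, interfaces::KeysInterface};

async fn round_trip(pool: &RedisPool) -> Result<(), fred::error::RedisError> {
    // SET with no expiration, no SET options, and the GET flag disabled,
    // matching the store's set() above.
    let _: () = pool
        .set("OIDC-LOGIN-STATE:example", "{}", None, None, false)
        .await?;

    // GETDEL atomically reads and removes the value, replacing the old
    // redis-rs get_del call.
    let raw: String = pool.getdel("OIDC-LOGIN-STATE:example").await?;
    debug_assert_eq!(raw, "{}");
    Ok(())
}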
47 changes: 25 additions & 22 deletions lib/athena/src/redis/mod.rs
@@ -8,8 +8,8 @@ use async_trait::async_trait;
 use either::Either;
 use fred::{
     clients::RedisPool,
-    interfaces::{ClientLike, SortedSetsInterface, StreamsInterface},
-    types::RedisValue,
+    interfaces::{SortedSetsInterface, StreamsInterface},
+    types::{RedisValue, XID},
 };
 use iso8601_timestamp::Timestamp;
 use just_retry::{
@@ -102,7 +102,7 @@
                 self.queue_name.as_str(),
                 true,
                 None,
-                "*",
+                XID::Auto,
                 vec![
                     ("job_id", RedisValue::from(job_meta.job_id)),
                     ("fail_count", RedisValue::from(job_meta.fail_count)),
@@ -146,38 +146,41 @@ where
     }
 
     async fn fetch_job_data(&self, max_jobs: usize) -> Result<Vec<JobData>> {
-        let mut redis_conn = self.redis_pool.get();
         self.initialise_group().await?;
 
-        let StreamAutoClaimReply {
-            claimed: claimed_ids,
-            ..
-        } = redis_conn
-            .xautoclaim_options(
+        let (_start, claimed_ids) = self
+            .redis_pool
+            .xautoclaim_values(
                 self.queue_name.as_str(),
                 self.consumer_group.as_str(),
                 self.consumer_name.as_str(),
                 MIN_IDLE_TIME.as_millis() as u64,
                 "0-0",
-                StreamAutoClaimOptions::default().count(max_jobs),
+                Some(max_jobs as u64),
+                false,
             )
             .await?;
 
         let claimed_ids = if claimed_ids.len() == max_jobs {
             Either::Left(claimed_ids.into_iter())
         } else {
-            let mut read_opts = StreamReadOptions::default()
-                .count(max_jobs - claimed_ids.len())
-                .group(self.consumer_group.as_str(), self.consumer_name.as_str());
-
-            read_opts = if claimed_ids.is_empty() {
-                read_opts.block(0)
+            let block_time = if claimed_ids.is_empty() {
+                0
             } else {
-                read_opts.block(BLOCK_TIME.as_millis() as usize)
+                BLOCK_TIME.as_millis()
            };
 
-            let read_reply: Option<StreamReadReply> = redis_conn
-                .xread_options(&[self.queue_name.as_str()], &[">"], &read_opts)
+            let read_reply = self
+                .redis_pool
+                .xreadgroup_map(
+                    self.consumer_group.as_str(),
+                    self.consumer_name.as_str(),
+                    Some((max_jobs - claimed_ids.len()) as u64),
+                    Some(block_time as u64),
+                    false,
+                    self.queue_name.as_str(),
+                    XID::NewInGroup,
+                )
                 .await?;
 
             let read_ids = read_reply
@@ -215,14 +218,14 @@ where
         let client = self.redis_pool.next();
         let pipeline = client.pipeline();
 
-        () = pipeline
+        pipeline
             .xack(
                 self.queue_name.as_str(),
                 self.consumer_group.as_str(),
                 stream_id.as_str(),
             )
             .await?;
-        () = pipeline
+        pipeline
             .xdel(self.queue_name.as_str(), &[stream_id])
             .await?;

@@ -253,7 +256,7 @@ where
             Outcome::Success => true, // Execution succeeded, we don't need the context anymore
         };
 
-        () = pipeline.last().await?;
+        pipeline.last().await?;
 
         if remove_context {
             self.context_repository
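The acknowledgement path batches XACK and XDEL on a single pooled client via fred's pipeline. A rough standalone sketch of that shape, with the stream, group, and entry ID invented for illustration and type annotations added to pin fred's generic reply types (a sketch, not the queue's actual method):

use fred::{clients::RedisPool, interfaces::StreamsInterface};

async fn ack_and_delete(pool: &RedisPool) -> Result<(), fred::error::RedisError> {
    // Take one client from the pool and queue both commands on a pipeline,
    // as the diff above does.
    let client = pool.next();
    let pipeline = client.pipeline();

    let _: () = pipeline.xack("jobs:example", "example-group", "1-1").await?;
    let _: () = pipeline.xdel("jobs:example", &["1-1"]).await?;

    // Flush the pipeline; XDEL's reply is the number of entries removed.
    let _deleted: u64 = pipeline.last().await?;
    Ok(())
}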
