diff --git a/Cargo.lock b/Cargo.lock
index 03dd4af07..dd162e020 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -492,6 +492,7 @@ name = "athena"
 version = "0.0.1-pre.6"
 dependencies = [
  "ahash 0.8.11",
+ "async-trait",
  "either",
  "futures-util",
  "iso8601-timestamp",
diff --git a/README.md b/README.md
index 1ac833edd..61a76e247 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
 ![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/kitsune-soc/kitsune/rust.yml?style=for-the-badge)
 [![dependency status](https://deps.rs/repo/github/kitsune-soc/kitsune/status.svg?style=for-the-badge)](https://deps.rs/repo/github/kitsune-soc/kitsune)
-[![Maintainance: Experimental](https://img.shields.io/badge/maintainance-experimental-blue?style=for-the-badge)](https://gist.github.com/taiki-e/ad73eaea17e2e0372efb76ef6b38f17b)
+[![Maintenance: Experimental](https://img.shields.io/badge/maintainance-experimental-blue?style=for-the-badge)](https://gist.github.com/taiki-e/ad73eaea17e2e0372efb76ef6b38f17b)
diff --git a/crates/kitsune-service/src/job.rs b/crates/kitsune-service/src/job.rs
index 6177cecd0..aa1562c95 100644
--- a/crates/kitsune-service/src/job.rs
+++ b/crates/kitsune-service/src/job.rs
@@ -2,6 +2,7 @@ use athena::{JobDetails, JobQueue};
 use iso8601_timestamp::Timestamp;
 use kitsune_error::Result;
 use kitsune_jobs::{Job, KitsuneContextRepo};
+use std::sync::Arc;
 use typed_builder::TypedBuilder;
 
 #[derive(TypedBuilder)]
@@ -13,7 +14,7 @@ pub struct Enqueue {
 
 #[derive(Clone, TypedBuilder)]
 pub struct JobService {
-    job_queue: JobQueue<KitsuneContextRepo>,
+    job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
 }
 
 impl JobService {
diff --git a/crates/kitsune-service/src/post/resolver.rs b/crates/kitsune-service/src/post/resolver.rs
index 0bc508b78..1e4ac0a3d 100644
--- a/crates/kitsune-service/src/post/resolver.rs
+++ b/crates/kitsune-service/src/post/resolver.rs
@@ -111,7 +111,7 @@ mod test {
         account::AccountService, attachment::AttachmentService, custom_emoji::CustomEmojiService,
         job::JobService,
     };
-    use athena::JobQueue;
+    use athena::RedisJobQueue;
    use core::convert::Infallible;
    use diesel::{QueryDsl, SelectableHelper};
    use diesel_async::RunQueryDsl;
@@ -184,13 +184,13 @@ mod test {
            .build();
 
        let context_repo = KitsuneContextRepo::builder().db_pool(db_pool.clone()).build();
-        let job_queue = JobQueue::builder()
+        let job_queue = RedisJobQueue::builder()
            .context_repository(context_repo)
            .queue_name("parse_mentions_test")
            .redis_pool(redis_pool)
            .build();
 
-        let job_service = JobService::builder().job_queue(job_queue).build();
+        let job_service = JobService::builder().job_queue(Arc::new(job_queue)).build();
 
        let url_service = UrlService::builder()
            .domain("example.com")
diff --git a/kitsune-job-runner/src/lib.rs b/kitsune-job-runner/src/lib.rs
index f5c2c529c..4a25544c8 100644
--- a/kitsune-job-runner/src/lib.rs
+++ b/kitsune-job-runner/src/lib.rs
@@ -1,7 +1,7 @@
 #[macro_use]
 extern crate tracing;
 
-use athena::{JobQueue, TaskTracker};
+use athena::{JobQueue, RedisJobQueue, TaskTracker};
 use just_retry::RetryExt;
 use kitsune_config::job_queue::Configuration;
 use kitsune_db::PgPool;
@@ -37,7 +37,7 @@ pub struct JobDispatcherState {
 pub async fn prepare_job_queue(
     db_pool: PgPool,
     config: &Configuration,
-) -> RedisResult<JobQueue<KitsuneContextRepo>> {
+) -> RedisResult<Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>> {
     let context_repo = KitsuneContextRepo::builder().db_pool(db_pool).build();
     let client = redis::Client::open(config.redis_url.as_str())?;
 
@@ -48,18 +48,18 @@ pub async fn prepare_job_queue(
     )
     .await?;
 
-    let queue = JobQueue::builder()
+    let queue = RedisJobQueue::builder()
         .context_repository(context_repo)
         .queue_name("kitsune-jobs")
         .redis_pool(redis_pool)
         .build();
 
-    Ok(queue)
+    Ok(Arc::new(queue))
 }
 
 #[instrument(skip(job_queue, state))]
 pub async fn run_dispatcher(
-    job_queue: JobQueue<KitsuneContextRepo>,
+    job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
     state: JobDispatcherState,
     num_job_workers: usize,
 ) {
diff --git a/kitsune/src/http/handler/well_known/webfinger.rs b/kitsune/src/http/handler/well_known/webfinger.rs
index 97cced1c0..37c49c078 100644
--- a/kitsune/src/http/handler/well_known/webfinger.rs
+++ b/kitsune/src/http/handler/well_known/webfinger.rs
@@ -67,7 +67,7 @@ pub fn routes() -> Router<Zustand> {
 #[cfg(test)]
 mod tests {
     use super::{get, WebfingerQuery};
-    use athena::JobQueue;
+    use athena::RedisJobQueue;
     use axum::{
         extract::{Query, State},
         Json,
@@ -148,13 +148,13 @@ mod tests {
        let context_repo = KitsuneContextRepo::builder()
            .db_pool(db_pool.clone())
            .build();
-        let job_queue = JobQueue::builder()
+        let job_queue = RedisJobQueue::builder()
            .context_repository(context_repo)
            .queue_name("webfinger_test")
            .redis_pool(redis_pool)
            .build();
 
-        let job_service = JobService::builder().job_queue(job_queue).build();
+        let job_service = JobService::builder().job_queue(Arc::new(job_queue)).build();
 
        AccountService::builder()
            .attachment_service(attachment_service)
diff --git a/kitsune/src/lib.rs b/kitsune/src/lib.rs
index c728505cc..6839f8394 100644
--- a/kitsune/src/lib.rs
+++ b/kitsune/src/lib.rs
@@ -41,6 +41,7 @@ use kitsune_service::{
 };
 use kitsune_url::UrlService;
 use kitsune_wasm_mrf::MrfService;
+use std::sync::Arc;
 #[cfg(feature = "oidc")]
 use {futures_util::future::OptionFuture, kitsune_oidc::OidcService};
 
@@ -49,7 +50,7 @@ use {futures_util::future::OptionFuture, kitsune_oidc::OidcService};
 pub async fn initialise_state(
     config: &Configuration,
     db_pool: PgPool,
-    job_queue: JobQueue<KitsuneContextRepo>,
+    job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
 ) -> eyre::Result<Zustand> {
     let url_service = UrlService::builder()
         .domain(config.url.domain.clone())
diff --git a/lib/athena/Cargo.toml b/lib/athena/Cargo.toml
index 29a1386de..a66b3ecc9 100644
--- a/lib/athena/Cargo.toml
+++ b/lib/athena/Cargo.toml
@@ -7,6 +7,7 @@ license = "MIT OR Apache-2.0"
 
 [dependencies]
 ahash = "0.8.11"
+async-trait = "0.1.80"
 either = { version = "1.11.0", default-features = false }
 futures-util = { version = "0.3.30", default-features = false }
 iso8601-timestamp = { version = "0.2.17", features = ["diesel-pg"] }
diff --git a/lib/athena/examples/basic_queue.rs b/lib/athena/examples/basic_queue.rs
index 352dd8fd8..f060f6aaf 100644
--- a/lib/athena/examples/basic_queue.rs
+++ b/lib/athena/examples/basic_queue.rs
@@ -1,4 +1,4 @@
-use athena::{JobContextRepository, JobDetails, JobQueue, Runnable};
+use athena::{JobContextRepository, JobDetails, JobQueue, RedisJobQueue, Runnable};
 use futures_util::{
     stream::{self, BoxStream},
     StreamExt,
@@ -71,7 +71,7 @@ async fn main() {
         .await
         .unwrap();
 
-    let queue = JobQueue::builder()
+    let queue = RedisJobQueue::builder()
         .context_repository(ContextRepo)
         .queue_name("test_queue")
         .redis_pool(pool)
diff --git a/lib/athena/src/lib.rs b/lib/athena/src/lib.rs
index 0cf7d26bf..478ff36df 100644
--- a/lib/athena/src/lib.rs
+++ b/lib/athena/src/lib.rs
@@ -2,17 +2,18 @@
 extern crate tracing;
 
 use self::error::{BoxError, Result};
+use async_trait::async_trait;
 use futures_util::{Future, Stream};
 use iso8601_timestamp::Timestamp;
 use speedy_uuid::Uuid;
+use std::sync::Arc;
 use typed_builder::TypedBuilder;
 
-pub use self::{error::Error, queue::JobQueue};
+pub use self::{error::Error, redis::JobQueue as RedisJobQueue};
 pub use tokio_util::task::TaskTracker;
 
 mod error;
 mod macros;
-mod queue;
 mod redis;
 
 #[derive(TypedBuilder)]
@@ -27,6 +28,25 @@ pub struct JobDetails<C> {
     run_at: Option<Timestamp>,
 }
 
+#[async_trait]
+pub trait JobQueue: Send + Sync {
+    type ContextRepository: JobContextRepository;
+
+    async fn enqueue(
+        &self,
+        job_details: JobDetails<<Self::ContextRepository as JobContextRepository>::JobContext>,
+    ) -> Result<()>;
+
+    async fn spawn_jobs(
+        &self,
+        max_jobs: usize,
+        run_ctx: Arc<
+            <<Self::ContextRepository as JobContextRepository>::JobContext as Runnable>::Context,
+        >,
+        join_set: &TaskTracker,
+    ) -> Result<()>;
+}
+
 pub trait Runnable {
     /// User-defined context that is getting passed to the job when run
     ///
@@ -45,7 +65,7 @@ pub trait JobContextRepository {
     /// To support multiple job types per repository, consider using the enum dispatch technique
     type JobContext: Runnable + Send + Sync + 'static;
     type Error: Into<BoxError>;
-    type Stream: Stream<Item = Result<(Uuid, Self::JobContext), Self::Error>>;
+    type Stream: Stream<Item = Result<(Uuid, Self::JobContext), Self::Error>> + Send;
 
     /// Batch fetch job contexts
     ///
diff --git a/lib/athena/src/queue/mod.rs b/lib/athena/src/queue/mod.rs
deleted file mode 100644
index 8b1378917..000000000
--- a/lib/athena/src/queue/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/lib/athena/src/redis/mod.rs b/lib/athena/src/redis/mod.rs
index 6ec8ce5f7..8c74e41c0 100644
--- a/lib/athena/src/redis/mod.rs
+++ b/lib/athena/src/redis/mod.rs
@@ -1,6 +1,7 @@
 use self::{scheduled::ScheduledJobActor, util::StreamAutoClaimReply};
 use crate::{error::Result, impl_to_redis_args, Error, JobContextRepository, JobDetails, Runnable};
 use ahash::AHashMap;
+use async_trait::async_trait;
 use either::Either;
 use futures_util::StreamExt;
 use iso8601_timestamp::Timestamp;
@@ -158,25 +159,6 @@ where
         Ok(cmd)
     }
 
-    pub async fn enqueue(&self, job_details: JobDetails<CR::JobContext>) -> Result<()> {
-        let job_meta = JobMeta {
-            job_id: job_details.job_id,
-            fail_count: job_details.fail_count,
-        };
-
-        self.context_repository
-            .store_context(job_meta.job_id, job_details.context)
-            .await
-            .map_err(|err| Error::ContextRepository(err.into()))?;
-
-        let mut redis_conn = self.redis_pool.get();
-        self.enqueue_redis_cmd(&job_meta, job_details.run_at)?
-            .query_async(&mut redis_conn)
-            .await?;
-
-        Ok(())
-    }
-
     async fn fetch_job_data(
         &self,
         max_jobs: usize,
@@ -305,8 +287,35 @@
 
         Ok(())
     }
+}
+
+#[async_trait]
+impl<CR> crate::JobQueue for JobQueue<CR>
+where
+    CR: JobContextRepository + Send + Sync + 'static,
+{
+    type ContextRepository = CR;
+
+    async fn enqueue(&self, job_details: JobDetails<CR::JobContext>) -> Result<()> {
+        let job_meta = JobMeta {
+            job_id: job_details.job_id,
+            fail_count: job_details.fail_count,
+        };
+
+        self.context_repository
+            .store_context(job_meta.job_id, job_details.context)
+            .await
+            .map_err(|err| Error::ContextRepository(err.into()))?;
+
+        let mut redis_conn = self.redis_pool.get();
+        self.enqueue_redis_cmd(&job_meta, job_details.run_at)?
+            .query_async(&mut redis_conn)
+            .await?;
+
+        Ok(())
+    }
 
-    pub async fn spawn_jobs(
+    async fn spawn_jobs(
         &self,
         max_jobs: usize,
         run_ctx: Arc<<CR::JobContext as Runnable>::Context>,
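
Below is a minimal sketch, not taken from the patch, of the coercion this change enables, assuming the `JobQueue` trait and the `impl<CR> JobQueue for RedisJobQueue<CR>` block shown above; the helper name `into_job_queue` is illustrative only.

```rust
use athena::{JobContextRepository, JobQueue, RedisJobQueue};
use std::sync::Arc;

/// Illustrative helper: hand the Redis-backed queue out as the new trait object,
/// mirroring `Ok(Arc::new(queue))` in `prepare_job_queue`.
fn into_job_queue<CR>(queue: RedisJobQueue<CR>) -> Arc<dyn JobQueue<ContextRepository = CR>>
where
    // Same bounds as the trait impl for the Redis queue above.
    CR: JobContextRepository + Send + Sync + 'static,
{
    // Unsized coercion from the concrete type to the trait object; downstream code
    // (`JobService`, `run_dispatcher`, `initialise_state`) only sees `Arc<dyn JobQueue<...>>`
    // and no longer depends on the Redis implementation.
    Arc::new(queue)
}
```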