Skip to content

Commit

Permalink
Move to traits
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Apr 15, 2024
1 parent 6e51717 commit f6a369a
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 40 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

![GitHub Workflow Status](https://img.shields.io/github/actions/workflow/status/kitsune-soc/kitsune/rust.yml?style=for-the-badge)
[![dependency status](https://deps.rs/repo/github/kitsune-soc/kitsune/status.svg?style=for-the-badge)](https://deps.rs/repo/github/kitsune-soc/kitsune)
[![Maintainance: Experimental](https://img.shields.io/badge/maintainance-experimental-blue?style=for-the-badge)](https://gist.github.com/taiki-e/ad73eaea17e2e0372efb76ef6b38f17b)
[![Maintenance: Experimental](https://img.shields.io/badge/maintainance-experimental-blue?style=for-the-badge)](https://gist.github.com/taiki-e/ad73eaea17e2e0372efb76ef6b38f17b)

</div>

Expand Down
3 changes: 2 additions & 1 deletion crates/kitsune-service/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use athena::{JobDetails, JobQueue};
use iso8601_timestamp::Timestamp;
use kitsune_error::Result;
use kitsune_jobs::{Job, KitsuneContextRepo};
use std::sync::Arc;
use typed_builder::TypedBuilder;

#[derive(TypedBuilder)]
Expand All @@ -13,7 +14,7 @@ pub struct Enqueue<T> {

#[derive(Clone, TypedBuilder)]
pub struct JobService {
job_queue: JobQueue<KitsuneContextRepo>,
job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
}

impl JobService {
Expand Down
6 changes: 3 additions & 3 deletions crates/kitsune-service/src/post/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod test {
account::AccountService, attachment::AttachmentService, custom_emoji::CustomEmojiService,
job::JobService,
};
use athena::JobQueue;
use athena::RedisJobQueue;
use core::convert::Infallible;
use diesel::{QueryDsl, SelectableHelper};
use diesel_async::RunQueryDsl;
Expand Down Expand Up @@ -184,13 +184,13 @@ mod test {
.build();

let context_repo = KitsuneContextRepo::builder().db_pool(db_pool.clone()).build();
let job_queue = JobQueue::builder()
let job_queue = RedisJobQueue::builder()
.context_repository(context_repo)
.queue_name("parse_mentions_test")
.redis_pool(redis_pool)
.build();

let job_service = JobService::builder().job_queue(job_queue).build();
let job_service = JobService::builder().job_queue(Arc::new(job_queue)).build();

let url_service = UrlService::builder()
.domain("example.com")
Expand Down
10 changes: 5 additions & 5 deletions kitsune-job-runner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[macro_use]
extern crate tracing;

use athena::{JobQueue, TaskTracker};
use athena::{JobQueue, RedisJobQueue, TaskTracker};
use just_retry::RetryExt;
use kitsune_config::job_queue::Configuration;
use kitsune_db::PgPool;
Expand Down Expand Up @@ -37,7 +37,7 @@ pub struct JobDispatcherState {
pub async fn prepare_job_queue(
db_pool: PgPool,
config: &Configuration,
) -> RedisResult<JobQueue<KitsuneContextRepo>> {
) -> RedisResult<Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>> {

Check warning on line 40 in kitsune-job-runner/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kitsune-job-runner/src/lib.rs#L40

Added line #L40 was not covered by tests
let context_repo = KitsuneContextRepo::builder().db_pool(db_pool).build();

let client = redis::Client::open(config.redis_url.as_str())?;
Expand All @@ -48,18 +48,18 @@ pub async fn prepare_job_queue(
)
.await?;

let queue = JobQueue::builder()
let queue = RedisJobQueue::builder()

Check warning on line 51 in kitsune-job-runner/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kitsune-job-runner/src/lib.rs#L51

Added line #L51 was not covered by tests
.context_repository(context_repo)
.queue_name("kitsune-jobs")
.redis_pool(redis_pool)
.build();

Ok(queue)
Ok(Arc::new(queue))

Check warning on line 57 in kitsune-job-runner/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kitsune-job-runner/src/lib.rs#L57

Added line #L57 was not covered by tests
}

#[instrument(skip(job_queue, state))]
pub async fn run_dispatcher(
job_queue: JobQueue<KitsuneContextRepo>,
job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,
state: JobDispatcherState,
num_job_workers: usize,
) {
Expand Down
6 changes: 3 additions & 3 deletions kitsune/src/http/handler/well_known/webfinger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub fn routes() -> Router<Zustand> {
#[cfg(test)]
mod tests {
use super::{get, WebfingerQuery};
use athena::JobQueue;
use athena::RedisJobQueue;
use axum::{
extract::{Query, State},
Json,
Expand Down Expand Up @@ -148,13 +148,13 @@ mod tests {
let context_repo = KitsuneContextRepo::builder()
.db_pool(db_pool.clone())
.build();
let job_queue = JobQueue::builder()
let job_queue = RedisJobQueue::builder()
.context_repository(context_repo)
.queue_name("webfinger_test")
.redis_pool(redis_pool)
.build();

let job_service = JobService::builder().job_queue(job_queue).build();
let job_service = JobService::builder().job_queue(Arc::new(job_queue)).build();

AccountService::builder()
.attachment_service(attachment_service)
Expand Down
3 changes: 2 additions & 1 deletion kitsune/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use kitsune_service::{
};
use kitsune_url::UrlService;
use kitsune_wasm_mrf::MrfService;
use std::sync::Arc;

#[cfg(feature = "oidc")]
use {futures_util::future::OptionFuture, kitsune_oidc::OidcService};
Expand All @@ -49,7 +50,7 @@ use {futures_util::future::OptionFuture, kitsune_oidc::OidcService};
pub async fn initialise_state(
config: &Configuration,
db_pool: PgPool,
job_queue: JobQueue<KitsuneContextRepo>,
job_queue: Arc<dyn JobQueue<ContextRepository = KitsuneContextRepo>>,

Check warning on line 53 in kitsune/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

kitsune/src/lib.rs#L53

Added line #L53 was not covered by tests
) -> eyre::Result<Zustand> {
let url_service = UrlService::builder()
.domain(config.url.domain.clone())
Expand Down
1 change: 1 addition & 0 deletions lib/athena/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "MIT OR Apache-2.0"

[dependencies]
ahash = "0.8.11"
async-trait = "0.1.80"
either = { version = "1.11.0", default-features = false }
futures-util = { version = "0.3.30", default-features = false }
iso8601-timestamp = { version = "0.2.17", features = ["diesel-pg"] }
Expand Down
4 changes: 2 additions & 2 deletions lib/athena/examples/basic_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use athena::{JobContextRepository, JobDetails, JobQueue, Runnable};
use athena::{JobContextRepository, JobDetails, JobQueue, RedisJobQueue, Runnable};
use futures_util::{
stream::{self, BoxStream},
StreamExt,
Expand Down Expand Up @@ -71,7 +71,7 @@ async fn main() {
.await
.unwrap();

let queue = JobQueue::builder()
let queue = RedisJobQueue::builder()
.context_repository(ContextRepo)
.queue_name("test_queue")
.redis_pool(pool)
Expand Down
26 changes: 23 additions & 3 deletions lib/athena/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
extern crate tracing;

use self::error::{BoxError, Result};
use async_trait::async_trait;
use futures_util::{Future, Stream};
use iso8601_timestamp::Timestamp;
use speedy_uuid::Uuid;
use std::sync::Arc;
use typed_builder::TypedBuilder;

pub use self::{error::Error, queue::JobQueue};
pub use self::{error::Error, redis::JobQueue as RedisJobQueue};
pub use tokio_util::task::TaskTracker;

mod error;
mod macros;
mod queue;
mod redis;

#[derive(TypedBuilder)]
Expand All @@ -27,6 +28,25 @@ pub struct JobDetails<C> {
run_at: Option<Timestamp>,
}

#[async_trait]
pub trait JobQueue: Send + Sync {
type ContextRepository: JobContextRepository;

async fn enqueue(
&self,
job_details: JobDetails<<Self::ContextRepository as JobContextRepository>::JobContext>,
) -> Result<()>;

async fn spawn_jobs(
&self,
max_jobs: usize,
run_ctx: Arc<
<<Self::ContextRepository as JobContextRepository>::JobContext as Runnable>::Context,
>,
join_set: &TaskTracker,
) -> Result<()>;
}

pub trait Runnable {
/// User-defined context that is getting passed to the job when run
///
Expand All @@ -45,7 +65,7 @@ pub trait JobContextRepository {
/// To support multiple job types per repository, consider using the enum dispatch technique
type JobContext: Runnable + Send + Sync + 'static;
type Error: Into<BoxError>;
type Stream: Stream<Item = Result<(Uuid, Self::JobContext), Self::Error>>;
type Stream: Stream<Item = Result<(Uuid, Self::JobContext), Self::Error>> + Send;

/// Batch fetch job contexts
///
Expand Down
1 change: 0 additions & 1 deletion lib/athena/src/queue/mod.rs

This file was deleted.

49 changes: 29 additions & 20 deletions lib/athena/src/redis/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use self::{scheduled::ScheduledJobActor, util::StreamAutoClaimReply};
use crate::{error::Result, impl_to_redis_args, Error, JobContextRepository, JobDetails, Runnable};
use ahash::AHashMap;
use async_trait::async_trait;
use either::Either;
use futures_util::StreamExt;
use iso8601_timestamp::Timestamp;
Expand Down Expand Up @@ -158,25 +159,6 @@ where
Ok(cmd)
}

pub async fn enqueue(&self, job_details: JobDetails<CR::JobContext>) -> Result<()> {
let job_meta = JobMeta {
job_id: job_details.job_id,
fail_count: job_details.fail_count,
};

self.context_repository
.store_context(job_meta.job_id, job_details.context)
.await
.map_err(|err| Error::ContextRepository(err.into()))?;

let mut redis_conn = self.redis_pool.get();
self.enqueue_redis_cmd(&job_meta, job_details.run_at)?
.query_async(&mut redis_conn)
.await?;

Ok(())
}

async fn fetch_job_data(
&self,
max_jobs: usize,
Expand Down Expand Up @@ -305,8 +287,35 @@ where

Ok(())
}
}

#[async_trait]
impl<CR> crate::JobQueue for JobQueue<CR>
where
CR: JobContextRepository + Send + Sync + 'static,
{
type ContextRepository = CR;

async fn enqueue(&self, job_details: JobDetails<CR::JobContext>) -> Result<()> {
let job_meta = JobMeta {
job_id: job_details.job_id,
fail_count: job_details.fail_count,
};

self.context_repository
.store_context(job_meta.job_id, job_details.context)
.await
.map_err(|err| Error::ContextRepository(err.into()))?;

Check warning on line 308 in lib/athena/src/redis/mod.rs

View check run for this annotation

Codecov / codecov/patch

lib/athena/src/redis/mod.rs#L299-L308

Added lines #L299 - L308 were not covered by tests

let mut redis_conn = self.redis_pool.get();
self.enqueue_redis_cmd(&job_meta, job_details.run_at)?
.query_async(&mut redis_conn)
.await?;

Check warning on line 313 in lib/athena/src/redis/mod.rs

View check run for this annotation

Codecov / codecov/patch

lib/athena/src/redis/mod.rs#L310-L313

Added lines #L310 - L313 were not covered by tests

Ok(())
}

Check warning on line 316 in lib/athena/src/redis/mod.rs

View check run for this annotation

Codecov / codecov/patch

lib/athena/src/redis/mod.rs#L315-L316

Added lines #L315 - L316 were not covered by tests

pub async fn spawn_jobs(
async fn spawn_jobs(
&self,
max_jobs: usize,
run_ctx: Arc<<CR::JobContext as Runnable>::Context>,
Expand Down

0 comments on commit f6a369a

Please sign in to comment.