From 5997bc2eb26982687f05708b69388b84317d6bb4 Mon Sep 17 00:00:00 2001 From: Aumetra Weisman Date: Wed, 8 May 2024 19:16:47 +0200 Subject: [PATCH] restructure --- Cargo.lock | 8 +- .../src/meilisearch/http_client.rs | 121 +++++++++++++++++ .../{meilisearch.rs => meilisearch/mod.rs} | 123 +----------------- kitsune/Cargo.toml | 2 +- 4 files changed, 129 insertions(+), 125 deletions(-) create mode 100644 crates/kitsune-search/src/meilisearch/http_client.rs rename crates/kitsune-search/src/{meilisearch.rs => meilisearch/mod.rs} (53%) diff --git a/Cargo.lock b/Cargo.lock index 19bf4852b..f895e9232 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2241,9 +2241,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" dependencies = [ "libc", "windows-sys 0.52.0", @@ -5937,9 +5937,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" +checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" dependencies = [ "anyhow", "itertools 0.12.1", diff --git a/crates/kitsune-search/src/meilisearch/http_client.rs b/crates/kitsune-search/src/meilisearch/http_client.rs new file mode 100644 index 000000000..df3997e46 --- /dev/null +++ b/crates/kitsune-search/src/meilisearch/http_client.rs @@ -0,0 +1,121 @@ +use async_trait::async_trait; +use bytes::Bytes; +use futures_util::Stream; +use http::header::CONTENT_TYPE; +use kitsune_http_client::Body as HttpBody; +use meilisearch_sdk::{errors::Error as MeilisearchError, request::Method}; +use pin_project_lite::pin_project; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + io, + pin::Pin, + task::{self, ready, Poll}, +}; + +const BUFFER_SIZE: usize = 1024; + +pin_project! { + struct AsyncReadBridge { + #[pin] + inner: R, + buf: Vec, + } +} + +impl AsyncReadBridge { + pub fn new(reader: R, buf_size: usize) -> Self { + Self { + inner: reader, + buf: vec![0; buf_size], + } + } +} + +impl Stream for AsyncReadBridge +where + R: futures_io::AsyncRead, +{ + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + let this = self.project(); + let amount_read = match ready!(this.inner.poll_read(cx, this.buf)) { + Ok(0) => return Poll::Ready(None), + Ok(amount_read) => amount_read, + Err(err) => return Poll::Ready(Some(Err(err))), + }; + + let bytes = Bytes::copy_from_slice(&this.buf[..amount_read]); + this.buf.clear(); + this.buf.fill(0); + + Poll::Ready(Some(Ok(bytes))) + } +} + +#[derive(Clone)] +pub struct HttpClient { + pub inner: kitsune_http_client::Client, +} + +#[async_trait] +impl meilisearch_sdk::request::HttpClient for HttpClient { + async fn stream_request< + Query: Serialize + Send + Sync, + Body: futures_io::AsyncRead + Send + Sync + 'static, + Output: DeserializeOwned + 'static, + >( + &self, + url: &str, + method: Method, + content_type: &str, + expected_status_code: u16, + ) -> Result { + let url = format!( + "{url}?{}", + serde_urlencoded::to_string(method.query()) + .map_err(|err| MeilisearchError::Other(err.into()))? + ); + + let request = http::Request::builder() + .uri(&url) + .header(CONTENT_TYPE, content_type); + + let request = match method { + Method::Get { .. } => request.method(http::Method::GET), + Method::Post { .. } => request.method(http::Method::POST), + Method::Patch { .. } => request.method(http::Method::PATCH), + Method::Put { .. } => request.method(http::Method::PUT), + Method::Delete { .. } => request.method(http::Method::DELETE), + }; + + let body = method + .map_body(|body| HttpBody::stream(AsyncReadBridge::new(body, BUFFER_SIZE))) + .into_body() + .unwrap_or_default(); + + let request = request + .body(body) + .map_err(|err| MeilisearchError::Other(err.into()))?; + + let response = self + .inner + .execute(request) + .await + .map_err(|err| MeilisearchError::Other(err.into()))?; + + if response.status().as_u16() != expected_status_code { + return Err(meilisearch_sdk::errors::MeilisearchCommunicationError { + status_code: response.status().as_u16(), + message: response.text().await.ok(), + url, + } + .into()); + } + + response + .json() + .await + .map_err(|err| MeilisearchError::Other(err.into())) + } +} diff --git a/crates/kitsune-search/src/meilisearch.rs b/crates/kitsune-search/src/meilisearch/mod.rs similarity index 53% rename from crates/kitsune-search/src/meilisearch.rs rename to crates/kitsune-search/src/meilisearch/mod.rs index 5919d3861..ed516b8ff 100644 --- a/crates/kitsune-search/src/meilisearch.rs +++ b/crates/kitsune-search/src/meilisearch/mod.rs @@ -1,128 +1,11 @@ +use self::http_client::HttpClient; use super::{Result, SearchBackend, SearchIndex, SearchItem, SearchResultReference}; -use async_trait::async_trait; -use bytes::Bytes; -use futures_util::Stream; -use http::header::CONTENT_TYPE; use meilisearch_sdk::{client::Client, indexes::Index, settings::Settings}; -use pin_project_lite::pin_project; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::Deserialize; use speedy_uuid::Uuid; -use std::{ - io, - pin::Pin, - task::{self, ready, Poll}, -}; use strum::IntoEnumIterator; -const BUFFER_SIZE: usize = 1024; - -pin_project! { - struct AsyncReadBridge { - #[pin] - inner: R, - buf: Vec, - } -} - -impl AsyncReadBridge { - pub fn new(reader: R, buf_size: usize) -> Self { - Self { - inner: reader, - buf: vec![0; buf_size], - } - } -} - -impl Stream for AsyncReadBridge -where - R: futures_io::AsyncRead, -{ - type Item = io::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { - let this = self.project(); - let amount_read = match ready!(this.inner.poll_read(cx, this.buf)) { - Ok(0) => return Poll::Ready(None), - Ok(amount_read) => amount_read, - Err(err) => return Poll::Ready(Some(Err(err))), - }; - - let bytes = Bytes::copy_from_slice(&this.buf[..amount_read]); - this.buf.clear(); - this.buf.fill(0); - - Poll::Ready(Some(Ok(bytes))) - } -} - -#[derive(Clone)] -struct HttpClient { - inner: kitsune_http_client::Client, -} - -#[async_trait] -impl meilisearch_sdk::request::HttpClient for HttpClient { - async fn stream_request< - Query: Serialize + Send + Sync, - Body: futures_io::AsyncRead + Send + Sync + 'static, - Output: DeserializeOwned + 'static, - >( - &self, - url: &str, - method: meilisearch_sdk::request::Method, - content_type: &str, - expected_status_code: u16, - ) -> Result { - let url = format!( - "{url}?{}", - serde_urlencoded::to_string(method.query()) - .map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))? - ); - - let request = http::Request::builder() - .uri(&url) - .header(CONTENT_TYPE, content_type); - - let request = match method { - meilisearch_sdk::request::Method::Get { .. } => request.method(http::Method::GET), - meilisearch_sdk::request::Method::Post { .. } => request.method(http::Method::POST), - meilisearch_sdk::request::Method::Patch { .. } => request.method(http::Method::PATCH), - meilisearch_sdk::request::Method::Put { .. } => request.method(http::Method::PUT), - meilisearch_sdk::request::Method::Delete { .. } => request.method(http::Method::DELETE), - }; - - let body = method - .map_body(|body| { - kitsune_http_client::Body::stream(AsyncReadBridge::new(body, BUFFER_SIZE)) - }) - .into_body() - .unwrap_or_else(kitsune_http_client::Body::empty); - - let request = request - .body(body) - .map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))?; - - let response = self - .inner - .execute(request) - .await - .map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))?; - - if response.status().as_u16() != expected_status_code { - return Err(meilisearch_sdk::errors::MeilisearchCommunicationError { - status_code: response.status().as_u16(), - message: response.text().await.ok(), - url, - } - .into()); - } - - response - .json() - .await - .map_err(|err| meilisearch_sdk::errors::Error::Other(err.into())) - } -} +mod http_client; #[derive(Deserialize)] struct MeilisearchResult { diff --git a/kitsune/Cargo.toml b/kitsune/Cargo.toml index 8b7342bd4..656eb6c58 100644 --- a/kitsune/Cargo.toml +++ b/kitsune/Cargo.toml @@ -107,7 +107,7 @@ trials = { path = "../lib/trials" } typed-builder = "0.18.2" url = "2.5.0" utoipa = { version = "4.2.3", features = ["axum_extras", "uuid"] } -utoipa-swagger-ui = { version = "6.0.0", features = ["axum"] } +utoipa-swagger-ui = { version = "=6.0.0", features = ["axum"] } # --- Optional dependencies ---