Skip to content

Commit

Permalink
restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed May 8, 2024
1 parent 20874a6 commit 5997bc2
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 125 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 121 additions & 0 deletions crates/kitsune-search/src/meilisearch/http_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::Stream;
use http::header::CONTENT_TYPE;
use kitsune_http_client::Body as HttpBody;
use meilisearch_sdk::{errors::Error as MeilisearchError, request::Method};
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
io,
pin::Pin,
task::{self, ready, Poll},
};

const BUFFER_SIZE: usize = 1024;

pin_project! {
struct AsyncReadBridge<R> {
#[pin]
inner: R,
buf: Vec<u8>,
}
}

impl<R> AsyncReadBridge<R> {
pub fn new(reader: R, buf_size: usize) -> Self {
Self {
inner: reader,
buf: vec![0; buf_size],
}
}

Check warning on line 31 in crates/kitsune-search/src/meilisearch/http_client.rs

View check run for this annotation

Codecov / codecov/patch

crates/kitsune-search/src/meilisearch/http_client.rs#L26-L31

Added lines #L26 - L31 were not covered by tests
}

impl<R> Stream for AsyncReadBridge<R>
where
R: futures_io::AsyncRead,
{
type Item = io::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let amount_read = match ready!(this.inner.poll_read(cx, this.buf)) {
Ok(0) => return Poll::Ready(None),
Ok(amount_read) => amount_read,
Err(err) => return Poll::Ready(Some(Err(err))),

Check warning on line 45 in crates/kitsune-search/src/meilisearch/http_client.rs

View check run for this annotation

Codecov / codecov/patch

crates/kitsune-search/src/meilisearch/http_client.rs#L40-L45

Added lines #L40 - L45 were not covered by tests
};

let bytes = Bytes::copy_from_slice(&this.buf[..amount_read]);
this.buf.clear();
this.buf.fill(0);

Poll::Ready(Some(Ok(bytes)))
}

Check warning on line 53 in crates/kitsune-search/src/meilisearch/http_client.rs

View check run for this annotation

Codecov / codecov/patch

crates/kitsune-search/src/meilisearch/http_client.rs#L48-L53

Added lines #L48 - L53 were not covered by tests
}

#[derive(Clone)]
pub struct HttpClient {
pub inner: kitsune_http_client::Client,
}

#[async_trait]
impl meilisearch_sdk::request::HttpClient for HttpClient {
async fn stream_request<
Query: Serialize + Send + Sync,
Body: futures_io::AsyncRead + Send + Sync + 'static,
Output: DeserializeOwned + 'static,
>(
&self,
url: &str,
method: Method<Query, Body>,
content_type: &str,
expected_status_code: u16,
) -> Result<Output, MeilisearchError> {
let url = format!(
"{url}?{}",
serde_urlencoded::to_string(method.query())
.map_err(|err| MeilisearchError::Other(err.into()))?
);

let request = http::Request::builder()
.uri(&url)
.header(CONTENT_TYPE, content_type);

let request = match method {
Method::Get { .. } => request.method(http::Method::GET),
Method::Post { .. } => request.method(http::Method::POST),
Method::Patch { .. } => request.method(http::Method::PATCH),
Method::Put { .. } => request.method(http::Method::PUT),
Method::Delete { .. } => request.method(http::Method::DELETE),
};

let body = method
.map_body(|body| HttpBody::stream(AsyncReadBridge::new(body, BUFFER_SIZE)))
.into_body()
.unwrap_or_default();

let request = request
.body(body)
.map_err(|err| MeilisearchError::Other(err.into()))?;

let response = self
.inner
.execute(request)
.await
.map_err(|err| MeilisearchError::Other(err.into()))?;

if response.status().as_u16() != expected_status_code {
return Err(meilisearch_sdk::errors::MeilisearchCommunicationError {
status_code: response.status().as_u16(),
message: response.text().await.ok(),
url,
}
.into());
}

response
.json()
.await
.map_err(|err| MeilisearchError::Other(err.into()))
}

Check warning on line 120 in crates/kitsune-search/src/meilisearch/http_client.rs

View check run for this annotation

Codecov / codecov/patch

crates/kitsune-search/src/meilisearch/http_client.rs#L73-L120

Added lines #L73 - L120 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -1,128 +1,11 @@
use self::http_client::HttpClient;
use super::{Result, SearchBackend, SearchIndex, SearchItem, SearchResultReference};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::Stream;
use http::header::CONTENT_TYPE;
use meilisearch_sdk::{client::Client, indexes::Index, settings::Settings};
use pin_project_lite::pin_project;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde::Deserialize;
use speedy_uuid::Uuid;
use std::{
io,
pin::Pin,
task::{self, ready, Poll},
};
use strum::IntoEnumIterator;

const BUFFER_SIZE: usize = 1024;

pin_project! {
struct AsyncReadBridge<R> {
#[pin]
inner: R,
buf: Vec<u8>,
}
}

impl<R> AsyncReadBridge<R> {
pub fn new(reader: R, buf_size: usize) -> Self {
Self {
inner: reader,
buf: vec![0; buf_size],
}
}
}

impl<R> Stream for AsyncReadBridge<R>
where
R: futures_io::AsyncRead,
{
type Item = io::Result<Bytes>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let amount_read = match ready!(this.inner.poll_read(cx, this.buf)) {
Ok(0) => return Poll::Ready(None),
Ok(amount_read) => amount_read,
Err(err) => return Poll::Ready(Some(Err(err))),
};

let bytes = Bytes::copy_from_slice(&this.buf[..amount_read]);
this.buf.clear();
this.buf.fill(0);

Poll::Ready(Some(Ok(bytes)))
}
}

#[derive(Clone)]
struct HttpClient {
inner: kitsune_http_client::Client,
}

#[async_trait]
impl meilisearch_sdk::request::HttpClient for HttpClient {
async fn stream_request<
Query: Serialize + Send + Sync,
Body: futures_io::AsyncRead + Send + Sync + 'static,
Output: DeserializeOwned + 'static,
>(
&self,
url: &str,
method: meilisearch_sdk::request::Method<Query, Body>,
content_type: &str,
expected_status_code: u16,
) -> Result<Output, meilisearch_sdk::errors::Error> {
let url = format!(
"{url}?{}",
serde_urlencoded::to_string(method.query())
.map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))?
);

let request = http::Request::builder()
.uri(&url)
.header(CONTENT_TYPE, content_type);

let request = match method {
meilisearch_sdk::request::Method::Get { .. } => request.method(http::Method::GET),
meilisearch_sdk::request::Method::Post { .. } => request.method(http::Method::POST),
meilisearch_sdk::request::Method::Patch { .. } => request.method(http::Method::PATCH),
meilisearch_sdk::request::Method::Put { .. } => request.method(http::Method::PUT),
meilisearch_sdk::request::Method::Delete { .. } => request.method(http::Method::DELETE),
};

let body = method
.map_body(|body| {
kitsune_http_client::Body::stream(AsyncReadBridge::new(body, BUFFER_SIZE))
})
.into_body()
.unwrap_or_else(kitsune_http_client::Body::empty);

let request = request
.body(body)
.map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))?;

let response = self
.inner
.execute(request)
.await
.map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))?;

if response.status().as_u16() != expected_status_code {
return Err(meilisearch_sdk::errors::MeilisearchCommunicationError {
status_code: response.status().as_u16(),
message: response.text().await.ok(),
url,
}
.into());
}

response
.json()
.await
.map_err(|err| meilisearch_sdk::errors::Error::Other(err.into()))
}
}
mod http_client;

#[derive(Deserialize)]
struct MeilisearchResult {
Expand Down
2 changes: 1 addition & 1 deletion kitsune/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ trials = { path = "../lib/trials" }
typed-builder = "0.18.2"
url = "2.5.0"
utoipa = { version = "4.2.3", features = ["axum_extras", "uuid"] }
utoipa-swagger-ui = { version = "6.0.0", features = ["axum"] }
utoipa-swagger-ui = { version = "=6.0.0", features = ["axum"] }

# --- Optional dependencies ---

Expand Down

0 comments on commit 5997bc2

Please sign in to comment.