diff --git a/opentelemetry-datadog/Cargo.toml b/opentelemetry-datadog/Cargo.toml index fb210c3095..0761d06836 100644 --- a/opentelemetry-datadog/Cargo.toml +++ b/opentelemetry-datadog/Cargo.toml @@ -38,5 +38,7 @@ lazy_static = "1.4" [dev-dependencies] base64 = "0.13" +bytes = "1" +futures-util = "0.3" isahc = "0.9" opentelemetry = { path = "../opentelemetry", features = ["trace", "testing"] } diff --git a/opentelemetry-datadog/src/exporter/mod.rs b/opentelemetry-datadog/src/exporter/mod.rs index 05101126ce..6bd4c8fbb7 100644 --- a/opentelemetry-datadog/src/exporter/mod.rs +++ b/opentelemetry-datadog/src/exporter/mod.rs @@ -12,7 +12,7 @@ use opentelemetry::sdk::export::trace; use opentelemetry::sdk::export::trace::SpanData; use opentelemetry::trace::TraceError; use opentelemetry::{global, sdk, trace::TracerProvider}; -use opentelemetry_http::HttpClient; +use opentelemetry_http::{HttpClient, ResponseExt}; /// Default Datadog collector endpoint const DEFAULT_AGENT_ENDPOINT: &str = "http://127.0.0.1:8126"; @@ -201,7 +201,8 @@ impl trace::SpanExporter for DatadogExporter { .header(DATADOG_TRACE_COUNT_HEADER, trace_count) .body(data) .map_err::(Into::into)?; - self.client.send(req).await + let _ = self.client.send(req).await?.error_for_status()?; + Ok(()) } } diff --git a/opentelemetry-datadog/src/lib.rs b/opentelemetry-datadog/src/lib.rs index 815f09d90b..b97db7806e 100644 --- a/opentelemetry-datadog/src/lib.rs +++ b/opentelemetry-datadog/src/lib.rs @@ -81,10 +81,14 @@ //! use opentelemetry::{KeyValue, trace::Tracer}; //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; //! use opentelemetry::sdk::export::trace::ExportResult; -//! use opentelemetry_datadog::{new_pipeline, ApiVersion, Error}; //! use opentelemetry::global::shutdown_tracer_provider; -//! use opentelemetry_http::HttpClient; +//! use opentelemetry_datadog::{new_pipeline, ApiVersion, Error}; +//! use opentelemetry_http::{HttpClient, HttpError}; //! use async_trait::async_trait; +//! use bytes::Bytes; +//! use futures_util::io::AsyncReadExt as _; +//! use http::{Request, Response}; +//! use std::convert::TryInto as _; //! //! // `reqwest` and `surf` are supported through features, if you prefer an //! // alternate http client you can add support by implementing `HttpClient` as @@ -92,17 +96,20 @@ //! #[derive(Debug)] //! struct IsahcClient(isahc::HttpClient); //! +//! async fn body_to_bytes(mut body: isahc::Body) -> Result { +//! let mut bytes = Vec::with_capacity(body.len().unwrap_or(0).try_into()?); +//! let _ = body.read_to_end(&mut bytes).await?; +//! Ok(bytes.into()) +//! } +//! //! #[async_trait] //! impl HttpClient for IsahcClient { -//! async fn send(&self, request: http::Request>) -> ExportResult { -//! let result = self.0.send_async(request).await.map_err(|err| Error::Other(err.to_string()))?; -//! -//! if result.status().is_success() { -//! Ok(()) -//! } else { -//! Err(Error::Other(result.status().to_string()).into()) +//! async fn send(&self, request: Request>) -> Result, HttpError> { +//! let response = self.0.send_async(request).await?; +//! Ok(Response::builder() +//! .status(response.status()) +//! .body(body_to_bytes(response.into_body()).await?)?) //! } -//! } //! } //! //! fn main() -> Result<(), opentelemetry::trace::TraceError> { diff --git a/opentelemetry-http/Cargo.toml b/opentelemetry-http/Cargo.toml index 8802b4e607..f79d6a1a9b 100644 --- a/opentelemetry-http/Cargo.toml +++ b/opentelemetry-http/Cargo.toml @@ -10,10 +10,11 @@ license = "Apache-2.0" edition = "2018" [dependencies] -async-trait = "0.1.42" -http = "0.2.2" +async-trait = "0.1" +bytes = "1" +futures-util = { version = "0.3", default-features = false, features = ["io"] } +http = "0.2" isahc = { version = "0.9", default-features = false, optional = true } opentelemetry = { version = "0.13", path = "../opentelemetry", features = ["trace"] } reqwest = { version = "0.11", default-features = false, features = ["blocking"], optional = true } surf = { version = "2.0", default-features = false, optional = true } -thiserror = "1" diff --git a/opentelemetry-http/src/lib.rs b/opentelemetry-http/src/lib.rs index a0d131f9c9..a81585d045 100644 --- a/opentelemetry-http/src/lib.rs +++ b/opentelemetry-http/src/lib.rs @@ -1,9 +1,12 @@ use std::fmt::Debug; use async_trait::async_trait; -use http::Request; -use opentelemetry::propagation::{Extractor, Injector}; -use opentelemetry::trace::TraceError; +use bytes::Bytes; +use http::{Request, Response}; +use opentelemetry::{ + propagation::{Extractor, Injector}, + trace::TraceError, +}; pub struct HeaderInjector<'a>(pub &'a mut http::HeaderMap); @@ -35,145 +38,113 @@ impl<'a> Extractor for HeaderExtractor<'a> { } } +pub type HttpError = Box; + /// A minimal interface necessary for export spans over HTTP. /// -/// Users sometime choose http clients that relay on certain runtime. This trait -/// allows users to bring their choice of http clients. +/// Users sometime choose HTTP clients that relay on a certain async runtime. This trait allows +/// users to bring their choice of HTTP client. #[async_trait] pub trait HttpClient: Debug + Send + Sync { - /// Send a batch of spans to collectors - async fn send(&self, request: Request>) -> Result<(), TraceError>; + /// Send the specified HTTP request + /// + /// Returns the HTTP response including the status code and body. + /// + /// Returns an error if it can't connect to the server or the request could not be completed, + /// e.g. because of a timeout, infinite redirects, or a loss of connection. + async fn send(&self, request: Request>) -> Result, HttpError>; } #[cfg(feature = "reqwest")] mod reqwest { - use super::{async_trait, HttpClient, Request, TraceError}; - use opentelemetry::sdk::export::ExportError; + use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; use std::convert::TryInto; - use thiserror::Error; #[async_trait] impl HttpClient for reqwest::Client { - async fn send(&self, request: Request>) -> Result<(), TraceError> { - let request = request.try_into().map_err(ReqwestError::from)?; - let _ = self - .execute(request) - .await - .and_then(|rsp| rsp.error_for_status()) - .map_err(ReqwestError::from)?; - Ok(()) + async fn send(&self, request: Request>) -> Result, HttpError> { + let request = request.try_into()?; + let response = self.execute(request).await?; + Ok(Response::builder() + .status(response.status()) + .body(response.bytes().await?)?) } } #[async_trait] impl HttpClient for reqwest::blocking::Client { - async fn send(&self, request: Request>) -> Result<(), TraceError> { - let _ = request - .try_into() - .and_then(|req| self.execute(req)) - .and_then(|rsp| rsp.error_for_status()) - .map_err(ReqwestError::from)?; - Ok(()) - } - } - - #[derive(Debug, Error)] - #[error(transparent)] - struct ReqwestError(#[from] reqwest::Error); - - impl ExportError for ReqwestError { - fn exporter_name(&self) -> &'static str { - "reqwest" + async fn send(&self, request: Request>) -> Result, HttpError> { + let request = request.try_into()?; + let response = self.execute(request)?; + Ok(Response::builder() + .status(response.status()) + .body(response.bytes()?)?) } } } #[cfg(feature = "surf")] mod surf { - use super::{async_trait, HttpClient, Request, TraceError}; - use opentelemetry::sdk::export::ExportError; - use std::fmt::{Display, Formatter}; + use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; #[async_trait] impl HttpClient for surf::Client { - async fn send(&self, request: Request>) -> Result<(), TraceError> { + async fn send(&self, request: Request>) -> Result, HttpError> { let (parts, body) = request.into_parts(); - let uri = parts - .uri - .to_string() - .parse() - .map_err(|_err: surf::http::url::ParseError| TraceError::from("error parse url"))?; - - let req = surf::Request::builder(surf::http::Method::Post, uri) - .content_type("application/json") - .body(body); - let result = self.send(req).await.map_err::(Into::into)?; - - if result.status().is_success() { - Ok(()) - } else { - Err(SurfError(surf::Error::from_str( - result.status(), - result.status().canonical_reason(), - )) - .into()) + let method = parts.method.as_str().parse()?; + let uri = parts.uri.to_string().parse()?; + + let mut request_builder = surf::Request::builder(method, uri).body(body); + let mut prev_name = None; + for (new_name, value) in parts.headers.into_iter() { + let name = new_name.or(prev_name).expect("the first time new_name should be set and from then on we always have a prev_name"); + request_builder = request_builder.header(name.as_str(), value.to_str()?); + prev_name = Some(name); } - } - } - - #[derive(Debug)] - struct SurfError(surf::Error); - - impl ExportError for SurfError { - fn exporter_name(&self) -> &'static str { - "surf" - } - } - impl From for SurfError { - fn from(err: surf::Error) -> Self { - SurfError(err) - } - } - - impl std::error::Error for SurfError {} - - impl Display for SurfError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0.to_string()) + let mut response = self.send(request_builder).await?; + Ok(Response::builder() + .status(response.status() as u16) + .body(response.body_bytes().await?.into())?) } } } #[cfg(feature = "isahc")] mod isahc { - use super::{async_trait, HttpClient, Request, TraceError}; - use opentelemetry::sdk::export::ExportError; - use thiserror::Error; + use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response}; + use futures_util::io::AsyncReadExt as _; + use std::convert::TryInto as _; #[async_trait] impl HttpClient for isahc::HttpClient { - async fn send(&self, request: Request>) -> Result<(), TraceError> { - let res = self.send_async(request).await.map_err(IsahcError::from)?; - - if !res.status().is_success() { - return Err(TraceError::from(format!( - "Expected success response, got {:?}", - res.status() - ))); - } - - Ok(()) + async fn send(&self, request: Request>) -> Result, HttpError> { + let response = self.send_async(request).await?; + Ok(Response::builder() + .status(response.status()) + .body(body_to_bytes(response.into_body()).await?)?) } } - #[derive(Debug, Error)] - #[error(transparent)] - struct IsahcError(#[from] isahc::Error); + async fn body_to_bytes(mut body: isahc::Body) -> Result { + let mut bytes = Vec::with_capacity(body.len().unwrap_or(0).try_into()?); + let _ = body.read_to_end(&mut bytes).await?; + Ok(bytes.into()) + } +} + +/// Methods to make working with responses from the [`HttpClient`] trait easier. +pub trait ResponseExt: Sized { + /// Turn a response into an error if the HTTP status does not indicate success (200 - 299). + fn error_for_status(self) -> Result; +} - impl ExportError for IsahcError { - fn exporter_name(&self) -> &'static str { - "isahc" +impl ResponseExt for Response { + fn error_for_status(self) -> Result { + if self.status().is_success() { + Ok(self) + } else { + Err(format!("request failed with status {}", self.status()).into()) } } } diff --git a/opentelemetry-jaeger/Cargo.toml b/opentelemetry-jaeger/Cargo.toml index 45d51c192e..43af240808 100644 --- a/opentelemetry-jaeger/Cargo.toml +++ b/opentelemetry-jaeger/Cargo.toml @@ -41,8 +41,9 @@ headers = { version = "0.3.2", optional = true } surf = { version = "2.0", optional = true } [dev-dependencies] -opentelemetry = { version = "0.13", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } +bytes = "1" futures = "0.3" +opentelemetry = { version = "0.13", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } [dependencies.web-sys] version = "0.3.4" diff --git a/opentelemetry-jaeger/src/exporter/collector.rs b/opentelemetry-jaeger/src/exporter/collector.rs index fd8ae350d0..4459e88378 100644 --- a/opentelemetry-jaeger/src/exporter/collector.rs +++ b/opentelemetry-jaeger/src/exporter/collector.rs @@ -1,7 +1,7 @@ //! # HTTP Jaeger Collector Client use http::Uri; #[cfg(feature = "collector_client")] -use opentelemetry_http::HttpClient; +use opentelemetry_http::{HttpClient, ResponseExt as _}; use std::sync::atomic::AtomicUsize; /// `CollectorAsyncClientHttp` implements an async version of the @@ -68,7 +68,8 @@ mod collector_client { .expect("request should always be valid"); // Send request to collector - self.client.send(req).await + let _ = self.client.send(req).await?.error_for_status()?; + Ok(()) } } } diff --git a/opentelemetry-jaeger/src/exporter/mod.rs b/opentelemetry-jaeger/src/exporter/mod.rs index 15395ca88b..2b7a58a865 100644 --- a/opentelemetry-jaeger/src/exporter/mod.rs +++ b/opentelemetry-jaeger/src/exporter/mod.rs @@ -672,10 +672,9 @@ mod collector_client_tests { mod test_http_client { use async_trait::async_trait; - use http::Request; - use opentelemetry::sdk::export::trace::ExportResult; - use opentelemetry::trace::TraceError; - use opentelemetry_http::HttpClient; + use bytes::Bytes; + use http::{Request, Response}; + use opentelemetry_http::{HttpClient, HttpError}; use std::fmt::Debug; pub(crate) struct TestHttpClient; @@ -688,8 +687,8 @@ mod collector_client_tests { #[async_trait] impl HttpClient for TestHttpClient { - async fn send(&self, _request: Request>) -> ExportResult { - Err(TraceError::from("wrong uri set in http client")) + async fn send(&self, _request: Request>) -> Result, HttpError> { + Err("wrong uri set in http client".into()) } } } @@ -707,7 +706,7 @@ mod collector_client_tests { }); assert_eq!( format!("{:?}", res.err().unwrap()), - "Other(Custom(\"wrong uri set in http client\"))" + "Other(\"wrong uri set in http client\")" ); Ok(()) diff --git a/opentelemetry-zipkin/Cargo.toml b/opentelemetry-zipkin/Cargo.toml index 0ead66676f..1dbc71165f 100644 --- a/opentelemetry-zipkin/Cargo.toml +++ b/opentelemetry-zipkin/Cargo.toml @@ -40,5 +40,7 @@ surf = { version = "2.0", optional = true } thiserror = { version = "1.0"} [dev-dependencies] +bytes = "1" +futures-util = "0.3" isahc = "=0.9.6" opentelemetry = { version = "0.13", default-features = false, features = ["trace", "testing"], path = "../opentelemetry" } diff --git a/opentelemetry-zipkin/src/exporter/uploader.rs b/opentelemetry-zipkin/src/exporter/uploader.rs index efce27c4b5..cd790e47b0 100644 --- a/opentelemetry-zipkin/src/exporter/uploader.rs +++ b/opentelemetry-zipkin/src/exporter/uploader.rs @@ -3,7 +3,7 @@ use crate::exporter::model::span::Span; use crate::exporter::Error; use http::{header::CONTENT_TYPE, Method, Request, Uri}; use opentelemetry::sdk::export::trace::ExportResult; -use opentelemetry_http::HttpClient; +use opentelemetry_http::{HttpClient, ResponseExt}; use std::fmt::Debug; #[derive(Debug)] @@ -42,6 +42,7 @@ impl JsonV2Client { .header(CONTENT_TYPE, "application/json") .body(serde_json::to_vec(&spans).unwrap_or_default()) .map_err::(Into::into)?; - self.client.send(req).await + let _ = self.client.send(req).await?.error_for_status()?; + Ok(()) } } diff --git a/opentelemetry-zipkin/src/lib.rs b/opentelemetry-zipkin/src/lib.rs index 8b7ca36795..8587ca0c10 100644 --- a/opentelemetry-zipkin/src/lib.rs +++ b/opentelemetry-zipkin/src/lib.rs @@ -91,8 +91,12 @@ //! use opentelemetry::sdk::{trace::{self, IdGenerator, Sampler}, Resource}; //! use opentelemetry::sdk::export::trace::ExportResult; //! use opentelemetry::global; -//! use opentelemetry_http::HttpClient; +//! use opentelemetry_http::{HttpClient, HttpError}; //! use async_trait::async_trait; +//! use bytes::Bytes; +//! use futures_util::io::AsyncReadExt as _; +//! use http::{Request, Response}; +//! use std::convert::TryInto as _; //! use std::error::Error; //! //! // `reqwest` and `surf` are supported through features, if you prefer an @@ -101,17 +105,20 @@ //! #[derive(Debug)] //! struct IsahcClient(isahc::HttpClient); //! +//! async fn body_to_bytes(mut body: isahc::Body) -> Result { +//! let mut bytes = Vec::with_capacity(body.len().unwrap_or(0).try_into()?); +//! let _ = body.read_to_end(&mut bytes).await?; +//! Ok(bytes.into()) +//! } +//! //! #[async_trait] //! impl HttpClient for IsahcClient { -//! async fn send(&self, request: http::Request>) -> ExportResult { -//! let result = self.0.send_async(request).await.map_err(|err| opentelemetry_zipkin::Error::Other(err.to_string()))?; -//! -//! if result.status().is_success() { -//! Ok(()) -//! } else { -//! Err(opentelemetry_zipkin::Error::Other(result.status().to_string()).into()) +//! async fn send(&self, request: Request>) -> Result, HttpError> { +//! let response = self.0.send_async(request).await?; +//! Ok(Response::builder() +//! .status(response.status()) +//! .body(body_to_bytes(response.into_body()).await?)?) //! } -//! } //! } //! //! fn main() -> Result<(), Box> {