Skip to content

Commit

Permalink
feat(ext/http): Add support for trailers w/internal API (HTTP/2 only) (
Browse files Browse the repository at this point in the history
…#19182)

Necessary for #3326. 

Requested in #10214 as well.
  • Loading branch information
mmastrac committed May 19, 2023
1 parent 5b07522 commit 2b92efa
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 2 deletions.
50 changes: 50 additions & 0 deletions cli/tests/unit/serve_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {

const {
upgradeHttpRaw,
addTrailers,
// @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
} = Deno[Deno.internal];

Expand Down Expand Up @@ -2903,6 +2904,45 @@ Deno.test(
},
);

// TODO(mmastrac): This test should eventually use fetch, when we support trailers there.
// This test is ignored because it's flaky and relies on cURL's verbose output.
Deno.test(
{ permissions: { net: true, run: true, read: true }, ignore: true },
async function httpServerTrailers() {
const ac = new AbortController();
const listeningPromise = deferred();

const server = Deno.serve({
handler: () => {
const response = new Response("Hello World", {
headers: {
"trailer": "baz",
"transfer-encoding": "chunked",
"foo": "bar",
},
});
addTrailers(response, [["baz", "why"]]);
return response;
},
port: 4501,
signal: ac.signal,
onListen: onListen(listeningPromise),
onError: createOnErrorCb(ac),
});

// We don't have a great way to access this right now, so just fetch the trailers with cURL
const [_, stderr] = await curlRequestWithStdErr([
"http://localhost:4501/path",
"-v",
"--http2",
"--http2-prior-knowledge",
]);
assertMatch(stderr, /baz: why/);
ac.abort();
await server;
},
);

Deno.test(
{ permissions: { net: true, run: true, read: true } },
async function httpsServeCurlH2C() {
Expand Down Expand Up @@ -2948,3 +2988,13 @@ async function curlRequest(args: string[]) {
assert(success);
return new TextDecoder().decode(stdout);
}

async function curlRequestWithStdErr(args: string[]) {
const { success, stdout, stderr } = await new Deno.Command("curl", {
args,
stdout: "piped",
stderr: "piped",
}).output();
assert(success);
return [new TextDecoder().decode(stdout), new TextDecoder().decode(stderr)];
}
8 changes: 8 additions & 0 deletions ext/http/00_serve.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const {
op_http_set_response_body_text,
op_http_set_response_header,
op_http_set_response_headers,
op_http_set_response_trailers,
op_http_upgrade_raw,
op_http_upgrade_websocket_next,
op_http_wait,
Expand All @@ -75,6 +76,7 @@ const {
"op_http_set_response_body_text",
"op_http_set_response_header",
"op_http_set_response_headers",
"op_http_set_response_trailers",
"op_http_upgrade_raw",
"op_http_upgrade_websocket_next",
"op_http_wait",
Expand Down Expand Up @@ -125,6 +127,11 @@ function upgradeHttpRaw(req, conn) {
throw new TypeError("upgradeHttpRaw may only be used with Deno.serve");
}

function addTrailers(resp, headerList) {
const inner = toInnerResponse(resp);
op_http_set_response_trailers(inner.slabId, headerList);
}

class InnerRequest {
#slabId;
#context;
Expand Down Expand Up @@ -687,6 +694,7 @@ function serve(arg1, arg2) {
return { finished };
}

internals.addTrailers = addTrailers;
internals.upgradeHttpRaw = upgradeHttpRaw;

export { serve, upgradeHttpRaw };
16 changes: 16 additions & 0 deletions ext/http/http_next.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,22 @@ pub fn op_http_set_response_headers(
}
}

#[op]
pub fn op_http_set_response_trailers(
slab_id: SlabId,
trailers: Vec<(ByteString, ByteString)>,
) {
let mut http = slab_get(slab_id);
let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
for (name, value) in trailers {
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = HeaderValue::from_bytes(&value).unwrap();
trailer_map.append(name, value);
}
*http.trailers().borrow_mut() = Some(trailer_map);
}

fn is_request_compressible(headers: &HeaderMap) -> Compression {
let Some(accept_encoding) = headers.get(ACCEPT_ENCODING) else {
return Compression::None;
Expand Down
1 change: 1 addition & 0 deletions ext/http/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ deno_core::extension!(
http_next::op_http_set_response_body_text,
http_next::op_http_set_response_header,
http_next::op_http_set_response_headers,
http_next::op_http_set_response_trailers,
http_next::op_http_track,
http_next::op_http_upgrade_websocket_next,
http_next::op_http_upgrade_raw,
Expand Down
17 changes: 16 additions & 1 deletion ext/http/response_body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ impl std::fmt::Debug for ResponseBytesInner {
/// required by hyper. As the API requires information about request completion (including a success/fail
/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on.
#[derive(Debug, Default)]
pub struct ResponseBytes(ResponseBytesInner, CompletionHandle);
pub struct ResponseBytes(
ResponseBytesInner,
CompletionHandle,
Rc<RefCell<Option<HeaderMap>>>,
);

impl ResponseBytes {
pub fn initialize(&mut self, inner: ResponseBytesInner) {
Expand All @@ -170,6 +174,10 @@ impl ResponseBytes {
self.1.clone()
}

pub fn trailers(&self) -> Rc<RefCell<Option<HeaderMap>>> {
self.2.clone()
}

fn complete(&mut self, success: bool) -> ResponseBytesInner {
if matches!(self.0, ResponseBytesInner::Done) {
return ResponseBytesInner::Done;
Expand Down Expand Up @@ -250,6 +258,9 @@ impl Body for ResponseBytes {
let res = loop {
let res = match &mut self.0 {
ResponseBytesInner::Done | ResponseBytesInner::Empty => {
if let Some(trailers) = self.2.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
unreachable!()
}
ResponseBytesInner::Bytes(..) => {
Expand All @@ -271,13 +282,17 @@ impl Body for ResponseBytes {
};

if matches!(res, ResponseStreamResult::EndOfStream) {
if let Some(trailers) = self.2.borrow_mut().take() {
return std::task::Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
self.complete(true);
}
std::task::Poll::Ready(res.into())
}

fn is_end_stream(&self) -> bool {
matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty)
&& self.2.borrow_mut().is_none()
}

fn size_hint(&self) -> SizeHint {
Expand Down
13 changes: 12 additions & 1 deletion ext/http/slab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use deno_core::error::AnyError;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::upgrade::OnUpgrade;

use slab::Slab;
use std::cell::RefCell;
use std::cell::RefMut;
use std::ptr::NonNull;
use std::rc::Rc;

pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
Expand All @@ -23,6 +25,7 @@ pub struct HttpSlabRecord {
// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
trailers: Rc<RefCell<Option<HeaderMap>>>,
been_dropped: bool,
#[cfg(feature = "__zombie_http_tracking")]
alive: bool,
Expand Down Expand Up @@ -81,11 +84,14 @@ fn slab_insert_raw(
) -> SlabId {
let index = SLAB.with(|slab| {
let mut slab = slab.borrow_mut();
let body = ResponseBytes::default();
let trailers = body.trailers();
slab.insert(HttpSlabRecord {
request_info,
request_parts,
request_body,
response: Some(Response::new(ResponseBytes::default())),
response: Some(Response::new(body)),
trailers,
been_dropped: false,
promise: CompletionHandle::default(),
#[cfg(feature = "__zombie_http_tracking")]
Expand Down Expand Up @@ -182,6 +188,11 @@ impl SlabEntry {
self.self_mut().response.as_mut().unwrap()
}

/// Get a mutable reference to the trailers.
pub fn trailers(&mut self) -> &RefCell<Option<HeaderMap>> {
&self.self_mut().trailers
}

/// Take the response.
pub fn take_response(&mut self) -> Response {
self.self_mut().response.take().unwrap()
Expand Down

0 comments on commit 2b92efa

Please sign in to comment.