Skip to content

Commit

Permalink
chore(server): make AddrIncoming stream item an unnameable type
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Nov 10, 2017
1 parent b60d4cd commit 68e0df7
Showing 1 changed file with 65 additions and 3 deletions.
68 changes: 65 additions & 3 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use http;

use tokio_io::{AsyncRead, AsyncWrite};
use tokio::reactor::{Core, Handle, Timeout};
use tokio::net::{TcpListener, TcpStream};
use tokio::net::TcpListener;
pub use tokio_service::{NewService, Service};

use proto;
Expand Down Expand Up @@ -564,14 +564,14 @@ impl AddrIncoming {
}

impl Stream for AddrIncoming {
type Item = TcpStream;
type Item = self::addr_stream::AddrStream;
type Error = ::std::io::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.listener.accept() {
Ok((socket, _addr)) => {
return Ok(Async::Ready(Some(socket)));
return Ok(Async::Ready(Some(self::addr_stream::new(socket))));
},
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
Err(e) => debug!("internal error: {:?}", e),
Expand All @@ -580,6 +580,68 @@ impl Stream for AddrIncoming {
}
}

mod addr_stream {
use std::io::{self, Read, Write};
use bytes::{Buf, BufMut};
use futures::Poll;
use tokio::net::TcpStream;
use tokio_io::{AsyncRead, AsyncWrite};

pub fn new(tcp: TcpStream) -> AddrStream {
AddrStream {
inner: tcp,
}
}

#[derive(Debug)]
pub struct AddrStream {
inner: TcpStream,
}

impl Read for AddrStream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}

impl Write for AddrStream {
#[inline]
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}

#[inline]
fn flush(&mut self ) -> io::Result<()> {
self.inner.flush()
}
}

impl AsyncRead for AddrStream {
#[inline]
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
self.inner.prepare_uninitialized_buffer(buf)
}

#[inline]
fn read_buf<B: BufMut>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.read_buf(buf)
}
}

impl AsyncWrite for AddrStream {
#[inline]
fn shutdown(&mut self) -> Poll<(), io::Error> {
AsyncWrite::shutdown(&mut self.inner)
}

#[inline]
fn write_buf<B: Buf>(&mut self, buf: &mut B) -> Poll<usize, io::Error> {
self.inner.write_buf(buf)
}
}
}

struct NotifyService<S> {
inner: S,
info: Weak<RefCell<Info>>,
Expand Down

0 comments on commit 68e0df7

Please sign in to comment.