diff --git a/Cargo.toml b/Cargo.toml index 1aac35a55..a9cf0e4a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ ntapi = "0.3" [dev-dependencies] env_logger = { version = "0.6.2", default-features = false } rand = "0.4" +socket2 = "0.3.15" [package.metadata.docs.rs] all-features = true diff --git a/ci/azure-test-stable.yml b/ci/azure-test-stable.yml index 06c412a56..1403291d0 100644 --- a/ci/azure-test-stable.yml +++ b/ci/azure-test-stable.yml @@ -36,6 +36,7 @@ jobs: displayName: cargo ${{ parameters.cmd }} --all-features env: CI: "True" + RUST_TEST_THREADS: "1" - ${{ if eq(parameters.cmd, 'test') }}: - script: cargo doc --no-deps diff --git a/src/sys/windows/selector.rs b/src/sys/windows/selector.rs index 792a5c55a..4a383007e 100644 --- a/src/sys/windows/selector.rs +++ b/src/sys/windows/selector.rs @@ -1,11 +1,14 @@ use super::afd::{self, Afd, AfdPollInfo}; use super::io_status_block::IoStatusBlock; use super::Event; -use crate::sys::event::{ - ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS, -}; use crate::sys::Events; -use crate::Interest; + +cfg_net! { + use crate::sys::event::{ + ERROR_FLAGS, READABLE_FLAGS, READ_CLOSED_FLAGS, WRITABLE_FLAGS, WRITE_CLOSED_FLAGS, + }; + use crate::Interest; +} use miow::iocp::{CompletionPort, CompletionStatus}; use std::collections::VecDeque; @@ -226,15 +229,7 @@ impl SockState { // In mio, we have to simulate Edge-triggered behavior to match API usage. // The strategy here is to intercept all read/write from user that could cause WouldBlock usage, // then reregister the socket to reset the interests. - - // Reset readable event - if (afd_events & interests_to_afd_flags(Interest::READABLE)) != 0 { - self.user_evts &= !(interests_to_afd_flags(Interest::READABLE)); - } - // Reset writable event - if (afd_events & interests_to_afd_flags(Interest::WRITABLE)) != 0 { - self.user_evts &= !interests_to_afd_flags(Interest::WRITABLE); - } + self.user_evts &= !afd_events; Some(Event { data: self.user_data, @@ -730,16 +725,18 @@ impl Drop for SelectorInner { } } -fn interests_to_afd_flags(interests: Interest) -> u32 { - let mut flags = 0; +cfg_net! { + fn interests_to_afd_flags(interests: Interest) -> u32 { + let mut flags = 0; - if interests.is_readable() { - flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS; - } + if interests.is_readable() { + flags |= READABLE_FLAGS | READ_CLOSED_FLAGS | ERROR_FLAGS; + } - if interests.is_writable() { - flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS; - } + if interests.is_writable() { + flags |= WRITABLE_FLAGS | WRITE_CLOSED_FLAGS | ERROR_FLAGS; + } - flags -} + flags + } +} \ No newline at end of file diff --git a/tests/tcp_stream.rs b/tests/tcp_stream.rs index b3590890e..b3c314159 100644 --- a/tests/tcp_stream.rs +++ b/tests/tcp_stream.rs @@ -749,3 +749,62 @@ fn start_listener( }); (thread_handle, receiver.recv().unwrap()) } + +#[test] +fn hup() { + use mio::net::TcpListener; + + let (mut poll, mut events) = init_with_poll(); + let addr = "127.0.0.1:0".parse().unwrap(); + + let mut listener = TcpListener::bind(addr).unwrap(); + let addr = listener.local_addr().unwrap(); + poll.registry() + .register(&mut listener, Token(0), Interest::READABLE) + .unwrap(); + + let mut stream = TcpStream::connect(addr).unwrap(); + poll.registry() + .register( + &mut stream, + Token(1), + Interest::READABLE | Interest::WRITABLE, + ) + .unwrap(); + + expect_events( + &mut poll, + &mut events, + vec![ + ExpectEvent::new(Token(0), Interest::READABLE), + ExpectEvent::new(Token(1), Interest::WRITABLE), + ], + ); + + let (sock, _) = listener.accept().unwrap(); + set_linger_zero(&sock); + drop(sock); + + expect_events( + &mut poll, + &mut events, + vec![ExpectEvent::new(Token(1), Interest::READABLE)], + ); +} + +#[cfg(windows)] +fn set_linger_zero(socket: &TcpStream) { + use socket2::Socket; + use std::os::windows::io::{AsRawSocket, FromRawSocket}; + + let s = unsafe { Socket::from_raw_socket(socket.as_raw_socket()) }; + s.set_linger(Some(Duration::from_millis(0))).unwrap(); +} + +#[cfg(unix)] +fn set_linger_zero(socket: &TcpStream) { + use socket2::Socket; + + let s = unsafe { Socket::from_raw_fd(socket.as_raw_fd()) }; + s.set_linger(Some(Duration::from_millis(0))).unwrap(); +}