diff --git a/src/concurrency/blocking_io.rs b/src/concurrency/blocking_io.rs index 35d7474347..99c1e6c537 100644 --- a/src/concurrency/blocking_io.rs +++ b/src/concurrency/blocking_io.rs @@ -1,12 +1,15 @@ +use std::cell::RefMut; use std::collections::BTreeMap; use std::io; +use std::ops::BitOrAssign; use std::time::Duration; use mio::event::Source; use mio::{Events, Interest, Poll, Token}; -use rustc_data_structures::fx::FxHashMap; -use crate::shims::{FdId, FileDescriptionRef}; +use crate::shims::{ + EpollEvalContextExt, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef, +}; use crate::*; /// Capacity of the event queue which can be polled at a time. @@ -14,18 +17,102 @@ use crate::*; /// this value can be set rather low. const IO_EVENT_CAPACITY: usize = 16; -/// Trait for values that contain a mio [`Source`]. -pub trait WithSource { +/// Trait for file descriptions that contain a mio [`Source`]. +pub trait SourceFileDescription: FileDescription { /// Invoke `f` on the source inside `self`. fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()>; + + /// Get a mutable reference to the readiness of the source. + fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness>; +} + +/// An I/O interest for a blocked thread. +#[derive(Debug)] +pub enum BlockingIoInterest { + /// The blocked thread is interested in [`Interest::READABLE`]. + Read, + /// The blocked thread is interested in [`Interest::WRITABLE`]. + Write, + /// The blocked thread is interested in [`Interest::READABLE`] and + /// [`Interest::WRITABLE`]. + ReadWrite, +} + +/// Struct reflecting the readiness of a source file description. +#[derive(Debug)] +pub struct BlockingIoSourceReadiness { + /// Boolean whether the source is currently readable. + pub readable: bool, + /// Boolean whether the source is currently writable. + pub writable: bool, + /// Boolean whether the read end of the source has been + /// closed. 
+ pub read_closed: bool, + /// Boolean whether the write end of the source has been + /// closed. + pub write_closed: bool, + /// Boolean whether the source currently has an error. + pub error: bool, } -/// An interest receiver defines the action that should be taken when -/// the associated [`Interest`] is fulfilled. -#[derive(Debug, Hash, PartialEq, Clone, Copy, Eq, PartialOrd, Ord)] -pub enum InterestReceiver { - /// The specified thread should be unblocked. - UnblockThread(ThreadId), +impl BlockingIoSourceReadiness { + pub fn empty() -> Self { + Self { + readable: false, + writable: false, + read_closed: false, + write_closed: false, + error: false, + } + } + + /// Check whether the current readiness fulfills the blocking I/O interest of + /// `interest`. + /// This function also returns `true` if the error readiness is set + /// even when the requested interest might not be fulfilled. + /// Usually, a source with the error readiness set won't become readable or writable + /// anymore. So, adding the error interest implicitly prevents that the thread is + /// blocked indefinitely in such cases. 
+ pub fn fulfills_interest(&self, interest: &BlockingIoInterest) -> bool { + match interest { + BlockingIoInterest::Read => self.readable || self.error, + BlockingIoInterest::Write => self.writable || self.error, + BlockingIoInterest::ReadWrite => self.readable || self.writable || self.error, + } + } +} + +impl BitOrAssign for BlockingIoSourceReadiness { + fn bitor_assign(&mut self, rhs: Self) { + self.readable |= rhs.readable; + self.writable |= rhs.writable; + self.read_closed |= rhs.read_closed; + self.write_closed |= rhs.write_closed; + self.error |= rhs.error; + } +} + +impl From<&mio::event::Event> for BlockingIoSourceReadiness { + fn from(event: &mio::event::Event) -> Self { + Self { + readable: event.is_readable(), + writable: event.is_writable(), + read_closed: event.is_read_closed(), + write_closed: event.is_write_closed(), + error: event.is_error(), + } + } +} + +struct BlockingIoSource { + /// The source file description which is registered into the poll. + /// We only store weak references such that source file descriptions + /// can be destroyed whilst they are registered. However, they are required + /// to deregister themselves when [`FileDescription::destroy`] is called. + fd: WeakFileDescriptionRef, + /// The threads which are blocked on the I/O source, and the interest indicating + /// when they should be unblocked. + blocked_threads: BTreeMap, } /// Manager for managing blocking host I/O in a non-blocking manner. @@ -44,10 +131,9 @@ pub struct BlockingIoManager { /// This is not part of the state and only stored to avoid allocating a /// new buffer for every poll. events: Events, - /// Map from source ids to the actual sources and their registered receivers - /// together with their associated interests. - sources: - BTreeMap, FxHashMap)>, + /// Map from source file description ids to the actual sources and their + /// blocked threads. 
+ sources: BTreeMap, } impl BlockingIoManager { @@ -70,133 +156,195 @@ impl BlockingIoManager { /// specified duration. /// - If the timeout is [`None`] the poll blocks indefinitely until an event occurs. /// - /// Returns the interest receivers for all file descriptions which received an I/O event together - /// with the file description they were registered for. - pub fn poll( - &mut self, + /// Unblocks all threads whose interests are currently fulfilled. Note that the events are handled + /// in a level-triggered way. This means that threads whose interests were already fulfilled + /// before the poll will be unblocked again. The unblock callback functions need to manually + /// remove the interest from the poll to prevent multiple unblocks. + pub fn poll_and_unblock<'tcx>( + ecx: &mut MiriInterpCx<'tcx>, timeout: Option, - ) -> Result)>, io::Error> { - let poll = - self.poll.as_mut().expect("Blocking I/O should not be called with isolation enabled"); + ) -> InterpResult<'tcx, Result<(), io::Error>> { + let poll = ecx + .machine + .blocking_io + .poll + .as_mut() + .expect("Blocking I/O should not be called with isolation enabled"); // Poll for new I/O events from OS and store them in the events buffer. - poll.poll(&mut self.events, timeout)?; + if let Err(err) = poll.poll(&mut ecx.machine.blocking_io.events, timeout) { + return interp_ok(Err(err)); + }; - let ready = self + let event_fds = ecx + .machine + .blocking_io .events .iter() - .flat_map(|event| { + .map(|event| { let token = event.token(); // We know all tokens are valid `FdId`. let fd_id = FdId::new_unchecked(token.0); - let (source, interests) = - self.sources.get(&fd_id).expect("Source should be registered"); - assert_eq!(source.id(), fd_id); - // Because we allow spurious wake-ups, we mark all interests as ready even - // though some may not have been fulfilled. 
- interests.keys().map(move |receiver| (*receiver, source.clone())) + let source = ecx + .machine + .blocking_io + .sources + .get(&fd_id) + .expect("Source should be registered"); + let fd = source.fd.upgrade().expect( + "Source file description shouldn't be destroyed whilst being registered", + ); + + assert_eq!(fd.id(), fd_id); + // Update the readiness of the source. + *fd.get_readiness_mut() |= BlockingIoSourceReadiness::from(event); + // Put FD into `event_fds` list. + fd + }) + .collect::>(); + + // Update epoll readiness for all file descriptions which received an event. + for fd in event_fds.into_iter() { + ecx.update_epoll_active_events(fd, false)?; + } + + // List of all thread id's whose interests are currently fulfilled. + // This also includes thread id's whose interests were already + // fulfilled before the `poll` invocation. + let ready_threads = ecx + .machine + .blocking_io + .sources + .values() + .flat_map(|source| { + source + .blocked_threads + .iter() + .filter_map(|(thread_id, interest)| { + source + .fd + .upgrade() + .expect( + "Source file description shouldn't be destroyed whilst being registered", + ) + .get_readiness_mut() + .fulfills_interest(interest) + .then_some(thread_id) + }) + .copied() }) .collect::>(); - // Deregister all ready sources as we only want to receive one event per receiver. - ready.iter().for_each(|(receiver, source)| self.deregister(source.id(), *receiver)); + // Unblock all threads whose interests are currently fulfilled. + for thread_id in ready_threads.into_iter() { + ecx.unblock_thread(thread_id, BlockReason::IO)?; + } - Ok(ready) + interp_ok(Ok(())) } - /// Register an interest for a blocking I/O source. - /// - /// As the OS can always produce spurious wake-ups, it's the callers responsibility to - /// verify the requested I/O interests are really ready and to register again if they're not. - /// - /// It's assumed that no interest is already registered for this source with the same reason! 
- pub fn register( - &mut self, - source_fd: FileDescriptionRef, - receiver: InterestReceiver, - interest: Interest, - ) { + /// Get whether a source file description is currently registered in the + /// blocking I/O poll. + /// This can also be used to check whether a file description is a host + /// I/O source. + pub fn contains_source(&self, source_id: &FdId) -> bool { + self.sources.contains_key(source_id) + } + + /// Register a source file description to the blocking I/O poll. + pub fn register(&mut self, source_fd: FileDescriptionRef) { let poll = self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled"); let id = source_fd.id(); let token = Token(id.to_usize()); - let Some((_, current_interests)) = self.sources.get_mut(&id) else { - // The source is not yet registered. + // All possible interests. + // We only care about the readable and writable interests because those are the only + // interests which are available on all platforms. Internally, mio also + // registers an error interest. + let interest = Interest::READABLE | Interest::WRITABLE; - // Treat errors from registering as fatal. On UNIX hosts this can only - // fail due to system resource errors (e.g. ENOMEM or ENOSPC). - source_fd - .with_source(&mut |source| poll.registry().register(source, token, interest)) - .unwrap(); + // Treat errors from registering as fatal. On UNIX hosts this can only + // fail due to system resource errors (e.g. ENOMEM or ENOSPC) or when the source is already registered. + source_fd + .with_source(&mut |source| poll.registry().register(source, token, interest)) + .unwrap(); - self.sources.insert(id, (source_fd, FxHashMap::from_iter([(receiver, interest)]))); - return; + let source = BlockingIoSource { + fd: FileDescriptionRef::downgrade(&source_fd), + blocked_threads: BTreeMap::default(), }; - // The source is already registered. 
We need to check whether we need to - // reregister because the provided interest contains new interests for the source. - - let old_interest = - interest_union(current_interests).expect("Source should contain at least one interest"); - - current_interests - .try_insert(receiver, interest) - .unwrap_or_else(|_| panic!("Receiver should be unique")); - - let new_interest = old_interest.add(interest); - - // Reregister the source since the overall interests might have changed. - - // Treat errors from reregistering as fatal. On UNIX hosts this can only - // fail due to system resource errors (e.g. ENOMEM or ENOSPC). - source_fd - .with_source(&mut |source| poll.registry().reregister(source, token, new_interest)) - .unwrap(); + self.sources + .try_insert(id, source) + .unwrap_or_else(|_| panic!("Source should not already be registered")); } - /// Deregister an interest from a blocking I/O source. + /// Deregister a source file description from the blocking I/O poll. /// - /// The receiver is assumed to be registered for the provided source! - pub fn deregister(&mut self, source_id: FdId, receiver: InterestReceiver) { + /// It's assumed that the file description with id `source_id` is already + /// removed from the file description table. + pub fn deregister(&mut self, source_id: FdId, source: impl SourceFileDescription) { let poll = self.poll.as_ref().expect("Blocking I/O should not be called with isolation enabled"); - let token = Token(source_id.to_usize()); - let (fd, current_interests) = - self.sources.get_mut(&source_id).expect("Source should be registered"); - - current_interests - .remove(&receiver) - .unwrap_or_else(|| panic!("Receiver should be registered for source")); - - let Some(new_interest) = interest_union(current_interests) else { - // There are no longer any interests in this source. - // We can thus deregister the source from the poll. 
+ // We cannot use the stored `fd` because it's only a `WeakFileDescriptionRef` and the + // file description should already be removed from the file description table and thus + // we can no longer upgrade it. + let stored_source = self.sources.remove(&source_id).expect("Source should be registered"); + // Ensure that the source file description is already removed from the file + // description table. + assert!( + stored_source.fd.upgrade().is_none(), + "Sources must only be deregistered when they are destroyed" + ); - // Treat errors from deregistering as fatal. On UNIX hosts this can only - // fail due to system resource errors (e.g. ENOMEM or ENOSPC). - fd.with_source(&mut |source| poll.registry().deregister(source)).unwrap(); - self.sources.remove(&source_id); - return; - }; + // Treat errors from deregistering as fatal. On UNIX hosts this can only + // fail due to system resource errors (e.g. ENOMEM or ENOSPC). + source.with_source(&mut |source| poll.registry().deregister(source)).unwrap(); + } - // Reregister the source since the overall interests might have changed. + /// Add a new blocked thread to a registered source. The thread gets unblocked + /// once its [`BlockingIoInterest`] is fulfilled when calling + /// [`BlockingIoManager::poll_and_unblock`]. + /// + /// Note that an error interest is implicitly added to `interest`. + /// This means that the thread will also be unblocked when the error + /// readiness gets set for the source even when the requested interest + /// might not be fulfilled. More about that can be read in the + /// [`BlockingIoSourceReadiness::fulfills_interest`] doc comment. + /// + /// Also, as the OS can always produce spurious wake-ups, it's the callers responsibility + /// to verify the requested I/O interests are really fulfilled when an event for this + /// thread is returned from [`BlockingIoManager::poll_and_unblock`]. 
+ /// + /// It's assumed that the thread of `thread_id` isn't already blocked on + /// the source with id `source_id` and that this source is currently + /// registered. + pub fn add_blocked_thread( + &mut self, + source_id: FdId, + thread_id: ThreadId, + interest: BlockingIoInterest, + ) { + let source = self.sources.get_mut(&source_id).expect("Source should be registered"); - // Treat errors from reregistering as fatal. On UNIX hosts this can only - // fail due to system resource errors (e.g. ENOMEM or ENOSPC). - fd.with_source(&mut |source| poll.registry().reregister(source, token, new_interest)) - .unwrap(); + source + .blocked_threads + .try_insert(thread_id, interest) + .expect("Thread cannot be blocked multiple times on the same source"); } -} -/// Get the union of all interests for a source. Returns `None` if the map is empty. -fn interest_union(interests: &FxHashMap) -> Option { - interests - .values() - .copied() - .fold(None, |acc, interest| acc.map(|acc: Interest| acc.add(interest)).or(Some(interest))) + /// Remove a blocked thread from a registered source. + /// + /// It's assumed that the thread of `thread_id` is blocked on the + /// source with id `source_id` and that this source is currently + /// registered. + pub fn remove_blocked_thread(&mut self, source_id: FdId, thread_id: ThreadId) { + let source = self.sources.get_mut(&source_id).expect("Source should be registered"); + source.blocked_threads.remove(&thread_id).expect("Thread should be blocked on source"); + } } impl<'tcx> EvalContextExt<'tcx> for MiriInterpCx<'tcx> {} @@ -205,23 +353,28 @@ pub trait EvalContextExt<'tcx>: MiriInterpCxExt<'tcx> { /// are fulfilled or the optional timeout exceeded. /// The callback will be invoked when the thread gets unblocked. /// - /// There can be spurious wake-ups by the OS and thus it's the callers + /// Note that an error interest is implicitly added to `interest`. 
+ /// This means that the thread will also be unblocked when the error + /// readiness gets set for the source even when the requested interest + /// might not be fulfilled. More about that can be read in the + /// [`BlockingIoSourceReadiness::fulfills_interest`] doc comment. + /// + /// There can also be spurious wake-ups by the OS and thus it's the callers /// responsibility to verify that the requested I/O interests are /// really ready and to block again if they're not. + /// + /// It's the callers responsibility to remove the [`BlockingIoInterest`] + /// from the blocking I/O manager in the callback function. #[inline] fn block_thread_for_io( &mut self, - source_fd: FileDescriptionRef, - interests: Interest, + source_id: FdId, + interest: BlockingIoInterest, timeout: Option<(TimeoutClock, TimeoutAnchor, Duration)>, callback: DynUnblockCallback<'tcx>, ) { let this = self.eval_context_mut(); - this.machine.blocking_io.register( - source_fd, - InterestReceiver::UnblockThread(this.active_thread()), - interests, - ); + this.machine.blocking_io.add_blocked_thread(source_id, this.active_thread(), interest); this.block_thread(BlockReason::IO, timeout, callback); } } diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index 3b0b2b26c1..1bcbbbf8bf 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -18,8 +18,7 @@ use rustc_span::{DUMMY_SP, Span}; use rustc_target::spec::Os; use crate::concurrency::GlobalDataRaceHandler; -use crate::concurrency::blocking_io::InterestReceiver; -use crate::shims::tls; +use crate::shims::{Epoll, EpollEvalContextExt, FileDescriptionRef, tls}; use crate::*; #[derive(Clone, Copy, Debug, PartialEq)] @@ -108,7 +107,7 @@ pub enum BlockReason { /// Blocked on an InitOnce. InitOnce, /// Blocked on epoll. - Epoll, + Epoll { epfd: FileDescriptionRef }, /// Blocked on eventfd. Eventfd, /// Blocked on virtual socket. 
@@ -779,7 +778,9 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { .filter(|(_id, thread)| thread.state.is_enabled()); // Pick a new thread, and switch to it. let new_thread = if thread_manager.fixed_scheduling { - threads_iter.next() + let next = threads_iter.next(); + drop(threads_iter); + next } else { threads_iter.choose(rng) }; @@ -800,48 +801,56 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { if thread_manager.threads[thread_manager.active_thread].state.is_enabled() { return interp_ok(SchedulingAction::ExecuteStep); } + // We have not found a thread to execute. - if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) { + let threads = &this.machine.threads.threads; + + if threads.iter().all(|thread| thread.state.is_terminated()) { unreachable!("all threads terminated without the main thread terminating?!"); } else if let Some(sleep_time) = potential_sleep_time { // All threads are currently blocked, but we have unexecuted // timeout_callbacks, which may unblock some of the threads. Hence, // sleep until the first callback. interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time))) - } else if thread_manager - .threads - .iter() - .any(|thread| thread.state.is_blocked_on(&BlockReason::IO)) - { - // At least one thread is blocked on host I/O but doesn't - // have a timeout set. Hence, we sleep indefinitely in the - // hope that eventually an I/O event for this thread happens. + } else if threads.iter().any(|thread| this.is_thread_blocked_on_host(thread)) { + // At least one thread doesn't have a timeout set, and is blocked on host I/O or is waiting on an + // epoll instance which contains a host source interest. Hence, we sleep indefinitely in the + // hope that eventually an I/O event happens. interp_ok(SchedulingAction::SleepAndWaitForIo(None)) } else { throw_machine_stop!(TerminationInfo::GlobalDeadlock); } } + /// Check whether the provided thread is currently blocked on host I/O. 
+ /// This means, it's either blocked on an I/O operation directly or it's + /// blocked on an epoll instance which contains a host source interest. + fn is_thread_blocked_on_host(&self, thread: &Thread<'tcx>) -> bool { + let this = self.eval_context_ref(); + match &thread.state { + ThreadState::Blocked { reason: BlockReason::IO, .. } => true, + ThreadState::Blocked { reason: BlockReason::Epoll { epfd }, .. } => + this.has_epoll_host_interests(epfd), + _ => false, + } + } + /// Poll for I/O events until either an I/O event happened or the timeout expired. /// The different timeout values are described in [`BlockingIoManager::poll`]. + /// + /// Unblocks all threads which are blocked on I/O and whose I/O interests + /// are currently fulfilled. fn poll_and_unblock(&mut self, timeout: Option) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let ready = match this.machine.blocking_io.poll(timeout) { - Ok(ready) => ready, + match BlockingIoManager::poll_and_unblock(this, timeout)? { + Ok(_) => interp_ok(()), // We can ignore errors originating from interrupts; that's just a spurious wakeup. - Err(e) if e.kind() == io::ErrorKind::Interrupted => return interp_ok(()), + Err(e) if e.kind() == io::ErrorKind::Interrupted => interp_ok(()), // For other errors we panic. On Linux and BSD hosts this should only be // reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred. Err(e) => panic!("unexpected error while polling: {e}"), - }; - - ready.into_iter().try_for_each(|(receiver, _source)| { - match receiver { - InterestReceiver::UnblockThread(thread_id) => - this.unblock_thread(thread_id, BlockReason::IO), - } - }) + } } /// Find all threads with expired timeouts, unblock them and execute their timeout callbacks. 
diff --git a/src/lib.rs b/src/lib.rs index 86816e5f78..944ff933ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -134,7 +134,10 @@ pub use crate::borrow_tracker::{ BorTag, BorrowTrackerMethod, EvalContextExt as _, TreeBorrowsParams, }; pub use crate::clock::{Instant, MonotonicClock}; -pub use crate::concurrency::blocking_io::{BlockingIoManager, EvalContextExt as _, WithSource}; +pub use crate::concurrency::blocking_io::{ + BlockingIoInterest, BlockingIoManager, BlockingIoSourceReadiness, EvalContextExt as _, + SourceFileDescription, +}; pub use crate::concurrency::cpu_affinity::MAX_CPUS; pub use crate::concurrency::data_race::{ AtomicFenceOrd, AtomicReadOrd, AtomicRwOrd, AtomicWriteOrd, EvalContextExt as _, diff --git a/src/shims/files.rs b/src/shims/files.rs index 04b84e6f3e..26f98a5f2b 100644 --- a/src/shims/files.rs +++ b/src/shims/files.rs @@ -61,6 +61,14 @@ impl FileDescriptionRef { } } +impl PartialEq for FileDescriptionRef { + fn eq(&self, other: &Self) -> bool { + self.0.id == other.0.id + } +} + +impl Eq for FileDescriptionRef {} + /// Holds a weak reference to the actual file description. #[derive(Debug)] pub struct WeakFileDescriptionRef(Weak>); diff --git a/src/shims/mod.rs b/src/shims/mod.rs index e15134fa01..551f7b2eff 100644 --- a/src/shims/mod.rs +++ b/src/shims/mod.rs @@ -23,10 +23,10 @@ pub mod time; pub mod tls; pub mod unwind; -pub use self::files::{FdId, FdTable, FileDescriptionRef}; +pub use self::files::{FdId, FdTable, FileDescription, FileDescriptionRef, WeakFileDescriptionRef}; #[cfg(all(feature = "native-lib", unix))] pub use self::native_lib::trace::{init_sv, register_retcode_sv}; -pub use self::unix::{DirTable, EpollInterestTable}; +pub use self::unix::{DirTable, Epoll, EpollEvalContextExt, EpollInterestTable}; /// What needs to be done after emulating an item (a shim or an intrinsic) is done. 
pub enum EmulateItemResult { diff --git a/src/shims/unix/fd.rs b/src/shims/unix/fd.rs index 5e5f7d6bc3..15b13d3ba5 100644 --- a/src/shims/unix/fd.rs +++ b/src/shims/unix/fd.rs @@ -10,7 +10,7 @@ use rustc_target::spec::Os; use crate::shims::files::FileDescription; use crate::shims::sig::check_min_vararg_count; -use crate::shims::unix::linux_like::epoll::EpollEvents; +use crate::shims::unix::linux_like::epoll::EpollReadiness; use crate::shims::unix::*; use crate::*; @@ -77,7 +77,7 @@ pub trait UnixFileDescription: FileDescription { } /// Return which epoll events are currently active. - fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> { + fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> { throw_unsup_format!("{}: epoll does not support this file description", self.name()); } } diff --git a/src/shims/unix/linux/foreign_items.rs b/src/shims/unix/linux/foreign_items.rs index f679f27aed..502e4e9307 100644 --- a/src/shims/unix/linux/foreign_items.rs +++ b/src/shims/unix/linux/foreign_items.rs @@ -4,7 +4,6 @@ use rustc_span::Symbol; use rustc_target::callconv::FnAbi; use self::shims::unix::linux::mem::EvalContextExt as _; -use self::shims::unix::linux_like::epoll::EvalContextExt as _; use self::shims::unix::linux_like::eventfd::EvalContextExt as _; use self::shims::unix::linux_like::syscall::syscall; use crate::machine::{SIGRTMAX, SIGRTMIN}; diff --git a/src/shims/unix/linux_like/epoll.rs b/src/shims/unix/linux_like/epoll.rs index bd07e13d47..1758c04c89 100644 --- a/src/shims/unix/linux_like/epoll.rs +++ b/src/shims/unix/linux_like/epoll.rs @@ -17,7 +17,7 @@ type EpollEventKey = (FdId, FdNum); /// An `Epoll` file descriptor connects file handles and epoll events #[derive(Debug, Default)] -struct Epoll { +pub struct Epoll { /// A map of EpollEventInterests registered under this epoll instance. Each entry is /// differentiated using FdId and file descriptor value. 
interest_list: RefCell>, @@ -55,9 +55,9 @@ pub struct EpollEventInterest { data: u64, } -/// EpollReadyEvents reflects the readiness of a file description. +/// Struct reflecting the readiness of a file description. #[derive(Debug)] -pub struct EpollEvents { +pub struct EpollReadiness { /// The associated file is available for read(2) operations, in the sense that a read will not block. /// (I.e., returning EOF is considered "ready".) pub epollin: bool, @@ -76,9 +76,9 @@ pub struct EpollEvents { pub epollerr: bool, } -impl EpollEvents { - pub fn new() -> Self { - EpollEvents { +impl EpollReadiness { + pub fn empty() -> Self { + EpollReadiness { epollin: false, epollout: false, epollrdhup: false, @@ -114,6 +114,19 @@ impl EpollEvents { } } +// Best-effort mapping from cross platform readiness to epoll readiness. +impl From<&BlockingIoSourceReadiness> for EpollReadiness { + fn from(readiness: &BlockingIoSourceReadiness) -> Self { + Self { + epollin: readiness.readable, + epollout: readiness.writable, + epollrdhup: readiness.read_closed, + epollhup: readiness.write_closed, + epollerr: readiness.error, + } + } +} + impl FileDescription for Epoll { fn name(&self) -> &'static str { "epoll" @@ -354,11 +367,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { interest.data = data; } + let active_events = fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this); + // Deliver events for the new interest. update_readiness( this, &epfd, - fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this), + active_events, /* force_edge */ true, move |callback| { // Need to release the RefCell when this closure returns, so we have to move @@ -479,7 +494,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // This means there'll be a leak if we never wake up, but that anyway would imply // a thread is permanently blocked so this is fine. 
this.block_thread( - BlockReason::Epoll, + BlockReason::Epoll { epfd: epfd.clone() }, timeout, callback!( @capture<'tcx> { @@ -547,6 +562,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { interp_ok(()) } + + /// Recursively check whether the [`Epoll`] file description contains + /// interests which are host I/O source file descriptions. + fn has_epoll_host_interests(&self, epfd: &FileDescriptionRef) -> bool { + let this = self.eval_context_ref(); + epfd.interest_list.borrow().iter().any(|((fd_id, _fd_num), _)| { + // By looking up whether the file description is currently registered, + // we get whether it's a host I/O source file description. + this.machine.blocking_io.contains_source(fd_id) + }) + } } /// Call this when the interests denoted by `for_each_interest` have their active event set changed @@ -557,7 +583,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// be waking up threads which might require access to those `RefCell`. fn update_readiness<'tcx>( ecx: &mut MiriInterpCx<'tcx>, - epoll: &Epoll, + epoll: &FileDescriptionRef, active_events: u32, force_edge: bool, for_each_interest: impl FnOnce( @@ -587,7 +613,7 @@ fn update_readiness<'tcx>( && let Some(thread_id) = epoll.queue.borrow_mut().pop_front() { drop(ready_set); // release the "lock" so the unblocked thread can have it - ecx.unblock_thread(thread_id, BlockReason::Epoll)?; + ecx.unblock_thread(thread_id, BlockReason::Epoll { epfd: epoll.clone() })?; ready_set = epoll.ready_set.borrow_mut(); } @@ -612,7 +638,7 @@ fn return_ready_list<'tcx>( for (key, interest) in interest_list.iter() { // Ensure this matches the latest readiness of this FD. // We have to do an FD lookup by ID for this. The FdNum might be already closed. 
- let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap(); + let fd = ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap(); let current_active = fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx); assert_eq!(interest.active_events, current_active & interest.relevant_events); } diff --git a/src/shims/unix/linux_like/eventfd.rs b/src/shims/unix/linux_like/eventfd.rs index 7cccbd0e27..76374ca24d 100644 --- a/src/shims/unix/linux_like/eventfd.rs +++ b/src/shims/unix/linux_like/eventfd.rs @@ -6,7 +6,7 @@ use std::io::ErrorKind; use crate::concurrency::VClock; use crate::shims::files::{FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef}; use crate::shims::unix::UnixFileDescription; -use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _}; +use crate::shims::unix::linux_like::epoll::{EpollReadiness, EvalContextExt as _}; use crate::*; /// Maximum value that the eventfd counter can hold. @@ -114,14 +114,14 @@ impl FileDescription for EventFd { } impl UnixFileDescription for EventFd { - fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> { + fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> { // We only check the status of EPOLLIN and EPOLLOUT flags for eventfd. If other event flags // need to be supported in the future, the check should be added here. 
- interp_ok(EpollEvents { + interp_ok(EpollReadiness { epollin: self.counter.get() != 0, epollout: self.counter.get() != MAX_COUNTER, - ..EpollEvents::new() + ..EpollReadiness::empty() }) } } diff --git a/src/shims/unix/mod.rs b/src/shims/unix/mod.rs index c55a28bfa7..730c46eb27 100644 --- a/src/shims/unix/mod.rs +++ b/src/shims/unix/mod.rs @@ -20,7 +20,9 @@ mod solarish; pub use self::env::{EvalContextExt as _, UnixEnvVars}; pub use self::fd::{EvalContextExt as _, UnixFileDescription}; pub use self::fs::{DirTable, EvalContextExt as _}; -pub use self::linux_like::epoll::EpollInterestTable; +pub use self::linux_like::epoll::{ + Epoll, EpollInterestTable, EvalContextExt as EpollEvalContextExt, +}; pub use self::mem::EvalContextExt as _; pub use self::socket::EvalContextExt as _; pub use self::sync::EvalContextExt as _; diff --git a/src/shims/unix/socket.rs b/src/shims/unix/socket.rs index 99a6378704..936710b941 100644 --- a/src/shims/unix/socket.rs +++ b/src/shims/unix/socket.rs @@ -1,10 +1,9 @@ -use std::cell::{Cell, RefCell}; +use std::cell::{Cell, RefCell, RefMut}; use std::io::Read; use std::net::{Ipv4Addr, Ipv6Addr, Shutdown, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::time::Duration; use std::{io, iter}; -use mio::Interest; use mio::event::Source; use mio::net::{TcpListener, TcpStream}; use rustc_abi::Size; @@ -12,9 +11,9 @@ use rustc_const_eval::interpret::{InterpResult, interp_ok}; use rustc_middle::throw_unsup_format; use rustc_target::spec::Os; -use crate::concurrency::blocking_io::InterestReceiver; use crate::shims::files::{EvalContextExt as _, FdId, FileDescription, FileDescriptionRef}; use crate::shims::unix::UnixFileDescription; +use crate::shims::unix::linux_like::epoll::{EpollReadiness, EvalContextExt as _}; use crate::*; #[derive(Debug, PartialEq)] @@ -56,6 +55,8 @@ struct Socket { state: RefCell, /// Whether this fd is non-blocking or not. is_non_block: Cell, + /// The current blocking I/O readiness of the file description. 
+ io_readiness: RefCell, } impl FileDescription for Socket { @@ -65,12 +66,21 @@ impl FileDescription for Socket { fn destroy<'tcx>( self, - _self_id: FdId, + self_id: FdId, communicate_allowed: bool, - _ecx: &mut MiriInterpCx<'tcx>, - ) -> InterpResult<'tcx, std::io::Result<()>> { + ecx: &mut MiriInterpCx<'tcx>, + ) -> InterpResult<'tcx, io::Result<()>> { assert!(communicate_allowed, "cannot have `Socket` with isolation enabled!"); + if matches!( + &*self.state.borrow(), + SocketState::Listening(_) | SocketState::Connecting(_) | SocketState::Connected(_) + ) { + // There exists an associated host socket so we need to deregister it + // from the blocking I/O manager. + ecx.machine.blocking_io.deregister(self_id, self) + }; + interp_ok(Ok(())) } @@ -167,12 +177,10 @@ impl FileDescription for Socket { } fn short_fd_operations(&self) -> bool { - // Linux de-facto guarantees (or at least, applications like tokio assume [1, 2]) that - // when a read/write on a streaming socket comes back short, the kernel buffer is - // empty/full. SO we can't do short reads/writes here. - // - // [1]: https://github.com/tokio-rs/tokio/blob/6c03e03898d71eca976ee1ad8481cf112ae722ba/tokio/src/io/poll_evented.rs#L182 - // [2]: https://github.com/tokio-rs/tokio/blob/6c03e03898d71eca976ee1ad8481cf112ae722ba/tokio/src/io/poll_evented.rs#L240 + // Linux guarantees that when a read/write on a streaming socket comes back short, + // the kernel buffer is empty/full: + // See in Q&A section. + // So we can't do short reads/writes here. 
false } @@ -251,6 +259,10 @@ impl UnixFileDescription for Socket { throw_unsup_format!("ioctl: unsupported operation {op:#x} on socket"); } + + fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> { + interp_ok(EpollReadiness::from(&*self.io_readiness.borrow())) + } } impl<'tcx> EvalContextExt<'tcx> for crate::MiriInterpCx<'tcx> {} @@ -328,6 +340,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { family, state: RefCell::new(SocketState::Initial), is_non_block: Cell::new(is_sock_nonblock), + io_readiness: RefCell::new(BlockingIoSourceReadiness::empty()), }); interp_ok(Scalar::from_i32(fds.insert(fd))) @@ -425,7 +438,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { match *state { SocketState::Bound(socket_addr) => match TcpListener::bind(socket_addr) { - Ok(listener) => *state = SocketState::Listening(listener), + Ok(listener) => { + *state = SocketState::Listening(listener); + drop(state); + // Register the socket to the blocking I/O manager because + // we now have an associated host socket. + this.machine.blocking_io.register(socket); + } Err(e) => return this.set_last_error_and_return_i32(e), }, SocketState::Initial => { @@ -591,7 +610,12 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // don't return an error after receiving an [`Interest::WRITEABLE`] // event on the stream. match TcpStream::connect(address) { - Ok(stream) => *socket.state.borrow_mut() = SocketState::Connecting(stream), + Ok(stream) => { + *socket.state.borrow_mut() = SocketState::Connecting(stream); + // Register the socket to the blocking I/O manager because + // we now have an associated host socket. 
+ this.machine.blocking_io.register(socket.clone()); + } Err(e) => return this.set_last_error_and_return(e, dest), }; @@ -1383,8 +1407,8 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { ) { let this = self.eval_context_mut(); this.block_thread_for_io( - socket.clone(), - Interest::READABLE, + socket.id(), + BlockingIoInterest::Read, None, callback!(@capture<'tcx> { address_ptr: Pointer, @@ -1395,6 +1419,9 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { } |this, kind: UnblockKind| { assert_eq!(kind, UnblockKind::Ready); + // Remove the blocking I/O interest for unblocking this thread. + this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread()); + match this.try_non_block_accept(&socket, address_ptr, address_len_ptr, is_client_sock_nonblock)? { Ok(sockfd) => { // We need to create the scalar using the destination size since @@ -1437,6 +1464,13 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { let (stream, addr) = match listener.accept() { Ok(peer) => peer, + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + // We know that the source is not readable so we need to update its readiness. + socket.io_readiness.borrow_mut().readable = false; + this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?; + + return interp_ok(Err(IoError::HostError(e))); + } Err(e) => return interp_ok(Err(IoError::HostError(e))), }; @@ -1460,7 +1494,11 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { family, state: RefCell::new(SocketState::Connected(stream)), is_non_block: Cell::new(is_client_sock_nonblock), + io_readiness: RefCell::new(BlockingIoSourceReadiness::empty()), }); + // Register the socket to the blocking I/O manager because + // there is an associated host socket. 
+ this.machine.blocking_io.register(fd.clone()); let sockfd = this.machine.fds.insert(fd); interp_ok(Ok(sockfd)) } @@ -1481,8 +1519,8 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { ) { let this = self.eval_context_mut(); this.block_thread_for_io( - socket.clone(), - Interest::WRITABLE, + socket.id(), + BlockingIoInterest::Write, None, callback!(@capture<'tcx> { socket: FileDescriptionRef, buffer_ptr: Pointer, length: u64, } |this, kind: UnblockKind| { assert_eq!(kind, UnblockKind::Ready); + // Remove the blocking I/O interest for unblocking this thread. + this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread()); + match this.try_non_block_send(&socket, buffer_ptr, length)? { Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::WouldBlock => { + // We need to block the thread again as it would still block. this.block_for_send(socket, buffer_ptr, length, finish); interp_ok(()) }, @@ -1524,7 +1566,13 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // FIXME: When the host does a short write, we should emit an epoll edge -- at least for targets for which tokio assumes no short writes: // match result { - Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::NotConnected => { + Err(IoError::HostError(e)) + if matches!(e.kind(), io::ErrorKind::NotConnected | io::ErrorKind::WouldBlock) => + { + // We know that the source is not writable so we need to update its readiness. + socket.io_readiness.borrow_mut().writable = false; + this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?; + // On Windows hosts, `send` can return WSAENOTCONN where EAGAIN or EWOULDBLOCK // would be returned on UNIX-like systems. We thus remap this error to an EWOULDBLOCK. 
interp_ok(Err(IoError::HostError(io::ErrorKind::WouldBlock.into()))) @@ -1559,8 +1607,8 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { ) { let this = self.eval_context_mut(); this.block_thread_for_io( - socket.clone(), - Interest::READABLE, + socket.id(), + BlockingIoInterest::Read, None, callback!(@capture<'tcx> { socket: FileDescriptionRef, buffer_ptr: Pointer, length: u64, should_peek: bool, } |this, kind: UnblockKind| { assert_eq!(kind, UnblockKind::Ready); + // Remove the blocking I/O interest for unblocking this thread. + this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread()); + match this.try_non_block_recv(&socket, buffer_ptr, length, should_peek)? { Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::WouldBlock => { // We need to block the thread again as it would still block. @@ -1611,7 +1662,13 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // FIXME: When the host does a short read, we should emit an epoll edge -- at least for targets for which tokio assumes no short reads: // match result { - Err(IoError::HostError(e)) if e.kind() == io::ErrorKind::NotConnected => { + Err(IoError::HostError(e)) + if matches!(e.kind(), io::ErrorKind::NotConnected | io::ErrorKind::WouldBlock) => + { + // We know that the source is not readable so we need to update its readiness. + socket.io_readiness.borrow_mut().readable = false; + this.update_epoll_active_events(socket.clone(), /* force_edge */ false)?; + // On Windows hosts, `recv` can return WSAENOTCONN where EAGAIN or EWOULDBLOCK // would be returned on UNIX-like systems. We thus remap this error to an EWOULDBLOCK. 
interp_ok(Err(IoError::HostError(io::ErrorKind::WouldBlock.into()))) @@ -1665,8 +1722,8 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { }; this.block_thread_for_io( - socket.clone(), - Interest::WRITABLE, + socket.id(), + BlockingIoInterest::Write, timeout, callback!( @capture<'tcx> { @@ -1675,11 +1732,13 @@ trait EvalContextPrivExt<'tcx>: crate::MiriInterpCxExt<'tcx> { foreign_name: &'static str, action: DynMachineCallback<'tcx, Result<(), ()>>, } |this, kind: UnblockKind| { + // Remove the blocking I/O interest for unblocking this thread. + this.machine.blocking_io.remove_blocked_thread(socket.id(), this.machine.threads.active_thread()); + if UnblockKind::TimedOut == kind { // We can only time out when `should_wait` is false. // This then means that the socket is not yet connected. assert!(!should_wait); - this.machine.blocking_io.deregister(socket.id(), InterestReceiver::UnblockThread(this.active_thread())); return action.call(this, Err(())) } @@ -1764,7 +1823,7 @@ impl VisitProvenance for FileDescriptionRef { fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {} } -impl WithSource for Socket { +impl SourceFileDescription for Socket { fn with_source(&self, f: &mut dyn FnMut(&mut dyn Source) -> io::Result<()>) -> io::Result<()> { let mut state = self.state.borrow_mut(); match &mut *state { @@ -1774,4 +1833,8 @@ impl WithSource for Socket { _ => unreachable!(), } } + + fn get_readiness_mut(&self) -> RefMut<'_, BlockingIoSourceReadiness> { + self.io_readiness.borrow_mut() + } } diff --git a/src/shims/unix/virtual_socket.rs b/src/shims/unix/virtual_socket.rs index 51bd30840f..7fc8556406 100644 --- a/src/shims/unix/virtual_socket.rs +++ b/src/shims/unix/virtual_socket.rs @@ -13,7 +13,7 @@ use crate::shims::files::{ EvalContextExt as _, FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef, }; use crate::shims::unix::UnixFileDescription; -use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _}; +use 
crate::shims::unix::linux_like::epoll::{EpollReadiness, EvalContextExt as _}; use crate::*; /// The maximum capacity of the socketpair buffer in bytes. @@ -136,12 +136,10 @@ impl FileDescription for VirtualSocket { } fn short_fd_operations(&self) -> bool { - // Linux de-facto guarantees (or at least, applications like tokio assume [1, 2]) that - // when a read/write on a streaming socket comes back short, the kernel buffer is - // empty/full. SO we can't do short reads/writes here. - // - // [1]: https://github.com/tokio-rs/tokio/blob/6c03e03898d71eca976ee1ad8481cf112ae722ba/tokio/src/io/poll_evented.rs#L182 - // [2]: https://github.com/tokio-rs/tokio/blob/6c03e03898d71eca976ee1ad8481cf112ae722ba/tokio/src/io/poll_evented.rs#L240 + // Linux guarantees that when a read/write on a streaming socket comes back short, + // the kernel buffer is empty/full: + // See in Q&A section. + // So we can't do short reads/writes here. false } @@ -392,20 +390,20 @@ fn virtual_socket_read<'tcx>( } impl UnixFileDescription for VirtualSocket { - fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> { + fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> { // We only check the status of EPOLLIN, EPOLLOUT, EPOLLHUP and EPOLLRDHUP flags. // If other event flags need to be supported in the future, the check should be added here. - let mut epoll_ready_events = EpollEvents::new(); + let mut epoll_readiness = EpollReadiness::empty(); // Check if it is readable. if let Some(readbuf) = &self.readbuf { if !readbuf.borrow().buf.is_empty() { - epoll_ready_events.epollin = true; + epoll_readiness.epollin = true; } } else { // Without a read buffer, reading never blocks, so we are always ready. - epoll_ready_events.epollin = true; + epoll_readiness.epollin = true; } // Check if is writable. 
@@ -414,28 +412,28 @@ impl UnixFileDescription for VirtualSocket { let data_size = writebuf.borrow().buf.len(); let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size); if available_space != 0 { - epoll_ready_events.epollout = true; + epoll_readiness.epollout = true; } } else { // Without a write buffer, writing never blocks. - epoll_ready_events.epollout = true; + epoll_readiness.epollout = true; } } else { // Peer FD has been closed. This always sets both the RDHUP and HUP flags // as we do not support `shutdown` that could be used to partially close the stream. - epoll_ready_events.epollrdhup = true; - epoll_ready_events.epollhup = true; + epoll_readiness.epollrdhup = true; + epoll_readiness.epollhup = true; // Since the peer is closed, even if no data is available reads will return EOF and // writes will return EPIPE. In other words, they won't block, so we mark this as ready // for read and write. - epoll_ready_events.epollin = true; - epoll_ready_events.epollout = true; + epoll_readiness.epollin = true; + epoll_readiness.epollout = true; // If there is data lost in peer_fd, set EPOLLERR. 
if self.peer_lost_data.get() { - epoll_ready_events.epollerr = true; + epoll_readiness.epollerr = true; } } - interp_ok(epoll_ready_events) + interp_ok(epoll_readiness) } } diff --git a/tests/pass-dep/libc/libc-socket-no-blocking-epoll.rs b/tests/pass-dep/libc/libc-socket-no-blocking-epoll.rs new file mode 100644 index 0000000000..81259c0b83 --- /dev/null +++ b/tests/pass-dep/libc/libc-socket-no-blocking-epoll.rs @@ -0,0 +1,299 @@ +//@only-target: linux android illumos +//@compile-flags: -Zmiri-disable-isolation +//@revisions: windows_host unix_host +//@[unix_host] ignore-host: windows +//@[windows_host] only-host: windows + +#![feature(io_error_inprogress)] + +#[path = "../../utils/libc.rs"] +mod libc_utils; + +use std::io::ErrorKind; +use std::thread; +use std::time::Duration; + +use libc_utils::epoll::*; +use libc_utils::*; + +const TEST_BYTES: &[u8] = b"these are some test bytes!"; + +fn main() { + test_connect_nonblock(); + test_accept_nonblock(); + test_recv_nonblock(); + #[cfg(not(windows_hosts))] + test_send_nonblock(); +} + +/// Test that connecting to a server socket works when the client +/// socket is non-blocking before the `connect` call. +/// Instead of busy waiting until we no longer get ENOTCONN, we register +/// the client socket to epoll and wait for a WRITABLE event. +fn test_connect_nonblock() { + let (server_sockfd, addr) = net::make_listener_ipv4().unwrap(); + let client_sockfd = + unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() }; + let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap(); + + unsafe { + // Change client socket to be non-blocking. + errno_check(libc::fcntl(client_sockfd, libc::F_SETFL, libc::O_NONBLOCK)); + } + + // Spawn the server thread. + let server_thread = thread::spawn(move || { + // Yield to client thread to ensure that it actually needs + // to wait until the connection gets accepted. 
+ thread::sleep(Duration::from_millis(10)); + + net::accept_ipv4(server_sockfd).unwrap(); + }); + + // Non-blocking connects always "fail" with EINPROGRESS. + let err = net::connect_ipv4(client_sockfd, addr).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::InProgress); + + // Add client socket with WRITABLE interest to epoll. + epoll_ctl_add(epfd, client_sockfd, EPOLLOUT | EPOLLET | EPOLLERR).unwrap(); + + // Wait until we are done connecting. + check_epoll_wait::<8>(epfd, &[Ev { events: EPOLLOUT, data: client_sockfd }], -1); + + // FIXME: Check SO_ERROR here once we implemented `getsockopt`. + + // We should now be connected and thus getting the peer name should work. + net::sockname_ipv4(|storage, len| unsafe { libc::getpeername(client_sockfd, storage, len) }) + .unwrap(); + + server_thread.join().unwrap(); +} + +/// Test that accepting incoming connections works with non-blocking server sockets. +/// Instead of busy waiting until we no longer get EWOULDBLOCK, we add the +/// server socket to an epoll instance and wait for a READABLE event. +fn test_accept_nonblock() { + let (server_sockfd, addr) = net::make_listener_ipv4().unwrap(); + let client_sockfd = + unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() }; + let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap(); + + unsafe { + // Change server socket to be non-blocking. + errno_check(libc::fcntl(server_sockfd, libc::F_SETFL, libc::O_NONBLOCK)); + } + + // Spawn the server thread. + let server_thread = thread::spawn(move || { + // Add client socket with READABLE interest to epoll. + epoll_ctl_add(epfd, server_sockfd, EPOLLIN | EPOLLET | EPOLLERR).unwrap(); + + // Wait until we get a readable event on the server socket. + check_epoll_wait::<8>(epfd, &[Ev { events: EPOLLIN, data: server_sockfd }], -1); + + // Accepting should now be possible. + net::accept_ipv4(server_sockfd).unwrap(); + + // Accepting should now trigger an EWOULDBLOCK. 
+ let err = net::accept_ipv4(server_sockfd).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::WouldBlock); + + // Ensure that the server is no longer readable because we just + // attempted to accept without an incoming connection. + assert_eq!(current_epoll_readiness::<8>(server_sockfd, EPOLLIN | EPOLLET), 0); + }); + + // Yield to server thread such that it actually needs to wait for an + // incoming client connection. + thread::sleep(Duration::from_millis(10)); + + net::connect_ipv4(client_sockfd, addr).unwrap(); + + server_thread.join().unwrap(); +} + +/// Test receiving bytes from a connected stream without blocking. +/// Instead of busy waiting until we no longer receive EWOULDBLOCK when trying to +/// read from the client, we register the client socket to epoll and wait for +/// READABLE events. +fn test_recv_nonblock() { + let (server_sockfd, addr) = net::make_listener_ipv4().unwrap(); + let client_sockfd = + unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() }; + let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap(); + + // Spawn the server thread. + let server_thread = thread::spawn(move || { + let (peerfd, _) = net::accept_ipv4(server_sockfd).unwrap(); + // `peerfd` is a blocking socket now. But that's okay, the client still does non-blocking + // reads/writes. + + // Yield back to client so that it starts receiving before we start sending. + thread::sleep(Duration::from_millis(10)); + + unsafe { + errno_result(libc_utils::write_all( + peerfd, + TEST_BYTES.as_ptr().cast(), + TEST_BYTES.len(), + )) + .unwrap() + }; + }); + + net::connect_ipv4(client_sockfd, addr).unwrap(); + + unsafe { + // Change client socket to be non-blocking. + errno_check(libc::fcntl(client_sockfd, libc::F_SETFL, libc::O_NONBLOCK)); + } + + // We are connected and the server socket is not writing. + + let mut buffer = [0; TEST_BYTES.len()]; + // Receiving from a socket when the peer is not writing is + // not possible without blocking. 
+ let err = unsafe { + errno_result(libc::recv(client_sockfd, buffer.as_mut_ptr().cast(), buffer.len(), 0)) + .unwrap_err() + }; + assert_eq!(err.kind(), ErrorKind::WouldBlock); + + // Try to receive bytes from the peer socket without blocking. + // Since the peer socket might do partial writes, we might need to + // call `epoll_wait` multiple times until we received everything. + + // Add client socket with READABLE interest to epoll. + epoll_ctl_add(epfd, client_sockfd, EPOLLIN | EPOLLET | EPOLLERR).unwrap(); + + let mut bytes_received = 0; + + // Receive until we get an EWOULDBLOCK or we read everything. + // We're only allowed to call `epoll_wait` again once we received + // an EWOULDBLOCK because otherwise we could deadlock. + while bytes_received != buffer.len() { + let read_result = unsafe { + errno_result(libc::recv( + client_sockfd, + buffer.as_mut_ptr().byte_add(bytes_received).cast(), + buffer.len() - bytes_received, + 0, + )) + }; + + match read_result { + Ok(received) => bytes_received += received as usize, + Err(err) if err.kind() == ErrorKind::WouldBlock => { + check_epoll_wait::<8>(epfd, &[Ev { events: EPOLLIN, data: client_sockfd }], -1); + } + Err(err) => panic!("unexpected error whilst receiving: {err}"), + } + } + + assert_eq!(&buffer, TEST_BYTES); + + let mut buffer = [0u8; 1]; + let err = unsafe { + errno_result(libc::recv(client_sockfd, buffer.as_mut_ptr().cast(), buffer.len(), 0)) + .unwrap_err() + }; + assert_eq!(err.kind(), ErrorKind::WouldBlock); + + // Ensure that the client is no longer readable because + // we just got an EWOULDBLOCK from a receive call. + assert_eq!(current_epoll_readiness::<8>(client_sockfd, EPOLLIN | EPOLLET), 0); + + server_thread.join().unwrap(); +} + +/// Test sending bytes into a connected stream without blocking. +/// Once the buffer is filled we wait for the epoll WRITABLE +/// readiness instead of busy waiting until we can +/// write again. 
+/// +/// **Note**: This can only be tested on UNIX hosts because +/// the socket write buffers dynamically grow for localhost +/// connections on Windows. +#[cfg(not(windows_hosts))] +fn test_send_nonblock() { + let (server_sockfd, addr) = net::make_listener_ipv4().unwrap(); + let client_sockfd = + unsafe { errno_result(libc::socket(libc::AF_INET, libc::SOCK_STREAM, 0)).unwrap() }; + let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap(); + + // Spawn the server thread. + let server_thread = thread::spawn(move || { + let (peerfd, _) = net::accept_ipv4(server_sockfd).unwrap(); + peerfd + }); + + net::connect_ipv4(client_sockfd, addr).unwrap(); + + unsafe { + // Change client socket to be non-blocking. + errno_check(libc::fcntl(client_sockfd, libc::F_SETFL, libc::O_NONBLOCK)); + } + + // Try to send bytes to the peer socket without blocking. + // We first fill up the buffer and ensure that we keep the + // writable readiness during the fill up process. + + // Add client socket with READABLE interest to epoll. + epoll_ctl_add(epfd, client_sockfd, EPOLLOUT | EPOLLET | EPOLLERR).unwrap(); + + let fill_buf = [1u8; 32_000]; + let mut total_written = 0usize; + loop { + // Ensure the socket is still writable. + assert_eq!(current_epoll_readiness::<8>(client_sockfd, EPOLLOUT | EPOLLET), EPOLLOUT); + + let result = unsafe { + errno_result(libc::send(client_sockfd, fill_buf.as_ptr().cast(), fill_buf.len(), 0)) + }; + + match result { + Ok(written) => total_written += written as usize, + Err(err) if err.kind() == ErrorKind::WouldBlock => break, + Err(err) => panic!("unexpected error whilst filling up buffer: {err}"), + } + } + + // The buffer should be filled up because we received an EWOULDBLOCK. + // This also means that the client socket should no longer be writable. + assert_eq!(current_epoll_readiness::<8>(client_sockfd, EPOLLOUT | EPOLLET), 0); + + let peerfd = server_thread.join().unwrap(); + + // Spawn the reader thread. 
+ let reader_thread = thread::spawn(move || { + // Read half of the bytes needed to fill the buffer. + // This should make the client buffer writable again. + // We need to do it this way because different hosts + // have different read thresholds after which they + // trigger a new writable event. + let mut buffer = Vec::with_capacity(total_written / 2); + buffer.fill(0u8); + unsafe { + errno_result(libc_utils::read_all_generic( + buffer.as_mut_ptr().cast(), + total_written / 2, + libc_utils::NoRetry, + |buf, count| libc::recv(peerfd, buf, count, 0), + )) + .unwrap() + }; + }); + + // Wait until the socket is again writable. + check_epoll_wait::<8>(epfd, &[Ev { events: EPOLLOUT, data: client_sockfd }], -1); + + let fill_buf = [1u8; 100]; + // We should be able to write again without blocking because we just received + // a writable event. + unsafe { + errno_result(libc::send(client_sockfd, fill_buf.as_ptr().cast(), fill_buf.len(), 0)) + .unwrap() + }; + + reader_thread.join().unwrap(); +} diff --git a/tests/utils/libc.rs b/tests/utils/libc.rs index 26797ee4c3..89a610194c 100644 --- a/tests/utils/libc.rs +++ b/tests/utils/libc.rs @@ -155,7 +155,7 @@ pub mod epoll { use libc::c_int; pub use libc::{EPOLL_CTL_ADD, EPOLL_CTL_DEL, EPOLL_CTL_MOD}; // Re-export some constants we need a lot for this. - pub use libc::{EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT, EPOLLRDHUP}; + pub use libc::{EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT, EPOLLRDHUP}; use super::*; @@ -204,6 +204,30 @@ pub mod epoll { pub fn check_epoll_wait_noblock(epfd: i32, expected: &[Ev]) { check_epoll_wait::(epfd, expected, 0); } + + /// Query the current epoll readiness of a file descriptor. + /// This is done by creating a new epoll instance, adding the + /// fd to the epoll interests and then performing a zero-timeout + /// wait. 
+ pub fn current_epoll_readiness(fd: i32, interests: i32) -> c_int { + let epfd = errno_result(unsafe { libc::epoll_create1(0) }).unwrap(); + // Add fd with all possible interests to epoll instance. + epoll_ctl_add(epfd, fd, interests).unwrap(); + + let mut array: [libc::epoll_event; N] = [libc::epoll_event { events: 0, u64: 0 }; N]; + let num = errno_result(unsafe { + // Use zero-timeout to just query without waiting. + libc::epoll_wait(epfd, array.as_mut_ptr(), N.try_into().unwrap(), 0) + }) + .expect("epoll_wait returned an error"); + + let mut readiness = 0; + let events = &mut array[..num.try_into().unwrap()]; + events.iter().for_each(|e| { + readiness |= e.events.cast_signed(); + }); + readiness + } } pub mod net {