Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
381 changes: 269 additions & 112 deletions src/concurrency/blocking_io.rs

Large diffs are not rendered by default.

57 changes: 33 additions & 24 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use rustc_span::{DUMMY_SP, Span};
use rustc_target::spec::Os;

use crate::concurrency::GlobalDataRaceHandler;
use crate::concurrency::blocking_io::InterestReceiver;
use crate::shims::tls;
use crate::shims::{Epoll, EpollEvalContextExt, FileDescriptionRef, tls};
use crate::*;

#[derive(Clone, Copy, Debug, PartialEq)]
Expand Down Expand Up @@ -108,7 +107,7 @@ pub enum BlockReason {
/// Blocked on an InitOnce.
InitOnce,
/// Blocked on epoll.
Epoll,
Epoll { epfd: FileDescriptionRef<Epoll> },
/// Blocked on eventfd.
Eventfd,
/// Blocked on virtual socket.
Expand Down Expand Up @@ -766,7 +765,9 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
.filter(|(_id, thread)| thread.state.is_enabled());
// Pick a new thread, and switch to it.
let new_thread = if thread_manager.fixed_scheduling {
threads_iter.next()
let next = threads_iter.next();
drop(threads_iter);
next
} else {
threads_iter.choose(rng)
};
Expand All @@ -787,48 +788,56 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
return interp_ok(SchedulingAction::ExecuteStep);
}

// We have not found a thread to execute.
if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
let threads = &this.machine.threads.threads;

if threads.iter().all(|thread| thread.state.is_terminated()) {
unreachable!("all threads terminated without the main thread terminating?!");
} else if let Some(sleep_time) = potential_sleep_time {
// All threads are currently blocked, but we have unexecuted
// timeout_callbacks, which may unblock some of the threads. Hence,
// sleep until the first callback.
interp_ok(SchedulingAction::SleepAndWaitForIo(Some(sleep_time)))
} else if thread_manager
.threads
.iter()
.any(|thread| thread.state.is_blocked_on(&BlockReason::IO))
{
// At least one thread is blocked on host I/O but doesn't
// have a timeout set. Hence, we sleep indefinitely in the
// hope that eventually an I/O event for this thread happens.
} else if threads.iter().any(|thread| this.is_thread_blocked_on_host(thread)) {
// At least one thread doesn't have a timeout set, and is blocked on host I/O or is waiting on an
// epoll instance which contains a host source interest. Hence, we sleep indefinitely in the
// hope that eventually an I/O event happens.
interp_ok(SchedulingAction::SleepAndWaitForIo(None))
} else {
throw_machine_stop!(TerminationInfo::GlobalDeadlock);
}
}

/// Check whether the provided thread is currently blocked on host I/O.
/// This means, it's either blocked on an I/O operation directly or it's
/// blocked on an epoll instance which contains a host source interest.
fn is_thread_blocked_on_host(&self, thread: &Thread<'tcx>) -> bool {
let this = self.eval_context_ref();
match &thread.state {
ThreadState::Blocked { reason: BlockReason::IO, .. } => true,
ThreadState::Blocked { reason: BlockReason::Epoll { epfd }, .. } =>
this.has_epoll_host_interests(epfd),
_ => false,
}
}

/// Poll for I/O events until either an I/O event happened or the timeout expired.
/// The different timeout values are described in [`BlockingIoManager::poll`].
///
/// Unblocks all threads which are blocked on I/O and whose I/O interests
/// are currently fulfilled.
fn poll_and_unblock(&mut self, timeout: Option<Duration>) -> InterpResult<'tcx> {
let this = self.eval_context_mut();

let ready = match this.machine.blocking_io.poll(timeout) {
Ok(ready) => ready,
match BlockingIoManager::poll_and_unblock(this, timeout)? {
Ok(_) => interp_ok(()),
// We can ignore errors originating from interrupts; that's just a spurious wakeup.
Err(e) if e.kind() == io::ErrorKind::Interrupted => return interp_ok(()),
Err(e) if e.kind() == io::ErrorKind::Interrupted => interp_ok(()),
// For other errors we panic. On Linux and BSD hosts this should only be
// reachable when a system resource error (e.g. ENOMEM or ENOSPC) occurred.
Err(e) => panic!("unexpected error while polling: {e}"),
};

ready.into_iter().try_for_each(|(receiver, _source)| {
match receiver {
InterestReceiver::UnblockThread(thread_id) =>
this.unblock_thread(thread_id, BlockReason::IO),
}
})
}
}

/// Find all threads with expired timeouts, unblock them and execute their timeout callbacks.
Expand Down
5 changes: 4 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ pub use crate::borrow_tracker::{
BorTag, BorrowTrackerMethod, EvalContextExt as _, TreeBorrowsParams,
};
pub use crate::clock::{Instant, MonotonicClock};
pub use crate::concurrency::blocking_io::{BlockingIoManager, EvalContextExt as _, WithSource};
pub use crate::concurrency::blocking_io::{
BlockingIoInterest, BlockingIoManager, BlockingIoSourceReadiness, EvalContextExt as _,
SourceFileDescription,
};
pub use crate::concurrency::cpu_affinity::MAX_CPUS;
pub use crate::concurrency::data_race::{
AtomicFenceOrd, AtomicReadOrd, AtomicRwOrd, AtomicWriteOrd, EvalContextExt as _,
Expand Down
8 changes: 8 additions & 0 deletions src/shims/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ impl<T: ?Sized> FileDescriptionRef<T> {
}
}

impl<T: ?Sized> PartialEq for FileDescriptionRef<T> {
fn eq(&self, other: &Self) -> bool {
self.0.id == other.0.id
}
}

impl<T: ?Sized> Eq for FileDescriptionRef<T> {}

/// Holds a weak reference to the actual file description.
#[derive(Debug)]
pub struct WeakFileDescriptionRef<T: ?Sized>(Weak<FdIdWith<T>>);
Expand Down
4 changes: 2 additions & 2 deletions src/shims/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ pub mod time;
pub mod tls;
pub mod unwind;

pub use self::files::{FdId, FdTable, FileDescriptionRef};
pub use self::files::{FdId, FdTable, FileDescription, FileDescriptionRef, WeakFileDescriptionRef};
#[cfg(all(feature = "native-lib", unix))]
pub use self::native_lib::trace::{init_sv, register_retcode_sv};
pub use self::unix::{DirTable, EpollInterestTable};
pub use self::unix::{DirTable, Epoll, EpollEvalContextExt, EpollInterestTable};

/// What needs to be done after emulating an item (a shim or an intrinsic) is done.
pub enum EmulateItemResult {
Expand Down
4 changes: 2 additions & 2 deletions src/shims/unix/fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use rustc_target::spec::Os;

use crate::shims::files::FileDescription;
use crate::shims::sig::check_min_vararg_count;
use crate::shims::unix::linux_like::epoll::EpollEvents;
use crate::shims::unix::linux_like::epoll::EpollReadiness;
use crate::shims::unix::*;
use crate::*;

Expand Down Expand Up @@ -77,7 +77,7 @@ pub trait UnixFileDescription: FileDescription {
}

/// Return which epoll events are currently active.
fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> {
throw_unsup_format!("{}: epoll does not support this file description", self.name());
}
}
Expand Down
1 change: 0 additions & 1 deletion src/shims/unix/linux/foreign_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use rustc_span::Symbol;
use rustc_target::callconv::FnAbi;

use self::shims::unix::linux::mem::EvalContextExt as _;
use self::shims::unix::linux_like::epoll::EvalContextExt as _;
use self::shims::unix::linux_like::eventfd::EvalContextExt as _;
use self::shims::unix::linux_like::syscall::syscall;
use crate::machine::{SIGRTMAX, SIGRTMIN};
Expand Down
48 changes: 37 additions & 11 deletions src/shims/unix/linux_like/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type EpollEventKey = (FdId, FdNum);

/// An `Epoll` file descriptor connects file handles and epoll events
#[derive(Debug, Default)]
struct Epoll {
pub struct Epoll {
/// A map of EpollEventInterests registered under this epoll instance. Each entry is
/// differentiated using FdId and file descriptor value.
interest_list: RefCell<BTreeMap<EpollEventKey, EpollEventInterest>>,
Expand Down Expand Up @@ -55,9 +55,9 @@ pub struct EpollEventInterest {
data: u64,
}

/// EpollReadyEvents reflects the readiness of a file description.
/// Struct reflecting the readiness of a file description.
#[derive(Debug)]
pub struct EpollEvents {
pub struct EpollReadiness {
/// The associated file is available for read(2) operations, in the sense that a read will not block.
/// (I.e., returning EOF is considered "ready".)
pub epollin: bool,
Expand All @@ -76,9 +76,9 @@ pub struct EpollEvents {
pub epollerr: bool,
}

impl EpollEvents {
pub fn new() -> Self {
EpollEvents {
impl EpollReadiness {
pub fn empty() -> Self {
EpollReadiness {
epollin: false,
epollout: false,
epollrdhup: false,
Expand Down Expand Up @@ -114,6 +114,19 @@ impl EpollEvents {
}
}

// Best-effort mapping from cross platform readiness to epoll readiness.
impl From<&BlockingIoSourceReadiness> for EpollReadiness {
fn from(readiness: &BlockingIoSourceReadiness) -> Self {
Self {
epollin: readiness.readable,
epollout: readiness.writable,
epollrdhup: readiness.read_closed,
epollhup: readiness.write_closed,
epollerr: readiness.error,
}
}
}

impl FileDescription for Epoll {
fn name(&self) -> &'static str {
"epoll"
Expand Down Expand Up @@ -354,11 +367,13 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
interest.data = data;
}

let active_events = fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this);

// Deliver events for the new interest.
update_readiness(
this,
&epfd,
fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this),
active_events,
/* force_edge */ true,
move |callback| {
// Need to release the RefCell when this closure returns, so we have to move
Expand Down Expand Up @@ -479,7 +494,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// This means there'll be a leak if we never wake up, but that anyway would imply
// a thread is permanently blocked so this is fine.
this.block_thread(
BlockReason::Epoll,
BlockReason::Epoll { epfd: epfd.clone() },
timeout,
callback!(
@capture<'tcx> {
Expand Down Expand Up @@ -547,6 +562,17 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {

interp_ok(())
}

/// Recursively check whether the [`Epoll`] file description contains
/// interests which are host I/O source file descriptions.
fn has_epoll_host_interests(&self, epfd: &FileDescriptionRef<Epoll>) -> bool {
let this = self.eval_context_ref();
epfd.interest_list.borrow().iter().any(|((fd_id, _fd_num), _)| {
// By looking up whether the file description is currently registered,
// we get whether it's a host I/O source file description.
this.machine.blocking_io.contains_source(fd_id)
})
}
}

/// Call this when the interests denoted by `for_each_interest` have their active event set changed
Expand All @@ -557,7 +583,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// be waking up threads which might require access to those `RefCell`.
fn update_readiness<'tcx>(
ecx: &mut MiriInterpCx<'tcx>,
epoll: &Epoll,
epoll: &FileDescriptionRef<Epoll>,
active_events: u32,
force_edge: bool,
for_each_interest: impl FnOnce(
Expand Down Expand Up @@ -587,7 +613,7 @@ fn update_readiness<'tcx>(
&& let Some(thread_id) = epoll.queue.borrow_mut().pop_front()
{
drop(ready_set); // release the "lock" so the unblocked thread can have it
ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
ecx.unblock_thread(thread_id, BlockReason::Epoll { epfd: epoll.clone() })?;
ready_set = epoll.ready_set.borrow_mut();
}

Expand All @@ -612,7 +638,7 @@ fn return_ready_list<'tcx>(
for (key, interest) in interest_list.iter() {
// Ensure this matches the latest readiness of this FD.
// We have to do an FD lookup by ID for this. The FdNum might be already closed.
let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
let fd = ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
let current_active = fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
assert_eq!(interest.active_events, current_active & interest.relevant_events);
}
Expand Down
8 changes: 4 additions & 4 deletions src/shims/unix/linux_like/eventfd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::ErrorKind;
use crate::concurrency::VClock;
use crate::shims::files::{FdId, FileDescription, FileDescriptionRef, WeakFileDescriptionRef};
use crate::shims::unix::UnixFileDescription;
use crate::shims::unix::linux_like::epoll::{EpollEvents, EvalContextExt as _};
use crate::shims::unix::linux_like::epoll::{EpollReadiness, EvalContextExt as _};
use crate::*;

/// Maximum value that the eventfd counter can hold.
Expand Down Expand Up @@ -114,14 +114,14 @@ impl FileDescription for EventFd {
}

impl UnixFileDescription for EventFd {
fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollEvents> {
fn epoll_active_events<'tcx>(&self) -> InterpResult<'tcx, EpollReadiness> {
// We only check the status of EPOLLIN and EPOLLOUT flags for eventfd. If other event flags
// need to be supported in the future, the check should be added here.

interp_ok(EpollEvents {
interp_ok(EpollReadiness {
epollin: self.counter.get() != 0,
epollout: self.counter.get() != MAX_COUNTER,
..EpollEvents::new()
..EpollReadiness::empty()
})
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/shims/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ mod solarish;
pub use self::env::{EvalContextExt as _, UnixEnvVars};
pub use self::fd::{EvalContextExt as _, UnixFileDescription};
pub use self::fs::{DirTable, EvalContextExt as _};
pub use self::linux_like::epoll::EpollInterestTable;
pub use self::linux_like::epoll::{
Epoll, EpollInterestTable, EvalContextExt as EpollEvalContextExt,
};
pub use self::mem::EvalContextExt as _;
pub use self::socket::EvalContextExt as _;
pub use self::sync::EvalContextExt as _;
Expand Down
Loading