Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions src/experimental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,31 +121,62 @@ impl GetAttrResponse {
}
}

#[derive(Debug)]
enum Runtime {
/// Use the runtime provided by the user
External(tokio::runtime::Handle),
/// Use an internal runtime
Internal(tokio::runtime::Runtime),
}

/// Adapter to allow running an [`AsyncFilesystem`] with tokio's runtime.
#[derive(Debug)]
pub struct TokioAdapter<T: AsyncFilesystem> {
inner: Arc<T>,
runtime: tokio::runtime::Runtime,
runtime: Runtime,
}

impl<T: AsyncFilesystem> TokioAdapter<T> {

/// Create a new adapter with an internal tokio runtime.
pub fn new(inner: T) -> Self {
Self {
inner: Arc::new(inner),
runtime: tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
runtime: Runtime::Internal(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap(),
),
}
}

/// Use an existing tokio runtime. This is useful if you want to run the filesystem
/// in an existing application that already uses a tokio runtime.
pub fn with_runtime(inner: T, runtime_handle: tokio::runtime::Handle) -> Self {
Self {
inner: Arc::new(inner),
runtime: Runtime::External(runtime_handle),
Comment on lines +156 to +159
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Keep the external Tokio runtime alive for the mount

TokioAdapter::with_runtime stores only a tokio::runtime::Handle, which can spawn onto a runtime but does not own it. That means a caller can mount successfully and still end up with an adapter whose executor disappears as soon as the original Runtime value is dropped or shut down—for example with spawn_mount(...) after setup code exits, or with a temporary Runtime::new()?.handle().clone(). In that state new FUSE requests are still accepted, but there is no longer a guaranteed live executor to run the async handlers, so the mount can stop servicing operations unexpectedly.

Useful? React with 👍 / 👎.

}
}

fn spawn<F>(&self, fut: F)
where
F: std::future::Future<Output = ()> + Send + 'static,
{
match &self.runtime {
Runtime::External(handle) => handle.spawn(fut),
Runtime::Internal(rt) => rt.spawn(fut),
};
}
}

impl<T: AsyncFilesystem + Send + Sync + 'static> Filesystem for TokioAdapter<T> {
fn lookup(&self, req: &Request, parent: INodeNo, name: &OsStr, reply: ReplyEntry) {
let context: RequestContext = req.into();
let name = name.to_os_string();
let inner = self.inner.clone();
self.runtime.spawn(async move {
self.spawn(async move {
match inner.lookup(&context, parent, &name).await {
Ok(LookupResponse {
ttl,
Expand All @@ -160,7 +191,7 @@ impl<T: AsyncFilesystem + Send + Sync + 'static> Filesystem for TokioAdapter<T>
fn getattr(&self, req: &Request, ino: INodeNo, fh: Option<FileHandle>, reply: ReplyAttr) {
let context: RequestContext = req.into();
let inner = self.inner.clone();
self.runtime.spawn(async move {
self.spawn(async move {
match inner.getattr(&context, ino, fh).await {
Ok(GetAttrResponse { ttl, attr }) => reply.attr(&ttl, &attr),
Err(e) => reply.error(e),
Expand All @@ -181,7 +212,7 @@ impl<T: AsyncFilesystem + Send + Sync + 'static> Filesystem for TokioAdapter<T>
) {
let context: RequestContext = req.into();
let inner = self.inner.clone();
self.runtime.spawn(async move {
self.spawn(async move {
let mut buf = vec![];
match inner
.read(&context, ino, fh, offset, size, flags, lock_owner, &mut buf)
Expand All @@ -203,7 +234,7 @@ impl<T: AsyncFilesystem + Send + Sync + 'static> Filesystem for TokioAdapter<T>
) {
let context: RequestContext = req.into();
let inner = self.inner.clone();
self.runtime.spawn(async move {
self.spawn(async move {
let builder = DirEntListBuilder {
entries: &mut reply,
};
Expand Down