Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def safe_lock(fd: int, block: bool = True, shared: bool = False) -> None:
if e.errno in (errno.EACCES, errno.EAGAIN):
# Nonblocking lock not available.
raise
elif e.errno == errno.EIO:
elif e.errno in (errno.EIO, errno.ENOLCK):
# Sometimes Ceph produces IO errors when talking to lock files.
# Back off and try again.
# TODO: Should we eventually give up if the disk really is
Expand All @@ -156,9 +156,18 @@ def safe_lock(fd: int, block: bool = True, shared: bool = False) -> None:
error_tries += 1
continue
else:
logger.critical(
"Too many IO errors talking to lock file. If using Ceph, check for MDS deadlocks. See <https://tracker.ceph.com/issues/62123>."
)
if e.errno == errno.ENOLCK:
logger.critical(
"No locks available after %d retries. The filesystem at the "
"lock path may not support POSIX file locking (common on some "
"NFS setups). Use --coordinationDir to point at local storage.",
MAX_ERROR_TRIES,
)
else:
logger.critical(
"Too many IO errors talking to lock file. If using Ceph, "
"check for MDS deadlocks. See <https://tracker.ceph.com/issues/62123>.",
)
raise
else:
raise
Expand All @@ -171,11 +180,12 @@ def safe_unlock_and_close(fd: int) -> None:
try:
fcntl.flock(fd, fcntl.LOCK_UN)
except OSError as e:
if e.errno != errno.EIO:
raise
# Sometimes Ceph produces EIO. We don't need to retry then because
# we're going to close the FD and after that the file can't remain
# locked by us.
if e.errno in (errno.EIO, errno.ENOLCK):
# We don't need to retry then because we're going to close the FD
# and after that the file can't remain locked by us.
pass
else:
raise
os.close(fd)


Expand Down
54 changes: 52 additions & 2 deletions src/toil/test/src/threadingTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import traceback
from functools import partial
from pathlib import Path
import errno

from toil.lib.threading import LastProcessStandingArena, cpu_count, global_mutex
from toil.lib.threading import LastProcessStandingArena, cpu_count, global_mutex, safe_lock, safe_unlock_and_close
from unittest.mock import patch

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,7 +70,55 @@ def testLastProcessStanding(self, tmp_path: Path) -> None:
assert not filename.startswith(
"precious"
), f"File {filename} still exists"


class BaseSafeLockingTest:
    """
    Shared tests for the retry and error-swallowing behavior of safe_lock
    and safe_unlock_and_close.

    Subclasses choose which error is simulated by implementing get_error().
    Every test patches fcntl.flock, so no real lock is taken on the file
    descriptor that is passed in.
    """

    def get_error(self) -> Exception:
        """
        Return the OSError to simulate in tests.

        Must be implemented by subclasses. The tests read ``errno`` from the
        returned object, so it needs to be an OSError carrying an errno.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def test_safe_lock_retries(self) -> None:
        """safe_lock should retry on a transient error and succeed on the second attempt."""
        error = self.get_error()
        # First call raises error, second call succeeds.
        with patch("fcntl.flock", side_effect=[error, None]) as mock_flock:
            # Skip the backoff wait between the failed and the successful attempt.
            with patch("toil.lib.threading.time.sleep"):
                safe_lock(0)
        assert mock_flock.call_count == 2

    def test_safe_lock_fails_after_max_retries(self) -> None:
        """safe_lock should raise OSError after exhausting all retries."""
        error = self.get_error()
        # The comparison below needs an errno, which only OSError guarantees.
        assert isinstance(error, OSError)
        # Every call raises the error, so the retries can never succeed.
        with patch("fcntl.flock", side_effect=error):
            with patch("toil.lib.threading.time.sleep"):  # skip the backoff waits
                try:
                    safe_lock(0)
                except OSError as e:
                    assert e.errno == error.errno
                else:
                    # Raised outside the try body so it cannot be mistaken for
                    # the expected OSError, and it is not stripped under -O.
                    raise AssertionError("Expected OSError to be raised")

    def test_safe_unlock_and_close_swallows(self) -> None:
        """safe_unlock_and_close should swallow the error and still close the fd."""
        error = self.get_error()
        # Every unlock attempt raises the error.
        with patch("fcntl.flock", side_effect=error):
            # Patch os.close so the test never really closes file descriptor 0.
            with patch("os.close") as mock_close:
                safe_unlock_and_close(0)
        mock_close.assert_called_once_with(0)

class TestENOLCKSafeLocking(BaseSafeLockingTest):
    """
    Run the shared safe_lock and safe_unlock_and_close tests with ENOLCK as
    the error raised by fcntl (NFS lockd unavailable).
    """

    def get_error(self) -> Exception:
        """Build the ENOLCK OSError that the patched flock call raises."""
        enolck_error = OSError(errno.ENOLCK, "No locks available")
        return enolck_error

class TestEIOSafeLocking(BaseSafeLockingTest):
    """
    Run the shared safe_lock and safe_unlock_and_close tests with EIO as
    the error raised by fcntl (Ceph IO error).
    """

    def get_error(self) -> Exception:
        """Build the EIO OSError that the patched flock call raises."""
        eio_error = OSError(errno.EIO, "Input/Output Error")
        return eio_error

def _testGlobalMutexOrderingTask(scope: Path, mutex: str, number: int) -> bool:
try:
Expand Down
Loading