diff --git a/src/toil/lib/threading.py b/src/toil/lib/threading.py index 90faea876f..ed615eb2df 100644 --- a/src/toil/lib/threading.py +++ b/src/toil/lib/threading.py @@ -141,7 +141,7 @@ def safe_lock(fd: int, block: bool = True, shared: bool = False) -> None: if e.errno in (errno.EACCES, errno.EAGAIN): # Nonblocking lock not available. raise - elif e.errno == errno.EIO: + elif e.errno in (errno.EIO, errno.ENOLCK): # Sometimes Ceph produces IO errors when talking to lock files. # Back off and try again. # TODO: Should we eventually give up if the disk really is @@ -156,9 +156,18 @@ def safe_lock(fd: int, block: bool = True, shared: bool = False) -> None: error_tries += 1 continue else: - logger.critical( - "Too many IO errors talking to lock file. If using Ceph, check for MDS deadlocks. See ." - ) + if e.errno == errno.ENOLCK: + logger.critical( + "No locks available after %d retries. The filesystem at the " + "lock path may not support POSIX file locking (common on some " + "NFS setups). Use --coordinationDir to point at local storage.", + MAX_ERROR_TRIES, + ) + else: + logger.critical( + "Too many IO errors talking to lock file. If using Ceph, " + "check for MDS deadlocks. See .", + ) raise else: raise @@ -171,11 +180,12 @@ def safe_unlock_and_close(fd: int) -> None: try: fcntl.flock(fd, fcntl.LOCK_UN) except OSError as e: - if e.errno != errno.EIO: - raise - # Sometimes Ceph produces EIO. We don't need to retry then because - # we're going to close the FD and after that the file can't remain - # locked by us. + if e.errno in (errno.EIO, errno.ENOLCK): + # We don't need to retry then because we're going to close the FD + # and after that the file can't remain locked by us. + pass + else: + raise os.close(fd) diff --git a/src/toil/test/src/threadingTest.py b/src/toil/test/src/threadingTest.py index 89d15e7231..6a6a0e1b04 100644 --- a/src/toil/test/src/threadingTest.py +++ b/src/toil/test/src/threadingTest.py @@ -6,8 +6,10 @@ import traceback from functools import partial from pathlib import Path +import errno -from toil.lib.threading import LastProcessStandingArena, cpu_count, global_mutex +from toil.lib.threading import LastProcessStandingArena, cpu_count, global_mutex, safe_lock, safe_unlock_and_close +from unittest.mock import patch log = logging.getLogger(__name__) @@ -68,7 +70,55 @@ def testLastProcessStanding(self, tmp_path: Path) -> None: assert not filename.startswith( "precious" ), f"File {filename} still exists" - + +class BaseSafeLockingTest: + """ + Base class for testing retry and error-swallowing behavior in safe_lock + and safe_unlock_and_close. Subclasses provide the specific OSError to test + by implementing get_error(). + """ + def get_error(self) -> OSError: + """Return the OSError to simulate in tests. Must be implemented by subclasses.""" + raise NotImplementedError + + def test_safe_lock_retries(self) -> None: + """safe_lock should retry on a transient error and succeed on the second attempt.""" + error = self.get_error() + # First call raises error, second call succeeds + with patch("fcntl.flock", side_effect=[error, None]) as mock_flock: + safe_lock(0) + assert mock_flock.call_count == 2 + + def test_safe_lock_fails_after_max_retries(self) -> None: + """safe_lock should raise OSError after exhausting all retries.""" + error = self.get_error() + # First call raises error, second call succeeds + with patch("fcntl.flock", side_effect=error): + with patch("toil.lib.threading.time.sleep"): # skip the backoff waits + try: + safe_lock(0) + assert False, "Expected OSError to be raised" + except OSError as e: + assert e.errno == error.errno + + def test_safe_unlock_and_close_swallows(self) -> None: + """safe_unlock_and_close should swallow the error and still close the fd.""" + error = self.get_error() + # First call raises error, second call succeeds + with patch("fcntl.flock", side_effect=error): + with patch("os.close") as mock_close: + safe_unlock_and_close(0) + mock_close.assert_called_once_with(0) + +class TestENOLCKSafeLocking(BaseSafeLockingTest): + """Tests safe_lock and safe_unlock_and_close behavior when fcntl raises ENOLCK (NFS lockd unavailable).""" + def get_error(self) -> OSError: + return OSError(errno.ENOLCK, "No locks available") + +class TestEIOSafeLocking(BaseSafeLockingTest): + """Tests safe_lock and safe_unlock_and_close behavior when fcntl raises EIO (Ceph IO error).""" + def get_error(self) -> OSError: + return OSError(errno.EIO, "Input/Output Error") def _testGlobalMutexOrderingTask(scope: Path, mutex: str, number: int) -> bool: try: