Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/toil/lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def safe_lock(fd: int, block: bool = True, shared: bool = False) -> None:
if e.errno in (errno.EACCES, errno.EAGAIN):
# Nonblocking lock not available.
raise
elif e.errno == errno.EIO:
elif e.errno in (errno.EIO, errno.ENOLCK):
# Sometimes Ceph produces IO errors when talking to lock files.
# Back off and try again.
# TODO: Should we eventually give up if the disk really is
Expand All @@ -156,9 +156,17 @@ def safe_lock(fd: int, block: bool = True, shared: bool = False) -> None:
error_tries += 1
continue
else:
logger.critical(
"Too many IO errors talking to lock file. If using Ceph, check for MDS deadlocks. See <https://tracker.ceph.com/issues/62123>."
)
if e.errno == errno.ENOLCK:
logger.critical(
"No locks available after %d retries. The filesystem at the "
"lock path may not support POSIX file locking (common on some "
"NFS setups). Use --coordinationDir to point at local storage.",
MAX_ERROR_TRIES,
)
else:
logger.critical(
"Too many IO errors talking to lock file. If using Ceph, check for MDS deadlocks. See <https://tracker.ceph.com/issues/62123>."
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might wrap this message like the new one.

)
raise
else:
raise
Expand All @@ -171,7 +179,7 @@ def safe_unlock_and_close(fd: int) -> None:
try:
fcntl.flock(fd, fcntl.LOCK_UN)
except OSError as e:
if e.errno != errno.EIO:
if e.errno not in (errno.EIO, errno.ENOLCK):
raise
# Sometimes Ceph produces EIO. We don't need to retry then because
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment now only mentions one of the two cases its branch needs to implement. We probably could drop that and just talk about how we don't need to retry.

# we're going to close the FD and after that the file can't remain
Expand Down
57 changes: 56 additions & 1 deletion src/toil/test/src/threadingTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import traceback
from functools import partial
from pathlib import Path
import errno

from toil.lib.threading import LastProcessStandingArena, cpu_count, global_mutex
from toil.lib.threading import LastProcessStandingArena, cpu_count, global_mutex, safe_lock, safe_unlock_and_close
from unittest.mock import patch

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -68,6 +70,59 @@ def testLastProcessStanding(self, tmp_path: Path) -> None:
assert not filename.startswith(
"precious"
), f"File {filename} still exists"

# Tests for ENOLCK (toil#4846)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this? Is it better as a docstring?

def testSafeLockRetriesOnENOLCK(self) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These new function names ought to be snake_case.

enolck = OSError(errno.ENOLCK, "No locks available")
# First call raises ENOLCK, second call succeeds
with patch("fcntl.flock", side_effect=[enolck, None]) as mock_flock:
safe_lock(0)
assert mock_flock.call_count == 2

def testSafeLockFailsAfterMaxRetriesOnENOLCK(self) -> None:
enolck = OSError(errno.ENOLCK, "No locks available")
# First call raises ENOLCK, second call succeeds
with patch("fcntl.flock", side_effect=enolck):
with patch("toil.lib.threading.time.sleep"): # skip the backoff waits
try:
safe_lock(0)
assert False, "Expected OSError to be raised"
except OSError as e:
assert e.errno == errno.ENOLCK

def testSafeLockRetriesOnEIO(self) -> None:
eio = OSError(errno.EIO, "Input/Output Error")
# First call raises EIO, second call succeeds
with patch("fcntl.flock", side_effect=[eio, None]) as mock_flock:
safe_lock(0)
assert mock_flock.call_count == 2

def testSafeLockFailsAfterMaxRetriesOnEIO(self) -> None:
eio = OSError(errno.EIO, "Input/Output Error")
# First call raises EIO, second call succeeds
with patch("fcntl.flock", side_effect=eio):
with patch("toil.lib.threading.time.sleep"): # skip the backoff waits
try:
safe_lock(0)
assert False, "Expected OSError to be raised"
except OSError as e:
assert e.errno == errno.EIO

def testSafeUnlockAndCloseSwallowsENOLCK(self) -> None:
enolck = OSError(errno.ENOLCK, "No locks available")
# First call raises ENOLCK, second call succeeds
with patch("fcntl.flock", side_effect=enolck):
with patch("os.close") as mock_close:
safe_unlock_and_close(0)
mock_close.assert_called_once_with(0)

def testSafeUnlockAndCloseSwallowsEIO(self) -> None:
# First call raises EIO, second call succeeds
eio = OSError(errno.EIO, "Input/output error")
with patch("fcntl.flock", side_effect=eio):
with patch("os.close") as mock_close:
safe_unlock_and_close(0)
mock_close.assert_called_once_with(0)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are two substantially identical sets of 3 tests here, which differ just on the errno value and message used. We should consolidate them, either using inheritance (one base class with an abstract raise_error that we implement in IOError and NoLocksError subclasses), or using pytest subtests and a loop over the errno-and-message pairs (or over a constant list of pre-constructed exceptions).



def _testGlobalMutexOrderingTask(scope: Path, mutex: str, number: int) -> bool:
Expand Down
Loading