Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3304,20 +3304,8 @@ def _executor(self, stats: StatsDict, fileStore: AbstractFileStore) -> Iterator[
)
job_time = time.time() - startTime
job_cpu_time = totalCpuTime - startClock
allocated_cpu_time = job_time * self.cores

if job_cpu_time > allocated_cpu_time and allocated_cpu_time > 0:
# Too much CPU was used by this job! Maybe we're using a batch
# system that doesn't/can't sandbox us and we started too many
# threads. Complain to the user!
excess_factor = job_cpu_time / allocated_cpu_time
fileStore.log_to_leader(
f"Job {self.description} used {excess_factor:.2f}x more "
f"CPU than the requested {self.cores} cores. Consider "
f"increasing the job's required CPU cores or limiting the "
f"number of processes/threads launched.",
level=logging.WARNING,
)

self._check_cpu_usage(job_time, job_cpu_time, fileStore)

# Finish up the stats
if stats is not None:
Expand All @@ -3337,6 +3325,46 @@ def _executor(self, stats: StatsDict, fileStore: AbstractFileStore) -> Iterator[
succeeded=str(succeeded),
)
)


def _check_cpu_usage(
self,
job_time: float,
job_cpu_time: float,
fileStore: AbstractFileStore,
threshold_factor: float = 1.05,
) -> None:
"""
Log a warning when a job consumes more CPU time than its allocation.

If *job_time* is non-positive, returns immediately (wall-clock time may
lack resolution for very short jobs). If *cores* is non-positive, logs
a warning that overuse cannot be assessed and returns, avoiding
division by zero. Otherwise, compares *job_cpu_time* to
``job_time * cores * threshold_factor``.
Comment on lines +3340 to +3344
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually I think we put parameter names in double-backticks or just don't set them off, but this looks more like how the Python official docs do it.

This might be a little more like just the algorithm rewritten in English than we want it to be, but it's probably fine.

"""
if job_time <= 0:
# The job may have run too quickly for wall-clock resolution.
return
if self.cores <= 0:
# Jobs should not have non-positive core requests, but avoid divide-by-zero.
fileStore.log_to_leader(
f"Job {self.description} has invalid requested cores value "
f"{self.cores}; cannot evaluate CPU overuse.",
level=logging.WARNING,
)
return
allocated_cpu_time = job_time * self.cores

excess_factor = job_cpu_time / allocated_cpu_time
if excess_factor > threshold_factor:
fileStore.log_to_leader(
f"Job {self.description} used {excess_factor:.2f}x more "
f"CPU than the requested {self.cores} cores. Consider "
f"increasing the job's required CPU cores or limiting the "
f"number of processes/threads launched.",
level=logging.WARNING,
)

def _runner(
self,
Expand Down
57 changes: 57 additions & 0 deletions src/toil/test/src/jobTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
import os
import random
import time
from unittest.mock import Mock
from collections.abc import Callable
from pathlib import Path
from typing import Any, Callable, NoReturn, cast
Expand All @@ -35,6 +37,7 @@


class TestJob:

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

"""Tests the job class."""

@slow
Expand Down Expand Up @@ -815,7 +818,61 @@ def cyclic(fNode: int, visited: set[int], stack: list[int]) -> bool | int:
if cyclic(i, visited, []):
return False
return True

def test_cpu_usage(self, subtests: pytest.Subtests) -> None:
with subtests.test("no warnings for non-positive runtime"):
job = Job()
file_store = Mock()
job._check_cpu_usage(
job_time=0.0,
job_cpu_time=100.0,
fileStore=file_store
)
job._check_cpu_usage(
job_time=-1.0,
job_cpu_time=100.0,
fileStore=file_store
)
file_store.log_to_leader.assert_not_called()

with subtests.test("accepts 0 cores"):
job = Job()
file_store = Mock()
job.cores = 0
job._check_cpu_usage(
job_time=10.0,
job_cpu_time=10.0,
fileStore=file_store
)

TIME = 10.0
THRESHOLD = 1.05
CORES = 2
with subtests.test("threshold boundary does not warn"):
threshold_job = Job(cores=CORES)
threshold_store = Mock()
threshold_job._check_cpu_usage(
job_time=TIME,
job_cpu_time=TIME * CORES * THRESHOLD,
fileStore=threshold_store,
threshold_factor=THRESHOLD,
)
threshold_store.log_to_leader.assert_not_called()

with subtests.test("just above threshold warns"):
threshold_job = Job(cores=CORES)
threshold_store = Mock()
threshold_job._check_cpu_usage(
job_time=TIME,
job_cpu_time=TIME * CORES * THRESHOLD + 0.1,
fileStore=threshold_store,
threshold_factor=THRESHOLD,
)
threshold_store.log_to_leader.assert_called_once()
cpu_message = threshold_store.log_to_leader.call_args.args[0]
cpu_level = threshold_store.log_to_leader.call_args.kwargs["level"]
assert str(threshold_job.description) in cpu_message
assert cpu_level == logging.WARNING
Comment on lines +851 to +875
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are testing both the behavior at the threshold and the default threshold value, and we don't promise that the threshold will always be exactly 1.05.

It's also not immediately clear that the magic numbers have the right relationship for these tests to work.

I would do something like:

Suggested change
with subtests.test("threshold boundary does not warn"):
threshold_job = Job(cores=2)
threshold_store = Mock()
threshold_job._check_cpu_usage(
job_time=10.0, job_cpu_time=21.0, fileStore=threshold_store
)
threshold_store.log_to_leader.assert_not_called()
with subtests.test("just above threshold warns"):
threshold_job = Job(cores=2)
threshold_store = Mock()
threshold_job._check_cpu_usage(
job_time=10.0, job_cpu_time=21.1, fileStore=threshold_store
)
threshold_store.log_to_leader.assert_called_once()
cpu_message = threshold_store.log_to_leader.call_args.args[0]
cpu_level = threshold_store.log_to_leader.call_args.kwargs["level"]
assert "used " in cpu_message
assert "more CPU than the requested 2 cores" in cpu_message
assert cpu_level == logging.WARNING
TIME = 10
THRESHOLD = 1.05
CORES = 2
with subtests.test("threshold boundary does not warn"):
threshold_job = Job(cores=CORES)
threshold_store = Mock()
threshold_job._check_cpu_usage(
job_time=TIME,
job_cpu_time=TIME * CORES * THRESHOLD,
fileStore=threshold_store,
threshold_factor=THRESHOLD,
)
threshold_store.log_to_leader.assert_not_called()
with subtests.test("just above threshold warns"):
threshold_job = Job(cores=CORES)
threshold_store = Mock()
threshold_job._check_cpu_usage(
job_time=TIME,
job_cpu_time=TIME * CORES * THRESHOLD + 0.1,
fileStore=threshold_store,
threshold_factor=THRESHOLD,
)
threshold_store.log_to_leader.assert_called_once()
cpu_message = threshold_store.log_to_leader.call_args.args[0]
cpu_level = threshold_store.log_to_leader.call_args.kwargs["level"]
assert str(threshold_job.description) in cpu_message
assert cpu_level == logging.WARNING

(I'm also de-constraining the message text here, to test whether a warning that mentions the job is sent, rather than the specific wording of the message.)


def simpleJobFn(job: ServiceHostJob, value: str) -> None:
job.fileStore.log_to_leader(value)
Expand Down