Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/toil/batchSystems/abstractGridEngineBatchSystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,14 @@ def createJobs(self, newJob: JobTuple) -> bool:
self.boss.config.max_jobs
):
activity = True
jobID, cpu, memory, command, jobName, environment, gpus = (
jobID, cpu, memory, walltime, command, jobName, environment, gpus = (
self.waitingJobs.pop(0)
)
if self.boss.config.memory_is_product and cpu > 1:
memory = memory // cpu
# prepare job submission command
subLine = self.prepareSubmission(
cpu, memory, jobID, command, jobName, environment, gpus
cpu, memory, walltime, jobID, command, jobName, environment, gpus
)
logger.debug("Running %r", subLine)
batchJobID = self.boss.with_retries(self.submitJob, subLine)
Expand Down Expand Up @@ -500,6 +500,7 @@ def issueBatchJob(
job_id,
job_desc.cores,
job_desc.memory,
job_desc.walltime,
command,
get_job_kind(job_desc.get_names()),
job_environment,
Expand Down
11 changes: 7 additions & 4 deletions src/toil/batchSystems/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def prepareSubmission(
self,
cpu: int,
memory: int,
walltime: int,
jobID: int,
command: str,
jobName: str,
Expand All @@ -344,7 +345,7 @@ def prepareSubmission(
# Make sure to use exec so we can get Slurm's signals in the Toil
# worker instead of having an intervening Bash
return self.prepareSbatch(
cpu, memory, jobID, jobName, job_environment, gpus
cpu, memory, walltime, jobID, jobName, job_environment, gpus
) + [f"--wrap=exec {command}"]

def submitJob(self, subLine: list[str]) -> int:
Expand Down Expand Up @@ -839,6 +840,7 @@ def prepareSbatch(
self,
cpu: int,
mem: int,
walltime: int,
jobID: int,
jobName: str,
job_environment: dict[str, str] | None,
Expand Down Expand Up @@ -889,8 +891,8 @@ def prepareSbatch(

# --export=[ALL,]<environment_toil_variables>
export_all = True
export_list = [] # Some items here may be multiple comma-separated values
time_limit: int | None = self.boss.config.slurm_time # type: ignore[attr-defined]
export_list = [] # Some items here may be multiple comma-separated values
time_limit: int | None = self.boss.config.slurm_time or walltime # type: ignore[attr-defined]
partition: str | None = None

if nativeConfig is not None:
Expand Down Expand Up @@ -1041,7 +1043,7 @@ def prepareSbatch(
sbatch_line.append(f"--mem={math.ceil(mem / 2 ** 20)}")
if cpu is not None:
sbatch_line.append(f"--cpus-per-task={math.ceil(cpu)}")
if time_limit is not None:
if time_limit > 0:
# Put all the seconds in the seconds slot
sbatch_line.append(f"--time=0:{time_limit}")

Expand Down Expand Up @@ -1091,6 +1093,7 @@ def issueBatchJob(
job_id,
job_desc.cores,
memory,
job_desc.walltime,
command,
get_job_kind(job_desc.get_names()),
job_environment,
Expand Down
2 changes: 2 additions & 0 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ class Config:
deadlockCheckInterval: float | int

# Resource requirements
defaultWalltime: int
defaultMemory: int
defaultCores: float | int
defaultDisk: int
Expand Down Expand Up @@ -389,6 +390,7 @@ def set_option(option_name: str, old_names: list[str] | None = None) -> None:
set_option("deadlockCheckInterval")

set_option("defaultMemory")
set_option("defaultWalltime")
set_option("defaultCores")
set_option("defaultDisk")
set_option("defaultAccelerators")
Expand Down
11 changes: 11 additions & 0 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -2193,6 +2193,7 @@ class CWLNamedJob(Job):
def __init__(
self,
cores: float | None = 1,
walltime: int | None = 0,
memory: int | str | None = "1GiB",
disk: int | str | None = "1MiB",
accelerators: list[AcceleratorRequirement] | None = None,
Expand Down Expand Up @@ -2238,6 +2239,7 @@ def __init__(
# Set up the job with the right requirements and names.
super().__init__(
cores=cores,
walltime=walltime,
memory=memory,
disk=disk,
accelerators=accelerators,
Expand Down Expand Up @@ -2574,6 +2576,14 @@ def __init__(
# Note: if the job is using the toil default memory, it won't be increased
memory = max(memory, min_ram)

# Check if the tool has set a time limit. If yes, use it. Otherwise,
# use a None requirement to use the Toil default.
tool_max_walltime = tool.get_requirement("ToolTimeLimit")[0] or {}
if "timelimit" in tool_max_walltime:
walltime = int(tool_max_walltime["timelimit"])
Comment thread
mr-c marked this conversation as resolved.
Outdated
else:
walltime = None

accelerators: list[AcceleratorRequirement] | None = None
if req.get("cudaDeviceCount", 0) > 0:
# There's a CUDARequirement, which cwltool processed for us
Expand Down Expand Up @@ -2639,6 +2649,7 @@ def __init__(
super().__init__(
cores=req["cores"],
memory=memory,
walltime=walltime,
disk=int(total_disk),
accelerators=accelerators,
preemptible=preemptible,
Expand Down
55 changes: 44 additions & 11 deletions src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
from toil.deferred import DeferredFunction
from toil.fileStores import FileID
from toil.lib.compatibility import deprecated
from toil.lib.conversions import bytes2human, human2bytes
from toil.lib.conversions import bytes2human, human2bytes, seconds_to_dhms
from toil.lib.exceptions import UnimplementedURLException
from toil.lib.expando import Expando
from toil.lib.resources import ResourceMonitor
Expand Down Expand Up @@ -415,13 +415,14 @@ class RequirementsDict(TypedDict):

cores: NotRequired[int | float]
memory: NotRequired[int]
walltime: NotRequired[int]
disk: NotRequired[int]
accelerators: NotRequired[list[AcceleratorRequirement]]
preemptible: NotRequired[bool]


# These must be all the key names in RequirementsDict
REQUIREMENT_NAMES = ["disk", "memory", "cores", "accelerators", "preemptible"]
REQUIREMENT_NAMES = ["disk", "walltime", "memory", "cores", "accelerators", "preemptible"]

# This is the supertype of all value types in RequirementsDict
ParsedRequirement = Union[int, float, bool, list[AcceleratorRequirement]]
Expand Down Expand Up @@ -454,7 +455,7 @@ class Requirer:
"""
Base class implementing the storage and presentation of requirements.

Has cores, memory, disk, and preemptability as properties.
Has cores, walltime, memory, disk, and preemptability as properties.
"""

_requirementOverrides: RequirementsDict
Expand All @@ -464,7 +465,7 @@ def __init__(self, requirements: Mapping[str, ParseableRequirement | None]) -> N
Parse and save the given requirements.

:param dict requirements: Dict from string to value
describing a set of resource requirments. 'cores', 'memory',
describing a set of resource requirments. 'cores', 'walltime', 'memory',
'disk', 'preemptible', and 'accelerators' fields, if set, are
parsed and broken out into properties. If unset, the relevant
property will be unspecified, and will be pulled from the assigned
Expand Down Expand Up @@ -545,7 +546,7 @@ def __deepcopy__(self, memo: Any) -> Requirer:
@overload
@staticmethod
def _parseResource(
name: Literal["memory"] | Literal["disk"],
name: Literal["memory"] | Literal["disk"] | Literal["walltime"],
value: ParseableIndivisibleResource,
) -> int: ...

Expand Down Expand Up @@ -610,7 +611,7 @@ def _parseResource(
# Anything can be None.
return value

if name in ("memory", "disk", "cores"):
if name in ("memory", "disk", "cores", "walltime"):
# These should be numbers that accept things like "5G".
if isinstance(value, bytes):
value = value.decode("utf-8")
Expand Down Expand Up @@ -722,6 +723,15 @@ def memory(self) -> int:
def memory(self, val: ParseableIndivisibleResource) -> None:
self._requirementOverrides["memory"] = Requirer._parseResource("memory", val)

@property
def walltime(self) -> int:
"""Get the maximum walltime in seconds allowed."""
return cast(int, self._fetchRequirement("walltime"))

@walltime.setter
def walltime(self, val: ParseableIndivisibleResource) -> None:
self._requirementOverrides["walltime"] = Requirer._parseResource("walltime", val)

@property
def cores(self) -> int | float:
"""Get the number of CPU cores required."""
Expand Down Expand Up @@ -791,7 +801,11 @@ def requirements_string(self) -> str:
for k in REQUIREMENT_NAMES:
v: str | ParsedRequirement | None = self._fetchRequirement(k)
if v is not None:
if isinstance(v, (int, float)) and v > 1000:
if k == "walltime":
if v == 0:
continue
v = seconds_to_dhms(v)
Comment thread
mr-c marked this conversation as resolved.
Outdated
elif isinstance(v, (int, float)) and v > 1000:
# Make large numbers readable
v = bytes2human(v)
parts.append(f"{k}: {v}")
Expand Down Expand Up @@ -843,7 +857,7 @@ def __init__(

:param requirements: Dict from string to number, string, or bool
describing the resource requirements of the job. 'cores', 'memory',
'disk', and 'preemptible' fields, if set, are parsed and broken out
'disk', 'walltime', and 'preemptible' fields, if set, are parsed and broken out
into properties. If unset, the relevant property will be
unspecified, and will be pulled from the assigned Config object if
queried (see :meth:`toil.job.Requirer.assignConfig`).
Expand Down Expand Up @@ -1738,6 +1752,7 @@ class Job:

def __init__(
self,
walltime: ParseableIndivisibleResource | None = None,
memory: ParseableIndivisibleResource | None = None,
cores: ParseableDivisibleResource | None = None,
disk: ParseableIndivisibleResource | None = None,
Expand All @@ -1756,6 +1771,7 @@ def __init__(

This method must be called by any overriding constructor.

:param walltime: the maximum walltime in seconds that the job is allowed to run.
:param memory: the maximum number of bytes of memory the job will require to run.
:param cores: the number of CPU cores required.
:param disk: the amount of local disk space required by the job, expressed in bytes.
Expand All @@ -1772,6 +1788,7 @@ def __init__(
:param local: if the job can be run on the leader.
:param files: Set of Files that the job will want to use.

:type walltime: int
:type memory: int or string convertible by toil.lib.conversions.human2bytes to an int
:type cores: float, int, or string convertible by toil.lib.conversions.human2bytes to an int
:type disk: int or string convertible by toil.lib.conversions.human2bytes to an int
Expand All @@ -1794,6 +1811,7 @@ def __init__(
preemptible = preemptable
# Build a requirements dict for the description
requirements = {
"walltime": walltime,
"memory": memory,
"cores": cores,
"disk": disk,
Expand Down Expand Up @@ -1906,6 +1924,15 @@ def disk(self) -> int:
def disk(self, val: int) -> None:
self.description.disk = val

@property
def walltime(self) -> int:
"""The maximum walltime in seconds that the job is allowed to run."""
return self.description.walltime

@walltime.setter
def walltime(self, val: int) -> None:
self.description.walltime = val

@property
def memory(self) -> int:
"""The maximum number of bytes of memory the job will require to run."""
Expand Down Expand Up @@ -2688,6 +2715,7 @@ class Service(Requirer, metaclass=ABCMeta):

def __init__(
self,
walltime: ParseableIndivisibleResource | None = None,
memory: ParseableIndivisibleResource | None = None,
cores: ParseableDivisibleResource | None = None,
disk: ParseableIndivisibleResource | None = None,
Expand All @@ -2696,13 +2724,14 @@ def __init__(
unitName: str | None = "",
) -> None:
"""
Memory, core and disk requirements are specified identically to as in \
Memory, walltime, core and disk requirements are specified identically to as in \
:func:`toil.job.Job.__init__`.
"""
# Save the requirements in ourselves so they are visible on `self` to user code.
super().__init__(
{
"memory": memory,
"walltime": walltime,
"cores": cores,
"disk": disk,
"accelerators": accelerators,
Expand Down Expand Up @@ -3459,7 +3488,7 @@ def __init__(
``**kwargs`` as arguments.

The keywords ``memory``, ``cores``, ``disk``, ``accelerators`,
``preemptible`` and ``checkpoint`` are reserved keyword arguments that
``preemptible``, ``walltime``, and ``checkpoint`` are reserved keyword arguments that
if specified will be used to determine the resources required for the
job, as :func:`toil.job.Job.__init__`. If they are keyword arguments to
the function they will be extracted from the function definition, but
Expand Down Expand Up @@ -3494,6 +3523,7 @@ def resolve(key: str, default: Any | None = None, dehumanize: bool = False) -> A

super().__init__(
memory=resolve("memory", dehumanize=True),
walltime=resolve("walltime"),
cores=resolve("cores", dehumanize=True),
disk=resolve("disk", dehumanize=True),
accelerators=resolve("accelerators"),
Expand Down Expand Up @@ -3555,14 +3585,15 @@ class JobFunctionWrappingJob(FunctionWrappingJob):
can be specified:

- memory
- walltime
- disk
- cores
- accelerators
- preemptible

For example to wrap a function into a job we would call::

Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1)
Job.wrapJobFn(myJob, memory='100k', disk='1M', cores=0.1, walltime=0)

"""

Expand Down Expand Up @@ -3592,6 +3623,7 @@ def __init__(self, userFunction: Callable[..., Any], *args: Any, **kwargs: Any)
disk="1M",
memory="32M",
cores=0.1,
walltime=0,
accelerators=[],
preemptible=True,
preemptable=True,
Expand Down Expand Up @@ -3693,6 +3725,7 @@ def __init__(self, job: Job | None, unitName: str | None = None) -> None:
disk="100M",
memory="512M",
cores=0.1,
walltime=0,
unitName=None if unitName is None else unitName + "-followOn",
)
Job.addFollowOn(self, self.encapsulatedFollowOn)
Expand Down
14 changes: 14 additions & 0 deletions src/toil/lib/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Also contains general conversion functions
"""

import datetime
import math
import urllib.parse
from typing import SupportsInt
Expand Down Expand Up @@ -179,6 +180,19 @@ def hms_duration_to_seconds(hms: str) -> float:
return seconds


def seconds_to_dhms(seconds: int) -> str:
"""
Convert seconds to a days-hours:minutes:seconds string.
"""
if seconds < 0:
raise ValueError("Invalid Time, negative value")

walltime = datetime.timedelta(seconds=seconds)
days = walltime.days
remainder = str(walltime).split(",")[int(days > 0)].strip()
return f"{days}-{remainder}"


def strtobool(val: str) -> bool:
"""
Make a human-readable string into a bool.
Expand Down
Loading
Loading